# Source code for yt_framework.operations.s3

"""Driver-side helpers to list S3 keys and persist paths into Cypress tables."""

import logging

from yt_framework.yt.clients.client_base import BaseYTClient
from ytjobs.s3.client import S3Client

_S3_PATH_DEBUG_PREVIEW = 3


def list_s3_files(
    s3_client: S3Client,
    bucket: str,
    prefix: str,
    logger: logging.Logger,
    extension: str | None = None,
    max_files: int | None = None,
) -> list[str]:
    """Ask the S3 client for the keys under a prefix and log a short summary.

    The listing itself is delegated to ``s3_client.list_files``; ``extension``
    and ``max_files`` are forwarded to it unchanged. The total count is logged
    at INFO level, and up to ``_S3_PATH_DEBUG_PREVIEW`` sample paths are logged
    at DEBUG level.

    Args:
        s3_client: S3 client instance used to perform the listing
        bucket: S3 bucket name
        prefix: S3 prefix path
        logger: Logger instance that receives the progress messages
        extension: Optional file extension filter (e.g., 'mp4')
        max_files: Optional maximum number of files to return

    Returns:
        The paths exactly as returned by ``s3_client.list_files``

    """
    logger.info("Listing files from S3: s3://%s/%s", bucket, prefix)

    found = s3_client.list_files(
        bucket=bucket,
        prefix=prefix,
        extension=extension,
        max_files=max_files,
    )
    total = len(found)
    logger.info("Found %s files", total)

    # Nothing to preview: skip the DEBUG sample output entirely.
    if not found:
        return found

    logger.debug("Sample paths:")
    for sample in found[:_S3_PATH_DEBUG_PREVIEW]:
        logger.debug(" - %s", sample)

    # Report how many paths were left out of the preview, if any.
    not_shown = total - _S3_PATH_DEBUG_PREVIEW
    if not_shown > 0:
        logger.debug(" ... and %s more", not_shown)

    return found
def save_s3_paths_to_table(
    yt_client: BaseYTClient,
    bucket: str,
    paths: list[str],
    output_table: str,
    logger: logging.Logger,
) -> None:
    """Write one ``{"bucket", "path"}`` row per S3 path into a YT table.

    Every entry of ``paths`` becomes a row that pairs it with ``bucket``; the
    rows are handed to ``yt_client.write_table`` in a single call.

    Args:
        yt_client: YT client instance used to write the table
        bucket: S3 bucket name stored in every row
        paths: List of S3 file paths, one row is produced per entry
        output_table: YT table path to write to
        logger: Logger instance that receives the progress messages

    Returns:
        None

    """
    logger.info("Saving %s paths to YT table: %s", len(paths), output_table)

    # One row per path, each tagged with the bucket it belongs to.
    records = [
        {"bucket": bucket, "path": s3_path}
        for s3_path in paths
    ]

    # NOTE(review): append=False presumably replaces existing table contents;
    # confirm against BaseYTClient.write_table.
    yt_client.write_table(
        table_path=output_table,
        rows=records,
        append=False,
    )

    logger.info("✓ Saved %s paths → %s", len(records), output_table)