# S3 operations

Driver-side helpers list buckets or prefixes and often write the resulting paths into YT tables for later map/YQL stages. Inside mappers you typically construct `ytjobs.s3.client.S3Client` from secrets.

```{note}
Job-side `S3Client` lives in `ytjobs`; see [YT jobs (`ytjobs`)](../reference/ytjobs.md).
```

## Overview

Common workflow:

1. List or filter object keys under a prefix.
2. Persist the key list (or small metadata) to a Cypress table.
3. Downstream stages read that table.

Credentials come from `load_secrets(self.deps.configs_dir)` and the `S3_*` keys documented in [Secrets](../configuration/secrets.md).

## Quick Start

### Minimal Example

**Stage** (`stages/list_s3/stage.py`):

```python
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage
from yt_framework.operations.s3 import list_s3_files, save_s3_paths_to_table
from yt_framework.utils.env import load_secrets
from ytjobs.s3.client import S3Client


class ListS3Stage(BaseStage):
    def __init__(self, deps, logger):
        super().__init__(deps, logger)

        # Create S3 client
        secrets = load_secrets(self.deps.configs_dir)
        self.s3_client = S3Client.create(
            secrets=secrets,
            client_type="download",  # or "upload" for write access
        )

    def run(self, debug: DebugContext) -> DebugContext:
        # List files from S3
        paths = list_s3_files(
            s3_client=self.s3_client,
            bucket=self.config.client.input_bucket,
            prefix=self.config.client.input_prefix,
            logger=self.logger,
        )

        # Save paths to YT table
        save_s3_paths_to_table(
            yt_client=self.deps.yt_client,
            bucket=self.config.client.input_bucket,
            paths=paths,
            output_table=self.config.client.output_table,
            logger=self.logger,
        )

        return debug
```

**Stage config** (`stages/list_s3/config.yaml`):

```yaml
client:
  input_bucket: my-bucket
  input_prefix: data/2024/
  output_table: //tmp/my_pipeline/s3_paths
```

**Secrets** (`configs/secrets.env`):

```bash
S3_ENDPOINT=https://your-s3-endpoint.com
S3_DOWNLOAD_ACCESS_KEY=your-download-access-key
S3_DOWNLOAD_SECRET_KEY=your-download-secret-key
S3_UPLOAD_ACCESS_KEY=your-upload-access-key
S3_UPLOAD_SECRET_KEY=your-upload-secret-key
```

See [Example: 06_s3_integration](https://github.com/GregoryKogan/yt-framework/tree/main/examples/06_s3_integration/) for a complete example.

## S3 Client Setup

### Creating S3 Client

Create an S3 client in your stage's `__init__` method:

```python
from yt_framework.core.stage import BaseStage
from yt_framework.utils.env import load_secrets
from ytjobs.s3.client import S3Client


class MyStage(BaseStage):
    def __init__(self, deps, logger):
        super().__init__(deps, logger)

        # Load secrets
        secrets = load_secrets(self.deps.configs_dir)

        # Create S3 client
        self.s3_client = S3Client.create(
            secrets=secrets,
            client_type="download",  # or "upload"
        )
```

**Client types:**

- **`"download"`**: Read-only access (for listing and downloading)
- **`"upload"`**: Write access (for uploading files)

### S3 Credentials

S3 credentials are stored in `configs/secrets.env`:

```bash
S3_ENDPOINT=https://your-s3-endpoint.com
S3_DOWNLOAD_ACCESS_KEY=your-download-access-key
S3_DOWNLOAD_SECRET_KEY=your-download-secret-key
S3_UPLOAD_ACCESS_KEY=your-upload-access-key
S3_UPLOAD_SECRET_KEY=your-upload-secret-key
```

**Fields:**

- **`S3_ENDPOINT`**: S3 endpoint URL (e.g., `https://s3.example.com`)
- **`S3_DOWNLOAD_ACCESS_KEY`**: Access key for download operations
- **`S3_DOWNLOAD_SECRET_KEY`**: Secret key for download operations
- **`S3_UPLOAD_ACCESS_KEY`**: Access key for upload operations (optional, if different from download)
- **`S3_UPLOAD_SECRET_KEY`**: Secret key for upload operations (optional, if different from download)

## Operations

### List S3 Files

List files in an S3 bucket with optional filtering:

```python
from yt_framework.operations.s3 import list_s3_files

paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/2024/",
    logger=self.logger,
    extension=".json",  # Optional: filter by extension
    max_files=1000,  # Optional: limit results
)
```

**Parameters:**

- **`s3_client`**: S3Client instance
- **`bucket`**: S3 bucket name
- **`prefix`**: S3 prefix to filter by (required; pass an empty string `""` to list all files)
- **`logger`**: Logger instance
- **`extension`**: File extension filter (optional, e.g., `".json"`)
- **`max_files`**: Maximum number of files to return (optional)

**Returns:** List of S3 paths (strings)

**Example:**

```python
# List all JSON files in bucket with prefix
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/",
    extension=".json",
    logger=self.logger,
)

# List all files in bucket (no prefix filter - use empty string)
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="",  # Empty string lists all files
    logger=self.logger,
)

# List first 100 files
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/",
    max_files=100,
    logger=self.logger,
)
```

### Save S3 Paths to Table

Save S3 file paths to a YT table for processing:

```python
from yt_framework.operations.s3 import save_s3_paths_to_table

save_s3_paths_to_table(
    yt_client=self.deps.yt_client,
    bucket="my-bucket",
    paths=paths,
    output_table="//tmp/my_pipeline/s3_paths",
    logger=self.logger,
)
```

**Parameters:**

- **`yt_client`**: YT client instance
- **`bucket`**: S3 bucket name
- **`paths`**: List of S3 paths (from `list_s3_files`)
- **`output_table`**: YT table path to write
- **`logger`**: Logger instance

**Table schema:**

Each row in the output table has the following shape:

```python
{
    "bucket": "my-bucket",
    "path": "data/file1.json",  # S3 key (not full s3:// URL)
}
```

**Example:**

```python
# List files
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/",
    logger=self.logger,
)

# Save to table
save_s3_paths_to_table(
    yt_client=self.deps.yt_client,
    bucket="my-bucket",
    paths=paths,
    output_table="//tmp/my_pipeline/s3_paths",
    logger=self.logger,
)

# Process paths from table
rows = list(self.deps.yt_client.read_table("//tmp/my_pipeline/s3_paths"))
for row in rows:
    s3_path = row["path"]
    # Process S3 file...
```

## Complete Example

```python
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage
from yt_framework.operations.s3 import list_s3_files, save_s3_paths_to_table
from yt_framework.utils.env import load_secrets
from ytjobs.s3.client import S3Client


class ListS3Stage(BaseStage):
    def __init__(self, deps, logger):
        super().__init__(deps, logger)

        # Create S3 client
        secrets = load_secrets(self.deps.configs_dir)
        self.s3_client = S3Client.create(
            secrets=secrets,
            client_type="download",
        )

    def run(self, debug: DebugContext) -> DebugContext:
        self.logger.info("Listing files from S3...")

        # List files with optional filters
        paths = list_s3_files(
            s3_client=self.s3_client,
            bucket=self.config.client.input_bucket,
            prefix=self.config.client.input_prefix,
            logger=self.logger,
            extension=self.config.client.get("file_extension"),  # Optional
            max_files=self.config.client.get("max_files"),  # Optional
        )

        if not paths:
            self.logger.warning("No files found in S3")
            return debug

        self.logger.info(f"Found {len(paths)} files")

        # Save paths to YT table
        save_s3_paths_to_table(
            yt_client=self.deps.yt_client,
            bucket=self.config.client.input_bucket,
            paths=paths,
            output_table=self.config.client.output_table,
            logger=self.logger,
        )

        self.logger.info(f"Saved {len(paths)} paths to {self.config.client.output_table}")
        return debug
```

**Configuration** (`stages/list_s3/config.yaml`):

```yaml
client:
  input_bucket: my-bucket
  input_prefix: data/2024/
  file_extension: .json  # Optional: filter by extension
  max_files: 1000  # Optional: limit results
  output_table: //tmp/my_pipeline/s3_paths
```

## Processing S3 Files

After saving S3 paths to a table, you can process them in subsequent stages.

### Driver-Side Loop Example

For a small number of files, a stage can read the table and process each file on the driver:

```python
# stages/process_s3/stage.py
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage


class ProcessS3Stage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        # Read S3 paths from table
        s3_paths = list(self.deps.yt_client.read_table(
            self.config.client.s3_paths_table
        ))

        # Process each S3 file
        results = []
        for row in s3_paths:
            s3_path = row["path"]
            # Download and process file
            # (process_s3_file is a placeholder for your own function)
            result = process_s3_file(s3_path)
            results.append(result)

        # Write results to output table
        self.deps.yt_client.write_table(
            self.config.client.output_table,
            results,
        )

        return debug
```

### Using Map Operation

For large numbers of files, use a map operation:

```python
# stages/process_s3/src/mapper.py
import sys
import json

import boto3
from omegaconf import OmegaConf
from ytjobs.config import get_config_path


def main():
    # Stage config, available for job-side settings
    config = OmegaConf.load(get_config_path())
    # Plain boto3 client: credentials and endpoint must be
    # resolvable in the job environment
    s3_client = boto3.client("s3")

    for line in sys.stdin:
        row = json.loads(line)
        bucket = row["bucket"]
        # The table stores the S3 key in the "path" column
        key = row["path"]

        # Download file from S3
        response = s3_client.get_object(Bucket=bucket, Key=key)
        data = response["Body"].read()

        # Process data (process_data is a placeholder for your own function)
        result = process_data(data)

        # Output result
        output_row = {
            "path": key,
            "result": result,
        }
        print(json.dumps(output_row), flush=True)


if __name__ == "__main__":
    main()
```

## Configuration

### Stage Configuration

```yaml
# stages/list_s3/config.yaml
client:
  input_bucket: my-bucket
  input_prefix: data/2024/
  file_extension: .json  # Optional
  max_files: 1000  # Optional
  output_table: //tmp/my_pipeline/s3_paths
```

### Secrets Configuration

```bash
# configs/secrets.env
S3_ENDPOINT=https://your-s3-endpoint.com
S3_DOWNLOAD_ACCESS_KEY=your-download-access-key
S3_DOWNLOAD_SECRET_KEY=your-download-secret-key
S3_UPLOAD_ACCESS_KEY=your-upload-access-key
S3_UPLOAD_SECRET_KEY=your-upload-secret-key
```

## Best Practices

1. **Filter early**: Use `prefix` and `extension` to narrow the listing to the files you need
2. **Limit results**: Use `max_files` for large buckets
3. **Process in batches**: For many files, use map operations
4. **Handle errors**: Check for empty results and handle S3 errors
5. **Secure credentials**: Never commit secrets to version control
6. **Use appropriate client type**: Use `"download"` for read-only, `"upload"` for write

## Common Patterns

### List and Process

```python
# List files
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/",
    logger=self.logger,
)

# Save to table
save_s3_paths_to_table(
    yt_client=self.deps.yt_client,
    bucket="my-bucket",
    paths=paths,
    output_table="//tmp/my_pipeline/s3_paths",
    logger=self.logger,
)

# Process in next stage using map operation
```

### Filter by Extension

```python
# List only JSON files
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/",
    extension=".json",
    logger=self.logger,
)
```

### Limit Results

```python
# List first 100 files
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/",
    max_files=100,
    logger=self.logger,
)
```

## Troubleshooting

### Issue: S3 client creation fails

- Check the `S3_*` credentials in `configs/secrets.env`
- Verify the credentials have S3 access
- Check that `S3_ENDPOINT` and, if applicable, the region are correct

### Issue: No files found

- Verify the bucket name is correct
- Check that the prefix is correct
- Ensure files exist in S3

### Issue: Permission denied

- Check IAM permissions for S3 access
- Verify the credentials have read/list permissions
- Check the bucket policy

### Issue: Table write fails

- Verify the YT table path is correct
- Check YT credentials and permissions
- Ensure the output table path exists or can be created

## Next Steps

- Learn about [Map Operations](map.md) for processing S3 files
- Explore [Configuration](../configuration/index.md) for secrets management
- Check out [Example: 06_s3_integration](https://github.com/GregoryKogan/yt-framework/tree/main/examples/06_s3_integration/) for a complete example
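
The `bucket`/`path` row shape described under "Save S3 Paths to Table" is the contract between the listing stage and the mapper. The following is a minimal, self-contained sketch of that contract, not part of `yt_framework` or `ytjobs`: `to_rows`, `run_mapper`, and the injected `fetch` and `process` callables are hypothetical names introduced for illustration. It assumes mapper input arrives as one JSON object per line, as in the mapper under "Using Map Operation", and it records per-file errors in the output row instead of aborting the job.

```python
import json
from typing import Callable, Dict, Iterable, Iterator, List


def to_rows(bucket: str, paths: List[str]) -> List[Dict[str, str]]:
    """Build rows in the documented shape: {"bucket": ..., "path": <S3 key>}."""
    return [{"bucket": bucket, "path": path} for path in paths]


def run_mapper(
    lines: Iterable[str],
    fetch: Callable[[str, str], bytes],
    process: Callable[[bytes], object],
) -> Iterator[dict]:
    """Yield one output row per JSON input line.

    `fetch(bucket, key)` and `process(data)` are hypothetical callables
    supplied by the caller, so the loop can run without a real S3 endpoint.
    """
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        row = json.loads(line)
        bucket, key = row["bucket"], row["path"]
        try:
            data = fetch(bucket, key)
            yield {"path": key, "result": process(data), "error": None}
        except Exception as exc:
            # Keep going: one bad object should not fail the whole job
            yield {"path": key, "result": None, "error": str(exc)}


if __name__ == "__main__":
    # In-memory stand-in for S3, for demonstration only
    store = {("my-bucket", "data/a.json"): b'{"x": 1}'}
    rows = to_rows("my-bucket", ["data/a.json", "data/missing.json"])
    lines = [json.dumps(r) for r in rows]
    for out in run_mapper(lines, lambda b, k: store[(b, k)], len):
        print(json.dumps(out))
```

In a real mapper, `fetch` could wrap a boto3 call such as `s3_client.get_object(Bucket=bucket, Key=key)["Body"].read()`, and downstream stages could filter on the `error` column to retry or report failed objects.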