S3 Operations#

YT Framework provides S3 integration for listing files, downloading data, and processing S3 objects. This is useful for pipelines that need to work with data stored in S3.

Overview#

S3 operations allow you to:

  • List files in S3 buckets

  • Filter files by prefix, extension, or count

  • Save S3 paths to YT tables for processing

  • Integrate S3 data with YT pipelines

Key characteristics:

  • S3 client integration

  • File listing and filtering

  • Table integration

  • Credential management

Quick Start#

Minimal Example#

Stage (stages/list_s3/stage.py):

from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage
from yt_framework.operations.s3 import list_s3_files, save_s3_paths_to_table
from yt_framework.utils.env import load_secrets
from ytjobs.s3.client import S3Client

class ListS3Stage(BaseStage):
    def __init__(self, deps, logger):
        super().__init__(deps, logger)
        
        # Create S3 client
        secrets = load_secrets(self.deps.configs_dir)
        self.s3_client = S3Client.create(
            secrets=secrets,
            client_type="download",  # or "upload" for write access
        )
    
    def run(self, debug: DebugContext) -> DebugContext:
        # List files from S3
        paths = list_s3_files(
            s3_client=self.s3_client,
            bucket=self.config.client.input_bucket,
            prefix=self.config.client.input_prefix,
            logger=self.logger,
        )
        
        # Save paths to YT table
        save_s3_paths_to_table(
            yt_client=self.deps.yt_client,
            bucket=self.config.client.input_bucket,
            paths=paths,
            output_table=self.config.client.output_table,
            logger=self.logger,
        )
        
        return debug

Stage config (stages/list_s3/config.yaml):

client:
  input_bucket: my-bucket
  input_prefix: data/2024/
  output_table: //tmp/my_pipeline/s3_paths

Secrets (configs/secrets.env):

S3_ENDPOINT=https://your-s3-endpoint.com
S3_DOWNLOAD_ACCESS_KEY=your-download-access-key
S3_DOWNLOAD_SECRET_KEY=your-download-secret-key
S3_UPLOAD_ACCESS_KEY=your-upload-access-key
S3_UPLOAD_SECRET_KEY=your-upload-secret-key

See Example: 06_s3_integration for a complete example.

S3 Client Setup#

Creating S3 Client#

Create an S3 client in your stage’s __init__ method:

from yt_framework.utils.env import load_secrets
from ytjobs.s3.client import S3Client

class MyStage(BaseStage):
    def __init__(self, deps, logger):
        super().__init__(deps, logger)
        
        # Load secrets
        secrets = load_secrets(self.deps.configs_dir)
        
        # Create S3 client
        self.s3_client = S3Client.create(
            secrets=secrets,
            client_type="download",  # or "upload"
        )

Client types:

  • "download": Read-only access (for listing and downloading)

  • "upload": Write access (for uploading files)

S3 Credentials#

S3 credentials are stored in configs/secrets.env:

S3_ENDPOINT=https://your-s3-endpoint.com
S3_DOWNLOAD_ACCESS_KEY=your-download-access-key
S3_DOWNLOAD_SECRET_KEY=your-download-secret-key
S3_UPLOAD_ACCESS_KEY=your-upload-access-key
S3_UPLOAD_SECRET_KEY=your-upload-secret-key

Fields (the first three are required; the upload keys are optional):

  • S3_ENDPOINT: S3 endpoint URL (e.g., https://s3.example.com)

  • S3_DOWNLOAD_ACCESS_KEY: Access key for download operations

  • S3_DOWNLOAD_SECRET_KEY: Secret key for download operations

  • S3_UPLOAD_ACCESS_KEY: Access key for upload operations (optional, if different from download)

  • S3_UPLOAD_SECRET_KEY: Secret key for upload operations (optional, if different from download)
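The field list above can be checked before a client is created. The following is a minimal sketch, not the framework's own `load_secrets` implementation: `parse_env_text` and `missing_s3_keys` are hypothetical helpers, and the sketch assumes `secrets.env` uses plain `KEY=VALUE` lines.

```python
from typing import Dict, List

# Keys the documentation lists as required for a download client.
REQUIRED_KEYS = ("S3_ENDPOINT", "S3_DOWNLOAD_ACCESS_KEY", "S3_DOWNLOAD_SECRET_KEY")


def parse_env_text(text: str) -> Dict[str, str]:
    """Parse KEY=VALUE lines, skipping blank lines and # comments (hypothetical helper)."""
    secrets: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        secrets[key.strip()] = value.strip()
    return secrets


def missing_s3_keys(secrets: Dict[str, str]) -> List[str]:
    """Return the required keys that are absent or empty (hypothetical helper)."""
    return [key for key in REQUIRED_KEYS if not secrets.get(key)]


example = parse_env_text(
    "# configs/secrets.env\n"
    "S3_ENDPOINT=https://s3.example.com\n"
    "S3_DOWNLOAD_ACCESS_KEY=abc\n"
)
print(missing_s3_keys(example))
```

Running a check like this at the start of a stage can turn a vague client-creation failure into a clear message about which key is missing.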

Operations#

List S3 Files#

List files in an S3 bucket with optional filtering:

from yt_framework.operations.s3 import list_s3_files

paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/2024/",
    logger=self.logger,
    extension=".json",      # Optional: filter by extension
    max_files=1000,         # Optional: limit results
)

Parameters:

  • s3_client: S3Client instance

  • bucket: S3 bucket name

  • prefix: S3 key prefix to match (required; pass an empty string "" to list all files)

  • logger: Logger instance

  • extension: File extension filter (optional, e.g., ".json")

  • max_files: Maximum number of files to return (optional)

Returns: List of S3 paths (strings)

Example:

# List all JSON files in bucket with prefix
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/",
    extension=".json",
    logger=self.logger,
)

# List all files in bucket (no prefix filter - use empty string)
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="",  # Empty string lists all files
    logger=self.logger,
)

# List first 100 files
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/",
    max_files=100,
    logger=self.logger,
)
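To make the effect of `prefix`, `extension`, and `max_files` concrete, here is a small stand-in that applies the same three filters to an in-memory list of keys. It is only an illustration: `filter_keys` is a hypothetical function, and the order in which the framework applies the extension filter and the `max_files` limit is an assumption.

```python
from typing import Iterable, List, Optional


def filter_keys(
    keys: Iterable[str],
    prefix: str,
    extension: Optional[str] = None,
    max_files: Optional[int] = None,
) -> List[str]:
    """Keep keys that match prefix and extension, stopping at max_files (hypothetical)."""
    selected: List[str] = []
    for key in keys:
        # Stop as soon as the limit is reached (assumed to count matching files only).
        if max_files is not None and len(selected) >= max_files:
            break
        if not key.startswith(prefix):
            continue
        if extension is not None and not key.endswith(extension):
            continue
        selected.append(key)
    return selected


keys = ["data/a.json", "data/b.csv", "logs/c.json", "data/d.json"]
print(filter_keys(keys, prefix="data/", extension=".json"))
print(filter_keys(keys, prefix="", max_files=2))
```

An empty prefix matches every key, which mirrors the "list all files" case above.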

Save S3 Paths to Table#

Save S3 file paths to a YT table for processing:

from yt_framework.operations.s3 import save_s3_paths_to_table

save_s3_paths_to_table(
    yt_client=self.deps.yt_client,
    bucket="my-bucket",
    paths=paths,
    output_table="//tmp/my_pipeline/s3_paths",
    logger=self.logger,
)

Parameters:

  • yt_client: YT client instance

  • bucket: S3 bucket name

  • paths: List of S3 paths (from list_s3_files)

  • output_table: YT table path to write

  • logger: Logger instance

Table schema:

Each row of the output table has two columns, bucket and path, for example:

{
    "bucket": "my-bucket",
    "path": "data/file1.json",  # S3 key (not full s3:// URL)
}

Example:

# List files
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/",
    logger=self.logger,
)

# Save to table
save_s3_paths_to_table(
    yt_client=self.deps.yt_client,
    bucket="my-bucket",
    paths=paths,
    output_table="//tmp/my_pipeline/s3_paths",
    logger=self.logger,
)

# Process paths from table
rows = list(self.deps.yt_client.read_table("//tmp/my_pipeline/s3_paths"))
for row in rows:
    s3_path = row["path"]
    # Process S3 file...
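Because the table stores the bucket and the key separately, a full `s3://` URL has to be rebuilt when one is needed. The sketch below shows the row shape described above and the reverse conversion; `paths_to_rows` and `row_to_s3_url` are hypothetical helpers, not framework functions.

```python
from typing import Dict, List


def paths_to_rows(bucket: str, paths: List[str]) -> List[Dict[str, str]]:
    """Build one {"bucket", "path"} row per S3 key (hypothetical helper)."""
    return [{"bucket": bucket, "path": path} for path in paths]


def row_to_s3_url(row: Dict[str, str]) -> str:
    """Rebuild a full s3:// URL from a table row (hypothetical helper)."""
    return f"s3://{row['bucket']}/{row['path']}"


rows = paths_to_rows("my-bucket", ["data/file1.json"])
print(rows)
print(row_to_s3_url(rows[0]))
```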

Complete Example#

from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage
from yt_framework.operations.s3 import list_s3_files, save_s3_paths_to_table
from yt_framework.utils.env import load_secrets
from ytjobs.s3.client import S3Client

class ListS3Stage(BaseStage):
    def __init__(self, deps, logger):
        super().__init__(deps, logger)
        
        # Create S3 client
        secrets = load_secrets(self.deps.configs_dir)
        self.s3_client = S3Client.create(
            secrets=secrets,
            client_type="download",
        )
    
    def run(self, debug: DebugContext) -> DebugContext:
        self.logger.info("Listing files from S3...")
        
        # List files with optional filters
        paths = list_s3_files(
            s3_client=self.s3_client,
            bucket=self.config.client.input_bucket,
            prefix=self.config.client.input_prefix,
            logger=self.logger,
            extension=self.config.client.get("file_extension"),  # Optional
            max_files=self.config.client.get("max_files"),       # Optional
        )
        
        if not paths:
            self.logger.warning("No files found in S3")
            return debug
        
        self.logger.info(f"Found {len(paths)} files")
        
        # Save paths to YT table
        save_s3_paths_to_table(
            yt_client=self.deps.yt_client,
            bucket=self.config.client.input_bucket,
            paths=paths,
            output_table=self.config.client.output_table,
            logger=self.logger,
        )
        
        self.logger.info(f"Saved {len(paths)} paths to {self.config.client.output_table}")
        
        return debug

Configuration (stages/list_s3/config.yaml):

client:
  input_bucket: my-bucket
  input_prefix: data/2024/
  file_extension: .json  # Optional: filter by extension
  max_files: 1000        # Optional: limit results
  output_table: //tmp/my_pipeline/s3_paths

Processing S3 Files#

After saving S3 paths to a table, you can process them in subsequent stages:

In-Stage Processing Example#

# stages/process_s3/stage.py
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage

class ProcessS3Stage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        # Read S3 paths from table
        s3_paths = list(self.deps.yt_client.read_table(
            self.config.client.s3_paths_table
        ))
        
        # Process each S3 file
        results = []
        for row in s3_paths:
            s3_path = row["path"]
            # Download and process file.
            # process_s3_file is a user-defined helper, not part of the framework.
            result = process_s3_file(s3_path)
            results.append(result)
        
        # Write results to output table
        self.deps.yt_client.write_table(
            self.config.client.output_table,
            results,
        )
        
        return debug

Using Map Operation#

For large numbers of files, use a map operation:

# stages/process_s3/src/mapper.py
import sys
import json
import boto3
from omegaconf import OmegaConf
from ytjobs.config import get_config_path

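def main():
    config = OmegaConf.load(get_config_path())
    # boto3 resolves credentials from the job environment here.
    # For a non-AWS endpoint, pass endpoint_url=... explicitly.
    s3_client = boto3.client('s3')
    
    for line in sys.stdin:
        row = json.loads(line)
        s3_path = row["path"]
        bucket = row["bucket"]
        # The table stores the S3 key in the "path" column; there is no "key" column.
        key = s3_path
        
        # Download file from S3
        response = s3_client.get_object(Bucket=bucket, Key=key)
        data = response['Body'].read()
        
        # Process data (process_data is a user-defined function)
        result = process_data(data)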
        
        # Output result
        output_row = {
            "path": s3_path,
            "result": result,
        }
        print(json.dumps(output_row), flush=True)

if __name__ == "__main__":
    main()
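The mapper's read-fetch-emit loop can be exercised locally without S3 by injecting the download step. This is a sketch under stated assumptions: `run_mapper` and the `fetch` callable are hypothetical names, rows arrive as one JSON object per line, and the "size" output column merely stands in for a real processing result.

```python
import io
import json
from typing import Callable, TextIO


def run_mapper(stdin: TextIO, stdout: TextIO, fetch: Callable[[str, str], bytes]) -> int:
    """Read JSON rows, fetch each object, and write one JSON result row per input."""
    processed = 0
    for line in stdin:
        line = line.strip()
        if not line:
            continue
        row = json.loads(line)
        # fetch(bucket, key) stands in for s3_client.get_object(...)["Body"].read()
        data = fetch(row["bucket"], row["path"])
        stdout.write(json.dumps({"path": row["path"], "size": len(data)}) + "\n")
        processed += 1
    return processed


# Local dry run against an in-memory "bucket" instead of a real S3 endpoint.
fake_store = {("my-bucket", "data/file1.json"): b'{"a": 1}'}
out = io.StringIO()
count = run_mapper(
    io.StringIO('{"bucket": "my-bucket", "path": "data/file1.json"}\n'),
    out,
    lambda bucket, key: fake_store[(bucket, key)],
)
print(count, out.getvalue().strip())
```

Keeping the download behind a callable means the same loop can be run in the job with a real client and in tests with a stub.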

Configuration#

Stage Configuration#

# stages/list_s3/config.yaml
client:
  input_bucket: my-bucket
  input_prefix: data/2024/
  file_extension: .json      # Optional
  max_files: 1000           # Optional
  output_table: //tmp/my_pipeline/s3_paths

Secrets Configuration#

# configs/secrets.env
S3_ENDPOINT=https://your-s3-endpoint.com
S3_DOWNLOAD_ACCESS_KEY=your-download-access-key
S3_DOWNLOAD_SECRET_KEY=your-download-secret-key
S3_UPLOAD_ACCESS_KEY=your-upload-access-key
S3_UPLOAD_SECRET_KEY=your-upload-secret-key

Best Practices#

  1. Filter early: Pass prefix and extension to list_s3_files so that only relevant files are returned

  2. Limit results: Use max_files for large buckets

  3. Process in batches: For many files, use map operations

  4. Handle errors: Check for empty results and handle S3 errors

  5. Secure credentials: Never commit secrets to version control

  6. Use appropriate client type: Use "download" for read-only, "upload" for write
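As one way to read "process in batches", a long list of paths can be split into fixed-size chunks before each chunk is handed to a worker or written to its own table. The `chunked` helper below is a hypothetical illustration and not a framework function.

```python
from typing import Iterator, List, Sequence


def chunked(items: Sequence[str], size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of at most `size` items (hypothetical helper)."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


paths = ["a.json", "b.json", "c.json", "d.json", "e.json"]
for batch in chunked(paths, 2):
    print(batch)
```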

Common Patterns#

List and Process#

# List files
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/",
    logger=self.logger,
)

# Save to table
save_s3_paths_to_table(
    yt_client=self.deps.yt_client,
    bucket="my-bucket",
    paths=paths,
    output_table="//tmp/my_pipeline/s3_paths",
    logger=self.logger,
)

# Process in next stage using map operation

Filter by Extension#

# List only JSON files
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/",
    extension=".json",
    logger=self.logger,
)

Limit Results#

# List first 100 files
paths = list_s3_files(
    s3_client=self.s3_client,
    bucket="my-bucket",
    prefix="data/",
    max_files=100,
    logger=self.logger,
)

Troubleshooting#

Issue: S3 client creation fails#

  • Check the S3 credentials in configs/secrets.env

  • Verify credentials have S3 access

  • Check that S3_ENDPOINT is correct

Issue: No files found#

  • Verify bucket name is correct

  • Check prefix path is correct

  • Ensure files exist in S3

Issue: Permission denied#

  • Check IAM permissions for S3 access

  • Verify credentials have read/list permissions

  • Check bucket policy

Issue: Table write fails#

  • Verify YT table path is correct

  • Check YT credentials and permissions

  • Ensure output table path exists or can be created

Next Steps#