S3 Operations#
YT Framework provides S3 integration for listing files, downloading data, and processing S3 objects. This is useful for pipelines that need to work with data stored in S3.
Overview#
S3 operations allow you to:
List files in S3 buckets
Filter files by prefix, extension, or count
Save S3 paths to YT tables for processing
Integrate S3 data with YT pipelines
Key characteristics:
S3 client integration
File listing and filtering
Table integration
Credential management
Quick Start#
Minimal Example#
Stage (stages/list_s3/stage.py):
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage
from yt_framework.operations.s3 import list_s3_files, save_s3_paths_to_table
from yt_framework.utils.env import load_secrets
from ytjobs.s3.client import S3Client


class ListS3Stage(BaseStage):
    def __init__(self, deps, logger):
        super().__init__(deps, logger)
        # Create S3 client
        secrets = load_secrets(self.deps.configs_dir)
        self.s3_client = S3Client.create(
            secrets=secrets,
            client_type="download",  # or "upload" for write access
        )

    def run(self, debug: DebugContext) -> DebugContext:
        # List files from S3
        paths = list_s3_files(
            s3_client=self.s3_client,
            bucket=self.config.client.input_bucket,
            prefix=self.config.client.input_prefix,
            logger=self.logger,
        )
        # Save paths to YT table
        save_s3_paths_to_table(
            yt_client=self.deps.yt_client,
            bucket=self.config.client.input_bucket,
            paths=paths,
            output_table=self.config.client.output_table,
            logger=self.logger,
        )
        return debug
Stage config (stages/list_s3/config.yaml):
client:
  input_bucket: my-bucket
  input_prefix: data/2024/
  output_table: //tmp/my_pipeline/s3_paths
Secrets (configs/secrets.env):
S3_ENDPOINT=https://your-s3-endpoint.com
S3_DOWNLOAD_ACCESS_KEY=your-download-access-key
S3_DOWNLOAD_SECRET_KEY=your-download-secret-key
S3_UPLOAD_ACCESS_KEY=your-upload-access-key
S3_UPLOAD_SECRET_KEY=your-upload-secret-key
See Example: 06_s3_integration for a complete example.
S3 Client Setup#
Creating S3 Client#
Create an S3 client in your stage’s __init__ method:
from yt_framework.core.stage import BaseStage
from yt_framework.utils.env import load_secrets
from ytjobs.s3.client import S3Client


class MyStage(BaseStage):
    def __init__(self, deps, logger):
        super().__init__(deps, logger)
        # Load secrets
        secrets = load_secrets(self.deps.configs_dir)
        # Create S3 client
        self.s3_client = S3Client.create(
            secrets=secrets,
            client_type="download",  # or "upload"
        )
Client types:
"download": Read-only access (for listing and downloading)
"upload": Write access (for uploading files)
S3 Credentials#
S3 credentials are stored in configs/secrets.env:
S3_ENDPOINT=https://your-s3-endpoint.com
S3_DOWNLOAD_ACCESS_KEY=your-download-access-key
S3_DOWNLOAD_SECRET_KEY=your-download-secret-key
S3_UPLOAD_ACCESS_KEY=your-upload-access-key
S3_UPLOAD_SECRET_KEY=your-upload-secret-key
Fields:
S3_ENDPOINT: S3 endpoint URL (e.g., https://s3.example.com)
S3_DOWNLOAD_ACCESS_KEY: Access key for download operations
S3_DOWNLOAD_SECRET_KEY: Secret key for download operations
S3_UPLOAD_ACCESS_KEY: Access key for upload operations (optional; set only if it differs from the download key)
S3_UPLOAD_SECRET_KEY: Secret key for upload operations (optional; set only if it differs from the download key)
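To make the optional upload keys concrete, here is a minimal sketch of how credentials could be picked per client type. How `S3Client.create` actually resolves keys is not shown in this guide; the helper `select_credentials` is hypothetical, and the fallback from upload keys to download keys is an assumption based on the "optional" note above.

```python
from typing import Dict, Tuple


def select_credentials(secrets: Dict[str, str], client_type: str) -> Tuple[str, str, str]:
    """Return (endpoint, access_key, secret_key) for "download" or "upload".

    Hypothetical helper for illustration; not part of yt_framework.
    """
    if client_type not in ("download", "upload"):
        raise ValueError(f"unknown client_type: {client_type!r}")
    endpoint = secrets["S3_ENDPOINT"]
    prefix = f"S3_{client_type.upper()}"
    # Assumption: when upload keys are absent, reuse the download keys.
    access_key = secrets.get(f"{prefix}_ACCESS_KEY") or secrets["S3_DOWNLOAD_ACCESS_KEY"]
    secret_key = secrets.get(f"{prefix}_SECRET_KEY") or secrets["S3_DOWNLOAD_SECRET_KEY"]
    return endpoint, access_key, secret_key


# Placeholder values only; real secrets belong in configs/secrets.env.
example_secrets = {
    "S3_ENDPOINT": "https://s3.example.com",
    "S3_DOWNLOAD_ACCESS_KEY": "download-access",
    "S3_DOWNLOAD_SECRET_KEY": "download-secret",
}
endpoint, access_key, _ = select_credentials(example_secrets, "upload")
print(endpoint, access_key)
```

If your deployment does not fall back this way, define all five variables explicitly.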
Operations#
List S3 Files#
List files in an S3 bucket with optional filtering:
from yt_framework.operations.s3 import list_s3_files
paths = list_s3_files(
s3_client=self.s3_client,
bucket="my-bucket",
prefix="data/2024/",
logger=self.logger,
extension=".json", # Optional: filter by extension
max_files=1000, # Optional: limit results
)
Parameters:
s3_client: S3Client instance
bucket: S3 bucket name
prefix: S3 prefix to filter by (required; pass an empty string "" to list all files)
logger: Logger instance
extension: File extension filter (optional, e.g., ".json")
max_files: Maximum number of files to return (optional)
Returns: List of S3 paths (strings)
Example:
# List all JSON files in bucket with prefix
paths = list_s3_files(
s3_client=self.s3_client,
bucket="my-bucket",
prefix="data/",
extension=".json",
logger=self.logger,
)
# List all files in bucket (no prefix filter - use empty string)
paths = list_s3_files(
s3_client=self.s3_client,
bucket="my-bucket",
prefix="", # Empty string lists all files
logger=self.logger,
)
# List first 100 files
paths = list_s3_files(
s3_client=self.s3_client,
bucket="my-bucket",
prefix="data/",
max_files=100,
logger=self.logger,
)
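The three filters can be combined in one call. The sketch below models the semantics described above on an in-memory list of keys, which is handy for unit tests that should not touch S3. It is not the framework's implementation: `filter_keys` is a hypothetical name, and applying `max_files` after the prefix and extension filters is an assumption.

```python
from typing import Iterable, List, Optional


def filter_keys(
    keys: Iterable[str],
    prefix: str,
    extension: Optional[str] = None,
    max_files: Optional[int] = None,
) -> List[str]:
    """Keep keys that match prefix and extension, up to max_files results."""
    selected: List[str] = []
    for key in keys:
        # Stop as soon as the limit is reached (also handles max_files=0).
        if max_files is not None and len(selected) >= max_files:
            break
        if not key.startswith(prefix):
            continue
        if extension is not None and not key.endswith(extension):
            continue
        selected.append(key)
    return selected


sample_keys = ["data/a.json", "data/b.csv", "logs/c.json", "data/d.json"]
print(filter_keys(sample_keys, prefix="data/", extension=".json", max_files=1))
```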
Save S3 Paths to Table#
Save S3 file paths to a YT table for processing:
from yt_framework.operations.s3 import save_s3_paths_to_table
save_s3_paths_to_table(
yt_client=self.deps.yt_client,
bucket="my-bucket",
paths=paths,
output_table="//tmp/my_pipeline/s3_paths",
logger=self.logger,
)
Parameters:
yt_client: YT client instance
bucket: S3 bucket name
paths: List of S3 paths (from list_s3_files)
output_table: YT table path to write
logger: Logger instance
Table schema:
Each row of the output table has two columns, bucket and path. For example:
{
    "bucket": "my-bucket",
    "path": "data/file1.json",  # S3 key (not full s3:// URL)
}
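Because `path` stores only the object key, downstream code has to combine it with `bucket` whenever it needs a full URL. The following sketch shows the row shape and that reconstruction; `paths_to_rows` and `row_to_url` are hypothetical helpers written for illustration and are not part of `yt_framework`.

```python
from typing import Dict, List


def paths_to_rows(bucket: str, paths: List[str]) -> List[Dict[str, str]]:
    """Build rows in the documented shape: one {"bucket", "path"} dict per key."""
    return [{"bucket": bucket, "path": path} for path in paths]


def row_to_url(row: Dict[str, str]) -> str:
    """Rebuild an s3:// URL from a row; the table itself stores only the key."""
    return f"s3://{row['bucket']}/{row['path']}"


rows = paths_to_rows("my-bucket", ["data/file1.json", "data/file2.json"])
for row in rows:
    print(row_to_url(row))
```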
Example:
# List files
paths = list_s3_files(
s3_client=self.s3_client,
bucket="my-bucket",
prefix="data/",
logger=self.logger,
)
# Save to table
save_s3_paths_to_table(
yt_client=self.deps.yt_client,
bucket="my-bucket",
paths=paths,
output_table="//tmp/my_pipeline/s3_paths",
logger=self.logger,
)
# Process paths from table
rows = list(self.deps.yt_client.read_table("//tmp/my_pipeline/s3_paths"))
for row in rows:
    s3_path = row["path"]
    # Process S3 file...
Complete Example#
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage
from yt_framework.operations.s3 import list_s3_files, save_s3_paths_to_table
from yt_framework.utils.env import load_secrets
from ytjobs.s3.client import S3Client


class ListS3Stage(BaseStage):
    def __init__(self, deps, logger):
        super().__init__(deps, logger)
        # Create S3 client
        secrets = load_secrets(self.deps.configs_dir)
        self.s3_client = S3Client.create(
            secrets=secrets,
            client_type="download",
        )

    def run(self, debug: DebugContext) -> DebugContext:
        self.logger.info("Listing files from S3...")
        # List files with optional filters
        paths = list_s3_files(
            s3_client=self.s3_client,
            bucket=self.config.client.input_bucket,
            prefix=self.config.client.input_prefix,
            logger=self.logger,
            extension=self.config.client.get("file_extension"),  # Optional
            max_files=self.config.client.get("max_files"),  # Optional
        )
        if not paths:
            self.logger.warning("No files found in S3")
            return debug
        self.logger.info(f"Found {len(paths)} files")
        # Save paths to YT table
        save_s3_paths_to_table(
            yt_client=self.deps.yt_client,
            bucket=self.config.client.input_bucket,
            paths=paths,
            output_table=self.config.client.output_table,
            logger=self.logger,
        )
        self.logger.info(f"Saved {len(paths)} paths to {self.config.client.output_table}")
        return debug
Configuration (stages/list_s3/config.yaml):
client:
  input_bucket: my-bucket
  input_prefix: data/2024/
  file_extension: .json  # Optional: filter by extension
  max_files: 1000  # Optional: limit results
  output_table: //tmp/my_pipeline/s3_paths
Processing S3 Files#
After saving S3 paths to a table, you can process them in subsequent stages:
In-Stage Processing Example#
# stages/process_s3/stage.py
class ProcessS3Stage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        # Read S3 paths from table
        s3_paths = list(self.deps.yt_client.read_table(
            self.config.client.s3_paths_table
        ))
        # Process each S3 file
        results = []
        for row in s3_paths:
            s3_path = row["path"]
            # Download and process file
            # (process_s3_file is a placeholder for your own logic)
            result = process_s3_file(s3_path)
            results.append(result)
        # Write results to output table
        self.deps.yt_client.write_table(
            self.config.client.output_table,
            results,
        )
        return debug
Using Map Operation#
For large numbers of files, use a map operation:
# stages/process_s3/src/mapper.py
import sys
import json
import boto3
from omegaconf import OmegaConf
from ytjobs.config import get_config_path


def main():
    config = OmegaConf.load(get_config_path())
    # For a non-AWS endpoint, pass endpoint_url and credentials to boto3.client
    s3_client = boto3.client('s3')
    for line in sys.stdin:
        row = json.loads(line)
        bucket = row["bucket"]
        # The "path" column holds the S3 object key (see the table schema)
        key = row["path"]
        # Download file from S3
        response = s3_client.get_object(Bucket=bucket, Key=key)
        data = response['Body'].read()
        # Process data (process_data is a placeholder for your own logic)
        result = process_data(data)
        # Output result
        output_row = {
            "path": key,
            "result": result,
        }
        print(json.dumps(output_row), flush=True)


if __name__ == "__main__":
    main()
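A mapper like the one above is easier to test when the row loop is separated from the S3 call. The sketch below is one possible arrangement, not the framework's prescribed layout: `map_rows`, the injected `fetch` and `process` callables, and the extra `error` column are all assumptions made for illustration. Recording the error per row keeps one unreadable object from failing the whole job.

```python
import json
from typing import Any, Callable, Dict, Iterable, Iterator


def map_rows(
    lines: Iterable[str],
    fetch: Callable[[str, str], bytes],
    process: Callable[[bytes], Any],
) -> Iterator[str]:
    """Yield one JSON output line per input row, capturing per-row failures."""
    for line in lines:
        line = line.strip()
        if not line:
            continue  # Skip blank lines in the input stream.
        row = json.loads(line)
        bucket, key = row["bucket"], row["path"]
        try:
            result, error = process(fetch(bucket, key)), None
        except Exception as exc:  # In a real job, narrow this to S3 errors.
            result, error = None, str(exc)
        yield json.dumps({"path": key, "result": result, "error": error})


# Local dry run with an in-memory stand-in for S3.
fake_store: Dict[tuple, bytes] = {("my-bucket", "data/a.json"): b"{}"}


def fake_fetch(bucket: str, key: str) -> bytes:
    return fake_store[(bucket, key)]


sample_input = ['{"bucket": "my-bucket", "path": "data/a.json"}\n']
for out_line in map_rows(sample_input, fake_fetch, len):
    print(out_line)
```

In the real mapper, `fetch` would wrap `s3_client.get_object(...)["Body"].read()` and `lines` would be `sys.stdin`.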
Configuration#
Stage Configuration#
# stages/list_s3/config.yaml
client:
  input_bucket: my-bucket
  input_prefix: data/2024/
  file_extension: .json  # Optional
  max_files: 1000  # Optional
  output_table: //tmp/my_pipeline/s3_paths
Secrets Configuration#
# configs/secrets.env
S3_ENDPOINT=https://your-s3-endpoint.com
S3_DOWNLOAD_ACCESS_KEY=your-download-access-key
S3_DOWNLOAD_SECRET_KEY=your-download-secret-key
S3_UPLOAD_ACCESS_KEY=your-upload-access-key
S3_UPLOAD_SECRET_KEY=your-upload-secret-key
Best Practices#
Filter early: Use prefix and extension to filter files before listing
Limit results: Use max_files for large buckets
Process in batches: For many files, use map operations
Handle errors: Check for empty results and handle S3 errors
Secure credentials: Never commit secrets to version control
Use appropriate client type: Use "download" for read-only, "upload" for write
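For the "Handle errors" practice, transient S3 failures are often handled by retrying with exponential backoff. The following is a generic sketch under that assumption, not something the framework is documented to provide; `with_retries` is a hypothetical helper, and the `sleep` parameter exists only so the delay can be replaced in tests.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def with_retries(
    fn: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn until it succeeds, doubling the delay after each failure."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception:  # In real code, catch only retryable S3 errors.
            if attempt == attempts:
                raise  # Out of attempts: surface the last error.
            sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

A call such as `with_retries(lambda: list_s3_files(...))` would wrap a listing; when using boto3 directly, `botocore.exceptions.ClientError` is the usual exception to catch instead of `Exception`.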
Common Patterns#
List and Process#
# List files
paths = list_s3_files(
s3_client=self.s3_client,
bucket="my-bucket",
prefix="data/",
logger=self.logger,
)
# Save to table
save_s3_paths_to_table(
yt_client=self.deps.yt_client,
bucket="my-bucket",
paths=paths,
output_table="//tmp/my_pipeline/s3_paths",
logger=self.logger,
)
# Process in next stage using map operation
Filter by Extension#
# List only JSON files
paths = list_s3_files(
s3_client=self.s3_client,
bucket="my-bucket",
prefix="data/",
extension=".json",
logger=self.logger,
)
Limit Results#
# List first 100 files
paths = list_s3_files(
s3_client=self.s3_client,
bucket="my-bucket",
prefix="data/",
max_files=100,
logger=self.logger,
)
Troubleshooting#
Issue: S3 client creation fails#
Check AWS credentials in secrets.env
Verify credentials have S3 access
Check AWS region is correct
Issue: No files found#
Verify bucket name is correct
Check prefix path is correct
Ensure files exist in S3
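Two prefix mistakes commonly produce an empty listing: a leading slash, and a case mismatch (S3 keys are matched literally and are case-sensitive). The sketch below checks a prefix against a few keys you know exist. `diagnose_prefix` is a hypothetical helper for illustration and is not part of the framework.

```python
from typing import List


def diagnose_prefix(prefix: str, sample_keys: List[str]) -> List[str]:
    """Return human-readable hints about why a prefix may match nothing."""
    hints: List[str] = []
    if prefix.startswith("/"):
        hints.append("prefix starts with '/'; S3 keys usually do not")
    if not any(key.startswith(prefix) for key in sample_keys):
        lowered = prefix.lower()
        if any(key.lower().startswith(lowered) for key in sample_keys):
            hints.append("prefix matches only when case is ignored; S3 keys are case-sensitive")
    return hints


for hint in diagnose_prefix("Data/2024/", ["data/2024/a.json"]):
    print(hint)
```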
Issue: Permission denied#
Check IAM permissions for S3 access
Verify credentials have read/list permissions
Check bucket policy
Issue: Table write fails#
Verify YT table path is correct
Check YT credentials and permissions
Ensure output table path exists or can be created
Next Steps#
Learn about Map Operations for processing S3 files
Explore Configuration for secrets management
Check out Example: 06_s3_integration for a complete example