YT jobs library (ytjobs)
The ytjobs package is part of the same distribution as yt_framework and is intended to run inside YTsaurus jobs (mappers, reducers, command-mode scripts): code you import from uploaded job bundles, not from the orchestration process on your laptop.
- yt_framework: pipeline layout, stage discovery, YT clients in dev/prod, operation configs.
- ytjobs: small, job-safe helpers (JSON stdin/stdout mappers, S3 access from workers, stderr logging, JOB_CONFIG_PATH, and Cypress checkpoint I/O via yt.wrapper).
See also Environment variables for JOB_CONFIG_PATH and related sandbox variables.
Note
Two “checkpoint” ideas
Checkpoint management describes model files the framework uploads and mounts in operations. ytjobs.checkpoint is separate: save/load byte blobs and JSON state under Cypress paths inside a running job. See the API sections below.
Cluster Docker images must include dependencies your job code uses (for example boto3 for ytjobs.s3, ytsaurus-client for ytjobs.checkpoint). See Cluster requirements.
Convenience imports match the package __all__:
from ytjobs import (
    S3Client,
    get_logger,
    log_with_extra,
    redirect_stdout_to_stderr,
    get_config_path,
    read_input_rows,
    StreamMapper,
    BatchMapper,
)
The submodules below are the source of truth for API details (generated from docstrings).
Mapper utilities (ytjobs.mapper)
JSON stdin/stdout mapper helpers (StreamMapper, BatchMapper, row readers).
- ytjobs.mapper.read_input_rows()
Read and parse input rows from stdin.
Reads JSON lines from stdin, parses them, and creates Row objects. Skips empty lines and logs parsing errors to stderr.
Example
    for row in read_input_rows():
        print(row.bucket, row.path)
- class ytjobs.mapper.StreamMapper
Bases: object
Mapper that processes stdin one line at a time.
Reads JSON lines from stdin, processes each individually, and writes results to stdout. Define processing_func(row) that yields result dicts, then call StreamMapper().map(processing_func).
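As an illustration of the processing_func(row) contract described above, here is a minimal sketch. It assumes rows expose bucket and path attributes, as in the read_input_rows example; the process_row and main names and the output field are hypothetical, and the stand-in row used for the local check is not a real ytjobs row.

```python
from types import SimpleNamespace
from typing import Any, Dict, Iterator


def process_row(row: Any) -> Iterator[Dict[str, Any]]:
    """Yield one result dict per input row.

    Assumption: the row has `bucket` and `path` attributes; the real
    attributes depend on the columns of your input table.
    """
    yield {"uri": f"s3://{row.bucket}/{row.path}"}


def main() -> None:
    # Imported lazily so process_row can be exercised without ytjobs installed.
    from ytjobs import StreamMapper

    StreamMapper().map(process_row)


# Local check with a stand-in row object (not a real ytjobs row).
sample = SimpleNamespace(bucket="videos", path="a/b.mp4")
results = list(process_row(sample))
```

In a job script, main() would be called at the end of the file; it is left uncalled here so the sketch runs without a cluster.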
- class ytjobs.mapper.BatchMapper(batch_size=None)
Bases: object
Mapper that processes stdin in batches.
Reads JSON lines from stdin in batches, processes each batch, and writes results to stdout. Pass batch_size (or None to buffer all stdin), define processing_func(rows) that yields result dicts, then call mapper.map(processing_func).
- Parameters:
batch_size (int | None)
- __init__(batch_size=None)
Initialize batch mapper.
- Parameters:
batch_size (int | None) – Number of rows per batch, or None to process all rows at once
- map(processing_func, redirect_processing_output=True, **kwargs)
Read stdin in batches, process each batch, write results to stdout.
- Parameters:
processing_func (Callable[[...], Iterator[Any]]) – Function that takes a list of rows (and optional kwargs) and returns an iterator of results
redirect_processing_output (bool) – If True, redirect stdout to stderr during processing
**kwargs (Any) – Additional keyword arguments to pass to processing_func
- Return type:
None
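The batch contract and the **kwargs forwarding can be sketched as follows. The process_batch function, its prefix keyword argument, the path attribute on rows, and the batch size of 64 are all illustrative assumptions, not part of ytjobs.

```python
from types import SimpleNamespace
from typing import Any, Dict, Iterator, List


def process_batch(rows: List[Any], prefix: str = "") -> Iterator[Dict[str, Any]]:
    """Yield one result dict per row in the batch.

    `prefix` is a hypothetical keyword argument; per the map() signature,
    extra keyword arguments are passed through to the processing function.
    """
    for row in rows:
        yield {"path": prefix + row.path, "batch_len": len(rows)}


def main() -> None:
    # Imported lazily so process_batch can be exercised without ytjobs installed.
    from ytjobs import BatchMapper

    mapper = BatchMapper(batch_size=64)  # 64 is an arbitrary choice
    mapper.map(process_batch, prefix="processed/")


# Local check with stand-in rows (not real ytjobs rows).
batch = [SimpleNamespace(path="a.mp4"), SimpleNamespace(path="b.mp4")]
batch_results = list(process_batch(batch, prefix="processed/"))
```

Batching mainly pays off when per-call overhead dominates, for example when a model or a network client handles many rows per request.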
S3 client (ytjobs.s3.client)
Minimal boto3 wrapper for job-side list/get/put helpers.
- class ytjobs.s3.client.S3Client(endpoint, access_key, secret_key, max_retries=30, timeout=360, logger=None, *, region_name=None, boto_config=None)
Bases: object
Thin boto3 S3 wrapper for job code (list, download, upload, head).
- Parameters:
- __init__(endpoint, access_key, secret_key, max_retries=30, timeout=360, logger=None, *, region_name=None, boto_config=None)
Build a boto3 S3 client for the given endpoint and credentials.
- Parameters:
endpoint (str) – S3 API endpoint URL (e.g. from S3_ENDPOINT).
access_key (str) – Access key id for this client.
secret_key (str) – Secret access key for this client.
max_retries (int) – Boto3 retry max_attempts when boto_config is omitted.
timeout (int) – Read timeout in seconds when boto_config is omitted.
logger (Logger | None) – Optional logger; defaults to the module logger.
region_name (str | None) – Optional AWS region passed to boto3.client.
boto_config (Config | None) – If set, used as-is instead of the default BotoConfig.
- static parse_s3_uri(uri)
Split s3://bucket/key/path into (bucket, key).
- Parameters:
uri (str) – S3 URI with non-empty bucket and key path.
- Returns:
(bucket, key) where key has no leading slash.
- Raises:
ValueError – If the URI is not a valid S3 URI.
- Return type:
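The documented contract (non-empty bucket and key, no leading slash on the key, ValueError otherwise) can be illustrated with a small stand-in. This is a sketch of the behaviour described above, not the library's implementation; split_s3_uri is a hypothetical name.

```python
from typing import Tuple


def split_s3_uri(uri: str) -> Tuple[str, str]:
    """Hypothetical stand-in for S3Client.parse_s3_uri (not the library code)."""
    scheme = "s3://"
    if not uri.startswith(scheme):
        raise ValueError(f"Not a valid S3 URI: {uri!r}")
    # Everything up to the first slash is the bucket; the rest is the key.
    bucket, _, key = uri[len(scheme):].partition("/")
    key = key.lstrip("/")
    if not bucket or not key:
        raise ValueError(f"Not a valid S3 URI: {uri!r}")
    return bucket, key


bucket, key = split_s3_uri("s3://bucket/key/path")
```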
- static create(secrets, client_type='download')
Create S3 client from secrets dictionary.
- Parameters:
- Returns:
Configured S3Client instance.
- Raises:
ValueError – If client_type is unknown or required secrets are missing.
- Return type:
- list_files(bucket, prefix='', extension=None, max_files=None)
List object keys under prefix in bucket.
- Parameters:
- Returns:
List of S3 object keys (not full s3:// URIs).
- Raises:
Exception – Propagates boto3/client errors after logging.
- Return type:
- download_by_uri(s3_uri)
Download object bytes from s3://bucket/key.
- Parameters:
s3_uri (str) – Valid S3 URI.
- Returns:
Same as download.
- Raises:
ValueError – If s3_uri is invalid (via parse_s3_uri).
Exception – Propagates boto3/client errors from download.
- Return type:
Logging (ytjobs.logging)
Logging utilities for YT jobs.
- ytjobs.logging.get_logger(name=None, level=20)
Get a logger configured for human-readable text output to stderr. The default level=20 corresponds to logging.INFO.
- ytjobs.logging.log_with_extra(logger, level, message, **kwargs)
Log a message with extra context fields.
- ytjobs.logging.manage_output(mode='redirect')
Decorator factory that either suppresses output or redirects stdout to stderr.
- Parameters:
mode – "suppress" for full silence, "redirect" to send stdout to stderr (keeps job stdout clean for JSON lines).
- Returns:
A decorator, e.g. @manage_output(mode="redirect") above a function.
- ytjobs.logging.redirect_stdout_to_stderr()
Context manager that redirects stdout to stderr.
This is useful for YTsaurus mappers where you need clean JSON on stdout, but processing functions might print debug messages.
- Usage:
    with redirect_stdout_to_stderr():
        print("This goes to stderr")  # Won't corrupt stdout
        some_function_that_prints()
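To show why the redirect matters for JSON-lines mappers, the sketch below reproduces the idea with the standard library's contextlib.redirect_stdout instead of ytjobs. Both streams are captured in memory so the effect can be inspected; noisy_transform and its record fields are hypothetical.

```python
import contextlib
import io
import json
import sys


def noisy_transform(record: dict) -> dict:
    # A stray print like this would corrupt a JSON-lines stdout stream.
    print("debug: processing", record["id"])
    return {"id": record["id"], "ok": True}


# Capture both streams in memory so the effect is visible.
fake_stdout, fake_stderr = io.StringIO(), io.StringIO()
with contextlib.redirect_stdout(fake_stdout), contextlib.redirect_stderr(fake_stderr):
    # Inner redirect: prints inside this block go to the current sys.stderr.
    with contextlib.redirect_stdout(sys.stderr):
        result = noisy_transform({"id": 7})
    # Outside the inner block, stdout is the data channel again.
    print(json.dumps(result))
```

After this runs, fake_stdout holds only the JSON line, while the debug message has landed in fake_stderr.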
- ytjobs.logging.suppress_all_output()
Context manager that suppresses ALL output: stdout, stderr, and warnings.
This is useful when you need complete silence from noisy libraries like OpenCV, Ultralytics YOLO, TensorFlow, Ceres Solver, etc.
- Usage:
    with suppress_all_output():
        # All prints, warnings, and library output are suppressed
        model = YOLO('yolov8n.pt')  # No output
        results = model(image)  # No progress bars
        cv2.imread(path)  # No warnings
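For readers who want to see the general pattern, here is a standard-library approximation. It silences only Python-level stdout, stderr, and warnings; how ytjobs implements suppress_all_output is not shown in this page, and output written directly by native code may need a different mechanism. The quiet name is hypothetical.

```python
import contextlib
import io
import os
import warnings


@contextlib.contextmanager
def quiet():
    """Hypothetical stand-in: silence Python-level stdout, stderr, and warnings."""
    with open(os.devnull, "w") as sink, warnings.catch_warnings():
        warnings.simplefilter("ignore")
        with contextlib.redirect_stdout(sink), contextlib.redirect_stderr(sink):
            yield


# Demonstration: capture stdout in memory and check what survives.
captured = io.StringIO()
with contextlib.redirect_stdout(captured):
    with quiet():
        print("this line is discarded")
        warnings.warn("this warning is ignored")
    print("after")
```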
Job config (ytjobs.config)
Resolve JOB_CONFIG_PATH to the staged config.yaml inside sandboxes.
- ytjobs.config.get_config_path()
Get the path to the job configuration file.
- Returns:
Path object pointing to the config file
- Raises:
ValueError – If JOB_CONFIG_PATH environment variable is not set
- Return type:
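The documented behaviour (return a Path, raise ValueError when JOB_CONFIG_PATH is unset) can be mirrored by a short stand-in, which is handy for unit tests that run outside a sandbox. This is a sketch of the contract above, not the library source; resolve_config_path is a hypothetical name.

```python
import os
from pathlib import Path


def resolve_config_path() -> Path:
    """Hypothetical stand-in mirroring the documented get_config_path contract."""
    value = os.environ.get("JOB_CONFIG_PATH")
    if not value:
        # Matches the documented error type for a missing variable.
        raise ValueError("JOB_CONFIG_PATH environment variable is not set")
    return Path(value)
```

In a local test you would set JOB_CONFIG_PATH to a fixture file before calling code that depends on it.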
Cypress checkpoints (ytjobs.checkpoint)
Checkpoint utilities for YT jobs.
- ytjobs.checkpoint.get_checkpoint_path(checkpoint_name, base_path=None)
Get full YT path for a checkpoint.
- ytjobs.checkpoint.save_checkpoint(data, checkpoint_name, metadata=None, base_path=None, logger=None)
Save checkpoint to YTsaurus Cypress file system.
- Parameters:
- Returns:
Full YT path to saved checkpoint
- Return type:
- ytjobs.checkpoint.load_checkpoint(checkpoint_name, base_path=None, logger=None)
Load checkpoint and metadata from YTsaurus Cypress.
- ytjobs.checkpoint.list_checkpoints(base_path=None, pattern=None, logger=None)
List available checkpoints in YTsaurus.
- ytjobs.checkpoint.delete_checkpoint(checkpoint_name, base_path=None, logger=None)
Delete checkpoint and its metadata from YTsaurus.
- ytjobs.checkpoint.save_processing_state(state, state_name='processing_state', base_path=None, logger=None)
Save processing state (e.g., processed video list, iteration count) to checkpoint.
Convenience function for saving JSON-serializable state dictionaries.
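Because the state must be JSON-serializable, it can help to build it from plain types and check the encoding before saving. The sketch below assumes hypothetical field names (processed_videos, iteration) and hypothetical helpers build_state and persist; the persist function needs ytjobs and a reachable cluster, so it is defined but not called here.

```python
import json
from typing import Any, Dict, Iterable


def build_state(processed: Iterable[str], iteration: int) -> Dict[str, Any]:
    """Build a dict of JSON-friendly values (field names are illustrative)."""
    # Sets are not JSON-serializable, so store a sorted list instead.
    return {"processed_videos": sorted(processed), "iteration": iteration}


def persist(state: Dict[str, Any]) -> None:
    # Requires ytjobs and cluster access; not executed in this sketch.
    from ytjobs.checkpoint import save_processing_state

    save_processing_state(state, state_name="processing_state")


state = build_state({"b.mp4", "a.mp4"}, iteration=3)
encoded = json.dumps(state)  # raises TypeError if a value is not serializable
```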