YT jobs library (ytjobs)

The ytjobs package ships in the same distribution as yt_framework but is meant to run inside YTsaurus jobs (mappers, reducers, command-mode scripts). It is code that job processes import from the uploaded job bundle, not code for the orchestration process on your laptop.

  • yt_framework: pipeline layout, stage discovery, YT clients in dev/prod, operation configs.

  • ytjobs: small, job-safe helpers for JSON stdin/stdout mappers, S3 access from workers, stderr logging, JOB_CONFIG_PATH resolution, and Cypress checkpoint I/O via yt.wrapper.

See also the Environment variables page for JOB_CONFIG_PATH and related sandbox variables.

Note

Two “checkpoint” ideas

Checkpoint management describes model files that the framework uploads and mounts in operations. ytjobs.checkpoint is separate: it saves and loads byte blobs and JSON state under Cypress paths from inside a running job. See the API sections below.

Cluster Docker images must include the dependencies your job code uses (for example, boto3 for ytjobs.s3 and ytsaurus-client for ytjobs.checkpoint). See Cluster requirements.

Convenience imports match the package __all__:

from ytjobs import (
    S3Client,
    get_logger,
    log_with_extra,
    redirect_stdout_to_stderr,
    get_config_path,
    read_input_rows,
    StreamMapper,
    BatchMapper,
)

The submodules below are the source of truth for API details (generated from docstrings).

Mapper utilities (ytjobs.mapper)

JSON stdin/stdout mapper helpers (StreamMapper, BatchMapper, row readers).

ytjobs.mapper.read_input_rows()

Read and parse input rows from stdin.

Reads JSON lines from stdin, parses them, and creates Row objects. Skips empty lines and logs parsing errors to stderr.

Returns:

Iterable of Row objects, one per non-empty input line that parses as JSON

Return type:

Iterable[object]

Example

for row in read_input_rows():
    print(row.bucket, row.path)
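The parsing behavior described above (JSON lines in, empty lines skipped, parse errors reported on stderr) can be sketched with the standard library. This is a minimal stand-in, not the ytjobs implementation: the name iter_json_rows is hypothetical, and types.SimpleNamespace is an assumed substitute for the library's Row class so that attribute access such as row.bucket works.

```python
import json
import sys
from types import SimpleNamespace
from typing import Iterable, Iterator


def iter_json_rows(lines: Iterable[str]) -> Iterator[SimpleNamespace]:
    """Hypothetical stand-in for read_input_rows() that accepts any iterable of lines."""
    for line_no, line in enumerate(lines, start=1):
        line = line.strip()
        if not line:
            continue  # skip empty lines
        try:
            record = json.loads(line)
        except json.JSONDecodeError as exc:
            # Report on stderr so stdout stays reserved for mapper output.
            print(f"skipping line {line_no}: {exc}", file=sys.stderr)
            continue
        if not isinstance(record, dict):
            print(f"skipping line {line_no}: expected a JSON object", file=sys.stderr)
            continue
        # SimpleNamespace is an assumed stand-in for the library's Row objects.
        yield SimpleNamespace(**record)
```

In a job, iter_json_rows(sys.stdin) reads the same stream that read_input_rows() consumes.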

class ytjobs.mapper.StreamMapper

Bases: object

Mapper that processes stdin one line at a time.

Reads JSON lines from stdin, processes each individually, and writes results to stdout. Define processing_func(row) that yields result dicts, then call StreamMapper().map(processing_func).

map(processing_func, redirect_processing_output=True, **kwargs)

Read stdin line-by-line, process each, write results to stdout.

Parameters:
  • processing_func (Callable[[Any], Iterator[Any]]) – Function that takes a row dict and returns an iterator of results

  • redirect_processing_output (bool) – If True, redirect stdout to stderr during processing

  • **kwargs (Any) – Additional keyword arguments to pass to processing_func

Return type:

None
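A StreamMapper job script is mostly its per-row processing_func. The sketch below pairs a hypothetical process_row with run_stream_mapper, a hypothetical stdlib stand-in for the loop described above: read one JSON line, call the function with any extra kwargs, and write each result as a JSON line while stray prints go to stderr. It is not the library's code; it exists only so the example runs without ytjobs installed.

```python
import contextlib
import json
import sys
from typing import Any, Callable, Iterable, Iterator


def process_row(row: dict, suffix: str = ".done") -> Iterator[dict]:
    """Hypothetical processing_func: yields zero or more result dicts per input row."""
    path = row.get("path")
    if not path:
        return  # emit nothing for rows without a path
    print("debug: processing", path)  # lands on stderr while stdout is redirected
    yield {"path": path + suffix}


def run_stream_mapper(
    processing_func: Callable[..., Iterator[Any]],
    lines: Iterable[str],
    emit: Callable[[str], Any],
    **kwargs: Any,
) -> None:
    """Hypothetical stand-in for StreamMapper().map(): one input line at a time."""
    for line in lines:
        if not line.strip():
            continue
        row = json.loads(line)
        # Consume the generator while stray prints are redirected to stderr.
        with contextlib.redirect_stdout(sys.stderr):
            results = list(processing_func(row, **kwargs))
        for result in results:
            emit(json.dumps(result) + "\n")
```

In a real job the entry point would be StreamMapper().map(process_row); with the stand-in, the equivalent call is run_stream_mapper(process_row, sys.stdin, sys.stdout.write). Collecting one row's results before writing them keeps debug prints and data on separate streams.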

class ytjobs.mapper.BatchMapper(batch_size=None)

Bases: object

Mapper that processes stdin in batches.

Reads JSON lines from stdin in batches, processes each batch, and writes results to stdout. Pass batch_size (or None to buffer all stdin), define processing_func(rows) that yields result dicts, then call mapper.map(processing_func).

Parameters:

batch_size (int | None)

__init__(batch_size=None)

Initialize batch mapper.

Parameters:

batch_size (int | None) – Number of rows per batch, or None to process all rows at once

map(processing_func, redirect_processing_output=True, **kwargs)

Read stdin in batches, process each batch, write results to stdout.

Parameters:
  • processing_func (Callable[[...], Iterator[Any]]) – Function that takes a list of rows (plus any **kwargs) and returns an iterator of results

  • redirect_processing_output (bool) – If True, redirect stdout to stderr during processing

  • **kwargs (Any) – Additional keyword arguments to pass to processing_func

Return type:

None
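The batching contract above (fixed-size groups, or everything at once when batch_size is None) can be sketched with itertools.islice. iter_batches and count_by_bucket are hypothetical names, and skipping an empty input entirely is an assumption; the docs above do not say whether BatchMapper calls processing_func on an empty batch.

```python
import itertools
from typing import Any, Dict, Iterable, Iterator, List, Optional


def iter_batches(rows: Iterable[Any], batch_size: Optional[int]) -> Iterator[List[Any]]:
    """Group rows into lists of batch_size; None means one batch holding every row."""
    if batch_size is None:
        batch = list(rows)  # buffers the whole input in memory
        if batch:
            yield batch
        return
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer or None")
    it = iter(rows)
    while batch := list(itertools.islice(it, batch_size)):
        yield batch


def count_by_bucket(rows: List[dict]) -> Iterator[dict]:
    """Hypothetical batch processing_func: one summary row per bucket in the batch."""
    counts: Dict[str, int] = {}
    for row in rows:
        counts[row["bucket"]] = counts.get(row["bucket"], 0) + 1
    for bucket, n in sorted(counts.items()):
        yield {"bucket": bucket, "rows": n}
```

A function like count_by_bucket would be passed to BatchMapper(batch_size=...).map(...). Its counts are per batch, not global, so only batch_size=None gives whole-input totals, at the cost of holding all of stdin in memory.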

S3 client (ytjobs.s3.client)

Minimal boto3 wrapper for job-side list/get/put helpers.

class ytjobs.s3.client.S3Client(endpoint, access_key, secret_key, max_retries=30, timeout=360, logger=None, *, region_name=None, boto_config=None)

Bases: object

Thin boto3 S3 wrapper for job code (list, download, upload, head).

Parameters:
  • endpoint (str)

  • access_key (str)

  • secret_key (str)

  • max_retries (int)

  • timeout (int)

  • logger (Logger | None)

  • region_name (str | None)

  • boto_config (Config | None)

__init__(endpoint, access_key, secret_key, max_retries=30, timeout=360, logger=None, *, region_name=None, boto_config=None)

Build a boto3 S3 client for the given endpoint and credentials.

Parameters:
  • endpoint (str) – S3 API endpoint URL (e.g. from S3_ENDPOINT).

  • access_key (str) – Access key id for this client.

  • secret_key (str) – Secret access key for this client.

  • max_retries (int) – Boto3 retry max_attempts when boto_config is omitted.

  • timeout (int) – Read timeout in seconds when boto_config is omitted.

  • logger (Logger | None) – Optional logger; defaults to the module logger.

  • region_name (str | None) – Optional AWS region passed to boto3.client.

  • boto_config (Config | None) – If set, used as-is instead of the default BotoConfig.

static parse_s3_uri(uri)

Split s3://bucket/key/path into (bucket, key).

Parameters:

uri (str) – S3 URI with non-empty bucket and key path.

Returns:

(bucket, key) where key has no leading slash.

Raises:

ValueError – If the URI is not a valid S3 URI.

Return type:

tuple[str, str]
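The split described above can be sketched in a few lines. split_s3_uri is a hypothetical equivalent, not the library method; in particular, how the real parse_s3_uri treats repeated slashes or other schemes is not documented, and this sketch is deliberately strict.

```python
from typing import Tuple


def split_s3_uri(uri: str) -> Tuple[str, str]:
    """Hypothetical equivalent of parse_s3_uri: 's3://bucket/key' -> (bucket, key)."""
    prefix = "s3://"
    if not uri.startswith(prefix):
        raise ValueError(f"not an S3 URI: {uri!r}")
    bucket, _, key = uri[len(prefix):].partition("/")
    key = key.lstrip("/")  # the returned key has no leading slash
    if not bucket or not key:
        raise ValueError(f"S3 URI needs a non-empty bucket and key: {uri!r}")
    return bucket, key
```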

static create(secrets, client_type='download')

Create S3 client from secrets dictionary.

Parameters:
  • secrets (Dict[str, str]) – Dictionary containing S3 credentials. Expected keys: S3_ENDPOINT, S3_DOWNLOAD_ACCESS_KEY, S3_DOWNLOAD_SECRET_KEY, S3_UPLOAD_ACCESS_KEY, S3_UPLOAD_SECRET_KEY.

  • client_type (Literal['download', 'upload']) – "download" or "upload" (default: "download").

Returns:

Configured S3Client instance.

Raises:

ValueError – If client_type is unknown or required secrets are missing.

Return type:

S3Client

list_files(bucket, prefix='', extension=None, max_files=None)

List object keys under prefix in bucket.

Parameters:
  • bucket (str) – Bucket name.

  • prefix (str) – Key prefix filter; use "" to list from the bucket root.

  • extension (str | None) – If set, keep only keys ending with .<extension>.

  • max_files (int | None) – If set, stop after this many keys (best-effort).

Returns:

List of S3 object keys (not full s3:// URIs).

Raises:

Exception – Propagates boto3/client errors after logging.

Return type:

List[str]
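To make the three filters above concrete, here is a sketch of their semantics over an in-memory list of keys. filter_keys is hypothetical: S3 listing APIs such as ListObjectsV2 apply the prefix on the server and page through results, which this illustration does not model.

```python
from typing import Iterable, List, Optional


def filter_keys(
    keys: Iterable[str],
    prefix: str = "",
    extension: Optional[str] = None,
    max_files: Optional[int] = None,
) -> List[str]:
    """Hypothetical illustration of list_files' prefix/extension/max_files filters."""
    suffix = f".{extension}" if extension else None
    selected: List[str] = []
    for key in keys:
        if max_files is not None and len(selected) >= max_files:
            break  # stop early once the cap is reached
        if not key.startswith(prefix):
            continue
        if suffix is not None and not key.endswith(suffix):
            continue
        selected.append(key)
    return selected
```

Note that extension is given without the dot ("jpg", not ".jpg"), matching the ".<extension>" wording above.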

download(bucket, key)

Download one object body as bytes.

Parameters:
  • bucket (str) – Bucket name.

  • key (str) – Object key.

Returns:

Raw object bytes (HTTP-chunked payloads may be normalized).

Raises:

Exception – Propagates boto3/client errors after logging.

Return type:

bytes

download_by_uri(s3_uri)

Download object bytes from s3://bucket/key.

Parameters:

s3_uri (str) – Valid S3 URI.

Returns:

Same as download.

Raises:
  • ValueError – If s3_uri is invalid (via parse_s3_uri).

  • Exception – Propagates boto3/client errors from download.

Return type:

bytes

upload(data, bucket, key, content_type=None)

Upload bytes to s3://bucket/key via put_object.

Parameters:
  • data (bytes) – Object body.

  • bucket (str) – Bucket name.

  • key (str) – Object key.

  • content_type (str | None) – Optional ContentType header.

Raises:

Exception – Propagates boto3/client errors after logging.

Return type:

None

exists(bucket, key)

Return whether an object exists (head_object succeeds).

Parameters:
  • bucket (str) – Bucket name.

  • key (str) – Object key.

Returns:

True if the object exists; False on any head_object error.

Return type:

bool

Logging (ytjobs.logging)

Logging utilities for YT jobs.

ytjobs.logging.get_logger(name=None, level=20)

Get a logger configured for human-readable text output to stderr.

Parameters:
  • name (str | None) – Logger name (default: root logger)

  • level (int) – Logging level (default: INFO)

Returns:

Configured logger instance

Return type:

Logger
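The level=20 in the signature is logging.INFO. A stdlib sketch of a stderr text logger follows; make_stderr_logger and its format string are assumptions, not the library's actual output format. Stderr matters because job stdout carries data rows.

```python
import logging
import sys
from typing import Optional


def make_stderr_logger(name: Optional[str] = None, level: int = logging.INFO) -> logging.Logger:
    """Hypothetical equivalent of get_logger(): plain-text log lines on stderr."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    # Reuse an existing stderr handler so repeated calls don't duplicate log lines.
    has_stderr_handler = any(
        isinstance(h, logging.StreamHandler) and h.stream is sys.stderr
        for h in logger.handlers
    )
    if not has_stderr_handler:
        handler = logging.StreamHandler(sys.stderr)
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
        )
        logger.addHandler(handler)
    return logger
```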

ytjobs.logging.log_with_extra(logger, level, message, **kwargs)

Log a message with extra context fields.

Parameters:
  • logger (Logger) – Logger instance

  • level (int) – Log level (e.g., logging.INFO)

  • message (str) – Log message

  • **kwargs (Any) – Additional context fields to include in log

Return type:

None
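How log_with_extra renders its context fields is not documented above. One plausible approach, sketched under the hypothetical names format_with_context and log_with_context, is to append sorted key=value pairs to the message text.

```python
import logging
from typing import Any


def format_with_context(message: str, **fields: Any) -> str:
    """Append key=value pairs, sorted by key, to a log message."""
    if not fields:
        return message
    context = " ".join(f"{key}={value!r}" for key, value in sorted(fields.items()))
    return f"{message} | {context}"


def log_with_context(logger: logging.Logger, level: int, message: str, **fields: Any) -> None:
    """Hypothetical counterpart of log_with_extra() built on format_with_context()."""
    logger.log(level, format_with_context(message, **fields))
```

Putting the fields into the message text, rather than passing them through logging's extra= argument, keeps them visible under any formatter. It also avoids the KeyError that extra= raises when a key collides with a LogRecord attribute such as message.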

ytjobs.logging.manage_output(mode='redirect')

Decorator factory that either suppresses output or redirects stdout to stderr.

Parameters:

mode"suppress" for full silence, "redirect" to send stdout to stderr (keeps job stdout clean for JSON lines).

Returns:

A decorator, e.g. @manage_output(mode="redirect") above a function.
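A decorator factory with these two modes can be sketched with contextlib. output_policy is a hypothetical name, and raising ValueError for an unknown mode is an assumption. This sketch swaps only the Python-level sys.stdout/sys.stderr objects.

```python
import contextlib
import functools
import os
import sys
import warnings
from typing import Any, Callable, Iterator, TypeVar

F = TypeVar("F", bound=Callable[..., Any])


@contextlib.contextmanager
def _silence_python_output() -> Iterator[None]:
    """Send sys.stdout/sys.stderr to os.devnull and ignore warnings."""
    with open(os.devnull, "w") as devnull, \
            contextlib.redirect_stdout(devnull), \
            contextlib.redirect_stderr(devnull), \
            warnings.catch_warnings():
        warnings.simplefilter("ignore")
        yield


def output_policy(mode: str = "redirect") -> Callable[[F], F]:
    """Hypothetical decorator factory mirroring manage_output(mode=...)."""
    if mode not in ("redirect", "suppress"):
        raise ValueError(f"unknown mode: {mode!r}")

    def decorator(func: F) -> F:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            if mode == "redirect":
                ctx = contextlib.redirect_stdout(sys.stderr)
            else:
                ctx = _silence_python_output()
            with ctx:
                return func(*args, **kwargs)

        return wrapper  # type: ignore[return-value]

    return decorator
```

Because the context wraps only the call itself, a decorated generator function would hand back its generator before any of its body runs, so that body's output would not be covered.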

ytjobs.logging.redirect_stdout_to_stderr()

Context manager that redirects stdout to stderr.

This is useful for YTsaurus mappers where you need clean JSON on stdout, but processing functions might print debug messages.

Usage:

with redirect_stdout_to_stderr():
    print("This goes to stderr")  # Won't corrupt stdout
    some_function_that_prints()

ytjobs.logging.suppress_all_output()

Context manager that suppresses ALL output: stdout, stderr, and warnings.

This is useful when you need complete silence from noisy libraries like OpenCV, Ultralytics YOLO, TensorFlow, Ceres Solver, etc.

Usage:

with suppress_all_output():
    # All prints, warnings, and library output are suppressed
    model = YOLO('yolov8n.pt')  # No output
    results = model(image)  # No progress bars
    cv2.imread(path)  # No warnings
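contextlib.redirect_stdout replaces only the Python-level sys.stdout object. Native code inside libraries such as OpenCV writes straight to file descriptors 1 and 2, so silencing it means redirecting the descriptors themselves. The docs above do not say whether suppress_all_output works at this level. The sketch below, under the hypothetical name silence_fds, shows the descriptor-level technique with os.dup and os.dup2.

```python
import contextlib
import os
import sys
import warnings
from typing import Iterator


@contextlib.contextmanager
def silence_fds() -> Iterator[None]:
    """Point fds 1 and 2 at os.devnull so native-library output is discarded too."""
    sys.stdout.flush()
    sys.stderr.flush()
    saved_out, saved_err = os.dup(1), os.dup(2)
    devnull = os.open(os.devnull, os.O_WRONLY)
    try:
        os.dup2(devnull, 1)
        os.dup2(devnull, 2)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            yield
    finally:
        # Flush Python buffers while fds still point at devnull, then restore them.
        sys.stdout.flush()
        sys.stderr.flush()
        os.dup2(saved_out, 1)
        os.dup2(saved_err, 2)
        for fd in (saved_out, saved_err, devnull):
            os.close(fd)
```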

Job config (ytjobs.config)

Resolve JOB_CONFIG_PATH to the staged config.yaml inside sandboxes.

ytjobs.config.get_config_path()

Get the path to the job configuration file.

Returns:

Path object pointing to the config file

Raises:

ValueError – If JOB_CONFIG_PATH environment variable is not set

Return type:

Path
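The lookup described above amounts to reading one environment variable. resolve_config_path is a hypothetical stand-in, and treating an empty value like an unset one is an assumption. In job code you would typically pass the resulting path to a YAML loader, for example yaml.safe_load from PyYAML if it is in the image.

```python
import os
from pathlib import Path


def resolve_config_path(env_var: str = "JOB_CONFIG_PATH") -> Path:
    """Hypothetical stand-in for get_config_path(): env var -> Path, else ValueError."""
    value = os.environ.get(env_var)
    if not value:
        raise ValueError(f"{env_var} environment variable is not set")
    return Path(value)
```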

Cypress checkpoints (ytjobs.checkpoint)

Checkpoint utilities for YT jobs.

ytjobs.checkpoint.get_checkpoint_path(checkpoint_name, base_path=None)

Get full YT path for a checkpoint.

Parameters:
  • checkpoint_name (str) – Name of checkpoint file

  • base_path (str | None) – Base YT path (defaults to user’s checkpoints folder)

Returns:

Full YT path to checkpoint

Return type:

str

ytjobs.checkpoint.save_checkpoint(data, checkpoint_name, metadata=None, base_path=None, logger=None)

Save checkpoint to YTsaurus Cypress file system.

Parameters:
  • data (bytes) – Checkpoint data as bytes

  • checkpoint_name (str) – Name of checkpoint file

  • metadata (Dict[str, Any] | None) – Optional metadata dictionary

  • base_path (str | None) – Base YT path for checkpoints

  • logger (Logger | None) – Optional logger

Returns:

Full YT path to saved checkpoint

Return type:

str

ytjobs.checkpoint.load_checkpoint(checkpoint_name, base_path=None, logger=None)

Load checkpoint and metadata from YTsaurus Cypress.

Parameters:
  • checkpoint_name (str) – Name of checkpoint file

  • base_path (str | None) – Base YT path for checkpoints

  • logger (Logger | None) – Optional logger

Returns:

Tuple of (checkpoint_data, metadata_dict) or (None, None) if not found

Return type:

tuple[bytes | None, Dict[str, Any] | None]
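save_checkpoint and load_checkpoint store bytes plus an optional metadata dict, and list_checkpoints mentions ".meta" files, which suggests that metadata lives in a sibling "<name>.meta" node. The sketch below models that layout with a plain dict standing in for Cypress. The ".meta" naming, the JSON encoding of metadata, returning {} when no metadata exists, and the default base path are all assumptions. On a cluster the library works against Cypress through yt.wrapper instead.

```python
import json
from typing import Any, Dict, MutableMapping, Optional, Tuple

DEFAULT_BASE = "//home/example/checkpoints"  # hypothetical default base path


def checkpoint_path(name: str, base_path: Optional[str] = None) -> str:
    """Join a base Cypress-style path and a checkpoint name."""
    return f"{(base_path or DEFAULT_BASE).rstrip('/')}/{name}"


def save_blob(store: MutableMapping[str, bytes], data: bytes, name: str,
              metadata: Optional[Dict[str, Any]] = None,
              base_path: Optional[str] = None) -> str:
    """Write the blob and, if given, its metadata to a sibling '.meta' entry."""
    path = checkpoint_path(name, base_path)
    store[path] = data
    if metadata is not None:
        store[path + ".meta"] = json.dumps(metadata).encode("utf-8")
    else:
        store.pop(path + ".meta", None)  # drop stale metadata from an earlier save
    return path


def load_blob(store: MutableMapping[str, bytes], name: str,
              base_path: Optional[str] = None
              ) -> Tuple[Optional[bytes], Optional[Dict[str, Any]]]:
    """Return (data, metadata), or (None, None) when the checkpoint is missing."""
    path = checkpoint_path(name, base_path)
    if path not in store:
        return None, None
    raw_meta = store.get(path + ".meta")
    metadata = json.loads(raw_meta) if raw_meta is not None else {}
    return store[path], metadata
```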

ytjobs.checkpoint.list_checkpoints(base_path=None, pattern=None, logger=None)

List available checkpoints in YTsaurus.

Parameters:
  • base_path (str | None) – Base YT path for checkpoints

  • pattern (str | None) – Optional pattern to filter checkpoint names

  • logger (Logger | None) – Optional logger

Returns:

List of checkpoint names (without .meta files)

Return type:

list[str]

ytjobs.checkpoint.delete_checkpoint(checkpoint_name, base_path=None, logger=None)

Delete checkpoint and its metadata from YTsaurus.

Parameters:
  • checkpoint_name (str) – Name of checkpoint file

  • base_path (str | None) – Base YT path for checkpoints

  • logger (Logger | None) – Optional logger

Returns:

True if deleted successfully, False otherwise

Return type:

bool

ytjobs.checkpoint.save_processing_state(state, state_name='processing_state', base_path=None, logger=None)

Save processing state (e.g., processed video list, iteration count) to checkpoint.

Convenience function for saving JSON-serializable state dictionaries.

Parameters:
  • state (Dict[str, Any]) – Dictionary with processing state

  • state_name (str) – Name for the state checkpoint

  • base_path (str | None) – Base YT path for checkpoints

  • logger (Logger | None) – Optional logger

Returns:

Full YT path to saved checkpoint

Return type:

str

ytjobs.checkpoint.load_processing_state(state_name='processing_state', base_path=None, logger=None)

Load processing state from checkpoint.

Convenience function for loading JSON-serializable state dictionaries.

Parameters:
  • state_name (str) – Name of the state checkpoint

  • base_path (str | None) – Base YT path for checkpoints

  • logger (Logger | None) – Optional logger

Returns:

State dictionary or None if not found

Return type:

Dict[str, Any] | None
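The processing-state helpers support a resume pattern: load the state (None on the first run), skip items that are already recorded, and save after each item. The sketch below shows that pattern with the processed-list and iteration-count example from the docstring. A dict stands in for Cypress, and save_state, load_state, and process_with_resume are hypothetical names, not the ytjobs functions.

```python
import json
from typing import Any, Dict, Iterable, List, MutableMapping, Optional


def save_state(store: MutableMapping[str, bytes], state: Dict[str, Any],
               state_name: str = "processing_state") -> None:
    """Serialize a JSON-compatible state dict under state_name."""
    store[state_name] = json.dumps(state, sort_keys=True).encode("utf-8")


def load_state(store: MutableMapping[str, bytes],
               state_name: str = "processing_state") -> Optional[Dict[str, Any]]:
    """Return the saved state dict, or None if nothing was saved yet."""
    raw = store.get(state_name)
    return None if raw is None else json.loads(raw)


def process_with_resume(store: MutableMapping[str, bytes], items: Iterable[str]) -> List[str]:
    """Skip items recorded in saved state; persist state after each new item."""
    state = load_state(store) or {"processed": [], "iteration": 0}
    newly_done: List[str] = []
    for item in items:
        if item in state["processed"]:
            continue  # handled by an earlier, possibly interrupted, attempt
        # ... real per-item work would go here ...
        state["processed"].append(item)
        state["iteration"] += 1
        save_state(store, state)
        newly_done.append(item)
    return newly_done
```

Saving after every item trades extra writes for losing at most one item's work if the job is interrupted.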