API reference

yt_framework symbols below are generated from Python docstrings. Job-side entrypoints (ytjobs) have a separate page.

Tip

Navigation

Use the sidebar headings for modules. Anything not listed is still in the source tree; use help(module) locally.

Layout

  • Core: pipeline, stage, registry, discovery, self.deps injection

  • Contracts: StageDependencies and StageContext shared by core and operation drivers (prefer yt_framework.contracts; operations.stage_contracts is a thin re-export)

  • Operations: map, vanilla, map-reduce/reduce, S3 helpers, table helpers, checkpoint upload, sort, tokenizer artifact wiring

  • Typed jobs: StageBootstrapTypedJob

  • YT: yt.support (shared runtime helpers), yt.clients (public client API and mixins), and yt entry (factory, package exports)

  • Utils: env files, logging setup, ignore patterns

  • Packaging: upload helpers shared by operations

  • ytjobs: Job-side reference

Note

Narrative docs

How-to guides under docs/operations, docs/configuration, and docs/advanced spell out YAML and stage patterns; this page is the raw API surface.

Core Modules

Pipeline

Pipeline base classes, CLI entry point (main), and code upload orchestration.

Subclass BasePipeline, implement setup() to register stages with StageRegistry, then call main() on your subclass from pipeline.py. DefaultPipeline auto-discovers stages under stages/*/stage.py.

class yt_framework.core.pipeline.BasePipeline(config, pipeline_dir, log_level=20)

Bases: object

Base class for all pipelines.

Provides common functionality:

  • CLI entry point via the main() class method

  • Code upload to the YT build folder

  • YT client initialization

  • Default stage execution loop

Subclasses must:

  • In setup(), create a StageRegistry, add stages to it, and pass it to set_stage_registry()

Subclasses may override:

  • setup(): register stages and initialize pipeline-specific clients (S3, etc.)

  • run(): custom execution flow (rarely needed; most pipelines use the default)

Parameters:
  • config (DictConfig)

  • pipeline_dir (Path)

  • log_level (int)

__init__(config, pipeline_dir, log_level=20)

Initialize the base pipeline.

Parameters:
  • config (DictConfig) – Configuration object (OmegaConf DictConfig)

  • pipeline_dir (Path) – Path to pipeline directory

  • log_level (int) – Logging level

Returns:

None

Raises:

ValueError – If pipeline directory does not exist.

Return type:

None

setup()

Run pipeline-specific initialization.

Override this method in subclasses to:

  1. Register stages using StageRegistry and set_stage_registry()

  2. Initialize custom clients (e.g., an S3 client or database connections)

This method is called automatically after base initialization.

Returns:

None

Return type:

None

set_stage_registry(registry)

Set the stage registry for this pipeline.

Parameters:

registry (StageRegistry) – StageRegistry instance with registered stages

Returns:

None

Return type:

None

create_stage_dependencies()

Create stage dependencies for injection.

This method creates a dependency container with only what stages need, following the Interface Segregation Principle.

Returns:

PipelineStageDependencies with yt_client, pipeline_config, configs_dir

Return type:

PipelineStageDependencies

upload_code(build_folder=None)

Upload code to YT build folder.

Only uploads code if any enabled stages need code execution on YT. If no stages need code execution, this method does nothing.

Parameters:

build_folder (str | None) – Optional YT build folder path. If None, uses config.pipeline.build_folder

Returns:

None

Raises:

ValueError – If build_folder is required but not provided in config.

Return type:

None

run()

Run the pipeline by executing enabled stages.

Default implementation:

  1. Upload code to YT (only if stages need code execution)

  2. Get enabled stages from config

  3. Execute stages in order using the stage registry

  4. Pass context between stages

Override this method only if you need completely custom execution flow.

Returns:

None

Raises:
  • ValueError – If no enabled_stages are found in config, or if a stage name is unknown.

  • AttributeError – If stage registry is not set in setup().

Return type:

None
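
The default loop described above can be sketched in plain Python. This is an illustration under assumptions, not the framework's code: run_enabled_stages, the plain-dict registry of callables, and merging each stage's result with dict.update are hypothetical, and code upload plus stage instantiation are left out.

```python
from __future__ import annotations

from typing import Any, Callable

# Hypothetical stand-in for a stage: a callable that takes the shared context
# dict and returns a dict of results (BaseStage.run plays this role).
StageRunner = Callable[[dict[str, Any]], dict[str, Any]]


def run_enabled_stages(
    enabled_stages: list[str],
    registry: dict[str, StageRunner],
) -> dict[str, Any]:
    """Sketch of the default loop: look up, run, and merge each enabled stage."""
    if not enabled_stages:
        raise ValueError("No enabled_stages found in config")
    context: dict[str, Any] = {}
    for name in enabled_stages:
        if name not in registry:
            raise ValueError(f"Unknown stage name: {name}")
        # Each stage sees a copy of everything earlier stages produced...
        result = registry[name](dict(context))
        # ...and its results are merged in for the stages that follow.
        context.update(result or {})
    return context
```

Because stages run strictly in enabled_stages order, a stage can only read keys produced by stages listed before it.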

classmethod main(argv=None)

CLI entry point for the pipeline.

Handles:

  • Argument parsing (--config option)

  • Config file loading

  • Pipeline instantiation

  • Error handling and exit codes

Parameters:

argv (list[str] | None) – Optional command-line arguments. If None, uses sys.argv.

Returns:

None (exits with code 0 on success, 1 on failure)

Return type:

None

Usage:

python pipeline.py
python pipeline.py --config configs/custom.yaml

class yt_framework.core.pipeline.DefaultPipeline(config, pipeline_dir, log_level=20)

Bases: BasePipeline

Pipeline with automatic stage discovery.

Discovers BaseStage subclasses from stages/<name>/stage.py and registers them. Run with DefaultPipeline.main() from pipeline.py. Which stages execute is still controlled by enabled_stages in config.

Parameters:
  • config (DictConfig)

  • pipeline_dir (Path)

  • log_level (int)

setup()

Automatically discover and register stages from the stages directory.

Looks for stage.py under each stages/<name>/ folder and registers every BaseStage subclass found.

Returns:

None

Return type:

None

yt_framework.core.pipeline.normalize_upload_modules(raw)

Normalize upload_modules config: accept list, tuple, or single string.

Parameters:

raw (object)

Return type:

list[str]

yt_framework.core.pipeline.normalize_upload_paths(raw)

Normalize upload_paths config: must be a list of {source, target?} mappings.

Parameters:

raw (object)

Return type:

list[dict[str, str]]

Pipeline config helpers

OmegaConf normalization helpers for pipeline and upload configuration.

yt_framework.core.pipeline_config.normalize_upload_modules(raw)

Normalize upload_modules config: accept list, tuple, or single string.

Parameters:

raw (object)

Return type:

list[str]
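
A minimal sketch of the documented normalization (list, tuple, or single string in; list of strings out). The function name and the handling of None and of unsupported types are assumptions rather than documented behavior; the real helper also has to cope with OmegaConf containers, which this plain-Python sketch does not import.

```python
from __future__ import annotations

from collections.abc import Sequence


def normalize_upload_modules_sketch(raw: object) -> list[str]:
    """Illustrative re-implementation; not the framework's code."""
    if raw is None:
        return []  # assumption: a missing key means "no extra modules"
    if isinstance(raw, str):
        return [raw]  # a single module name becomes a one-item list
    if isinstance(raw, Sequence):
        # list, tuple, or any other non-string sequence
        return [str(item) for item in raw]
    raise TypeError(
        f"upload_modules must be a list, tuple, or string, got {type(raw).__name__}"
    )
```

The string check has to come before the Sequence check, because a str is itself a Sequence and would otherwise be split into characters.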

yt_framework.core.pipeline_config.normalize_upload_paths(raw)

Normalize upload_paths config: must be a list of {source, target?} mappings.

Parameters:

raw (object)

Return type:

list[dict[str, str]]
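
The {source, target?} contract can be illustrated with a small validator. This is a sketch under assumptions: the name normalize_upload_paths_sketch, raising ValueError on bad input, returning [] for None, and omitting target when it is absent are illustrative choices, not the framework's documented behavior.

```python
from __future__ import annotations

from collections.abc import Mapping, Sequence


def normalize_upload_paths_sketch(raw: object) -> list[dict[str, str]]:
    """Illustrative only: validates a list of {source, target?} mappings."""
    if raw is None:
        return []  # assumption: absent config means nothing extra to upload
    if isinstance(raw, str) or not isinstance(raw, Sequence):
        raise ValueError("upload_paths must be a list of mappings")
    normalized: list[dict[str, str]] = []
    for index, entry in enumerate(raw):
        if not isinstance(entry, Mapping) or "source" not in entry:
            raise ValueError(f"upload_paths[{index}] needs a 'source' key")
        item = {"source": str(entry["source"])}
        if entry.get("target") is not None:
            item["target"] = str(entry["target"])  # target stays optional
        normalized.append(item)
    return normalized
```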

yt_framework.core.pipeline_config.yt_mode_from_pipeline_config(raw)

Coerce pipeline.mode to a literal prod/dev or None (caller may default).

Parameters:

raw (object)

Return type:

Literal['prod', 'dev'] | None

yt_framework.core.pipeline_config.pickling_dict_from_config(pickling_cfg)

Return a plain dict for create_yt_client(..., pickling=...).

Parameters:

pickling_cfg (object)

Return type:

dict[str, Any]

yt_framework.core.pipeline_config.enabled_stage_names(enabled)

Normalize stages.enabled_stages to a list of directory names.

Parameters:

enabled (object)

Return type:

list[str]

Pipeline CLI helpers

CLI bootstrap for BasePipeline subclasses.

yt_framework.core.pipeline_cli.build_pipeline_cli_parser(cls_name)

Build the argparse parser used by main().

Parameters:

cls_name (str)

Return type:

ArgumentParser

yt_framework.core.pipeline_cli.resolve_pipeline_config_path(pipeline_dir, config_arg)

Resolve --config to an absolute path (relative paths are under pipeline_dir).

Parameters:
  • pipeline_dir (Path)

  • config_arg (str)

Return type:

Path
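
The resolution rule (absolute paths pass through, relative ones are anchored at the pipeline directory rather than the current working directory) fits in a few lines of pathlib. A sketch only; resolve_config_path_sketch is a hypothetical name.

```python
from __future__ import annotations

from pathlib import Path


def resolve_config_path_sketch(pipeline_dir: Path, config_arg: str) -> Path:
    """Illustrative: absolute --config values pass through; relative ones
    are interpreted under pipeline_dir, not the current working directory."""
    candidate = Path(config_arg)
    if not candidate.is_absolute():
        candidate = Path(pipeline_dir) / candidate
    return candidate.resolve()
```

Anchoring at pipeline_dir means python pipeline.py --config configs/custom.yaml behaves the same no matter which directory the command is launched from.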

yt_framework.core.pipeline_cli.read_pipeline_mode_for_header(config_path, logger)

Return pipeline.mode from the config file for log banners; defaults to dev on errors.

Return type:

str

yt_framework.core.pipeline_cli.load_dict_config_or_exit(config_path, logger)

Load YAML as a DictConfig, or log the error and call sys.exit(1).

Return type:

DictConfig

yt_framework.core.pipeline_cli.run_pipeline_instance_or_exit(cls, config, pipeline_dir, logger)

Instantiate cls and run the pipeline, then call sys.exit(0) on success or sys.exit(1) on failure.

Return type:

None
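
The exit-code contract can be sketched as a small wrapper. This is an assumption-labeled illustration, not the helper's source: run_or_exit_sketch takes a zero-argument callable instead of (cls, config, pipeline_dir, logger), and catching every Exception is the sketch's choice.

```python
from __future__ import annotations

import logging
import sys
from typing import Callable


def run_or_exit_sketch(run: Callable[[], None], logger: logging.Logger) -> None:
    """Illustrative: map success/failure of a pipeline run to exit codes 0/1."""
    try:
        run()
    except Exception:  # broad on purpose: any failure becomes exit code 1
        logger.exception("Pipeline failed")
        sys.exit(1)
    sys.exit(0)
```

sys.exit raises SystemExit, so callers (and tests) can observe the exit code without the interpreter actually terminating.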

Stage

BaseStage — pipeline authors subclass this and implement run().

The framework derives the stage name from the stages/<name>/ directory.

class yt_framework.core.stage.BaseStage(deps, logger)

Bases: ABC

Abstract base class for pipeline stages.

Stage name and config are automatically detected from the directory:

  • Directory structure: stages/<stage_name>/stage.py

  • Config file: stages/<stage_name>/config.yaml

Each stage must:

  • Implement __init__ to receive deps and logger

  • Implement the run method to execute stage logic

Example

from typing import Any

from yt_framework.core.stage import BaseStage


class MyStage(BaseStage):
    def __init__(self, deps, logger):
        super().__init__(deps, logger)
        # self.config is automatically loaded from stages/<stage_name>/config.yaml
        # Access dependencies via self.deps.yt_client, self.deps.pipeline_config

    def run(self, debug: dict[str, Any]) -> dict[str, Any]:
        # Stage logic here
        # Note: 'debug' is the shared data dict from earlier stages, NOT dependencies
        return {"result": "value"}

__init__(deps, logger)

Initialize stage with injected dependencies.

Stage name and config are automatically detected from the directory containing stage.py.

Parameters:
  • deps (StageDependencies) – Injected dependencies (yt_client, pipeline_config, configs_dir)

  • logger (Logger) – Logger instance for stage logging

Returns:

None

Raises:
  • FileNotFoundError – If config.yaml file is not found in stage directory.

  • TypeError – If config.yaml does not contain a dictionary (wrong node type).

Return type:

None

property stage_dir: Path

Path to the stage directory.

Returns:

Absolute path to the stage directory (stages/<stage_name>/).

Return type:

Path

abstractmethod run(debug)

Execute the stage.

Parameters:

debug (dict[str, Any]) – Shared context dictionary from previous stages. Contains results from earlier stages. Can be empty dict for the first stage.

Returns:

Dictionary with stage results to be merged into the context and passed to the next stage.

Return type:

DebugContext

property context: StageContext

Stage context containing all stage-related information.

Returns:

Dataclass instance with name, config, stage_dir, logger, and deps attributes.

Return type:

StageContext

Registry

Mutable registry of BaseStage subclasses keyed by stage name.

class yt_framework.core.registry.StageRegistry

Bases: object

Builder for registering pipeline stages.

Automatically detects stage names from directory structure.

Example

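from yt_framework.core.registry import StageRegistry

# Inside setup(); CreateTableStage and RunMapStage are your BaseStage subclasses
registry = StageRegistry()
registry.add_stage(CreateTableStage)
registry.add_stage(RunMapStage)
self.set_stage_registry(registry)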

__init__()

Initialize empty stage registry.

Return type:

None

add_stage(stage_class)

Register a stage class.

Stage name is automatically detected from the directory containing stage.py.

Parameters:

stage_class (type[BaseStage]) – Stage class to register

Returns:

Self for method chaining

Return type:

StageRegistry

get_stage(stage_name)

Get stage class by name.

Parameters:

stage_name (str) – Name of the stage to retrieve (matches directory name).

Returns:

The stage class registered with the given name.

Return type:

Type[BaseStage]

Raises:

KeyError – If no stage is registered with the given name.

Example

>>> registry = StageRegistry()
>>> registry.add_stage(MyStage)
>>> stage_class = registry.get_stage("my_stage")

has_stage(stage_name)

Check if stage is registered.

Parameters:

stage_name (str) – Name of the stage to check.

Returns:

True if the stage is registered, False otherwise.

Return type:

bool

get_all_stages()

Get all registered stages.

Returns:

Dictionary mapping stage names to stage classes.

Returns a copy to prevent external modification.

Return type:

Dict[str, Type[BaseStage]]
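
A compact sketch of the registry behavior documented above: chaining add_stage, KeyError from get_stage, and a defensive copy from get_all_stages. StageRegistrySketch and its injectable name_for hook (added so the example runs outside a stages/ tree) are hypothetical, and silently overwriting a duplicate name is an assumption the reference does not cover.

```python
from __future__ import annotations

import inspect
from pathlib import Path
from typing import Callable


def stage_name_from_directory(stage_class: type) -> str:
    # stages/<name>/stage.py -> "<name>": the folder holding the class's module
    return Path(inspect.getfile(stage_class)).parent.name


class StageRegistrySketch:
    """Illustrative builder mirroring add_stage/get_stage/has_stage/get_all_stages."""

    def __init__(self, name_for: Callable[[type], str] = stage_name_from_directory) -> None:
        self._name_for = name_for
        self._stages: dict[str, type] = {}

    def add_stage(self, stage_class: type) -> StageRegistrySketch:
        self._stages[self._name_for(stage_class)] = stage_class
        return self  # enables registry.add_stage(A).add_stage(B)

    def get_stage(self, stage_name: str) -> type:
        if stage_name not in self._stages:
            raise KeyError(f"No stage registered as {stage_name!r}")
        return self._stages[stage_name]

    def has_stage(self, stage_name: str) -> bool:
        return stage_name in self._stages

    def get_all_stages(self) -> dict[str, type]:
        return dict(self._stages)  # a copy, so callers cannot mutate the registry
```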

Discovery

Filesystem scan that imports stages/*/stage.py and collects BaseStage types.

yt_framework.core.discovery.discover_stages(pipeline_dir, logger)

Automatically discover all stage classes from the stages directory tree.

Searches for stage.py in each child directory of stages/ and imports all BaseStage subclasses found.

Expected layout: pipeline_dir/stages/<stage_name>/stage.py with a BaseStage subclass in each module.

Parameters:
  • pipeline_dir (Path) – Path to pipeline directory

  • logger (Logger) – Logger instance

Returns:

List of discovered stage classes

Return type:

list[type[BaseStage]]
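
The scan can be approximated with importlib. A sketch under stated assumptions: discover_stage_classes takes the base class as a parameter instead of importing BaseStage, loads each stage.py under a made-up unique module name, and keeps only subclasses defined in that file (so the imported base itself is skipped). How the real function names modules, orders results, or handles import errors is not documented here.

```python
from __future__ import annotations

import importlib.util
import inspect
from pathlib import Path


def discover_stage_classes(pipeline_dir: Path, base: type) -> list[type]:
    """Illustrative scan of pipeline_dir/stages/*/stage.py for subclasses of base."""
    found: list[type] = []
    for stage_file in sorted(Path(pipeline_dir).glob("stages/*/stage.py")):
        stage_name = stage_file.parent.name
        # Unique module name per stage so two stage.py files do not collide
        spec = importlib.util.spec_from_file_location(f"_stages_{stage_name}", stage_file)
        if spec is None or spec.loader is None:
            continue
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        for _, obj in inspect.getmembers(module, inspect.isclass):
            # Keep classes defined in this file, not imported ones such as the base
            if issubclass(obj, base) and obj is not base and obj.__module__ == module.__name__:
                found.append(obj)
    return found
```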

Core injection (self.deps)

Concrete PipelineStageDependencies injected into each stage (available as self.deps).

The StageDependencies protocol and StageContext live under yt_framework.contracts so core and operation drivers share types without operations importing core.

class yt_framework.core.dependencies.PipelineStageDependencies(yt_client, pipeline_config, configs_dir)

Bases: object

Default implementation of StageDependencies.

Used by BasePipeline to inject dependencies into stages. This class is instantiated by the pipeline and passed to each stage.

Parameters:
  • yt_client (BaseYTClient)

  • pipeline_config (DictConfig)

  • configs_dir (Path)

yt_client

YT client instance for performing operations on a YTsaurus cluster, or on the local filesystem in dev mode.

Type:

yt_framework.yt.clients.client_base.BaseYTClient

pipeline_config

Pipeline-level configuration containing mode, build_folder, and other pipeline-wide settings.

Type:

omegaconf.dictconfig.DictConfig

configs_dir

Path to directory containing secrets.env and configuration files.

Type:

pathlib.Path

yt_client: BaseYTClient
pipeline_config: DictConfig
configs_dir: Path

Contracts

Stage injection types

Stage execution contracts shared by operation drivers and core orchestration.

StageDependencies and StageContext live in yt_framework.contracts so yt_framework.core can depend on this package instead of reaching through yt_framework.operations only for types, while operation drivers stay free of core imports (see tach.toml and docs/architecture/layers.md).

class yt_framework.contracts.stage.StageDependencies(*args, **kwargs)

Bases: Protocol

Protocol defining what dependencies stages need.

This is NOT the same as the dict parameter of run() (named debug in BaseStage.run):

  • StageDependencies: injected services and config (yt_client, etc.)

  • run() parameter: shared data dict passed between stages

property yt_client: BaseYTClient

YT client for operations (dev or prod implementation).

property pipeline_config: DictConfig

Pipeline-level configuration (build_folder, mode, secrets paths).

property configs_dir: Path

Directory containing secrets.env and other config files.
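
Because StageDependencies is a Protocol, any object with matching members satisfies it without inheriting from it, which is what makes lightweight fakes possible in tests. A sketch under assumptions: StageDependenciesLike and FakeDependencies are hypothetical names, and Any stands in for BaseYTClient and DictConfig so the example runs without the framework.

```python
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class StageDependenciesLike(Protocol):
    """Illustrative mirror of the protocol: three read-only members."""

    @property
    def yt_client(self) -> Any: ...

    @property
    def pipeline_config(self) -> Any: ...

    @property
    def configs_dir(self) -> Path: ...


@dataclass
class FakeDependencies:
    # No inheritance from the protocol is needed: matching members suffice.
    yt_client: Any
    pipeline_config: Any
    configs_dir: Path
```

runtime_checkable only checks that the members exist, not their types, so static type checkers remain the real guard.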

class yt_framework.contracts.stage.StageContext(name, config, stage_dir, logger, deps)

Bases: object

Stage context: name, config, paths, logger, and injected dependencies.

name: str
config: DictConfig
stage_dir: Path
logger: Logger
deps: StageDependencies

fork(name=None, stage_dir=None)

Return a shallow copy with selective name / stage_dir overrides.

Parameters:
  • name (str | None)

  • stage_dir (Path | None)

Return type:

StageContext
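
The "shallow copy with selective overrides" semantics of fork() map naturally onto dataclasses.replace. This is a sketch, not the real class: StageContextSketch is hypothetical, and Any replaces DictConfig and StageDependencies so it runs standalone.

```python
from __future__ import annotations

import dataclasses
import logging
from pathlib import Path
from typing import Any


@dataclasses.dataclass
class StageContextSketch:
    name: str
    config: Any  # DictConfig in the real class
    stage_dir: Path
    logger: logging.Logger
    deps: Any  # StageDependencies in the real class

    def fork(self, name: str | None = None, stage_dir: Path | None = None) -> StageContextSketch:
        # dataclasses.replace builds a new instance; unchanged fields are
        # shared by reference (a shallow copy), so config/deps are not cloned.
        overrides: dict[str, Any] = {}
        if name is not None:
            overrides["name"] = name
        if stage_dir is not None:
            overrides["stage_dir"] = stage_dir
        return dataclasses.replace(self, **overrides)
```

Since config and deps are shared, mutating the forked context's config would also affect the original; treat forked contexts as read-only views.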

Import path note

yt_framework.operations.stage_contracts re-exports only StageContext and StageDependencies (defined in yt_framework.contracts.stage). Prefer importing them from yt_framework.contracts.

Operations

Map Operations

Driver helpers to package src/mapper.py and submit YT map operations.

class yt_framework.operations.command_ops.map.MapOperationData(mapper_path, dependencies, environment, docker_auth, command=None)

Bases: object

Data container for map operation configuration.

mapper_path

Path to mapper.py script in YT (or bash wrapper if tar mode).

Type:

str

dependencies

List of (yt_path, local_path) tuples for files to upload.

Type:

list[tuple[str, str]]

environment

Environment variables dictionary (secrets only).

Type:

dict[str, str]

docker_auth

Optional Docker authentication dictionary for private registries.

Type:

dict[str, str] | None

command

Optional command to execute (used in tar archive mode).

Type:

str | None

mapper_path: str
dependencies: list[tuple[str, str]]
environment: dict[str, str]
docker_auth: dict[str, str] | None
command: str | None = None

yt_framework.operations.command_ops.map.run_map(context, operation_config, output_schema=None, mapper=None, job=None)

Run YT map operation and wait for completion.

All job parameters (pool, memory, CPU, Docker image, etc.) are automatically extracted from operation_config. Pass the stage's client.operations.map config (for example self.config.client.operations.map).

Parameters:
  • context (StageContext) – Stage context (provides deps, logger, stage_dir)

  • operation_config (DictConfig) – Operation-specific config (from client.operations.map). Optional key append (bool): append mapper output to an existing output table.

  • output_schema (TableSchema | None) – Optional YT TableSchema for typed output table

  • mapper (object | None) – Optional mapper leg (legacy name). When omitted, the framework uses its command wrapper. Can be a TypedJob instance or a command string.

  • job (object | None) – Preferred mapper leg alias. Can be a TypedJob instance or command string.

Returns:

True if successful, False otherwise

Return type:

bool

Vanilla Operations

Driver helpers to package src/vanilla.py and submit YT vanilla operations.

class yt_framework.operations.command_ops.vanilla.VanillaOperationData(script_path, dependencies, environment, docker_auth, command=None)

Bases: object

Data container for vanilla operation configuration.

script_path

Path to vanilla.py script in YT (or placeholder if tar mode).

Type:

str

dependencies

List of (yt_path, local_path) tuples for files to upload.

Type:

list[tuple[str, str]]

environment

Environment variables dictionary (secrets only).

Type:

dict[str, str]

docker_auth

Optional Docker authentication dictionary for private registries.

Type:

dict[str, str] | None

command

Optional command to execute (used in tar archive mode).

Type:

str | None

script_path: str
dependencies: list[tuple[str, str]]
environment: dict[str, str]
docker_auth: dict[str, str] | None
command: str | None = None

yt_framework.operations.command_ops.vanilla.run_vanilla(context, operation_config, job=None)

Run YT vanilla operation and wait for completion.

All job parameters (pool, memory, CPU, Docker image, etc.) are automatically extracted from operation_config. Pass the stage's client.operations.vanilla config (for example self.config.client.operations.vanilla). The task name is automatically set to the stage name.

Parameters:
  • context (StageContext) – Stage context (provides deps, logger, stage_dir, name)

  • operation_config (DictConfig) – Operation-specific config (from client.operations.vanilla)

  • job (str | None) – Preferred command alias. When omitted, the framework's wrapper command is used.

Returns:

True if successful, False otherwise

Return type:

bool

Map-reduce and reduce

Submit map-reduce and reduce-only YT operations (TypedJob or command strings).

Builds archives plus optional file dependencies; cluster credentials still come from configs/secrets.env like other operations.

yt_framework.operations.command_ops.map_reduce.run_map_reduce(context, operation_config, mapper=None, reducer=None, output_schema=None, map_job=None, reduce_job=None)

Run a YT map-reduce operation and wait for completion.

Pass mapper and reducer either both as TypedJob instances or both as command strings (JSON stdin/stdout). Mixing kinds raises ValueError.

Set operation_config.tar_command_bootstrap: true to wrap string legs with the same tar -xzf source.tar.gz + wrapper pattern as map operations (requires matching wrappers in the uploaded tarball; see docs).

Parameters:
  • context (StageContext) – Stage context (deps, logger, stage_dir, config).

  • operation_config (DictConfig) – client.operations.map_reduce config (input_table, output_table, reduce_by, sort_by, resources, file_paths, etc.).

  • mapper (object) – Deprecated — use map_job instead.

  • reducer (object) – Deprecated — use reduce_job instead.

  • output_schema (TableSchema | None) – Optional YT TableSchema for output table.

  • map_job (object) – Mapper leg (TypedJob instance or command string).

  • reduce_job (object) – Reducer leg (TypedJob instance or command string).

Returns:

True if the operation completed successfully.

Return type:

bool
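
The "both TypedJob or both command strings" rule can be sketched as a small pre-flight check. Assumptions, all hypothetical: resolve_legs_sketch treats any non-string leg as a job object, the preferred map_job/reduce_job aliases win over the deprecated mapper/reducer names, and a missing leg is rejected. The real function's precedence and error messages are not documented here.

```python
from __future__ import annotations

from typing import Any


def resolve_legs_sketch(
    mapper: Any = None,
    reducer: Any = None,
    map_job: Any = None,
    reduce_job: Any = None,
) -> tuple[Any, Any]:
    """Illustrative check mirroring the documented rule: both legs must be
    command strings, or both must be job objects (TypedJob in the real API)."""
    # Assumption: the preferred aliases win over the deprecated names.
    map_leg = map_job if map_job is not None else mapper
    reduce_leg = reduce_job if reduce_job is not None else reducer
    if map_leg is None or reduce_leg is None:
        raise ValueError("map-reduce needs both a map leg and a reduce leg")
    if isinstance(map_leg, str) != isinstance(reduce_leg, str):
        raise ValueError("Pass both legs as TypedJob instances or both as command strings")
    return map_leg, reduce_leg
```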

yt_framework.operations.command_ops.map_reduce.run_reduce(context, operation_config, reducer=None, output_schema=None, job=None)

Run a YT reduce-only operation and wait for completion.

Pass reducer as a TypedJob or a command string. With operation_config.tar_command_bootstrap: true, string reducers get the same tar extract + wrapper bootstrap as map (see docs).

Parameters:
  • context (StageContext) – Stage context.

  • operation_config (DictConfig) – client.operations.reduce config.

  • reducer (object) – Reducer leg (legacy name).

  • output_schema (TableSchema | None) – Optional output table schema.

  • job (object) – Preferred reducer leg alias.

Returns:

True if the operation completed successfully.

Return type:

bool

YQL Operations

YQL operations are *_request methods on the YT client, each taking a frozen request type from yt_framework.yt.clients.yql.yql_requests (for example JoinTablesRequest). See the YQL guide (../operations/yql).

Note

YQL Operations Location

YQL operations are implemented as methods on BaseYTClient and its subclasses (YTDevClient and YTProdClient). They are not in a separate operations module.

S3 Operations

Driver-side helpers to list S3 keys and persist paths into Cypress tables.

yt_framework.operations.s3.list_s3_files(s3_client, bucket, prefix, logger, extension=None, max_files=None)

List files from S3 bucket with optional filtering.

Parameters:
  • s3_client (S3Client) – S3 client instance

  • bucket (str) – S3 bucket name

  • prefix (str) – S3 prefix path

  • logger (Logger) – Logger instance

  • extension (str | None) – Optional file extension filter (e.g., ‘mp4’)

  • max_files (int | None) – Optional maximum number of files to return

Returns:

List of S3 file paths

Return type:

list[str]
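
The optional extension and max_files filters can be illustrated on keys that have already been listed under a prefix; the S3 listing call itself is omitted. filter_s3_keys_sketch is hypothetical, and accepting the extension with or without a leading dot, plus case-sensitive matching, are assumptions.

```python
from __future__ import annotations


def filter_s3_keys_sketch(
    keys: list[str],
    extension: str | None = None,
    max_files: int | None = None,
) -> list[str]:
    """Illustrative post-filter for keys already listed under a prefix."""
    if extension is not None:
        # Accept "mp4" or ".mp4" (assumption) and match on the key suffix
        suffix = "." + extension.lstrip(".")
        keys = [key for key in keys if key.endswith(suffix)]
    if max_files is not None:
        keys = keys[:max_files]  # keep the first max_files matches
    return keys
```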

yt_framework.operations.s3.save_s3_paths_to_table(yt_client, bucket, paths, output_table, logger)

Save S3 file paths to YT table as bucket and path columns.

Parameters:
  • yt_client (BaseYTClient) – YT client instance

  • bucket (str) – S3 bucket name

  • paths (list[str]) – List of S3 file paths

  • output_table (str) – YT table path

  • logger (Logger) – Logger instance

Returns:

None

Return type:

None

Table operations

Helpers to count rows, load rows into memory, and export tables to JSONL.

For low-level access, use BaseYTClient methods (read_table, row_count, etc.) directly on self.deps.yt_client. These functions add logging and convenience.

yt_framework.operations.table.get_row_count(yt_client, table_path, logger)

Get number of rows in a YT table.

Parameters:
  • yt_client (BaseYTClient) – YT client instance

  • table_path (str) – YT table path

  • logger (Logger) – Logger instance

Returns:

Number of rows in table

Return type:

int

yt_framework.operations.table.read_table(yt_client, table_path, logger)

Read rows from a YT table.

Parameters:
  • yt_client (BaseYTClient) – YT client instance

  • table_path (str) – YT table path

  • logger (Logger) – Logger instance

Returns:

List of rows as dictionaries

Return type:

list[dict[str, Any]]

yt_framework.operations.table.download_table(yt_client, table_path, output_file, logger)

Download YT table to local JSONL file.

Parameters:
  • yt_client (BaseYTClient) – YT client instance

  • table_path (str) – YT table path

  • output_file (Path) – Local file path for output

  • logger (Logger) – Logger instance

Returns:

None

Return type:

None
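
The JSONL output format (one JSON object per line) can be sketched with the standard library. In the real helper the rows come from the YT table; here they are any iterable of dicts. write_jsonl_sketch is hypothetical, and creating parent directories, UTF-8 encoding, and ensure_ascii=False are the sketch's own choices.

```python
from __future__ import annotations

import json
from collections.abc import Iterable
from pathlib import Path
from typing import Any


def write_jsonl_sketch(rows: Iterable[dict[str, Any]], output_file: Path) -> int:
    """Illustrative: write one JSON object per line and return the row count."""
    output_file.parent.mkdir(parents=True, exist_ok=True)
    count = 0
    with output_file.open("w", encoding="utf-8") as handle:
        for row in rows:
            handle.write(json.dumps(row, ensure_ascii=False) + "\n")
            count += 1
    return count
```

Streaming row by row keeps memory flat, unlike read_table, which returns the full list of rows.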

Checkpoint Operations

Upload or reuse single-file model checkpoints and wire them into operation specs.

yt_framework.operations.checkpoint.init_checkpoint_directory(context, checkpoint_config)

Initialize checkpoint directory in YTsaurus if it doesn’t exist.

Uses checkpoint_base from checkpoint_config. Also uploads local checkpoint if specified. Validates that required checkpoint exists in YT before proceeding.

Parameters:
  • context (StageContext) – Stage context (provides deps, logger)

  • checkpoint_config (DictConfig) – Checkpoint-specific config (from client.operations.<op>.checkpoint)

Returns:

None

Raises:
Return type:

None

Sort operations

Submit YT sort jobs using the same (context, operation_config) pattern as map.

yt_framework.operations.command_ops.sort.run_sort(context, operation_config)

Run a YT sort operation and wait for completion.

Parameters:
  • context (StageContext) – Stage context (deps, logger, stage_dir, config).

  • operation_config (DictConfig) –

    Sort-specific config. Required keys:

    • input_table — table to sort in-place.

    • sort_by — list of column names.

    Optional keys mirror run_map_reduce:

    • resources.pool / resources.pool_tree — scheduler pool.

    • resources.memory_limit_gb, resources.cpu_limit — resource hints.

Returns:

True if the sort completed successfully.

Return type:

bool

Example config (client.operations.sort in stage config.yaml):

sort:
  sort_by: [shard_order, mock_field]
  resources:
    pool: my_pool

Then in the stage:

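from omegaconf import OmegaConf

from yt_framework.operations.command_ops.sort import run_sort

# Inside the stage's run(); intermediate_table is the path of the table to sort
sort_cfg = OmegaConf.merge(
    self.config.client.operations.sort,
    {"input_table": intermediate_table},
)
run_sort(context=self.context, operation_config=sort_cfg)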

Tokenizer artifact

Pack tokenizer/processor tarballs, upload to Cypress, and expose sandbox env vars.

yt_framework.operations._internal.tokenizer_artifact.resolve_tokenizer_artifact_name(stage_config, tokenizer_artifact_config)

Resolve logical tokenizer artifact name from config.

Parameters:
  • stage_config (DictConfig)

  • tokenizer_artifact_config (DictConfig)

Return type:

str | None

yt_framework.operations._internal.tokenizer_artifact.resolve_tokenizer_archive_name(artifact_name)

Convert logical artifact name to mounted tar filename.

Parameters:

artifact_name (str)

Return type:

str

yt_framework.operations._internal.tokenizer_artifact.resolve_tokenizer_artifact_yt_path(stage_config, tokenizer_artifact_config)

Resolve full YT file path for tokenizer artifact tarball.

Parameters:
  • stage_config (DictConfig)

  • tokenizer_artifact_config (DictConfig)

Return type:

str | None

yt_framework.operations._internal.tokenizer_artifact.tokenizer_artifact_name_or_raise(stage_config, tokenizer_artifact_config)

Parameters:
  • stage_config (DictConfig)

  • tokenizer_artifact_config (DictConfig)

Return type:

str

yt_framework.operations._internal.tokenizer_artifact.verify_tokenizer_path_or_raise(context, yt_artifact_path)

Return type:

None

yt_framework.operations._internal.tokenizer_artifact.init_tokenizer_artifact_directory(context, tokenizer_artifact_config)

Initialize tokenizer artifact in YT (if configured).

Behavior:

  • creates artifact_base if needed;

  • uploads the local artifact from local_artifact_path if provided and missing in YT;

  • validates artifact presence in YT.

Parameters:
  • context (StageContext)

  • tokenizer_artifact_config (DictConfig)

Return type:

None

Typed jobs

TypedJob helpers for YTsaurus execution.

class yt_framework.typed_jobs.StageBootstrapTypedJob

Bases: TypedJob

Base class for TypedJob legs with worker-side bootstrap.

On unpickle, extracts source.tar.gz when needed, prepends the archive root and stages/<stage>/src to sys.path, and sets JOB_CONFIG_PATH when the stage config file exists. Uses __getstate__ / __setstate__ so bootstrap runs on the worker before __call__.
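
The __getstate__ / __setstate__ hook described above can be shown in isolation: pickle calls __setstate__ when the object is restored (on the worker), so setup code runs before the job is ever invoked. This is a sketch of the pattern only. BootstrapOnUnpickle is hypothetical, it does not subclass TypedJob, and it only prepends import paths; archive extraction and JOB_CONFIG_PATH are left out.

```python
from __future__ import annotations

import pickle
import sys
from typing import Any


class BootstrapOnUnpickle:
    """Illustrative pattern: run setup when a pickled job object is restored."""

    def __init__(self, extra_paths: list[str]) -> None:
        self.extra_paths = extra_paths
        self.bootstrapped = False

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__.copy()
        state["bootstrapped"] = False  # never ship driver-side bootstrap state
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        self.__dict__.update(state)
        self._bootstrap()  # runs where the object is unpickled, before any call

    def _bootstrap(self) -> None:
        # Prepend paths so that extra_paths[0] ends up first on sys.path
        for path in reversed(self.extra_paths):
            if path not in sys.path:
                sys.path.insert(0, path)
        self.bootstrapped = True
```

Note that __init__ does not run on unpickle, which is why setup belongs in __setstate__ rather than in the constructor.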

Utilities#

Environment

Load KEY=value files such as configs/secrets.env into plain dicts.

yt_framework.utils.env.load_env_file(env_path)

Load environment variables from a .env file.

File format: KEY=VALUE, one per line, with # for comments. The file is optional: a missing file returns an empty dict.

Parameters:

env_path (Path) – Path to the .env file

Returns:

Dictionary of loaded environment variables (key -> value). Returns empty dict if file doesn’t exist or cannot be read.

Warns:

UserWarning – If the file exists but cannot be read or parsed (non-fatal). A missing file is silent and returns an empty dict.

Return type:

dict[str, str]

Example

>>> env_vars = load_env_file(Path("configs/secrets.env"))
>>> print(env_vars.get("YT_TOKEN"))
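
A sketch of the documented contract: KEY=VALUE lines, # comments, a silent empty result for a missing file, and a UserWarning when the file cannot be read. load_env_file_sketch is hypothetical; skipping lines without "=", keeping everything after the first "=" as the value, and not stripping quotes are assumptions.

```python
from __future__ import annotations

import warnings
from pathlib import Path


def load_env_file_sketch(env_path: Path) -> dict[str, str]:
    """Illustrative KEY=VALUE parser following the documented contract."""
    if not env_path.exists():
        return {}  # a missing file is silent
    try:
        text = env_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        warnings.warn(f"Could not read {env_path}: {exc}", UserWarning)
        return {}
    values: dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue  # blank lines and comments
        key, sep, value = line.partition("=")
        if not sep:
            continue  # assumption: lines without "=" are skipped
        values[key.strip()] = value.strip()
    return values
```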

yt_framework.utils.env.load_secrets(secrets_dir, env_file='secrets.env')

Load secrets from secrets.env file in the specified directory.

Parameters:
  • secrets_dir (Path) – Directory containing the secrets.env file

  • env_file (str) – Name of the environment file (default: “secrets.env”)

Returns:

Dictionary of loaded secrets (key -> value). Returns empty dict if file doesn’t exist or cannot be read.

Warns:

UserWarning – If the secrets file exists but cannot be read or parsed (non-fatal).

Return type:

dict[str, str]

Example

>>> secrets = load_secrets(Path("configs"))
>>> yt_token = secrets.get("YT_TOKEN")

Logging

Colored console formatter and helpers used by pipeline startup.

class yt_framework.utils.logging.ColoredFormatter(fmt=None, datefmt=None, style='%', validate=True, *, defaults=None)

Bases: Formatter

Custom formatter with colors for different log levels.

COLORS: ClassVar[dict[str, str]] = {'CRITICAL': '\x1b[35m', 'DEBUG': '\x1b[36m', 'ERROR': '\x1b[31m', 'INFO': '\x1b[32m', 'WARNING': '\x1b[33m'}
RESET = '\x1b[0m'
ICONS: ClassVar[dict[str, str]] = {'CRITICAL': '🚨', 'DEBUG': '🔍', 'ERROR': '✗', 'WARNING': '⚠️'}

format(record)

Format log record with colors and icons.

Parameters:

record (LogRecord) – Log record to format.

Returns:

Formatted log message with ANSI color codes and icons.

Return type:

str

yt_framework.utils.logging.setup_logging(level=20, name=None, use_colors=True)

Configure logging with consistent formatting.

Parameters:
  • level (int) – Logging level (default: INFO)

  • name (str | None) – Logger name (default: root logger)

  • use_colors (bool) – Whether to use colored output

Returns:

Configured logger instance

Return type:

Logger

yt_framework.utils.logging.log_header(logger, title, context=None)

Log a compact section header in format: [Title] context.

Parameters:
  • logger (Logger) – Logger instance

  • title (str) – Section title (will be wrapped in brackets)

  • context (str | None) – Optional additional context information

Return type:

None

yt_framework.utils.logging.log_operation(logger, message)

Log an operation start message with → prefix.

Parameters:
  • logger (Logger) – Logger instance

  • message (str) – Operation description

Return type:

None

yt_framework.utils.logging.log_success(logger, message)

Log a success/completion message with ✓ prefix.

Parameters:
  • logger (Logger) – Logger instance

  • message (str) – Success message

Return type:

None

yt_framework.utils.logging.log_config(logger, config_dict, title='Configuration')

Log configuration in a readable format.

Automatically masks sensitive values (keys containing ‘secret’ or ‘key’) by showing only the last 4 characters.

Parameters:
  • logger (Logger) – Logger instance

  • config_dict (dict[str, object]) – Configuration dictionary to log

  • title (str) – Title for the configuration section

Returns:

None

Return type:

None

Example

>>> config = {"api_key": "secret12345", "mode": "dev"}
>>> log_config(logger, config)
[Configuration]
    api_key: ***2345
    mode: dev
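
The masking rule from the example above (keys containing "secret" or "key" show only the last 4 characters) can be isolated as a small function. mask_value_sketch is hypothetical and the case-insensitive key match is an assumption. With this rule, values of 4 characters or fewer would be shown in full; whether log_config handles short values differently is not documented.

```python
from __future__ import annotations

SENSITIVE_MARKERS = ("secret", "key")


def mask_value_sketch(key: str, value: object) -> str:
    """Illustrative masking rule: keep only the last 4 characters of sensitive values."""
    text = str(value)
    if any(marker in key.lower() for marker in SENSITIVE_MARKERS):
        return "***" + text[-4:]
    return text
```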

Ignore Patterns

.ytignore parsing (gitignore-style) for upload tarballs.

class yt_framework.utils.ignore.YTIgnorePattern(pattern, base_dir, is_negation=False)

Bases: object

Represents a single .ytignore pattern.

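This class compiles a pattern string into a regex for efficient matching against file paths. It supports .gitignore-style syntax with one notable difference: a leading **/ requires at least one directory level (see the recursive wildcard example), whereas in Git **/foo also matches foo at the root.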

Pattern types:
  • Simple wildcards: *.pyc, test?

  • Character classes: *.py[cod]

  • Recursive wildcards: **/*.log

  • Directory patterns: build/ (trailing slash)

  • Rooted patterns: /config (leading slash, root-only)

  • Negation patterns: !important.py (un-ignore)

Examples

Simple wildcard:

>>> from pathlib import Path
>>> pattern = YTIgnorePattern("*.pyc", Path("/project"))
>>> pattern.matches(Path("/project/test.pyc"))
True
>>> pattern.matches(Path("/project/test.py"))
False

Directory pattern:

>>> pattern = YTIgnorePattern("__pycache__/", Path("/project"))
>>> pattern.matches(Path("/project/__pycache__/module.pyc"))
True
>>> pattern.matches(Path("/project/src/__pycache__/module.pyc"))
True

Recursive wildcard:

>>> pattern = YTIgnorePattern("**/*.log", Path("/project"))
>>> pattern.matches(Path("/project/src/debug.log"))
True
>>> # Requires at least one directory level, so a root-level file does not match
>>> pattern.matches(Path("/project/debug.log"))
False

Rooted pattern:

>>> pattern = YTIgnorePattern("/build", Path("/project"))
>>> pattern.matches(Path("/project/build"))
True
>>> # Only matches at the root
>>> pattern.matches(Path("/project/src/build"))
False

Parameters:
  • pattern (str)

  • base_dir (Path)

  • is_negation (bool)

__init__(pattern, base_dir, is_negation=False)

Initialize a pattern.

Parameters:
  • pattern (str) – The pattern string (without leading !)

  • base_dir (Path) – Directory where the .ytignore file is located

  • is_negation (bool) – Whether this is a negation pattern (!)

Return type:

None

matches(file_path)

Check if a file path matches this pattern.

Parameters:

file_path (Path) – Absolute or relative file path to check

Returns:

True if the path matches

Return type:

bool
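To make the pattern rules above concrete, here is a standalone sketch of translating gitignore-style patterns into regular expressions. It is written to reproduce the documented examples (including the rule that a leading **/ needs at least one directory level) and is an assumption about the approach, not YTIgnorePattern's actual code. translate_glob, compile_pattern, and matches are hypothetical names, and negation is left to the caller.

```python
from __future__ import annotations

import re
from pathlib import PurePosixPath


def translate_glob(body: str) -> str:
    """Translate a gitignore-style glob into a regex fragment."""
    out: list[str] = []
    i = 0
    while i < len(body):
        if body.startswith("**/", i):
            # Per the examples above, "**/" needs at least one directory level.
            out.append("(?:[^/]+/)+")
            i += 3
        elif body.startswith("**", i):
            out.append(".*")
            i += 2
        elif body[i] == "*":
            out.append("[^/]*")  # a single "*" never crosses a "/"
            i += 1
        elif body[i] == "?":
            out.append("[^/]")
            i += 1
        elif body[i] == "[" and "]" in body[i + 1:]:
            end = body.index("]", i + 1)
            chars = body[i + 1:end]
            if chars.startswith("!"):
                chars = "^" + chars[1:]  # glob negation -> regex negation
            out.append(f"[{chars}]")
            i = end + 1
        else:
            out.append(re.escape(body[i]))
            i += 1
    return "".join(out)


def compile_pattern(pattern: str) -> re.Pattern[str]:
    """Compile one pattern (without the leading "!") against base-relative paths."""
    dir_only = pattern.endswith("/")
    pattern = pattern.rstrip("/")
    anchored = "/" in pattern  # a leading or inner slash anchors to base_dir
    pattern = pattern.lstrip("/")
    prefix = "" if anchored else "(?:.*/)?"  # unanchored: match at any depth
    suffix = "/.+" if dir_only else "(?:/.*)?"  # dir_only: something inside it
    return re.compile(f"^{prefix}{translate_glob(pattern)}{suffix}$")


def matches(pattern: str, file_path: str, base_dir: str) -> bool:
    rel = PurePosixPath(file_path).relative_to(base_dir).as_posix()
    return compile_pattern(pattern).match(rel) is not None
```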

class yt_framework.utils.ignore.YTIgnoreMatcher(base_dir)

Bases: object

Matches file paths against .ytignore patterns.

This class loads .ytignore files from the specified directory and all parent directories up to the filesystem root, then provides methods to check if files should be ignored based on the loaded patterns.

Patterns follow .gitignore syntax:
  • *.pyc matches any file ending in .pyc

  • __pycache__/ matches any __pycache__ directory

  • **/test_*.py matches test_*.py in any subdirectory (but not at the root)

  • !important.py negates a previous ignore pattern

  • /build matches only at the root level (not in subdirectories)

  • src/*.log matches .log files in src/ but not in src/subdir/

Examples

Basic usage:

>>> from pathlib import Path
>>> matcher = YTIgnoreMatcher(Path("/project"))
>>> matcher.should_ignore(Path("/project/test.pyc"))
True
>>> matcher.should_ignore(Path("/project/main.py"))
False

With custom .ytignore file:

>>> # Create a .ytignore file
>>> project_dir = Path("/tmp/myproject")
>>> project_dir.mkdir(exist_ok=True)
>>> _ = (project_dir / ".ytignore").write_text("*.pyc\n__pycache__/\n")
>>> matcher = YTIgnoreMatcher(project_dir)
>>> matcher.should_ignore(project_dir / "module.pyc")
True
>>> matcher.should_ignore(project_dir / "module.py")
False

Negation patterns:

>>> # Ignore all .log files except important.log
>>> _ = (project_dir / ".ytignore").write_text("*.log\n!important.log\n")
>>> matcher = YTIgnoreMatcher(project_dir)
>>> matcher.should_ignore(project_dir / "debug.log")
True
>>> matcher.should_ignore(project_dir / "important.log")
False

Parameters:

base_dir (Path)

__init__(base_dir)

Initialize matcher.

Parameters:

base_dir (Path) – Base directory for file matching

Return type:

None

should_ignore(file_path)

Check if a file should be ignored.

Parameters:

file_path (Path) – Path to the file (can be absolute or relative)

Returns:

True if the file should be ignored

Return type:

bool
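YTIgnoreMatcher combines .ytignore files from base_dir and its parents. The sketch below assumes gitignore-like precedence: files are read from the filesystem root down to base_dir, and the last matching pattern decides, so a ! pattern only un-ignores files matched by an earlier line. It matches the bare file name with fnmatch for brevity, which is a simplification; ytignore_chain, should_ignore, and load_lines are hypothetical helpers.

```python
from __future__ import annotations

from fnmatch import fnmatchcase
from pathlib import Path


def ytignore_chain(base_dir: Path) -> list[Path]:
    """Existing .ytignore files, ordered from the filesystem root down to base_dir."""
    base_dir = base_dir.resolve()
    dirs = [base_dir, *base_dir.parents]  # base_dir first, root last
    return [d / ".ytignore" for d in reversed(dirs) if (d / ".ytignore").is_file()]


def should_ignore(name: str, lines: list[str]) -> bool:
    """Evaluate patterns in order; the last one that matches decides."""
    ignored = False
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith("#"):  # blank lines and comments (assumed)
            continue
        negate = line.startswith("!")
        pattern = line[1:] if negate else line
        if fnmatchcase(name, pattern):  # simplification: match the bare file name
            ignored = not negate
    return ignored


def load_lines(base_dir: Path) -> list[str]:
    """Concatenate pattern lines so that closer .ytignore files come last and win."""
    lines: list[str] = []
    for path in ytignore_chain(base_dir):
        lines.extend(path.read_text().splitlines())
    return lines
```

Loading the files once and reusing the combined lines mirrors the advice to keep one matcher instance for many checks.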

yt_framework.utils.ignore.should_ignore_file(file_path, base_dir)

Return whether file_path should be ignored under base_dir.

This is a shorthand for creating a YTIgnoreMatcher and checking a single file. For checking multiple files, create a YTIgnoreMatcher instance directly to avoid reloading .ytignore files for each check.

Parameters:
  • file_path (Path) – Path to the file to check

  • base_dir (Path) – Base directory for .ytignore lookup

Returns:

True if the file should be ignored

Return type:

bool

Examples

>>> from pathlib import Path
>>> # Assuming .ytignore contains "*.pyc"
>>> should_ignore_file(Path("/project/test.pyc"), Path("/project"))
True
>>> should_ignore_file(Path("/project/test.py"), Path("/project"))
False

Packaging and operation helpers (advanced)

These modules support code upload, file dependencies, and environment wiring. Most pipelines rely on defaults; use the API when extending the framework.

Upload and local build

Upload pipeline: wrappers, local build orchestration, and archive upload.

yt_framework.operations.upload.build_code_locally(build_dir, pipeline_dir, logger, create_wrappers=False, upload_modules=None, upload_paths=None)

Build all code in a local build directory.

Copies the ytjobs package, optional custom modules/paths, and all stages’ code to the build directory, using the same layout as the code uploaded to YT.

Parameters:
  • build_dir (Path) – Local build directory path

  • pipeline_dir (Path) – Path to pipeline directory

  • logger (Logger) – Logger instance

  • create_wrappers (bool) – If True, create wrapper scripts for all stages

  • upload_modules (list[str] | None) – Optional list of module names to upload

  • upload_paths (list[dict[str, str]] | None) – Optional list of {source, target?} dicts describing local paths to upload (target may be omitted)

Returns:

Total number of files copied

Return type:

int
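The upload_paths entries are small {source, target?} dicts. A hypothetical sketch of copying such entries into a build directory and counting the files is below. Resolving relative sources against pipeline_dir and defaulting target to the source's own name are assumptions, not documented behavior, and copy_upload_paths is not a framework function.

```python
from __future__ import annotations

import shutil
from pathlib import Path


def copy_upload_paths(build_dir: Path, pipeline_dir: Path,
                      upload_paths: list[dict[str, str]]) -> int:
    """Copy {source, target?} entries into build_dir and return the file count.

    Hypothetical rules: a relative source resolves against pipeline_dir, and a
    missing target defaults to the source's own name.
    """
    copied = 0
    for entry in upload_paths:
        source = pipeline_dir / entry["source"]
        target = build_dir / entry.get("target", source.name)
        if source.is_dir():
            shutil.copytree(source, target, dirs_exist_ok=True)
            copied += sum(1 for p in source.rglob("*") if p.is_file())
        else:
            target.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(source, target)
            copied += 1
    return copied


# Example entries: one with an explicit target, one relying on the default.
example_paths = [
    {"source": "shared", "target": "libs/shared"},
    {"source": "settings.yaml"},
]
```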

yt_framework.operations.upload.create_code_archive(build_dir, archive_path, logger)

Create a tar.gz archive from the build directory.

Parameters:
  • build_dir (Path) – Local build directory path

  • archive_path (Path) – Path where the archive should be created

  • logger (Logger) – Logger instance

Returns:

None

Return type:

None
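Packing a build directory into a tar.gz can be done with the standard tarfile module. In this sketch (not the framework's implementation), members are stored relative to build_dir, and the filter callback of TarFile.add drops unwanted members; the suffix check is a placeholder for real .ytignore rules, and create_archive is a hypothetical name.

```python
from __future__ import annotations

import tarfile
from pathlib import Path


def create_archive(build_dir: Path, archive_path: Path,
                   skip_suffixes: tuple[str, ...] = (".pyc",)) -> None:
    """Pack build_dir into a tar.gz whose member names are relative to build_dir."""

    def drop_unwanted(info: tarfile.TarInfo) -> tarfile.TarInfo | None:
        # Returning None excludes the member; stands in for real .ytignore rules.
        return None if info.name.endswith(skip_suffixes) else info

    archive_path.parent.mkdir(parents=True, exist_ok=True)
    with tarfile.open(archive_path, "w:gz") as tar:
        for child in sorted(build_dir.iterdir()):
            tar.add(child, arcname=child.name, filter=drop_unwanted)
```

Keep archive_path outside build_dir in this sketch; otherwise the archive file, created before the directory listing, would be packed into itself.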

yt_framework.operations.upload.upload_code_archive(yt_client, archive_path, build_folder, logger)

Upload code archive to YT.

Parameters:
  • yt_client (BaseYTClient) – YT client instance

  • archive_path (Path) – Local path to the tar.gz archive

  • build_folder (str) – YT build folder path

  • logger (Logger) – Logger instance

Returns:

None

Return type:

None

yt_framework.operations.upload.upload_all_code(yt_client, build_folder, pipeline_dir, logger, upload_modules=None, upload_paths=None)

Upload all code to YT: the ytjobs package, optional custom modules/paths, and stages.

Builds the code locally, packs it into a tar.gz archive, and uploads the archive to YT.

Parameters:
  • yt_client (BaseYTClient) – YT client instance

  • build_folder (str) – YT build folder path

  • pipeline_dir (Path) – Path to pipeline directory

  • logger (Logger) – Logger instance

  • upload_modules (list[str] | None) – Optional list of module names to upload

  • upload_paths (list[dict[str, str]] | None) – Optional list of {source, target?} dicts describing local paths to upload (target may be omitted)

Returns:

None

Return type:

None

Stage dependency file lists

Collect extra file dependencies (including implicit ytjobs) for YT jobs.

yt_framework.operations.dependencies.build_stage_dependencies(build_folder, stage_dir, logger)

Build dependency list for a single stage.

Includes:
  • config.yaml (if it exists locally)

  • All .py files from the src/ directory

Parameters:
  • build_folder (str) – YT build folder path

  • stage_dir (Path) – Path to stage directory (e.g., stages/run_map/)

  • logger (Logger) – Logger instance

Returns:

List of (yt_path, local_path) tuples

Return type:

list[tuple[str, str]]
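The (yt_path, local_path) pairs returned above can be illustrated with a small sketch. The YT-side layout <build_folder>/stages/<stage name>/... and the recursive scan of src/ are assumptions for illustration; stage_dependencies is a hypothetical helper, not the library function.

```python
from __future__ import annotations

from pathlib import Path


def stage_dependencies(build_folder: str, stage_dir: Path) -> list[tuple[str, str]]:
    """(yt_path, local_path) pairs: config.yaml if present, then src/**/*.py."""
    stage_root = f"{build_folder}/stages/{stage_dir.name}"  # assumed YT layout
    deps: list[tuple[str, str]] = []
    config = stage_dir / "config.yaml"
    if config.is_file():
        deps.append((f"{stage_root}/config.yaml", str(config)))
    src = stage_dir / "src"
    if src.is_dir():
        for py in sorted(src.rglob("*.py")):  # recursive scan is an assumption
            rel = py.relative_to(stage_dir).as_posix()
            deps.append((f"{stage_root}/{rel}", str(py)))
    return deps
```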

yt_framework.operations.dependencies.build_ytjobs_dependencies(build_folder, logger)

Build dependency list for ytjobs package.

Includes all .py files from the ytjobs/ directory.

Parameters:
  • build_folder (str) – YT build folder path

  • logger (Logger) – Logger instance

Returns:

List of (yt_path, local_path) tuples

Return type:

list[tuple[str, str]]

yt_framework.operations.dependencies.add_checkpoint(dependencies, model_name, checkpoint_base, logger)

Add checkpoint file to dependencies if configured.

Parameters:
  • dependencies (list[tuple[str, str]]) – List of (yt_path, local_path) tuples

  • model_name (str | None) – Optional model name for checkpoint

  • checkpoint_base (str | None) – Optional checkpoint base path in YT

  • logger (Logger) – Logger instance

Returns:

Updated dependency list (new list with checkpoint added, or same list)

Return type:

list[tuple[str, str]]
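The return-value contract of add_checkpoint (a new list when a checkpoint is added, otherwise the same list) can be sketched as follows. Treating the checkpoint as configured only when both model_name and checkpoint_base are set, and the shape of the added tuple, are assumptions; add_checkpoint_sketch is a hypothetical name.

```python
from __future__ import annotations


def add_checkpoint_sketch(
    dependencies: list[tuple[str, str]],
    model_name: str | None,
    checkpoint_base: str | None,
) -> list[tuple[str, str]]:
    """Return a new list with a checkpoint entry, or the same list if not configured."""
    if not model_name or not checkpoint_base:
        return dependencies  # unchanged: the caller gets its own list back
    # Hypothetical entry shape: (YT path of the checkpoint, file name in the job)
    entry = (f"{checkpoint_base.rstrip('/')}/{model_name}", model_name)
    return [*dependencies, entry]  # copy; the input list is not mutated
```

Returning a copy rather than appending in place means callers holding the original list never see the checkpoint entry unexpectedly.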

yt_framework.operations.dependencies.build_vanilla_dependencies(build_folder, stage_dir, model_name, checkpoint_base, logger)

Build complete dependency list for a vanilla operation.

Combines:
  • Stage files (config + src/)

  • ytjobs package

Parameters:
  • build_folder (str) – YT build folder path

  • stage_dir (Path) – Path to stage directory

  • model_name (str | None) – Optional model name for checkpoint

  • checkpoint_base (str | None) – Optional checkpoint base path in YT

  • logger (Logger) – Logger instance

Returns:

Tuple of (script_path, dependency_files):
  • script_path: Path to vanilla.py in YT

  • dependency_files: Complete list of dependencies

Return type:

tuple[str, list[tuple[str, str]]]

yt_framework.operations.dependencies.build_map_dependencies(build_folder, stage_dir, model_name, checkpoint_base, logger)

Build complete dependency list for a map operation.

Combines:
  • Stage files (config + src/)

  • ytjobs package

  • Checkpoint (if configured)

Parameters:
  • build_folder (str) – YT build folder path

  • stage_dir (Path) – Path to stage directory

  • model_name (str | None) – Optional model name for checkpoint

  • checkpoint_base (str | None) – Optional checkpoint base path in YT

  • logger (Logger) – Logger instance

Returns:

Tuple of (mapper_path, dependency_files):
  • mapper_path: Path to mapper.py in YT

  • dependency_files: Complete list of dependencies

Return type:

tuple[str, list[tuple[str, str]]]

Shared operation utilities

Shared helpers for map/vanilla/map-reduce (resources, secrets, tokenizer wiring).

yt_framework.operations.common.build_environment(configs_dir, logger)

Build environment variables for map and vanilla operations.

Jobs read configuration from config.yaml, so only secrets are passed via environment variables.

Parameters:
  • configs_dir (Path) – Directory containing secrets.env file

  • logger (logging.Logger) – Logger instance

Returns:

Dictionary of secret environment variables

Return type:

dict[str, str]
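build_environment reads secrets from a secrets.env file in configs_dir. A minimal parser for that file is sketched below, assuming a dotenv-like KEY=VALUE format with # comments and optional surrounding quotes. read_secrets_env is a hypothetical helper; the framework's own env-file utilities may handle more cases.

```python
from __future__ import annotations

from pathlib import Path


def read_secrets_env(configs_dir: Path) -> dict[str, str]:
    """Parse KEY=VALUE lines from configs_dir/secrets.env (assumed dotenv-like)."""
    path = configs_dir / "secrets.env"
    env: dict[str, str] = {}
    if not path.is_file():
        return env  # assumption: a missing file yields no secrets
    for raw in path.read_text().splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # skip blanks, comments, and malformed lines
        key, value = line.split("=", 1)
        env[key.strip()] = value.strip().strip('"').strip("'")
    return env
```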

yt_framework.operations.common.prepare_docker_auth(docker_image, docker_username, docker_password)

Prepare Docker authentication dictionary.

Parameters:
  • docker_image (str | None) – Optional Docker image name

  • docker_username (str | None) – Optional Docker registry username

  • docker_password (str | None) – Optional Docker registry password

Returns:

Docker authentication dict if all credentials are provided, None otherwise

Return type:

dict[str, str] | None
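The all-or-nothing rule of prepare_docker_auth can be sketched as below. The payload key names are hypothetical, and docker_auth is not the framework function; check what your YT client expects before relying on a particular shape.

```python
from __future__ import annotations


def docker_auth(image: str | None, username: str | None,
                password: str | None) -> dict[str, str] | None:
    """Return an auth payload only when image, username, and password are all set."""
    if not (image and username and password):
        return None  # any missing (or empty) value disables registry auth
    # Hypothetical key names for the credentials payload.
    return {"username": username, "password": password}
```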

yt_framework.operations.common.extract_operation_resources(operation_config, logger)

Extract OperationResources from operation config with fallback defaults.

Parameters:
  • operation_config

  • logger

Return type:

OperationResources

yt_framework.operations.common.extract_secure_env_client_kwargs(operation_config)

Return keyword arguments for YTProdClient that control how environment variables are partitioned between the secure vault and the public environment.

Parameters:

operation_config (DictConfig)

Return type:

dict[str, Any]

yt_framework.operations.common.collect_passthrough_kwargs(operation_config, reserved_keys)

Collect top-level config values (other than reserved_keys) to forward to the YT client.

OmegaConf dict nodes are resolved to plain Python containers.

Parameters:
  • operation_config (DictConfig)

  • reserved_keys (set[str])

Return type:

dict[str, Any]
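A sketch of the passthrough idea, using plain dicts in place of DictConfig. It assumes that keys listed in reserved_keys are the ones the framework consumes itself and are therefore excluded; for a real DictConfig, OmegaConf.to_container(node, resolve=True) converts nested nodes into plain Python containers. passthrough_kwargs and the keys in the example are hypothetical.

```python
from __future__ import annotations

from typing import Any


def passthrough_kwargs(operation_config: dict[str, Any],
                       reserved_keys: set[str]) -> dict[str, Any]:
    """Keep every top-level key the framework does not consume itself."""
    # For a DictConfig, convert first: OmegaConf.to_container(cfg, resolve=True)
    return {key: value for key, value in operation_config.items()
            if key not in reserved_keys}


config = {"resources": {"cpu": 2}, "env": {"A": "1"}, "title": "nightly run"}
kwargs = passthrough_kwargs(config, reserved_keys={"resources", "env"})
# kwargs == {"title": "nightly run"}
```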

yt_framework.operations.common.build_operation_environment(context, operation_config, logger, include_stage_name=True, include_tokenizer_artifact=True)

Build the operation environment from secrets, the explicit env config, and optional helper variables (stage name, tokenizer artifact).

Parameters:
  • context

  • operation_config

  • logger

  • include_stage_name

  • include_tokenizer_artifact

Return type:

dict[str, str]

yt_framework.operations.common.docker_auth_from_op_config(operation_config, env)

Resolve the Docker image from the config and return an auth payload if credentials exist.

Parameters:
  • operation_config (DictConfig)

  • env (dict[str, str])

Return type:

dict[str, str] | None

yt_framework.operations.common.extract_max_failed_jobs(operation_config, logger)

Extract max_failed_job_count from the operation config, falling back to a default.

Parameters:
  • operation_config

  • logger

Return type:

int

YT Client

Client Factory

Factory that returns YTDevClient or YTProdClient based on pipeline mode.

yt_framework.yt.factory.create_yt_client(logger=None, mode='dev', pipeline_dir=None, secrets=None, pickling=None)

Create a YT client for the given pipeline mode.

Parameters:
  • logger (Logger | None) – Logger instance (default: creates new logger)

  • mode (Literal['prod', 'dev'] | None) – “prod” for production YT client, “dev” for local development

  • pipeline_dir (Path | str | None) – Pipeline directory (required for dev mode)

  • secrets (dict[str, str] | None) – Optional dictionary containing YT credentials. Required only for prod mode. Expected keys: YT_PROXY (YTsaurus proxy URL) and YT_TOKEN (YTsaurus authentication token).

  • pickling (dict[str, Any] | None) – Optional pickling-related client config (prod only).

Returns:

BaseYTClient instance (YTProdClient or YTDevClient)

Raises:

ValueError – If secrets are required for prod mode but not provided.

Return type:

BaseYTClient

Example

>>> from pathlib import Path
>>> # Dev mode (local filesystem simulation)
>>> client = create_yt_client(mode="dev", pipeline_dir=Path("."))
>>>
>>> # Prod mode (real YT cluster)
>>> secrets = {"YT_PROXY": "my-proxy", "YT_TOKEN": "my-token"}
>>> client = create_yt_client(mode="prod", secrets=secrets)
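The mode dispatch in create_yt_client can be sketched with stand-in classes. DevClient, ProdClient, and make_client are hypothetical; raising ValueError when pipeline_dir is missing in dev mode is an assumption, since the documented ValueError covers only missing prod secrets.

```python
from __future__ import annotations

from pathlib import Path


class DevClient:  # hypothetical stand-in for YTDevClient
    def __init__(self, pipeline_dir: Path) -> None:
        self.pipeline_dir = pipeline_dir


class ProdClient:  # hypothetical stand-in for YTProdClient
    def __init__(self, proxy: str, token: str) -> None:
        self.proxy, self.token = proxy, token


def make_client(mode: str = "dev", pipeline_dir: Path | str | None = None,
                secrets: dict[str, str] | None = None) -> DevClient | ProdClient:
    if mode == "prod":
        # Both credentials must be present before talking to a real cluster.
        if not secrets or not secrets.get("YT_PROXY") or not secrets.get("YT_TOKEN"):
            raise ValueError("prod mode needs secrets with YT_PROXY and YT_TOKEN")
        return ProdClient(secrets["YT_PROXY"], secrets["YT_TOKEN"])
    if pipeline_dir is None:
        raise ValueError("dev mode needs pipeline_dir")  # assumed behavior
    return DevClient(Path(pipeline_dir))
```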

Dev Client

Local filesystem stand-in for Cypress tables and subprocess-backed jobs.

class yt_framework.yt.clients.client_dev.YTDevClient(logger, pipeline_dir=None)

Bases: ClientDevYqlMixin, ClientDevOpsMixin, BaseYTClient

Development YT client implementation.

Uses the local filesystem for all operations, simulating YT behavior. Tables are stored as .jsonl files in the .dev/ directory.

Parameters:
  • logger (Logger)

  • pipeline_dir (Path | None)

__init__(logger, pipeline_dir=None)

Initialize development YT client.

Parameters:
  • logger (Logger) – Logger instance

  • pipeline_dir (Path | None) – Pipeline directory (required for dev mode)

Return type:

None

create_path(path, node_type='map_node')

Create a path in YT (no-op in dev mode).

Parameters:
  • path (str) – YT path to create (not used in dev mode).

  • node_type (Literal['table', 'file', 'map_node', 'list_node', 'document']) – Type of node to create (not used in dev mode).

Returns:

None

Return type:

None

exists(path)

Check if a path exists in YT.

In dev mode, always returns True (assumes files exist locally).

Parameters:

path (str) – YT path to check.

Returns:

Always True in dev mode.

Return type:

bool

write_table(table_path, rows, *, append=False, replication_factor=1)

Write rows to a YT table (saves as local .jsonl file).

In dev mode, tables are stored as JSONL files in the .dev directory. Each row is written as a JSON object on a single line.

Parameters:
  • table_path (str) – YT table path (e.g., “//tmp/my_table”).

  • rows (list[dict[str, Any]]) – List of dictionaries representing table rows.

  • append (bool) – If True, append to existing file; otherwise overwrite.

  • replication_factor (int) – Not used in dev mode (kept for API compatibility).

Returns:

None

Return type:

None

Example

>>> client.write_table("//tmp/data", [{"id": 1, "name": "Alice"}])
>>> # Creates .dev/data.jsonl containing the line {"id":1,"name":"Alice"}

read_table(table_path)

Read rows from a YT table (reads from local .jsonl file).

Parameters:

table_path (str) – YT table path (e.g., “//tmp/my_table”).

Returns:

List of dictionaries representing table rows.

Returns empty list if file doesn’t exist.

Return type:

List[Dict[str, Any]]

row_count(table_path)

Get number of rows in a YT table (counts lines in local .jsonl file).

Parameters:

table_path (str) – YT table path (e.g., “//tmp/my_table”).

Returns:

Number of non-empty lines in the JSONL file. Returns 0 if file doesn’t exist.

Return type:

int
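The dev-mode table storage described above (one JSON object per line, an empty result for a missing file, row_count counting non-empty lines) can be sketched with the standard json module. Mapping //tmp/data to .dev/data.jsonl by the last path component is inferred from the write_table example and is an assumption; JsonlTables is a hypothetical class, not YTDevClient.

```python
from __future__ import annotations

import json
from pathlib import Path
from typing import Any


class JsonlTables:
    """Dev-style tables: one JSON object per line under root/.dev (a sketch)."""

    def __init__(self, root: Path) -> None:
        self.dev_dir = root / ".dev"

    def _file(self, table_path: str) -> Path:
        # Assumed mapping: "//tmp/data" -> ".dev/data.jsonl" (last component)
        name = table_path.rstrip("/").rsplit("/", 1)[-1]
        return self.dev_dir / f"{name}.jsonl"

    def write_table(self, table_path: str, rows: list[dict[str, Any]],
                    *, append: bool = False) -> None:
        path = self._file(table_path)
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("a" if append else "w", encoding="utf-8") as fh:
            for row in rows:
                fh.write(json.dumps(row) + "\n")

    def read_table(self, table_path: str) -> list[dict[str, Any]]:
        path = self._file(table_path)
        if not path.exists():
            return []  # a missing table reads as empty
        with path.open(encoding="utf-8") as fh:
            return [json.loads(line) for line in fh if line.strip()]

    def row_count(self, table_path: str) -> int:
        path = self._file(table_path)
        if not path.exists():
            return 0
        with path.open(encoding="utf-8") as fh:
            return sum(1 for line in fh if line.strip())  # non-empty lines only
```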

run_yql(query, pool='default', max_row_weight=None)

Execute a YQL query locally using DuckDB simulation.

Parameters:
  • query (str) – YQL query string to execute

  • pool (str) – YT pool name (default: ‘default’)

  • max_row_weight (str | None) – Optional max row weight override

Return type:

None

Production Client

Thin wrapper around yt.wrapper.YtClient for real cluster operations.

class yt_framework.yt.clients.client_prod.YTProdClient(logger, secrets, pickling=None)

Bases: ClientProdYqlMixin, ClientProdOpsMixin, BaseYTClient

Production YT client implementation.

Uses the actual YTsaurus client for all operations.

Parameters:
  • logger (Logger)

  • secrets (dict[str, str])

  • pickling (dict[str, Any] | None)

__init__(logger, secrets, pickling=None)

Initialize production YT client.

Parameters:
  • logger (Logger) – Logger instance

  • secrets (dict[str, str]) – Dictionary containing YT credentials. Expected keys: YT_PROXY and YT_TOKEN.

  • pickling (dict[str, Any] | None) – Optional pickling-related client config (see _apply_pickling_config).

Return type:

None

create_path(path, node_type='map_node')

Create a path in YT.

Parameters:
  • path (str) – YT path to create.

  • node_type (Literal['table', 'file', 'map_node', 'list_node', 'document']) – Type of node to create (default: “map_node”).

Returns:

None

Raises:

Exception – If path creation fails.

Return type:

None

exists(path)

Check if a path exists in YT.

Parameters:

path (str) – YT path to check.

Returns:

True if path exists, False otherwise.

Return type:

bool

Raises:

Exception – If check fails.

write_table(table_path, rows, *, append=False, replication_factor=1, make_parents=True)

Write rows to a YT table.

Parameters:
  • table_path (str) – YT table path

  • rows (list[dict[str, Any]]) – List of dictionaries representing table rows

  • append (bool) – If True, append to existing table (default: False)

  • replication_factor (int) – Replication factor for the table (default: 1)

  • make_parents (bool) – If True, create parent directories if they don’t exist (default: True)

Return type:

None

read_table(table_path)

Read rows from a YT table.

Parameters:

table_path (str) – YT table path to read.

Returns:

List of dictionaries representing table rows.

Return type:

List[Dict[str, Any]]

Raises:

Exception – If table read fails.

row_count(table_path)

Get number of rows in a YT table.

Parameters:

table_path (str) – YT table path.

Returns:

Number of rows in the table.

Return type:

int

Raises:

Exception – If row count query fails.

run_yql(query, pool='default', max_row_weight=None)

Execute a YQL query on YT cluster.

Parameters:
  • query (str) – YQL query string to execute

  • pool (str) – YT pool name (default: ‘default’)

  • max_row_weight (str | None) – Optional max row weight override

Raises:

Exception – If query execution fails

Return type:

None