API reference#

yt_framework symbols below are generated from Python docstrings. Job-side entrypoints (ytjobs) have a separate page.

Tip

Navigation

Use the sidebar headings for modules. Anything not listed is still in the source tree; use help(module) locally.

Layout#

  • Core — pipeline, stage, registry, discovery, self.deps injection

  • Operations — map, vanilla, map-reduce/reduce, S3 helpers, table helpers, checkpoint upload, sort, tokenizer artifact wiring

  • Typed jobs: StageBootstrapTypedJob

  • YT — client factory, dev and prod clients (YQL helpers live on the clients)

  • Utils — env files, logging setup, ignore patterns

  • Packaging — upload helpers shared by operations

  • ytjobs: job-side reference

Note

Narrative docs

How-to guides under docs/operations, docs/configuration, and docs/advanced spell out YAML and stage patterns; this page is the raw API surface.

Core Modules#

Pipeline#

Pipeline base classes, CLI entry point (main), and code upload orchestration.

Subclass BasePipeline, implement setup() to register stages with StageRegistry, then run via Pipeline.main() from pipeline.py. DefaultPipeline auto-discovers stages under stages/*/stage.py.

class yt_framework.core.pipeline.BasePipeline(config, pipeline_dir, log_level=20)[source]#

Bases: object

Base class for all pipelines.

Provides common functionality:

  • CLI entry point via the main() class method

  • Code upload to the YT build folder

  • YT client initialization

  • Default stage execution loop

Subclasses must:

  • In setup(), create a StageRegistry and register it via set_stage_registry()

Subclasses may override:

  • setup(): register stages and initialize pipeline-specific clients (S3, etc.)

  • run(): custom execution flow (rare; most pipelines use the default)

Parameters:
  • config (DictConfig)

  • pipeline_dir (Path)

  • log_level (int)

__init__(config, pipeline_dir, log_level=20)[source]#

Initialize the base pipeline.

Parameters:
  • config (DictConfig) – Configuration object (OmegaConf DictConfig)

  • pipeline_dir (Path) – Path to pipeline directory

  • log_level (int) – Logging level

Returns:

None

Raises:

ValueError – If pipeline directory does not exist.

setup()[source]#

Hook for pipeline-specific initialization.

Override this method in subclasses to:

  1. Register stages using StageRegistry and set_stage_registry()

  2. Initialize custom clients (e.g., S3 client, database connections)

This method is called automatically after base initialization.

Returns:

None

Return type:

None

set_stage_registry(registry)[source]#

Set the stage registry for this pipeline.

Parameters:

registry (StageRegistry) – StageRegistry instance with registered stages

Returns:

None

Return type:

None

create_stage_dependencies()[source]#

Create stage dependencies for injection.

This method creates a dependency container with only what stages need, following the Interface Segregation Principle.

Returns:

PipelineStageDependencies with yt_client, pipeline_config, configs_dir

Return type:

PipelineStageDependencies

upload_code(build_folder=None)[source]#

Upload code to YT build folder.

Only uploads code if any enabled stages need code execution on YT. If no stages need code execution, this method does nothing.

Parameters:

build_folder (str | None) – Optional YT build folder path. If None, uses config.pipeline.build_folder

Returns:

None

Raises:

ValueError – If build_folder is required but not provided in config.

Return type:

None

run()[source]#

Run the pipeline by executing enabled stages.

Default implementation:

  1. Upload code to YT (only if stages need code execution)

  2. Get enabled stages from config

  3. Execute stages in order using the stage registry

  4. Pass context between stages

Override this method only if you need completely custom execution flow.

Returns:

None

Raises:
  • ValueError – If no enabled_stages found in config or unknown stage name.

  • AttributeError – If stage registry is not set in setup().

Return type:

None
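The default run() flow described above can be sketched as follows. This is a minimal, self-contained illustration and not the framework's implementation: run_enabled_stages, the plain-dict registry, and the two toy stages are hypothetical stand-ins, and code upload is left out. Only the documented behaviour is modelled: ordered execution, results merged into a shared dict, and ValueError for an empty stage list or an unknown stage name.

```python
from typing import Any, Callable, Dict, List

# Hypothetical stand-in: a stage is modelled as a callable that takes the
# shared dict and returns a dict of results (mirrors BaseStage.run).
StageFn = Callable[[Dict[str, Any]], Dict[str, Any]]


def run_enabled_stages(
    registry: Dict[str, StageFn], enabled_stages: List[str]
) -> Dict[str, Any]:
    """Run stages in order, merging each result into a shared dict."""
    if not enabled_stages:
        raise ValueError("no enabled_stages found in config")
    shared: Dict[str, Any] = {}
    for name in enabled_stages:
        if name not in registry:
            raise ValueError(f"unknown stage name: {name}")
        result = registry[name](shared)
        shared.update(result or {})  # later stages see earlier results
    return shared


def create_table(shared: Dict[str, Any]) -> Dict[str, Any]:
    return {"table": "//tmp/example"}


def count_rows(shared: Dict[str, Any]) -> Dict[str, Any]:
    # Reads a value produced by the previous stage.
    return {"summary": f"counted rows in {shared['table']}"}


registry = {"create_table": create_table, "count_rows": count_rows}
final = run_enabled_stages(registry, ["create_table", "count_rows"])
print(final)
```

Because each stage only sees the merged dict, the order of enabled_stages determines which keys are available to a given stage.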

classmethod main(argv=None)[source]#

CLI entry point for the pipeline.

Handles:

  • Argument parsing (--config option)

  • Config file loading

  • Pipeline instantiation

  • Error handling and exit codes

Parameters:

argv – Optional command-line arguments. If None, uses sys.argv.

Returns:

None (exits with code 0 on success, 1 on failure)

Return type:

None

Usage:

python pipeline.py
python pipeline.py --config configs/custom.yaml
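A CLI entry point of this shape could be sketched with argparse as below. This is an illustration under stated assumptions: parse_cli, the main function shown here, and the default path configs/config.yaml are hypothetical; the source only documents the --config option and the 0/1 exit codes.

```python
import argparse
from typing import List, Optional

# Assumption: this default path is hypothetical; the source only documents
# that a --config option exists.
DEFAULT_CONFIG = "configs/config.yaml"


def parse_cli(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse pipeline CLI arguments; argv=None falls back to sys.argv[1:]."""
    parser = argparse.ArgumentParser(description="Run a pipeline (sketch)")
    parser.add_argument("--config", default=DEFAULT_CONFIG, help="path to YAML config")
    return parser.parse_args(argv)


def main(argv: Optional[List[str]] = None) -> int:
    """Return 0 on success and 1 on failure, mirroring the documented exit codes."""
    args = parse_cli(argv)
    try:
        # A real pipeline would load the config and run stages here.
        print(f"would load {args.config}")
        return 0
    except Exception as exc:
        print(f"pipeline failed: {exc}")
        return 1


exit_code = main(["--config", "configs/custom.yaml"])
```

Passing argv explicitly, as the documented main(argv=None) signature allows, makes the entry point easy to call from tests without touching sys.argv.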

class yt_framework.core.pipeline.DefaultPipeline(config, pipeline_dir, log_level=20)[source]#

Bases: BasePipeline

Pipeline with automatic stage discovery.

Discovers BaseStage subclasses from stages/<name>/stage.py and registers them. Run with DefaultPipeline.main() from pipeline.py. Which stages execute is still controlled by enabled_stages in config.

Parameters:
  • config (DictConfig)

  • pipeline_dir (Path)

  • log_level (int)

setup()[source]#

Automatically discover and register stages from the stages directory.

Looks for stage.py under each stages/<name>/ folder and registers every BaseStage subclass found.

Returns:

None

Return type:

None

Stage#

BaseStage — pipeline authors subclass this and implement run().

The framework derives the stage name from the stages/<name>/ directory.

class yt_framework.core.stage.StageContext(name, config, stage_dir, logger, deps)[source]#

Bases: object

Stage context containing all stage-related information.

name#

Stage name (automatically detected from directory name).

Type:

str

config#

Stage-specific configuration loaded from config.yaml.

Type:

omegaconf.dictconfig.DictConfig

stage_dir#

Path to the stage directory (stages/<stage_name>/).

Type:

pathlib.Path

logger#

Logger instance for stage logging.

Type:

logging.Logger

deps#

Injected dependencies (yt_client, pipeline_config, configs_dir).

Type:

yt_framework.core.dependencies.StageDependencies

name: str#
config: DictConfig#
stage_dir: Path#
logger: Logger#
deps: StageDependencies#
fork(name=None, stage_dir=None)[source]#

Return a shallow copy with selective overrides.

Use this in multi-operation stages when a later operation needs a slightly different context (e.g., a different stage_dir so that TarArchiveDependencyBuilder resolves wrapper scripts from the correct location).

Only name and stage_dir can be overridden; all other fields (config, logger, deps) are inherited from the parent context, which is intentional — they represent shared pipeline state.

Parameters:
  • name (str | None) – Override for the stage name. Defaults to the current name.

  • stage_dir (Path | None) – Override for the stage directory. Defaults to the current stage_dir.

Returns:

A new StageContext with the specified fields replaced.

Return type:

StageContext

Example:

ctx_reduce = self.context.fork(name="mds", stage_dir=Path(__file__).parent)
run_reduce(context=ctx_reduce, operation_config=reduce_cfg, reducer=MyReducer())
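
The fork() semantics described above (shallow copy, only name and stage_dir overridable) can be sketched with dataclasses.replace. ContextSketch is a hypothetical stand-in for StageContext, not the real class; it assumes the context is a plain dataclass with the five documented fields.

```python
from dataclasses import dataclass, replace
from pathlib import Path
from typing import Any, Optional


@dataclass
class ContextSketch:
    """Hypothetical stand-in for StageContext with the same five fields."""

    name: str
    config: Any
    stage_dir: Path
    logger: Any
    deps: Any

    def fork(
        self, name: Optional[str] = None, stage_dir: Optional[Path] = None
    ) -> "ContextSketch":
        # Shallow copy: config, logger and deps are shared by reference.
        return replace(
            self,
            name=self.name if name is None else name,
            stage_dir=self.stage_dir if stage_dir is None else stage_dir,
        )


shared_config = {"pool": "my_pool"}
parent = ContextSketch("map", shared_config, Path("stages/map"), None, None)
child = parent.fork(name="mds", stage_dir=Path("stages/mds"))
```

The parent context is left untouched, so a multi-operation stage can keep using self.context for earlier operations.
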
class yt_framework.core.stage.BaseStage(deps, logger)[source]#

Bases: ABC

Abstract base class for pipeline stages.

Stage name and config are automatically detected from the directory.

  • Directory structure: stages/<stage_name>/stage.py

  • Config file: stages/<stage_name>/config.yaml

Each stage must:

  • Implement __init__ to receive deps and logger

  • Implement the run method to execute stage logic

Example

class MyStage(BaseStage):
    def __init__(self, deps, logger):
        super().__init__(deps, logger)
        # self.config is automatically loaded from stages/<stage_name>/config.yaml
        # Access dependencies via self.deps.yt_client, self.deps.pipeline_config

    def run(self, debug: DebugContext) -> DebugContext:
        # Stage logic here
        # Note: 'debug' here is the shared data dict, NOT dependencies
        return {"result": "value"}

__init__(deps, logger)[source]#

Initialize stage with injected dependencies.

Stage name and config are automatically detected from the directory containing stage.py.

Parameters:
  • deps (StageDependencies) – Injected dependencies (yt_client, pipeline_config, configs_dir)

  • logger (Logger) – Logger instance for stage logging

Returns:

None

Raises:
  • FileNotFoundError – If config.yaml file is not found in stage directory.

  • ValueError – If config.yaml does not contain a dictionary.

Return type:

None

name: str#
property stage_dir: Path#

Path to the stage directory.

Returns:

Absolute path to the stage directory (stages/<stage_name>/).

Return type:

Path

abstractmethod run(debug)[source]#

Execute the stage.

Parameters:

debug (Dict[str, Any]) – Shared context dictionary holding the results of earlier stages. Can be an empty dict for the first stage.

Returns:

Dictionary with stage results to be merged into context and passed to the next stage.

Return type:

DebugContext

property context: StageContext#

Stage context containing all stage-related information.

Returns:

Dataclass instance with name, config, stage_dir, logger, and deps attributes.

Return type:

StageContext

Registry#

Mutable registry of BaseStage subclasses keyed by stage name.

class yt_framework.core.registry.StageRegistry[source]#

Bases: object

Builder for registering pipeline stages.

Automatically detects stage names from directory structure.

Example

registry = StageRegistry()
registry.add_stage(CreateTableStage)
registry.add_stage(RunMapStage)
pipeline.set_stage_registry(registry)

__init__()[source]#

Initialize empty stage registry.

Return type:

None

add_stage(stage_class)[source]#

Register a stage class.

Stage name is automatically detected from the directory containing stage.py.

Parameters:

stage_class (Type[BaseStage]) – Stage class to register

Returns:

Self for method chaining

Return type:

StageRegistry

get_stage(stage_name)[source]#

Get stage class by name.

Parameters:

stage_name (str) – Name of the stage to retrieve (matches directory name).

Returns:

The stage class registered with the given name.

Return type:

Type[BaseStage]

Raises:

KeyError – If no stage is registered with the given name.

Example

>>> registry = StageRegistry()
>>> registry.add_stage(MyStage)
>>> stage_class = registry.get_stage("my_stage")
has_stage(stage_name)[source]#

Check if stage is registered.

Parameters:

stage_name (str) – Name of the stage to check.

Returns:

True if the stage is registered, False otherwise.

Return type:

bool

get_all_stages()[source]#

Get all registered stages.

Returns:

Dictionary mapping stage names to stage classes. Returns a copy to prevent external modification.

Return type:

Dict[str, Type[BaseStage]]
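
The registry behaviour documented above (chaining from add_stage, KeyError from get_stage, a defensive copy from get_all_stages) can be sketched as follows. RegistrySketch is a hypothetical stand-in; unlike the real StageRegistry, which derives the name from the directory containing stage.py, this sketch takes the name as an explicit argument.

```python
from typing import Dict, Type


class RegistrySketch:
    """Hypothetical stand-in for StageRegistry; names are passed explicitly."""

    def __init__(self) -> None:
        self._stages: Dict[str, Type] = {}

    def add_stage(self, name: str, stage_class: Type) -> "RegistrySketch":
        self._stages[name] = stage_class
        return self  # returning self enables method chaining

    def get_stage(self, name: str) -> Type:
        return self._stages[name]  # raises KeyError for unknown names

    def has_stage(self, name: str) -> bool:
        return name in self._stages

    def get_all_stages(self) -> Dict[str, Type]:
        return dict(self._stages)  # copy, so callers cannot mutate the registry


class CreateTableStage:
    pass


class RunMapStage:
    pass


registry = (
    RegistrySketch()
    .add_stage("create_table", CreateTableStage)
    .add_stage("run_map", RunMapStage)
)
snapshot = registry.get_all_stages()
snapshot.clear()  # only the copy is emptied
```
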

Discovery#

Filesystem scan that imports stages/*/stage.py and collects BaseStage types.

yt_framework.core.discovery.discover_stages(pipeline_dir, logger)[source]#

Automatically discover all stage classes from the stages directory tree.

Searches for stage.py under each stages child directory and imports all BaseStage subclasses found.

Expected layout: pipeline_dir/stages/<stage_name>/stage.py with a BaseStage subclass in each module.

Parameters:
  • pipeline_dir (Path) – Path to pipeline directory

  • logger (Logger) – Logger instance

Returns:

List of discovered stage classes

Return type:

List[Type[BaseStage]]
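
A filesystem scan of this kind can be sketched with importlib. The code below is an illustration, not the framework's discover_stages: BaseStageSketch, the sketch_base module, and discover_stage_classes are hypothetical names, and the stage files are generated in a temporary directory so that the example is self-contained.

```python
import importlib.util
import inspect
import sys
import tempfile
import types
from pathlib import Path
from typing import List, Type


class BaseStageSketch:
    """Hypothetical stand-in for BaseStage."""


# Expose the stand-in base as an importable module so the generated
# stage.py files below can subclass it.
base_module = types.ModuleType("sketch_base")
base_module.BaseStageSketch = BaseStageSketch
sys.modules["sketch_base"] = base_module


def discover_stage_classes(pipeline_dir: Path) -> List[Type[BaseStageSketch]]:
    """Import every stages/<name>/stage.py and collect BaseStageSketch subclasses."""
    found: List[Type[BaseStageSketch]] = []
    for stage_file in sorted(pipeline_dir.glob("stages/*/stage.py")):
        module_name = f"sketch_stage_{stage_file.parent.name}"
        spec = importlib.util.spec_from_file_location(module_name, stage_file)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        for _, obj in inspect.getmembers(module, inspect.isclass):
            # Skip the imported base class itself; keep only real subclasses.
            if issubclass(obj, BaseStageSketch) and obj is not BaseStageSketch:
                found.append(obj)
    return found


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for name in ("create_table", "run_map"):
        stage_dir = root / "stages" / name
        stage_dir.mkdir(parents=True)
        class_name = name.title().replace("_", "") + "Stage"
        (stage_dir / "stage.py").write_text(
            "from sketch_base import BaseStageSketch\n"
            f"class {class_name}(BaseStageSketch):\n"
            "    pass\n"
        )
    discovered = [cls.__name__ for cls in discover_stage_classes(root)]
```

Sorting the glob results keeps the discovery order deterministic across filesystems; whether the real function sorts is not stated in the source.
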

Core injection (self.deps)#

Injection protocol and PipelineStageDependencies for stage run() methods.

Defines StageDependencies (protocol) and the concrete dataclass the pipeline passes as self.deps (YT client, pipeline config, configs directory).

class yt_framework.core.dependencies.StageDependencies(*args, **kwargs)[source]#

Bases: Protocol

Protocol defining what dependencies stages need.

This is NOT the same as the shared data dictionary that run() receives (the debug parameter of BaseStage.run()):

  • StageDependencies: injected services/config (yt_client, config, etc.)

  • run() parameter: shared data dictionary passed between stages

Benefits:

  • Dependency Inversion: depends on an abstraction, not the concrete Pipeline

  • Interface Segregation: only exposes what stages actually use

  • Testability: easy to create mock dependencies for testing

property yt_client: BaseYTClient#

YT client for operations.

Returns:

YT client instance (either YTDevClient or YTProdClient) for performing table operations, running map/vanilla jobs, etc.

Return type:

BaseYTClient

property pipeline_config: DictConfig#

Pipeline-level configuration (contains build_folder and secrets).

Returns:

OmegaConf configuration object containing pipeline-wide settings like mode, build_folder, and other pipeline parameters.

Return type:

DictConfig

property configs_dir: Path#

Directory containing secrets.env and other config files.

Returns:

Absolute path to the configs directory where secrets.env and other configuration files are stored.

Return type:

Path

class yt_framework.core.dependencies.PipelineStageDependencies(yt_client, pipeline_config, configs_dir)[source]#

Bases: object

Default implementation of StageDependencies.

Used by BasePipeline to inject dependencies into stages. This class is instantiated by the pipeline and passed to each stage.

Parameters:
  • yt_client (BaseYTClient)

  • pipeline_config (DictConfig)

  • configs_dir (Path)

yt_client#

YT client instance for performing operations on YTsaurus cluster or local filesystem (dev mode).

Type:

yt_framework.yt.client_base.BaseYTClient

pipeline_config#

Pipeline-level configuration containing mode, build_folder, and other pipeline-wide settings.

Type:

omegaconf.dictconfig.DictConfig

configs_dir#

Path to directory containing secrets.env and configuration files.

Type:

pathlib.Path

yt_client: BaseYTClient#
pipeline_config: DictConfig#
configs_dir: Path#
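
The testability benefit of the protocol can be illustrated with a small test double. Everything below is a hypothetical sketch: DepsProtocolSketch mirrors the three documented properties, FakeDeps and RecordingClient are invented for the example, and the row_count signature on the fake client is an assumption rather than the BaseYTClient API.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Protocol, Tuple, runtime_checkable


@runtime_checkable
class DepsProtocolSketch(Protocol):
    """Hypothetical mirror of StageDependencies: three read-only attributes."""

    @property
    def yt_client(self) -> Any: ...

    @property
    def pipeline_config(self) -> Any: ...

    @property
    def configs_dir(self) -> Path: ...


@dataclass
class FakeDeps:
    """Test double that satisfies the protocol structurally (no inheritance)."""

    yt_client: Any
    pipeline_config: Dict[str, Any]
    configs_dir: Path


class RecordingClient:
    """Hypothetical fake client that records calls instead of contacting YT."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, str]] = []

    def row_count(self, table_path: str) -> int:
        self.calls.append(("row_count", table_path))
        return 3


def stage_logic(deps: DepsProtocolSketch) -> int:
    # Depends only on the protocol, so any conforming object can be injected.
    return deps.yt_client.row_count("//tmp/example")


deps = FakeDeps(RecordingClient(), {"mode": "dev"}, Path("configs"))
rows = stage_logic(deps)
```
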

Operations#

Map Operations#

Driver helpers to package src/mapper.py and submit YT map operations.

class yt_framework.operations.map.MapOperationData(mapper_path, dependencies, environment, docker_auth, command=None)[source]#

Bases: object

Data container for map operation configuration.

mapper_path#

Path to mapper.py script in YT (or bash wrapper if tar mode).

Type:

str

dependencies#

List of (yt_path, local_path) tuples for files to upload.

Type:

List[Tuple[str, str]]

environment#

Environment variables dictionary (secrets only).

Type:

Dict[str, str]

docker_auth#

Optional Docker authentication dictionary for private registries.

Type:

Dict[str, str] | None

command#

Optional command to execute (used in tar archive mode).

Type:

str | None

mapper_path: str#
dependencies: List[Tuple[str, str]]#
environment: Dict[str, str]#
docker_auth: Dict[str, str] | None#
command: str | None = None#
yt_framework.operations.map.run_map(context, operation_config, output_schema=None, mapper=None, job=None)[source]#

Run YT map operation and wait for completion.

All job parameters (pool, memory, CPU, Docker image, etc.) are automatically extracted from operation_config. Pass the operation config from the stage config, typically self.config.client.operations.map.

Parameters:
  • context (StageContext) – Stage context (provides deps, logger, stage_dir)

  • operation_config (DictConfig) – Operation-specific config (from client.operations.map). Optional key append (bool): append mapper output to an existing output table.

  • output_schema (TableSchema | None) – Optional YT TableSchema for typed output table

  • mapper (Any | None) – Optional mapper leg (legacy name). When omitted, framework uses command wrapper. Can be a TypedJob instance or command string.

  • job (Any | None) – Preferred mapper leg alias. Can be a TypedJob instance or command string.

Returns:

True if successful, False otherwise

Return type:

bool

Vanilla Operations#

Driver helpers to package src/vanilla.py and submit YT vanilla operations.

class yt_framework.operations.vanilla.VanillaOperationData(script_path, dependencies, environment, docker_auth, command=None)[source]#

Bases: object

Data container for vanilla operation configuration.

script_path#

Path to vanilla.py script in YT (or placeholder if tar mode).

Type:

str

dependencies#

List of (yt_path, local_path) tuples for files to upload.

Type:

List[Tuple[str, str]]

environment#

Environment variables dictionary (secrets only).

Type:

Dict[str, str]

docker_auth#

Optional Docker authentication dictionary for private registries.

Type:

Dict[str, str] | None

command#

Optional command to execute (used in tar archive mode).

Type:

str | None

script_path: str#
dependencies: List[Tuple[str, str]]#
environment: Dict[str, str]#
docker_auth: Dict[str, str] | None#
command: str | None = None#
yt_framework.operations.vanilla.run_vanilla(context, operation_config, job=None)[source]#

Run YT vanilla operation and wait for completion.

All job parameters (pool, memory, CPU, Docker image, etc.) are automatically extracted from operation_config. Pass the operation config from the stage config, typically self.config.client.operations.vanilla. The task name is automatically set to the stage name.

Parameters:
  • context (StageContext) – Stage context (provides deps, logger, stage_dir, name)

  • operation_config (DictConfig) – Operation-specific config (from client.operations.vanilla)

  • job (str | None) – Preferred command alias. When omitted, framework wrapper command is used.

Returns:

True if successful, False otherwise

Return type:

bool

Map-reduce and reduce#

Submit map-reduce and reduce-only YT operations (TypedJob or command strings).

Builds archives plus optional file dependencies; cluster credentials still come from configs/secrets.env like other operations.

yt_framework.operations.map_reduce.run_map_reduce(context, operation_config, mapper=None, reducer=None, output_schema=None, map_job=None, reduce_job=None)[source]#

Run a YT map-reduce operation and wait for completion.

Pass the mapper and reducer legs (map_job and reduce_job; mapper and reducer are the deprecated names) either both as TypedJob instances or both as command strings (JSON stdin/stdout). Mixing kinds raises ValueError.

Set operation_config.tar_command_bootstrap: true to wrap string legs with the same tar -xzf source.tar.gz + wrapper pattern as map operations (requires matching wrappers in the uploaded tarball; see docs).

Parameters:
  • context (StageContext) – Stage context (deps, logger, stage_dir, config).

  • operation_config (DictConfig) – client.operations.map_reduce config (input_table, output_table, reduce_by, sort_by, resources, file_paths, etc.).

  • mapper (Any) – Deprecated — use map_job instead.

  • reducer (Any) – Deprecated — use reduce_job instead.

  • output_schema (Any | None) – Optional YT TableSchema for output table.

  • map_job (Any) – Mapper leg (TypedJob instance or command string).

  • reduce_job (Any) – Reducer leg (TypedJob instance or command string).

Returns:

True if the operation completed successfully.

Return type:

bool
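
The rule that both legs must be of the same kind can be sketched as a small validation helper. This is a hypothetical illustration: check_legs and TypedJobSketch are invented names, and the real run_map_reduce may perform additional checks.

```python
from typing import Any


class TypedJobSketch:
    """Hypothetical stand-in for a TypedJob instance."""


def check_legs(map_job: Any, reduce_job: Any) -> str:
    """Return the leg kind, or raise ValueError when kinds are mixed."""
    map_is_command = isinstance(map_job, str)
    reduce_is_command = isinstance(reduce_job, str)
    if map_is_command != reduce_is_command:
        raise ValueError(
            "map and reduce legs must both be TypedJob instances "
            "or both be command strings"
        )
    return "command" if map_is_command else "typed"


typed_kind = check_legs(TypedJobSketch(), TypedJobSketch())
command_kind = check_legs("python3 mapper.py", "python3 reducer.py")
```
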

yt_framework.operations.map_reduce.run_reduce(context, operation_config, reducer=None, output_schema=None, job=None)[source]#

Run a YT reduce-only operation and wait for completion.

Pass reducer as a TypedJob or a command string. With operation_config.tar_command_bootstrap: true, string reducers get the same tar extract + wrapper bootstrap as map (see docs).

Parameters:
  • context (StageContext) – Stage context.

  • operation_config (DictConfig) – client.operations.reduce config.

  • reducer (Any) – Reducer leg (legacy name).

  • output_schema (Any | None) – Optional output table schema.

  • job (Any) – Preferred reducer leg alias.

Returns:

True if the operation completed successfully.

Return type:

bool

YQL Operations#

YQL operations are methods on the YT client. See the YT Client sections below for join_tables, filter_table, select_columns, group_by_aggregate, union_tables, distinct, sort_table, and limit_table.

Note

YQL Operations Location

YQL operations are implemented as methods on BaseYTClient and its subclasses (YTDevClient and YTProdClient). They are not in a separate operations module.

S3 Operations#

Driver-side helpers to list S3 keys and persist paths into Cypress tables.

yt_framework.operations.s3.list_s3_files(s3_client, bucket, prefix, logger, extension=None, max_files=None)[source]#

List files from S3 bucket with optional filtering.

Parameters:
  • s3_client (S3Client) – S3 client instance

  • bucket (str) – S3 bucket name

  • prefix (str) – S3 prefix path

  • logger (Logger) – Logger instance

  • extension (str | None) – Optional file extension filter (e.g., ‘mp4’)

  • max_files (int | None) – Optional maximum number of files to return

Returns:

List of S3 file paths

Return type:

List[str]
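
The extension and max_files filters can be sketched on a plain list of keys, without an S3 client. filter_keys is a hypothetical helper; matching the extension case-insensitively and accepting it with or without a leading dot are assumptions not stated in the source.

```python
from typing import Iterable, List, Optional


def filter_keys(
    keys: Iterable[str],
    extension: Optional[str] = None,
    max_files: Optional[int] = None,
) -> List[str]:
    """Keep keys with the given extension, stopping once max_files are collected."""
    suffix = None if extension is None else "." + extension.lstrip(".").lower()
    selected: List[str] = []
    for key in keys:
        if max_files is not None and len(selected) >= max_files:
            break  # limit reached; stop scanning early
        if suffix is not None and not key.lower().endswith(suffix):
            continue
        selected.append(key)
    return selected


keys = ["videos/a.mp4", "videos/b.txt", "videos/c.MP4", "videos/d.mp4"]
first_two = filter_keys(keys, extension="mp4", max_files=2)
```
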

yt_framework.operations.s3.save_s3_paths_to_table(yt_client, bucket, paths, output_table, logger)[source]#

Save S3 file paths to YT table as bucket and path columns.

Parameters:
  • yt_client (BaseYTClient) – YT client instance

  • bucket (str) – S3 bucket name

  • paths (List[str]) – List of S3 file paths

  • output_table (str) – YT table path

  • logger (Logger) – Logger instance

Returns:

None

Return type:

None

Table operations#

Helpers to count rows, load rows into memory, and export tables to JSONL.

For low-level access, use BaseYTClient methods (read_table, row_count, etc.) directly on self.deps.yt_client. These functions add logging and convenience.

yt_framework.operations.table.get_row_count(yt_client, table_path, logger)[source]#

Get number of rows in a YT table.

Parameters:
  • yt_client (BaseYTClient) – YT client instance

  • table_path (str) – YT table path

  • logger (Logger) – Logger instance

Returns:

Number of rows in table

Return type:

int

yt_framework.operations.table.read_table(yt_client, table_path, logger)[source]#

Read rows from a YT table.

Parameters:
  • yt_client (BaseYTClient) – YT client instance

  • table_path (str) – YT table path

  • logger (Logger) – Logger instance

Returns:

List of rows as dictionaries

Return type:

List[Dict[str, Any]]

yt_framework.operations.table.download_table(yt_client, table_path, output_file, logger)[source]#

Download YT table to local JSONL file.

Parameters:
  • yt_client (BaseYTClient) – YT client instance

  • table_path (str) – YT table path

  • output_file (Path) – Local file path for output

  • logger (Logger) – Logger instance

Returns:

None

Return type:

None
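
The JSONL export format (one JSON object per line) can be sketched as below. write_jsonl and read_jsonl are hypothetical helpers operating on in-memory rows; how the real download_table streams rows from YT is not covered by the source.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, Iterable, List


def write_jsonl(rows: Iterable[Dict[str, Any]], output_file: Path) -> int:
    """Write one JSON object per line and return the number of rows written."""
    output_file.parent.mkdir(parents=True, exist_ok=True)
    count = 0
    with output_file.open("w", encoding="utf-8") as handle:
        for row in rows:
            handle.write(json.dumps(row, ensure_ascii=False) + "\n")
            count += 1
    return count


def read_jsonl(path: Path) -> List[Dict[str, Any]]:
    """Load a JSONL file back into a list of dicts, skipping blank lines."""
    with path.open(encoding="utf-8") as handle:
        return [json.loads(line) for line in handle if line.strip()]


rows = [{"bucket": "b", "path": "a.mp4"}, {"bucket": "b", "path": "b.mp4"}]
with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "out" / "table.jsonl"
    written = write_jsonl(rows, target)
    round_trip = read_jsonl(target)
```
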

Checkpoint Operations#

Upload or reuse single-file model checkpoints and wire them into operation specs.

yt_framework.operations.checkpoint.init_checkpoint_directory(context, checkpoint_config)[source]#

Initialize checkpoint directory in YTsaurus if it doesn’t exist.

Uses checkpoint_base from checkpoint_config. Also uploads local checkpoint if specified. Validates that required checkpoint exists in YT before proceeding.

Parameters:
  • context (StageContext) – Stage context (provides deps, logger)

  • checkpoint_config (DictConfig) – Checkpoint-specific config (from client.operations.<op>.checkpoint)

Returns:

None

Return type:

None

Sort operations#

Submit YT sort jobs using the same (context, operation_config) pattern as map.

yt_framework.operations.sort.run_sort(context, operation_config)[source]#

Run a YT sort operation and wait for completion.

Parameters:
  • context (StageContext) – Stage context (deps, logger, stage_dir, config).

  • operation_config (DictConfig) –

    Sort-specific config. Required keys:

    • input_table — table to sort in-place.

    • sort_by — list of column names.

    Optional keys mirror run_map_reduce:

    • resources.pool / resources.pool_tree — scheduler pool.

    • resources.memory_limit_gb, resources.cpu_limit — resource hints.

Returns:

True if the sort completed successfully.

Return type:

bool

Example config (client.operations.sort in stage config.yaml):

sort:
  sort_by: [shard_order, mock_field]
  resources:
    pool: my_pool

Then in the stage:

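from omegaconf import OmegaConf

from yt_framework.operations.sort import run_sort

# Merge the runtime table path into the static sort config from config.yaml.
sort_cfg = OmegaConf.merge(
    self.config.client.operations.sort,
    {"input_table": intermediate_table},
)
run_sort(context=self.context, operation_config=sort_cfg)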

Tokenizer artifact#

Pack tokenizer/processor tarballs, upload to Cypress, and expose sandbox env vars.

yt_framework.operations.tokenizer_artifact.resolve_tokenizer_artifact_name(stage_config, tokenizer_artifact_config)[source]#

Resolve logical tokenizer artifact name from config.

Parameters:
  • stage_config (DictConfig)

  • tokenizer_artifact_config (DictConfig)

Return type:

str | None

yt_framework.operations.tokenizer_artifact.resolve_tokenizer_archive_name(artifact_name)[source]#

Convert logical artifact name to mounted tar filename.

Parameters:

artifact_name (str)

Return type:

str

yt_framework.operations.tokenizer_artifact.resolve_tokenizer_artifact_yt_path(stage_config, tokenizer_artifact_config)[source]#

Resolve full YT file path for tokenizer artifact tarball.

Parameters:
  • stage_config (DictConfig)

  • tokenizer_artifact_config (DictConfig)

Return type:

str | None

yt_framework.operations.tokenizer_artifact.init_tokenizer_artifact_directory(context, tokenizer_artifact_config)[source]#

Initialize tokenizer artifact in YT (if configured).

Behavior:

  • creates artifact_base if needed;

  • uploads the local artifact from local_artifact_path if provided and missing in YT;

  • validates artifact presence in YT.

Parameters:
  • context (StageContext)

  • tokenizer_artifact_config (DictConfig)

Return type:

None

Typed jobs#

TypedJob helpers for YTsaurus execution.

class yt_framework.typed_jobs.StageBootstrapTypedJob[source]#

Bases: TypedJob

Base class for TypedJob legs with worker-side bootstrap.

On unpickle, extracts source.tar.gz when needed, prepends the archive root and stages/<stage>/src to sys.path, and sets JOB_CONFIG_PATH when the stage config file exists. Uses __getstate__ / __setstate__ so bootstrap runs on the worker before __call__.
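
The __getstate__ / __setstate__ hook pattern can be sketched as follows. BootstrapJobSketch is a hypothetical stand-in that only records events; the real class extracts an archive and adjusts sys.path, which is omitted here.

```python
from typing import Any, Dict, List


class BootstrapJobSketch:
    """Hypothetical stand-in showing the __getstate__/__setstate__ hook pattern."""

    def __init__(self, stage_name: str) -> None:
        self.stage_name = stage_name
        self.events: List[str] = []

    def _bootstrap(self) -> None:
        # A real job would extract the archive and extend sys.path here.
        self.events.append(f"bootstrap:{self.stage_name}")

    def __getstate__(self) -> Dict[str, Any]:
        # Driver side: ship only what the worker needs.
        return {"stage_name": self.stage_name}

    def __setstate__(self, state: Dict[str, Any]) -> None:
        # Worker side: restore attributes, then bootstrap before any __call__.
        self.stage_name = state["stage_name"]
        self.events = []
        self._bootstrap()

    def __call__(self, row: Dict[str, Any]) -> Dict[str, Any]:
        self.events.append("call")
        return {**row, "stage": self.stage_name}


driver_job = BootstrapJobSketch("tokenize")
# Unpickling rebuilds the object with __new__ and then calls __setstate__;
# the two steps are spelled out here to keep the sketch independent of pickle.
worker_job = BootstrapJobSketch.__new__(BootstrapJobSketch)
worker_job.__setstate__(driver_job.__getstate__())
output = worker_job({"id": 1})
```

Because __init__ is not called when an object is restored from its pickled state, __setstate__ is the natural place for worker-side setup.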

Utilities#

Environment#

Load KEY=value files such as configs/secrets.env into plain dicts.

yt_framework.utils.env.load_env_file(env_path)[source]#

Load environment variables from a .env file.

File format: KEY=VALUE (one per line, # for comments). The file is optional; a missing file returns an empty dict.

Parameters:

env_path (Path) – Path to the .env file

Returns:

Dictionary of loaded environment variables (key -> value). Returns empty dict if file doesn’t exist or cannot be read.

Warns:
  • UserWarning – If the file exists but cannot be read or parsed (non-fatal).

  • Missing file is silent (returns empty dict)

Return type:

Dict[str, str]

Example

>>> env_vars = load_env_file(Path("configs/secrets.env"))
>>> print(env_vars.get("YT_TOKEN"))
yt_framework.utils.env.load_secrets(secrets_dir, env_file='secrets.env')[source]#

Load secrets from secrets.env file in the specified directory.

Parameters:
  • secrets_dir (Path) – Directory containing the secrets.env file

  • env_file (str) – Name of the environment file (default: “secrets.env”)

Returns:

Dictionary of loaded secrets (key -> value). Returns empty dict if file doesn’t exist or cannot be read.

Warns:

UserWarning – If the secrets file exists but cannot be read or parsed (non-fatal).

Return type:

Dict[str, str]

Example

>>> secrets = load_secrets(Path("configs"))
>>> yt_token = secrets.get("YT_TOKEN")
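
A KEY=VALUE parser with the documented behaviour (comments skipped, missing file returns an empty dict, unreadable file warns) can be sketched as below. load_env_sketch and load_secrets_sketch are hypothetical; splitting on the first = only, skipping malformed lines, and having the secrets helper delegate to the env loader are assumptions, and quoting rules are not handled.

```python
import tempfile
import warnings
from pathlib import Path
from typing import Dict


def load_env_sketch(env_path: Path) -> Dict[str, str]:
    """Parse KEY=VALUE lines; a missing file silently yields an empty dict."""
    if not env_path.exists():
        return {}
    try:
        text = env_path.read_text(encoding="utf-8")
    except OSError as exc:
        warnings.warn(f"could not read {env_path}: {exc}", UserWarning)
        return {}
    result: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # skip blanks, comments, and malformed lines
        key, _, value = line.partition("=")  # split on the first '=' only
        result[key.strip()] = value.strip()
    return result


def load_secrets_sketch(secrets_dir: Path, env_file: str = "secrets.env") -> Dict[str, str]:
    """Assumed to be a thin wrapper that joins the directory and file name."""
    return load_env_sketch(secrets_dir / env_file)


with tempfile.TemporaryDirectory() as tmp:
    configs = Path(tmp)
    (configs / "secrets.env").write_text("# credentials\nYT_TOKEN=abc=123\n\nMODE = dev\n")
    secrets = load_secrets_sketch(configs)
    missing = load_secrets_sketch(configs / "nope")
```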

Logging#

Colored console formatter and helpers used by pipeline startup.

class yt_framework.utils.logging.ColoredFormatter(fmt=None, datefmt=None, style='%', validate=True, *, defaults=None)[source]#

Bases: Formatter

Custom formatter with colors for different log levels.

COLORS = {'CRITICAL': '\x1b[35m', 'DEBUG': '\x1b[36m', 'ERROR': '\x1b[31m', 'INFO': '\x1b[32m', 'WARNING': '\x1b[33m'}#
RESET = '\x1b[0m'#
ICONS = {'CRITICAL': '🚨', 'DEBUG': '🔍', 'ERROR': '✗', 'WARNING': '⚠️'}#
format(record)[source]#

Format log record with colors and icons.

Parameters:

record (LogRecord) – Log record to format.

Returns:

Formatted log message with ANSI color codes and icons.

Return type:

str

yt_framework.utils.logging.setup_logging(level=20, name=None, use_colors=True)[source]#

Setup logging with consistent formatting.

Parameters:
  • level (int) – Logging level (default: INFO)

  • name (str | None) – Logger name (default: root logger)

  • use_colors (bool) – Whether to use colored output

Returns:

Configured logger instance

Return type:

Logger

yt_framework.utils.logging.log_header(logger, title, context=None)[source]#

Log a compact section header in format: [Title] context.

Parameters:
  • logger (Logger) – Logger instance

  • title (str) – Section title (will be wrapped in brackets)

  • context (str | None) – Optional additional context information

Return type:

None

yt_framework.utils.logging.log_operation(logger, message)[source]#

Log an operation start message with → prefix.

Parameters:
  • logger (Logger) – Logger instance

  • message (str) – Operation description

Return type:

None

yt_framework.utils.logging.log_success(logger, message)[source]#

Log a success/completion message with ✓ prefix.

Parameters:
  • logger (Logger) – Logger instance

  • message (str) – Success message

Return type:

None

yt_framework.utils.logging.log_config(logger, config_dict, title='Configuration')[source]#

Log configuration in a readable format.

Automatically masks sensitive values (keys containing ‘secret’ or ‘key’) by showing only the last 4 characters.

Parameters:
  • logger (Logger) – Logger instance

  • config_dict (dict) – Configuration dictionary to log

  • title (str) – Title for the configuration section

Returns:

None

Return type:

None

Example

>>> config = {"api_key": "secret12345", "mode": "dev"}
>>> log_config(logger, config)
[Configuration]
    api_key: ***2345
    mode: dev
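
The masking rule can be sketched as a pure function that reproduces the example output above. mask_value and format_config are hypothetical helpers; the case-insensitive key check and the handling of values shorter than four characters (shown in full after the asterisks) are assumptions.

```python
from typing import Any, Dict

SENSITIVE_MARKERS = ("secret", "key")  # substrings named in the docstring


def mask_value(key: str, value: Any) -> str:
    """Show only the last four characters when the key looks sensitive."""
    text = str(value)
    if any(marker in key.lower() for marker in SENSITIVE_MARKERS):
        return "***" + text[-4:]
    return text


def format_config(config: Dict[str, Any], title: str = "Configuration") -> str:
    """Render a title line followed by one indented key: value line per entry."""
    lines = [f"[{title}]"]
    lines += [f"    {key}: {mask_value(key, value)}" for key, value in config.items()]
    return "\n".join(lines)


rendered = format_config({"api_key": "secret12345", "mode": "dev"})
print(rendered)
```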

Ignore Patterns#

.ytignore parsing (gitignore-style) for upload tarballs.

class yt_framework.utils.ignore.YTIgnorePattern(pattern, base_dir, is_negation=False)[source]#

Bases: object

Represents a single .ytignore pattern.

This class compiles a pattern string into a regex for efficient matching against file paths. It supports the same syntax as .gitignore files.

Pattern Types:

  • Simple wildcards: *.pyc, test?

  • Character classes: *.py[cod]

  • Recursive wildcards: **/*.log

  • Directory patterns: build/ (trailing slash)

  • Rooted patterns: /config (leading slash, root-only)

  • Negation patterns: !important.py (un-ignore)

Examples

Simple wildcard:

>>> from pathlib import Path
>>> pattern = YTIgnorePattern("*.pyc", Path("/project"))
>>> pattern.matches(Path("/project/test.pyc"))
True
>>> pattern.matches(Path("/project/test.py"))
False

Directory pattern:

>>> pattern = YTIgnorePattern("__pycache__/", Path("/project"))
>>> pattern.matches(Path("/project/__pycache__/module.pyc"))
True
>>> pattern.matches(Path("/project/src/__pycache__/module.pyc"))
True

Recursive wildcard:

>>> pattern = YTIgnorePattern("**/*.log", Path("/project"))
>>> pattern.matches(Path("/project/src/debug.log"))
True
>>> pattern.matches(Path("/project/debug.log"))
False  # Requires at least one directory level

Rooted pattern:

>>> pattern = YTIgnorePattern("/build", Path("/project"))
>>> pattern.matches(Path("/project/build"))
True
>>> pattern.matches(Path("/project/src/build"))
False  # Only matches at root
__init__(pattern, base_dir, is_negation=False)[source]#

Initialize a pattern.

Parameters:
  • pattern (str) – The pattern string (without leading !)

  • base_dir (Path) – Directory where the .ytignore file is located

  • is_negation (bool) – Whether this is a negation pattern (!)

matches(file_path)[source]#

Check if a file path matches this pattern.

Parameters:

file_path (Path) – Absolute or relative file path to check

Returns:

True if the path matches

Return type:

bool

class yt_framework.utils.ignore.YTIgnoreMatcher(base_dir)[source]#

Bases: object

Matches file paths against .ytignore patterns.

This class loads .ytignore files from the specified directory and all parent directories up to the filesystem root, then provides methods to check if files should be ignored based on the loaded patterns.

Patterns follow .gitignore syntax:

  • *.pyc: matches any file ending in .pyc

  • __pycache__/: matches any __pycache__ directory

  • **/test_*.py: matches test_*.py in any subdirectory (but not root)

  • !important.py: negates a previous ignore pattern

  • /build: matches only at root level (not in subdirectories)

  • src/*.log: matches .log files in src/ but not src/subdir/

Examples

Basic usage:

>>> from pathlib import Path
>>> matcher = YTIgnoreMatcher(Path("/project"))
>>> matcher.should_ignore(Path("/project/test.pyc"))
True
>>> matcher.should_ignore(Path("/project/main.py"))
False

With custom .ytignore file:

>>> # Create a .ytignore file
>>> project_dir = Path("/tmp/myproject")
>>> project_dir.mkdir(exist_ok=True)
>>> (project_dir / ".ytignore").write_text("*.pyc\n__pycache__/\n")
>>> matcher = YTIgnoreMatcher(project_dir)
>>> matcher.should_ignore(project_dir / "module.pyc")
True
>>> matcher.should_ignore(project_dir / "module.py")
False

Negation patterns:

>>> # Ignore all .log files except important.log
>>> (project_dir / ".ytignore").write_text("*.log\n!important.log\n")
>>> matcher = YTIgnoreMatcher(project_dir)
>>> matcher.should_ignore(project_dir / "debug.log")
True
>>> matcher.should_ignore(project_dir / "important.log")
False

Parameters:

base_dir (Path)

__init__(base_dir)[source]#

Initialize matcher.

Parameters:

base_dir (Path) – Base directory for file matching

should_ignore(file_path)[source]#

Check if a file should be ignored.

Parameters:

file_path (Path) – Path to the file (can be absolute or relative)

Returns:

True if the file should be ignored

Return type:

bool
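
The negation behaviour shown in the examples can be sketched as a single pass in which the last matching pattern decides the outcome. This assumes .gitignore-style ordering and comment handling, which this page does not spell out; sketch_should_ignore is a hypothetical helper that only compares file names.

```python
from fnmatch import fnmatchcase
from pathlib import PurePosixPath
from typing import List


def sketch_should_ignore(lines: List[str], rel_path: str) -> bool:
    """Apply patterns in order; the last pattern that matches wins (assumed)."""
    name = PurePosixPath(rel_path).name
    ignored = False
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith("#"):
            # Blank lines and comments are skipped (assumed, as in .gitignore).
            continue
        negated = line.startswith("!")
        glob = line[1:] if negated else line
        if fnmatchcase(name, glob):
            # A negation pattern re-includes a file ignored by an earlier line.
            ignored = not negated
    return ignored
```

Under this assumption the order of lines matters: placing !important.log before *.log would leave important.log ignored.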

yt_framework.utils.ignore.should_ignore_file(file_path, base_dir)[source]#

Convenience function to check if a file should be ignored.

This is a shorthand for creating a YTIgnoreMatcher and checking a single file. For checking multiple files, create a YTIgnoreMatcher instance directly to avoid reloading .ytignore files for each check.

Parameters:
  • file_path (Path) – Path to the file to check

  • base_dir (Path) – Base directory for .ytignore lookup

Returns:

True if the file should be ignored

Return type:

bool

Examples

>>> from pathlib import Path
>>> # Assuming .ytignore contains "*.pyc"
>>> should_ignore_file(Path("/project/test.pyc"), Path("/project"))
True
>>> should_ignore_file(Path("/project/test.py"), Path("/project"))
False

Packaging and operation helpers (advanced)#

These modules support code upload, file dependencies, and environment wiring. Most pipelines rely on defaults; use the API when extending the framework.

Upload and local build#

Build source.tar.gz, merge extra modules/paths, and push artifacts to Cypress.

yt_framework.operations.upload.build_code_locally(build_dir, pipeline_dir, logger, create_wrappers=False, upload_modules=None, upload_paths=None)[source]#

Build all code in a local build directory.

Copies ytjobs package, optional custom modules/paths, and all stages’ code to the build directory, preserving the same structure as would be uploaded to YT.

Parameters:
  • build_dir (Path) – Local build directory path

  • pipeline_dir (Path) – Path to pipeline directory

  • logger (Logger) – Logger instance

  • create_wrappers (bool) – If True, create wrapper scripts for all stages

  • upload_modules (List[str] | None) – Optional list of module names to upload

  • upload_paths (List[Dict[str, str]] | None) – Optional list of {source, target?} for local paths

Returns:

Total number of files copied

Return type:

int

yt_framework.operations.upload.create_code_archive(build_dir, archive_path, logger)[source]#

Create a tar.gz archive from the build directory.

Parameters:
  • build_dir (Path) – Local build directory path

  • archive_path (Path) – Path where the archive should be created

  • logger (Logger) – Logger instance

Returns:

None

Return type:

None
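
Packing a build directory into a tar.gz archive can be done with the standard tarfile module. The sketch below is one way to do it, assuming the archive should store paths relative to build_dir and that archive_path lies outside build_dir; sketch_create_archive is a hypothetical name, not the framework function.

```python
import tarfile
from pathlib import Path


def sketch_create_archive(build_dir: Path, archive_path: Path) -> None:
    """Pack every file under build_dir into a gzip-compressed tar archive."""
    archive_path.parent.mkdir(parents=True, exist_ok=True)
    with tarfile.open(archive_path, "w:gz") as tar:
        # Sorting keeps the member order stable between runs.
        for path in sorted(build_dir.rglob("*")):
            if path.is_file():
                # Store paths relative to build_dir so the layout unpacks cleanly.
                tar.add(path, arcname=path.relative_to(build_dir).as_posix())
```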

yt_framework.operations.upload.upload_code_archive(yt_client, archive_path, build_folder, logger)[source]#

Upload code archive to YT.

Parameters:
  • yt_client (BaseYTClient) – YT client instance

  • archive_path (Path) – Local path to the tar.gz archive

  • build_folder (str) – YT build folder path

  • logger (Logger) – Logger instance

Returns:

None

Return type:

None

yt_framework.operations.upload.upload_all_code(yt_client, build_folder, pipeline_dir, logger, upload_modules=None, upload_paths=None)[source]#

Upload all code to YT: ytjobs package, optional custom modules/paths, and stages.

Builds code locally, creates tar archive, and uploads it to YT.

Parameters:
  • yt_client (BaseYTClient) – YT client instance

  • build_folder (str) – YT build folder path

  • pipeline_dir (Path) – Path to pipeline directory

  • logger (Logger) – Logger instance

  • upload_modules (List[str] | None) – Optional list of module names to upload

  • upload_paths (List[Dict[str, str]] | None) – Optional list of {source, target?} for local paths

Returns:

None

Return type:

None

Stage dependency file lists#

Collect extra file dependencies (including implicit ytjobs) for YT jobs.

yt_framework.operations.dependencies.build_stage_dependencies(build_folder, stage_dir, logger)[source]#

Build dependency list for a single stage.

Includes:

  • config.yaml (if it exists locally)

  • All .py files from the src/ directory

Parameters:
  • build_folder (str) – YT build folder path

  • stage_dir (Path) – Path to stage directory (e.g., stages/run_map/)

  • logger (Logger) – Logger instance

Returns:

List of (yt_path, local_path) tuples

Return type:

List[Tuple[str, str]]
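
The shape of the returned list can be illustrated with a small stand-in. The remote layout used here (build_folder/stages/stage name/...) is an assumption made for the example, and sketch_stage_dependencies is a hypothetical name.

```python
from pathlib import Path
from typing import List, Tuple


def sketch_stage_dependencies(build_folder: str, stage_dir: Path) -> List[Tuple[str, str]]:
    """Collect (yt_path, local_path) pairs for config.yaml and src/*.py."""
    # Assumed remote layout: <build_folder>/stages/<stage_name>/...
    remote_root = f"{build_folder.rstrip('/')}/stages/{stage_dir.name}"
    deps: List[Tuple[str, str]] = []
    config = stage_dir / "config.yaml"
    if config.exists():
        deps.append((f"{remote_root}/config.yaml", str(config)))
    src = stage_dir / "src"
    if src.is_dir():
        for py_file in sorted(src.rglob("*.py")):
            rel = py_file.relative_to(stage_dir).as_posix()
            deps.append((f"{remote_root}/{rel}", str(py_file)))
    return deps
```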

yt_framework.operations.dependencies.build_ytjobs_dependencies(build_folder, logger)[source]#

Build dependency list for ytjobs package.

Includes all .py files from ytjobs/ directory.

Parameters:
  • build_folder (str) – YT build folder path

  • logger (Logger) – Logger instance

Returns:

List of (yt_path, local_path) tuples

Return type:

List[Tuple[str, str]]

yt_framework.operations.dependencies.add_checkpoint(dependencies, model_name, checkpoint_base, logger)[source]#

Add checkpoint file to dependencies if configured.

Parameters:
  • dependencies (List[Tuple[str, str]]) – List of (yt_path, local_path) tuples

  • model_name (str | None) – Optional model name for checkpoint

  • checkpoint_base (str | None) – Optional checkpoint base path in YT

  • logger (Logger) – Logger instance

Returns:

Updated dependency list (new list with checkpoint added, or same list)

Return type:

List[Tuple[str, str]]
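
The "new list or same list" contract can be sketched as follows. How the checkpoint path is composed and what the local name is are assumptions for illustration; sketch_add_checkpoint is a hypothetical name.

```python
from typing import List, Optional, Tuple

Dependency = Tuple[str, str]


def sketch_add_checkpoint(
    dependencies: List[Dependency],
    model_name: Optional[str],
    checkpoint_base: Optional[str],
) -> List[Dependency]:
    """Return a new list with the checkpoint appended, or the same list."""
    if not model_name or not checkpoint_base:
        # Nothing configured: hand back the same list object untouched.
        return dependencies
    # Assumed path composition: <checkpoint_base>/<model_name>.
    yt_path = f"{checkpoint_base.rstrip('/')}/{model_name}"
    # Assumed local name: the file appears in the sandbox under model_name.
    return [*dependencies, (yt_path, model_name)]
```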

yt_framework.operations.dependencies.build_vanilla_dependencies(build_folder, stage_dir, model_name, checkpoint_base, logger)[source]#

Build complete dependency list for a vanilla operation.

Combines:

  • Stage files (config + src/)

  • Ytjobs package

Parameters:
  • build_folder (str) – YT build folder path

  • stage_dir (Path) – Path to stage directory

  • model_name (str | None) – Optional model name for checkpoint

  • checkpoint_base (str | None) – Optional checkpoint base path in YT

  • logger (Logger) – Logger instance

Returns:

Tuple of (script_path, dependency_files):

  • script_path: Path to vanilla.py in YT

  • dependency_files: Complete list of dependencies

Return type:

Tuple[str, List[Tuple[str, str]]]

yt_framework.operations.dependencies.build_map_dependencies(build_folder, stage_dir, model_name, checkpoint_base, logger)[source]#

Build complete dependency list for a map operation.

Combines:

  • Stage files (config + src/)

  • Ytjobs package

  • Checkpoint (if configured)

Parameters:
  • build_folder (str) – YT build folder path

  • stage_dir (Path) – Path to stage directory

  • model_name (str | None) – Optional model name for checkpoint

  • checkpoint_base (str | None) – Optional checkpoint base path in YT

  • logger (Logger) – Logger instance

Returns:

Tuple of (mapper_path, dependency_files):

  • mapper_path: Path to mapper.py in YT

  • dependency_files: Complete list of dependencies

Return type:

Tuple[str, List[Tuple[str, str]]]

Shared operation utilities#

Shared helpers for map/vanilla/map-reduce (resources, secrets, tokenizer wiring).

yt_framework.operations.common.build_environment(configs_dir, logger)[source]#

Build environment variables for map and vanilla operations.

Jobs read configuration from config.yaml, so only secrets are passed via environment variables.

Parameters:
  • configs_dir (Path) – Directory containing secrets.env file

  • logger (Logger) – Logger instance

Returns:

Dictionary of secret environment variables

Return type:

Dict[str, str]
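
Reading a secrets.env file into a dictionary might look like the sketch below. The KEY=VALUE format with # comments and optional quotes is an assumption about the file, and sketch_load_secrets is a hypothetical name; the framework's own parser may behave differently.

```python
from pathlib import Path
from typing import Dict


def sketch_load_secrets(configs_dir: Path) -> Dict[str, str]:
    """Parse KEY=VALUE lines from configs_dir/secrets.env (assumed format)."""
    secrets_file = configs_dir / "secrets.env"
    if not secrets_file.exists():
        return {}
    env: Dict[str, str] = {}
    for raw in secrets_file.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        # Skip blanks, comments, and lines without an assignment.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        # Drop optional surrounding quotes from the value.
        env[key.strip()] = value.strip().strip('"').strip("'")
    return env
```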

yt_framework.operations.common.prepare_docker_auth(docker_image, docker_username, docker_password)[source]#

Prepare Docker authentication dictionary.

Parameters:
  • docker_image (str | None) – Optional Docker image name

  • docker_username (str | None) – Optional Docker registry username

  • docker_password (str | None) – Optional Docker registry password

Returns:

Docker authentication dict if all credentials provided, None otherwise

Return type:

Dict[str, str] | None
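
The all-or-nothing rule described above can be sketched in a few lines. The dictionary keys used here are placeholders chosen for the example, not the keys the framework emits; sketch_docker_auth is a hypothetical name.

```python
from typing import Dict, Optional


def sketch_docker_auth(
    docker_image: Optional[str],
    docker_username: Optional[str],
    docker_password: Optional[str],
) -> Optional[Dict[str, str]]:
    """Return an auth dict only when image, username, and password are all set."""
    if docker_image and docker_username and docker_password:
        # Key names are illustrative placeholders.
        return {"username": docker_username, "password": docker_password}
    return None
```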

yt_framework.operations.common.extract_operation_resources(operation_config, logger)[source]#

Extract OperationResources from operation config with fallback defaults.

Parameters:
  • operation_config (DictConfig)

  • logger (Logger)

Return type:

OperationResources

yt_framework.operations.common.extract_secure_env_client_kwargs(operation_config)[source]#

Extract the options that control how YTProdClient partitions environment variables between the secure vault and the public environment.

Parameters:

operation_config (DictConfig)

Return type:

Dict[str, Any]

yt_framework.operations.common.collect_passthrough_kwargs(operation_config, reserved_keys)[source]#

Collect top-level config values to forward to YT client.

OmegaConf dict nodes are resolved to plain Python containers.

Parameters:
  • operation_config (DictConfig)

  • reserved_keys (Set[str])

Return type:

Dict[str, Any]
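
The filtering idea can be shown with plain mappings standing in for DictConfig. This is a sketch under that substitution: sketch_passthrough and _to_plain are hypothetical names, and real OmegaConf nodes would be resolved through OmegaConf rather than this helper.

```python
from typing import Any, Dict, Mapping, Set


def _to_plain(value: Any) -> Any:
    """Recursively convert mappings and sequences to plain dicts and lists."""
    if isinstance(value, Mapping):
        return {key: _to_plain(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [_to_plain(item) for item in value]
    return value


def sketch_passthrough(operation_config: Mapping[str, Any], reserved_keys: Set[str]) -> Dict[str, Any]:
    """Keep top-level keys that are not reserved, as plain Python containers."""
    return {
        key: _to_plain(value)
        for key, value in operation_config.items()
        if key not in reserved_keys
    }
```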

yt_framework.operations.common.build_operation_environment(context, operation_config, logger, include_stage_name=True, include_tokenizer_artifact=True)[source]#

Build the operation environment from secrets, the explicit env config, and the optional helpers (stage name and tokenizer artifact).

Parameters:
  • context (StageContext)

  • operation_config (DictConfig)

  • logger (Logger)

  • include_stage_name (bool)

  • include_tokenizer_artifact (bool)

Return type:

Dict[str, str]

yt_framework.operations.common.extract_docker_auth_from_operation_config(operation_config, env)[source]#

Resolve docker image from config and return auth payload if credentials exist.

Parameters:
  • operation_config (DictConfig)

  • env (Dict[str, str])

Return type:

Dict[str, str] | None

yt_framework.operations.common.extract_max_failed_jobs(operation_config, logger)[source]#

Extract max_failed_job_count from the operation config, falling back to a default when it is not set.

Parameters:
  • operation_config (DictConfig)

  • logger (Logger)

Return type:

int

YT Client#

Client Factory#

Factory that returns YTDevClient or YTProdClient based on pipeline mode.

yt_framework.yt.factory.create_yt_client(logger=None, mode='dev', pipeline_dir=None, secrets=None, pickling=None)[source]#

Factory function to create appropriate YT client based on mode.

Parameters:
  • logger (Logger | None) – Logger instance (default: creates new logger)

  • mode (Literal['prod', 'dev'] | None) – “prod” for production YT client, “dev” for local development

  • pipeline_dir (Path | str | None) – Pipeline directory (required for dev mode)

  • secrets (Dict[str, str] | None) – Optional dictionary containing YT credentials. Required only for prod mode. Expected keys:

      • YT_PROXY: YTsaurus proxy URL

      • YT_TOKEN: YTsaurus authentication token

  • pickling (Dict[str, Any] | None)

Returns:

BaseYTClient instance (YTProdClient or YTDevClient)

Raises:

ValueError – If secrets are required for prod mode but not provided.

Return type:

BaseYTClient

Example

>>> # Dev mode (local filesystem simulation)
>>> client = create_yt_client(mode="dev", pipeline_dir=Path("."))
>>>
>>> # Prod mode (real YT cluster)
>>> secrets = {"YT_PROXY": "my-proxy", "YT_TOKEN": "my-token"}
>>> client = create_yt_client(mode="prod", secrets=secrets)
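
The mode dispatch can be pictured with stand-in classes. Everything named Sketch... below is hypothetical; the error raised for a missing pipeline_dir or an unknown mode is an assumption, since this page only documents ValueError for missing prod secrets.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Optional, Union


@dataclass
class SketchDevClient:
    pipeline_dir: Path


@dataclass
class SketchProdClient:
    proxy: str
    token: str


def sketch_create_client(
    mode: str = "dev",
    pipeline_dir: Union[Path, str, None] = None,
    secrets: Optional[Dict[str, str]] = None,
) -> Union[SketchDevClient, SketchProdClient]:
    """Pick a client class from the mode and validate its required inputs."""
    if mode == "prod":
        if not secrets or "YT_PROXY" not in secrets or "YT_TOKEN" not in secrets:
            raise ValueError("prod mode requires YT_PROXY and YT_TOKEN in secrets")
        return SketchProdClient(secrets["YT_PROXY"], secrets["YT_TOKEN"])
    if mode == "dev":
        if pipeline_dir is None:
            # Assumed behaviour; the reference only says the argument is required.
            raise ValueError("dev mode requires pipeline_dir")
        return SketchDevClient(Path(pipeline_dir))
    raise ValueError(f"unknown mode: {mode!r}")
```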

Dev Client#

Local filesystem stand-in for Cypress tables and subprocess-backed jobs.

class yt_framework.yt.client_dev.YTDevClient(logger, pipeline_dir=None)[source]#

Bases: BaseYTClient

Development YT client implementation.

Uses local file system for all operations, simulating YT behavior. Tables are stored as .jsonl files in .dev/ directory.

Parameters:
  • logger (Logger)

  • pipeline_dir (Path | None)

__init__(logger, pipeline_dir=None)[source]#

Initialize development YT client.

Parameters:
  • logger (Logger) – Logger instance

  • pipeline_dir (Path | None) – Pipeline directory (required for dev mode)

Return type:

None

create_path(path, node_type='map_node')[source]#

Create a path in YT (no-op in dev mode).

Parameters:
  • path (str) – YT path to create (not used in dev mode).

  • node_type (Literal['table', 'file', 'map_node', 'list_node', 'document']) – Type of node to create (not used in dev mode).

Returns:

None

Return type:

None

exists(path)[source]#

Check if a path exists in YT.

In dev mode, always returns True (assumes files exist locally).

Parameters:

path (str) – YT path to check.

Returns:

Always True in dev mode.

Return type:

bool

write_table(table_path, rows, append=False, replication_factor=1)[source]#

Write rows to a YT table (saves as local .jsonl file).

In dev mode, tables are stored as JSONL files in the .dev directory. Each row is written as a JSON object on a single line.

Parameters:
  • table_path (str) – YT table path (e.g., “//tmp/my_table”).

  • rows (List[Dict[str, Any]]) – List of dictionaries representing table rows.

  • append (bool) – If True, append to existing file; otherwise overwrite.

  • replication_factor (int) – Not used in dev mode (kept for API compatibility).

Returns:

None

Return type:

None

Example

>>> client.write_table("//tmp/data", [{"id": 1, "name": "Alice"}])
>>> # Creates .dev/data.jsonl with: {"id":1,"name":"Alice"}\n

read_table(table_path)[source]#

Read rows from a YT table (reads from local .jsonl file).

Parameters:

table_path (str) – YT table path (e.g., “//tmp/my_table”).

Returns:

List of dictionaries representing table rows.

Returns empty list if file doesn’t exist.

Return type:

List[Dict[str, Any]]

row_count(table_path)[source]#

Get number of rows in a YT table (counts lines in local .jsonl file).

Parameters:

table_path (str) – YT table path (e.g., “//tmp/my_table”).

Returns:

Number of non-empty lines in the JSONL file. Returns 0 if file doesn’t exist.

Return type:

int
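
The JSONL storage described for write_table, read_table, and row_count can be sketched with the standard library. The mapping from a table path to a file name (last path component plus .jsonl) is an assumption drawn from the single example above, and all sketch_ names are hypothetical.

```python
import json
from pathlib import Path
from typing import Any, Dict, List


def _local_file(dev_dir: Path, table_path: str) -> Path:
    # Assumed mapping: "//tmp/data" -> <dev_dir>/data.jsonl
    return dev_dir / f"{table_path.rstrip('/').split('/')[-1]}.jsonl"


def sketch_write_table(dev_dir: Path, table_path: str, rows: List[Dict[str, Any]], append: bool = False) -> None:
    """Write one compact JSON object per line, appending or overwriting."""
    target = _local_file(dev_dir, table_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("a" if append else "w", encoding="utf-8") as handle:
        for row in rows:
            handle.write(json.dumps(row, separators=(",", ":")) + "\n")


def sketch_read_table(dev_dir: Path, table_path: str) -> List[Dict[str, Any]]:
    """Return all rows, or an empty list when the file does not exist."""
    target = _local_file(dev_dir, table_path)
    if not target.exists():
        return []
    with target.open(encoding="utf-8") as handle:
        return [json.loads(line) for line in handle if line.strip()]


def sketch_row_count(dev_dir: Path, table_path: str) -> int:
    """Count non-empty lines, which equals the number of stored rows."""
    return len(sketch_read_table(dev_dir, table_path))
```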

run_yql(query, pool='default', max_row_weight=None)[source]#

Execute a YQL query locally using DuckDB simulation.

Parameters:
  • query (str) – YQL query string to execute

  • pool (str) – YT pool name (default: ‘default’)

  • max_row_weight (str | None) – Optional max row weight override

Return type:

None

join_tables(left_table, right_table, output_table, on, how='left', select_columns=None, dry_run=False, max_row_weight=None)[source]#

Join two tables using YQL (executed locally with DuckDB in dev mode).

Return type:

str | None

filter_table(input_table, output_table, condition, dry_run=False, max_row_weight=None)[source]#

Filter table rows using WHERE condition (executed locally with DuckDB in dev mode).

Parameters:
  • input_table (str)

  • output_table (str)

  • condition (str)

  • dry_run (bool)

  • max_row_weight (str | None)

Return type:

str | None

select_columns(input_table, output_table, columns, dry_run=False, max_row_weight=None)[source]#

Select specific columns from a table (executed locally with DuckDB in dev mode).

Parameters:
  • input_table (str)

  • output_table (str)

  • columns (List[str])

  • dry_run (bool)

  • max_row_weight (str | None)

Return type:

str | None

group_by_aggregate(input_table, output_table, group_by, aggregations, dry_run=False, max_row_weight=None)[source]#

Group by columns and compute aggregations (executed locally with DuckDB in dev mode).

Return type:

str | None

union_tables(tables, output_table, dry_run=False, max_row_weight=None)[source]#

Union multiple tables (executed locally with DuckDB in dev mode).

Return type:

str | None

distinct(input_table, output_table, columns=None, dry_run=False, max_row_weight=None)[source]#

Get distinct rows from a table (executed locally with DuckDB in dev mode).

Parameters:
  • input_table (str)

  • output_table (str)

  • columns (List[str] | None)

  • dry_run (bool)

  • max_row_weight (str | None)

Return type:

str | None

sort_table(input_table, output_table, order_by, ascending=True, dry_run=False, max_row_weight=None)[source]#

Sort table by columns (executed locally with DuckDB in dev mode).

Return type:

str | None

limit_table(input_table, output_table, limit, dry_run=False, max_row_weight=None)[source]#

Limit number of rows from a table (executed locally with DuckDB in dev mode).

Parameters:
  • input_table (str)

  • output_table (str)

  • limit (int)

  • dry_run (bool)

  • max_row_weight (str | None)

Return type:

str | None

upload_file(local_path, yt_path, create_parent_dir=False)[source]#

Upload a file to YT (no-op in dev mode).

Parameters:
  • local_path (Path) – Local file path to upload

  • yt_path (str) – YT destination path

  • create_parent_dir (bool) – If True, create parent directory if it doesn’t exist (default: False)

Return type:

None

upload_directory(local_dir, yt_dir, pattern='*')[source]#

Upload a directory to YT (no-op in dev mode).

Parameters:
  • local_dir (Path) – Local directory path to upload.

  • yt_dir (str) – YT destination directory path.

  • pattern (str) – File pattern to match (not used in dev mode).

Returns:

Empty list in dev mode.

Return type:

List[str]

run_map(command, input_table, output_table, files, resources, env, output_schema=None, max_failed_jobs=1, docker_auth=None, job=None, append=False, **kwargs)[source]#

Run a map operation locally using subprocess.

In dev mode, executes the mapper script locally with input/output tables as JSONL files. The command is executed in a temporary sandbox directory with all dependencies available.

Parameters:
  • command (Any) – Mapper job (command string in dev mode).

  • input_table (str) – Input YT table path (read from local JSONL).

  • output_table (str) – Output YT table path (written to local JSONL).

  • files (List[Tuple[str, str]]) – List of (yt_path, local_path) tuples for dependencies.

  • resources (OperationResources) – Operation resource configuration (not fully used in dev mode).

  • env (Dict[str, str]) – Environment variables dictionary.

  • output_schema (TableSchema | None) – Optional output table schema (not used in dev mode).

  • max_failed_jobs (int) – Maximum failed jobs allowed (not used in dev mode).

  • docker_auth (Dict[str, str] | None) – Optional Docker authentication (not used in dev mode).

  • append (bool) – If True and output JSONL exists, append mapper stdout lines to it.

  • **kwargs (Any) – Additional arguments (not used in dev mode).

  • job (Any)

Returns:

Mock operation object that simulates YT operation.

Return type:

Operation

Example

>>> op = client.run_map(
...     command="python3 mapper.py",
...     input_table="//tmp/input",
...     output_table="//tmp/output",
...     files=[],
...     resources=OperationResources(),
...     env={}
... )
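
A subprocess-backed map step can be sketched as below. It assumes the mapper reads JSON lines on stdin and writes JSON lines on stdout, and it takes the command as an argument list rather than a shell string; sketch_run_local_map is a hypothetical helper, not the dev client's code.

```python
import json
import os
import subprocess
from typing import Any, Dict, List, Optional


def sketch_run_local_map(
    command: List[str],
    rows: List[Dict[str, Any]],
    env: Optional[Dict[str, str]] = None,
) -> List[Dict[str, Any]]:
    """Feed rows to a mapper process as JSONL and parse its JSONL output."""
    stdin_data = "".join(json.dumps(row) + "\n" for row in rows)
    result = subprocess.run(
        command,
        input=stdin_data,
        capture_output=True,
        text=True,
        check=True,  # Raise CalledProcessError if the mapper exits non-zero.
        env={**os.environ, **(env or {})},
    )
    return [json.loads(line) for line in result.stdout.splitlines() if line.strip()]
```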

run_vanilla(command, files, env, task_name='main', job=None, **kwargs)[source]#

Run a vanilla operation locally using subprocess.

In dev mode, executes the vanilla script locally in a temporary sandbox directory with all dependencies available. No input/output tables are involved.

Parameters:
  • command (str) – Command to execute (typically bash command with script path).

  • files (List[Tuple[str, str]]) – List of (yt_path, local_path) tuples for dependencies.

  • env (Dict[str, str]) – Environment variables dictionary.

  • task_name (str) – Task name for logging (default: “main”).

  • **kwargs – Additional arguments (not used in dev mode).

  • job (str | None)

Returns:

Mock operation object that simulates YT operation.

Return type:

Operation
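
Running a command in a temporary sandbox with its dependencies copied in could look like this. It is a sketch under simple assumptions: each dependency lands in the sandbox under its base name, and the command is an argument list; sketch_run_in_sandbox is a hypothetical helper.

```python
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Dict, List, Optional, Tuple


def sketch_run_in_sandbox(
    command: List[str],
    files: List[Tuple[str, str]],
    env: Optional[Dict[str, str]] = None,
) -> Tuple[int, str]:
    """Copy dependencies into a temp dir, run the command there, return (code, stdout)."""
    with tempfile.TemporaryDirectory() as sandbox:
        for _yt_path, local_path in files:
            # Assumed layout: each dependency is placed under its base name.
            shutil.copy(local_path, Path(sandbox) / Path(local_path).name)
        completed = subprocess.run(
            command,
            cwd=sandbox,
            env={**os.environ, **(env or {})},
            capture_output=True,
            text=True,
        )
        return completed.returncode, completed.stdout
```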

run_map_reduce(mapper, reducer, input_table, output_table, reduce_by, files, resources, env, sort_by=None, output_schema=None, max_failed_jobs=1, docker_auth=None, map_job=None, reduce_job=None, **kwargs)[source]#

Dev mode: the mapper and reducer are not executed; the input table is copied to the output table.

Return type:

Operation

run_reduce(reducer, input_table, output_table, reduce_by, files, resources, env, output_schema=None, max_failed_jobs=1, docker_auth=None, job=None, **kwargs)[source]#

Dev mode: the reducer is not executed; the input table is copied to the output table.

Return type:

Operation

run_sort(table_path, sort_by, pool=None, pool_tree=None, **kwargs)[source]#

No-op in dev mode; the table is left unchanged.

Return type:

None

Production Client#

Thin wrapper around yt.wrapper.YtClient for real cluster operations.

class yt_framework.yt.client_prod.YTProdClient(logger, secrets, pickling=None)[source]#

Bases: BaseYTClient

Production YT client implementation.

Uses actual YTsaurus client for all operations.

Parameters:
  • logger (Logger)

  • secrets (Dict[str, str])

  • pickling (Dict[str, Any] | None)

__init__(logger, secrets, pickling=None)[source]#

Initialize production YT client.

Parameters:
  • logger (Logger) – Logger instance

  • secrets (Dict[str, str]) – Dictionary containing YT credentials. Expected keys:

      • YT_PROXY

      • YT_TOKEN

  • pickling (Dict[str, Any] | None)

Return type:

None

create_path(path, node_type='map_node')[source]#

Create a path in YT.

Parameters:
  • path (str) – YT path to create.

  • node_type (Literal['table', 'file', 'map_node', 'list_node', 'document']) – Type of node to create (default: “map_node”).

Returns:

None

Raises:

Exception – If path creation fails.

Return type:

None

exists(path)[source]#

Check if a path exists in YT.

Parameters:

path (str) – YT path to check.

Returns:

True if path exists, False otherwise.

Return type:

bool

Raises:

Exception – If check fails.

write_table(table_path, rows, append=False, replication_factor=1, make_parents=True)[source]#

Write rows to a YT table.

Parameters:
  • table_path (str) – YT table path

  • rows (List[Dict[str, Any]]) – List of dictionaries representing table rows

  • append (bool) – If True, append to existing table (default: False)

  • replication_factor (int) – Replication factor for the table (default: 1)

  • make_parents (bool) – If True, create parent directories if they don’t exist (default: True)

Return type:

None

read_table(table_path)[source]#

Read rows from a YT table.

Parameters:

table_path (str) – YT table path to read.

Returns:

List of dictionaries representing table rows.

Return type:

List[Dict[str, Any]]

Raises:

Exception – If table read fails.

row_count(table_path)[source]#

Get number of rows in a YT table.

Parameters:

table_path (str) – YT table path.

Returns:

Number of rows in the table.

Return type:

int

Raises:

Exception – If row count query fails.

run_yql(query, pool='default', max_row_weight=None)[source]#

Execute a YQL query on YT cluster.

Parameters:
  • query (str) – YQL query string to execute

  • pool (str) – YT pool name (default: ‘default’)

  • max_row_weight (str | None) – Optional max row weight override

Raises:

Exception – If query execution fails

Return type:

None

join_tables(left_table, right_table, output_table, on, how='left', select_columns=None, dry_run=False, max_row_weight=None)[source]#

Join two tables using YQL.

Parameters:
  • left_table (str) – Path to left table

  • right_table (str) – Path to right table

  • output_table (str) – Path to output table

  • on (str | List[str] | Dict[str, str]) – Join key(s), that is, the column name(s) to join on:

      • str: Same column name on both sides (e.g., “user_id”)

      • List[str]: Multiple columns with the same names on both sides (e.g., [“user_id”, “region”])

      • Dict[str, str]: Different column names (e.g., {“left”: “input_s3_path”, “right”: “path”})

  • how (Literal['inner', 'left', 'right', 'full']) – Join type - “inner”, “left”, “right”, or “full”

  • select_columns (List[str] | None) – Optional list of columns to select (with table aliases)

  • dry_run (bool) – If True, return the YQL query without executing

  • max_row_weight (str | None)

Returns:

YQL query string if dry_run=True, None otherwise

Return type:

str | None
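
The three accepted forms of on can be normalized to (left, right) column pairs before building a join condition. The sketch below illustrates that idea only; the aliases a and b and both function names are hypothetical, and the query text the client actually generates is not shown on this page.

```python
from typing import Dict, List, Tuple, Union

JoinKeys = Union[str, List[str], Dict[str, str]]


def sketch_join_pairs(on: JoinKeys) -> List[Tuple[str, str]]:
    """Normalize the str, list, and dict forms to (left_col, right_col) pairs."""
    if isinstance(on, str):
        return [(on, on)]
    if isinstance(on, dict):
        return [(on["left"], on["right"])]
    return [(column, column) for column in on]


def sketch_join_condition(on: JoinKeys, left_alias: str = "a", right_alias: str = "b") -> str:
    """Render the pairs as an AND-joined equality condition."""
    return " AND ".join(
        f"{left_alias}.{left} = {right_alias}.{right}"
        for left, right in sketch_join_pairs(on)
    )
```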

filter_table(input_table, output_table, condition, dry_run=False, max_row_weight=None)[source]#

Filter table rows using WHERE condition.

Parameters:
  • input_table (str) – Path to input table

  • output_table (str) – Path to output table

  • condition (str) – WHERE condition (e.g., “status = ‘active’ AND total > 100”)

  • dry_run (bool) – If True, return the YQL query without executing

  • max_row_weight (str | None)

Returns:

YQL query string if dry_run=True, None otherwise

Return type:

str | None
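
The dry_run contract shared by the table helpers (return the query text instead of running it) can be sketched as follows. The query string is illustrative only and is not claimed to match what the client emits; sketch_filter_table and the run_query callback are hypothetical.

```python
from typing import Callable, Optional


def sketch_filter_table(
    input_table: str,
    output_table: str,
    condition: str,
    run_query: Callable[[str], None],
    dry_run: bool = False,
) -> Optional[str]:
    """Build a query; return it when dry_run is set, otherwise execute it."""
    # Illustrative query text; the framework's generated YQL may differ.
    query = (
        f"INSERT INTO `{output_table}` WITH TRUNCATE\n"
        f"SELECT * FROM `{input_table}`\n"
        f"WHERE {condition};"
    )
    if dry_run:
        return query
    run_query(query)
    return None
```

Passing dry_run=True is a cheap way to inspect or log a query before letting it touch a table.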

select_columns(input_table, output_table, columns, dry_run=False, max_row_weight=None)[source]#

Select specific columns from a table.

Parameters:
  • input_table (str) – Path to input table

  • output_table (str) – Path to output table

  • columns (List[str]) – List of column names to select

  • dry_run (bool) – If True, return the YQL query without executing

  • max_row_weight (str | None)

Returns:

YQL query string if dry_run=True, None otherwise

Return type:

str | None

group_by_aggregate(input_table, output_table, group_by, aggregations, dry_run=False, max_row_weight=None)[source]#

Group by columns and compute aggregations.

Parameters:
  • input_table (str) – Path to input table

  • output_table (str) – Path to output table

  • group_by (str | List[str]) – Column(s) to group by

  • aggregations (Dict[str, str | Tuple[str, str]]) – Dict mapping output column names to aggregation functions e.g., {“order_count”: “count”, “total_amount”: “sum”}

  • dry_run (bool) – If True, return the YQL query without executing

  • max_row_weight (str | None)

Returns:

YQL query string if dry_run=True, None otherwise

Return type:

str | None

union_tables(tables, output_table, dry_run=False, max_row_weight=None)[source]#

Union multiple tables.

Parameters:
  • tables (List[str]) – List of table paths to union

  • output_table (str) – Path to output table

  • dry_run (bool) – If True, return the YQL query without executing

  • max_row_weight (str | None)

Returns:

YQL query string if dry_run=True, None otherwise

Return type:

str | None

distinct(input_table, output_table, columns=None, dry_run=False, max_row_weight=None)[source]#

Get distinct rows from a table.

Parameters:
  • input_table (str) – Path to input table

  • output_table (str) – Path to output table

  • columns (List[str] | None) – Optional list of columns to select (if None, selects all)

  • dry_run (bool) – If True, return the YQL query without executing

  • max_row_weight (str | None)

Returns:

YQL query string if dry_run=True, None otherwise

Return type:

str | None

sort_table(input_table, output_table, order_by, ascending=True, dry_run=False, max_row_weight=None)[source]#

Sort table by columns.

WARNING: Sorting large tables can be expensive. Use with caution.

Parameters:
  • input_table (str) – Path to input table

  • output_table (str) – Path to output table

  • order_by (str | List[str]) – Column(s) to sort by

  • ascending (bool) – Sort direction (True for ASC, False for DESC)

  • dry_run (bool) – If True, return the YQL query without executing

  • max_row_weight (str | None)

Returns:

YQL query string if dry_run=True, None otherwise

Return type:

str | None

limit_table(input_table, output_table, limit, dry_run=False, max_row_weight=None)[source]#

Limit number of rows from a table.

Parameters:
  • input_table (str) – Path to input table

  • output_table (str) – Path to output table

  • limit (int) – Maximum number of rows to return

  • dry_run (bool) – If True, return the YQL query without executing

  • max_row_weight (str | None)

Returns:

YQL query string if dry_run=True, None otherwise

Return type:

str | None

upload_file(local_path, yt_path, create_parent_dir=False)[source]#

Upload a file to YT.

Parameters:
  • local_path (Path) – Local file path to upload

  • yt_path (str) – YT destination path

  • create_parent_dir (bool) – If True, create parent directory if it doesn’t exist (default: False)

Return type:

None

upload_directory(local_dir, yt_dir, pattern='*')[source]#

Upload a directory to YT cluster.

Recursively uploads all files from a local directory to a YT directory, respecting .ytignore patterns if present.

Parameters:
  • local_dir (Path) – Local directory path to upload

  • yt_dir (str) – YT destination directory path

  • pattern (str) – File pattern to match (default: “*” for all files)

Returns:

List of uploaded YT file paths

Raises:

Exception – If directory upload fails

Return type:

List[str]
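
The recursive walk with a file pattern and an ignore check can be sketched as a planning step that pairs each local file with its destination. The should_ignore callback stands in for the .ytignore matcher, and sketch_plan_upload is a hypothetical name; no upload is performed here.

```python
from pathlib import Path
from typing import Callable, List, Optional, Tuple


def sketch_plan_upload(
    local_dir: Path,
    yt_dir: str,
    pattern: str = "*",
    should_ignore: Optional[Callable[[Path], bool]] = None,
) -> List[Tuple[str, str]]:
    """Return (local_path, yt_path) pairs for files matching pattern and not ignored."""
    plan: List[Tuple[str, str]] = []
    for path in sorted(local_dir.rglob(pattern)):
        if not path.is_file():
            continue
        if should_ignore is not None and should_ignore(path):
            continue
        # Keep the directory structure below local_dir on the remote side.
        rel = path.relative_to(local_dir).as_posix()
        plan.append((str(path), f"{yt_dir.rstrip('/')}/{rel}"))
    return plan
```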

run_map(command, input_table, output_table, files, resources, env, output_schema=None, max_failed_jobs=1, docker_auth=None, job=None, append=False, **kwargs)[source]#

Run a map operation on YT cluster.

Submits a map operation that processes each row of the input table independently and writes results to the output table. The operation runs on the YT cluster with the specified resources and dependencies.

Parameters:
  • command (Any) – Legacy mapper job argument (TypedJob instance or command string).

  • input_table (str) – Input YT table path.

  • output_table (str) – Output YT table path.

  • files (List[Tuple[str, str]]) – List of (yt_path, local_path) tuples for dependencies.

  • resources (OperationResources) – Operation resource configuration (memory, CPU, GPU, etc.).

  • env (Dict[str, str]) – Environment variables dictionary.

  • output_schema (TableSchema | None) – Optional output table schema for typed output.

  • max_failed_jobs (int) – Maximum failed jobs allowed before operation fails.

  • docker_auth (Dict[str, str] | None) – Optional Docker authentication for private registries.

  • job (Any) – Preferred mapper job alias.

  • append (bool) – If True, append mapper output to an existing output table.

  • **kwargs (Any)

Returns:

YT operation object that can be monitored and waited on.

Return type:

Operation

Raises:

Exception – If operation submission fails.

run_vanilla(command, files, env, task_name, resources, docker_auth=None, max_failed_jobs=1, job=None, **kwargs)[source]#

Run a vanilla operation on YT cluster.

Submits a vanilla operation that runs a standalone job without input/output tables. The operation runs on the YT cluster with the specified resources and dependencies.

Parameters:
  • command (str) – Legacy command argument (typically bash command with script path).

  • files (List[Tuple[str, str]]) – List of (yt_path, local_path) tuples for dependencies.

  • env (Dict[str, str]) – Environment variables dictionary.

  • task_name (str) – Task name for the operation.

  • resources (OperationResources) – Operation resource configuration (memory, CPU, GPU, etc.).

  • docker_auth (Dict[str, str] | None) – Optional Docker authentication for private registries.

  • max_failed_jobs (int) – Maximum failed jobs allowed before operation fails.

  • job (str | None) – Preferred command alias.

  • **kwargs (Any) – Extra options applied to the spec builder (e.g. weight, title) or forwarded to run_operation (sync, enable_optimizations).

Returns:

YT operation object that can be monitored and waited on.

Return type:

Operation

Raises:

Exception – If operation submission fails.

run_map_reduce(mapper, reducer, input_table, output_table, reduce_by, files, resources, env, sort_by=None, output_schema=None, max_failed_jobs=1, docker_auth=None, map_job=None, reduce_job=None, **kwargs)[source]#

Run a map-reduce operation on YT cluster.

Return type:

Operation

run_reduce(reducer, input_table, output_table, reduce_by, files, resources, env, output_schema=None, max_failed_jobs=1, docker_auth=None, job=None, **kwargs)[source]#

Run a reduce-only operation on YT cluster.

Return type:

Operation

run_sort(table_path, sort_by, pool=None, pool_tree=None, **kwargs)[source]#

Sort a table in place by the given columns.

Return type:

None