API Reference#

This page provides detailed API documentation for all YT Framework modules. All documentation is automatically generated from Python docstrings.

Tip

Exploring the API

Use the navigation sidebar to jump to specific modules. Each module includes classes, functions, and their complete documentation with parameters, return types, and examples.

Module Overview#

YT Framework is organized into several key modules:

  • Core: Pipeline, stage, registry, and discovery functionality

  • Operations: Map, vanilla, YQL, S3, and table operations

  • YT: YT client implementations (dev and prod)

  • Utils: Logging, environment, and utility functions

Note

Auto-Generated Documentation

This documentation is automatically generated from Python docstrings. For the most up-to-date information, check the source code or use Python’s help() function.

Core Modules#

Pipeline#

Provides common functionality for all pipelines, including code upload methods, CLI entry point, and stage execution.

To create a new pipeline:
  1. Inherit from BasePipeline

  2. In setup(), create StageRegistry and register stages

  3. Optionally override run() for custom execution flow (rare)

Example

class Pipeline(BasePipeline):
    def setup(self):
        self.set_stage_registry(
            StageRegistry().add_stage(MyStage)
        )

if __name__ == "__main__":
    Pipeline.main()

class yt_framework.core.pipeline.BasePipeline(config, pipeline_dir, log_level=20)[source]#

Bases: object

Base class for all pipelines.

Provides common functionality:
  • CLI entry point via the main() class method

  • Code upload to the YT build folder

  • YT client initialization

  • Default stage execution loop

Subclasses must:
  • In setup(), create a StageRegistry and register stages via set_stage_registry()

Subclasses may override:
  • setup(): Register stages and initialize pipeline-specific clients (S3, etc.)

  • run(): Custom execution flow (rare; most pipelines use the default)

Parameters:
  • config (DictConfig)

  • pipeline_dir (Path)

  • log_level (int)

__init__(config, pipeline_dir, log_level=20)[source]#

Initialize the base pipeline.

Parameters:
  • config (DictConfig) – Configuration object (OmegaConf DictConfig)

  • pipeline_dir (Path) – Path to pipeline directory

  • log_level (int) – Logging level

Returns:

None

Raises:

ValueError – If pipeline directory does not exist.

setup()[source]#

Hook for pipeline-specific initialization.

Override this method in subclasses to:
  1. Register stages using StageRegistry and set_stage_registry()

  2. Initialize custom clients (e.g., S3 client, database connections, etc.)

This method is called automatically after base initialization.

Returns:

None

Return type:

None

set_stage_registry(registry)[source]#

Set the stage registry for this pipeline.

Parameters:

registry (StageRegistry) – StageRegistry instance with registered stages

Returns:

None

Return type:

None

create_stage_dependencies()[source]#

Create stage dependencies for injection.

This method creates a dependency container with only what stages need, following the Interface Segregation Principle.

Returns:

PipelineStageDependencies with yt_client, pipeline_config, configs_dir

Return type:

PipelineStageDependencies

upload_code(build_folder=None)[source]#

Upload code to YT build folder.

Only uploads code if any enabled stages need code execution on YT. If no stages need code execution, this method does nothing.

Parameters:

build_folder (str | None) – Optional YT build folder path. If None, uses config.pipeline.build_folder

Returns:

None

Raises:

ValueError – If build_folder is required but not provided in config.

Return type:

None

run()[source]#

Run the pipeline by executing enabled stages.

Default implementation:
  1. Upload code to YT (only if stages need code execution)

  2. Get enabled stages from config

  3. Execute stages in order using the stage registry

  4. Pass context between stages

Override this method only if you need completely custom execution flow.

Returns:

None

Raises:
  • ValueError – If no enabled_stages found in config or unknown stage name.

  • AttributeError – If stage registry is not set in setup().

Return type:

None
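The default execution loop described above can be sketched in a few lines. This is a minimal illustration under stated assumptions, not the framework's implementation: `SeedStage`, `EchoStage`, and `run_enabled_stages` are hypothetical stand-ins, the registry is modeled as a plain dict, and code upload is left out entirely.

```python
from typing import Any, Dict, List, Type


class SeedStage:
    """Stand-in stage: produces an initial value for later stages."""

    def run(self, debug: Dict[str, Any]) -> Dict[str, Any]:
        return {"seed": 42}


class EchoStage:
    """Stand-in stage: reports which context keys it received."""

    def run(self, debug: Dict[str, Any]) -> Dict[str, Any]:
        return {"echo_saw": sorted(debug)}


def run_enabled_stages(
    registry: Dict[str, Type], enabled_stages: List[str]
) -> Dict[str, Any]:
    """Execute stages in order, merging each result into a shared context."""
    if not enabled_stages:
        raise ValueError("no enabled_stages found in config")
    context: Dict[str, Any] = {}
    for stage_name in enabled_stages:
        if stage_name not in registry:
            raise ValueError(f"unknown stage name: {stage_name}")
        # Each stage receives the context built so far and returns new entries.
        result = registry[stage_name]().run(context)
        context.update(result or {})
    return context


registry = {"seed": SeedStage, "echo": EchoStage}
final_context = run_enabled_stages(registry, ["seed", "echo"])
print(final_context)
```

The second stage sees the key produced by the first, which is the "pass context between stages" step in miniature.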

classmethod main(argv=None)[source]#

CLI entry point for the pipeline.

Handles:
  • Argument parsing (--config option)

  • Config file loading

  • Pipeline instantiation

  • Error handling and exit codes

Parameters:

argv – Optional command-line arguments. If None, uses sys.argv.

Returns:

None (exits with code 0 on success, 1 on failure)

Return type:

None

Usage:

python pipeline.py
python pipeline.py --config configs/custom.yaml
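The argument handling that main() is described as performing could look roughly like the following argparse sketch. Only the `--config` option comes from the documentation; the `parse_cli` helper and the default path `configs/config.yaml` are assumptions made for illustration.

```python
import argparse
from typing import List, Optional


def parse_cli(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse pipeline CLI arguments (hypothetical helper, not part of YT Framework)."""
    parser = argparse.ArgumentParser(description="Run a pipeline")
    # The default path below is an assumption; the real default is not documented here.
    parser.add_argument(
        "--config",
        default="configs/config.yaml",
        help="Path to the pipeline config file",
    )
    # argparse falls back to sys.argv[1:] when argv is None.
    return parser.parse_args(argv)


default_args = parse_cli([])
custom_args = parse_cli(["--config", "configs/custom.yaml"])
print(default_args.config)
print(custom_args.config)
```

Accepting an explicit `argv` list, as main() does, keeps the entry point testable without touching `sys.argv`.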

class yt_framework.core.pipeline.DefaultPipeline(config, pipeline_dir, log_level=20)[source]#

Bases: BasePipeline

Pipeline with automatic stage discovery.

Automatically discovers and registers all stages from the stages/ directory. No need to manually import or register stages - just put them in stages/ and they’ll be automatically found.

Usage:

# pipeline.py
from yt_framework.core.pipeline import DefaultPipeline

if __name__ == "__main__":
    DefaultPipeline.main()

The stages to run are still controlled by the enabled_stages configuration.

Parameters:
  • config (DictConfig)

  • pipeline_dir (Path)

  • log_level (int)

setup()[source]#

Automatically discover and register stages from stages/ directory.

Looks for all stage.py files in stages/*/ subdirectories and automatically imports and registers any BaseStage subclasses found.

Returns:

None

Return type:

None

Stage#

Base Stage Class#

Abstract base class for pipeline stages. All stages should inherit from BaseStage and implement the run method.

Stage name is automatically detected from the directory name. Directory structure: stages/<stage_name>/stage.py

class yt_framework.core.stage.StageContext(name, config, stage_dir, logger, deps)[source]#

Bases: object

Stage context containing all stage-related information.

Parameters:
  • name (str)

  • config (DictConfig)

  • stage_dir (Path)

  • logger (Logger)

  • deps (StageDependencies)

name#

Stage name (automatically detected from directory name).

Type:

str

config#

Stage-specific configuration loaded from config.yaml.

Type:

omegaconf.dictconfig.DictConfig

stage_dir#

Path to the stage directory (stages/<stage_name>/).

Type:

pathlib.Path

logger#

Logger instance for stage logging.

Type:

logging.Logger

deps#

Injected dependencies (yt_client, pipeline_config, configs_dir).

Type:

yt_framework.core.dependencies.StageDependencies

name: str#
config: DictConfig#
stage_dir: Path#
logger: Logger#
deps: StageDependencies#
class yt_framework.core.stage.BaseStage(deps, logger)[source]#

Bases: ABC

Abstract base class for pipeline stages.

Stage name and config are automatically detected from the directory.
  • Directory structure: stages/<stage_name>/stage.py

  • Config file: stages/<stage_name>/config.yaml

Each stage must:
  • Implement __init__ to receive deps and logger

  • Implement the run method to execute stage logic

Example

class MyStage(BaseStage):
    def __init__(self, deps, logger):
        super().__init__(deps, logger)
        # self.config is automatically loaded from stages/<stage_name>/config.yaml
        # Access dependencies via self.deps.yt_client, self.deps.pipeline_config

    def run(self, debug: DebugContext) -> DebugContext:
        # Stage logic here
        # Note: 'debug' here is the shared data dict, NOT dependencies
        return {"result": "value"}

Parameters:
  • deps (StageDependencies)

  • logger (Logger)

__init__(deps, logger)[source]#

Initialize stage with injected dependencies.

Stage name and config are automatically detected from the directory containing stage.py.

Parameters:
  • deps (StageDependencies) – Injected dependencies (yt_client, pipeline_config, configs_dir)

  • logger (Logger) – Logger instance for stage logging

Returns:

None

Raises:
  • FileNotFoundError – If config.yaml file is not found in stage directory.

  • ValueError – If config.yaml does not contain a dictionary.

Return type:

None

name: str#
property stage_dir: Path#

Path to the stage directory.

Returns:

Absolute path to the stage directory (stages/<stage_name>/).

Return type:

Path

abstractmethod run(debug)[source]#

Execute the stage.

Parameters:

debug (Dict[str, Any]) – Shared context dictionary containing results from earlier stages. It can be an empty dict for the first stage.

Returns:

Dictionary with stage results to be merged into context and passed to the next stage.

Return type:

DebugContext

property context: StageContext#

Stage context containing all stage-related information.

Returns:

Dataclass instance with name, config, stage_dir, logger, and deps attributes.

Return type:

StageContext

Registry#

Stage Registry#

Builder for registering pipeline stages with automatic name detection.

class yt_framework.core.registry.StageRegistry[source]#

Bases: object

Builder for registering pipeline stages.

Automatically detects stage names from directory structure.

Example

registry = StageRegistry()
registry.add_stage(CreateTableStage)
registry.add_stage(RunMapStage)
pipeline.set_stage_registry(registry)

__init__()[source]#

Initialize empty stage registry.

Return type:

None

add_stage(stage_class)[source]#

Register a stage class.

Stage name is automatically detected from the directory containing stage.py.

Parameters:

stage_class (Type[BaseStage]) – Stage class to register

Returns:

Self for method chaining

Return type:

StageRegistry

get_stage(stage_name)[source]#

Get stage class by name.

Parameters:

stage_name (str) – Name of the stage to retrieve (matches directory name).

Returns:

The stage class registered with the given name.

Return type:

Type[BaseStage]

Raises:

KeyError – If no stage is registered with the given name.

Example

>>> registry = StageRegistry()
>>> registry.add_stage(MyStage)
>>> stage_class = registry.get_stage("my_stage")
has_stage(stage_name)[source]#

Check if stage is registered.

Parameters:

stage_name (str) – Name of the stage to check.

Returns:

True if the stage is registered, False otherwise.

Return type:

bool

get_all_stages()[source]#

Get all registered stages.

Returns:

Dictionary mapping stage names to stage classes.

Returns a copy to prevent external modification.

Return type:

Dict[str, Type[BaseStage]]

Discovery#

Stage Discovery#

Automatic discovery of stages from stages/ directory.

yt_framework.core.discovery.discover_stages(pipeline_dir, logger)[source]#

Automatically discover all stage classes from stages/ directory.

Searches for stage.py files in stages/*/ subdirectories and imports all BaseStage subclasses found in them.

Directory structure expected:

pipeline_dir/
    stages/
        stage_name_1/
            stage.py  # Contains Stage class inheriting from BaseStage
        stage_name_2/
            stage.py

Parameters:
  • pipeline_dir (Path) – Path to pipeline directory

  • logger (Logger) – Logger instance

Returns:

List of discovered stage classes

Return type:

List[Type[BaseStage]]
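The file-layout half of discovery can be illustrated without the framework. The sketch below only finds stage directories that contain a stage.py and derives stage names from them; it does not import modules or collect BaseStage subclasses as discover_stages is documented to do. The helper name `find_stage_names` is hypothetical.

```python
import tempfile
from pathlib import Path
from typing import List


def find_stage_names(pipeline_dir: Path) -> List[str]:
    """Return the names of stage directories that contain a stage.py file."""
    stage_files = pipeline_dir.glob("stages/*/stage.py")
    # The stage name is the name of the directory that holds stage.py.
    return sorted(stage_file.parent.name for stage_file in stage_files)


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for name in ("create_table", "run_map"):
        (root / "stages" / name).mkdir(parents=True)
        (root / "stages" / name / "stage.py").write_text("# stage code\n")
    # A directory without stage.py is ignored.
    (root / "stages" / "notes").mkdir()
    discovered = find_stage_names(root)

print(discovered)
```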

Operations#

Map Operations#

High-level orchestration for YT map operations.

This module provides functions for running map operations on YTsaurus clusters.

class yt_framework.operations.map.MapOperationData(mapper_path, dependencies, environment, docker_auth, command=None)[source]#

Bases: object

Data container for map operation configuration.

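Parameters:
  • mapper_path (str)

  • dependencies (List[Tuple[str, str]])

  • environment (Dict[str, str])

  • docker_auth (Dict[str, str] | None)

  • command (str | None)
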
mapper_path#

Path to mapper.py script in YT (or bash wrapper if tar mode).

Type:

str

dependencies#

List of (yt_path, local_path) tuples for files to upload.

Type:

List[Tuple[str, str]]

environment#

Environment variables dictionary (secrets only).

Type:

Dict[str, str]

docker_auth#

Optional Docker authentication dictionary for private registries.

Type:

Dict[str, str] | None

command#

Optional command to execute (used in tar archive mode).

Type:

str | None

mapper_path: str#
dependencies: List[Tuple[str, str]]#
environment: Dict[str, str]#
docker_auth: Dict[str, str] | None#
command: str | None = None#
yt_framework.operations.map.run_map(context, operation_config, output_schema=None)[source]#

Run YT map operation and wait for completion.

All job parameters (pool, memory, CPU, Docker image, etc.) are automatically extracted from operation_config. Operation config should be passed from stage.config.operations.map.

Parameters:
  • context (StageContext) – Stage context (provides deps, logger, stage_dir)

  • operation_config (DictConfig) – Operation-specific config (from client.operations.map)

  • output_schema (TableSchema | None) – Optional YT TableSchema for typed output table

Returns:

True if successful, False otherwise

Return type:

bool

Vanilla Operations#

High-level orchestration for YT vanilla operations.

This module provides functions for running vanilla operations on YTsaurus clusters.

class yt_framework.operations.vanilla.VanillaOperationData(script_path, dependencies, environment, docker_auth, command=None)[source]#

Bases: object

Data container for vanilla operation configuration.

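Parameters:
  • script_path (str)

  • dependencies (List[Tuple[str, str]])

  • environment (Dict[str, str])

  • docker_auth (Dict[str, str] | None)

  • command (str | None)
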
script_path#

Path to vanilla.py script in YT (or placeholder if tar mode).

Type:

str

dependencies#

List of (yt_path, local_path) tuples for files to upload.

Type:

List[Tuple[str, str]]

environment#

Environment variables dictionary (secrets only).

Type:

Dict[str, str]

docker_auth#

Optional Docker authentication dictionary for private registries.

Type:

Dict[str, str] | None

command#

Optional command to execute (used in tar archive mode).

Type:

str | None

script_path: str#
dependencies: List[Tuple[str, str]]#
environment: Dict[str, str]#
docker_auth: Dict[str, str] | None#
command: str | None = None#
yt_framework.operations.vanilla.run_vanilla(context, operation_config)[source]#

Run YT vanilla operation and wait for completion.

All job parameters (pool, memory, CPU, Docker image, etc.) are automatically extracted from operation_config. Operation config should be passed from stage.config.operations.vanilla. The task name is automatically set to the stage name.

Parameters:
  • context (StageContext) – Stage context (provides deps, logger, stage_dir, name)

  • operation_config (DictConfig) – Operation-specific config (from client.operations.vanilla)

Returns:

True if successful, False otherwise

Return type:

bool

YQL Operations#

YQL operations are methods on the YT client. See the YT Client sections below for join_tables, filter_table, select_columns, group_by_aggregate, union_tables, distinct, sort_table, and limit_table methods.

Note

YQL Operations Location

YQL operations are implemented as methods on BaseYTClient and its subclasses (YTDevClient and YTProdClient). They are not in a separate operations module. See the YT Client sections below for complete documentation.

S3 Operations#

Operations for working with S3 data in pipelines.

This module provides functions for integrating S3 storage with YT Framework pipelines.

yt_framework.operations.s3.list_s3_files(s3_client, bucket, prefix, logger, extension=None, max_files=None)[source]#

List files from S3 bucket with optional filtering.

Parameters:
  • s3_client (S3Client) – S3 client instance

  • bucket (str) – S3 bucket name

  • prefix (str) – S3 prefix path

  • logger (Logger) – Logger instance

  • extension (str | None) – Optional file extension filter (e.g., ‘mp4’)

  • max_files (int | None) – Optional maximum number of files to return

Returns:

List of S3 file paths

Return type:

List[str]

yt_framework.operations.s3.save_s3_paths_to_table(yt_client, bucket, paths, output_table, logger)[source]#

Save S3 file paths to YT table as bucket and path columns.

Parameters:
  • yt_client (BaseYTClient) – YT client instance

  • bucket (str) – S3 bucket name

  • paths (List[str]) – List of S3 file paths

  • output_table (str) – YT table path

  • logger (Logger) – Logger instance

Returns:

None

Return type:

None

Table Operations#

Operations for working with YT tables.

yt_framework.operations.table.get_row_count(yt_client, table_path, logger)[source]#

Get number of rows in a YT table.

Parameters:
  • yt_client (BaseYTClient) – YT client instance

  • table_path (str) – YT table path

  • logger (Logger) – Logger instance

Returns:

Number of rows in table

Return type:

int

yt_framework.operations.table.read_table(yt_client, table_path, logger)[source]#

Read rows from a YT table.

Parameters:
  • yt_client (BaseYTClient) – YT client instance

  • table_path (str) – YT table path

  • logger (Logger) – Logger instance

Returns:

List of rows as dictionaries

Return type:

List[Dict[str, Any]]

yt_framework.operations.table.download_table(yt_client, table_path, output_file, logger)[source]#

Download YT table to local JSONL file.

Parameters:
  • yt_client (BaseYTClient) – YT client instance

  • table_path (str) – YT table path

  • output_file (Path) – Local file path for output

  • logger (Logger) – Logger instance

Returns:

None

Return type:

None

Checkpoint Operations#

Checkpoint Management Utilities#

Utilities for managing checkpoint files in YTsaurus.

yt_framework.operations.checkpoint.init_checkpoint_directory(context, checkpoint_config)[source]#

Initialize checkpoint directory in YTsaurus if it doesn’t exist.

Uses checkpoint_base from checkpoint_config. Also uploads local checkpoint if specified. Validates that required checkpoint exists in YT before proceeding.

Parameters:
  • context (StageContext) – Stage context (provides deps, logger)

  • checkpoint_config (DictConfig) – Checkpoint-specific config (from client.operations.<op>.checkpoint)

Returns:

None

Raises:
Return type:

None

Utilities#

Archive#

YT Vanilla Job Script to untar archive and upload files to YT build folder.

This script is executed as a standalone YT vanilla job (not imported as a module).

This script:
  1. Downloads the archive from YT (or uses a local file if provided)

  2. Extracts it to the local filesystem

  3. Uploads the extracted files to the YT build folder

Environment variables required:
  • YT_BUILD_FOLDER: YT path to build folder

  • YT_ARCHIVE_PATH: YT path to archive file

  • ARCHIVE_LOCAL_NAME: Local filename of archive in sandbox (default: code.tar.gz)

yt_framework.utils.archive.main()[source]#

Main entry point for archive extraction and upload script.

This function is executed as a standalone YT vanilla job. It:
  1. Downloads the archive from YT (or uses a local file if provided)

  2. Extracts it to the local filesystem

  3. Uploads the extracted files to the YT build folder

Environment variables required:
  • YT_BUILD_FOLDER: YT path to build folder

  • YT_ARCHIVE_PATH: YT path to archive file

  • ARCHIVE_LOCAL_NAME: Local filename of archive in sandbox (default: code.tar.gz)

Returns:

None (exits with code 0 on success, 1 on failure)

Raises:

SystemExit – If required environment variables are missing or operations fail.

Return type:

None
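The environment-variable contract above can be sketched as a small validation helper. This is an illustration only: `read_archive_settings` is a hypothetical name, the YT paths in the usage are made up, and the real script's error messages are not documented here. In a real job the mapping passed in would be `os.environ`.

```python
import sys
from typing import Dict, Mapping


def read_archive_settings(env: Mapping[str, str]) -> Dict[str, str]:
    """Collect the archive job settings, exiting with code 1 if one is missing."""
    settings: Dict[str, str] = {}
    for name in ("YT_BUILD_FOLDER", "YT_ARCHIVE_PATH"):
        if not env.get(name):
            print(f"Missing required environment variable: {name}", file=sys.stderr)
            raise SystemExit(1)
        settings[name] = env[name]
    # ARCHIVE_LOCAL_NAME falls back to the documented default.
    settings["ARCHIVE_LOCAL_NAME"] = env.get("ARCHIVE_LOCAL_NAME", "code.tar.gz")
    return settings


settings = read_archive_settings(
    {"YT_BUILD_FOLDER": "//tmp/build", "YT_ARCHIVE_PATH": "//tmp/build/code.tar.gz"}
)
print(settings["ARCHIVE_LOCAL_NAME"])
```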

Environment#

Environment Variable Utilities#

Simple utilities for loading environment variables from .env files.

yt_framework.utils.env.load_env_file(env_path)[source]#

Load environment variables from a .env file.

File format: KEY=VALUE, one entry per line, with # starting a comment. The file is optional: if it does not exist, an empty dict is returned.

Parameters:

env_path (Path) – Path to the .env file

Returns:

Dictionary of loaded environment variables (key -> value). Returns empty dict if file doesn’t exist or cannot be read.

Warns:

UserWarning – If file doesn’t exist or cannot be parsed (non-fatal).

Return type:

Dict[str, str]

Example

>>> env_vars = load_env_file(Path("configs/secrets.env"))
>>> print(env_vars.get("YT_TOKEN"))
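A parser for the documented file format can be sketched as follows. This is a simplified stand-in, not the code behind load_env_file: the name `parse_env_file` is hypothetical, and quoting, inline comments, and the UserWarning behavior are deliberately not modeled.

```python
import tempfile
from pathlib import Path
from typing import Dict


def parse_env_file(env_path: Path) -> Dict[str, str]:
    """Parse KEY=VALUE lines, skipping blanks and # comments (illustrative only)."""
    if not env_path.is_file():
        return {}  # a missing file is treated as "no variables"
    variables: Dict[str, str] = {}
    for raw_line in env_path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        # Split on the first "=" only, so values may themselves contain "=".
        key, _, value = line.partition("=")
        variables[key.strip()] = value.strip()
    return variables


with tempfile.TemporaryDirectory() as tmp:
    env_file = Path(tmp) / "secrets.env"
    env_file.write_text("# credentials\nYT_PROXY=my-proxy\n\nYT_TOKEN=my-token\n")
    loaded = parse_env_file(env_file)
    missing = parse_env_file(Path(tmp) / "absent.env")

print(loaded)
```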
yt_framework.utils.env.load_secrets(secrets_dir, env_file='secrets.env')[source]#

Load secrets from secrets.env file in the specified directory.

Parameters:
  • secrets_dir (Path) – Directory containing the secrets.env file

  • env_file (str) – Name of the environment file (default: “secrets.env”)

Returns:

Dictionary of loaded secrets (key -> value). Returns empty dict if file doesn’t exist or cannot be read.

Warns:

UserWarning – If file doesn’t exist or cannot be parsed (non-fatal).

Return type:

Dict[str, str]

Example

>>> secrets = load_secrets(Path("configs"))
>>> yt_token = secrets.get("YT_TOKEN")

Logging#

Logging Configuration Module#

Centralized logging setup for the entire pipeline.

class yt_framework.utils.logging.ColoredFormatter(fmt=None, datefmt=None, style='%', validate=True, *, defaults=None)[source]#

Bases: Formatter

Custom formatter with colors for different log levels.

COLORS = {'CRITICAL': '\x1b[35m', 'DEBUG': '\x1b[36m', 'ERROR': '\x1b[31m', 'INFO': '\x1b[32m', 'WARNING': '\x1b[33m'}#
RESET = '\x1b[0m'#
ICONS = {'CRITICAL': '🚨', 'DEBUG': '🔍', 'ERROR': '✗', 'WARNING': '⚠️'}#
format(record)[source]#

Format log record with colors and icons.

Parameters:

record (LogRecord) – Log record to format.

Returns:

Formatted log message with ANSI color codes and icons.

Return type:

str
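How a formatter of this kind wraps a message in ANSI codes can be shown with the standard library alone. The sketch below reuses the COLORS and RESET values listed above but is not the ColoredFormatter implementation: `SimpleColorFormatter` is a hypothetical class, and the icons are omitted.

```python
import logging

COLORS = {
    "DEBUG": "\x1b[36m",
    "INFO": "\x1b[32m",
    "WARNING": "\x1b[33m",
    "ERROR": "\x1b[31m",
    "CRITICAL": "\x1b[35m",
}
RESET = "\x1b[0m"


class SimpleColorFormatter(logging.Formatter):
    """Wrap the formatted message in the ANSI color for its level."""

    def format(self, record: logging.LogRecord) -> str:
        message = super().format(record)
        color = COLORS.get(record.levelname, "")
        return f"{color}{message}{RESET}" if color else message


record = logging.LogRecord(
    name="demo",
    level=logging.ERROR,
    pathname="demo.py",
    lineno=1,
    msg="upload failed",
    args=(),
    exc_info=None,
)
formatted = SimpleColorFormatter("%(levelname)s %(message)s").format(record)
print(repr(formatted))
```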

yt_framework.utils.logging.setup_logging(level=20, name=None, use_colors=True)[source]#

Setup logging with consistent formatting.

Parameters:
  • level (int) – Logging level (default: INFO)

  • name (str | None) – Logger name (default: root logger)

  • use_colors (bool) – Whether to use colored output

Returns:

Configured logger instance

Return type:

Logger

yt_framework.utils.logging.log_header(logger, title, context=None)[source]#

Log a compact section header in format: [Title] context.

Parameters:
  • logger (Logger) – Logger instance

  • title (str) – Section title (will be wrapped in brackets)

  • context (str | None) – Optional additional context information

Return type:

None

yt_framework.utils.logging.log_operation(logger, message)[source]#

Log an operation start message with → prefix.

Parameters:
  • logger (Logger) – Logger instance

  • message (str) – Operation description

Return type:

None

yt_framework.utils.logging.log_success(logger, message)[source]#

Log a success/completion message with ✓ prefix.

Parameters:
  • logger (Logger) – Logger instance

  • message (str) – Success message

Return type:

None

yt_framework.utils.logging.log_config(logger, config_dict, title='Configuration')[source]#

Log configuration in a readable format.

Automatically masks sensitive values (keys containing ‘secret’ or ‘key’) by showing only the last 4 characters.

Parameters:
  • logger (Logger) – Logger instance

  • config_dict (dict) – Configuration dictionary to log

  • title (str) – Title for the configuration section

Returns:

None

Return type:

None

Example

>>> config = {"api_key": "secret12345", "mode": "dev"}
>>> log_config(logger, config)
[Configuration]
    api_key: ***2345
    mode: dev
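The masking rule can be reproduced in isolation. The following is a sketch of the documented behavior rather than the library's code: `mask_sensitive` is a hypothetical helper, and matching key names case-insensitively is an assumption.

```python
from typing import Any, Dict


def mask_sensitive(config: Dict[str, Any], visible: int = 4) -> Dict[str, Any]:
    """Return a copy of config with sensitive values reduced to their tail."""
    masked: Dict[str, Any] = {}
    for key, value in config.items():
        lowered = key.lower()  # case-insensitive matching is an assumption
        if "secret" in lowered or "key" in lowered:
            # Keep only the last `visible` characters, as in the example above.
            masked[key] = "***" + str(value)[-visible:]
        else:
            masked[key] = value
    return masked


masked_config = mask_sensitive({"api_key": "secret12345", "mode": "dev"})
print(masked_config)
```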

YT Client#

Client Factory#

YTsaurus Operations#

Reusable YT client for any YT operations.

yt_framework.yt.factory.create_yt_client(logger=None, mode='dev', pipeline_dir=None, secrets=None)[source]#

Factory function to create appropriate YT client based on mode.

Parameters:
  • logger (Logger | None) – Logger instance (default: creates new logger)

  • mode (Literal['prod', 'dev'] | None) – “prod” for production YT client, “dev” for local development

  • pipeline_dir (Path | str | None) – Pipeline directory (required for dev mode)

  • secrets (Dict[str, str] | None) – Optional dictionary containing YT credentials. Required only for prod mode. Expected keys:
      - YT_PROXY: YTsaurus proxy URL
      - YT_TOKEN: YTsaurus authentication token

Returns:

BaseYTClient instance (YTProdClient or YTDevClient)

Raises:

ValueError – If secrets are required for prod mode but not provided.

Return type:

BaseYTClient

Example

>>> # Dev mode (local filesystem simulation)
>>> client = create_yt_client(mode="dev", pipeline_dir=Path("."))
>>>
>>> # Prod mode (real YT cluster)
>>> secrets = {"YT_PROXY": "my-proxy", "YT_TOKEN": "my-token"}
>>> client = create_yt_client(mode="prod", secrets=secrets)

Dev Client#

Development YT Client#

Development implementation of YT client using local file system.

class yt_framework.yt.client_dev.YTDevClient(logger, pipeline_dir=None)[source]#

Bases: BaseYTClient

Development YT client implementation.

Uses local file system for all operations, simulating YT behavior. Tables are stored as .jsonl files in .dev/ directory.

Parameters:
  • logger (Logger)

  • pipeline_dir (Path | None)

__init__(logger, pipeline_dir=None)[source]#

Initialize development YT client.

Parameters:
  • logger (Logger) – Logger instance

  • pipeline_dir (Path | None) – Pipeline directory (required for dev mode)

Return type:

None

create_path(path, node_type='map_node')[source]#

Create a path in YT (no-op in dev mode).

Parameters:
  • path (str) – YT path to create (not used in dev mode).

  • node_type (Literal['table', 'file', 'map_node', 'list_node', 'document']) – Type of node to create (not used in dev mode).

Returns:

None

Return type:

None

exists(path)[source]#

Check if a path exists in YT.

In dev mode, always returns True (assumes files exist locally).

Parameters:

path (str) – YT path to check.

Returns:

Always True in dev mode.

Return type:

bool

write_table(table_path, rows, append=False, replication_factor=1)[source]#

Write rows to a YT table (saves as local .jsonl file).

In dev mode, tables are stored as JSONL files in the .dev directory. Each row is written as a JSON object on a single line.

Parameters:
  • table_path (str) – YT table path (e.g., “//tmp/my_table”).

  • rows (List[Dict[str, Any]]) – List of dictionaries representing table rows.

  • append (bool) – If True, append to existing file; otherwise overwrite.

  • replication_factor (int) – Not used in dev mode (kept for API compatibility).

Returns:

None

Return type:

None

Example

>>> client.write_table("//tmp/data", [{"id": 1, "name": "Alice"}])
>>> # Creates .dev/data.jsonl with: {"id":1,"name":"Alice"}\n
read_table(table_path)[source]#

Read rows from a YT table (reads from local .jsonl file).

Parameters:

table_path (str) – YT table path (e.g., “//tmp/my_table”).

Returns:

List of dictionaries representing table rows.

Returns empty list if file doesn’t exist.

Return type:

List[Dict[str, Any]]

row_count(table_path)[source]#

Get number of rows in a YT table (counts lines in local .jsonl file).

Parameters:

table_path (str) – YT table path (e.g., “//tmp/my_table”).

Returns:

Number of non-empty lines in the JSONL file. Returns 0 if file doesn’t exist.

Return type:

int
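The JSONL storage scheme used by write_table, read_table, and row_count in dev mode can be sketched as a round trip. This is an illustration under assumptions, not the YTDevClient code: the helper names are hypothetical, and the path rule (last component of the YT path plus `.jsonl`) is inferred from the single example above and may not match the real mapping.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, List


def local_table_file(dev_dir: Path, table_path: str) -> Path:
    """Map a YT path to a local file (assumed rule: last path component + .jsonl)."""
    return dev_dir / (table_path.rstrip("/").split("/")[-1] + ".jsonl")


def write_rows(path: Path, rows: List[Dict[str, Any]], append: bool = False) -> None:
    """Write one compact JSON object per line, appending or overwriting."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("a" if append else "w", encoding="utf-8") as handle:
        for row in rows:
            handle.write(json.dumps(row, separators=(",", ":")) + "\n")


def read_rows(path: Path) -> List[Dict[str, Any]]:
    """Read rows back; a missing file yields an empty list."""
    if not path.exists():
        return []
    lines = path.read_text(encoding="utf-8").splitlines()
    return [json.loads(line) for line in lines if line.strip()]


with tempfile.TemporaryDirectory() as tmp:
    table_file = local_table_file(Path(tmp) / ".dev", "//tmp/data")
    write_rows(table_file, [{"id": 1, "name": "Alice"}])
    write_rows(table_file, [{"id": 2, "name": "Bob"}], append=True)
    rows = read_rows(table_file)
    first_line = table_file.read_text(encoding="utf-8").splitlines()[0]
    missing_rows = read_rows(Path(tmp) / ".dev" / "absent.jsonl")

print(len(rows), first_line)
```

Counting rows then amounts to counting the non-empty lines, which is why `len(rows)` here plays the role of row_count.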

run_yql(query, pool='default')[source]#

Execute a YQL query locally using DuckDB simulation.

Parameters:
  • query (str) – YQL query string to execute

  • pool (str) – YT pool name (default: ‘default’)

Return type:

None

join_tables(left_table, right_table, output_table, on, how='left', select_columns=None, dry_run=False)[source]#

Join two tables using YQL (executed locally with DuckDB in dev mode).

Parameters:
Return type:

str | None

filter_table(input_table, output_table, condition, dry_run=False)[source]#

Filter table rows using WHERE condition (executed locally with DuckDB in dev mode).

Parameters:
  • input_table (str)

  • output_table (str)

  • condition (str)

  • dry_run (bool)

Return type:

str | None

select_columns(input_table, output_table, columns, dry_run=False)[source]#

Select specific columns from a table (executed locally with DuckDB in dev mode).

Parameters:
Return type:

str | None

group_by_aggregate(input_table, output_table, group_by, aggregations, dry_run=False)[source]#

Group by columns and compute aggregations (executed locally with DuckDB in dev mode).

Parameters:
Return type:

str | None

union_tables(tables, output_table, dry_run=False)[source]#

Union multiple tables (executed locally with DuckDB in dev mode).

Parameters:
Return type:

str | None

distinct(input_table, output_table, columns=None, dry_run=False)[source]#

Get distinct rows from a table (executed locally with DuckDB in dev mode).

Parameters:
Return type:

str | None

sort_table(input_table, output_table, order_by, ascending=True, dry_run=False)[source]#

Sort table by columns (executed locally with DuckDB in dev mode).

Parameters:
Return type:

str | None

limit_table(input_table, output_table, limit, dry_run=False)[source]#

Limit number of rows from a table (executed locally with DuckDB in dev mode).

Parameters:
  • input_table (str)

  • output_table (str)

  • limit (int)

  • dry_run (bool)

Return type:

str | None

upload_file(local_path, yt_path, create_parent_dir=False)[source]#

Upload a file to YT (no-op in dev mode).

Parameters:
  • local_path (Path) – Local file path to upload

  • yt_path (str) – YT destination path

  • create_parent_dir (bool) – If True, create parent directory if it doesn’t exist (default: False)

Return type:

None

upload_directory(local_dir, yt_dir, pattern='*')[source]#

Upload a directory to YT (no-op in dev mode).

Parameters:
  • local_dir (Path) – Local directory path to upload.

  • yt_dir (str) – YT destination directory path.

  • pattern (str) – File pattern to match (not used in dev mode).

Returns:

Empty list in dev mode.

Return type:

List[str]

run_map(command, input_table, output_table, files, resources, env, output_schema=None, max_failed_jobs=1, docker_auth=None)[source]#

Run a map operation locally using subprocess.

In dev mode, executes the mapper script locally with input/output tables as JSONL files. The command is executed in a temporary sandbox directory with all dependencies available.

Parameters:
  • command (str) – Command to execute (typically bash command with script path).

  • input_table (str) – Input YT table path (read from local JSONL).

  • output_table (str) – Output YT table path (written to local JSONL).

  • files (List[Tuple[str, str]]) – List of (yt_path, local_path) tuples for dependencies.

  • resources (OperationResources) – Operation resource configuration (not fully used in dev mode).

  • env (Dict[str, str]) – Environment variables dictionary.

  • output_schema (TableSchema | None) – Optional output table schema (not used in dev mode).

  • max_failed_jobs (int) – Maximum failed jobs allowed (not used in dev mode).

  • docker_auth (Dict[str, str] | None) – Optional Docker authentication (not used in dev mode).

Returns:

Mock operation object that simulates YT operation.

Return type:

Operation

Example

>>> op = client.run_map(
...     command="python3 mapper.py",
...     input_table="//tmp/input",
...     output_table="//tmp/output",
...     files=[],
...     resources=OperationResources(),
...     env={}
... )
run_vanilla(command, files, env, task_name='main', **kwargs)[source]#

Run a vanilla operation locally using subprocess.

In dev mode, executes the vanilla script locally in a temporary sandbox directory with all dependencies available. No input/output tables are involved.

Parameters:
  • command (str) – Command to execute (typically bash command with script path).

  • files (List[Tuple[str, str]]) – List of (yt_path, local_path) tuples for dependencies.

  • env (Dict[str, str]) – Environment variables dictionary.

  • task_name (str) – Task name for logging (default: “main”).

  • **kwargs – Additional arguments (not used in dev mode).

Returns:

Mock operation object that simulates YT operation.

Return type:

Operation

Production Client#

Production YT Client#

Production implementation of YT client using actual YTsaurus client.

class yt_framework.yt.client_prod.YTProdClient(logger, secrets)[source]#

Bases: BaseYTClient

Production YT client implementation.

Uses actual YTsaurus client for all operations.

Parameters:
  • logger (Logger)

  • secrets (Dict[str, str])

__init__(logger, secrets)[source]#

Initialize production YT client.

Parameters:
  • logger (Logger) – Logger instance

  • secrets (Dict[str, str]) – Dictionary containing YT credentials. Expected keys:
      - YT_PROXY
      - YT_TOKEN

Return type:

None

create_path(path, node_type='map_node')[source]#

Create a path in YT.

Parameters:
  • path (str) – YT path to create.

  • node_type (Literal['table', 'file', 'map_node', 'list_node', 'document']) – Type of node to create (default: “map_node”).

Returns:

None

Raises:

Exception – If path creation fails.

Return type:

None

exists(path)[source]#

Check if a path exists in YT.

Parameters:

path (str) – YT path to check.

Returns:

True if path exists, False otherwise.

Return type:

bool

Raises:

Exception – If check fails.
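
`exists` and `create_path` are often used together. The sketch below shows one way to combine them; `ensure_map_node` is a hypothetical helper, and `client` stands for any object exposing the two methods documented above.

```python
def ensure_map_node(client, path: str) -> bool:
    """Create `path` as a map_node if it does not exist yet.

    Returns True if the node was created, False if it was already there.
    """
    if client.exists(path):
        return False
    client.create_path(path, node_type="map_node")
    return True
```

The check and the creation are two separate calls, so another process could create the node in between; callers that run concurrently should be prepared for `create_path` to raise.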

write_table(table_path, rows, append=False, replication_factor=1, make_parents=True)[source]#

Write rows to a YT table.

Parameters:
  • table_path (str) – YT table path

  • rows (List[Dict[str, Any]]) – List of dictionaries representing table rows

  • append (bool) – If True, append to existing table (default: False)

  • replication_factor (int) – Replication factor for the table (default: 1)

  • make_parents (bool) – If True, create parent directories if they don’t exist (default: True)

Return type:

None
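
The `append` flag makes it possible to write a large list of rows in several calls. The following is a sketch under that reading of the parameter; `iter_batches` and `write_in_batches` are hypothetical helpers, and the batch size is an arbitrary illustration value.

```python
from typing import Any, Dict, Iterator, List

Row = Dict[str, Any]


def iter_batches(rows: List[Row], batch_size: int) -> Iterator[List[Row]]:
    """Yield consecutive slices of `rows` with at most `batch_size` items each."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


def write_in_batches(client, table_path: str, rows: List[Row], batch_size: int = 10_000) -> int:
    """Overwrite the table with the first batch, append the rest.

    Returns the number of write_table calls made.
    """
    calls = 0
    for index, batch in enumerate(iter_batches(rows, batch_size)):
        # Only the first batch replaces existing content.
        client.write_table(table_path, batch, append=index > 0)
        calls += 1
    return calls
```

With an empty `rows` list the helper makes no call at all, so an existing table is left untouched.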

read_table(table_path)[source]#

Read rows from a YT table.

Parameters:

table_path (str) – YT table path to read.

Returns:

List of dictionaries representing table rows.

Return type:

List[Dict[str, Any]]

Raises:

Exception – If table read fails.

row_count(table_path)[source]#

Get number of rows in a YT table.

Parameters:

table_path (str) – YT table path.

Returns:

Number of rows in the table.

Return type:

int

Raises:

Exception – If row count query fails.
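
Because `read_table` returns every row as a Python list, it can be worth checking `row_count` first. A minimal sketch, with `read_small_table` and its default threshold being hypothetical:

```python
from typing import Any, Dict, List


def read_small_table(client, table_path: str, max_rows: int = 100_000) -> List[Dict[str, Any]]:
    """Read a table into memory only if it has at most `max_rows` rows."""
    total = client.row_count(table_path)
    if total > max_rows:
        raise ValueError(
            f"{table_path} has {total} rows, more than the allowed {max_rows}"
        )
    return client.read_table(table_path)
```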

run_yql(query, pool='default')[source]#

Execute a YQL query on YT cluster.

Parameters:
  • query (str) – YQL query string to execute

  • pool (str) – YT pool name (default: ‘default’)

Raises:

Exception – If query execution fails

Return type:

None

join_tables(left_table, right_table, output_table, on, how='left', select_columns=None, dry_run=False)[source]#

Join two tables using YQL.

Parameters:
  • left_table (str) – Path to left table

  • right_table (str) – Path to right table

  • output_table (str) – Path to output table

  • on (str | List[str] | Dict[str, str]) – Join key(s), given as the column name(s) to join on:
      - str: the same column name on both sides (e.g., "user_id")
      - List[str]: multiple columns with the same names on both sides (e.g., ["user_id", "region"])
      - Dict[str, str]: different column names on each side (e.g., {"left": "input_s3_path", "right": "path"})

  • how (Literal['inner', 'left', 'right', 'full']) – Join type - “inner”, “left”, “right”, or “full”

  • select_columns (List[str] | None) – Optional list of columns to select (with table aliases)

  • dry_run (bool) – If True, return the YQL query without executing

Returns:

YQL query string if dry_run=True, None otherwise

Return type:

str | None
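
The three accepted forms of `on` all describe pairs of (left column, right column). The sketch below makes that relationship explicit; `normalize_join_keys` is a hypothetical helper written for illustration and is not the function the library uses internally.

```python
from typing import Dict, List, Tuple, Union

JoinOn = Union[str, List[str], Dict[str, str]]


def normalize_join_keys(on: JoinOn) -> List[Tuple[str, str]]:
    """Return (left_column, right_column) pairs for each accepted form of `on`."""
    if isinstance(on, str):
        return [(on, on)]                       # same name on both sides
    if isinstance(on, dict):
        return [(on["left"], on["right"])]      # different names per side
    return [(column, column) for column in on]  # several same-named columns
```

For example, `normalize_join_keys({"left": "input_s3_path", "right": "path"})` yields a single pair joining `input_s3_path` on the left to `path` on the right.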

filter_table(input_table, output_table, condition, dry_run=False)[source]#

Filter table rows using WHERE condition.

Parameters:
  • input_table (str) – Path to input table

  • output_table (str) – Path to output table

  • condition (str) – WHERE condition (e.g., "status = 'active' AND total > 100")

  • dry_run (bool) – If True, return the YQL query without executing

Returns:

YQL query string if dry_run=True, None otherwise

Return type:

str | None
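
Since `dry_run=True` returns the generated YQL without executing it, one possible pattern is to log the query before running it for real. `filter_with_preview` is a hypothetical wrapper; the same idea would apply to the other YQL helpers that accept `dry_run`.

```python
import logging

logger = logging.getLogger(__name__)


def filter_with_preview(client, input_table: str, output_table: str, condition: str):
    """Log the YQL that filter_table would run, then execute it.

    Returns the query string obtained from the dry run.
    """
    query = client.filter_table(input_table, output_table, condition, dry_run=True)
    logger.info("YQL to be executed:\n%s", query)
    client.filter_table(input_table, output_table, condition, dry_run=False)
    return query
```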

select_columns(input_table, output_table, columns, dry_run=False)[source]#

Select specific columns from a table.

Parameters:
  • input_table (str) – Path to input table

  • output_table (str) – Path to output table

  • columns (List[str]) – List of column names to select

  • dry_run (bool) – If True, return the YQL query without executing

Returns:

YQL query string if dry_run=True, None otherwise

Return type:

str | None

group_by_aggregate(input_table, output_table, group_by, aggregations, dry_run=False)[source]#

Group by columns and compute aggregations.

Parameters:
  • input_table (str) – Path to input table

  • output_table (str) – Path to output table

  • group_by (str | List[str]) – Column(s) to group by

  • aggregations (Dict[str, str | Tuple[str, str]]) – Dict mapping output column names to aggregation functions, e.g., {"order_count": "count", "total_amount": "sum"}

  • dry_run (bool) – If True, return the YQL query without executing

Returns:

YQL query string if dry_run=True, None otherwise

Return type:

str | None
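
The `aggregations` argument has a fairly specific shape, so a small check before submitting can catch mistakes early. This is only a sketch: `check_aggregations` is hypothetical, and because the meaning of the tuple form is not described here, the helper verifies shape only.

```python
from typing import Dict, Tuple, Union

AggSpec = Union[str, Tuple[str, str]]

# The example mapping given in the parameter description.
EXAMPLE_AGGREGATIONS: Dict[str, AggSpec] = {
    "order_count": "count",
    "total_amount": "sum",
}


def check_aggregations(aggregations: Dict[str, AggSpec]) -> None:
    """Raise TypeError unless every value is a str or a (str, str) tuple."""
    for name, spec in aggregations.items():
        is_name = isinstance(spec, str)
        is_pair = (
            isinstance(spec, tuple)
            and len(spec) == 2
            and all(isinstance(part, str) for part in spec)
        )
        if not (is_name or is_pair):
            raise TypeError(
                f"aggregation {name!r} must be str or (str, str), got {spec!r}"
            )
```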

union_tables(tables, output_table, dry_run=False)[source]#

Union multiple tables.

Parameters:
  • tables (List[str]) – List of table paths to union

  • output_table (str) – Path to output table

  • dry_run (bool) – If True, return the YQL query without executing

Returns:

YQL query string if dry_run=True, None otherwise

Return type:

str | None

distinct(input_table, output_table, columns=None, dry_run=False)[source]#

Get distinct rows from a table.

Parameters:
  • input_table (str) – Path to input table

  • output_table (str) – Path to output table

  • columns (List[str] | None) – Optional list of columns to select (if None, selects all)

  • dry_run (bool) – If True, return the YQL query without executing

Returns:

YQL query string if dry_run=True, None otherwise

Return type:

str | None
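
`union_tables` and `distinct` can be chained to merge several tables and drop duplicate rows. A minimal sketch, assuming an intermediate table is acceptable; `merge_unique` and the `tmp_table` argument are hypothetical.

```python
from typing import List, Optional


def merge_unique(
    client,
    tables: List[str],
    output_table: str,
    tmp_table: str,
    columns: Optional[List[str]] = None,
) -> None:
    """Union `tables` into `tmp_table`, then write its distinct rows to `output_table`."""
    if not tables:
        raise ValueError("at least one input table is required")
    client.union_tables(tables, tmp_table)
    client.distinct(tmp_table, output_table, columns=columns)
```

Running `distinct` after the union removes duplicates regardless of whether the union step itself keeps them.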

sort_table(input_table, output_table, order_by, ascending=True, dry_run=False)[source]#

Sort table by columns.

WARNING: Sorting large tables can be expensive. Use with caution.

Parameters:
  • input_table (str) – Path to input table

  • output_table (str) – Path to output table

  • order_by (str | List[str]) – Column(s) to sort by

  • ascending (bool) – Sort direction (True for ASC, False for DESC)

  • dry_run (bool) – If True, return the YQL query without executing

Returns:

YQL query string if dry_run=True, None otherwise

Return type:

str | None

limit_table(input_table, output_table, limit, dry_run=False)[source]#

Limit number of rows from a table.

Parameters:
  • input_table (str) – Path to input table

  • output_table (str) – Path to output table

  • limit (int) – Maximum number of rows to return

  • dry_run (bool) – If True, return the YQL query without executing

Returns:

YQL query string if dry_run=True, None otherwise

Return type:

str | None

upload_file(local_path, yt_path, create_parent_dir=False)[source]#

Upload a file to YT.

Parameters:
  • local_path (Path) – Local file path to upload

  • yt_path (str) – YT destination path

  • create_parent_dir (bool) – If True, create parent directory if it doesn’t exist (default: False)

Return type:

None

upload_directory(local_dir, yt_dir, pattern='*')[source]#

Upload a directory to YT cluster.

Recursively uploads all files from a local directory to a YT directory, respecting .ytignore patterns if present.

Parameters:
  • local_dir (Path) – Local directory path to upload

  • yt_dir (str) – YT destination directory path

  • pattern (str) – File pattern to match (default: “*” for all files)

Returns:

List of uploaded YT file paths

Raises:

Exception – If directory upload fails

Return type:

List[str]
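
To get a rough idea of what a directory upload would cover, the local side can be previewed without touching the cluster. The sketch rests on several assumptions that this reference does not confirm: that `pattern` is a glob applied recursively, and that each file keeps its relative path under `yt_dir`. It also ignores `.ytignore`, whose syntax is not described here. `preview_upload` is a hypothetical helper.

```python
from pathlib import Path
from typing import List


def preview_upload(local_dir: Path, yt_dir: str, pattern: str = "*") -> List[str]:
    """List the YT paths that local files matching `pattern` might map to."""
    targets = []
    for path in sorted(local_dir.rglob(pattern)):
        if path.is_file():  # rglob also yields directories; skip them
            relative = path.relative_to(local_dir).as_posix()
            targets.append(f"{yt_dir.rstrip('/')}/{relative}")
    return targets
```

The authoritative list is the return value of `upload_directory` itself; the preview is only a local approximation.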

run_map(command, input_table, output_table, files, resources, env, output_schema=None, max_failed_jobs=1, docker_auth=None)[source]#

Run a map operation on YT cluster.

Submits a map operation that processes each row of the input table independently and writes results to the output table. The operation runs on the YT cluster with the specified resources and dependencies.

Parameters:
  • command (str) – Command to execute (typically bash command with script path).

  • input_table (str) – Input YT table path.

  • output_table (str) – Output YT table path.

  • files (List[Tuple[str, str]]) – List of (yt_path, local_path) tuples for dependencies.

  • resources (OperationResources) – Operation resource configuration (memory, CPU, GPU, etc.).

  • env (Dict[str, str]) – Environment variables dictionary.

  • output_schema (TableSchema | None) – Optional output table schema for typed output.

  • max_failed_jobs (int) – Maximum failed jobs allowed before operation fails.

  • docker_auth (Dict[str, str] | None) – Optional Docker authentication for private registries.

Returns:

YT operation object that can be monitored and waited on.

Return type:

Operation

Raises:

Exception – If operation submission fails.
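
For orientation, here is a sketch of what a script such as the `mapper.py` passed in `command` could look like. It assumes rows arrive on stdin and leave on stdout as one JSON object per line; the actual row format depends on how the operation is configured and is not specified in this reference. `transform` and its `text` column are hypothetical.

```python
import json
import sys
from typing import Any, Dict, Iterable, Iterator


def transform(row: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical per-row logic: add the length of the `text` column."""
    return {**row, "text_length": len(row.get("text") or "")}


def map_lines(lines: Iterable[str]) -> Iterator[str]:
    """Apply `transform` to each non-empty JSON line independently."""
    for line in lines:
        if line.strip():
            yield json.dumps(transform(json.loads(line)))


def main() -> None:
    """Entry point for the job: stream stdin to stdout row by row."""
    for output_line in map_lines(sys.stdin):
        print(output_line)
```

A real `mapper.py` would end with a call to `main()`. Keeping the per-row logic in `transform` lets it be unit-tested without a cluster.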

run_vanilla(command, files, env, task_name, resources, docker_auth=None, max_failed_jobs=1)[source]#

Run a vanilla operation on YT cluster.

Submits a vanilla operation that runs a standalone job without input/output tables. The operation runs on the YT cluster with the specified resources and dependencies.

Parameters:
  • command (str) – Command to execute (typically bash command with script path).

  • files (List[Tuple[str, str]]) – List of (yt_path, local_path) tuples for dependencies.

  • env (Dict[str, str]) – Environment variables dictionary.

  • task_name (str) – Task name for the operation.

  • resources (OperationResources) – Operation resource configuration (memory, CPU, GPU, etc.).

  • docker_auth (Dict[str, str] | None) – Optional Docker authentication for private registries.

  • max_failed_jobs (int) – Maximum failed jobs allowed before operation fails.

Returns:

YT operation object that can be monitored and waited on.

Return type:

Operation

Raises:

Exception – If operation submission fails.
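
The arguments of `run_vanilla` can be assembled in one place before submission. The sketch below is hedged in two ways: `vanilla_kwargs` is a hypothetical helper, and it assumes the second element of each `files` tuple is the name the file takes next to the running command, which this reference does not spell out. `resources` is passed through untouched and would be an `OperationResources` instance.

```python
from typing import Any, Dict, List, Optional, Tuple


def vanilla_kwargs(
    build_folder: str,
    script: str,
    resources: Any,
    env: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
    """Build keyword arguments matching the run_vanilla signature above."""
    # Assumption: (yt_path, local_path) pairs a cluster path with a sandbox file name.
    files: List[Tuple[str, str]] = [(f"{build_folder.rstrip('/')}/{script}", script)]
    return {
        "command": f"python3 {script}",
        "files": files,
        "env": dict(env or {}),
        "task_name": "main",
        "resources": resources,
        "max_failed_jobs": 1,
    }
```

Submission would then read `op = client.run_vanilla(**vanilla_kwargs(...))`, with `op` being the returned operation object.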