API Reference
This page provides detailed API documentation for all YT Framework modules. All documentation is automatically generated from Python docstrings.
Tip
Exploring the API
Use the navigation sidebar to jump to specific modules. Each module includes classes, functions, and their complete documentation with parameters, return types, and examples.
Module Overview
YT Framework is organized into several key modules:
Core: Pipeline, stage, registry, and discovery functionality
Operations: Map, vanilla, YQL, S3, and table operations
YT: YT client implementations (dev and prod)
Utils: Logging, environment, ignore patterns, and utility functions
Note
Auto-Generated Documentation
This documentation is automatically generated from Python docstrings. For the most up-to-date information, check the source code or use Python’s help() function.
Core Modules
Pipeline
Provides common functionality for all pipelines, including code upload methods, CLI entry point, and stage execution.
- To create a new pipeline:
Inherit from BasePipeline
In setup(), create StageRegistry and register stages
Optionally override run() for custom execution flow (rare)
Example
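from yt_framework.core.pipeline import BasePipeline
from yt_framework.core.registry import StageRegistry

class Pipeline(BasePipeline):
    def setup(self):
        # MyStage is your own BaseStage subclass
        self.set_stage_registry(
            StageRegistry().add_stage(MyStage)
        )

if __name__ == "__main__":
    Pipeline.main()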
- class yt_framework.core.pipeline.BasePipeline(config, pipeline_dir, log_level=20)
Bases: object
Base class for all pipelines.
Provides common functionality:
- CLI entry point via the main() class method
- Code upload to the YT build folder
- YT client initialization
- Default stage execution loop
Subclasses must:
- In setup(), create a StageRegistry and register stages via set_stage_registry()
Subclasses may override:
- setup(): Register stages and initialize pipeline-specific clients (S3, etc.)
- run(): Custom execution flow (rare; most pipelines use the default)
- __init__(config, pipeline_dir, log_level=20)
Initialize the base pipeline.
- Parameters:
- Returns:
None
- Raises:
ValueError – If pipeline directory does not exist.
- setup()
Hook for pipeline-specific initialization.
Override this method in subclasses to:
1. Register stages using StageRegistry and set_stage_registry()
2. Initialize custom clients (e.g., S3 client, database connections)
This method is called automatically after base initialization.
- Returns:
None
- Return type:
None
- set_stage_registry(registry)
Set the stage registry for this pipeline.
- Parameters:
registry (StageRegistry) – StageRegistry instance with registered stages
- Returns:
None
- Return type:
None
- create_stage_dependencies()
Create stage dependencies for injection.
This method creates a dependency container with only what stages need, following the Interface Segregation Principle.
- Returns:
PipelineStageDependencies with yt_client, pipeline_config, configs_dir
- Return type:
PipelineStageDependencies
- upload_code(build_folder=None)
Upload code to YT build folder.
Only uploads code if any enabled stages need code execution on YT. If no stages need code execution, this method does nothing.
- Parameters:
build_folder (str | None) – Optional YT build folder path. If None, uses config.pipeline.build_folder
- Returns:
None
- Raises:
ValueError – If build_folder is required but not provided in config.
- Return type:
None
- run()
Run the pipeline by executing enabled stages.
Default implementation:
1. Upload code to YT (only if stages need code execution)
2. Get enabled stages from config
3. Execute stages in order using the stage registry
4. Pass context between stages
Override this method only if you need completely custom execution flow.
- Returns:
None
- Raises:
ValueError – If no enabled_stages are found in the config, or if a stage name is unknown.
AttributeError – If stage registry is not set in setup().
- Return type:
None
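The default execution loop described above can be modeled in a few lines. This is a toy model under stated assumptions, not yt_framework's code. `ToyRegistry` and `run_enabled_stages` are hypothetical names. Stages here are zero-argument factories, whereas the real ones receive `deps` and `logger`. The context passed between stages is a plain dict.

```python
from typing import Any, Callable, Dict, List, Optional


class ToyRegistry:
    """Hypothetical stand-in for StageRegistry: maps stage names to factories."""

    def __init__(self) -> None:
        self._stages: Dict[str, Callable[[], Any]] = {}

    def add_stage(self, name: str, factory: Callable[[], Any]) -> "ToyRegistry":
        self._stages[name] = factory
        return self  # return self so calls can be chained

    def get_stage(self, name: str) -> Callable[[], Any]:
        return self._stages[name]  # raises KeyError for unknown names


def run_enabled_stages(
    registry: Optional[ToyRegistry],
    enabled_stages: List[str],
    needs_code_upload: bool,
    upload: Callable[[], None],
) -> Dict[str, Any]:
    if registry is None:
        raise AttributeError("stage registry is not set; call set_stage_registry() in setup()")
    if not enabled_stages:
        raise ValueError("no enabled_stages found in config")
    # Step 1: upload code only when some stage executes code on YT
    if needs_code_upload:
        upload()
    context: Dict[str, Any] = {}
    # Steps 2-4: run the enabled stages in order, threading the context through
    for name in enabled_stages:
        try:
            stage_factory = registry.get_stage(name)
        except KeyError:
            raise ValueError(f"unknown stage name: {name}") from None
        context = stage_factory().run(context)
    return context
```

Each stage sees the context returned by the previous one. This is why the order of `enabled_stages` matters.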
- classmethod main(argv=None)
CLI entry point for the pipeline.
Handles:
- Argument parsing (--config option)
- Config file loading
- Pipeline instantiation
- Error handling and exit codes
- Parameters:
argv – Optional command-line arguments. If None, uses sys.argv.
- Returns:
None (exits with code 0 on success, 1 on failure)
- Return type:
None
- Usage:
python pipeline.py
python pipeline.py --config configs/custom.yaml
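Here is a minimal sketch of the CLI behavior listed above, using only `argparse`. The `load_config` and `run_pipeline` callables and the default config path `configs/config.yaml` are hypothetical. The real `main()` exits the process; this sketch returns the exit code instead so it can be tested.

```python
import argparse
import sys
from typing import Any, Callable, List, Optional

DEFAULT_CONFIG = "configs/config.yaml"  # hypothetical default path


def main(
    argv: Optional[List[str]],
    load_config: Callable[[str], Any],
    run_pipeline: Callable[[Any], None],
) -> int:
    parser = argparse.ArgumentParser(description="Run the pipeline")
    parser.add_argument("--config", default=DEFAULT_CONFIG, help="Path to the YAML config")
    args = parser.parse_args(argv)  # argv=None falls back to sys.argv[1:]
    try:
        config = load_config(args.config)
        run_pipeline(config)
    except Exception as exc:  # report any failure and signal it via the exit code
        print(f"Pipeline failed: {exc}", file=sys.stderr)
        return 1
    return 0
```

In a script this would be wired up as `sys.exit(main(None, load, run))`.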
- class yt_framework.core.pipeline.DefaultPipeline(config, pipeline_dir, log_level=20)
Bases: BasePipeline
Pipeline with automatic stage discovery.
Automatically discovers and registers all stages from the stages/ directory. You do not need to import or register stages manually: put them in stages/ and they are found automatically.
- Usage:
# pipeline.py
from yt_framework.core.pipeline import DefaultPipeline

if __name__ == "__main__":
    DefaultPipeline.main()
The stages to run are still controlled by the enabled_stages configuration.
Stage
Base Stage Class
Abstract base class for pipeline stages. All stages should inherit from BaseStage and implement the run method.
Stage name is automatically detected from the directory name. Directory structure: stages/<stage_name>/stage.py
- class yt_framework.core.stage.StageContext(name, config, stage_dir, logger, deps)
Bases: object
Stage context containing all stage-related information.
- Parameters:
- config
Stage-specific configuration loaded from config.yaml.
- Type:
omegaconf.dictconfig.DictConfig
- stage_dir
Path to the stage directory (stages/<stage_name>/).
- Type:
pathlib.Path
- logger
Logger instance for stage logging.
- Type:
logging.Logger
- deps
Injected dependencies (yt_client, pipeline_config, configs_dir).
- Type:
yt_framework.core.dependencies.StageDependencies
- config: DictConfig
- deps: StageDependencies
- class yt_framework.core.stage.BaseStage(deps, logger)
Bases: ABC
Abstract base class for pipeline stages.
Stage name and config are automatically detected from the directory:
- Directory structure: stages/<stage_name>/stage.py
- Config file: stages/<stage_name>/config.yaml
Each stage must:
- Implement __init__ to receive deps and logger
- Implement the run method to execute stage logic
Example
class MyStage(BaseStage):
    def __init__(self, deps, logger):
        super().__init__(deps, logger)
        # self.config is automatically loaded from stages/<stage_name>/config.yaml
        # Access dependencies via self.deps.yt_client, self.deps.pipeline_config

    def run(self, debug: DebugContext) -> DebugContext:
        # Stage logic here
        # Note: 'debug' here is the shared data dict, NOT dependencies
        return {"result": "value"}
- Parameters:
deps (StageDependencies)
logger (Logger)
- __init__(deps, logger)
Initialize stage with injected dependencies.
Stage name and config are automatically detected from the directory containing stage.py.
- Parameters:
deps (StageDependencies) – Injected dependencies (yt_client, pipeline_config, configs_dir)
logger (Logger) – Logger instance for stage logging
- Returns:
None
- Raises:
FileNotFoundError – If config.yaml file is not found in stage directory.
ValueError – If config.yaml does not contain a dictionary.
- Return type:
None
- property stage_dir: Path
Path to the stage directory.
- Returns:
Absolute path to the stage directory (stages/<stage_name>/).
- Return type:
Path
- property context: StageContext
Stage context containing all stage-related information.
- Returns:
Dataclass instance with name, config, stage_dir, logger, and deps attributes.
- Return type:
StageContext
Registry
Stage Registry
Builder for registering pipeline stages with automatic name detection.
- class yt_framework.core.registry.StageRegistry
Bases: object
Builder for registering pipeline stages.
Automatically detects stage names from directory structure.
Example
registry = StageRegistry()
registry.add_stage(CreateTableStage)
registry.add_stage(RunMapStage)
pipeline.set_stage_registry(registry)
- add_stage(stage_class)
Register a stage class.
Stage name is automatically detected from the directory containing stage.py.
- Parameters:
stage_class (Type[BaseStage]) – Stage class to register.
- Returns:
Self for method chaining
- Return type:
StageRegistry
- get_stage(stage_name)
Get stage class by name.
- Parameters:
stage_name (str) – Name of the stage to retrieve (matches directory name).
- Returns:
The stage class registered with the given name.
- Return type:
Type[BaseStage]
- Raises:
KeyError – If no stage is registered with the given name.
Example
>>> registry = StageRegistry()
>>> registry.add_stage(MyStage)
>>> stage_class = registry.get_stage("my_stage")
Discovery
Stage Discovery
Automatic discovery of stages from stages/ directory.
- yt_framework.core.discovery.discover_stages(pipeline_dir, logger)
Automatically discover all stage classes from stages/ directory.
Searches for stage.py files in stages/*/ subdirectories and imports all BaseStage subclasses found in them.
Expected directory structure:
pipeline_dir/
    stages/
        stage_name_1/
            stage.py    # Contains a Stage class inheriting from BaseStage
        stage_name_2/
            stage.py
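The discovery rule can be sketched with `importlib`: each directory under `stages/` holds one `stage.py`, and the stage name is taken from the directory name. This is an illustrative approximation, not the framework's implementation. `BaseStage` here is a local stand-in that gets injected into each module's namespace, so the sketch runs without yt_framework installed.

```python
import importlib.util
import inspect
import sys
from pathlib import Path
from typing import Dict, Type


class BaseStage:
    """Hypothetical stand-in for yt_framework.core.stage.BaseStage."""


def discover_stages(pipeline_dir: Path) -> Dict[str, Type[BaseStage]]:
    stages: Dict[str, Type[BaseStage]] = {}
    for stage_file in sorted(pipeline_dir.glob("stages/*/stage.py")):
        stage_name = stage_file.parent.name  # the stage name comes from the directory
        module_name = f"stages.{stage_name}.stage"
        spec = importlib.util.spec_from_file_location(module_name, stage_file)
        module = importlib.util.module_from_spec(spec)
        # Real stage files would import BaseStage from yt_framework; here it is injected
        module.BaseStage = BaseStage
        sys.modules[module_name] = module
        spec.loader.exec_module(module)
        # Keep every BaseStage subclass defined in the file (excluding the base itself)
        for _, obj in inspect.getmembers(module, inspect.isclass):
            if issubclass(obj, BaseStage) and obj is not BaseStage:
                stages[stage_name] = obj
    return stages
```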
Operations
Map Operations
High-level orchestration for YT map operations.
This module provides functions for running map operations on YTsaurus clusters.
- class yt_framework.operations.map.MapOperationData(mapper_path, dependencies, environment, docker_auth, command=None)
Bases: object
Data container for map operation configuration.
- Parameters:
- docker_auth
Optional Docker authentication dictionary for private registries.
- yt_framework.operations.map.run_map(context, operation_config, output_schema=None)
Run YT map operation and wait for completion.
All job parameters (pool, memory, CPU, Docker image, etc.) are automatically extracted from operation_config. Operation config should be passed from stage.config.operations.map.
- Parameters:
context (StageContext) – Stage context (provides deps, logger, stage_dir)
operation_config (DictConfig) – Operation-specific config (from client.operations.map)
output_schema (TableSchema | None) – Optional YT TableSchema for typed output table
- Returns:
True if successful, False otherwise
- Return type:
bool
Vanilla Operations
High-level orchestration for YT vanilla operations.
This module provides functions for running vanilla operations on YTsaurus clusters.
- class yt_framework.operations.vanilla.VanillaOperationData(script_path, dependencies, environment, docker_auth, command=None)
Bases: object
Data container for vanilla operation configuration.
- Parameters:
- docker_auth
Optional Docker authentication dictionary for private registries.
- yt_framework.operations.vanilla.run_vanilla(context, operation_config)
Run YT vanilla operation and wait for completion.
All job parameters (pool, memory, CPU, Docker image, etc.) are automatically extracted from operation_config. Operation config should be passed from stage.config.operations.vanilla. The task name is automatically set to the stage name.
- Parameters:
context (StageContext) – Stage context (provides deps, logger, stage_dir, name)
operation_config (DictConfig) – Operation-specific config (from client.operations.vanilla)
- Returns:
True if successful, False otherwise
- Return type:
bool
YQL Operations
YQL operations are methods on the YT client. See the YT Client sections below for join_tables, filter_table, select_columns, group_by_aggregate, union_tables, distinct, sort_table, and limit_table methods.
Note
YQL Operations Location
YQL operations are implemented as methods on BaseYTClient and its subclasses (YTDevClient and YTProdClient). They are not in a separate operations module. See the YT Client sections below for complete documentation.
S3 Operations
Operations for working with S3 data in pipelines.
This module provides functions for integrating S3 storage with YT Framework pipelines.
- yt_framework.operations.s3.list_s3_files(s3_client, bucket, prefix, logger, extension=None, max_files=None)
List files from S3 bucket with optional filtering.
- Parameters:
- Returns:
List of S3 file paths
- Return type:
List[str]
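The `extension` and `max_files` options suggest a simple filter, sketched here as a pure function over object keys. In practice the keys would come from an S3 listing such as boto3's `list_objects_v2` paginator. The function name `filter_s3_keys` and the `s3://bucket/key` output format are assumptions; the docs only say the result is a list of S3 file paths.

```python
from typing import Iterable, List, Optional


def filter_s3_keys(
    keys: Iterable[str],
    bucket: str,
    prefix: str,
    extension: Optional[str] = None,
    max_files: Optional[int] = None,
) -> List[str]:
    result: List[str] = []
    for key in keys:
        # Stop before adding anything once the cap is reached (max_files=0 yields [])
        if max_files is not None and len(result) >= max_files:
            break
        if not key.startswith(prefix):
            continue
        if extension is not None and not key.endswith(extension):
            continue
        result.append(f"s3://{bucket}/{key}")
    return result
```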
Table Operations
Operations for working with YT tables.
- yt_framework.operations.table.get_row_count(yt_client, table_path, logger)
Get number of rows in a YT table.
- yt_framework.operations.table.read_table(yt_client, table_path, logger)
Read rows from a YT table.
Checkpoint Operations
Checkpoint Management Utilities
Utilities for managing checkpoint files in YTsaurus.
- yt_framework.operations.checkpoint.init_checkpoint_directory(context, checkpoint_config)
Initialize checkpoint directory in YTsaurus if it doesn’t exist.
Uses checkpoint_base from checkpoint_config. Also uploads a local checkpoint if one is specified, and validates that the required checkpoint exists in YT before proceeding.
- Parameters:
context (StageContext) – Stage context (provides deps, logger)
checkpoint_config (DictConfig) – Checkpoint-specific config (from client.operations.<op>.checkpoint)
- Returns:
None
- Raises:
FileNotFoundError – If required checkpoint not found in YT
Exception – If checkpoint initialization fails
- Return type:
None
Utilities
Environment
Environment Variable Utilities
Simple utilities for loading environment variables from .env files.
- yt_framework.utils.env.load_env_file(env_path)
Load environment variables from a .env file.
File format: KEY=VALUE, one per line; lines starting with # are comments. The file is optional: if it is missing, an empty dict is returned.
- Parameters:
env_path (Path) – Path to the .env file
- Returns:
Dictionary of loaded environment variables (key -> value). Returns empty dict if file doesn’t exist or cannot be read.
- Warns:
UserWarning – If file doesn’t exist or cannot be parsed (non-fatal).
- Return type:
Dict[str, str]
Example
>>> env_vars = load_env_file(Path("configs/secrets.env"))
>>> print(env_vars.get("YT_TOKEN"))
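This is a minimal sketch of the .env parsing rules described above: one KEY=VALUE pair per line, # comments, and a non-fatal `UserWarning` for a missing or unreadable file. Quoting, `export` prefixes and variable expansion are deliberately not handled. The framework's own parser may treat such edge cases differently.

```python
import warnings
from pathlib import Path
from typing import Dict


def load_env_file(env_path: Path) -> Dict[str, str]:
    if not env_path.is_file():
        warnings.warn(f"env file not found: {env_path}", UserWarning)
        return {}
    try:
        text = env_path.read_text(encoding="utf-8")
    except OSError as exc:
        warnings.warn(f"cannot read {env_path}: {exc}", UserWarning)
        return {}
    env: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, sep, value = line.partition("=")  # split on the first '=' only
        if not sep:
            continue  # ignore malformed lines without '='
        env[key.strip()] = value.strip()
    return env
```

Splitting on the first `=` keeps values such as tokens that themselves contain `=` intact.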
- yt_framework.utils.env.load_secrets(secrets_dir, env_file='secrets.env')
Load secrets from secrets.env file in the specified directory.
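- Parameters:
secrets_dir (Path) – Directory containing the secrets file
env_file (str) – Name of the secrets file (default: 'secrets.env')
- Returns: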
Dictionary of loaded secrets (key -> value). Returns empty dict if file doesn’t exist or cannot be read.
- Warns:
UserWarning – If file doesn’t exist or cannot be parsed (non-fatal).
- Return type:
Dict[str, str]
Example
>>> secrets = load_secrets(Path("configs"))
>>> yt_token = secrets.get("YT_TOKEN")
Logging
Logging Configuration Module
Centralized logging setup for the entire pipeline.
- class yt_framework.utils.logging.ColoredFormatter(fmt=None, datefmt=None, style='%', validate=True, *, defaults=None)
Bases: Formatter
Custom formatter with colors for different log levels.
- COLORS = {'CRITICAL': '\x1b[35m', 'DEBUG': '\x1b[36m', 'ERROR': '\x1b[31m', 'INFO': '\x1b[32m', 'WARNING': '\x1b[33m'}
- RESET = '\x1b[0m'
- ICONS = {'CRITICAL': '🚨', 'DEBUG': '🔍', 'ERROR': '✗', 'WARNING': '⚠️'}
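A formatter built on the `COLORS`, `RESET` and `ICONS` tables above might work roughly as follows. It wraps the formatted message in the level's ANSI color and prefixes the icon when the level has one. The icon placement is an assumption, and `SketchColoredFormatter` is a hypothetical name, not the library class.

```python
import logging

COLORS = {"DEBUG": "\x1b[36m", "INFO": "\x1b[32m", "WARNING": "\x1b[33m",
          "ERROR": "\x1b[31m", "CRITICAL": "\x1b[35m"}
RESET = "\x1b[0m"
ICONS = {"DEBUG": "🔍", "WARNING": "⚠️", "ERROR": "✗", "CRITICAL": "🚨"}


class SketchColoredFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        message = super().format(record)  # apply the normal fmt string first
        icon = ICONS.get(record.levelname)
        if icon:  # INFO has no icon in the table above
            message = f"{icon} {message}"
        color = COLORS.get(record.levelname)
        # Reset the color at the end so it does not bleed into later output
        return f"{color}{message}{RESET}" if color else message
```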
- yt_framework.utils.logging.setup_logging(level=20, name=None, use_colors=True)
Set up logging with consistent formatting.
- yt_framework.utils.logging.log_header(logger, title, context=None)
Log a compact section header in format: [Title] context.
- yt_framework.utils.logging.log_operation(logger, message)
Log an operation start message with → prefix.
- yt_framework.utils.logging.log_success(logger, message)
Log a success/completion message with ✓ prefix.
- yt_framework.utils.logging.log_config(logger, config_dict, title='Configuration')
Log configuration in a readable format.
Automatically masks sensitive values (keys containing ‘secret’ or ‘key’) by showing only the last 4 characters.
- Parameters:
- Returns:
None
- Return type:
None
Example
>>> config = {"api_key": "secret12345", "mode": "dev"}
>>> log_config(logger, config)
[Configuration]
api_key: ***2345
mode: dev
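The masking rule can be sketched as follows, assuming the key match is case-insensitive (the docs do not say). `mask_config` is a hypothetical helper that returns the masked strings instead of logging them.

```python
from typing import Any, Dict

SENSITIVE_MARKERS = ("secret", "key")


def mask_config(config: Dict[str, Any]) -> Dict[str, str]:
    masked: Dict[str, str] = {}
    for name, value in config.items():
        text = str(value)
        # Assumed case-insensitive: "API_KEY" and "api_key" are both masked
        if any(marker in name.lower() for marker in SENSITIVE_MARKERS):
            text = "***" + text[-4:]  # keep only the last 4 characters
        masked[name] = text
    return masked
```

With this naive rule, a value of four characters or fewer is shown in full, so very short secrets are not protected.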
Ignore Patterns
.ytignore Parser and Matcher
Supports .gitignore-like pattern matching for excluding files from YT uploads.
- class yt_framework.utils.ignore.YTIgnorePattern(pattern, base_dir, is_negation=False)
Bases: object
Represents a single .ytignore pattern.
This class compiles a pattern string into a regex for efficient matching against file paths. It supports the same syntax as .gitignore files.
Pattern types:
- Simple wildcards: *.pyc, test?
- Character classes: *.py[cod]
- Recursive wildcards: **/*.log
- Directory patterns: build/ (trailing slash)
- Rooted patterns: /config (leading slash, root-only)
- Negation patterns: !important.py (un-ignore)
Examples
Simple wildcard:
>>> from pathlib import Path
>>> pattern = YTIgnorePattern("*.pyc", Path("/project"))
>>> pattern.matches(Path("/project/test.pyc"))
True
>>> pattern.matches(Path("/project/test.py"))
False
Directory pattern:
>>> pattern = YTIgnorePattern("__pycache__/", Path("/project"))
>>> pattern.matches(Path("/project/__pycache__/module.pyc"))
True
>>> pattern.matches(Path("/project/src/__pycache__/module.pyc"))
True
Recursive wildcard:
>>> pattern = YTIgnorePattern("**/*.log", Path("/project"))
>>> pattern.matches(Path("/project/src/debug.log"))
True
>>> pattern.matches(Path("/project/debug.log"))  # requires at least one directory level
False
Rooted pattern:
>>> pattern = YTIgnorePattern("/build", Path("/project"))
>>> pattern.matches(Path("/project/build"))
True
>>> pattern.matches(Path("/project/src/build"))  # only matches at root
False
- class yt_framework.utils.ignore.YTIgnoreMatcher(base_dir)
Bases: object
Matches file paths against .ytignore patterns.
This class loads .ytignore files from the specified directory and all parent directories up to the filesystem root, then provides methods to check if files should be ignored based on the loaded patterns.
Patterns follow .gitignore syntax:
- *.pyc matches any file ending in .pyc
- __pycache__/ matches any __pycache__ directory
- **/test_*.py matches test_*.py in any subdirectory (but not at the root)
- !important.py negates a previous ignore pattern
- /build matches only at the root level (not in subdirectories)
- src/*.log matches .log files in src/ but not in src/subdir/
Examples
Basic usage:
>>> from pathlib import Path
>>> matcher = YTIgnoreMatcher(Path("/project"))
>>> matcher.should_ignore(Path("/project/test.pyc"))
True
>>> matcher.should_ignore(Path("/project/main.py"))
False
With custom .ytignore file:
>>> # Create a .ytignore file
>>> project_dir = Path("/tmp/myproject")
>>> project_dir.mkdir(exist_ok=True)
>>> _ = (project_dir / ".ytignore").write_text("*.pyc\n__pycache__/\n")
>>> matcher = YTIgnoreMatcher(project_dir)
>>> matcher.should_ignore(project_dir / "module.pyc")
True
>>> matcher.should_ignore(project_dir / "module.py")
False
Negation patterns:
>>> # Ignore all .log files except important.log
>>> _ = (project_dir / ".ytignore").write_text("*.log\n!important.log\n")
>>> matcher = YTIgnoreMatcher(project_dir)
>>> matcher.should_ignore(project_dir / "debug.log")
True
>>> matcher.should_ignore(project_dir / "important.log")
False
- Parameters:
base_dir (Path)
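The negation example relies on the gitignore rule that the last matching pattern wins. The sketch below shows only that rule. It matches each pattern against the file name or the relative path with `fnmatch`. Directory patterns, rooted patterns and `**` are out of scope. `parse_ytignore` and `is_ignored` are hypothetical names, not part of the YTIgnoreMatcher API.

```python
import fnmatch
from pathlib import PurePosixPath
from typing import List, Tuple


def parse_ytignore(text: str) -> List[Tuple[str, bool]]:
    """Return (pattern, is_negation) pairs, skipping blanks and comments."""
    rules: List[Tuple[str, bool]] = []
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        negated = line.startswith("!")
        rules.append((line[1:] if negated else line, negated))
    return rules


def is_ignored(rel_path: str, rules: List[Tuple[str, bool]]) -> bool:
    name = PurePosixPath(rel_path).name
    ignored = False
    for pattern, negated in rules:
        if fnmatch.fnmatchcase(name, pattern) or fnmatch.fnmatchcase(rel_path, pattern):
            ignored = not negated  # the last matching rule wins
    return ignored
```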
- yt_framework.utils.ignore.should_ignore_file(file_path, base_dir)
Convenience function to check if a file should be ignored.
This is a shorthand for creating a YTIgnoreMatcher and checking a single file. For checking multiple files, create a YTIgnoreMatcher instance directly to avoid reloading .ytignore files for each check.
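- Parameters:
file_path (Path) – Path of the file to check
base_dir (Path) – Directory from which .ytignore files are loaded
- Returns:
True if the file should be ignored
- Return type:
bool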
Examples
>>> from pathlib import Path
>>> # Assuming .ytignore contains "*.pyc"
>>> should_ignore_file(Path("/project/test.pyc"), Path("/project"))
True
>>> should_ignore_file(Path("/project/test.py"), Path("/project"))
False
YT Client
Client Factory
YTsaurus Operations
Reusable YT client for any YT operations.
- yt_framework.yt.factory.create_yt_client(logger=None, mode='dev', pipeline_dir=None, secrets=None)
Factory function to create appropriate YT client based on mode.
- Parameters:
logger (Logger | None) – Logger instance (default: creates new logger)
mode (Literal['prod', 'dev'] | None) – “prod” for production YT client, “dev” for local development
pipeline_dir (Path | str | None) – Pipeline directory (required for dev mode)
secrets (Dict[str, str] | None) – Optional dictionary containing YT credentials. Required only for prod mode. Expected keys:
- YT_PROXY: YTsaurus proxy URL
- YT_TOKEN: YTsaurus authentication token
- Returns:
BaseYTClient instance (YTProdClient or YTDevClient)
- Raises:
ValueError – If secrets are required for prod mode but not provided.
- Return type:
BaseYTClient
Example
>>> # Dev mode (local filesystem simulation)
>>> client = create_yt_client(mode="dev", pipeline_dir=Path("."))
>>> # Prod mode (real YT cluster)
>>> secrets = {"YT_PROXY": "my-proxy", "YT_TOKEN": "my-token"}
>>> client = create_yt_client(mode="prod", secrets=secrets)
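This is a minimal sketch of the dispatch and validation described above. `DevClient`, `ProdClient` and `create_client` are hypothetical stand-ins. The check that dev mode receives a `pipeline_dir` is an assumption based on the parameter description, since the docs list only the missing-secrets `ValueError`.

```python
from typing import Dict, Literal, Optional, Union


class DevClient:  # hypothetical stand-in for YTDevClient
    def __init__(self, pipeline_dir: str) -> None:
        self.pipeline_dir = pipeline_dir


class ProdClient:  # hypothetical stand-in for YTProdClient
    def __init__(self, proxy: str, token: str) -> None:
        self.proxy = proxy
        self.token = token


def create_client(
    mode: Literal["dev", "prod"] = "dev",
    pipeline_dir: Optional[str] = None,
    secrets: Optional[Dict[str, str]] = None,
) -> Union[DevClient, ProdClient]:
    if mode == "prod":
        # Both credentials must be present and non-empty
        missing = [k for k in ("YT_PROXY", "YT_TOKEN") if not (secrets or {}).get(k)]
        if missing:
            raise ValueError(f"prod mode requires secrets: {', '.join(missing)}")
        return ProdClient(secrets["YT_PROXY"], secrets["YT_TOKEN"])
    if mode == "dev":
        if pipeline_dir is None:  # assumed check, see the lead-in
            raise ValueError("dev mode requires pipeline_dir")
        return DevClient(pipeline_dir)
    raise ValueError(f"unknown mode: {mode!r}")
```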
Dev Client
Development YT Client
Development implementation of YT client using local file system.
- class yt_framework.yt.client_dev.YTDevClient(logger, pipeline_dir=None)
Bases: BaseYTClient
Development YT client implementation.
Uses the local file system for all operations, simulating YT behavior. Tables are stored as .jsonl files in the .dev/ directory.
- exists(path)
Check if a path exists in YT.
In dev mode, always returns True (assumes files exist locally).
- write_table(table_path, rows, append=False, replication_factor=1)
Write rows to a YT table (saves as local .jsonl file).
In dev mode, tables are stored as JSONL files in the .dev directory. Each row is written as a JSON object on a single line.
- Parameters:
- Returns:
None
- Return type:
None
Example
>>> client.write_table("//tmp/data", [{"id": 1, "name": "Alice"}])
>>> # Creates .dev/data.jsonl containing: {"id":1,"name":"Alice"}\n
- row_count(table_path)
Get number of rows in a YT table (counts lines in local .jsonl file).
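This sketch shows the dev-mode storage format: one JSON object per line. The `append` flag chooses between appending and overwriting, and the row count is the number of non-empty lines. The mapping from `//tmp/data` to `.dev/data.jsonl` follows the write_table example above. Using only the last path component is an assumption, so two tables with the same name in different directories would collide.

```python
import json
from pathlib import Path
from typing import Any, Dict, Iterable


def local_table_file(dev_dir: Path, table_path: str) -> Path:
    # Hypothetical mapping: "//tmp/data" -> <dev_dir>/data.jsonl
    return dev_dir / f"{table_path.rstrip('/').split('/')[-1]}.jsonl"


def write_table(dev_dir: Path, table_path: str,
                rows: Iterable[Dict[str, Any]], append: bool = False) -> None:
    target = local_table_file(dev_dir, table_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("a" if append else "w", encoding="utf-8") as fh:
        for row in rows:
            # Compact separators give {"id":1,"name":"Alice"}, one object per line
            fh.write(json.dumps(row, separators=(",", ":")) + "\n")


def row_count(dev_dir: Path, table_path: str) -> int:
    with local_table_file(dev_dir, table_path).open(encoding="utf-8") as fh:
        return sum(1 for line in fh if line.strip())
```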
- join_tables(left_table, right_table, output_table, on, how='left', select_columns=None, dry_run=False)
Join two tables using YQL (executed locally with DuckDB in dev mode).
- filter_table(input_table, output_table, condition, dry_run=False)
Filter table rows using WHERE condition (executed locally with DuckDB in dev mode).
- select_columns(input_table, output_table, columns, dry_run=False)
Select specific columns from a table (executed locally with DuckDB in dev mode).
- group_by_aggregate(input_table, output_table, group_by, aggregations, dry_run=False)
Group by columns and compute aggregations (executed locally with DuckDB in dev mode).
- union_tables(tables, output_table, dry_run=False)
Union multiple tables (executed locally with DuckDB in dev mode).
- distinct(input_table, output_table, columns=None, dry_run=False)
Get distinct rows from a table (executed locally with DuckDB in dev mode).
- sort_table(input_table, output_table, order_by, ascending=True, dry_run=False)
Sort table by columns (executed locally with DuckDB in dev mode).
- limit_table(input_table, output_table, limit, dry_run=False)
Limit number of rows from a table (executed locally with DuckDB in dev mode).
- upload_file(local_path, yt_path, create_parent_dir=False)
Upload a file to YT (no-op in dev mode).
- upload_directory(local_dir, yt_dir, pattern='*')
Upload a directory to YT (no-op in dev mode).
- run_map(command, input_table, output_table, files, resources, env, output_schema=None, max_failed_jobs=1, docker_auth=None)
Run a map operation locally using subprocess.
In dev mode, executes the mapper script locally with input/output tables as JSONL files. The command is executed in a temporary sandbox directory with all dependencies available.
- Parameters:
command (str) – Command to execute (typically bash command with script path).
input_table (str) – Input YT table path (read from local JSONL).
output_table (str) – Output YT table path (written to local JSONL).
files (List[Tuple[str, str]]) – List of (yt_path, local_path) tuples for dependencies.
resources (OperationResources) – Operation resource configuration (not fully used in dev mode).
output_schema (TableSchema | None) – Optional output table schema (not used in dev mode).
max_failed_jobs (int) – Maximum failed jobs allowed (not used in dev mode).
docker_auth (Dict[str, str] | None) – Optional Docker authentication (not used in dev mode).
- Returns:
Mock operation object that simulates YT operation.
- Return type:
Operation
Example
>>> op = client.run_map(
...     command="python3 mapper.py",
...     input_table="//tmp/input",
...     output_table="//tmp/output",
...     files=[],
...     resources=OperationResources(),
...     env={},
... )
- run_vanilla(command, files, env, task_name='main', **kwargs)
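This sketch follows the dev-mode map execution described above. Input rows are fed to the mapper on stdin as JSONL, output rows are read from stdout, and the process runs in a temporary sandbox directory. `run_map_locally` is a hypothetical helper. It takes an argv list rather than a bash command string and does not stage the `files` dependencies into the sandbox.

```python
import os
import subprocess
import tempfile
from pathlib import Path
from typing import Dict, List, Optional


def run_map_locally(command: List[str], input_jsonl: Path, output_jsonl: Path,
                    env: Optional[Dict[str, str]] = None) -> None:
    # Extend (not replace) the current environment with the operation's variables
    merged_env = {**os.environ, **(env or {})}
    with tempfile.TemporaryDirectory() as sandbox:
        with input_jsonl.open("rb") as stdin, output_jsonl.open("wb") as stdout:
            # One JSON row per line in, one JSON row per line out;
            # check=True raises CalledProcessError if the mapper fails
            subprocess.run(command, stdin=stdin, stdout=stdout, cwd=sandbox,
                           env=merged_env, check=True)
```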
Run a vanilla operation locally using subprocess.
In dev mode, executes the vanilla script locally in a temporary sandbox directory with all dependencies available. No input/output tables are involved.
- Parameters:
- Returns:
Mock operation object that simulates YT operation.
- Return type:
Operation
Production Client
Production YT Client
Production implementation of YT client using actual YTsaurus client.
- class yt_framework.yt.client_prod.YTProdClient(logger, secrets)
Bases: BaseYTClient
Production YT client implementation.
Uses actual YTsaurus client for all operations.
- write_table(table_path, rows, append=False, replication_factor=1, make_parents=True)
Write rows to a YT table.
- Parameters:
table_path (str) – YT table path
rows (List[Dict[str, Any]]) – List of dictionaries representing table rows
append (bool) – If True, append to existing table (default: False)
replication_factor (int) – Replication factor for the table (default: 1)
make_parents (bool) – If True, create parent directories if they don’t exist (default: True)
- Return type:
None
- join_tables(left_table, right_table, output_table, on, how='left', select_columns=None, dry_run=False)
Join two tables using YQL.
- Parameters:
left_table (str) – Path to left table
right_table (str) – Path to right table
output_table (str) – Path to output table
on (str | List[str] | Dict[str, str]) – Join key(s), i.e. the column name(s) to join on:
- str: same column name on both sides (e.g., "user_id")
- List[str]: multiple columns with the same names (e.g., ["user_id", "region"])
- Dict[str, str]: different column names (e.g., {"left": "input_s3_path", "right": "path"})
how (Literal['inner', 'left', 'right', 'full']) – Join type: "inner", "left", "right", or "full"
select_columns (List[str] | None) – Optional list of columns to select (with table aliases)
dry_run (bool) – If True, return the YQL query without executing
- Returns:
YQL query string if dry_run=True, None otherwise
- Return type:
str | None
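The three accepted shapes of `on` map naturally onto a join condition, and this sketch builds such a condition string. Several details are assumptions: the aliases `l` and `r`, the name `build_on_clause`, and reading the dict form as having literal `"left"` and `"right"` keys (as in the example). The framework's generated YQL may differ.

```python
from typing import Dict, List, Union

JoinKeys = Union[str, List[str], Dict[str, str]]


def build_on_clause(on: JoinKeys, left_alias: str = "l", right_alias: str = "r") -> str:
    if isinstance(on, str):
        pairs = [(on, on)]  # same column name on both sides
    elif isinstance(on, list):
        pairs = [(col, col) for col in on]  # several same-named columns
    elif isinstance(on, dict):
        pairs = [(on["left"], on["right"])]  # differently named columns
    else:
        raise TypeError(f"unsupported join key type: {type(on).__name__}")
    # Multiple key pairs are combined with AND
    return " AND ".join(f"{left_alias}.{lc} = {right_alias}.{rc}" for lc, rc in pairs)
```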
- filter_table(input_table, output_table, condition, dry_run=False)
Filter table rows using WHERE condition.
- Parameters:
- Returns:
YQL query string if dry_run=True, None otherwise
- Return type:
str | None
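The `dry_run` flag follows a common build-then-maybe-execute pattern. In this sketch the query text (an `INSERT INTO ... WITH TRUNCATE` YQL statement) and the `execute` callable are assumptions for illustration. The framework's actual query may differ.

```python
from typing import Callable, Optional


def filter_table(input_table: str, output_table: str, condition: str,
                 execute: Callable[[str], None], dry_run: bool = False) -> Optional[str]:
    # Build the full query first so it can be inspected or executed unchanged
    query = (
        f"INSERT INTO `{output_table}` WITH TRUNCATE\n"
        f"SELECT * FROM `{input_table}`\n"
        f"WHERE {condition};"
    )
    if dry_run:
        return query  # inspect the query without running anything
    execute(query)
    return None
```

Returning the exact string that would be executed makes dry runs a cheap way to review generated queries before they touch a cluster.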
- select_columns(input_table, output_table, columns, dry_run=False)
Select specific columns from a table.
- group_by_aggregate(input_table, output_table, group_by, aggregations, dry_run=False)
Group by columns and compute aggregations.
- Parameters:
input_table (str) – Path to input table
output_table (str) – Path to output table
aggregations (Dict[str, str | Tuple[str, str]]) – Dict mapping output column names to aggregation functions, e.g., {"order_count": "count", "total_amount": "sum"}
dry_run (bool) – If True, return the YQL query without executing
- Returns:
YQL query string if dry_run=True, None otherwise
- Return type:
str | None
- distinct(input_table, output_table, columns=None, dry_run=False)
Get distinct rows from a table.
- Parameters:
- Returns:
YQL query string if dry_run=True, None otherwise
- Return type:
str | None
- sort_table(input_table, output_table, order_by, ascending=True, dry_run=False)
Sort table by columns.
WARNING: Sorting large tables can be expensive. Use with caution.
- Parameters:
- Returns:
YQL query string if dry_run=True, None otherwise
- Return type:
str | None
- limit_table(input_table, output_table, limit, dry_run=False)
Limit number of rows from a table.
- upload_directory(local_dir, yt_dir, pattern='*')
Upload a directory to YT cluster.
Recursively uploads all files from a local directory to a YT directory, respecting .ytignore patterns if present.
- run_map(command, input_table, output_table, files, resources, env, output_schema=None, max_failed_jobs=1, docker_auth=None)
Run a map operation on YT cluster.
Submits a map operation that processes each row of the input table independently and writes results to the output table. The operation runs on the YT cluster with the specified resources and dependencies.
- Parameters:
command (str) – Command to execute (typically bash command with script path).
input_table (str) – Input YT table path.
output_table (str) – Output YT table path.
files (List[Tuple[str, str]]) – List of (yt_path, local_path) tuples for dependencies.
resources (OperationResources) – Operation resource configuration (memory, CPU, GPU, etc.).
output_schema (TableSchema | None) – Optional output table schema for typed output.
max_failed_jobs (int) – Maximum failed jobs allowed before operation fails.
docker_auth (Dict[str, str] | None) – Optional Docker authentication for private registries.
- Returns:
YT operation object that can be monitored and waited on.
- Return type:
Operation
- Raises:
Exception – If operation submission fails.
- run_vanilla(command, files, env, task_name, resources, docker_auth=None, max_failed_jobs=1)
Run a vanilla operation on YT cluster.
Submits a vanilla operation that runs a standalone job without input/output tables. The operation runs on the YT cluster with the specified resources and dependencies.
- Parameters:
command (str) – Command to execute (typically bash command with script path).
files (List[Tuple[str, str]]) – List of (yt_path, local_path) tuples for dependencies.
task_name (str) – Task name for the operation.
resources (OperationResources) – Operation resource configuration (memory, CPU, GPU, etc.).
docker_auth (Dict[str, str] | None) – Optional Docker authentication for private registries.
max_failed_jobs (int) – Maximum failed jobs allowed before operation fails.
- Returns:
YT operation object that can be monitored and waited on.
- Return type:
Operation
- Raises:
Exception – If operation submission fails.