API reference#
yt_framework symbols below are generated from Python docstrings. Job-side entrypoints (ytjobs) have a separate page.
Tip
Navigation
Use the sidebar headings for modules. Anything not listed is still in the source tree; use help(module) locally.
Layout#
Core: pipeline, stage, registry, discovery, self.deps injection
Operations: map, vanilla, map-reduce/reduce, S3 helpers, table helpers, checkpoint upload, sort, tokenizer artifact wiring
Typed jobs: StageBootstrapTypedJob
YT: client factory, dev and prod clients (YQL helpers live on the clients)
Utils: env files, logging setup, ignore patterns
Packaging: upload helpers shared by operations
ytjobs: Job-side reference
Note
Narrative docs
How-to guides under docs/operations, docs/configuration, and docs/advanced spell out YAML and stage patterns; this page is the raw API surface.
Core Modules#
Pipeline#
Pipeline base classes, CLI entry point (main), and code upload orchestration.
Subclass BasePipeline, implement setup() to register stages with
StageRegistry, then run via Pipeline.main() from pipeline.py.
DefaultPipeline auto-discovers stages under stages/*/stage.py.
- class yt_framework.core.pipeline.BasePipeline(config, pipeline_dir, log_level=20)[source]#
Bases: object
Base class for all pipelines.
Provides common functionality:
- CLI entry point via the main() class method
- Code upload to the YT build folder
- YT client initialization
- Default stage execution loop
Subclasses must:
- In setup(), create a StageRegistry and register stages via set_stage_registry()
Subclasses may override:
- setup(): register stages and initialize pipeline-specific clients (S3, etc.)
- run(): custom execution flow (rare; most pipelines use the default)
- __init__(config, pipeline_dir, log_level=20)[source]#
Initialize the base pipeline.
- Parameters:
- Returns:
None
- Raises:
ValueError – If pipeline directory does not exist.
- setup()[source]#
Hook for pipeline-specific initialization.
Override this method in subclasses to:
1. Register stages using StageRegistry and set_stage_registry()
2. Initialize custom clients (e.g., S3 client, database connections)
This method is called automatically after base initialization.
- Returns:
None
- Return type:
None
- set_stage_registry(registry)[source]#
Set the stage registry for this pipeline.
- Parameters:
registry (StageRegistry) – StageRegistry instance with registered stages
- Returns:
None
- Return type:
None
- create_stage_dependencies()[source]#
Create stage dependencies for injection.
This method creates a dependency container with only what stages need, following the Interface Segregation Principle.
- Returns:
PipelineStageDependencies with yt_client, pipeline_config, configs_dir
- Return type:
PipelineStageDependencies
- upload_code(build_folder=None)[source]#
Upload code to YT build folder.
Only uploads code if any enabled stages need code execution on YT. If no stages need code execution, this method does nothing.
- Parameters:
build_folder (str | None) – Optional YT build folder path. If None, uses config.pipeline.build_folder
- Returns:
None
- Raises:
ValueError – If build_folder is required but not provided in config.
- Return type:
None
- run()[source]#
Run the pipeline by executing enabled stages.
Default implementation:
1. Upload code to YT (only if stages need code execution)
2. Get enabled stages from config
3. Execute stages in order using the stage registry
4. Pass context between stages
Override this method only if you need completely custom execution flow.
- Returns:
None
- Raises:
ValueError – If no enabled_stages found in config or unknown stage name.
AttributeError – If stage registry is not set in setup().
- Return type:
None
- classmethod main(argv=None)[source]#
CLI entry point for the pipeline.
Handles:
- Argument parsing (--config option)
- Config file loading
- Pipeline instantiation
- Error handling and exit codes
- Parameters:
argv – Optional command-line arguments. If None, uses sys.argv.
- Returns:
None (exits with code 0 on success, 1 on failure)
- Return type:
None
- Usage:
python pipeline.py
python pipeline.py --config configs/custom.yaml
- class yt_framework.core.pipeline.DefaultPipeline(config, pipeline_dir, log_level=20)[source]#
Bases: BasePipeline
Pipeline with automatic stage discovery.
Discovers BaseStage subclasses from stages/<name>/stage.py and registers them. Run with DefaultPipeline.main() from pipeline.py. Which stages execute is still controlled by enabled_stages in config.
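The default run() flow described above (look up each enabled stage, execute in order, pass context along) can be sketched without the framework. This is only an illustration under stated assumptions: run_enabled_stages and the plain-callable stages are hypothetical stand-ins, not the BasePipeline implementation, and code upload is left out.

```python
from typing import Any, Callable, Dict, List

# Hypothetical stand-in: a "stage" is a callable that takes and returns the
# shared context dict. The real framework uses BaseStage subclasses instead.
Stage = Callable[[Dict[str, Any]], Dict[str, Any]]


def run_enabled_stages(
    registry: Dict[str, Stage], enabled_stages: List[str]
) -> Dict[str, Any]:
    """Execute enabled stages in order, threading one context dict through."""
    if not enabled_stages:
        raise ValueError("no enabled_stages found in config")
    context: Dict[str, Any] = {}
    for name in enabled_stages:
        if name not in registry:
            raise ValueError(f"unknown stage name: {name}")
        # Each stage receives the context returned by the previous stage.
        context = registry[name](context)
    return context


def create_table(ctx: Dict[str, Any]) -> Dict[str, Any]:
    return {**ctx, "table": "//tmp/example"}


def run_map(ctx: Dict[str, Any]) -> Dict[str, Any]:
    return {**ctx, "mapped": ctx["table"] + "_out"}


result = run_enabled_stages(
    {"create_table": create_table, "run_map": run_map},
    ["create_table", "run_map"],
)
print(result)
```

The order of enabled_stages, not the order of registration, decides execution order in this sketch, which mirrors the note that config controls which stages run.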
Stage#
BaseStage — pipeline authors subclass this and implement run().
The framework derives the stage name from the stages/<name>/ directory.
- class yt_framework.core.stage.StageContext(name, config, stage_dir, logger, deps)[source]#
Bases: object
Stage context containing all stage-related information.
- Parameters:
name (str)
config (DictConfig)
stage_dir (Path)
logger (Logger)
deps (StageDependencies)
- config#
Stage-specific configuration loaded from config.yaml.
- Type:
omegaconf.dictconfig.DictConfig
- stage_dir#
Path to the stage directory (stages/<stage_name>/).
- Type:
- logger#
Logger instance for stage logging.
- Type:
- deps#
Injected dependencies (yt_client, pipeline_config, configs_dir).
- config: DictConfig#
- deps: StageDependencies#
- fork(name=None, stage_dir=None)[source]#
Return a shallow copy with selective overrides.
Use this in multi-operation stages when a later operation needs a slightly different context (e.g., a different stage_dir so that TarArchiveDependencyBuilder resolves wrapper scripts from the correct location).
Only name and stage_dir can be overridden; all other fields (config, logger, deps) are inherited from the parent context. This is intentional: they represent shared pipeline state.
- Parameters:
- Returns:
A new StageContext with the specified fields replaced.
- Return type:
StageContext
Example:
ctx_reduce = self.context.fork(name="mds", stage_dir=Path(__file__).parent)
run_reduce(context=ctx_reduce, operation_config=reduce_cfg, reducer=MyReducer())
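A shallow copy with selective overrides, as fork() is described above, could look roughly like the following. SketchContext is a hypothetical stand-in whose field names follow this page; using dataclasses.replace is an assumption about one reasonable way to do it, not a statement about the framework's code.

```python
from dataclasses import dataclass, replace
from pathlib import Path
from typing import Any, Optional


@dataclass
class SketchContext:
    """Hypothetical stand-in for StageContext (same field names as the docs)."""

    name: str
    config: Any
    stage_dir: Path
    logger: Any
    deps: Any

    def fork(
        self, name: Optional[str] = None, stage_dir: Optional[Path] = None
    ) -> "SketchContext":
        # Only name and stage_dir may change. config, logger and deps are
        # carried over by reference, so parent and child share them.
        return replace(
            self,
            name=self.name if name is None else name,
            stage_dir=self.stage_dir if stage_dir is None else stage_dir,
        )


parent = SketchContext("map", {"k": 1}, Path("stages/map"), logger=None, deps=object())
child = parent.fork(name="mds", stage_dir=Path("stages/mds"))
print(child.name, child.stage_dir)
```

Because the copy is shallow, mutating child.config in this sketch would also be visible through parent.config.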
- class yt_framework.core.stage.BaseStage(deps, logger)[source]#
Bases: ABC
Abstract base class for pipeline stages.
Stage name and config are automatically detected from the directory.
Directory structure: stages/<stage_name>/stage.py
Config file: stages/<stage_name>/config.yaml
Each stage must:
- Implement __init__ to receive deps and logger
- Implement the run method to execute stage logic
Example
class MyStage(BaseStage):
    def __init__(self, deps, logger):
        super().__init__(deps, logger)
        # self.config is automatically loaded from stages/<stage_name>/config.yaml
        # Access dependencies via self.deps.yt_client, self.deps.pipeline_config

    def run(self, debug: DebugContext) -> DebugContext:
        # Stage logic here
        # Note: 'debug' here is the shared data dict, NOT dependencies
        return {"result": "value"}
- Parameters:
deps (StageDependencies)
logger (Logger)
- __init__(deps, logger)[source]#
Initialize stage with injected dependencies.
Stage name and config are automatically detected from the directory containing stage.py.
- Parameters:
deps (StageDependencies) – Injected dependencies (yt_client, pipeline_config, configs_dir)
logger (Logger) – Logger instance for stage logging
- Returns:
None
- Raises:
FileNotFoundError – If config.yaml file is not found in stage directory.
ValueError – If config.yaml does not contain a dictionary.
- Return type:
None
- property stage_dir: Path#
Path to the stage directory.
- Returns:
Absolute path to the stage directory (stages/<stage_name>/).
- Return type:
Path
- property context: StageContext#
Stage context containing all stage-related information.
- Returns:
Dataclass instance with name, config, stage_dir, logger, and deps attributes.
- Return type:
StageContext
Registry#
Mutable registry of BaseStage subclasses keyed by stage name.
- class yt_framework.core.registry.StageRegistry[source]#
Bases: object
Builder for registering pipeline stages.
Automatically detects stage names from directory structure.
Example
registry = StageRegistry()
registry.add_stage(CreateTableStage)
registry.add_stage(RunMapStage)
pipeline.set_stage_registry(registry)
- add_stage(stage_class)[source]#
Register a stage class.
Stage name is automatically detected from the directory containing stage.py.
- Parameters:
- Returns:
Self for method chaining
- Return type:
- get_stage(stage_name)[source]#
Get stage class by name.
- Parameters:
stage_name (str) – Name of the stage to retrieve (matches directory name).
- Returns:
The stage class registered with the given name.
- Return type:
Type[BaseStage]
- Raises:
KeyError – If no stage is registered with the given name.
Example
>>> registry = StageRegistry()
>>> registry.add_stage(MyStage)
>>> stage_class = registry.get_stage("my_stage")
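The registry behaviour documented above (name taken from the stage directory, chaining via the returned self, KeyError for unknown names) can be sketched as follows. SketchRegistry and stage_name_from_file are hypothetical; unlike the real add_stage, this version takes the stage.py path explicitly so the example stays self-contained.

```python
from pathlib import Path
from typing import Dict, Union


def stage_name_from_file(stage_file: Union[str, Path]) -> str:
    """Derive the stage name from the directory that contains stage.py."""
    return Path(stage_file).parent.name


class SketchRegistry:
    """Hypothetical stand-in for StageRegistry: stage name -> stage class."""

    def __init__(self) -> None:
        self._stages: Dict[str, type] = {}

    def add_stage(self, stage_class: type, stage_file: Union[str, Path]) -> "SketchRegistry":
        # Assumption: the caller passes the module path; the real registry
        # is documented to detect it from the class automatically.
        self._stages[stage_name_from_file(stage_file)] = stage_class
        return self  # enables method chaining

    def get_stage(self, stage_name: str) -> type:
        try:
            return self._stages[stage_name]
        except KeyError:
            raise KeyError(f"no stage registered as {stage_name!r}") from None


class CreateTableStage:
    pass


class RunMapStage:
    pass


registry = (
    SketchRegistry()
    .add_stage(CreateTableStage, "stages/create_table/stage.py")
    .add_stage(RunMapStage, "stages/run_map/stage.py")
)
print(registry.get_stage("run_map").__name__)
```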
Discovery#
Filesystem scan that imports stages/*/stage.py and collects BaseStage types.
- yt_framework.core.discovery.discover_stages(pipeline_dir, logger)[source]#
Automatically discover all stage classes from the stages directory tree.
Searches for stage.py under each stages child directory and imports all BaseStage subclasses found.
Expected layout: pipeline_dir/stages/<stage_name>/stage.py with a BaseStage subclass in each module.
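A filesystem scan of this kind can be approximated with importlib from the standard library. The sketch below is not discover_stages itself: SketchBaseStage and discover are hypothetical names, and the base class is injected into each module's namespace purely so the temporary stage files need no imports.

```python
import importlib.util
import inspect
import tempfile
from pathlib import Path
from typing import Dict


class SketchBaseStage:
    """Hypothetical stand-in for BaseStage."""


def discover(pipeline_dir: Path, base: type) -> Dict[str, type]:
    """Import stages/*/stage.py and collect subclasses of `base` by stage name."""
    found: Dict[str, type] = {}
    for stage_file in sorted((pipeline_dir / "stages").glob("*/stage.py")):
        stage_name = stage_file.parent.name
        spec = importlib.util.spec_from_file_location(
            f"sketch_stage_{stage_name}", str(stage_file)
        )
        module = importlib.util.module_from_spec(spec)
        # Shortcut for a self-contained demo: expose the base class as a
        # global named BaseStage before the module body runs.
        module.BaseStage = base
        spec.loader.exec_module(module)
        for _, obj in inspect.getmembers(module, inspect.isclass):
            if issubclass(obj, base) and obj is not base:
                found[stage_name] = obj
    return found


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for name in ("create_table", "run_map"):
        stage_dir = root / "stages" / name
        stage_dir.mkdir(parents=True)
        (stage_dir / "stage.py").write_text("class Stage(BaseStage):\n    pass\n")
    discovered = discover(root, SketchBaseStage)

print(sorted(discovered))
```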
Core injection (self.deps)#
Injection protocol and PipelineStageDependencies for stage run() methods.
Defines StageDependencies (protocol) and the concrete dataclass the pipeline
passes as self.deps (YT client, pipeline config, configs directory).
- class yt_framework.core.dependencies.StageDependencies(*args, **kwargs)[source]#
Bases: Protocol
Protocol defining what dependencies stages need.
This is NOT the same as the 'context' parameter in run() methods:
- StageDependencies: injected services/config (yt_client, config, etc.)
- context parameter: shared data dictionary passed between stages
Benefits:
- Dependency Inversion: depends on an abstraction, not the concrete Pipeline
- Interface Segregation: only exposes what stages actually use
- Testability: easy to create mock dependencies for testing
- property yt_client: BaseYTClient#
YT client for operations.
- Returns:
YT client instance (either YTDevClient or YTProdClient) for performing table operations, running map/vanilla jobs, etc.
- Return type:
BaseYTClient
- property pipeline_config: DictConfig#
Pipeline-level configuration (contains build_folder and secrets).
- Returns:
OmegaConf configuration object containing pipeline-wide settings like mode, build_folder, and other pipeline parameters.
- Return type:
DictConfig
- class yt_framework.core.dependencies.PipelineStageDependencies(yt_client, pipeline_config, configs_dir)[source]#
Bases: object
Default implementation of StageDependencies.
Used by BasePipeline to inject dependencies into stages. This class is instantiated by the pipeline and passed to each stage.
- Parameters:
yt_client (BaseYTClient)
pipeline_config (DictConfig)
configs_dir (Path)
- yt_client#
YT client instance for performing operations on YTsaurus cluster or local filesystem (dev mode).
- Type:
yt_framework.yt.client_base.BaseYTClient
- pipeline_config#
Pipeline-level configuration containing mode, build_folder, and other pipeline-wide settings.
- Type:
omegaconf.dictconfig.DictConfig
- configs_dir#
Path to directory containing secrets.env and configuration files.
- Type:
- yt_client: BaseYTClient#
- pipeline_config: DictConfig#
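The testability benefit of a protocol-based dependency container can be illustrated with typing.Protocol alone. Everything in this sketch is a hypothetical stand-in (SketchDeps, FakeDeps, RecordingClient, stage_logic); only the two property names yt_client and pipeline_config are taken from this page.

```python
from dataclasses import dataclass
from typing import Any, List, Protocol, Tuple, runtime_checkable


@runtime_checkable
class SketchDeps(Protocol):
    """Hypothetical mirror of the two documented StageDependencies properties."""

    @property
    def yt_client(self) -> Any: ...

    @property
    def pipeline_config(self) -> Any: ...


@dataclass
class FakeDeps:
    """Test double: satisfies the protocol structurally, no inheritance needed."""

    yt_client: Any
    pipeline_config: Any


class RecordingClient:
    """Fake client that records calls instead of touching a cluster."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, list]] = []

    def write_table(self, table_path: str, rows: list) -> None:
        self.calls.append((table_path, list(rows)))


def stage_logic(deps: SketchDeps) -> None:
    # Stage code depends only on the protocol, never on a concrete pipeline.
    deps.yt_client.write_table(deps.pipeline_config["output"], [{"id": 1}])


client = RecordingClient()
deps = FakeDeps(yt_client=client, pipeline_config={"output": "//tmp/out"})
stage_logic(deps)
print(client.calls)
```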
Operations#
Map Operations#
Driver helpers to package src/mapper.py and submit YT map operations.
- class yt_framework.operations.map.MapOperationData(mapper_path, dependencies, environment, docker_auth, command=None)[source]#
Bases: object
Data container for map operation configuration.
- Parameters:
- docker_auth#
Optional Docker authentication dictionary for private registries.
- yt_framework.operations.map.run_map(context, operation_config, output_schema=None, mapper=None, job=None)[source]#
Run YT map operation and wait for completion.
All job parameters (pool, memory, CPU, Docker image, etc.) are automatically extracted from operation_config. Operation config should be passed from stage.config.operations.map.
- Parameters:
context (StageContext) – Stage context (provides deps, logger, stage_dir)
operation_config (DictConfig) – Operation-specific config (from client.operations.map). Optional key append (bool): append mapper output to an existing output table.
output_schema (TableSchema | None) – Optional YT TableSchema for typed output table
mapper (Any | None) – Optional mapper leg (legacy name). When omitted, framework uses command wrapper. Can be a TypedJob instance or command string.
job (Any | None) – Preferred mapper leg alias. Can be a TypedJob instance or command string.
- Returns:
True if successful, False otherwise
- Return type:
Vanilla Operations#
Driver helpers to package src/vanilla.py and submit YT vanilla operations.
- class yt_framework.operations.vanilla.VanillaOperationData(script_path, dependencies, environment, docker_auth, command=None)[source]#
Bases: object
Data container for vanilla operation configuration.
- Parameters:
- docker_auth#
Optional Docker authentication dictionary for private registries.
- yt_framework.operations.vanilla.run_vanilla(context, operation_config, job=None)[source]#
Run YT vanilla operation and wait for completion.
All job parameters (pool, memory, CPU, Docker image, etc.) are automatically extracted from operation_config. Operation config should be passed from stage.config.operations.vanilla. The task name is automatically set to the stage name.
- Parameters:
context (StageContext) – Stage context (provides deps, logger, stage_dir, name)
operation_config (DictConfig) – Operation-specific config (from client.operations.vanilla)
job (str | None) – Preferred command alias. When omitted, framework wrapper command is used.
- Returns:
True if successful, False otherwise
- Return type:
Map-reduce and reduce#
Submit map-reduce and reduce-only YT operations (TypedJob or command strings).
Builds archives plus optional file dependencies; cluster credentials still come from configs/secrets.env like other operations.
- yt_framework.operations.map_reduce.run_map_reduce(context, operation_config, mapper=None, reducer=None, output_schema=None, map_job=None, reduce_job=None)[source]#
Run a YT map-reduce operation and wait for completion.
Pass mapper and reducer either both as TypedJob instances or both as command strings (JSON stdin/stdout). Mixing kinds raises ValueError.
Set operation_config.tar_command_bootstrap: true to wrap string legs with the same tar -xzf source.tar.gz + wrapper pattern as map operations (requires matching wrappers in the uploaded tarball; see docs).
- Parameters:
context (StageContext) – Stage context (deps, logger, stage_dir, config).
operation_config (DictConfig) – client.operations.map_reduce config (input_table, output_table, reduce_by, sort_by, resources, file_paths, etc.).
mapper (Any) – Deprecated; use map_job instead.
reducer (Any) – Deprecated; use reduce_job instead.
output_schema (Any | None) – Optional YT TableSchema for output table.
map_job (Any) – Mapper leg (TypedJob instance or command string).
reduce_job (Any) – Reducer leg (TypedJob instance or command string).
- Returns:
True if the operation completed successfully.
- Return type:
- yt_framework.operations.map_reduce.run_reduce(context, operation_config, reducer=None, output_schema=None, job=None)[source]#
Run a YT reduce-only operation and wait for completion.
Pass reducer as a TypedJob or a command string. With operation_config.tar_command_bootstrap: true, string reducers get the same tar extract + wrapper bootstrap as map (see docs).
- Parameters:
context (StageContext) – Stage context.
operation_config (DictConfig) – client.operations.reduce config.
reducer (Any) – Reducer leg (legacy name).
output_schema (Any | None) – Optional output table schema.
job (Any) – Preferred reducer leg alias.
- Returns:
True if the operation completed successfully.
- Return type:
YQL Operations#
YQL operations are methods on the YT client. See the YT Client sections below for join_tables, filter_table, select_columns, group_by_aggregate, union_tables, distinct, sort_table, and limit_table.
Note
YQL Operations Location
YQL operations are implemented as methods on BaseYTClient and its subclasses (YTDevClient and YTProdClient). They are not in a separate operations module.
S3 Operations#
Driver-side helpers to list S3 keys and persist paths into Cypress tables.
- yt_framework.operations.s3.list_s3_files(s3_client, bucket, prefix, logger, extension=None, max_files=None)[source]#
List files from S3 bucket with optional filtering.
- Parameters:
- Returns:
List of S3 file paths
- Return type:
Table operations#
Helpers to count rows, load rows into memory, and export tables to JSONL.
For low-level access, use BaseYTClient methods (read_table, row_count, etc.)
directly on self.deps.yt_client. These functions add logging and convenience.
- yt_framework.operations.table.get_row_count(yt_client, table_path, logger)[source]#
Get number of rows in a YT table.
- yt_framework.operations.table.read_table(yt_client, table_path, logger)[source]#
Read rows from a YT table.
Checkpoint Operations#
Upload or reuse single-file model checkpoints and wire them into operation specs.
- yt_framework.operations.checkpoint.init_checkpoint_directory(context, checkpoint_config)[source]#
Initialize checkpoint directory in YTsaurus if it doesn’t exist.
Uses checkpoint_base from checkpoint_config. Also uploads local checkpoint if specified. Validates that required checkpoint exists in YT before proceeding.
- Parameters:
context (StageContext) – Stage context (provides deps, logger)
checkpoint_config (DictConfig) – Checkpoint-specific config (from client.operations.<op>.checkpoint)
- Returns:
None
- Raises:
FileNotFoundError – If required checkpoint not found in YT
Exception – If checkpoint initialization fails
- Return type:
None
Sort operations#
Submit YT sort jobs using the same (context, operation_config) pattern as map.
- yt_framework.operations.sort.run_sort(context, operation_config)[source]#
Run a YT sort operation and wait for completion.
- Parameters:
context (StageContext) – Stage context (deps, logger, stage_dir, config).
operation_config (DictConfig) –
Sort-specific config. Required keys:
- input_table: table to sort in-place.
- sort_by: list of column names.
Optional keys mirror run_map_reduce:
- resources.pool / resources.pool_tree: scheduler pool.
- resources.memory_limit_gb, resources.cpu_limit: resource hints.
- Returns:
True if the sort completed successfully.
- Return type:
Example config (client.operations.sort in stage config.yaml):
sort:
  sort_by: [shard_order, mock_field]
  resources:
    pool: my_pool
Then in the stage:
from omegaconf import OmegaConf
from yt_framework.operations.sort import run_sort

sort_cfg = OmegaConf.merge(
    self.config.client.operations.sort,
    {"input_table": intermediate_table},
)
run_sort(context=self.context, operation_config=sort_cfg)
Tokenizer artifact#
Pack tokenizer/processor tarballs, upload to Cypress, and expose sandbox env vars.
- yt_framework.operations.tokenizer_artifact.resolve_tokenizer_artifact_name(stage_config, tokenizer_artifact_config)[source]#
Resolve logical tokenizer artifact name from config.
- Parameters:
stage_config (DictConfig)
tokenizer_artifact_config (DictConfig)
- Return type:
str | None
- yt_framework.operations.tokenizer_artifact.resolve_tokenizer_archive_name(artifact_name)[source]#
Convert logical artifact name to mounted tar filename.
- yt_framework.operations.tokenizer_artifact.resolve_tokenizer_artifact_yt_path(stage_config, tokenizer_artifact_config)[source]#
Resolve full YT file path for tokenizer artifact tarball.
- Parameters:
stage_config (DictConfig)
tokenizer_artifact_config (DictConfig)
- Return type:
str | None
- yt_framework.operations.tokenizer_artifact.init_tokenizer_artifact_directory(context, tokenizer_artifact_config)[source]#
Initialize tokenizer artifact in YT (if configured).
Behavior:
- creates artifact_base if needed;
- uploads the local artifact from local_artifact_path if provided and missing in YT;
- validates artifact presence in YT.
- Parameters:
context (StageContext)
tokenizer_artifact_config (DictConfig)
- Return type:
None
Typed jobs#
TypedJob helpers for YTsaurus execution.
- class yt_framework.typed_jobs.StageBootstrapTypedJob[source]#
Bases: TypedJob
Base class for TypedJob legs with worker-side bootstrap.
On unpickle, extracts source.tar.gz when needed, prepends the archive root and stages/<stage>/src to sys.path, and sets JOB_CONFIG_PATH when the stage config file exists. Uses __getstate__/__setstate__ so bootstrap runs on the worker before __call__.
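The __getstate__/__setstate__ mechanism mentioned above can be shown with the standard pickle module. BootstrapOnUnpickle is a hypothetical class; its _bootstrap only flips a flag, whereas the documented class is said to extract the archive and adjust sys.path at that point.

```python
import pickle
from typing import Any, Dict


class BootstrapOnUnpickle:
    """Hypothetical sketch: run setup code when the object is unpickled."""

    def __init__(self, stage_name: str) -> None:
        self.stage_name = stage_name
        self.bootstrapped = False  # never set on the driver side

    def __getstate__(self) -> Dict[str, Any]:
        # Ship only plain data; the flag is recomputed on the receiving side.
        return {"stage_name": self.stage_name}

    def __setstate__(self, state: Dict[str, Any]) -> None:
        self.__dict__.update(state)
        self._bootstrap()

    def _bootstrap(self) -> None:
        # Placeholder for worker-side setup (archive extraction, sys.path).
        self.bootstrapped = True

    def __call__(self, row: Dict[str, Any]) -> Dict[str, Any]:
        if not self.bootstrapped:
            raise RuntimeError("bootstrap must run before __call__")
        return {**row, "stage": self.stage_name}


job = BootstrapOnUnpickle("mds")
restored = pickle.loads(pickle.dumps(job))
print(job.bootstrapped, restored.bootstrapped)
```

pickle calls __setstate__ on the freshly created object without calling __init__, which is why the hook is a natural place for setup that must happen on the receiving process.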
Utilities#
Environment#
Load KEY=value files such as configs/secrets.env into plain dicts.
- yt_framework.utils.env.load_env_file(env_path)[source]#
Load environment variables from a .env file.
File format: KEY=VALUE (one per line, # for comments). A missing file is allowed and returns an empty dict.
- Parameters:
env_path (Path) – Path to the .env file
- Returns:
Dictionary of loaded environment variables (key -> value). Returns empty dict if file doesn’t exist or cannot be read.
- Warns:
UserWarning – If the file exists but cannot be read or parsed (non-fatal). A missing file is silent (returns an empty dict).
- Return type:
Example
>>> env_vars = load_env_file(Path("configs/secrets.env"))
>>> print(env_vars.get("YT_TOKEN"))
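The KEY=VALUE format described above is simple enough to parse by hand. The following is a minimal sketch of such a parser, assuming whitespace around keys and values is trimmed and lines without `=` are skipped; parse_env_file is a hypothetical name and the real load_env_file may handle quoting or other cases differently.

```python
import tempfile
import warnings
from pathlib import Path
from typing import Dict


def parse_env_file(env_path: Path) -> Dict[str, str]:
    """Parse KEY=VALUE lines; a missing file silently yields an empty dict."""
    if not env_path.exists():
        return {}
    try:
        text = env_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        # Non-fatal: warn and fall back to an empty mapping.
        warnings.warn(f"could not read {env_path}: {exc}", UserWarning)
        return {}
    env: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # blank line, comment, or malformed entry
        key, _, value = line.partition("=")
        env[key.strip()] = value.strip()
    return env


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "secrets.env"
    path.write_text("# cluster credentials\nYT_PROXY=my-proxy\nYT_TOKEN=my-token\n")
    loaded = parse_env_file(path)
    missing = parse_env_file(Path(tmp) / "absent.env")

print(loaded)
```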
- yt_framework.utils.env.load_secrets(secrets_dir, env_file='secrets.env')[source]#
Load secrets from secrets.env file in the specified directory.
- Parameters:
- Returns:
Dictionary of loaded secrets (key -> value). Returns empty dict if file doesn’t exist or cannot be read.
- Warns:
UserWarning – If the secrets file exists but cannot be read or parsed (non-fatal).
- Return type:
Example
>>> secrets = load_secrets(Path("configs"))
>>> yt_token = secrets.get("YT_TOKEN")
Logging#
Colored console formatter and helpers used by pipeline startup.
- class yt_framework.utils.logging.ColoredFormatter(fmt=None, datefmt=None, style='%', validate=True, *, defaults=None)[source]#
Bases: Formatter
Custom formatter with colors for different log levels.
- COLORS = {'CRITICAL': '\x1b[35m', 'DEBUG': '\x1b[36m', 'ERROR': '\x1b[31m', 'INFO': '\x1b[32m', 'WARNING': '\x1b[33m'}#
- RESET = '\x1b[0m'#
- ICONS = {'CRITICAL': '🚨', 'DEBUG': '🔍', 'ERROR': '✗', 'WARNING': '⚠️'}#
- yt_framework.utils.logging.setup_logging(level=20, name=None, use_colors=True)[source]#
Setup logging with consistent formatting.
- yt_framework.utils.logging.log_header(logger, title, context=None)[source]#
Log a compact section header in format: [Title] context.
- yt_framework.utils.logging.log_operation(logger, message)[source]#
Log an operation start message with → prefix.
- yt_framework.utils.logging.log_success(logger, message)[source]#
Log a success/completion message with ✓ prefix.
- yt_framework.utils.logging.log_config(logger, config_dict, title='Configuration')[source]#
Log configuration in a readable format.
Automatically masks sensitive values (keys containing ‘secret’ or ‘key’) by showing only the last 4 characters.
- Parameters:
- Returns:
None
- Return type:
None
Example
>>> config = {"api_key": "secret12345", "mode": "dev"}
>>> log_config(logger, config)
[Configuration]
api_key: ***2345
mode: dev
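The masking rule described for log_config (keys containing "secret" or "key", last four characters kept) can be sketched as a pure function. mask_value and format_config are hypothetical helpers; case-insensitive key matching is an assumption, and the sketch returns a string rather than writing to a logger.

```python
from typing import Any, Dict

# Substrings that mark a key as sensitive, per the description above.
SENSITIVE_MARKERS = ("secret", "key")


def mask_value(key: str, value: Any) -> str:
    """Return the value as text, masked to its last 4 characters if sensitive."""
    text = str(value)
    if any(marker in key.lower() for marker in SENSITIVE_MARKERS):
        return "***" + text[-4:]
    return text


def format_config(config: Dict[str, Any], title: str = "Configuration") -> str:
    """Render a flat config dict as a titled block, one key per line."""
    lines = [f"[{title}]"]
    lines += [f"{key}: {mask_value(key, value)}" for key, value in config.items()]
    return "\n".join(lines)


rendered = format_config({"api_key": "secret12345", "mode": "dev"})
print(rendered)
```

In this sketch a sensitive value of four characters or fewer would be shown in full after the asterisks, so a production version would likely need a length check.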
Ignore Patterns#
.ytignore parsing (gitignore-style) for upload tarballs.
- class yt_framework.utils.ignore.YTIgnorePattern(pattern, base_dir, is_negation=False)[source]#
Bases: object
Represents a single .ytignore pattern.
This class compiles a pattern string into a regex for efficient matching against file paths. It supports the same syntax as .gitignore files.
Pattern Types:
- Simple wildcards: *.pyc, test?
- Character classes: *.py[cod]
- Recursive wildcards: **/*.log
- Directory patterns: build/ (trailing slash)
- Rooted patterns: /config (leading slash, root-only)
- Negation patterns: !important.py (un-ignore)
Examples
Simple wildcard:
>>> from pathlib import Path
>>> pattern = YTIgnorePattern("*.pyc", Path("/project"))
>>> pattern.matches(Path("/project/test.pyc"))
True
>>> pattern.matches(Path("/project/test.py"))
False
Directory pattern:
>>> pattern = YTIgnorePattern("__pycache__/", Path("/project"))
>>> pattern.matches(Path("/project/__pycache__/module.pyc"))
True
>>> pattern.matches(Path("/project/src/__pycache__/module.pyc"))
True
Recursive wildcard:
>>> pattern = YTIgnorePattern("**/*.log", Path("/project"))
>>> pattern.matches(Path("/project/src/debug.log"))
True
>>> pattern.matches(Path("/project/debug.log"))
False  # Requires at least one directory level
Rooted pattern:
>>> pattern = YTIgnorePattern("/build", Path("/project"))
>>> pattern.matches(Path("/project/build"))
True
>>> pattern.matches(Path("/project/src/build"))
False  # Only matches at root
- class yt_framework.utils.ignore.YTIgnoreMatcher(base_dir)[source]#
Bases: object
Matches file paths against .ytignore patterns.
This class loads .ytignore files from the specified directory and all parent directories up to the filesystem root, then provides methods to check if files should be ignored based on the loaded patterns.
Patterns follow .gitignore syntax:
- *.pyc - matches any file ending in .pyc
- __pycache__/ - matches any __pycache__ directory
- **/test_*.py - matches test_*.py in any subdirectory (but not root)
- !important.py - negates a previous ignore pattern
- /build - matches only at root level (not in subdirectories)
- src/*.log - matches .log files in src/ but not src/subdir/
Examples
Basic usage:
>>> from pathlib import Path
>>> matcher = YTIgnoreMatcher(Path("/project"))
>>> matcher.should_ignore(Path("/project/test.pyc"))
True
>>> matcher.should_ignore(Path("/project/main.py"))
False
With custom .ytignore file:
>>> # Create a .ytignore file
>>> project_dir = Path("/tmp/myproject")
>>> project_dir.mkdir(exist_ok=True)
>>> (project_dir / ".ytignore").write_text("*.pyc\n__pycache__/\n")
>>> matcher = YTIgnoreMatcher(project_dir)
>>> matcher.should_ignore(project_dir / "module.pyc")
True
>>> matcher.should_ignore(project_dir / "module.py")
False
Negation patterns:
>>> # Ignore all .log files except important.log
>>> (project_dir / ".ytignore").write_text("*.log\n!important.log\n")
>>> matcher = YTIgnoreMatcher(project_dir)
>>> matcher.should_ignore(project_dir / "debug.log")
True
>>> matcher.should_ignore(project_dir / "important.log")
False
- Parameters:
base_dir (Path)
- yt_framework.utils.ignore.should_ignore_file(file_path, base_dir)[source]#
Convenience function to check if a file should be ignored.
This is a shorthand for creating a YTIgnoreMatcher and checking a single file. For checking multiple files, create a YTIgnoreMatcher instance directly to avoid reloading .ytignore files for each check.
- Parameters:
- Returns:
True if the file should be ignored
- Return type:
Examples
>>> from pathlib import Path
>>> # Assuming .ytignore contains "*.pyc"
>>> should_ignore_file(Path("/project/test.pyc"), Path("/project"))
True
>>> should_ignore_file(Path("/project/test.py"), Path("/project"))
False
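How negation interacts with earlier patterns can be seen in a deliberately reduced model. The sketch below assumes gitignore-style "last matching pattern wins" and matches against the file name only; it does not implement directory, rooted, or `**` patterns, and sketch_should_ignore is a hypothetical function rather than the framework's matcher.

```python
import fnmatch
from pathlib import PurePosixPath
from typing import List


def sketch_should_ignore(rel_path: str, patterns: List[str]) -> bool:
    """Simplified matcher: file-name globs only, last matching pattern wins."""
    name = PurePosixPath(rel_path).name
    ignored = False
    for raw in patterns:
        negated = raw.startswith("!")
        pattern = raw[1:] if negated else raw
        if fnmatch.fnmatchcase(name, pattern):
            # A later negation can un-ignore what an earlier pattern ignored.
            ignored = not negated
    return ignored


patterns = ["*.log", "!important.log"]
for candidate in ("debug.log", "important.log", "src/main.py"):
    print(candidate, sketch_should_ignore(candidate, patterns))
```

Pattern order matters in this model: placing `!important.log` before `*.log` would leave important.log ignored.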
Packaging and operation helpers (advanced)#
These modules support code upload, file dependencies, and environment wiring. Most pipelines rely on defaults; use the API when extending the framework.
Upload and local build#
Build source.tar.gz, merge extra modules/paths, and push artifacts to Cypress.
- yt_framework.operations.upload.build_code_locally(build_dir, pipeline_dir, logger, create_wrappers=False, upload_modules=None, upload_paths=None)[source]#
Build all code in a local build directory.
Copies ytjobs package, optional custom modules/paths, and all stages’ code to the build directory, preserving the same structure as would be uploaded to YT.
- Parameters:
build_dir (Path) – Local build directory path
pipeline_dir (Path) – Path to pipeline directory
logger (Logger) – Logger instance
create_wrappers (bool) – If True, create wrapper scripts for all stages
upload_modules (List[str] | None) – Optional list of module names to upload
upload_paths (List[Dict[str, str]] | None) – Optional list of {source, target?} for local paths
- Returns:
Total number of files copied
- Return type:
- yt_framework.operations.upload.create_code_archive(build_dir, archive_path, logger)[source]#
Create a tar.gz archive from the build directory.
- yt_framework.operations.upload.upload_code_archive(yt_client, archive_path, build_folder, logger)[source]#
Upload code archive to YT.
- yt_framework.operations.upload.upload_all_code(yt_client, build_folder, pipeline_dir, logger, upload_modules=None, upload_paths=None)[source]#
Upload all code to YT: ytjobs package, optional custom modules/paths, and stages.
Builds code locally, creates tar archive, and uploads it to YT.
- Parameters:
yt_client (BaseYTClient) – YT client instance
build_folder (str) – YT build folder path
pipeline_dir (Path) – Path to pipeline directory
logger (Logger) – Logger instance
upload_modules (List[str] | None) – Optional list of module names to upload
upload_paths (List[Dict[str, str]] | None) – Optional list of {source, target?} for local paths
- Returns:
None
- Return type:
None
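The archive step in this flow (build directory in, tar.gz out) maps directly onto the standard tarfile module. This is a sketch only: make_archive is a hypothetical helper, and storing paths relative to the build directory is an assumption about the desired archive layout.

```python
import tarfile
import tempfile
from pathlib import Path
from typing import List


def make_archive(build_dir: Path, archive_path: Path) -> List[str]:
    """Pack every file under build_dir into a gzip tarball; return member names."""
    with tarfile.open(archive_path, "w:gz") as tar:
        for path in sorted(build_dir.rglob("*")):
            if path.is_file():
                # Store paths relative to build_dir so the archive root is clean.
                tar.add(path, arcname=path.relative_to(build_dir).as_posix())
    with tarfile.open(archive_path, "r:gz") as tar:
        return sorted(tar.getnames())


with tempfile.TemporaryDirectory() as tmp:
    build = Path(tmp) / "build"
    (build / "stages" / "run_map" / "src").mkdir(parents=True)
    (build / "stages" / "run_map" / "src" / "mapper.py").write_text("print('hi')\n")
    (build / "ytjobs").mkdir()
    (build / "ytjobs" / "__init__.py").write_text("")
    names = make_archive(build, Path(tmp) / "source.tar.gz")

print(names)
```

Writing the archive outside the build directory avoids the tarball trying to include itself.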
Stage dependency file lists#
Collect extra file dependencies (including implicit ytjobs) for YT jobs.
- yt_framework.operations.dependencies.build_stage_dependencies(build_folder, stage_dir, logger)[source]#
Build dependency list for a single stage.
Includes:
- config.yaml (if it exists locally)
- All .py files from the src/ directory
- yt_framework.operations.dependencies.build_ytjobs_dependencies(build_folder, logger)[source]#
Build dependency list for ytjobs package.
Includes all .py files from ytjobs/ directory.
- yt_framework.operations.dependencies.add_checkpoint(dependencies, model_name, checkpoint_base, logger)[source]#
Add checkpoint file to dependencies if configured.
- Parameters:
- Returns:
Updated dependency list (new list with checkpoint added, or same list)
- Return type:
- yt_framework.operations.dependencies.build_vanilla_dependencies(build_folder, stage_dir, model_name, checkpoint_base, logger)[source]#
Build complete dependency list for a vanilla operation.
Combines:
- Stage files (config + src/)
- Ytjobs package
- Parameters:
- Returns:
Tuple of (script_path, dependency_files):
- script_path: path to vanilla.py in YT
- dependency_files: complete list of dependencies
- Return type:
- yt_framework.operations.dependencies.build_map_dependencies(build_folder, stage_dir, model_name, checkpoint_base, logger)[source]#
Build complete dependency list for a map operation.
Combines:
- Stage files (config + src/)
- Ytjobs package
- Checkpoint (if configured)
- Parameters:
- Returns:
Tuple of (mapper_path, dependency_files):
- mapper_path: path to mapper.py in YT
- dependency_files: complete list of dependencies
- Return type:
Shared operation utilities#
Shared helpers for map/vanilla/map-reduce (resources, secrets, tokenizer wiring).
- yt_framework.operations.common.build_environment(configs_dir, logger)[source]#
Build environment variables for map and vanilla operations.
Jobs read configuration from config.yaml, so only secrets are passed via environment variables.
- yt_framework.operations.common.prepare_docker_auth(docker_image, docker_username, docker_password)[source]#
Prepare Docker authentication dictionary.
- Parameters:
- Returns:
Docker authentication dict if all credentials provided, None otherwise
- Return type:
- yt_framework.operations.common.extract_operation_resources(operation_config, logger)[source]#
Extract OperationResources from operation config with fallback defaults.
- Parameters:
operation_config (DictConfig)
logger (Logger)
- Return type:
OperationResources
- yt_framework.operations.common.extract_secure_env_client_kwargs(operation_config)[source]#
Options for YTProdClient secure vault / public env partitioning.
- yt_framework.operations.common.collect_passthrough_kwargs(operation_config, reserved_keys)[source]#
Collect top-level config values to forward to YT client.
OmegaConf dict nodes are resolved to plain Python containers.
- yt_framework.operations.common.build_operation_environment(context, operation_config, logger, include_stage_name=True, include_tokenizer_artifact=True)[source]#
Build operation environment from secrets + explicit env config + optional helpers.
YT Client#
Client Factory#
Factory that returns YTDevClient or YTProdClient based on pipeline mode.
- yt_framework.yt.factory.create_yt_client(logger=None, mode='dev', pipeline_dir=None, secrets=None, pickling=None)[source]#
Factory function to create appropriate YT client based on mode.
- Parameters:
logger (Logger | None) – Logger instance (default: creates new logger)
mode (Literal['prod', 'dev'] | None) – “prod” for production YT client, “dev” for local development
pipeline_dir (Path | str | None) – Pipeline directory (required for dev mode)
secrets (Dict[str, str] | None) – Optional dictionary containing YT credentials. Required only for prod mode. Expected keys:
- YT_PROXY: YTsaurus proxy URL
- YT_TOKEN: YTsaurus authentication token
- Returns:
BaseYTClient instance (YTProdClient or YTDevClient)
- Raises:
ValueError – If secrets are required for prod mode but not provided.
- Return type:
BaseYTClient
Example
>>> # Dev mode (local filesystem simulation)
>>> client = create_yt_client(mode="dev", pipeline_dir=Path("."))
>>>
>>> # Prod mode (real YT cluster)
>>> secrets = {"YT_PROXY": "my-proxy", "YT_TOKEN": "my-token"}
>>> client = create_yt_client(mode="prod", secrets=secrets)
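The mode dispatch and the ValueError for missing prod secrets can be pictured with stub classes. Everything named here (DevClientStub, ProdClientStub, create_client_sketch) is hypothetical, and the fallback to the current directory in dev mode is an assumption rather than documented behavior.

```python
from pathlib import Path
from typing import Dict, Optional, Union


class DevClientStub:
    """Stand-in for a local, filesystem-backed client."""

    def __init__(self, pipeline_dir: Union[str, Path]) -> None:
        self.pipeline_dir = Path(pipeline_dir)


class ProdClientStub:
    """Stand-in for a cluster client that needs credentials."""

    def __init__(self, secrets: Dict[str, str]) -> None:
        self.proxy = secrets["YT_PROXY"]
        self.token = secrets["YT_TOKEN"]


def create_client_sketch(
    mode: str = "dev",
    pipeline_dir: Union[str, Path, None] = None,
    secrets: Optional[Dict[str, str]] = None,
) -> Union[DevClientStub, ProdClientStub]:
    if mode == "prod":
        # Mirrors the documented rule: prod mode without secrets is an error.
        if not secrets:
            raise ValueError("secrets are required in prod mode")
        return ProdClientStub(secrets)
    # Assumption: fall back to the current directory when none is given.
    return DevClientStub(pipeline_dir if pipeline_dir is not None else ".")
```

Keeping the credential check inside the factory means callers fail before any operation is submitted.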
Dev Client#
Local filesystem stand-in for Cypress tables and subprocess-backed jobs.
- class yt_framework.yt.client_dev.YTDevClient(logger, pipeline_dir=None)[source]#
Bases: BaseYTClient
Development YT client implementation.
Uses local file system for all operations, simulating YT behavior. Tables are stored as .jsonl files in .dev/ directory.
- exists(path)[source]#
Check if a path exists in YT.
In dev mode, always returns True (assumes files exist locally).
- write_table(table_path, rows, append=False, replication_factor=1)[source]#
Write rows to a YT table (saves as local .jsonl file).
In dev mode, tables are stored as JSONL files in the .dev directory. Each row is written as a JSON object on a single line.
- Parameters:
- Returns:
None
- Return type:
None
Example
>>> client.write_table("//tmp/data", [{"id": 1, "name": "Alice"}])
>>> # Creates .dev/data.jsonl with: {"id":1,"name":"Alice"}\n
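The JSONL layout described above can be reproduced with the standard library. This is a sketch under stated assumptions, not the client's code: write_jsonl_table is a hypothetical name, and mapping a table path to a file by its last component is inferred from the "//tmp/data" to ".dev/data.jsonl" example only.

```python
import json
from pathlib import Path
from typing import Any, Dict, List


def write_jsonl_table(
    dev_dir: Path,
    table_path: str,
    rows: List[Dict[str, Any]],
    append: bool = False,
) -> Path:
    """Write one compact JSON object per line and return the file path."""
    # Assumption: only the last path component names the local file.
    name = table_path.rstrip("/").split("/")[-1]
    target = dev_dir / f"{name}.jsonl"
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("a" if append else "w", encoding="utf-8") as handle:
        for row in rows:
            # Compact separators match the {"id":1,"name":"Alice"} form.
            handle.write(json.dumps(row, separators=(",", ":")) + "\n")
    return target
```

One object per line keeps appends cheap and lets a row count be taken by counting lines.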
- row_count(table_path)[source]#
Get number of rows in a YT table (counts lines in local .jsonl file).
- run_yql(query, pool='default', max_row_weight=None)[source]#
Execute a YQL query locally using DuckDB simulation.
- join_tables(left_table, right_table, output_table, on, how='left', select_columns=None, dry_run=False, max_row_weight=None)[source]#
Join two tables using YQL (executed locally with DuckDB in dev mode).
- filter_table(input_table, output_table, condition, dry_run=False, max_row_weight=None)[source]#
Filter table rows using WHERE condition (executed locally with DuckDB in dev mode).
- select_columns(input_table, output_table, columns, dry_run=False, max_row_weight=None)[source]#
Select specific columns from a table (executed locally with DuckDB in dev mode).
- group_by_aggregate(input_table, output_table, group_by, aggregations, dry_run=False, max_row_weight=None)[source]#
Group by columns and compute aggregations (executed locally with DuckDB in dev mode).
- union_tables(tables, output_table, dry_run=False, max_row_weight=None)[source]#
Union multiple tables (executed locally with DuckDB in dev mode).
- distinct(input_table, output_table, columns=None, dry_run=False, max_row_weight=None)[source]#
Get distinct rows from a table (executed locally with DuckDB in dev mode).
- sort_table(input_table, output_table, order_by, ascending=True, dry_run=False, max_row_weight=None)[source]#
Sort table by columns (executed locally with DuckDB in dev mode).
- limit_table(input_table, output_table, limit, dry_run=False, max_row_weight=None)[source]#
Limit number of rows from a table (executed locally with DuckDB in dev mode).
- upload_file(local_path, yt_path, create_parent_dir=False)[source]#
Upload a file to YT (no-op in dev mode).
- upload_directory(local_dir, yt_dir, pattern='*')[source]#
Upload a directory to YT (no-op in dev mode).
- run_map(command, input_table, output_table, files, resources, env, output_schema=None, max_failed_jobs=1, docker_auth=None, job=None, append=False, **kwargs)[source]#
Run a map operation locally using subprocess.
In dev mode, executes the mapper script locally with input/output tables as JSONL files. The command is executed in a temporary sandbox directory with all dependencies available.
- Parameters:
command (Any) – Mapper job (command string in dev mode).
input_table (str) – Input YT table path (read from local JSONL).
output_table (str) – Output YT table path (written to local JSONL).
files (List[Tuple[str, str]]) – List of (yt_path, local_path) tuples for dependencies.
resources (OperationResources) – Operation resource configuration (not fully used in dev mode).
output_schema (TableSchema | None) – Optional output table schema (not used in dev mode).
max_failed_jobs (int) – Maximum failed jobs allowed (not used in dev mode).
docker_auth (Dict[str, str] | None) – Optional Docker authentication (not used in dev mode).
append (bool) – If True and output JSONL exists, append mapper stdout lines to it.
**kwargs (Any) – Additional arguments (not used in dev mode).
job (Any)
- Returns:
Mock operation object that simulates YT operation.
- Return type:
Operation
Example
>>> op = client.run_map(
...     command="python3 mapper.py",
...     input_table="//tmp/input",
...     output_table="//tmp/output",
...     files=[],
...     resources=OperationResources(),
...     env={}
... )
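A subprocess-backed map step over JSONL files might look like the following sketch. The helper name run_local_map is hypothetical, feeding rows on stdin is an assumption (the page only states that mapper stdout is written to the output), and an argument list is used instead of a shell command string.

```python
import subprocess
import tempfile
from pathlib import Path
from typing import List


def run_local_map(
    command: List[str],
    input_jsonl: Path,
    output_jsonl: Path,
    append: bool = False,
) -> int:
    """Run the mapper in a temporary sandbox and store its stdout."""
    with tempfile.TemporaryDirectory() as sandbox:
        with input_jsonl.open("rb") as stdin:
            result = subprocess.run(
                command,
                stdin=stdin,             # assumption: rows arrive on stdin
                stdout=subprocess.PIPE,  # mapper output lines are captured
                cwd=sandbox,
                check=True,              # a failing mapper raises CalledProcessError
            )
    # Append only when requested and the output file already exists.
    mode = "ab" if append and output_jsonl.exists() else "wb"
    with output_jsonl.open(mode) as out:
        out.write(result.stdout)
    return result.returncode
```

Running the command with the sandbox as its working directory keeps any files the mapper creates away from the pipeline directory.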
- run_vanilla(command, files, env, task_name='main', job=None, **kwargs)[source]#
Run a vanilla operation locally using subprocess.
In dev mode, executes the vanilla script locally in a temporary sandbox directory with all dependencies available. No input/output tables are involved.
- Parameters:
- Returns:
Mock operation object that simulates YT operation.
- Return type:
Operation
- run_map_reduce(mapper, reducer, input_table, output_table, reduce_by, files, resources, env, sort_by=None, output_schema=None, max_failed_jobs=1, docker_auth=None, map_job=None, reduce_job=None, **kwargs)[source]#
In dev mode, no map or reduce step runs; the input table is copied to the output table.
Production Client#
Thin wrapper around yt.wrapper.YtClient for real cluster operations.
- class yt_framework.yt.client_prod.YTProdClient(logger, secrets, pickling=None)[source]#
Bases: BaseYTClient
Production YT client implementation.
Uses actual YTsaurus client for all operations.
- write_table(table_path, rows, append=False, replication_factor=1, make_parents=True)[source]#
Write rows to a YT table.
- Parameters:
table_path (str) – YT table path
rows (List[Dict[str, Any]]) – List of dictionaries representing table rows
append (bool) – If True, append to existing table (default: False)
replication_factor (int) – Replication factor for the table (default: 1)
make_parents (bool) – If True, create parent directories if they don’t exist (default: True)
- Return type:
None
- join_tables(left_table, right_table, output_table, on, how='left', select_columns=None, dry_run=False, max_row_weight=None)[source]#
Join two tables using YQL.
- Parameters:
left_table (str) – Path to left table
right_table (str) – Path to right table
output_table (str) – Path to output table
on (str | List[str] | Dict[str, str]) – Join key(s), the column name(s) to join on:
- str: Same column name on both sides (e.g., “user_id”)
- List[str]: Multiple columns with same names (e.g., [“user_id”, “region”])
- Dict[str, str]: Different column names (e.g., {“left”: “input_s3_path”, “right”: “path”})
how (Literal['inner', 'left', 'right', 'full']) – Join type - “inner”, “left”, “right”, or “full”
select_columns (List[str] | None) – Optional list of columns to select (with table aliases)
dry_run (bool) – If True, return the YQL query without executing
max_row_weight (str | None)
- Returns:
YQL query string if dry_run=True, None otherwise
- Return type:
str | None
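The three accepted shapes of on can be reduced to a single list of column pairs before any query text is built. The sketch below is illustrative only: normalize_join_keys and join_condition are hypothetical helpers, and the table aliases a and b are assumptions, since the query the client actually generates is not shown on this page.

```python
from typing import Dict, List, Tuple, Union

JoinOn = Union[str, List[str], Dict[str, str]]


def normalize_join_keys(on: JoinOn) -> List[Tuple[str, str]]:
    """Return (left_column, right_column) pairs for every join key."""
    if isinstance(on, str):
        return [(on, on)]
    if isinstance(on, dict):
        # Dict form names a different column on each side.
        return [(on["left"], on["right"])]
    return [(column, column) for column in on]


def join_condition(on: JoinOn, left_alias: str = "a", right_alias: str = "b") -> str:
    """Build an equality condition; the alias names are hypothetical."""
    pairs = normalize_join_keys(on)
    return " AND ".join(
        f"{left_alias}.{left} = {right_alias}.{right}" for left, right in pairs
    )
```

For instance, join_condition(["user_id", "region"]) yields "a.user_id = b.user_id AND a.region = b.region".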
- filter_table(input_table, output_table, condition, dry_run=False, max_row_weight=None)[source]#
Filter table rows using WHERE condition.
- Parameters:
- Returns:
YQL query string if dry_run=True, None otherwise
- Return type:
str | None
- select_columns(input_table, output_table, columns, dry_run=False, max_row_weight=None)[source]#
Select specific columns from a table.
- Parameters:
- Returns:
YQL query string if dry_run=True, None otherwise
- Return type:
str | None
- group_by_aggregate(input_table, output_table, group_by, aggregations, dry_run=False, max_row_weight=None)[source]#
Group by columns and compute aggregations.
- Parameters:
input_table (str) – Path to input table
output_table (str) – Path to output table
aggregations (Dict[str, str | Tuple[str, str]]) – Dict mapping output column names to aggregation functions, e.g., {“order_count”: “count”, “total_amount”: “sum”}
dry_run (bool) – If True, return the YQL query without executing
max_row_weight (str | None)
- Returns:
YQL query string if dry_run=True, None otherwise
- Return type:
str | None
- union_tables(tables, output_table, dry_run=False, max_row_weight=None)[source]#
Union multiple tables.
- distinct(input_table, output_table, columns=None, dry_run=False, max_row_weight=None)[source]#
Get distinct rows from a table.
- Parameters:
- Returns:
YQL query string if dry_run=True, None otherwise
- Return type:
str | None
- sort_table(input_table, output_table, order_by, ascending=True, dry_run=False, max_row_weight=None)[source]#
Sort table by columns.
WARNING: Sorting large tables can be expensive. Use with caution.
- Parameters:
- Returns:
YQL query string if dry_run=True, None otherwise
- Return type:
str | None
- limit_table(input_table, output_table, limit, dry_run=False, max_row_weight=None)[source]#
Limit number of rows from a table.
- Parameters:
- Returns:
YQL query string if dry_run=True, None otherwise
- Return type:
str | None
- upload_directory(local_dir, yt_dir, pattern='*')[source]#
Upload a directory to YT cluster.
Recursively uploads all files from a local directory to a YT directory, respecting .ytignore patterns if present.
- run_map(command, input_table, output_table, files, resources, env, output_schema=None, max_failed_jobs=1, docker_auth=None, job=None, append=False, **kwargs)[source]#
Run a map operation on YT cluster.
Submits a map operation that processes each row of the input table independently and writes results to the output table. The operation runs on the YT cluster with the specified resources and dependencies.
- Parameters:
command (Any) – Legacy mapper job argument (TypedJob instance or command string).
input_table (str) – Input YT table path.
output_table (str) – Output YT table path.
files (List[Tuple[str, str]]) – List of (yt_path, local_path) tuples for dependencies.
resources (OperationResources) – Operation resource configuration (memory, CPU, GPU, etc.).
output_schema (TableSchema | None) – Optional output table schema for typed output.
max_failed_jobs (int) – Maximum failed jobs allowed before operation fails.
docker_auth (Dict[str, str] | None) – Optional Docker authentication for private registries.
job (Any) – Preferred mapper job alias.
append (bool) – If True, append mapper output to an existing output table.
kwargs (Any)
- Returns:
YT operation object that can be monitored and waited on.
- Return type:
Operation
- Raises:
Exception – If operation submission fails.
- run_vanilla(command, files, env, task_name, resources, docker_auth=None, max_failed_jobs=1, job=None, **kwargs)[source]#
Run a vanilla operation on YT cluster.
Submits a vanilla operation that runs a standalone job without input/output tables. The operation runs on the YT cluster with the specified resources and dependencies.
- Parameters:
command (str) – Legacy command argument (typically bash command with script path).
files (List[Tuple[str, str]]) – List of (yt_path, local_path) tuples for dependencies.
task_name (str) – Task name for the operation.
resources (OperationResources) – Operation resource configuration (memory, CPU, GPU, etc.).
docker_auth (Dict[str, str] | None) – Optional Docker authentication for private registries.
max_failed_jobs (int) – Maximum failed jobs allowed before operation fails.
job (str | None) – Preferred command alias.
**kwargs (Any) – Extra options applied to the spec builder (e.g. weight, title) or forwarded to run_operation (sync, enable_optimizations).
- Returns:
YT operation object that can be monitored and waited on.
- Return type:
Operation
- Raises:
Exception – If operation submission fails.
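The **kwargs description for run_vanilla implies that extra options are routed to two destinations. One way to picture that split is the sketch below. The helper split_vanilla_kwargs is hypothetical, and the set of run_operation keys contains only the two names given as examples above, so the real list may be longer.

```python
from typing import Any, Dict, Tuple

# Only the names mentioned in the parameter description; an incomplete set.
RUN_OPERATION_KEYS = frozenset({"sync", "enable_optimizations"})


def split_vanilla_kwargs(
    kwargs: Dict[str, Any],
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Return (spec_options, run_options) from a mixed kwargs dict."""
    run_options = {k: v for k, v in kwargs.items() if k in RUN_OPERATION_KEYS}
    spec_options = {k: v for k, v in kwargs.items() if k not in RUN_OPERATION_KEYS}
    return spec_options, run_options
```

Under these assumptions, title and weight would land in the spec options while sync would go to the run options.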
- run_map_reduce(mapper, reducer, input_table, output_table, reduce_by, files, resources, env, sort_by=None, output_schema=None, max_failed_jobs=1, docker_auth=None, map_job=None, reduce_job=None, **kwargs)[source]#
Run a map-reduce operation on YT cluster.