API reference
yt_framework symbols below are generated from Python docstrings. Job-side entrypoints (ytjobs) have a separate page.
Tip
Navigation
Use the sidebar headings to jump between modules. Anything not listed here still exists in the source tree; run help(module) locally to inspect it.
Layout
- Core: pipeline, stage, registry, discovery, self.deps injection
- Contracts: StageDependencies and StageContext shared by core and operation drivers (prefer yt_framework.contracts; operations.stage_contracts is a thin re-export)
- Operations: map, vanilla, map-reduce/reduce, S3 helpers, table helpers, checkpoint upload, sort, tokenizer artifact wiring
- Typed jobs: StageBootstrapTypedJob
- YT: yt.support (shared runtime helpers), yt.clients (public client API and mixins), and yt entry (factory, package exports)
- Utils: env files, logging setup, ignore patterns
- Packaging: upload helpers shared by operations
- ytjobs: job-side reference
Note
Narrative docs
How-to guides under docs/operations, docs/configuration, and docs/advanced spell out YAML and stage patterns; this page is the raw API surface.
Core Modules
Pipeline
Pipeline base classes, CLI entry point (main), and code upload orchestration.
Subclass BasePipeline, implement setup() to register stages with
StageRegistry, then run via Pipeline.main() from pipeline.py.
DefaultPipeline auto-discovers stages under stages/*/stage.py.
- class yt_framework.core.pipeline.BasePipeline(config, pipeline_dir, log_level=20)
Bases: object
Base class for all pipelines.
Provides common functionality:
- CLI entry point via the main() class method
- Code upload to the YT build folder
- YT client initialization
- Default stage execution loop
Subclasses must:
- In setup(), create a StageRegistry and register stages via set_stage_registry()
Subclasses may override:
- setup(): register stages and initialize pipeline-specific clients (S3, etc.)
- run(): custom execution flow (rare; most pipelines use the default)
- Parameters:
config (DictConfig)
pipeline_dir (Path)
log_level (int)
- __init__(config, pipeline_dir, log_level=20)
Initialize the base pipeline.
- Parameters:
config (DictConfig)
pipeline_dir (Path)
log_level (int)
- Returns:
None
- Raises:
ValueError – If pipeline directory does not exist.
- Return type:
None
- setup()
Run pipeline-specific initialization.
Override this method in subclasses to:
1. Register stages using StageRegistry and set_stage_registry()
2. Initialize custom clients (e.g., S3 client, database connections)
This method is called automatically after base initialization.
- Returns:
None
- Return type:
None
- set_stage_registry(registry)
Set the stage registry for this pipeline.
- Parameters:
registry (StageRegistry) – StageRegistry instance with registered stages
- Returns:
None
- Return type:
None
- create_stage_dependencies()
Create stage dependencies for injection.
This method creates a dependency container with only what stages need, following the Interface Segregation Principle.
- Returns:
PipelineStageDependencies with yt_client, pipeline_config, configs_dir
- Return type:
PipelineStageDependencies
- upload_code(build_folder=None)
Upload code to YT build folder.
Only uploads code if any enabled stages need code execution on YT. If no stages need code execution, this method does nothing.
- Parameters:
build_folder (str | None) – Optional YT build folder path. If None, uses config.pipeline.build_folder
- Returns:
None
- Raises:
ValueError – If build_folder is required but not provided in config.
- Return type:
None
- run()
Run the pipeline by executing enabled stages.
Default implementation:
1. Upload code to YT (only if stages need code execution)
2. Get enabled stages from config
3. Execute stages in order using the stage registry
4. Pass context between stages
Override this method only if you need completely custom execution flow.
- Returns:
None
- Raises:
ValueError – If no enabled_stages found in config or unknown stage name.
AttributeError – If stage registry is not set in setup().
- Return type:
None
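The default run() loop described above can be sketched in plain Python. This is an illustrative approximation, not the framework's implementation: run_enabled_stages, CreateTable, and CountRows are hypothetical names, stages are constructed without arguments here (the real BaseStage receives deps and logger), and we assume each stage's returned dict becomes the next stage's input.

```python
from typing import Any


def run_enabled_stages(
    registry: dict[str, type], enabled_stages: list[str]
) -> dict[str, Any]:
    """Run enabled stages in order, passing a shared data dict between them."""
    if not enabled_stages:
        raise ValueError("no enabled_stages found in config")
    context: dict[str, Any] = {}
    for name in enabled_stages:
        stage_class = registry.get(name)
        if stage_class is None:
            raise ValueError(f"unknown stage name: {name!r}")
        # Assumption: no constructor args; the real BaseStage takes deps and logger.
        context = stage_class().run(context)
    return context


class CreateTable:
    def run(self, context: dict[str, Any]) -> dict[str, Any]:
        return {**context, "table": "//tmp/example"}


class CountRows:
    def run(self, context: dict[str, Any]) -> dict[str, Any]:
        return {**context, "rows": 3}


result = run_enabled_stages(
    {"create_table": CreateTable, "count_rows": CountRows},
    ["create_table", "count_rows"],
)
print(result)  # {'table': '//tmp/example', 'rows': 3}
```

An unknown name in enabled_stages fails fast with ValueError, matching the documented Raises clause.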
- classmethod main(argv=None)
CLI entry point for the pipeline.
Handles:
- Argument parsing (--config option)
- Config file loading
- Pipeline instantiation
- Error handling and exit codes
- Parameters:
argv (list[str] | None) – Optional command-line arguments. If None, uses sys.argv.
- Returns:
None (exits with code 0 on success, 1 on failure)
- Return type:
None
- Usage:
python pipeline.py
python pipeline.py --config configs/custom.yaml
- class yt_framework.core.pipeline.DefaultPipeline(config, pipeline_dir, log_level=20)
Bases: BasePipeline
Pipeline with automatic stage discovery.
Discovers BaseStage subclasses from stages/<name>/stage.py and registers them. Run with DefaultPipeline.main() from pipeline.py. Which stages execute is still controlled by enabled_stages in config.
- Parameters:
config (DictConfig)
pipeline_dir (Path)
log_level (int)
Pipeline config helpers
OmegaConf normalization helpers for pipeline and upload configuration.
- yt_framework.core.pipeline_config.normalize_upload_modules(raw)
Normalize upload_modules config: accept list, tuple, or single string.
- yt_framework.core.pipeline_config.normalize_upload_paths(raw)
Normalize upload_paths config: must be a list of {source, target?} mappings.
- yt_framework.core.pipeline_config.yt_mode_from_pipeline_config(raw)
Coerce pipeline.mode to the literal "prod" or "dev", or None (the caller may apply a default).
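A rough sketch of the normalization rules these three helpers document, written against plain Python values (for example after OmegaConf.to_container). The function names are hypothetical stand-ins, and treating None as an empty list or rejecting other types with TypeError are assumptions rather than documented behavior.

```python
from collections.abc import Mapping
from typing import Any, Literal, Optional


def normalize_modules(raw: Any) -> list[str]:
    """Accept a list, tuple, or single string of module names."""
    if raw is None:
        return []  # assumption: a missing key means no extra modules
    if isinstance(raw, str):
        return [raw]
    if isinstance(raw, (list, tuple)):
        return [str(item) for item in raw]
    raise TypeError(f"expected list, tuple, or str, got {type(raw).__name__}")


def normalize_paths(raw: Any) -> list[dict[str, str]]:
    """Require a list of {source, target?} mappings."""
    if raw is None:
        return []
    if not isinstance(raw, (list, tuple)):
        raise TypeError("upload_paths must be a list of mappings")
    result: list[dict[str, str]] = []
    for entry in raw:
        if not isinstance(entry, Mapping) or "source" not in entry:
            raise ValueError(f"entry needs a 'source' key: {entry!r}")
        item = {"source": str(entry["source"])}
        if entry.get("target") is not None:
            item["target"] = str(entry["target"])
        result.append(item)
    return result


def coerce_mode(raw: Any) -> Optional[Literal["prod", "dev"]]:
    """Return "prod" or "dev"; anything else yields None so the caller can default."""
    if raw == "prod":
        return "prod"
    if raw == "dev":
        return "dev"
    return None


print(normalize_modules("my_pkg"))  # ['my_pkg']
```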
Pipeline CLI helpers
CLI bootstrap for BasePipeline subclasses.
- yt_framework.core.pipeline_cli.build_pipeline_cli_parser(cls_name)
Build the argparse parser used by main().
- Parameters:
cls_name (str)
- Return type:
argparse.ArgumentParser
- yt_framework.core.pipeline_cli.resolve_pipeline_config_path(pipeline_dir, config_arg)
Resolve --config to an absolute path (relative paths are resolved under pipeline_dir).
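The --config resolution rule (relative paths are taken under pipeline_dir) can be illustrated with argparse and pathlib. build_parser and resolve_config_path are hypothetical stand-ins, and because the real parser's default for --config is not documented here, this sketch uses None.

```python
import argparse
from pathlib import Path


def build_parser(prog: str) -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog=prog)
    # Hypothetical default: the real parser's default config path is not shown here.
    parser.add_argument("--config", default=None, help="path to the pipeline config YAML")
    return parser


def resolve_config_path(pipeline_dir: Path, config_arg: str) -> Path:
    """Absolute paths pass through; relative ones are taken under pipeline_dir."""
    candidate = Path(config_arg)
    if candidate.is_absolute():
        return candidate
    return (pipeline_dir / candidate).absolute()


args = build_parser("MyPipeline").parse_args(["--config", "configs/custom.yaml"])
config_path = resolve_config_path(Path("/srv/pipeline"), args.config)
print(config_path)  # /srv/pipeline/configs/custom.yaml
```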
- yt_framework.core.pipeline_cli.read_pipeline_mode_for_header(config_path, logger)
Return pipeline.mode from the config file for log banners; defaults to dev on errors.
Stage
BaseStage — pipeline authors subclass this and implement run().
The framework derives the stage name from the stages/<name>/ directory.
- class yt_framework.core.stage.BaseStage(deps, logger)
Bases: ABC
Abstract base class for pipeline stages.
Stage name and config are detected automatically from the directory:
- Directory structure: stages/<stage_name>/stage.py
- Config file: stages/<stage_name>/config.yaml
Each stage must:
- Implement __init__ to receive deps and logger
- Implement the run method to execute stage logic
Example
    from __future__ import annotations

    from yt_framework.core.stage import BaseStage


    class MyStage(BaseStage):
        def __init__(self, deps, logger):
            super().__init__(deps, logger)
            # self.config is loaded automatically from stages/<stage_name>/config.yaml
            # Access dependencies via self.deps.yt_client and self.deps.pipeline_config

        def run(self, debug: DebugContext) -> DebugContext:
            # Stage logic here
            # Note: 'debug' is the shared data dict passed between stages, NOT the dependencies
            return {"result": "value"}
- Parameters:
deps (StageDependencies)
logger (Logger)
- __init__(deps, logger)
Initialize stage with injected dependencies.
Stage name and config are automatically detected from the directory containing stage.py.
- Parameters:
deps (StageDependencies) – Injected dependencies (yt_client, pipeline_config, configs_dir)
logger (Logger) – Logger instance for stage logging
- Returns:
None
- Raises:
FileNotFoundError – If config.yaml file is not found in stage directory.
TypeError – If config.yaml does not contain a dictionary (wrong node type).
- Return type:
None
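A minimal sketch of how a stage's name and config file can be located from stage.py, mirroring the directory convention and the FileNotFoundError documented above. locate_stage_config is a hypothetical helper; it does not parse the YAML, so the TypeError check on the config's node type is left out.

```python
import tempfile
from pathlib import Path


def locate_stage_config(stage_file: Path) -> tuple[str, Path]:
    """Return (stage_name, config_path) for a stages/<stage_name>/stage.py file."""
    stage_dir = stage_file.resolve().parent
    config_path = stage_dir / "config.yaml"
    if not config_path.is_file():
        raise FileNotFoundError(f"config.yaml not found in {stage_dir}")
    return stage_dir.name, config_path


# Demo: build a throwaway stages/create_table/ layout and inspect it.
root = Path(tempfile.mkdtemp())
stage_dir = root / "stages" / "create_table"
stage_dir.mkdir(parents=True)
(stage_dir / "stage.py").write_text("")
(stage_dir / "config.yaml").write_text("client: {}\n")
name, config_path = locate_stage_config(stage_dir / "stage.py")
print(name)  # create_table
```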
- property stage_dir: Path
Path to the stage directory.
- Returns:
Absolute path to the stage directory (stages/<stage_name>/).
- Return type:
Path
- property context: StageContext
Stage context containing all stage-related information.
- Returns:
Dataclass instance with name, config, stage_dir, logger, and deps attributes.
- Return type:
StageContext
Registry
Mutable registry of BaseStage subclasses keyed by stage name.
- class yt_framework.core.registry.StageRegistry
Bases: object
Builder for registering pipeline stages.
Automatically detects stage names from directory structure.
Example
registry = StageRegistry()
registry.add_stage(CreateTableStage)
registry.add_stage(RunMapStage)
pipeline.set_stage_registry(registry)
- add_stage(stage_class)
Register a stage class.
Stage name is automatically detected from the directory containing stage.py.
- Parameters:
stage_class (Type[BaseStage]) – Stage class to register
- Returns:
Self for method chaining
- Return type:
StageRegistry
- get_stage(stage_name)
Get stage class by name.
- Parameters:
stage_name (str) – Name of the stage to retrieve (matches directory name).
- Returns:
The stage class registered with the given name.
- Return type:
Type[BaseStage]
- Raises:
KeyError – If no stage is registered with the given name.
Example
>>> registry = StageRegistry().add_stage(MyStage)
>>> stage_class = registry.get_stage("my_stage")
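The registry's documented behavior (name taken from the stage's directory, add_stage returning self for chaining, KeyError on unknown names) can be sketched without the framework. MiniStageRegistry and stage_name_from_class are hypothetical; the demo loads a throwaway stages/my_stage/stage.py so the class has a real module file to inspect.

```python
import importlib.util
import sys
import tempfile
from pathlib import Path


def stage_name_from_class(stage_class: type) -> str:
    """Use the name of the directory holding the class's module file."""
    module = sys.modules.get(stage_class.__module__)
    module_file = getattr(module, "__file__", None)
    if module_file is None:
        raise TypeError(f"cannot locate stage.py for {stage_class.__qualname__}")
    return Path(module_file).parent.name


class MiniStageRegistry:
    def __init__(self) -> None:
        self._stages: dict[str, type] = {}

    def add_stage(self, stage_class: type) -> "MiniStageRegistry":
        self._stages[stage_name_from_class(stage_class)] = stage_class
        return self  # returning self allows chaining

    def get_stage(self, stage_name: str) -> type:
        try:
            return self._stages[stage_name]
        except KeyError:
            raise KeyError(f"no stage registered as {stage_name!r}") from None


# Demo: load a throwaway stages/my_stage/stage.py so the class has a module file.
stage_dir = Path(tempfile.mkdtemp()) / "stages" / "my_stage"
stage_dir.mkdir(parents=True)
(stage_dir / "stage.py").write_text("class MyStage:\n    pass\n")
spec = importlib.util.spec_from_file_location("demo_my_stage", stage_dir / "stage.py")
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module  # lets stage_name_from_class find the file
spec.loader.exec_module(module)

registry = MiniStageRegistry().add_stage(module.MyStage)
stage_class = registry.get_stage("my_stage")
print(stage_class.__name__)  # MyStage
```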
Discovery
Filesystem scan that imports stages/*/stage.py and collects BaseStage types.
- yt_framework.core.discovery.discover_stages(pipeline_dir, logger)
Automatically discover all stage classes from the stages directory tree.
Searches for stage.py under each child directory of stages and imports all BaseStage subclasses found.
Expected layout: pipeline_dir/stages/<stage_name>/stage.py, with a BaseStage subclass in each module.
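A hedged sketch of such a discovery scan using importlib: it imports each stages/*/stage.py and keeps classes that subclass a given base and are defined in that file. discover_stage_classes is a hypothetical name; the demo passes object as the base because the throwaway stage files cannot import the real BaseStage, and it assumes one stage class per module.

```python
import importlib.util
import inspect
import sys
import tempfile
from pathlib import Path


def discover_stage_classes(pipeline_dir: Path, base: type) -> dict[str, type]:
    """Import stages/*/stage.py and map each directory name to its stage class."""
    found: dict[str, type] = {}
    for stage_file in sorted((pipeline_dir / "stages").glob("*/stage.py")):
        stage_name = stage_file.parent.name
        module_name = f"discovered_stage_{stage_name}"
        spec = importlib.util.spec_from_file_location(module_name, stage_file)
        if spec is None or spec.loader is None:
            continue
        module = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = module
        spec.loader.exec_module(module)
        for _, obj in inspect.getmembers(module, inspect.isclass):
            # Keep only subclasses defined in this file, not imported ones.
            if issubclass(obj, base) and obj is not base and obj.__module__ == module_name:
                found[stage_name] = obj
    return found


pipeline_dir = Path(tempfile.mkdtemp())
for dir_name, class_name in [("create_table", "CreateTableStage"), ("run_map", "RunMapStage")]:
    stage_dir = pipeline_dir / "stages" / dir_name
    stage_dir.mkdir(parents=True)
    (stage_dir / "stage.py").write_text(f"class {class_name}:\n    pass\n")

# object stands in for BaseStage, which these throwaway files cannot import.
stages = discover_stage_classes(pipeline_dir, base=object)
print(sorted(stages))  # ['create_table', 'run_map']
```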
Core injection (self.deps)
Concrete PipelineStageDependencies for stage run() injection.
The StageDependencies protocol and
StageContext live under yt_framework.contracts
so core and operation drivers share types without operations importing
core.
- class yt_framework.core.dependencies.PipelineStageDependencies(yt_client, pipeline_config, configs_dir)
Bases: object
Default implementation of StageDependencies.
Used by BasePipeline to inject dependencies into stages. This class is instantiated by the pipeline and passed to each stage.
- Parameters:
yt_client (BaseYTClient)
pipeline_config (DictConfig)
configs_dir (Path)
- yt_client
YT client instance for performing operations on a YTsaurus cluster or on the local filesystem (dev mode).
- Type:
yt_framework.yt.clients.client_base.BaseYTClient
- pipeline_config
Pipeline-level configuration containing mode, build_folder, and other pipeline-wide settings.
- Type:
omegaconf.dictconfig.DictConfig
- configs_dir
Path to the directory containing secrets.env and configuration files.
- Type:
pathlib.Path
- yt_client: BaseYTClient
- pipeline_config: DictConfig
Contracts
Stage injection types
Stage execution contracts shared by operation drivers and core orchestration.
StageDependencies and StageContext live in yt_framework.contracts
so yt_framework.core can depend on this package instead of reaching through
yt_framework.operations only for types, while operation drivers stay free of
core imports (see tach.toml and docs/architecture/layers.md).
- class yt_framework.contracts.stage.StageDependencies(*args, **kwargs)
Bases: Protocol
Protocol defining what dependencies stages need.
This is NOT the same as the context parameter in run() methods:
- StageDependencies: injected services and config (yt_client, etc.)
- context parameter: shared data dict passed between stages
- property yt_client: BaseYTClient
YT client for operations (dev or prod implementation).
- property pipeline_config: DictConfig
Pipeline-level configuration (build_folder, mode, secrets paths).
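Because StageDependencies is a Protocol, any object exposing the right attributes satisfies it structurally. The sketch below uses a hypothetical DepsLike protocol (attribute names taken from this page, types loosened to Any and dict) and a SimpleDeps dataclass shaped like PipelineStageDependencies; neither is the framework's own class.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class DepsLike(Protocol):
    """Structural stand-in for StageDependencies."""

    @property
    def yt_client(self) -> Any: ...

    @property
    def pipeline_config(self) -> Any: ...


@dataclass(frozen=True)
class SimpleDeps:
    yt_client: Any
    pipeline_config: dict[str, Any]
    configs_dir: Path


deps = SimpleDeps(yt_client=object(), pipeline_config={"mode": "dev"}, configs_dir=Path("configs"))
# No inheritance needed: isinstance only checks that the attributes exist.
print(isinstance(deps, DepsLike))  # True
```

This is the same separation the docs describe: deps carries services and config, while the run() context stays a plain data dict.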
- class yt_framework.contracts.stage.StageContext(name, config, stage_dir, logger, deps)
Bases: object
Stage context: name, config, paths, logger, and injected dependencies.
- Parameters:
name (str)
config (DictConfig)
stage_dir (Path)
logger (Logger)
deps (StageDependencies)
- config: DictConfig
- deps: StageDependencies
Import path note
yt_framework.operations.stage_contracts re-exports only
yt_framework.contracts.stage.StageContext and
yt_framework.contracts.stage.StageDependencies. Prefer
from yt_framework.contracts import ….
Operations
Map Operations
Driver helpers to package src/mapper.py and submit YT map operations.
- class yt_framework.operations.command_ops.map.MapOperationData(mapper_path, dependencies, environment, docker_auth, command=None)
Bases: object
Data container for map operation configuration.
- Parameters:
- docker_auth
Optional Docker authentication dictionary for private registries.
- yt_framework.operations.command_ops.map.run_map(context, operation_config, output_schema=None, mapper=None, job=None)
Run a YT map operation and wait for completion.
All job parameters (pool, memory, CPU, Docker image, etc.) are extracted automatically from operation_config. Pass the map section of the stage config (self.config.client.operations.map) as operation_config.
- Parameters:
context (StageContext) – Stage context (provides deps, logger, stage_dir)
operation_config (DictConfig) – Operation-specific config (from client.operations.map). Optional key append (bool): append mapper output to an existing output table.
output_schema (TableSchema | None) – Optional YT TableSchema for a typed output table
mapper (object | None) – Optional mapper leg (legacy name). When omitted, the framework uses its command wrapper. Can be a TypedJob instance or a command string.
job (object | None) – Preferred mapper leg alias. Can be a TypedJob instance or command string.
- Returns:
True if successful, False otherwise
- Return type:
bool
Vanilla Operations
Driver helpers to package src/vanilla.py and submit YT vanilla operations.
- class yt_framework.operations.command_ops.vanilla.VanillaOperationData(script_path, dependencies, environment, docker_auth, command=None)
Bases: object
Data container for vanilla operation configuration.
- Parameters:
- docker_auth
Optional Docker authentication dictionary for private registries.
- yt_framework.operations.command_ops.vanilla.run_vanilla(context, operation_config, job=None)
Run a YT vanilla operation and wait for completion.
All job parameters (pool, memory, CPU, Docker image, etc.) are extracted automatically from operation_config. Pass the vanilla section of the stage config (self.config.client.operations.vanilla) as operation_config. The task name is set automatically to the stage name.
- Parameters:
context (StageContext) – Stage context (provides deps, logger, stage_dir, name)
operation_config (DictConfig) – Operation-specific config (from client.operations.vanilla)
job (str | None) – Preferred command alias. When omitted, framework wrapper command is used.
- Returns:
True if successful, False otherwise
- Return type:
bool
Map-reduce and reduce
Submit map-reduce and reduce-only YT operations (TypedJob or command strings).
Builds archives plus optional file dependencies; cluster credentials still come from configs/secrets.env like other operations.
- yt_framework.operations.command_ops.map_reduce.run_map_reduce(context, operation_config, mapper=None, reducer=None, output_schema=None, map_job=None, reduce_job=None)
Run a YT map-reduce operation and wait for completion.
Pass the mapper and reducer legs either both as TypedJob instances or both as command strings (JSON stdin/stdout). Mixing kinds raises ValueError.
Set operation_config.tar_command_bootstrap: true to wrap string legs with the same tar -xzf source.tar.gz + wrapper pattern as map operations (requires matching wrappers in the uploaded tarball; see docs).
- Parameters:
context (StageContext) – Stage context (deps, logger, stage_dir, config).
operation_config (DictConfig) – client.operations.map_reduce config (input_table, output_table, reduce_by, sort_by, resources, file_paths, etc.).
mapper (object) – Deprecated; use map_job instead.
reducer (object) – Deprecated; use reduce_job instead.
output_schema (TableSchema | None) – Optional YT TableSchema for the output table.
map_job (object) – Mapper leg (TypedJob instance or command string).
reduce_job (object) – Reducer leg (TypedJob instance or command string).
- Returns:
True if the operation completed successfully.
- Return type:
bool
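The leg rules for run_map_reduce (new aliases preferred over deprecated names, and no mixing of TypedJob and command-string legs) can be expressed as two small checks. Both helpers are hypothetical; which argument wins when both a new alias and a legacy name are passed is an assumption here, any non-string leg is treated as a job object, and the command strings are placeholders.

```python
from typing import Any, Optional


def pick_leg(preferred: Optional[Any], legacy: Optional[Any]) -> Optional[Any]:
    """Prefer the new alias (map_job/reduce_job) over the deprecated name."""
    return preferred if preferred is not None else legacy


def leg_kind(map_leg: Any, reduce_leg: Any) -> str:
    """Both legs must be command strings, or both must be job objects."""
    map_is_command = isinstance(map_leg, str)
    reduce_is_command = isinstance(reduce_leg, str)
    if map_is_command != reduce_is_command:
        raise ValueError("map and reduce legs must both be TypedJob instances or both be command strings")
    return "command" if map_is_command else "typed_job"


kind = leg_kind(
    pick_leg("python mapper.py", None),   # map_job given
    pick_leg(None, "python reducer.py"),  # only the deprecated reducer given
)
print(kind)  # command
```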
- yt_framework.operations.command_ops.map_reduce.run_reduce(context, operation_config, reducer=None, output_schema=None, job=None)
Run a YT reduce-only operation and wait for completion.
Pass reducer as a TypedJob or a command string. With operation_config.tar_command_bootstrap: true, string reducers get the same tar extract + wrapper bootstrap as map (see docs).
- Parameters:
context (StageContext) – Stage context.
operation_config (DictConfig) – client.operations.reduce config.
reducer (object) – Reducer leg (legacy name).
output_schema (TableSchema | None) – Optional output table schema.
job (object) – Preferred reducer leg alias.
- Returns:
True if the operation completed successfully.
- Return type:
YQL Operations#
YQL operations are *_request methods on the YT client, each taking a frozen request type from yt_framework.yt.clients.yql.yql_requests (for example JoinTablesRequest). See :doc:../operations/yql.
Note
YQL Operations Location
YQL operations are implemented as methods on BaseYTClient and its subclasses (YTDevClient and YTProdClient). They are not in a separate operations module.
S3 Operations
Driver-side helpers to list S3 keys and persist paths into Cypress tables.
- yt_framework.operations.s3.list_s3_files(s3_client, bucket, prefix, logger, extension=None, max_files=None)
List files from S3 bucket with optional filtering.
- Parameters:
- Returns:
List of S3 file paths
- Return type:
Table operations
Helpers to count rows, load rows into memory, and export tables to JSONL.
For low-level access, use BaseYTClient methods (read_table, row_count, etc.)
directly on self.deps.yt_client. These functions add logging and convenience.
- yt_framework.operations.table.get_row_count(yt_client, table_path, logger)
Get number of rows in a YT table.
- yt_framework.operations.table.read_table(yt_client, table_path, logger)
Read rows from a YT table.
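To illustrate what a JSONL export does with rows returned by read_table(), here is a minimal writer. export_rows_to_jsonl is hypothetical, not this module's export helper, and a plain list stands in for rows read from YT.

```python
import json
import tempfile
from collections.abc import Iterable
from pathlib import Path
from typing import Any


def export_rows_to_jsonl(rows: Iterable[dict[str, Any]], path: Path) -> int:
    """Write one JSON object per line; return the number of rows written."""
    count = 0
    with path.open("w", encoding="utf-8") as out:
        for row in rows:
            out.write(json.dumps(row, ensure_ascii=False) + "\n")
            count += 1
    return count


rows = [{"id": 1, "text": "hello"}, {"id": 2, "text": "мир"}]
target = Path(tempfile.mkdtemp()) / "table.jsonl"
written = export_rows_to_jsonl(rows, target)
print(written)  # 2
```

Streaming row by row keeps memory flat, which matters more than it does for read_table(), since that helper loads rows into memory.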
Checkpoint Operations
Upload or reuse single-file model checkpoints and wire them into operation specs.
- yt_framework.operations.checkpoint.init_checkpoint_directory(context, checkpoint_config)
Initialize checkpoint directory in YTsaurus if it doesn’t exist.
Uses checkpoint_base from checkpoint_config. Also uploads local checkpoint if specified. Validates that required checkpoint exists in YT before proceeding.
- Parameters:
context (StageContext) – Stage context (provides deps, logger)
checkpoint_config (DictConfig) – Checkpoint-specific config (from client.operations.<op>.checkpoint)
- Returns:
None
- Raises:
FileNotFoundError – If required checkpoint not found in YT
Exception – If checkpoint initialization fails
- Return type:
None
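The create/upload/validate sequence described for init_checkpoint_directory is sketched below against the local filesystem, with a file copy standing in for a YT upload. ensure_checkpoint is hypothetical, and the checkpoint_base/model_name layout is an assumption.

```python
import shutil
import tempfile
from pathlib import Path
from typing import Optional


def ensure_checkpoint(
    checkpoint_base: Path, model_name: str, local_checkpoint: Optional[Path] = None
) -> Path:
    """Create the base dir, upload a local copy if missing remotely, then validate."""
    checkpoint_base.mkdir(parents=True, exist_ok=True)
    remote = checkpoint_base / model_name
    if not remote.exists() and local_checkpoint is not None:
        shutil.copyfile(local_checkpoint, remote)  # stands in for a YT file upload
    if not remote.exists():
        raise FileNotFoundError(f"required checkpoint not found: {remote}")
    return remote


workdir = Path(tempfile.mkdtemp())
local = workdir / "model.pt"
local.write_bytes(b"weights")
remote = ensure_checkpoint(workdir / "checkpoints", "model.pt", local)
```

A second call with the same arguments skips the copy because the remote file already exists, which is the "reuse" half of this module's description.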
Sort operations
Submit YT sort jobs using the same (context, operation_config) pattern as map.
- yt_framework.operations.command_ops.sort.run_sort(context, operation_config)
Run a YT sort operation and wait for completion.
- Parameters:
context (StageContext) – Stage context (deps, logger, stage_dir, config).
operation_config (DictConfig) –
Sort-specific config. Required keys:
- input_table: table to sort in place.
- sort_by: list of column names.
Optional keys mirror run_map_reduce:
- resources.pool / resources.pool_tree: scheduler pool.
- resources.memory_limit_gb, resources.cpu_limit: resource hints.
- Returns:
True if the sort completed successfully.
- Return type:
bool
Example config (client.operations.sort in stage config.yaml):
sort:
  sort_by: [shard_order, mock_field]
  resources:
    pool: my_pool
Then in the stage:
from omegaconf import OmegaConf
from yt_framework.operations.command_ops.sort import run_sort

sort_cfg = OmegaConf.merge(
    self.config.client.operations.sort,
    {"input_table": intermediate_table},
)
run_sort(context=self.context, operation_config=sort_cfg)
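If you assemble the sort config from plain dicts instead of OmegaConf, the required-key check can look like this. build_sort_config and the //tmp/intermediate path are hypothetical; note that {**a, **b} merges shallowly, whereas OmegaConf.merge merges nested keys.

```python
from typing import Any


def build_sort_config(stage_sort_cfg: dict[str, Any], input_table: str) -> dict[str, Any]:
    """Add the runtime input_table to the stage's sort section and check required keys."""
    merged = {**stage_sort_cfg, "input_table": input_table}  # shallow merge
    sort_by = merged.get("sort_by")
    if not isinstance(sort_by, list) or not sort_by:
        raise ValueError("sort_by must be a non-empty list of column names")
    return merged


stage_sort_cfg = {"sort_by": ["shard_order", "mock_field"], "resources": {"pool": "my_pool"}}
sort_cfg = build_sort_config(stage_sort_cfg, "//tmp/intermediate")
print(sort_cfg["input_table"])  # //tmp/intermediate
```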
Tokenizer artifact
Pack tokenizer/processor tarballs, upload to Cypress, and expose sandbox env vars.
- yt_framework.operations._internal.tokenizer_artifact.resolve_tokenizer_artifact_name(stage_config, tokenizer_artifact_config)
Resolve logical tokenizer artifact name from config.
- Parameters:
stage_config (DictConfig)
tokenizer_artifact_config (DictConfig)
- Return type:
str | None
- yt_framework.operations._internal.tokenizer_artifact.resolve_tokenizer_archive_name(artifact_name)
Convert logical artifact name to mounted tar filename.
- yt_framework.operations._internal.tokenizer_artifact.resolve_tokenizer_artifact_yt_path(stage_config, tokenizer_artifact_config)
Resolve full YT file path for tokenizer artifact tarball.
- Parameters:
stage_config (DictConfig)
tokenizer_artifact_config (DictConfig)
- Return type:
str | None
- yt_framework.operations._internal.tokenizer_artifact.tokenizer_artifact_name_or_raise(stage_config, tokenizer_artifact_config)
- Parameters:
stage_config (DictConfig)
tokenizer_artifact_config (DictConfig)
- Return type:
str
- yt_framework.operations._internal.tokenizer_artifact.verify_tokenizer_path_or_raise(context, yt_artifact_path)
- Parameters:
context (StageContext)
yt_artifact_path (str)
- Return type:
None
- yt_framework.operations._internal.tokenizer_artifact.init_tokenizer_artifact_directory(context, tokenizer_artifact_config)
Initialize tokenizer artifact in YT (if configured).
Behavior:
- creates artifact_base if needed;
- uploads the local artifact from local_artifact_path if provided and missing in YT;
- validates artifact presence in YT.
- Parameters:
context (StageContext)
tokenizer_artifact_config (DictConfig)
- Return type:
None
Typed jobs
TypedJob helpers for YTsaurus execution.
- class yt_framework.typed_jobs.StageBootstrapTypedJob
Bases: TypedJob
Base class for TypedJob legs with worker-side bootstrap.
On unpickle, extracts source.tar.gz when needed, prepends the archive root and stages/<stage>/src to sys.path, and sets JOB_CONFIG_PATH when the stage config file exists. Uses __getstate__/__setstate__ so bootstrap runs on the worker before __call__.
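The unpickle-time bootstrap can be illustrated with plain __getstate__/__setstate__. BootstrapOnUnpickle is a hypothetical stand-in: it does not subclass yt's TypedJob, it skips the source.tar.gz extraction, and it assumes the stage config file is stages/<stage>/config.yaml (the layout BaseStage uses). It only prepends paths to sys.path and sets JOB_CONFIG_PATH.

```python
import os
import pickle
import sys
import tempfile
from pathlib import Path


class BootstrapOnUnpickle:
    """Hypothetical job leg whose setup runs when it is unpickled on a worker."""

    def __init__(self, archive_root: str, stage_name: str) -> None:
        self.archive_root = archive_root
        self.stage_name = stage_name

    def __getstate__(self) -> dict:
        # Pickle only plain data; worker-side state is rebuilt in __setstate__.
        return {"archive_root": self.archive_root, "stage_name": self.stage_name}

    def __setstate__(self, state: dict) -> None:
        self.__dict__.update(state)
        self._bootstrap()

    def _bootstrap(self) -> None:
        root = Path(self.archive_root)
        stage_dir = root / "stages" / self.stage_name
        # Prepend both import roots (the ordering here is illustrative).
        for entry in (str(stage_dir / "src"), str(root)):
            if entry not in sys.path:
                sys.path.insert(0, entry)
        config_path = stage_dir / "config.yaml"
        if config_path.exists():
            os.environ["JOB_CONFIG_PATH"] = str(config_path)


root = Path(tempfile.mkdtemp())
(root / "stages" / "run_map" / "src").mkdir(parents=True)
(root / "stages" / "run_map" / "config.yaml").write_text("client: {}\n")
# The round trip triggers __setstate__, which is where the bootstrap happens.
job = pickle.loads(pickle.dumps(BootstrapOnUnpickle(str(root), "run_map")))
```

Doing the work in __setstate__ rather than __init__ is the point: the constructor runs on the driver, while unpickling happens in the job's process.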
Utilities
Environment
Load KEY=value files such as configs/secrets.env into plain dicts.
- yt_framework.utils.env.load_env_file(env_path)
Load environment variables from a .env file.
File format: KEY=VALUE, one per line; lines starting with # are comments. A missing file is optional and yields an empty dict.
- Parameters:
env_path (Path) – Path to the .env file
- Returns:
Dictionary of loaded environment variables (key -> value). Returns empty dict if file doesn’t exist or cannot be read.
- Warns:
UserWarning – If the file exists but cannot be read or parsed (non-fatal).
Missing file is silent (returns empty dict)
- Return type:
dict[str, str]
Example
>>> env_vars = load_env_file(Path("configs/secrets.env"))
>>> print(env_vars.get("YT_TOKEN"))
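A minimal parser for the KEY=VALUE format described above. parse_env_file is hypothetical: unlike load_env_file it does not emit UserWarning on unreadable files, and it assumes surrounding whitespace is trimmed and quotes are kept as-is. YT_PROXY is an example key only.

```python
import tempfile
from pathlib import Path


def parse_env_file(env_path: Path) -> dict[str, str]:
    """Parse KEY=VALUE lines; blank lines and '#' comments are skipped."""
    if not env_path.is_file():
        return {}  # a missing file is not an error
    values: dict[str, str] = {}
    for line in env_path.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


env_path = Path(tempfile.mkdtemp()) / "secrets.env"
env_path.write_text("# cluster credentials\nYT_TOKEN=abc123\nYT_PROXY = my-cluster\n")
env_vars = parse_env_file(env_path)
print(env_vars["YT_TOKEN"])  # abc123
```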
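- yt_framework.utils.env.load_secrets(secrets_dir, env_file='secrets.env')
Load secrets from the env file (secrets.env by default) in the specified directory.
- Parameters:
secrets_dir (Path) – Directory containing the env file
env_file (str) – Name of the env file inside secrets_dir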
Dictionary of loaded secrets (key -> value). Returns empty dict if file doesn’t exist or cannot be read.
- Warns:
UserWarning – If the secrets file exists but cannot be read or parsed (non-fatal).
- Return type:
dict[str, str]
Example
>>> secrets = load_secrets(Path("configs"))
>>> yt_token = secrets.get("YT_TOKEN")
Logging
Colored console formatter and helpers used by pipeline startup.
- class yt_framework.utils.logging.ColoredFormatter(fmt=None, datefmt=None, style='%', validate=True, *, defaults=None)
Bases: Formatter
Custom formatter with colors for different log levels.
- COLORS: ClassVar[dict[str, str]] = {'CRITICAL': '\x1b[35m', 'DEBUG': '\x1b[36m', 'ERROR': '\x1b[31m', 'INFO': '\x1b[32m', 'WARNING': '\x1b[33m'}
- RESET = '\x1b[0m'
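One way a formatter can apply the COLORS and RESET values listed above. LevelColorFormatter is a hypothetical sketch; whether ColoredFormatter colors only the level name (as here) or the whole line is not documented on this page.

```python
import logging
from typing import ClassVar


class LevelColorFormatter(logging.Formatter):
    """Wrap the level name in an ANSI color escape, then reset."""

    COLORS: ClassVar[dict[str, str]] = {
        "DEBUG": "\x1b[36m", "INFO": "\x1b[32m", "WARNING": "\x1b[33m",
        "ERROR": "\x1b[31m", "CRITICAL": "\x1b[35m",
    }
    RESET = "\x1b[0m"

    def format(self, record: logging.LogRecord) -> str:
        original = record.levelname
        color = self.COLORS.get(original, "")
        record.levelname = f"{color}{original}{self.RESET}" if color else original
        try:
            return super().format(record)
        finally:
            record.levelname = original  # don't leak colors into other handlers


formatter = LevelColorFormatter("%(levelname)s %(message)s")
record = logging.LogRecord("demo", logging.INFO, "demo.py", 1, "ready", None, None)
line = formatter.format(record)
```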
- yt_framework.utils.logging.setup_logging(level=20, name=None, use_colors=True)
Configure logging with consistent formatting.
- yt_framework.utils.logging.log_header(logger, title, context=None)
Log a compact section header in the format [Title] context.
- yt_framework.utils.logging.log_operation(logger, message)
Log an operation start message with a → prefix.
- yt_framework.utils.logging.log_success(logger, message)
Log a success/completion message with a ✓ prefix.
- yt_framework.utils.logging.log_config(logger, config_dict, title='Configuration')
Log configuration in a readable format.
Automatically masks sensitive values (keys containing ‘secret’ or ‘key’) by showing only the last 4 characters.
- Parameters:
logger (Logger)
config_dict (dict)
title (str)
- Returns:
None
- Return type:
None
Example
>>> config = {"api_key": "secret12345", "mode": "dev"}
>>> log_config(logger, config)
[Configuration]
api_key: ***2345
mode: dev
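The masking rule (show only the last 4 characters when the key contains 'secret' or 'key') as a standalone function. mask_config_value is hypothetical; the case-insensitive key check and fully masking values of 4 characters or fewer are assumptions.

```python
from typing import Any


def mask_config_value(key: str, value: Any) -> str:
    """Show only the last 4 characters of values whose key contains 'secret' or 'key'."""
    text = str(value)
    lowered = key.lower()
    if "secret" in lowered or "key" in lowered:
        # Assumption: very short values are hidden entirely.
        return "***" + text[-4:] if len(text) > 4 else "***"
    return text


config = {"api_key": "secret12345", "mode": "dev"}
masked = {k: mask_config_value(k, v) for k, v in config.items()}
print(masked)  # {'api_key': '***2345', 'mode': 'dev'}
```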
Ignore Patterns
.ytignore parsing (gitignore-style) for upload tarballs.
- class yt_framework.utils.ignore.YTIgnorePattern(pattern, base_dir, is_negation=False)
Bases: object
Represents a single .ytignore pattern.
This class compiles a pattern string into a regex for efficient matching against file paths. It supports the same syntax as .gitignore files.
Pattern types:
- Simple wildcards: *.pyc, test?
- Character classes: *.py[cod]
- Recursive wildcards: **/*.log
- Directory patterns: build/ (trailing slash)
- Rooted patterns: /config (leading slash, root-only)
- Negation patterns: !important.py (un-ignore)
Examples
Simple wildcard:
>>> from pathlib import Path
>>> pattern = YTIgnorePattern("*.pyc", Path("/project"))
>>> pattern.matches(Path("/project/test.pyc"))
True
>>> pattern.matches(Path("/project/test.py"))
False
Directory pattern:
>>> pattern = YTIgnorePattern("__pycache__/", Path("/project"))
>>> pattern.matches(Path("/project/__pycache__/module.pyc"))
True
>>> pattern.matches(Path("/project/src/__pycache__/module.pyc"))
True
Recursive wildcard:
>>> pattern = YTIgnorePattern("**/*.log", Path("/project"))
>>> pattern.matches(Path("/project/src/debug.log"))
True
>>> pattern.matches(Path("/project/debug.log"))  # requires at least one directory level
False
Rooted pattern:
>>> pattern = YTIgnorePattern("/build", Path("/project"))
>>> pattern.matches(Path("/project/build"))
True
>>> pattern.matches(Path("/project/src/build"))  # only matches at root
False
- class yt_framework.utils.ignore.YTIgnoreMatcher(base_dir)
Bases: object
Matches file paths against .ytignore patterns.
This class loads .ytignore files from the specified directory and all parent directories up to the filesystem root, then provides methods to check if files should be ignored based on the loaded patterns.
Patterns follow .gitignore syntax:
- *.pyc matches any file ending in .pyc
- __pycache__/ matches any __pycache__ directory
- **/test_*.py matches test_*.py in any subdirectory (but not at the root)
- !important.py negates a previous ignore pattern
- /build matches only at the root level (not in subdirectories)
- src/*.log matches .log files in src/ but not in src/subdir/
Examples
Basic usage:
>>> from pathlib import Path
>>> matcher = YTIgnoreMatcher(Path("/project"))
>>> matcher.should_ignore(Path("/project/test.pyc"))
True
>>> matcher.should_ignore(Path("/project/main.py"))
False
With custom .ytignore file:
>>> # Create a .ytignore file
>>> project_dir = Path("/tmp/myproject")
>>> project_dir.mkdir(exist_ok=True)
>>> _ = (project_dir / ".ytignore").write_text("*.pyc\n__pycache__/\n")
>>> matcher = YTIgnoreMatcher(project_dir)
>>> matcher.should_ignore(project_dir / "module.pyc")
True
>>> matcher.should_ignore(project_dir / "module.py")
False
Negation patterns:
>>> # Ignore all .log files except important.log
>>> _ = (project_dir / ".ytignore").write_text("*.log\n!important.log\n")
>>> matcher = YTIgnoreMatcher(project_dir)
>>> matcher.should_ignore(project_dir / "debug.log")
True
>>> matcher.should_ignore(project_dir / "important.log")
False
- Parameters:
base_dir (Path)
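The negation semantics above follow the gitignore rule that the last matching pattern wins. This simplified sketch matches basenames only with fnmatch, so it leaves out rooted, directory, and ** handling; is_ignored is a hypothetical name, not part of YTIgnoreMatcher.

```python
from fnmatch import fnmatchcase


def is_ignored(file_name: str, patterns: list[str]) -> bool:
    """Evaluate patterns in order; the last one that matches decides."""
    ignored = False
    for raw in patterns:
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # blank lines and comments carry no rule
        negate = line.startswith("!")
        pattern = line[1:] if negate else line
        if fnmatchcase(file_name, pattern):
            ignored = not negate
    return ignored


patterns = ["*.log", "!important.log"]
print(is_ignored("debug.log", patterns))      # True
print(is_ignored("important.log", patterns))  # False
```

Because order matters, putting "!important.log" before "*.log" would re-ignore the file.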
- yt_framework.utils.ignore.should_ignore_file(file_path, base_dir)
Return whether file_path should be ignored under base_dir.
This is a shorthand for creating a YTIgnoreMatcher and checking a single file. For checking multiple files, create a YTIgnoreMatcher instance directly to avoid reloading .ytignore files for each check.
- Parameters:
file_path (Path)
base_dir (Path)
- Returns:
True if the file should be ignored
- Return type:
bool
Examples
>>> from pathlib import Path
>>> # Assuming .ytignore contains "*.pyc"
>>> should_ignore_file(Path("/project/test.pyc"), Path("/project"))
True
>>> should_ignore_file(Path("/project/test.py"), Path("/project"))
False
Packaging and operation helpers (advanced)
These modules support code upload, file dependencies, and environment wiring. Most pipelines rely on defaults; use the API when extending the framework.
Upload and local build
Upload pipeline: wrappers, local build orchestration, and archive upload.
- yt_framework.operations.upload.build_code_locally(build_dir, pipeline_dir, logger, create_wrappers=False, upload_modules=None, upload_paths=None)
Build all code in a local build directory.
Copies ytjobs package, optional custom modules/paths, and all stages’ code to the build directory, preserving the same structure as would be uploaded to YT.
- Parameters:
build_dir (Path) – Local build directory path
pipeline_dir (Path) – Path to pipeline directory
logger (Logger) – Logger instance
create_wrappers (bool) – If True, create wrapper scripts for all stages
upload_modules (list[str] | None) – Optional list of module names to upload
upload_paths (list[dict[str, str]] | None) – Optional list of {source, target?} for local paths
- Returns:
Total number of files copied
- Return type:
int
- yt_framework.operations.upload.create_code_archive(build_dir, archive_path, logger)
Create a tar.gz archive from the build directory.
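A minimal tarfile sketch of packing a build directory into a tar.gz with paths relative to the build root. create_archive and its ignore predicate are hypothetical (a suffix check stands in for .ytignore matching), and the demo layout is illustrative rather than what build_code_locally actually produces.

```python
import tarfile
import tempfile
from collections.abc import Callable
from pathlib import Path


def create_archive(
    build_dir: Path, archive_path: Path, ignore: Callable[[Path], bool]
) -> list[str]:
    """Pack files under build_dir into a tar.gz, storing paths relative to build_dir."""
    members: list[str] = []
    with tarfile.open(archive_path, "w:gz") as tar:
        for path in sorted(build_dir.rglob("*")):
            if path.is_file() and not ignore(path):
                arcname = path.relative_to(build_dir).as_posix()
                tar.add(path, arcname=arcname)
                members.append(arcname)
    return members


build_dir = Path(tempfile.mkdtemp())
src = build_dir / "stages" / "run_map" / "src"
src.mkdir(parents=True)
(src / "mapper.py").write_text("print('map')\n")
(src / "mapper.pyc").write_bytes(b"\x00")
(build_dir / "stages" / "run_map" / "config.yaml").write_text("client: {}\n")

# Write the archive outside build_dir so it does not include itself.
archive_path = Path(tempfile.mkdtemp()) / "source.tar.gz"
members = create_archive(build_dir, archive_path, ignore=lambda p: p.suffix == ".pyc")
with tarfile.open(archive_path, "r:gz") as tar:
    archived = sorted(tar.getnames())
```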
- yt_framework.operations.upload.upload_code_archive(yt_client, archive_path, build_folder, logger)
Upload code archive to YT.
- yt_framework.operations.upload.upload_all_code(yt_client, build_folder, pipeline_dir, logger, upload_modules=None, upload_paths=None)
Upload all code to YT: ytjobs package, optional custom modules/paths, and stages.
Builds code locally, creates tar archive, and uploads it to YT.
- Parameters:
yt_client (BaseYTClient) – YT client instance
build_folder (str) – YT build folder path
pipeline_dir (Path) – Path to pipeline directory
logger (Logger) – Logger instance
upload_modules (list[str] | None) – Optional list of module names to upload
upload_paths (list[dict[str, str]] | None) – Optional list of {source, target?} for local paths
- Returns:
None
- Return type:
None
Stage dependency file lists
Collect extra file dependencies (including implicit ytjobs) for YT jobs.
- yt_framework.operations.dependencies.build_stage_dependencies(build_folder, stage_dir, logger)
Build dependency list for a single stage.
Includes:
- config.yaml (if it exists locally)
- All .py files from the src/ directory
- yt_framework.operations.dependencies.build_ytjobs_dependencies(build_folder, logger)
Build dependency list for ytjobs package.
Includes all .py files from ytjobs/ directory.
- yt_framework.operations.dependencies.add_checkpoint(dependencies, model_name, checkpoint_base, logger)
Add checkpoint file to dependencies if configured.
- Parameters:
- Returns:
Updated dependency list (new list with checkpoint added, or same list)
- Return type:
- yt_framework.operations.dependencies.build_vanilla_dependencies(build_folder, stage_dir, model_name, checkpoint_base, logger)
Build complete dependency list for a vanilla operation.
Combines:
- Stage files (config + src/)
- ytjobs package
- Parameters:
- Returns:
Tuple of (script_path, dependency_files):
- script_path: Path to vanilla.py in YT
- dependency_files: Complete list of dependencies
- Return type:
tuple
- yt_framework.operations.dependencies.build_map_dependencies(build_folder, stage_dir, model_name, checkpoint_base, logger)
Build complete dependency list for a map operation.
Combines:
- Stage files (config + src/)
- ytjobs package
- Checkpoint (if configured)
- Parameters:
- Returns:
Tuple of (mapper_path, dependency_files):
- mapper_path: Path to mapper.py in YT
- dependency_files: Complete list of dependencies
- Return type:
tuple
Shared operation utilities
Shared helpers for map/vanilla/map-reduce (resources, secrets, tokenizer wiring).
- yt_framework.operations.common.build_environment(configs_dir, logger)
Build environment variables for map and vanilla operations.
Jobs read configuration from config.yaml, so only secrets are passed via environment variables.
- Parameters:
configs_dir (Path) – Directory containing secrets.env file
logger (logging.Logger) – Logger instance
- Returns:
Dictionary of secret environment variables
- Return type:
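The format of secrets.env is not spelled out here. Assuming a dotenv-style file of KEY=VALUE lines, turning it into the secrets dictionary could look roughly like this. `load_secrets_env` is hypothetical, and the real parser may treat quoting, comments, or a missing file differently.

```python
from pathlib import Path


def load_secrets_env(configs_dir: Path) -> dict[str, str]:
    """Hypothetical sketch: read KEY=VALUE pairs from configs_dir/secrets.env."""
    secrets: dict[str, str] = {}
    env_file = configs_dir / "secrets.env"
    if not env_file.is_file():
        # Assumption: a missing file simply yields no secrets.
        return secrets
    for raw_line in env_file.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # skip blanks, comments, and malformed lines
        # Split on the first "=" only, so values may themselves contain "=".
        key, value = line.split("=", 1)
        value = value.strip()
        # Strip one layer of matching surrounding quotes.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        secrets[key.strip()] = value
    return secrets
```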
- yt_framework.operations.common.prepare_docker_auth(docker_image, docker_username, docker_password)
Prepare the Docker authentication dictionary.
- Returns:
Docker authentication dict if all credentials are provided, None otherwise
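The all-or-nothing rule can be sketched as follows. Two points are assumptions: that the image counts as a required input alongside the username and password, and the payload keys (`username`, `password`). `prepare_docker_auth_sketch` is hypothetical.

```python
from typing import Optional


def prepare_docker_auth_sketch(
    docker_image: Optional[str],
    docker_username: Optional[str],
    docker_password: Optional[str],
) -> Optional[dict[str, str]]:
    """Hypothetical sketch: build an auth payload only when nothing is missing."""
    if not (docker_image and docker_username and docker_password):
        # Any missing (None or empty) value means "no registry auth".
        return None
    # Assumed payload shape; check the real function for the actual keys.
    return {"username": docker_username, "password": docker_password}
```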
- yt_framework.operations.common.extract_operation_resources(operation_config, logger)
Extract OperationResources from operation config with fallback defaults.
- Parameters:
operation_config (DictConfig)
logger (logging.Logger)
- Return type:
OperationResources
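The "fallback defaults" behavior amounts to overlaying configured values on a set of defaults. In this sketch `ResourcesSketch`, its field names and default values, and the `resources` config key are all invented for illustration; plain dicts stand in for DictConfig.

```python
from dataclasses import dataclass, fields
from typing import Any, Mapping


@dataclass(frozen=True)
class ResourcesSketch:
    """Hypothetical stand-in for OperationResources; fields are invented."""

    memory_limit_gb: int = 4
    cpu_limit: int = 2
    job_count: int = 1


def extract_resources_sketch(operation_config: Mapping[str, Any]) -> ResourcesSketch:
    """Take known keys from the config, keeping dataclass defaults for the rest."""
    resources = operation_config.get("resources") or {}
    known = {f.name for f in fields(ResourcesSketch)}
    # Unknown keys and explicit None values are ignored in this sketch.
    overrides = {k: v for k, v in resources.items() if k in known and v is not None}
    return ResourcesSketch(**overrides)
```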
- yt_framework.operations.common.extract_secure_env_client_kwargs(operation_config)
Options for YTProdClient secure-vault / public-env partitioning.
- yt_framework.operations.common.collect_passthrough_kwargs(operation_config, reserved_keys)
Collect top-level config values to forward to YT client.
OmegaConf dict nodes are resolved to plain Python containers.
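A minimal sketch of the pass-through idea, using plain dicts instead of DictConfig (`collect_passthrough_sketch` is hypothetical). With OmegaConf, `OmegaConf.to_container(node, resolve=True)` is the usual way to turn a node into plain containers; here `copy.deepcopy` plays a similar role so the caller's config cannot be mutated through the result.

```python
import copy
from typing import Any, Iterable, Mapping


def collect_passthrough_sketch(
    operation_config: Mapping[str, Any], reserved_keys: Iterable[str]
) -> dict[str, Any]:
    """Hypothetical sketch: forward every top-level key not claimed by the framework."""
    reserved = set(reserved_keys)
    kwargs: dict[str, Any] = {}
    for key, value in operation_config.items():
        if key in reserved:
            continue  # handled by dedicated helpers (resources, env, docker, ...)
        # Deep-copy nested containers so the result is independent of the config.
        kwargs[key] = copy.deepcopy(value)
    return kwargs
```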
- yt_framework.operations.common.build_operation_environment(context, operation_config, logger, include_stage_name=True, include_tokenizer_artifact=True)
Build the operation environment from secrets, explicit env config, and optional helpers (stage name, tokenizer artifact).
- Parameters:
context (StageContext)
operation_config (DictConfig)
logger (logging.Logger)
include_stage_name (bool)
include_tokenizer_artifact (bool)
- yt_framework.operations.common.docker_auth_from_op_config(operation_config, env)
Resolve docker image from config and return auth payload if credentials exist.
- yt_framework.operations.common.extract_max_failed_jobs(operation_config, logger)
Extract max_failed_job_count, falling back to a default.
- Parameters:
operation_config (DictConfig)
logger (logging.Logger)
YT Client
Client Factory
Factory that returns YTDevClient or YTProdClient based on pipeline mode.
- yt_framework.yt.factory.create_yt_client(logger=None, mode='dev', pipeline_dir=None, secrets=None, pickling=None)
Create a YT client for the given pipeline mode.
- Parameters:
logger (Logger | None) – Logger instance (default: creates new logger)
mode (Literal['prod', 'dev'] | None) – “prod” for production YT client, “dev” for local development
pipeline_dir (Path | str | None) – Pipeline directory (required for dev mode)
secrets (dict[str, str] | None) – Optional dictionary containing YT credentials. Required only for prod mode. Expected keys: YT_PROXY (YTsaurus proxy URL) and YT_TOKEN (YTsaurus authentication token).
pickling (dict[str, Any] | None) – Optional pickling-related client config (prod only).
- Returns:
BaseYTClient instance (YTProdClient or YTDevClient)
- Raises:
ValueError – If secrets are required for prod mode but not provided.
- Return type:
BaseYTClient
Example
>>> from pathlib import Path
>>> from yt_framework.yt.factory import create_yt_client
>>> # Dev mode (local filesystem simulation)
>>> client = create_yt_client(mode="dev", pipeline_dir=Path("."))
>>>
>>> # Prod mode (real YT cluster)
>>> secrets = {"YT_PROXY": "my-proxy", "YT_TOKEN": "my-token"}
>>> client = create_yt_client(mode="prod", secrets=secrets)
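The mode dispatch and the documented ValueError can be modelled with stub classes. `create_client_sketch`, `DevClientStub`, and `ProdClientStub` are hypothetical. Treating mode=None as dev and raising ValueError when pipeline_dir is missing in dev mode are assumptions, since only the prod-mode error is documented.

```python
from pathlib import Path
from typing import Literal, Optional, Union


class DevClientStub:
    """Stand-in for YTDevClient."""

    def __init__(self, pipeline_dir: Path) -> None:
        self.pipeline_dir = pipeline_dir


class ProdClientStub:
    """Stand-in for YTProdClient."""

    def __init__(self, proxy: str, token: str) -> None:
        self.proxy = proxy
        self.token = token


def create_client_sketch(
    mode: Optional[Literal["prod", "dev"]] = "dev",
    pipeline_dir: Union[Path, str, None] = None,
    secrets: Optional[dict[str, str]] = None,
) -> Union[DevClientStub, ProdClientStub]:
    """Hypothetical sketch of mode-based dispatch."""
    if mode == "prod":
        secrets = secrets or {}
        if not secrets.get("YT_PROXY") or not secrets.get("YT_TOKEN"):
            # Mirrors the documented ValueError for prod mode without credentials.
            raise ValueError("prod mode requires YT_PROXY and YT_TOKEN in secrets")
        return ProdClientStub(secrets["YT_PROXY"], secrets["YT_TOKEN"])
    # Assumption: anything other than "prod" (including None) means dev mode.
    if pipeline_dir is None:
        # Assumption: the real factory's behavior here is not documented.
        raise ValueError("dev mode requires pipeline_dir")
    return DevClientStub(Path(pipeline_dir))
```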
Dev Client
Local filesystem stand-in for Cypress tables and subprocess-backed jobs.
- class yt_framework.yt.clients.client_dev.YTDevClient(logger, pipeline_dir=None)
Bases: ClientDevYqlMixin, ClientDevOpsMixin, BaseYTClient
Development YT client implementation.
Uses the local filesystem for all operations, simulating YT behavior. Tables are stored as .jsonl files in the .dev/ directory.
- exists(path)
Check if a path exists in YT.
In dev mode, always returns True (assumes files exist locally).
- write_table(table_path, rows, *, append=False, replication_factor=1)
Write rows to a YT table (saved as a local .jsonl file).
In dev mode, tables are stored as JSONL files in the .dev directory. Each row is written as a JSON object on a single line.
- Returns:
None
- Return type:
None
Example
>>> client.write_table("//tmp/data", [{"id": 1, "name": "Alice"}])
>>> # Creates .dev/data.jsonl containing: {"id":1,"name":"Alice"}\n
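The dev-mode storage scheme can be sketched with the standard json module. The path mapping (keep only the last Cypress path component, so //tmp/data becomes data.jsonl) is inferred from the example above and may not match the real client for nested paths. `write_table_sketch` and `read_table_sketch` are hypothetical, and replication_factor is left out because it has no local meaning.

```python
import json
from pathlib import Path
from typing import Any, Iterable


def dev_table_file(dev_dir: Path, table_path: str) -> Path:
    """Assumed mapping: keep only the last Cypress path component."""
    name = table_path.rstrip("/").rsplit("/", 1)[-1]
    return dev_dir / f"{name}.jsonl"


def write_table_sketch(
    dev_dir: Path, table_path: str, rows: Iterable[dict[str, Any]], *, append: bool = False
) -> Path:
    """Write rows as JSON Lines; append=False truncates the existing file."""
    target = dev_table_file(dev_dir, table_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("a" if append else "w", encoding="utf-8") as fh:
        for row in rows:
            # One compact JSON object per line.
            fh.write(json.dumps(row, separators=(",", ":")) + "\n")
    return target


def read_table_sketch(dev_dir: Path, table_path: str) -> list[dict[str, Any]]:
    """Read the rows back, skipping blank lines."""
    with dev_table_file(dev_dir, table_path).open(encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]
```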
Production Client
Thin wrapper around yt.wrapper.YtClient for real cluster operations.
- class yt_framework.yt.clients.client_prod.YTProdClient(logger, secrets, pickling=None)
Bases: ClientProdYqlMixin, ClientProdOpsMixin, BaseYTClient
Production YT client implementation.
Uses the real YTsaurus client for all operations.
- write_table(table_path, rows, *, append=False, replication_factor=1, make_parents=True)
Write rows to a YT table.
- Parameters:
table_path (str) – YT table path
rows (list[dict[str, Any]]) – List of dictionaries representing table rows
append (bool) – If True, append to existing table (default: False)
replication_factor (int) – Replication factor for the table (default: 1)
make_parents (bool) – If True, create parent directories if they don’t exist (default: True)
- Return type:
None