Pipelines and Stages
Pipelines and stages are the core building blocks of YT Framework. Understanding how they work is essential for building effective data processing workflows.
Pipelines
A pipeline is a collection of stages that execute in sequence. The pipeline manages:
Stage discovery and registration
Configuration loading
YT client initialization
Code upload (if needed)
Stage execution order
Error handling
DefaultPipeline
Tip: Use DefaultPipeline. It is recommended for most use cases because it discovers stages automatically, which reduces boilerplate and keeps your code cleaner.
DefaultPipeline automatically discovers and registers stages from the stages/ directory.
Usage:
# pipeline.py
from yt_framework.core.pipeline import DefaultPipeline


if __name__ == "__main__":
    DefaultPipeline.main()
How it works:
1. Scans the stages/ directory for subdirectories
2. Looks for stage.py files in each subdirectory
3. Imports and registers any BaseStage subclasses found
4. Executes stages in the order specified by enabled_stages in the config
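The first three discovery steps can be approximated with the standard library. The sketch below is not the framework's implementation: `discover_stages` is a hypothetical helper that assumes each `stages/<name>/stage.py` defines exactly one subclass of the base class you pass in, and it keys the result by directory name.

```python
import importlib.util
import inspect
from pathlib import Path


def discover_stages(stages_dir, base_cls):
    """Hypothetical helper: map each stage directory name to the
    base_cls subclass defined in its stage.py."""
    found = {}
    for subdir in sorted(Path(stages_dir).iterdir()):
        stage_file = subdir / "stage.py"
        # Skip plain files and directories that have no stage.py.
        if not subdir.is_dir() or not stage_file.is_file():
            continue
        # Load stage.py as a standalone module under a unique name.
        module_name = f"discovered_stage_{subdir.name}"
        spec = importlib.util.spec_from_file_location(module_name, str(stage_file))
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        # Keep only classes defined in this file that subclass base_cls.
        for _, obj in inspect.getmembers(module, inspect.isclass):
            if (
                issubclass(obj, base_cls)
                and obj is not base_cls
                and obj.__module__ == module_name
            ):
                found[subdir.name] = obj
    return found
```

The `obj.__module__` check matters because every stage.py imports the base class itself; without it, imported classes would be picked up as well.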
Example structure:
my_pipeline/
├── pipeline.py
├── configs/
│ └── config.yaml
└── stages/
├── stage1/
│ ├── stage.py
│ └── config.yaml
└── stage2/
├── stage.py
└── config.yaml
See Example: 01_hello_world for a complete example.
BasePipeline
BasePipeline allows manual stage registration. Use this when you need custom pipeline logic or want explicit control over stage registration.
Usage:
# pipeline.py
from yt_framework.core.pipeline import BasePipeline
from yt_framework.core.registry import StageRegistry

from stages.stage1.stage import Stage1
from stages.stage2.stage import Stage2


class MyPipeline(BasePipeline):
    def setup(self):
        # Register stages explicitly instead of relying on discovery
        registry = StageRegistry()
        registry.add_stage(Stage1)
        registry.add_stage(Stage2)
        self.set_stage_registry(registry)


if __name__ == "__main__":
    MyPipeline.main()
When to use BasePipeline:
Custom pipeline initialization logic
Conditional stage registration
Integration with external systems
Advanced use cases
For most users, DefaultPipeline is sufficient.
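Conditional stage registration amounts to branching before the registry is handed to the pipeline. The sketch below uses a hypothetical `MiniRegistry` so it runs without the framework; keying stages by class name is an assumption, as are the stage class names.

```python
class MiniRegistry:
    """Hypothetical stand-in for a stage registry: stage classes by name."""

    def __init__(self):
        self._stages = {}

    def add_stage(self, stage_cls):
        # Assumption: stages are keyed by class name.
        self._stages[stage_cls.__name__] = stage_cls

    def names(self):
        return list(self._stages)


# Hypothetical stage classes; real ones would subclass BaseStage.
class CreateInput: ...


class ProcessData: ...


class ExportResults: ...


def build_registry(export_enabled: bool) -> MiniRegistry:
    registry = MiniRegistry()
    registry.add_stage(CreateInput)
    registry.add_stage(ProcessData)
    # Conditional registration: the export stage exists only when enabled.
    if export_enabled:
        registry.add_stage(ExportResults)
    return registry
```

In a BasePipeline subclass, the same branch would sit inside setup(), before self.set_stage_registry(registry).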
Stages
A stage is a single unit of work in a pipeline. Each stage:
Has its own configuration file
Receives dependencies (YT client, configs, etc.)
Can pass data to subsequent stages via context
Is executed independently
BaseStage
All stages inherit from BaseStage. The base class provides:
Automatic config loading from stages/<stage_name>/config.yaml
Access to the YT client via self.deps.yt_client
Access to the pipeline config via self.deps.pipeline_config
A logger via self.logger
Stage context via self.context
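To make the wiring concrete, here is a minimal sketch of a base class that stores dependencies and config and creates a per-class logger. `Deps`, `SketchBaseStage`, and `EchoStage` are hypothetical; the sketch receives an already-parsed config instead of reading config.yaml, and it leaves out self.context because its relationship to the debug argument is not spelled out here.

```python
import logging
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any, Dict


@dataclass
class Deps:
    """Hypothetical bundle mirroring the attributes on self.deps."""
    yt_client: Any
    pipeline_config: Any
    configs_dir: str


class SketchBaseStage:
    """Hypothetical base class: stores deps and config, creates a logger."""

    def __init__(self, deps: Deps, config: SimpleNamespace) -> None:
        self.deps = deps
        self.config = config  # assumed already parsed from config.yaml
        self.logger = logging.getLogger(type(self).__name__)

    def run(self, debug: Dict[str, Any]) -> Dict[str, Any]:
        raise NotImplementedError  # subclasses implement the stage logic


class EchoStage(SketchBaseStage):
    def run(self, debug: Dict[str, Any]) -> Dict[str, Any]:
        self.logger.info("mode=%s", self.deps.pipeline_config.pipeline.mode)
        debug["output_table"] = self.config.client.output_table
        return debug
```

Injecting dependencies through the constructor keeps stages easy to test: you can pass a fake client and a hand-built config without touching a cluster.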
Stage Structure
Each stage must follow this directory structure:
stages/
└── stage_name/
├── stage.py # Stage implementation (required)
├── config.yaml # Stage configuration (required)
├── src/ # Optional: source code for operations
│ ├── mapper.py # For map operations
│ └── vanilla.py # For vanilla operations
└── requirements.txt # Optional: Python dependencies
Creating a Stage
Minimal stage:
# stages/my_stage/stage.py
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage


class MyStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        self.logger.info("Running my stage")
        # Stage logic here
        return debug
Stage with configuration:
# stages/my_stage/config.yaml
client:
  input_table: //tmp/my_pipeline/input
  output_table: //tmp/my_pipeline/output

# stages/my_stage/stage.py
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage


def process_row(row):
    # Placeholder transformation; replace with your own logic
    return row


class MyStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        input_table = self.config.client.input_table
        output_table = self.config.client.output_table
        # Read all rows from the input table into memory
        rows = list(self.deps.yt_client.read_table(input_table))
        # Process data
        processed = [process_row(row) for row in rows]
        # Write results to the output table
        self.deps.yt_client.write_table(output_table, processed)
        return debug
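The read, process, write flow in MyStage.run can be exercised locally against an in-memory stand-in for the YT client. `InMemoryClient`, `run_stage`, and the doubling `process_row` are hypothetical; the only assumption about the real client is the read_table/write_table pair already used above, and overwrite-on-write is a choice made for the sketch.

```python
from typing import Dict, Iterable, List


class InMemoryClient:
    """Hypothetical stand-in for the YT client: tables are lists of dict rows."""

    def __init__(self) -> None:
        self.tables: Dict[str, List[dict]] = {}

    def read_table(self, path: str) -> Iterable[dict]:
        return iter(self.tables.get(path, []))

    def write_table(self, path: str, rows: Iterable[dict]) -> None:
        # Overwrite semantics, assumed for this sketch.
        self.tables[path] = list(rows)


def process_row(row: dict) -> dict:
    """Hypothetical transformation: double the 'value' column."""
    return {**row, "value": row["value"] * 2}


def run_stage(client, input_table: str, output_table: str) -> int:
    """Same read -> process -> write flow as MyStage.run; returns the row count."""
    rows = list(client.read_table(input_table))
    processed = [process_row(row) for row in rows]
    client.write_table(output_table, processed)
    return len(processed)
```

Calling list() on the reader holds every row in memory, which suits small tables; for large inputs, one of the operation types (such as map) is likely the better fit.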
Stage Dependencies
Stages receive dependencies through self.deps:
self.deps.yt_client: YT client for table operations
self.deps.pipeline_config: Pipeline-level configuration
self.deps.configs_dir: Path to the configs directory (for secrets)
Example:
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage


class MyStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        # Access YT client
        yt = self.deps.yt_client
        # Access pipeline config
        mode = self.deps.pipeline_config.pipeline.mode
        # Access stage config
        table_path = self.config.client.output_table
        return debug
Stage Context
Stages can pass data to subsequent stages via the debug context dictionary:
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage


class Stage1(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        # Add data to context
        debug["result"] = "some value"
        debug["count"] = 42
        return debug


class Stage2(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        # Access data from the previous stage, with defaults if missing
        result = debug.get("result")
        count = debug.get("count", 0)
        return debug
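The hand-off can be modeled with plain classes and a dict. `run_sequence` is a hypothetical driver that assumes, as the examples above suggest, that the dict each run() returns is what the next stage receives; plain classes stand in for BaseStage so the snippet runs on its own.

```python
from typing import Any, Dict, List


class Stage1:
    def run(self, debug: Dict[str, Any]) -> Dict[str, Any]:
        debug["result"] = "some value"
        debug["count"] = 42
        return debug


class Stage2:
    def run(self, debug: Dict[str, Any]) -> Dict[str, Any]:
        # Read what Stage1 stored, falling back to defaults if it is missing.
        debug["summary"] = f'{debug.get("result")} x{debug.get("count", 0)}'
        return debug


def run_sequence(stages: List[Any]) -> Dict[str, Any]:
    """Hypothetical driver: hand each stage's returned dict to the next stage."""
    debug: Dict[str, Any] = {}
    for stage in stages:
        debug = stage.run(debug)
    return debug
```

Using debug.get() with a default keeps Stage2 usable even when it runs without Stage1, for example while debugging a single stage.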
Warning: Context size limits
The context is a simple dictionary. Use it for small amounts of data (metadata, counts, flags). For large datasets, use YT tables instead.
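One way to follow this advice is to write the bulk rows to a table and put only the table path and a row count into the context. In the sketch below, the module-level `TABLES` dict stands in for YT tables, and the stage classes and table path are hypothetical.

```python
from typing import Dict, List

# Hypothetical in-memory "tables" standing in for YT tables.
TABLES: Dict[str, List[dict]] = {}


class ProduceRows:
    def run(self, debug: dict) -> dict:
        rows = [{"id": i} for i in range(10_000)]
        path = "//tmp/my_pipeline/produced"
        TABLES[path] = rows  # the bulk data goes to a table...
        debug["produced_table"] = path  # ...only metadata goes to the context
        debug["produced_count"] = len(rows)
        return debug


class ConsumeRows:
    def run(self, debug: dict) -> dict:
        # Look the data up by path instead of receiving it through the context.
        rows = TABLES[debug["produced_table"]]
        debug["checked"] = len(rows) == debug["produced_count"]
        return debug
```

The context stays a handful of small values no matter how many rows are produced.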
Stage Configuration
Each stage has its own config.yaml file. Configuration is accessed via self.config:
# stages/my_stage/config.yaml
job:
  multiplier: 2
  prefix: "processed_"
client:
  input_table: //tmp/my_pipeline/input
  output_table: //tmp/my_pipeline/output
  operations:
    map:
      resources:
        memory_limit_gb: 4
        cpu_limit: 2
# stages/my_stage/stage.py
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage


class MyStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        # Access job config
        multiplier = self.config.job.multiplier
        prefix = self.config.job.prefix
        # Access client config
        input_table = self.config.client.input_table
        output_table = self.config.client.output_table
        # Access nested config
        memory = self.config.client.operations.map.resources.memory_limit_gb
        return debug
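How the framework turns config.yaml into an object with attribute access is not described here. The sketch below shows the idea with the standard library: `to_namespace` is a hypothetical helper, and the dict literal is what a YAML parser would return for the file above, written inline so the snippet has no third-party dependency.

```python
from types import SimpleNamespace
from typing import Any


def to_namespace(value: Any) -> Any:
    """Hypothetical helper: recursively turn nested dicts into SimpleNamespace."""
    if isinstance(value, dict):
        return SimpleNamespace(**{k: to_namespace(v) for k, v in value.items()})
    if isinstance(value, list):
        return [to_namespace(v) for v in value]
    return value


# The config.yaml above, as the dict a YAML parser would produce.
raw = {
    "job": {"multiplier": 2, "prefix": "processed_"},
    "client": {
        "input_table": "//tmp/my_pipeline/input",
        "output_table": "//tmp/my_pipeline/output",
        "operations": {"map": {"resources": {"memory_limit_gb": 4, "cpu_limit": 2}}},
    },
}
config = to_namespace(raw)
```

With this representation, a misspelled key raises AttributeError at the point of access, so config typos surface as soon as the stage reads them.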
Stage Lifecycle
1. Discovery (DefaultPipeline only): the framework scans the stages/ directory
2. Registration: the stage class is registered in the stage registry
3. Initialization: a stage instance is created with its dependencies
4. Config loading: the stage config is loaded from config.yaml
5. Execution: the run() method is called with the context
6. Completion: the context is returned and passed to the next stage
Stage Execution Order
Stages execute in the order specified by enabled_stages in the pipeline config:
# configs/config.yaml
stages:
  enabled_stages:
    - create_input
    - process_data
    - validate_output
Note: Sequential execution
Stages are executed sequentially. If a stage fails, the pipeline stops. Make sure each stage handles errors appropriately.
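A driver loop that follows enabled_stages and stops at the first exception might look like the sketch below. `run_enabled_stages` and the function-based registry are hypothetical simplifications (real stages are BaseStage subclasses), and raising on an enabled but unregistered stage is a choice made for the sketch, not documented framework behavior.

```python
import logging
from typing import Any, Callable, Dict, List

logger = logging.getLogger("pipeline_sketch")

StageFn = Callable[[Dict[str, Any]], Dict[str, Any]]


def run_enabled_stages(
    registry: Dict[str, StageFn], enabled_stages: List[str]
) -> Dict[str, Any]:
    """Hypothetical driver: run stages in enabled_stages order and
    stop at the first failure."""
    debug: Dict[str, Any] = {}
    for name in enabled_stages:
        if name not in registry:
            # Sketch choice: an enabled but unregistered stage is an error.
            raise KeyError(f"stage {name!r} is enabled but not registered")
        logger.info("running stage %s", name)
        try:
            debug = registry[name](debug)
        except Exception:
            logger.exception("stage %s failed, stopping pipeline", name)
            raise  # re-raise so no later stage runs
    return debug
```

Re-raising after logging keeps the original traceback, which is why stages should raise exceptions on failure rather than return an error flag in the context.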
Multiple Stages Example
See Example: 02_multi_stage_pipeline for a complete example with multiple stages and context passing.
Best Practices
Keep stages focused: Each stage should do one thing well
Use descriptive names: Stage names should clearly indicate their purpose
Pass data via tables: Use YT tables for large datasets, not context
Handle errors: Stages should raise exceptions on failure
Log progress: Use self.logger to log important operations
Test locally: Use dev mode for development and testing
Next Steps
Learn about Configuration management
Understand Dev vs Prod modes
Explore Operations for different operation types
Review Examples for complete pipeline examples