Pipelines and stages
A pipeline runs an ordered list of stages. The framework loads configuration, builds a YT client for the current mode, uploads code when needed (prod mode), and then calls each stage’s run method in sequence.
Pipelines
The pipeline is responsible for:
Discovering or registering stages
Loading YAML and secrets
Initializing the YT client (dev or prod)
Uploading code in prod
Invoking stages in enabled_stages order
Surfacing failures (a raised exception stops the run)
DefaultPipeline
Tip
Prefer DefaultPipeline
It discovers stages under stages/, so you rarely need custom registration logic.
Call DefaultPipeline.main() from the script's __main__ block:
# pipeline.py
from yt_framework.core.pipeline import DefaultPipeline

if __name__ == "__main__":
    DefaultPipeline.main()
Discovery does the following:
1. List the subdirectories of stages/.
2. Import stage.py from each.
3. Register every BaseStage subclass found.
4. Run the stages in the order given by configs/config.yaml → stages.enabled_stages.
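The discovery steps can be approximated with importlib. This is a sketch, not the framework's implementation: BaseStage is a local stand-in, it is injected into each module's namespace so the sketch runs without yt_framework installed (a real stage.py imports it from yt_framework.core.stage), and it assumes exactly one stage class per stage.py, keyed by its directory name.

```python
import importlib.util
import inspect
from pathlib import Path


class BaseStage:
    """Local stand-in for yt_framework.core.stage.BaseStage (assumption)."""

    def run(self, debug):
        return debug


def discover_stages(stages_dir, base=BaseStage):
    """Return {directory name: stage class} for every stages/<name>/stage.py."""
    found = {}
    for sub in sorted(Path(stages_dir).iterdir()):
        stage_file = sub / "stage.py"
        if not stage_file.is_file():
            continue  # skip plain files and directories without stage.py
        spec = importlib.util.spec_from_file_location(f"stages.{sub.name}.stage", stage_file)
        module = importlib.util.module_from_spec(spec)
        module.BaseStage = base  # sketch only: real stage.py files import BaseStage themselves
        spec.loader.exec_module(module)
        classes = [
            obj
            for _, obj in inspect.getmembers(module, inspect.isclass)
            if issubclass(obj, base) and obj is not base
        ]
        if len(classes) != 1:
            raise ValueError(f"{stage_file}: expected one stage class, found {len(classes)}")
        found[sub.name] = classes[0]
    return found


def order_stages(found, enabled_stages):
    """Pick stage classes in enabled_stages order; an unknown name raises KeyError."""
    return [found[name] for name in enabled_stages]
```

Run order comes from the config list, not from directory names; sorted() here only makes discovery deterministic.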
Example layout:
my_pipeline/
├── pipeline.py
├── configs/
│   └── config.yaml
└── stages/
    ├── stage1/
    │   ├── stage.py
    │   └── config.yaml
    └── stage2/
        ├── stage.py
        └── config.yaml
For a complete working tree, see the 01_hello_world example.
BasePipeline
Use BasePipeline when you need explicit registration or setup hooks (conditional stages, tests, unusual layout).
# pipeline.py
from yt_framework.core.pipeline import BasePipeline
from yt_framework.core.registry import StageRegistry

from stages.stage1.stage import Stage1
from stages.stage2.stage import Stage2


class MyPipeline(BasePipeline):
    def setup(self):
        registry = StageRegistry()
        registry.add_stage(Stage1)
        registry.add_stage(Stage2)
        self.set_stage_registry(registry)


if __name__ == "__main__":
    MyPipeline.main()
Reasons you might choose this:
Custom setup() work before stages exist
Stages not discoverable from a flat stages/ tree
Tests that inject a small fixed registry
For normal repos, DefaultPipeline is enough.
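For conditional stages, one option is to decide inside setup() which classes to register. In this sketch StageRegistry is a minimal stand-in that models only the add_stage call shown above, the three stage classes are empty placeholders, and the RUN_VALIDATION environment variable is a hypothetical switch.

```python
import os


class StageRegistry:
    """Stand-in for yt_framework.core.registry.StageRegistry: only add_stage is modeled."""

    def __init__(self):
        self.stages = []

    def add_stage(self, stage_cls):
        self.stages.append(stage_cls)


class CreateInput:
    """Placeholder; a real stage subclasses BaseStage."""


class ProcessData:
    """Placeholder; a real stage subclasses BaseStage."""


class ValidateOutput:
    """Placeholder; a real stage subclasses BaseStage."""


def build_registry(run_validation: bool) -> StageRegistry:
    """Register the core stages, plus ValidateOutput only when requested."""
    registry = StageRegistry()
    registry.add_stage(CreateInput)
    registry.add_stage(ProcessData)
    if run_validation:
        registry.add_stage(ValidateOutput)
    return registry


# Inside MyPipeline.setup() this would become:
#     self.set_stage_registry(build_registry(os.environ.get("RUN_VALIDATION") == "1"))
registry = build_registry(os.environ.get("RUN_VALIDATION") == "1")
```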
Stages
A stage is one unit of work: one BaseStage subclass, one config.yaml, and optional src/ or requirements.txt for uploaded jobs.
BaseStage
BaseStage gives you:
Parsed stage config at self.config (from stages/<name>/config.yaml)
self.deps: yt_client, pipeline-wide settings, and paths
A logger at self.logger
Operation-related context on self.context, where applicable
Directory layout
stages/
└── stage_name/
    ├── stage.py           # required
    ├── config.yaml        # required
    ├── src/               # optional (mapper, vanilla, etc.)
    │   ├── mapper.py
    │   └── vanilla.py
    └── requirements.txt   # optional extra pip deps for the job bundle
Minimal stage
# stages/my_stage/stage.py
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage


class MyStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        self.logger.info("Running my stage")
        return debug
Stage with tables
# stages/my_stage/config.yaml
client:
  input_table: //tmp/my_pipeline/input
  output_table: //tmp/my_pipeline/output
# stages/my_stage/stage.py
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage


def process_row(row):
    # Placeholder transformation: replace with your own per-row logic.
    return row


class MyStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        input_table = self.config.client.input_table
        output_table = self.config.client.output_table
        rows = list(self.deps.yt_client.read_table(input_table))
        processed = [process_row(row) for row in rows]
        # Default overwrites the output. Pass append=True to append if the table already exists.
        self.deps.yt_client.write_table(output_table, processed)
        return debug
self.deps
Typical fields:
self.deps.yt_client: read/write tables, YQL helpers, etc.
self.deps.pipeline_config: pipeline-wide configuration loaded from configs/config.yaml (for example, pipeline.mode)
self.deps.configs_dir: the directory that holds secrets.env
class MyStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        yt = self.deps.yt_client  # table I/O client (not used further in this short example)
        mode = self.deps.pipeline_config.pipeline.mode
        table_path = self.config.client.output_table
        self.logger.info("mode=%s table=%s", mode, table_path)
        return debug
Passing data between stages (debug)
debug is a mutable mapping carried from stage to stage. Put small flags or summaries here; put large data in YT tables.
class Stage1(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        debug["result"] = "some value"
        debug["count"] = 42
        return debug


class Stage2(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        result = debug.get("result")
        count = debug.get("count", 0)
        self.logger.info("from stage1: result=%s count=%s", result, count)
        return debug
Warning
Keep debug small
It is an in-memory dict passed through the driver process. Metadata only: for big payloads, write a table and pass the path in debug if needed.
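Following that advice, the first stage can write its rows to a table and leave only the path and a row count in debug. In this sketch FakeYtClient is an in-memory stand-in exposing just the read_table and write_table methods used earlier on this page, the table path and row data are hypothetical, and plain functions play the role of the two stages' run methods.

```python
from typing import Any, Dict, Iterator, List


class FakeYtClient:
    """In-memory stand-in for self.deps.yt_client; models only read_table/write_table."""

    def __init__(self) -> None:
        self.tables: Dict[str, List[Dict[str, Any]]] = {}

    def write_table(self, path: str, rows) -> None:
        self.tables[path] = list(rows)  # overwrite semantics, like the default write noted earlier

    def read_table(self, path: str) -> Iterator[Dict[str, Any]]:
        return iter(self.tables[path])


INTERMEDIATE = "//tmp/my_pipeline/intermediate"  # hypothetical table path


def stage1_run(yt: FakeYtClient, debug: Dict[str, Any]) -> Dict[str, Any]:
    """Write the bulk rows to a table; keep only the path and a count in debug."""
    rows = [{"id": i, "value": i * i} for i in range(1000)]
    yt.write_table(INTERMEDIATE, rows)
    debug["intermediate_table"] = INTERMEDIATE
    debug["row_count"] = len(rows)
    return debug


def stage2_run(yt: FakeYtClient, debug: Dict[str, Any]) -> Dict[str, Any]:
    """Read the rows back via the path stored in debug and record a small summary."""
    rows = list(yt.read_table(debug["intermediate_table"]))
    if len(rows) != debug["row_count"]:
        raise RuntimeError("intermediate table changed between stages")
    debug["value_sum"] = sum(row["value"] for row in rows)
    return debug


yt = FakeYtClient()
debug = stage2_run(yt, stage1_run(yt, {}))
```

However large the table grows, debug holds only three small entries.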
Stage configuration
Access nested YAML through self.config:
# stages/my_stage/config.yaml
job:
  multiplier: 2
  prefix: "processed_"
client:
  input_table: //tmp/my_pipeline/input
  output_table: //tmp/my_pipeline/output
  operations:
    map:
      resources:
        memory_limit_gb: 4
        cpu_limit: 2
class MyStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        multiplier = self.config.job.multiplier
        prefix = self.config.job.prefix
        input_table = self.config.client.input_table
        output_table = self.config.client.output_table
        memory = self.config.client.operations.map.resources.memory_limit_gb
        self.logger.info(
            "job %s %s tables %s -> %s map_mem_gb=%s",
            prefix,
            multiplier,
            input_table,
            output_table,
            memory,
        )
        return debug
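To see how those attribute chains follow the YAML nesting, the stdlib sketch below turns the same structure (written as a Python dict) into nested SimpleNamespace objects. It only mimics the OmegaConf-style access the framework provides. The select helper is hypothetical and shows one way to read an optional key with a fallback; on a real OmegaConf config, OmegaConf.select(cfg, key, default=...) serves that purpose.

```python
from types import SimpleNamespace


def to_namespace(value):
    """Recursively turn dicts into SimpleNamespace so keys read as attributes."""
    if isinstance(value, dict):
        return SimpleNamespace(**{key: to_namespace(val) for key, val in value.items()})
    if isinstance(value, list):
        return [to_namespace(item) for item in value]
    return value


def select(cfg, dotted, default=None):
    """Follow a dotted path such as 'client.input_table'; return default if a segment is missing."""
    node = cfg
    for part in dotted.split("."):
        if not hasattr(node, part):
            return default
        node = getattr(node, part)
    return node


# Same structure as stages/my_stage/config.yaml above.
raw = {
    "job": {"multiplier": 2, "prefix": "processed_"},
    "client": {
        "input_table": "//tmp/my_pipeline/input",
        "output_table": "//tmp/my_pipeline/output",
        "operations": {"map": {"resources": {"memory_limit_gb": 4, "cpu_limit": 2}}},
    },
}
config = to_namespace(raw)
```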
Lifecycle (conceptual)
1. Discovery (DefaultPipeline): scan stages/.
2. Registration: record stage classes in a registry.
3. Construction: create one instance per stage, with dependencies injected.
4. Config load: build an OmegaConf-style object from the stage's config.yaml.
5. Run: call run(debug); its return value becomes the next stage’s debug.
6. Next stage: repeat until the list ends or an error is raised.
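The run and next-stage steps amount to a loop that threads debug through each stage. This is a simplified sketch, not the framework's code: BaseStage is a stand-in with only run(), stages are constructed without dependency injection or config, and debug is a plain dict rather than a DebugContext.

```python
from typing import Any, Dict, Iterable, Mapping, Type


class BaseStage:
    """Stand-in for yt_framework.core.stage.BaseStage: only run(debug) is modeled."""

    def run(self, debug: Dict[str, Any]) -> Dict[str, Any]:
        raise NotImplementedError


def run_pipeline(
    registry: Mapping[str, Type[BaseStage]], enabled_stages: Iterable[str]
) -> Dict[str, Any]:
    """Construct and run each enabled stage in order, passing debug along."""
    debug: Dict[str, Any] = {}
    for name in enabled_stages:
        stage = registry[name]()  # construction; real stages also receive deps and config
        debug = stage.run(debug)  # the return value becomes the next stage's debug
    # An exception raised by any run() propagates out of the loop, so later stages never start.
    return debug


class CreateInput(BaseStage):
    def run(self, debug):
        debug["count"] = 21
        return debug


class Double(BaseStage):
    def run(self, debug):
        debug["count"] *= 2
        return debug


result = run_pipeline({"create_input": CreateInput, "double": Double}, ["create_input", "double"])
```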
Order
# configs/config.yaml
stages:
  enabled_stages:
    - create_input
    - process_data
    - validate_output
Note
Sequential
Stages run one after another. An uncaught exception aborts the pipeline.
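A pre-flight check can reject a misspelled name in enabled_stages before any stage runs. check_enabled_stages is a hypothetical helper (the framework may already validate this itself): it raises on unknown names and returns registered stages that are not enabled, which is worth logging.

```python
from typing import Collection, List, Sequence


def check_enabled_stages(enabled_stages: Sequence[str], registered: Collection[str]) -> List[str]:
    """Raise if enabled_stages names an unregistered stage; return registered-but-disabled names."""
    unknown = [name for name in enabled_stages if name not in registered]
    if unknown:
        raise ValueError(f"enabled_stages lists unknown stages: {unknown}")
    return sorted(set(registered) - set(enabled_stages))


registered = {"create_input", "process_data", "validate_output"}
# The config above with validate_output left out of enabled_stages.
disabled = check_enabled_stages(["create_input", "process_data"], registered)
```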
Injection details
self.deps follows the PipelineStageDependencies type. For the full list of fields, see Core injection (self.deps) in the API reference (yt_framework.core.dependencies).
Multi-stage sample
Practices that tend to help
One main responsibility per stage.
Names that match what the stage does in YT terms.
Large payloads in tables, not in debug.
Fail fast: raise on invalid input instead of silently returning partial success.
Log decisions you will need when reading .devlogs or YT operation logs.
Exercise new stages in dev mode before pointing prod traffic at them.
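As an illustration of failing fast and logging decisions, the hypothetical validate_rows helper below raises on the first row that lacks a required column instead of silently dropping it, and logs how many rows passed. REQUIRED_COLUMNS and the logger name are assumptions; inside a stage you would call it on rows read through self.deps.yt_client and log through self.logger.

```python
import logging
from typing import Any, Dict, Iterable, List

REQUIRED_COLUMNS = ("id", "value")  # hypothetical schema for the input table
logger = logging.getLogger("my_stage")  # inside a stage, use self.logger instead


def validate_rows(rows: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return rows as a list, raising on the first row that misses a required column."""
    checked = []
    for index, row in enumerate(rows):
        missing = [col for col in REQUIRED_COLUMNS if col not in row]
        if missing:
            raise ValueError(f"row {index} is missing {missing}: {row!r}")
        checked.append(row)
    logger.info("validated %d rows with columns %s", len(checked), REQUIRED_COLUMNS)
    return checked
```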