Vanilla operations#
Vanilla runs src/vanilla.py once as a standalone job: no stdin row stream, no required input/output tables. Use it for bootstrap, teardown, health checks, or anything that should not be modeled as per-row map work.
Overview#
Tip
When vanilla fits
Pick vanilla when you do not need table-based map I/O. If you are emitting one output row per input row, map is usually clearer.
Compared with map:
One process per operation (no row fan-out).
Optional tables only if you open the client yourself inside the script.
Same upload/wrapper machinery as map in prod.
Note
Vanilla vs map
Vanilla = single script invocation. Map = streaming JSON rows through mapper.py.
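To make the contrast concrete, here is a minimal standard-library sketch of the two shapes. The names `map_style` and `vanilla_style` are hypothetical, and the row format (one JSON object per line) is an assumption for illustration rather than the framework's actual mapper contract.

```python
import io
import json


def map_style(stdin, stdout):
    """Map-like shape: read one JSON row per line, emit one row per input row."""
    for line in stdin:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        row = json.loads(line)
        row["processed"] = True  # hypothetical per-row transformation
        stdout.write(json.dumps(row) + "\n")


def vanilla_style():
    """Vanilla-like shape: no row stream, one invocation that does its work."""
    steps = ["prepare", "run", "report"]  # hypothetical one-off work
    return [f"done: {step}" for step in steps]


if __name__ == "__main__":
    out = io.StringIO()
    map_style(io.StringIO('{"id": 1}\n{"id": 2}\n'), out)
    print(out.getvalue(), end="")
    print(vanilla_style())
```

If the work looks like `vanilla_style` (no input rows to iterate over), vanilla is likely the better fit.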
Quick Start#
Minimal Example#
Stage (stages/setup/stage.py):
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage
from yt_framework.operations.command_ops.vanilla import run_vanilla


class SetupStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        success = run_vanilla(
            context=self.context,
            operation_config=self.config.client.operations.vanilla,
        )
        if not success:
            raise RuntimeError("Vanilla operation failed")
        return debug
Stage config (stages/setup/config.yaml):
client:
  operations:
    vanilla:
      resources:
        pool: default
        memory_limit_gb: 2
        cpu_limit: 1
Vanilla script (stages/setup/src/vanilla.py):
#!/usr/bin/env python3
import logging

from omegaconf import OmegaConf
from ytjobs.logging.logger import get_logger
from ytjobs.config import get_config_path


def main():
    logger = get_logger("vanilla", level=logging.INFO)

    # Load configuration
    config = OmegaConf.load(get_config_path())
    logger.info("Vanilla operation started")

    # Do some work
    greeting = config.job.get("greeting", "Hello!")
    logger.info(f"Greeting: {greeting}")

    # Simulate work
    for i in range(5):
        logger.info(f"Iteration {i+1}")

    logger.info("Vanilla operation completed")


if __name__ == "__main__":
    main()
See Example: 05_vanilla_operation for a complete example.
Vanilla Script#
The vanilla script (src/vanilla.py) is executed once per operation.
Script Structure#
#!/usr/bin/env python3
import logging

from omegaconf import OmegaConf
from ytjobs.logging.logger import get_logger
from ytjobs.config import get_config_path


def main():
    # Initialize logger
    logger = get_logger("vanilla", level=logging.INFO)

    # Load configuration
    config = OmegaConf.load(get_config_path())

    # Your logic here
    logger.info("Starting work...")

    # Do work
    perform_task(config)

    logger.info("Work completed")


if __name__ == "__main__":
    main()
Configuration Access#
Access stage configuration:
from omegaconf import OmegaConf
from ytjobs.config import get_config_path
config = OmegaConf.load(get_config_path())
# Access job config
greeting = config.job.greeting
iterations = config.job.iterations
# Access client config (read-only)
memory = config.client.operations.vanilla.resources.memory_limit_gb
Stage config (stages/my_stage/config.yaml):
job:
  greeting: "Hello from YT!"
  iterations: 10
client:
  operations:
    vanilla:
      resources:
        memory_limit_gb: 4
        cpu_limit: 2
Logging#
Use YT logging utilities:
import logging
from ytjobs.logging.logger import get_logger
logger = get_logger("vanilla", level=logging.INFO)
logger.info("Info message")
logger.warning("Warning message")
logger.error("Error message")
logger.debug("Debug message")
Logs appear in YT operation logs (prod mode) or .dev/<stage_name>.log (dev mode).
Error Handling#
Handle errors and exit with appropriate codes:
import sys
import logging

from ytjobs.logging.logger import get_logger

logger = get_logger("vanilla", level=logging.INFO)

try:
    # Your logic
    perform_task()
    logger.info("Task completed successfully")
except Exception as e:
    logger.error(f"Task failed: {e}", exc_info=True)
    sys.exit(1)  # Exit with error code
Important: Non-zero exit codes will cause the operation to fail.
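One way to keep the exit-code logic testable is to separate "run the task" from "exit the process". The sketch below uses the standard `logging` module in place of `get_logger`, and `run_and_get_exit_code` is a hypothetical helper name, not part of the framework.

```python
import logging

logger = logging.getLogger("vanilla")


def run_and_get_exit_code(task):
    """Run task() and translate the outcome into a process exit code."""
    try:
        task()
    except Exception:
        # logger.exception logs the message together with the traceback
        logger.exception("Task failed")
        return 1
    logger.info("Task completed successfully")
    return 0


# In a real script the last line would typically be:
#     sys.exit(run_and_get_exit_code(main))
```

Returning the code instead of calling `sys.exit` inside the handler makes it possible to unit test both the success and the failure path.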
Configuration#
Basic Configuration#
client:
  operations:
    vanilla:
      resources:
        pool: default
        memory_limit_gb: 2
        cpu_limit: 1
Required Fields#
resources: Resource configuration
Resource Configuration#
resources:
  pool: default        # YT pool name
  pool_tree: null      # Optional: pool tree
  memory_limit_gb: 2   # Memory (GB)
  cpu_limit: 1         # CPU cores (fractional, e.g. 0.5)
  gpu_limit: 0         # GPU count (default: 0)
  user_slots: null     # Optional: user slots limit
Resource guidelines:
Memory: Allocate based on task needs
CPU: More cores speed up execution only if the script runs work in parallel; fractional limits (e.g. 0.5) are supported
GPU: Set to 1+ for GPU workloads (requires custom Docker)
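As a rough aid for choosing `memory_limit_gb`, the following sketch measures peak allocation during a local run and rounds it up with headroom. It rests on several assumptions: `tracemalloc` only sees memory allocated through Python's allocator (native libraries are not counted), the 1.5x headroom factor is arbitrary, and GB is treated as GiB. Both function names are hypothetical.

```python
import math
import tracemalloc


def measure_peak_bytes(func):
    """Return the peak Python-level allocation (in bytes) while func() runs."""
    tracemalloc.start()
    try:
        func()
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return peak


def suggest_memory_limit_gb(peak_bytes, headroom=1.5):
    """Scale peak usage by a safety factor and round up to whole GB (min 1)."""
    gib = 1024 ** 3
    return max(1, math.ceil(peak_bytes * headroom / gib))
```

Treat the result as a starting point and confirm it against actual job statistics on the cluster.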
Advanced Configuration#
Max failed jobs:
client:
  operations:
    vanilla:
      max_failed_job_count: 1  # Fail operation after N failed jobs
      resources:
        # ...
Max row weight:
max_row_weight defaults to 128M for vanilla operations (and cannot exceed 128M). Override it per operation:
client:
  operations:
    vanilla:
      max_row_weight: 64M
      resources:
        # ...
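A config check for the 128M cap could look like the sketch below. It assumes the suffixes `K`, `M`, and `G` denote binary multiples (KiB, MiB, GiB); the framework's own parsing rules may differ, and `parse_size` and `validate_max_row_weight` are hypothetical helpers.

```python
# Assumption: "128M" means 128 MiB
MAX_ROW_WEIGHT_BYTES = 128 * 1024 ** 2

_UNITS = {"K": 1024, "M": 1024 ** 2, "G": 1024 ** 3}


def parse_size(value):
    """Convert a string such as '64M' (or a plain integer) to bytes."""
    text = str(value).strip().upper()
    if text and text[-1] in _UNITS:
        return int(text[:-1]) * _UNITS[text[-1]]
    return int(text)


def validate_max_row_weight(value):
    """Return the size in bytes, or raise if it is outside (0, 128M]."""
    size = parse_size(value)
    if size <= 0 or size > MAX_ROW_WEIGHT_BYTES:
        raise ValueError(f"max_row_weight must be in (0, 128M], got {value!r}")
    return size
```

Running such a check before submitting the operation surfaces an invalid value locally instead of at launch time.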
Custom Docker:
client:
  operations:
    vanilla:
      resources:
        docker_image: my-registry/my-image:latest
        # ...
See Docker Guide for details.
Checkpoints:
client:
  operations:
    vanilla:
      checkpoint:
        checkpoint_base: //tmp/my_pipeline/checkpoints
        local_checkpoint_path: /path/to/local/model.pth
      resources:
        # ...
See Checkpoints Guide for details.
Running Vanilla Operations#
From Stage#
Use run_vanilla() function:
from yt_framework.operations.command_ops.vanilla import run_vanilla

success = run_vanilla(
    context=self.context,
    operation_config=self.config.client.operations.vanilla,
)
Operation Flow#
Code upload: Code is packaged and uploaded to YT (prod mode)
Job creation: YT creates a single job
Execution: Job runs vanilla.py script
Completion: Operation completes when job finishes
Dev Mode Behavior#
In dev mode:
Runs locally using subprocess
Creates sandbox directory: .dev/<stage_name>_sandbox/
Extracts code archive
Executes vanilla.py script
Logs output to .dev/<stage_name>.log
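The dev-mode steps above can be imitated with the standard library to build intuition. This is an illustrative sketch, not the framework's implementation: the archive format (tar.gz), the layout inside the archive (`src/vanilla.py`), and the helper names are all assumptions.

```python
import subprocess
import sys
import tarfile
import tempfile
from pathlib import Path


def run_in_sandbox(archive_path, dev_dir, stage_name):
    """Extract a code archive into a sandbox, run vanilla.py, log its output."""
    dev_dir = Path(dev_dir)
    sandbox = dev_dir / f"{stage_name}_sandbox"
    sandbox.mkdir(parents=True, exist_ok=True)

    # Only extract archives you built yourself; extractall trusts its input
    with tarfile.open(archive_path) as archive:
        archive.extractall(sandbox)

    log_path = dev_dir / f"{stage_name}.log"
    with open(log_path, "w") as log_file:
        result = subprocess.run(
            [sys.executable, "src/vanilla.py"],
            cwd=sandbox,
            stdout=log_file,
            stderr=subprocess.STDOUT,
        )
    return result.returncode, log_path


def build_demo_archive(work_dir):
    """Create a tiny archive containing a one-line src/vanilla.py."""
    src = Path(work_dir) / "src"
    src.mkdir(parents=True)
    (src / "vanilla.py").write_text('print("Vanilla operation completed")\n')
    archive_path = Path(work_dir) / "code.tar.gz"
    with tarfile.open(archive_path, "w:gz") as archive:
        archive.add(src, arcname="src")
    return archive_path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        code, log = run_in_sandbox(build_demo_archive(tmp), Path(tmp) / ".dev", "setup")
        print(code, log.read_text(), end="")
```

As in dev mode, a non-zero return code here corresponds to a failed operation.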
Use Cases#
Setup Tasks#
Initialize directories, create tables, prepare data:
def main():
    logger = get_logger("setup", level=logging.INFO)
    config = OmegaConf.load(get_config_path())

    # Create directories
    create_directories(config)

    # Initialize tables
    initialize_tables(config)

    logger.info("Setup completed")
Validation#
Validate data, check conditions, run tests:
def main():
    logger = get_logger("validate", level=logging.INFO)
    config = OmegaConf.load(get_config_path())

    # Validate data
    if not validate_data(config):
        logger.error("Validation failed")
        sys.exit(1)

    logger.info("Validation passed")
Cleanup#
Clean up temporary files, remove old data:
def main():
    logger = get_logger("cleanup", level=logging.INFO)
    config = OmegaConf.load(get_config_path())

    # Clean up temporary files
    cleanup_temp_files(config)

    logger.info("Cleanup completed")
Environment Logging#
Log environment information for debugging:
def main():
    logger = get_logger("logenv", level=logging.INFO)

    # Log environment
    log_system_info(logger)
    log_python_environment(logger)
    log_gpu_info(logger)

    logger.info("Environment logged")
See Example: environment_log for logging environment variables from a vanilla job.
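The helpers in the snippet above (`log_system_info` and friends) are not defined on this page. One possible shape for such a helper, using only the standard library, is sketched here; `collect_environment_info` and `log_environment` are hypothetical names, and GPU details are left out because they depend on the installed tooling.

```python
import logging
import os
import platform
import sys


def collect_environment_info():
    """Gather basic facts about the interpreter and the host."""
    return {
        "python_version": platform.python_version(),
        "python_executable": sys.executable,
        "platform": platform.platform(),
        "cpu_count": os.cpu_count(),
        "cwd": os.getcwd(),
    }


def log_environment(logger):
    """Log each collected fact on its own line, in a stable order."""
    for key, value in sorted(collect_environment_info().items()):
        logger.info("%s=%s", key, value)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    log_environment(logging.getLogger("logenv"))
```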
Advanced Topics#
Multiple Vanilla Operations#
Run multiple vanilla operations in one stage:
class SetupAndValidateStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        # Setup operation
        success = run_vanilla(
            context=self.context,
            operation_config=self.config.client.operations.setup,
        )
        if not success:
            raise RuntimeError("Setup failed")

        # Validate operation
        success = run_vanilla(
            context=self.context,
            operation_config=self.config.client.operations.validate,
        )
        if not success:
            raise RuntimeError("Validate failed")

        return debug
Combining with Map Operations#
Run vanilla operations before or after map operations:
class ProcessAndValidateStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        # Map operation
        success = run_map(
            context=self.context,
            operation_config=self.config.client.operations.process,
        )
        if not success:
            raise RuntimeError("Process failed")

        # Vanilla validation
        success = run_vanilla(
            context=self.context,
            operation_config=self.config.client.operations.validate,
        )
        if not success:
            raise RuntimeError("Validate failed")

        return debug
See Example: 09_multiple_operations for details.
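When a stage chains several operations, the repeated "run, check, raise" pattern can be factored into a small helper. The sketch below is framework-agnostic: `run_steps` is a hypothetical name, and each step is any zero-argument callable that returns a truthy value on success, which matches how `run_vanilla` is used in the stages above.

```python
def run_steps(steps):
    """Run (name, callable) pairs in order; raise on the first failure."""
    completed = []
    for name, step in steps:
        if not step():
            raise RuntimeError(f"{name} failed")
        completed.append(name)
    return completed


# Inside a stage this might be used as follows (not executed here):
#     run_steps([
#         ("Setup", lambda: run_vanilla(
#             context=self.context,
#             operation_config=self.config.client.operations.setup)),
#         ("Validate", lambda: run_vanilla(
#             context=self.context,
#             operation_config=self.config.client.operations.validate)),
#     ])
```

Because the helper stops at the first failure, later operations never run against a half-prepared state.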
GPU Workloads#
For GPU workloads:
Custom Docker: Create Docker image with GPU support
GPU resources: Set gpu_limit: 1 or higher
GPU code: Use GPU libraries in vanilla script
Example:
client:
  operations:
    vanilla:
      resources:
        docker_image: my-registry/gpu-image:latest
        gpu_limit: 1
        memory_limit_gb: 16
Checkpoint Usage#
Load ML models from checkpoints:
import os

from ytjobs.config import get_config_path
from omegaconf import OmegaConf

# Checkpoint file is available in sandbox
checkpoint_file = os.environ.get("CHECKPOINT_FILE")
if checkpoint_file:
    # Load model from checkpoint
    model = load_model(checkpoint_file)
See Checkpoints Guide for details.
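Before handing the path to a model loader, it can help to distinguish "no checkpoint configured" from "checkpoint configured but missing". The sketch below does that for the `CHECKPOINT_FILE` variable used above; `resolve_checkpoint` is a hypothetical helper, and treating an empty value as "not configured" is an assumption.

```python
import os
from pathlib import Path


def resolve_checkpoint(environ=None):
    """Return the checkpoint path, None if unset, or raise if it is missing."""
    environ = os.environ if environ is None else environ
    value = environ.get("CHECKPOINT_FILE")
    if not value:
        return None  # no checkpoint configured for this run
    path = Path(value)
    if not path.is_file():
        raise FileNotFoundError(f"CHECKPOINT_FILE points to a missing file: {path}")
    return path
```

Failing early with a clear message is usually easier to debug than an error raised deep inside the model-loading code.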
Best Practices#
Single responsibility: Each vanilla operation should do one thing
Error handling: Handle errors and exit with appropriate codes
Logging: Log important operations for debugging
Resource allocation: Allocate resources based on actual needs
Idempotency: Operations should be safe to rerun
Testing: Test vanilla scripts locally before running on cluster
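The idempotency point can be illustrated with a setup routine that checks state before changing it. This sketch works on a local directory and uses hypothetical names (`ensure_setup`, `setup_done.json`); the same check-then-create idea would apply to cluster resources, though the calls would differ.

```python
import json
from pathlib import Path


def ensure_setup(base_dir, subdirs=("input", "output", "tmp")):
    """Create directories and a marker file; safe to call repeatedly.

    Returns the names of the directories created by this call.
    """
    base = Path(base_dir)
    created = []
    for name in subdirs:
        path = base / name
        if not path.exists():
            created.append(name)
        path.mkdir(parents=True, exist_ok=True)  # no error if already present

    marker = base / "setup_done.json"
    if not marker.exists():
        marker.write_text(json.dumps({"subdirs": list(subdirs)}))
    return created
```

A second call finds everything in place and returns an empty list, so rerunning a failed pipeline does not break on already-created resources.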
Common Patterns#
Simple Task#
def main():
    logger = get_logger("task", level=logging.INFO)
    config = OmegaConf.load(get_config_path())

    logger.info("Starting task")
    perform_task(config)
    logger.info("Task completed")
Task with Iterations#
def main():
    logger = get_logger("task", level=logging.INFO)
    config = OmegaConf.load(get_config_path())

    iterations = config.job.iterations
    for i in range(iterations):
        logger.info(f"Iteration {i+1}/{iterations}")
        do_work()
Task with Validation#
def main():
    logger = get_logger("task", level=logging.INFO)
    config = OmegaConf.load(get_config_path())

    # Validate prerequisites
    if not check_prerequisites():
        logger.error("Prerequisites not met")
        sys.exit(1)

    # Perform task
    perform_task()
    logger.info("Task completed")
Troubleshooting#
Issue: Operation fails immediately#
Check vanilla script syntax
Verify script has if __name__ == "__main__": main() block
Check resource limits
Issue: Script doesn’t execute#
Verify src/vanilla.py exists
Check file permissions
Review operation logs
Issue: Out of memory#
Increase memory_limit_gb
Check memory usage in script
Optimize code
Issue: Script hangs#
Check for infinite loops
Verify external dependencies are available
Review resource limits
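For hangs that are hard to reproduce, Python's standard `faulthandler` module can dump the tracebacks of all threads if a section runs longer than expected. The wrapper below is a sketch; `hang_watchdog` is a hypothetical name, and the timeout value is something to tune per task.

```python
import contextlib
import faulthandler
import sys


@contextlib.contextmanager
def hang_watchdog(timeout_seconds, file=None):
    """Dump all thread tracebacks if the block exceeds timeout_seconds.

    The block is not interrupted; the dump only shows where it is stuck.
    The target file must expose a real file descriptor.
    """
    target = sys.stderr if file is None else file
    faulthandler.dump_traceback_later(timeout_seconds, exit=False, file=target)
    try:
        yield
    finally:
        # Cancel the pending dump once the block finishes in time
        faulthandler.cancel_dump_traceback_later()


# Typical use inside a script (not executed here):
#     with hang_watchdog(600):
#         perform_task(config)
```

The dump lands in the job's stderr, so it shows up alongside the other operation logs and points at the line where the script was waiting.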
Next Steps#
Learn about Map Operations
Explore Advanced Topics (Docker, checkpoints)
Check out Examples for more patterns