Vanilla operations#

Vanilla runs src/vanilla.py once as a standalone job: no stdin row stream, no required input/output tables. Use it for bootstrap, teardown, health checks, or anything that should not be modeled as per-row map work.

Overview#

Tip

When vanilla fits

Pick vanilla when you do not need table-based map I/O. If you are emitting one output row per input row, map is usually clearer.

Compared with map:

  • One process per operation (no row fan-out).

  • Optional tables only if you open the client yourself inside the script.

  • Same upload/wrapper machinery as map in prod.

Note

Vanilla vs map

Vanilla = single script invocation. Map = streaming JSON rows through mapper.py.

Quick Start#

Minimal Example#

Stage (stages/setup/stage.py):

from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage
from yt_framework.operations.command_ops.vanilla import run_vanilla

class SetupStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        success = run_vanilla(
            context=self.context,
            operation_config=self.config.client.operations.vanilla,
        )
        
        if not success:
            raise RuntimeError("Vanilla operation failed")
        
        return debug

Stage config (stages/setup/config.yaml):

client:
  operations:
    vanilla:
      resources:
        pool: default
        memory_limit_gb: 2
        cpu_limit: 1

Vanilla script (stages/setup/src/vanilla.py):

#!/usr/bin/env python3
import logging
from omegaconf import OmegaConf
from ytjobs.logging.logger import get_logger
from ytjobs.config import get_config_path

def main():
    logger = get_logger("vanilla", level=logging.INFO)
    
    # Load configuration
    config = OmegaConf.load(get_config_path())
    
    logger.info("Vanilla operation started")
    
    # Do some work
    greeting = config.job.get("greeting", "Hello!")
    logger.info(f"Greeting: {greeting}")
    
    # Simulate work
    for i in range(5):
        logger.info(f"Iteration {i+1}")
    
    logger.info("Vanilla operation completed")

if __name__ == "__main__":
    main()

See Example: 05_vanilla_operation for a complete example.

Vanilla Script#

The vanilla script (src/vanilla.py) is executed once per operation.

Script Structure#

#!/usr/bin/env python3
import logging
from omegaconf import OmegaConf
from ytjobs.logging.logger import get_logger
from ytjobs.config import get_config_path

def main():
    # Initialize logger
    logger = get_logger("vanilla", level=logging.INFO)
    
    # Load configuration
    config = OmegaConf.load(get_config_path())
    
    # Your logic here
    logger.info("Starting work...")
    
    # Do work
    perform_task(config)
    
    logger.info("Work completed")

if __name__ == "__main__":
    main()

Configuration Access#

Access stage configuration:

from omegaconf import OmegaConf
from ytjobs.config import get_config_path

config = OmegaConf.load(get_config_path())

# Access job config
greeting = config.job.greeting
iterations = config.job.iterations

# Access client config (read-only)
memory = config.client.operations.vanilla.resources.memory_limit_gb

Stage config (stages/my_stage/config.yaml):

job:
  greeting: "Hello from YT!"
  iterations: 10

client:
  operations:
    vanilla:
      resources:
        memory_limit_gb: 4
        cpu_limit: 2

Logging#

Use YT logging utilities:

import logging
from ytjobs.logging.logger import get_logger

logger = get_logger("vanilla", level=logging.INFO)

logger.info("Info message")
logger.warning("Warning message")
logger.error("Error message")
logger.debug("Debug message")

Logs appear in YT operation logs (prod mode) or .dev/<stage_name>.log (dev mode).

Error Handling#

Handle errors and exit with appropriate codes:

import sys
import logging
from ytjobs.logging.logger import get_logger

logger = get_logger("vanilla", level=logging.INFO)

try:
    # Your logic
    perform_task()
    logger.info("Task completed successfully")
except Exception as e:
    logger.error(f"Task failed: {e}", exc_info=True)
    sys.exit(1)  # Exit with error code

Important: Non-zero exit codes will cause the operation to fail.

Configuration#

Basic Configuration#

client:
  operations:
    vanilla:
      resources:
        pool: default
        memory_limit_gb: 2
        cpu_limit: 1

Required Fields#

  • resources: Resource configuration

Resource Configuration#

resources:
  pool: default              # YT pool name
  pool_tree: null            # Optional: pool tree
  memory_limit_gb: 2         # Memory (GB)
  cpu_limit: 1               # CPU cores (fractional, e.g. 0.5)
  gpu_limit: 0               # GPU count (default: 0)
  user_slots: null           # Optional: user slots limit

Resource guidelines:

  • Memory: Allocate based on task needs

  • CPU: More cores = faster execution; fractional limits (e.g. 0.5) are supported

  • GPU: Set to 1+ for GPU workloads (requires custom Docker)

Advanced Configuration#

Max failed jobs:

client:
  operations:
    vanilla:
      max_failed_job_count: 1  # Fail operation after N failed jobs
      resources:
        # ...

Max row weight:

max_row_weight defaults to 128M for vanilla operations (and cannot exceed 128M). Override it per operation:

client:
  operations:
    vanilla:
      max_row_weight: 64M
      resources:
        # ...

Custom Docker:

client:
  operations:
    vanilla:
      resources:
        docker_image: my-registry/my-image:latest
        # ...

See Docker Guide for details.

Checkpoints:

client:
  operations:
    vanilla:
      checkpoint:
        checkpoint_base: //tmp/my_pipeline/checkpoints
        local_checkpoint_path: /path/to/local/model.pth
      resources:
        # ...

See Checkpoints Guide for details.

Running Vanilla Operations#

From Stage#

Use run_vanilla() function:

from yt_framework.operations.command_ops.vanilla import run_vanilla

success = run_vanilla(
    context=self.context,
    operation_config=self.config.client.operations.vanilla,
)

Operation Flow#

  1. Code upload: Code is packaged and uploaded to YT (prod mode)

  2. Job creation: YT creates a single job

  3. Execution: Job runs vanilla.py script

  4. Completion: Operation completes when job finishes

Dev Mode Behavior#

In dev mode:

  • Runs locally using subprocess

  • Creates sandbox directory: .dev/<stage_name>_sandbox/

  • Extracts code archive

  • Executes vanilla.py script

  • Logs output to .dev/<stage_name>.log

Use Cases#

Setup Tasks#

Initialize directories, create tables, prepare data:

def main():
    logger = get_logger("setup", level=logging.INFO)
    config = OmegaConf.load(get_config_path())
    
    # Create directories
    create_directories(config)
    
    # Initialize tables
    initialize_tables(config)
    
    logger.info("Setup completed")

Validation#

Validate data, check conditions, run tests:

def main():
    logger = get_logger("validate", level=logging.INFO)
    config = OmegaConf.load(get_config_path())
    
    # Validate data
    if not validate_data(config):
        logger.error("Validation failed")
        sys.exit(1)
    
    logger.info("Validation passed")

Cleanup#

Clean up temporary files, remove old data:

def main():
    logger = get_logger("cleanup", level=logging.INFO)
    config = OmegaConf.load(get_config_path())
    
    # Clean up temporary files
    cleanup_temp_files(config)
    
    logger.info("Cleanup completed")

Environment Logging#

Log environment information for debugging:

def main():
    logger = get_logger("logenv", level=logging.INFO)
    
    # Log environment
    log_system_info(logger)
    log_python_environment(logger)
    log_gpu_info(logger)
    
    logger.info("Environment logged")

See Example: environment_log for logging environment variables from a vanilla job.

Advanced Topics#

Multiple Vanilla Operations#

Run multiple vanilla operations in one stage:

class SetupAndValidateStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        # Setup operation
        success = run_vanilla(
            context=self.context,
            operation_config=self.config.client.operations.setup,
        )
        if not success:
            raise RuntimeError("Setup failed")
        
        # Validate operation
        success = run_vanilla(
            context=self.context,
            operation_config=self.config.client.operations.validate,
        )
        if not success:
            raise RuntimeError("Validate failed")
        
        return debug

Combining with Map Operations#

Run vanilla operations before or after map operations:

class ProcessAndValidateStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        # Map operation
        success = run_map(
            context=self.context,
            operation_config=self.config.client.operations.process,
        )
        if not success:
            raise RuntimeError("Process failed")
        
        # Vanilla validation
        success = run_vanilla(
            context=self.context,
            operation_config=self.config.client.operations.validate,
        )
        if not success:
            raise RuntimeError("Validate failed")
        
        return debug

See Example: 09_multiple_operations for details.

GPU Workloads#

For GPU workloads:

  1. Custom Docker: Create Docker image with GPU support

  2. GPU resources: Set gpu_limit: 1 or higher

  3. GPU code: Use GPU libraries in vanilla script

Example:

client:
  operations:
    vanilla:
      resources:
        docker_image: my-registry/gpu-image:latest
        gpu_limit: 1
        memory_limit_gb: 16

Checkpoint Usage#

Load ML models from checkpoints:

import os
from ytjobs.config import get_config_path
from omegaconf import OmegaConf

# Checkpoint file is available in sandbox
checkpoint_file = os.environ.get("CHECKPOINT_FILE")
if checkpoint_file:
    # Load model from checkpoint
    model = load_model(checkpoint_file)

See Checkpoints Guide for details.

Best Practices#

  1. Single responsibility: Each vanilla operation should do one thing

  2. Error handling: Handle errors and exit with appropriate codes

  3. Logging: Log important operations for debugging

  4. Resource allocation: Allocate resources based on actual needs

  5. Idempotency: Operations should be safe to rerun

  6. Testing: Test vanilla scripts locally before running on cluster

Common Patterns#

Simple Task#

def main():
    logger = get_logger("task", level=logging.INFO)
    config = OmegaConf.load(get_config_path())
    
    logger.info("Starting task")
    perform_task(config)
    logger.info("Task completed")

Task with Iterations#

def main():
    logger = get_logger("task", level=logging.INFO)
    config = OmegaConf.load(get_config_path())
    
    iterations = config.job.iterations
    for i in range(iterations):
        logger.info(f"Iteration {i+1}/{iterations}")
        do_work()

Task with Validation#

def main():
    logger = get_logger("task", level=logging.INFO)
    config = OmegaConf.load(get_config_path())
    
    # Validate prerequisites
    if not check_prerequisites():
        logger.error("Prerequisites not met")
        sys.exit(1)
    
    # Perform task
    perform_task()
    logger.info("Task completed")

Troubleshooting#

Issue: Operation fails immediately#

  • Check vanilla script syntax

  • Verify script has if __name__ == "__main__": main() block

  • Check resource limits

Issue: Script doesn’t execute#

  • Verify src/vanilla.py exists

  • Check file permissions

  • Review operation logs

Issue: Out of memory#

  • Increase memory_limit_gb

  • Check memory usage in script

  • Optimize code

Issue: Script hangs#

  • Check for infinite loops

  • Verify external dependencies are available

  • Review resource limits

Next Steps#