Map operations#

A map job reads an input table, streams its rows through your mapper.py (one mapper process per YT job in prod, each handling a share of the rows), and writes the JSON lines the mapper prints to an output table. Use it when the transform is row-local, needs Python, and does not fit YQL cleanly.

Overview#

Tip

When map fits

Reach for map when each output row (or zero rows) depends only on one input row and you need arbitrary Python. Prefer YQL for set-style SQL operations.

Behavior:

  • Stdin/stdout JSON lines (one object per line); flush after each printed row.

  • Prod: many tasks; dev: a single local subprocess and a sandbox under .dev/.

  • Requires input_table, output_table, and resources in YAML.

Warning

Mapper Script Requirements

Your mapper script must read from stdin and write to stdout. Each line is a JSON-encoded row. Make sure to flush output after each row.

Quick Start#

Minimal Example#

Stage (stages/process_data/stage.py):

from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage
from yt_framework.operations.command_ops.map import run_map

class ProcessDataStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        success = run_map(
            context=self.context,
            operation_config=self.config.client.operations.map,
        )
        
        if not success:
            raise RuntimeError("Map operation failed")
        
        return debug

Stage config (stages/process_data/config.yaml):

client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
      resources:
        pool: default
        memory_limit_gb: 4
        cpu_limit: 2
        job_count: 2

Mapper script (stages/process_data/src/mapper.py):

#!/usr/bin/env python3
import sys
import json
from omegaconf import OmegaConf
from ytjobs.config import get_config_path

def main():
    # Load configuration
    config = OmegaConf.load(get_config_path())
    
    # Process each input row
    for line in sys.stdin:
        row = json.loads(line)
        
        # Transform row
        output_row = {
            "id": row["id"],
            "processed_value": row["value"] * 2
        }
        
        # Write output row
        print(json.dumps(output_row), flush=True)

if __name__ == "__main__":
    main()

See Example: 04_map_operation for a complete example.

Append output#

Use append: true under client.operations.map when mapper rows should be appended to an existing output table rather than replacing it.

On the cluster, the output table must already exist and incoming rows must match its schema (including typed columns). In dev mode, if the output .jsonl already exists, mapper stdout is appended after the current lines.

client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
      append: true
      resources:
        pool: default
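Because appended rows must match the existing table's schema, it can help to check each row in the mapper before printing it. The following is a minimal sketch, not part of the framework: EXPECTED_SCHEMA, conforms, and emit are hypothetical names, and the column types are assumed for illustration.

```python
import json
import sys

# Hypothetical schema of the existing output table: column name -> Python type.
EXPECTED_SCHEMA = {"id": int, "processed_value": float}


def conforms(row: dict, schema: dict) -> bool:
    """Return True if row has exactly the schema's columns with matching types."""
    if set(row) != set(schema):
        return False
    return all(isinstance(row[name], typ) for name, typ in schema.items())


def emit(row: dict, out=sys.stdout) -> None:
    """Write row as one JSON line, or raise if it would not fit the table."""
    if not conforms(row, EXPECTED_SCHEMA):
        raise ValueError(f"row does not match output schema: {row!r}")
    print(json.dumps(row), file=out, flush=True)
```

Raising here fails the job early instead of letting the cluster reject the write; whether to raise or skip the row is a per-pipeline decision.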

Mapper Script#

The mapper script (src/mapper.py) runs once per job and processes every row assigned to that job, reading the rows from stdin one at a time.

Input/Output Format#

  • Input: One JSON object per line via stdin

  • Output: One JSON object per line to stdout

Example:

import sys
import json

def process_row(row: dict) -> dict:
    return row  # replace with your transform

for line in sys.stdin:
    row = json.loads(line)
    processed = process_row(row)
    print(json.dumps(processed), flush=True)
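One way to make this loop testable without a cluster is to pass the streams in as arguments. The sketch below assumes the doubling transform from the Quick Start mapper; run_mapper is a hypothetical helper name, and io.StringIO stands in for stdin and stdout.

```python
import io
import json
from typing import Iterable, TextIO


def process_row(row: dict) -> dict:
    # Same doubling transform as the Quick Start mapper.
    return {"id": row["id"], "processed_value": row["value"] * 2}


def run_mapper(lines: Iterable[str], out: TextIO) -> None:
    """Read JSON lines, transform each row, and write JSON lines to out."""
    for line in lines:
        if not line.strip():
            continue  # tolerate blank lines such as a trailing newline
        out.write(json.dumps(process_row(json.loads(line))) + "\n")
        out.flush()


# In-memory streams stand in for stdin/stdout.
fake_input = io.StringIO('{"id": 1, "value": 21}\n{"id": 2, "value": 5}\n')
fake_output = io.StringIO()
run_mapper(fake_input, fake_output)
print(fake_output.getvalue(), end="")
```

In the real mapper the call would be run_mapper(sys.stdin, sys.stdout).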

Configuration Access#

Access the stage configuration from the mapper script:

from omegaconf import OmegaConf
from ytjobs.config import get_config_path

config = OmegaConf.load(get_config_path())

# Access job config
multiplier = config.job.multiplier
prefix = config.job.prefix

# Access client config (read-only)
input_table = config.client.operations.map.input_table

Stage config (stages/my_stage/config.yaml):

job:
  multiplier: 2
  prefix: "processed_"

client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
      resources:
        # ...
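The job values above might be applied to a row as follows. This is a sketch under assumptions: a SimpleNamespace stands in for the object returned by OmegaConf.load(get_config_path()), apply_job_config is a hypothetical helper, and using prefix as a column-name prefix is only one possible reading of that setting.

```python
from types import SimpleNamespace

# Stand-in for OmegaConf.load(get_config_path()); only the job section is modeled.
config = SimpleNamespace(job=SimpleNamespace(multiplier=2, prefix="processed_"))


def apply_job_config(row: dict, job) -> dict:
    """Scale row["value"] by job.multiplier and name the column with job.prefix."""
    return {
        "id": row["id"],
        f"{job.prefix}value": row["value"] * job.multiplier,
    }


result = apply_job_config({"id": 7, "value": 10}, config.job)
print(result)  # {'id': 7, 'processed_value': 20}
```

Passing config.job into the function, rather than reading a global inside it, keeps the transform easy to test with different settings.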

Error Handling#

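Catch per-row exceptions, log them to stderr, and skip the row so that one bad record does not fail the whole job:

import sys
import json
import traceback

def process_row(row: dict) -> dict:
    return row  # replace with your transform

for line in sys.stdin:
    try:
        row = json.loads(line)
        output_row = process_row(row)
        print(json.dumps(output_row), flush=True)
    except Exception as e:
        # Log to stderr (stdout carries output rows) and skip the row
        print(f"Error processing row: {e}", file=sys.stderr)
        traceback.print_exc(file=sys.stderr)
        continue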

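Important: The loop above skips bad rows, so the job itself still succeeds. If an exception escapes the mapper, the job fails, and failed jobs cause the operation to fail once max_failed_job_count is exceeded.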
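A middle ground between skipping every bad row and failing on the first one is a per-job error budget. The sketch below is illustrative only: run_with_error_budget and max_bad_rows are hypothetical names, not framework settings, and it assumes that an uncaught exception (non-zero exit) is what marks the job as failed.

```python
import json
from typing import Callable, Iterable, TextIO


def run_with_error_budget(
    lines: Iterable[str],
    out: TextIO,
    err: TextIO,
    process_row: Callable[[dict], dict],
    max_bad_rows: int,
) -> int:
    """Skip failing rows until more than max_bad_rows have failed, then raise.

    Returns the number of rows that were skipped.
    """
    bad_rows = 0
    for line_number, line in enumerate(lines, start=1):
        try:
            result = process_row(json.loads(line))
        except Exception as exc:
            bad_rows += 1
            print(f"line {line_number}: {exc!r}", file=err)
            if bad_rows > max_bad_rows:
                # Letting this propagate ends the process with a non-zero exit code.
                raise RuntimeError(
                    f"{bad_rows} bad rows exceed the budget of {max_bad_rows}"
                ) from exc
            continue
        print(json.dumps(result), file=out, flush=True)
    return bad_rows
```

In a mapper this would be called with sys.stdin, sys.stdout, and sys.stderr; a budget of 0 reproduces fail-fast behavior.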

Logging#

Use YT logging utilities:

import sys
import json
import logging
from ytjobs.logging.logger import get_logger

logger = get_logger("mapper", level=logging.INFO)

for line in sys.stdin:
    row = json.loads(line)
    logger.info(f"Processing row {row['id']}")
    # Process the row, then print the result as a JSON line

Logs appear in the YT operation logs (prod mode) or in the .dev/ directory (dev mode).
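Logging every row gets expensive on large tables, so a common approach is to log only every N-th row. The sketch below uses the standard library logger as a stand-in for the one returned by get_logger; LOG_EVERY and should_log are hypothetical names, and the range loop stands in for iterating over stdin.

```python
import logging
import sys

# Stand-in for ytjobs' get_logger: a stdlib logger writing to stderr,
# so log lines never mix with the JSON rows on stdout.
logging.basicConfig(stream=sys.stderr, level=logging.INFO)
logger = logging.getLogger("mapper")

LOG_EVERY = 1000  # hypothetical sampling interval


def should_log(row_index: int, every: int = LOG_EVERY) -> bool:
    """Return True for row 0 and then once per `every` rows."""
    return row_index % every == 0


for row_index in range(2500):  # stand-in for enumerate(sys.stdin)
    if should_log(row_index):
        logger.info("reached row %d", row_index)
```

With 2500 rows this emits three log lines (rows 0, 1000, and 2000) instead of 2500.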

Configuration#

Basic Configuration#

client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
      resources:
        pool: default
        memory_limit_gb: 4
        cpu_limit: 2
        job_count: 2

Required Fields#

  • input_table: YT path to input table

  • output_table: YT path to output table

  • resources: Resource configuration

Resource Configuration#

resources:
  pool: default              # YT pool name
  pool_tree: null            # Optional: pool tree
  memory_limit_gb: 4        # Memory per job (GB)
  cpu_limit: 2               # CPU cores per job (fractional, e.g. 0.5)
  job_count: 2               # Number of parallel jobs
  gpu_limit: 0               # GPU count (default: 0)
  user_slots: null           # Optional: user slots limit

Resources (rule of thumb):

  • Raise memory_limit_gb when a single row plus model weights no longer fits.

  • cpu_limit helps per-task throughput; fractional values (e.g. 0.5, 0.1) are passed through to YTsaurus.

  • job_count spreads rows across tasks.

  • gpu_limit > 0 only works with an image that actually exposes GPUs to Python.

Advanced Configuration#

Max failed jobs:

client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
      max_failed_job_count: 1  # Fail operation after N failed jobs
      resources:
        # ...

Max row weight:

max_row_weight defaults to 128M for map operations (that is also the maximum the cluster accepts). Override it per operation with a value at or below 128M:

client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
      max_row_weight: 64M
      resources:
        # ...
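To get a rough feel for whether rows approach the limit, one option is to compare the size of their JSON encoding against the configured value. This is only a proxy: the cluster computes row weight on its own representation, not on JSON text. parse_size and json_size are hypothetical helpers, and reading the M suffix as a binary multiple (1024 * 1024 bytes) is an assumption.

```python
import json

# Assumption: size suffixes are binary multiples.
UNITS = {"K": 1024, "M": 1024**2, "G": 1024**3}


def parse_size(text: str) -> int:
    """Convert a size such as "64M" to bytes; plain digits are taken as bytes."""
    suffix = text[-1].upper()
    if suffix in UNITS:
        return int(text[:-1]) * UNITS[suffix]
    return int(text)


def json_size(row: dict) -> int:
    """Bytes in the UTF-8 JSON encoding of row (a rough proxy for row weight)."""
    return len(json.dumps(row).encode("utf-8"))


limit = parse_size("64M")
row = {"id": 1, "payload": "x" * 100}
print(json_size(row) <= limit)  # True
```

Logging the largest json_size seen during a dev run gives an early warning before a prod job hits the limit.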

Custom Docker:

client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
      resources:
        docker_image: my-registry/my-image:latest
        # ...

See Docker Guide for details.

Checkpoints:

client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
      checkpoint:
        checkpoint_base: //tmp/my_pipeline/checkpoints
        local_checkpoint_path: /path/to/local/model.pth
      resources:
        # ...

See Checkpoints Guide for details.

Running Map Operations#

From Stage#

Use the run_map() function and check its return value:

from yt_framework.operations.command_ops.map import run_map

success = run_map(
    context=self.context,
    operation_config=self.config.client.operations.map,
)

Operation Flow#

  1. Code upload: Code is packaged and uploaded to YT (prod mode)

  2. Job creation: YT creates multiple jobs based on job_count

  3. Row distribution: Input table rows are distributed across jobs

  4. Execution: Each job runs mapper.py for its assigned rows

  5. Output collection: Results are written to output table

  6. Completion: Operation completes when all jobs finish
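Steps 2 to 5 can be pictured with a small in-process simulation. It is a sketch of the idea only: split_rows and simulate_map are hypothetical names, and the even, contiguous split is an assumption, since the cluster decides how rows are actually assigned to jobs.

```python
import json
from typing import Callable, List


def split_rows(rows: List[dict], job_count: int) -> List[List[dict]]:
    """Split rows into at most job_count contiguous, near-equal chunks."""
    chunk_size = max(1, -(-len(rows) // job_count))  # ceiling division
    return [rows[i:i + chunk_size] for i in range(0, len(rows), chunk_size)]


def simulate_map(
    rows: List[dict], job_count: int, mapper: Callable[[dict], dict]
) -> List[dict]:
    """Run mapper over each chunk as if it were a separate job; collect the output."""
    output: List[dict] = []
    for job_rows in split_rows(rows, job_count):
        # Each simulated job sees its rows as JSON lines, as a real mapper would.
        lines = [json.dumps(row) for row in job_rows]
        output.extend(mapper(json.loads(line)) for line in lines)
    return output


def double(row: dict) -> dict:
    return {"id": row["id"], "processed_value": row["value"] * 2}


rows = [{"id": i, "value": i} for i in range(5)]
print([len(chunk) for chunk in split_rows(rows, 2)])  # [3, 2]
print(simulate_map(rows, 2, double)[-1])  # {'id': 4, 'processed_value': 8}
```

Because each row is handled independently, the combined output is the same for any job_count; only the wall-clock time changes.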

Dev Mode Behavior#

In dev mode:

  • Runs sequentially (single job)

  • Creates sandbox directory: .dev/sandbox_<input>-><output>/

  • Input table copied to sandbox

  • Mapper script executed locally

  • Output written to .dev/<output>.jsonl
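The dev-mode behavior can be approximated by hand when debugging a mapper outside the framework. The sketch below is not the framework's implementation: run_locally is a hypothetical helper, the file names are illustrative, and a temporary directory stands in for the sandbox.

```python
import subprocess
import sys
import tempfile
from pathlib import Path

# A tiny mapper written to disk so the example is self-contained.
MAPPER_SOURCE = '''
import json, sys
for line in sys.stdin:
    row = json.loads(line)
    print(json.dumps({"id": row["id"], "processed_value": row["value"] * 2}), flush=True)
'''


def run_locally(mapper: Path, input_jsonl: Path, output_jsonl: Path) -> None:
    """Run the mapper once as a subprocess: input file on stdin, output file on stdout."""
    with input_jsonl.open("rb") as src, output_jsonl.open("wb") as dst:
        subprocess.run([sys.executable, str(mapper)], stdin=src, stdout=dst, check=True)


with tempfile.TemporaryDirectory() as sandbox:
    sandbox_dir = Path(sandbox)
    mapper_path = sandbox_dir / "mapper.py"
    mapper_path.write_text(MAPPER_SOURCE)
    input_path = sandbox_dir / "input.jsonl"
    input_path.write_text('{"id": 1, "value": 3}\n')
    output_path = sandbox_dir / "output.jsonl"
    run_locally(mapper_path, input_path, output_path)
    result = output_path.read_text()

print(result, end="")
```

check=True makes a crashing mapper raise CalledProcessError, which mirrors a failed job.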

Advanced Topics#

Multiple Map Operations#

Run multiple map operations in one stage:

class ProcessAndValidateStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        # First map operation
        success = run_map(
            context=self.context,
            operation_config=self.config.client.operations.process,
        )
        if not success:
            raise RuntimeError("Process failed")
        
        # Second map operation
        success = run_map(
            context=self.context,
            operation_config=self.config.client.operations.validate,
        )
        if not success:
            raise RuntimeError("Validate failed")
        
        return debug

See Example: 09_multiple_operations for details.
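When a stage chains more than two operations, the repeated run-and-check pattern can be factored out. The helper below is a sketch: run_in_order is a hypothetical name, and each step is any zero-argument callable returning a success flag, which in a stage would wrap a run_map call.

```python
from typing import Callable, Iterable, List, Tuple


def run_in_order(steps: Iterable[Tuple[str, Callable[[], bool]]]) -> List[str]:
    """Run each named step in turn and stop at the first failure.

    Returns the names of the steps that completed; raises RuntimeError
    naming the step that reported failure.
    """
    completed: List[str] = []
    for name, step in steps:
        if not step():
            raise RuntimeError(f"{name} failed")
        completed.append(name)
    return completed


# Inside a stage, each step would wrap run_map, for example:
#   ("process", lambda: run_map(context=self.context,
#                               operation_config=self.config.client.operations.process))
print(run_in_order([("process", lambda: True), ("validate", lambda: True)]))
# ['process', 'validate']
```

Later steps are never started after a failure, which matches the explicit if-not-success checks in the stage above.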

GPU Processing#

For GPU workloads:

  1. Custom Docker: Create Docker image with GPU support

  2. GPU resources: Set gpu_limit: 1 or higher

  3. GPU code: Use GPU libraries in mapper script

Example:

client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
      resources:
        docker_image: my-registry/gpu-image:latest
        gpu_limit: 1
        memory_limit_gb: 16

See Example: video_gpu for GPU processing example.

Checkpoint Usage#

Load ML models from checkpoints:

import os

# The checkpoint file is available in the sandbox; its path is in CHECKPOINT_FILE
checkpoint_file = os.environ.get("CHECKPOINT_FILE")
if checkpoint_file:
    # Load the model once, before the row loop.
    # load_model is a placeholder for your own loader (e.g. torch.load).
    model = load_model(checkpoint_file)

See Checkpoints Guide for details.

Practices#

  1. Keep transforms deterministic for the same input row.

  2. Decide whether a bad row should skip, fail the task, or poison the whole op (max_failed_job_count).

  3. Size memory from peak RSS you observe in dev, not from guesses.

  4. Log row ids sparingly; high-volume logs hurt both dev and prod.

  5. Run dev mode on a slice of production schema before widening job_count in prod.
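For practice 3, peak memory can be read from inside the mapper at the end of a dev run. This sketch uses the standard library resource module, which is available on Unix-like systems only; peak_rss_mb is a hypothetical helper name.

```python
import resource
import sys


def peak_rss_mb() -> float:
    """Peak resident set size of this process in MiB."""
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # ru_maxrss is reported in kilobytes on Linux and in bytes on macOS.
    divisor = 1024 * 1024 if sys.platform == "darwin" else 1024
    return peak / divisor


# Call once after the row loop; stderr keeps the figure out of the output table.
print(f"peak RSS: {peak_rss_mb():.1f} MiB", file=sys.stderr)
```

Adding some headroom to the observed figure before setting memory_limit_gb is a reasonable starting point, since prod rows may be larger than the dev slice.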

Common Patterns#

Each snippet below assumes import sys and import json at the top of the mapper; transform, should_include, and compute stand for your own functions.

Row Transformation#

for line in sys.stdin:
    row = json.loads(line)
    output_row = {
        "id": row["id"],
        "transformed": transform(row["data"])
    }
    print(json.dumps(output_row), flush=True)

Row Filtering#

for line in sys.stdin:
    row = json.loads(line)
    if should_include(row):
        print(json.dumps(row), flush=True)

Row Enrichment#

for line in sys.stdin:
    row = json.loads(line)
    enriched = {
        **row,
        "computed_field": compute(row)
    }
    print(json.dumps(enriched), flush=True)

Troubleshooting#

| Symptom | Checks |
|---|---|
| Op fails at start | Mapper entrypoint path, import errors in bundle, missing input table |
| Sparse row failures | max_failed_job_count, stderr in failing task |
| Slow wall clock | job_count too low for data size, CPU-bound Python, remote I/O inside mapper |
| OOM kills | Raise memory_limit_gb, shrink per-row allocations, stream instead of buffering |

Next steps#