# Map operations
A map job reads an input table, streams each row through your `mapper.py` (split across YT jobs in prod), and writes the resulting JSON lines to an output table. Use it when the transform is row-local in Python and does not fit YQL cleanly.
## Overview
```{tip}
**When map fits**
Reach for map when each output row (or zero rows) depends only on one input row and you need arbitrary Python. Prefer YQL for set-style SQL operations.
```
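As a rough illustration of the row-local rule, the sketch below emits zero or one output row per input row. The field names `id` and `value` and the helper `map_rows` are hypothetical and not part of yt-framework; in-memory streams stand in for stdin/stdout so the snippet can run anywhere.

```python
import io
import json


def map_rows(lines, out):
    """Write at most one output row per input row; rows without a value are dropped."""
    for line in lines:
        row = json.loads(line)
        if row.get("value") is None:
            continue  # zero output rows for this input row
        out.write(json.dumps({"id": row["id"], "doubled": row["value"] * 2}) + "\n")
        out.flush()  # flush after each row, as the mapper contract asks


# Local demo with in-memory streams instead of stdin/stdout.
demo_in = io.StringIO('{"id": 1, "value": 3}\n{"id": 2, "value": null}\n')
demo_out = io.StringIO()
map_rows(demo_in, demo_out)
```

Each output row is computed from exactly one input row, which is what lets the cluster split the table across jobs freely.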
**Behavior:**
- Stdin/stdout JSON lines (one object per line); flush after each printed row.
- Prod: many tasks; dev: a single local subprocess and a sandbox under `.dev/`.
- Requires `input_table`, `output_table`, and `resources` in YAML.
```{warning}
**Mapper Script Requirements**
Your mapper script must read from stdin and write to stdout. Each line is a JSON-encoded row. Make sure to flush output after each row.
```
## Quick Start
### Minimal Example
**Stage** (`stages/process_data/stage.py`):
```python
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage
from yt_framework.operations.command_ops.map import run_map


class ProcessDataStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        success = run_map(
            context=self.context,
            operation_config=self.config.client.operations.map,
        )
        if not success:
            raise RuntimeError("Map operation failed")
        return debug
```
**Stage config** (`stages/process_data/config.yaml`):
```yaml
client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
      resources:
        pool: default
        memory_limit_gb: 4
        cpu_limit: 2
        job_count: 2
```
**Mapper script** (`stages/process_data/src/mapper.py`):
```python
#!/usr/bin/env python3
import sys
import json

from omegaconf import OmegaConf
from ytjobs.config import get_config_path


def main():
    # Load configuration
    config = OmegaConf.load(get_config_path())

    # Process each input row
    for line in sys.stdin:
        row = json.loads(line)

        # Transform row
        output_row = {
            "id": row["id"],
            "processed_value": row["value"] * 2,
        }

        # Write output row
        print(json.dumps(output_row), flush=True)


if __name__ == "__main__":
    main()
```
See [Example: 04_map_operation](https://github.com/GregoryKogan/yt-framework/tree/main/examples/04_map_operation/) for a complete example.
(append-output)=
## Append output
Use `append: true` under `client.operations.map` when mapper rows should be appended to an existing output table rather than replacing it.
On the cluster, the output table must already exist and incoming rows must match its schema (including typed columns). In dev mode, if the output `.jsonl` already exists, mapper stdout is appended after the current lines.
```yaml
client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
      append: true
      resources:
        pool: default
```
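The dev-mode append behavior described above can be pictured with a small standard-library sketch. The helper `append_jsonl` and the file name `output.jsonl` are hypothetical; this is not the framework's own implementation, only an illustration of "existing lines stay, new rows go after them".

```python
import json
import os
import tempfile


def append_jsonl(path, rows):
    """Append rows as JSON lines, keeping whatever the file already contains."""
    with open(path, "a", encoding="utf-8") as f:
        for row in rows:
            f.write(json.dumps(row) + "\n")


# Demo: an existing output file with one row gets a second row appended.
tmp_dir = tempfile.mkdtemp()
out_path = os.path.join(tmp_dir, "output.jsonl")
with open(out_path, "w", encoding="utf-8") as f:
    f.write(json.dumps({"id": 1}) + "\n")

append_jsonl(out_path, [{"id": 2}])

with open(out_path, encoding="utf-8") as f:
    lines = f.read().splitlines()
```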
## Mapper Script
Each job runs the mapper script (`src/mapper.py`) and feeds it the job's share of the input rows on stdin, one row per line.
### Input/Output Format
- **Input**: One JSON object per line via stdin
- **Output**: One JSON object per line to stdout
**Example:**
```python
import sys
import json


def process_row(row: dict) -> dict:
    return row  # replace with your transform


for line in sys.stdin:
    row = json.loads(line)
    processed = process_row(row)
    print(json.dumps(processed), flush=True)
```
### Configuration Access
Load the stage configuration inside the mapper script:
```python
from omegaconf import OmegaConf
from ytjobs.config import get_config_path
config = OmegaConf.load(get_config_path())
# Access job config
multiplier = config.job.multiplier
prefix = config.job.prefix
# Access client config (read-only)
input_table = config.client.operations.map.input_table
```
**Stage config** (`stages/my_stage/config.yaml`):
```yaml
job:
  multiplier: 2
  prefix: "processed_"

client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
```
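To show how the `job` settings might feed a transform, here is a minimal sketch. It uses `types.SimpleNamespace` as a stand-in for `config.job` so it runs without `omegaconf` or `ytjobs`; the `transform` function and the row fields `id` and `value` are hypothetical.

```python
import json
from types import SimpleNamespace


def transform(row, job_cfg):
    """Apply the job settings to a single row."""
    return {
        "id": f"{job_cfg.prefix}{row['id']}",
        "value": row["value"] * job_cfg.multiplier,
    }


# Stand-in for config.job as it would be loaded by OmegaConf in a real mapper.
job_cfg = SimpleNamespace(multiplier=2, prefix="processed_")
result = transform({"id": "a1", "value": 21}, job_cfg)
print(json.dumps(result))
```

In a real mapper you would pass `config.job` in place of the stand-in object, since both expose the settings as attributes.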
### Error Handling
Catch per-row exceptions, log them to stderr, and skip the row so that one bad record does not fail the whole job:
```python
import sys
import json
import traceback


def process_row(row: dict) -> dict:
    return row  # replace with your transform


for line in sys.stdin:
    try:
        row = json.loads(line)
        output_row = process_row(row)
        print(json.dumps(output_row), flush=True)
    except Exception as e:
        # Log error and skip row
        print(f"Error processing row: {e}", file=sys.stderr)
        traceback.print_exc(file=sys.stderr)
        continue
```
**Important:** Failed jobs will cause the operation to fail if `max_failed_job_count` is exceeded.
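Skipping every bad row can hide systematic problems, so you may prefer to give up once too many rows have failed. The sketch below is one possible pattern, not a framework feature: `run_mapper` and `max_bad_rows` are hypothetical names, and the sketch assumes that a non-zero exit code is what marks the job as failed on the cluster.

```python
import io
import json


def run_mapper(lines, out, err, max_bad_rows=10):
    """Skip bad rows, but return a non-zero exit code once too many have failed."""
    bad_rows = 0
    for line in lines:
        try:
            row = json.loads(line)
            out.write(json.dumps({"id": row["id"]}) + "\n")
            out.flush()
        except (json.JSONDecodeError, KeyError, TypeError) as exc:
            bad_rows += 1
            print(f"Skipping bad row: {exc!r}", file=err)
            if bad_rows > max_bad_rows:
                return 1  # the caller passes this to sys.exit() so the job fails
    return 0


# Local demo: one bad row is tolerated when the budget is 1.
demo_out, demo_err = io.StringIO(), io.StringIO()
exit_code = run_mapper(
    ['{"id": 1}\n', "not json\n", '{"id": 2}\n'], demo_out, demo_err, max_bad_rows=1
)
```

In a real mapper the call would look like `sys.exit(run_mapper(sys.stdin, sys.stdout, sys.stderr))`.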
### Logging
Use YT logging utilities:
```python
import sys
import json
import logging

from ytjobs.logging.logger import get_logger

logger = get_logger("mapper", level=logging.INFO)

for line in sys.stdin:
    row = json.loads(line)
    logger.info(f"Processing row {row['id']}")
    # Process row...
```
Logs appear in the YT operation logs (prod mode) or in the `.dev/` directory (dev mode).
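Because stdout carries the output rows, log lines must never be written there. If `ytjobs` is not importable (for example in a unit test), a standard-library logger bound to stderr is a reasonable stand-in. The helper `make_stderr_logger` is hypothetical and says nothing about how `get_logger` is implemented.

```python
import io
import logging
import sys


def make_stderr_logger(name, stream=None):
    """Build a logger that writes to stderr (or the given stream), never stdout."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.handlers.clear()  # avoid duplicate handlers on repeated calls
    handler = logging.StreamHandler(stream if stream is not None else sys.stderr)
    handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
    logger.addHandler(handler)
    logger.propagate = False  # keep records away from any root handlers
    return logger


# Demo: capture the log output in memory instead of the real stderr.
log_stream = io.StringIO()
logger = make_stderr_logger("mapper", stream=log_stream)
logger.info("Processing row %s", 7)
```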
## Configuration
### Basic Configuration
```yaml
client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
      resources:
        pool: default
        memory_limit_gb: 4
        cpu_limit: 2
        job_count: 2
```
### Required Fields
- **`input_table`**: YT path to input table
- **`output_table`**: YT path to output table
- **`resources`**: Resource configuration
### Resource Configuration
```yaml
resources:
  pool: default        # YT pool name
  pool_tree: null      # Optional: pool tree
  memory_limit_gb: 4   # Memory per job (GB)
  cpu_limit: 2         # CPU cores per job (fractional, e.g. 0.5)
  job_count: 2         # Number of parallel jobs
  gpu_limit: 0         # GPU count (default: 0)
  user_slots: null     # Optional: user slots limit
```
**Resources (rule of thumb):**
- Raise `memory_limit_gb` when a single row plus model weights no longer fits.
- `cpu_limit` helps per-task throughput; fractional values (e.g. `0.5`, `0.1`) are passed through to YTsaurus.
- `job_count` spreads rows across tasks.
- `gpu_limit` > 0 only works with an image that actually exposes GPUs to Python.
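A back-of-the-envelope calculation can help when picking these values. The sketch below assumes rows split evenly across jobs and that all jobs run at once, neither of which the cluster guarantees; `estimate_footprint` is a hypothetical helper.

```python
import math


def estimate_footprint(total_rows, job_count, memory_limit_gb, cpu_limit):
    """Rough per-operation totals, assuming an even split and full parallelism."""
    return {
        "rows_per_job": math.ceil(total_rows / job_count),
        "total_memory_gb": job_count * memory_limit_gb,
        "total_cpu": job_count * cpu_limit,
    }


# Values taken from the resource example above, with a made-up row count.
estimate = estimate_footprint(
    total_rows=1000, job_count=2, memory_limit_gb=4, cpu_limit=2
)
```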
### Advanced Configuration
**Max failed jobs:**
```yaml
client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
      max_failed_job_count: 1  # Fail operation after N failed jobs
      resources:
        # ...
```
**Max row weight:**
`max_row_weight` defaults to `128M` for map operations (that is also the maximum the cluster accepts). Override it per operation with a value at or below `128M`:
```yaml
client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
      max_row_weight: 64M
      resources:
        # ...
```
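If you suspect some rows are close to the limit, a mapper can estimate row size before printing. This is only an approximation: the sketch measures the UTF-8 length of the JSON encoding, which is not necessarily how the cluster computes row weight, and it assumes `M` means MiB. The names `approx_row_bytes` and `fits` are hypothetical.

```python
import json

# Mirrors max_row_weight: 64M, under the assumption that M means MiB.
MAX_ROW_BYTES = 64 * 1024 * 1024


def approx_row_bytes(row):
    """Approximate a row's size as the byte length of its JSON encoding."""
    return len(json.dumps(row).encode("utf-8"))


def fits(row, limit=MAX_ROW_BYTES):
    """Return True when the approximate size is within the limit."""
    return approx_row_bytes(row) <= limit


small_ok = fits({"id": 1})
large_ok = fits({"blob": "x" * 100}, limit=50)
```

A mapper could log and skip rows for which `fits` returns `False` instead of letting the job fail on write.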
**Custom Docker:**
```yaml
client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
      resources:
        docker_image: my-registry/my-image:latest
        # ...
```
See [Docker Guide](../advanced/docker.md) for details.
**Checkpoints:**
```yaml
client:
  operations:
    map:
      input_table: //tmp/my_pipeline/input
      output_table: //tmp/my_pipeline/output
      checkpoint:
        checkpoint_base: //tmp/my_pipeline/checkpoints
        local_checkpoint_path: /path/to/local/model.pth
      resources:
        # ...
```
See [Checkpoints Guide](../advanced/checkpoints.md) for details.
## Running Map Operations
### From Stage
Call `run_map()` from the stage's `run` method:
```python
from yt_framework.operations.command_ops.map import run_map

success = run_map(
    context=self.context,
    operation_config=self.config.client.operations.map,
)
```
### Operation Flow
1. **Code upload**: Code is packaged and uploaded to YT (prod mode)
2. **Job creation**: YT creates multiple jobs based on `job_count`
3. **Row distribution**: Input table rows are distributed across jobs
4. **Execution**: Each job runs mapper.py for its assigned rows
5. **Output collection**: Results are written to output table
6. **Completion**: Operation completes when all jobs finish
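Steps 2 to 5 can be mimicked locally with a toy simulation. The round-robin split below is only a stand-in for however YT actually assigns rows to jobs, and `split_rows` and `run_job` are hypothetical helpers, not framework functions.

```python
import json


def split_rows(rows, job_count):
    """Deal rows into job_count chunks round-robin (a stand-in for YT's own split)."""
    chunks = [[] for _ in range(job_count)]
    for i, row in enumerate(rows):
        chunks[i % job_count].append(row)
    return chunks


def run_job(chunk):
    """One job: pass each assigned row through the mapper logic as a JSON line."""
    out = []
    for line in (json.dumps(r) for r in chunk):
        row = json.loads(line)
        out.append({"id": row["id"], "processed_value": row["value"] * 2})
    return out


rows = [{"id": i, "value": i} for i in range(5)]
chunks = split_rows(rows, job_count=2)

# Output collection: concatenate the results of every job.
output = [r for chunk in chunks for r in run_job(chunk)]
```

In this sketch the collected rows come back grouped by job rather than in input order, so it is safer not to rely on output ordering.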
### Dev Mode Behavior
In dev mode:
- Runs sequentially (single job)
- Creates sandbox directory: `.dev/sandbox_->