# Source code for ytjobs.mapper.utils
"""Shared helpers for YT mapper scripts (rows, stdin parsing, errors)."""
import json
import sys
from collections.abc import Callable, Iterable, Iterator
from typing import Any
def parse_json_line(line: str) -> object | None:
"""Parse a JSON line and log errors to stderr if parsing fails.
Args:
line: JSON string to parse
Returns:
Parsed JSON object, or None if parsing failed
"""
try:
return json.loads(line)
except json.JSONDecodeError as e:
log_error({"error": f"Failed to parse row: {e!s}", "row": line})
return None
def log_error(error_dict: dict[str, Any]) -> None:
    """Emit an error record on stderr as one line of JSON.

    Args:
        error_dict: Mapping describing the error; it is serialized with
            json.dumps, so its contents must be JSON-serializable.
    """
    stream = sys.stderr
    record = json.dumps(error_dict)
    stream.write(f"{record}\n")
    # Flush right away so the record is not left sitting in a buffer.
    stream.flush()
def process_and_write_results(
    processing_func: Callable[..., Iterator[Any]],
    data: object,
    *,
    redirect_output: bool = True,
    **kwargs: object,
) -> None:
    """Run a processing function and emit each yielded result as a JSON line.

    Results are written one at a time as the iterator produces them, so the
    full result set is never held in memory.

    When redirect_output is true, sys.stdout is pointed at sys.stderr while
    the iterator is running, so anything the processing code prints lands on
    stderr. For each yielded result the real stdout is put back just long
    enough to write and flush the JSON line, then the redirection is
    re-applied before the iterator resumes. The real stdout is always
    restored on exit, including when an exception propagates.

    Args:
        processing_func: Callable invoked as ``processing_func(data, **kwargs)``
            that returns an iterator of JSON-serializable results.
        data: Value passed as the first positional argument to
            processing_func.
        redirect_output: If True, redirect stdout to stderr while the
            iterator runs and restore it only to write each result. If
            False, results are written to sys.stdout with no redirection.
        **kwargs: Extra keyword arguments forwarded to processing_func.
    """
    if not redirect_output:
        # Plain streaming: no swapping of sys.stdout at all.
        for item in processing_func(data, **kwargs):
            sys.stdout.write(f"{json.dumps(item)}\n")
            sys.stdout.flush()
        return

    real_stdout = sys.stdout
    # From here on, prints made by the processing code go to stderr.
    sys.stdout = sys.stderr
    try:
        for item in processing_func(data, **kwargs):
            # Put the real stdout back only for the duration of the write.
            sys.stdout = real_stdout
            real_stdout.write(f"{json.dumps(item)}\n")
            real_stdout.flush()
            # Redirect again before the iterator resumes its work.
            sys.stdout = sys.stderr
    finally:
        # Leave the process with its original stdout no matter what happened.
        sys.stdout = real_stdout