Source code for ytjobs.mapper.utils

"""Shared helpers for YT mapper scripts (rows, stdin parsing, errors)."""

import json
import sys
from collections.abc import Callable, Iterable, Iterator
from typing import Any


def read_input_rows() -> Iterator[object]:
    """Yield one parsed JSON value per non-blank line read from stdin.

    Each line is stripped of surrounding whitespace; lines that are empty
    after stripping are skipped. A line that cannot be parsed is reported
    to stderr as a JSON object with ``error`` and ``row`` keys and is then
    skipped, so one malformed row does not abort the whole stream.

    Yields:
        Whatever ``json.loads`` returns for the line (dict, list, str,
        number, bool or None). No wrapper objects are created.

    Example:
        for row in read_input_rows():
            print(row)
    """
    for raw_line in sys.stdin:
        line = raw_line.strip()
        if not line:
            continue
        try:
            # ValueError also covers json.JSONDecodeError and
            # UnicodeDecodeError, which are both subclasses of it.
            row_data = json.loads(line)
        except (TypeError, ValueError) as e:
            error_msg = {"error": f"Failed to parse row: {e!s}", "row": line}
            sys.stderr.write(json.dumps(error_msg) + "\n")
            # Flush so the report is not lost if the job is killed.
            sys.stderr.flush()
            continue
        # Yield outside the try block: an exception thrown into the
        # generator by the consumer must not be mistaken for a parse error.
        yield row_data
def parse_json_line(line: str) -> object | None: """Parse a JSON line and log errors to stderr if parsing fails. Args: line: JSON string to parse Returns: Parsed JSON object, or None if parsing failed """ try: return json.loads(line) except json.JSONDecodeError as e: log_error({"error": f"Failed to parse row: {e!s}", "row": line}) return None def log_error(error_dict: dict[str, Any]) -> None: """Log an error message as JSON to stderr. Args: error_dict: Dictionary containing error information """ sys.stderr.write(json.dumps(error_dict) + "\n") sys.stderr.flush() def process_and_write_results( processing_func: Callable[..., Iterator[Any]], data: object, *, redirect_output: bool = True, **kwargs: object, ) -> None: """Execute a processing function and write results as they're yielded. Streams results without loading them all into memory, which is critical for jobs processing millions of rows. Note: We manually manage stdout here rather than using redirect_stdout_to_stderr() context manager because we need to toggle stdout for each result: - Redirect stdout→stderr during processing (iterator yields) - Restore stdout to write each result as JSON - Re-redirect for next iteration This toggling pattern can't be achieved with a single context manager. 
Args: processing_func: Function that returns an Iterator of results data: Data to pass to processing_func redirect_output: If True, redirect stdout to stderr during processing, then restore it for writing each result **kwargs: Additional keyword arguments to pass to processing_func """ if redirect_output: # Save original stdout (same pattern as redirect_stdout_to_stderr) original_stdout = sys.stdout # Redirect stdout to stderr for processing sys.stdout = sys.stderr try: # Iterate and process - any processing output goes to stderr for result in processing_func(data, **kwargs): # Temporarily restore stdout to write result sys.stdout = original_stdout sys.stdout.write(json.dumps(result) + "\n") sys.stdout.flush() # Re-redirect to stderr for next iteration sys.stdout = sys.stderr finally: # Always restore stdout (matches redirect_stdout_to_stderr cleanup) sys.stdout = original_stdout else: # No redirection - stream results directly for result in processing_func(data, **kwargs): sys.stdout.write(json.dumps(result) + "\n") sys.stdout.flush()