"""
Mapper Base Classes
===================
Reusable mapper classes that handle stdin/stdout boilerplate.
Allows users to focus on processing logic.
"""
import sys
from typing import Callable, Iterator, Optional, List, Any
from .utils import parse_json_line, log_error, process_and_write_results
class StreamMapper:
    """
    Row-at-a-time mapper over stdin.

    Every non-blank line of stdin is parsed with ``parse_json_line`` and
    handed, on its own, to the user's processing function; the results are
    written out as they are produced. To use it, define
    ``processing_func(row)`` that yields result dicts, then call
    ``StreamMapper().map(processing_func)``.
    """

    def map(
        self,
        processing_func: Callable[[Any], Iterator[Any]],
        redirect_processing_output: bool = True,
        **kwargs: Any,
    ) -> None:
        """
        Consume stdin one line at a time and process each parsed row.

        Blank lines, and lines for which ``parse_json_line`` returns
        ``None``, are skipped without invoking ``processing_func``.

        Args:
            processing_func: Function that takes a row dict and returns Iterator of results
            redirect_processing_output: If True, redirect stdout to stderr during processing
            **kwargs: Additional keyword arguments to pass to processing_func

        Raises:
            Exception: Whatever processing a row raises; the failure is
                reported through ``log_error`` (with the offending line)
                and then re-raised unchanged.
        """
        for raw_line in sys.stdin:
            text = raw_line.strip()
            # An empty line never reaches the parser; it is treated like an
            # unparseable one and simply dropped.
            record = parse_json_line(text) if text else None
            if record is None:
                continue

            # Results are written while the row is being processed, so a
            # failure here may follow partial output for this row.
            try:
                process_and_write_results(
                    processing_func, record, redirect_processing_output, **kwargs
                )
            except Exception as exc:
                reason = str(exc)
                log_error({"error": f"Processing failed: {reason}", "row": text})
                raise
class BatchMapper:
    """
    Mapper that processes stdin in batches.

    Reads JSON lines from stdin in batches, processes each batch,
    and writes results to stdout. Pass ``batch_size`` (or ``None`` to buffer all
    stdin), define ``processing_func(rows)`` that yields result dicts, then
    call ``mapper.map(processing_func)``.
    """

    def __init__(self, batch_size: Optional[int] = None):
        """
        Initialize batch mapper.

        Args:
            batch_size: Number of rows per batch, or None to process all rows at once

        Raises:
            ValueError: If ``batch_size`` is given but is less than 1.
        """
        # A non-positive size would make the ``len(batch) >= batch_size``
        # test true after every row, silently degrading to one-row batches.
        if batch_size is not None and batch_size < 1:
            raise ValueError(
                f"batch_size must be a positive integer or None, got {batch_size!r}"
            )
        self.batch_size = batch_size

    def map(
        self,
        processing_func: Callable[..., Iterator[Any]],
        redirect_processing_output: bool = True,
        **kwargs: Any,
    ) -> None:
        """
        Read stdin in batches, process each batch, write results to stdout.

        Args:
            processing_func: Function that takes a list of rows (and optional kwargs) and returns Iterator of results
            redirect_processing_output: If True, redirect stdout to stderr during processing
            **kwargs: Additional keyword arguments to pass to processing_func

        Raises:
            ValueError: If ``batch_size`` was reassigned to a value below 1.
            Exception: Any exception raised while processing a batch is
                logged through ``log_error`` and re-raised.
        """
        if self.batch_size is None:
            self._process_all_rows(
                processing_func, redirect_processing_output, **kwargs
            )
        else:
            self._process_in_batches(
                processing_func, redirect_processing_output, **kwargs
            )

    def _process_all_rows(
        self,
        processing_func: Callable[..., Iterator[Any]],
        redirect_processing_output: bool,
        **kwargs: Any,
    ) -> None:
        """Buffer every row from stdin and process them as one batch.

        ``processing_func`` is not invoked at all when stdin yields no rows.
        """
        rows = self._read_all_rows()
        if not rows:
            return
        try:
            process_and_write_results(
                processing_func, rows, redirect_processing_output, **kwargs
            )
        except Exception as e:
            log_error(
                {
                    "error": f"Batch processing failed: {str(e)}",
                    "total_rows": len(rows),
                }
            )
            raise

    def _process_in_batches(
        self,
        processing_func: Callable[..., Iterator[Any]],
        redirect_processing_output: bool,
        **kwargs: Any,
    ) -> None:
        """Process rows from stdin in batches of ``self.batch_size`` rows.

        Batches are numbered from 0; a final, shorter batch is processed
        when rows remain after stdin is exhausted.
        """
        batch_size = self.batch_size
        if batch_size is None:
            raise ValueError("Batch size must be set")
        if batch_size < 1:
            # The attribute is public and may have been reassigned after
            # construction, so the constructor check is repeated here.
            raise ValueError(
                f"batch_size must be a positive integer or None, got {batch_size!r}"
            )

        batch: List[Any] = []
        batch_count = 0
        for row_data in self._iter_rows():
            batch.append(row_data)
            # Process batch when it reaches batch_size
            if len(batch) >= batch_size:
                self._process_batch(
                    batch,
                    batch_count,
                    processing_func,
                    redirect_processing_output,
                    **kwargs,
                )
                # Start a fresh list rather than clearing in place, so a
                # processing function that kept a reference is unaffected.
                batch = []
                batch_count += 1

        # Process remaining rows
        if batch:
            self._process_batch(
                batch,
                batch_count,
                processing_func,
                redirect_processing_output,
                **kwargs,
            )

    def _iter_rows(self) -> Iterator[Any]:
        """Lazily yield parsed rows from stdin.

        Blank lines and lines for which ``parse_json_line`` returns ``None``
        are skipped.
        """
        for line in sys.stdin:
            line = line.strip()
            if not line:
                continue
            row_data = parse_json_line(line)
            if row_data is not None:
                yield row_data

    def _read_all_rows(self) -> List[Any]:
        """Read all rows from stdin into a list."""
        return list(self._iter_rows())

    def _process_batch(
        self,
        batch: List[Any],
        batch_number: int,
        processing_func: Callable[..., Iterator[Any]],
        redirect_processing_output: bool,
        **kwargs: Any,
    ) -> None:
        """Process a single batch, logging its number and size on failure.

        Any exception is reported through ``log_error`` and then re-raised.
        """
        try:
            process_and_write_results(
                processing_func, batch, redirect_processing_output, **kwargs
            )
        except Exception as e:
            log_error(
                {
                    "error": f"Batch {batch_number} processing failed: {str(e)}",
                    "batch_size": len(batch),
                    "batch_number": batch_number,
                }
            )
            raise