Source code for yt_framework.operations.map_reduce

"""Submit map-reduce and reduce-only YT operations (TypedJob or command strings).

Builds archives plus optional file dependencies; cluster credentials still come
from `configs/secrets.env` like other operations.
"""

import warnings
from typing import Any, Optional, TYPE_CHECKING

from omegaconf import DictConfig, OmegaConf

from yt_framework.utils.logging import log_header, log_success
from .dependency_strategy import TarArchiveDependencyBuilder
from .job_command import require_consistent_map_reduce_legs
from .common import (
    extract_operation_resources,
    build_operation_environment,
    extract_docker_auth_from_operation_config,
    extract_max_failed_jobs,
    collect_passthrough_kwargs,
    extract_secure_env_client_kwargs,
)

if TYPE_CHECKING:
    from yt_framework.core.stage import StageContext


def run_map_reduce(
    context: "StageContext",
    operation_config: DictConfig,
    mapper: Any = None,
    reducer: Any = None,
    output_schema: Optional[Any] = None,
    map_job: Any = None,
    reduce_job: Any = None,
) -> bool:
    """Submit a YT map-reduce operation and block until it finishes.

    Both legs must be of the same kind: either two ``TypedJob`` instances or
    two command strings (JSON stdin/stdout). Mixing kinds raises
    ``ValueError``.

    Setting ``operation_config.tar_command_bootstrap: true`` wraps string legs
    with the same ``tar -xzf source.tar.gz`` + wrapper pattern as map
    operations (requires matching wrappers in the uploaded tarball; see docs).

    Args:
        context: Stage context (deps, logger, stage_dir, config).
        operation_config: client.operations.map_reduce config (input_table,
            output_table, reduce_by, sort_by, resources, file_paths, etc.).
        mapper: *Deprecated* — use ``map_job`` instead.
        reducer: *Deprecated* — use ``reduce_job`` instead.
        output_schema: Optional YT TableSchema for output table.
        map_job: Mapper leg (``TypedJob`` instance or command string).
        reduce_job: Reducer leg (``TypedJob`` instance or command string).

    Returns:
        The result of waiting on the operation; truthy on success.

    Raises:
        ValueError: If ``input_table``, ``output_table`` or ``reduce_by`` is
            missing/empty, or if a deprecated keyword and its replacement are
            both given with different values.
        RuntimeError: If the dependency builder produced a bootstrap command
            for only one of the two legs.
    """
    # The warnings stay inline (not in a helper) so stacklevel=2 keeps
    # pointing at the caller of run_map_reduce.
    if map_job is None and mapper is not None:
        warnings.warn(
            "'mapper=' is deprecated; use 'map_job=' instead",
            DeprecationWarning,
            stacklevel=2,
        )
    if reduce_job is None and reducer is not None:
        warnings.warn(
            "'reducer=' is deprecated; use 'reduce_job=' instead",
            DeprecationWarning,
            stacklevel=2,
        )

    logger = context.logger
    src_table = operation_config.get("input_table")
    dst_table = operation_config.get("output_table")
    log_header(
        logger,
        "Map-Reduce Operation",
        f"Input: {src_table} -> Output: {dst_table}",
    )

    key_columns = list(operation_config.get("reduce_by") or [])
    if not (src_table and dst_table and key_columns):
        raise ValueError(
            "operation_config must set input_table, output_table, and reduce_by; "
            "expected at client.operations.map_reduce.{input_table,output_table,reduce_by}"
        )

    env = build_operation_environment(
        context=context,
        operation_config=operation_config,
        logger=logger,
        include_stage_name=True,
        include_tokenizer_artifact=True,
    )
    resources = extract_operation_resources(operation_config, logger)

    # Reconcile the deprecated keywords with their replacements; the new
    # names win when both are supplied and agree.
    if mapper is not None and map_job is not None:
        if mapper != map_job:
            raise ValueError(
                "Both 'mapper' and 'map_job' are set with different values; use only one"
            )
    if reducer is not None and reduce_job is not None:
        if reducer != reduce_job:
            raise ValueError(
                "Both 'reducer' and 'reduce_job' are set with different values; use only one"
            )
    map_leg = mapper if map_job is None else map_job
    reduce_leg = reducer if reduce_job is None else reduce_job
    require_consistent_map_reduce_legs(map_leg, reduce_leg)

    dep = TarArchiveDependencyBuilder().build_dependencies(
        operation_type="map_reduce",
        stage_dir=context.stage_dir,
        archive_name="source.tar.gz",
        build_folder=context.deps.pipeline_config.pipeline.build_folder,
        operation_config=operation_config,
        stage_config=context.config,
        logger=logger,
        mapper=map_leg,
        reducer=reduce_leg,
    )
    dependencies = dep.dependencies

    # The builder must either bootstrap both legs or neither.
    mapper_cmd = dep.mapper_command
    reducer_cmd = dep.reducer_command
    if mapper_cmd is not None and reducer_cmd is not None:
        map_leg, reduce_leg = mapper_cmd, reducer_cmd
        logger.info(
            "Using tar bootstrap commands for map-reduce mapper and reducer legs"
        )
    elif not (mapper_cmd is None and reducer_cmd is None):
        raise RuntimeError(
            "Internal error: partial map-reduce tar bootstrap (only one leg set); "
            "expected both or neither."
        )

    docker_auth = extract_docker_auth_from_operation_config(operation_config, env)
    sort_columns = list(operation_config.get("sort_by") or [])
    max_failed_jobs = extract_max_failed_jobs(operation_config, logger)

    extra_spec: dict = {}
    if operation_config.get("map_job_count") is not None:
        extra_spec["map_job_count"] = operation_config.map_job_count

    # A plain string description becomes the operation title; any other
    # truthy value is converted to a plain container and forwarded as-is.
    description = operation_config.get("operation_description")
    if isinstance(description, str):
        if description:
            logger.info(f"Operation label: {description}")
            extra_spec["title"] = description
    elif description:
        extra_spec["operation_description"] = OmegaConf.to_container(
            description, resolve=True
        )

    # Keys handled explicitly by this function (or deliberately ignored) and
    # therefore excluded from the generic passthrough.
    handled_keys = {
        "input_table",
        "output_table",
        "reduce_by",
        "sort_by",
        "resources",
        "env",
        "max_failed_job_count",
        "file_paths",
        "checkpoint",
        "tokenizer_artifact",
        "tar_command_bootstrap",
        "map_job_count",
        "operation_description",
        # Legacy custom IO options are intentionally no longer consumed here.
        "typed_reduce_row_iterator_io",
        "reduce_job_io",
        "map_job_io",
        "environment_public_keys",
        "use_plain_environment_for_secrets",
    }
    extra_spec.update(
        collect_passthrough_kwargs(operation_config, reserved_keys=handled_keys)
    )

    yt_client = context.deps.yt_client
    operation = yt_client.run_map_reduce(
        mapper=map_leg,
        reducer=reduce_leg,
        input_table=src_table,
        output_table=dst_table,
        reduce_by=key_columns,
        files=dependencies,
        resources=resources,
        env=env,
        sort_by=sort_columns or None,
        output_schema=output_schema,
        max_failed_jobs=max_failed_jobs,
        docker_auth=docker_auth,
        **extract_secure_env_client_kwargs(operation_config),
        **extra_spec,
    )

    success = context.deps.yt_client.wait_for_operation(operation)
    if not success:
        logger.error("Map-reduce operation failed")
        return success
    log_success(logger, "Map-reduce operation completed successfully")
    return success
def run_reduce(
    context: "StageContext",
    operation_config: DictConfig,
    reducer: Any = None,
    output_schema: Optional[Any] = None,
    job: Any = None,
) -> bool:
    """Run a YT reduce-only operation and wait for completion.

    Pass the reducer leg as a ``TypedJob`` or a command string, via ``job``
    (preferred) or ``reducer`` (legacy name). With
    ``operation_config.tar_command_bootstrap: true``, string reducers get the
    same tar extract + wrapper bootstrap as map (see docs).

    Args:
        context: Stage context.
        operation_config: client.operations.reduce config.
        reducer: Reducer leg (legacy name).
        output_schema: Optional output table schema.
        job: Preferred reducer leg alias.

    Returns:
        The result of waiting on the operation; truthy on success.

    Raises:
        ValueError: If ``input_table``, ``output_table`` or ``reduce_by`` is
            missing/empty, if ``reducer`` and ``job`` are both given with
            different values, or if neither is given.
    """
    logger = context.logger
    log_header(
        logger,
        "Reduce Operation",
        f"Input: {operation_config.get('input_table')} -> Output: {operation_config.get('output_table')}",
    )

    input_table = operation_config.get("input_table")
    output_table = operation_config.get("output_table")
    reduce_by = list(operation_config.get("reduce_by") or [])
    if not input_table or not output_table or not reduce_by:
        raise ValueError(
            "operation_config must set input_table, output_table, and reduce_by; "
            "expected at client.operations.reduce.{input_table,output_table,reduce_by}"
        )

    # Resolve and validate the reducer leg before any environment or archive
    # work, so bad arguments fail fast with a clear message.
    if reducer is not None and job is not None and reducer != job:
        raise ValueError(
            "Both 'reducer' and 'job' are set with different values; use only one"
        )
    reducer = job if job is not None else reducer
    if reducer is None:
        # Both keywords default to None; without this check the None would be
        # forwarded to the dependency builder and the YT client.
        raise ValueError(
            "A reducer leg is required; pass 'job' (or the legacy 'reducer') "
            "as a TypedJob or command string"
        )

    env = build_operation_environment(
        context=context,
        operation_config=operation_config,
        logger=logger,
        include_stage_name=True,
        include_tokenizer_artifact=True,
    )
    resources = extract_operation_resources(operation_config, logger)

    builder = TarArchiveDependencyBuilder()
    dep = builder.build_dependencies(
        operation_type="reduce",
        stage_dir=context.stage_dir,
        archive_name="source.tar.gz",
        build_folder=context.deps.pipeline_config.pipeline.build_folder,
        operation_config=operation_config,
        stage_config=context.config,
        logger=logger,
        reducer=reducer,
    )
    dependencies = dep.dependencies
    # When the builder supplies a bootstrap command, it replaces the reducer.
    if dep.reducer_command is not None:
        reducer = dep.reducer_command
        logger.info("Using tar bootstrap command for reduce leg")

    docker_auth = extract_docker_auth_from_operation_config(operation_config, env)
    max_failed_jobs = extract_max_failed_jobs(operation_config, logger)

    reduce_kw: dict = {}
    # A plain string description becomes the operation title; any other
    # truthy value is converted to a plain container and forwarded as-is.
    description = operation_config.get("operation_description")
    if description:
        if isinstance(description, str):
            logger.info(f"Operation label: {description}")
            reduce_kw["title"] = description
        else:
            reduce_kw["operation_description"] = OmegaConf.to_container(
                description, resolve=True
            )

    passthrough = collect_passthrough_kwargs(
        operation_config,
        reserved_keys={
            "input_table",
            "output_table",
            "reduce_by",
            "resources",
            "env",
            "max_failed_job_count",
            "file_paths",
            "checkpoint",
            "tokenizer_artifact",
            "tar_command_bootstrap",
            "operation_description",
            # Legacy custom IO option is intentionally no longer consumed here.
            "job_io",
            "environment_public_keys",
            "use_plain_environment_for_secrets",
        },
    )
    reduce_kw.update(passthrough)

    operation = context.deps.yt_client.run_reduce(
        reducer=reducer,
        input_table=input_table,
        output_table=output_table,
        reduce_by=reduce_by,
        files=dependencies,
        resources=resources,
        env=env,
        output_schema=output_schema,
        max_failed_jobs=max_failed_jobs,
        docker_auth=docker_auth,
        **extract_secure_env_client_kwargs(operation_config),
        **reduce_kw,
    )

    success = context.deps.yt_client.wait_for_operation(operation)
    if success:
        log_success(logger, "Reduce operation completed successfully")
    else:
        logger.error("Reduce operation failed")
    return success