"""Driver helpers to package `src/mapper.py` and submit YT map operations."""
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING
from omegaconf import DictConfig, OmegaConf
from yt_framework.operations._internal.dependency_strategy import (
DependencyBuildContext,
TarArchiveDependencyBuilder,
)
from yt_framework.operations.common import (
build_operation_environment,
collect_passthrough_kwargs,
docker_auth_from_op_config,
extract_max_failed_jobs,
extract_operation_resources,
extract_secure_env_client_kwargs,
)
from yt_framework.utils.logging import log_header, log_success
from yt_framework.yt.clients.operation_specs import (
MapSubmitSpec,
docker_auth_tuple,
env_pairs_tuple,
extras_tuple,
file_pairs_tuple,
)
if TYPE_CHECKING:
import logging
from pathlib import Path
from yt.wrapper.schema import TableSchema
from yt_framework.contracts import StageContext
from yt_framework.yt.clients.client_base import OperationResources
[docs]
@dataclass
class MapOperationData:
    """Everything needed to submit the mapper leg of a YT map operation.

    ``_prepare_map_operation`` returns instances whose ``environment`` is an
    empty dict and whose ``docker_auth`` is ``None``; ``run_map`` assigns the
    real values for those two fields after construction.
    """

    # YT path of the mapper script (or of the bash wrapper in tar-archive mode).
    mapper_path: str
    # (yt_path, local_path) pairs describing the files to upload for the job.
    dependencies: list[tuple[str, str]]
    # Environment variables for the job (secrets only).
    environment: dict[str, str]
    # Docker registry credentials for private images, or None.
    docker_auth: dict[str, str] | None
    # Command to execute in the job (used in tar-archive mode), or None.
    command: str | None = None
def _prepare_map_operation(
    pipeline_config: DictConfig,
    operation_config: DictConfig,
    stage_config: DictConfig,
    stage_dir: Path,
    logger: logging.Logger,
) -> MapOperationData:
    """Package the stage as a ``source.tar.gz`` archive and describe the map job.

    Only the dependency-derived fields (``mapper_path``, ``dependencies`` and
    ``command``) are populated from the tar-archive dependency builder.
    ``environment`` is returned as an empty dict and ``docker_auth`` as
    ``None``; the caller is expected to compute and assign both afterwards.

    Args:
        pipeline_config: Pipeline-level config; ``pipeline.build_folder`` is read.
        operation_config: Operation-specific config (from client.operations.map).
        stage_config: Full stage config (gives the builder access to the job section).
        stage_dir: Path to the stage directory.
        logger: Logger instance forwarded to the dependency builder.

    Returns:
        MapOperationData with dependencies and command populated.
    """
    dependency_builder = TarArchiveDependencyBuilder()
    build_context = DependencyBuildContext(
        operation_type="map",
        stage_dir=stage_dir,
        archive_name="source.tar.gz",
        build_folder=pipeline_config.pipeline.build_folder,
        operation_config=operation_config,
        stage_config=stage_config,
        logger=logger,
    )
    built = dependency_builder.build_dependencies(build_context)

    # Environment/docker_auth are intentionally placeholders at this stage.
    return MapOperationData(
        mapper_path=built.script_path,
        dependencies=built.dependencies,
        environment={},
        docker_auth=None,
        command=built.command,
    )
def _require_map_tables(operation_config: DictConfig) -> None:
    """Raise ``ValueError`` unless both map tables are configured.

    ``input_table`` is checked before ``output_table``. A key that is absent,
    or whose value is falsy (e.g. an empty string), counts as missing.
    """
    for table_key in ("input_table", "output_table"):
        if operation_config.get(table_key):
            continue
        msg = (
            f"No {table_key} in operation_config; "
            f"expected at client.operations.map.{table_key}"
        )
        raise ValueError(msg)
def _assert_exclusive_mapper_job_leg(
    mapper: object | None,
    job: object | None,
) -> None:
    """Reject a call that passes conflicting ``mapper`` and ``job`` legs.

    Passing neither, only one, or the same value for both is accepted. Two
    non-None values that compare unequal raise ``ValueError``.
    """
    both_given = mapper is not None and job is not None
    if both_given and mapper != job:
        msg = "Both 'mapper' and 'job' are set with different values; use only one"
        raise ValueError(msg)
def _pick_mapper_leg(
    mapper: object | None,
    job: object | None,
    map_operation_data: MapOperationData,
) -> object:
    """Choose which mapper leg to submit.

    Precedence is ``job``, then ``mapper``, then the command produced by the
    dependency builder (``map_operation_data.command``).

    Raises:
        ValueError: If all three are ``None``.
    """
    if job is not None:
        return job
    if mapper is not None:
        return mapper
    fallback_command = map_operation_data.command
    if fallback_command is None:
        msg = "Command not provided by dependency builder"
        raise ValueError(msg)
    return fallback_command
def _resolve_map_mapper_leg(
    mapper: object | None,
    job: object | None,
    map_operation_data: MapOperationData,
) -> object:
    """Validate the ``mapper``/``job`` pair, then return the leg to submit.

    Raises ``ValueError`` from ``_assert_exclusive_mapper_job_leg`` when the
    two legs conflict. It also raises ``ValueError`` from ``_pick_mapper_leg``
    when no leg is given and the dependency builder produced no command.
    """
    _assert_exclusive_mapper_job_leg(mapper, job)
    chosen_leg = _pick_mapper_leg(mapper, job, map_operation_data)
    return chosen_leg
def _map_operation_description_kwargs(
    operation_config: DictConfig,
    logger: logging.Logger,
) -> dict[str, object]:
    """Translate ``operation_description`` into client keyword arguments.

    - A missing or falsy value yields ``{}``.
    - A string yields ``{"title": <string>}``, and the label is logged at INFO.
    - Anything else yields ``{"operation_description": ...}``, converted with
      ``OmegaConf.to_container(..., resolve=True)``. This presumably expects an
      OmegaConf mapping; confirm against callers.

    A new dict is returned on every call, so callers may mutate it.
    """
    description = operation_config.get("operation_description")
    if not description:
        return {}
    if not isinstance(description, str):
        return {
            "operation_description": OmegaConf.to_container(description, resolve=True),
        }
    logger.info("Operation label: %s", description)
    return {"title": description}
[docs]
def run_map(
    context: StageContext,
    operation_config: DictConfig,
    output_schema: TableSchema | None = None,
    mapper: object | None = None,
    job: object | None = None,
) -> bool:
    """Run a YT map operation and wait for it to finish.

    Job parameters (pool, memory, CPU, Docker image, etc.) are extracted from
    ``operation_config``. It should be the stage's ``client.operations.map``
    config section.

    Args:
        context: Stage context (provides deps, logger, config and stage_dir).
        operation_config: Operation-specific config (from client.operations.map).
            Must contain ``input_table`` and ``output_table``. Optional key
            ``append`` (bool): append mapper output to an existing output table.
        output_schema: Optional YT TableSchema for a typed output table.
        mapper: Optional mapper leg (legacy name). Can be a TypedJob instance
            or a command string. When neither ``mapper`` nor ``job`` is given,
            the command produced by the dependency builder is used.
        job: Preferred alias for ``mapper`` (TypedJob instance or command
            string). Passing both with different values is an error.

    Returns:
        True if the operation completed successfully, False otherwise.

    Raises:
        ValueError: If ``input_table`` or ``output_table`` is missing, if
            ``mapper`` and ``job`` conflict, or if no mapper leg is given and
            the dependency builder produced no command.
    """
    logger = context.logger

    # Validate everything that depends only on the arguments before doing any
    # expensive work (environment assembly, tar-archive packaging).
    _require_map_tables(operation_config)
    _assert_exclusive_mapper_job_leg(mapper, job)

    log_header(
        logger,
        "Map Operation",
        f"Input: {operation_config.input_table} | Output: {operation_config.output_table}",
    )

    env = build_operation_environment(
        context=context,
        operation_config=operation_config,
        logger=logger,
        include_stage_name=True,
        include_tokenizer_artifact=True,
    )
    map_operation_data = _prepare_map_operation(
        pipeline_config=context.deps.pipeline_config,
        operation_config=operation_config,
        stage_config=context.config,
        stage_dir=context.stage_dir,
        logger=logger,
    )
    # _prepare_map_operation returns placeholder environment/docker_auth values;
    # fill in the real ones here.
    map_operation_data.environment = env
    map_operation_data.docker_auth = docker_auth_from_op_config(
        operation_config,
        env,
    )
    logger.debug("Dependencies: %s files", len(map_operation_data.dependencies))

    # Exclusivity was already enforced above; only the selection remains.
    mapper_leg = _pick_mapper_leg(mapper, job, map_operation_data)

    logger.debug("Extracting operation resources from config")
    resources: OperationResources = extract_operation_resources(
        operation_config,
        logger,
    )
    max_failed_jobs = extract_max_failed_jobs(operation_config, logger)
    append_output = bool(operation_config.get("append", False))
    merged_extras = _build_map_extras(operation_config, logger)

    yt_client = context.deps.yt_client
    operation = yt_client.run_map_submit(
        MapSubmitSpec(
            command=mapper_leg,
            input_table=str(operation_config.input_table),
            output_table=str(operation_config.output_table),
            files=file_pairs_tuple(map_operation_data.dependencies),
            resources=resources,
            env=env_pairs_tuple(map_operation_data.environment),
            output_schema=output_schema,
            max_failed_jobs=max_failed_jobs,
            docker_auth=docker_auth_tuple(map_operation_data.docker_auth),
            append=append_output,
            extras=extras_tuple(merged_extras),
        ),
    )

    # Block until the operation finishes and report the outcome.
    success = yt_client.wait_for_operation(operation)
    if success:
        log_success(logger, "Map operation completed successfully")
    else:
        logger.error("Map operation failed")
    return success


def _build_map_extras(
    operation_config: DictConfig,
    logger: logging.Logger,
) -> dict[str, object]:
    """Assemble the ``extras`` mapping passed to ``MapSubmitSpec``.

    The mapping merges three sources:
    - the title/description kwargs;
    - the pass-through config keys;
    - the secure-env client kwargs.

    On key collisions, the description and pass-through values override the
    secure-env ones.
    """
    map_kwargs = _map_operation_description_kwargs(operation_config, logger)
    # Keys that run_map (or its helpers) consume explicitly. They are handed to
    # collect_passthrough_kwargs as the reserved set (presumably excluded from
    # pass-through; confirm in yt_framework.operations.common).
    reserved_keys = {
        "input_table",
        "output_table",
        "resources",
        "env",
        "max_failed_job_count",
        "file_paths",
        "checkpoint",
        "tokenizer_artifact",
        "tar_command_bootstrap",
        "operation_description",
        "append",
        "environment_public_keys",
        "use_plain_environment_for_secrets",
    }
    map_kwargs.update(collect_passthrough_kwargs(operation_config, reserved_keys))
    return {
        **extract_secure_env_client_kwargs(operation_config),
        **map_kwargs,
    }