"""Driver helpers to package `src/mapper.py` and submit YT map operations."""
import logging
from pathlib import Path
from dataclasses import dataclass
from typing import List, Tuple, Dict, Optional, TYPE_CHECKING, Any
from yt.wrapper.schema import TableSchema # pyright: ignore[reportMissingImports]
from omegaconf import DictConfig
from yt_framework.utils.logging import log_header, log_success
from yt_framework.yt.client_base import OperationResources
from .dependency_strategy import TarArchiveDependencyBuilder
from .common import (
extract_operation_resources,
build_operation_environment,
extract_docker_auth_from_operation_config,
extract_max_failed_jobs,
collect_passthrough_kwargs,
extract_secure_env_client_kwargs,
)
if TYPE_CHECKING:
from yt_framework.core.stage import StageContext
[docs]
@dataclass
class MapOperationData:
    """Bundle of everything needed to submit a single YT map operation.

    ``_prepare_map_operation`` creates instances from the tar-archive
    dependency builder's output, using empty placeholders for
    ``environment`` and ``docker_auth``. ``run_map`` then assigns those two
    attributes before calling the YT client.

    Attributes:
        mapper_path: YT path of the mapper script (a bash wrapper when the
            tar-archive mode is used).
        dependencies: ``(yt_path, local_path)`` pairs for the files to upload.
        environment: Environment variables for the operation (forwarded to the
            YT client as ``env``).
        docker_auth: Credentials for a private Docker registry, or ``None``.
        command: Command executed by the job in tar-archive mode, or ``None``.
    """

    # --- required fields; declaration order defines the positional __init__ ---
    mapper_path: str
    dependencies: List[Tuple[str, str]]
    environment: Dict[str, str]
    docker_auth: Optional[Dict[str, str]]
    # --- optional field; must stay last because it carries a default ---
    command: Optional[str] = None
def _prepare_map_operation(
    pipeline_config: DictConfig,
    operation_config: DictConfig,
    stage_config: DictConfig,
    stage_dir: Path,
    logger: logging.Logger,
) -> MapOperationData:
    """
    Package the stage as a ``source.tar.gz`` archive and describe the map job.

    Only the archive-derived fields (script path, file dependencies, command)
    are filled in. ``environment`` is returned as an empty dict and
    ``docker_auth`` as ``None``; ``run_map`` assigns real values to both after
    this function returns.

    Args:
        pipeline_config: Pipeline-level config; ``pipeline.build_folder`` is read.
        operation_config: Operation-specific config (from client.operations.map).
        stage_config: Full stage config, handed to the dependency builder.
        stage_dir: Path to the stage directory.
        logger: Logger forwarded to the dependency builder.

    Returns:
        MapOperationData carrying the builder's script path, dependencies and
        command.
    """
    build_result = TarArchiveDependencyBuilder().build_dependencies(
        operation_type="map",
        stage_dir=stage_dir,
        archive_name="source.tar.gz",
        build_folder=pipeline_config.pipeline.build_folder,
        operation_config=operation_config,
        stage_config=stage_config,
        logger=logger,
    )

    # environment / docker_auth are placeholders here; the caller overwrites them.
    return MapOperationData(
        mapper_path=build_result.script_path,
        dependencies=build_result.dependencies,
        environment={},
        docker_auth=None,
        command=build_result.command,
    )
[docs]
# Operation-config keys that run_map handles itself (or hands to dedicated
# helpers); they must NOT be forwarded to the YT client as passthrough kwargs.
_MAP_RESERVED_CONFIG_KEYS = (
    "input_table",
    "output_table",
    "resources",
    "env",
    "max_failed_job_count",
    "file_paths",
    "checkpoint",
    "tokenizer_artifact",
    "tar_command_bootstrap",
    "operation_description",
    "append",
    "environment_public_keys",
    "use_plain_environment_for_secrets",
)


def _resolve_explicit_mapper_leg(
    mapper: Optional[Any], job: Optional[Any]
) -> Optional[Any]:
    """Return the caller-supplied mapper leg from the ``mapper``/``job`` aliases.

    ``job`` wins when given; otherwise ``mapper`` is returned (possibly ``None``,
    meaning "use the dependency builder's command").

    Raises:
        ValueError: If both aliases are set and compare unequal.
    """
    if mapper is not None and job is not None and mapper != job:
        raise ValueError(
            "Both 'mapper' and 'job' are set with different values; use only one"
        )
    return job if job is not None else mapper


def _operation_description_kwargs(
    operation_description: Any, logger: logging.Logger
) -> Dict[str, Any]:
    """Translate the ``operation_description`` config value into client kwargs.

    * empty / missing value  -> no kwargs
    * string                 -> ``title`` (also logged as the operation label)
    * OmegaConf container    -> ``operation_description`` as a resolved plain
      Python container
    * any other value        -> ``operation_description`` forwarded unchanged
    """
    if not operation_description:
        return {}
    if isinstance(operation_description, str):
        logger.info("Operation label: %s", operation_description)
        return {"title": operation_description}

    from omegaconf import OmegaConf as _OmegaConf

    if _OmegaConf.is_config(operation_description):
        return {
            "operation_description": _OmegaConf.to_container(
                operation_description, resolve=True
            )
        }
    # Already a plain Python value; OmegaConf.to_container would reject it
    # with ValueError, so pass it through untouched.
    return {"operation_description": operation_description}


def run_map(
    context: "StageContext",
    operation_config: DictConfig,
    output_schema: Optional[TableSchema] = None,
    mapper: Optional[Any] = None,
    job: Optional[Any] = None,
) -> bool:
    """
    Run a YT map operation and block until it finishes.

    Job parameters (pool, memory, CPU, Docker image, etc.) are extracted from
    ``operation_config`` by the shared helpers in ``.common``. Keys not listed
    in ``_MAP_RESERVED_CONFIG_KEYS`` are forwarded to the YT client via
    ``collect_passthrough_kwargs``.

    Args:
        context: Stage context (provides deps, logger, stage_dir, config).
        operation_config: Operation-specific config, expected at
            ``client.operations.map``. Must contain ``input_table`` and
            ``output_table``. Optional key ``append`` (bool): append mapper
            output to an existing output table. Optional key
            ``operation_description``: a string becomes the operation title;
            a mapping is sent as the operation description.
        output_schema: Optional YT TableSchema for a typed output table.
        mapper: Optional mapper leg (legacy name). When omitted, the framework
            uses the command produced by the tar-archive dependency builder.
            Can be a TypedJob instance or command string.
        job: Preferred mapper leg alias. Can be a TypedJob instance or
            command string.

    Returns:
        True if the operation succeeded, False otherwise.

    Raises:
        ValueError: If ``input_table`` or ``output_table`` is missing, if
            ``mapper`` and ``job`` are both given with different values, or if
            no command is available for the mapper leg.
    """
    logger = context.logger

    # Check every argument-level precondition before any expensive work
    # (environment assembly, building the tar archive).
    if not operation_config.get("input_table"):
        raise ValueError(
            "No input_table in operation_config; "
            "expected at client.operations.map.input_table"
        )
    if not operation_config.get("output_table"):
        raise ValueError(
            "No output_table in operation_config; "
            "expected at client.operations.map.output_table"
        )
    explicit_leg = _resolve_explicit_mapper_leg(mapper, job)

    log_header(
        logger,
        "Map Operation",
        f"Input: {operation_config.input_table} | Output: {operation_config.output_table}",
    )

    env = build_operation_environment(
        context=context,
        operation_config=operation_config,
        logger=logger,
        include_stage_name=True,
        include_tokenizer_artifact=True,
    )
    map_operation_data = _prepare_map_operation(
        pipeline_config=context.deps.pipeline_config,
        operation_config=operation_config,
        stage_config=context.config,
        stage_dir=context.stage_dir,
        logger=logger,
    )
    # _prepare_map_operation leaves these empty; fill them in here.
    map_operation_data.environment = env
    map_operation_data.docker_auth = extract_docker_auth_from_operation_config(
        operation_config, env
    )
    logger.debug("Dependencies: %d files", len(map_operation_data.dependencies))

    # Without an explicit leg, fall back to the builder-generated command.
    mapper_leg = (
        explicit_leg if explicit_leg is not None else map_operation_data.command
    )
    if mapper_leg is None:
        raise ValueError("Command not provided by dependency builder")

    logger.debug("Extracting operation resources from config")
    resources: OperationResources = extract_operation_resources(
        operation_config, logger
    )
    max_failed_jobs = extract_max_failed_jobs(operation_config, logger)
    append_output = bool(operation_config.get("append", False))

    map_kwargs: Dict[str, Any] = _operation_description_kwargs(
        operation_config.get("operation_description"), logger
    )
    # A fresh mutable set per call, matching what the helper always received.
    map_kwargs.update(
        collect_passthrough_kwargs(operation_config, set(_MAP_RESERVED_CONFIG_KEYS))
    )

    operation = context.deps.yt_client.run_map(
        command=mapper_leg,
        input_table=operation_config.input_table,
        output_table=operation_config.output_table,
        files=map_operation_data.dependencies,
        resources=resources,
        env=map_operation_data.environment,
        output_schema=output_schema,
        max_failed_jobs=max_failed_jobs,
        docker_auth=map_operation_data.docker_auth,
        append=append_output,
        **extract_secure_env_client_kwargs(operation_config),
        **map_kwargs,
    )

    # Wait for completion
    success = context.deps.yt_client.wait_for_operation(operation)
    if success:
        log_success(logger, "Map operation completed successfully")
    else:
        logger.error("Map operation failed")
    return success