Source code for yt_framework.operations.map

"""
High-level orchestration for YT map operations.

This module provides functions for running map operations on YTsaurus clusters.
"""

import logging
from pathlib import Path
from dataclasses import dataclass
from typing import List, Tuple, Dict, Optional

from yt.wrapper.schema import TableSchema  # pyright: ignore[reportMissingImports]
from omegaconf import DictConfig

from yt_framework.utils.logging import log_header, log_success
from yt_framework.core.stage import StageContext
from yt_framework.yt.client_base import OperationResources
from .dependency_strategy import TarArchiveDependencyBuilder
from .common import (
    build_environment,
    prepare_docker_auth,
    _get_config_value_with_default,
)


@dataclass
class MapOperationData:
    """Everything needed to submit a map operation, gathered in one object.

    Instances are produced by ``_prepare_map_operation`` and consumed by
    ``run_map``.

    Attributes:
        mapper_path: YT path of the mapper.py script (or of the bash wrapper
            when tar-archive mode is used).
        dependencies: ``(yt_path, local_path)`` pairs describing the files
            to upload alongside the job.
        environment: Environment variables for the job (secrets only).
        docker_auth: Docker registry credentials for private images, or None.
        command: Command to execute in the job (used in tar-archive mode);
            defaults to None.
    """

    # Where the entry script lives inside YT.
    mapper_path: str
    # Files shipped with the job, as (yt_path, local_path) tuples.
    dependencies: List[Tuple[str, str]]
    # Secrets-only environment passed to the job.
    environment: Dict[str, str]
    # Optional private-registry credentials.
    docker_auth: Optional[Dict[str, str]]
    # Optional command, populated in tar-archive mode.
    command: Optional[str] = None


def _prepare_map_operation(
    pipeline_config: DictConfig,
    operation_config: DictConfig,
    stage_config: DictConfig,
    stage_dir: Path,
    configs_dir: Path,
    logger: logging.Logger,
) -> MapOperationData:
    """
    Prepare everything needed for a map operation (private function).

    Automatically handles:
    - Secrets-only environment building
    - Dependency file list preparation
    - Docker authentication preparation

    Args:
        pipeline_config: Pipeline-level config (``pipeline.build_folder`` is read)
        operation_config: Operation-specific config (from client.operations.map)
        stage_config: Full stage config (for accessing job section)
        stage_dir: Path to stage directory
        configs_dir: Directory containing secrets.env
        logger: Logger instance

    Returns:
        MapOperationData instance containing:
        - mapper_path: Path to mapper.py in YT (or bash wrapper if tar mode)
        - dependencies: List of (yt_path, local_path) tuples
        - environment: Environment variables (secrets only)
        - docker_auth: Docker auth dict or None
        - command: Optional command to execute (for tar mode)
    """
    environment = build_environment(configs_dir=configs_dir, logger=logger)

    # Use strategy pattern to build dependencies.
    # Pass both operation_config (for checkpoint) and stage_config (for job.model_name)
    builder = TarArchiveDependencyBuilder()
    mapper_path, dependencies, command = builder.build_dependencies(
        operation_type="map",
        stage_dir=stage_dir,
        build_folder=pipeline_config.pipeline.build_folder,
        operation_config=operation_config,
        stage_config=stage_config,
        logger=logger,
    )

    # Resolve the Docker image whose registry credentials are needed.
    # resources.docker_image takes precedence over a top-level docker_image.
    # ``or {}`` treats a missing, empty, or explicitly null ``resources``
    # section uniformly; a membership test alone would let ``resources: null``
    # through and then fail on ``None.get(...)``.
    resources_config = operation_config.get("resources") or {}
    docker_image = (
        resources_config.get("docker_image")
        or operation_config.get("docker_image")
        or None
    )

    # Docker auth credentials come from the loaded secrets.
    docker_auth = prepare_docker_auth(
        docker_image=docker_image,
        docker_username=environment.get("DOCKER_AUTH_USERNAME"),
        docker_password=environment.get("DOCKER_AUTH_PASSWORD"),
    )

    return MapOperationData(
        mapper_path=mapper_path,
        dependencies=dependencies,
        environment=environment,
        docker_auth=docker_auth,
        command=command,
    )


def run_map(
    context: StageContext,
    operation_config: DictConfig,
    output_schema: Optional[TableSchema] = None,
) -> bool:
    """
    Run YT map operation and wait for completion.

    All job parameters (pool, memory, CPU, Docker image, etc.) are
    automatically extracted from operation_config. Operation config should be
    passed from stage.config.operations.map.

    Args:
        context: Stage context (provides deps, logger, stage_dir)
        operation_config: Operation-specific config (from client.operations.map).
            Must define ``input_table`` and ``output_table``.
        output_schema: Optional YT TableSchema for typed output table

    Returns:
        True if successful, False otherwise

    Raises:
        ValueError: If ``input_table`` or ``output_table`` is not configured,
            or the dependency builder did not provide a command.
    """
    logger = context.logger

    # Validate before the tables are referenced anywhere, including the log
    # header below. Attribute access on a missing DictConfig key may fail with
    # a less descriptive error than these. Logging a header for an operation
    # that is about to be rejected would also be misleading.
    if not operation_config.get("input_table"):
        raise ValueError("No input_table configured in operation config")
    if not operation_config.get("output_table"):
        raise ValueError("No output_table configured in operation config")

    input_table = operation_config.input_table
    output_table = operation_config.output_table
    log_header(
        logger,
        "Map Operation",
        f"Input: {input_table} | Output: {output_table}",
    )

    # Prepare operation data automatically (secrets env, uploads, docker auth)
    map_operation_data = _prepare_map_operation(
        pipeline_config=context.deps.pipeline_config,
        operation_config=operation_config,
        stage_config=context.config,
        stage_dir=context.stage_dir,
        configs_dir=context.deps.configs_dir,
        logger=logger,
    )
    logger.debug("Dependencies: %d files", len(map_operation_data.dependencies))

    # Command is always provided by the dependency builder (tar archive mode)
    command = map_operation_data.command
    if not command:
        raise ValueError("Command not provided by dependency builder")

    resources = _build_operation_resources(operation_config, logger)
    # max_failed_job_count lives at the top level of the operation config,
    # not inside the resources section.
    max_failed_jobs = _get_config_value_with_default(
        operation_config, "max_failed_job_count", 1, logger
    )

    yt_client = context.deps.yt_client
    operation = yt_client.run_map(
        command=command,
        input_table=input_table,
        output_table=output_table,
        files=map_operation_data.dependencies,
        resources=resources,
        env=map_operation_data.environment,
        output_schema=output_schema,
        max_failed_jobs=max_failed_jobs,
        docker_auth=map_operation_data.docker_auth,
    )

    # Block until the operation finishes and report the outcome.
    success = yt_client.wait_for_operation(operation)
    if success:
        log_success(logger, "Map operation completed successfully")
    else:
        logger.error("Map operation failed")
    return success


def _build_operation_resources(
    operation_config: DictConfig, logger: logging.Logger
) -> OperationResources:
    """Build the job's OperationResources from the operation config.

    Values are read from ``operation_config.resources``. When that section is
    missing or empty, the top-level operation config is used instead. Any
    value that is not specified falls back to the default shown below, and
    the default is logged via ``_get_config_value_with_default``.

    Args:
        operation_config: Operation-specific config (from client.operations.map)
        logger: Logger instance

    Returns:
        OperationResources populated from config values and defaults.
    """
    # Fallback to top-level config if resources section doesn't exist
    resources_config = operation_config.get("resources", {}) or operation_config
    # NOTE(review): when a resources section exists, a top-level docker_image
    # is ignored here, while _prepare_map_operation still uses it to build
    # docker auth. Confirm this asymmetry is intended.

    logger.debug("Extracting operation resources from config")
    # Lookups run in this exact order so default-value logs keep their order.
    pool = _get_config_value_with_default(resources_config, "pool", "default", logger)
    pool_tree = _get_config_value_with_default(
        resources_config, "pool_tree", None, logger
    )
    docker_image = _get_config_value_with_default(
        resources_config, "docker_image", None, logger
    )
    memory_gb = _get_config_value_with_default(
        resources_config, "memory_limit_gb", 4, logger
    )
    cpu_limit = _get_config_value_with_default(resources_config, "cpu_limit", 2, logger)
    gpu_limit = _get_config_value_with_default(resources_config, "gpu_limit", 0, logger)
    job_count = _get_config_value_with_default(resources_config, "job_count", 1, logger)
    user_slots = _get_config_value_with_default(
        resources_config, "user_slots", None, logger
    )

    return OperationResources(
        pool=pool,
        pool_tree=pool_tree,
        docker_image=docker_image,
        memory_gb=memory_gb,
        cpu_limit=cpu_limit,
        gpu_limit=gpu_limit,
        job_count=job_count,
        user_slots=user_slots,
    )