# Source code for yt_framework.operations.command_ops.sort

"""Submit YT sort jobs using the same `(context, operation_config)` pattern as map."""

from __future__ import annotations

from typing import TYPE_CHECKING

from yt_framework.operations.common import extract_operation_resources
from yt_framework.utils.logging import log_header, log_success

if TYPE_CHECKING:
    from omegaconf import DictConfig

    from yt_framework.contracts import StageContext


def _sort_input_table(operation_config: DictConfig) -> str:
    raw = operation_config.get("input_table")
    return str(raw).strip() if isinstance(raw, str) and raw.strip() else ""


def _sort_columns(operation_config: DictConfig) -> list[str]:
    return [str(column) for column in operation_config.get("sort_by", [])]


def _require_sort_inputs(table_path: str, sort_by: list[str]) -> None:
    if not table_path:
        msg = (
            "operation_config must set input_table; "
            "expected at client.operations.sort.input_table"
        )
        raise ValueError(msg)
    if not sort_by:
        msg = (
            "operation_config must set sort_by; "
            "expected at client.operations.sort.sort_by"
        )
        raise ValueError(msg)


def run_sort(
    context: StageContext,
    operation_config: DictConfig,
) -> bool:
    """Run a YT sort operation and wait for completion.

    Args:
        context: Stage context (deps, logger, stage_dir, config). This function
            uses ``context.logger`` and ``context.deps.yt_client``.
        operation_config: Sort-specific config.

            Required keys:

            * ``input_table`` — table to sort in-place.
            * ``sort_by`` — list of column names.

            Optional keys (read via ``extract_operation_resources``, the same
            helper used for ``run_map_reduce``):

            * ``resources.pool`` / ``resources.pool_tree`` — scheduler pool;
              forwarded to the sort call.
            * ``resources.memory_limit_gb``, ``resources.cpu_limit`` — accepted
              for config parity, but NOT forwarded: the sort call only receives
              the table, the sort columns, the pool and the pool tree.

    Returns:
        Always ``True``; there is no ``False`` return path. Failures surface
        as exceptions instead (validation errors below, and any error raised
        by the YT client call, which is not caught here).

    Raises:
        ValueError: If ``input_table`` is missing, blank, or not a string, or
            if ``sort_by`` is empty.

    Example config (``client.operations.sort`` in stage ``config.yaml``)::

        sort:
          sort_by: [shard_order, mock_field]
          resources:
            pool: my_pool

    Then in the stage::

        from yt_framework.operations.command_ops.sort import run_sort

        sort_cfg = OmegaConf.merge(
            self.config.client.operations.sort,
            {"input_table": intermediate_table},
        )
        run_sort(context=self.context, operation_config=sort_cfg)
    """
    logger = context.logger

    # Normalize and validate the config before touching the cluster.
    table_path = _sort_input_table(operation_config)
    sort_by = _sort_columns(operation_config)
    _require_sort_inputs(table_path, sort_by)

    resources = extract_operation_resources(operation_config, logger)

    log_header(logger, "Sort Operation", f"Sorting {table_path} by {sort_by}")
    context.deps.yt_client.run_sort(
        table_path=table_path,
        sort_by=sort_by,
        # Falsy pool settings (e.g. "") are passed as None — presumably so the
        # client falls back to its default pool/tree; verify against client.
        pool=resources.pool or None,
        pool_tree=resources.pool_tree or None,
    )
    log_success(logger, "Sort completed")
    return True