Source code for yt_framework.operations.command_ops.sort
"""Submit YT sort jobs using the same `(context, operation_config)` pattern as map."""
from __future__ import annotations
from typing import TYPE_CHECKING
from yt_framework.operations.common import extract_operation_resources
from yt_framework.utils.logging import log_header, log_success
if TYPE_CHECKING:
from omegaconf import DictConfig
from yt_framework.contracts import StageContext
def _sort_input_table(operation_config: DictConfig) -> str:
raw = operation_config.get("input_table")
return str(raw).strip() if isinstance(raw, str) and raw.strip() else ""
def _sort_columns(operation_config: DictConfig) -> list[str]:
return [str(column) for column in operation_config.get("sort_by", [])]
def _require_sort_inputs(table_path: str, sort_by: list[str]) -> None:
if not table_path:
msg = (
"operation_config must set input_table; "
"expected at client.operations.sort.input_table"
)
raise ValueError(msg)
if not sort_by:
msg = (
"operation_config must set sort_by; "
"expected at client.operations.sort.sort_by"
)
raise ValueError(msg)
[docs]
def run_sort(
    context: StageContext,
    operation_config: DictConfig,
) -> bool:
    """Sort ``input_table`` in place through the stage's YT client.

    Args:
        context: Stage context; this function uses ``context.logger`` and
            ``context.deps.yt_client``.
        operation_config: Sort-specific config. Required keys:

            * ``input_table`` — path of the table to sort in place.
            * ``sort_by`` — list of column names.

            Optional keys mirror ``run_map_reduce``:

            * ``resources.pool`` / ``resources.pool_tree`` — scheduler pool.
              These are the only resource settings this function forwards to
              the sort call; empty values are passed on as ``None``.
            * Other ``resources.*`` keys (e.g. ``memory_limit_gb``,
              ``cpu_limit``) are handed to ``extract_operation_resources``
              with the rest of the config but are not forwarded to the sort
              call by this function.

    Returns:
        ``True`` once ``yt_client.run_sort`` returns. Failures surface as
        exceptions; this function never returns ``False``.

    Raises:
        ValueError: if ``input_table`` or ``sort_by`` is missing or empty.

    Example config (``client.operations.sort`` in stage ``config.yaml``)::

        sort:
          sort_by: [shard_order, mock_field]
          resources:
            pool: my_pool

    Then in the stage::

        from yt_framework.operations.command_ops.sort import run_sort

        sort_cfg = OmegaConf.merge(
            self.config.client.operations.sort,
            {"input_table": intermediate_table},
        )
        run_sort(context=self.context, operation_config=sort_cfg)
    """
    stage_logger = context.logger

    # Resolve and validate inputs before touching the scheduler.
    input_path = _sort_input_table(operation_config)
    key_columns = _sort_columns(operation_config)
    _require_sort_inputs(input_path, key_columns)

    op_resources = extract_operation_resources(operation_config, stage_logger)
    log_header(stage_logger, "Sort Operation", f"Sorting {input_path} by {key_columns}")

    yt_client = context.deps.yt_client
    yt_client.run_sort(
        table_path=input_path,
        sort_by=key_columns,
        # Blank pool settings are normalized to None rather than sent as "".
        pool=op_resources.pool or None,
        pool_tree=op_resources.pool_tree or None,
    )

    log_success(stage_logger, "Sort completed")
    return True