Source code for yt_framework.yt.clients.client_dev

"""Local filesystem stand-in for Cypress tables and subprocess-backed jobs."""

import json
import logging
import os
from pathlib import Path
from typing import Any, Literal

from yt_framework.yt.clients._client_split._client_dev_ops_mixin import (
    ClientDevOpsMixin,
)
from yt_framework.yt.clients._client_split._client_dev_yql_mixin import (
    ClientDevYqlMixin,
)
from yt_framework.yt.clients.client_base import BaseYTClient
from yt_framework.yt.support._client_dev_runtime import (
    dev_columns_from_first_row,
    dev_run_yql_simulation,
)
from yt_framework.yt.support.max_row_weight import ensure_max_row_weight_pragma


class YTDevClient(ClientDevYqlMixin, ClientDevOpsMixin, BaseYTClient):
    """Development YT client implementation.

    Uses the local file system for all operations, simulating YT behavior.
    Tables are stored as UTF-8 encoded ``.jsonl`` files (one JSON object per
    line) in the ``.dev/`` directory under ``pipeline_dir``.

    Note:
        Only the last component of a YT path names the local file, so two
        tables with the same basename in different YT directories (e.g.
        ``//home/a/data`` and ``//home/b/data``) share ``.dev/data.jsonl``.
    """

    def __init__(
        self,
        logger: logging.Logger,
        pipeline_dir: Path | None = None,
    ) -> None:
        """Initialize development YT client.

        The pipeline directory is chosen in this order: the explicit
        ``pipeline_dir`` argument, then a non-empty ``YT_PIPELINE_DIR``
        environment variable, then the current working directory (logged
        as a warning).

        Args:
            logger: Logger instance.
            pipeline_dir: Pipeline directory (required for dev mode).
        """
        if pipeline_dir is not None:
            resolved_pipeline_dir = Path(pipeline_dir).resolve()
        else:
            pd = os.environ.get("YT_PIPELINE_DIR")
            if pd:
                resolved_pipeline_dir = Path(pd).resolve()
            else:
                resolved_pipeline_dir = Path.cwd()
                logger.warning(
                    "mode=dev but pipeline_dir not set and YT_PIPELINE_DIR not set; using cwd as pipeline_dir",
                )
        super().__init__(logger, pipeline_dir=resolved_pipeline_dir)

    def _pipeline_dir_or_raise(self) -> Path:
        """Return ``self.pipeline_dir``.

        Raises:
            RuntimeError: If ``pipeline_dir`` is ``None``.
        """
        pd = self.pipeline_dir
        if pd is None:
            msg = "pipeline_dir is required in dev mode"
            raise RuntimeError(msg)
        return pd

    def _dev_dir(self) -> Path:
        """Return .dev directory under pipeline_dir. Caller should mkdir when writing."""
        return self._pipeline_dir_or_raise() / ".dev"

    def _table_basename(self, yt_path: str) -> str:
        """Last path component of a YT table path, e.g. //home/.../name -> name.

        Trailing slashes are ignored before splitting.
        """
        return yt_path.rstrip("/").split("/")[-1]

    def _table_local_path(self, yt_path: str) -> Path:
        """Local .jsonl path for a YT table in dev: {pipeline_dir}/.dev/{basename}.jsonl."""
        return self._dev_dir() / f"{self._table_basename(yt_path)}.jsonl"

    def create_path(
        self,
        path: str,
        node_type: Literal[
            "table",
            "file",
            "map_node",
            "list_node",
            "document",
        ] = "map_node",
    ) -> None:
        """Create a path in YT (no-op in dev mode).

        Args:
            path: YT path to create (not used in dev mode).
            node_type: Type of node to create (not used in dev mode).

        Returns:
            None
        """

    def exists(self, path: str) -> bool:
        """Check if a path exists in YT.

        In dev mode, always returns True (assumes files exist locally); the
        local file system is not consulted.

        Args:
            path: YT path to check.

        Returns:
            bool: Always True in dev mode.
        """
        return True

    def write_table(
        self,
        table_path: str,
        rows: list[dict[str, Any]],
        *,
        append: bool = False,
        replication_factor: int = 1,
    ) -> None:
        """Write rows to a YT table (saves as local .jsonl file).

        In dev mode, tables are stored as UTF-8 encoded JSONL files in the
        .dev directory. Each row is written as a JSON object on its own line;
        non-ASCII characters are written verbatim (``ensure_ascii=False``).

        Args:
            table_path: YT table path (e.g., "//tmp/my_table").
            rows: List of dictionaries representing table rows.
            append: If True, append to existing file; otherwise overwrite.
            replication_factor: Not used in dev mode (kept for API compatibility).

        Returns:
            None

        Example:
            >>> client.write_table("//tmp/data", [{"id": 1, "name": "Alice"}])
            >>> # Creates .dev/data.jsonl containing the single line:
            >>> # {"id": 1, "name": "Alice"}
        """
        mode_str = "append" if append else "overwrite"
        self.logger.info("Writing %s rows → %s (%s)", len(rows), table_path, mode_str)
        p = self._table_local_path(table_path)
        self._dev_dir().mkdir(parents=True, exist_ok=True)
        # Explicit UTF-8: with ensure_ascii=False non-ASCII text is emitted
        # verbatim, and the platform default encoding (e.g. cp1252 on Windows)
        # may be unable to encode it or disagree with what readers expect.
        with p.open("a" if append else "w", encoding="utf-8") as f:
            f.writelines(json.dumps(row, ensure_ascii=False) + "\n" for row in rows)

    def read_table(self, table_path: str) -> list[dict[str, Any]]:
        """Read rows from a YT table (reads from local UTF-8 .jsonl file).

        Blank (whitespace-only) lines are skipped.

        Args:
            table_path: YT table path (e.g., "//tmp/my_table").

        Returns:
            List[Dict[str, Any]]: List of dictionaries representing table rows.
            Returns empty list if file doesn't exist.

        Raises:
            json.JSONDecodeError: If a non-blank line is not valid JSON.
        """
        self.logger.info("Reading table: %s", table_path)
        p = self._table_local_path(table_path)
        if not p.exists():
            self.logger.warning("Table file not found: %s, returning empty list", p)
            return []
        # Must match the encoding used by write_table.
        with p.open(encoding="utf-8") as f:
            results = [json.loads(line) for line in (raw.strip() for raw in f) if line]
        self.logger.info("✓ Read %s rows", len(results))
        return results

    def row_count(self, table_path: str) -> int:
        """Get number of rows in a YT table (counts lines in local .jsonl file).

        Args:
            table_path: YT table path (e.g., "//tmp/my_table").

        Returns:
            int: Number of non-empty lines in the JSONL file. Returns 0 if file
            doesn't exist.
        """
        p = self._table_local_path(table_path)
        if not p.exists():
            return 0
        # Decode as UTF-8 (as written by write_table) so bytes undefined in
        # the locale encoding cannot raise UnicodeDecodeError here.
        with p.open(encoding="utf-8") as f:
            n = sum(1 for line in f if line.strip())
        self.logger.debug("Row count: %s", n)
        return n

    def _get_table_columns(self, table_path: str) -> list[str]:
        """Get column names from a table based on its first row.

        The whole local .jsonl file is loaded via ``read_table`` and the rows
        are passed to ``dev_columns_from_first_row``. In dev mode, tables are
        stored as JSONL files, so binary columns are less likely. This
        implementation matches the production client structure for
        consistency.

        Args:
            table_path: Path to YT table.

        Returns:
            List of column names (filtered to exclude internal YQL columns).

        Raises:
            ValueError: If table is empty or cannot be read (this includes
                ``json.JSONDecodeError`` from malformed lines).
        """
        try:
            rows = self.read_table(table_path)
            return dev_columns_from_first_row(rows, table_path)
        except ValueError:
            # Expected "empty/unreadable table" signal: propagate as-is,
            # without the stack-trace log emitted for unexpected errors.
            raise
        except Exception:
            self.logger.exception("Failed to get table columns")
            raise

    def run_yql(
        self,
        query: str,
        pool: str = "default",
        max_row_weight: str | None = None,
    ) -> None:
        """Execute a YQL query locally using DuckDB simulation.

        The query is first passed through ``ensure_max_row_weight_pragma``;
        the simulation writes output tables back through ``write_table``.

        Args:
            query: YQL query string to execute.
            pool: YT pool name (default: 'default'); only logged in dev mode.
            max_row_weight: Optional max row weight override.

        Raises:
            Exception: Any error from the simulation is logged and re-raised.
        """
        self.logger.info("Executing YQL query (dev mode - DuckDB simulation)")
        self.logger.debug("Pool: %s", pool)
        query_with_max_row_weight = ensure_max_row_weight_pragma(
            query=query,
            max_row_weight=max_row_weight,
        )
        self.logger.debug("Query:\n%s", query_with_max_row_weight)
        try:
            dev_run_yql_simulation(
                query_with_max_row_weight=query_with_max_row_weight,
                dev_dir=self._dev_dir(),
                logger=self.logger,
                table_local_path=self._table_local_path,
                write_table=lambda table, rows, *, append: self.write_table(
                    table,
                    rows,
                    append=append,
                ),
            )
        except Exception:
            self.logger.exception("Failed to execute YQL query in dev mode")
            raise