Source code for yt_framework.operations.table

"""Helpers to count rows, load rows into memory, and export tables to JSONL.

For low-level access, use ``BaseYTClient`` methods (``read_table``, ``row_count``, etc.)
directly on ``self.deps.yt_client``. These functions add logging and convenience.
"""

import json
import logging
from pathlib import Path
from typing import Any

from yt_framework.yt.clients.client_base import BaseYTClient


[docs] def get_row_count( yt_client: BaseYTClient, table_path: str, logger: logging.Logger, ) -> int: """Get number of rows in a YT table. Args: yt_client: YT client instance table_path: YT table path logger: Logger instance Returns: Number of rows in table """ logger.info("Reading row count from %s", table_path) count = yt_client.row_count(table_path) logger.info("Table has %s rows", count) return count
[docs] def read_table( yt_client: BaseYTClient, table_path: str, logger: logging.Logger, ) -> list[dict[str, Any]]: """Read rows from a YT table. Args: yt_client: YT client instance table_path: YT table path logger: Logger instance Returns: List of rows as dictionaries """ logger.info("Reading results from %s", table_path) results = list(yt_client.read_table(table_path)) logger.info("Read %s rows", len(results)) return results
[docs] def download_table( yt_client: BaseYTClient, table_path: str, output_file: Path, logger: logging.Logger, ) -> None: """Download YT table to local JSONL file. Args: yt_client: YT client instance table_path: YT table path output_file: Local file path for output logger: Logger instance Returns: None """ logger.info("Downloading table %s to %s", table_path, output_file) rows = yt_client.read_table(table_path) with output_file.open("w") as f: f.writelines(json.dumps(row) + "\n" for row in rows) with output_file.open() as f: row_count = sum(1 for _ in f) logger.info("✓ Downloaded %s rows → %s", row_count, output_file)