YQL operations#

YQL helpers on self.deps.yt_client run YQL (the YTsaurus SQL dialect) against Cypress tables. In prod the queries run on the cluster's YQL engine; in dev, many helpers run through a DuckDB-backed simulation, so behavior may differ in edge cases.

Overview#

Tip

When YQL fits

Use YQL helpers for joins, filters, aggregates, unions, and similar set operations. Use map when you need arbitrary Python per row or libraries YQL cannot call.

Typical uses:

  • Join or reshape tables without shipping mapper.py.

  • Let YT choose execution plans for large inputs.

  • Preview SQL with dry-run APIs where supported.

Defaults

  • PRAGMA yt.MaxRowWeight is injected with a value of 128M unless you override it; 128M is also the maximum allowed value.

Note

YQL vs map

YQL expresses set logic declaratively. Map runs your Python on each row stream.

Request API#

Each helper is a *_request method on the YT client. It takes a single frozen dataclass from yt_framework.yt.clients.yql.yql_requests (for example JoinTablesRequest). The same types are what yt_framework.yt.clients.yql.yql_builder uses to build SQL strings.

For filter, union, sort, and limit, you can leave columns unset on the request; the client fills them from the input table schema when needed.
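The sketch below illustrates the two ideas in this section: a frozen request object and columns that are filled in only when left unset. FilterRequestSketch and resolve_columns are hypothetical stand-ins written for this example; they are not the classes from yt_framework.yt.clients.yql.yql_requests, and the real client may resolve columns differently.

```python
from dataclasses import FrozenInstanceError, dataclass, replace
from typing import Optional, Sequence, Tuple


@dataclass(frozen=True)
class FilterRequestSketch:
    """Hypothetical stand-in for a request type; not the framework's class."""

    input_table: str
    output_table: str
    condition: str
    columns: Optional[Tuple[str, ...]] = None  # None means "resolve from schema"


def resolve_columns(
    request: FilterRequestSketch, schema_columns: Sequence[str]
) -> FilterRequestSketch:
    """Return a request with columns set, copying instead of mutating."""
    if request.columns is not None:
        return request
    return replace(request, columns=tuple(schema_columns))


request = FilterRequestSketch(
    input_table="//tmp/my_pipeline/orders",
    output_table="//tmp/my_pipeline/filtered",
    condition="amount > 100",
)
# The schema column names here are made up for the example.
resolved = resolve_columns(request, ["order_id", "user_id", "amount"])

try:
    request.condition = "amount > 200"  # frozen dataclasses reject assignment
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Because the request is frozen, the resolved copy is a new object and the original request stays unchanged, which makes it safe to reuse one request value across calls.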

Row Weight Defaults#

By default, all YQL helpers and raw run_yql(...) calls include this pragma:

PRAGMA yt.MaxRowWeight = "128M";

Override the value per call when needed. It must not exceed 128M; larger values raise ValueError:

from yt_framework.yt.clients.yql.yql_requests import JoinTablesRequest

self.deps.yt_client.join_tables_request(
    JoinTablesRequest(
        left_table="//tmp/a",
        right_table="//tmp/b",
        output_table="//tmp/out",
        on="id",
        max_row_weight="64M",
    ),
)

For raw queries:

self.deps.yt_client.run_yql(
    "INSERT INTO `//tmp/out` WITH TRUNCATE SELECT * FROM `//tmp/in`;",
    max_row_weight="64M",
)

If the SQL already contains PRAGMA yt.MaxRowWeight, that value is checked too; overrides and embedded pragmas cannot exceed 128M.
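To make the rule concrete, here is a minimal sketch of how such a check could work. parse_size and check_row_weight are hypothetical helpers, not framework functions, and the sketch assumes that size suffixes are binary units (M = 1024 * 1024 bytes) and that embedded pragmas use the double-quoted form shown above.

```python
import re
from typing import Optional

# Assumption: suffixes are binary units.
_UNITS = {"": 1, "K": 1024, "M": 1024**2, "G": 1024**3}
MAX_ROW_WEIGHT_BYTES = 128 * _UNITS["M"]
_PRAGMA_RE = re.compile(
    r'PRAGMA\s+yt\.MaxRowWeight\s*=\s*"([^"]+)"', re.IGNORECASE
)


def parse_size(value: str) -> int:
    """Convert a size such as "64M" to bytes."""
    match = re.fullmatch(r"(\d+)([KMG]?)", value.strip().upper())
    if match is None:
        raise ValueError(f"Unrecognized size: {value!r}")
    return int(match.group(1)) * _UNITS[match.group(2)]


def check_row_weight(sql: str, override: Optional[str] = None) -> None:
    """Raise ValueError if the override or an embedded pragma exceeds the cap."""
    candidates = _PRAGMA_RE.findall(sql)
    if override is not None:
        candidates.append(override)
    for candidate in candidates:
        if parse_size(candidate) > MAX_ROW_WEIGHT_BYTES:
            raise ValueError(f"MaxRowWeight {candidate} exceeds 128M")


# Passes silently: both the embedded pragma and the override are within the cap.
check_row_weight('PRAGMA yt.MaxRowWeight = "128M"; SELECT 1;', override="64M")
```

Calling check_row_weight("SELECT 1;", override="256M") raises ValueError in this sketch, mirroring the behavior described for the real helpers.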

Available Operations#

Join Tables#

Join two tables on a common column.

from yt_framework.yt.clients.yql.yql_requests import JoinTablesRequest

self.deps.yt_client.join_tables_request(
    JoinTablesRequest(
        left_table="//tmp/my_pipeline/orders",
        right_table="//tmp/my_pipeline/users",
        output_table="//tmp/my_pipeline/joined",
        on="user_id",
        how="left",  # or "inner", "right", "full"
        select_columns=[
            "a.order_id",
            "a.user_id",
            "a.amount",
            "b.name",
            "b.email",
        ],
    ),
)

JoinTablesRequest fields:

  • left_table: Left table path

  • right_table: Right table path

  • output_table: Output table path

  • on: Column name(s) to join on:

    • str: Same column name on both sides (e.g., "user_id")

    • List[str]: Multiple columns with same names (e.g., ["user_id", "region"])

    • Dict[str, str]: Different column names (e.g., {"left": "input_s3_path", "right": "path"})

  • how: Join type ("left", "inner", "right", "full")

  • select_columns: Columns to select (prefix with a. or b. for table alias)
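The three accepted shapes of on can be pictured as one list of (left, right) column pairs. The sketch below is an illustration only: build_on_clause is a hypothetical helper, and the SQL that yt_framework.yt.clients.yql.yql_builder actually emits may differ. The a and b aliases follow the select_columns convention described above.

```python
from typing import Dict, List, Union

OnSpec = Union[str, List[str], Dict[str, str]]


def build_on_clause(on: OnSpec, left_alias: str = "a", right_alias: str = "b") -> str:
    """Render the three documented `on` shapes as a SQL ON condition."""
    if isinstance(on, str):
        pairs = [(on, on)]  # same column name on both sides
    elif isinstance(on, dict):
        pairs = [(on["left"], on["right"])]  # different names per side
    else:
        pairs = [(column, column) for column in on]  # several shared names
    return " AND ".join(
        f"{left_alias}.`{left}` = {right_alias}.`{right}`" for left, right in pairs
    )


single = build_on_clause("user_id")
multi = build_on_clause(["user_id", "region"])
mapped = build_on_clause({"left": "input_s3_path", "right": "path"})
```

Here single is a.`user_id` = b.`user_id`, multi joins both pairs with AND, and mapped compares a.`input_s3_path` with b.`path`.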

Filter Table#

Filter rows based on a condition.

from yt_framework.yt.clients.yql.yql_requests import FilterTableRequest

self.deps.yt_client.filter_table_request(
    FilterTableRequest(
        input_table="//tmp/my_pipeline/orders",
        output_table="//tmp/my_pipeline/filtered",
        condition="amount > 100",
    ),
)

FilterTableRequest fields:

  • input_table: Input table path

  • output_table: Output table path

  • condition: SQL-like condition (e.g., "amount > 100", "status == 'active'")

  • columns: Optional; when omitted, the client resolves columns from the table

Select Columns#

Select specific columns from a table.

from yt_framework.yt.clients.yql.yql_requests import SelectColumnsRequest

self.deps.yt_client.select_columns_request(
    SelectColumnsRequest(
        input_table="//tmp/my_pipeline/users",
        output_table="//tmp/my_pipeline/selected",
        columns=["user_id", "name", "email"],
    ),
)

SelectColumnsRequest fields:

  • input_table: Input table path

  • output_table: Output table path

  • columns: List of column names to select

Group By Aggregate#

Group rows and compute aggregations.

from yt_framework.yt.clients.yql.yql_requests import GroupByAggregateRequest

self.deps.yt_client.group_by_aggregate_request(
    GroupByAggregateRequest(
        input_table="//tmp/my_pipeline/orders",
        output_table="//tmp/my_pipeline/aggregated",
        group_by="user_id",
        aggregations={
            "order_count": "count",
            "total_amount": "sum",
            "avg_amount": "avg",
            "max_amount": "max",
            "min_amount": "min",
        },
    ),
)

GroupByAggregateRequest fields:

  • input_table: Input table path

  • output_table: Output table path

  • group_by: Column name(s) to group by

  • aggregations: Dictionary mapping output column names to aggregation functions

Aggregation functions:

  • "count": Count rows

  • "sum": Sum values

  • "avg": Average values

  • "max": Maximum value

  • "min": Minimum value
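For readers who want to see what the five functions compute, the following sketch runs an equivalent GROUP BY locally with Python's built-in sqlite3 module. It is not the SQL the framework generates. The sample rows are invented, and aggregating the amount column is an assumption made for the example, since the request shown above does not state which input column each function reads.

```python
import sqlite3

connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE orders (user_id TEXT, amount REAL)")
connection.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [("u1", 50.0), ("u1", 150.0), ("u2", 200.0)],
)

# One output row per user_id, with one column per aggregation function.
rows = connection.execute(
    """
    SELECT user_id,
           COUNT(*)    AS order_count,
           SUM(amount) AS total_amount,
           AVG(amount) AS avg_amount,
           MAX(amount) AS max_amount,
           MIN(amount) AS min_amount
    FROM orders
    GROUP BY user_id
    ORDER BY user_id
    """
).fetchall()
connection.close()
```

With this sample, u1 has two orders totalling 200.0 with an average of 100.0, and u2 has a single order of 200.0.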

Union Tables#

Combine multiple tables into one.

from yt_framework.yt.clients.yql.yql_requests import UnionTablesRequest

self.deps.yt_client.union_tables_request(
    UnionTablesRequest(
        tables=(
            "//tmp/my_pipeline/orders_2023",
            "//tmp/my_pipeline/orders_2024",
        ),
        output_table="//tmp/my_pipeline/all_orders",
    ),
)

UnionTablesRequest fields:

  • tables: Tuple of table paths to union

  • output_table: Output table path

  • columns: Optional; when omitted, the client uses the first table’s columns

Note: All tables must have the same schema.
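A pre-flight check can catch mismatched inputs before the union runs. The sketch below is a hypothetical helper, not part of the framework: it assumes you have already collected each table's column names in order (how you read them from Cypress is up to you) and it compares names only, not types.

```python
from typing import Dict, List, Sequence


def find_schema_mismatches(schemas: Dict[str, Sequence[str]]) -> List[str]:
    """Return the paths whose column list differs from the first table's."""
    paths = list(schemas)
    if not paths:
        return []
    reference = list(schemas[paths[0]])
    # Ordered comparison: the same names in a different order count as a mismatch.
    return [path for path in paths[1:] if list(schemas[path]) != reference]


mismatches = find_schema_mismatches(
    {
        "//tmp/my_pipeline/orders_2023": ["order_id", "user_id", "amount"],
        "//tmp/my_pipeline/orders_2024": ["order_id", "amount", "user_id"],
    }
)
```

In this example mismatches contains the orders_2024 path, because its columns appear in a different order from the first table.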

Distinct#

Get distinct values from columns.

from yt_framework.yt.clients.yql.yql_requests import DistinctRequest

self.deps.yt_client.distinct_request(
    DistinctRequest(
        input_table="//tmp/my_pipeline/users",
        output_table="//tmp/my_pipeline/distinct_cities",
        columns=["city"],
    ),
)

DistinctRequest fields:

  • input_table: Input table path

  • output_table: Output table path

  • columns: List of column names for distinct operation (optional; omit for SELECT DISTINCT *)

Sort Table#

Sort table by one or more columns.

from yt_framework.yt.clients.yql.yql_requests import SortTableRequest

self.deps.yt_client.sort_table_request(
    SortTableRequest(
        input_table="//tmp/my_pipeline/orders",
        output_table="//tmp/my_pipeline/sorted",
        order_by="amount",
        ascending=False,  # True for ascending, False for descending
    ),
)

SortTableRequest fields:

  • input_table: Input table path

  • output_table: Output table path

  • order_by: Column name(s) to sort by

  • ascending: Sort order (default: True)

  • columns: Optional; when omitted, the client resolves columns from the input table

Limit Table#

Limit the number of rows in a table.

from yt_framework.yt.clients.yql.yql_requests import LimitTableRequest

self.deps.yt_client.limit_table_request(
    LimitTableRequest(
        input_table="//tmp/my_pipeline/orders",
        output_table="//tmp/my_pipeline/limited",
        limit=100,
    ),
)

LimitTableRequest fields:

  • input_table: Input table path

  • output_table: Output table path

  • limit: Maximum number of rows to include

  • columns: Optional; when omitted, the client resolves columns from the input table

Dry Run#

All YQL operations support dry run mode to preview queries before execution:

from yt_framework.yt.clients.yql.yql_requests import JoinTablesRequest

# Preview query without executing
query = self.deps.yt_client.join_tables_request(
    JoinTablesRequest(
        left_table="//tmp/my_pipeline/orders",
        right_table="//tmp/my_pipeline/users",
        output_table="//tmp/my_pipeline/joined",
        on="user_id",
        how="left",
        select_columns=["a.order_id", "b.name"],
        dry_run=True,
    ),
)

self.logger.info(f"Query preview:\n{query}")

# Execute actual query
self.deps.yt_client.join_tables_request(
    JoinTablesRequest(
        left_table="//tmp/my_pipeline/orders",
        right_table="//tmp/my_pipeline/users",
        output_table="//tmp/my_pipeline/joined",
        on="user_id",
        how="left",
        select_columns=["a.order_id", "b.name"],
        dry_run=False,
    ),
)

Dry run returns the YQL query string without executing it.
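Since request types are frozen dataclasses, the standard library's dataclasses.replace can derive the executing request from the previewed one instead of repeating every field. The sketch uses JoinRequestSketch, a hypothetical stand-in with a subset of the documented fields, so that it runs without the framework installed.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class JoinRequestSketch:
    """Hypothetical stand-in; not the framework's JoinTablesRequest."""

    left_table: str
    right_table: str
    output_table: str
    on: str
    how: str = "left"
    dry_run: bool = False


preview_request = JoinRequestSketch(
    left_table="//tmp/my_pipeline/orders",
    right_table="//tmp/my_pipeline/users",
    output_table="//tmp/my_pipeline/joined",
    on="user_id",
    dry_run=True,
)

# Copy every field and flip only dry_run; preview_request itself is unchanged.
run_request = replace(preview_request, dry_run=False)
```

This keeps the previewed and executed requests identical apart from dry_run, so the query you inspected is the query that runs.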

Complete Example#

The live example under examples/03_yql_operations imports the request types and calls the *_request methods from its stage. See Example: 03_yql_operations for the full listing, which covers all operations.

Configuration#

YQL operations need no dedicated section in stage config files; they call the YT client directly. A stage config typically carries only the table paths the stage reads and writes, for example:

# stages/yql_examples/config.yaml
client:
  orders_table: //tmp/my_pipeline/orders
  users_table: //tmp/my_pipeline/users
  archive_orders_table: //tmp/my_pipeline/orders_archive
  output:
    joined: //tmp/my_pipeline/joined
    filtered: //tmp/my_pipeline/filtered
    selected: //tmp/my_pipeline/selected
    aggregated: //tmp/my_pipeline/aggregated
    united: //tmp/my_pipeline/united
    distinct: //tmp/my_pipeline/distinct
    sorted: //tmp/my_pipeline/sorted
    limited: //tmp/my_pipeline/limited

Dev Mode Behavior#

In dev mode, YQL operations are simulated using DuckDB:

  • Operations run locally

  • Results are written to the .dev/ directory

  • Full YQL syntax supported

  • Performance differs from production

Note: Some advanced YQL features may behave differently in dev mode.

Best Practices#

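  1. Use dry run: preview the generated query with dry_run=True before executing it.

  2. Check schemas: confirm that join keys exist on both sides and that union inputs share one schema.

  3. Choose the join type deliberately: "inner" drops unmatched rows, while "left", "right", and "full" keep them from one or both sides.

  4. Filter early: apply filters before joins and aggregations so later steps read fewer rows.

  5. Limit results: use limit when you only need part of a large result set.

  6. Test locally: run the stage in dev mode before running it in prod.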

Common Patterns#

Multi-Table Join#

from yt_framework.yt.clients.yql.yql_requests import JoinTablesRequest

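# Step 1: join the first two tables into an intermediate table.
joined1 = self.deps.yt_client.join_tables_request(
    JoinTablesRequest(
        left_table="//tmp/my_pipeline/table1",
        right_table="//tmp/my_pipeline/table2",
        output_table="//tmp/my_pipeline/temp_joined",
        on="id",
    ),
)

# Step 2: join the intermediate table with the third table.
joined2 = self.deps.yt_client.join_tables_request(
    JoinTablesRequest(
        left_table="//tmp/my_pipeline/temp_joined",
        right_table="//tmp/my_pipeline/table3",
        output_table="//tmp/my_pipeline/final_joined",
        on="id",
    ),
)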

Filtered Aggregation#

from yt_framework.yt.clients.yql.yql_requests import (
    FilterTableRequest,
    GroupByAggregateRequest,
)

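# Step 1: keep only the rows of interest so the aggregation reads less data.
filtered = self.deps.yt_client.filter_table_request(
    FilterTableRequest(
        input_table="//tmp/my_pipeline/orders",
        output_table="//tmp/my_pipeline/temp_filtered",
        condition="amount > 100",
    ),
)

# Step 2: aggregate the filtered rows per user.
aggregated = self.deps.yt_client.group_by_aggregate_request(
    GroupByAggregateRequest(
        input_table="//tmp/my_pipeline/temp_filtered",
        output_table="//tmp/my_pipeline/result",
        group_by="user_id",
        aggregations={"total": "sum"},
    ),
)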

Top N Results#

from yt_framework.yt.clients.yql.yql_requests import LimitTableRequest, SortTableRequest

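# Step 1: sort by amount, largest first.
sorted_table = self.deps.yt_client.sort_table_request(
    SortTableRequest(
        input_table="//tmp/my_pipeline/orders",
        output_table="//tmp/my_pipeline/temp_sorted",
        order_by="amount",
        ascending=False,
    ),
)

# Step 2: keep the first 10 rows of the sorted table.
top_n = self.deps.yt_client.limit_table_request(
    LimitTableRequest(
        input_table="//tmp/my_pipeline/temp_sorted",
        output_table="//tmp/my_pipeline/top_orders",
        limit=10,
    ),
)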
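When testing this pattern on a small sample, a plain-Python reference result is handy for comparison. The helper below is a hypothetical sketch with invented sample rows; it states what sort-then-limit is expected to produce and does not call the framework. Rows with equal amounts may come back in a different relative order from a real run.

```python
from typing import Any, Dict, List


def top_n_by(rows: List[Dict[str, Any]], column: str, n: int) -> List[Dict[str, Any]]:
    """Sort descending by `column`, then keep the first `n` rows."""
    return sorted(rows, key=lambda row: row[column], reverse=True)[:n]


sample = [
    {"order_id": 1, "amount": 40.0},
    {"order_id": 2, "amount": 90.0},
    {"order_id": 3, "amount": 65.0},
]
top_two = top_n_by(sample, "amount", 2)
```

Here top_two holds orders 2 and 3, in that order.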

Troubleshooting#

Issue: Join fails#

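  • Check that the join column names match on both sides, or use the Dict form of on when they differ

  • Verify that the table schemas are compatible, in particular the types of the join columns

  • Check that both table paths exist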

Issue: Filter condition syntax error#

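  • Write the condition as a SQL-style expression (for example "amount > 100")

  • Quote string literals inside the condition (for example "status == 'active'") and escape special characters

  • Check that every column named in the condition exists in the input table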

Issue: Aggregation fails#

  • Verify column types are numeric (for sum/avg)

  • Check column names exist

  • Ensure group_by columns exist

Issue: Union fails#

  • Verify that all tables have the same schema

  • Check that the column order matches

  • Ensure that every table path exists

Next Steps#