YQL operations
YQL helpers on self.deps.yt_client run YQL (the SQL dialect used by YTsaurus) against Cypress tables. In prod the queries run on the cluster's YQL engine; in dev, many helpers go through a DuckDB-backed simulation, whose behavior may differ in edge cases.
Overview
Tip
When YQL fits
Use YQL helpers for joins, filters, aggregates, unions, and similar set operations. Use map when you need arbitrary Python per row or libraries YQL cannot call.
Typical uses:
Join or reshape tables without shipping mapper.py.
Let YT choose execution plans for large inputs.
Preview SQL with dry-run APIs where supported.
Defaults
PRAGMA yt.MaxRowWeight is injected at 128M unless you override it (maximum 128M).
Note
YQL vs map
YQL expresses set logic declaratively. Map runs your Python on each row stream.
Request API
Each helper is a *_request method on the YT client. It takes a single frozen dataclass from yt_framework.yt.clients.yql.yql_requests (for example JoinTablesRequest). The same types are what yt_framework.yt.clients.yql.yql_builder uses to build SQL strings.
For filter, union, sort, and limit, you can leave columns unset on the request; the client fills them from the input table schema when needed.
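The column fallback described above can be pictured as follows. This is a minimal sketch, assuming a simplified, hypothetical request class (FilterRequestSketch) and a hypothetical resolve_columns helper; it is not the client's code, and how the real client reads the schema is not shown. It also illustrates why frozen request objects are copied with dataclasses.replace rather than modified in place.

```python
from dataclasses import dataclass, replace
from typing import Optional, Sequence, Tuple


@dataclass(frozen=True)
class FilterRequestSketch:
    """Hypothetical stand-in; the real request classes may have more fields."""

    input_table: str
    output_table: str
    condition: str
    columns: Optional[Tuple[str, ...]] = None


def resolve_columns(
    request: FilterRequestSketch, schema_columns: Sequence[str]
) -> FilterRequestSketch:
    """Return a copy with columns filled from the schema if they were left unset."""
    if request.columns is not None:
        return request  # explicitly requested columns are kept as-is
    # Frozen dataclasses reject attribute assignment, so build a modified copy.
    return replace(request, columns=tuple(schema_columns))


req = FilterRequestSketch("//tmp/in", "//tmp/out", "amount > 100")
resolved = resolve_columns(req, ["order_id", "amount"])
print(resolved.columns)  # ('order_id', 'amount')
```

The original request stays untouched, so the same object can be reused safely across calls.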
Row Weight Defaults
All YQL helpers and raw run_yql(...) execution include:
PRAGMA yt.MaxRowWeight = "128M";
by default.
Override per call when needed (must not exceed 128M; larger values raise ValueError):
from yt_framework.yt.clients.yql.yql_requests import JoinTablesRequest
self.deps.yt_client.join_tables_request(
JoinTablesRequest(
left_table="//tmp/a",
right_table="//tmp/b",
output_table="//tmp/out",
on="id",
max_row_weight="64M",
),
)
For raw queries:
self.deps.yt_client.run_yql(
"INSERT INTO `//tmp/out` WITH TRUNCATE SELECT * FROM `//tmp/in`;",
max_row_weight="64M",
)
If the SQL already contains PRAGMA yt.MaxRowWeight, that value is checked too; overrides and embedded pragmas cannot exceed 128M.
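A minimal sketch of the cap check described above, assuming sizes use K/M/G suffixes with binary multipliers and that embedded pragmas use only the `PRAGMA yt.MaxRowWeight = "..."` form shown on this page. parse_size and check_row_weight are hypothetical helpers for illustration, not part of yt_framework; the framework's own parsing may accept other forms.

```python
import re
from typing import Optional

MAX_ROW_WEIGHT = "128M"
_UNITS = {"": 1, "K": 1024, "M": 1024**2, "G": 1024**3}
_SIZE_RE = re.compile(r"^\s*(\d+)\s*([KMG]?)\s*$", re.IGNORECASE)
# Matches only the `PRAGMA yt.MaxRowWeight = "..."` form used on this page.
_PRAGMA_RE = re.compile(
    r"""PRAGMA\s+yt\.MaxRowWeight\s*=\s*["']([^"']+)["']""", re.IGNORECASE
)


def parse_size(value: str) -> int:
    """Convert a size string such as '64M' to bytes (binary multipliers assumed)."""
    match = _SIZE_RE.match(value)
    if match is None:
        raise ValueError(f"unrecognized size: {value!r}")
    number, unit = match.groups()
    return int(number) * _UNITS[unit.upper()]


def check_row_weight(sql: str, override: Optional[str] = None) -> None:
    """Raise ValueError if the override or any embedded pragma exceeds the cap."""
    limit = parse_size(MAX_ROW_WEIGHT)
    candidates = ([override] if override else []) + _PRAGMA_RE.findall(sql)
    for value in candidates:
        if parse_size(value) > limit:
            raise ValueError(f"MaxRowWeight {value!r} exceeds {MAX_ROW_WEIGHT}")


check_row_weight("SELECT 1;", override="64M")  # within the cap: no error
```

Both sources of a value are validated the same way, which mirrors the rule that neither an override nor an embedded pragma may exceed 128M.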
Available Operations
Join Tables
Join two tables on a common column.
from yt_framework.yt.clients.yql.yql_requests import JoinTablesRequest
self.deps.yt_client.join_tables_request(
JoinTablesRequest(
left_table="//tmp/my_pipeline/orders",
right_table="//tmp/my_pipeline/users",
output_table="//tmp/my_pipeline/joined",
on="user_id",
how="left", # or "inner", "right", "full"
select_columns=[
"a.order_id",
"a.user_id",
"a.amount",
"b.name",
"b.email",
],
),
)
JoinTablesRequest fields:
left_table: Left table path
right_table: Right table path
output_table: Output table path
on: Column name(s) to join on:
  str: Same column name on both sides (e.g., "user_id")
  List[str]: Multiple columns with the same names on both sides (e.g., ["user_id", "region"])
  Dict[str, str]: Different column names (e.g., {"left": "input_s3_path", "right": "path"})
how: Join type ("left", "inner", "right", "full")
select_columns: Columns to select (prefix with a. for the left table or b. for the right table)
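To make the three shapes of `on` concrete, here is a sketch of the join condition each one could expand to, assuming the left table is aliased `a` and the right table `b`, as the select_columns prefixes suggest. build_on_clause is a hypothetical helper; the SQL that yql_builder actually emits may quote identifiers or differ in other details.

```python
from typing import Dict, List, Union

JoinOn = Union[str, List[str], Dict[str, str]]


def build_on_clause(on: JoinOn) -> str:
    """Expand an `on` value into a condition over aliases a (left) and b (right)."""
    if isinstance(on, str):
        pairs = [(on, on)]  # same column name on both sides
    elif isinstance(on, dict):
        pairs = [(on["left"], on["right"])]  # differently named columns
    else:
        pairs = [(column, column) for column in on]  # several shared columns
    return " AND ".join(f"a.{left} = b.{right}" for left, right in pairs)


print(build_on_clause({"left": "input_s3_path", "right": "path"}))
# a.input_s3_path = b.path
```

A list produces one equality per column joined with AND, so every listed column must match for two rows to join.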
Filter Table
Filter rows based on a condition.
from yt_framework.yt.clients.yql.yql_requests import FilterTableRequest
self.deps.yt_client.filter_table_request(
FilterTableRequest(
input_table="//tmp/my_pipeline/orders",
output_table="//tmp/my_pipeline/filtered",
condition="amount > 100",
),
)
FilterTableRequest fields:
input_table: Input table path
output_table: Output table path
condition: SQL-like condition (e.g., "amount > 100", "status == 'active'")
columns: Optional; when omitted, the client resolves columns from the table
Select Columns
Select specific columns from a table.
from yt_framework.yt.clients.yql.yql_requests import SelectColumnsRequest
self.deps.yt_client.select_columns_request(
SelectColumnsRequest(
input_table="//tmp/my_pipeline/users",
output_table="//tmp/my_pipeline/selected",
columns=["user_id", "name", "email"],
),
)
SelectColumnsRequest fields:
input_table: Input table path
output_table: Output table path
columns: List of column names to select
Group By Aggregate
Group rows and compute aggregations.
from yt_framework.yt.clients.yql.yql_requests import GroupByAggregateRequest
self.deps.yt_client.group_by_aggregate_request(
GroupByAggregateRequest(
input_table="//tmp/my_pipeline/orders",
output_table="//tmp/my_pipeline/aggregated",
group_by="user_id",
aggregations={
"order_count": "count",
"total_amount": "sum",
"avg_amount": "avg",
"max_amount": "max",
"min_amount": "min",
},
),
)
GroupByAggregateRequest fields:
input_table: Input table path
output_table: Output table path
group_by: Column name(s) to group by
aggregations: Dictionary mapping output column names to aggregation functions
Aggregation functions:
"count": Count rows
"sum": Sum values
"avg": Average values
"max": Maximum value
"min": Minimum value
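For checking results on a small sample (for example, while testing in dev mode), the five functions can be reproduced in plain Python. This sketch assumes every function reads a single numeric column, called `amount` here, with no missing values; which input column each aggregation reads is an assumption of the sketch, and aggregate is a hypothetical helper.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, List


def aggregate(rows: Iterable[dict], group_by: str, value: str) -> Dict[Hashable, dict]:
    """Plain-Python count/sum/avg/max/min of `value` per `group_by` key."""
    groups: Dict[Hashable, List[float]] = defaultdict(list)
    for row in rows:
        groups[row[group_by]].append(row[value])
    return {
        key: {
            "count": len(values),
            "sum": sum(values),
            "avg": sum(values) / len(values),
            "max": max(values),
            "min": min(values),
        }
        for key, values in groups.items()
    }


orders = [
    {"user_id": 1, "amount": 50},
    {"user_id": 1, "amount": 150},
    {"user_id": 2, "amount": 30},
]
print(aggregate(orders, "user_id", "amount")[1])
# {'count': 2, 'sum': 200, 'avg': 100.0, 'max': 150, 'min': 50}
```

SQL aggregates skip NULLs, which this sketch does not model; add filtering for None values if your sample contains them.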
Union Tables
Combine multiple tables into one.
from yt_framework.yt.clients.yql.yql_requests import UnionTablesRequest
self.deps.yt_client.union_tables_request(
UnionTablesRequest(
tables=(
"//tmp/my_pipeline/orders_2023",
"//tmp/my_pipeline/orders_2024",
),
output_table="//tmp/my_pipeline/all_orders",
),
)
UnionTablesRequest fields:
tables: Tuple of table paths to union
output_table: Output table path
columns: Optional; when omitted, the client uses the first table’s columns
Note: All tables must have the same schema.
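Before a union, it can help to compare schemas up front instead of waiting for the query to fail. A minimal sketch, assuming you already have each table's schema as (column name, type) pairs in table order, obtained however your client exposes them (not shown); check_union_schemas is a hypothetical helper that compares every table against the first one, matching the default column source described above.

```python
from typing import Dict, List, Tuple

Schema = List[Tuple[str, str]]  # (column name, type) pairs in table order


def check_union_schemas(schemas: Dict[str, Schema]) -> List[str]:
    """Compare every table's schema with the first table's and describe mismatches."""
    problems: List[str] = []
    items = list(schemas.items())
    if not items:
        return problems
    first_path, first_schema = items[0]
    first_names = [name for name, _ in first_schema]
    for path, schema in items[1:]:
        if [name for name, _ in schema] != first_names:
            problems.append(f"{path}: column names or order differ from {first_path}")
        elif schema != first_schema:
            problems.append(f"{path}: column types differ from {first_path}")
    return problems
```

An empty list means the names, order, and types all line up with the first table.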
Distinct
Get distinct values from columns.
from yt_framework.yt.clients.yql.yql_requests import DistinctRequest
self.deps.yt_client.distinct_request(
DistinctRequest(
input_table="//tmp/my_pipeline/users",
output_table="//tmp/my_pipeline/distinct_cities",
columns=["city"],
),
)
DistinctRequest fields:
input_table: Input table path
output_table: Output table path
columns: List of column names for the distinct operation (optional; omit for SELECT DISTINCT *)
Sort Table
Sort table by one or more columns.
from yt_framework.yt.clients.yql.yql_requests import SortTableRequest
self.deps.yt_client.sort_table_request(
SortTableRequest(
input_table="//tmp/my_pipeline/orders",
output_table="//tmp/my_pipeline/sorted",
order_by="amount",
ascending=False, # True for ascending, False for descending
),
)
SortTableRequest fields:
input_table: Input table path
output_table: Output table path
order_by: Column name(s) to sort by
ascending: Sort order (default: True)
columns: Optional; when omitted, the client resolves columns from the input table
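Because ascending is a single boolean, it presumably applies the same direction to every column in order_by. A sketch of that reading, using a hypothetical build_order_by helper rather than the framework's builder:

```python
from typing import Sequence, Union


def build_order_by(order_by: Union[str, Sequence[str]], ascending: bool = True) -> str:
    """Render an ORDER BY clause; one direction is applied to every column."""
    columns = [order_by] if isinstance(order_by, str) else list(order_by)
    direction = "ASC" if ascending else "DESC"
    return "ORDER BY " + ", ".join(f"{column} {direction}" for column in columns)


print(build_order_by("amount", ascending=False))  # ORDER BY amount DESC
```

If you need mixed directions (for example, user_id ascending and amount descending), check whether the request supports it before relying on this shape.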
Limit Table
Limit the number of rows in a table.
from yt_framework.yt.clients.yql.yql_requests import LimitTableRequest
self.deps.yt_client.limit_table_request(
LimitTableRequest(
input_table="//tmp/my_pipeline/orders",
output_table="//tmp/my_pipeline/limited",
limit=100,
),
)
LimitTableRequest fields:
input_table: Input table path
output_table: Output table path
limit: Maximum number of rows to include
columns: Optional; when omitted, the client resolves columns from the input table
Dry Run
All YQL operations support dry run mode to preview queries before execution:
from dataclasses import replace

from yt_framework.yt.clients.yql.yql_requests import JoinTablesRequest

request = JoinTablesRequest(
left_table="//tmp/my_pipeline/orders",
right_table="//tmp/my_pipeline/users",
output_table="//tmp/my_pipeline/joined",
on="user_id",
how="left",
select_columns=["a.order_id", "b.name"],
dry_run=True,
)
# Preview the query without executing it
query = self.deps.yt_client.join_tables_request(request)
self.logger.info(f"Query preview:\n{query}")
# Execute the same request for real; requests are frozen, so copy with dry_run=False
self.deps.yt_client.join_tables_request(replace(request, dry_run=False))
Dry run returns the YQL query string without executing it.
Complete Example
The live example under examples/03_yql_operations imports the request types and calls the *_request methods from a stage. See Example: 03_yql_operations for the complete listing with all operations.
Configuration
YQL operations don’t require special configuration in stage config files; they call the YT client directly. The stage config only holds the table paths the stage passes to the requests, for example:
# stages/yql_examples/config.yaml
client:
orders_table: //tmp/my_pipeline/orders
users_table: //tmp/my_pipeline/users
archive_orders_table: //tmp/my_pipeline/orders_archive
output:
joined: //tmp/my_pipeline/joined
filtered: //tmp/my_pipeline/filtered
selected: //tmp/my_pipeline/selected
aggregated: //tmp/my_pipeline/aggregated
united: //tmp/my_pipeline/united
distinct: //tmp/my_pipeline/distinct
sorted: //tmp/my_pipeline/sorted
limited: //tmp/my_pipeline/limited
Dev Mode Behavior
In dev mode, YQL operations are simulated using DuckDB:
Operations run locally
Results written to the .dev/ directory
Full YQL syntax supported
Performance differs from production
Note: Some advanced YQL features may behave differently in dev mode.
Best Practices
Use dry run: Preview queries before execution
Check schemas: Ensure table schemas match for joins/unions
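Choose join types deliberately: inner keeps only matching rows, while left, right, and full also keep unmatched rows (with NULLs for the missing side)
Filter early: Apply filters before expensive operations such as joins and aggregations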
Limit results: Use limit for large result sets
Test locally: Use dev mode for testing
Common Patterns
Multi-Table Join#
from yt_framework.yt.clients.yql.yql_requests import JoinTablesRequest
joined1 = self.deps.yt_client.join_tables_request(
JoinTablesRequest(
left_table="table1",
right_table="table2",
output_table="temp_joined",
on="id",
),
)
joined2 = self.deps.yt_client.join_tables_request(
JoinTablesRequest(
left_table="temp_joined",
right_table="table3",
output_table="final_joined",
on="id",
),
)
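The two-step join above generalizes to any number of tables. A minimal sketch, assuming a hypothetical plan_join_chain helper and a tmp_prefix directory of your choosing: it only plans the (left, right, output) steps, each of which would then become one JoinTablesRequest.

```python
from typing import List, Sequence, Tuple


def plan_join_chain(
    tables: Sequence[str], output_table: str, tmp_prefix: str
) -> List[Tuple[str, str, str]]:
    """Return (left, right, output) steps that fold tables into one join result."""
    if len(tables) < 2:
        raise ValueError("need at least two tables to join")
    steps = []
    left = tables[0]
    for index, right in enumerate(tables[1:], start=1):
        is_last = index == len(tables) - 1
        output = output_table if is_last else f"{tmp_prefix}/joined_{index}"
        steps.append((left, right, output))
        left = output  # the next join reads the previous intermediate table
    return steps


for step in plan_join_chain(["//t/a", "//t/b", "//t/c"], "//t/final", "//tmp/join"):
    print(step)
```

Each tuple maps onto left_table, right_table, and output_table of a JoinTablesRequest. The intermediate tables are not removed by this sketch; clean them up once the final join succeeds.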
Filtered Aggregation#
from yt_framework.yt.clients.yql.yql_requests import (
FilterTableRequest,
GroupByAggregateRequest,
)
filtered = self.deps.yt_client.filter_table_request(
FilterTableRequest(
input_table="orders",
output_table="temp_filtered",
condition="amount > 100",
),
)
aggregated = self.deps.yt_client.group_by_aggregate_request(
GroupByAggregateRequest(
input_table="temp_filtered",
output_table="result",
group_by="user_id",
aggregations={"total": "sum"},
),
)
Top N Results#
from yt_framework.yt.clients.yql.yql_requests import LimitTableRequest, SortTableRequest
sorted_table = self.deps.yt_client.sort_table_request(
SortTableRequest(
input_table="orders",
output_table="temp_sorted",
order_by="amount",
ascending=False,
),
)
top_n = self.deps.yt_client.limit_table_request(
LimitTableRequest(
input_table="temp_sorted",
output_table="top_orders",
limit=10,
),
)
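To sanity-check the top-N pattern on a small local sample, note that sorting descending and then limiting corresponds to ORDER BY amount DESC LIMIT n, which plain Python can compute with heapq.nlargest. top_n is a hypothetical helper; rows with equal keys may come back in a different order than on the cluster, since SQL does not define the order of ties.

```python
import heapq
from typing import Iterable, List


def top_n(rows: Iterable[dict], key: str, n: int) -> List[dict]:
    """Return the n rows with the largest `key`, like ORDER BY key DESC LIMIT n."""
    return heapq.nlargest(n, rows, key=lambda row: row[key])


sample = [
    {"order_id": 1, "amount": 5},
    {"order_id": 2, "amount": 20},
    {"order_id": 3, "amount": 10},
    {"order_id": 4, "amount": 15},
]
print([row["order_id"] for row in top_n(sample, "amount", 2)])  # [2, 4]
```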
Troubleshooting
Issue: Join fails
Check that the join column names exist in both tables (use the dict form of on when they differ)
Verify table schemas are compatible
Check that the table paths exist
Issue: Filter condition syntax error
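Write the condition as a SQL expression over column names (e.g., amount > 100)
Quote string literals (e.g., status == 'active') and escape special characters inside them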
Check column names exist
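As a quick local smoke test for syntax errors and misspelled columns, a condition can be prepared against an empty in-memory table with the same column names. This sketch swaps in SQLite (Python's built-in sqlite3 module) for illustration only: its dialect differs from YQL and from the DuckDB-backed dev mode, so a condition that passes here can still fail on the cluster, and YQL-specific functions will be rejected. check_condition is a hypothetical helper.

```python
import sqlite3
from typing import Sequence


def check_condition(condition: str, columns: Sequence[str]) -> None:
    """Raise sqlite3.Error if `condition` does not parse or names unknown columns."""
    conn = sqlite3.connect(":memory:")
    try:
        column_list = ", ".join(f'"{name}"' for name in columns)
        conn.execute(f"CREATE TABLE t ({column_list})")  # untyped columns are allowed
        conn.execute(f"SELECT * FROM t WHERE {condition}")  # prepared, zero rows
    finally:
        conn.close()


check_condition("status == 'active' AND amount > 100", ["status", "amount"])  # OK
```

A misspelled bare column such as amout raises sqlite3.OperationalError ("no such column"), and an incomplete expression such as amount > raises a syntax error.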
Issue: Aggregation fails
Verify column types are numeric (for sum/avg)
Check column names exist
Ensure group_by columns exist
Issue: Union fails
Verify all tables have same schema
Check column order matches
Ensure table paths exist
Next Steps
Learn about Map Operations for row-by-row processing
Explore S3 Operations for file integration
Check out Examples for more patterns