YQL operations
YQL helpers on self.deps.yt_client run YQL, the YTsaurus SQL dialect, against Cypress tables. In production, queries run as YQL on the cluster; in dev mode, many helpers go through a DuckDB-backed simulation, so behavior may differ in edge cases.
Overview
Tip
When YQL fits
Use YQL helpers for joins, filters, aggregates, unions, and similar set operations. Use map operations when you need arbitrary Python per row or libraries that YQL cannot call.
Typical uses:
Join or reshape tables without shipping mapper.py.
Let YT choose execution plans for large inputs.
Preview SQL with dry-run APIs where supported.
Defaults
PRAGMA yt.MaxRowWeight is injected at 128M unless you override it (maximum 128M).
Note
YQL vs map
YQL expresses set logic declaratively. Map runs your Python on each row stream.
Row Weight Defaults
By default, every YQL helper and every raw run_yql(...) call includes:
PRAGMA yt.MaxRowWeight = "128M";
Override per call when needed (must not exceed 128M; larger values raise ValueError):
self.deps.yt_client.join_tables(
    left_table="//tmp/a",
    right_table="//tmp/b",
    output_table="//tmp/out",
    on="id",
    max_row_weight="64M",
)
For raw queries:
self.deps.yt_client.run_yql(
    "INSERT INTO `//tmp/out` WITH TRUNCATE SELECT * FROM `//tmp/in`;",
    max_row_weight="64M",
)
If the SQL already contains PRAGMA yt.MaxRowWeight, that value is checked too; overrides and embedded pragmas cannot exceed 128M.
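The validation rule can be sketched as a small checker. This is a hypothetical illustration, not the framework's code: it assumes K/M/G suffixes are binary multiples and that embedded pragmas use the same PRAGMA yt.MaxRowWeight = "..."; form as the default line. The names parse_row_weight and check_row_weight are invented for this sketch.

```python
import re
from typing import Optional

# Hypothetical sketch, not the framework's implementation.
# ASSUMPTION: K/M/G are binary multiples (1M = 2**20 bytes).
_UNITS = {"": 1, "K": 2**10, "M": 2**20, "G": 2**30}
_MAX_ROW_WEIGHT = 128 * 2**20  # the documented 128M ceiling
_PRAGMA_RE = re.compile(r'PRAGMA\s+yt\.MaxRowWeight\s*=\s*"([^"]+)"', re.IGNORECASE)


def parse_row_weight(value: str) -> int:
    """Convert a size string such as "64M" into a byte count."""
    match = re.fullmatch(r"\s*(\d+)\s*([KMG]?)\s*", value.upper())
    if match is None:
        raise ValueError(f"unrecognized row weight: {value!r}")
    number, unit = match.groups()
    return int(number) * _UNITS[unit]


def check_row_weight(sql: str, override: Optional[str] = None) -> None:
    """Raise ValueError if the override or any embedded pragma exceeds 128M."""
    candidates = [override] if override is not None else []
    candidates += _PRAGMA_RE.findall(sql)  # values of pragmas already in the SQL
    for value in candidates:
        if parse_row_weight(value) > _MAX_ROW_WEIGHT:
            raise ValueError(f"MaxRowWeight {value!r} exceeds 128M")
```

Under these assumptions, "64M" and "128M" pass, while "256M" raises ValueError whether it arrives as an override or inside the SQL.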
Available Operations
Join Tables
Join two tables on a common column.
self.deps.yt_client.join_tables(
    left_table="//tmp/my_pipeline/orders",
    right_table="//tmp/my_pipeline/users",
    output_table="//tmp/my_pipeline/joined",
    on="user_id",
    how="left",  # or "inner", "right", "full"
    select_columns=[
        "a.order_id",
        "a.user_id",
        "a.amount",
        "b.name",
        "b.email",
    ],
)
Parameters:
left_table: Left table path
right_table: Right table path
output_table: Output table path
on: Column name(s) to join on:
  str: Same column name on both sides (e.g., "user_id")
  List[str]: Multiple columns with the same names on both sides (e.g., ["user_id", "region"])
  Dict[str, str]: Different column names on each side (e.g., {"left": "input_s3_path", "right": "path"})
how: Join type ("left", "inner", "right", "full")
select_columns: Columns to select (prefix with a. or b. for the table alias)
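One way to picture the three forms of on is as the join condition they could produce. The sketch is hypothetical: it assumes the left table is aliased a and the right table b (matching the a./b. prefixes in select_columns) and that the dict form holds a single left/right pair, as in the example above. The helper's real generated SQL may differ.

```python
from typing import Dict, List, Union

# Hypothetical sketch: render the documented `on` forms as a YQL ON clause.
# ASSUMPTION: left alias is `a`, right alias is `b`.
OnSpec = Union[str, List[str], Dict[str, str]]


def render_on_clause(on: OnSpec) -> str:
    if isinstance(on, str):
        pairs = [(on, on)]  # same column name on both sides
    elif isinstance(on, dict):
        pairs = [(on["left"], on["right"])]  # different names per side
    else:
        pairs = [(column, column) for column in on]  # several shared columns
    return " AND ".join(f"a.{left} = b.{right}" for left, right in pairs)
```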
Filter Table
Filter rows based on a condition.
self.deps.yt_client.filter_table(
    input_table="//tmp/my_pipeline/orders",
    output_table="//tmp/my_pipeline/filtered",
    condition="amount > 100",
)
Parameters:
input_table: Input table path
output_table: Output table path
condition: SQL-like condition (e.g., "amount > 100", "status == 'active'")
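When a filter needs to be combined with other clauses, the same result can be expressed as a raw statement for run_yql. The sketch below is a hypothetical builder that reuses the INSERT ... WITH TRUNCATE shape from the run_yql example earlier; the SQL that filter_table actually generates may differ (use dry_run to see it).

```python
# Hypothetical sketch: a raw YQL statement roughly equivalent to filter_table.
def filter_query(input_table: str, output_table: str, condition: str) -> str:
    return (
        f"INSERT INTO `{output_table}` WITH TRUNCATE\n"
        f"SELECT * FROM `{input_table}`\n"
        f"WHERE {condition};"
    )


query = filter_query("//tmp/my_pipeline/orders", "//tmp/my_pipeline/filtered", "amount > 100")
# Inside a stage: self.deps.yt_client.run_yql(query)
```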
Select Columns
Select specific columns from a table.
self.deps.yt_client.select_columns(
    input_table="//tmp/my_pipeline/users",
    output_table="//tmp/my_pipeline/selected",
    columns=["user_id", "name", "email"],
)
Parameters:
input_table: Input table path
output_table: Output table path
columns: List of column names to select
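If you build a column list for a raw query yourself, wrapping each name in backticks keeps unusual names valid YQL identifiers. This is a hypothetical helper, not part of the framework; to stay simple it rejects names that contain a backtick instead of trying to escape them.

```python
from typing import List


# Hypothetical sketch: build a SELECT list with backtick-quoted identifiers.
def select_list(columns: List[str]) -> str:
    if not columns:
        raise ValueError("columns must not be empty")
    for name in columns:
        if "`" in name:
            raise ValueError(f"unsupported column name: {name!r}")
    return ", ".join(f"`{name}`" for name in columns)
```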
Group By Aggregate
Group rows and compute aggregations.
self.deps.yt_client.group_by_aggregate(
    input_table="//tmp/my_pipeline/orders",
    output_table="//tmp/my_pipeline/aggregated",
    group_by="user_id",
    aggregations={
        "order_count": "count",
        "total_amount": "sum",
        "avg_amount": "avg",
        "max_amount": "max",
        "min_amount": "min",
    },
)
Parameters:
input_table: Input table path
output_table: Output table path
group_by: Column name(s) to group by
aggregations: Dictionary mapping output column names to aggregation functions
Aggregation functions:
"count": Count rows
"sum": Sum values
"avg": Average values
"max": Maximum value
"min": Minimum value
Union Tables
Combine multiple tables into one.
self.deps.yt_client.union_tables(
    tables=[
        "//tmp/my_pipeline/orders_2023",
        "//tmp/my_pipeline/orders_2024",
    ],
    output_table="//tmp/my_pipeline/all_orders",
)
Parameters:
tables: List of table paths to union
output_table: Output table path
Note: All tables must have the same schema.
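Because a schema mismatch only surfaces when the union runs, it can be worth comparing schemas first. The sketch below is a hypothetical pre-check: it takes each table's schema as a list of (column_name, type) pairs that you have already fetched (how you read them is outside this sketch) and treats column order as significant.

```python
from typing import Dict, List, Tuple

# Hypothetical pre-check before union_tables.
# Each schema is an ordered list of (column_name, type) pairs.
Schema = List[Tuple[str, str]]


def check_same_schema(schemas: Dict[str, Schema]) -> None:
    items = list(schemas.items())
    if not items:
        return
    first_path, first = items[0]
    for path, schema in items[1:]:
        if schema == first:
            continue
        left, right = set(first), set(schema)
        if left == right:
            detail = "same columns in a different order"
        else:
            detail = (
                f"only in {first_path}: {sorted(left - right)}; "
                f"only in {path}: {sorted(right - left)}"
            )
        raise ValueError(f"{path} does not match {first_path}: {detail}")
```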
Distinct
Get distinct values from columns.
self.deps.yt_client.distinct(
    input_table="//tmp/my_pipeline/users",
    output_table="//tmp/my_pipeline/distinct_cities",
    columns=["city"],
)
Parameters:
input_table: Input table path
output_table: Output table path
columns: List of column names for the distinct operation
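For checking results on a small fixture, an in-memory reference helps. This sketch assumes distinct behaves like SELECT DISTINCT over the listed columns, so only those columns appear in the output; the function name is hypothetical.

```python
from typing import Any, Dict, List


# In-memory reference: one row per unique combination of `columns`.
# Rows come out in first-seen order here; the table written by YQL
# has no guaranteed row order.
def distinct_rows(rows: List[Dict[str, Any]], columns: List[str]) -> List[Dict[str, Any]]:
    seen = set()
    result = []
    for row in rows:
        key = tuple(row[column] for column in columns)
        if key not in seen:
            seen.add(key)
            result.append(dict(zip(columns, key)))
    return result
```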
Sort Table
Sort a table by one or more columns.
self.deps.yt_client.sort_table(
    input_table="//tmp/my_pipeline/orders",
    output_table="//tmp/my_pipeline/sorted",
    order_by="amount",
    ascending=False,  # True for ascending, False for descending
)
Parameters:
input_table: Input table path
output_table: Output table path
order_by: Column name(s) to sort by
ascending: Sort order (default: True)
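A small in-memory reference can make the ordering rules concrete when you test against fixtures. This sketch assumes a single ascending flag applies to every sort column, as the parameter list suggests, and it does not model NULL ordering; sort_rows is a hypothetical name.

```python
from typing import Any, Dict, List, Sequence, Union


# In-memory reference for sort_table on a small fixture.
# ASSUMPTION: one `ascending` flag applies to all sort columns.
def sort_rows(
    rows: List[Dict[str, Any]],
    order_by: Union[str, Sequence[str]],
    ascending: bool = True,
) -> List[Dict[str, Any]]:
    keys = [order_by] if isinstance(order_by, str) else list(order_by)
    return sorted(rows, key=lambda row: tuple(row[k] for k in keys), reverse=not ascending)
```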
Limit Table
Limit the number of rows in a table.
self.deps.yt_client.limit_table(
    input_table="//tmp/my_pipeline/orders",
    output_table="//tmp/my_pipeline/limited",
    limit=100,
)
Parameters:
input_table: Input table path
output_table: Output table path
limit: Maximum number of rows to include
Dry Run
All YQL operations support dry run mode to preview queries before execution:
# Preview the query without executing it
query = self.deps.yt_client.join_tables(
    left_table="//tmp/my_pipeline/orders",
    right_table="//tmp/my_pipeline/users",
    output_table="//tmp/my_pipeline/joined",
    on="user_id",
    how="left",
    select_columns=["a.order_id", "b.name"],
    dry_run=True,  # return the YQL string instead of running it
)
self.logger.info(f"Query preview:\n{query}")

# Execute the query
self.deps.yt_client.join_tables(
    left_table="//tmp/my_pipeline/orders",
    right_table="//tmp/my_pipeline/users",
    output_table="//tmp/my_pipeline/joined",
    on="user_id",
    how="left",
    select_columns=["a.order_id", "b.name"],
    dry_run=False,  # run the query
)
Dry run returns the YQL query string without executing it.
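To avoid repeating every argument twice, the preview-then-execute pattern can be wrapped in a small helper. This is a hypothetical sketch: preview_then_run is not part of the framework, and it only assumes that the helper you pass accepts a dry_run keyword and returns the query string when dry_run=True, as described above.

```python
import logging
from typing import Any, Callable


# Hypothetical helper: log the generated query, then execute it with the
# same arguments. `operation` is e.g. self.deps.yt_client.join_tables.
def preview_then_run(operation: Callable[..., Any], logger: logging.Logger, **kwargs: Any) -> Any:
    query = operation(dry_run=True, **kwargs)
    logger.info("Query preview:\n%s", query)
    return operation(dry_run=False, **kwargs)
```

Inside a stage this would be called as preview_then_run(self.deps.yt_client.join_tables, self.logger, left_table=..., right_table=..., output_table=..., on="user_id").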
Complete Example
from yt_framework.core.pipeline import DebugContext
from yt_framework.core.stage import BaseStage
from yt_framework.utils.logging import log_header


class YqlExamplesStage(BaseStage):
    def run(self, debug: DebugContext) -> DebugContext:
        # 1. Join tables
        log_header(self.logger, "YQL", "1. JOIN TABLES")
        self.deps.yt_client.join_tables(
            left_table=self.config.client.orders_table,
            right_table=self.config.client.users_table,
            output_table=self.config.client.output.joined,
            on="user_id",
            how="left",
            select_columns=[
                "a.order_id",
                "a.user_id",
                "a.amount",
                "b.name",
                "b.email",
            ],
        )

        # 2. Filter table
        log_header(self.logger, "YQL", "2. FILTER TABLE")
        self.deps.yt_client.filter_table(
            input_table=self.config.client.orders_table,
            output_table=self.config.client.output.filtered,
            condition="amount > 100",
        )

        # 3. Select columns
        log_header(self.logger, "YQL", "3. SELECT COLUMNS")
        self.deps.yt_client.select_columns(
            input_table=self.config.client.users_table,
            output_table=self.config.client.output.selected,
            columns=["user_id", "name"],
        )

        # 4. Group by aggregate
        log_header(self.logger, "YQL", "4. GROUP BY AGGREGATE")
        self.deps.yt_client.group_by_aggregate(
            input_table=self.config.client.orders_table,
            output_table=self.config.client.output.aggregated,
            group_by="user_id",
            aggregations={
                "order_count": "count",
                "total_amount": "sum",
            },
        )

        # 5. Union tables
        log_header(self.logger, "YQL", "5. UNION TABLES")
        self.deps.yt_client.union_tables(
            tables=[
                self.config.client.orders_table,
                self.config.client.archive_orders_table,
            ],
            output_table=self.config.client.output.united,
        )

        # 6. Distinct
        log_header(self.logger, "YQL", "6. DISTINCT")
        self.deps.yt_client.distinct(
            input_table=self.config.client.users_table,
            output_table=self.config.client.output.distinct,
            columns=["city"],
        )

        # 7. Sort table
        log_header(self.logger, "YQL", "7. SORT TABLE")
        self.deps.yt_client.sort_table(
            input_table=self.config.client.orders_table,
            output_table=self.config.client.output.sorted,
            order_by="amount",
            ascending=False,
        )

        # 8. Limit table (reads the sorted output from step 7)
        log_header(self.logger, "YQL", "8. LIMIT TABLE")
        self.deps.yt_client.limit_table(
            input_table=self.config.client.output.sorted,
            output_table=self.config.client.output.limited,
            limit=10,
        )

        return debug
See Example: 03_yql_operations for a complete example with all operations.
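Configuration
YQL operations need no dedicated section in stage config files; they call the YT client directly. The config only has to provide the table paths the stage reads, as in this file for the complete example above: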
# stages/yql_examples/config.yaml
client:
  orders_table: //tmp/my_pipeline/orders
  users_table: //tmp/my_pipeline/users
  archive_orders_table: //tmp/my_pipeline/orders_archive
  output:
    joined: //tmp/my_pipeline/joined
    filtered: //tmp/my_pipeline/filtered
    selected: //tmp/my_pipeline/selected
    aggregated: //tmp/my_pipeline/aggregated
    united: //tmp/my_pipeline/united
    distinct: //tmp/my_pipeline/distinct
    sorted: //tmp/my_pipeline/sorted
    limited: //tmp/my_pipeline/limited
Dev Mode Behavior
In dev mode, YQL operations are simulated using DuckDB:
Operations run locally
Results are written to the .dev/ directory
Full YQL syntax is supported
Performance differs from production
Note: Some advanced YQL features may behave differently in dev mode.
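Best Practices
Use dry run: Preview generated queries before executing them
Check schemas: Ensure table schemas match for joins and unions
Choose join types deliberately: inner keeps only matching rows, while left keeps every row of the left table
Filter early: Apply filters before expensive operations such as joins and aggregations
Limit results: Use limit_table to cap large result sets
Test locally: Use dev mode for testing, then confirm edge cases on the cluster, since the DuckDB simulation may differ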
Common Patterns
Multi-Table Join
# Join multiple tables sequentially through an intermediate table
self.deps.yt_client.join_tables(
    left_table="//tmp/my_pipeline/table1",
    right_table="//tmp/my_pipeline/table2",
    output_table="//tmp/my_pipeline/temp_joined",
    on="id",
)
self.deps.yt_client.join_tables(
    left_table="//tmp/my_pipeline/temp_joined",
    right_table="//tmp/my_pipeline/table3",
    output_table="//tmp/my_pipeline/final_joined",
    on="id",
)
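The sequential pattern generalizes to any number of tables by folding them pairwise. This is a hypothetical sketch: chain_joins and the joined_N naming of intermediate tables under tmp_prefix are invented here, and the join argument is expected to be self.deps.yt_client.join_tables (only the keyword arguments shown in the example are used).

```python
from typing import Any, Callable, List


# Hypothetical helper: join tables[0] with tables[1], then the result with
# tables[2], and so on, writing intermediates under `tmp_prefix`.
def chain_joins(
    join: Callable[..., Any],
    tables: List[str],
    on: str,
    output_table: str,
    tmp_prefix: str,
) -> None:
    if len(tables) < 2:
        raise ValueError("need at least two tables to join")
    left = tables[0]
    for index, right in enumerate(tables[1:], start=1):
        is_last = index == len(tables) - 1
        target = output_table if is_last else f"{tmp_prefix}/joined_{index}"
        join(left_table=left, right_table=right, output_table=target, on=on)
        left = target  # the next join reads the previous result
```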
Filtered Aggregation
# Filter then aggregate
self.deps.yt_client.filter_table(
    input_table="//tmp/my_pipeline/orders",
    output_table="//tmp/my_pipeline/temp_filtered",
    condition="amount > 100",
)
self.deps.yt_client.group_by_aggregate(
    input_table="//tmp/my_pipeline/temp_filtered",
    output_table="//tmp/my_pipeline/result",
    group_by="user_id",
    aggregations={"total": "sum"},
)
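The same result can be produced by one raw statement passed to run_yql, which avoids writing an intermediate table. This is a hypothetical sketch: filtered_total_query is invented here, and summing an amount column is an assumption, since aggregations={"total": "sum"} above does not name the source column.

```python
# Hypothetical one-statement alternative to filter + group_by_aggregate.
# ASSUMPTION: "total" is the sum of an `amount` column.
def filtered_total_query(input_table: str, output_table: str, min_amount: int = 100) -> str:
    return (
        f"INSERT INTO `{output_table}` WITH TRUNCATE\n"
        "SELECT user_id, SUM(amount) AS total\n"
        f"FROM `{input_table}`\n"
        f"WHERE amount > {min_amount}\n"
        "GROUP BY user_id;"
    )


query = filtered_total_query("//tmp/my_pipeline/orders", "//tmp/my_pipeline/result")
# Inside a stage: self.deps.yt_client.run_yql(query)
```

The two-step version remains useful when you want to inspect the filtered table on its own.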
Top N Results
# Sort then limit
self.deps.yt_client.sort_table(
    input_table="//tmp/my_pipeline/orders",
    output_table="//tmp/my_pipeline/temp_sorted",
    order_by="amount",
    ascending=False,
)
self.deps.yt_client.limit_table(
    input_table="//tmp/my_pipeline/temp_sorted",
    output_table="//tmp/my_pipeline/top_orders",
    limit=10,
)
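To check the top-N output on a small fixture, an in-memory reference is enough. heapq.nlargest returns the n items with the largest key in descending order (equivalent to sorted(..., reverse=True)[:n]) without sorting the whole input; top_n is a hypothetical name.

```python
import heapq
from typing import Any, Dict, List


# In-memory reference for the sort-then-limit pattern.
def top_n(rows: List[Dict[str, Any]], column: str, n: int) -> List[Dict[str, Any]]:
    return heapq.nlargest(n, rows, key=lambda row: row[column])
```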
Troubleshooting
Issue: Join fails
Check that the join column exists in both tables (use the dict form of on when the names differ)
Verify that table schemas are compatible
Check that the table paths exist
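Issue: Filter condition syntax error
Use SQL-like syntax that references input columns directly (e.g., amount > 100)
Quote string literals and escape special characters inside them
Check that the referenced columns exist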
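When a condition embeds a value from Python, quoting it in a helper avoids most syntax errors. This is a hypothetical sketch that assumes YQL string literals accept C-style backslash escapes: it wraps the value in single quotes and escapes backslashes and single quotes.

```python
# Hypothetical helper: turn a Python string into a single-quoted YQL literal.
# ASSUMPTION: YQL accepts backslash escapes inside string literals.
def yql_string(value: str) -> str:
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


condition = "status == " + yql_string("active")
```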
Issue: Aggregation fails
Verify column types are numeric (for sum/avg)
Check column names exist
Ensure group_by columns exist
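When an aggregation fails or the numbers look wrong, computing the expected result for a tiny fixture in memory gives something to compare against. This is a hypothetical reference for the five documented aggregation names: it assumes every function reads one explicit value column (how group_by_aggregate picks the source column for each output name is not stated on this page) and it ignores NULL handling, which SQL aggregates treat specially.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

# ASSUMPTION: all aggregations read the same `value_column`.
_FUNCS: Dict[str, Callable] = {
    "count": len,
    "sum": sum,
    "avg": lambda values: sum(values) / len(values),
    "max": max,
    "min": min,
}


def aggregate(
    rows: List[Dict[str, Any]],
    group_by: str,
    value_column: str,
    aggregations: Dict[str, str],
) -> List[Dict[str, Any]]:
    groups: Dict[Any, List[Any]] = defaultdict(list)
    for row in rows:
        groups[row[group_by]].append(row[value_column])
    result = []
    for key, values in groups.items():  # groups in first-seen order
        out = {group_by: key}
        for name, func in aggregations.items():
            out[name] = _FUNCS[func](values)
        result.append(out)
    return result
```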
Issue: Union fails
Verify all tables have same schema
Check column order matches
Ensure table paths exist
Next Steps
Learn about Map Operations for row-by-row processing
Explore S3 Operations for file integration
Check out Examples for more patterns