# Core Concepts

## Caching system
Xorq provides a sophisticated caching system that enables efficient iterative development of ML pipelines. The caching system allows you to:
- Cache results from upstream query engines
- Persist data locally or in remote storage
- Automatically invalidate cache when source data changes
- Chain caches across multiple engines
### Cache types
Xorq supports three main types of cache storage:
1. Source Cache
   - Automatically invalidates the cache when upstream data changes
   - Persistence depends on the source backend
   - Supports both remote (Snowflake, Postgres) and in-process (pandas, DuckDB) backends
```python
import xorq.api as xo
from xorq.caching import SourceCache

# Connect to the source database
pg = xo.postgres.connect_env()
con = xo.connect()  # empty in-process connection

# Create source storage
cache = SourceCache.from_kwargs(source=con)

# Register the batting table from Postgres
batting = pg.table("batting")

# Cache the filtered data in the source backend
cached = (
    batting.filter(batting.yearID == 2015)
    .cache(cache)  # cache the expression
)

# Execute the query -- results will be cached
result = xo.execute(cached)
```

2. Snapshot Cache
   - No automatic invalidation
   - Ideal for one-off analyses
   - Persistence depends on the source backend
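Usage mirrors the source cache. The sketch below assumes `xorq.caching` exposes a `SnapshotCache` with the same `from_kwargs` constructor as `SourceCache` above; the class name is an assumption by analogy, so check the caching API reference for the exact spelling:

```python
import xorq.api as xo
from xorq.caching import SnapshotCache  # assumed name, by analogy with SourceCache

pg = xo.postgres.connect_env()
con = xo.connect()

# Snapshot the result once; it is NOT invalidated when upstream data changes
cache = SnapshotCache.from_kwargs(source=con)  # constructor assumed to match SourceCache
batting = pg.table("batting")
snapshot = batting.filter(batting.yearID == 2015).cache(cache)

result = xo.execute(snapshot)  # re-running reuses the snapshot even if Postgres changed
```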
3. Parquet Cache
   - A special case of the source cache
   - Caches results as Parquet files on local disk
   - Uses the source backend for writing
   - Ensures durable persistence
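The pattern is the same again. This sketch assumes a `ParquetCache` class and a `path` keyword for the target directory; both names are assumptions for illustration:

```python
import xorq.api as xo
from xorq.caching import ParquetCache  # assumed name, by analogy with SourceCache

pg = xo.postgres.connect_env()
con = xo.connect()

# Persist cached results as Parquet files under ./cache (keyword assumed)
cache = ParquetCache.from_kwargs(source=con, path="cache")
batting = pg.table("batting")
cached = batting.filter(batting.yearID == 2015).cache(cache)

result = xo.execute(cached)  # subsequent runs read the local Parquet files
```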
### Hashing strategies
Cache invalidation uses different hashing strategies based on the cache type:
| Storage Type | Hash Components |
|---|---|
| In-Memory | Data bytes + Schema |
| Disk-Based | Query plan + Schema |
| Remote | Table metadata + Last modified time |
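Conceptually, the components in the table are combined into a stable cache key, and a cached result is reused only while its key matches. A minimal illustration of the idea, not xorq's actual implementation:

```python
import hashlib


def cache_key(*components: bytes) -> str:
    """Combine hash components (see table above) into a stable cache key."""
    digest = hashlib.sha256()
    for part in components:
        digest.update(part)
    return digest.hexdigest()


# Disk-based example: query plan + schema
key = cache_key(b"SELECT ... WHERE yearID = 2015", b"playerID:string,yearID:int64")
# Any change to the plan or the schema yields a new key, so stale entries are skipped.
```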
### Key benefits
- Faster Iteration:
  - Reduce network calls to source systems
  - Minimize recomputation of expensive operations
  - Cache intermediate results for complex pipelines
- Declarative Integration:
  - Chain cache operations anywhere in the expression
  - Transparent integration with existing pipelines
  - Multiple storage options for different use cases
- Automatic Management:
  - Smart invalidation based on source changes
  - No manual cache management required
  - Efficient storage utilization
- Multi-Engine Support:
  - Cache data between different engines
  - Optimize storage location for performance
  - Flexible persistence options
## Multi-engine system

Xorq's multi-engine system enables seamless data movement between query engines, so you can leverage the strengths of each engine while keeping a unified workflow.
### The `into_backend` operator
The core of xorq's multi-engine capability is the `into_backend` operator, which enables:
- Transparent data movement between engines
- Zero-copy data transfer using Apache Arrow
- Automatic optimization of data placement
```python
import xorq.api as xo
from xorq.expr.relations import into_backend

# Connect to different engines
pg = xo.postgres.connect_env()
db = xo.duckdb.connect()

# Get tables from different sources
batting = pg.table("batting")

# Load awards_players into DuckDB
awards_players = xo.examples.awards_players.fetch(backend=db)

# Filter data in the respective engines
left = batting.filter(batting.yearID == 2015)
right = awards_players.filter(awards_players.lgID == "NL").drop("yearID", "lgID")

# Move the right table into Postgres so the join runs in a single engine
expr = left.join(
    into_backend(right, pg),
    ["playerID"],
    how="semi",
)[["yearID", "stint"]]

# Execute the multi-engine query
result = expr.execute()
```

### Supported engines
Xorq currently supports:

- In-Process Engines
  - DuckDB
  - DataFusion
  - Pandas
- Distributed Engines
  - Trino
  - Snowflake
  - BigQuery
### Engine selection guidelines
Choose engines based on their strengths:
- DuckDB: Local processing, AsOf joins, efficient file formats
- DataFusion: Custom UDFs, streaming processing
- Trino: Distributed queries, federation, security
- Snowflake/BigQuery: Managed infrastructure, scalability
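For example, you might read and filter a local file in DuckDB, then hand the result to Postgres where the rest of the pipeline lives. A sketch, assuming the DuckDB connection exposes ibis's `read_parquet` and an illustrative `trades.parquet` file with a `yearID` column:

```python
import xorq.api as xo
from xorq.expr.relations import into_backend

pg = xo.postgres.connect_env()
db = xo.duckdb.connect()

# DuckDB is strong at reading local file formats (path and schema are illustrative)
trades = db.read_parquet("trades.parquet")
recent = trades.filter(trades.yearID == 2015)

# Move the filtered rows into Postgres for the downstream joins
expr = into_backend(recent, pg)
result = expr.execute()
```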
### Data transfer
Data movement between engines is handled through:
- Arrow Flight: Zero-copy data transfer protocol
- Memory Management: Automatic spilling to disk
- Batching: Efficient chunk-based processing
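For reference, this is roughly what consuming a stream of record batches over Arrow Flight looks like with plain `pyarrow.flight`; the endpoint and ticket are made up, and xorq drives this machinery for you:

```python
import pyarrow.flight as fl

# Connect to a Flight endpoint (address is illustrative)
client = fl.connect("grpc://localhost:8815")

# Fetch a dataset as a stream of record batches rather than one monolithic blob
reader = client.do_get(fl.Ticket(b"example-dataset"))
for chunk in reader:
    batch = chunk.data  # a pyarrow.RecordBatch, moved without re-serialization
    print(batch.num_rows)
```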
## Custom UD(X)F system

Xorq provides a powerful system for extending query engines with custom User-Defined Functions (UDFs). Three key types are supported:
### 1. Scalar UDF with model integration
```python
import xorq.api as xo
from xorq.expr.ml import make_quickgrove_udf
from pathlib import Path
from xorq.api import _

t = xo.examples.diamonds.fetch()

# Load a pinned gradient-boosted-tree model and wrap it as a scalar UDF
model_path = Path(xo.options.pins.get_path("diamonds-model"))
model = make_quickgrove_udf(model_path, model_name="diamonds_model")

# Predict, filter, and select in a single expression
result = t.mutate(pred=model.on_expr).filter(_.carat < 1).select(_.pred).execute()
```

### 2. User-defined aggregate functions
```python
import xorq.api as xo
from xorq.expr import udf
import xorq.vendor.ibis.expr.datatypes as dt

alltypes = xo.examples.functional_alltypes.fetch()
cols = (by, _) = ["year", "month"]
name = "sum_sum"


@udf.agg.pandas_df(
    schema=alltypes[cols].schema(),
    return_type=dt.int64(),
    name=name,
)
def sum_sum(df):
    # Sum each column, then sum the per-column totals
    return df.sum().sum()


actual = (
    alltypes.group_by(by)
    .agg(sum_sum(*(alltypes[c] for c in cols)).name(name))
    .execute()
)
```

Additionally, you can use a UDAF to train ML models; see the example of training an XGBoost model.
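As a rough sketch of that pattern, a UDAF can fit a model over each group's rows and hand it back as pickled bytes. The feature and target columns and the `dt.binary()` return type here are assumptions for illustration, not the exact example from the docs:

```python
import pickle

import xgboost as xgb
import xorq.api as xo
from xorq.expr import udf
import xorq.vendor.ibis.expr.datatypes as dt

t = xo.examples.functional_alltypes.fetch()
features = ["int_col", "bigint_col"]  # illustrative feature columns
target = "double_col"  # illustrative target column


@udf.agg.pandas_df(
    schema=t[features + [target]].schema(),
    return_type=dt.binary(),  # the fitted model travels as pickled bytes
    name="train_xgb",
)
def train_xgb(df):
    model = xgb.XGBRegressor(n_estimators=10)
    model.fit(df[features], df[target])
    return pickle.dumps(model)


trained = t.agg(train_xgb(*(t[c] for c in features + [target])).name("model")).execute()
```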
### 3. Window UDF for analysis
```python
import pandas as pd
import pyarrow as pa

import xorq.api as xo
from xorq.expr.udf import pyarrow_udwf
from xorq.vendor import ibis


@pyarrow_udwf(
    schema=ibis.schema({"a": float}),
    return_type=ibis.dtype(float),
    alpha=0.9,
)
def exp_smooth(self, values: list[pa.Array], num_rows: int) -> pa.Array:
    # Exponentially weighted smoothing over the rows of the window
    results = []
    curr_value = 0.0
    values = values[0]
    for idx in range(num_rows):
        if idx == 0:
            curr_value = values[idx].as_py()
        else:
            curr_value = values[idx].as_py() * self.alpha + curr_value * (
                1.0 - self.alpha
            )
        results.append(curr_value)
    return pa.array(results)


df = pd.DataFrame(
    [
        (0, 7, "A"),
        (1, 4, "A"),
        (2, 3, "A"),
        (3, 8, "A"),
        (4, 9, "B"),
        (5, 1, "B"),
        (6, 6, "B"),
    ],
    columns=["a", "b", "c"],
)

t = xo.register(df, table_name="t")
expr = t.select(
    t.a,
    udwf=exp_smooth.on_expr(t).over(ibis.window()),
).order_by(t.a)

result = expr.execute()
```

## Ephemeral Flight service
Xorq's Ephemeral Flight Service provides a high-performance data transfer mechanism between engines using Apache Arrow Flight. Unlike traditional data transfer methods, this service provides:
- Automatic Lifecycle Management
- Zero-Copy Data Movement
  - Direct memory transfer between processes
  - No serialization/deserialization overhead
  - Efficient handling of large datasets
- Process Isolation
  - Separate processes for different engines
  - Independent resource management
  - Fault isolation
- Resource Management
- Security Integration
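To illustrate the mechanism underneath (this is plain `pyarrow.flight`, not xorq's own service API), a minimal ephemeral server can serve an in-memory Arrow table and shut down once the transfer completes:

```python
import pyarrow as pa
import pyarrow.flight as fl


class EphemeralServer(fl.FlightServerBase):
    """Serve a single in-memory table for the lifetime of one transfer."""

    def __init__(self, location, table):
        super().__init__(location)
        self.table = table

    def do_get(self, context, ticket):
        # Stream the table back as Arrow record batches
        return fl.RecordBatchStream(self.table)


table = pa.table({"a": [1, 2, 3]})
server = EphemeralServer("grpc://localhost:8815", table)

client = fl.connect("grpc://localhost:8815")
result = client.do_get(fl.Ticket(b"any")).read_all()  # ticket contents unused here
server.shutdown()  # tear the service down once the work is done
```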