Skip to main content

Polars Backend

The Polars backend executes Teckel pipelines using Polars, a high-performance DataFrame library written in Rust with a focus on lazy evaluation and memory efficiency.

Polars flow

Architecture

PolarsBackend

The PolarsBackend implements the Backend trait with DataFrame = polars::DataFrame. Unlike DataFusion which uses lazy logical plans by default, Polars uses eager DataFrames as the caching type -- but individual transforms leverage lazy evaluation internally via .lazy() / .collect().

Execution Model

Polars transforms follow a common pattern:

  1. Get the input DataFrame from the cache.
  2. Convert to a LazyFrame with .lazy().
  3. Apply the transformation.
  4. Collect back to an eager DataFrame with .collect().

This gives the Polars query optimizer a chance to optimize each transform while keeping the cache simple (eager DataFrames).

For complex transforms that cannot be expressed with the Polars expression API, a polars::sql::SQLContext is used to execute SQL queries.

Transform Implementation

Polars-Native Transforms

These use the Polars expression API directly:

TransformImplementation
Selectdf.lazy().select(exprs).collect()
Wheredf.lazy().filter(expr).collect()
GroupBydf.lazy().group_by(exprs).agg(exprs).collect()
OrderBydf.lazy().sort_by_exprs(cols, opts).collect() with SortMultipleOptions
Distinctdf.unique_stable(None, UniqueKeepStrategy::First, None)
Limitdf.head(Some(count))
AddColumnslf.with_column(expr.alias(name)) per column
DropColumnsdf.drop_many(&cols)
RenameColumnsdf.rename(old, new) per mapping
CastColumnslf.with_column(col(name).cast(dtype)) per column
Sampledf.sample_n_literal(n, with_replacement, false, seed)

SQL-Based Transforms

These use Polars' SQLContext for complex operations:

TransformSQL Pattern
JoinSELECT * FROM __left {JOIN_TYPE} __right ON condition
IntersectSELECT * FROM t0 INTERSECT SELECT * FROM t1
ExceptSELECT * FROM __left EXCEPT SELECT * FROM __right
SqlRegister views, execute user's SQL query
WindowSELECT *, func() OVER (PARTITION BY ... ORDER BY ...) AS alias FROM __src
PivotConditional aggregation via SQL
UnpivotUNION ALL of per-value-column SELECTs
RollupSELECT ... GROUP BY ROLLUP(cols)
CubeSELECT ... GROUP BY CUBE(cols)
ConditionalSELECT *, CASE WHEN ... END AS col FROM __src
Scd2Complex multi-part SQL with current/incoming dimension logic

Union Implementation

Union uses Polars' concat() function to stack multiple LazyFrames:

let dfs: Vec<LazyFrame> = sources.iter()
.map(|s| get(cache, s).map(|df| df.lazy()))
.collect::<Result<Vec<_>, _>>()?;
let mut result = concat(dfs, UnionArgs::default())?;
if !t.all {
result = result.unique(None, UniqueKeepStrategy::First);
}
result.collect()

Join Implementation

Joins in Polars use SQL via SQLContext because Polars' native join API does not directly support arbitrary join conditions (ON clauses with expressions). The left and right DataFrames are registered as temporary views and a SQL join query is constructed:

let mut ctx = polars::sql::SQLContext::new();
ctx.register("__left", result_df.lazy());
ctx.register("__right", right_df.lazy());
let query = format!("SELECT * FROM __left {jt} __right ON {on_clause}");
result = ctx.execute(&query)?;

Cross joins use the native result.cross_join(right, None) API.

SchemaEnforce

The Polars backend supports schema enforcement by selecting and casting columns:

let select_exprs: Vec<Expr> = t.columns.iter()
.map(|c| col(c.name.as_str()).cast(parse_polars_dtype(&c.data_type)))
.collect();
df.lazy().select(select_exprs).collect()

No-Op Transforms

Since Polars runs in-process on a single machine, these transforms are no-ops that simply return the input DataFrame:

  • Repartition -- no concept of partitions in local execution
  • Coalesce -- no concept of partitions in local execution

Type Handling

Polars has its own type system separate from Arrow. The parse_polars_dtype() function maps Teckel type strings to Polars DataType:

Teckel TypePolars Type
stringDataType::String
integer, intDataType::Int32
long, bigintDataType::Int64
floatDataType::Float32
doubleDataType::Float64
booleanDataType::Boolean
dateDataType::Date
timestampDataType::Datetime(Microseconds, None)

Limitations

FeatureStatusNotes
MergeNot supportedRequires mutable table format
ParseNot supportedCSV/JSON parsing within transforms not implemented
FlattenNot supportedStruct flattening not available in Polars SQL
RepartitionNo-opSingle-machine, no partitions
CoalesceNo-opSingle-machine, no partitions

Usage

use teckel_polars::PolarsBackend;
use teckel_engine::PipelineExecutor;
use teckel_parser;

let context = teckel_parser::parse(&yaml, &variables)?;
let backend = PolarsBackend::new();
let executor = PipelineExecutor::new(backend);
executor.execute(&context).await?;