
DataFusion Backend

The DataFusion backend executes Teckel pipelines locally using Apache DataFusion, an extensible query engine written in Rust that uses Apache Arrow as its in-memory format.

[Figure: DataFusion flow]

Architecture

DataFusionBackend

The DataFusionBackend wraps a DataFusion SessionContext and implements the Backend trait with its associated DataFrame type set to datafusion::DataFrame. The SessionContext provides SQL execution, a function registry, and table registration.
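As a rough illustration of the associated-type pattern, the sketch below uses a stand-in `Backend` trait. Only the `DataFrame` associated type comes from the description above; the `read_table` method, the `InMemoryBackend` type and the `load` helper are hypothetical and are not Teckel's actual API. The point is that engine code written against `B::DataFrame` works unchanged whether a backend sets that type to a list of rows, as here, or to `datafusion::DataFrame`.

```rust
use std::collections::HashMap;

// Stand-in for the engine's Backend trait. Only the associated `DataFrame`
// type reflects the documentation; `read_table` is a hypothetical method.
trait Backend {
    /// Each backend picks its own DataFrame representation.
    type DataFrame;

    fn read_table(&self, name: &str) -> Result<Self::DataFrame, String>;
}

/// Toy backend whose "DataFrame" is simply a list of string rows.
struct InMemoryBackend {
    tables: HashMap<String, Vec<Vec<String>>>,
}

impl Backend for InMemoryBackend {
    type DataFrame = Vec<Vec<String>>;

    fn read_table(&self, name: &str) -> Result<Self::DataFrame, String> {
        self.tables
            .get(name)
            .cloned()
            .ok_or_else(|| format!("unknown table: {name}"))
    }
}

/// Engine-side code is generic: it only knows `B::DataFrame`, never the
/// concrete type a particular backend chose.
fn load<B: Backend>(backend: &B, name: &str) -> Result<B::DataFrame, String> {
    backend.read_table(name)
}
```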

Transform Implementation

DataFusion transforms use a mix of native DataFrame API calls and SQL. The approach depends on the transform:

Native API Transforms

These transforms map directly to DataFusion's DataFrame methods:

| Transform | Implementation |
|---|---|
| Select | `df.select(cols)`: columns parsed as expressions against the schema |
| Where | `df.filter(expr)`: expression parsed with schema context |
| GroupBy | `df.aggregate(group_exprs, agg_exprs)` |
| OrderBy | `df.sort(sort_exprs)`, with `SortExpr` carrying direction and null ordering |
| Union | `df.union(other)` chained across sources, then `df.distinct()` unless UNION ALL |
| Intersect | `df.intersect(other)` chained across sources |
| Except | `df.except(other)` |
| Distinct | `df.distinct()` |
| Limit | `df.limit(0, Some(count))` (skip 0 rows, fetch `count`) |
| AddColumns | `df.with_column(name, expr)` for each column definition |
| DropColumns | `df.drop_columns(&cols)` |
| RenameColumns | `df.with_column_renamed(old, new)` for each mapping |
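The Union row relies on the difference between UNION ALL and UNION. A minimal std-only sketch of those semantics over in-memory rows (the `union_all` and `union_distinct` functions are illustrative, not part of Teckel or DataFusion):

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// UNION ALL semantics: concatenate every input, keeping duplicates.
fn union_all<T: Clone>(inputs: &[Vec<T>]) -> Vec<T> {
    inputs.iter().flat_map(|rows| rows.iter().cloned()).collect()
}

/// UNION semantics: chain the inputs, then drop duplicate rows,
/// keeping the first occurrence of each.
fn union_distinct<T: Clone + Eq + Hash>(inputs: &[Vec<T>]) -> Vec<T> {
    let mut seen = HashSet::new();
    union_all(inputs)
        .into_iter()
        // `insert` returns false for rows already seen, filtering them out.
        .filter(|row| seen.insert(row.clone()))
        .collect()
}
```

Like SQL UNION, the distinct step also removes duplicates that occur within a single input. The sketch keeps the first occurrence of each row so its output is deterministic; DataFusion's `distinct()` should not be relied on to preserve input order.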

SQL-Based Transforms

Complex transforms register the input DataFrame as a temporary view and execute SQL:

| Transform | SQL pattern |
|---|---|
| Sql | Register the input views, then run `ctx.sql(&query)` directly |
| CastColumns | `SELECT CAST("col" AS type) AS "col", ... FROM view` |
| Window | `SELECT *, func() OVER (PARTITION BY ... ORDER BY ... frame) AS alias FROM view` |
| Pivot | Conditional aggregation: `agg FILTER (WHERE pivot_col = 'val') AS "val"` |
| Unpivot | `UNION ALL` of `SELECT ids, 'col' AS var, "col" AS val`, one branch per value column |
| Flatten | Recursive projection of struct fields, joining nested names with a configurable separator |
| Rollup | `SELECT ... GROUP BY ROLLUP(cols)` |
| Cube | `SELECT ... GROUP BY CUBE(cols)` |
| Scd2 | Multi-part `UNION ALL` combining current and incoming dimension records |
| Conditional | `CASE WHEN cond THEN val ... END` as an expression |
| Sample | `SELECT * FROM view WHERE random() < fraction` (each row is kept independently, so the sample size is approximate) |
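The Pivot and Unpivot patterns above amount to string generation over a registered view. A minimal sketch of how such SQL could be built, assuming hypothetical helpers `pivot_sql` and `unpivot_sql` (not Teckel's actual functions) and double-quoted identifiers:

```rust
/// Quotes an SQL identifier, doubling any embedded double quotes.
fn quote_ident(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

/// Quotes an SQL string literal, doubling any embedded single quotes.
fn quote_literal(value: &str) -> String {
    format!("'{}'", value.replace('\'', "''"))
}

/// Pivot via conditional aggregation: one FILTER-ed aggregate per pivot value.
/// `agg` is an aggregate expression such as `sum("amount")`.
fn pivot_sql(view: &str, group_by: &[&str], pivot_col: &str, values: &[&str], agg: &str) -> String {
    let groups: Vec<String> = group_by.iter().map(|c| quote_ident(c)).collect();
    let mut select = groups.clone();
    for value in values {
        select.push(format!(
            "{} FILTER (WHERE {} = {}) AS {}",
            agg,
            quote_ident(pivot_col),
            quote_literal(value),
            quote_ident(value)
        ));
    }
    let mut sql = format!("SELECT {} FROM {}", select.join(", "), quote_ident(view));
    // Omit GROUP BY entirely when there are no grouping columns.
    if !groups.is_empty() {
        sql.push_str(&format!(" GROUP BY {}", groups.join(", ")));
    }
    sql
}

/// Unpivot: one SELECT per value column, combined with UNION ALL.
fn unpivot_sql(view: &str, ids: &[&str], value_cols: &[&str], var_name: &str, val_name: &str) -> String {
    let id_list: Vec<String> = ids.iter().map(|c| quote_ident(c)).collect();
    value_cols
        .iter()
        .map(|col| {
            let mut cols = id_list.clone();
            // The column name becomes a literal in the variable column...
            cols.push(format!("{} AS {}", quote_literal(col), quote_ident(var_name)));
            // ...and the column's contents go into the value column.
            cols.push(format!("{} AS {}", quote_ident(col), quote_ident(val_name)));
            format!("SELECT {} FROM {}", cols.join(", "), quote_ident(view))
        })
        .collect::<Vec<_>>()
        .join(" UNION ALL ")
}
```

Quoting every identifier keeps mixed-case and reserved column names intact, since DataFusion folds unquoted identifiers to lowercase by default. The Unpivot form also requires all value columns to have compatible types, because every UNION ALL branch feeds the same output column.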

Join Implementation

Joins use DataFusion's native join API with a filter expression. The join condition is parsed against a schema that combines the left and right DataFrames:

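```rust
// Merge both schemas so the condition can reference columns from either side.
let combined = left_schema.join(right_schema)?;
// Parse the Teckel join condition against the combined schema.
let filter_expr = parse_expr_with_schema(ctx, &filter, &combined)?;
// No equi-join key columns (`&[]`, `&[]`); the whole condition is the join filter.
result = result.join(right_df, join_type, &[], &[], Some(filter_expr))?;
```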

Join type mapping:

| Teckel type | DataFusion type |
|---|---|
| Inner | `JoinType::Inner` |
| Left | `JoinType::Left` |
| Right | `JoinType::Right` |
| Outer | `JoinType::Full` |
| Cross | `JoinType::Inner` with no join columns |
| LeftSemi | `JoinType::LeftSemi` |
| LeftAnti | `JoinType::LeftAnti` |

Cross joins use JoinType::Inner with no columns and no filter to produce the Cartesian product.
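The join table can be read as a small lookup from Teckel join names to a DataFusion join type plus a flag saying whether the condition becomes the join filter. A std-only sketch, assuming Teckel join types arrive as strings and using a local `JoinType` enum that mirrors DataFusion's variant names (the real code would use DataFusion's own enum; `JoinPlan` and `map_join_type` are hypothetical):

```rust
/// Local mirror of the DataFusion `JoinType` variants used by the mapping
/// (DataFusion's enum has more variants than these).
#[derive(Debug, Clone, Copy, PartialEq)]
enum JoinType {
    Inner,
    Left,
    Right,
    Full,
    LeftSemi,
    LeftAnti,
}

/// How a Teckel join is handed to DataFusion: the join type, and whether
/// the user's condition is passed as the join filter.
#[derive(Debug, PartialEq)]
struct JoinPlan {
    join_type: JoinType,
    use_filter: bool,
}

/// Maps a Teckel join type name to a plan. Case-insensitive matching is an
/// assumption of this sketch. Returns None for unknown names.
fn map_join_type(teckel: &str) -> Option<JoinPlan> {
    let with_filter = |join_type: JoinType| JoinPlan { join_type, use_filter: true };
    match teckel.to_ascii_lowercase().as_str() {
        "inner" => Some(with_filter(JoinType::Inner)),
        "left" => Some(with_filter(JoinType::Left)),
        "right" => Some(with_filter(JoinType::Right)),
        "outer" => Some(with_filter(JoinType::Full)),
        "leftsemi" => Some(with_filter(JoinType::LeftSemi)),
        "leftanti" => Some(with_filter(JoinType::LeftAnti)),
        // An inner join with no key columns and no filter is a Cartesian product.
        "cross" => Some(JoinPlan { join_type: JoinType::Inner, use_filter: false }),
        _ => None,
    }
}
```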

Type Mapping

The type_mapping.rs module provides bidirectional conversion between Teckel data types and Arrow data types:

teckel_to_arrow()

Converts Teckel's TeckelDataType enum to Arrow's DataType:

| Teckel type | Arrow type |
|---|---|
| String | `DataType::Utf8` |
| Integer | `DataType::Int64` |
| Long | `DataType::Int64` |
| Float | `DataType::Float32` |
| Double | `DataType::Float64` |
| Boolean | `DataType::Boolean` |
| Date | `DataType::Date32` |
| Timestamp | `DataType::Timestamp(TimeUnit::Nanosecond, None)` |
| Binary | `DataType::Binary` |
| Decimal(p, s) | `DataType::Decimal128(p, s)` |

arrow_to_teckel()

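Performs the reverse conversion, used when inferring a Teckel schema from an existing DataFrame. Because both Integer and Long map to Int64, this direction cannot recover which of the two was originally declared.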
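Both directions can be sketched as a pair of `match` functions. To keep the example self-contained, `TeckelDataType` and `ArrowType` below are local stand-ins for Teckel's enum and Arrow's `DataType`; the `u8`/`i8` decimal parameters and the choice of `Long` for `Int64` in the reverse direction are assumptions.

```rust
/// Stand-in for Teckel's TeckelDataType (variant names follow the table;
/// the u8/i8 decimal parameters are borrowed from Arrow's Decimal128).
#[derive(Debug, Clone, PartialEq)]
enum TeckelDataType {
    String,
    Integer,
    Long,
    Float,
    Double,
    Boolean,
    Date,
    Timestamp,
    Binary,
    Decimal(u8, i8),
}

/// Stand-in for the subset of Arrow's DataType used by the mapping.
#[derive(Debug, Clone, PartialEq)]
enum ArrowType {
    Utf8,
    Int64,
    Float32,
    Float64,
    Boolean,
    Date32,
    TimestampNanosecond, // DataType::Timestamp(TimeUnit::Nanosecond, None)
    Binary,
    Decimal128(u8, i8),
}

fn teckel_to_arrow(t: &TeckelDataType) -> ArrowType {
    match t {
        TeckelDataType::String => ArrowType::Utf8,
        TeckelDataType::Integer | TeckelDataType::Long => ArrowType::Int64,
        TeckelDataType::Float => ArrowType::Float32,
        TeckelDataType::Double => ArrowType::Float64,
        TeckelDataType::Boolean => ArrowType::Boolean,
        TeckelDataType::Date => ArrowType::Date32,
        TeckelDataType::Timestamp => ArrowType::TimestampNanosecond,
        TeckelDataType::Binary => ArrowType::Binary,
        TeckelDataType::Decimal(p, s) => ArrowType::Decimal128(*p, *s),
    }
}

fn arrow_to_teckel(a: &ArrowType) -> TeckelDataType {
    match a {
        ArrowType::Utf8 => TeckelDataType::String,
        // Integer and Long both become Int64, so this direction has to pick
        // one; the sketch assumes Long.
        ArrowType::Int64 => TeckelDataType::Long,
        ArrowType::Float32 => TeckelDataType::Float,
        ArrowType::Float64 => TeckelDataType::Double,
        ArrowType::Boolean => TeckelDataType::Boolean,
        ArrowType::Date32 => TeckelDataType::Date,
        ArrowType::TimestampNanosecond => TeckelDataType::Timestamp,
        ArrowType::Binary => TeckelDataType::Binary,
        ArrowType::Decimal128(p, s) => TeckelDataType::Decimal(*p, *s),
    }
}
```

A real `arrow_to_teckel` has to handle Arrow types with no Teckel equivalent (for example `Int8` or `LargeUtf8`), so it would more likely return an `Option` or `Result` than the total function shown here.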

Reader

The reader.rs module handles input source reading based on format:

| Format | Method | Options and features |
|---|---|---|
| CSV | `ctx.read_csv(path, options)` | header, delimiter, quote, escape, inferSchema |
| Parquet | `ctx.read_parquet(path, options)` | Column projection, predicate pushdown |
| JSON | `ctx.read_json(path, options)` | Schema inference, multiline |
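Format options arrive as string key/value pairs and need validating before they reach DataFusion. A sketch for the CSV options listed above, assuming a hypothetical `CsvOptions` struct whose first four fields mirror the names of fields on DataFusion's `CsvReadOptions` (`has_header`, `delimiter`, `quote`, `escape`); the defaults and error handling are assumptions, and how `inferSchema` is wired into DataFusion is not shown:

```rust
use std::collections::HashMap;

/// Hypothetical validated form of the Teckel CSV options.
#[derive(Debug, PartialEq)]
struct CsvOptions {
    has_header: bool,
    delimiter: u8,
    quote: u8,
    escape: Option<u8>,
    infer_schema: bool,
}

/// Delimiter, quote and escape must each be exactly one ASCII byte.
fn single_byte(key: &str, value: &str) -> Result<u8, String> {
    match value.as_bytes() {
        [b] => Ok(*b),
        _ => Err(format!("option `{key}` must be a single ASCII character, got {value:?}")),
    }
}

fn parse_bool(key: &str, value: &str) -> Result<bool, String> {
    value
        .parse::<bool>()
        .map_err(|_| format!("option `{key}` must be true or false, got {value:?}"))
}

/// Applies user-supplied options on top of assumed defaults.
fn parse_csv_options(opts: &HashMap<&str, &str>) -> Result<CsvOptions, String> {
    let mut parsed = CsvOptions {
        has_header: true,
        delimiter: b',',
        quote: b'"',
        escape: None,
        infer_schema: true,
    };
    for (&key, &value) in opts {
        match key {
            "header" => parsed.has_header = parse_bool(key, value)?,
            "delimiter" => parsed.delimiter = single_byte(key, value)?,
            "quote" => parsed.quote = single_byte(key, value)?,
            "escape" => parsed.escape = Some(single_byte(key, value)?),
            "inferSchema" => parsed.infer_schema = parse_bool(key, value)?,
            other => return Err(format!("unknown CSV option `{other}`")),
        }
    }
    Ok(parsed)
}
```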

Writer

The writer.rs module handles output writing with write mode support:

  • Error: Check if path exists; fail if it does
  • Overwrite: Write directly (DataFusion creates or replaces)
  • Append: Read existing data, union with new data, write combined
  • Ignore: Check if path exists; skip if it does
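The four write modes reduce to a decision made before any data is written. A std-only sketch with hypothetical `WriteMode`, `WriteAction` and `plan_write` names; treating Append on a missing path as a plain write is an assumption the list above does not state:

```rust
use std::path::Path;

#[derive(Debug, Clone, Copy, PartialEq)]
enum WriteMode {
    Error,
    Overwrite,
    Append,
    Ignore,
}

#[derive(Debug, PartialEq)]
enum WriteAction {
    /// Write the new data (creating or replacing the output).
    Write,
    /// Read existing data, union it with the new data, write the result.
    AppendByRewrite,
    /// Leave the existing output untouched.
    Skip,
}

fn plan_write(mode: WriteMode, path_exists: bool) -> Result<WriteAction, String> {
    match (mode, path_exists) {
        (WriteMode::Error, true) => Err("output path already exists".to_string()),
        (WriteMode::Error, false) => Ok(WriteAction::Write),
        (WriteMode::Overwrite, _) => Ok(WriteAction::Write),
        // Nothing to union with yet, so a plain write is enough (assumption).
        (WriteMode::Append, false) => Ok(WriteAction::Write),
        (WriteMode::Append, true) => Ok(WriteAction::AppendByRewrite),
        (WriteMode::Ignore, true) => Ok(WriteAction::Skip),
        (WriteMode::Ignore, false) => Ok(WriteAction::Write),
    }
}

/// Convenience wrapper that checks the filesystem.
fn plan_write_for_path(mode: WriteMode, path: &Path) -> Result<WriteAction, String> {
    plan_write(mode, path.exists())
}
```

Because the existence check and the write are separate steps, another process could create the path in between, so the Error and Ignore checks are best-effort rather than atomic.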

Output formats: CSV, Parquet, JSON.

Limitations

| Feature | Status | Notes |
|---|---|---|
| Merge | Not supported | Requires a mutable table format such as Delta Lake |
| Parse CSV | Partial | Basic CSV parsing is supported; complex formats may not work |
| Repartition | No-op | Single-machine execution; no shuffle needed |
| Coalesce | No-op | Single-machine execution |

Usage

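```rust
use teckel_datafusion::DataFusionBackend;
use teckel_engine::PipelineExecutor;

// Runs inside an async function that returns a Result: `execute` is awaited
// and errors are propagated with `?`.
// `yaml` holds the pipeline definition; `variables` supplies its variable values.
let context = teckel_parser::parse(&yaml, &variables)?;
let backend = DataFusionBackend::new();
let executor = PipelineExecutor::new(backend);
executor.execute(&context).await?;
```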