Version: 2.0

Advanced Transformations

This section covers specialized transformations: raw SQL, SCD Type 2, API enrichment, schema enforcement, data assertions, partitioning, and custom components.


SQL (8.22)

Executes a raw SQL query against one or more assets registered as temporary views.

Schema:

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| query | string | Yes | SQL query string. |
| views | NonEmptyList[AssetRef] | Yes | Assets to register as temporary views before the query. |

All assets listed in views are registered as temporary views using their asset name as the view name. The SQL dialect is implementation-defined (e.g., Spark SQL for the Spark runtime).
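
The view-registration step can be sketched with Python's sqlite3 standing in for the actual runtime's SQL engine (the `run_sql` helper, asset names, and data below are invented for illustration; the Spark runtime would use `createOrReplaceTempView` and `spark.sql` instead):

```python
import sqlite3

def run_sql(assets, query):
    """Register each asset under its asset name (here: as a table),
    then run the query against the registered views.

    Sketch only: sqlite3 stands in for the runtime's SQL engine.
    """
    conn = sqlite3.connect(":memory:")
    for name, (columns, data) in assets.items():
        conn.execute(f"CREATE TABLE {name} ({', '.join(columns)})")
        marks = ", ".join("?" for _ in columns)
        conn.executemany(f"INSERT INTO {name} VALUES ({marks})", data)
    return conn.execute(query).fetchall()

assets = {
    "customers": (["id", "name"], [(1, "Ada"), (2, "Linus")]),
    "orders": (["order_id", "customer_id", "amount"],
               [(10, 1, 250.0), (11, 2, 50.0)]),
}
result = run_sql(assets, """
    SELECT c.name, o.order_id, o.amount
    FROM customers c
    LEFT JOIN orders o ON c.id = o.customer_id
    WHERE o.amount > 100
""")
```

Each asset becomes queryable under its own name, which is why asset names must be valid identifiers in the target SQL dialect.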

Example — complex query across multiple assets:

transformation:
  - name: customerOrders
    sql:
      views: [customers, orders]
      query: >
        SELECT c.name, o.order_id, o.amount
        FROM customers c
        LEFT JOIN orders o ON c.id = o.customer_id
        WHERE o.amount > 100

Example — using SQL for complex aggregation:

transformation:
  - name: monthlyStats
    sql:
      views: [transactions]
      query: >
        SELECT
          date_trunc('month', txn_date) as month,
          count(*) as total_txns,
          sum(amount) as total_amount,
          percentile_approx(amount, 0.5) as median_amount
        FROM transactions
        GROUP BY date_trunc('month', txn_date)
        ORDER BY month

Tip: Use sql when the built-in transformations do not cover your use case, or when porting existing SQL logic into a Teckel pipeline.


SCD Type 2 (8.25)

(Figure: SCD Type 2 flow)

Slowly Changing Dimension Type 2 — tracks historical changes to dimension records. This transformation takes two inputs rather than a single from.

Schema:

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| current | AssetRef | Yes | Existing dimension table (previous state). |
| incoming | AssetRef | Yes | New/updated records to merge. |
| keyColumns | NonEmptyList[Column] | Yes | Business key columns for matching records. |
| trackColumns | NonEmptyList[Column] | Yes | Columns to track for changes. |
| startDateColumn | string | Yes | Name for the valid-from timestamp column. |
| endDateColumn | string | Yes | Name for the valid-to timestamp column. |
| currentFlagColumn | string | Yes | Name for the is-current boolean column. |

How it works:

  1. Match incoming records to current records by keyColumns.
  2. For matched records where any trackColumns value has changed:
    • Close the existing record: set endDateColumn to current timestamp, currentFlagColumn to false.
    • Insert a new record from incoming: startDateColumn = current timestamp, endDateColumn = NULL, currentFlagColumn = true.
  3. For matched records with no changes: no modification.
  4. For new records (no match in current): insert with startDateColumn = current timestamp, endDateColumn = NULL, currentFlagColumn = true.
  5. Output is the full dimension table (all historical + current records).
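
The steps above can be sketched in Python over rows represented as dicts (the `scd2_merge` helper is invented for illustration; a real runtime performs this as a distributed join, and timestamps are simplified to ISO strings):

```python
from datetime import datetime, timezone

def scd2_merge(current, incoming, key_cols, track_cols,
               start_col="valid_from", end_col="valid_to", flag_col="is_current"):
    """Apply the SCD Type 2 merge steps described above."""
    now = datetime.now(timezone.utc).isoformat()
    key = lambda r: tuple(r[c] for c in key_cols)
    incoming_by_key = {key(r): r for r in incoming}
    matched, out = set(), []
    for row in current:
        new = incoming_by_key.get(key(row))
        if row[flag_col] and new is not None:
            matched.add(key(row))
            if any(row[c] != new[c] for c in track_cols):
                # Close the existing record, then open a new current one.
                out.append({**row, end_col: now, flag_col: False})
                out.append({**new, start_col: now, end_col: None, flag_col: True})
                continue
        out.append(row)  # historical and unchanged rows pass through
    for k, new in incoming_by_key.items():
        if k not in matched:  # brand-new business keys
            out.append({**new, start_col: now, end_col: None, flag_col: True})
    return out

existing = [{"customer_id": 1, "name": "Ada",
             "valid_from": "2023-01-01T00:00:00", "valid_to": None,
             "is_current": True}]
incoming = [{"customer_id": 1, "name": "Ada L."},
            {"customer_id": 2, "name": "Grace"}]
dim = scd2_merge(existing, incoming, ["customer_id"], ["name"])
```

Here the changed customer yields a closed historical row plus a new current row, and the new customer is inserted as current, so the output holds three rows.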

Example — customer dimension with history tracking:

transformation:
  - name: customerDim
    scd2:
      current: existingCustomers
      incoming: newCustomerData
      keyColumns: [customer_id]
      trackColumns: [name, email, address]
      startDateColumn: valid_from
      endDateColumn: valid_to
      currentFlagColumn: is_current

Tip: Ensure current contains the full dimension table from the previous run, including all historical records. The output replaces the entire dimension.


Enrich (8.26)

Enriches records by calling an external HTTP API for each distinct key value.

Schema:

| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| from | AssetRef | Yes | — | Source asset. |
| url | string | Yes | — | API endpoint URL. May contain ${keyColumn} placeholder. |
| method | string | No | "GET" | HTTP method. |
| keyColumn | Column | Yes | — | Column whose value is sent to the API. |
| responseColumn | string | Yes | — | Name for the column holding the API response. |
| headers | Map[string, string] | No | {} | HTTP request headers. |
| onError | string | No | "null" | Error handling: "null", "fail", or "skip". |
| timeout | integer | No | 30000 | Request timeout in milliseconds. |
| maxRetries | integer | No | 3 | Max retry attempts for 5xx/timeout errors. |

Key behaviors:

  • The API is called once per distinct value of keyColumn. Results are cached and reused for duplicates.
  • The response body is stored as a string in responseColumn.
  • HTTP 4xx errors are not retried. HTTP 5xx and timeouts are retried with exponential backoff.
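
These behaviors — one call per distinct key with caching, retries with exponential backoff, and the onError modes — can be sketched as follows (the `fetch` callable stands in for the HTTP request and all helper names are invented; 4xx short-circuiting is omitted for brevity):

```python
import time

def enrich(rows, key_column, response_column, fetch,
           on_error="null", max_retries=3, backoff_s=0.01):
    """Attach fetch(key) results to rows, calling once per distinct key."""
    cache = {}
    for key in {row[key_column] for row in rows}:  # one call per distinct key
        attempt = 0
        while True:
            try:
                cache[key] = fetch(key)
                break
            except Exception:
                attempt += 1
                if attempt > max_retries:
                    if on_error == "fail":
                        raise            # abort the pipeline
                    cache[key] = None    # "null" and "skip" record a miss
                    break
                time.sleep(backoff_s * (2 ** attempt))  # exponential backoff
    out = []
    for row in rows:
        value = cache[row[key_column]]
        if value is None and on_error == "skip":
            continue                     # silently drop failed rows
        out.append({**row, response_column: value})
    return out

attempts = {}
def flaky_fetch(key):
    # Invented stand-in for the HTTP call: fails once for key "A".
    attempts[key] = attempts.get(key, 0) + 1
    if key == "A" and attempts[key] == 1:
        raise TimeoutError("simulated timeout")
    return f"resp-{key}"

rows = [{"id": 1, "k": "A"}, {"id": 2, "k": "A"}, {"id": 3, "k": "B"}]
enriched = enrich(rows, "k", "resp", flaky_fetch)
```

Note that key "A" appears in two rows but `fetch` runs only twice for it (one failure plus one retry), illustrating the distinct-key cache.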

Example — enrich orders with customer data from an API:

transformation:
  - name: enrichedOrders
    enrich:
      from: orders
      url: "https://api.example.com/customers/${customer_id}"
      method: GET
      keyColumn: customer_id
      responseColumn: customer_data
      headers:
        Authorization: "Bearer {{secrets.api_token}}"
        Accept: "application/json"
      onError: "null"
      timeout: 5000
      maxRetries: 2

Tip: Use onError: "skip" to silently drop rows where the API fails, or "fail" to abort the pipeline on any API error.


Schema Enforce (8.27)

Validates and/or evolves the schema of a dataset against an expected definition.

Schema:

| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| from | AssetRef | Yes | — | Source asset. |
| mode | string | No | "strict" | Enforcement mode. |
| columns | NonEmptyList[SchemaColumn] | Yes | — | Expected schema. |

SchemaColumn object:

| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| name | Column | Yes | — | Column name. |
| dataType | string | Yes | — | Expected data type. |
| nullable | boolean | No | true | Whether NULL values are allowed. |
| default | Expression | No | — | Default value for missing columns (evolve mode only). |

Modes:

| Mode | Extra columns | Missing columns | Type mismatch |
|------|---------------|-----------------|---------------|
| strict | Error E-SCHEMA-003 | Error E-SCHEMA-004 | Attempt cast; NULL on fail |
| evolve | Preserved | Added with default (or NULL if none) | Attempt cast; NULL on fail |
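
A minimal sketch of the two modes, assuming rows are dicts and using plain Python conversions in place of the runtime's cast rules (all helper names are invented; nullability checks are omitted for brevity):

```python
def enforce_schema(rows, columns, mode="strict"):
    """Enforce an expected schema on rows per the mode table above."""
    casts = {"integer": int, "string": str, "boolean": bool, "float": float}
    expected = {c["name"] for c in columns}
    out = []
    for row in rows:
        extra = set(row) - expected
        if extra and mode == "strict":
            raise ValueError(f"E-SCHEMA-003: unexpected columns {sorted(extra)}")
        new_row = dict(row) if mode == "evolve" else {}  # evolve preserves extras
        for col in columns:
            name = col["name"]
            if name not in row:
                if mode == "strict":
                    raise ValueError(f"E-SCHEMA-004: missing column {name}")
                new_row[name] = col.get("default")       # default, or NULL if none
                continue
            try:
                new_row[name] = casts[col["dataType"]](row[name])
            except (TypeError, ValueError):
                new_row[name] = None                     # failed cast -> NULL
        out.append(new_row)
    return out

legacy = [{"id": "7", "name": "Ada"}]
schema = [
    {"name": "id", "dataType": "integer"},
    {"name": "name", "dataType": "string"},
    {"name": "version", "dataType": "integer", "default": 1},
]
evolved = enforce_schema(legacy, schema, mode="evolve")
```

In evolve mode the string `"7"` is cast to an integer and the missing `version` column is filled from its default; in strict mode the same input would raise E-SCHEMA-004 for the missing column.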

Example — strict schema enforcement:

transformation:
  - name: validated
    schemaEnforce:
      from: rawData
      mode: strict
      columns:
        - name: id
          dataType: integer
          nullable: false
        - name: name
          dataType: string
          nullable: false
        - name: email
          dataType: string
          nullable: true
        - name: created_at
          dataType: timestamp
          nullable: false

Example — schema evolution with defaults:

transformation:
  - name: evolved
    schemaEnforce:
      from: legacyData
      mode: evolve
      columns:
        - name: id
          dataType: integer
          nullable: false
        - name: name
          dataType: string
        - name: version
          dataType: integer
          default: "1"
        - name: migrated
          dataType: boolean
          default: "false"

Tip: Use strict mode in production pipelines to catch unexpected schema changes early. Use evolve mode when integrating data from sources that may add new columns over time.


Assertion (8.28)

Validates data quality rules without modifying the dataset (unless onFailure: drop is used).

Schema:

| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| from | AssetRef | Yes | — | Source asset. |
| checks | NonEmptyList[Check] | Yes | — | Quality checks. |
| onFailure | string | No | "fail" | Failure handling mode. |

Check object:

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| column | Column | No | Column to check. Required for column-level rules. |
| rule | string | Yes | Validation rule. |
| description | string | No | Human-readable description. |

Built-in rules:

| Rule | Requires column | Description |
|------|-----------------|-------------|
| not_null | Yes | Column must not contain NULL values. |
| unique | Yes | Column must contain unique values. |
| Any Condition | No | Boolean expression evaluated per row. |

Failure modes:

| Mode | Behavior |
|------|----------|
| fail | Abort the pipeline with error. Log failing rows. |
| warn | Log a warning with failing row count. Continue unchanged. |
| drop | Remove rows that fail any check. Log dropped row count. |
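
The check rules and failure modes can be sketched as follows. Note this sketch evaluates condition rules as Python expressions via `eval()`, standing in for the runtime's own SQL-style condition syntax, and all helper names are invented:

```python
def run_assertions(rows, checks, on_failure="fail"):
    """Evaluate quality checks over rows (dicts) per the failure modes above."""
    def passes(row, check, seen):
        rule, col = check["rule"], check.get("column")
        if rule == "not_null":
            return row[col] is not None
        if rule == "unique":
            if row[col] in seen[col]:
                return False
            seen[col].add(row[col])
            return True
        return bool(eval(rule, {}, dict(row)))  # condition rule (sketch)

    seen = {c["column"]: set() for c in checks if c["rule"] == "unique"}
    failing = [r for r in rows if not all(passes(r, c, seen) for c in checks)]
    if not failing:
        return rows
    if on_failure == "fail":
        raise ValueError(f"assertion failed for {len(failing)} row(s)")
    if on_failure == "drop":
        return [r for r in rows if r not in failing]
    print(f"warning: {len(failing)} row(s) failed checks")  # warn
    return rows

rows = [{"id": 1, "amount": 5},
        {"id": 2, "amount": -1},
        {"id": None, "amount": 3}]
checks = [{"column": "id", "rule": "not_null"},
          {"rule": "amount > 0"}]
kept = run_assertions(rows, checks, on_failure="drop")
```

With `on_failure="drop"`, the negative-amount row and the null-id row are removed; with `"fail"`, the same input would abort instead.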

Example — validate data quality before output:

transformation:
  - name: qualityChecked
    assertion:
      from: processedData
      onFailure: fail
      checks:
        - column: id
          rule: not_null
          description: "ID must never be null"
        - column: email
          rule: unique
          description: "Email addresses must be unique"
        - rule: "age >= 0 AND age <= 150"
          description: "Age must be within valid range"

Example — drop invalid rows with a warning:

transformation:
  - name: cleanedData
    assertion:
      from: rawRecords
      onFailure: drop
      checks:
        - column: amount
          rule: not_null
        - rule: "amount > 0"
          description: "Amount must be positive"

Tip: Use onFailure: warn during development to see data quality issues without stopping the pipeline. Switch to fail in production.


Repartition (8.29)

Changes the number of partitions with a full shuffle. Use it to increase the partition count or to redistribute data across partitions.

Schema:

| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| from | AssetRef | Yes | — | Source asset. |
| numPartitions | integer | Yes | — | Target partition count. Must be > 0. |
| columns | List[Column] | No | [] | Columns to hash-partition by. Empty = round-robin. |

Example — repartition by hash on a key column:

transformation:
  - name: repartitioned
    repartition:
      from: largeDataset
      numPartitions: 200
      columns: [customer_id]

Example — round-robin repartition:

transformation:
  - name: balanced
    repartition:
      from: skewedData
      numPartitions: 100

Tip: Use repartition with columns when you need data locality for downstream joins or aggregations on those columns.
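
The difference between hash and round-robin assignment can be sketched as follows (helper names are invented; real runtimes use their own deterministic hash, approximated here with CRC32):

```python
import zlib

def key_hash(value):
    # Deterministic stand-in for the runtime's hash function.
    return zlib.crc32(repr(value).encode("utf-8"))

def assign_partitions(rows, num_partitions, columns=None):
    """Assign rows to partitions: by key-column hash when columns is given,
    round-robin otherwise."""
    partitions = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        if columns:
            idx = sum(key_hash(row[c]) for c in columns) % num_partitions
        else:
            idx = i % num_partitions  # round-robin
        partitions[idx].append(row)
    return partitions

rows = [{"customer_id": k, "n": i} for i, k in enumerate([1, 2, 1, 3, 2])]
hashed = assign_partitions(rows, 4, columns=["customer_id"])  # co-locates keys
balanced = assign_partitions(rows, 2)                         # evens out sizes
```

Hash partitioning guarantees every row with the same `customer_id` lands in the same partition (the locality downstream joins need); round-robin only balances partition sizes.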


Coalesce (8.30)

Reduces the number of partitions without a full shuffle. This is more efficient than repartition when reducing partition count.

Schema:

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| from | AssetRef | Yes | Source asset. |
| numPartitions | integer | Yes | Target partition count. Must be > 0 and <= current count. |

Example — reduce partitions before writing output:

transformation:
  - name: compacted
    coalesce:
      from: processedData
      numPartitions: 10

Tip: Use coalesce before writing output to avoid creating many small files. It avoids the full shuffle of repartition, making it faster when you only need to reduce partition count.
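
A sketch of why coalesce avoids the shuffle: whole input partitions are concatenated into fewer output partitions, so individual rows are never redistributed. The modulo grouping below is invented for illustration; real runtimes typically merge partitions by locality:

```python
def coalesce(partitions, num_partitions):
    """Concatenate whole input partitions into num_partitions outputs."""
    if not (0 < num_partitions <= len(partitions)):
        raise ValueError("numPartitions must be > 0 and <= current count")
    out = [[] for _ in range(num_partitions)]
    for i, part in enumerate(partitions):
        out[i % num_partitions].extend(part)  # move partitions, not rows
    return out

merged = coalesce([[1], [2], [3], [4], [5]], 2)
```

Because only partition boundaries change, output partitions can end up unevenly sized, which is the usual trade-off against repartition's balanced shuffle.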


Custom (8.31)

Invokes a user-registered component for transformations not covered by built-in operations.

Schema:

| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| from | AssetRef | Yes | — | Source asset. |
| component | string | Yes | — | Registered component identifier. |
| options | Map[string, string] | No | {} | Component-specific options. |

A custom component must:

  1. Accept a single tabular dataset as input.
  2. Accept a string-to-string options map.
  3. Return a single tabular dataset as output.

The registration mechanism is implementation-defined. The component must be registered before the pipeline executes; otherwise, the runtime raises E-COMP-001.
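
One way such a registry might look, as a hedged sketch: the registry, helper names, and masking component below are all invented; only the one-dataset-in/one-dataset-out contract and the E-COMP-001 error come from the text above.

```python
from typing import Callable, Dict, List, Mapping

Dataset = List[dict]
Component = Callable[[Dataset, Mapping[str, str]], Dataset]
_REGISTRY: Dict[str, Component] = {}  # hypothetical registration mechanism

def register(name: str, component: Component) -> None:
    _REGISTRY[name] = component

def run_custom(name: str, dataset: Dataset, options: Mapping[str, str]) -> Dataset:
    if name not in _REGISTRY:
        raise LookupError(f"E-COMP-001: component not registered: {name}")
    return _REGISTRY[name](dataset, options)

# An invented masking component honoring the contract above.
def masker(rows: Dataset, options: Mapping[str, str]) -> Dataset:
    cols = options.get("columns", "").split(",")
    return [{k: ("***" if k in cols else v) for k, v in row.items()}
            for row in rows]

register("com.example.security.DataMasker", masker)
out = run_custom("com.example.security.DataMasker",
                 [{"ssn": "123-45-6789", "name": "Ada"}],
                 {"columns": "ssn"})
```

Invoking an unregistered identifier raises the E-COMP-001 error before any data is touched, matching the pre-execution check described above.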

Example — apply a custom ML scoring component:

transformation:
  - name: scored
    custom:
      from: features
      component: "com.example.ml.ScoringModel"
      options:
        model_path: "s3://models/latest/model.pkl"
        threshold: "0.75"
        output_column: "prediction"

Example — custom data masking:

transformation:
  - name: masked
    custom:
      from: sensitiveData
      component: "com.example.security.DataMasker"
      options:
        columns: "ssn,credit_card"
        method: "sha256"

Tip: Use custom as an escape hatch for domain-specific logic that cannot be expressed with built-in transformations. Keep the component interface simple — one dataset in, one dataset out.