Incremental Models
Incremental models process only new or changed data instead of rebuilding entire tables. This dramatically reduces build time and warehouse costs for large datasets.
How Incremental Models Work
On the first run, an incremental model behaves like a table (full build). On subsequent runs, it only processes new rows:
First Run (Full):
┌─────────────────┐      ┌─────────────────┐
│   Source Data   │ ──►  │  Target Table   │
│   (all rows)    │      │   (all rows)    │
└─────────────────┘      └─────────────────┘
Subsequent Runs (Incremental):
┌─────────────────┐      ┌─────────────────┐
│   Source Data   │      │  Target Table   │
│   (new rows     │ ──►  │  (existing +    │
│    only)        │      │   new rows)     │
└─────────────────┘      └─────────────────┘
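The two run modes can be sketched without a warehouse. Below is a minimal Python illustration of the same behavior; the dict rows and `updated_at` key are assumptions for the sketch, not Olytix Core internals:

```python
from datetime import datetime

def incremental_run(source_rows, target_rows, ts_key="updated_at"):
    """First run: full build. Later runs: append only rows newer than
    the target's high-water mark."""
    if not target_rows:  # first run behaves like a full table build
        return list(source_rows)
    high_water = max(row[ts_key] for row in target_rows)
    new_rows = [row for row in source_rows if row[ts_key] > high_water]
    return target_rows + new_rows

source = [
    {"order_id": 1, "updated_at": datetime(2024, 1, 1)},
    {"order_id": 2, "updated_at": datetime(2024, 1, 2)},
]
target = incremental_run(source, [])       # full build: both rows
source.append({"order_id": 3, "updated_at": datetime(2024, 1, 3)})
target = incremental_run(source, target)   # only order 3 is processed
```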
Basic Incremental Model
{{ config(
materialized='incremental',
unique_key='order_id'
) }}
SELECT
order_id,
customer_id,
order_date,
total_amount,
status,
updated_at
FROM {{ source('raw', 'orders') }}
{% if is_incremental() %}
-- Only process rows newer than the latest in target
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
Key Components
| Component | Purpose |
|---|---|
| `materialized='incremental'` | Enables incremental processing |
| `unique_key` | Column(s) that identify rows for updates |
| `is_incremental()` | Jinja function; true after the first run |
| `{{ this }}` | References the target table |
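To see what `is_incremental()` does to the final query, here is a rough Python illustration of the compiled output. This mimics the compiler's effect only; it is not Olytix Core's actual code:

```python
def compile_model(is_incremental: bool, target: str = "analytics.fct_orders") -> str:
    """Illustrates how the is_incremental() guard toggles the WHERE clause."""
    sql = "SELECT order_id, customer_id, updated_at FROM raw.orders"
    if is_incremental:  # true on every run after the first
        sql += f" WHERE updated_at > (SELECT MAX(updated_at) FROM {target})"
    return sql

first_run = compile_model(False)  # full build: no filter
later_run = compile_model(True)   # incremental: high-water-mark filter
```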
Incremental Strategies
Olytix Core supports three incremental strategies, each suited to different use cases.
Append Strategy
New rows are inserted without checking for duplicates. Best for immutable, append-only data.
{{ config(
materialized='incremental',
incremental_strategy='append'
) }}
SELECT
event_id,
user_id,
event_type,
event_properties,
event_timestamp
FROM {{ source('raw', 'events') }}
{% if is_incremental() %}
WHERE event_timestamp > (SELECT MAX(event_timestamp) FROM {{ this }})
{% endif %}
Use When:
- Data is immutable (events, logs)
- Rows are never updated after creation
- No duplicate handling needed
Generated SQL:
INSERT INTO analytics.fct_events
SELECT ... FROM raw.events
WHERE event_timestamp > '2024-01-15 10:30:00'
Merge Strategy
Updates existing rows and inserts new ones based on unique_key. Best for mutable data.
{{ config(
materialized='incremental',
unique_key='order_id',
incremental_strategy='merge'
) }}
SELECT
order_id,
customer_id,
order_date,
total_amount,
status,
updated_at
FROM {{ source('raw', 'orders') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
Use When:
- Rows can be updated (order status changes)
- Need to track the latest state
- Source provides updated_at timestamps
Generated SQL:
MERGE INTO analytics.fct_orders AS target
USING (
SELECT ... FROM raw.orders
WHERE updated_at > '2024-01-15 10:30:00'
) AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...
Delete+Insert Strategy
Deletes matching rows then inserts new data. Best for batch updates within partitions.
{{ config(
materialized='incremental',
unique_key='order_id',
incremental_strategy='delete+insert',
incremental_predicates=[
"order_date >= CURRENT_DATE - INTERVAL '7 days'"
]
) }}
SELECT
order_id,
customer_id,
order_date,
total_amount,
status
FROM {{ source('raw', 'orders') }}
{% if is_incremental() %}
WHERE order_date >= CURRENT_DATE - INTERVAL '7 days'
{% endif %}
Use When:
- Replacing entire partitions
- Merge not supported or too slow
- Data arrives in batches by date
Generated SQL:
DELETE FROM analytics.fct_orders
WHERE order_date >= CURRENT_DATE - INTERVAL '7 days';
INSERT INTO analytics.fct_orders
SELECT ... FROM raw.orders
WHERE order_date >= CURRENT_DATE - INTERVAL '7 days';
Strategy Comparison
| Strategy | Updates | Deletes | Best For | Performance |
|---|---|---|---|---|
| `append` | No | No | Immutable events | Fastest |
| `merge` | Yes | No | Mutable records | Medium |
| `delete+insert` | Yes | Yes | Partition replacement | Fast |
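The three strategies can be contrasted with a small in-memory sketch. Python dicts stand in for rows here; this shows the semantics, not the engine:

```python
def append(target, batch):
    """append: insert new rows with no duplicate checking."""
    return target + batch

def merge(target, batch, key="order_id"):
    """merge: update rows whose key matches, insert the rest."""
    by_key = {row[key]: row for row in target}
    by_key.update({row[key]: row for row in batch})
    return list(by_key.values())

def delete_insert(target, batch, predicate):
    """delete+insert: delete target rows matching the predicate, then insert."""
    return [row for row in target if not predicate(row)] + batch

target = [{"order_id": 1, "status": "open"}, {"order_id": 2, "status": "open"}]
batch = [{"order_id": 2, "status": "shipped"}, {"order_id": 3, "status": "open"}]

appended = append(target, batch)   # 4 rows: order 2 is now duplicated
merged = merge(target, batch)      # 3 rows: order 2 updated in place
replaced = delete_insert(target, batch, lambda r: r["order_id"] >= 2)  # 3 rows
```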
Composite Unique Keys
Use multiple columns as the unique key:
{{ config(
materialized='incremental',
unique_key=['order_id', 'line_number'],
incremental_strategy='merge'
) }}
SELECT
order_id,
line_number,
product_id,
quantity,
unit_price,
updated_at
FROM {{ source('raw', 'order_lines') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
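With a composite key, matching happens on the tuple of key columns rather than a single value. A sketch of that matching (dict rows assumed):

```python
def merge_composite(target, batch, keys=("order_id", "line_number")):
    """Merge rows matched on a tuple of key columns."""
    by_key = {tuple(r[k] for k in keys): r for r in target}
    by_key.update({tuple(r[k] for k in keys): r for r in batch})
    return list(by_key.values())

target = [
    {"order_id": 1, "line_number": 1, "quantity": 2},
    {"order_id": 1, "line_number": 2, "quantity": 1},
]
batch = [{"order_id": 1, "line_number": 2, "quantity": 5}]  # updates line 2 only
lines = merge_composite(target, batch)
```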
Incremental Predicates
Optimize merge performance by limiting the scan on the target table:
{{ config(
materialized='incremental',
unique_key='event_id',
incremental_strategy='merge',
incremental_predicates=[
"event_date >= CURRENT_DATE - INTERVAL '3 days'"
]
) }}
SELECT
event_id,
event_date,
user_id,
event_type
FROM {{ source('raw', 'events') }}
{% if is_incremental() %}
WHERE event_date >= CURRENT_DATE - INTERVAL '3 days'
{% endif %}
The predicate limits the rows scanned in the target during merge:
MERGE INTO target
USING source
ON target.event_id = source.event_id
AND target.event_date >= CURRENT_DATE - INTERVAL '3 days' -- Predicate
Handling Late-Arriving Data
Data can land after its incremental window has already been processed. Handle this with a lookback period that reprocesses a recent buffer:
{{ config(
materialized='incremental',
unique_key='event_id',
incremental_strategy='merge'
) }}
{% set lookback_hours = 6 %}
SELECT
event_id,
user_id,
event_type,
event_timestamp,
_loaded_at
FROM {{ source('raw', 'events') }}
{% if is_incremental() %}
-- Use _loaded_at for late arrivals, with lookback buffer
WHERE _loaded_at > (
SELECT MAX(_loaded_at) - INTERVAL '{{ lookback_hours }} hours'
FROM {{ this }}
)
{% endif %}
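The buffer's effect is simple: the cutoff is pushed back by the lookback window, and rows re-selected in the overlap are deduplicated by the merge. A sketch of the cutoff arithmetic:

```python
from datetime import datetime, timedelta

def lookback_cutoff(max_loaded_at, lookback_hours=6):
    """Cutoff for the next run: the high-water mark minus a safety buffer,
    so rows that arrived late are still picked up."""
    return max_loaded_at - timedelta(hours=lookback_hours)

cutoff = lookback_cutoff(datetime(2024, 1, 15, 12, 0))
# the next run re-reads everything loaded after 2024-01-15 06:00
```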
Schema Changes
Handle column additions, removals, or type changes:
{{ config(
materialized='incremental',
unique_key='order_id',
on_schema_change='sync_all_columns'
) }}
Schema Change Options
| Option | Behavior |
|---|---|
| `ignore` | Ignore new columns (default) |
| `fail` | Error if schema changes |
| `append_new_columns` | Add new columns, keep existing |
| `sync_all_columns` | Add new columns, drop removed columns |
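The four options amount to a column reconciliation policy. A name-only Python sketch of that policy (real handling also covers type changes):

```python
def reconcile_columns(target_cols, source_cols, on_schema_change="ignore"):
    """Sketch of on_schema_change: decide the target's columns after a run."""
    if on_schema_change == "ignore":
        return list(target_cols)
    if on_schema_change == "fail":
        if set(target_cols) != set(source_cols):
            raise RuntimeError("schema change detected")
        return list(target_cols)
    new_cols = [c for c in source_cols if c not in target_cols]
    if on_schema_change == "append_new_columns":
        return list(target_cols) + new_cols
    # sync_all_columns: add new columns, drop removed ones
    return [c for c in target_cols if c in source_cols] + new_cols

cols = reconcile_columns(["id", "legacy_flag"], ["id", "email"], "sync_all_columns")
```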
Micro-Batch Processing
For very large incremental loads, process in batches:
{{ config(
materialized='incremental',
unique_key='event_id',
incremental_strategy='append'
) }}
{% set batch_size = 1000000 %}
WITH source_data AS (
SELECT
event_id,
user_id,
event_type,
event_timestamp,
ROW_NUMBER() OVER (ORDER BY event_timestamp) AS row_num
FROM {{ source('raw', 'events') }}
{% if is_incremental() %}
WHERE event_timestamp > (SELECT MAX(event_timestamp) FROM {{ this }})
{% endif %}
)
SELECT
event_id,
user_id,
event_type,
event_timestamp
FROM source_data
WHERE row_num <= {{ batch_size }}
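The batching idea in plain terms: order the new rows, take the oldest batch, and let subsequent runs pick up the remainder. A sketch:

```python
def take_batch(new_rows, batch_size, ts_key="event_timestamp"):
    """Return the oldest batch_size rows; the rest wait for the next run."""
    return sorted(new_rows, key=lambda r: r[ts_key])[:batch_size]

rows = [{"event_id": i, "event_timestamp": i} for i in (5, 1, 3, 2, 4)]
batch = take_batch(rows, 3)  # the three oldest events
```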
Slowly Changing Dimensions
Implement SCD Type 2 with incremental models:
{{ config(
materialized='incremental',
unique_key='customer_sk',
incremental_strategy='merge'
) }}
{% if is_incremental() %}
-- Close expired records
SELECT
customer_sk,
customer_id,
customer_name,
customer_email,
customer_region,
valid_from,
CURRENT_TIMESTAMP AS valid_to,
FALSE AS is_current
FROM {{ this }}
WHERE is_current = TRUE
AND customer_id IN (
SELECT customer_id
FROM {{ source('raw', 'customers') }}
WHERE updated_at > (SELECT MAX(valid_from) FROM {{ this }})
)
UNION ALL
{% endif %}
-- Insert new/updated records
SELECT
{{ dbt_utils.generate_surrogate_key(['customer_id', 'updated_at']) }} AS customer_sk,
customer_id,
customer_name,
customer_email,
customer_region,
updated_at AS valid_from,
CAST('9999-12-31' AS TIMESTAMP) AS valid_to,
TRUE AS is_current
FROM {{ source('raw', 'customers') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(valid_from) FROM {{ this }})
{% endif %}
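The SQL above performs two moves at once: close the current version of each changed entity, then insert a new open-ended version. A Python sketch of the same logic, where the dict rows and column set are assumptions for illustration:

```python
from datetime import datetime

OPEN_ENDED = datetime(9999, 12, 31)

def scd2_apply(history, changes):
    """SCD Type 2: close the current row for each changed customer,
    then insert a new open-ended current row."""
    changed = {c["customer_id"]: c for c in changes}
    out = []
    for row in history:
        if row["is_current"] and row["customer_id"] in changed:
            # expire the old version at the change timestamp
            out.append({**row,
                        "valid_to": changed[row["customer_id"]]["updated_at"],
                        "is_current": False})
        else:
            out.append(row)
    for c in changes:
        # open a new current version
        out.append({"customer_id": c["customer_id"], "region": c["region"],
                    "valid_from": c["updated_at"], "valid_to": OPEN_ENDED,
                    "is_current": True})
    return out

history = [{"customer_id": 1, "region": "EU",
            "valid_from": datetime(2023, 1, 1),
            "valid_to": OPEN_ENDED, "is_current": True}]
history = scd2_apply(history, [{"customer_id": 1, "region": "US",
                                "updated_at": datetime(2024, 1, 1)}])
```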
Full Refresh
Rebuild an incremental model from scratch:
# Single model
olytix-core run --select fct_events --full-refresh
# All models with dependencies
olytix-core run --select +fct_events --full-refresh
# All incremental models
olytix-core run --full-refresh
When to Full Refresh
- Schema changes requiring rebuild
- Data quality corrections
- Historical data reprocessing
- After fixing bugs in transformation logic
Testing Incremental Models
Verify Row Counts
-- tests/test_fct_events_count.sql
{% set tolerance = 0.01 %}
WITH source_count AS (
SELECT COUNT(*) AS cnt FROM {{ source('raw', 'events') }}
),
target_count AS (
SELECT COUNT(*) AS cnt FROM {{ ref('fct_events') }}
)
SELECT
CASE
WHEN ABS(s.cnt - t.cnt) / s.cnt > {{ tolerance }}
THEN 'FAIL: Row count drift > {{ tolerance * 100 }}%'
END AS result
FROM source_count s, target_count t
WHERE ABS(s.cnt - t.cnt) / s.cnt > {{ tolerance }}
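The tolerance check reduces to a single relative-difference comparison; a sketch in Python:

```python
def row_count_drifted(source_count, target_count, tolerance=0.01):
    """True when the relative difference between source and target row
    counts exceeds the tolerance (1% by default)."""
    return abs(source_count - target_count) / source_count > tolerance

minor = row_count_drifted(1_000_000, 999_500)  # 0.05% drift: within tolerance
major = row_count_drifted(1_000_000, 950_000)  # 5% drift: exceeds tolerance
```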
Check for Duplicates
# In schema.yml
models:
  - name: fct_events
    columns:
      - name: event_id
        tests:
          - unique
          - not_null
Performance Optimization
1. Partition Target Tables
{{ config(
materialized='incremental',
unique_key='event_id',
partition_by={
"field": "event_date",
"data_type": "date",
"granularity": "day"
}
) }}
2. Use Incremental Predicates
{{ config(
incremental_predicates=[
"event_date >= CURRENT_DATE - INTERVAL '7 days'"
]
) }}
3. Index Key Columns
{{ config(
post_hook=[
"CREATE INDEX IF NOT EXISTS idx_updated_at ON {{ this }} (updated_at)"
]
) }}
4. Limit Source Scans
{% if is_incremental() %}
-- Efficient: filter in source
WHERE event_timestamp > (SELECT MAX(event_timestamp) FROM {{ this }})
{% endif %}
Common Patterns
Snapshot with Latest State
Capture the latest state of each entity:
{{ config(
materialized='incremental',
unique_key='customer_id',
incremental_strategy='merge'
) }}
SELECT
customer_id,
customer_name,
customer_email,
subscription_status,
updated_at,
CURRENT_TIMESTAMP AS snapshot_at
FROM {{ source('raw', 'customers') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
Running Aggregates
Maintain running totals. Note that the window function only sees rows selected in the current run, so on incremental runs the cumulative sum below covers the new batch alone; carry the prior total forward from {{ this }} (or widen the filter) if you need a true all-time running total:
{{ config(
materialized='incremental',
unique_key='date_day',
incremental_strategy='merge'
) }}
SELECT
DATE(order_timestamp) AS date_day,
COUNT(*) AS daily_orders,
SUM(total_amount) AS daily_revenue,
SUM(SUM(total_amount)) OVER (ORDER BY DATE(order_timestamp)) AS cumulative_revenue
FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
WHERE DATE(order_timestamp) >= (SELECT MAX(date_day) FROM {{ this }})
{% endif %}
GROUP BY DATE(order_timestamp)
Event Deduplication
Remove duplicate events:
{{ config(
materialized='incremental',
unique_key='event_id',
incremental_strategy='merge'
) }}
WITH ranked_events AS (
SELECT
*,
ROW_NUMBER() OVER (
PARTITION BY event_id
ORDER BY _loaded_at DESC
) AS rn
FROM {{ source('raw', 'events') }}
{% if is_incremental() %}
WHERE _loaded_at > (SELECT MAX(_loaded_at) FROM {{ this }})
{% endif %}
)
SELECT
event_id,
user_id,
event_type,
event_properties,
event_timestamp,
_loaded_at
FROM ranked_events
WHERE rn = 1
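The deduplication reduces to "keep the most recently loaded copy per event_id". A sketch:

```python
def dedupe_latest(rows, key="event_id", order="_loaded_at"):
    """Keep only the most recently loaded copy of each event."""
    latest = {}
    for row in rows:
        seen = latest.get(row[key])
        if seen is None or row[order] > seen[order]:
            latest[row[key]] = row
    return list(latest.values())

rows = [
    {"event_id": "a", "_loaded_at": 1, "payload": "stale"},
    {"event_id": "a", "_loaded_at": 2, "payload": "fresh"},
    {"event_id": "b", "_loaded_at": 1, "payload": "only"},
]
events = dedupe_latest(rows)
```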
Troubleshooting
Duplicate Rows
Symptom: Row count grows unexpectedly
Cause: Missing or incorrect unique_key
Fix:
-- event_id must be truly unique in the source
{{ config(
unique_key='event_id'
) }}
Missing Data
Symptom: Some records not appearing
Cause: Incremental filter too aggressive
Fix: Add lookback buffer
WHERE updated_at > (
SELECT MAX(updated_at) - INTERVAL '1 hour' FROM {{ this }}
)
Slow Merges
Symptom: Incremental runs slower than expected
Cause: Full table scans during merge
Fix: Add incremental predicates
{{ config(
incremental_predicates=["order_date >= CURRENT_DATE - INTERVAL '7 days'"]
) }}
Best Practices
- Always define unique_key for merge strategy
- Use updated_at or _loaded_at for reliable filtering
- Add lookback buffers for late-arriving data
- Partition large tables by date
- Test after initial load and after incremental runs
- Document your incremental logic for maintainability
Next Steps
- Understand materializations - Compare all materialization types
- Learn about lineage - Track data flow through incremental models
- Define cubes - Build semantic layer on incremental tables