Incremental Models

For Data Analysts

Incremental models process only new or changed data instead of rebuilding entire tables. This dramatically reduces build time and warehouse costs for large datasets.

How Incremental Models Work

On the first run, an incremental model behaves like a table (full build). On subsequent runs, it only processes new rows:

First Run (Full):
┌─────────────────┐ ┌─────────────────┐
│ Source Data │ ──► │ Target Table │
│ (all rows) │ │ (all rows) │
└─────────────────┘ └─────────────────┘

Subsequent Runs (Incremental):
┌─────────────────┐ ┌─────────────────┐
│ Source Data │ │ Target Table │
│ (new rows │ ──► │ (existing + │
│ only) │ │ new rows) │
└─────────────────┘ └─────────────────┘
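The two modes above can be mimicked with a toy in-memory sketch (Python, purely illustrative — the real processing happens in warehouse SQL, and the `run_model` helper is hypothetical):

```python
# Toy model of incremental processing: the target's high-water mark
# (max timestamp seen so far) decides which source rows each run pulls.

def run_model(source_rows, target_rows):
    """First run copies everything; later runs append rows newer than the max."""
    if not target_rows:                       # first run: full build
        return list(source_rows)
    high_water = max(r["updated_at"] for r in target_rows)
    new_rows = [r for r in source_rows if r["updated_at"] > high_water]
    return target_rows + new_rows             # existing + new rows

source = [{"order_id": 1, "updated_at": 1}, {"order_id": 2, "updated_at": 2}]
target = run_model(source, [])                # full build: both rows land
source.append({"order_id": 3, "updated_at": 3})
target = run_model(source, target)            # incremental: only row 3 is added
```

Each run only touches rows beyond the high-water mark, which is exactly what the `is_incremental()` filter expresses in SQL.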

Basic Incremental Model

{{ config(
    materialized='incremental',
    unique_key='order_id'
) }}

SELECT
    order_id,
    customer_id,
    order_date,
    total_amount,
    status,
    updated_at
FROM {{ source('raw', 'orders') }}

{% if is_incremental() %}
-- Only process rows newer than the latest in the target
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}

Key Components

Component                   Purpose
materialized='incremental'  Enables incremental processing
unique_key                  Column(s) identifying rows to update
is_incremental()            Jinja function; true on incremental runs (false on the first run or a full refresh)
{{ this }}                  References the target table

Incremental Strategies

Olytix Core supports three incremental strategies, each suited to different use cases.

Append Strategy

New rows are inserted without checking for duplicates. Best for immutable, append-only data.

{{ config(
    materialized='incremental',
    incremental_strategy='append'
) }}

SELECT
    event_id,
    user_id,
    event_type,
    event_properties,
    event_timestamp
FROM {{ source('raw', 'events') }}

{% if is_incremental() %}
WHERE event_timestamp > (SELECT MAX(event_timestamp) FROM {{ this }})
{% endif %}

Use When:

  • Data is immutable (events, logs)
  • Rows are never updated after creation
  • No duplicate handling needed

Generated SQL:

INSERT INTO analytics.fct_events
SELECT ... FROM raw.events
WHERE event_timestamp > '2024-01-15 10:30:00'

Merge Strategy

Updates existing rows and inserts new ones based on unique_key. Best for mutable data.

{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge'
) }}

SELECT
    order_id,
    customer_id,
    order_date,
    total_amount,
    status,
    updated_at
FROM {{ source('raw', 'orders') }}

{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}

Use When:

  • Rows can be updated (order status changes)
  • Need to track the latest state
  • Source provides updated_at timestamps

Generated SQL:

MERGE INTO analytics.fct_orders AS target
USING (
    SELECT ... FROM raw.orders
    WHERE updated_at > '2024-01-15 10:30:00'
) AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...

Delete+Insert Strategy

Deletes matching rows then inserts new data. Best for batch updates within partitions.

{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='delete+insert',
    incremental_predicates=[
        "order_date >= CURRENT_DATE - INTERVAL '7 days'"
    ]
) }}

SELECT
    order_id,
    customer_id,
    order_date,
    total_amount,
    status
FROM {{ source('raw', 'orders') }}

{% if is_incremental() %}
WHERE order_date >= CURRENT_DATE - INTERVAL '7 days'
{% endif %}

Use When:

  • Replacing entire partitions
  • Merge not supported or too slow
  • Data arrives in batches by date

Generated SQL:

DELETE FROM analytics.fct_orders
WHERE order_date >= CURRENT_DATE - INTERVAL '7 days';

INSERT INTO analytics.fct_orders
SELECT ... FROM raw.orders
WHERE order_date >= CURRENT_DATE - INTERVAL '7 days';

Strategy Comparison

Strategy       Updates  Deletes  Best For               Performance
append         No       No       Immutable events       Fastest
merge          Yes      No       Mutable records        Medium
delete+insert  Yes      Yes      Partition replacement  Fast
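The semantics of the three strategies can be sketched over in-memory "tables" (Python, illustrative only — the helper names are hypothetical, not part of Olytix Core):

```python
# Each strategy applied to a list-of-dicts target keyed by order_id.

def append(target, new_rows):
    return target + new_rows                          # no dedup at all

def merge(target, new_rows, key="order_id"):
    by_key = {r[key]: r for r in target}
    for r in new_rows:                                # update or insert by key
        by_key[r[key]] = r
    return list(by_key.values())

def delete_insert(target, new_rows, predicate):
    kept = [r for r in target if not predicate(r)]    # drop matching rows first
    return kept + new_rows

target = [{"order_id": 1, "status": "placed"}]
update = [{"order_id": 1, "status": "shipped"}, {"order_id": 2, "status": "placed"}]

assert len(append(target, update)) == 3               # duplicates order 1
assert len(merge(target, update)) == 2                # order 1 updated in place
assert len(delete_insert(target, update, lambda r: r["order_id"] == 1)) == 2
```

This is why `append` duplicates mutable records, while `merge` and `delete+insert` converge on one row per key.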

Composite Unique Keys

Use multiple columns as the unique key:

{{ config(
    materialized='incremental',
    unique_key=['order_id', 'line_number'],
    incremental_strategy='merge'
) }}

SELECT
    order_id,
    line_number,
    product_id,
    quantity,
    unit_price,
    updated_at
FROM {{ source('raw', 'order_lines') }}

{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}

Incremental Predicates

Optimize merge performance by limiting the scan on the target table:

{{ config(
    materialized='incremental',
    unique_key='event_id',
    incremental_strategy='merge',
    incremental_predicates=[
        "event_date >= CURRENT_DATE - INTERVAL '3 days'"
    ]
) }}

SELECT
    event_id,
    event_date,
    user_id,
    event_type
FROM {{ source('raw', 'events') }}

{% if is_incremental() %}
WHERE event_date >= CURRENT_DATE - INTERVAL '3 days'
{% endif %}

The predicate limits the rows scanned in the target during merge:

MERGE INTO target
USING source
ON target.event_id = source.event_id
   AND target.event_date >= CURRENT_DATE - INTERVAL '3 days'  -- Predicate

Handling Late-Arriving Data

Data may arrive after the incremental window. Handle with a lookback period:

{{ config(
    materialized='incremental',
    unique_key='event_id',
    incremental_strategy='merge'
) }}

{% set lookback_hours = 6 %}

SELECT
    event_id,
    user_id,
    event_type,
    event_timestamp,
    _loaded_at
FROM {{ source('raw', 'events') }}

{% if is_incremental() %}
-- Use _loaded_at for late arrivals, with a lookback buffer
WHERE _loaded_at > (
    SELECT MAX(_loaded_at) - INTERVAL '{{ lookback_hours }} hours'
    FROM {{ this }}
)
{% endif %}
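The cutoff arithmetic is simple but worth seeing in isolation (Python sketch; the timestamps are made up):

```python
from datetime import datetime, timedelta

# Instead of loading strictly after MAX(_loaded_at), back off a few hours so
# late-arriving rows inside the buffer are re-scanned. The merge strategy's
# unique_key then deduplicates anything that gets re-processed.

LOOKBACK_HOURS = 6

max_loaded_at = datetime(2024, 1, 15, 10, 30)   # SELECT MAX(_loaded_at) FROM target
cutoff = max_loaded_at - timedelta(hours=LOOKBACK_HOURS)

assert cutoff == datetime(2024, 1, 15, 4, 30)   # rows after 04:30 are re-read
```

The lookback widens the scan slightly on every run; pairing it with `merge` keeps the re-read rows from becoming duplicates.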

Schema Changes

Handle column additions, removals, or type changes:

{{ config(
    materialized='incremental',
    unique_key='order_id',
    on_schema_change='sync_all_columns'
) }}

Schema Change Options

Option              Behavior
ignore              Ignore new columns (default)
fail                Error if the schema changes
append_new_columns  Add new columns, keep existing ones
sync_all_columns    Add new columns, drop removed ones

Micro-Batch Processing

For very large incremental loads, process in batches:

{{ config(
    materialized='incremental',
    unique_key='event_id',
    incremental_strategy='append'
) }}

{% set batch_size = 1000000 %}

WITH source_data AS (
    SELECT
        event_id,
        user_id,
        event_type,
        event_timestamp,
        ROW_NUMBER() OVER (ORDER BY event_timestamp) AS row_num
    FROM {{ source('raw', 'events') }}
    {% if is_incremental() %}
    WHERE event_timestamp > (SELECT MAX(event_timestamp) FROM {{ this }})
    {% endif %}
)

SELECT
    event_id,
    user_id,
    event_type,
    event_timestamp
FROM source_data
WHERE row_num <= {{ batch_size }}
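Capping each run at batch_size still converges, because the high-water mark advances after every run and the next run picks up the remainder. A toy Python sketch (the `run_batch` helper is hypothetical):

```python
# Each run takes at most batch_size of the backlog; repeated runs drain it.

def run_batch(source, target, batch_size):
    high = max((r["ts"] for r in target), default=-1)
    pending = sorted((r for r in source if r["ts"] > high), key=lambda r: r["ts"])
    return target + pending[:batch_size]          # oldest rows first

source = [{"ts": t} for t in range(5)]
target = []
runs = 0
while len(target) < len(source):                  # scheduler re-runs the model
    target = run_batch(source, target, batch_size=2)
    runs += 1

assert runs == 3 and len(target) == 5             # 2 + 2 + 1 rows per run
```

Ordering by timestamp before capping matters: taking the oldest rows first keeps the high-water mark honest, so nothing is skipped.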

Slowly Changing Dimensions

Implement SCD Type 2 with incremental models:

{{ config(
    materialized='incremental',
    unique_key='customer_sk',
    incremental_strategy='merge'
) }}

{% if is_incremental() %}

-- Close expired records
SELECT
    customer_sk,
    customer_id,
    customer_name,
    customer_email,
    customer_region,
    valid_from,
    CURRENT_TIMESTAMP AS valid_to,
    FALSE AS is_current
FROM {{ this }}
WHERE is_current = TRUE
  AND customer_id IN (
      SELECT customer_id
      FROM {{ source('raw', 'customers') }}
      WHERE updated_at > (SELECT MAX(valid_from) FROM {{ this }})
  )

UNION ALL

{% endif %}

-- Insert new/updated records
SELECT
    {{ dbt_utils.generate_surrogate_key(['customer_id', 'updated_at']) }} AS customer_sk,
    customer_id,
    customer_name,
    customer_email,
    customer_region,
    updated_at AS valid_from,
    CAST('9999-12-31' AS TIMESTAMP) AS valid_to,
    TRUE AS is_current
FROM {{ source('raw', 'customers') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(valid_from) FROM {{ this }})
{% endif %}
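The close-then-insert dance can be distilled into a few lines (Python sketch, greatly simplified — real SCD2 uses surrogate keys and tracks more attributes):

```python
# SCD Type 2 in miniature: when a tracked attribute changes, close the
# current version (is_current=False, set valid_to) and insert a new current row.

def scd2_apply(history, incoming):
    for new in incoming:
        for row in history:                           # close changed versions
            if (row["customer_id"] == new["customer_id"] and row["is_current"]
                    and row["region"] != new["region"]):
                row["is_current"] = False
                row["valid_to"] = new["updated_at"]
        if not any(r["customer_id"] == new["customer_id"] and r["is_current"]
                   for r in history):                 # insert the new version
            history.append({**new, "valid_from": new["updated_at"],
                            "valid_to": None, "is_current": True})
    return history

hist = scd2_apply([], [{"customer_id": 1, "region": "EU", "updated_at": 1}])
hist = scd2_apply(hist, [{"customer_id": 1, "region": "US", "updated_at": 2}])

assert len(hist) == 2                                 # one closed, one current
assert sum(r["is_current"] for r in hist) == 1
```

The SQL version does both steps in a single statement: the first SELECT closes expired records, the UNION ALL branch inserts the new versions, and the merge on customer_sk applies both.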

Full Refresh

Rebuild an incremental model from scratch:

# Single model
olytix-core run --select fct_events --full-refresh

# All models with dependencies
olytix-core run --select +fct_events --full-refresh

# All incremental models
olytix-core run --full-refresh

When to Full Refresh

  • Schema changes requiring rebuild
  • Data quality corrections
  • Historical data reprocessing
  • After fixing bugs in transformation logic

Testing Incremental Models

Verify Row Counts

-- tests/test_fct_events_count.sql
{% set tolerance = 0.01 %}

WITH source_count AS (
    SELECT COUNT(*) AS cnt FROM {{ source('raw', 'events') }}
),
target_count AS (
    SELECT COUNT(*) AS cnt FROM {{ ref('fct_events') }}
)
SELECT
    CASE
        WHEN ABS(s.cnt - t.cnt) / s.cnt > {{ tolerance }}
            THEN 'FAIL: Row count drift > {{ tolerance * 100 }}%'
    END AS result
FROM source_count s, target_count t
WHERE ABS(s.cnt - t.cnt) / s.cnt > {{ tolerance }}
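The drift check reduces to one comparison, shown here in Python (illustrative; `row_count_ok` is a made-up helper name):

```python
# Fail when source and target row counts differ by more than the
# tolerance fraction — the same predicate as the SQL test above.

def row_count_ok(source_cnt, target_cnt, tolerance=0.01):
    return abs(source_cnt - target_cnt) / source_cnt <= tolerance

assert row_count_ok(1_000_000, 995_000, tolerance=0.01)       # 0.5% drift: pass
assert not row_count_ok(1_000_000, 950_000, tolerance=0.01)   # 5% drift: fail
```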

Check for Duplicates

# In schema.yml
models:
  - name: fct_events
    columns:
      - name: event_id
        tests:
          - unique
          - not_null

Performance Optimization

1. Partition Target Tables

{{ config(
    materialized='incremental',
    unique_key='event_id',
    partition_by={
        "field": "event_date",
        "data_type": "date",
        "granularity": "day"
    }
) }}

2. Use Incremental Predicates

{{ config(
    incremental_predicates=[
        "event_date >= CURRENT_DATE - INTERVAL '7 days'"
    ]
) }}

3. Index Key Columns

{{ config(
    post_hook=[
        "CREATE INDEX IF NOT EXISTS idx_updated_at ON {{ this }} (updated_at)"
    ]
) }}

4. Limit Source Scans

{% if is_incremental() %}
-- Efficient: filter in source
WHERE event_timestamp > (SELECT MAX(event_timestamp) FROM {{ this }})
{% endif %}

Common Patterns

Snapshot with Latest State

Capture the latest state of each entity:

{{ config(
    materialized='incremental',
    unique_key='customer_id',
    incremental_strategy='merge'
) }}

SELECT
    customer_id,
    customer_name,
    customer_email,
    subscription_status,
    updated_at,
    CURRENT_TIMESTAMP AS snapshot_at
FROM {{ source('raw', 'customers') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}

Running Aggregates

Maintain running totals:

{{ config(
    materialized='incremental',
    unique_key='date_day',
    incremental_strategy='merge'
) }}

SELECT
    DATE(order_timestamp) AS date_day,
    COUNT(*) AS daily_orders,
    SUM(total_amount) AS daily_revenue,
    -- Caveat: on incremental runs this window only spans the reprocessed
    -- days, so the running total is only correct after a full refresh
    SUM(SUM(total_amount)) OVER (ORDER BY DATE(order_timestamp)) AS cumulative_revenue
FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
WHERE DATE(order_timestamp) >= (SELECT MAX(date_day) FROM {{ this }})
{% endif %}
GROUP BY DATE(order_timestamp)

Event Deduplication

Remove duplicate events:

{{ config(
    materialized='incremental',
    unique_key='event_id',
    incremental_strategy='merge'
) }}

WITH ranked_events AS (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY event_id
            ORDER BY _loaded_at DESC
        ) AS rn
    FROM {{ source('raw', 'events') }}
    {% if is_incremental() %}
    WHERE _loaded_at > (SELECT MAX(_loaded_at) FROM {{ this }})
    {% endif %}
)

SELECT
    event_id,
    user_id,
    event_type,
    event_properties,
    event_timestamp,
    _loaded_at
FROM ranked_events
WHERE rn = 1

Troubleshooting

Duplicate Rows

Symptom: Row count grows unexpectedly

Cause: Missing or incorrect unique_key

Fix:

-- Ensure event_id is truly unique in the source
{{ config(
    unique_key='event_id'
) }}

Missing Data

Symptom: Some records not appearing

Cause: Incremental filter too aggressive

Fix: Add lookback buffer

WHERE updated_at > (
    SELECT MAX(updated_at) - INTERVAL '1 hour' FROM {{ this }}
)

Slow Merges

Symptom: Incremental runs slower than expected

Cause: Full table scans during merge

Fix: Add incremental predicates

{{ config(
    incremental_predicates=[
        "order_date >= CURRENT_DATE - INTERVAL '7 days'"
    ]
) }}

Best Practices

  1. Always define unique_key for merge strategy
  2. Use updated_at or _loaded_at for reliable filtering
  3. Add lookback buffers for late-arriving data
  4. Partition large tables by date
  5. Test after initial load and after incremental runs
  6. Document your incremental logic for maintainability

Next Steps

  1. Understand materializations - Compare all materialization types
  2. Learn about lineage - Track data flow through incremental models
  3. Define cubes - Build semantic layer on incremental tables