# Freshness Monitoring
Freshness monitoring helps you detect when source data has not been updated within expected time frames. This is critical for maintaining data quality and ensuring your analytics reflect current business state.
## What is Freshness?
Freshness measures how recently source data has been updated. Olytix Core compares the most recent timestamp in your data to the current time and alerts when data exceeds defined thresholds.
```
┌──────────────────────────────────────────────────────────┐
│                    Freshness Timeline                    │
├──────────────────────────────────────────────────────────┤
│                                                          │
│   Data Last Updated                    Now               │
│          │                              │                │
│          ▼                              ▼                │
│   ───────●──────────────────────────────●─────► Time     │
│          │                              │                │
│          │◄───────── 6 hours ──────────►│                │
│                                                          │
│   If warn_after: 4 hours  → ⚠️ Warning                   │
│   If error_after: 8 hours → ✓ OK                         │
│                                                          │
└──────────────────────────────────────────────────────────┘
```
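The comparison shown in the timeline can be sketched in a few lines of Python. This is an illustrative helper, not Olytix Core's actual implementation: given the age of the newest record, the check passes, warns, or errors depending on which thresholds the age exceeds.

```python
from datetime import timedelta

def freshness_status(age: timedelta,
                     warn_after: timedelta,
                     error_after: timedelta) -> str:
    """Classify data age against warn/error thresholds."""
    if age > error_after:
        return "error"
    if age > warn_after:
        return "warn"
    return "pass"

# Data last updated 6 hours ago, as in the diagram:
age = timedelta(hours=6)
print(freshness_status(age, timedelta(hours=4), timedelta(hours=8)))    # warn
print(freshness_status(age, timedelta(hours=12), timedelta(hours=24)))  # pass
```

Note that the more severe threshold wins: a 6-hour age past a 4-hour `warn_after` warns, but the same age under an 8-hour `error_after` does not error.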
## Configuring Freshness

### Basic Freshness Configuration

Add freshness settings to your source definition:
```yaml
version: 2

sources:
  - name: raw
    database: analytics_db
    schema: raw_data

    # Required: specify which column contains the timestamp
    loaded_at_field: updated_at

    # Freshness thresholds
    freshness:
      warn_after:
        count: 12
        period: hour
      error_after:
        count: 24
        period: hour

    tables:
      - name: orders
        description: "Order transactions, updated hourly"
      - name: customers
        description: "Customer data, updated daily"
```
### Table-Level Overrides

Different tables may have different update frequencies. Override source-level freshness per table:
```yaml
version: 2

sources:
  - name: raw
    database: analytics_db
    schema: raw_data
    loaded_at_field: updated_at

    # Default: expect data within 12 hours
    freshness:
      warn_after:
        count: 12
        period: hour
      error_after:
        count: 24
        period: hour

    tables:
      - name: orders
        description: "Real-time order stream"
        # Override: stricter freshness for orders
        loaded_at_field: created_at
        freshness:
          warn_after:
            count: 1
            period: hour
          error_after:
            count: 2
            period: hour

      - name: customers
        description: "Daily customer sync"
        # Override: relaxed freshness for daily data
        freshness:
          warn_after:
            count: 36
            period: hour
          error_after:
            count: 48
            period: hour

      - name: historical_rates
        description: "Monthly exchange rates"
        # Disable freshness for rarely updated data
        freshness: null
```
## Freshness Thresholds

### Time Periods

| Period | Description | Example |
|---|---|---|
| `minute` | Minutes | Real-time event streams |
| `hour` | Hours | Hourly batch syncs |
| `day` | Days | Daily ETL jobs |
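Each threshold resolves `count` and `period` to a single duration. A quick sketch of that conversion (illustrative only, not Olytix Core's internals):

```python
from datetime import timedelta

# Seconds per supported period unit
PERIODS = {"minute": 60, "hour": 3600, "day": 86400}

def threshold_to_timedelta(count: int, period: str) -> timedelta:
    """Convert a {count, period} freshness threshold into a timedelta."""
    return timedelta(seconds=count * PERIODS[period])

print(threshold_to_timedelta(12, "hour"))  # 12:00:00
print(threshold_to_timedelta(8, "day"))    # 8 days, 0:00:00
```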
### Threshold Types

| Threshold | Behavior | Use Case |
|---|---|---|
| `warn_after` | Logs a warning | Early alert before SLA breach |
| `error_after` | Fails the freshness check | Hard SLA enforcement |
### Configuration Examples

#### Real-Time Data (Streaming)

```yaml
freshness:
  warn_after:
    count: 15
    period: minute
  error_after:
    count: 30
    period: minute
```
#### Hourly Batch Processing

```yaml
freshness:
  warn_after:
    count: 2
    period: hour
  error_after:
    count: 4
    period: hour
```
#### Daily ETL Jobs

```yaml
freshness:
  warn_after:
    count: 26
    period: hour
  error_after:
    count: 48
    period: hour
```
#### Weekly Data Loads

```yaml
freshness:
  warn_after:
    count: 8
    period: day
  error_after:
    count: 10
    period: day
```
## The `loaded_at_field`

The `loaded_at_field` specifies which column Olytix Core queries to determine data freshness.
### Choosing the Right Field

| Field Type | When to Use | Example |
|---|---|---|
| `updated_at` | Rows are updated in place | CRM contacts, user profiles |
| `created_at` | Append-only data | Event logs, transactions |
| `_fivetran_synced` | Fivetran-synced tables | Any Fivetran source |
| `_airbyte_emitted_at` | Airbyte-synced tables | Any Airbyte source |
| `ingested_at` | Custom ingestion timestamp | Custom ETL pipelines |
### Field Requirements

The `loaded_at_field` must:
- Be a timestamp or datetime column
- Exist in the source table
- Contain non-null values for recent data
- Be indexed for performance (recommended)
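You can spot-check these requirements with a single query against the source table. The snippet below is an illustrative helper (the relation and column names are placeholders, and this is not an Olytix Core API) that builds a query reporting the newest timestamp and how many rows have a null `loaded_at` value:

```python
def freshness_check_sql(relation: str, loaded_at_field: str) -> str:
    """Build a query that reports the newest timestamp and the
    number of null loaded_at rows for a source table."""
    return (
        f"SELECT MAX({loaded_at_field}) AS max_loaded_at, "
        f"COUNT(*) - COUNT({loaded_at_field}) AS null_rows "
        f"FROM {relation}"
    )

print(freshness_check_sql("analytics_db.raw_data.orders", "updated_at"))
```

If `null_rows` is high, or `max_loaded_at` lags far behind the pipeline's last run, the column is a poor choice for freshness monitoring.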
### Example: Multiple Sync Tools

```yaml
version: 2

sources:
  - name: salesforce
    database: raw_salesforce
    schema: fivetran_salesforce
    # Fivetran adds this column automatically
    loaded_at_field: _fivetran_synced
    freshness:
      warn_after:
        count: 6
        period: hour
    tables:
      - name: opportunity
      - name: account
      - name: contact

  - name: hubspot
    database: raw_hubspot
    schema: airbyte_hubspot
    # Airbyte adds this column automatically
    loaded_at_field: _airbyte_emitted_at
    freshness:
      warn_after:
        count: 6
        period: hour
    tables:
      - name: companies
      - name: contacts
      - name: deals
```
## Running Freshness Checks

### CLI Commands

```shell
# Check freshness for all sources
olytix-core source freshness

# Check freshness for a specific source
olytix-core source freshness --select raw

# Check freshness with JSON output
olytix-core source freshness --output json
```
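The JSON output is convenient for scripting alerts. The exact schema of `--output json` is not documented in this section, so the snippet below assumes a hypothetical shape (a list of objects with `source`, `table`, and `status` keys); verify against your version's actual output before relying on it:

```python
import json

# Hypothetical output of `olytix-core source freshness --output json`;
# check your version's real schema before using this shape.
raw = '''
[
  {"source": "raw", "table": "orders", "status": "pass"},
  {"source": "raw", "table": "customers", "status": "warn"},
  {"source": "salesforce", "table": "opportunity", "status": "error"}
]
'''

results = json.loads(raw)
stale = [r for r in results if r["status"] in ("warn", "error")]
for r in stale:
    print(f'{r["source"]}.{r["table"]}: {r["status"]}')
```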
### Sample Output

```
Checking source freshness...

Source: raw
  Table: orders
    Loaded at field: created_at
    Max value: 2024-01-15 14:32:00
    Freshness: 47 minutes ago
    Status: ✓ PASS

  Table: customers
    Loaded at field: updated_at
    Max value: 2024-01-15 08:00:00
    Freshness: 7 hours ago
    Status: ⚠️ WARN (threshold: 6 hours)

  Table: historical_rates
    Freshness: SKIPPED (freshness not configured)

Source: salesforce
  Table: opportunity
    Loaded at field: _fivetran_synced
    Max value: 2024-01-14 18:00:00
    Freshness: 21 hours ago
    Status: ❌ ERROR (threshold: 12 hours)

Summary:
  Passed: 1
  Warnings: 1
  Errors: 1
```
## Warehouse-Specific Considerations

### PostgreSQL

```yaml
sources:
  - name: production
    database: production_db
    schema: public
    loaded_at_field: updated_at
    tables:
      - name: orders
        # Ensure the timestamp column is indexed
        meta:
          index_columns: [updated_at]
```
The generated freshness query:

```sql
SELECT MAX(updated_at) AS max_loaded_at
FROM production_db.public.orders
```
### Snowflake

```yaml
sources:
  - name: raw
    database: RAW_DATA
    schema: ECOMMERCE
    loaded_at_field: _LOADED_AT
    # Consider clustering for large tables
    meta:
      cluster_by: [_LOADED_AT]
    tables:
      - name: EVENTS
        freshness:
          warn_after:
            count: 30
            period: minute
```
### BigQuery

```yaml
sources:
  - name: analytics
    database: my-gcp-project
    schema: raw_events
    loaded_at_field: event_timestamp
    tables:
      - name: page_views
        # BigQuery partitioning improves freshness query performance
        meta:
          partition_by: event_timestamp
          partition_type: DAY
        freshness:
          warn_after:
            count: 1
            period: hour
```
## Integrating with CI/CD

### GitHub Actions Example

```yaml
# .github/workflows/freshness.yml
name: Source Freshness Check

on:
  schedule:
    - cron: '0 * * * *'  # Every hour

jobs:
  freshness:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Install Olytix Core
        run: pip install olytix-core

      - name: Check Freshness
        run: olytix-core source freshness
        env:
          OLYTIX_DATABASE__HOST: ${{ secrets.DB_HOST }}
          OLYTIX_DATABASE__PASSWORD: ${{ secrets.DB_PASSWORD }}

      - name: Notify on Failure
        if: failure()
        uses: slackapi/slack-github-action@v1
        with:
          channel-id: 'data-alerts'
          slack-message: 'Source freshness check failed!'
```
### Pre-Run Validation

Add freshness checks before model runs:

```shell
#!/bin/bash
# scripts/safe_run.sh

# Check freshness first
olytix-core source freshness

# Exit if the freshness check fails
if [ $? -ne 0 ]; then
  echo "Freshness check failed. Aborting model run."
  exit 1
fi

# Run models
olytix-core run
```
## Freshness Best Practices

### Set Realistic Thresholds

- Base thresholds on actual data pipeline SLAs
- Add buffer time for expected delays
- Set `warn_after` to catch issues before `error_after` fires
### Monitor Critical Sources First
Focus freshness monitoring on:
- Revenue-impacting data (orders, transactions)
- Customer-facing metrics
- Compliance-related data
- Real-time operational data
### Handle Time Zones

Ensure `loaded_at_field` timestamps are in UTC, or account for timezone differences:
```yaml
sources:
  - name: raw
    meta:
      timezone: UTC
    loaded_at_field: created_at_utc
    freshness:
      warn_after:
        count: 6
        period: hour
```
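Why this matters: if the warehouse stores local wall-clock time but the freshness check treats it as UTC, every age is skewed by the UTC offset, producing false warnings (or masking real staleness). A small illustration with fixed timestamps:

```python
from datetime import datetime, timezone, timedelta

now_utc = datetime(2024, 1, 15, 15, 0, tzinfo=timezone.utc)

# Timestamp written in UTC-5 local time but stored without zone info
local_wallclock = datetime(2024, 1, 15, 9, 0)

# Naively treating the local wall-clock value as UTC overstates the age:
naive_age = now_utc - local_wallclock.replace(tzinfo=timezone.utc)
print(naive_age)  # 6:00:00 -- looks 6 hours stale

# Attaching the real offset gives the true age:
real_age = now_utc - local_wallclock.replace(tzinfo=timezone(timedelta(hours=-5)))
print(real_age)   # 1:00:00 -- actually 1 hour old
```

With a 4-hour `warn_after`, the naive reading would warn even though the data is only an hour old.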
### Document Expectations
Include freshness expectations in descriptions:
```yaml
tables:
  - name: orders
    description: |
      Order transactions from the e-commerce platform.

      **Freshness SLA:** Data should be no more than 1 hour old.
      **Update Frequency:** Real-time via Kafka connector.
      **Owner:** Data Platform Team
    freshness:
      warn_after:
        count: 1
        period: hour
      error_after:
        count: 2
        period: hour
```
## Disabling Freshness
For tables that do not need freshness monitoring:
```yaml
tables:
  - name: country_codes
    description: "Static reference table"
    freshness: null

  - name: exchange_rates_historical
    description: "Historical data, not updated"
    freshness: null
```
## Next Steps
Now that you understand freshness monitoring:
- Schema Documentation - Document source columns and data types
- Data Tests - Add quality tests to sources
- Lineage Basics - Track data flow from sources