Skip to main content

Structured Logging

For Platform Teams

Olytix Core uses structlog for structured JSON logging, enabling powerful log aggregation, filtering, and analysis. This guide covers log configuration, formats, and integration with log management systems.

Overview

Olytix Core's logging system provides:

  • Structured JSON output for machine parsing
  • Correlation IDs for request tracing
  • Contextual information automatically attached to logs
  • Performance metrics embedded in log entries
  • Multiple output formats for development and production

Configuration

Environment Variables

# Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
OLYTIX_LOG_LEVEL=INFO

# Log format (json, console, pretty)
OLYTIX_LOG_FORMAT=json

# Include timestamps
OLYTIX_LOG_TIMESTAMPS=true

# Include caller information (file, line number)
OLYTIX_LOG_CALLER=true

# Log output destination (stdout, stderr, file)
OLYTIX_LOG_OUTPUT=stdout

# Log file path (when output is file)
OLYTIX_LOG_FILE=/var/log/olytix-core/olytix-core.log

# Log rotation settings
OLYTIX_LOG_MAX_SIZE_MB=100
OLYTIX_LOG_BACKUP_COUNT=5

# Enable request logging
OLYTIX_LOG_REQUESTS=true

# Enable query logging
OLYTIX_LOG_QUERIES=true

Programmatic Configuration

# config/logging.py
import logging

import structlog

# NOTE: a Python package name cannot contain a hyphen; the import path
# uses an underscore even though the project is called "olytix-core".
from olytix_core.config import settings


def configure_logging():
    """Configure structured logging for Olytix Core.

    Reads ``settings.log_format`` and ``settings.log_level`` and wires up
    structlog with a shared processor chain plus a format-specific renderer.
    """

    # Shared processors for all loggers
    shared_processors = [
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.UnicodeDecoder(),
    ]

    if settings.log_format == "json":
        # Production JSON format
        renderer = structlog.processors.JSONRenderer()
    elif settings.log_format == "pretty":
        # Development pretty format
        renderer = structlog.dev.ConsoleRenderer(colors=True)
    else:
        # Default console format
        renderer = structlog.processors.KeyValueRenderer()

    structlog.configure(
        processors=shared_processors + [
            structlog.processors.format_exc_info,
            renderer,
        ],
        wrapper_class=structlog.make_filtering_bound_logger(
            # Resolve the level name (e.g. "INFO") to the stdlib integer
            # constant — this is why ``import logging`` is required above.
            getattr(logging, settings.log_level.upper())
        ),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )

Log Formats

JSON Format (Production)

{
"timestamp": "2024-01-15T10:30:45.123456Z",
"level": "info",
"event": "query_executed",
"correlation_id": "req-abc123",
"user_id": "user-456",
"cube": "orders",
"measures": ["revenue", "count"],
"dimensions": ["region", "product_category"],
"duration_ms": 145,
"rows_returned": 250,
"cache_hit": false,
"warehouse": "snowflake",
"caller": {
"file": "query/executor.py",
"line": 89,
"function": "execute"
}
}

Console Format (Development)

2024-01-15 10:30:45 [info     ] query_executed                 correlation_id=req-abc123 cube=orders duration_ms=145 rows_returned=250

Pretty Format (Development)

2024-01-15 10:30:45.123 | INFO     | query_executed
correlation_id: req-abc123
cube: orders
measures: ['revenue', 'count']
dimensions: ['region', 'product_category']
duration_ms: 145
rows_returned: 250
cache_hit: False

Log Categories

Request Logs

Every HTTP request generates structured log entries:

{
"timestamp": "2024-01-15T10:30:45.000Z",
"level": "info",
"event": "http_request_started",
"correlation_id": "req-abc123",
"method": "POST",
"path": "/api/v1/query",
"client_ip": "10.0.1.50",
"user_agent": "Python/3.12 requests/2.31.0"
}

{
"timestamp": "2024-01-15T10:30:45.150Z",
"level": "info",
"event": "http_request_completed",
"correlation_id": "req-abc123",
"method": "POST",
"path": "/api/v1/query",
"status_code": 200,
"duration_ms": 150,
"response_size_bytes": 4520
}

Query Logs

Semantic query execution is logged with full context:

{
"timestamp": "2024-01-15T10:30:45.050Z",
"level": "info",
"event": "query_compiled",
"correlation_id": "req-abc123",
"cube": "orders",
"sql_hash": "a1b2c3d4",
"compile_duration_ms": 12
}

{
"timestamp": "2024-01-15T10:30:45.145Z",
"level": "info",
"event": "query_executed",
"correlation_id": "req-abc123",
"cube": "orders",
"warehouse": "snowflake",
"warehouse_duration_ms": 95,
"total_duration_ms": 145,
"rows_returned": 250,
"bytes_scanned": 1048576,
"cache_hit": false,
"preagg_used": "orders_daily"
}

Error Logs

Errors include full stack traces and context:

{
"timestamp": "2024-01-15T10:30:45.200Z",
"level": "error",
"event": "query_failed",
"correlation_id": "req-abc123",
"cube": "orders",
"error_type": "WarehouseConnectionError",
"error_message": "Connection refused to snowflake.example.com:443",
"retry_count": 3,
"exception": {
"type": "ConnectionRefusedError",
"message": "Connection refused",
"traceback": [
"File \"/app/src/olytix-core/engine/executor.py\", line 45, in execute",
" result = await self.warehouse.query(sql)",
"File \"/app/src/olytix-core/adapters/snowflake.py\", line 112, in query",
" raise WarehouseConnectionError(str(e))"
]
}
}

Audit Logs

Security-related events are logged for compliance:

{
"timestamp": "2024-01-15T10:30:45.000Z",
"level": "info",
"event": "authentication_success",
"user_id": "user-456",
"auth_method": "api_key",
"client_ip": "10.0.1.50",
"permissions": ["query:read", "lineage:read"]
}

{
"timestamp": "2024-01-15T10:30:46.000Z",
"level": "warning",
"event": "access_denied",
"user_id": "user-789",
"resource": "cube:finance_data",
"action": "query",
"reason": "insufficient_permissions",
"required_permission": "finance:read"
}

Correlation IDs

Olytix Core automatically generates and propagates correlation IDs for request tracing.

Automatic Generation

# Middleware automatically adds correlation ID
# (package name uses an underscore — hyphens are not valid in Python imports)
from olytix_core.middleware import CorrelationIdMiddleware

app.add_middleware(CorrelationIdMiddleware)

Client-Provided IDs

# Pass correlation ID in request header
curl -X POST http://localhost:8000/api/v1/query \
-H "X-Correlation-ID: my-trace-123" \
-H "Content-Type: application/json" \
-d '{"metrics": ["revenue"]}'

Propagation to Downstream Services

import structlog
from structlog.contextvars import bind_contextvars

# Bind correlation ID to context
bind_contextvars(correlation_id=request.headers.get("X-Correlation-ID"))

# All subsequent logs include the correlation ID
logger = structlog.get_logger()
logger.info("processing_request") # Automatically includes correlation_id

Log Aggregation

Fluentd Configuration

# fluent.conf
<source>
@type tail
path /var/log/olytix-core/*.log
pos_file /var/log/fluentd/olytix-core.pos
tag olytix-core.*
<parse>
@type json
time_key timestamp
time_format %Y-%m-%dT%H:%M:%S.%NZ
</parse>
</source>

<filter olytix-core.**>
@type record_transformer
<record>
hostname ${hostname}
environment ${ENV["ENVIRONMENT"]}
</record>
</filter>

<match olytix-core.**>
@type elasticsearch
host elasticsearch.logging.svc.cluster.local
port 9200
index_name olytix-core-logs
type_name _doc
logstash_format true
logstash_prefix olytix-core
</match>

Filebeat Configuration

# filebeat.yml
# YAML is whitespace-significant; nesting restored below.
filebeat.inputs:
  - type: container
    paths:
      - /var/log/containers/olytix-core-*.log
    json.keys_under_root: true
    json.add_error_key: true
    json.message_key: event

processors:
  - add_kubernetes_metadata:
      host: ${NODE_NAME}
      matchers:
        - logs_path:
            logs_path: "/var/log/containers/"
  - rename:
      fields:
        - from: "kubernetes.pod.name"
          to: "pod_name"

output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  index: "olytix-core-logs-%{+yyyy.MM.dd}"

Vector Configuration

# vector.toml
[sources.olytix-core_logs]
type = "kubernetes_logs"
extra_label_selector = "app.kubernetes.io/name=olytix-core"

[transforms.parse_json]
type = "remap"
inputs = ["olytix-core_logs"]
source = '''
. = parse_json!(.message)
.kubernetes = del(.kubernetes)
'''

[sinks.elasticsearch]
type = "elasticsearch"
inputs = ["parse_json"]
endpoints = ["http://elasticsearch:9200"]
index = "olytix-core-logs-%Y-%m-%d"

Log Analysis

Elasticsearch Queries

// Find slow queries
GET olytix-core-logs-*/_search
{
"query": {
"bool": {
"must": [
{ "term": { "event": "query_executed" } },
{ "range": { "duration_ms": { "gte": 1000 } } }
]
}
},
"sort": [{ "duration_ms": "desc" }],
"size": 100
}

// Error rate by cube
GET olytix-core-logs-*/_search
{
"size": 0,
"query": {
"term": { "level": "error" }
},
"aggs": {
"by_cube": {
"terms": { "field": "cube.keyword" }
}
}
}

// Trace request flow
GET olytix-core-logs-*/_search
{
"query": {
"term": { "correlation_id": "req-abc123" }
},
"sort": [{ "timestamp": "asc" }]
}

Loki LogQL Queries

# Find all errors in the last hour
{namespace="olytix-core"} | json | level="error"

# Query latency percentiles (the duration_ms field must be unwrapped
# before it can be used in a range aggregation)
quantile_over_time(0.95,
  {namespace="olytix-core"} | json | event="query_executed"
  | unwrap duration_ms [5m]
)

# Error rate by cube
sum by (cube) (
rate({namespace="olytix-core"} | json | level="error" [5m])
)

Best Practices

Sensitive Data Handling

# Never log sensitive data
logger.info(
"user_authenticated",
user_id=user.id, # OK
# password=user.password, # NEVER do this
# api_key=request.api_key, # NEVER do this
)

# Mask sensitive fields
import structlog

def mask_sensitive(_, __, event_dict):
    """Structlog processor: mask sensitive fields in log output.

    Replaces the values of well-known credential keys (password, api_key,
    token, secret) with a placeholder so secrets never reach the log sink.
    The first two positional arguments (logger, method name) follow the
    structlog processor signature and are unused here.
    """
    sensitive_keys = {"password", "api_key", "token", "secret"}
    for key in sensitive_keys:
        if key in event_dict:
            event_dict[key] = "***MASKED***"
    return event_dict

# Register the masking processor early in the chain so later processors
# and the renderer only ever see masked values.
structlog.configure(
    processors=[
        mask_sensitive,
        # ... other processors
    ]
)

Performance Considerations

# Defer expensive computation behind a callable.
# NOTE(review): stock structlog does NOT invoke callables in event dicts —
# without a custom processor that resolves them, the lambda's repr is what
# gets logged. Add such a processor, or only compute the value when the
# debug level is actually enabled.
logger.debug(
    "query_details",
    query_plan=lambda: expensive_query_plan_analysis()
)

# Sample high-volume logs
import random

if random.random() < 0.1:  # 10% sample rate
    logger.debug("high_volume_event", details=event_details)

Structured Error Handling

import structlog

logger = structlog.get_logger()

try:
result = await execute_query(query)
except WarehouseError as e:
logger.error(
"query_failed",
cube=query.cube,
error_type=type(e).__name__,
error_message=str(e),
retryable=e.retryable,
exc_info=True, # Include full traceback
)
raise

Next Steps