Structured Logging
Olytix Core uses structlog for structured JSON logging, enabling powerful log aggregation, filtering, and analysis. This guide covers log configuration, formats, and integration with log management systems.
Overview
Olytix Core's logging system provides:
- Structured JSON output for machine parsing
- Correlation IDs for request tracing
- Contextual information automatically attached to logs
- Performance metrics embedded in log entries
- Multiple output formats for development and production
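For orientation, emitting a structured event is a single call; a minimal sketch (the event name and fields here are illustrative):
import structlog

logger = structlog.get_logger()

# Keyword arguments become fields in the rendered log entry
logger.info("cache_refreshed", cube="orders", entries=1024, duration_ms=38)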
Configuration
Environment Variables
# Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
OLYTIX_LOG_LEVEL=INFO
# Log format (json, console, pretty)
OLYTIX_LOG_FORMAT=json
# Include timestamps
OLYTIX_LOG_TIMESTAMPS=true
# Include caller information (file, line number)
OLYTIX_LOG_CALLER=true
# Log output destination (stdout, stderr, file)
OLYTIX_LOG_OUTPUT=stdout
# Log file path (when output is file)
OLYTIX_LOG_FILE=/var/log/olytix-core/olytix-core.log
# Log rotation settings
OLYTIX_LOG_MAX_SIZE_MB=100
OLYTIX_LOG_BACKUP_COUNT=5
# Enable request logging
OLYTIX_LOG_REQUESTS=true
# Enable query logging
OLYTIX_LOG_QUERIES=true
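These variables are resolved once at startup into the settings object used below; a minimal sketch of the equivalent lookup (the defaults shown are assumptions, not documented behavior):
import os

# Assumed defaults, for illustration only
log_level = os.environ.get("OLYTIX_LOG_LEVEL", "INFO")
log_format = os.environ.get("OLYTIX_LOG_FORMAT", "json")
log_output = os.environ.get("OLYTIX_LOG_OUTPUT", "stdout")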
Programmatic Configuration
# config/logging.py
import logging

import structlog

from olytix_core.config import settings
def configure_logging():
"""Configure structured logging for Olytix Core."""
# Shared processors for all loggers
shared_processors = [
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.UnicodeDecoder(),
]
if settings.log_format == "json":
# Production JSON format
renderer = structlog.processors.JSONRenderer()
elif settings.log_format == "pretty":
# Development pretty format
renderer = structlog.dev.ConsoleRenderer(colors=True)
else:
# Default console format
renderer = structlog.processors.KeyValueRenderer()
structlog.configure(
processors=shared_processors + [
structlog.processors.format_exc_info,
renderer,
],
wrapper_class=structlog.make_filtering_bound_logger(
getattr(logging, settings.log_level.upper())
),
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=True,
)
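Call configure_logging() once during application startup, before the first log call; a minimal sketch:
import structlog

from config.logging import configure_logging

configure_logging()

logger = structlog.get_logger()
logger.info("logging_configured")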
Log Formats
JSON Format (Production)
{
"timestamp": "2024-01-15T10:30:45.123456Z",
"level": "info",
"event": "query_executed",
"correlation_id": "req-abc123",
"user_id": "user-456",
"cube": "orders",
"measures": ["revenue", "count"],
"dimensions": ["region", "product_category"],
"duration_ms": 145,
"rows_returned": 250,
"cache_hit": false,
"warehouse": "snowflake",
"caller": {
"file": "query/executor.py",
"line": 89,
"function": "execute"
}
}
Console Format (Development)
2024-01-15 10:30:45 [info ] query_executed correlation_id=req-abc123 cube=orders duration_ms=145 rows_returned=250
Pretty Format (Development)
2024-01-15 10:30:45.123 | INFO | query_executed
correlation_id: req-abc123
cube: orders
measures: ['revenue', 'count']
dimensions: ['region', 'product_category']
duration_ms: 145
rows_returned: 250
cache_hit: False
Log Categories
Request Logs
Every HTTP request generates structured log entries:
{
"timestamp": "2024-01-15T10:30:45.000Z",
"level": "info",
"event": "http_request_started",
"correlation_id": "req-abc123",
"method": "POST",
"path": "/api/v1/query",
"client_ip": "10.0.1.50",
"user_agent": "Python/3.12 requests/2.31.0"
}
{
"timestamp": "2024-01-15T10:30:45.150Z",
"level": "info",
"event": "http_request_completed",
"correlation_id": "req-abc123",
"method": "POST",
"path": "/api/v1/query",
"status_code": 200,
"duration_ms": 150,
"response_size_bytes": 4520
}
Query Logs
Semantic query execution is logged with full context:
{
"timestamp": "2024-01-15T10:30:45.050Z",
"level": "info",
"event": "query_compiled",
"correlation_id": "req-abc123",
"cube": "orders",
"sql_hash": "a1b2c3d4",
"compile_duration_ms": 12
}
{
"timestamp": "2024-01-15T10:30:45.145Z",
"level": "info",
"event": "query_executed",
"correlation_id": "req-abc123",
"cube": "orders",
"warehouse": "snowflake",
"warehouse_duration_ms": 95,
"total_duration_ms": 145,
"rows_returned": 250,
"bytes_scanned": 1048576,
"cache_hit": false,
"preagg_used": "orders_daily"
}
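These entries come from ordinary structlog calls in the query path; a hedged sketch of how such an event might be emitted (field names mirror the example above; the timing code and the run_warehouse_query helper are illustrative):
import time

import structlog

logger = structlog.get_logger()

start = time.perf_counter()
rows = run_warehouse_query(sql)  # hypothetical helper
logger.info(
    "query_executed",
    cube="orders",
    warehouse="snowflake",
    total_duration_ms=int((time.perf_counter() - start) * 1000),
    rows_returned=len(rows),
    cache_hit=False,
)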
Error Logs
Errors include full stack traces and context:
{
"timestamp": "2024-01-15T10:30:45.200Z",
"level": "error",
"event": "query_failed",
"correlation_id": "req-abc123",
"cube": "orders",
"error_type": "WarehouseConnectionError",
"error_message": "Connection refused to snowflake.example.com:443",
"retry_count": 3,
"exception": {
"type": "ConnectionRefusedError",
"message": "Connection refused",
"traceback": [
"File \"/app/src/olytix-core/engine/executor.py\", line 45, in execute",
" result = await self.warehouse.query(sql)",
"File \"/app/src/olytix-core/adapters/snowflake.py\", line 112, in query",
" raise WarehouseConnectionError(str(e))"
]
}
}
Audit Logs
Security-related events are logged for compliance:
{
"timestamp": "2024-01-15T10:30:45.000Z",
"level": "info",
"event": "authentication_success",
"user_id": "user-456",
"auth_method": "api_key",
"client_ip": "10.0.1.50",
"permissions": ["query:read", "lineage:read"]
}
{
"timestamp": "2024-01-15T10:30:46.000Z",
"level": "warning",
"event": "access_denied",
"user_id": "user-789",
"resource": "cube:finance_data",
"action": "query",
"reason": "insufficient_permissions",
"required_permission": "finance:read"
}
Correlation IDs
Olytix Core automatically generates and propagates correlation IDs for request tracing.
Automatic Generation
# Middleware automatically adds correlation ID
from olytix_core.middleware import CorrelationIdMiddleware
app.add_middleware(CorrelationIdMiddleware)
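A hedged sketch of what such middleware typically does, assuming an ASGI stack such as Starlette/FastAPI (the real CorrelationIdMiddleware may differ in detail):
import uuid

from starlette.middleware.base import BaseHTTPMiddleware
from structlog.contextvars import bind_contextvars, clear_contextvars

class CorrelationIdSketch(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        # Reuse a client-provided ID or mint a fresh one
        cid = request.headers.get("X-Correlation-ID") or f"req-{uuid.uuid4().hex[:8]}"
        clear_contextvars()
        bind_contextvars(correlation_id=cid)
        response = await call_next(request)
        # Echo the ID back so clients can correlate their own traces
        response.headers["X-Correlation-ID"] = cid
        return response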
Client-Provided IDs
# Pass correlation ID in request header
curl -X POST http://localhost:8000/api/v1/query \
-H "X-Correlation-ID: my-trace-123" \
-H "Content-Type: application/json" \
-d '{"metrics": ["revenue"]}'
Propagation to Downstream Services
import structlog
from structlog.contextvars import bind_contextvars
# Bind correlation ID to context
bind_contextvars(correlation_id=request.headers.get("X-Correlation-ID"))
# All subsequent logs include the correlation ID
logger = structlog.get_logger()
logger.info("processing_request") # Automatically includes correlation_id
Log Aggregation
Fluentd Configuration
# fluent.conf
<source>
@type tail
path /var/log/olytix-core/*.log
pos_file /var/log/fluentd/olytix-core.pos
tag olytix-core.*
<parse>
@type json
time_key timestamp
time_format %Y-%m-%dT%H:%M:%S.%NZ
</parse>
</source>
<filter olytix-core.**>
@type record_transformer
<record>
hostname ${hostname}
environment ${ENV["ENVIRONMENT"]}
</record>
</filter>
<match olytix-core.**>
@type elasticsearch
host elasticsearch.logging.svc.cluster.local
port 9200
index_name olytix-core-logs
type_name _doc
logstash_format true
logstash_prefix olytix-core
</match>
Filebeat Configuration
# filebeat.yml
filebeat.inputs:
- type: container
paths:
- /var/log/containers/olytix-core-*.log
json.keys_under_root: true
json.add_error_key: true
json.message_key: event
processors:
- add_kubernetes_metadata:
host: ${NODE_NAME}
matchers:
- logs_path:
logs_path: "/var/log/containers/"
- rename:
fields:
- from: "kubernetes.pod.name"
to: "pod_name"
output.elasticsearch:
hosts: ["elasticsearch:9200"]
index: "olytix-core-logs-%{+yyyy.MM.dd}"
Vector Configuration
# vector.toml
[sources.olytix-core_logs]
type = "kubernetes_logs"
extra_label_selector = "app.kubernetes.io/name=olytix-core"
[transforms.parse_json]
type = "remap"
inputs = ["olytix-core_logs"]
source = '''
# Save pod metadata before the parsed JSON body replaces the event
kubernetes = del(.kubernetes)
. = parse_json!(.message)
.kubernetes = kubernetes
'''
[sinks.elasticsearch]
type = "elasticsearch"
inputs = ["parse_json"]
endpoints = ["http://elasticsearch:9200"]
index = "olytix-core-logs-%Y-%m-%d"
Log Analysis
Elasticsearch Queries
// Find slow queries
GET olytix-core-logs-*/_search
{
"query": {
"bool": {
"must": [
{ "term": { "event": "query_executed" } },
{ "range": { "duration_ms": { "gte": 1000 } } }
]
}
},
"sort": [{ "duration_ms": "desc" }],
"size": 100
}
// Error rate by cube
GET olytix-core-logs-*/_search
{
"size": 0,
"query": {
"term": { "level": "error" }
},
"aggs": {
"by_cube": {
"terms": { "field": "cube.keyword" }
}
}
}
// Trace request flow
GET olytix-core-logs-*/_search
{
"query": {
"term": { "correlation_id": "req-abc123" }
},
"sort": [{ "timestamp": "asc" }]
}
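The same trace lookup can be issued programmatically; a sketch using the official Python client (index pattern and fields as above; an 8.x client is assumed):
from elasticsearch import Elasticsearch

es = Elasticsearch("http://elasticsearch:9200")

resp = es.search(
    index="olytix-core-logs-*",
    query={"term": {"correlation_id": "req-abc123"}},
    sort=[{"timestamp": "asc"}],
    size=100,
)
for hit in resp["hits"]["hits"]:
    print(hit["_source"]["event"])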
Loki LogQL Queries
# All error-level log lines (the time range comes from the query window)
{namespace="olytix-core"} | json | level="error"
# 95th-percentile query latency over 5-minute windows
quantile_over_time(0.95,
  {namespace="olytix-core"} | json | event="query_executed" | unwrap duration_ms [5m]
)
# Error rate by cube
sum by (cube) (
rate({namespace="olytix-core"} | json | level="error" [5m])
)
Best Practices
Sensitive Data Handling
# Never log sensitive data
logger.info(
"user_authenticated",
user_id=user.id, # OK
# password=user.password, # NEVER do this
# api_key=request.api_key, # NEVER do this
)
# Mask sensitive fields
import structlog
def mask_sensitive(_, __, event_dict):
"""Mask sensitive fields in log output."""
sensitive_keys = {"password", "api_key", "token", "secret"}
for key in sensitive_keys:
if key in event_dict:
event_dict[key] = "***MASKED***"
return event_dict
structlog.configure(
processors=[
mask_sensitive,
# ... other processors
]
)
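With the processor registered, matching keys are replaced before rendering:
logger = structlog.get_logger()
logger.info("token_refreshed", user_id="user-456", token="eyJhbGci...")
# Rendered entry contains: "token": "***MASKED***"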
Performance Considerations
# structlog evaluates keyword arguments eagerly, so guard expensive
# debug-only computations behind an explicit level check
if settings.log_level.upper() == "DEBUG":
    logger.debug("query_details", query_plan=expensive_query_plan_analysis())
# Sample high-volume logs
import random
if random.random() < 0.1: # 10% sample rate
logger.debug("high_volume_event", details=event_details)
Structured Error Handling
import structlog
logger = structlog.get_logger()
try:
result = await execute_query(query)
except WarehouseError as e:
logger.error(
"query_failed",
cube=query.cube,
error_type=type(e).__name__,
error_message=str(e),
retryable=e.retryable,
exc_info=True, # Include full traceback
)
raise
Next Steps
- Set up Prometheus metrics for quantitative monitoring
- Configure circuit breakers for failure handling
- Implement retry policies for reliability