Circuit Breakers
Olytix Core implements the circuit breaker pattern to prevent cascade failures when downstream services (warehouses, caches, external APIs) experience issues. This guide covers circuit breaker configuration, monitoring, and best practices.
Overview
Circuit breakers protect your system by:
- Failing fast when services are unavailable
- Preventing cascade failures across components
- Allowing recovery time for failing services
- Providing fallback behavior during outages
Circuit Breaker States
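A circuit breaker moves between three states:
- ✓ CLOSED (Normal): All requests pass through. Monitoring for failures.
- ✕ OPEN (Failing): Entered from CLOSED once failures reach the configured threshold. Requests fail fast instead of reaching the downstream service.
- ? HALF-OPEN (Testing): Entered from OPEN when the timeout timer expires. A limited number of trial requests are let through; on success the breaker returns to CLOSED, and a failed trial typically sends it back to OPEN.
Transitions: CLOSED → (failures) → OPEN → (timer) → HALF-OPEN → (success) → CLOSED
Legend
Normal (Closed)
Failing (Open)
Testing (Half-Open)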
Configuration
Environment Variables
# Enable circuit breakers
OLYTIX_CIRCUIT_BREAKER__ENABLED=true
# Default settings for all circuit breakers
OLYTIX_CIRCUIT_BREAKER__FAILURE_THRESHOLD=5
OLYTIX_CIRCUIT_BREAKER__SUCCESS_THRESHOLD=3
OLYTIX_CIRCUIT_BREAKER__TIMEOUT_SECONDS=30
OLYTIX_CIRCUIT_BREAKER__HALF_OPEN_MAX_CALLS=3
# Warehouse-specific settings (lower failure threshold and longer timeout than the defaults above)
OLYTIX_CIRCUIT_BREAKER__WAREHOUSE__FAILURE_THRESHOLD=3
OLYTIX_CIRCUIT_BREAKER__WAREHOUSE__TIMEOUT_SECONDS=60
# Cache-specific settings (higher failure threshold and shorter timeout than the defaults above)
OLYTIX_CIRCUIT_BREAKER__CACHE__FAILURE_THRESHOLD=10
OLYTIX_CIRCUIT_BREAKER__CACHE__TIMEOUT_SECONDS=15
Configuration File
# config/circuit_breaker.yaml
circuit_breaker:
  enabled: true

  # Default configuration
  default:
    failure_threshold: 5
    success_threshold: 3
    timeout_seconds: 30
    half_open_max_calls: 3
    excluded_exceptions:
      - ValidationError
      - AuthenticationError

  # Per-service configurations
  services:
    warehouse:
      failure_threshold: 3
      success_threshold: 2
      timeout_seconds: 60
      half_open_max_calls: 2
      failure_rate_threshold: 0.5
      slow_call_threshold_seconds: 10
      slow_call_rate_threshold: 0.8

    redis_cache:
      failure_threshold: 10
      success_threshold: 5
      timeout_seconds: 15
      half_open_max_calls: 5
      fallback: skip_cache

    external_api:
      failure_threshold: 5
      success_threshold: 3
      timeout_seconds: 45
      half_open_max_calls: 3
Implementation
Basic Usage
from olytix_core.resilience import CircuitBreaker, CircuitBreakerOpen

# Create a circuit breaker
warehouse_breaker = CircuitBreaker(
    name="snowflake",
    failure_threshold=3,
    success_threshold=2,
    timeout_seconds=60,
)


async def execute_warehouse_query(sql: str):
    """Execute query with circuit breaker protection.

    Args:
        sql: SQL text passed to ``warehouse.execute``.

    Returns:
        Whatever ``warehouse.execute(sql)`` returns.

    Raises:
        WarehouseUnavailableError: If ``CircuitBreakerOpen`` is raised while
            running the call under ``warehouse_breaker``.
    """
    try:
        async with warehouse_breaker:
            return await warehouse.execute(sql)
    except CircuitBreakerOpen as exc:
        # Circuit is open, fail fast; chain the original error for debugging.
        raise WarehouseUnavailableError(
            "Warehouse circuit breaker is open. "
            f"Will retry in {warehouse_breaker.time_until_retry}s"
        ) from exc
Decorator Pattern
from olytix_core.resilience import circuit_breaker


@circuit_breaker(
    name="warehouse",
    failure_threshold=3,
    timeout_seconds=60,
)
async def query_warehouse(sql: str) -> list[dict]:
    """Query warehouse with automatic circuit breaker."""
    return await warehouse.execute(sql)


# With fallback
@circuit_breaker(
    name="cache",
    failure_threshold=10,
    fallback=lambda key: None,  # Return None on circuit open
)
async def get_cached_result(key: str) -> dict | None:
    """Get cached result with fallback to None."""
    return await redis.get(key)
Class-Based Implementation
from olytix_core.resilience import CircuitBreaker, CircuitBreakerConfig


class WarehouseAdapter:
    """Warehouse adapter with circuit breaker protection."""

    def __init__(self, config: WarehouseConfig):
        """Build the adapter and its breaker from ``config.circuit_breaker``."""
        self.connection = None
        self.circuit_breaker = CircuitBreaker(
            name=f"warehouse-{config.name}",
            config=CircuitBreakerConfig(
                failure_threshold=config.circuit_breaker.failure_threshold,
                success_threshold=config.circuit_breaker.success_threshold,
                timeout_seconds=config.circuit_breaker.timeout_seconds,
                half_open_max_calls=config.circuit_breaker.half_open_max_calls,
            ),
            on_state_change=self._on_state_change,
        )

    async def execute(self, sql: str) -> list[dict]:
        """Execute SQL with circuit breaker protection."""
        async with self.circuit_breaker:
            return await self._execute_internal(sql)

    async def _on_state_change(self, old_state: str, new_state: str):
        """Handle circuit breaker state changes.

        Logs every transition and sends an alert when the new state is "open".
        """
        logger.warning(
            "circuit_breaker_state_change",
            breaker=self.circuit_breaker.name,
            old_state=old_state,
            new_state=new_state,
        )
        if new_state == "open":
            # Alert on circuit open
            await alerting.send_alert(
                severity="warning",
                message=f"Circuit breaker opened for {self.circuit_breaker.name}",
            )
Advanced Configuration
Failure Rate-Based Triggering
from olytix_core.resilience import (
    CircuitBreaker,
    CircuitBreakerConfig,
    SlidingWindowConfig,
)

# Trigger based on failure rate over sliding window
breaker = CircuitBreaker(
    name="warehouse",
    config=CircuitBreakerConfig(
        # Sliding window configuration
        sliding_window_type="count",  # or "time"
        sliding_window_size=100,  # last 100 calls
        minimum_calls=10,  # minimum calls before evaluation
        # Failure rate threshold (50% failures triggers open)
        failure_rate_threshold=0.5,
        # Slow call configuration
        slow_call_threshold_seconds=5.0,
        slow_call_rate_threshold=0.8,  # 80% slow calls triggers open
    ),
)
Per-Tenant Circuit Breakers
from olytix_core.resilience import CircuitBreakerConfig, CircuitBreakerRegistry

# Registry for managing multiple circuit breakers
registry = CircuitBreakerRegistry(
    default_config=CircuitBreakerConfig(
        failure_threshold=5,
        timeout_seconds=30,
    )
)


async def query_with_tenant_isolation(tenant_id: str, sql: str):
    """Each tenant has isolated circuit breaker.

    The breaker is looked up (or created) in ``registry`` under the key
    ``warehouse-<tenant_id>`` and wraps the warehouse call for that tenant.
    """
    breaker = registry.get_or_create(f"warehouse-{tenant_id}")
    async with breaker:
        return await warehouse.execute(sql, tenant_id=tenant_id)
Composite Circuit Breakers
from olytix_core.resilience import CircuitBreaker, CompositeCircuitBreaker

# Combine multiple circuit breakers
query_pipeline = CompositeCircuitBreaker(
    breakers=[
        CircuitBreaker(name="warehouse", failure_threshold=3),
        CircuitBreaker(name="cache", failure_threshold=10),
    ],
    strategy="any_open",  # Open if any breaker opens
)


async def execute_query(query: Query):
    """Execute query through protected pipeline.

    Returns the cached result when one exists; otherwise runs the query on
    the warehouse, stores the result in the cache and returns it.
    """
    async with query_pipeline:
        cached = await cache.get(query.cache_key)
        # Compare against None so an empty-but-valid cached result (e.g. [])
        # is still served from cache instead of re-querying the warehouse.
        if cached is not None:
            return cached
        result = await warehouse.execute(query.sql)
        await cache.set(query.cache_key, result)
        return result
Fallback Strategies
Cache Fallback
@circuit_breaker(name="warehouse", failure_threshold=3)
async def get_metrics(cube: str, measures: list[str]) -> dict:
    """Get metrics with stale cache fallback."""
    return await warehouse.query(cube, measures)


# Fallback to stale cache when circuit is open
@get_metrics.fallback
async def get_metrics_fallback(cube: str, measures: list[str]) -> dict:
    """Return stale cached data when warehouse unavailable.

    Raises:
        WarehouseUnavailableError: If no stale entry exists for the cube.
    """
    stale_data = await cache.get_stale(f"metrics:{cube}")
    if stale_data:
        logger.warning(
            "serving_stale_cache",
            cube=cube,
            cached_at=stale_data.cached_at,
        )
        return stale_data.data
    raise WarehouseUnavailableError("No cached data available")
Degraded Mode
async def get_dashboard_data(dashboard_id: str) -> DashboardData:
    """Get dashboard with graceful degradation.

    Revenue metrics are required: any error from fetching them propagates.
    Trend analysis is optional: when its breaker is open, a placeholder is
    returned and the result is flagged as degraded.
    """
    data = DashboardData()

    # Critical metrics - fail if unavailable
    data.revenue = await get_revenue_metrics()

    # Non-critical - use fallback on failure
    try:
        async with secondary_breaker:
            data.trends = await get_trend_analysis()
    except CircuitBreakerOpen:
        data.trends = TrendData(status="unavailable")
        data.degraded = True

    return data
Queue for Retry
from datetime import timedelta

from olytix_core.tasks import background_queue


@circuit_breaker(name="external_api", failure_threshold=5)
async def sync_to_external(data: dict) -> None:
    """Sync data to external API."""
    await external_api.push(data)


@sync_to_external.fallback
async def sync_fallback(data: dict) -> None:
    """Queue for later retry when circuit is open."""
    await background_queue.enqueue(
        "sync_to_external",
        data=data,
        retry_after=timedelta(minutes=5),
    )
    logger.info("queued_for_retry", data_id=data["id"])
Monitoring
Circuit Breaker Metrics
# Exposed Prometheus metrics
olytix_core_circuit_breaker_state{name="warehouse"} 0 # 0=closed, 1=open, 2=half_open
olytix_core_circuit_breaker_calls_total{name="warehouse", result="success"} 1523
olytix_core_circuit_breaker_calls_total{name="warehouse", result="failure"} 12
olytix_core_circuit_breaker_calls_total{name="warehouse", result="rejected"} 45
olytix_core_circuit_breaker_state_changes_total{name="warehouse", from="closed", to="open"} 2
Health Check Integration
from olytix_core.health import HealthCheck


class CircuitBreakerHealthCheck(HealthCheck):
    """Health check for circuit breakers."""

    async def check(self) -> HealthStatus:
        """Report "degraded" when any registered breaker is open, else "healthy"."""
        open_breakers = [
            b for b in circuit_breaker_registry.all()
            if b.state == "open"
        ]
        if open_breakers:
            return HealthStatus(
                status="degraded",
                details={
                    "open_circuits": [b.name for b in open_breakers],
                    "message": f"{len(open_breakers)} circuit(s) open",
                },
            )
        return HealthStatus(status="healthy")
Alerting
# prometheus-rules.yaml
groups:
  - name: circuit_breaker_alerts
    rules:
      - alert: CircuitBreakerOpen
        expr: olytix_core_circuit_breaker_state == 1
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "Circuit breaker {{ $labels.name }} is open"
          description: "Circuit breaker has been open for more than 1 minute"

      - alert: CircuitBreakerFlapping
        # Sum across from/to label pairs so all transitions of one breaker are counted together.
        expr: |
          sum by (name) (increase(olytix_core_circuit_breaker_state_changes_total[10m])) > 5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Circuit breaker {{ $labels.name }} is flapping"
          description: "Circuit breaker state changed {{ $value }} times in 10 minutes"

      - alert: HighRejectionRate
        # Aggregate both sides by breaker name; without this the division only
        # matches series with identical labels (rejected / rejected == 1).
        expr: |
          sum by (name) (rate(olytix_core_circuit_breaker_calls_total{result="rejected"}[5m]))
            / sum by (name) (rate(olytix_core_circuit_breaker_calls_total[5m])) > 0.1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High rejection rate for {{ $labels.name }}"
          description: "{{ $value | humanizePercentage }} of calls are being rejected"
Best Practices
1. Tune Thresholds Appropriately
# Too sensitive - a single failure (e.g. one transient error) opens the circuit
CircuitBreaker(failure_threshold=1) # Avoid
# Too lenient - won't protect from failures
CircuitBreaker(failure_threshold=100) # Avoid
# Balanced - handles transient errors, catches real issues
# NOTE(review): the failure-rate example in this guide passes sliding_window_size and
# failure_rate_threshold through CircuitBreakerConfig; confirm CircuitBreaker also
# accepts them directly as keyword arguments.
CircuitBreaker(
failure_threshold=5,
sliding_window_size=20,
failure_rate_threshold=0.5,
)
2. Exclude Non-Retriable Errors
# NOTE(review): this example sets both excluded_exceptions and included_exceptions;
# which list takes precedence when both are given is not shown here — confirm.
CircuitBreaker(
name="warehouse",
excluded_exceptions=[
# Don't count these as failures
ValidationError, # Client error
AuthenticationError, # Client error
NotFoundError, # Expected behavior
],
included_exceptions=[
# Only count these as failures
ConnectionError,
TimeoutError,
ServerError,
],
)
3. Use Appropriate Timeouts
# Match timeout to service SLA
# NOTE(review): timeout_seconds presumably is how long the breaker stays open before
# a retry is allowed (cf. time_until_retry in Basic Usage) — confirm.
CircuitBreaker(
name="warehouse",
timeout_seconds=60, # Warehouse might need recovery time
)
CircuitBreaker(
name="cache",
timeout_seconds=10, # Cache should recover quickly
)
4. Implement Bulkheads with Circuit Breakers
from olytix_core.resilience import Bulkhead, CircuitBreaker

# Combine bulkhead (concurrency limit) with circuit breaker
warehouse_bulkhead = Bulkhead(
    name="warehouse",
    max_concurrent_calls=50,
    max_wait_seconds=5,
)
warehouse_breaker = CircuitBreaker(
    name="warehouse",
    failure_threshold=5,
)


async def execute_query(sql: str):
    """Execute with both bulkhead and circuit breaker protection.

    The bulkhead is entered first, then the circuit breaker, and the
    warehouse call runs inside both.
    """
    async with warehouse_bulkhead:
        async with warehouse_breaker:
            return await warehouse.execute(sql)
Next Steps
- Configure retry policies for transient failures
- Set up monitoring for circuit breaker metrics
- Configure logging for failure tracking