Skip to main content

Circuit Breakers

For Platform Teams

Olytix Core implements the circuit breaker pattern to prevent cascade failures when downstream services (warehouses, caches, external APIs) experience issues. This guide covers circuit breaker configuration, monitoring, and best practices.

Overview

Circuit breakers protect your system by:

  • Failing fast when services are unavailable
  • Preventing cascade failures across components
  • Allowing recovery time for failing services
  • Providing fallback behavior during outages

Circuit Breaker States

Click states to explore or click Animate to see the flow.

CLOSED (Normal): All requests pass through. Monitoring for failures.
  → on failures: transitions to OPEN
OPEN (Failing)
  → on timer: transitions to HALF-OPEN
HALF-OPEN (Testing)
  → on success: transitions to CLOSED
Legend
Normal (Closed)
Failing (Open)
Testing (Half-Open)

Configuration

Environment Variables

# Enable circuit breakers
OLYTIX_CIRCUIT_BREAKER__ENABLED=true

# Default settings for all circuit breakers
OLYTIX_CIRCUIT_BREAKER__FAILURE_THRESHOLD=5
OLYTIX_CIRCUIT_BREAKER__SUCCESS_THRESHOLD=3
OLYTIX_CIRCUIT_BREAKER__TIMEOUT_SECONDS=30
OLYTIX_CIRCUIT_BREAKER__HALF_OPEN_MAX_CALLS=3

# Warehouse-specific settings
OLYTIX_CIRCUIT_BREAKER__WAREHOUSE__FAILURE_THRESHOLD=3
OLYTIX_CIRCUIT_BREAKER__WAREHOUSE__TIMEOUT_SECONDS=60

# Cache-specific settings
OLYTIX_CIRCUIT_BREAKER__CACHE__FAILURE_THRESHOLD=10
OLYTIX_CIRCUIT_BREAKER__CACHE__TIMEOUT_SECONDS=15

Configuration File

# config/circuit_breaker.yaml
circuit_breaker:
  enabled: true

  # Default configuration
  default:
    failure_threshold: 5
    success_threshold: 3
    timeout_seconds: 30
    half_open_max_calls: 3
    excluded_exceptions:
      - ValidationError
      - AuthenticationError

  # Per-service configurations
  services:
    warehouse:
      failure_threshold: 3
      success_threshold: 2
      timeout_seconds: 60
      half_open_max_calls: 2
      failure_rate_threshold: 0.5
      slow_call_threshold_seconds: 10
      slow_call_rate_threshold: 0.8

    redis_cache:
      failure_threshold: 10
      success_threshold: 5
      timeout_seconds: 15
      half_open_max_calls: 5
      fallback: skip_cache

    external_api:
      failure_threshold: 5
      success_threshold: 3
      timeout_seconds: 45
      half_open_max_calls: 3

Implementation

Basic Usage

from olytix_core.resilience import CircuitBreaker, CircuitBreakerOpen

# Create a circuit breaker
warehouse_breaker = CircuitBreaker(
    name="snowflake",
    failure_threshold=3,
    success_threshold=2,
    timeout_seconds=60,
)


async def execute_warehouse_query(sql: str):
    """Execute query with circuit breaker protection.

    Runs ``sql`` through ``warehouse.execute`` inside the
    ``warehouse_breaker`` context.

    Raises:
        WarehouseUnavailableError: If entering the breaker raises
            ``CircuitBreakerOpen``. The message includes
            ``warehouse_breaker.time_until_retry``.
    """
    try:
        async with warehouse_breaker:
            return await warehouse.execute(sql)
    except CircuitBreakerOpen as exc:
        # Circuit is open, fail fast. Chain the original exception so the
        # breaker's own error stays visible in tracebacks.
        raise WarehouseUnavailableError(
            "Warehouse circuit breaker is open. "
            f"Will retry in {warehouse_breaker.time_until_retry}s"
        ) from exc

Decorator Pattern

from olytix_core.resilience import circuit_breaker


@circuit_breaker(
    name="warehouse",
    failure_threshold=3,
    timeout_seconds=60,
)
async def query_warehouse(sql: str) -> list[dict]:
    """Query warehouse with automatic circuit breaker.

    The decorator wraps the call; the body only forwards ``sql`` to
    ``warehouse.execute``.
    """
    return await warehouse.execute(sql)


# With fallback
@circuit_breaker(
    name="cache",
    failure_threshold=10,
    fallback=lambda key: None,  # Return None on circuit open
)
async def get_cached_result(key: str) -> dict | None:
    """Get cached result with fallback to None.

    Returns whatever ``redis.get(key)`` yields; the ``fallback`` callable
    supplies ``None`` instead when the circuit is open.
    """
    return await redis.get(key)

Class-Based Implementation

from olytix_core.resilience import CircuitBreaker, CircuitBreakerConfig


class WarehouseAdapter:
    """Warehouse adapter with circuit breaker protection.

    Each adapter owns one breaker named ``warehouse-<config.name>`` whose
    settings are copied from ``config.circuit_breaker``.
    """

    def __init__(self, config: WarehouseConfig):
        """Build the adapter and its circuit breaker from ``config``."""
        self.connection = None
        self.circuit_breaker = CircuitBreaker(
            name=f"warehouse-{config.name}",
            config=CircuitBreakerConfig(
                failure_threshold=config.circuit_breaker.failure_threshold,
                success_threshold=config.circuit_breaker.success_threshold,
                timeout_seconds=config.circuit_breaker.timeout_seconds,
                half_open_max_calls=config.circuit_breaker.half_open_max_calls,
            ),
            on_state_change=self._on_state_change,
        )

    async def execute(self, sql: str) -> list[dict]:
        """Execute SQL with circuit breaker protection."""
        async with self.circuit_breaker:
            # _execute_internal is not shown in this example.
            return await self._execute_internal(sql)

    async def _on_state_change(self, old_state: str, new_state: str):
        """Handle circuit breaker state changes.

        Logs every transition and sends a warning alert when the new
        state is ``"open"``.
        """
        logger.warning(
            "circuit_breaker_state_change",
            breaker=self.circuit_breaker.name,
            old_state=old_state,
            new_state=new_state,
        )

        if new_state == "open":
            # Alert on circuit open
            await alerting.send_alert(
                severity="warning",
                message=f"Circuit breaker opened for {self.circuit_breaker.name}",
            )

Advanced Configuration

Failure Rate-Based Triggering

from olytix_core.resilience import CircuitBreaker, CircuitBreakerConfig

# Trigger based on failure rate over sliding window
breaker = CircuitBreaker(
    name="warehouse",
    config=CircuitBreakerConfig(
        # Sliding window configuration
        sliding_window_type="count",  # or "time"
        sliding_window_size=100,  # last 100 calls
        minimum_calls=10,  # minimum calls before evaluation

        # Failure rate threshold (50% failures triggers open)
        failure_rate_threshold=0.5,

        # Slow call configuration
        slow_call_threshold_seconds=5.0,
        slow_call_rate_threshold=0.8,  # 80% slow calls triggers open
    ),
)

Per-Tenant Circuit Breakers

from olytix_core.resilience import CircuitBreakerConfig, CircuitBreakerRegistry

# Registry for managing multiple circuit breakers
registry = CircuitBreakerRegistry(
    default_config=CircuitBreakerConfig(
        failure_threshold=5,
        timeout_seconds=30,
    )
)


async def query_with_tenant_isolation(tenant_id: str, sql: str):
    """Each tenant has isolated circuit breaker.

    Looks up (or creates) the breaker keyed ``warehouse-<tenant_id>`` in
    ``registry`` and runs the query inside it.
    """
    breaker = registry.get_or_create(f"warehouse-{tenant_id}")

    async with breaker:
        return await warehouse.execute(sql, tenant_id=tenant_id)

Composite Circuit Breakers

from olytix_core.resilience import CircuitBreaker, CompositeCircuitBreaker

# Combine multiple circuit breakers
query_pipeline = CompositeCircuitBreaker(
    breakers=[
        CircuitBreaker(name="warehouse", failure_threshold=3),
        CircuitBreaker(name="cache", failure_threshold=10),
    ],
    strategy="any_open",  # Open if any breaker opens
)


async def execute_query(query: Query):
    """Execute query through protected pipeline.

    Returns the cached value for ``query.cache_key`` when one exists;
    otherwise runs ``query.sql`` on the warehouse, stores the result in
    the cache, and returns it.
    """
    async with query_pipeline:
        cached = await cache.get(query.cache_key)
        # Compare against None so an empty (falsy) cached result still
        # counts as a hit. Assumes cache.get returns None on a miss.
        if cached is not None:
            return cached

        result = await warehouse.execute(query.sql)
        await cache.set(query.cache_key, result)
        return result

Fallback Strategies

Cache Fallback

@circuit_breaker(name="warehouse", failure_threshold=3)
async def get_metrics(cube: str, measures: list[str]) -> dict:
    """Get metrics with stale cache fallback."""
    return await warehouse.query(cube, measures)


# Fallback to stale cache when circuit is open
@get_metrics.fallback
async def get_metrics_fallback(cube: str, measures: list[str]) -> dict:
    """Return stale cached data when warehouse unavailable.

    Raises:
        WarehouseUnavailableError: If ``cache.get_stale`` has nothing for
            ``metrics:<cube>``.
    """
    stale_data = await cache.get_stale(f"metrics:{cube}")
    if stale_data:
        logger.warning(
            "serving_stale_cache",
            cube=cube,
            cached_at=stale_data.cached_at,
        )
        return stale_data.data
    raise WarehouseUnavailableError("No cached data available")

Degraded Mode

async def get_dashboard_data(dashboard_id: str) -> DashboardData:
    """Get dashboard with graceful degradation.

    Revenue metrics are mandatory: any error from ``get_revenue_metrics``
    propagates. Trend analysis is optional: when ``secondary_breaker`` is
    open, a placeholder is stored and ``data.degraded`` is set.
    """
    data = DashboardData()

    # Critical metrics - fail if unavailable
    data.revenue = await get_revenue_metrics()

    # Non-critical - use fallback on failure
    try:
        async with secondary_breaker:
            data.trends = await get_trend_analysis()
    except CircuitBreakerOpen:
        data.trends = TrendData(status="unavailable")
        data.degraded = True

    return data

Queue for Retry

from datetime import timedelta

from olytix_core.tasks import background_queue


@circuit_breaker(name="external_api", failure_threshold=5)
async def sync_to_external(data: dict) -> None:
    """Sync data to external API."""
    await external_api.push(data)


@sync_to_external.fallback
async def sync_fallback(data: dict) -> None:
    """Queue for later retry when circuit is open.

    Enqueues a ``sync_to_external`` job with a five-minute ``retry_after``
    and logs ``data["id"]``.
    """
    await background_queue.enqueue(
        "sync_to_external",
        data=data,
        retry_after=timedelta(minutes=5),
    )
    logger.info("queued_for_retry", data_id=data["id"])

Monitoring

Circuit Breaker Metrics

# Exposed Prometheus metrics
olytix_core_circuit_breaker_state{name="warehouse"} 0 # 0=closed, 1=open, 2=half_open
olytix_core_circuit_breaker_calls_total{name="warehouse", result="success"} 1523
olytix_core_circuit_breaker_calls_total{name="warehouse", result="failure"} 12
olytix_core_circuit_breaker_calls_total{name="warehouse", result="rejected"} 45
olytix_core_circuit_breaker_state_changes_total{name="warehouse", from="closed", to="open"} 2

Health Check Integration

from olytix_core.health import HealthCheck


class CircuitBreakerHealthCheck(HealthCheck):
    """Health check for circuit breakers."""

    async def check(self) -> HealthStatus:
        """Report ``degraded`` if any registered breaker is open.

        Returns:
            A ``HealthStatus`` with status ``"degraded"`` and the names of
            the open breakers, or ``"healthy"`` when none are open.
        """
        open_breakers = [
            b for b in circuit_breaker_registry.all()
            if b.state == "open"
        ]

        if open_breakers:
            return HealthStatus(
                status="degraded",
                details={
                    "open_circuits": [b.name for b in open_breakers],
                    "message": f"{len(open_breakers)} circuit(s) open",
                },
            )

        return HealthStatus(status="healthy")

Alerting

# prometheus-rules.yaml
groups:
  - name: circuit_breaker_alerts
    rules:
      - alert: CircuitBreakerOpen
        expr: olytix_core_circuit_breaker_state == 1
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "Circuit breaker {{ $labels.name }} is open"
          description: "Circuit breaker has been open for more than 1 minute"

      - alert: CircuitBreakerFlapping
        # Sum across the from/to labels so $value is the total number of
        # state changes per breaker.
        expr: |
          sum by (name) (increase(olytix_core_circuit_breaker_state_changes_total[10m])) > 5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Circuit breaker {{ $labels.name }} is flapping"
          description: "Circuit breaker state changed {{ $value }} times in 10 minutes"

      - alert: HighRejectionRate
        # Aggregate both sides by name; without this the division only
        # matches the result="rejected" series against itself and the
        # ratio is always 1.
        expr: |
          sum by (name) (rate(olytix_core_circuit_breaker_calls_total{result="rejected"}[5m]))
            / sum by (name) (rate(olytix_core_circuit_breaker_calls_total[5m])) > 0.1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High rejection rate for {{ $labels.name }}"
          description: "{{ $value | humanizePercentage }} of calls are being rejected"

Best Practices

1. Tune Thresholds Appropriately

# Illustrative threshold choices; the breaker name is omitted for brevity.

# Too sensitive - will open on transient errors
CircuitBreaker(failure_threshold=1)  # Avoid

# Too lenient - won't protect from failures
CircuitBreaker(failure_threshold=100)  # Avoid

# Balanced - handles transient errors, catches real issues
# NOTE(review): the failure-rate example earlier in this guide passes the
# sliding-window settings through CircuitBreakerConfig; confirm they are
# also accepted directly as keyword arguments.
CircuitBreaker(
    failure_threshold=5,
    sliding_window_size=20,
    failure_rate_threshold=0.5,
)

2. Exclude Non-Retriable Errors

# Restrict which exception types count toward opening the circuit.
# NOTE(review): how the two lists interact when both are given is not
# shown here - confirm precedence against the library.
CircuitBreaker(
    name="warehouse",
    excluded_exceptions=[
        # Don't count these as failures
        ValidationError,  # Client error
        AuthenticationError,  # Client error
        NotFoundError,  # Expected behavior
    ],
    included_exceptions=[
        # Only count these as failures
        ConnectionError,
        TimeoutError,
        ServerError,
    ],
)

3. Use Appropriate Timeouts

# Match timeout to service SLA
# NOTE(review): timeout_seconds is presumably how long the breaker stays
# open before it probes the service again - confirm.
CircuitBreaker(
    name="warehouse",
    timeout_seconds=60,  # Warehouse might need recovery time
)

CircuitBreaker(
    name="cache",
    timeout_seconds=10,  # Cache should recover quickly
)

4. Implement Bulkheads with Circuit Breakers

from olytix_core.resilience import Bulkhead, CircuitBreaker

# Combine bulkhead (concurrency limit) with circuit breaker
warehouse_bulkhead = Bulkhead(
    name="warehouse",
    max_concurrent_calls=50,
    max_wait_seconds=5,
)

warehouse_breaker = CircuitBreaker(
    name="warehouse",
    failure_threshold=5,
)


async def execute_query(sql: str):
    """Execute with both bulkhead and circuit breaker protection.

    The bulkhead context is entered first, then the breaker, and the
    query runs inside both.
    """
    async with warehouse_bulkhead:
        async with warehouse_breaker:
            return await warehouse.execute(sql)

Next Steps