"""
HemoStat Monitor Agent
Continuously polls Docker containers to detect health issues, resource anomalies, and failures.
Publishes structured health alerts to Redis for consumption by the Analyzer Agent.
"""
import os
import time
from datetime import datetime, UTC
from typing import Any
import docker
from docker.errors import APIError, DockerException
from agents.agent_base import HemoStatAgent
from agents.platform_utils import get_docker_host
class ContainerMonitor(HemoStatAgent):
    """
    Monitor Agent for HemoStat.

    Polls Docker containers at regular intervals, collects metrics (CPU, memory,
    network, disk I/O), detects anomalies against configurable thresholds, stores a
    per-container state snapshot in shared state, and publishes health alerts to Redis.
    """

    def __init__(self):
        """
        Initialize the Container Monitor agent.

        Docker access is optional: if the Docker client cannot be created, the failure
        is logged, ``docker_available`` is set to False and polling becomes a no-op.

        Configuration is read from the environment:
            DOCKER_HOST: Docker daemon URL. When unset, ``get_docker_host()`` is used.
            AGENT_POLL_INTERVAL: Seconds to sleep between polls (default 30).
            THRESHOLD_CPU_PERCENT: CPU alert threshold in percent (default 85).
            THRESHOLD_MEMORY_PERCENT: Memory alert threshold in percent (default 80).

        Raises:
            HemoStatConnectionError: If Redis connection fails during base-agent setup
            ValueError: If one of the numeric environment variables is not an integer
        """
        # Initialize base agent
        super().__init__(agent_name="monitor")

        # Initialize Docker client with platform-aware host detection
        self.docker_client = None
        self.docker_available = False
        try:
            self.docker_client, docker_host = self._create_docker_client()
        except DockerException as e:
            self.logger.warning(
                f"Docker client unavailable (running in Docker without socket mount): {e}. "
                f"Monitor will continue via Redis events only."
            )
        else:
            self.docker_available = True
            self.logger.info(f"Docker client initialized successfully: {docker_host}")

        # Load configuration from environment
        self.poll_interval = int(os.getenv("AGENT_POLL_INTERVAL", 30))
        self.threshold_cpu = int(os.getenv("THRESHOLD_CPU_PERCENT", 85))
        self.threshold_memory = int(os.getenv("THRESHOLD_MEMORY_PERCENT", 80))
        self.logger.info(
            f"Monitor Agent initialized with thresholds: "
            f"CPU={self.threshold_cpu}%, Memory={self.threshold_memory}%"
        )

    @staticmethod
    def _create_docker_client():
        """
        Create a Docker client for the configured or detected daemon.

        Returns:
            Tuple of (Docker client, host string used for logging).

        Raises:
            DockerException: If the client cannot be created or the daemon is unreachable
        """
        env_host = os.getenv("DOCKER_HOST")
        if env_host:
            # from_env() reads DOCKER_HOST together with the TLS-related variables.
            return docker.from_env(), env_host
        detected_host = get_docker_host()
        # from_env() would ignore the detected host and fall back to docker-py's
        # default socket, so connect to it explicitly.
        # NOTE(review): assumes get_docker_host() returns a daemon URL usable as
        # base_url (e.g. "unix:///var/run/docker.sock") — confirm in platform_utils.
        return docker.DockerClient(base_url=detected_host), detected_host

    def run(self) -> None:
        """
        Run the monitoring loop until ``stop()`` is called or the process is interrupted.

        Each iteration polls all containers, then sleeps ``poll_interval`` seconds.
        Errors raised while polling are logged and do not terminate the loop.
        """
        self._running = True
        self.logger.info("Starting monitor loop")
        try:
            while self._running:
                try:
                    self._poll_containers()
                except Exception as e:
                    # Keep the agent alive: a single failed poll must not stop monitoring.
                    self.logger.error(f"Error during container polling: {e}", exc_info=True)
                time.sleep(self.poll_interval)
        except KeyboardInterrupt:
            self.logger.info("Monitor interrupted by user")
        finally:
            self.stop()

    def _poll_containers(self) -> None:
        """
        Fetch all running and exited containers and check their health.

        Exited containers are included so that non-zero exit codes can be detected.
        Docker API errors are logged rather than raised; per-container failures do
        not stop the remaining containers from being checked. Does nothing when
        Docker is unavailable.
        """
        if not self.docker_available:
            return
        try:
            containers = self.docker_client.containers.list(
                all=True, filters={"status": ["running", "exited"]}
            )
            self.logger.debug(f"Polling {len(containers)} containers")
            for container in containers:
                try:
                    # Refresh container state to avoid stale status
                    container.reload()
                    self._check_container_health(container)
                except Exception as e:
                    self.logger.error(
                        f"Error checking container {container.short_id}: {e}", exc_info=False
                    )
        except APIError as e:
            self.logger.error(f"Docker API error during container listing: {e}")
        except DockerException as e:
            self.logger.error(f"Docker error during polling: {e}")

    def _check_container_health(self, container) -> None:
        """
        Check the health of a single container.

        Collects metrics, detects anomalies, stores a state snapshot under
        ``container:<short_id>`` (TTL 300 s) and publishes an alert if any anomaly
        was found. Errors are logged, not raised.

        Args:
            container: Docker container object
        """
        container_name = container.name
        try:
            stats = self._get_container_stats(container)
            if stats is None:
                return
            health_info = self._check_health_status(container)
            anomalies = self._detect_anomalies(container, stats, health_info)

            # Store container state for the dashboard health grid. This is written for
            # ALL containers, not just unhealthy ones, and carries every collected
            # metric so the alert path never needs to overwrite it with another schema.
            container_id = container.short_id
            container_state = {
                "container_id": container_id,
                "container_name": container_name,
                "status": container.status,
                **stats,
                "health_status": health_info["health_status"],
                "timestamp": datetime.now(UTC).isoformat(),
            }
            self.set_shared_state(f"container:{container_id}", container_state, ttl=300)

            if anomalies:
                self._publish_health_alert(container, stats, anomalies, health_info)
            else:
                self.logger.debug(f"Container {container_name} is healthy")
        except Exception as e:
            self.logger.error(f"Error checking health of {container_name}: {e}", exc_info=False)

    def _get_container_stats(self, container) -> dict[str, Any] | None:
        """
        Fetch a single, non-streaming stats snapshot and derive metrics from it.

        The non-streaming call returns both ``cpu_stats`` and ``precpu_stats``, so the
        CPU percentage can be computed without keeping a stats stream open.

        Args:
            container: Docker container object to fetch stats for

        Returns:
            Dictionary with keys: cpu_percent, memory_percent, memory_usage, memory_limit,
            network_rx_bytes, network_tx_bytes, blkio_read_bytes, blkio_write_bytes.
            Returns None if stats retrieval fails.
        """
        try:
            stats = container.stats(stream=False)

            cpu_percent = self._calculate_cpu_percent(stats)
            memory_stats = stats.get("memory_stats") or {}
            memory_percent = self._calculate_memory_percent(memory_stats)

            # Sum network I/O across all attached networks
            network_rx_bytes = 0
            network_tx_bytes = 0
            for net_data in (stats.get("networks") or {}).values():
                network_rx_bytes += net_data.get("rx_bytes", 0)
                network_tx_bytes += net_data.get("tx_bytes", 0)

            # Sum block I/O. cgroup v1 reports ops as "Read"/"Write" while cgroup v2
            # reports "read"/"write", so compare case-insensitively.
            blkio_read_bytes = 0
            blkio_write_bytes = 0
            blkio_stats = stats.get("blkio_stats") or {}
            for entry in blkio_stats.get("io_service_bytes_recursive") or []:
                op = (entry.get("op") or "").lower()
                if op == "read":
                    blkio_read_bytes += entry.get("value", 0)
                elif op == "write":
                    blkio_write_bytes += entry.get("value", 0)

            return {
                "cpu_percent": cpu_percent,
                "memory_percent": memory_percent,
                "memory_usage": memory_stats.get("usage", 0),
                "memory_limit": memory_stats.get("limit", 0),
                "network_rx_bytes": network_rx_bytes,
                "network_tx_bytes": network_tx_bytes,
                "blkio_read_bytes": blkio_read_bytes,
                "blkio_write_bytes": blkio_write_bytes,
            }
        except Exception as e:
            self.logger.error(f"Error getting stats for {container.name}: {e}")
            return None

    def _check_health_status(self, container) -> dict[str, Any]:
        """
        Extract health status, exit code and restart count from the container attributes.

        Args:
            container: Docker container object

        Returns:
            Dictionary with health_status ("unknown" when the container reports no
            health information), exit_code and restart_count. On any error, a
            neutral result ("unknown", 0, 0) is returned.
        """
        try:
            attrs = container.attrs
            # `or {}` also covers keys that are present but null
            state = attrs.get("State") or {}
            health = state.get("Health") or {}
            return {
                "health_status": health.get("Status", "unknown"),
                "exit_code": state.get("ExitCode", 0),
                "restart_count": attrs.get("RestartCount", 0),
            }
        except Exception as e:
            self.logger.error(f"Error checking health status: {e}")
            return {
                "health_status": "unknown",
                "exit_code": 0,
                "restart_count": 0,
            }

    @staticmethod
    def _threshold_anomaly(
        anomaly_type: str, value: float, threshold: float
    ) -> dict[str, Any] | None:
        """
        Build a threshold anomaly for a percentage metric, or return None if within limits.

        Severity: "critical" above the threshold and above 95, "high" above the
        threshold, "medium" above 80% of the threshold.
        """
        if value > threshold:
            severity = "critical" if value > 95 else "high"
        elif value > 0.8 * threshold:
            severity = "medium"
        else:
            return None
        return {
            "type": anomaly_type,
            "severity": severity,
            "threshold": threshold,
            "actual": round(value, 2),
        }

    def _detect_anomalies(
        self, container, stats: dict[str, Any], health_info: dict[str, Any]
    ) -> list[dict[str, Any]]:
        """
        Detect anomalies in container metrics and state.

        Checks performed:
            - CPU / memory against their thresholds: critical (> threshold and > 95%),
              high (> threshold), medium (> 80% of threshold)
            - Docker health status "unhealthy" (high); "starting" and "none" are not
              treated as anomalies
            - Non-zero exit code of an exited container (high)
            - More than 5 restarts (medium)

        Args:
            container: Docker container object
            stats: Container metrics dictionary
            health_info: Health status information

        Returns:
            List of detected anomalies with type, severity, and details
        """
        anomalies = []

        for anomaly_type, value, threshold in (
            ("high_cpu", stats["cpu_percent"], self.threshold_cpu),
            ("high_memory", stats["memory_percent"], self.threshold_memory),
        ):
            anomaly = self._threshold_anomaly(anomaly_type, value, threshold)
            if anomaly is not None:
                anomalies.append(anomaly)

        # Only "unhealthy" indicates a failing healthcheck; "starting" (start period)
        # and "none" (no healthcheck) would otherwise raise false alerts.
        if health_info["health_status"] == "unhealthy":
            anomalies.append(
                {
                    "type": "unhealthy_status",
                    "severity": "high",
                    "status": health_info["health_status"],
                }
            )

        # Exit code anomaly (for stopped containers)
        if health_info["exit_code"] != 0 and container.status == "exited":
            anomalies.append(
                {
                    "type": "non_zero_exit",
                    "severity": "high",
                    "exit_code": health_info["exit_code"],
                }
            )

        # Excessive restarts
        if health_info["restart_count"] > 5:
            anomalies.append(
                {
                    "type": "excessive_restarts",
                    "severity": "medium",
                    "restart_count": health_info["restart_count"],
                }
            )

        return anomalies

    @staticmethod
    def _image_name(container) -> str:
        """Return the first tag of the container's image, or "unknown"."""
        try:
            # container.image performs an API lookup that fails (e.g. ImageNotFound)
            # once the image has been removed; that must not suppress the alert.
            image = container.image
        except DockerException:
            return "unknown"
        tags = image.tags if image is not None else []
        return tags[0] if tags else "unknown"

    def _publish_health_alert(
        self,
        container,
        stats: dict[str, Any],
        anomalies: list[dict[str, Any]],
        health_info: dict[str, Any],
    ) -> None:
        """
        Publish a "container_unhealthy" event on "hemostat:health_alert" for the Analyzer Agent.

        Shared state is not touched here; the caller already stored the full container
        snapshot under ``container:<short_id>``. Errors are logged, not raised.

        Args:
            container: Docker container object
            stats: Container metrics
            anomalies: List of detected anomalies
            health_info: Health status information
        """
        try:
            container_name = container.name
            payload = {
                "container_id": container.short_id,
                "container_name": container_name,
                "image": self._image_name(container),
                "status": container.status,
                "metrics": stats,
                "anomalies": anomalies,
                "health_status": health_info["health_status"],
                "exit_code": health_info["exit_code"],
                "restart_count": health_info["restart_count"],
            }
            self.publish_event("hemostat:health_alert", "container_unhealthy", payload)
            self.logger.warning(
                f"Health alert published for {container_name}: {len(anomalies)} anomalies detected"
            )
        except Exception as e:
            self.logger.error(f"Error publishing health alert: {e}", exc_info=False)

    def _calculate_cpu_percent(self, stats: dict[str, Any]) -> float:
        """
        Calculate CPU percentage using the same formula as ``docker stats``.

        Formula: (delta_cpu / delta_system) x online_cpus x 100, using cpu_stats and
        precpu_stats from a single snapshot. The result is not clamped above, so it
        can exceed 100% on multi-core hosts.

        Args:
            stats: Container stats dictionary with cpu_stats and precpu_stats

        Returns:
            CPU percentage as float; 0.0 if either delta is not positive or the
            calculation fails.
        """
        try:
            cpu_stats = stats.get("cpu_stats") or {}
            precpu_stats = stats.get("precpu_stats") or {}
            cpu_usage = cpu_stats.get("cpu_usage") or {}
            precpu_usage = precpu_stats.get("cpu_usage") or {}

            cpu_delta = cpu_usage.get("total_usage", 0) - precpu_usage.get("total_usage", 0)
            system_delta = cpu_stats.get("system_cpu_usage", 0) - precpu_stats.get(
                "system_cpu_usage", 0
            )
            # Non-positive deltas (no previous sample, counter reset) carry no usable
            # signal; dividing two negative deltas would yield a bogus positive value.
            if system_delta <= 0 or cpu_delta <= 0:
                return 0.0

            # online_cpus may be missing or 0 on older daemons; fall back to the number
            # of per-CPU counters, then to 1.
            online_cpus = (
                cpu_stats.get("online_cpus")
                or len(cpu_usage.get("percpu_usage") or [])
                or 1
            )
            return (cpu_delta / system_delta) * online_cpus * 100.0
        except Exception as e:
            self.logger.debug(f"Error calculating CPU percent: {e}")
            return 0.0

    def _calculate_memory_percent(self, mem_stats: dict[str, Any]) -> float:
        """
        Calculate memory percentage excluding inactive file cache, like ``docker stats``.

        Args:
            mem_stats: Memory stats from container stats

        Returns:
            Memory percentage clamped to 0-100 (0.0 if the limit is 0 or the
            calculation fails)
        """
        try:
            usage = mem_stats.get("usage", 0)
            limit = mem_stats.get("limit", 0)
            if not limit:
                return 0.0

            # cgroup v1 exposes the hierarchical total_inactive_file (preferred, as in
            # docker stats); cgroup v2 only exposes inactive_file.
            raw = mem_stats.get("stats") or {}
            cache = raw.get("total_inactive_file", raw.get("inactive_file", 0))
            if 0 < cache < usage:
                usage -= cache

            memory_percent = (usage / limit) * 100.0
            return max(0.0, min(memory_percent, 100.0))
        except Exception as e:
            self.logger.debug(f"Error calculating memory percent: {e}")
            return 0.0

    def stop(self) -> None:
        """
        Stop the monitor agent gracefully.

        Clears the running flag; the loop in ``run()`` exits the next time it checks
        the flag (after its current sleep).
        """
        # NOTE(review): this override does not call super().stop(); confirm the base
        # agent has no cleanup (e.g. closing its Redis connection) being skipped.
        self._running = False
        self.logger.info("Monitor agent stopped")