"""
HemoStat Analyzer Agent - Core Implementation
Performs AI-powered root cause analysis of container health issues using LangChain with
Claude/GPT-4, distinguishes real issues from false alarms, calculates confidence scores,
and publishes remediation recommendations or false alarm notifications.
"""
import json
import os
import re
import time
from typing import Any
from agents.agent_base import HemoStatAgent
[docs]
class HealthAnalyzer(HemoStatAgent):
    """
    AI-powered health analyzer for container health issues.

    Inherits from HemoStatAgent and implements intelligent analysis of container health
    alerts using LangChain (OpenAI, Anthropic or Hugging Face endpoint models, selected
    by the AI_MODEL environment variable), with rule-based fallback for reliability.

    Alerts are received on the "hemostat:health_alert" channel. Each alert is analyzed
    by the LLM when one is configured, otherwise (or when the LLM fails) by deterministic
    rules. The verdict is published to "hemostat:remediation_needed" when it is confident
    and actionable, and to "hemostat:false_alarm" otherwise. Recent alerts per container
    are kept in shared state under "alert_history:<container_name>" for trend detection.
    """
[docs]
def __init__(self):
    """
    Build the analyzer: read settings, create the LLM client, subscribe to alerts.

    Settings are read from environment variables:
        AI_MODEL: Model identifier (default "gpt-4")
        AI_FALLBACK_ENABLED: "true" (case-insensitive) forces rule-based analysis
        ANALYZER_CONFIDENCE_THRESHOLD: Minimum confidence for remediation (default 0.7)
        ANALYZER_HISTORY_SIZE: Number of alerts kept per container (default 10)
        ANALYZER_HISTORY_TTL: TTL passed when storing alert history (default 3600)

    Raises:
        HemoStatConnectionError: If Redis connection fails (raised by the base class
            initializer, which is not visible in this file)
        ValueError: If one of the numeric settings cannot be parsed
    """
    super().__init__(agent_name="analyzer")

    # AI settings. AI_FALLBACK_ENABLED switches the LLM off entirely.
    self.ai_model = os.getenv("AI_MODEL", "gpt-4")
    fallback_flag = os.getenv("AI_FALLBACK_ENABLED", "false")
    self.force_rule_based = fallback_flag.lower() == "true"

    # Analysis tuning
    self.confidence_threshold = float(os.getenv("ANALYZER_CONFIDENCE_THRESHOLD", 0.7))
    self.history_size = int(os.getenv("ANALYZER_HISTORY_SIZE", 10))
    self.history_ttl = int(os.getenv("ANALYZER_HISTORY_TTL", 3600))

    # Only build an LLM client when AI analysis has not been switched off
    if self.force_rule_based:
        self.llm = None
    else:
        self.llm = self._initialize_llm()

    # Start receiving health alerts
    self.subscribe_to_channel("hemostat:health_alert", self._handle_health_alert)

    log_context = {"agent": self.agent_name}
    self.logger.info(
        f"Analyzer Agent initialized with AI model: {self.ai_model if self.llm else 'DISABLED (using rule-based fallback)'}",
        extra=log_context,
    )
def _initialize_llm(self) -> Any | None:
"""
Initialize LangChain LLM based on AI_MODEL configuration.
Returns:
Initialized LLM instance (ChatOpenAI or ChatAnthropic), or None if initialization fails
Raises:
ImportError: If required LangChain libraries are not installed
"""
try:
if self.ai_model.startswith("gpt"):
from langchain_openai import ChatOpenAI
if not os.getenv("OPENAI_API_KEY", "").strip():
self.logger.warning(
"OPENAI_API_KEY not set; AI analysis disabled (using rule-based fallback)"
)
return None
self.logger.info(f"Initializing ChatOpenAI with model: {self.ai_model}")
return ChatOpenAI(
model=self.ai_model, # type: ignore[arg-type]
temperature=0.3,
)
elif self.ai_model.startswith("claude"):
from langchain_anthropic import ChatAnthropic
if not os.getenv("ANTHROPIC_API_KEY", "").strip():
self.logger.warning(
"ANTHROPIC_API_KEY not set; AI analysis disabled (using rule-based fallback)"
)
return None
self.logger.info(f"Initializing ChatAnthropic with model: {self.ai_model}")
return ChatAnthropic(
model=self.ai_model,
temperature=0.3,
)
elif "/" in self.ai_model: # Hugging Face model (e.g., "openai/gpt-oss-120b")
from langchain_huggingface import HuggingFaceEndpoint
hf_token = os.getenv("HUGGINGFACE_API_KEY") or os.getenv("HF_TOKEN", "")
if not hf_token.strip():
self.logger.warning(
"HUGGINGFACE_API_KEY or HF_TOKEN not set; AI analysis disabled (using rule-based fallback)"
)
return None
self.logger.info(f"Initializing HuggingFaceEndpoint with model: {self.ai_model}")
return HuggingFaceEndpoint(
repo_id=self.ai_model,
temperature=0.3,
max_new_tokens=512,
huggingfacehub_api_token=hf_token,
)
else:
self.logger.warning(f"Unknown AI model: {self.ai_model}; using rule-based fallback")
return None
except ImportError as e:
self.logger.error(
f"Failed to import LangChain libraries: {e}. Install with: uv sync --extra agents"
)
return None
except Exception as e:
self.logger.error(f"Failed to initialize LLM: {e}")
return None
[docs]
def run(self) -> None:
    """
    Run the analyzer's listening loop.

    Delegates to start_listening(). Any exception that escapes the loop is logged
    together with its traceback instead of being propagated to the caller.
    """
    try:
        self.start_listening()
    except Exception as e:
        self.logger.error(f"Error in listening loop: {e}", exc_info=True)
    return None
def _handle_health_alert(self, message: dict[str, Any]) -> None:
"""
Callback invoked when a health alert is received from Monitor Agent.
Args:
message: Deserialized health alert message from Redis
"""
try:
# Extract alert data
alert_data = message.get("data", {})
container_name = alert_data.get("container_name", "unknown")
self.logger.info(
f"Received health alert for container: {container_name}",
extra={"agent": self.agent_name},
)
# Perform analysis
self._analyze_health_issue(alert_data)
except Exception as e:
self.logger.error(f"Error handling health alert: {e}", exc_info=True)
def _analyze_health_issue(self, alert_data: dict[str, Any]) -> None:
"""
Main analysis orchestration method.
Retrieves historical context, attempts AI analysis, falls back to rule-based
if needed, and routes to appropriate channel based on confidence.
Args:
alert_data: Health alert data from Monitor Agent
"""
container_name = alert_data.get("container_name", "unknown")
try:
# Retrieve historical context
history = self.get_shared_state(f"alert_history:{container_name}")
history_list = history.get("alerts", []) if history else []
# Attempt AI analysis if LLM is available
analysis = None
if self.llm:
analysis = self._ai_analyze(alert_data, history_list)
# Fall back to rule-based if AI failed or not available
if analysis is None:
analysis = self._rule_based_analyze(alert_data, history_list)
# Update alert history
self._update_alert_history(container_name, alert_data)
# Route to appropriate channel based on confidence and action
if analysis.get("is_false_alarm"):
self._publish_false_alarm(alert_data, analysis)
elif analysis.get("confidence", 0) >= self.confidence_threshold:
# Guard: only publish remediation if action is actionable (not "none")
if analysis.get("action") != "none":
self._publish_remediation_needed(alert_data, analysis)
else:
# Action is "none" even with high confidence; treat as false alarm
self._publish_false_alarm(alert_data, analysis)
else:
self._publish_false_alarm(alert_data, analysis)
except Exception as e:
self.logger.error(
f"Error analyzing health issue for {container_name}: {e}", exc_info=True
)
def _ai_analyze(self, alert_data: dict[str, Any], history: list[dict]) -> dict[str, Any] | None:
"""
Perform AI-powered analysis using LangChain.
Args:
alert_data: Current health alert data
history: List of historical alerts for pattern detection
Returns:
Analysis dict with keys: action, reason, confidence, is_false_alarm, analysis_method
Returns None if AI analysis fails (triggers fallback)
"""
try:
from langchain_core.messages import HumanMessage, SystemMessage
container_name = alert_data.get("container_name", "unknown")
metrics = alert_data.get("metrics", {})
anomalies = alert_data.get("anomalies", [])
health_status = alert_data.get("health_status", "unknown")
# Build context for the prompt
history_summary = ""
if history:
history_summary = f"\n\nRecent alert history ({len(history)} alerts):\n"
for i, h in enumerate(history[-3:], 1): # Last 3 alerts
h_metrics = h.get("metrics", {})
h_anomalies = h.get("anomalies", [])
history_summary += f" Alert {i}: CPU={h_metrics.get('cpu_percent', 'N/A')}%, Memory={h_metrics.get('memory_percent', 'N/A')}%, Anomalies={len(h_anomalies)}\n"
# Build structured prompt
prompt_text = f"""You are an expert DevOps engineer analyzing container health issues.
Container: {container_name}
Health Status: {health_status}
Current Metrics:
- CPU: {metrics.get("cpu_percent", "N/A")}%
- Memory: {metrics.get("memory_percent", "N/A")}%
- Network I/O: {metrics.get("network_io", "N/A")}
- Disk I/O: {metrics.get("disk_io", "N/A")}
- Exit Code: {alert_data.get("exit_code", "N/A")}
- Restart Count: {alert_data.get("restart_count", 0)}
Detected Anomalies ({len(anomalies)}):
{json.dumps(anomalies, indent=2) if anomalies else "None"}
{history_summary}
Respond with valid JSON only, no code fences or commentary. Provide your analysis in this format:
{{
"root_cause": "Brief description of the root cause",
"action": "restart|scale_up|cleanup|none",
"reason": "Explanation for the recommended action",
"confidence": 0.0-1.0,
"is_false_alarm": true|false
}}
Be concise and focus on actionable insights."""
# Invoke LLM with retry logic
max_retries = 3
for attempt in range(max_retries):
try:
messages = [
SystemMessage(
content="You are an expert DevOps engineer analyzing container health issues."
),
HumanMessage(content=prompt_text),
]
if not self.llm:
self.logger.error("LLM not initialized")
return None
response = self.llm.invoke(messages)
response_text = response.content
# Parse JSON response - strip code fences first
json_str = response_text.strip()
# Remove markdown code fences if present
json_str = re.sub(r"^```(?:json)?\s*", "", json_str)
json_str = re.sub(r"\s*```$", "", json_str)
# Try to extract JSON from response
json_start = json_str.find("{")
json_end = json_str.rfind("}") + 1
if json_start >= 0 and json_end > json_start:
json_str = json_str[json_start:json_end]
# Parse JSON
analysis_result = json.loads(json_str)
# Validate required fields
if all(
k in analysis_result
for k in [
"root_cause",
"action",
"reason",
"confidence",
"is_false_alarm",
]
):
analysis_result["analysis_method"] = "ai"
self.logger.info(
f"AI analysis successful for {container_name}: "
f"action={analysis_result['action']}, confidence={analysis_result['confidence']}"
)
return analysis_result
self.logger.warning(
f"Invalid AI response format for {container_name}, retrying..."
)
except json.JSONDecodeError as e:
self.logger.warning(
f"Failed to parse AI response (attempt {attempt + 1}/{max_retries}): {e}"
)
if attempt < max_retries - 1:
# Sleep between retries for non-JSON formats
time.sleep(0.5 * (2**attempt))
continue
self.logger.warning(
f"AI analysis failed for {container_name} after {max_retries} attempts; falling back to rule-based"
)
return None
except Exception as e:
self.logger.error(
f"AI analysis error for {alert_data.get('container_name', 'unknown')}: {e}",
exc_info=True,
)
return None
def _rule_based_analyze(
self, alert_data: dict[str, Any], history: list[dict]
) -> dict[str, Any]:
"""
Fallback analysis using deterministic rules.
Args:
alert_data: Current health alert data
history: List of historical alerts for pattern detection
Returns:
Analysis dict with keys: action, reason, confidence, is_false_alarm, analysis_method
"""
metrics = alert_data.get("metrics", {})
anomalies = alert_data.get("anomalies", [])
health_status = alert_data.get("health_status", "unknown")
exit_code = alert_data.get("exit_code", 0)
restart_count = alert_data.get("restart_count", 0)
cpu_percent = metrics.get("cpu_percent", 0)
memory_percent = metrics.get("memory_percent", 0)
# Rule 1: Non-zero exit code (high confidence)
if exit_code != 0:
return {
"action": "restart",
"reason": f"Container exited with non-zero code: {exit_code}",
"confidence": 0.9,
"is_false_alarm": False,
"analysis_method": "rule_based",
}
# Rule 2: Excessive restarts (circuit breaker - false alarm)
if restart_count > 5:
return {
"action": "none",
"reason": f"Excessive restarts detected ({restart_count}); circuit breaker activated",
"confidence": 0.6,
"is_false_alarm": True,
"analysis_method": "rule_based",
}
# Rule 3: Critical severity anomaly
critical_anomalies = [a for a in anomalies if a.get("severity") == "critical"]
if critical_anomalies:
return {
"action": "restart",
"reason": f"Critical anomalies detected: {', '.join(a.get('type', 'unknown') for a in critical_anomalies)}",
"confidence": 0.85,
"is_false_alarm": False,
"analysis_method": "rule_based",
}
# Rule 4: Unhealthy status
if health_status == "unhealthy":
return {
"action": "restart",
"reason": "Container health check failed",
"confidence": 0.7,
"is_false_alarm": False,
"analysis_method": "rule_based",
}
# Rule 5: Sustained high CPU (2+ consecutive alerts)
cpu_trend = self._detect_metric_trend(history, "cpu_percent")
if cpu_percent > 90 and cpu_trend in ["increasing", "stable"]:
return {
"action": "restart",
"reason": f"Sustained high CPU usage: {cpu_percent}%",
"confidence": 0.75,
"is_false_alarm": False,
"analysis_method": "rule_based",
}
# Rule 6: Memory leak pattern (increasing trend)
memory_trend = self._detect_metric_trend(history, "memory_percent")
if memory_trend == "increasing" and memory_percent > 70:
return {
"action": "restart",
"reason": f"Memory leak pattern detected; memory increasing to {memory_percent}%",
"confidence": 0.8,
"is_false_alarm": False,
"analysis_method": "rule_based",
}
# Rule 7: Transient spike (single medium anomaly, no history)
medium_anomalies = [a for a in anomalies if a.get("severity") == "medium"]
if len(anomalies) == 1 and len(medium_anomalies) == 1 and not history:
return {
"action": "none",
"reason": "Transient spike detected; likely false alarm",
"confidence": 0.65,
"is_false_alarm": True,
"analysis_method": "rule_based",
}
# Default: Low confidence false alarm
return {
"action": "none",
"reason": "Insufficient evidence for remediation",
"confidence": 0.5,
"is_false_alarm": True,
"analysis_method": "rule_based",
}
def _detect_metric_trend(self, history: list[dict], metric_key: str) -> str:
"""
Helper method to detect trends in historical metrics.
Args:
history: List of historical alert dicts
metric_key: Metric key to analyze (e.g., "cpu_percent", "memory_percent")
Returns:
Trend string: "increasing", "decreasing", "stable", or "unknown"
"""
try:
if len(history) < 2:
return "unknown"
# Extract metric values from last 3-5 alerts
values = []
for alert in history[-5:]:
metrics = alert.get("metrics", {})
value = metrics.get(metric_key)
if value is not None:
values.append(float(value))
if len(values) < 2:
return "unknown"
# Calculate trend
diffs = [values[i + 1] - values[i] for i in range(len(values) - 1)]
avg_diff = sum(diffs) / len(diffs)
if avg_diff > 5: # Threshold for "increasing"
return "increasing"
elif avg_diff < -5: # Threshold for "decreasing"
return "decreasing"
else:
return "stable"
except Exception as e:
self.logger.debug(f"Error detecting metric trend: {e}")
return "unknown"
def _publish_remediation_needed(
self, alert_data: dict[str, Any], analysis: dict[str, Any]
) -> None:
"""
Publish a remediation needed event.
Args:
alert_data: Original health alert data
analysis: Analysis result from AI or rule-based logic
"""
container_name = alert_data.get("container_name", "unknown")
payload = {
"container": container_name,
"action": analysis.get("action", "none"),
"reason": analysis.get("reason", ""),
"confidence": analysis.get("confidence", 0.0),
"metrics": alert_data.get("metrics", {}),
"analysis_method": analysis.get("analysis_method", "unknown"),
}
self.publish_event("hemostat:remediation_needed", "remediation_needed", payload)
self.logger.warning(
f"Remediation needed for {container_name}: "
f"action={analysis.get('action')}, confidence={analysis.get('confidence'):.2f}",
extra={"agent": self.agent_name},
)
def _publish_false_alarm(self, alert_data: dict[str, Any], analysis: dict[str, Any]) -> None:
"""
Publish a false alarm event.
Args:
alert_data: Original health alert data
analysis: Analysis result from AI or rule-based logic
"""
container_name = alert_data.get("container_name", "unknown")
payload = {
"container": container_name,
"reason": analysis.get("reason", ""),
"confidence": analysis.get("confidence", 0.0),
"analysis_method": analysis.get("analysis_method", "unknown"),
}
self.publish_event("hemostat:false_alarm", "false_alarm", payload)
self.logger.info(
f"False alarm for {container_name}: {analysis.get('reason')} "
f"(confidence={analysis.get('confidence'):.2f})",
extra={"agent": self.agent_name},
)
def _update_alert_history(self, container_name: str, alert_data: dict[str, Any]) -> None:
"""
Update alert history in Redis for pattern detection.
Args:
container_name: Name of the container
alert_data: Current alert data to append to history
"""
try:
# Retrieve existing history
history_key = f"alert_history:{container_name}"
existing = self.get_shared_state(history_key)
alerts = existing.get("alerts", []) if existing else []
# Append current alert
alerts.append(alert_data)
# Keep only last N alerts
alerts = alerts[-self.history_size :]
# Store updated history
history_data = {"alerts": alerts, "container": container_name}
self.set_shared_state(history_key, history_data, ttl=self.history_ttl)
self.logger.debug(f"Updated alert history for {container_name} ({len(alerts)} alerts)")
except Exception as e:
self.logger.error(f"Error updating alert history for {container_name}: {e}")