"""
HemoStat Responder Agent - Safe Container Remediation
Executes remediation actions recommended by the Analyzer Agent with comprehensive
safety constraints including cooldown periods, circuit breakers, and audit logging.
"""
import json
import os
import time
from datetime import UTC, datetime
from typing import Any
import docker
from docker.errors import APIError, DockerException, NotFound
from agents.agent_base import HemoStatAgent
from agents.platform_utils import get_docker_host
class ContainerResponder(HemoStatAgent):
    """
    Execute container remediation guarded by layered safety mechanisms.

    Subscribes to the hemostat:remediation_needed channel and executes Docker
    operations (restart, scale, cleanup, exec) while enforcing cooldown periods,
    circuit breakers, and maintaining audit logs.
    """
def __init__(self):
    """
    Initialize the Responder Agent.

    Tries to connect to the Docker daemon, loads the safety configuration from
    environment variables, and subscribes to the hemostat:remediation_needed
    channel. When Docker cannot be reached the agent still starts: the
    ``docker_client`` attribute is set to None and ``docker_available`` to False.

    Environment variables read here:
        RESPONDER_COOLDOWN_SECONDS: cooldown between actions (default "3600")
        RESPONDER_MAX_RETRIES_PER_HOUR: failures before the circuit opens (default "3")
        RESPONDER_DRY_RUN: "true" to simulate actions (default "false")
        RESPONDER_ENFORCE_EXEC_ALLOWLIST: "true" to reject exec commands that
            are not allowlisted (default "false")

    Raises:
        ValueError: If a numeric environment variable is not a valid integer
        HemoStatConnectionError: If Redis connection fails (per the original
            documentation; presumably raised by the base initializer - verify)
    """
    super().__init__(agent_name="responder")

    # Docker is optional at start-up: a missing daemon downgrades the agent
    # instead of aborting it.
    try:
        self.docker_client = self._connect_docker()
        self.docker_available = True
    except DockerException as e:
        self.logger.warning(
            f"Docker client unavailable (running in Docker without socket mount): {e}. "
            f"Responder will continue via Redis events only."
        )
        self.docker_client = None
        self.docker_available = False

    # Load safety configuration from environment
    self.cooldown_seconds = int(os.getenv("RESPONDER_COOLDOWN_SECONDS", "3600"))
    self.max_retries_per_hour = int(os.getenv("RESPONDER_MAX_RETRIES_PER_HOUR", "3"))
    self.dry_run = os.getenv("RESPONDER_DRY_RUN", "false").lower() == "true"
    self.enforce_exec_allowlist = (
        os.getenv("RESPONDER_ENFORCE_EXEC_ALLOWLIST", "false").lower() == "true"
    )

    # Subscribe to remediation channel
    self.subscribe_to_channel("hemostat:remediation_needed", self._handle_remediation_request)

    self.logger.info(
        f"Responder Agent initialized - "
        f"cooldown={self.cooldown_seconds}s, "
        f"max_retries={self.max_retries_per_hour}/hour, "
        f"dry_run={self.dry_run}, "
        f"enforce_exec_allowlist={self.enforce_exec_allowlist}"
    )
def _connect_docker(self) -> docker.DockerClient:
    """
    Create a Docker client, retrying with exponential backoff.

    The daemon address is taken from the DOCKER_HOST environment variable when
    set; otherwise the value returned by get_docker_host() is used (presumably
    a platform-specific socket URL - verify against platform_utils). All other
    Docker settings are still read from the process environment.

    Environment variables:
        RESPONDER_RETRY_MAX: number of attempts (default "3", at least 1 is made)
        RESPONDER_RETRY_DELAY: initial delay in seconds, doubled per attempt (default "1")

    Returns:
        Docker client instance

    Raises:
        DockerException: If client creation fails on every attempt
    """
    # Always make at least one attempt, even if the setting is 0 or negative.
    max_retries = max(1, int(os.getenv("RESPONDER_RETRY_MAX", "3")))
    initial_delay = float(os.getenv("RESPONDER_RETRY_DELAY", "1"))

    env_host = os.getenv("DOCKER_HOST")
    docker_host = env_host or get_docker_host()

    last_error: DockerException | None = None
    for attempt in range(max_retries):
        try:
            if env_host or not docker_host:
                client = docker.from_env()
            else:
                # Feed the detected host to from_env() so the logged address
                # is the one actually used.
                client = docker.from_env(
                    environment={**os.environ, "DOCKER_HOST": docker_host}
                )
            self.logger.info(f"Docker client initialized: {docker_host}")
            return client
        except DockerException as e:
            last_error = e
            if attempt < max_retries - 1:
                wait_time = initial_delay * (2**attempt)
                self.logger.warning(
                    f"Failed to connect to Docker (attempt {attempt + 1}/{max_retries}). "
                    f"Retrying in {wait_time}s... Error: {e!s}"
                )
                time.sleep(wait_time)

    # All retries exhausted - raise the error
    msg = f"Failed to connect to Docker after {max_retries} attempts. Last error: {last_error!s}"
    raise DockerException(msg) from last_error
def run(self) -> None:
    """
    Start the message listening loop.

    Delegates to start_listening(), which is documented as blocking until
    stop() is called. Any exception escaping the loop is logged with its
    traceback and re-raised to the caller.
    """
    try:
        self.logger.info("Starting Responder Agent listening loop")
        self.start_listening()
    except Exception as e:
        self.logger.error(f"Error in listening loop: {e}", exc_info=True)
        raise
def _handle_remediation_request(self, message: dict[str, Any]) -> None:
    """
    Callback invoked for each message on the remediation channel.

    Extracts the ``data`` payload from the message wrapper and hands it to
    _execute_remediation(). A missing or null payload is treated as an empty
    request; a payload that is not a dict is rejected with an error log.
    Exceptions are logged and swallowed so that one bad message cannot stop
    the listener.

    Args:
        message: Full message wrapper with event_type, timestamp, agent, data
    """
    try:
        # "data": null must behave like a missing payload, not crash later on .get()
        request_data = message.get("data") or {}
        if not isinstance(request_data, dict):
            self.logger.error(
                f"Invalid remediation request: data is not an object: {request_data!r}"
            )
            return
        self.logger.info(f"Received remediation request: {json.dumps(request_data)}")
        self._execute_remediation(request_data)
    except Exception as e:
        self.logger.error(f"Error handling remediation request: {e}", exc_info=True)
def _execute_remediation(self, request_data: dict[str, Any]) -> None:
    """
    Main remediation orchestration method.

    Runs the safety checks in order (cooldown, circuit breaker, dry-run), then
    dispatches to the handler for the requested action. Afterwards it updates
    the remediation history and circuit breaker, publishes a completion event
    and writes an audit entry.

    Outcomes with status "not_applicable" or "rejected" did not act on the
    container, so they neither start a cooldown nor count toward the circuit
    breaker; they are still published and audited.

    Args:
        request_data: Remediation request with container, action, and metadata
    """
    container = request_data.get("container")
    action = request_data.get("action")
    if not container or not action:
        self.logger.error("Invalid remediation request: missing container or action")
        return

    confidence = request_data.get("confidence", 0)

    # Safety Check 1: Cooldown period
    if not self._check_cooldown(container):
        remaining = self._get_cooldown_remaining(container)
        self.logger.info(f"Cooldown active for {container}: {remaining}s remaining")
        self._publish_cooldown_active(container, action, remaining, confidence)
        self._log_audit_trail(
            container,
            action,
            {"status": "rejected", "reason": "cooldown_active"},
            request_data,
        )
        return

    # Safety Check 2: Circuit breaker
    if not self._check_circuit_breaker(container):
        cb_state = self.get_shared_state(f"circuit_breaker:{container}") or {}
        retry_count = cb_state.get("retry_count", 0)
        self.logger.warning(f"Circuit breaker open for {container}: {retry_count} retries")
        self._publish_circuit_breaker_active(container, action, retry_count, confidence)
        self._log_audit_trail(
            container,
            action,
            {"status": "rejected", "reason": "circuit_breaker_open"},
            request_data,
        )
        return

    # Safety Check 3: Dry-run mode
    if self.dry_run:
        self._dry_run_action(container, action, request_data)
        return

    # Action name -> zero-argument callable; lambdas keep the lookup lazy.
    handlers = {
        "restart": lambda: self._restart_container(container),
        "scale_up": lambda: self._scale_container(container),
        "cleanup": lambda: self._cleanup_container(container),
        "exec": lambda: self._exec_container(container, request_data.get("command")),
    }

    result: dict[str, Any]
    try:
        handler = handlers.get(action) if isinstance(action, str) else None
        if handler is None:
            result = {"status": "failed", "error": f"Unknown action: {action}"}
            self.logger.error(f"Unknown remediation action: {action}")
        elif getattr(self, "docker_client", None) is None:
            # Report the real cause instead of an AttributeError on None.
            error_msg = f"Docker client unavailable; cannot execute {action} on {container}"
            self.logger.error(error_msg)
            result = {"status": "failed", "error": error_msg}
        else:
            result = handler()
    except Exception as e:
        result = {"status": "failed", "error": str(e)}
        self.logger.error(f"Error executing {action} on {container}: {e}", exc_info=True)

    status = result.get("status")
    if status == "not_applicable":
        # Log not_applicable but don't trigger cooldown or circuit breaker
        self.logger.info(f"Action {action} not applicable for {container}")
    elif status == "rejected":
        # Nothing was executed, so the container must not be locked out.
        self.logger.info(f"Action {action} rejected for {container}")
    else:
        self._update_remediation_history(container, action, result)
        self._update_circuit_breaker(container, status == "success")

    # Publish completion event
    self._publish_remediation_complete(request_data, result)

    # Log audit trail
    self._log_audit_trail(container, action, result, request_data)
def _check_cooldown(self, container: str) -> bool:
"""
Check if cooldown period has elapsed since last remediation action.
Args:
container: Container name
Returns:
True if cooldown expired or no previous action, False if within cooldown
"""
history = self.get_shared_state(f"remediation_history:{container}")
if not history:
self.logger.debug(f"No remediation history for {container}")
return True
last_timestamp = history.get("last_action_timestamp")
if not last_timestamp:
return True
try:
last_time = datetime.fromisoformat(last_timestamp)
elapsed = (datetime.now(UTC) - last_time).total_seconds()
if elapsed < self.cooldown_seconds:
self.logger.debug(
f"Cooldown active for {container}: {elapsed}s elapsed, "
f"{self.cooldown_seconds}s required"
)
return False
self.logger.debug(f"Cooldown expired for {container}: {elapsed}s elapsed")
return True
except Exception as e:
self.logger.error(f"Error checking cooldown for {container}: {e}")
return True
def _get_cooldown_remaining(self, container: str) -> int:
"""
Get remaining cooldown time in seconds.
Calculates how many seconds remain in the cooldown period for a container.
Args:
container: Container name
Returns:
Remaining cooldown time in seconds (0 if no cooldown active)
"""
history = self.get_shared_state(f"remediation_history:{container}") or {}
last_timestamp = history.get("last_action_timestamp")
if not last_timestamp:
return 0
try:
last_time = datetime.fromisoformat(last_timestamp)
elapsed = (datetime.now(UTC) - last_time).total_seconds()
remaining = max(0, int(self.cooldown_seconds - elapsed))
return remaining
except Exception:
return 0
def _check_circuit_breaker(self, container: str) -> bool:
"""
Check if circuit breaker is open (max retries exceeded).
Args:
container: Container name
Returns:
True if circuit closed (safe to proceed), False if circuit open
"""
cb_state = self.get_shared_state(f"circuit_breaker:{container}")
if not cb_state:
self.logger.debug(f"No circuit breaker state for {container}")
return True
# Check if hour window has elapsed (reset circuit)
opened_timestamp = cb_state.get("opened_timestamp")
if opened_timestamp:
try:
opened_time = datetime.fromisoformat(opened_timestamp)
elapsed = (datetime.now(UTC) - opened_time).total_seconds()
if elapsed >= 3600: # 1 hour
self.logger.info(f"Circuit breaker hour window elapsed for {container}")
return True
except Exception as e:
self.logger.error(f"Error checking circuit breaker window: {e}")
# Check if circuit is open
is_open = cb_state.get("is_open", False)
if is_open:
self.logger.warning(
f"Circuit breaker open for {container}: {cb_state.get('retry_count', 0)} retries"
)
return False
self.logger.debug(f"Circuit breaker closed for {container}")
return True
def _restart_container(self, container: str) -> dict[str, Any]:
    """
    Restart a container and wait for it to report the "running" state.

    The restart is issued with a 10 second stop timeout; the container status
    is then polled once per second for up to 30 seconds.

    Args:
        container: Container name or ID

    Returns:
        Result dict with "status" ("success" or "failed") and either details
        or an "error" message
    """
    max_wait = 30  # seconds to wait for the "running" state after the restart

    try:
        self.logger.warning(f"Restarting container: {container}")
        container_obj = self.docker_client.containers.get(container)
        container_obj.restart(timeout=10)

        # Monotonic clock: wall-clock adjustments must not stretch or cut the wait.
        deadline = time.monotonic() + max_wait
        while time.monotonic() < deadline:
            container_obj.reload()
            if container_obj.status == "running":
                self.logger.warning(f"Container restarted successfully: {container}")
                return {
                    "status": "success",
                    "action": "restart",
                    "container": container,
                    "details": "Container restarted and running",
                }
            time.sleep(1)

        # Timeout waiting for running state
        return {
            "status": "failed",
            "error": f"Container did not reach running state within {max_wait}s",
        }
    except NotFound:
        error_msg = f"Container not found: {container}"
        self.logger.error(error_msg)
        return {"status": "failed", "error": error_msg}
    except APIError as e:
        error_msg = f"Docker API error restarting {container}: {e}"
        self.logger.error(error_msg)
        return {"status": "failed", "error": error_msg}
def _scale_container(self, container: str) -> dict[str, Any]:
    """
    Add one replica to the Docker Swarm service that owns a container.

    The owning service is identified through the container's
    com.docker.swarm.service.name label. Standalone containers, services that
    cannot be found by exact name, and services running in global mode are
    reported as "not_applicable".

    Args:
        container: Container name or ID

    Returns:
        Result dict with "status" ("success", "not_applicable" or "failed")
        and details or an "error" message
    """
    try:
        self.logger.info(f"Scaling container: {container}")

        # Get container to check for Swarm service labels
        try:
            container_obj = self.docker_client.containers.get(container)
            labels = container_obj.labels or {}
        except NotFound:
            error_msg = f"Container not found: {container}"
            self.logger.error(error_msg)
            return {"status": "failed", "error": error_msg}

        # Check if container is part of a Swarm service
        service_name = labels.get("com.docker.swarm.service.name")
        if not service_name:
            self.logger.warning(
                f"Scale operation not applicable for standalone containers. "
                f"Container {container} is not part of a Docker Swarm service."
            )
            return {
                "status": "not_applicable",
                "action": "scale_up",
                "container": container,
                "details": "Scale operation not applicable - requires Docker Swarm service",
            }

        # The "name" filter is not an exact match, so pick the exact service
        # explicitly instead of trusting the first result.
        candidates = self.docker_client.services.list(filters={"name": service_name})
        service = next((svc for svc in candidates if svc.name == service_name), None)
        if service is None:
            self.logger.warning(f"Swarm service not found: {service_name}")
            return {
                "status": "not_applicable",
                "action": "scale_up",
                "container": container,
                "details": f"Swarm service {service_name} not found",
            }

        mode = (service.attrs.get("Spec") or {}).get("Mode") or {}
        if "Global" in mode:
            # Global services run one task per node and have no replica count.
            self.logger.warning(f"Swarm service {service_name} is global; cannot scale")
            return {
                "status": "not_applicable",
                "action": "scale_up",
                "container": container,
                "details": f"Swarm service {service_name} runs in global mode",
            }

        current_replicas = (mode.get("Replicated") or {}).get("Replicas")
        if current_replicas is None:
            current_replicas = 1
        new_replicas = current_replicas + 1

        # scale() changes only the replica count and keeps the rest of the
        # service spec; the cached attrs are left untouched.
        service.scale(new_replicas)
        self.logger.warning(
            f"Scaled service {service_name} from {current_replicas} to {new_replicas} replicas"
        )
        return {
            "status": "success",
            "action": "scale_up",
            "container": container,
            "details": {
                "service": service_name,
                "previous_replicas": current_replicas,
                "new_replicas": new_replicas,
            },
        }
    except APIError as e:
        error_msg = f"Docker API error scaling {container}: {e}"
        self.logger.error(error_msg)
        return {"status": "failed", "error": error_msg}
def _cleanup_container(self, container: str) -> dict[str, Any]:
    """
    Remove exited containers and unused volumes scoped to the target container.

    Scope is derived from the target container:
      * with a com.docker.compose.project label, exited containers carrying
        the same project (and service, if labelled) are removed and volumes
        are pruned with the same label filter;
      * otherwise, exited containers created from the same image are removed,
        and dangling volumes are removed only when their attributes mention
        the ID of a container removed in this run.

    Failures on individual containers or volumes are logged and skipped.

    Args:
        container: Container name or ID (used to derive the cleanup scope)

    Returns:
        Result dict; on success "details" holds containers_removed,
        volumes_removed, space_reclaimed_bytes and notes
    """
    try:
        self.logger.info(f"Cleaning up resources for container: {container}")

        # Get target container info for scoped cleanup
        try:
            target_container = self.docker_client.containers.get(container)
            image_id = target_container.image.id
            labels = target_container.labels or {}
        except NotFound:
            error_msg = f"Container not found: {container}"
            self.logger.error(error_msg)
            return {"status": "failed", "error": error_msg}

        notes: list[str] = []
        containers_removed = 0
        volumes_removed = 0
        space_reclaimed = 0

        # Build container filters combining constraints
        filters: dict[str, Any] = {"status": ["exited"]}
        compose_project = labels.get("com.docker.compose.project")
        compose_service = labels.get("com.docker.compose.service")
        label_filters: list[str] = []
        if compose_project:
            label_filters.append(f"com.docker.compose.project={compose_project}")
            if compose_service:
                label_filters.append(f"com.docker.compose.service={compose_service}")
            filters["label"] = label_filters
            self.logger.debug(
                f"Using Compose filters: project={compose_project}, service={compose_service}"
            )
        else:
            # Filter by ancestor image to match containers from same image
            filters["ancestor"] = [image_id]
            self.logger.debug(f"Using image filter: ancestor={image_id}")

        # List and remove scoped stopped containers
        removed_container_ids: list[str] = []
        for stopped_container in self.docker_client.containers.list(all=True, filters=filters):
            try:
                # Double-check status before removal: it may have been started
                # since the listing was taken.
                stopped_container.reload()
                if stopped_container.status == "running":
                    self.logger.warning(f"Skipping running container: {stopped_container.name}")
                    continue

                self.logger.info(f"Removing stopped container: {stopped_container.name}")
                stopped_container.remove(v=True)  # Remove with volumes
                containers_removed += 1
                removed_container_ids.append(stopped_container.id)
            except APIError as e:
                self.logger.warning(f"Failed to remove container {stopped_container.name}: {e}")

        # Prune volumes strictly scoped
        try:
            if compose_project:
                volume_filters = {"label": list(label_filters)}
                self.logger.debug(f"Pruning volumes with filters: {volume_filters}")
                volumes_result = self.docker_client.volumes.prune(filters=volume_filters) or {}
                # Both fields may come back as null when nothing was pruned.
                volumes_removed = len(volumes_result.get("VolumesDeleted") or [])
                space_reclaimed = volumes_result.get("SpaceReclaimed") or 0
                self.logger.info(f"Pruned {volumes_removed} Compose-scoped volumes")
            elif removed_container_ids:
                dangling_volumes = self.docker_client.volumes.list(filters={"dangling": True})
                for vol in dangling_volumes:
                    try:
                        # Only volumes tied to a container removed above are in
                        # scope; volumes of unrelated projects are left alone.
                        vol_attrs = str(vol.attrs)
                        if any(cid in vol_attrs for cid in removed_container_ids):
                            self.logger.info(f"Removing dangling volume: {vol.name}")
                            vol.remove()
                            volumes_removed += 1
                    except APIError as e:
                        self.logger.warning(f"Failed to remove volume {vol.name}: {e}")
            else:
                notes.append("No containers removed; skipping volume pruning")
                self.logger.info("No containers removed; skipping volume pruning")
        except APIError as e:
            self.logger.warning(f"Failed to prune volumes: {e}")
            notes.append(f"Volume pruning failed: {e}")
            volumes_removed = 0
            space_reclaimed = 0

        cleanup_stats: dict[str, int | list[str]] = {
            "containers_removed": containers_removed,
            "volumes_removed": volumes_removed,
            "space_reclaimed_bytes": space_reclaimed,
            "notes": notes,
        }
        self.logger.info(
            f"Cleanup complete: {cleanup_stats['containers_removed']} containers removed, "
            f"{cleanup_stats['volumes_removed']} volumes removed, "
            f"{cleanup_stats['space_reclaimed_bytes']} bytes reclaimed"
        )
        return {
            "status": "success",
            "action": "cleanup",
            "container": container,
            "details": cleanup_stats,
        }
    except APIError as e:
        error_msg = f"Docker API error during cleanup: {e}"
        self.logger.error(error_msg)
        return {"status": "failed", "error": error_msg}
def _exec_container(self, container: str, command: str | None) -> dict[str, Any]:
"""
Execute diagnostic command inside container.
Args:
container: Container name or ID
command: Command to execute (default: ps aux)
Returns:
Result dict with command output and exit code
"""
try:
if not command:
command = "ps aux"
self.logger.info(f"Executing command in {container}: {command}")
# Security: Validate command against whitelist of safe diagnostic commands
safe_commands = [
"ps aux",
"ps",
"top",
"df",
"free",
"netstat",
"ss",
"env",
"pwd",
"whoami",
"date",
"uptime",
"uname",
]
command_allowed = any(command.startswith(safe_cmd) for safe_cmd in safe_commands)
if not command_allowed:
if self.enforce_exec_allowlist:
error_msg = f"Command not in allowlist (enforce_exec_allowlist=true): {command}"
self.logger.error(error_msg)
return {"status": "rejected", "error": error_msg}
else:
self.logger.warning(f"Command not in whitelist, executing anyway: {command}")
container_obj = self.docker_client.containers.get(container)
# Check if container is running
container_obj.reload()
if container_obj.status != "running":
error_msg = f"Container not running: {container} (status: {container_obj.status})"
self.logger.error(error_msg)
return {"status": "failed", "error": error_msg}
# Execute command
exit_code, output = container_obj.exec_run(command)
# Decode output if bytes
if isinstance(output, bytes):
output = output.decode("utf-8", errors="replace")
self.logger.info(f"Command executed in {container}: exit_code={exit_code}")
return {
"status": "success",
"action": "exec",
"container": container,
"command": command,
"exit_code": exit_code,
"output": output[:1000], # Limit output to 1000 chars
}
except NotFound:
error_msg = f"Container not found: {container}"
self.logger.error(error_msg)
return {"status": "failed", "error": error_msg}
except APIError as e:
error_msg = f"Docker API error executing command: {e}"
self.logger.error(error_msg)
return {"status": "failed", "error": error_msg}
def _dry_run_action(self, container: str, action: str, request_data: dict[str, Any]) -> None:
    """
    Simulate a remediation action without touching Docker.

    Logs the intended action, pauses for half a second to stand in for the
    operation, then publishes a successful completion event and writes an
    audit entry, both flagged with dry_run=True. Remediation history and the
    circuit breaker are not updated here.

    Args:
        container: Container name
        action: Remediation action
        request_data: Original request data
    """
    self.logger.info(f"DRY RUN: Would execute {action} on {container}")

    # Simulate operation time
    time.sleep(0.5)

    result = {
        "status": "success",
        "action": action,
        "container": container,
        "details": f"Dry-run simulation of {action}",
    }

    # Publish success event with dry_run flag
    self._publish_remediation_complete(request_data, result, dry_run=True)

    # Log audit trail
    self._log_audit_trail(container, action, result, request_data, dry_run=True)
def _update_remediation_history(
self, container: str, action: str, result: dict[str, Any]
) -> None:
"""
Update remediation history in Redis.
Args:
container: Container name
action: Remediation action
result: Action result
"""
try:
history = self.get_shared_state(f"remediation_history:{container}") or {}
# Update last action timestamp
history["last_action_timestamp"] = datetime.now(UTC).isoformat()
history["last_action"] = action
history["last_result_status"] = result.get("status")
# Update retry count (increment if within same hour, reset if new hour)
if result.get("status") == "success":
history["retry_count"] = 0
else:
last_retry_hour = history.get("last_retry_hour")
current_hour = datetime.now(UTC).replace(minute=0, second=0, microsecond=0)
if last_retry_hour:
try:
last_hour = datetime.fromisoformat(last_retry_hour)
if last_hour == current_hour:
history["retry_count"] = history.get("retry_count", 0) + 1
else:
history["retry_count"] = 1
except Exception:
history["retry_count"] = 1
else:
history["retry_count"] = 1
history["last_retry_hour"] = current_hour.isoformat()
self.set_shared_state(
f"remediation_history:{container}",
history,
ttl=7200, # 2 hours
)
self.logger.debug(f"Updated remediation history for {container}")
except Exception as e:
self.logger.error(f"Error updating remediation history: {e}")
def _update_circuit_breaker(self, container: str, success: bool) -> None:
"""
Update circuit breaker state in Redis with rolling 1-hour window.
Args:
container: Container name
success: Whether remediation was successful
"""
try:
cb_state = self.get_shared_state(f"circuit_breaker:{container}") or {}
current_time = datetime.now(UTC)
# Check if 1-hour window has elapsed and reset if needed
opened_timestamp_str = cb_state.get("opened_timestamp")
if opened_timestamp_str:
try:
opened_time = datetime.fromisoformat(opened_timestamp_str)
elapsed = (current_time - opened_time).total_seconds()
if elapsed >= 3600: # 1 hour
# Reset circuit breaker after 1-hour window
cb_state["is_open"] = False
cb_state["failure_count"] = 0
cb_state["retry_count"] = 0
cb_state["opened_timestamp"] = None
self.logger.info(
f"Circuit breaker window elapsed for {container}, resetting"
)
except Exception as e:
self.logger.error(f"Error checking circuit breaker window: {e}")
if success:
# Reset on success
cb_state["is_open"] = False
cb_state["retry_count"] = 0
cb_state["failure_count"] = 0
self.logger.debug(f"Circuit breaker closed for {container}")
else:
# Increment failure count
failure_count = cb_state.get("failure_count", 0) + 1
cb_state["failure_count"] = failure_count
# Check if we should open circuit
if failure_count >= self.max_retries_per_hour:
cb_state["is_open"] = True
cb_state["opened_timestamp"] = current_time.isoformat()
self.logger.warning(
f"Circuit breaker opened for {container}: {failure_count} failures"
)
else:
cb_state["retry_count"] = failure_count
self.set_shared_state(
f"circuit_breaker:{container}",
cb_state,
ttl=7200, # 2 hours
)
except Exception as e:
self.logger.error(f"Error updating circuit breaker: {e}")
def _publish_remediation_complete(
    self,
    request_data: dict[str, Any],
    result: dict[str, Any],
    dry_run: bool = False,
) -> None:
    """
    Publish a remediation_complete event on hemostat:remediation_complete.

    The payload carries the container, action, reason and confidence from the
    request together with the result and the dry-run flag. Outcomes with
    status "success" or "not_applicable" are logged at info level, everything
    else at error level. Publishing errors are logged and not propagated.

    Args:
        request_data: Original remediation request
        result: Action result
        dry_run: Whether this was a dry-run
    """
    try:
        # event_type and timestamp are not part of the payload built here
        data = {
            "container": request_data.get("container"),
            "action": request_data.get("action"),
            "result": result,
            "dry_run": dry_run,
            "reason": request_data.get("reason"),
            "confidence": request_data.get("confidence"),
        }

        self.publish_event("hemostat:remediation_complete", "remediation_complete", data)

        status = result.get("status")
        message = f"Published remediation_complete: {data['container']} - {status}"
        # not_applicable is a non-failure outcome, so it must not raise error alerts
        if status in ("success", "not_applicable"):
            self.logger.info(message)
        else:
            self.logger.error(message)
    except Exception as e:
        self.logger.error(f"Error publishing remediation_complete: {e}")
def _log_audit_trail(
self,
container: str,
action: str,
result: dict[str, Any],
request_data: dict[str, Any],
dry_run: bool = False,
) -> None:
"""
Log comprehensive audit trail to Redis.
Args:
container: Container name
action: Remediation action
result: Action result
request_data: Original request data
dry_run: Whether this was a dry-run
"""
try:
audit_entry = {
"timestamp": datetime.now(UTC).isoformat(),
"container": container,
"action": action,
"result_status": result.get("status"),
"error": result.get("error"),
"confidence": request_data.get("confidence"),
"reason": request_data.get("reason"),
"metrics": request_data.get("metrics"),
"dry_run": dry_run,
}
# Store in Redis list (LPUSH for newest first)
audit_key = f"hemostat:audit:{container}"
self.redis.lpush(audit_key, json.dumps(audit_entry))
# Keep only last 100 entries
self.redis.ltrim(audit_key, 0, 99)
# Set TTL (7 days)
self.redis.expire(audit_key, 604800)
self.logger.debug(f"Logged audit trail for {container}")
except Exception as e:
self.logger.error(f"Error logging audit trail: {e}")
def _publish_cooldown_active(
    self, container: str, action: str, remaining_seconds: int, confidence: float = 0.0
) -> None:
    """
    Publish a rejection event for a request blocked by the cooldown.

    The event is sent on hemostat:remediation_complete with a result of
    status "rejected" and reason "cooldown_active". Publishing errors are
    logged and not propagated.

    Args:
        container: Container name
        action: Original remediation action
        remaining_seconds: Seconds remaining in cooldown
        confidence: Confidence level from original request
    """
    try:
        # Rejections use the same result-object layout as completed actions
        data = {
            "container": container,
            "action": action,
            "result": {
                "status": "rejected",
                "reason": "cooldown_active",
                "remaining_seconds": remaining_seconds,
            },
            "confidence": confidence,
        }

        self.publish_event("hemostat:remediation_complete", "remediation_complete", data)
        self.logger.info(f"Cooldown active for {container}: {remaining_seconds}s remaining")
    except Exception as e:
        self.logger.error(f"Error publishing cooldown_active: {e}")
def _publish_circuit_breaker_active(
    self, container: str, action: str, retry_count: int, confidence: float = 0.0
) -> None:
    """
    Publish a rejection event for a request blocked by an open circuit breaker.

    The event is sent on hemostat:remediation_complete with a result of
    status "rejected" and reason "circuit_breaker_open". Publishing errors
    are logged and not propagated.

    Args:
        container: Container name
        action: Original remediation action
        retry_count: Current retry count
        confidence: Confidence level from original request
    """
    try:
        # Rejections use the same result-object layout as completed actions
        data = {
            "container": container,
            "action": action,
            "result": {
                "status": "rejected",
                "reason": "circuit_breaker_open",
                "retry_count": retry_count,
            },
            "confidence": confidence,
        }

        self.publish_event("hemostat:remediation_complete", "remediation_complete", data)
        self.logger.warning(f"Circuit breaker open for {container}: {retry_count} retries")
    except Exception as e:
        self.logger.error(f"Error publishing circuit_breaker_active: {e}")