Agent Reliability Playbook - Part 2: Detect, Triage, and Page Faster

Instrumentation without action is just busywork. In Part 1 we captured traces, transcripts, and metrics. Part 2 turns those signals into decisions: classifying failures, paging the right responder, and preserving timelines so humans can fix the issue on the first try.
Scenario: 3-Minute SLA for Critical Misses
Lumenly's support agent now handles real refunds. Leadership mandates that any "critical miss" (wrong refund or privacy breach) must reach an engineer inside three minutes. We will:
- Define a failure taxonomy with severities and owners.
- Build detectors that classify each mission.
- Route alerts to Slack and PagerDuty with context-rich payloads.
- Store incident timelines so responders see every action immediately.
- Continuously test the alerting stack.
Step 1: Define the Failure Taxonomy
Create a shared table so every detector labels incidents the same way.
| Failure type | Trigger example | Severity | Owner | Auto action |
|---|---|---|---|---|
| data_leak | Agent sends PII to wrong channel | Sev1 | Security lead | Revoke tokens, page security |
| financial_error | Wrong refund or charge | Sev1 | Billing engineer | Lock workflow, alert finance |
| tool_timeout | Tool fails 3 times | Sev2 | Platform on-call | Retry via queue |
| hallucination | Agent invents policy | Sev3 | Content team | Flag transcript, notify CX |
Document this in docs/failure-taxonomy.md and load it into config so detectors can reference the same metadata.
Example taxonomy.json:
{
  "data_leak": {
    "type": "data_leak",
    "severity": "Sev1",
    "summary": "PII leaked to unintended channel",
    "owner": "security-lead"
  },
  "financial_error": {
    "type": "financial_error",
    "severity": "Sev1",
    "summary": "Refund or charge mismatch",
    "owner": "billing-engineer"
  },
  "tool_timeout": {
    "type": "tool_timeout",
    "severity": "Sev2",
    "summary": "Critical tool failed repeatedly",
    "owner": "platform-oncall"
  }
}
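The Node worker below imports taxonomy.json directly, but the Python service imports a TAXONOMY dict from a taxonomy module that the snippets never define. A minimal loader sketch, assuming taxonomy.json sits in the same directory:

# reliability/taxonomy.py -- minimal loader sketch; this module is assumed, not shown elsewhere
import json
from pathlib import Path

_TAXONOMY_PATH = Path(__file__).parent / "taxonomy.json"

# Load once at import time so every detector labels incidents with identical metadata.
with _TAXONOMY_PATH.open(encoding="utf-8") as fh:
    TAXONOMY: dict = json.load(fh)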
Step 2: Build Real-Time Detectors
Use stream processors (Node or Python) that subscribe to the mission queue from Part 1.
Node.js detector worker
// reliability/detector.js
import { createClient } from "redis";
// Node 18-21 JSON-module syntax; newer Node versions use: with { type: "json" }
import taxonomy from "./taxonomy.json" assert { type: "json" };

const redis = createClient({ url: process.env.REDIS_URL });
await redis.connect();

export async function startDetector() {
  // XREAD takes an explicit last-seen ID; ">" only works with consumer groups.
  let lastId = "$";
  while (true) {
    const streams = await redis.xRead(
      { key: "mission:events", id: lastId },
      { COUNT: 1, BLOCK: 5000 }
    );
    if (!streams) continue;
    for (const { id, message } of streams[0].messages) {
      // node-redis returns each entry's fields as a plain object of strings.
      const payload = { ...message, timestamp: Number(message.timestamp) };
      const classification = classify(payload);
      if (classification) {
        await enqueueIncident({ ...payload, ...classification });
      }
      lastId = id;
    }
  }
}

function classify(event) {
  // Stream fields arrive as strings, so coerce before comparing numbers.
  if (event.type === "tool_error" && Number(event.failures) >= 3) {
    return taxonomy.tool_timeout;
  }
  if (event.type === "refund" && Number(event.amount) > Number(event.limit)) {
    return taxonomy.financial_error;
  }
  if (/ssn|passport/i.test(event.output || "")) {
    return taxonomy.data_leak;
  }
  return null;
}

async function enqueueIncident(incident) {
  // Wrap the incident in a single "payload" field so the router can JSON-decode it.
  await redis.xAdd("incident:queue", "*", { payload: JSON.stringify(incident) });
}
Python detector service
# reliability/detector.py
import asyncio
import json
import os
import re

import redis.asyncio as aioredis  # the aioredis package is deprecated; it lives in redis-py >= 4.2

from taxonomy import TAXONOMY

REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379")

async def classify(event: dict):
    # Stream fields arrive as strings, so cast before comparing numbers.
    if event.get("type") == "tool_error" and int(event.get("failures", 0)) >= 3:
        return TAXONOMY["tool_timeout"]
    if event.get("type") == "refund" and float(event.get("amount", 0)) > float(event.get("limit", 0)):
        return TAXONOMY["financial_error"]
    if re.search(r"ssn|passport", event.get("output", ""), re.IGNORECASE):
        return TAXONOMY["data_leak"]
    return None

async def main():
    redis = aioredis.from_url(REDIS_URL, decode_responses=True)
    last_id = "$"  # only react to events that arrive after the worker starts
    while True:
        entries = await redis.xread({"mission:events": last_id}, block=5000, count=1)
        if not entries:
            continue
        _, events = entries[0]
        for event_id, fields in events:
            # redis-py already decodes each entry into a dict of field -> value.
            match = await classify(fields)
            if match:
                incident = {**fields, **match}
                # Wrap in a single "payload" field so the router can JSON-decode it.
                await redis.xadd("incident:queue", {"payload": json.dumps(incident)})
            last_id = event_id

if __name__ == "__main__":
    asyncio.run(main())
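To smoke-test either worker locally, push a synthetic mission event onto the stream. A minimal sketch, assuming a local Redis and field names matching what classify() expects:

# scripts/publish_test_event.py -- hypothetical local helper, not part of the pipeline above
import asyncio

import redis.asyncio as aioredis

async def main():
    r = aioredis.from_url("redis://localhost:6379", decode_responses=True)
    # An over-limit refund: classify() should label it financial_error (Sev1).
    await r.xadd("mission:events", {
        "type": "refund",
        "amount": "200",
        "limit": "50",
        "customer_id": "cust_123",
        "trace_id": "trace_abc",
        "timestamp": "1700000000",
    })

if __name__ == "__main__":
    asyncio.run(main())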
Step 3: Route Alerts with Context
Send incidents to Slack, PagerDuty, or email from a single router worker.
Node.js router
// reliability/router.js
import fetch from "node-fetch";
import { createClient } from "redis";

const redis = createClient({ url: process.env.REDIS_URL });
await redis.connect();

export async function startRouter() {
  let lastId = "$"; // ">" is reserved for consumer groups; XREAD wants an explicit ID
  while (true) {
    const streams = await redis.xRead(
      { key: "incident:queue", id: lastId },
      { COUNT: 1, BLOCK: 3000 }
    );
    if (!streams) continue;
    for (const { id, message } of streams[0].messages) {
      const incident = JSON.parse(message.payload);
      await Promise.all([
        postToSlack(incident),
        triggerPagerDuty(incident)
      ]);
      // Record the paging step so the timeline shows when humans were looped in.
      await redis.xAdd("incident:timeline", "*", {
        incident: JSON.stringify({ ...incident, status: "paged" })
      });
      lastId = id;
    }
  }
}

async function postToSlack(incident) {
  return fetch(process.env.SLACK_WEBHOOK_URL, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      text: `:rotating_light: ${incident.severity} ${incident.type}\n${incident.summary}`,
      attachments: [{
        color: incident.severity === "Sev1" ? "#e01e5a" : "#ecb22e",
        fields: [
          { title: "Customer", value: incident.customer_id || "n/a", short: true },
          { title: "Trace", value: incident.trace_id || "n/a", short: true }
        ]
      }]
    })
  });
}

async function triggerPagerDuty(incident) {
  // Only Sev1 incidents page a human; lower severities stay in Slack.
  if (incident.severity !== "Sev1") return;
  return fetch("https://events.pagerduty.com/v2/enqueue", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      routing_key: process.env.PAGERDUTY_ROUTING_KEY,
      event_action: "trigger",
      payload: {
        summary: incident.summary,
        severity: "critical",
        source: "agent",
        custom_details: incident
      }
    })
  });
}
Python timeline writer
# reliability/timeline.py
import json
from datetime import datetime, timezone
from pathlib import Path

class TimelineStore:
    def __init__(self, directory: Path):
        directory.mkdir(parents=True, exist_ok=True)
        self.directory = directory

    def append(self, incident: dict):
        # datetime.utcnow() is deprecated; stamp entries with an explicit UTC time.
        incident["ts"] = datetime.now(timezone.utc).isoformat()
        path = self.directory / f"{incident['incident_id']}.ndjson"
        with path.open("a", encoding="utf-8") as fh:
            fh.write(json.dumps(incident) + "\n")
Use the same NDJSON viewer from Part 1 to replay entire incident timelines.
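A quick usage sketch of TimelineStore (the incident values here are illustrative):

# Hypothetical usage; field values are made up for illustration
from pathlib import Path

from reliability.timeline import TimelineStore

store = TimelineStore(Path("logs/incidents"))
store.append({
    "incident_id": "inc_0001",
    "type": "financial_error",
    "severity": "Sev1",
    "status": "paged",
})
# Appends one JSON line (with a UTC "ts") to logs/incidents/inc_0001.ndjson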
Step 4: Build the Incident Console
Expose incidents in a small web dashboard (FastAPI or Next.js) so responders can acknowledge from any device.
# reliability/api.py
import json
from pathlib import Path

from fastapi import FastAPI

app = FastAPI()
LOG_DIR = Path("logs/incidents")

@app.get("/incidents")
def list_incidents():
    items = []
    for file in LOG_DIR.glob("*.ndjson"):
        with file.open() as fh:
            lines = fh.readlines()
        if not lines:
            continue  # skip empty timelines instead of raising IndexError
        # The last NDJSON line holds the incident's most recent status.
        items.append(json.loads(lines[-1]))
    return sorted(items, key=lambda item: item["ts"], reverse=True)
Expose this route behind your VPN and link it in Slack alerts so on-call engineers jump straight to the timeline.
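Responders also need to acknowledge incidents, and the list route alone doesn't cover that. One possible shape, sketched as a continuation of api.py; the route path and response are assumptions, not a prescribed API:

# reliability/api.py (continued) -- hypothetical acknowledge route
import json
from datetime import datetime, timezone

from fastapi import HTTPException

@app.post("/incidents/{incident_id}/ack")
def ack_incident(incident_id: str):
    path = LOG_DIR / f"{incident_id}.ndjson"
    if not path.exists():
        raise HTTPException(status_code=404, detail="unknown incident")
    entry = {
        "incident_id": incident_id,
        "status": "acknowledged",
        "ts": datetime.now(timezone.utc).isoformat(),
    }
    # Append the acknowledgement so the timeline viewer shows who acted and when.
    with path.open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(entry) + "\n")
    return entry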
Step 5: Test the Alerting Flow
Automate verification so detectors never rot.
- Replay tests: feed sample NDJSON missions through the detector and assert incidents emit within 500 ms.
- Synthetic incidents: schedule cron jobs that inject fake Sev1 alerts; page a test channel to validate runbooks (a sketch follows the example test below).
- Load tests: use k6 or Artillery to push thousands of mission events and ensure the incident queue keeps up.
Example Python test:
# tests/test_detector.py
import pytest

from reliability.detector import classify

# classify() is async, so this test needs the pytest-asyncio plugin.
@pytest.mark.asyncio
async def test_financial_error_classified():
    event = {"type": "refund", "amount": 200, "limit": 50}
    match = await classify(event)
    assert match["severity"] == "Sev1"
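And a minimal sketch of the synthetic Sev1 injector mentioned above, assuming a local Redis; it writes straight to incident:queue so the Slack and PagerDuty path fires end to end:

# scripts/inject_synthetic_sev1.py -- hypothetical drill script; route it to a test channel
import json

import redis

r = redis.Redis.from_url("redis://localhost:6379", decode_responses=True)
incident = {
    "type": "financial_error",
    "severity": "Sev1",
    "summary": "SYNTHETIC drill - verify paging, then resolve",
    "customer_id": "synthetic",
    "trace_id": "drill",
}
# The router JSON-decodes the "payload" field, same as for real incidents.
r.xadd("incident:queue", {"payload": json.dumps(incident)})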
Step 6: Implementation Checklist
- Publish taxonomy doc + JSON config checked into repo.
- Deploy detector workers (Node or Python) reading from mission event streams.
- Configure Slack + PagerDuty routing with encrypted secrets.
- Persist timelines (Redis Streams, Postgres, or NDJSON) and expose the /incidents API.
- Add synthetic incident tests plus load checks in CI or cron.
You now have actionable telemetry: failures are classified, paged, and documented with zero manual work. Next we will stress test the agent with synthetic missions and chaos to ensure these alerts fire before real customers ever see an outage.