Your Agentic System
What you'll learn
- ✓ Architect a complete system combining multi-agent orchestration with RAG
- ✓ Implement shared memory and state management across agents
- ✓ Learn deployment strategies for agentic systems
- ✓ Understand monitoring, observability, and failure handling in production
- ✓ Celebrate -- you have reached Aoraki!
This is it. The final lesson of the Aoraki trail. Over the past three lessons, you learned how multi-agent systems work, how RAG pipelines retrieve knowledge, and how to build a working retrieval pipeline from scratch. Now we are going to bring it all together into a single, cohesive system: a multi-agent application with RAG-powered knowledge, shared memory, and production-ready architecture.
Let us build something real.
The Architecture
We are going to build a Research Assistant System -- a multi-agent application that can take a research question, find relevant information from your document collection, synthesize findings, and produce a well-structured report. Here is the high-level architecture:
┌──────────────────┐
│ Supervisor │
│ Agent │
└────────┬─────────┘
│
┌──────────────┼──────────────┐
▼ ▼ ▼
┌────────────┐ ┌────────────┐ ┌────────────┐
│ Retriever │ │ Analyst │ │ Writer │
│ Agent │ │ Agent │ │ Agent │
└──────┬─────┘ └──────┬─────┘ └──────┬─────┘
│ │ │
▼ ▼ ▼
┌──────────────────────────────────────────┐
│ Shared Memory / State │
│ (RAG Vector Store + Working Memory) │
└──────────────────────────────────────────┘
The Agents
- Supervisor -- Plans the research, delegates to workers, reviews progress, decides when the task is complete
- Retriever -- Queries the RAG pipeline to find relevant document chunks
- Analyst -- Synthesizes retrieved information, identifies patterns, flags gaps
- Writer -- Produces the final report based on the analyst's findings
💡 Why This Architecture?
This is a hybrid pattern: the supervisor uses hierarchical orchestration to manage the workflow, while the retriever and analyst can run in parallel when the supervisor determines it is safe to do so. The writer always runs last (sequential). This mirrors how a real research team operates.
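To make the "parallel when safe" idea concrete, here is a minimal sketch using asyncio.gather. The stub coroutines and their names are illustrative stand-ins for the real agents we build below, not part of the system itself:

```python
import asyncio

# Stub coroutines standing in for the real agents defined later in this lesson
async def retrieve_more(topic: str) -> str:
    await asyncio.sleep(0.01)  # simulate a vector-store round trip
    return f"chunks for {topic!r}"

async def analyze_existing(chunks: str) -> str:
    await asyncio.sleep(0.01)  # simulate an LLM call
    return f"analysis of {chunks!r}"

async def parallel_phase() -> list[str]:
    # The supervisor has decided these steps are independent: the analyst
    # works on already-retrieved chunks while the retriever fetches more.
    return await asyncio.gather(
        retrieve_more("error handling"),
        analyze_existing("round-1 chunks"),
    )

new_chunks, analysis = asyncio.run(parallel_phase())
print(new_chunks)  # → chunks for 'error handling'
print(analysis)    # → analysis of 'round-1 chunks'
```

Both coroutines run concurrently, so the wall-clock time is roughly one sleep, not two. The writer never joins this phase because it depends on the analyst's output.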
Shared Memory: The Backbone
The most critical piece of a multi-agent system is not any individual agent -- it is the shared memory layer that allows agents to communicate and build on each other's work.
We need two types of memory:
- Long-term memory -- The RAG vector store (your document knowledge base)
- Working memory -- Short-term state for the current task (findings so far, the plan, intermediate results)
# memory.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import json
@dataclass
class MemoryEntry:
agent: str
action: str
content: Any
timestamp: str = field(
default_factory=lambda: datetime.now().isoformat()
)
class SharedMemory:
"""Working memory shared across all agents in a task."""
def __init__(self):
self.plan: dict = {}
self.findings: list[dict] = []
self.retrieved_chunks: list[dict] = []
self.analysis: str = ""
self.report: str = ""
self.history: list[MemoryEntry] = []
self.status: str = "initialized"
def record(self, agent: str, action: str, content: Any):
"""Record an action in the shared history."""
entry = MemoryEntry(
agent=agent, action=action, content=content
)
self.history.append(entry)
def add_finding(self, agent: str, finding: dict):
"""Add a research finding."""
self.findings.append({
**finding,
"found_by": agent,
"timestamp": datetime.now().isoformat(),
})
self.record(agent, "add_finding", finding)
def add_chunks(self, chunks: list[dict]):
"""Store retrieved document chunks."""
self.retrieved_chunks.extend(chunks)
self.record("retriever", "add_chunks", f"{len(chunks)} chunks")
def get_context_summary(self) -> str:
"""Get a summary of current state for agent prompts."""
return json.dumps({
"status": self.status,
"plan": self.plan,
"findings_count": len(self.findings),
"chunks_retrieved": len(self.retrieved_chunks),
"has_analysis": bool(self.analysis),
"has_report": bool(self.report),
}, indent=2)
def get_full_context(self) -> str:
"""Get the full working memory as a string for LLM context."""
parts = [f"## Current Status: {self.status}"]
if self.plan:
parts.append(f"## Plan\n{json.dumps(self.plan, indent=2)}")
if self.findings:
parts.append("## Findings")
for f in self.findings:
parts.append(f"- [{f['found_by']}] {f.get('summary', str(f))}")
if self.analysis:
parts.append(f"## Analysis\n{self.analysis}")
return "\n\n".join(parts)
💡 Tip
Shared memory should be append-mostly. Agents should add to the collective knowledge rather than overwrite each other's work. The history log makes debugging much easier -- you can trace exactly which agent did what and when.
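To see why the history log pays off, here is a small sketch of a trace formatter. MemoryEntry mirrors the dataclass from memory.py above, and format_trace is a hypothetical debugging helper, not part of the lesson's canonical code:

```python
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class MemoryEntry:  # same shape as the entry in memory.py above
    agent: str
    action: str
    content: object
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

def format_trace(history: list[MemoryEntry]) -> str:
    """Render the append-only history as a readable debugging trace."""
    return "\n".join(f"{e.timestamp} [{e.agent}] {e.action}" for e in history)

history = [
    MemoryEntry("retriever", "add_chunks", "10 chunks"),
    MemoryEntry("analyst", "analysis_complete", "Analysis stored"),
]
print(format_trace(history))
```

Because the history is append-only, this trace is a faithful timeline: you can see which agent acted, in what order, without any agent having overwritten another's record.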
Building the Agents
Let us implement each agent. We will use a base class to keep things DRY:
# agents.py
from openai import AsyncOpenAI

from memory import SharedMemory

# Use the async client so LLM calls do not block the event loop --
# this is what allows agents to run concurrently when it is safe.
client = AsyncOpenAI()

class BaseAgent:
    """Base class for all agents in the system."""

    def __init__(self, name: str, role: str, system_prompt: str):
        self.name = name
        self.role = role
        self.system_prompt = system_prompt

    async def run(self, task: str, memory: SharedMemory) -> str:
        """Execute the agent's task with access to shared memory."""
        context = memory.get_full_context()
        messages = [
            {"role": "system", "content": self.system_prompt},
            {"role": "user", "content": f"""
Current shared memory:
{context}

Your task:
{task}
"""},
        ]
        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            temperature=0.3,
        )
        result = response.choices[0].message.content
        memory.record(self.name, "completed_task", task)
        return result
The Retriever Agent
# retriever_agent.py
from agents import BaseAgent
from memory import SharedMemory
import chromadb
from openai import AsyncOpenAI

client = AsyncOpenAI()

class RetrieverAgent(BaseAgent):
    def __init__(self, collection_name: str = "my_docs"):
        super().__init__(
            name="retriever",
            role="Document Retrieval Specialist",
            system_prompt="""You are a retrieval specialist. Given a research
question, you formulate effective search queries to find relevant
information. You may suggest multiple queries to cover different
angles of the topic. Return your queries as a JSON list of strings.""",
        )
        self.chroma = chromadb.PersistentClient(path="./chroma_db")
        self.collection = self.chroma.get_collection(collection_name)

    async def run(self, question: str, memory: SharedMemory) -> str:
        # Step 1: Generate search queries
        query_response = await super().run(
            f"Generate 3-5 search queries for: {question}", memory
        )
        # Parse queries (simplified -- use JSON parsing in production)
        queries = self._parse_queries(query_response)

        # Step 2: Execute searches
        all_chunks = []
        seen_ids = set()
        for query in queries:
            embedding = (await client.embeddings.create(
                model="text-embedding-3-small",
                input=query,
            )).data[0].embedding
            results = self.collection.query(
                query_embeddings=[embedding],
                n_results=5,
                include=["documents", "metadatas", "distances"],
            )
for doc, meta, dist in zip(
results["documents"][0],
results["metadatas"][0],
results["distances"][0],
):
chunk_id = f"{meta.get('source', '')}_{meta.get('chunk_index', '')}"
if chunk_id not in seen_ids:
seen_ids.add(chunk_id)
all_chunks.append({
"content": doc,
"source": meta.get("source", "unknown"),
"similarity": 1 - dist,
"query": query,
})
# Sort by similarity and take top results
all_chunks.sort(key=lambda x: x["similarity"], reverse=True)
top_chunks = all_chunks[:10]
# Store in shared memory
memory.add_chunks(top_chunks)
memory.record(
self.name, "retrieval_complete",
f"Found {len(top_chunks)} unique chunks from {len(queries)} queries"
)
return f"Retrieved {len(top_chunks)} relevant chunks"
def _parse_queries(self, response: str) -> list[str]:
"""Extract search queries from the LLM response."""
import json
try:
return json.loads(response)
except json.JSONDecodeError:
# Fallback: treat each line as a query
return [
line.strip().strip('"').strip("- ")
for line in response.split("\n")
if line.strip() and len(line.strip()) > 10
]
The Analyst and Writer Agents
The analyst and writer follow the same pattern: override run() to build a task-specific prompt from shared memory, delegate the LLM call to the base class, and store the result back into memory. The key difference is what each agent reads and writes:
# analyst_agent.py
from agents import BaseAgent
from memory import SharedMemory

class AnalystAgent(BaseAgent):
def __init__(self):
super().__init__(
name="analyst",
role="Research Analyst",
system_prompt="""You are a research analyst. Given retrieved
document chunks and a research question, you:
1. Identify key themes and patterns across the chunks
2. Assess the reliability and relevance of each source
3. Note any gaps in the available information
4. Synthesize findings into a structured analysis
Be thorough but concise. Flag contradictions between sources.""",
)
async def run(self, question: str, memory: SharedMemory) -> str:
chunks_text = "\n\n---\n\n".join([
f"[Source: {c['source']} | Similarity: {c['similarity']:.3f}]\n{c['content']}"
for c in memory.retrieved_chunks
])
task = f"Research Question: {question}\n\nRetrieved Document Chunks:\n{chunks_text}\n\nAnalyze these chunks and provide a structured synthesis."
analysis = await super().run(task, memory)
memory.analysis = analysis # Analyst writes analysis
memory.record(self.name, "analysis_complete", "Analysis stored")
return analysis
The writer is nearly identical -- it reads memory.analysis (produced by the analyst) and writes memory.report:
# writer_agent.py
from agents import BaseAgent
from memory import SharedMemory

class WriterAgent(BaseAgent):
def __init__(self):
super().__init__(
name="writer", role="Technical Writer",
system_prompt="You are a technical writer. Write clear reports with sections, citations, a summary, and noted limitations.",
)
async def run(self, question: str, memory: SharedMemory) -> str:
task = f"Research Question: {question}\n\nAnalysis:\n{memory.analysis}\n\nSource count: {len(memory.retrieved_chunks)} chunks\n\nWrite a comprehensive research report."
report = await super().run(task, memory)
memory.report = report # Writer writes report
memory.status = "complete"
memory.record(self.name, "report_complete", "Final report generated")
return report
The Supervisor: Tying It All Together
# supervisor.py
import asyncio
from memory import SharedMemory
from retriever_agent import RetrieverAgent
from analyst_agent import AnalystAgent
from writer_agent import WriterAgent
class Supervisor:
def __init__(self):
self.retriever = RetrieverAgent()
self.analyst = AnalystAgent()
self.writer = WriterAgent()
async def run(self, question: str) -> dict:
"""Run the full research pipeline."""
memory = SharedMemory()
memory.status = "planning"
print(f"[Supervisor] Starting research: {question}")
# Phase 1: Retrieval
memory.status = "retrieving"
print("[Supervisor] Delegating to Retriever Agent...")
await self.retriever.run(question, memory)
print(f"[Supervisor] Retrieval complete: "
f"{len(memory.retrieved_chunks)} chunks found")
if not memory.retrieved_chunks:
memory.status = "failed"
return {
"status": "failed",
"reason": "No relevant documents found",
"report": None,
}
# Phase 2: Analysis
memory.status = "analyzing"
print("[Supervisor] Delegating to Analyst Agent...")
await self.analyst.run(question, memory)
print("[Supervisor] Analysis complete")
# Phase 3: Writing
memory.status = "writing"
print("[Supervisor] Delegating to Writer Agent...")
await self.writer.run(question, memory)
print("[Supervisor] Report complete")
return {
"status": "complete",
"report": memory.report,
"sources": [
{"source": c["source"], "similarity": c["similarity"]}
for c in memory.retrieved_chunks
],
"history": [
{"agent": e.agent, "action": e.action, "time": e.timestamp}
for e in memory.history
],
}
# Run it
async def main():
supervisor = Supervisor()
result = await supervisor.run(
"What are the best practices for error handling in our codebase?"
)
if result["status"] == "complete":
print("\n" + "=" * 60)
print("RESEARCH REPORT")
print("=" * 60)
print(result["report"])
print("\n" + "-" * 60)
print(f"Sources: {len(result['sources'])}")
print(f"Steps: {len(result['history'])}")
if __name__ == "__main__":
asyncio.run(main())
⚠️ Warning
This supervisor uses a fixed sequential flow for clarity. In a production system, you would want the supervisor to be more dynamic -- able to request additional retrieval if the analyst reports gaps, or ask the writer to revise if quality checks fail. LangGraph is excellent for building these adaptive loops.
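As a sketch of what such an adaptive loop could look like, here is a retrieve-analyze cycle that repeats until the analyst stops reporting gaps. The stub functions and the gap heuristic are invented for illustration, not part of the system above:

```python
import asyncio

# Stubs stand in for the real agents; the gap heuristic is invented for illustration
async def retrieve(round_num: int) -> list[str]:
    return [f"chunk-{round_num}-{i}" for i in range(3)]

async def analyze(chunks: list[str]) -> dict:
    # Pretend the analyst reports gaps until it has seen six chunks
    return {"summary": f"analysis of {len(chunks)} chunks", "has_gaps": len(chunks) < 6}

async def adaptive_research(max_rounds: int = 3) -> dict:
    chunks: list[str] = []
    analysis: dict = {}
    for round_num in range(1, max_rounds + 1):
        chunks.extend(await retrieve(round_num))
        analysis = await analyze(chunks)
        if not analysis["has_gaps"]:
            break  # the analyst is satisfied; hand off to the writer
    return analysis

result = asyncio.run(adaptive_research())
print(result["summary"])  # → analysis of 6 chunks
```

The max_rounds cap is the important safety valve: without it, a persistently unsatisfied analyst would loop forever and burn tokens.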
Deployment Considerations
Building the system is one thing. Running it in production is another. Here are the key decisions you will face.
Where to Run Your Agents
| Option | Best For | Tradeoffs |
|--------|----------|-----------|
| Serverless functions (AWS Lambda, Vercel) | Simple pipelines, low traffic | Cold starts, execution time limits |
| Long-running servers (EC2, Railway, Fly.io) | Complex agent loops, persistent connections | Higher cost, need to manage uptime |
| Queue-based (Celery, BullMQ, Inngest) | Heavy workloads, async processing | More infrastructure, eventual consistency |
| Managed platforms (LangServe, Modal) | Quick deployment, auto-scaling | Vendor lock-in, less control |
✅ Start with the Simplest Deployment
For most projects, a single server process behind an API (FastAPI, Express) is enough to get started. Move to queues and workers only when you need to handle concurrent requests or long-running tasks that exceed HTTP timeout limits.
API Layer
Wrap your supervisor in an API so other services can trigger research tasks:
# api.py
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from supervisor import Supervisor
import uuid
app = FastAPI()
supervisor = Supervisor()
results_store = {} # Use Redis or a database in production
class ResearchRequest(BaseModel):
question: str
@app.post("/research")
async def start_research(request: ResearchRequest, background_tasks: BackgroundTasks):
task_id = str(uuid.uuid4())
results_store[task_id] = {"status": "running"}
background_tasks.add_task(run_research, task_id, request.question)
return {"task_id": task_id, "status": "running"}
async def run_research(task_id: str, question: str):
results_store[task_id] = await supervisor.run(question)
@app.get("/research/{task_id}")
async def get_result(task_id: str):
return results_store.get(task_id, {"status": "not_found"})
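On the client side, callers poll GET /research/{task_id} until the task leaves the running state. A small helper makes that loop testable; poll_for_result and the injected fetch callable are illustrative, not part of the API above:

```python
import time
from typing import Callable

def poll_for_result(
    fetch: Callable[[], dict],
    interval: float = 1.0,
    max_attempts: int = 30,
) -> dict:
    """Poll a task-status payload until it leaves the 'running' state.

    fetch is any zero-argument callable returning the JSON payload, e.g.
    lambda: httpx.get(f"{base_url}/research/{task_id}").json()
    """
    for _ in range(max_attempts):
        result = fetch()
        if result.get("status") != "running":
            return result
        time.sleep(interval)
    return {"status": "timeout"}

# Simulate a task that finishes on the third poll
responses = iter([{"status": "running"}, {"status": "running"},
                  {"status": "complete", "report": "done"}])
print(poll_for_result(lambda: next(responses), interval=0.0))
# → {'status': 'complete', 'report': 'done'}
```

Injecting fetch keeps the polling logic independent of any HTTP library, which is also what makes it easy to unit-test with a stub.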
Monitoring and Observability
When agents are making decisions autonomously, you need to see what is happening. Monitoring is not optional -- it is essential.
Tracing Agent Execution
The simplest useful pattern is a decorator that logs timing and success/failure for every agent call:
# observability.py
import logging
import time
from functools import wraps

# Configure logging once so traces actually appear
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("agentic_system")
def trace_agent(func):
"""Decorator to trace agent execution."""
@wraps(func)
async def wrapper(self, *args, **kwargs):
start = time.time()
try:
result = await func(self, *args, **kwargs)
logger.info(f"[{self.name}] Completed in {time.time()-start:.2f}s")
return result
except Exception as e:
logger.error(f"[{self.name}] Failed after {time.time()-start:.2f}s: {e}")
raise
return wrapper
Apply @trace_agent to each agent's run() method. In production, add structured extra={} fields (agent name, duration_ms, status) for your logging stack.
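For example, here is a minimal JSON formatter that surfaces those extra fields as structured log lines. The field names (agent, duration_ms, status) are suggestions, not a standard:

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Emit each record as a single JSON line, including custom extra fields."""
    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "message": record.getMessage(),
            "agent": getattr(record, "agent", None),
            "duration_ms": getattr(record, "duration_ms", None),
            "status": getattr(record, "status", None),
        })

logger = logging.getLogger("agentic_system")
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Values passed via extra= become attributes on the LogRecord
logger.info("agent finished",
            extra={"agent": "retriever", "duration_ms": 840, "status": "ok"})
```

One-JSON-object-per-line output is what most log aggregators expect, so this drops straight into a production logging stack.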
Key Metrics to Track
- Latency per agent -- Which agents are bottlenecks?
- Token usage per request -- Directly correlates with cost
- Retrieval quality -- Are the right chunks being found?
- Error rates and end-to-end success rate -- What percentage of requests produce useful results?
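The token-usage metric is easy to capture because each OpenAI response carries a usage object (response.usage.prompt_tokens and response.usage.completion_tokens). Here is a sketch of a per-request tracker; the prices are placeholders, so check your provider's current rates:

```python
from dataclasses import dataclass, field

# Placeholder prices per 1M tokens -- check your provider's current rates
PRICES = {"gpt-4o": {"input": 2.50, "output": 10.00}}

@dataclass
class UsageTracker:
    """Accumulate token usage across all LLM calls in one request."""
    input_tokens: int = 0
    output_tokens: int = 0
    calls: list = field(default_factory=list)

    def record(self, agent: str, model: str, input_tokens: int, output_tokens: int):
        # In real code, read these counts from response.usage on each LLM response
        self.input_tokens += input_tokens
        self.output_tokens += output_tokens
        self.calls.append({"agent": agent, "model": model})

    def estimated_cost(self, model: str = "gpt-4o") -> float:
        p = PRICES[model]
        return (self.input_tokens * p["input"]
                + self.output_tokens * p["output"]) / 1_000_000

tracker = UsageTracker()
tracker.record("retriever", "gpt-4o", input_tokens=1200, output_tokens=150)
tracker.record("analyst", "gpt-4o", input_tokens=4000, output_tokens=800)
print(f"${tracker.estimated_cost():.4f} across {len(tracker.calls)} calls")
```

Passing one tracker through the supervisor alongside SharedMemory gives you a per-task cost estimate with almost no extra code.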
Handling Failures Gracefully
Agents will fail. LLM calls time out, vector databases go down, and sometimes the model just produces nonsense. The two essential resilience patterns are retry with backoff and timeouts:
# resilience.py
import asyncio
from typing import Any, Callable

async def retry_with_backoff(
    func: Callable,
    *args,
    max_retries: int = 3,
    base_delay: float = 1.0,
    **kwargs,
) -> Any:
    """Retry an async function with exponential backoff.

    max_retries and base_delay are keyword-only, so positional
    arguments pass straight through to func."""
    for attempt in range(max_retries):
        try:
            return await func(*args, **kwargs)
        except Exception:
            # Last attempt: re-raise so the caller sees the real error
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt)  # 1s, 2s, 4s, ...
            await asyncio.sleep(delay)
For timeouts, wrap agent calls with asyncio.wait_for(func(), timeout=30.0). Together, these two patterns handle the vast majority of transient failures in production.
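Combining the two is straightforward. In this sketch (call_with_resilience is a made-up helper, not a library function), each attempt gets its own timeout; the coroutine-factory pattern is needed because a coroutine object cannot be awaited twice:

```python
import asyncio

async def call_with_resilience(coro_factory, timeout: float = 30.0,
                               max_retries: int = 3, base_delay: float = 0.1):
    """Per-attempt timeout plus exponential backoff on transient failures."""
    for attempt in range(max_retries):
        try:
            # coro_factory() builds a fresh coroutine for each attempt
            return await asyncio.wait_for(coro_factory(), timeout=timeout)
        except (asyncio.TimeoutError, ConnectionError):
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(base_delay * (2 ** attempt))

# Demo: an agent call that hangs on the first attempt, then succeeds
attempts = {"n": 0}

async def flaky_agent_call() -> str:
    attempts["n"] += 1
    if attempts["n"] == 1:
        await asyncio.sleep(10)  # first attempt hangs past the timeout
    return "ok"

result = asyncio.run(call_with_resilience(flaky_agent_call, timeout=0.05))
print(result)  # → ok
```

Note that only transient errors (timeouts, connection failures) are retried; anything else propagates immediately, which keeps genuine bugs visible.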
Build Your Complete System
This is the big one. Bring together everything from this trail:
- Set up your vector store from lesson 3 with your documents.
- Implement the agent classes -- at minimum: a retriever, an analyst, and a supervisor.
- Add shared memory so agents can communicate.
- Wire up the supervisor to orchestrate the workflow.
- Test with real questions against your document collection.
- Add basic logging so you can see what each agent is doing.
- Handle at least one failure case -- what happens if retrieval returns nothing?
Stretch goals:
- Add a feedback loop where the supervisor can request additional retrieval
- Wrap the system in a FastAPI endpoint
- Add retry logic for LLM calls
- Track token usage and estimate costs
Paw Print Check
Before moving on, make sure you can answer these:
- 🐾 Can you explain how shared memory enables communication between agents?
- 🐾 What role does the supervisor play in a multi-agent system?
- 🐾 How would you deploy this system for production use?
- 🐾 What metrics would you monitor to ensure your agentic system is healthy?
- 🐾 How do you handle failures in a multi-agent pipeline?
You Have Reached Aoraki
Take a moment to appreciate how far you have come. From multi-agent orchestration to RAG pipelines to this complete, deployable agentic architecture -- you now have the building blocks to create real AI-powered applications: research assistants, code review systems, document analysis tools, and whatever else you can imagine.
🐾 Haku Is Proud of You
From Rangitoto to Aoraki, you have climbed the entire trail. The AI landscape will keep evolving -- new models, new frameworks, new patterns. But the fundamentals you have learned here will serve you well no matter what comes next. Keep building, keep experimenting, and keep pushing higher.
Aoraki is not the end. It is just the best vantage point to see where to go next.