
Your Agentic System


What you'll learn

  • Architect a complete system combining multi-agent orchestration with RAG
  • Implement shared memory and state management across agents
  • Learn deployment strategies for agentic systems
  • Understand monitoring, observability, and failure handling in production
  • Celebrate -- you have reached Aoraki!

This is it. The final lesson of the Aoraki trail. Over the past three lessons, you learned how multi-agent systems work, how RAG pipelines retrieve knowledge, and how to build a working retrieval pipeline from scratch. Now we are going to bring it all together into a single, cohesive system: a multi-agent application with RAG-powered knowledge, shared memory, and production-ready architecture.

Let us build something real.

The Architecture

We are going to build a Research Assistant System -- a multi-agent application that can take a research question, find relevant information from your document collection, synthesize findings, and produce a well-structured report. Here is the high-level architecture:

                    ┌───────────────────┐
                    │  Supervisor Agent │
                    └─────────┬─────────┘
                              │
             ┌────────────────┼────────────────┐
             ▼                ▼                ▼
      ┌─────────────┐  ┌─────────────┐  ┌─────────────┐
      │  Retriever  │  │   Analyst   │  │   Writer    │
      │    Agent    │  │    Agent    │  │    Agent    │
      └──────┬──────┘  └──────┬──────┘  └──────┬──────┘
             │                │                │
             ▼                ▼                ▼
      ┌─────────────────────────────────────────────┐
      │            Shared Memory / State            │
      │     (RAG Vector Store + Working Memory)     │
      └─────────────────────────────────────────────┘

The Agents

  • Supervisor -- Plans the research, delegates to workers, reviews progress, decides when the task is complete
  • Retriever -- Queries the RAG pipeline to find relevant document chunks
  • Analyst -- Synthesizes retrieved information, identifies patterns, flags gaps
  • Writer -- Produces the final report based on the analyst's findings

💡 Why This Architecture?

This is a hybrid pattern: the supervisor uses hierarchical orchestration to manage the workflow, while the retriever and analyst can run in parallel when the supervisor determines it is safe to do so. The writer always runs last (sequential). This mirrors how a real research team operates.
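In plain asyncio, that hybrid flow can be sketched in a few lines. The three coroutines here are hypothetical stand-ins for the real agents built later in this lesson; the point is the shape of the orchestration, not the agent logic:

```python
import asyncio

# Hypothetical stand-ins for the agents described in this lesson.
async def retrieve(question: str) -> list[str]:
    await asyncio.sleep(0.01)  # simulate vector store I/O
    return [f"chunk about {question}"]

async def analyze_preliminary(question: str) -> str:
    await asyncio.sleep(0.01)  # simulate an LLM call
    return f"preliminary notes on {question}"

async def write_report(chunks: list[str], notes: str) -> str:
    return f"Report based on {len(chunks)} chunks and notes: {notes}"

async def run_pipeline(question: str) -> str:
    # Retriever and analyst run concurrently when the supervisor deems
    # it safe; the writer always runs last, sequentially.
    chunks, notes = await asyncio.gather(
        retrieve(question), analyze_preliminary(question)
    )
    return await write_report(chunks, notes)

print(asyncio.run(run_pipeline("error handling")))
```

asyncio.gather gives you the parallel phase; a plain await afterwards gives you the sequential one.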

Shared Memory: The Backbone

The most critical piece of a multi-agent system is not any individual agent -- it is the shared memory layer that allows agents to communicate and build on each other's work.

We need two types of memory:

  1. Long-term memory -- The RAG vector store (your document knowledge base)
  2. Working memory -- Short-term state for the current task (findings so far, the plan, intermediate results)

# memory.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import json


@dataclass
class MemoryEntry:
    agent: str
    action: str
    content: Any
    timestamp: str = field(
        default_factory=lambda: datetime.now().isoformat()
    )


class SharedMemory:
    """Working memory shared across all agents in a task."""

    def __init__(self):
        self.plan: dict = {}
        self.findings: list[dict] = []
        self.retrieved_chunks: list[dict] = []
        self.analysis: str = ""
        self.report: str = ""
        self.history: list[MemoryEntry] = []
        self.status: str = "initialized"

    def record(self, agent: str, action: str, content: Any):
        """Record an action in the shared history."""
        entry = MemoryEntry(
            agent=agent, action=action, content=content
        )
        self.history.append(entry)

    def add_finding(self, agent: str, finding: dict):
        """Add a research finding."""
        self.findings.append({
            **finding,
            "found_by": agent,
            "timestamp": datetime.now().isoformat(),
        })
        self.record(agent, "add_finding", finding)

    def add_chunks(self, chunks: list[dict]):
        """Store retrieved document chunks."""
        self.retrieved_chunks.extend(chunks)
        self.record("retriever", "add_chunks", f"{len(chunks)} chunks")

    def get_context_summary(self) -> str:
        """Get a summary of current state for agent prompts."""
        return json.dumps({
            "status": self.status,
            "plan": self.plan,
            "findings_count": len(self.findings),
            "chunks_retrieved": len(self.retrieved_chunks),
            "has_analysis": bool(self.analysis),
            "has_report": bool(self.report),
        }, indent=2)

    def get_full_context(self) -> str:
        """Get the full working memory as a string for LLM context."""
        parts = [f"## Current Status: {self.status}"]

        if self.plan:
            parts.append(f"## Plan\n{json.dumps(self.plan, indent=2)}")

        if self.findings:
            parts.append("## Findings")
            for f in self.findings:
                parts.append(f"- [{f['found_by']}] {f.get('summary', str(f))}")

        if self.analysis:
            parts.append(f"## Analysis\n{self.analysis}")

        return "\n\n".join(parts)

💡 Tip

Shared memory should be append-mostly. Agents should add to the collective knowledge rather than overwrite each other's work. The history log makes debugging much easier -- you can trace exactly which agent did what and when.

Building the Agents

Let us implement each agent. We will use a base class to keep things DRY:

# agents.py
from openai import AsyncOpenAI
from memory import SharedMemory

client = AsyncOpenAI()


class BaseAgent:
    """Base class for all agents in the system."""

    def __init__(self, name: str, role: str, system_prompt: str):
        self.name = name
        self.role = role
        self.system_prompt = system_prompt

    async def run(self, task: str, memory: SharedMemory) -> str:
        """Execute the agent's task with access to shared memory."""
        context = memory.get_full_context()

        messages = [
            {"role": "system", "content": self.system_prompt},
            {"role": "user", "content": f"""
Current shared memory:
{context}

Your task:
{task}
"""},
        ]

        # Use the async client so the LLM call does not block the event loop
        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            temperature=0.3,
        )

        result = response.choices[0].message.content
        memory.record(self.name, "completed_task", task)
        return result

The Retriever Agent

# retriever_agent.py
from agents import BaseAgent
from memory import SharedMemory
import chromadb
from openai import AsyncOpenAI

client = AsyncOpenAI()


class RetrieverAgent(BaseAgent):
    def __init__(self, collection_name: str = "my_docs"):
        super().__init__(
            name="retriever",
            role="Document Retrieval Specialist",
            system_prompt="""You are a retrieval specialist. Given a research
question, you formulate effective search queries to find relevant
information. You may suggest multiple queries to cover different
angles of the topic. Return your queries as a JSON list of strings.""",
        )
        self.chroma = chromadb.PersistentClient(path="./chroma_db")
        self.collection = self.chroma.get_collection(collection_name)

    async def run(self, question: str, memory: SharedMemory) -> str:
        # Step 1: Generate search queries
        query_response = await super().run(
            f"Generate 3-5 search queries for: {question}", memory
        )

        # Parse queries (JSON first, with a line-based fallback)
        queries = self._parse_queries(query_response)

        # Step 2: Execute searches
        all_chunks = []
        seen_ids = set()

        for query in queries:
            # Await the embedding call so it does not block the event loop
            embedding = (await client.embeddings.create(
                model="text-embedding-3-small",
                input=query,
            )).data[0].embedding

            results = self.collection.query(
                query_embeddings=[embedding],
                n_results=5,
                include=["documents", "metadatas", "distances"],
            )

            for doc, meta, dist in zip(
                results["documents"][0],
                results["metadatas"][0],
                results["distances"][0],
            ):
                chunk_id = f"{meta.get('source', '')}_{meta.get('chunk_index', '')}"
                if chunk_id not in seen_ids:
                    seen_ids.add(chunk_id)
                    all_chunks.append({
                        "content": doc,
                        "source": meta.get("source", "unknown"),
                        "similarity": 1 - dist,
                        "query": query,
                    })

        # Sort by similarity and take top results
        all_chunks.sort(key=lambda x: x["similarity"], reverse=True)
        top_chunks = all_chunks[:10]

        # Store in shared memory
        memory.add_chunks(top_chunks)
        memory.record(
            self.name, "retrieval_complete",
            f"Found {len(top_chunks)} unique chunks from {len(queries)} queries"
        )

        return f"Retrieved {len(top_chunks)} relevant chunks"

    def _parse_queries(self, response: str) -> list[str]:
        """Extract search queries from the LLM response."""
        import json
        try:
            return json.loads(response)
        except json.JSONDecodeError:
            # Fallback: treat each line as a query
            return [
                line.strip().strip('"').strip("- ")
                for line in response.split("\n")
                if line.strip() and len(line.strip()) > 10
            ]
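One common wrinkle the fallback above does not handle: models often wrap their JSON output in markdown code fences. A slightly more robust variant (the `parse_queries` function here is a hypothetical standalone version, not part of the lesson's class) strips fences before parsing:

```python
import json
import re

def parse_queries(response: str) -> list[str]:
    """Extract a JSON list of queries, tolerating markdown code fences."""
    # Models often wrap JSON in ```json ... ``` fences; strip them first.
    cleaned = re.sub(r"^```(?:json)?\s*|\s*```$", "", response.strip())
    try:
        parsed = json.loads(cleaned)
        if isinstance(parsed, list):
            return [str(q) for q in parsed]
    except json.JSONDecodeError:
        pass
    # Fallback: treat each non-trivial line as a query.
    return [
        line.strip().strip('"').lstrip("- ")
        for line in response.splitlines()
        if len(line.strip()) > 10
    ]

print(parse_queries('["error handling best practices", "exception patterns"]'))
```

The same clean-then-parse-then-fallback shape works for any structured output you ask an LLM to produce.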

The Analyst and Writer Agents

The analyst and writer follow the same pattern: override run() to build a task-specific prompt from shared memory, delegate the LLM call to the base class, and store the result back into memory. The key difference is what each one reads and writes:

# analyst_agent.py
class AnalystAgent(BaseAgent):
    def __init__(self):
        super().__init__(
            name="analyst",
            role="Research Analyst",
            system_prompt="""You are a research analyst. Given retrieved
document chunks and a research question, you:
1. Identify key themes and patterns across the chunks
2. Assess the reliability and relevance of each source
3. Note any gaps in the available information
4. Synthesize findings into a structured analysis

Be thorough but concise. Flag contradictions between sources.""",
        )

    async def run(self, question: str, memory: SharedMemory) -> str:
        chunks_text = "\n\n---\n\n".join([
            f"[Source: {c['source']} | Similarity: {c['similarity']:.3f}]\n{c['content']}"
            for c in memory.retrieved_chunks
        ])

        task = f"Research Question: {question}\n\nRetrieved Document Chunks:\n{chunks_text}\n\nAnalyze these chunks and provide a structured synthesis."

        analysis = await super().run(task, memory)
        memory.analysis = analysis  # Analyst writes analysis
        memory.record(self.name, "analysis_complete", "Analysis stored")
        return analysis

The writer is nearly identical -- it reads memory.analysis (produced by the analyst) and writes memory.report:

# writer_agent.py
class WriterAgent(BaseAgent):
    def __init__(self):
        super().__init__(
            name="writer", role="Technical Writer",
            system_prompt="You are a technical writer. Write clear reports with sections, citations, a summary, and noted limitations.",
        )

    async def run(self, question: str, memory: SharedMemory) -> str:
        task = f"Research Question: {question}\n\nAnalysis:\n{memory.analysis}\n\nSource count: {len(memory.retrieved_chunks)} chunks\n\nWrite a comprehensive research report."

        report = await super().run(task, memory)
        memory.report = report  # Writer writes report
        memory.status = "complete"
        memory.record(self.name, "report_complete", "Final report generated")
        return report

The Supervisor: Tying It All Together

# supervisor.py
import asyncio
from memory import SharedMemory
from retriever_agent import RetrieverAgent
from analyst_agent import AnalystAgent
from writer_agent import WriterAgent


class Supervisor:
    def __init__(self):
        self.retriever = RetrieverAgent()
        self.analyst = AnalystAgent()
        self.writer = WriterAgent()

    async def run(self, question: str) -> dict:
        """Run the full research pipeline."""
        memory = SharedMemory()
        memory.status = "planning"

        print(f"[Supervisor] Starting research: {question}")

        # Phase 1: Retrieval
        memory.status = "retrieving"
        print("[Supervisor] Delegating to Retriever Agent...")
        await self.retriever.run(question, memory)
        print(f"[Supervisor] Retrieval complete: "
              f"{len(memory.retrieved_chunks)} chunks found")

        if not memory.retrieved_chunks:
            memory.status = "failed"
            return {
                "status": "failed",
                "reason": "No relevant documents found",
                "report": None,
            }

        # Phase 2: Analysis
        memory.status = "analyzing"
        print("[Supervisor] Delegating to Analyst Agent...")
        await self.analyst.run(question, memory)
        print("[Supervisor] Analysis complete")

        # Phase 3: Writing
        memory.status = "writing"
        print("[Supervisor] Delegating to Writer Agent...")
        await self.writer.run(question, memory)
        print("[Supervisor] Report complete")

        return {
            "status": "complete",
            "report": memory.report,
            "sources": [
                {"source": c["source"], "similarity": c["similarity"]}
                for c in memory.retrieved_chunks
            ],
            "history": [
                {"agent": e.agent, "action": e.action, "time": e.timestamp}
                for e in memory.history
            ],
        }


# Run it
async def main():
    supervisor = Supervisor()
    result = await supervisor.run(
        "What are the best practices for error handling in our codebase?"
    )

    if result["status"] == "complete":
        print("\n" + "=" * 60)
        print("RESEARCH REPORT")
        print("=" * 60)
        print(result["report"])
        print("\n" + "-" * 60)
        print(f"Sources: {len(result['sources'])}")
        print(f"Steps: {len(result['history'])}")

if __name__ == "__main__":
    asyncio.run(main())

⚠️ Warning

This supervisor uses a fixed sequential flow for clarity. In a production system, you would want the supervisor to be more dynamic -- able to request additional retrieval if the analyst reports gaps, or ask the writer to revise if quality checks fail. LangGraph is excellent for building these adaptive loops.
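Such an adaptive loop can be sketched without any framework. The stub agents and the gap heuristic below are invented for illustration; the real version would have the analyst LLM report gaps explicitly:

```python
import asyncio

# Stub agents: the analyst returns both an analysis and a list of gaps,
# and the supervisor loops back to retrieval until no gaps remain or a
# round budget is exhausted. The gap heuristic here is invented.
async def retrieve(queries: list[str]) -> list[str]:
    return [f"chunk for {q}" for q in queries]

async def analyze(chunks: list[str]) -> tuple[str, list[str]]:
    gaps = ["need more detail"] if len(chunks) < 2 else []
    return f"analysis of {len(chunks)} chunks", gaps

async def research_with_feedback(question: str, max_rounds: int = 3) -> str:
    chunks: list[str] = []
    queries = [question]
    for _ in range(max_rounds):
        chunks += await retrieve(queries)
        analysis, gaps = await analyze(chunks)
        if not gaps:
            return analysis
        queries = gaps  # target the next retrieval at the reported gaps
    return analysis  # best effort once the budget runs out

print(asyncio.run(research_with_feedback("error handling")))
```

The round budget matters: without it, a persistently unsatisfied analyst would loop forever and burn tokens.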

Deployment Considerations

Building the system is one thing. Running it in production is another. Here are the key decisions you will face.

Where to Run Your Agents

| Option | Best For | Tradeoffs |
|--------|----------|-----------|
| Serverless functions (AWS Lambda, Vercel) | Simple pipelines, low traffic | Cold starts, execution time limits |
| Long-running servers (EC2, Railway, Fly.io) | Complex agent loops, persistent connections | Higher cost, need to manage uptime |
| Queue-based (Celery, BullMQ, Inngest) | Heavy workloads, async processing | More infrastructure, eventual consistency |
| Managed platforms (LangServe, Modal) | Quick deployment, auto-scaling | Vendor lock-in, less control |

Start with the Simplest Deployment

For most projects, a single server process behind an API (FastAPI, Express) is enough to get started. Move to queues and workers only when you need to handle concurrent requests or long-running tasks that exceed HTTP timeout limits.
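To see the shape of the queue-based option without any infrastructure, here is an in-process sketch using asyncio.Queue. Real deployments would use Celery, BullMQ, or similar; the report string is a stand-in for a supervisor run:

```python
import asyncio

async def worker(queue: asyncio.Queue, results: dict) -> None:
    # Pull research tasks off the queue until a shutdown sentinel arrives.
    while True:
        task_id, question = await queue.get()
        if task_id is None:
            queue.task_done()
            return
        # Stand-in for supervisor.run(question)
        results[task_id] = f"report for {question!r}"
        queue.task_done()

async def main() -> dict:
    queue: asyncio.Queue = asyncio.Queue()
    results: dict = {}
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(2)]

    for task_id, question in [("t1", "error handling"), ("t2", "retry patterns")]:
        queue.put_nowait((task_id, question))
    for _ in workers:
        queue.put_nowait((None, None))  # one sentinel per worker

    await queue.join()
    await asyncio.gather(*workers)
    return results

print(asyncio.run(main()))
```

The key property is decoupling: producers enqueue work and return immediately, while a fixed pool of workers controls how many agent pipelines run concurrently.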

API Layer

Wrap your supervisor in an API so other services can trigger research tasks:

# api.py
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from supervisor import Supervisor
import uuid

app = FastAPI()
supervisor = Supervisor()
results_store = {}  # Use Redis or a database in production

class ResearchRequest(BaseModel):
    question: str

@app.post("/research")
async def start_research(request: ResearchRequest, background_tasks: BackgroundTasks):
    task_id = str(uuid.uuid4())
    results_store[task_id] = {"status": "running"}
    background_tasks.add_task(run_research, task_id, request.question)
    return {"task_id": task_id, "status": "running"}

async def run_research(task_id: str, question: str):
    results_store[task_id] = await supervisor.run(question)

@app.get("/research/{task_id}")
async def get_result(task_id: str):
    return results_store.get(task_id, {"status": "not_found"})

Monitoring and Observability

When agents are making decisions autonomously, you need to see what is happening. Monitoring is not optional -- it is essential.

Tracing Agent Execution

The simplest useful pattern is a decorator that logs timing and success/failure for every agent call:

# observability.py
import logging, time
from functools import wraps

logger = logging.getLogger("agentic_system")

def trace_agent(func):
    """Decorator to trace agent execution."""
    @wraps(func)
    async def wrapper(self, *args, **kwargs):
        start = time.time()
        try:
            result = await func(self, *args, **kwargs)
            logger.info(f"[{self.name}] Completed in {time.time()-start:.2f}s")
            return result
        except Exception as e:
            logger.error(f"[{self.name}] Failed after {time.time()-start:.2f}s: {e}")
            raise
    return wrapper

Apply @trace_agent to each agent's run() method. In production, add structured extra={} fields (agent name, duration_ms, status) for your logging stack.
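Putting the two together, here is a runnable sketch of the decorator applied to an agent, with structured extra fields added. The decorator is repeated from observability.py so the snippet stands alone, and DemoAgent is a hypothetical stand-in for a BaseAgent subclass:

```python
import asyncio
import logging
import time
from functools import wraps

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("agentic_system")

def trace_agent(func):
    """Decorator to trace agent execution (repeated from observability.py)."""
    @wraps(func)
    async def wrapper(self, *args, **kwargs):
        start = time.time()
        try:
            result = await func(self, *args, **kwargs)
            # Structured fields for a JSON-capable logging stack
            logger.info(
                "agent completed",
                extra={
                    "agent": self.name,
                    "duration_ms": int((time.time() - start) * 1000),
                    "status": "ok",
                },
            )
            return result
        except Exception:
            logger.error(
                "agent failed",
                extra={
                    "agent": self.name,
                    "duration_ms": int((time.time() - start) * 1000),
                    "status": "error",
                },
            )
            raise
    return wrapper

class DemoAgent:
    """Hypothetical agent standing in for a BaseAgent subclass."""
    name = "demo"

    @trace_agent
    async def run(self, task: str) -> str:
        return f"done: {task}"

print(asyncio.run(DemoAgent().run("summarize")))
```

With the default formatter the extra fields are not printed, but a JSON formatter (or a platform like Datadog or Grafana Loki) will pick them up as queryable attributes.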

Key Metrics to Track

  • Latency per agent -- Which agents are bottlenecks?
  • Token usage per request -- Directly correlates with cost
  • Retrieval quality -- Are the right chunks being found?
  • Error rates and end-to-end success rate -- What percentage of requests produce useful results?
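A minimal in-memory collector for the first two metrics might look like this. The class and field names are illustrative; in production you would forward these numbers to your metrics backend rather than hold them in a dict:

```python
from collections import defaultdict

class AgentMetrics:
    """Minimal in-memory tracker for per-agent latency and token usage."""

    def __init__(self):
        self.latencies: dict[str, list[float]] = defaultdict(list)
        self.tokens: dict[str, int] = defaultdict(int)

    def record(self, agent: str, latency_s: float, tokens_used: int) -> None:
        self.latencies[agent].append(latency_s)
        self.tokens[agent] += tokens_used

    def summary(self) -> dict:
        # Aggregate per agent: call count, average latency, total tokens
        return {
            agent: {
                "calls": len(lats),
                "avg_latency_s": round(sum(lats) / len(lats), 3),
                "total_tokens": self.tokens[agent],
            }
            for agent, lats in self.latencies.items()
        }

metrics = AgentMetrics()
metrics.record("retriever", 0.8, 450)
metrics.record("retriever", 1.2, 500)
metrics.record("writer", 3.5, 2100)
print(metrics.summary())
```

Even a summary this simple answers the first two questions above: the writer's average latency reveals the bottleneck, and token totals map directly to cost.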

Handling Failures Gracefully

Agents will fail. LLM calls time out, vector databases go down, and sometimes the model just produces nonsense. The two essential resilience patterns are retry with backoff and timeouts:

# resilience.py
import asyncio
from typing import Callable, Any

async def retry_with_backoff(
    func: Callable,
    *args,
    max_retries: int = 3,
    base_delay: float = 1.0,
    **kwargs,
) -> Any:
    """Retry an async function with exponential backoff.

    Note: max_retries and base_delay are keyword-only, so positional
    arguments pass through to func untouched.
    """
    for attempt in range(max_retries):
        try:
            return await func(*args, **kwargs)
        except Exception:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt)
            await asyncio.sleep(delay)

For timeouts, wrap agent calls with asyncio.wait_for(func(), timeout=30.0). Together, these two patterns handle the vast majority of transient failures in production.
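The two patterns combine naturally: wrap the retried call in a timeout. The retry helper is repeated here (with a shortened base_delay) so the snippet runs on its own, and the flaky function is a hypothetical stand-in for an LLM call:

```python
import asyncio

async def retry_with_backoff(func, *args, max_retries=3, base_delay=0.01, **kwargs):
    """Retry an async callable with exponential backoff."""
    for attempt in range(max_retries):
        try:
            return await func(*args, **kwargs)
        except Exception:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(base_delay * (2 ** attempt))

calls = 0

async def flaky_agent_call(task: str) -> str:
    # Hypothetical stand-in for an LLM call: fails twice, then succeeds.
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("simulated transient failure")
    return f"result for {task}"

async def main() -> str:
    # The timeout bounds the whole retried operation; the retries absorb
    # transient errors within that budget.
    return await asyncio.wait_for(
        retry_with_backoff(flaky_agent_call, "summarize"), timeout=5.0
    )

print(asyncio.run(main()))
```

Note the nesting order: putting the timeout outside the retries caps total wall-clock time, while putting it inside would bound each attempt individually. Both are valid; choose deliberately.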

🛠️ Build Your Complete System

This is the big one. Bring together everything from this trail:

  1. Set up your vector store from lesson 3 with your documents.
  2. Implement the agent classes -- at minimum: a retriever, an analyst, and a supervisor.
  3. Add shared memory so agents can communicate.
  4. Wire up the supervisor to orchestrate the workflow.
  5. Test with real questions against your document collection.
  6. Add basic logging so you can see what each agent is doing.
  7. Handle at least one failure case -- what happens if retrieval returns nothing?

Stretch goals:

  • Add a feedback loop where the supervisor can request additional retrieval
  • Wrap the system in a FastAPI endpoint
  • Add retry logic for LLM calls
  • Track token usage and estimate costs

Paw Print Check

Before moving on, make sure you can answer these:

  • 🐾 Can you explain how shared memory enables communication between agents?
  • 🐾 What role does the supervisor play in a multi-agent system?
  • 🐾 How would you deploy this system for production use?
  • 🐾 What metrics would you monitor to ensure your agentic system is healthy?
  • 🐾 How do you handle failures in a multi-agent pipeline?

You Have Reached Aoraki

Take a moment to appreciate how far you have come. From multi-agent orchestration to RAG pipelines to this complete, deployable agentic architecture -- you now have the building blocks to create real AI-powered applications: research assistants, code review systems, document analysis tools, and whatever else you can imagine.

🐾 Haku Is Proud of You

From Rangitoto to Aoraki, you have climbed the entire trail. The AI landscape will keep evolving -- new models, new frameworks, new patterns. But the fundamentals you have learned here will serve you well no matter what comes next. Keep building, keep experimenting, and keep pushing higher.

Aoraki is not the end. It is just the best vantage point to see where to go next.

Next Up

Resources

Explore tools, references, and next steps for your AI journey
