Module 8: Capstone — Customer Support Agent
Bringing It All Together
Introduction: The Final Project
This is it. Everything you've learned across seven modules comes together in one project: a production-ready AI customer support agent.
This agent will:
- Answer product questions using a RAG-powered knowledge base
- Create support tickets when it can't resolve an issue
- Escalate sensitive conversations to human agents
- Maintain conversation memory across messages
- Handle errors gracefully with fallback responses
- Follow a stateful workflow built with LangGraph
By the end of this module, you'll have a complete, deployable agent that demonstrates every concept from this course.
8.1 Architecture Overview
The State Machine
Our customer support agent follows this workflow:
User Message
    ↓
[Classify]
    ├─ General Question  ──→ [RAG Search] ──→ [Respond] ──→ [Done]
    ├─ Technical Problem ──→ [RAG Search] ──→ [Create Ticket] ──→ [Respond] ──→ [Done]
    └─ Sensitive Issue   ──→ [Escalate to Human] ──→ [Done]
Components
- State: Tracks the conversation, classification, retrieved documents, and ticket info
- Classifier node: Determines the type of inquiry
- RAG node: Searches the product knowledge base
- Response node: Generates a helpful answer using context
- Ticket node: Creates a support ticket for unresolved issues
- Escalation node: Hands off to a human for sensitive topics
- Memory: Maintains conversation history across turns
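Before wiring anything into LangGraph, the routing above can be sanity-checked as a plain Python table. This is an illustrative sketch only — the node names mirror the diagram, and none of this code ends up in the final agent:

```python
# Hypothetical sketch: the support workflow as a plain routing table.
# Each classification maps to the ordered list of nodes it will visit.
ROUTES = {
    "general":   ["classify", "rag_search", "respond"],
    "technical": ["classify", "rag_search", "create_ticket", "respond"],
    "sensitive": ["classify", "escalation"],
    "escalate":  ["classify", "escalation"],
}

def path_for(classification: str) -> list[str]:
    """Return the node path for a classification, defaulting to 'general'."""
    return ROUTES.get(classification, ROUTES["general"])

print(path_for("technical"))
# ['classify', 'rag_search', 'create_ticket', 'respond']
```

Keeping this mental model handy makes the conditional edges in Step 2 much easier to follow.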
8.2 Setting Up the Project
Installation
mkdir customer-support-agent
cd customer-support-agent
python -m venv venv
source venv/bin/activate
pip install langchain langchain-openai langchain-chroma langgraph fastapi uvicorn pypdf
Create .env:
OPENAI_API_KEY=your_api_key_here
Project Structure
customer-support-agent/
├── .env
├── agent.py # Main agent logic
├── knowledge_base.py # RAG setup and document loading
├── tools.py # Agent tools (ticket creation, escalation)
├── server.py # FastAPI endpoints
└── knowledge/ # Product documentation
└── (your product docs go here)
8.3 Step 1: Define the State and Tools
The Agent State
Create agent.py:
from typing import Literal, TypedDict, Annotated
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage, BaseMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
load_dotenv()
llm = ChatOpenAI(model="gpt-4o")
fast_llm = ChatOpenAI(model="gpt-4o-mini") # For classification (cheaper)
class SupportState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    classification: str  # "general", "technical", "sensitive", "escalate"
    retrieved_context: str
    ticket_id: str
    escalated: bool
    response: str
The Knowledge Base
Create knowledge_base.py:
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain_text_splitters import RecursiveCharacterTextSplitter
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
# Sample product knowledge base
PRODUCT_DOCS = [
    # Pricing
    "The Starter plan is free and includes 1,000 API calls per month with community support.",
    "The Pro plan costs $49/month and includes 50,000 API calls with priority email support.",
    "The Enterprise plan has custom pricing with unlimited API calls and dedicated account manager.",
    "All paid plans come with a 14-day free trial. No credit card required for the trial.",
    "Annual billing gives a 20% discount on Pro and Enterprise plans.",
    # Features
    "Our API supports REST and GraphQL endpoints for all plan levels.",
    "Webhook notifications are available on Pro and Enterprise plans.",
    "Rate limits: Starter 10 req/s, Pro 100 req/s, Enterprise 1000 req/s.",
    "Data export is available in CSV, JSON, and Parquet formats on all plans.",
    "Custom integrations with Salesforce, HubSpot, and Slack are Enterprise-only features.",
    # Technical
    "API authentication uses Bearer tokens. Generate tokens in the dashboard under Settings > API Keys.",
    "The API base URL is https://api.techcorp.io/v2/ for all endpoints.",
    "Rate limit errors return HTTP 429. Implement exponential backoff for retries.",
    "Webhook payloads are signed with HMAC-SHA256. Verify the X-Signature header.",
    "SDK libraries are available for Python, JavaScript, Go, and Ruby.",
    # Policies
    "Refunds are available within 30 days of purchase for annual plans.",
    "Monthly plans can be cancelled at any time with no cancellation fee.",
    "Data retention: We store customer data for 90 days after account deletion.",
    "Our SLA guarantees 99.9% uptime for Pro plans and 99.99% for Enterprise.",
    "GDPR and SOC 2 Type II compliance is maintained across all plans.",
    # Troubleshooting
    "If you receive a 401 error, check that your API key is valid and not expired.",
    "Connection timeouts usually indicate a network issue. Check your firewall settings.",
    "For slow response times, try using our regional endpoints: us.api.techcorp.io or eu.api.techcorp.io.",
    "If webhooks are not firing, verify the endpoint URL is publicly accessible and returns a 200 status.",
    "Dashboard login issues: Clear your browser cache or try an incognito window.",
]
def create_knowledge_base(persist_dir: str = "./support_kb") -> Chroma:
    """Create the product knowledge base vector store."""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=50,
    )
    texts = text_splitter.split_text("\n\n".join(PRODUCT_DOCS))
    vectorstore = Chroma.from_texts(
        texts=texts,
        embedding=embeddings,
        persist_directory=persist_dir,
    )
    print(f"Knowledge base created with {len(texts)} chunks")
    return vectorstore
def get_knowledge_base(persist_dir: str = "./support_kb") -> Chroma:
    """Load or create the knowledge base."""
    try:
        vectorstore = Chroma(
            embedding_function=embeddings,
            persist_directory=persist_dir,
        )
        # Check if it has documents
        if vectorstore._collection.count() > 0:
            return vectorstore
    except Exception:
        pass
    return create_knowledge_base(persist_dir)
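RecursiveCharacterTextSplitter prefers paragraph and sentence boundaries; the naive fixed-window splitter below only illustrates what `chunk_size` and `chunk_overlap` mean. It is a simplified sketch of the idea, not the real algorithm:

```python
def naive_split(text: str, chunk_size: int = 500, chunk_overlap: int = 50) -> list[str]:
    """Simplified fixed-window splitter: each chunk starts
    chunk_size - chunk_overlap characters after the previous one,
    so consecutive chunks share chunk_overlap characters of context."""
    step = chunk_size - chunk_overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

chunks = naive_split("x" * 1200, chunk_size=500, chunk_overlap=50)
print(len(chunks))     # 3
print(len(chunks[0]))  # 500
```

The overlap is what keeps a sentence that straddles a chunk boundary retrievable from either side.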
The Tools
Create tools.py:
import uuid
from datetime import datetime
# In-memory ticket store (use a database in production)
tickets = {}
def create_ticket(
    customer_message: str,
    classification: str,
    context: str,
) -> str:
    """Create a support ticket and return the ticket ID."""
    ticket_id = f"TKT-{uuid.uuid4().hex[:8].upper()}"
    tickets[ticket_id] = {
        "id": ticket_id,
        "status": "open",
        "classification": classification,
        "customer_message": customer_message,
        "context": context,
        "created_at": datetime.now().isoformat(),
    }
    return ticket_id
def escalate_to_human(
    customer_message: str,
    reason: str,
) -> str:
    """Escalate the conversation to a human agent."""
    ticket_id = create_ticket(
        customer_message=customer_message,
        classification="escalated",
        context=f"Escalation reason: {reason}",
    )
    # In production, this would notify a human agent via
    # Slack, PagerDuty, email, or a ticketing system
    return ticket_id
8.4 Step 2: Build the LangGraph Workflow
Back in agent.py, add the node definitions and graph:
# --- continued in agent.py ---
from knowledge_base import get_knowledge_base
from tools import create_ticket, escalate_to_human
# Initialize knowledge base
kb = get_knowledge_base()
# --- Node Definitions ---
def classify_node(state: SupportState) -> dict:
    """Classify the user's message to determine routing."""
    messages = state["messages"]
    last_message = messages[-1].content
    response = fast_llm.invoke([
        SystemMessage(content="""Classify this customer support message into ONE category.
Respond with ONLY the category name, nothing else.

Categories:
- general: Product questions, pricing, features, how-to questions
- technical: Bug reports, API errors, integration issues, troubleshooting
- sensitive: Billing disputes, account deletion, legal concerns, complaints about service
- escalate: Threats, harassment, requests to speak to a manager, urgent safety issues

Message to classify:"""),
        HumanMessage(content=last_message),
    ])
    classification = response.content.strip().lower()
    # Validate classification
    valid = {"general", "technical", "sensitive", "escalate"}
    if classification not in valid:
        classification = "general"
    print(f"[Classifier] Category: {classification}")
    return {"classification": classification}
def rag_search_node(state: SupportState) -> dict:
    """Search the knowledge base for relevant information."""
    messages = state["messages"]
    last_message = messages[-1].content
    print("[RAG] Searching knowledge base...")
    results = kb.similarity_search(last_message, k=4)
    if results:
        context = "\n\n".join(
            f"- {doc.page_content}" for doc in results
        )
        print(f"[RAG] Found {len(results)} relevant documents")
    else:
        context = "No relevant documentation found."
        print("[RAG] No relevant documents found")
    return {"retrieved_context": context}
def respond_node(state: SupportState) -> dict:
    """Generate a helpful response using the retrieved context."""
    messages = state["messages"]
    context = state.get("retrieved_context", "")
    ticket_id = state.get("ticket_id", "")
    # Only instruct the model to mention a ticket if one was actually created
    ticket_rule = (
        f"\n5. A support ticket was created; mention the ticket ID: {ticket_id}."
        if ticket_id
        else ""
    )
    system_content = f"""You are a friendly, professional customer support agent for TechCorp.

Use the following knowledge base context to answer the customer's question:

{context}

Rules:
1. Be helpful, empathetic, and professional.
2. Only provide information that is supported by the context above.
3. If you don't know the answer, say so honestly and suggest contacting support.
4. Keep responses concise but thorough.{ticket_rule}"""
    response = llm.invoke([
        SystemMessage(content=system_content),
        *messages,
    ])
    print("[Response] Generated reply")
    return {
        "response": response.content,
        "messages": [AIMessage(content=response.content)],
    }
def create_ticket_node(state: SupportState) -> dict:
    """Create a support ticket for technical issues."""
    messages = state["messages"]
    last_message = messages[-1].content
    classification = state.get("classification", "technical")
    context = state.get("retrieved_context", "")
    ticket_id = create_ticket(
        customer_message=last_message,
        classification=classification,
        context=context,
    )
    print(f"[Ticket] Created: {ticket_id}")
    return {"ticket_id": ticket_id}
def escalation_node(state: SupportState) -> dict:
    """Escalate to a human agent for sensitive or urgent issues."""
    messages = state["messages"]
    last_message = messages[-1].content
    classification = state.get("classification", "")
    ticket_id = escalate_to_human(
        customer_message=last_message,
        reason=f"Classified as: {classification}",
    )
    escalation_message = (
        "I understand this is important to you. I've escalated your case to a "
        f"senior support specialist (Reference: {ticket_id}). A human agent will "
        "reach out to you within the next 2 hours. Is there anything else I can "
        "help with in the meantime?"
    )
    print(f"[Escalation] Escalated with ticket: {ticket_id}")
    return {
        "ticket_id": ticket_id,
        "escalated": True,
        "response": escalation_message,
        "messages": [AIMessage(content=escalation_message)],
    }
# --- Routing Logic ---
def route_by_classification(state: SupportState) -> Literal["rag_search", "escalation"]:
    """Route based on message classification."""
    classification = state.get("classification", "general")
    if classification in ("sensitive", "escalate"):
        return "escalation"
    return "rag_search"

def route_after_rag(state: SupportState) -> Literal["create_ticket", "respond"]:
    """After RAG search, decide if we need a ticket."""
    classification = state.get("classification", "general")
    if classification == "technical":
        return "create_ticket"
    return "respond"
# --- Build the Graph ---
workflow = StateGraph(SupportState)

# Add nodes
workflow.add_node("classify", classify_node)
workflow.add_node("rag_search", rag_search_node)
workflow.add_node("respond", respond_node)
workflow.add_node("create_ticket", create_ticket_node)
workflow.add_node("escalation", escalation_node)

# Define edges
workflow.add_edge(START, "classify")
workflow.add_conditional_edges(
    "classify",
    route_by_classification,
    {
        "rag_search": "rag_search",
        "escalation": "escalation",
    },
)
workflow.add_conditional_edges(
    "rag_search",
    route_after_rag,
    {
        "create_ticket": "create_ticket",
        "respond": "respond",
    },
)
workflow.add_edge("create_ticket", "respond")
workflow.add_edge("respond", END)
workflow.add_edge("escalation", END)

# Compile the agent
support_agent = workflow.compile()
8.5 Step 3: Add Conversation Memory
We need the agent to remember previous messages in a session:
# --- continued in agent.py ---
# Session memory store
session_store: dict[str, list[BaseMessage]] = {}
def chat(session_id: str, user_message: str) -> str:
    """Handle a chat message with session memory."""
    # Get or create session history
    if session_id not in session_store:
        session_store[session_id] = []
    history = session_store[session_id]
    history.append(HumanMessage(content=user_message))
    # Invoke the agent with full history
    result = support_agent.invoke({
        "messages": history,
        "classification": "",
        "retrieved_context": "",
        "ticket_id": "",
        "escalated": False,
        "response": "",
    })
    # Extract response and update history
    response = result["response"]
    history.append(AIMessage(content=response))
    # Trim history to prevent unbounded growth
    max_messages = 20
    if len(history) > max_messages:
        session_store[session_id] = history[-max_messages:]
    return response
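One subtlety worth noting: trimming to a fixed message count can cut the history mid-exchange, leaving an AI reply with no preceding user message. A sketch of a trim that keeps whole Human/AI pairs — assuming strictly alternating turns, which the `chat()` loop above produces — looks like this (illustrative only, not part of the files above):

```python
def trim_history(history: list, max_messages: int = 20) -> list:
    """Keep at most max_messages, dropping whole exchanges from the front.

    Assumes history alternates Human/AI (as chat() produces), so dropping
    an even number of messages from the front preserves complete pairs.
    """
    if len(history) <= max_messages:
        return history
    excess = len(history) - max_messages
    excess += excess % 2  # round up to an even number of dropped messages
    return history[excess:]

print(len(trim_history(list(range(23)), max_messages=20)))  # 19 (drops 4 to stay on a pair boundary)
```

Swapping this in for the simple slice is a one-line change in `chat()`.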
8.6 Step 4: The API Server
Create server.py:
from contextlib import asynccontextmanager
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, field_validator
load_dotenv()
from agent import chat, session_store
from tools import tickets
@asynccontextmanager
async def lifespan(app: FastAPI):
    print("Customer Support Agent API is starting...")
    yield
    print("Shutting down...")

app = FastAPI(
    title="Customer Support Agent",
    description="AI-powered customer support with RAG, tickets, and escalation",
    lifespan=lifespan,
)
# --- Request/Response Models ---
class ChatRequest(BaseModel):
    message: str
    session_id: str = "default"

    @field_validator("message")
    @classmethod
    def validate_message(cls, v):
        if len(v.strip()) == 0:
            raise ValueError("Message cannot be empty")
        if len(v) > 4000:
            raise ValueError("Message too long (max 4000 characters)")
        return v.strip()

class ChatResponse(BaseModel):
    response: str
    session_id: str

class TicketResponse(BaseModel):
    ticket_id: str
    status: str
    classification: str
    created_at: str
# --- Endpoints ---
@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest):
    """Send a message to the support agent."""
    try:
        response = chat(request.session_id, request.message)
        return ChatResponse(
            response=response,
            session_id=request.session_id,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/tickets/{ticket_id}", response_model=TicketResponse)
async def get_ticket(ticket_id: str):
    """Look up a support ticket by ID."""
    if ticket_id not in tickets:
        raise HTTPException(status_code=404, detail="Ticket not found")
    ticket = tickets[ticket_id]
    return TicketResponse(
        ticket_id=ticket["id"],
        status=ticket["status"],
        classification=ticket["classification"],
        created_at=ticket["created_at"],
    )

@app.delete("/sessions/{session_id}")
async def clear_session(session_id: str):
    """Clear a conversation session."""
    if session_id in session_store:
        del session_store[session_id]
    return {"status": "cleared"}

@app.get("/health")
async def health():
    return {"status": "healthy"}
Running the Complete System
uvicorn server:app --host 0.0.0.0 --port 8000 --reload
8.7 Testing the Agent
Test Scenarios
# General question (routes to RAG -> respond)
curl -X POST http://localhost:8000/chat \
-H "Content-Type: application/json" \
-d '{"message": "What plans do you offer?", "session_id": "test1"}'
# Technical issue (routes to RAG -> create ticket -> respond)
curl -X POST http://localhost:8000/chat \
-H "Content-Type: application/json" \
-d '{"message": "I keep getting 401 errors when calling the API", "session_id": "test2"}'
# Sensitive issue (routes to escalation)
curl -X POST http://localhost:8000/chat \
-H "Content-Type: application/json" \
-d '{"message": "I want a refund. Your service has been terrible.", "session_id": "test3"}'
# Follow-up with memory (same session)
curl -X POST http://localhost:8000/chat \
-H "Content-Type: application/json" \
-d '{"message": "How much does it cost per year?", "session_id": "test1"}'
Expected Behavior
| Scenario | Classification | Flow | Result |
|---|---|---|---|
| "What plans do you offer?" | general | RAG -> Respond | Lists pricing info from KB |
| "Getting 401 API errors" | technical | RAG -> Ticket -> Respond | Answer + ticket ID |
| "I want a refund" | sensitive | Escalation | Escalation message + ticket |
| "Let me speak to a manager" | escalate | Escalation | Immediate human handoff |
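Because the two routing functions are pure Python, the flow column of this table can be checked without any LLM calls or API keys. The snippet below inlines copies of the routing rules from agent.py so it stands alone:

```python
# Standalone copies of the two routing rules from agent.py, so the
# expected-behavior table can be verified without invoking any model.
def route_by_classification(state: dict) -> str:
    if state.get("classification", "general") in ("sensitive", "escalate"):
        return "escalation"
    return "rag_search"

def route_after_rag(state: dict) -> str:
    if state.get("classification", "general") == "technical":
        return "create_ticket"
    return "respond"

for classification, expected_first_hop in [
    ("general", "rag_search"),
    ("technical", "rag_search"),
    ("sensitive", "escalation"),
    ("escalate", "escalation"),
]:
    assert route_by_classification({"classification": classification}) == expected_first_hop

assert route_after_rag({"classification": "technical"}) == "create_ticket"
assert route_after_rag({"classification": "general"}) == "respond"
print("Routing table verified")
```

Only the classifier itself (an LLM call) remains nondeterministic; everything downstream of it is testable in isolation.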
8.8 Testing and Evaluation
Unit Testing Your Agent
# test_agent.py
from agent import support_agent, chat
from langchain_core.messages import HumanMessage

def test_general_question():
    """General questions should not create tickets."""
    result = support_agent.invoke({
        "messages": [HumanMessage(content="What are your pricing plans?")],
        "classification": "",
        "retrieved_context": "",
        "ticket_id": "",
        "escalated": False,
        "response": "",
    })
    assert result["classification"] == "general"
    assert result["escalated"] is False
    assert len(result["response"]) > 0

def test_technical_issue_creates_ticket():
    """Technical issues should create a support ticket."""
    result = support_agent.invoke({
        "messages": [HumanMessage(content="API returning 500 errors on every request")],
        "classification": "",
        "retrieved_context": "",
        "ticket_id": "",
        "escalated": False,
        "response": "",
    })
    assert result["classification"] == "technical"
    assert result["ticket_id"].startswith("TKT-")

def test_sensitive_escalation():
    """Sensitive issues should escalate to a human."""
    result = support_agent.invoke({
        "messages": [HumanMessage(content="I want to delete my account and all my data immediately")],
        "classification": "",
        "retrieved_context": "",
        "ticket_id": "",
        "escalated": False,
        "response": "",
    })
    assert result["escalated"] is True
    assert result["ticket_id"].startswith("TKT-")

def test_session_memory():
    """Agent should remember context within a session."""
    session_id = "test_memory"
    chat(session_id, "My name is Alice and I'm on the Pro plan.")
    response = chat(session_id, "What plan am I on?")
    assert "pro" in response.lower()
Run with:
pytest test_agent.py -v
8.9 Full Working Code Summary
Here is how all the files work together:
1. User sends a message via POST /chat
2. server.py validates the input and calls agent.chat()
3. agent.py loads session history and invokes the LangGraph workflow
4. classify_node uses GPT-4o-mini to categorize the message
5. Routing sends the message to either RAG or escalation
6. rag_search_node queries ChromaDB for relevant product docs
7. For technical issues, create_ticket_node logs a ticket
8. respond_node generates a contextual answer using retrieved docs
9. The response is stored in session memory and returned to the user
This architecture is modular and testable, and each component can be independently improved, swapped, or scaled. For production, replace the in-memory ticket and session stores with a database and persistent session storage.
Key Takeaways
- State machines (LangGraph) give you explicit control over complex agent workflows
- Classification routing lets a cheap, fast model decide where to send each request
- RAG integration grounds responses in actual product knowledge, reducing hallucination
- Human escalation is essential for issues that AI should not handle autonomously
- Session memory creates natural, continuous conversations
- Separation of concerns (agent, knowledge base, tools, server) makes the system maintainable
- Testing agents requires both unit tests and scenario-based integration tests
Exercise: Extend the Capstone
Push your capstone further with these challenges:
- Add a satisfaction survey node that asks the user to rate their experience after the issue is resolved
- Implement conversation summary for long sessions to keep token costs down
- Add a proactive suggestions feature where the agent recommends related documentation
- Deploy the agent to a cloud provider and test it with concurrent users
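As a starting point for the conversation-summary exercise, here is one possible shape for the compaction step, with the LLM call stubbed out. `summarize()` is a placeholder you would replace with a real `llm.invoke()` call; the function and parameter names are illustrative, not part of the course code:

```python
def summarize(messages: list[str]) -> str:
    """Placeholder summarizer; replace with an LLM call in the real exercise."""
    return f"[Summary of {len(messages)} earlier messages]"

def compact_history(history: list[str], keep_recent: int = 6) -> list[str]:
    """Replace everything but the last keep_recent messages with one summary line."""
    if len(history) <= keep_recent:
        return history
    summary = summarize(history[:-keep_recent])
    return [summary] + history[-keep_recent:]

compacted = compact_history([f"msg {i}" for i in range(10)])
print(compacted[0])    # [Summary of 4 earlier messages]
print(len(compacted))  # 7
```

Run the compaction inside `chat()` whenever the history exceeds a threshold, and the token cost of long sessions stays roughly constant.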
Next up: The Epilogue, where we look back at everything you've built and chart your path forward as an AI engineer.

