Integrating AutoGen Agents into Your Web Application (FastAPI + WebSockets + Queues)
Issue #19 | How to build your own web application that integrates Generative AI Agents
Suppose you have successfully prototyped your desired agent workflow (e.g., a scenario with two or more agents), and you are already observing great results—excellent! The likely next step is to wrap this workflow into an API that can be integrated into a web application. In addition, if your application is interactive, chances are high that you want to stream intermediate updates from agents to a user interface as the task progresses.
This post provides a walkthrough of the steps to create a web API with support for REST calls and for streaming agent actions over a WebSocket.
TL;DR
Create a Class to wrap your agents - a WorkflowManager
Keep track of agent history, provide a summary
Integrate into a FastAPI REST endpoint
Stream agent actions/responses via WebSockets
Improve performance (multiple workers) with Gunicorn
Note: The general approach described in this post is used in implementing the streaming features in AutoGen Studio.
Step 1 - Wrap Your Agents into a Single Class
This involves defining your agent workflow as you would typically do in AutoGen. Then, encapsulate the implementation within a class method capable of accepting input and executing your agent workflow—for instance, by invoking the `initiate_chat` method.
We can name this class `WorkflowManager` and add a `run` method that encapsulates our agent implementation.
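For illustration, here is a minimal sketch of what such a class might look like, assuming a simple assistant plus user-proxy workflow; the agent names, `llm_config`, and code-execution settings are placeholders you would adapt to your own setup:

import autogen

class WorkflowManager:
    def __init__(self, llm_config: dict):
        self.llm_config = llm_config
        self.agent_history = []

    def run(self, query: str) -> str:
        # Define the agent workflow as you normally would in AutoGen.
        assistant = autogen.AssistantAgent(name="assistant", llm_config=self.llm_config)
        user_proxy = autogen.UserProxyAgent(
            name="user_proxy",
            human_input_mode="NEVER",
            code_execution_config={"work_dir": "scratch", "use_docker": False},
        )
        # Kick off the agent conversation for this query.
        user_proxy.initiate_chat(assistant, message=query)
        # Return the last message for now; Step 3 discusses better options.
        return user_proxy.last_message(assistant)["content"]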
Step 2 - Keep Track of Agent History
AutoGen provides a `register_reply` method where specific functions can be run when an agent receives a message. You can leverage this mechanism to keep track of all messages sent by agents in your application:
def update_history(recipient, messages, sender, config):
    # Record the latest message exchanged between the two agents.
    self.agent_history.append(
        {"sender": sender.name, "receiver": recipient.name, "message": messages[-1]}
    )
    # Returning (False, None) lets the agent's normal reply generation continue.
    return False, None

agent.register_reply(
    [autogen.Agent, None],
    reply_func=update_history,
    config={"callback": None},
)
Define an `update_history` function that takes the latest message and appends it to a list.
For each of your agents, register `update_history` to run whenever the agent receives a message.
At this point, we have a history of messages sent by agents whenever a chat is initiated.
Step 3: Summarize and Return Agent History
Most applications that integrate Large Language Models (LLMs) follow a query-response setup; that is, the user submits a request and receives a response that addresses the query. In the process, various operations may occur, such as query expansion, retrieval of additional context (e.g., using Retrieval-Augmented Generation or RAG), and post-processing or verification before delivering the final response. Typically, the outcome is a single response.
However, there are significant differences between using an LLM and using agents to address tasks in your application, particularly in the output structure:
LLM: The output is typically a straightforward text response or an array of text responses. For example, in response to the query "What is the height of the Eiffel Tower?" the response might be "X meters."
Agents: The output is usually a history of actions taken by each agent. For instance, in response to "What is the stock price of NVIDIA?" the response could be a sequence like ["Agent 1: wrote some code", "Agent 2: executed the code... there was an error due to a missing library", "Agent 1: rewrote the code to fix the error", "Agent 2: executed the code... success", "Agent 2: the final answer is 1300 USD. Task complete"].
The important consideration here is how to represent the "agent history" as a single outcome for the user in your application. Users typically are not concerned with the internal workings of your agents; they are interested in the final response.
There are a few common approaches:
Summarize [recommended]: Add a summarization step to distill the final answer from the agent history using an LLM. This introduces some latency but provides the best end-user experience.
Return last message: The last message may not contain the most crucial information or the actual answer.
Return full agent history: This approach does not typically result in a good user experience.
A summarization approach is recommended.
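As a rough sketch, the summarization step could look something like the following, assuming the `agent_history` list from Step 2 and an OpenAI-compatible client (the model name and prompts are placeholders):

from openai import OpenAI

client = OpenAI()

def summarize_history(agent_history: list, query: str) -> str:
    # Flatten the recorded agent messages into a readable transcript.
    transcript = "\n".join(
        f"{m['sender']} -> {m['receiver']}: {m['message']}" for m in agent_history
    )
    completion = client.chat.completions.create(
        model="gpt-4o-mini",  # placeholder model name
        messages=[
            {
                "role": "system",
                "content": "Summarize the agent conversation into a single, direct answer to the user's query.",
            },
            {"role": "user", "content": f"Query: {query}\n\nAgent history:\n{transcript}"},
        ],
    )
    return completion.choices[0].message.content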
Step 4: Integrating with a FastAPI REST Endpoint
This section is relatively straightforward. Follow these steps to integrate your `WorkflowManager` into a FastAPI application:
1. Create your FastAPI application as you normally would.
2. Create an instance of `WorkflowManager`.
3. Define a REST endpoint that can accept a user query, invoke the `run` method of `WorkflowManager`, and return the response.
Here's an example of how you might implement this in code:
from fastapi import FastAPI, HTTPException
from your_application import WorkflowManager

app = FastAPI()
manager = WorkflowManager()

@app.post("/query")
def handle_query(user_query: str):
    try:
        # A plain (sync) endpoint: FastAPI runs it in a worker threadpool,
        # so the blocking agent workflow does not stall the event loop.
        response = manager.run(user_query)
        return {"response": response}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
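You can then call the endpoint like any other REST API; for example, assuming the application is running locally on port 8000:

import requests

resp = requests.post(
    "http://localhost:8000/query",
    params={"user_query": "What is the stock price of NVIDIA?"},
)
print(resp.json()["response"])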
Step 5: Enhancing User Experience with Sockets and Streaming
In the previous section, we discussed returning agent responses through a REST endpoint, which is suitable for batch or offline tasks. However, for interactive applications, users may find it frustrating to wait for agents to complete their conversation, which can take an arbitrary amount of time (from seconds to minutes). To mitigate this, streaming intermediate agent actions or state to the user interface (UI) as they are generated can significantly improve the user experience. This concept is not unique to agents; it also applies to applications integrating Large Language Models (LLMs).
With AutoGen agents, we can:
Stream intermediate agent messages, although there may be some latency as each message could take a few seconds or minutes.
Stream all tokens from an LLM as they become available, noting that there may still be latency from agent actions unrelated to LLMs, such as code execution.
In the next section, we will focus on streaming agent messages. However, to implement both, it is useful to review some notes on streaming over websockets, threads, and blocking:
Websockets for UI Streaming
To stream messages, we need a websocket with a publisher and subscriber model:
- The publisher places messages on the socket.
- The subscriber (the UI) listens on the websocket and displays messages as they become available.
FastAPI integrates well with the Python websockets library, and we can rapidly set up a WebSocket server. Our front-end UI can then connect to this server endpoint and receive messages as they arrive. On the other side, agents should be able to “view” open connections and selectively send messages to connected UI clients.
Streaming Messages
For real-time streaming, messages should be sent as soon as they are available. For example, when `WorkflowManager.run()` is called, we expect a message to be sent on the websocket each time an agent completes a turn.
The Challenge with Single-Threaded Applications
FastAPI runs on a single-threaded asyncio event loop by default, so code in a handler executes sequentially. If a synchronous operation blocks that thread, such as waiting for an agent to complete its turn, the application cannot do anything else until the blocking operation finishes. This means that while `WorkflowManager.run()` is executing, the application cannot send websocket messages because the thread (and event loop) is blocked.
The Solution: Asynchronous Operations and Threading
There are two main solutions to this challenge: using Python's `asyncio` library or threading.
Using `asyncio` with FastAPI
1. Define `WorkflowManager.run()` as an asynchronous function with `async def run(self, ...)` and use `await` for blocking calls.
2. Modify the websocket handler in FastAPI to establish an asynchronous websocket connection and send messages.
3. Execute `WorkflowManager.run()` as a separate asynchronous task, allowing the main thread to continue processing and sending websocket messages.
This approach leverages the asynchronous capabilities of FastAPI and `asyncio`, but it requires that all operations within `WorkflowManager.run()` be compatible with asynchronous execution. This can be hard to achieve in practice as many libraries or coroutines used within your application may not implement truly async methods.
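For illustration, a rough sketch of this approach might look like the following; it assumes `WorkflowManager.run` is fully async and accepts a `send_message` callback that it awaits after each agent turn (that signature is an assumption for this sketch, not part of AutoGen):

import asyncio
from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def websocket_stream(websocket: WebSocket):
    await websocket.accept()
    query = await websocket.receive_text()
    manager = WorkflowManager()
    # Run the workflow as a separate task; because run() is async and awaits the
    # send_message callback, intermediate messages reach the websocket as they occur.
    task = asyncio.create_task(manager.run(query, send_message=websocket.send_text))
    final_response = await task
    await websocket.send_text(final_response)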
Using Threading and Queues (Recommended):
1. Create a thread-safe queue.
2. Create a background thread that monitors the queue and sends messages using the FastAPI socket.
3. Pass the queue to `WorkflowManager` so it can write to the queue whenever an agent message is available.
This setup allows a background process to monitor a thread-safe shared queue and broadcast messages to websockets, while agents write updates to the queue as they occur.
**Example Code:**
from fastapi import FastAPI, WebSocket
from threading import Thread
from queue import Queue, Empty
import asyncio

app = FastAPI()
queue = Queue()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        try:
            # Non-blocking read so an empty queue does not stall the event loop.
            message = queue.get_nowait()
            await websocket.send_text(message)
        except Empty:
            # Yield control back to the event loop before polling again.
            await asyncio.sleep(0.1)

def agent_interaction_workflow(queue):
    # Placeholder for agent interaction logic.
    # This function would run the agent workflow and put messages on the queue.
    pass

@app.on_event("startup")
async def startup_event():
    # Start the background thread for agent interactions.
    thread = Thread(target=agent_interaction_workflow, args=(queue,), daemon=True)
    thread.start()
In this example, we have a FastAPI application with a websocket endpoint for streaming messages. A background thread is started to handle agent interactions and place messages in the queue, which are then sent to connected websockets. AutoGen agents can also write to this queue by passing the queue object as an argument to `WorkflowManager`.
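One way to wire this up is to pass the queue into the `WorkflowManager` constructor and push each agent message onto it from the `update_history` callback described in Step 2. A rough sketch (the constructor and method names here are illustrative, not a fixed API):

import json
from queue import Queue
from typing import Optional

class WorkflowManager:
    def __init__(self, message_queue: Optional[Queue] = None):
        # The queue is optional so the manager still works for plain REST calls.
        self.message_queue = message_queue
        self.agent_history = []

    def _on_agent_message(self, sender_name: str, receiver_name: str, message):
        record = {"sender": sender_name, "receiver": receiver_name, "message": message}
        self.agent_history.append(record)
        if self.message_queue is not None:
            # Serialize so the websocket loop can forward it with send_text().
            self.message_queue.put(json.dumps(record, default=str))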
By implementing these solutions, we can provide a more responsive and engaging user experience for applications that require real-time interaction with agents.
Improving Performance (Multiple Workers) with Gunicorn
In the setup above, things work well when FastAPI is launched with Uvicorn and a single worker. However, when Uvicorn is launched with multiple workers, some side effects arise; for example, each worker appears to maintain only a read-only copy of the active connection list (the list grows with each connection instead of disconnected connections being actively removed).
Furthermore, for better performance, what you really need is a setup that can utilize multiple CPU cores for true parallel processing, not just multiple concurrent threads within a single process.
We can address these issues (and gain other benefits) by using Gunicorn. Gunicorn is a mature, fully featured server and process manager that also enables utilization of the available CPU cores.
pip install gunicorn
gunicorn main:app -w $((2 * $(getconf _NPROCESSORS_ONLN) + 1)) --timeout 12600 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:80
Here, `main:app` is the import path of your FastAPI application; replace it with the module and variable name used in your project.
The expression `$((2 * $(getconf _NPROCESSORS_ONLN) + 1))` calculates the number of worker processes for the Gunicorn server based on the number of available CPU cores. It first gets the number of CPU cores using the `getconf _NPROCESSORS_ONLN` command, then multiplies that number by 2 and adds 1, following the general recommendation for the Gunicorn worker count (2 * CPU cores + 1); on an 8-core machine, for example, this yields 17 workers. Given that we are working with agents here, we want a generous timeout so that Gunicorn does not kill the worker process while the agents are still running ... yikes!
Nevertheless, as of now, Uvicorn's capabilities for handling worker processes are more limited than Gunicorn's. So, if you want to have a process manager at this level (at the Python level), then it might be better to try with Gunicorn as the process manager.
Source: FastAPI docs.
Overall, I recommend using Gunicorn as the main process manager. Also see the note above from the FastAPI creator on using Gunicorn as the main process manager.
Extra Credit
There are a few things you can do to make your application even more interesting!
Create a data model for messages sent on the socket. This makes your socket+queue setup a message bus for sharing many types of updates (see the sketch after this list).
Modify the LLM client in Autogen to also stream tokens for even better UX.
Create a database layer to persist interactions with your agents into sessions that can be resumed
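As an example of the first point, a Pydantic model along these lines could serve as the message schema (the field names are illustrative, not the AutoGen Studio schema):

from datetime import datetime, timezone
from typing import Literal
from pydantic import BaseModel, Field

class SocketMessage(BaseModel):
    type: Literal["agent_message", "status", "result", "error"] = "agent_message"
    sender: str
    receiver: str
    content: str
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

# Usage: serialize before putting on the queue so the websocket loop can forward it.
# queue.put(SocketMessage(sender="assistant", receiver="user_proxy", content="...").model_dump_json())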
Hint: The methods proposed above are used to implement AutoGen Studio.
Conclusion
This article provides a walkthrough of creating a web API for running tasks with AutoGen agents. The steps include creating a `WorkflowManager` class to wrap your agents, keeping track of agent history, summarizing agent history, integrating with a FastAPI REST endpoint, streaming agent messages over websockets, and improving performance with Gunicorn. Finally, it highlights the benefits of using Gunicorn as the main process manager.