Welcome to the first lesson of Parallelizing Gemini Agentic Systems in Python! In the previous courses, you built a solid foundation by creating an Agent class that can handle conversations, use tools, and even hand off control to other specialized agents using Gemini's API. Now, we're going to take your agent system to the next level by converting your agent to handle multiple concurrent conversations using async/await patterns. By the end, you'll have an agent that can juggle several API calls at once without blocking, dramatically improving the efficiency of your workflows.
Let's start by understanding why we need async in the first place. When you make a regular API call to Gemini, your program stops and waits for the response. This is called "blocking" behavior. If you need to have three separate conversations with Gemini, your program handles them one at a time: start conversation 1, wait for all responses, finish conversation 1, then start conversation 2, and so on.
This sequential approach wastes time. While your program waits for Gemini to respond to conversation 1, it could be starting conversation 2 or 3. Network calls and API processing take time, but your CPU sits idle during these waits.
Async programming solves this problem. When you make an async API call, your program can continue doing other work while waiting for the response. Think of it like a restaurant: a synchronous waiter takes one order, goes to the kitchen, waits for the food, delivers it, and only then takes the next order. An async waiter takes multiple orders, sends them all to the kitchen, and delivers each meal as it becomes ready. The kitchen (Gemini's API) processes multiple requests in parallel, and your program efficiently manages all of them.
This async approach is particularly effective for I/O-bound operations like API calls, network requests, and database queries, where most of the time is spent waiting rather than computing. CPU-heavy tasks still block the event loop and may require multiprocessing or separate threads.
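The difference is easy to see in a small timing sketch. Here asyncio.sleep stands in for a network wait; the function names and the 0.2-second delay are illustrative assumptions and are not part of the lesson's agent code.

```python
import asyncio
import time


async def fake_request(name: str, delay: float = 0.2) -> str:
    """Stand-in for an I/O-bound call; asyncio.sleep simulates the network wait."""
    await asyncio.sleep(delay)
    return f"{name} done"


async def run_sequentially(names: list[str]) -> list[str]:
    # Each await finishes before the next request starts.
    return [await fake_request(n) for n in names]


async def run_concurrently(names: list[str]) -> list[str]:
    # All requests wait at the same time; total time is roughly one delay.
    return await asyncio.gather(*(fake_request(n) for n in names))


def timed(coro) -> tuple[list[str], float]:
    """Run a coroutine to completion and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    names = ["conversation 1", "conversation 2", "conversation 3"]
    _, seq_time = timed(run_sequentially(names))
    _, con_time = timed(run_concurrently(names))
    print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

With three simulated requests, the sequential version should take about three delays and the concurrent version about one, although exact timings vary by machine.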
For agent systems, this means a single agent can manage multiple conversations simultaneously, or you can run several agents in parallel. This becomes especially powerful when agents need to make multiple tool calls or coordinate with other agents through handoffs. Let's start by making the necessary changes to your Agent class.
The client.models.generate_content call we have been using from the Gemini API client (google.genai) is synchronous: it blocks until the response arrives. We can still make our agent async by using Python's asyncio.to_thread() function. This function runs synchronous code in a separate thread, allowing the event loop to continue processing other tasks while waiting for the result.
This pattern is often referred to as a synchronous wrapper, as it allows us to "wrap" blocking code so it can be used within an asynchronous workflow without stopping the entire program.
Here's how we use it in the run method:
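The sketch below shows the pattern in isolation. Because a real Gemini call needs credentials, blocking_generate is a hypothetical stand-in that uses time.sleep to mimic a blocking client.models.generate_content call; only asyncio.to_thread is the real mechanism being demonstrated.

```python
import asyncio
import threading
import time


def blocking_generate(prompt: str) -> str:
    """Hypothetical stand-in for a blocking SDK call such as generate_content."""
    time.sleep(0.2)  # simulates waiting on the network
    return f"[{threading.current_thread().name}] reply to: {prompt}"


async def ask(prompt: str) -> str:
    # to_thread runs the blocking function in a worker thread and returns
    # an awaitable, so the event loop stays free while the call is pending.
    return await asyncio.to_thread(blocking_generate, prompt)


async def demo() -> list[str]:
    # Two blocking calls overlap because each one runs in its own thread.
    return await asyncio.gather(ask("first"), ask("second"))


if __name__ == "__main__":
    for line in asyncio.run(demo()):
        print(line)
```

The thread name in each reply shows that the blocking function ran outside the main thread.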
The key line is await asyncio.to_thread(client.models.generate_content, model=..., contents=messages, config=...). Instead of calling client.models.generate_content() directly, which would block the entire event loop, we wrap it with asyncio.to_thread(). This tells Python: "Run this synchronous function in a separate thread, and let me know when it's done." While that thread waits for Gemini's response, the event loop can switch to other tasks, like processing another conversation.
The await keyword is crucial here. It tells Python that this operation will take time and that the event loop should be free to do other work. When the API call completes, Python automatically resumes execution at this point with the response.
Now, let's look at the complete async run method. The method signature changes to include the async keyword, and we use await for any operations that might take time:
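As a reduced sketch of what such a method can look like, the example below keeps only the awaited model call and omits the tool and handoff loop. FakeModels is a hypothetical stub that mimics the shape of client.models so the code runs without an API key; the model name and the returned text are assumptions made for illustration.

```python
import asyncio
from types import SimpleNamespace


class FakeModels:
    """Hypothetical stub mimicking the shape of client.models (no network)."""

    def generate_content(self, model, contents, config=None):
        # Echo the last message so the example runs without an API key.
        return SimpleNamespace(text=f"echo: {contents[-1]}")


class Agent:
    def __init__(self, client, model: str = "stub-model"):
        self.client = client
        self.model = model

    async def run(self, prompt: str) -> str:
        messages = [prompt]
        # The blocking call runs in a worker thread; await yields to the loop.
        response = await asyncio.to_thread(
            self.client.models.generate_content,
            model=self.model,
            contents=messages,
            config=None,
        )
        return response.text


if __name__ == "__main__":
    client = SimpleNamespace(models=FakeModels())
    print(asyncio.run(Agent(client).run("What is 2 + 2?")))
```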
The async def keyword at the beginning tells Python this is an asynchronous function. Inside the method, we use await before asyncio.to_thread() when making the API call. When your code reaches that await, Python's event loop is free to switch to other work until Gemini's response arrives.
Next, we need to update our tool execution to be asynchronous. Let's modify the _call_tool method:
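One possible shape for this, written as a standalone function rather than a method, is sketched below. The TOOLS registry, the add tool, and the dictionary returned to the caller are assumptions chosen for illustration; the lesson's own _call_tool may package results differently.

```python
import asyncio


def add(a: float, b: float) -> float:
    """Example synchronous tool (hypothetical)."""
    return a + b


# Hypothetical registry mapping tool names to plain synchronous functions.
TOOLS = {"add": add}


async def call_tool(name: str, args: dict) -> dict:
    """Run a synchronous tool in a worker thread and package the result."""
    fn = TOOLS.get(name)
    if fn is None:
        return {"name": name, "error": f"unknown tool: {name}"}
    try:
        result = await asyncio.to_thread(fn, **args)
        return {"name": name, "result": result}
    except Exception as exc:  # report the failure instead of raising
        return {"name": name, "error": str(exc)}


if __name__ == "__main__":
    print(asyncio.run(call_tool("add", {"a": 2, "b": 3})))
```

Returning errors as data rather than raising is a design choice in this sketch: it lets the agent report a failed tool call back to the model instead of ending the conversation.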
The key change here is using the synchronous wrapper pattern again: wrapping the tool function call with await asyncio.to_thread(fn, **args). Even though our math functions execute quickly, this pattern is important for tools that might make external API calls or perform I/O operations. By using asyncio.to_thread(), we ensure that even synchronous tools don't block the event loop.
In the run method, we create tasks for multiple tool calls and use asyncio.gather() to execute them concurrently:
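A minimal sketch of that step follows. slow_square is a hypothetical tool that sleeps briefly to imitate I/O, and run_tool_calls is an illustrative helper name; in the lesson this logic lives inside the run method.

```python
import asyncio
import time


def slow_square(x: int) -> int:
    """Hypothetical tool that blocks briefly, like a tool doing I/O."""
    time.sleep(0.2)
    return x * x


async def call_tool(fn, args: dict):
    # Each synchronous tool runs in its own worker thread.
    return await asyncio.to_thread(fn, **args)


async def run_tool_calls(calls: list[tuple]) -> list:
    # Build one coroutine per requested call, then run them together.
    pending = [call_tool(fn, args) for fn, args in calls]
    return await asyncio.gather(*pending)


if __name__ == "__main__":
    calls = [(slow_square, {"x": n}) for n in (2, 3, 4)]
    print(asyncio.run(run_tool_calls(calls)))  # prints [4, 9, 16]
```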
This means if Gemini requests multiple tool calls in a single response, they can all execute concurrently rather than sequentially.
When your agent hands off control to another agent, it needs to wait for that agent to complete its work. Since the other agent's run() method is now async, we need to make the _call_handoff method async as well:
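The sketch below illustrates the idea with a reduced, hypothetical Agent whose model call is replaced by asyncio.sleep. The handoffs dictionary, the agent names, and the returned strings are assumptions made so the example runs on its own.

```python
import asyncio
from typing import Optional


class Agent:
    """Reduced, hypothetical agent; the model call is replaced by a stub."""

    def __init__(self, name: str, handoffs: Optional[dict] = None):
        self.name = name
        self.handoffs = handoffs or {}

    async def run(self, prompt: str) -> str:
        await asyncio.sleep(0.01)  # stands in for the awaited model call
        return f"{self.name} handled: {prompt}"

    async def _call_handoff(self, target_name: str, prompt: str) -> str:
        target = self.handoffs.get(target_name)
        if target is None:
            return f"unknown handoff target: {target_name}"
        # run() is a coroutine function, so its result must be awaited.
        return await target.run(prompt)


if __name__ == "__main__":
    specialist = Agent("math_specialist")
    triage = Agent("triage", handoffs={"math_specialist": specialist})
    print(asyncio.run(triage._call_handoff("math_specialist", "solve x + 1 = 3")))
```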
The key change here is adding async to the method definition and using await when calling the target agent's run() method. This allows the handoff to happen asynchronously. If the target agent needs to make multiple API calls or use tools, that wait no longer blocks the whole program: the event loop can switch to other work while the handoff completes.
Before we run multiple conversations concurrently, we need to set up our imports and tools. The key addition here is importing asyncio, which provides the tools for running async code:
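A setup along these lines might look like the sketch below. The three math tools and the TOOLS registry are hypothetical examples rather than the lesson's exact tool set, and the tool schemas passed to Gemini are omitted.

```python
import asyncio
import math


def sum_numbers(a: float, b: float) -> float:
    """Return a + b."""
    return a + b


def multiply_numbers(a: float, b: float) -> float:
    """Return a * b."""
    return a * b


def square_root(x: float) -> float:
    """Return the non-negative square root of x."""
    if x < 0:
        raise ValueError("square_root requires a non-negative number")
    return math.sqrt(x)


# Name-to-function map an agent could use to dispatch tool calls.
TOOLS = {
    "sum_numbers": sum_numbers,
    "multiply_numbers": multiply_numbers,
    "square_root": square_root,
}


async def smoke_test() -> float:
    # Tools stay synchronous; asyncio.to_thread adapts them for async callers.
    return await asyncio.to_thread(TOOLS["square_root"], 16)


if __name__ == "__main__":
    print(asyncio.run(smoke_test()))  # prints 4.0
```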
With our tools ready, we can now define an async main function that creates and runs multiple conversations concurrently:
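The following sketch shows the overall shape of such a main function. StubAgent is a hypothetical stand-in for the lesson's Agent so that no API calls are made, and the two prompts are illustrative examples of an arithmetic problem and a quadratic equation.

```python
import asyncio


class StubAgent:
    """Hypothetical stand-in for the lesson's Agent; no API calls are made."""

    async def run(self, prompt: str) -> str:
        await asyncio.sleep(0.1)  # simulates waiting on the model
        return f"answer to: {prompt}"


async def main() -> list[str]:
    prompts = [
        "What is (12 + 8) * 3?",
        "Find the roots of x^2 - 5x + 6 = 0.",
    ]
    agent = StubAgent()
    # Calling run() without await only creates coroutine objects.
    tasks = [agent.run(p) for p in prompts]
    # gather schedules them all and returns results in input order.
    results = await asyncio.gather(*tasks)
    for prompt, result in zip(prompts, results):
        print(f"{prompt} -> {result}")
    return results


if __name__ == "__main__":
    asyncio.run(main())
```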
We start by creating a list of prompts that we want to process concurrently, then initialize our agent with the necessary tools and schemas. The key part is creating a list of tasks by calling agent.run() for each prompt. Notice that we don't use await here yet: each call returns a coroutine object (a promise of future work) but doesn't start executing immediately.
Then we use asyncio.gather(*tasks) with await. The gather() function takes all our tasks and runs them concurrently. It waits for all of them to complete and returns their results in the same order as the input tasks. While one conversation waits for an API response, another conversation can make progress.
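The ordering guarantee can be checked with a small experiment. In this sketch, finish_after and the delays are illustrative assumptions: the first coroutine deliberately finishes last, yet its result still comes first in the returned list.

```python
import asyncio


async def finish_after(label: str, delay: float, log: list) -> str:
    await asyncio.sleep(delay)
    log.append(label)  # record the actual completion order
    return label


async def demo() -> tuple[list, list]:
    completion_order: list = []
    results = await asyncio.gather(
        finish_after("slow", 0.2, completion_order),
        finish_after("fast", 0.05, completion_order),
    )
    return results, completion_order


if __name__ == "__main__":
    results, completion_order = asyncio.run(demo())
    print("results:", results)                    # ['slow', 'fast']
    print("completion order:", completion_order)  # ['fast', 'slow']
```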
Finally, we need an entry point that creates the event loop and runs our async main function:
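A minimal version of that entry point is sketched here, with a placeholder main() standing in for the real one.

```python
import asyncio


async def main() -> None:
    await asyncio.sleep(0)  # placeholder for the real async work
    print("all conversations finished")


if __name__ == "__main__":
    # asyncio.run creates the event loop, runs main(), then closes the loop.
    asyncio.run(main())
```

The __main__ guard keeps the event loop from starting when the module is imported by other code.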
The asyncio.run(main()) function creates an event loop, runs our main() function, and handles cleanup when everything completes. This is the standard way to start an async Python program.
When we run this code, you'll see output that demonstrates the concurrent execution:
Notice how the tool calls from both conversations are interleaved. This shows that both conversations are running concurrently. The agent switches between them as it waits for API responses, making efficient use of time. The first conversation solves the arithmetic problem while the second finds the roots of the quadratic equation, and both complete much faster than if they ran sequentially.
Now that we've seen concurrent execution in action, let's reflect on what we've achieved and what patterns we're using. By converting our agent to use async/await with asyncio.to_thread(), we've enabled a single agent instance to manage multiple conversations simultaneously. We can now process different user requests in parallel without creating multiple agent instances. In our example above, one agent handles both math problems at the same time, switching between them efficiently while waiting for API responses.
What would have happened if we ran these conversations using our original synchronous code? In a standard single-threaded script, they would execute one after another, waiting for each API response before starting the next conversation. While one could manually manage a complex pool of threads or processes to achieve concurrency, asyncio provides a structured and efficient way to handle these I/O-bound tasks. By using asyncio.to_thread, we are essentially delegating the blocking API calls to separate threads automatically, allowing our program to "wait" on multiple fronts simultaneously without getting stuck.
An interesting aspect of our implementation is that each tool call runs through await asyncio.to_thread(fn, **args) and the calls are combined with asyncio.gather(), which means individual tools within a single conversation can execute concurrently when Gemini requests multiple tool calls at once. You can see this in the output: when Gemini asks for multiple calculations, they can all start processing without waiting for each other to complete.
However, there's still room for optimization in how we coordinate multiple agents and handle complex workflows. In the next lesson, we'll explore patterns for orchestrating multiple specialized agents working together, allowing different agents to process different parts of a complex task simultaneously.
You've successfully converted your agent system to use async/await patterns by wrapping synchronous Gemini API calls with asyncio.to_thread(), adding async and await to the run() and _call_handoff() methods, and using asyncio.gather() to run multiple conversations in parallel. This async foundation is crucial for building more advanced parallel agent systems, and in the upcoming practice exercises, you'll implement concurrent agent workflows and explore different patterns for coordinating multiple agents.
