In our previous blog we saw that requests, the most common library for making web requests, can't be used with asyncio: it doesn't return coroutines, and it makes blocking calls. Those blocking calls stall the single thread that asyncio runs on, making our code sequential again. The alternative is aiohttp, an asynchronous HTTP client/server for asyncio. It makes non-blocking calls all the way down to the socket layer, which lets requests run concurrently within asyncio's single-threaded model.
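To make the contrast concrete, here is a minimal sketch (https://example.com is just a placeholder URL, and it assumes requests is installed): the ticker coroutine stops printing the moment requests.get takes over the event loop's only thread.
import asyncio
import requests  # blocking HTTP client

async def ticker():
    # Should print roughly every 100 ms, but stalls while the
    # event loop's thread is blocked inside requests.get
    for _ in range(5):
        print('tick')
        await asyncio.sleep(0.1)

async def blocking_fetch():
    # requests.get never yields control back to the event loop
    return requests.get('https://example.com').status_code

async def main():
    await asyncio.gather(ticker(), blocking_fetch())

asyncio.run(main())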
Whenever we use a resource, it is important to cleanly close or free it after use, including when an exception occurs. Python handles this with context managers. For files, for example, we use one like this:
with open('example.txt') as f:
    lines = f.readlines()
When the block exits, whether after reading all lines or because an exception was raised, the context manager closes the file.
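Under the hood, the with statement is equivalent to a try/finally block that closes the file by hand:
# What the with block above does, written out manually
f = open('example.txt')
try:
    lines = f.readlines()
finally:
    f.close()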
The catch is that the with context manager only works with synchronous resources. To use a resource asynchronously, we need Python's asynchronous context managers. The syntax is the same as with; the only difference is the async keyword in front. An asynchronous context manager is a class that implements two methods:
__aenter__, which asynchronously acquires a resource.
__aexit__, which asynchronously closes the resource.
To see how this works behind the scenes, let's build a small example class:
import asyncio
import socket

class AsynchronousContextManager:
    def __init__(self, sock):
        self.sock = sock

    async def __aenter__(self):
        # Asynchronously accept an incoming connection
        print('Entering context manager')
        loop = asyncio.get_running_loop()
        self.conn, _ = await loop.sock_accept(self.sock)
        return self.conn

    async def __aexit__(self, exc_type, exc, tb):
        # Close the connection; returning True also suppresses any
        # exception raised inside the async with block
        print('Exiting context manager')
        self.conn.close()
        return True

async def main():
    loop = asyncio.get_running_loop()
    # Create a non-blocking socket server
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(('localhost', 25000))
    sock.listen()
    sock.setblocking(False)
    async with AsynchronousContextManager(sock) as conn:
        data = await loop.sock_recv(conn, 1024)
        print(data)

asyncio.run(main())
Output:
# Terminal 1
python3 async_context_managers.py
Entering context manager
b'Hello!!\r\n'
Exiting context manager
# Terminal 2
telnet localhost 25000
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Hello!!
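Behind the scenes, the async with block inside main() roughly desugars into explicit awaits on the two dunder methods. A sketch of the equivalent code (not the exact bytecode, but the same control flow):
import sys

# Rough equivalent of:
#   async with AsynchronousContextManager(sock) as conn:
#       data = await loop.sock_recv(conn, 1024)
mgr = AsynchronousContextManager(sock)
conn = await mgr.__aenter__()
try:
    data = await loop.sock_recv(conn, 1024)
except BaseException:
    # If __aexit__ returns True, the exception is suppressed
    if not await mgr.__aexit__(*sys.exc_info()):
        raise
else:
    await mgr.__aexit__(None, None, None)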
aiohttp ships with built-in context managers, so we don't have to write our own, but the example above helps in understanding what happens internally.
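For instance, both ClientSession and the response returned by session.get are asynchronous context managers (the URL below is a placeholder):
import aiohttp
import asyncio

async def main():
    # ClientSession is an async context manager: __aexit__ closes
    # the session and its pooled connections
    async with aiohttp.ClientSession() as session:
        # The response is also an async context manager: __aexit__
        # releases the connection back to the pool
        async with session.get('https://example.com') as response:
            print(response.status)

asyncio.run(main())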
aiohttp is built around the concept of a session. Within a session we can keep many connections open and reuse them, a technique called connection pooling. Establishing a connection is a resource-intensive operation, so a pool of connections is maintained and reused instead. Let's see how this works internally by using aiohttp's context managers in an example.
import aiohttp
import asyncio
import logging

# Set up logging to see connection activity
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("aiohttp.client")

async def fetch_data(url, session, session_id):
    # Use the existing session passed in as a parameter
    async with session.get(url) as response:
        logger.info(f"Session {session_id}: Connected to {url}")
        return await response.json()

async def main():
    # Create a single session for all requests; limit=1 caps
    # the pool at one connection
    connector = aiohttp.TCPConnector(limit=1)
    async with aiohttp.ClientSession(connector=connector) as session:
        # Inspect the connector's pool before any requests
        # (_conns is internal state, used here only for demonstration)
        print(f"Initial connections: {len(connector._conns)}")
        # Make multiple requests to the same host to encourage
        # connection reuse
        urls = [
            'https://jsonplaceholder.typicode.com/posts/1',
            'https://jsonplaceholder.typicode.com/posts/2',
            'https://jsonplaceholder.typicode.com/posts/3',
            'https://jsonplaceholder.typicode.com/posts/3',
        ]
        # Run the first batch; with limit=1 the requests share
        # a single connection
        print("First batch:")
        tasks = [fetch_data(url, session, 1) for url in urls]
        results = await asyncio.gather(*tasks)
        for i, result in enumerate(results):
            print(f"Result first batch {i}: {result}")
        # Wait a bit but keep connections alive
        await asyncio.sleep(1)
        print(f"Active connections: {len(connector._conns)}")
        print(f"Connection keys: {list(connector._conns.keys())}")
        # Run a second batch to see the connection being reused
        print("\nSecond batch (should reuse connections):")
        tasks = [fetch_data(url, session, 1) for url in urls]
        results = await asyncio.gather(*tasks)
        for i, result in enumerate(results):
            print(f"Result second batch {i}: {result}")

if __name__ == "__main__":
    asyncio.run(main())
Output:
>> python connection_pooling_aiohttp.py
DEBUG:asyncio:Using selector: KqueueSelector <-- the OS's event notification mechanism
Initial connections: 0
First batch:
INFO:aiohttp.client:Session 1: Connected to https://jsonplaceholder.typicode.com/posts/1
INFO:aiohttp.client:Session 1: Connected to https://jsonplaceholder.typicode.com/posts/2
INFO:aiohttp.client:Session 1: Connected to https://jsonplaceholder.typicode.com/posts/3
INFO:aiohttp.client:Session 1: Connected to https://jsonplaceholder.typicode.com/posts/3
Result first batch 0: {'userId': 1, 'id': 1, 'title': 'sunt aut facere repellat provident occaecati excepturi optio reprehenderit', 'body': 'quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto'}
Result first batch 1: {'userId': 1, 'id': 2, 'title': 'qui est esse', 'body': 'est rerum tempore vitae\nsequi sint nihil reprehenderit dolor beatae ea dolores neque\nfugiat blanditiis voluptate porro vel nihil molestiae ut reiciendis\nqui aperiam non debitis possimus qui neque nisi nulla'}
Result first batch 2: {'userId': 1, 'id': 3, 'title': 'ea molestias quasi exercitationem repellat qui ipsa sit aut', 'body': 'et iusto sed quo iure\nvoluptatem occaecati omnis eligendi aut ad\nvoluptatem doloribus vel accusantium quis pariatur\nmolestiae porro eius odio et labore et velit aut'}
Result first batch 3: {'userId': 1, 'id': 3, 'title': 'ea molestias quasi exercitationem repellat qui ipsa sit aut', 'body': 'et iusto sed quo iure\nvoluptatem occaecati omnis eligendi aut ad\nvoluptatem doloribus vel accusantium quis pariatur\nmolestiae porro eius odio et labore et velit aut'}
Active connections: 1
Connection keys: [ConnectionKey(host='jsonplaceholder.typicode.com', port=443, is_ssl=True, ssl=True, proxy=None, proxy_auth=None, proxy_headers_hash=None)]
Second batch (should reuse connections):
INFO:aiohttp.client:Session 1: Connected to https://jsonplaceholder.typicode.com/posts/1
INFO:aiohttp.client:Session 1: Connected to https://jsonplaceholder.typicode.com/posts/2
INFO:aiohttp.client:Session 1: Connected to https://jsonplaceholder.typicode.com/posts/3
INFO:aiohttp.client:Session 1: Connected to https://jsonplaceholder.typicode.com/posts/3
Result second batch 0: {'userId': 1, 'id': 1, 'title': 'sunt aut facere repellat provident occaecati excepturi optio reprehenderit', 'body': 'quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto'}
Result second batch 1: {'userId': 1, 'id': 2, 'title': 'qui est esse', 'body': 'est rerum tempore vitae\nsequi sint nihil reprehenderit dolor beatae ea dolores neque\nfugiat blanditiis voluptate porro vel nihil molestiae ut reiciendis\nqui aperiam non debitis possimus qui neque nisi nulla'}
Result second batch 2: {'userId': 1, 'id': 3, 'title': 'ea molestias quasi exercitationem repellat qui ipsa sit aut', 'body': 'et iusto sed quo iure\nvoluptatem occaecati omnis eligendi aut ad\nvoluptatem doloribus vel accusantium quis pariatur\nmolestiae porro eius odio et labore et velit aut'}
Result second batch 3: {'userId': 1, 'id': 3, 'title': 'ea molestias quasi exercitationem repellat qui ipsa sit aut', 'body': 'et iusto sed quo iure\nvoluptatem occaecati omnis eligendi aut ad\nvoluptatem doloribus vel accusantium quis pariatur\nmolestiae porro eius odio et labore et velit aut'}
In the output above, the number of active connections is 1 at any point in time, up from the initial zero, which signals connection pooling at work.
Let's also check the output of netstat. Since the example uses a TCPConnector with limit=1, at any point we should see only one connection in use.
Every 0.1s: netstat -tnal | grep 2606:4700 <-- IP prefix obtained from dig on the host
tcp6 0 169 2405:201:5c19:987a:8cfa:92eb:d122:7da9.49657 2606:4700:3030::6815:6001.443 ESTABLISHED
As the output shows, only one connection is established, and it is the one transferring data.
Internally, a call to session.get acquires a connection from the session. The connection is first requested from the pool of existing connections: if an idle connection is available, it is handed out; otherwise a new one is created, subject to the limit configured on the connector. It is the job of the TCPConnector to acquire, release, and create connections. The flow diagram below explains how this works.
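The pooling behaviour can be tuned when constructing the connector. A short sketch of the main knobs aiohttp exposes (the values here are illustrative, not recommendations):
import aiohttp

# Illustrative values; aiohttp's defaults are limit=100,
# limit_per_host=0 (unlimited), keepalive_timeout=15 seconds
connector = aiohttp.TCPConnector(
    limit=100,             # max simultaneous connections in the pool
    limit_per_host=10,     # max connections per (host, port, ssl) key
    keepalive_timeout=30,  # seconds an idle connection stays pooled
)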
aiohttp, together with the patterns above, solves the problem of making concurrent requests. Just as important is how the library handles the rest of the standard web-request machinery; let's explore that in the next post.
All code samples are part of my GitHub repo.