Pubsub with asyncio
posted on 2019 Oct 27
Pubsub is one of the most recognizable patterns in computer science, employed as a simple solution to a wide range of problems, with different implementations. As an example, let’s suppose you need to dispatch some data with different computation times to different destinations: surely the first thing that comes to mind is a broker or a simple job queue, and there’s a plethora of battle-tested solutions out there, from Apache Kafka to RabbitMQ to even Redis, or the solid AWS SNS/SQS combination, where topics can be defined on SNS with Lambdas or SQS queues triggered for each message received. Why? Because why not: it gives solid performance and near-unlimited scalability for just a few bucks per month, with little to no maintenance cost besides your business logic; something to consider if you already have part or all of your backend hosted on AWS.
Sometimes, though, that can be overkill, or all we need is a prototype or a proof of concept; in those cases a simple microservice can do well enough at little cost.
Asyncio pubsub
So in our case we’ll face a classic pubsub problem, with a bunch of data sources whose output has to be delivered to multiple consumers. The first solution that comes to mind attacks the problem with a multi-threaded pattern, running as many threads as there are sources, each one with a loop synchronized with timers to dispatch results.
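For comparison, here is a minimal sketch of that thread-per-source pattern. `ThreadedDeliver`, `_source_loop` and the `stop` event are hypothetical names, and subscribers are plain callables rather than objects with an `update` method, just to keep it short. Each source gets its own thread whose loop waits on a timer, and a lock has to guard the shared subscriber map: that synchronization burden is exactly what the single-threaded asyncio versions avoid.

```python
import collections
import threading


class ThreadedDeliver:
    """Hypothetical thread-per-source publisher, for comparison only.

    Subscribers are plain callables here (an assumption to keep the sketch
    short), not objects with an update method.
    """

    def __init__(self):
        self.subscribers = collections.defaultdict(list)
        # The subscriber map is shared between threads, so guard it
        self.lock = threading.Lock()
        # Setting this event stops every source loop
        self.stop = threading.Event()
        self.threads = []

    def subscribe(self, callback, topic):
        with self.lock:
            self.subscribers[topic].append(callback)

    def deliver(self, data, topic):
        # Copy under the lock and call outside it, so a slow subscriber
        # doesn't hold up subscribe() calls coming from other threads
        with self.lock:
            callbacks = list(self.subscribers.get(topic, ()))
        for callback in callbacks:
            callback(data)

    def _source_loop(self, interval, topic):
        # Event.wait acts as an interruptible timer: it returns False on
        # timeout (deliver again) and True once stop is set (exit the loop)
        while not self.stop.wait(interval):
            self.deliver(f"{interval} seconds delivery", topic)

    def start(self, sources):
        # One thread per (interval, topic) source
        for interval, topic in sources:
            thread = threading.Thread(
                target=self._source_loop, args=(interval, topic), daemon=True
            )
            thread.start()
            self.threads.append(thread)

    def shutdown(self):
        self.stop.set()
        for thread in self.threads:
            thread.join()
```

Calling `publisher.start([(5, "5sec-delivery"), (10, "10sec-delivery"), (20, "20sec-delivery")])` would mirror the three sources used below, and `shutdown()` stops and joins every thread.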
Asyncio offers two different approaches to this, both running on a single thread, which makes synchronization and the flow of the application much simpler: leveraging cooperative concurrency with coroutines, or using a synchronous callback-based system.
Cooperative concurrency, coroutine solution
Starting with the coroutine approach, we define a simple AsyncDeliver class which will act as our publisher component, with 3 different sources running once every 5, 10 and 20 seconds respectively; we’ll need 3 different topics for these tasks, which for simplicity we’ll call 5sec-delivery, 10sec-delivery and 20sec-delivery.
Let’s move on with the implementation.
import asyncio
import collections


class AsyncDeliver:
    """Publisher component, delivers data from different sources to subscribed
    consumers. This version uses a defaultdict of lists to track the
    subscribers (our consumers) of each topic.

    Subscriber objects are expected to implement a simple interface with just
    an update method for now.
    """

    def __init__(self):
        self.subscribers = collections.defaultdict(list)

    def subscribe(self, subscriber, topic):
        self.subscribers[topic].append(subscriber)

    def deliver(self, data, topic):
        """Call the update method of each subscriber of the topic. As long as
        the calls are non-blocking there's no need to make them asynchronous,
        but nothing would stop us from doing so if needed.
        """
        # Checking membership first avoids creating empty defaultdict entries
        if topic not in self.subscribers:
            return
        for subscriber in self.subscribers[topic]:
            subscriber.update(data)

    async def run(self):
        """Main entry-point: here we start the 5 and 10 seconds deliveries as
        background tasks, using the 20 seconds one as our "blocking" call.

        When the application stops (e.g. on Ctrl+C, asyncio.run cancels this
        task), shutdown cancels all the remaining tasks; asyncio.run then
        closes the event loop for us.
        """
        # Keep references to the tasks so they can't be garbage collected
        self.tasks = [
            asyncio.create_task(self.deliver_every_5s()),
            asyncio.create_task(self.deliver_every_10s()),
        ]
        try:
            await self.deliver_every_20s()
        finally:
            await self.shutdown()

    async def shutdown(self):
        # Cancel every task but the current one and wait for them to finish.
        # Closing a running loop raises RuntimeError, so stopping and closing
        # it is left to asyncio.run.
        tasks = [task for task in asyncio.all_tasks()
                 if task is not asyncio.current_task()]
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

    async def deliver_every_5s(self):
        while True:
            try:
                await asyncio.sleep(5)
                self.deliver("5 seconds delivery", "5sec-delivery")
            except asyncio.CancelledError:
                break

    async def deliver_every_10s(self):
        while True:
            try:
                await asyncio.sleep(10)
                self.deliver("10 seconds delivery", "10sec-delivery")
            except asyncio.CancelledError:
                break

    async def deliver_every_20s(self):
        while True:
            try:
                await asyncio.sleep(20)
                self.deliver("20 seconds delivery", "20sec-delivery")
            except asyncio.CancelledError:
                break
Let’s define a simple subscriber interface; it only needs an update method. Subclasses can easily add their own logic or different communication types, such as HTTP clients or Kinesis/Kafka producers.
Note that, for the sake of the example, none of the deliver methods block in any way, but it would be trivial to declare the deliver method async as well and await all subscribers in the AsyncDeliver class with the asyncio.gather coroutine.
import abc


class Subscriber(abc.ABC):
    def __init__(self, name):
        self.name = name

    @abc.abstractmethod
    def update(self, data):
        raise NotImplementedError()


class NullSubscriber(Subscriber):
    """Simple subscriber that does nothing but print the updates it
    receives.
    """

    def update(self, data):
        """We expect this method to do non-blocking operations; if it has to
        do blocking I/O, we'd better re-define it as a coroutine method that
        can be awaited.
        """
        print(f"[{self.name}] Received data: {data}")
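As mentioned above, the deliver method could itself become a coroutine that awaits all subscribers through asyncio.gather. Here is a minimal sketch of that variant, assuming subscribers expose an `update` coroutine instead of a plain method; `AsyncGatherDeliver` and `SlowSubscriber` are hypothetical names, and the `asyncio.sleep` stands in for real I/O such as an HTTP request. Because gather runs the updates concurrently, one slow consumer doesn't hold back the others.

```python
import asyncio
import collections


class AsyncGatherDeliver:
    """Hypothetical publisher whose deliver awaits all subscribers at once."""

    def __init__(self):
        self.subscribers = collections.defaultdict(list)

    def subscribe(self, subscriber, topic):
        self.subscribers[topic].append(subscriber)

    async def deliver(self, data, topic):
        if topic not in self.subscribers:
            return []
        # Run every subscriber's update concurrently; with
        # return_exceptions=True a failing subscriber's exception is
        # returned in its slot instead of propagating out of gather
        return await asyncio.gather(
            *(subscriber.update(data) for subscriber in self.subscribers[topic]),
            return_exceptions=True,
        )


class SlowSubscriber:
    """Hypothetical subscriber whose update simulates non-blocking I/O."""

    def __init__(self, name, delay):
        self.name = name
        self.delay = delay

    async def update(self, data):
        await asyncio.sleep(self.delay)  # stand-in for real I/O
        return f"[{self.name}] Received data: {data}"


async def main():
    deliverboy = AsyncGatherDeliver()
    deliverboy.subscribe(SlowSubscriber("Toki", 0.2), "5sec-delivery")
    deliverboy.subscribe(SlowSubscriber("Shu", 0.2), "5sec-delivery")
    return await deliverboy.deliver("5 seconds delivery", "5sec-delivery")


if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)
```

The two updates overlap, so the whole delivery takes roughly 0.2 seconds rather than 0.4, and gather returns the results in subscription order.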
And finally, let’s smoke-test it by running a simple scenario with 3 consumers, Toki, Shu and Raoh (the names clearly have nothing to do with one another).
if __name__ == '__main__':
    deliverboy = AsyncDeliver()
    subscriber_one = NullSubscriber('Toki')
    subscriber_two = NullSubscriber('Shu')
    subscriber_three = NullSubscriber('Raoh')
    deliverboy.subscribe(subscriber_one, "5sec-delivery")
    deliverboy.subscribe(subscriber_two, "10sec-delivery")
    deliverboy.subscribe(subscriber_three, "20sec-delivery")
    try:
        asyncio.run(deliverboy.run())
    except KeyboardInterrupt:
        pass
Callback based approach
In Python, the asyncio implementation supports coroutines as well as callbacks, a design borrowed from the Twisted framework, which predates asyncio. Unsurprisingly, the built-in networking APIs offer both patterns, split into low-level and high-level APIs: the former use a callback approach, while the latter, called streams, wrap those low-level callbacks so they are simple to use as coroutines (streams cover TCP and Unix sockets; UDP is only exposed through the low-level API).
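The idea behind that wrapping can be sketched with nothing but the event loop: a callback-style API (here `loop.call_later`) resolves a Future, and a coroutine simply awaits it. `callback_style` and `coroutine_style` are hypothetical helpers for illustration; the real streams layer is more involved, but follows the same spirit.

```python
import asyncio


def callback_style(loop, delay, value, on_done):
    """Low-level style: the caller hands over a callback to be invoked."""
    loop.call_later(delay, on_done, value)


async def coroutine_style(delay, value):
    """High-level style: wrap the callback API so callers can just await."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # The callback's only job is to resolve the future...
    callback_style(loop, delay, value, future.set_result)
    # ...while the coroutine suspends here until it is resolved
    return await future


async def main():
    results = []
    loop = asyncio.get_running_loop()
    # Callback consumer: the follow-up logic lives in the function passed in
    callback_style(loop, 0.01, "from callback", results.append)
    # Coroutine consumer: the follow-up logic reads top to bottom
    results.append(await coroutine_style(0.02, "from coroutine"))
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```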
import asyncio
import collections


class CallbackDeliver:
    """Callback-based implementation; the subscriber interface stays the
    same. This time we use the synchronous `AbstractEventLoop.call_later`
    call to schedule each callback to run again.
    """

    def __init__(self):
        self.subscribers = collections.defaultdict(list)
        self.event = None

    def subscribe(self, subscriber, topic):
        self.subscribers[topic].append(subscriber)

    def deliver(self, data, topic):
        if topic not in self.subscribers:
            return
        for subscriber in self.subscribers[topic]:
            subscriber.update(data)

    async def run(self):
        loop = asyncio.get_running_loop()
        # Event() picks up the running loop by itself; its `loop` argument
        # was deprecated in Python 3.8 and removed in 3.10
        self.event = asyncio.Event()
        loop.call_later(5, self.deliver_every_5s)
        loop.call_later(10, self.deliver_every_10s)
        loop.call_later(20, self.deliver_every_20s)
        # Keep the loop alive until shutdown sets the event
        await self.event.wait()

    def shutdown(self):
        self.event.set()

    def deliver_every_5s(self):
        self.deliver("5 seconds delivery", "5sec-delivery")
        asyncio.get_running_loop().call_later(5, self.deliver_every_5s)

    def deliver_every_10s(self):
        self.deliver("10 seconds delivery", "10sec-delivery")
        asyncio.get_running_loop().call_later(10, self.deliver_every_10s)

    def deliver_every_20s(self):
        self.deliver("20 seconds delivery", "20sec-delivery")
        asyncio.get_running_loop().call_later(20, self.deliver_every_20s)
To simplify things a bit more, it’s pretty straightforward to move the rescheduling code into a decorator that turns a method into a looping callback.
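import asyncio
import functools


def run_every(seconds):
    """Turn a method into a callback that reschedules itself every `seconds`."""
    def inner_func(func):
        @functools.wraps(func)
        def func_wrapper(self, *args, **kwargs):
            loop = asyncio.get_running_loop()
            func(self, *args, **kwargs)
            # call_later only forwards positional arguments to the callback,
            # so bind everything (kwargs included) with functools.partial
            loop.call_later(
                seconds, functools.partial(func_wrapper, self, *args, **kwargs)
            )
        return func_wrapper
    return inner_func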
This allows us to write our delivery methods without retrieving the running event loop. One detail of the decorator definition worth noting is that in the AbstractEventLoop.call_later call we pass as the callable the func_wrapper defined inside inner_func, not the bare func as we normally would. The reason is simple: the decorator’s logic has to be applied again at each scheduling; if we passed just func, the loop would run the next iteration of the callable without rescheduling it again.
@run_every(5)
def deliver_every_5s(self):
    self.deliver("5 seconds delivery", "5sec-delivery")
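To see concretely why the wrapper, and not the bare func, has to be rescheduled, here is a small sketch comparing both choices with tiny intervals so it finishes quickly. `run_every_broken`, `Counter` and the 0.01-second interval are hypothetical and exist only for this comparison, and arguments are bound with functools.partial. The broken variant schedules the undecorated func, so after the first decorated call the method runs exactly once more and the chain ends; the correct variant keeps rescheduling itself.

```python
import asyncio
import functools


def run_every(seconds):
    """Reschedules the wrapper itself, so every run schedules the next one."""
    def inner_func(func):
        @functools.wraps(func)
        def func_wrapper(self, *args, **kwargs):
            loop = asyncio.get_running_loop()
            func(self, *args, **kwargs)
            loop.call_later(
                seconds, functools.partial(func_wrapper, self, *args, **kwargs)
            )
        return func_wrapper
    return inner_func


def run_every_broken(seconds):
    """Hypothetical broken variant: reschedules the bare func."""
    def inner_func(func):
        @functools.wraps(func)
        def func_wrapper(self, *args, **kwargs):
            loop = asyncio.get_running_loop()
            func(self, *args, **kwargs)
            # The undecorated func schedules nothing itself,
            # so the chain stops after this one extra call
            loop.call_later(seconds, functools.partial(func, self, *args, **kwargs))
        return func_wrapper
    return inner_func


class Counter:
    def __init__(self):
        self.good = 0
        self.broken = 0

    @run_every(0.01)
    def tick_good(self):
        self.good += 1

    @run_every_broken(0.01)
    def tick_broken(self):
        self.broken += 1


async def main():
    counter = Counter()
    loop = asyncio.get_running_loop()
    # Kick off each looping callback once, as CallbackDeliver.run does
    loop.call_soon(counter.tick_good)
    loop.call_soon(counter.tick_broken)
    await asyncio.sleep(0.2)
    return counter.good, counter.broken


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The broken counter always stops at 2, while the good one keeps growing for as long as the loop runs.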
Again, we can run a simple main, which should produce the same output as the previous coroutine-based implementation.
if __name__ == '__main__':
    deliverboy = CallbackDeliver()
    subscriber_one = NullSubscriber('Toki')
    subscriber_two = NullSubscriber('Shu')
    subscriber_three = NullSubscriber('Raoh')
    deliverboy.subscribe(subscriber_one, "5sec-delivery")
    deliverboy.subscribe(subscriber_two, "10sec-delivery")
    deliverboy.subscribe(subscriber_three, "20sec-delivery")
    try:
        asyncio.run(deliverboy.run())
    except KeyboardInterrupt:
        # By now asyncio.run has already cancelled run() and closed the
        # loop; shutdown() is meant to be called from inside the running loop
        pass
And that’s it: these two snippets should give a good comparison of the two methods. Overall, as we can see, there aren’t many differences between them, and both achieve the results we expected. The callback approach is probably neater and requires a bit less code, but it assumes that the deliver method never blocks, so in most cases it calls for some sort of buffering or queue. The coroutine-based approach, instead, can easily be changed to make the deliver method asynchronous too, so that it never blocks the loop.
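One way to add that buffering to the callback approach is sketched below, assuming one asyncio.Queue per subscriber; `QueueSubscriber` and its `consume` coroutine are hypothetical names. The synchronous `update` only calls `put_nowait`, which never blocks and is therefore safe inside a `call_later` callback, while the slow work happens in a separate coroutine that drains the queue at its own pace.

```python
import asyncio


class QueueSubscriber:
    """Hypothetical subscriber: update() buffers, consume() does the work."""

    def __init__(self, name):
        self.name = name
        self.queue = asyncio.Queue()
        self.processed = []

    def update(self, data):
        # Non-blocking hand-off: safe to call from a synchronous callback
        self.queue.put_nowait(data)

    async def consume(self):
        while True:
            data = await self.queue.get()
            await asyncio.sleep(0.01)  # stand-in for slow I/O
            self.processed.append(f"[{self.name}] Received data: {data}")
            self.queue.task_done()


async def main():
    subscriber = QueueSubscriber("Toki")
    consumer = asyncio.create_task(subscriber.consume())
    loop = asyncio.get_running_loop()
    # Simulate a burst of callback-driven deliveries
    for i in range(3):
        loop.call_soon(subscriber.update, f"delivery {i}")
    await asyncio.sleep(0)  # let the scheduled callbacks run
    await subscriber.queue.join()  # wait until every item is processed
    consumer.cancel()
    await asyncio.gather(consumer, return_exceptions=True)
    return subscriber.processed


if __name__ == "__main__":
    print(asyncio.run(main()))
```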