Python AsyncIO
The don’t make this stuff easy to understand, especially if you are stuck on an older version of python. No guarantee that these are the best methods, but they help me understand the program flow.
Coroutines and Callbacks
Tips for next time:
- A coroutine is much like a future.
- A sequence of callbacks on a conventional non blocking interface can be brought into the asyncio world using futures.
e.g. callback_and_coroutines.py
async def hello0():
print ("Hello0: Enter")
await _blocking_replaced_by_await()
print ("Hello0: Complete")
Is equivalent to this (assuming blocking_replaced_by*() functions are the same):
def hello1():
print ("Hello1: Enter")
complete = asyncio.Future()
def _my_callback():
try:
complete.set_result(True)
print ("Hello1: Complete")
except e:
complete.set_exception(e)
_blocking_replaced_by_callback(complete_callback=_my_callback)
print ("Hello1: Exit")
return complete
Both functions will work with the code below. It will run them to completion.
An important distinction is exceptions. For the callback/future
version ensure all paths are captured by a set_result()
or
set_exception()
or the program can lockup.
print ("Start")
loop = asyncio.get_event_loop()
# Both implementations should produce the same result
loop.run_until_complete(hello0())
loop.run_until_complete(hello1())
print ("End")
Gather just schedules a set of coroutines, futures or tasks.
Futures and ‘awaitable’ methods can be used to control run time, different ways of expressing the tasks have the same result.
- Schedule separate tasks via
asyncio.ensure_future()
and use a future to determine the end of processing.
e.g. gather.py
done=asyncio.Future()
asyncio.ensure_future(worker_task0(done), loop=loop)
asyncio.ensure_future(worker_task1(), loop=loop)
loop.run_until_complete(done)
- Gather separate tasks via
asyncio.gather()
and use the exit of all tasks to determine the end of processing vialoop.run_until_complete()
.
loop = asyncio.get_event_loop()
loop.run_until_complete(
asyncio.gather(
worker_task0(),
worker_task1()))
Gather can also capture multiple exit conditions.
- An inner function may perform a task that needs to be waited on.
- An outer function may also wait on another
awaitable
coroutine. asyncio.gather
can synchronize these so they can be scheduled concurrently.
e.g. gather.py
def _background_work(complete_callback):
loop = asyncio.get_event_loop()
loop.call_later(3.0, complete_callback)
def return_future_and_perform_background_work():
print("return_future_and_perform_background_work starts")
complete = asyncio.Future()
_background_work(complete_callback=lambda: complete.set_result(True))
print("return_future_and_perform_background_work returns")
return complete
async def another_background_task():
print("another_background_task starts")
await asyncio.sleep(2.0)
print("another_background_task returns")
async def do_multiple_background_tasks():
print("do_multiple_background_tasks starts")
task1_done = return_future_and_perform_background_work()
await asyncio.gather(another_background_task(), task1_done)
print("do_multiple_background_tasks returns")
loop.run_until_complete(do_multiple_background_tasks())
Event Handling Between Tasks.
Notifications from provider to consumer can use synchronization primitives. Events are simple.
- A callback driven flow can be converted to an asyncio flow.
- A consumer task can wait on the event to signal fresh data.
- The consumer program flow can be structured like a conventional procedural loop.
e.g. events.py
class Consumer:
def __init__(self):
self.event = asyncio.Event()
self.datas = collections.deque()
def callback_from_producer(self, data):
self.datas.append(data)
self.event.set()
async def pop(self):
if len(self.datas)==0:
await self.event.wait()
self.event.clear()
return self.datas.popleft()
async def worker_task(consumer):
while True:
data = await consumer.pop()
print(f"Got {data}")
if data == "data 3":
return
consumer = Consumer()
loop = asyncio.get_event_loop()
loop.call_later(1.0, lambda: consumer.callback_from_producer("data 1"))
loop.call_later(2.0, lambda: consumer.callback_from_producer("data 2"))
loop.call_later(3.0, lambda: consumer.callback_from_producer("data 3"))
loop.run_until_complete(worker_task(consumer))
Converting a Termination Signal to a Future
Synchronize asynchronous OS signals into the event loop.
loop.add_signal_handler()
replaces thestandard signal.loop.add_signal_handler()
.- This example uses a future to allow a clean up task to
await
the signal. loop.run_until_complete()
can be used to control the program lifespan.
(TODO. There is another way to convert signals to file descriptor writes that is interesting.)
e.g. signal_cleanup.py
async def worker_task(program_complete):
print("worker task starting, control-c to exit")
while not program_complete.done():
await asyncio.sleep(1)
print("working...")
async def cleanup_task(program_complete):
status = await program_complete
print("cleanup...")
return
program_complete = asyncio.Future()
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, lambda: program_complete.set_result(True))
# Schedule a 'task'
asyncio.ensure_future(worker_task(program_complete), loop=loop)
# Keep the loop running with worker and cleanup until signal is processed and cleaned up.
loop.run_until_complete(cleanup_task(program_complete))
Canceling a Task
It’s easy to lock up an asyncio program with a task that won’t die, or exit without handling the application cleanup when scheduled tasks are not returned to if the loop exits.
Use the Task.cancel()
method on a task object, and catch the associated
asyncio.CancelledError
exception it in the task coroutine.
e.g. cancel.py
async def worker_task():
print("Worker task starting, control-C to exit")
try:
while True:
await asyncio.sleep(1.0)
print("working...")
except asyncio.CancelledError:
print("Worker task canceled..")
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(worker_task())
print("Did we get here?")
finally:
print("Cancel all tasks")
for task in asyncio.Task.all_tasks():
task.cancel()
loop.run_until_complete(asyncio.gather(*asyncio.Task.all_tasks()))
print("All tasks complete")
Lockup
How to lock up a program.
- Add signal handlers.
- Don’t catch exceptions!
Subscribe via RSS