The don’t make this stuff easy to understand, especially if you are stuck on an older version of python. No guarantee that these are the best methods, but they help me understand the program flow.

Coroutines and Callbacks

Tips for next time:

  • A coroutine is much like a future.
  • A sequence of callbacks on a conventional non blocking interface can be brought into the asyncio world using futures.

e.g. callback_and_coroutines.py

async def hello0():
    print ("Hello0: Enter")
    await _blocking_replaced_by_await()
    print ("Hello0: Complete")

Is equivalent to this (assuming blocking_replaced_by*() functions are the same):

def hello1():
    print ("Hello1: Enter")
    complete = asyncio.Future()
    def _my_callback():
        try:
            complete.set_result(True)
            print ("Hello1: Complete")
        except e:
            complete.set_exception(e)
    _blocking_replaced_by_callback(complete_callback=_my_callback)
    print ("Hello1: Exit")
    return complete

Both functions will work with the code below. It will run them to completion.

An important distinction is exceptions. For the callback/future version ensure all paths are captured by a set_result() or set_exception() or the program can lockup.

print ("Start")
loop = asyncio.get_event_loop()
# Both implementations should produce the same result
loop.run_until_complete(hello0())
loop.run_until_complete(hello1())
print ("End")

Gather just schedules a set of coroutines, futures or tasks.

Futures and ‘awaitable’ methods can be used to control run time, different ways of expressing the tasks have the same result.

  1. Schedule separate tasks via asyncio.ensure_future() and use a future to determine the end of processing.

e.g. gather.py

done=asyncio.Future()
asyncio.ensure_future(worker_task0(done), loop=loop)
asyncio.ensure_future(worker_task1(), loop=loop)
loop.run_until_complete(done)
  1. Gather separate tasks via asyncio.gather() and use the exit of all tasks to determine the end of processing via loop.run_until_complete().
loop = asyncio.get_event_loop()
loop.run_until_complete(    
    asyncio.gather(
        worker_task0(),
        worker_task1()))

Gather can also capture multiple exit conditions.

  • An inner function may perform a task that needs to be waited on.
  • An outer function may also wait on another awaitable coroutine.
  • asyncio.gather can synchronize these so they can be scheduled concurrently.

e.g. gather.py

def _background_work(complete_callback):
    loop = asyncio.get_event_loop()
    loop.call_later(3.0, complete_callback)

def return_future_and_perform_background_work():
    print("return_future_and_perform_background_work starts")
    complete = asyncio.Future()
    _background_work(complete_callback=lambda: complete.set_result(True))
    print("return_future_and_perform_background_work returns")
    return complete

async def another_background_task():
    print("another_background_task starts")
    await asyncio.sleep(2.0)
    print("another_background_task returns")

async def do_multiple_background_tasks():
    print("do_multiple_background_tasks starts")
    task1_done = return_future_and_perform_background_work()
    await asyncio.gather(another_background_task(), task1_done)
    print("do_multiple_background_tasks returns")

loop.run_until_complete(do_multiple_background_tasks())

Event Handling Between Tasks.

Notifications from provider to consumer can use synchronization primitives. Events are simple.

  • A callback driven flow can be converted to an asyncio flow.
  • A consumer task can wait on the event to signal fresh data.
  • The consumer program flow can be structured like a conventional procedural loop.

e.g. events.py

class Consumer:
    def __init__(self):
        self.event = asyncio.Event()
        self.datas = collections.deque()

    def callback_from_producer(self, data):
        self.datas.append(data)
        self.event.set()

    async def pop(self):
        if len(self.datas)==0:
            await self.event.wait()
            self.event.clear()
        return self.datas.popleft()

async def worker_task(consumer):
    while True:
        data = await consumer.pop()
        print(f"Got {data}")
        if data == "data 3":
            return

consumer = Consumer()

loop = asyncio.get_event_loop()
loop.call_later(1.0, lambda: consumer.callback_from_producer("data 1"))
loop.call_later(2.0, lambda: consumer.callback_from_producer("data 2"))
loop.call_later(3.0, lambda: consumer.callback_from_producer("data 3"))

loop.run_until_complete(worker_task(consumer))

Converting a Termination Signal to a Future

Synchronize asynchronous OS signals into the event loop.

  • loop.add_signal_handler() replaces the standard signal.loop.add_signal_handler().
  • This example uses a future to allow a clean up task to await the signal.
  • loop.run_until_complete() can be used to control the program lifespan.

(TODO. There is another way to convert signals to file descriptor writes that is interesting.)

e.g. signal_cleanup.py

async def worker_task(program_complete):
    print("worker task starting, control-c to exit")
    while not program_complete.done():
          await asyncio.sleep(1)
          print("working...")

async def cleanup_task(program_complete):
    status = await program_complete
    print("cleanup...")
    return

program_complete = asyncio.Future()
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, lambda: program_complete.set_result(True))
# Schedule a 'task'
asyncio.ensure_future(worker_task(program_complete), loop=loop)
# Keep the loop running with worker and cleanup until signal is processed and cleaned up.
loop.run_until_complete(cleanup_task(program_complete))

Canceling a Task

It’s easy to lock up an asyncio program with a task that won’t die, or exit without handling the application cleanup when scheduled tasks are not returned to if the loop exits.

Use the Task.cancel() method on a task object, and catch the associated asyncio.CancelledError exception it in the task coroutine.

e.g. cancel.py

async def worker_task():
    print("Worker task starting, control-C to exit")
    try:
        while True:
            await asyncio.sleep(1.0)
            print("working...")
    except asyncio.CancelledError:
        print("Worker task canceled..")


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(worker_task())
    print("Did we get here?")
finally:
    print("Cancel all tasks")
    for task in asyncio.Task.all_tasks():
         task.cancel()
    loop.run_until_complete(asyncio.gather(*asyncio.Task.all_tasks()))
    print("All tasks complete")

Lockup

How to lock up a program.

  • Add signal handlers.
  • Don’t catch exceptions!