Working with Async code

Photons provides a number of utilities for working with Python async code. These are in the photons_app.helpers module. It is a Photons convention to import this module with the alias hp and access functionality from that:

from photons_app import helpers as hp


async def my_coroutine():
    pass

task = hp.async_as_background(my_coroutine())

Making tasks

These functions are for turning coroutines into tasks. In asyncio Python a coroutine isn’t put onto the loop until it’s either turned into a task or awaited on.
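To make that distinction concrete, here is a small sketch that uses only the standard library asyncio module rather than any Photons helper. The names demo_scheduling and work are made up for this illustration. It records the order of events to show that creating the coroutine object runs nothing, and that the body only executes once the task gets a turn on the loop.

```python
import asyncio


async def demo_scheduling():
    events = []

    async def work():
        events.append("work ran")

    # Calling the coroutine function only creates a coroutine object
    coro = work()
    events.append("coroutine created")

    # Wrapping it in a task schedules it, but it has not run yet
    task = asyncio.ensure_future(coro)
    events.append("task created")

    # Awaiting gives control back to the loop, which runs the task
    await task
    events.append("task awaited")
    return events


if __name__ == "__main__":
    print(asyncio.run(demo_scheduling()))
```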

photons_app.helpers.async_as_background(coroutine, silent=False)

Create a task with reporter() as a done callback and return the created task. If silent=True then use silent_reporter().

This is useful because if a task exits with an exception, but nothing ever retrieves that exception then Python will print annoying warnings about this.

from photons_app import helpers as hp


async def my_func():
    await something()

# Kick off the function in the background
hp.async_as_background(my_func())

async photons_app.helpers.async_with_timeout(coroutine, *, timeout=10, timeout_error=None, silent=False, name=None)

Run a coroutine as a task until it’s complete or times out.

If time runs out the task is cancelled.

If timeout_error is defined, that is raised instead of asyncio.CancelledError on timeout.

from photons_app import helpers as hp

import asyncio


async def my_coroutine():
    await asyncio.sleep(120)

await hp.async_with_timeout(my_coroutine(), timeout=20)
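The behaviour described above can be approximated with the standard library alone. The following is a rough sketch under that assumption, not the Photons implementation: sketch_with_timeout and TookTooLong are hypothetical names, and the silent and name options are left out.

```python
import asyncio


class TookTooLong(Exception):
    """Hypothetical error used only for this sketch."""


async def sketch_with_timeout(coro, *, timeout=10, timeout_error=None):
    task = asyncio.ensure_future(coro)

    # asyncio.wait returns when the task is done or the timeout passes
    done, _ = await asyncio.wait([task], timeout=timeout)
    if task in done:
        return task.result()

    # Out of time: cancel the task and let the cancellation finish
    task.cancel()
    await asyncio.wait([task])

    if timeout_error is not None:
        raise timeout_error
    raise asyncio.CancelledError()


async def demo_timeout():
    async def quick():
        return "done"

    async def slow():
        await asyncio.sleep(60)

    result = await sketch_with_timeout(quick(), timeout=1)
    try:
        await sketch_with_timeout(slow(), timeout=0.01, timeout_error=TookTooLong())
    except TookTooLong:
        return result, "timed out"
    return result, "no timeout"
```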

Managing futures

It can be useful to wait for all futures to complete or just one future to complete. The standard library provides asyncio.wait for this, but Photons provides a simpler version of this that doesn’t need to worry about returning results.

async photons_app.helpers.wait_for_all_futures(*futs, name=None)

Wait for all the futures to be complete and return without error regardless of whether the futures completed successfully or not.

If there are no futures, nothing is done and we return without error.

We consider all the futures done once the number of completed futures equals the number of futures we started with. Counting completions this way means a special future is still counted as done even if calling done() on it is no longer meaningful after its done callback has been called.

async photons_app.helpers.wait_for_first_future(*futs, name=None)

Return without error when the first future to be completed is done.

async photons_app.helpers.cancel_futures_and_wait(*futs, name=None)

Cancel the provided futures and wait for them all to finish. We will still await the futures if they are all already done to ensure no warnings about futures being destroyed while still pending.
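For comparison, here is roughly how the same behaviour looks when written directly against asyncio.wait. This is only a sketch with made-up names (sketch_wait_for_all, sketch_cancel_and_wait); the Photons helpers count completions with callbacks as described above, which this version does not do.

```python
import asyncio


async def sketch_wait_for_all(*futs):
    # asyncio.wait raises ValueError on an empty collection, so guard it
    if not futs:
        return
    # asyncio.wait never raises the exceptions held by the futures
    await asyncio.wait(futs)


async def sketch_cancel_and_wait(*futs):
    if not futs:
        return
    for fut in futs:
        fut.cancel()  # no effect on futures that are already done
    await asyncio.wait(futs)


async def demo_waiting():
    loop = asyncio.get_running_loop()
    ok, bad, pending = (loop.create_future() for _ in range(3))
    ok.set_result(1)
    bad.set_exception(ValueError("nope"))

    await sketch_wait_for_all()  # no futures: returns immediately
    await sketch_wait_for_all(ok, bad)  # returns despite the exception
    await sketch_cancel_and_wait(ok, bad, pending)

    return ok.result(), type(bad.exception()).__name__, pending.cancelled()
```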

Future callbacks

Futures and tasks in asyncio Python can be given a callback that is executed when that future or task is completed.

photons_app.helpers.reporter(res)

A generic reporter for asyncio tasks.

For example:

t = loop.create_task(coroutine())
t.add_done_callback(hp.reporter)

This means that exceptions are logged to the terminal and you won’t get warnings about tasks not being looked at when they finish.

This method will return True if there was no exception and None otherwise.

It also handles and silences asyncio.CancelledError.
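A done callback with this contract can be sketched with the standard library as follows. This is an illustration of the idea only; sketch_reporter and the logger name are assumptions and the real reporter() may log differently.

```python
import asyncio
import logging

log = logging.getLogger("reporter_sketch")


def sketch_reporter(res):
    # A cancelled task has no exception to look at; treat it as handled
    if res.cancelled():
        return None

    # Calling exception() marks it as retrieved, which is what stops the
    # "Task exception was never retrieved" warning
    exc = res.exception()
    if exc is not None:
        log.error("task failed: %r", exc)
        return None

    return True


async def demo_reporter():
    async def ok():
        return 1

    async def bad():
        raise ValueError("boom")

    async def forever():
        await asyncio.sleep(60)

    tasks = [asyncio.ensure_future(c) for c in (ok(), bad(), forever())]
    for task in tasks:
        task.add_done_callback(sketch_reporter)

    tasks[2].cancel()
    await asyncio.wait(tasks)

    # Call the reporter directly to look at what it returns
    return [sketch_reporter(task) for task in tasks]
```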

photons_app.helpers.silent_reporter(res)

A generic reporter for asyncio tasks that doesn’t log errors.

For example:

t = loop.create_task(coroutine())
t.add_done_callback(hp.silent_reporter)

This means that exceptions are not logged to the terminal and you won’t get warnings about tasks not being looked at when they finish.

This method will return True if there was no exception and None otherwise.

It also handles and silences asyncio.CancelledError.

photons_app.helpers.transfer_result(fut, errors_only=False, process=None)

Return a done_callback that transfers the result, errors or cancellation to the provided future.

If errors_only is True then it will not transfer a successful result to the provided future.

from photons_app import helpers as hp

import asyncio


async def my_coroutine():
    return 2

fut = hp.create_future()
task = hp.async_as_background(my_coroutine())
task.add_done_callback(hp.transfer_result(fut))

assert (await fut) == 2

If process is provided, then when the coroutine is done, process will be called with the result of the coroutine and the future that result is being transferred to.
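To show what such a callback does, here is a minimal standard library sketch. sketch_transfer_result is a hypothetical name, the process option is left out, and skipping an already resolved target is an assumption of this sketch rather than documented behaviour.

```python
import asyncio


def sketch_transfer_result(fut, errors_only=False):
    def transfer(res):
        if fut.done():
            return  # assumption: never overwrite a resolved target
        if res.cancelled():
            fut.cancel()
            return
        exc = res.exception()
        if exc is not None:
            fut.set_exception(exc)
            return
        if not errors_only:
            fut.set_result(res.result())

    return transfer


async def demo_transfer():
    loop = asyncio.get_running_loop()

    async def two():
        return 2

    async def fails():
        raise ValueError("boom")

    # A successful result is copied onto the target future
    target = loop.create_future()
    task = asyncio.ensure_future(two())
    task.add_done_callback(sketch_transfer_result(target))
    value = await target

    # With errors_only=True a success leaves the target pending...
    errors = loop.create_future()
    task = asyncio.ensure_future(two())
    task.add_done_callback(sketch_transfer_result(errors, errors_only=True))
    await task
    await asyncio.sleep(0)  # give the done callback a turn on the loop
    still_pending = not errors.done()

    # ...but an exception is still transferred
    task = asyncio.ensure_future(fails())
    task.add_done_callback(sketch_transfer_result(errors, errors_only=True))
    await asyncio.wait([task])
    await asyncio.sleep(0)
    return value, still_pending, type(errors.exception()).__name__
```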

We can also tell if a future or task already has a particular function as a callback.

photons_app.helpers.fut_has_callback(fut, callback)

Look at the callbacks on the future and return True if any of them are the provided callback.

Custom Future classes

Photons provides some classes that behave like Futures, but have additional functionality.

class photons_app.helpers.ResettableFuture(name=None)

A future object with a reset() function that resets it

Usage:

fut = hp.ResettableFuture()
fut.set_result(True)
assert (await fut) is True

fut.reset()
fut.set_result(False)
assert (await fut) is False

Calling reset on one of these will do nothing if it isn’t already resolved.

Calling reset on a resolved future will also remove any registered done callbacks.
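One way to picture these semantics is a wrapper that swaps in a fresh asyncio future on reset. The class below is a sketch under that assumption and is not how ResettableFuture is necessarily implemented; SketchResettableFuture is a made-up name.

```python
import asyncio


class SketchResettableFuture:
    def __init__(self):
        self._fut = asyncio.get_running_loop().create_future()

    def done(self):
        return self._fut.done()

    def set_result(self, value):
        self._fut.set_result(value)

    def add_done_callback(self, callback):
        self._fut.add_done_callback(callback)

    def reset(self):
        # Only a resolved future is replaced; the fresh inner future starts
        # with no callbacks, so earlier done callbacks are dropped
        if self._fut.done():
            self._fut = asyncio.get_running_loop().create_future()

    def __await__(self):
        return self._fut.__await__()


async def demo_resettable():
    fut = SketchResettableFuture()
    fut.reset()  # not resolved yet, so nothing happens
    fut.set_result(True)
    first = await fut

    fut.reset()
    pending_after_reset = not fut.done()
    fut.set_result(False)
    second = await fut
    return first, pending_after_reset, second
```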

class photons_app.helpers.ChildOfFuture(original_fut, *, name=None)

Create a future that also considers the status of its parent.

So if the parent is cancelled, then this future is cancelled. If the parent raises an exception, then that exception is given to this future.

The special case is if the parent receives a result, then this future is cancelled.

The recommended use is with its context manager:

from photons_app import helpers as hp

parent_fut = hp.create_future()

with hp.ChildOfFuture(parent_fut):
    ...

If you don’t use the context manager then ensure you resolve the future when you no longer need it (i.e. fut.cancel()) to avoid a memory leak.
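The rules above, and the reason an unresolved child can leak, can be sketched with plain asyncio futures. sketch_child_of is a hypothetical helper that returns an ordinary future; it is not the ChildOfFuture class. The leak comes from the parent keeping a done callback that refers to the child, so the sketch removes that callback once the child is resolved.

```python
import asyncio


def sketch_child_of(parent):
    child = asyncio.get_running_loop().create_future()

    def on_parent_done(res):
        if child.done():
            return
        if res.cancelled() or res.exception() is None:
            # Parent was cancelled, or got a plain result: cancel the child
            child.cancel()
        else:
            child.set_exception(res.exception())

    parent.add_done_callback(on_parent_done)
    # Once the child is resolved, stop the parent holding a reference to it
    child.add_done_callback(lambda _: parent.remove_done_callback(on_parent_done))
    return child


async def demo_child():
    loop = asyncio.get_running_loop()

    parent = loop.create_future()
    child = sketch_child_of(parent)
    parent.set_result(True)
    await asyncio.sleep(0)  # let the parent's done callbacks run
    cancelled_on_result = child.cancelled()

    parent = loop.create_future()
    child = sketch_child_of(parent)
    parent.set_exception(ValueError("boom"))
    await asyncio.sleep(0)
    return cancelled_on_result, type(child.exception()).__name__
```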

Objects for doing async work

class photons_app.helpers.ATicker(every, *, final_future=None, max_iterations=None, max_time=None, min_wait=0.1, name=None)

This object gives you an async generator that yields at intervals of every seconds, where every is the first argument, and accounts for how long your code takes to run between yields.

For example:

from photons_app import helpers as hp

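import asyncio
import time


start = time.time()
timing = []

async for _ in hp.ATicker(10):
    timing.append(time.time() - start)
    # The 8 seconds spent here do not delay the next tick
    await asyncio.sleep(8)
    if len(timing) >= 5:
        break

# Ticks land on multiples of 10 seconds, give or take scheduling jitter
assert [round(t) for t in timing] == [0, 10, 20, 30, 40]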

The value that is yielded is a tuple of (iteration, time_till_next) where iteration is a counter of how many times we yield a value starting from 1 and the time_till_next is the number of seconds till the next time we yield a value.
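The idea of planning ticks against the start time, so that time spent in the loop body is absorbed, can be sketched with a plain async generator. sketch_ticker is a made-up name and this is not the ATicker implementation: it has no final_future, max_time or change_after, and its min_wait defaults to 0 for simplicity.

```python
import asyncio


async def sketch_ticker(every, *, max_iterations=None, min_wait=0.0):
    loop = asyncio.get_running_loop()
    start = loop.time()
    iteration = 0

    while True:
        iteration += 1
        # Ticks are planned relative to the start, so time spent in the
        # caller's loop body is absorbed rather than added to the interval
        next_at = start + iteration * every
        yield iteration, max(next_at - loop.time(), 0.0)

        if max_iterations is not None and iteration >= max_iterations:
            return
        await asyncio.sleep(max(next_at - loop.time(), min_wait))


async def demo_ticker():
    seen = []
    async for iteration, time_till_next in sketch_ticker(0.05, max_iterations=3):
        seen.append((iteration, time_till_next))
        await asyncio.sleep(0.02)  # pretend to do some work
    return seen
```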

You can use the shortcut tick() to create one of these, but if you do create this yourself, you can change the every value while you’re iterating.

from photons_app import helpers as hp


ticker = hp.ATicker(10)

done = 0

async with ticker as ticks:
    async for _ in ticks:
        done += 1
        if done == 3:
            # This will mean the next tick will be 20 seconds after the last
            # tick and future ticks will be 20 seconds apart
            ticker.change_after(20)
        elif done == 5:
            # This will mean the next tick will be 40 seconds after the last
            # tick, but ticks after that will go back to 20 seconds apart.
            ticker.change_after(40, set_new_every=False)

There are four other options:

final_future

If this future is completed then the iteration will stop

max_iterations

Iterations after this number will cause the loop to finish. By default there is no limit

max_time

After this many seconds the loop will stop. By default there is no limit

min_wait

The minimum amount of time to wait after a tick.

If this is False then we will always just tick at the next expected time, otherwise we ensure this amount of time at a minimum between ticks

photons_app.helpers.tick(every, *, final_future=None, max_iterations=None, max_time=None, min_wait=0.1, name=None)

from photons_app import helpers as hp


async with hp.tick(every) as ticks:
    async for i in ticks:
        yield i

# Is a nicer way of saying

async for i in hp.ATicker(every):
    yield i

If you want control of the ticker during the iteration, then use ATicker directly.

class photons_app.helpers.TaskHolder(final_future, *, name=None)

An object for managing asynchronous coroutines.

Usage looks like:

from photons_app import helpers as hp


final_future = hp.create_future()

async def something():
    await asyncio.sleep(5)

async with hp.TaskHolder(final_future) as ts:
    ts.add(something())
    ts.add(something())

If you don’t want to use the context manager, you can say:

from photons_app import helpers as hp


final_future = hp.create_future()

async def something():
    await asyncio.sleep(5)

ts = hp.TaskHolder(final_future)

try:
    ts.add(something())
    ts.add(something())
finally:
    await ts.finish()

Once your block in the context manager is done the context manager won’t exit until all coroutines have finished. During this time you may still use ts.add or ts.add_task on the holder.

If the final_future is cancelled before all the tasks have completed then the tasks will be cancelled and properly waited on so their finally blocks run before the context manager finishes.

ts.add will also return the task object that is made from the coroutine.

ts.add also takes a silent=False parameter, that when True will not log any errors that happen. Otherwise errors will be logged.

If you already have a task object, you can give it to the holder with ts.add_task(my_task).
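The core of the behaviour, waiting on exit for every task including ones added while waiting, can be sketched like this. SketchTaskHolder is a hypothetical stand-in: it ignores final_future, does no error logging, and is only meant to show why the wait has to loop.

```python
import asyncio


class SketchTaskHolder:
    def __init__(self):
        self.tasks = []

    def add(self, coro):
        task = asyncio.ensure_future(coro)
        self.tasks.append(task)
        return task

    async def finish(self):
        # Keep looping because tasks may be added while we are waiting
        pending = [t for t in self.tasks if not t.done()]
        while pending:
            await asyncio.wait(pending)
            pending = [t for t in self.tasks if not t.done()]

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.finish()


async def demo_holder():
    finished = []

    async def work(name, holder=None):
        await asyncio.sleep(0.01)
        if holder is not None:
            # Adding more work while the holder is already finishing
            holder.add(work("late"))
        finished.append(name)

    async with SketchTaskHolder() as ts:
        ts.add(work("first", ts))
        ts.add(work("second"))

    return sorted(finished)
```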

add(coro, *, silent=False)

add_task(task)

async finish(exc_type=None, exc=None, tb=None)

class photons_app.helpers.ResultStreamer(final_future, *, error_catcher=None, exceptions_only_to_error_catcher=False, name=None)

An async generator you can add tasks to and results will be streamed as they become available.

To use this, you first create a streamer and give it a final_future and error_catcher. If the final_future is cancelled, then the streamer will stop and any tasks it knows about will be cancelled.

The error_catcher is a standard Photons error_catcher. If it’s a list then exceptions will be added to it. If it’s a function then it will be called with exceptions. Otherwise it is ignored. Note that if you don’t specify exceptions_only_to_error_catcher=True then result objects will be given to the error_catcher rather than the exceptions themselves.

Once you have a streamer you add tasks, coroutines or async generators to the streamer. Once you have no more of these to add to the streamer then you call streamer.no_more_work() so that when all remaining tasks have finished, the streamer will stop iterating results.

The streamer will yield ResultStreamer.Result objects that contain the value from the task, a context object that you give to the streamer when you register a task and a successful boolean that is False when the result was from an exception.

When you register a task/coroutine/generator you may specify an on_done callback which will be called when it finishes. For tasks and coroutines this is called with the result from that task. For async generators it is either called with an exception Result if the generator did not exit successfully, or a success Result with a ResultStreamer.GeneratorComplete instance.

When you add an async generator, you may specify an on_each function that will be called for each value that is yielded from the generator.

You may add tasks, coroutines and async generators while you are taking results from the streamer.
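The streaming idea, where results are yielded as tasks finish along with the context they were registered with, can be sketched with an asyncio.Queue. SketchStreamer and its results() method are assumptions made for this illustration. It only handles coroutines and has no final_future, error_catcher, on_done or generator support.

```python
import asyncio
from collections import namedtuple

Result = namedtuple("Result", ["value", "context", "successful"])


class SketchStreamer:
    def __init__(self):
        self.queue = asyncio.Queue()
        self.waiting = 0
        self.stop_when_empty = False

    def add_coroutine(self, coro, *, context=None):
        self.waiting += 1
        task = asyncio.ensure_future(coro)
        task.add_done_callback(lambda res: self._on_done(res, context))
        return task

    def _on_done(self, res, context):
        if res.cancelled():
            result = Result(asyncio.CancelledError(), context, False)
        elif res.exception() is not None:
            result = Result(res.exception(), context, False)
        else:
            result = Result(res.result(), context, True)
        self.queue.put_nowait(result)

    def no_more_work(self):
        self.stop_when_empty = True

    async def results(self):
        # Simplification: no_more_work() must be called before the last
        # result is consumed, otherwise this waits on the queue forever
        while not (self.stop_when_empty and self.waiting == 0):
            result = await self.queue.get()
            self.waiting -= 1
            yield result


async def demo_streamer():
    async def ok():
        return 42

    async def bad():
        raise ValueError("boom")

    streamer = SketchStreamer()
    streamer.add_coroutine(ok(), context="a")
    streamer.add_coroutine(bad(), context="b")
    streamer.no_more_work()

    found = {}
    async for result in streamer.results():
        found[result.context] = (result.successful, result.value)
    return found
```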

For example:

from photons_app import helpers as hp


final_future = hp.create_future()

def error_catcher(error_result):
    print(error_result)

streamer = hp.ResultStreamer(final_future, error_catcher=error_catcher)

async def coro_function():
    await something
    await streamer.add_generator(generator2, context=SomeContext())
    return 20

async def coro_function2():
    return 42

async def generator():
    for i in range(3):
        yield i
        await something_else
    await streamer.add_coroutine(coro_function2())

await streamer.add_coroutine(coro_function(), context=20)
await streamer.add_generator(generator())

async with streamer:
    async for result in streamer:
        print(result.value, result.context, result.successful)

If you don’t want to use the async with streamer then you must call await streamer.finish() when you are done to ensure everything is cleaned up.

class Result(value, context, successful)

The object that the streamer will yield. This contains the value being yielded, the context associated with the coroutine/generator this value comes from, and a successful boolean that is False if this was an error.

async add_generator(gen, *, context=None, on_each=None, on_done=None)

async add_coroutine(coro, *, context=None, on_done=None, force=False)

async add_value(value, *, context=None, on_done=None)

async add_task(task, *, context=None, on_done=None, force=False)

no_more_work()

async finish()

class photons_app.helpers.ThreadToAsyncQueue(stop_fut, num_threads, onerror, *args, name=None, **kwargs)

A Queue for requesting data from some thread:

from photons_app import helpers as hp


class MyQueue(hp.ThreadToAsyncQueue):
    def create_args(self, thread_number, existing):
        if existing:
            # Assuming you have a way of seeing if you should refresh the args
            # Then this gives you an opportunity to do any shutdown logic for
            # existing args and create new, or just return what exists already
            if not should_refresh():
                return existing

            cleanup(existing)

        my_thread_thing = THING()
        return (my_thread_thing, )

def onerror(exc):
    '''
    This is called on unexpected errors. The return of this function
    is ignored
    '''
    pass

# If stop_fut is cancelled or has a result, then the queue will stop
# But the thread will continue until queue.finish() is called
queue = MyQueue(stop_fut, 10, onerror)
await queue.start()

def action(my_thread_thing):
    '''This runs in one of the threads'''
    return my_thread_thing.stuff()

await queue.request(action)
await queue.finish()

setup(*args, **kwargs)

Hook for extra setup and takes in the extra unused positional and keyword arguments from instantiating this class.

create_args(thread_number, existing)

Hook to return extra args to give to functions when they are requested.

This is called once when the queue is started and then subsequently before every request.

It must return None or a tuple of arguments to pass into request functions.

wrap_request(proc, args)

Hook to return a function that will perform the work

This takes in the proc, which is the function you give to request() and the args returned from create_args().

By default this says:

def wrapped():
    return proc(*args)

return wrapped

request(func)

Make a request and get back a future representing the result of that request.

The func provided will be called in one of our threads and provided the args provided by create_args().
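The general pattern of running a function in a worker thread and awaiting its result can be sketched with the standard library, using loop.run_in_executor and a ThreadPoolExecutor. This is not how ThreadToAsyncQueue works internally; the module-level create_args here is only a stand-in for the hook of the same name.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def create_args():
    # Stand-in for the create_args hook: extra arguments for every request
    return ("a", "b")


def action(letter1, letter2):
    # Runs in a worker thread, not in the thread running the event loop
    on_main_thread = threading.current_thread() is threading.main_thread()
    return letter1 + letter2, on_main_thread


async def demo_thread_request():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=2) as pool:
        # run_in_executor returns a future that resolves with the result
        return await loop.run_in_executor(pool, action, *create_args())
```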

from photons_app import helpers as hp


class MyQueue(hp.ThreadToAsyncQueue):
    def create_args(self, thread_number, existing):
        return ("a", "b")

queue = MyQueue(...)
await queue.start()

def action(letter1, letter2):
    assert letter1 == "a"
    assert letter2 == "b"
    return "c"

assert (await queue.request(action)) == "c"

start()

Start tasks to listen for requests made with the request method

async finish()

Signal to the tasks to stop at the next available moment