Working with Async code
Photons provides a number of utilities for working with Python async code. These are in the photons_app.helpers module. It is a Photons convention to import this module with the alias hp and access functionality from that:

    from photons_app import helpers as hp

    async def my_coroutine():
        pass

    task = hp.async_as_background(my_coroutine())

Making tasks
These functions are for turning coroutines into tasks. In asyncio Python a coroutine isn’t put onto the loop until it’s either turned into a task or awaited on.
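The point about scheduling can be seen with the standard library alone. The following is a minimal sketch that uses only asyncio (no Photons helpers); the names record and main are made up for the illustration.

```python
import asyncio


async def main():
    events = []

    async def record(label):
        events.append(label)

    # Calling the function only creates a coroutine object; nothing runs yet
    coro = record("ran")
    await asyncio.sleep(0)  # give the loop a turn: there is nothing scheduled
    before = list(events)

    # Wrapping the coroutine in a task is what puts it onto the loop
    task = asyncio.create_task(coro)
    await asyncio.sleep(0)  # give the loop a turn: the task gets to run
    after = list(events)

    await task
    return before, after


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Here before is empty and after contains the recorded label, because the coroutine body only runs once a task exists for it.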
- photons_app.helpers.async_as_background(coroutine, silent=False)
Create a task with reporter() as a done callback and return the created task. If silent=True then use silent_reporter().
This is useful because if a task exits with an exception, but nothing ever retrieves that exception, then Python will print annoying warnings about this.

    from photons_app import helpers as hp

    async def my_func():
        await something()

    # Kick off the function in the background
    hp.async_as_background(my_func())

- async photons_app.helpers.async_with_timeout(coroutine, *, timeout=10, timeout_error=None, silent=False, name=None)
Run a coroutine as a task until it’s complete or times out.
If time runs out the task is cancelled.
If timeout_error is defined, that is raised instead of asyncio.CancelledError on timeout.

    from photons_app import helpers as hp
    import asyncio

    async def my_coroutine():
        await asyncio.sleep(120)

    await hp.async_with_timeout(my_coroutine(), timeout=20)

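To make the timeout behaviour concrete, here is a rough sketch of the same idea written with plain asyncio. It is not the Photons implementation: run_with_timeout and TookTooLong are hypothetical names, and the only behaviour borrowed from the description above is "cancel the task on timeout, and raise timeout_error instead of asyncio.CancelledError if one was given".

```python
import asyncio


class TookTooLong(Exception):
    """Hypothetical error type, used only for this illustration."""


async def run_with_timeout(coro, *, timeout, timeout_error=None):
    task = asyncio.ensure_future(coro)
    done, _ = await asyncio.wait([task], timeout=timeout)
    if task in done:
        return task.result()

    # Time ran out: cancel the task and wait for the cancellation to land
    task.cancel()
    await asyncio.wait([task])

    if timeout_error is not None:
        raise timeout_error
    raise asyncio.CancelledError()


async def main():
    async def fast():
        return "done"

    async def slow():
        await asyncio.sleep(120)

    value = await run_with_timeout(fast(), timeout=1)
    try:
        await run_with_timeout(slow(), timeout=0.05, timeout_error=TookTooLong())
    except TookTooLong:
        timed_out = True
    else:
        timed_out = False
    return value, timed_out


if __name__ == "__main__":
    print(asyncio.run(main()))
```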
Managing futures
It can be useful to wait for all futures to complete or just one future to complete. The standard library provides asyncio.wait for this, but Photons provides a simpler version of this that doesn’t need to worry about returning results.
- async photons_app.helpers.wait_for_all_futures(*futs, name=None)
Wait for all the futures to be complete and return without error regardless of whether the futures completed successfully or not.
If there are no futures, nothing is done and we return without error.
We consider all the futures done when the number of completed futures equals the number of futures we started with. This ensures that a special future, for which calling done() after its callback has fired is no longer meaningful, is still counted as done.
- async photons_app.helpers.wait_for_first_future(*futs, name=None)
Return without error when the first future to be completed is done.
- async photons_app.helpers.cancel_futures_and_wait(*futs, name=None)
Cancel the provided futures and wait for them all to finish. We will still await the futures if they are all already done to ensure no warnings about futures being destroyed while still pending.
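For comparison, waiting without caring about results can be approximated with asyncio.wait directly. This is a sketch under the assumption that only "return once done, never raise" matters; wait_for_all and wait_for_first are stand-in names, not the Photons functions, and the completion counting described above is not reproduced.

```python
import asyncio


async def wait_for_all(*futs):
    # asyncio.wait rejects an empty collection, so return early instead
    if not futs:
        return
    await asyncio.wait(futs)


async def wait_for_first(*futs):
    if not futs:
        return
    await asyncio.wait(futs, return_when=asyncio.FIRST_COMPLETED)


async def main():
    loop = asyncio.get_running_loop()
    good, bad = loop.create_future(), loop.create_future()
    good.set_result(1)
    bad.set_exception(ValueError("nope"))

    # Returns without error even though one of the futures failed
    await wait_for_all(good, bad)

    soon, never = loop.create_future(), loop.create_future()
    loop.call_later(0.01, soon.set_result, 2)
    await wait_for_first(soon, never)

    states = (good.done(), bad.done(), soon.done(), never.done())

    # Tidy up so asyncio has nothing to warn about
    never.cancel()
    bad.exception()
    return states


if __name__ == "__main__":
    print(asyncio.run(main()))
```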
Future callbacks
Futures and tasks in asyncio Python can be given a callback that is executed when that future or task is completed.
- photons_app.helpers.reporter(res)
A generic reporter for asyncio tasks.
For example:

    t = loop.create_task(coroutine())
    t.add_done_callback(hp.reporter)

This means that exceptions are logged to the terminal and you won’t get warnings about tasks not being looked at when they finish.
This method will return True if there was no exception and None otherwise.
It also handles and silences asyncio.CancelledError.
- photons_app.helpers.silent_reporter(res)
A generic reporter for asyncio tasks that doesn’t log errors.
For example:

    t = loop.create_task(coroutine())
    t.add_done_callback(hp.silent_reporter)

This means that exceptions are not logged to the terminal and you won’t get warnings about tasks not being looked at when they finish.
This method will return True if there was no exception and None otherwise.
It also handles and silences asyncio.CancelledError.
- photons_app.helpers.transfer_result(fut, errors_only=False, process=None)
Return a done_callback that transfers the result, errors or cancellation to the provided future.
If errors_only is True then it will not transfer a successful result to the provided future.

    from photons_app import helpers as hp
    import asyncio

    async def my_coroutine():
        return 2

    fut = hp.create_future()
    task = hp.async_as_background(my_coroutine())
    task.add_done_callback(hp.transfer_result(fut))

    assert (await fut) == 2

If process is provided, then when the coroutine is done, process will be called with the result of the coroutine and the future that result is being transferred to.
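The callbacks in this section all follow the same pattern: a function that receives the finished future and looks at its outcome. The sketch below shows that pattern with plain asyncio. The names make_reporter and transfer_to are hypothetical and much simpler than the Photons helpers (no logging, no errors_only, no process).

```python
import asyncio


def make_reporter(errors):
    """Build a done callback that retrieves the outcome of a finished task."""

    def report(res):
        if res.cancelled():
            return None
        exc = res.exception()  # retrieving it stops the "never retrieved" warning
        if exc is not None:
            errors.append(exc)
            return None
        return True

    return report


def transfer_to(fut):
    """Build a done callback that copies the outcome onto another future."""

    def transfer(res):
        if fut.done():
            return
        if res.cancelled():
            fut.cancel()
        elif res.exception() is not None:
            fut.set_exception(res.exception())
        else:
            fut.set_result(res.result())

    return transfer


async def main():
    async def works():
        return 2

    async def fails():
        raise ValueError("boom")

    errors = []
    failing = asyncio.create_task(fails())
    failing.add_done_callback(make_reporter(errors))

    target = asyncio.get_running_loop().create_future()
    working = asyncio.create_task(works())
    working.add_done_callback(transfer_to(target))

    value = await target
    await asyncio.wait([failing])
    await asyncio.sleep(0)  # make sure pending callbacks have had a turn
    return value, [type(error).__name__ for error in errors]


if __name__ == "__main__":
    print(asyncio.run(main()))
```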
We can also tell if a future or task already has a particular function as a callback.
- photons_app.helpers.fut_has_callback(fut, callback)
Look at the callbacks on the future and return True if any of them are the provided callback.
Custom Future classes
Photons provides some classes that behave like Futures, but have additional functionality.
- class photons_app.helpers.ResettableFuture(name=None)
A future object with a reset() function that resets it.
Usage:

    fut = ResettableFuture()
    fut.set_result(True)
    await fut == True

    fut.reset()
    fut.set_result(False)
    await fut == False

Calling reset on one of these will do nothing if it isn’t already resolved.
Calling reset on a resolved future will also remove any registered done callbacks.
- class photons_app.helpers.ChildOfFuture(original_fut, *, name=None)
Create a future that also considers the status of its parent.
So if the parent is cancelled, then this future is cancelled. If the parent raises an exception, then that exception is given to this future.
The special case is if the parent receives a result, then this future is cancelled.
The recommended use is with its context manager:

    from photons_app import helpers as hp

    parent_fut = hp.create_future()

    with hp.ChildOfFuture(parent_fut):
        ...

If you don’t use the context manager then ensure you resolve the future when you no longer need it (i.e. fut.cancel()) to avoid a memory leak.
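The parent and child rules described above can be sketched with ordinary asyncio futures. This is an illustration only, assuming nothing about how ChildOfFuture is built; make_child is a hypothetical helper. It also shows why an unresolved child can leak: its callback stays registered on the parent until the child is resolved.

```python
import asyncio


def make_child(parent):
    """Return a new future whose fate follows the parent future."""
    child = asyncio.get_running_loop().create_future()

    def on_parent_done(res):
        if child.done():
            return
        if res.cancelled() or res.exception() is None:
            # Parent was cancelled, or received a result: the child is cancelled
            child.cancel()
        else:
            child.set_exception(res.exception())

    parent.add_done_callback(on_parent_done)
    # Once the child is resolved, stop holding a callback on the parent
    child.add_done_callback(lambda _: parent.remove_done_callback(on_parent_done))
    return child


async def main():
    loop = asyncio.get_running_loop()

    parent1 = loop.create_future()
    child1 = make_child(parent1)
    parent1.set_result(True)
    await asyncio.sleep(0)  # done callbacks run on the next loop iteration

    parent2 = loop.create_future()
    child2 = make_child(parent2)
    parent2.set_exception(ValueError("bad"))
    await asyncio.sleep(0)
    parent2.exception()  # mark as retrieved so asyncio doesn't warn

    return child1.cancelled(), type(child2.exception()).__name__


if __name__ == "__main__":
    print(asyncio.run(main()))
```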
Objects for doing async work
- class photons_app.helpers.ATicker(every, *, final_future=None, max_iterations=None, max_time=None, min_wait=0.1, pauser=None, name=None)
This object gives you an async generator that yields once per interval, where every is the interval in seconds, taking into account how long your code takes to finish before the next yield.
For example:

    from photons_app import helpers as hp
    import asyncio
    import time

    start = time.time()
    timing = []

    async for _ in hp.ATicker(10):
        timing.append(time.time() - start)
        await asyncio.sleep(8)
        if len(timing) >= 5:
            break

    # Rounded, because the measured times are never exact
    assert [round(t) for t in timing] == [0, 10, 20, 30, 40]

The value that is yielded is a tuple of (iteration, time_till_next) where iteration is a counter of how many times we yield a value starting from 1 and time_till_next is the number of seconds till the next time we yield a value.
You can use the shortcut tick() to create one of these, but if you do create this yourself, you can change the every value while you’re iterating.

    from photons_app import helpers as hp

    ticker = hp.ATicker(10)

    done = 0

    async with ticker as ticks:
        async for _ in ticks:
            done += 1
            if done == 3:
                # This will mean the next tick will be 20 seconds after the last
                # tick and future ticks will be 20 seconds apart
                ticker.change_after(20)
            elif done == 5:
                # This will mean the next tick will be 40 seconds after the last
                # tick, but ticks after that will go back to 20 seconds apart.
                ticker.change_after(40, set_new_every=False)

There are five other options:
- final_future
If this future is completed then the iteration will stop
- max_iterations
Iterations after this number will cause the loop to finish. By default there is no limit
- max_time
After this many seconds the loop will stop. By default there is no limit
- min_wait
The minimum amount of time to wait after a tick.
If this is False then we will always just tick at the next expected time, otherwise we ensure this amount of time at a minimum between ticks
- pauser
If not None, we use this as a semaphore in an async with to pause the ticks
- photons_app.helpers.tick(every, *, final_future=None, max_iterations=None, max_time=None, min_wait=0.1, name=None, pauser=None)

    from photons_app import helpers as hp

    async with hp.tick(every) as ticks:
        async for i in ticks:
            yield i

    # Is a nicer way of saying

    async for i in hp.ATicker(every):
        yield i

If you want control of the ticker during the iteration, then use ATicker directly.
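The idea of "taking into account how long your code takes" can be shown with a small async generator built on asyncio and time.monotonic. This is only a sketch of the scheduling idea, not how ATicker is written: the name ticks is made up, and final_future, max_time, min_wait and pauser are left out.

```python
import asyncio
import time


async def ticks(every, *, max_iterations=None):
    """Yield an iteration counter, aiming for one yield per `every` seconds."""
    start = time.monotonic()
    iteration = 0
    while max_iterations is None or iteration < max_iterations:
        if iteration > 0:
            # Aim for start + iteration * every, however long the caller took
            next_at = start + iteration * every
            await asyncio.sleep(max(0.0, next_at - time.monotonic()))
        iteration += 1
        yield iteration


async def main():
    start = time.monotonic()
    timing = []
    async for _ in ticks(0.05, max_iterations=4):
        timing.append(time.monotonic() - start)
        await asyncio.sleep(0.03)  # work that uses up most of the interval
    return timing


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because each sleep is measured against the original start time, the recorded times stay roughly 0.05 seconds apart rather than drifting by the 0.03 seconds of work done in each iteration.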
- class photons_app.helpers.TaskHolder(final_future, *, name=None)
An object for managing asynchronous coroutines.
Usage looks like:

    from photons_app import helpers as hp
    import asyncio

    final_future = hp.create_future()

    async def something():
        await asyncio.sleep(5)

    async with hp.TaskHolder(final_future) as ts:
        ts.add(something())
        ts.add(something())

If you don’t want to use the context manager, you can say:

    from photons_app import helpers as hp
    import asyncio

    final_future = hp.create_future()

    async def something():
        await asyncio.sleep(5)

    ts = hp.TaskHolder(final_future)

    try:
        ts.add(something())
        ts.add(something())
    finally:
        await ts.finish()

Once your block in the context manager is done the context manager won’t exit until all coroutines have finished. During this time you may still use ts.add or ts.add_task on the holder.
If the final_future is cancelled before all the tasks have completed then the tasks will be cancelled and properly waited on so their finally blocks run before the context manager finishes.
ts.add will also return the task object that is made from the coroutine.
ts.add also takes a silent=False parameter, that when True will not log any errors that happen. Otherwise errors will be logged.
If you already have a task object, you can give it to the holder with ts.add_task(my_task).
- add(coro, *, silent=False)
- add_task(task)
- async finish(exc_typ=None, exc=None, tb=None)
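The "don't exit until every task has finished" behaviour can be illustrated with a very small async context manager. SimpleHolder below is a hypothetical stand-in written for this example; it leaves out final_future, cancellation, logging and add_task, and should not be read as the TaskHolder implementation.

```python
import asyncio


class SimpleHolder:
    """Track tasks and wait for all of them when the block exits."""

    def __init__(self):
        self.tasks = []

    def add(self, coro):
        task = asyncio.ensure_future(coro)
        self.tasks.append(task)
        return task

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_typ, exc, tb):
        await self.finish()

    async def finish(self):
        # Tasks may add more tasks while we wait, so loop until none are pending
        while any(not task.done() for task in self.tasks):
            await asyncio.wait([task for task in self.tasks if not task.done()])
        for task in self.tasks:
            if not task.cancelled():
                task.exception()  # retrieve it so asyncio doesn't warn


async def main():
    finished = []

    async def something(n):
        await asyncio.sleep(0.01 * n)
        finished.append(n)

    async with SimpleHolder() as ts:
        ts.add(something(2))
        ts.add(something(1))

    # Both coroutines have completed by the time the block has exited
    return finished


if __name__ == "__main__":
    print(asyncio.run(main()))
```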
- class photons_app.helpers.ResultStreamer(final_future, *, error_catcher=None, exceptions_only_to_error_catcher=False, name=None)
An async generator you can add tasks to and results will be streamed as they become available.
To use this, you first create a streamer and give it a final_future and error_catcher. If the final_future is cancelled, then the streamer will stop and any tasks it knows about will be cancelled.
The error_catcher is a standard Photons error_catcher. If it’s a list then exceptions will be added to it. If it’s a function then it will be called with exceptions. Otherwise it is ignored. Note that if you don’t specify exceptions_only_to_error_catcher=True then result objects will be given to the error_catcher rather than the exceptions themselves.
Once you have a streamer you add tasks, coroutines or async generators to the streamer. Once you have no more of these to add to the streamer then you call streamer.no_more_work() so that when all remaining tasks have finished, the streamer will stop iterating results.
The streamer will yield ResultStreamer.Result objects that contain the value from the task, a context object that you give to the streamer when you register a task and a successful boolean that is False when the result was from an exception.
When you register a task/coroutine/generator you may specify an on_done callback which will be called when it finishes. For tasks and coroutines this is called with the result from that task. For async generators it is either called with an exception Result if the generator did not exit successfully, or a success Result with a ResultStreamer.GeneratorComplete instance.
When you add an async generator, you may specify an on_each function that will be called for each value that is yielded from the generator.
You may add tasks, coroutines and async generators while you are taking results from the streamer.
For example:

    from photons_app import helpers as hp

    final_future = hp.create_future()

    def error_catcher(error_result):
        print(error_result)

    streamer = hp.ResultStreamer(final_future, error_catcher=error_catcher)
    await streamer.start()

    # something, something_else, generator2 and SomeContext stand in for your own code

    async def coro_function():
        await something
        await streamer.add_generator(generator2, context=SomeContext())
        return 20

    async def coro_function2():
        return 42

    async def generator():
        for i in range(3):
            yield i
            await something_else
        await streamer.add_coroutine(coro_function2())

    await streamer.add_coroutine(coro_function(), context=20)
    await streamer.add_generator(generator())

    async with streamer:
        async for result in streamer:
            print(result.value, result.context, result.successful)

If you don’t want to use the async with streamer then you must call await streamer.finish() when you are done to ensure everything is cleaned up.
- class Result(value, context, successful)
The object that the streamer will yield. This contains the value being yielded, the context associated with the coroutine/generator this value comes from; and a successful boolean that says if this was an error.
- async add_generator(gen, *, context=None, on_each=None, on_done=None)
- async add_coroutine(coro, *, context=None, on_done=None, force=False)
- async add_value(value, *, context=None, on_done=None)
- async add_task(task, *, context=None, on_done=None, force=False)
- no_more_work()
- async finish(exc_typ=None, exc=None, tb=None)
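The core idea of streaming results as they become available, each tagged with a context and a successful flag, can be sketched with an asyncio.Queue. This is a simplified illustration under stated assumptions, not the ResultStreamer implementation: stream_results and this Result tuple are hypothetical, all work is supplied up front, and generators, on_done, error_catcher and final_future are not covered.

```python
import asyncio
from collections import namedtuple

Result = namedtuple("Result", ["value", "context", "successful"])


async def stream_results(work):
    """Yield a Result for each (coroutine, context) pair as it finishes."""
    queue = asyncio.Queue()

    async def run(coro, context):
        try:
            value = await coro
        except Exception as error:
            await queue.put(Result(error, context, False))
        else:
            await queue.put(Result(value, context, True))

    tasks = [asyncio.ensure_future(run(coro, context)) for coro, context in work]
    try:
        for _ in tasks:
            yield await queue.get()
    finally:
        # If the consumer stops early, cancel whatever is still running
        for task in tasks:
            task.cancel()
        if tasks:
            await asyncio.wait(tasks)


async def main():
    async def slow():
        await asyncio.sleep(0.03)
        return "slow"

    async def quick():
        return "quick"

    async def broken():
        await asyncio.sleep(0.01)
        raise ValueError("bad")

    got = []
    async for result in stream_results([(slow(), 1), (quick(), 2), (broken(), 3)]):
        got.append((str(result.value), result.context, result.successful))
    return got


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Results arrive in completion order rather than the order the work was supplied, and a failure shows up as a Result with successful set to False instead of stopping the iteration.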