Working with Async code¶
Photons provides a number of utilities for working with Python async code. These are in the photons_app.helpers module. It is a Photons convention to import this module with the alias hp and access functionality from that:
    from photons_app import helpers as hp

    async def my_coroutine():
        pass

    task = hp.async_as_background(my_coroutine())
These functions are for turning coroutines into tasks. In asyncio Python a coroutine isn’t put onto the loop until it’s either turned into a task or awaited on.
This is useful because if a task exits with an exception and nothing ever retrieves that exception, Python will print warnings about it.
    from photons_app import helpers as hp

    async def my_func():
        await something()

    # Kick off the function in the background
    hp.async_as_background(my_func())
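The idea of a background task whose outcome is always retrieved can be sketched with plain asyncio. This is only an illustration and not the Photons implementation: the names report and as_background are hypothetical, and the logging setup is an assumption.

```python
import asyncio
import logging

log = logging.getLogger("example")


def report(task):
    """Done callback: retrieve the outcome so asyncio has nothing to warn about."""
    if task.cancelled():
        return None
    exc = task.exception()  # retrieving the exception marks it as handled
    if exc is not None:
        log.error("background task failed: %r", exc)
        return None
    return True


def as_background(coro):
    """Hypothetical helper: schedule coro as a task and attach the callback."""
    task = asyncio.get_running_loop().create_task(coro)
    task.add_done_callback(report)
    return task


async def demo():
    async def boom():
        raise ValueError("nope")

    task = as_background(boom())
    await asyncio.wait([task])
    # Calling report directly here only to inspect what it returns
    return report(task), task.done()
```

Running asyncio.run(demo()) logs the error once per call to report and exits without an "exception was never retrieved" warning.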
async_with_timeout(coroutine, *, timeout=10, timeout_error=None, silent=False, name=None)¶
Run a coroutine as a task until it’s complete or times out.
If time runs out the task is cancelled.
If timeout_error is defined, that is raised instead of asyncio.CancelledError on timeout.
    from photons_app import helpers as hp
    import asyncio

    async def my_coroutine():
        await asyncio.sleep(120)

    # The coroutine sleeps for 120 seconds, so it is cancelled after 20
    await hp.async_with_timeout(my_coroutine(), timeout=20)
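The behaviour described above (run as a task, cancel on timeout, optionally raise a custom error) can be approximated with the standard library. This is a sketch under assumptions, not how Photons implements it: run_with_timeout and TooSlow are hypothetical names, and the silent and name parameters are left out.

```python
import asyncio


async def run_with_timeout(coro, *, timeout=10, timeout_error=None):
    """Hypothetical stand-in: wait up to timeout seconds, then cancel."""
    task = asyncio.ensure_future(coro)
    done, _ = await asyncio.wait([task], timeout=timeout)
    if done:
        return task.result()

    task.cancel()
    await asyncio.wait([task])  # let the cancellation finish
    if timeout_error is not None:
        raise timeout_error
    raise asyncio.CancelledError()


class TooSlow(Exception):
    pass


async def demo():
    async def slow():
        await asyncio.sleep(5)

    try:
        await run_with_timeout(slow(), timeout=0.01, timeout_error=TooSlow())
    except TooSlow:
        return "timed out"
    return "finished"
```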
It can be useful to wait for all futures to complete or just one future to complete. The standard library provides asyncio.wait for this, but Photons provides a simpler version of this that doesn’t need to worry about returning
Wait for all the futures to be complete and return without error regardless of whether the futures completed successfully or not.
If there are no futures, nothing is done and we return without error.
We consider all the futures done when the number of completed futures equals the number of futures we started with. This ensures that a special future, for which calling done() after its callback has fired is no longer meaningful, is still counted as done.
Return without error when the first future to be completed is done.
Cancel the provided futures and wait for them all to finish. We will still await the futures if they are all already done to ensure no warnings about futures being destroyed while still pending.
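The three waiting behaviours described above can be sketched on top of asyncio.wait. The function names wait_all, wait_first and cancel_and_wait are hypothetical, and the sketch counts completion the way asyncio.wait does rather than with the counting approach Photons describes.

```python
import asyncio


async def wait_all(*futs):
    # asyncio.wait rejects an empty collection, so do nothing in that case
    if futs:
        await asyncio.wait(futs)


async def wait_first(*futs):
    if futs:
        await asyncio.wait(futs, return_when=asyncio.FIRST_COMPLETED)


async def cancel_and_wait(*futs):
    for fut in futs:
        fut.cancel()  # a no-op for futures that are already done
    if futs:
        await asyncio.wait(futs)


async def demo():
    loop = asyncio.get_running_loop()
    failing = loop.create_future()
    failing.set_exception(ValueError("ignored by the wait helpers"))
    pending = loop.create_task(asyncio.sleep(60))

    # Returns without raising even though the first future holds an error
    await wait_first(failing, pending)
    first_done = (failing.done(), pending.done())

    await cancel_and_wait(failing, pending)
    failing.exception()  # retrieve the error so asyncio does not warn about it
    return first_done, pending.cancelled()
```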
Futures and tasks in asyncio Python can be given a callback that is executed when that future or task is completed.
A generic reporter for asyncio tasks.
    t = loop.create_task(coroutine())
    t.add_done_callback(hp.reporter)
This means that exceptions are logged to the terminal and you won’t get warnings about tasks not being looked at when they finish.
This method will return True if there was no exception and
It also handles and silences
A generic reporter for asyncio tasks that doesn’t log errors.
    t = loop.create_task(coroutine())
    t.add_done_callback(hp.silent_reporter)
This means that exceptions are not logged to the terminal and you won’t get warnings about tasks not being looked at when they finish.
This method will return True if there was no exception and
It also handles and silences
transfer_result(fut, errors_only=False, process=None)¶
Return a done_callback that transfers the result, errors or cancellation to the provided future.
If errors_only is True then it will not transfer a successful result to the provided future.
    from photons_app import helpers as hp
    import asyncio

    async def my_coroutine():
        return 2

    fut = hp.create_future()
    task = hp.async_as_background(my_coroutine())
    task.add_done_callback(hp.transfer_result(fut))

    assert (await fut) == 2
If process is provided, then when the coroutine is done, process will be called with the result of the coroutine and the future that result is being transferred to.
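As an illustration of the transfer pattern, here is a minimal sketch in plain asyncio. The name transfer is hypothetical, the process hook is omitted, and the real helper may differ in its details.

```python
import asyncio


def transfer(target, errors_only=False):
    """Hypothetical stand-in: build a done callback that copies the outcome."""

    def callback(done_fut):
        if target.done():
            return
        if done_fut.cancelled():
            target.cancel()
        elif done_fut.exception() is not None:
            target.set_exception(done_fut.exception())
        elif not errors_only:
            target.set_result(done_fut.result())

    return callback


async def demo():
    loop = asyncio.get_running_loop()

    async def two():
        return 2

    target = loop.create_future()
    first = loop.create_task(two())
    first.add_done_callback(transfer(target))

    # With errors_only=True a successful result is not copied across
    errors_target = loop.create_future()
    second = loop.create_task(two())
    second.add_done_callback(transfer(errors_target, errors_only=True))
    await second
    await asyncio.sleep(0)  # give the done callback a chance to run

    return await target, errors_target.done()
```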
We can also tell if a future or task already has a particular function as a callback.
Look at the callbacks on the future and return True if any of them are the provided
Custom Future classes¶
Photons provides some classes that behave like Futures, but have additional functionality.
A future object with a reset() function that resets it
    from photons_app import helpers as hp

    fut = hp.ResettableFuture()
    fut.set_result(True)
    assert (await fut) is True

    fut.reset()
    fut.set_result(False)
    assert (await fut) is False
Calling reset on one of these will do nothing if it isn’t already resolved.
Calling reset on a resolved future will also remove any registered done callbacks.
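One way to picture a resettable future is a small wrapper that swaps in a fresh asyncio future on reset. This is a sketch with a hypothetical class name (Resettable) and only a fraction of the Future interface. It is not the Photons class.

```python
import asyncio


class Resettable:
    """Hypothetical wrapper that replaces its inner future on reset."""

    def __init__(self):
        self._fut = asyncio.get_running_loop().create_future()

    def set_result(self, value):
        self._fut.set_result(value)

    def done(self):
        return self._fut.done()

    def reset(self):
        # Only a resolved future is replaced; the new one has no callbacks
        if self._fut.done():
            self._fut = asyncio.get_running_loop().create_future()

    def __await__(self):
        return self._fut.__await__()


async def demo():
    fut = Resettable()
    fut.set_result(True)
    first = await fut

    fut.reset()
    pending_after_reset = not fut.done()

    fut.set_result(False)
    return first, pending_after_reset, await fut
```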
ChildOfFuture(original_fut, *, name=None)¶
Create a future that also considers the status of its parent.
So if the parent is cancelled, then this future is cancelled. If the parent raises an exception, then that exception is given to this future.
The special case is if the parent receives a result, then this future is cancelled.
The recommended use is with its context manager:
    from photons_app import helpers as hp

    parent_fut = hp.create_future()

    with hp.ChildOfFuture(parent_fut):
        ...
If you don’t use the context manager then ensure you resolve the future when you no longer need it (i.e. fut.cancel()) to avoid a memory leak.
Objects for doing async work¶
ATicker(every, *, final_future=None, max_iterations=None, max_time=None, min_wait=0.1, name=None)¶
This object gives you an async generator that yields every "every" seconds, taking into account how long it takes for your code to finish for the next yield.
    from photons_app import helpers as hp
    import asyncio
    import time

    start = time.time()
    timing = []

    async for _ in hp.ATicker(10):
        timing.append(time.time() - start)
        await asyncio.sleep(8)
        if len(timing) >= 5:
            break

    # Rounded because the measured times are floats
    assert [round(t) for t in timing] == [0, 10, 20, 30, 40]
The value that is yielded is a tuple of (iteration, time_till_next) where iteration is a counter of how many times we yield a value starting from 1 and the time_till_next is the number of seconds till the next time we yield a value.
You can use the shortcut tick() to create one of these, but if you do create this yourself, you can change the every value while you’re iterating.
    from photons_app import helpers as hp

    ticker = hp.ATicker(10)

    done = 0

    async with ticker as ticks:
        async for _ in ticks:
            done += 1
            if done == 3:
                # This will mean the next tick will be 20 seconds after the last
                # tick and future ticks will be 20 seconds apart
                ticker.change_after(20)
            elif done == 5:
                # This will mean the next tick will be 40 seconds after the last
                # tick, but ticks after that will go back to 20 seconds apart.
                ticker.change_after(40, set_new_every=False)
There are four other options:
final_future: If this future is completed then the iteration will stop.
max_iterations: Iterations after this number will cause the loop to finish. By default there is no limit.
max_time: After this many seconds the loop will stop. By default there is no limit.
min_wait: The minimum amount of time to wait after a tick. If this is False then we will always just tick at the next expected time, otherwise we ensure this amount of time at a minimum between ticks.
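To make the "taking into account how long your code takes" part concrete, here is a small drift-free ticker built on the event loop clock. It is a sketch, not ATicker itself: the function name ticks is hypothetical, only max_iterations and min_wait are modelled, and a body that overruns an interval is simply followed by a min_wait sleep.

```python
import asyncio


async def ticks(every, *, max_iterations=None, min_wait=0.0):
    """Hypothetical ticker: yield (iteration, time_till_next) on a fixed cadence."""
    loop = asyncio.get_running_loop()
    start = loop.time()
    iteration = 0
    while True:
        iteration += 1
        next_tick = start + iteration * every
        yield iteration, max(next_tick - loop.time(), 0)
        if max_iterations is not None and iteration >= max_iterations:
            return
        # Sleep only for what is left of the interval after the caller's work
        remaining = next_tick - loop.time()
        await asyncio.sleep(max(remaining, min_wait))


async def demo():
    loop = asyncio.get_running_loop()
    start = loop.time()
    seen = []
    async for iteration, _ in ticks(0.05, max_iterations=3):
        seen.append(iteration)
        await asyncio.sleep(0.02)  # simulated work inside the loop body
    return seen, loop.time() - start
```

Because each target time is computed from the start time rather than from the previous wake-up, the time spent in the loop body does not accumulate as drift.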
tick(every, *, final_future=None, max_iterations=None, max_time=None, min_wait=0.1, name=None)¶
    from photons_app import helpers as hp

    async with hp.tick(every) as ticks:
        async for i in ticks:
            yield i

    # Is a nicer way of saying

    async for i in hp.ATicker(every):
        yield i
If you want control of the ticker during the iteration, then use ATicker directly.
TaskHolder(final_future, *, name=None)¶
An object for managing asynchronous coroutines.
Usage looks like:
    from photons_app import helpers as hp
    import asyncio

    final_future = hp.create_future()

    async def something():
        await asyncio.sleep(5)

    # An async context manager, because exiting waits for the tasks to finish
    async with hp.TaskHolder(final_future) as ts:
        ts.add(something())
        ts.add(something())
If you don’t want to use the context manager, you can say:
    from photons_app import helpers as hp
    import asyncio

    final_future = hp.create_future()

    async def something():
        await asyncio.sleep(5)

    ts = hp.TaskHolder(final_future)

    try:
        ts.add(something())
        ts.add(something())
    finally:
        await ts.finish()
Once your block in the context manager is done the context manager won’t exit until all coroutines have finished. During this time you may still use ts.add_task on the holder.
If final_future is cancelled before all the tasks have completed then the tasks will be cancelled and properly waited on so their finally blocks run before the context manager finishes.
ts.add will also return the task object that is made from the coroutine.
ts.add also takes a silent=False parameter that, when True, will not log any errors that happen. Otherwise errors will be logged.
If you already have a task object, you can give it to the holder with ts.add_task.
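The "does not exit until every task has finished, even ones added late" behaviour can be sketched as a small async context manager. The class name Holder is hypothetical, and the sketch leaves out final_future cancellation and error logging, so it is not a substitute for TaskHolder.

```python
import asyncio


class Holder:
    """Hypothetical task holder that waits for its tasks on exit."""

    def __init__(self):
        self.tasks = []

    def add(self, coro):
        task = asyncio.get_running_loop().create_task(coro)
        self.tasks.append(task)
        return task

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Tasks may add more tasks while we wait, so loop until none are pending
        while True:
            pending = [t for t in self.tasks if not t.done()]
            if not pending:
                break
            await asyncio.wait(pending)


async def demo():
    finished = []

    async def work(name, holder=None):
        await asyncio.sleep(0.01)
        finished.append(name)
        if holder is not None:
            holder.add(work(name + "-child"))

    async with Holder() as ts:
        ts.add(work("a", ts))
        ts.add(work("b"))

    return sorted(finished)
```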
add(coro, *, silent=False)¶
finish(exc_type=None, exc=None, tb=None)¶
ResultStreamer(final_future, *, error_catcher=None, exceptions_only_to_error_catcher=False, name=None)¶
An async generator you can add tasks to and results will be streamed as they become available.
To use this, you first create a streamer and give it a final_future and an error_catcher. If the final_future is cancelled, then the streamer will stop and any tasks it knows about will be cancelled.
error_catcher is a standard Photons error_catcher. If it’s a list then exceptions will be added to it. If it’s a function then it will be called with exceptions. Otherwise it is ignored. Note that if you don’t specify exceptions_only_to_error_catcher=True then result objects will be given to the error_catcher rather than the exceptions themselves.
Once you have a streamer you add tasks, coroutines or async generators to the streamer. Once you have no more of these to add to the streamer then you call streamer.no_more_work() so that when all remaining tasks have finished, the streamer will stop iterating results.
The streamer will yield ResultStreamer.Result objects that contain the value from the task, a context object that you give to the streamer when you register a task and a successful boolean that is False when the result was from an exception.
When you register a task/coroutine/generator you may specify an on_done callback which will be called when it finishes. For tasks and coroutines this is called with the result from that task. For async generators it is either called with an exception Result if the generator did not exit successfully, or a success Result with a
When you add an async generator, you may specify an on_each function that will be called for each value that is yielded from the generator.
You may add tasks, coroutines and async generators while you are taking results from the streamer.
    from photons_app import helpers as hp

    final_future = hp.create_future()

    def error_catcher(error_result):
        print(error_result)

    streamer = hp.ResultStreamer(final_future, error_catcher=error_catcher)

    # something, something_else, generator2 and SomeContext are placeholders
    async def coro_function():
        await something
        await streamer.add_generator(generator2, context=SomeContext())
        return 20

    async def coro_function2():
        return 42

    async def generator():
        for i in range(3):
            yield i
            await something_else
        await streamer.add_coroutine(coro_function2())

    await streamer.add_coroutine(coro_function(), context=20)
    await streamer.add_generator(generator())

    # Nothing else is added from here, so let the streamer stop when it is done
    streamer.no_more_work()

    async with streamer:
        async for result in streamer:
            print(result.value, result.context, result.successful)
If you don’t want to use the async with streamer then you must call await streamer.finish() when you are done to ensure everything is cleaned up.
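The core streaming idea (results are yielded in completion order, each tagged with its context and a success flag) can be sketched with an asyncio.Queue. Everything here is hypothetical naming for illustration: Streamer, results and the None sentinel are assumptions, and generators, on_done, error_catcher and final_future are not modelled.

```python
import asyncio
from collections import namedtuple

Result = namedtuple("Result", ["value", "context", "successful"])


class Streamer:
    """Hypothetical streamer: coroutine results go onto a queue as they finish."""

    def __init__(self):
        self.queue = asyncio.Queue()
        self.tasks = []
        self.pending = 0
        self.closed = False

    def add_coroutine(self, coro, *, context=None):
        self.pending += 1
        task = asyncio.get_running_loop().create_task(coro)
        task.add_done_callback(lambda t: self._finished(t, context))
        self.tasks.append(task)
        return task

    def _finished(self, task, context):
        if task.cancelled():
            result = Result(asyncio.CancelledError(), context, False)
        elif task.exception() is not None:
            result = Result(task.exception(), context, False)
        else:
            result = Result(task.result(), context, True)
        self.queue.put_nowait(result)

    def no_more_work(self):
        self.closed = True
        self.queue.put_nowait(None)  # wake up a consumer blocked on get()

    async def results(self):
        while not (self.closed and self.pending == 0):
            result = await self.queue.get()
            if result is None:
                continue
            self.pending -= 1
            yield result


async def demo():
    async def value(n, delay):
        await asyncio.sleep(delay)
        return n

    async def fails():
        raise ValueError("bad")

    streamer = Streamer()
    streamer.add_coroutine(value(1, 0.02), context="slow")
    streamer.add_coroutine(value(2, 0.0), context="fast")
    streamer.add_coroutine(fails(), context="broken")
    streamer.no_more_work()

    return [(r.context, r.successful) async for r in streamer.results()]
```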
Result(value, context, successful)¶
The object that the streamer will yield. This contains the value being yielded, the context associated with the coroutine/generator this value comes from, and a successful boolean that is False if this was an error.
add_generator(gen, *, context=None, on_each=None, on_done=None)¶
add_coroutine(coro, *, context=None, on_done=None, force=False)¶
add_value(value, *, context=None, on_done=None)¶
add_task(task, *, context=None, on_done=None, force=False)¶
ThreadToAsyncQueue(stop_fut, num_threads, onerror, *args, name=None, **kwargs)¶
A Queue for requesting data from some thread:
    from photons_app import helpers as hp

    class MyQueue(hp.ThreadToAsyncQueue):
        def create_args(self, thread_number, existing):
            if existing:
                # Assuming you have a way of seeing if you should refresh the args
                # Then this gives you an opportunity to do any shutdown logic for
                # existing args and create new, or just return what exists already
                if not should_refresh():
                    return existing

                cleanup(existing)

            my_thread_thing = THING()
            return (my_thread_thing, )

    def onerror(exc):
        '''
        This is called on unexpected errors.

        The return of this function is ignored
        '''
        pass

    # If stop_fut is cancelled or has a result, then the queue will stop
    # But the thread will continue until queue.finish() is called
    queue = MyQueue(stop_fut, 10, onerror)
    await queue.start()

    def action(my_thread_thing):
        '''This runs in one of the threads'''
        return my_thread_thing.stuff()

    await queue.request(action)
    await queue.finish()
Hook for extra setup and takes in the extra unused positional and keyword arguments from instantiating this class.
Hook to return extra args to give to functions when they are requested
This is called once when the queue is started and then subsequently before every request.
It must return None or a tuple of arguments to pass into request functions.
Hook to return a function that will perform the work
By default this says:
    def wrapped():
        return proc(*args)

    return wrapped
Make a request and get back a future representing the result of that request.
The func provided will be called in one of our threads and provided the
    from photons_app import helpers as hp

    class MyQueue(hp.ThreadToAsyncQueue):
        def create_args(self, thread_number, existing):
            return ("a", "b")

    queue = MyQueue(...)
    await queue.start()

    def action(letter1, letter2):
        assert letter1 == "a"
        assert letter2 == "b"
        return "c"

    assert (await queue.request(action)) == "c"
Start tasks to listen for requests made with the
Signal to the tasks to stop at the next available moment
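For comparison, the general "run a function in a worker thread and await its result" pattern is available in the standard library through run_in_executor. The sketch below uses a hypothetical ThreadQueue class to mirror the request flow above. Unlike the Photons queue, it builds the arguments on the event loop thread at request time and has no onerror or stop_fut handling.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


class ThreadQueue:
    """Hypothetical queue that hands requests to a thread pool."""

    def __init__(self, num_threads):
        self.executor = ThreadPoolExecutor(max_workers=num_threads)

    def create_args(self):
        # Assumption: fixed arguments, created on the event loop thread
        return ("a", "b")

    def request(self, func):
        loop = asyncio.get_running_loop()
        # Returns an awaitable future for the result of func(*args)
        return loop.run_in_executor(self.executor, func, *self.create_args())

    def finish(self):
        self.executor.shutdown(wait=True)


async def demo():
    main_thread = threading.get_ident()
    queue = ThreadQueue(2)

    def action(letter1, letter2):
        # Runs in a worker thread, not on the event loop thread
        return letter1 + letter2, threading.get_ident() != main_thread

    try:
        return await queue.request(action)
    finally:
        queue.finish()
```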