Sweetasync 0.7.0

A collection of simple functions, decorators and classes to make your asynchronous code sweeter.

AIORunner

class sweetasync.aiorunner(timeout=None, loop=None, new_loop=False)

A decorator and context manager to turn async flow synchronous.

Parameters:
  • timeout (int or None) – Seconds before dropping the execution.
  • loop (AbstractEventLoop or None) – Event loop to use.
  • new_loop (bool) – If set, a new loop is created on every run.
Returns:

A decorator doubling as a context manager.

Raises:

Any exception the coroutine will raise.

When used as a context manager, you can block on awaitables by calling run(). When used as a decorator, it turns async functions sync.
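
As a rough illustration of the decorator form, here is a minimal standard-library sketch. The name sync_sketch is hypothetical and this is not sweetasync's implementation; it assumes behaviour close to new_loop=True, because asyncio.run() creates a fresh event loop on every call.

```python
import asyncio
import functools


def sync_sketch(timeout=None):
    """Hypothetical stand-in: make an async function callable synchronously."""
    def decorator(aiofunc):
        @functools.wraps(aiofunc)
        def wrapper(*args, **kwargs):
            async def runner():
                # wait_for cancels the coroutine and raises a timeout
                # error once `timeout` seconds elapse (None waits forever).
                return await asyncio.wait_for(aiofunc(*args, **kwargs), timeout)
            # asyncio.run creates and closes a new event loop per call.
            return asyncio.run(runner())
        return wrapper
    return decorator


@sync_sketch(timeout=1)
async def add(a, b):
    await asyncio.sleep(0)
    return a + b
```

With this in place, add(1, 2) can be called from ordinary synchronous code, and any exception raised by the coroutine propagates to the caller.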

cancel()

Cancel all tasks added while in the context up to that point.

Raises: Any exception but CancelledError.

new_loop

Boolean indicating if a new loop is created on each run.

run(task, *tasks, timeout=False)

Run the task with the chosen timeout and event loop.

Parameters:
  • task (Awaitables) – One or several awaitables to block on.
  • timeout (int) – Timeout on the task(s). Defaults to the object’s timeout.
Returns:

The result of the awaitable or a list of results.

Raises:

AssertionError if one argument isn’t an awaitable.
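
The "one result or a list of results" behaviour can be approximated with the standard library alone. The sketch below is an assumption about the general shape, not the library's code; run_sketch and double are hypothetical names.

```python
import asyncio


def run_sketch(task, *tasks, timeout=None):
    """Hypothetical helper: block on one or several awaitables."""
    async def runner():
        if not tasks:
            # A single awaitable yields its bare result.
            return await asyncio.wait_for(task, timeout)
        # gather keeps results in the order the awaitables were passed.
        return await asyncio.wait_for(asyncio.gather(task, *tasks), timeout)
    return asyncio.run(runner())


async def double(x):
    await asyncio.sleep(0)
    return 2 * x
```

Calling run_sketch(double(1)) gives a single value, while run_sketch(double(1), double(2), timeout=1) gives a list with one entry per awaitable.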

schedule(awaitable, *awaitables, timeout=False)

Schedule one or several awaitables in the event loop.

Parameters:
  • awaitables – Iterable of awaitables to schedule.
  • timeout (int) – Timeout on the task(s). Defaults to the object’s timeout.
Returns:

A task gathering the awaitables.

Raises:

AssertionError if one argument isn’t an awaitable.

timeout

Seconds before dropping the execution.

sweetasync.sync(aiofunc)

Use aiorunner with its default parameters.

Parameters: aiofunc – Coroutine function to be decorated.
Returns: A synchronous function.
Raises: Any exception the coroutine will raise.

Channels

General usage

To send a message, call your channel instance directly, which returns a coroutine:

await channel(msg)

To get a message, await the channel object directly:

msg = await channel

You can await the coroutine function get() to the same effect. Receiving or sending will raise ClosedChannel once the channel is closed:

exception sweetasync.ClosedChannel

Channel is closed, can’t send any more messages.

A channel is also an asynchronous iterable that stops once it is closed:

async for msg in channel:
    do_something(msg)

Iteration stops when the channel is closed with close() or close_nowait(). Note that those methods are idempotent for every channel class.

Channels can also be used from threads with specific methods which block until the other side is ready.

To get a message from a coroutine while in a thread, use recv():

msg = channel.recv()

To send a message to a coroutine from a thread, use send():

channel.send(msg)

It’s also possible to iterate on a channel from a thread:

for msg in channel:
    do_something(msg)

To close a channel from a thread, use close_threadsafe() or close_nowait_threadsafe().
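
The thread-to-coroutine handoff described above can be pictured with plain asyncio. This sketch uses asyncio.Queue as a stand-in for a channel and a None sentinel as a stand-in for closing it; both are assumptions made for illustration, and thread_demo is a hypothetical name.

```python
import asyncio
import threading


async def thread_demo():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue(maxsize=1)  # stand-in for a channel
    received = []

    def producer():
        # Runs in a worker thread: schedule queue.put() on the loop and
        # block this thread until the put has completed.
        for msg in ("a", "b", "c"):
            asyncio.run_coroutine_threadsafe(queue.put(msg), loop).result()
        asyncio.run_coroutine_threadsafe(queue.put(None), loop).result()

    thread = threading.Thread(target=producer)
    thread.start()
    while True:
        msg = await queue.get()
        if msg is None:  # sentinel standing in for "channel closed"
            break
        received.append(msg)
    # Joining in an executor keeps the event loop free while we wait.
    await loop.run_in_executor(None, thread.join)
    return received
```

The worker thread never touches the queue directly; every operation is scheduled on the loop, which is what makes the exchange thread-safe.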

The input_max and output_max constructor parameters allow you to make sure there aren’t too many coroutines simultaneously waiting for a message or waiting to send a message. Once the number of coroutines is over the maximum, a FullWaiter exception is raised. This is yet another aspect of properly managing backpressure.

NB: if you get strange behavior like getting only half of the messages, verify that you aren’t unknowingly awaiting the channel in multiple places. Conversely, this behavior is useful when you want to dispatch data among multiple listeners with no need to control tightly who gets what.
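
The effect described in this note can be reproduced with any single-delivery queue. The sketch below uses asyncio.Queue as a stand-in for a channel (an assumption, as are the names split_demo and listener): each message reaches exactly one of the two listeners, so each listener only sees part of the stream.

```python
import asyncio


async def split_demo():
    queue = asyncio.Queue()  # stand-in for a channel
    seen = {"first": [], "second": []}

    async def listener(name):
        while True:
            msg = await queue.get()
            if msg is None:  # sentinel standing in for "channel closed"
                return
            seen[name].append(msg)
            await asyncio.sleep(0)  # let the other listener take a turn

    listeners = [asyncio.ensure_future(listener(name)) for name in seen]
    for i in range(6):
        await queue.put(i)
    for _ in listeners:
        await queue.put(None)  # one sentinel per listener
    await asyncio.gather(*listeners)
    return seen
```

Together the two lists contain every message once; neither list alone is complete.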

Channel

class sweetasync.Channel(loop=None, input_max=None, output_max=None)

A simple class to pass messages asynchronously.

This version doesn’t buffer messages; sending or receiving messages blocks until the other side is ready.

Parameters:
  • loop (AbstractEventLoop or None) – Event loop to use.
  • input_max (None or an int.) – Maximum number of coroutines waiting to send a message.
  • output_max (None or an int.) – Maximum number of coroutines awaiting a message.

__call__(*messages)

Send one or several messages. If no argument is specified, None is sent.

Raises: FullWaiter if more coroutines are simultaneously waiting to send a message than the input_max parameter allows.

close()

Wait for all pending operations to complete and close the channel.

close_threadsafe()

Thread-safe close().

close_nowait()

Immediately close the channel.

close_nowait_threadsafe()

Thread-safe close_nowait().

closed

Boolean indicating if the channel is closed.

get()

Get one message from the channel. If several were sent, a tuple is received.

Raises: FullWaiter if more coroutines are simultaneously waiting to receive a message than the output_max parameter allows.

loop

Event loop used by the channel.

recv()

Thread-safe method to receive messages.

Raises: FullWaiter if more coroutines are simultaneously waiting to receive a message than the output_max parameter allows.

send(*messages)

Thread-safe method to send messages.

Raises: FullWaiter if more coroutines are simultaneously waiting to send a message than the input_max parameter allows.

Buffered channel

class sweetasync.BufferedChannel(loop=None, maxsize=None, input_max=None, output_max=None)

A channel with an internal buffer of messages.

Until the buffer’s maximum size is reached, sending a message doesn’t block. If it is reached, sending a message blocks. Receiving a message blocks if the buffer is empty, otherwise it doesn’t.

Note that this class doesn’t inherit from Channel.
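
The blocking rules for a bounded buffer can be observed with asyncio.Queue, used here only as a stand-in with comparable semantics (an assumption; buffered_demo is a hypothetical name, and the 50 ms timeout is arbitrary).

```python
import asyncio


async def buffered_demo():
    queue = asyncio.Queue(maxsize=2)  # stand-in for a buffer of size 2
    await queue.put("a")  # returns at once: the buffer has room
    await queue.put("b")  # returns at once: the buffer is now full
    was_full = queue.full()
    try:
        # A third send has to wait for a receiver; give up after 50 ms.
        await asyncio.wait_for(queue.put("c"), timeout=0.05)
        blocked = False
    except asyncio.TimeoutError:
        blocked = True
    first = await queue.get()  # does not block: the buffer is not empty
    return was_full, blocked, first
```

The first two sends complete immediately, the third one stalls until space is freed, and receiving succeeds right away because the buffer holds items.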

Parameters:
  • loop (AbstractEventLoop or None) – Event loop to use.
  • maxsize (None or an int.) – The size of the internal buffer. If not given or None, there is no maximum size.
  • input_max (None or an int.) – Maximum number of coroutines waiting to send a message.
  • output_max (None or an int.) – Maximum number of coroutines awaiting a message.

maxsize

An int or None indicating the maximum size of the internal buffer.

full

Return True if the buffer is full.

empty

Return True if the buffer is empty.

__call__(*messages)

Send one or several messages. If no argument is specified, None is sent.

Raises: FullWaiter if more coroutines are simultaneously waiting to send a message than the input_max parameter allows.

close()

Wait for all pending operations to complete and close the channel.

close_threadsafe()

Thread-safe close().

close_nowait()

Immediately close the channel.

close_nowait_threadsafe()

Thread-safe close_nowait().

closed

Boolean indicating if the channel is closed.

get()

Get one message from the channel. If several were sent, a tuple is received.

Raises: FullWaiter if more coroutines are simultaneously waiting to receive a message than the output_max parameter allows.

loop

Event loop used by the channel.

recv()

Thread-safe method to receive messages.

Raises: FullWaiter if more coroutines are simultaneously waiting to receive a message than the output_max parameter allows.

send(*messages)

Thread-safe method to send messages.

Raises: FullWaiter if more coroutines are simultaneously waiting to send a message than the input_max parameter allows.

Event emitter

class sweetasync.EventEmitter(log_errors=True, loop=None)

Event emitter inspired by the same class in node.js.

Handling of backpressure is encouraged by emit(), which will block until all listeners have returned. All exceptions are sent to listeners of the “error” event; its default listener logs the exception.

Parameters:
  • loop (AbstractEventLoop or None) – Event loop to use.
  • log_errors (bool) – Set to False if you don’t want the default error listener to be active. Note that you’ll then have to register at least one “error” listener yourself for emit() to work.

This class should be inheritable without much difficulty as the public interface is as small as possible, and there are no private members and no complex metaprogramming.

emit(event, message, **kwargs)

Trigger an event, sending the same message to every listener.

This method must be awaited, as this makes it far easier to design systems which correctly propagate constraints in processing speed (i.e. backpressure). If you really want to use it in a “fire and forget” fashion, use asyncio.ensure_future().

Exceptions and their associated listener are sent in a tuple to listeners of “error”.

Parameters:
  • event (Any hashable) – An event identifier.
  • message – An object that will be passed to every listener registered for that event. There is no guarantee the listeners will be launched in the order they were defined. Even if they are, don’t rely on it.
  • kwargs (Anything that the listeners can accept) – Optional arguments for listeners; listeners which don’t accept keyword arguments won’t receive them, so they won’t raise an exception.
Returns:

None.

Raises:

AssertionError if there is no listener for “error”.

listeners

Dict mapping event identifiers to lists of wrapped listeners. You can manipulate it to delete listeners or modify their priority.

loop

Event loop used by the event emitter.

on(event, listener=None, *, kwargs=False)

Add a listener for an event.

Unlike node.js’ EventEmitter, this method can’t be chained.

Parameters:
  • event (Any hashable) – An event identifier.
  • listener – Callable that will be passed a message each time an event is triggered. It can be an asynchronous function. If not provided, this method will return a decorator which will add the decorated function as a listener.
  • kwargs (bool) – Indicates if the listener can accept keyword arguments (this can’t be reliably auto-detected).
Returns:

A decorated listener which launches the actual listener asynchronously and drops keyword arguments if they aren’t accepted. You can use it to identify the listener in listeners.

Raises:

AssertionError – The listener isn’t a callable.

once(event, listener=None, *, kwargs=False)

Add a listener for only one activation of an event.

Otherwise works identically to on().
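
To make the interplay of on(), emit() and the “error” event more concrete, here is a deliberately small sketch built on asyncio only. EmitterSketch is hypothetical and leaves out most of what is documented above (once(), keyword arguments, the default logging listener); it only assumes that emit() waits for all listeners and forwards failures as an (exception, listener) tuple.

```python
import asyncio
import inspect
from collections import defaultdict


class EmitterSketch:
    """Hypothetical miniature emitter; not sweetasync's implementation."""

    def __init__(self):
        self.listeners = defaultdict(list)

    def on(self, event, listener):
        assert callable(listener), "listener must be callable"
        self.listeners[event].append(listener)
        return listener

    async def _call(self, listener, message):
        result = listener(message)
        if inspect.isawaitable(result):  # async listeners are awaited
            result = await result
        return result

    async def emit(self, event, message):
        assert self.listeners["error"], 'no listener for "error"'
        targets = list(self.listeners[event])
        # Wait for every listener; collect exceptions instead of raising.
        results = await asyncio.gather(
            *(self._call(target, message) for target in targets),
            return_exceptions=True,
        )
        for target, result in zip(targets, results):
            if isinstance(result, Exception):
                # Failures go to "error" listeners as (exception, listener).
                for handler in list(self.listeners["error"]):
                    await self._call(handler, (result, target))


async def emitter_demo():
    emitter = EmitterSketch()
    seen, errors = [], []
    emitter.on("error", errors.append)
    emitter.on("data", seen.append)

    async def failing(message):
        raise ValueError(message)

    emitter.on("data", failing)
    await emitter.emit("data", 42)
    return seen, errors, failing
```

Because emit() only returns once every listener has finished, a slow listener slows down the emitter, which is the backpressure behaviour the class description refers to.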

Miscellaneous

sweetasync.aiter(iterable)

Turn an iterable into an asynchronous iterable.

This is meant to spread intensive computation along the event loop.

Parameters:

iterable (Iterable) – Any iterable.

Returns:

An asynchronous iterator.

Raises:
  • TypeError – You must provide an iterable.
  • StopAsyncIteration

Example:

async for _ in aiter(range(10000)):
    do_something()
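
One plausible way to get this effect, shown purely as a sketch, is an async generator that yields control to the event loop before each item. The names aiter_sketch and aiter_demo are hypothetical, and the use of asyncio.sleep(0) is an assumption rather than a description of the library.

```python
import asyncio


async def aiter_sketch(iterable):
    """Hypothetical stand-in: iterate while yielding to the event loop."""
    for item in iterable:
        # sleep(0) suspends for one loop iteration so other tasks can run.
        await asyncio.sleep(0)
        yield item


async def aiter_demo():
    order = []

    async def worker(name):
        async for i in aiter_sketch(range(2)):
            order.append((name, i))

    await asyncio.gather(worker("a"), worker("b"))
    return order
```

Running two workers side by side shows their iterations interleaving instead of one worker finishing before the other starts.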

sweetasync.as_await(method)

Decorator to use an async function to define an __await__ method.
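
The idea can be sketched in a few lines: __await__ must return an iterator, and a coroutine object's own __await__() provides one. The decorator below, as_await_sketch, is a hypothetical stand-in written under that assumption.

```python
import asyncio


def as_await_sketch(method):
    """Hypothetical stand-in: build __await__ from an async method."""
    def __await__(self):
        # A coroutine object's own __await__ returns the iterator that
        # the `await` machinery expects.
        return method(self).__await__()
    return __await__


class Answer:
    @as_await_sketch
    async def __await__(self):
        await asyncio.sleep(0)
        return 42


async def as_await_demo():
    return await Answer()
```

Instances of Answer can then be awaited directly, and the value returned by the async method becomes the result of the await expression.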

sweetasync.asyncify(callback)

A decorator to turn sync functions into async functions.

Useful to avoid duplicating code or to create tasks.
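
A minimal sketch of such a decorator follows. It assumes the simplest possible behaviour, where the callback still runs synchronously inside the coroutine; whether sweetasync does more than this is not stated here. The names asyncify_sketch, square and asyncify_demo are hypothetical.

```python
import asyncio
import functools


def asyncify_sketch(callback):
    """Hypothetical stand-in: wrap a sync function in a coroutine function."""
    @functools.wraps(callback)
    async def wrapper(*args, **kwargs):
        # The callback still runs synchronously inside the event loop.
        return callback(*args, **kwargs)
    return wrapper


@asyncify_sketch
def square(x):
    return x * x


async def asyncify_demo():
    # The wrapped function can now be awaited or turned into a task.
    direct = await square(3)
    task = asyncio.ensure_future(square(4))
    return direct, await task
```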

Waiter

exception sweetasync.EmptyWaiter
exception sweetasync.FullWaiter
class sweetasync.Waiter(loop=None, maxsize=None)

An awaitable which accepts multiple results.

Parameters:
  • loop (An event loop or None to use the default one) – The event loop to use.
  • maxsize (int or None for no limit) – Maximum number of simultaneous listeners.

The length of a waiter is the number of asynchronous listeners awaiting it. This class is subscriptable; subscripting it returns a future that was enqueued. Trying to send a value or get an item when there is no listener (or when no future has been enqueued with get()) will raise EmptyWaiter.

Waiters have an asynchronous side and a synchronous side. A waiter can be awaited multiple times, and each time it is called with a value, this value is sent to the first non-cancelled listener. The values are sent synchronously. Waiters are typically used to have multiple coroutines wait in line to get a value.

This class is useful to implement asynchronous primitives such as locks or channels and isn’t meant to be used casually as you have to carefully coordinate the moment values are sent and manage the exceptions raised when the awaiting side isn’t ready.
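
The two sides can be pictured with a queue of futures. The sketch below is a simplified model written for illustration, with hypothetical names (WaiterSketch, EmptySketch); it omits maxsize, get(), subscripting, cancel() and exception().

```python
import asyncio
from collections import deque


class EmptySketch(Exception):
    """Hypothetical counterpart of EmptyWaiter."""


class WaiterSketch:
    """Hypothetical miniature waiter; not sweetasync's implementation."""

    def __init__(self):
        self._futures = deque()

    def __len__(self):
        return len(self._futures)

    def __await__(self):
        # Asynchronous side: each await enqueues a future and suspends.
        future = asyncio.get_running_loop().create_future()
        self._futures.append(future)
        return future.__await__()

    def __call__(self, value):
        # Synchronous side: hand the value to the first live listener.
        while self._futures:
            future = self._futures.popleft()
            if not future.cancelled():
                future.set_result(value)
                return
        raise EmptySketch("no listener is waiting")


async def waiter_demo():
    waiter = WaiterSketch()
    results = []

    async def listener():
        results.append(await waiter)

    tasks = [asyncio.ensure_future(listener()) for _ in range(2)]
    await asyncio.sleep(0)  # let both listeners start waiting
    queued = len(waiter)
    waiter("first")
    waiter("second")
    await asyncio.gather(*tasks)
    try:
        waiter("third")  # nobody is waiting any more
    except EmptySketch:
        results.append("empty")
    return queued, results
```

Listeners are served in the order they started waiting, and sending a value with nobody in line raises, which is the situation callers have to handle.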

Here is an example of an implementation that this class simplifies, a basic asynchronous lock:

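from contextlib import suppress

from sweetasync import Waiter, EmptyWaiter


class Lock:
    """A basic asynchronous lock built on a Waiter."""

    def __init__(self):
        self.active = False
        self._waiter = Waiter()

    async def acquire(self):
        if self.active:
            # Lock is held: wait in line until release() hands it over.
            await self._waiter
        self.active = True

    def release(self):
        with suppress(EmptyWaiter):
            # Wake the first waiting coroutine. The lock stays active so
            # that nobody else can grab it before that coroutine resumes.
            self._waiter(None)
            return
        # EmptyWaiter was suppressed: nobody is waiting, so free the lock.
        self.active = False

cancel()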

Cancel all awaiters.

exception(exception)

Send an exception to all awaiters.

get()

Create and enqueue a new future into the waiter, then return it.

Returns: A future that was enqueued into the waiter.
Raises: FullWaiter if there are more than maxsize enqueued futures.

loop = None

The event loop the waiter is attached to.