sav.channels

Channels between coroutines.

A simple Python implementation of the channel synchronization construct for async/await applications.

Channels are used for synchronization in the CSP concurrency model. They are natively supported by languages that implement this model, such as occam and Go. Python has asynchronous generators, which are similar to channels except that one of the two endpoints must exchange data by yielding rather than by calling. While this makes no difference in many cases, some problems are easier to solve if a data stream can be accessed from both ends by calling instead of yielding.
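The asymmetry of a native asynchronous generator can be seen in a small standard-library sketch. It does not use sav.channels, and the names running_total and drive are illustrative only: one end exchanges data by calling asend(), while the other end can only exchange data by yielding.

```python
import asyncio
from typing import AsyncGenerator, List


async def running_total() -> AsyncGenerator[int, int]:
    """Yield the running total of the values sent in."""
    total = 0
    while True:
        # This end can only exchange data by yielding.
        value = yield total
        total += value


async def drive() -> List[int]:
    """Feed three numbers into the generator and collect its replies."""
    gen = running_total()
    # Start the generator; it runs to its first yield.
    results = [await gen.asend(None)]
    for n in (1, 2, 3):
        # This end exchanges data by calling asend().
        results.append(await gen.asend(n))
    await gen.aclose()
    return results


totals = asyncio.run(drive())
print(totals)  # [0, 1, 3, 6]
```

With a channel, the body of running_total could instead call asend() on its own endpoint, which is the situation the rest of this document addresses.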

The sav.channels module implements channels as pairs of asynchronous generators. When an object is sent into one generator, it will be yielded by the other generator, and vice-versa.

Installation

This module requires Python 3.8 or higher. Use pip to install it from the command line:

pip install sav.channels

Or visit the online project pages on GitHub and PyPI.

Overview

To use the module, import channels from the sav namespace into an asyncio application. The module contains two functions, channels.create() and channels.open(). The following example demonstrates how they are used:

import asyncio
from sav import channels

a_receiver, a_sender = channels.create()
b_receiver, b_sender = channels.create()


async def send_messages():
    """Send messages into multiple channels."""
    async with channels.open(a_sender), channels.open(b_sender):
        await a_sender.asend('Hello Arnold.')
        await b_sender.asend('Hello Bernard.')
        await a_sender.asend('Goodbye Arnold.')
        await b_sender.asend('Goodbye Bernard.')


async def show_messages(name, receiver):
    """Show messages from a single channel."""
    async for message in receiver:
        print(f'Message for {name}: {message}')


async def main():
    """Run both channels concurrently."""
    await asyncio.gather(send_messages(),
                         show_messages('Arnold', a_receiver),
                         show_messages('Bernard', b_receiver))


asyncio.run(main())

Channels are created by calling channels.create():

sav.channels.create() → Tuple[AsyncGenerator, AsyncGenerator]

Create a new channel.

Returns

A pair of asynchronous generators. When an object is sent into one of the generators, it is yielded by the other, and vice-versa.

In the example, the first generator of each channel is simply iterated over with an async for loop. The second generator may be started and closed using async with and channels.open():

sav.channels.open(ag: _AG, *, start: bool = True, clear: bool = True, close: bool = True) → AsyncGenerator[_AG, None]

Use a context manager to start and close a generator.

Parameters
  • ag – The async generator.

  • start – Whether the generator should be started.

  • clear – Whether StopAsyncIteration should be cleared.

  • close – Whether the generator should be closed.

Returns

The async context manager.

The async with statement waits until the async for loops at the other ends of both channels are started, after which each second endpoint is ready to send.

When control flows out of the async with block, the context managers close the generators and schedule their counterparts at the async for ends of the channels to raise StopAsyncIteration.
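What a start-and-close context manager does can be pictured with a plain async generator. The sketch below is not the library's implementation: opened and sink are hypothetical names, and the helper covers only the start and close steps, not the clear option of channels.open().

```python
import asyncio
import contextlib
from typing import AsyncGenerator, AsyncIterator, List

log: List[str] = []


async def sink() -> AsyncGenerator[None, str]:
    """A plain async generator that records whatever is sent into it."""
    try:
        while True:
            item = yield
            log.append(f"got {item}")
    finally:
        # Runs when the generator is closed.
        log.append("closed")


@contextlib.asynccontextmanager
async def opened(ag: AsyncGenerator) -> AsyncIterator[AsyncGenerator]:
    """Hypothetical helper: start ag on entry and close it on exit."""
    await ag.asend(None)  # advance the generator to its first yield
    try:
        yield ag
    finally:
        await ag.aclose()  # raises GeneratorExit inside the generator


async def main() -> None:
    async with opened(sink()) as s:
        await s.asend("a")
        await s.asend("b")


asyncio.run(main())
print(log)  # ['got a', 'got b', 'closed']
```

The try/finally in the helper ensures the generator is closed even if the body of the async with block raises.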

Bidirectional channels

The following example shows how to use a channel bidirectionally:

import asyncio
import itertools
from typing import AsyncGenerator
from sav import channels


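async def numbers(c: AsyncGenerator[str, int]) -> None:
    """Send an endless count and print each letter received in reply."""
    async with channels.open(c, start=False):
        # Start this endpoint manually: send None and await the first letter.
        print(await c.asend(None))
        for i in itertools.count():
            print(await c.asend(i))


async def letters(c: AsyncGenerator[int, str]) -> None:
    """Send two letters and print each number received in reply."""
    async with channels.open(c):
        print(await c.asend("A"))
        print(await c.asend("B"))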


async def main() -> None:
    c_left, c_right = channels.create()
    await asyncio.gather(numbers(c_left), letters(c_right))


asyncio.run(main())

In this case, channels.open() is used to set up async with blocks at both ends of the channel.

The first endpoint always starts out receiving. It is therefore opened with start=False and must be started manually by sending None, which awaits the first value from the second end.

The second endpoint waits until the first endpoint has been started.

From that point onwards, the channel operates symmetrically: when one generator is suspended, the other one is waiting, and when a value is sent into the suspended generator, the waiting generator is scheduled to yield that value.
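Starting an endpoint by sending None mirrors the rule for native async generators, which reject any other value before they have been started. The sketch below shows that rule with the standard library only; echo is an illustrative name, and it is an assumption that the channel endpoints follow the same protocol in every detail.

```python
import asyncio
from typing import AsyncGenerator, List


async def echo() -> AsyncGenerator[str, str]:
    """Yield a greeting first, then echo each value that is sent in."""
    received = yield "ready"
    while True:
        received = yield f"echo {received}"


async def main() -> List[str]:
    events = []

    # A generator that has not been started only accepts None.
    fresh = echo()
    try:
        await fresh.asend("too early")
    except TypeError:
        events.append("TypeError")
    await fresh.aclose()

    # Sending None starts the generator and returns its first yielded value.
    gen = echo()
    events.append(await gen.asend(None))
    events.append(await gen.asend("hello"))
    await gen.aclose()
    return events


events = asyncio.run(main())
print(events)  # ['TypeError', 'ready', 'echo hello']
```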

When one of the two generators is closed, the other generator will be scheduled to raise StopAsyncIteration. Although there is no async for loop in this case, channels.open() will clear the StopAsyncIteration exception when it reaches the end of the async with block. In this way, the bidirectional usage elegantly resembles the unidirectional usage: when one of the two async with blocks is exited, the other one will be exited as well.
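The effect of clearing StopAsyncIteration can be approximated with a native async generator and contextlib.suppress. This is only an analogy under stated assumptions: two_replies is a hypothetical generator that finishes on its own, standing in for an endpoint whose counterpart has been closed, and suppress stands in for the clear behaviour of channels.open().

```python
import asyncio
import contextlib
from typing import AsyncGenerator, List


async def two_replies() -> AsyncGenerator[str, int]:
    """Yield a start marker, reply to one value, then finish."""
    first = yield "started"
    yield f"reply to {first}"
    # Falling off the end raises StopAsyncIteration in the pending asend().


async def main() -> List[str]:
    seen = []
    gen = two_replies()
    # Leave the block quietly once the generator is exhausted.
    with contextlib.suppress(StopAsyncIteration):
        seen.append(await gen.asend(None))
        for i in range(10):
            seen.append(await gen.asend(i))
    seen.append("done")
    return seen


seen = asyncio.run(main())
print(seen)  # ['started', 'reply to 0', 'done']
```

The loop asks for ten replies but the block is exited after the second send, without an exception reaching the caller.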

Channels and generator pipelines

One powerful application of generators is building generator pipelines. The following example shows how a channel may be used to send values into such a pipeline from the upstream end. In this case, the channel functions like a generator adapter, reversing the directions in which data is sent and yielded.

import asyncio
from typing import AsyncIterator
from sav import channels

receiver, sender = channels.create()


async def produce() -> None:
    async with channels.open(sender):
        send_item = sender.asend
        await send_item("Item 1")
        await send_item("Item 2")
        await send_item("Item 3")
        await send_item("Item 4")


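async def transform() -> AsyncIterator[str]:
    """Combine each pair of consecutive items into a single string."""
    t = None
    async for item in receiver:
        if t is None:
            # First item of a pair: start a new combination.
            t = ['Combining:', item]
        else:
            # Second item of a pair: complete and yield the combination.
            t.append(item)
            s = ' '.join(t)
            yield s
            t = None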


async def consume() -> None:
    async for s in transform():
        print(s)


async def main() -> None:
    await asyncio.gather(produce(), consume())


asyncio.run(main())
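For comparison, the same pairing stage can be written against any async iterator and driven purely by pulling from the downstream end. The sketch below uses only the standard library; items, combine_pairs and collect are illustrative names and not part of sav.channels. The upstream stage here has to be a generator that yields, whereas the example above lets produce() push items by calling asend().

```python
import asyncio
from typing import AsyncIterator, List, Optional


async def items() -> AsyncIterator[str]:
    """Upstream stage: yield four items."""
    for n in range(1, 5):
        yield f"Item {n}"


async def combine_pairs(source: AsyncIterator[str]) -> AsyncIterator[str]:
    """Middle stage: combine each pair of consecutive items."""
    pending: Optional[str] = None
    async for item in source:
        if pending is None:
            pending = item
        else:
            yield f"Combining: {pending} {item}"
            pending = None


async def collect() -> List[str]:
    """Downstream stage: pull every combined string through the pipeline."""
    return [s async for s in combine_pairs(items())]


lines = asyncio.run(collect())
print(lines)  # ['Combining: Item 1 Item 2', 'Combining: Item 3 Item 4']
```

Because combine_pairs takes its source as a parameter, the same stage could be fed by any other async iterator.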