sav.channels¶
Channels between coroutines.
A simple Python implementation of the channel synchronization construct for async/await applications.
Channels are used for synchronization in the CSP concurrency model. They are natively supported by languages that implement this model, such as occam and Go. Python has asynchronous generators, which are similar to channels except that they require yielding instead of calling from one of the two endpoints. While this makes no difference in many cases, some problems are easier to solve if a data stream can be accessed from both ends by calling instead of yielding.
The sav.channels module implements channels as pairs of asynchronous generators. When an object is sent into one generator, it will be yielded by the other generator, and vice-versa.
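To make the asymmetry of a plain asynchronous generator concrete, the following minimal sketch uses only the standard library and does not involve sav.channels. The names collector and feed are made up for illustration. The consuming side has to be written as a generator that receives each value at a yield expression, while only the producing side gets to call asend():

```python
import asyncio
from typing import AsyncGenerator, List


async def collector(sink: List[str]) -> AsyncGenerator[None, str]:
    """Consumer endpoint: it can only receive values by yielding."""
    while True:
        item = yield          # a value sent with asend() arrives here
        sink.append(item)


async def feed() -> List[str]:
    """Producer endpoint: it pushes values by calling asend()."""
    received: List[str] = []
    gen = collector(received)
    await gen.asend(None)     # advance the generator to its first yield
    await gen.asend("a")
    await gen.asend("b")
    await gen.aclose()        # finish the consumer
    return received


result = asyncio.run(feed())
print(result)
```

With a channel, both sides can instead be written as ordinary coroutines that call asend() on their own endpoint.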
Installation¶
This module requires Python 3.8 or higher. Use pip to install it from the command line:
pip install sav.channels
Overview¶
To use the module, import channels from the sav namespace into an asyncio application. The module contains two functions, channels.create() and channels.open(). The following example demonstrates how they are used:
import asyncio
from sav import channels

a_receiver, a_sender = channels.create()
b_receiver, b_sender = channels.create()

async def send_messages():
    """Send messages into multiple channels."""
    async with channels.open(a_sender), channels.open(b_sender):
        await a_sender.asend('Hello Arnold.')
        await b_sender.asend('Hello Bernard.')
        await a_sender.asend('Goodbye Arnold.')
        await b_sender.asend('Goodbye Bernard.')

async def show_messages(name, receiver):
    """Show messages from a single channel."""
    async for message in receiver:
        print(f'Message for {name}: {message}')

async def main():
    """Run both channels concurrently."""
    await asyncio.gather(send_messages(),
                         show_messages('Arnold', a_receiver),
                         show_messages('Bernard', b_receiver))

asyncio.run(main())
Channels are created by calling channels.create():
sav.channels.create() → Tuple[AsyncGenerator, AsyncGenerator]¶
Create a new channel.
- Returns
A pair of asynchronous generators. When an object is sent into one of the generators, it is yielded by the other, and vice-versa.
In the example, the first generator of each channel is simply iterated over with an async for loop. The second generator may be started and closed using async with and channels.open():
sav.channels.open(ag: _AG, *, start: bool = True, clear: bool = True, close: bool = True) → AsyncGenerator[_AG, None]¶
Use a context manager to start and close a generator.
- Parameters
ag – The async generator.
start – Whether the generator should be started.
clear – Whether StopAsyncIteration should be cleared.
close – Whether the generator should be closed.
- Returns
The async context manager.
The async with statement waits until the async for loops at the other ends of both channels are started, after which each second endpoint is ready to send.
When control flows out of the async with block, the context managers close the generators and schedule their counterparts at the async for ends of the channels to raise StopAsyncIteration.
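The behaviour of the start, clear and close flags can be approximated with the standard library. The sketch below is an assumption about how such a context manager might look, based only on the parameter descriptions above; open_sketch and sink are hypothetical names, and this is not the implementation used by sav.channels.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import List


@asynccontextmanager
async def open_sketch(ag, *, start=True, clear=True, close=True):
    """Hypothetical stand-in for channels.open(), for illustration only."""
    if start:
        await ag.asend(None)       # advance the generator to its first yield
    try:
        yield ag
    except StopAsyncIteration:
        if not clear:              # clear=True swallows the exception
            raise
    finally:
        if close:
            await ag.aclose()      # harmless if the generator already finished


async def sink(log: List[str]):
    """A plain async generator that accepts exactly two values."""
    for _ in range(2):
        item = yield
        log.append(item)


async def main() -> List[str]:
    log: List[str] = []
    ag = sink(log)
    async with open_sketch(ag):
        await ag.asend("x")
        await ag.asend("y")        # sink finishes here: StopAsyncIteration
        log.append("unreachable")  # skipped, the block has been exited
    log.append("after")            # the exception was cleared on exit
    return log


result = asyncio.run(main())
print(result)
```

In this sketch the StopAsyncIteration raised inside the block ends the block quietly, which is the role the clear parameter is described as playing.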
Bidirectional channels¶
The following example shows how to use a channel bidirectionally:
import asyncio
import itertools
from typing import AsyncGenerator
from sav import channels

async def numbers(c: AsyncGenerator[str, int]) -> None:
    async with channels.open(c, start=False):
        print(await c.asend(None))
        for i in itertools.count():
            print(await c.asend(i))

async def letters(c: AsyncGenerator[int, str]) -> None:
    async with channels.open(c):
        print(await c.asend("A"))
        print(await c.asend("B"))

async def main() -> None:
    c_left, c_right = channels.create()
    await asyncio.gather(numbers(c_left), letters(c_right))

asyncio.run(main())
In this case, channels.open() is used to set up async with blocks at both ends of the channel.
The first endpoint always starts receiving. Therefore, it is opened with start=False, and must be started manually by sending None in order to await the first value from the second end.
The second endpoint waits until the first endpoint has been started.
From that point onwards, the channel operates symmetrically: when one generator is suspended, the other one is waiting, and when a value is sent into the suspended generator, the waiting generator is scheduled to yield that value.
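Starting an endpoint by sending None is in line with the ordinary asynchronous generator protocol: a generator that has not yet reached its first yield cannot accept a value, and sending anything other than None at that point raises TypeError. The sketch below illustrates this with a plain asynchronous generator (doubler is a hypothetical name, and no channel is involved): the first asend(None) advances to the first yield, after which every asend() both delivers a value and returns the next one yielded.

```python
import asyncio
from typing import List, Union


async def doubler():
    """Yield a greeting first, then twice each value that is sent in."""
    value = yield "ready"
    while True:
        value = yield value * 2


async def main() -> List[Union[str, int]]:
    gen = doubler()
    results: List[Union[str, int]] = []
    results.append(await gen.asend(None))  # start: runs to the first yield
    results.append(await gen.asend(3))     # delivers 3, receives the next yield
    results.append(await gen.asend(5))
    await gen.aclose()
    return results


result = asyncio.run(main())
print(result)
```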
When one of the two generators is closed, the other generator will be scheduled to raise StopAsyncIteration. Although there is no async for loop in this case, channels.open() will clear the StopAsyncIteration exception when it reaches the end of the async with block. In this way, the bidirectional usage elegantly resembles the unidirectional usage: when one of the two async with blocks is exited, the other one will be exited as well.
Channels and generator pipelines¶
One powerful application of generators is building generator pipelines. The following example shows how a channel may be used to send values into such a pipeline from the upstream end. In this case, the channel functions like a generator adapter, reversing the direction in which data is sent and yielded.
import asyncio
from typing import AsyncIterator
from sav import channels

receiver, sender = channels.create()

async def produce() -> None:
    async with channels.open(sender):
        send_item = sender.asend
        await send_item("Item 1")
        await send_item("Item 2")
        await send_item("Item 3")
        await send_item("Item 4")

async def transform() -> AsyncIterator[str]:
    t = None
    async for item in receiver:
        if t is None:
            t = ['Combining:', item]
        else:
            t.append(item)
            s = ' '.join(t)
            yield s
            t = None

async def consume() -> None:
    async for s in transform():
        print(s)

async def main() -> None:
    await asyncio.gather(produce(), consume())

asyncio.run(main())
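For comparison, here is a sketch of the same pairwise combination written as a conventional pull-based pipeline, using only the standard library. In this version produce must itself be an asynchronous generator that yields its items, which is the constraint the channel version above avoids. The function signatures differ from the example above and are chosen for illustration only.

```python
import asyncio
from typing import AsyncIterator, List, Optional


async def produce() -> AsyncIterator[str]:
    """Upstream stage: has to yield, it cannot call into the pipeline."""
    for n in range(1, 5):
        yield f"Item {n}"


async def transform(source: AsyncIterator[str]) -> AsyncIterator[str]:
    """Middle stage: combine the incoming items two at a time."""
    pending: Optional[str] = None
    async for item in source:
        if pending is None:
            pending = item
        else:
            yield f"Combining: {pending} {item}"
            pending = None


async def consume() -> List[str]:
    """Downstream stage: pull every combined string from the pipeline."""
    return [s async for s in transform(produce())]


result = asyncio.run(consume())
for line in result:
    print(line)
```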