"""Async iterator lost and found missing methods: aiter, anext, etc."""
import collections.abc
import sys
from functools import singledispatch
from typing import (
Any,
AsyncIterable,
AsyncIterator,
Iterable,
Iterator,
List,
Tuple,
)
__all__ = [
'aenumerate',
'aiter',
'alist',
'anext',
'arange',
'aslice',
'chunks',
]
[docs]async def aenumerate(it: AsyncIterable[Any],
start: int = 0) -> AsyncIterator[Tuple[int, Any]]:
"""``async for`` version of ``enumerate``."""
i = start
async for item in it:
yield i, item
i += 1
class AsyncIterWrapper(AsyncIterator):
"""Wrap regular Iterator into an AsyncIterator."""
def __init__(self, it: Iterator) -> None:
self._it: Iterator = it
def __aiter__(self) -> AsyncIterator:
return self
async def __anext__(self) -> Any:
try:
return next(self._it)
except StopIteration as exc:
raise StopAsyncIteration() from exc
def __repr__(self) -> str:
return f'<{type(self).__name__}: {self._it}>'
[docs]@singledispatch
def aiter(it: Any) -> AsyncIterator:
"""Create iterator from iterable.
Notes:
If the object is already an iterator, the iterator
should return self when ``__aiter__`` is called.
"""
raise TypeError(f'{it!r} object is not an iterable')
# XXX In Py3.7: register cannot take typing.AsyncIterator
@aiter.register(collections.abc.AsyncIterable)
def _aiter_async(it: AsyncIterable) -> AsyncIterator:
return it.__aiter__()
# XXX In Py3.7: register cannot take typing.Iterable
@aiter.register(collections.abc.Iterable)
def _aiter_iter(it: Iterable) -> AsyncIterator:
return AsyncIterWrapper(iter(it)).__aiter__()
[docs]async def anext(it: AsyncIterator, *default: Any) -> Any:
"""Get next value from async iterator, or `default` if empty.
Raises:
:exc:`StopAsyncIteration`: if default is not defined and
the async iterator is fully consumed.
"""
if default:
try:
return await it.__anext__()
except StopAsyncIteration:
return default[0]
return await it.__anext__()
class _ARangeIterator(AsyncIterator[int]):
def __init__(self, parent: 'arange', it: Iterator[int]) -> None:
self.parent = arange
self.it = it
def __aiter__(self) -> AsyncIterator[int]:
return self
async def __anext__(self) -> int:
try:
return next(self.it)
except StopIteration:
raise StopAsyncIteration()
[docs]class arange(AsyncIterable[int]):
"""Async generator that counts like :class:`range`."""
def __init__(self, *slice_args: int, **slice_kwargs: Any) -> None:
s = slice(*slice_args, **slice_kwargs)
self.start = s.start or 0
self.stop = s.stop
self.step = s.step or 1
self._range = range(self.start, self.stop, self.step)
[docs] def count(self, n: int) -> int:
return self._range.count(n)
[docs] def index(self, n: int) -> int:
return self._range.index(n)
def __contains__(self, n: int) -> bool:
return n in self._range
def __aiter__(self) -> AsyncIterator[int]:
return _ARangeIterator(self, iter(self._range))
[docs]async def alist(ait):
"""Convert async generator to list."""
return [x async for x in ait]
[docs]async def aslice(ait, *slice_args):
"""Extract slice from async generator."""
s = slice(*slice_args)
start = s.start or 0
stop = s.stop or sys.maxsize
step = s.step or 1
it = iter(range(start, stop, step))
try:
nexti = next(it)
async for i, item in aenumerate(ait):
if i == nexti:
yield item
nexti = next(it)
except StopIteration:
return
[docs]async def chunks(it: AsyncIterable, n: int) -> AsyncIterable[List]:
"""Split an async iterator into chunks with `n` elements each.
Example:
# n == 2
>>> x = chunks(arange(10), 2)
>>> [item async for item in x]
[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10]]
# n == 3
>>> x = chunks(arange(10)), 3)
>>> [item async for item in x]
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]]
"""
ait = aiter(it)
async for item in ait:
yield [item] + [x async for x in aslice(ait, n - 1)]