Skip to content

Commit

Permalink
Turn until_not_none() into until_true()
Browse files Browse the repository at this point in the history
  • Loading branch information
TaiSakuma committed Nov 5, 2024
1 parent be98a64 commit 21cad50
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 53 deletions.
4 changes: 2 additions & 2 deletions nextline/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
'ThreadTaskIdComposer',
'Timer',
'UntilNotNoneTimeout',
'until_not_none',
'until_true',
'is_timezone_aware',
'utc_timestamp',
]
Expand All @@ -47,5 +47,5 @@
from .thread_exception import ExcThread
from .thread_task_id import ThreadTaskIdComposer
from .timer import Timer
from .until import UntilNotNoneTimeout, until_not_none
from .until import UntilNotNoneTimeout, until_true
from .utc import is_timezone_aware, utc_timestamp
98 changes: 66 additions & 32 deletions nextline/utils/until.py
Original file line number Diff line number Diff line change
@@ -1,70 +1,104 @@
import asyncio
from collections.abc import Awaitable, Callable
from typing import Optional, TypeVar

T = TypeVar('T')
from inspect import isawaitable
from typing import Optional


class UntilNotNoneTimeout(Exception):
pass


async def until_not_none(
func: Callable[[], Awaitable[T | None]],
async def until_true(
func: Callable[[], bool] | Callable[[], Awaitable[bool]],
/,
*,
timeout: Optional[float] = None,
interval: float = 0,
) -> T:
'''Return the first non-None value from `func`.
) -> None:
'''Return when `func` returns `True` or a truthy value.
Parameters:
-----------
func
A callable that returns either a boolean or an awaitable that returns a
boolean.
timeout
The maximum number of seconds to wait for `func` to return `True`.
If `None`, wait indefinitely.
interval
The number of seconds to wait before checking `func` again.
Examples
--------
Define a function `func` that returns a non-None value after having
returned `None` twice:
The `func` returns `True` when the third time it is called:
>>> async def gen():
... yield None
... yield None
... yield 42
>>> def gen():
... print('Once')
... yield False
... print('Twice')
... yield False
... print('Thrice')
... yield True
... print('Never reached')
>>> g = gen()
>>> func = g.__anext__
The first non-None value 42 will be returned:
>>> asyncio.run(until_not_none(g.__anext__))
42
An exception will be raised if `timeout` has passed before a non-None value
is returned:
>>> func = g.__next__
>>> asyncio.run(until_true(func))
Once
Twice
Thrice
The `afunc` is an async version of `func`:
>>> async def agen():
... print('Once')
... yield False
... print('Twice')
... yield False
... print('Thrice')
... yield True
... print('Never reached')
>>> g = agen()
>>> afunc = g.__anext__
>>> asyncio.run(until_true(afunc))
Once
Twice
Thrice
An exception will be raised if `timeout` has passed before `True` is
returned:
>>> async def gen_none():
... while True:
... yield None
... yield False
>>> g = gen_none()
>>> func = g.__anext__
>>> afunc = g.__anext__
>>> asyncio.run(until_not_none(func, timeout=0.001)) # doctest: +IGNORE_EXCEPTION_DETAIL
>>> asyncio.run(until_true(afunc, timeout=0.001)) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
UntilNotNoneTimeout: Timed out after 0.001 seconds.
'''

async def _until_not_none() -> T:
while (ret := await func()) is None:
async def call_func() -> bool:
maybe_awaitable = func()
if isawaitable(maybe_awaitable):
return await maybe_awaitable
return maybe_awaitable

async def _until_true() -> None:
while not await call_func():
await asyncio.sleep(interval)
return ret
return

# NOTE: For Python 3.11+, `asyncio.timeout` can be used.

try:
return await asyncio.wait_for(_until_not_none(), timeout)
return await asyncio.wait_for(_until_true(), timeout)
except asyncio.TimeoutError:
raise UntilNotNoneTimeout(
f'Timed out after {timeout} seconds. '
Expand Down
76 changes: 57 additions & 19 deletions tests/utils/test_until.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,71 @@
import asyncio
from collections.abc import AsyncIterator
from typing import NoReturn
from collections.abc import Awaitable, Callable
from inspect import iscoroutinefunction
from typing import NoReturn, TypeGuard, cast
from unittest.mock import Mock

import pytest
from hypothesis import given
from hypothesis import strategies as st

from nextline.utils import UntilNotNoneTimeout, to_aiter, until_not_none
from nextline.utils import UntilNotNoneTimeout, until_true


@given(st.data())
async def test_return(data: st.DataObject) -> None:
val = data.draw(st.text())
nones = data.draw(st.lists(st.none(), max_size=10))
ret = nones + [val]
func = to_aiter(ret).__anext__
assert (await until_not_none(func)) == val
def func_factory(
counts: int, sync: bool = False
) -> Callable[[], bool] | Callable[[], Awaitable[bool]]:
assert counts

def func() -> bool:
nonlocal counts
counts -= 1
return counts == 0

async def afunc() -> bool:
return func()

return func if sync else afunc


def is_async_func(
f: Callable[[], bool] | Callable[[], Awaitable[bool]],
) -> TypeGuard[Callable[[], Awaitable[bool]]]:
return iscoroutinefunction(f)


@given(counts=st.integers(min_value=1, max_value=10))
def test_func_factory_sync(counts: int) -> None:
func = func_factory(counts, sync=True)
for _ in range(counts - 1):
assert not func()
assert func()


@given(counts=st.integers(min_value=1, max_value=10))
async def test_func_factory_async(counts: int) -> None:
func = func_factory(counts, sync=False)
assert is_async_func(func)
for _ in range(counts - 1):
assert not await func()
assert await func()


@given(counts=st.integers(min_value=1, max_value=10), sync=st.booleans())
async def test_counts(counts: int, sync: bool) -> None:
wrapped = func_factory(counts, sync=sync)
func = Mock(wraps=wrapped)
await until_true(func)
assert func.call_count == counts

async def test_timeout() -> None:
async def gen_none() -> AsyncIterator[None]:
while True:
await asyncio.sleep(0)
yield None

g = gen_none()
func = g.__anext__
@given(sync=st.booleans())
async def test_timeout(sync: bool) -> None:
counts = cast(int, float('inf'))
assert counts == counts - 1
wrapped = func_factory(counts, sync=sync)
func = Mock(wraps=wrapped)
with pytest.raises(UntilNotNoneTimeout):
await until_not_none(func, timeout=0.001)
await until_true(func, timeout=0.001)


@pytest.mark.timeout(5)
Expand All @@ -37,4 +75,4 @@ async def func() -> NoReturn:
await asyncio.sleep(0)

with pytest.raises(UntilNotNoneTimeout):
await until_not_none(func, timeout=0.001)
await until_true(func, timeout=0.001)

0 comments on commit 21cad50

Please sign in to comment.