-
Notifications
You must be signed in to change notification settings - Fork 214
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add note about lifespans in multi-thread environments #277
base: main
Are you sure you want to change the base?
Conversation
asgi.rst states that:
however most asgi apps seem to assume that only one lifespan task will be in progress per process eg: https://fastapi.tiangolo.com/advanced/async-sql-databases/ asgi frameworks should use a RunVar to make sure they create and teardown async resources per lifespan not per process |
It's also possible to run two eventloops in one thread. Eg two asyncio eventloops using alternating .run_until_complete calls. or trio guest mode. A RunVar will also handle this scenario |
Alternatively asgiref should specify that once a web server starts a lifespan task it MUST NOT call the asgi app from any other thread or event loop until the lifespan task terminates |
Hm, I had not envisioned that lifespan would be once-per-thread - I think that confuses things unless you're thinking only in terms of a given set of fixed threads spun up at the start of the process, as opposed to launching them dynamically. What's the problem you're trying to solve here, exactly? Lifespan events are meant to be per-process for very basic startup tasks, so it would seem that saying they should be thread-pinned is a reasonable response. |
I'm using lifespan to manage a trio Nursery via anyio. This doc change would mean I and quart-trio would need to use a RunVar to store the Nursery in |
Unfortunately I'm not super familiar with trio, so if you could explain the impact of both the options on the code you're writing that would be great - what are the side effects, what would you prefer, etc? |
Sure I'll see what I can do |
consider an asgi app and threaded server: from __future__ import annotations
import sys
import asyncio
import anyio.abc
import traceback
import concurrent.futures
async def background_async_fn():
print("processing")
await anyio.sleep(1)
print("done")
class App:
task_group: anyio.abc.TaskGroup
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
assert scope["type"] in ("http", "lifespan")
if scope["type"] == "lifespan":
await self.lifespan(scope, receive, send)
return
await self.http(scope, receive, send)
return
async def lifespan(self, scope: Scope, receive: Receive, send: Send) -> None:
app = scope.get("app")
try:
async with anyio.create_task_group() as app.task_group:
await send({"type": "lifespan.startup.complete"})
await receive()
except BaseException:
exc_text = traceback.format_exc()
await send({"type": "lifespan.shutdown.failed", "message": exc_text})
else:
await send({"type": "lifespan.shutdown.complete"})
async def http(self, scope: Scope, receive: Receive, send: Send) -> None:
self.task_group.start_soon(background_async_fn)
await send({
'type': 'http.response.start',
'status': 200,
'headers': [
[b'content-type', b'text/plain'],
]
})
await send({
'type': 'http.response.body',
'body': b'Hello, world!',
})
async def lifespan(started, done, app):
async def receive():
return await done.wait()
async def send(event, *args):
started.set()
print(event.get("message"))
await app({"type": "lifespan", "app": app}, receive, send)
async def serve(app):
done = asyncio.Event()
started = asyncio.Event()
task = asyncio.create_task(lifespan(started, done, app))
async def receive():
return {"type": "http.disconnect"}
async def send(*args):
pass
await started.wait()
await app({"type": "http", "app": app}, receive, send)
done.set()
await task
def asgi_tpe_server(app):
with concurrent.futures.ThreadPoolExecutor(5) as tpe:
futs = [tpe.submit(asyncio.run, serve(app)) for _ in range(5)]
for v in concurrent.futures.as_completed(futs):
print(v.result())
def main():
app = App()
asgi_tpe_server(app)
return 0
if __name__ == "__main__":
sys.exit(main()) When I run it I get:
|
however I can fix the app by using an anyio.lowlevel.RunVar to store the task_group in: from __future__ import annotations
import sys
import asyncio
import anyio.abc
import anyio.lowlevel
import traceback
import concurrent.futures
async def background_async_fn():
print("processing")
await anyio.sleep(1)
print("done")
class App:
def __init__(self):
self.task_group_runvar = anyio.lowlevel.RunVar[anyio.abc.TaskGroup]("task_group")
@property
def task_group(self):
return self.task_group_runvar.get()
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
assert scope["type"] in ("http", "lifespan")
if scope["type"] == "lifespan":
await self.lifespan(scope, receive, send)
return
await self.http(scope, receive, send)
return
async def lifespan(self, scope: Scope, receive: Receive, send: Send) -> None:
app = scope.get("app")
try:
async with anyio.create_task_group() as tg:
app.task_group_runvar.set(tg)
await send({"type": "lifespan.startup.complete"})
await receive()
except BaseException:
exc_text = traceback.format_exc()
await send({"type": "lifespan.shutdown.failed", "message": exc_text})
else:
await send({"type": "lifespan.shutdown.complete"})
async def http(self, scope: Scope, receive: Receive, send: Send) -> None:
self.task_group.start_soon(background_async_fn)
await send({
'type': 'http.response.start',
'status': 200,
'headers': [
[b'content-type', b'text/plain'],
]
})
await send({
'type': 'http.response.body',
'body': b'Hello, world!',
})
async def lifespan(started, done, app):
async def receive():
return await done.wait()
async def send(event, *args):
started.set()
print(event.get("message"))
await app({"type": "lifespan", "app": app}, receive, send)
async def serve(app):
done = asyncio.Event()
started = asyncio.Event()
task = asyncio.create_task(lifespan(started, done, app))
async def receive():
return {"type": "http.disconnect"}
async def send(*args):
pass
await started.wait()
await app({"type": "http", "app": app}, receive, send)
done.set()
await task
def asgi_tpe_server(app):
with concurrent.futures.ThreadPoolExecutor(5) as tpe:
futs = [tpe.submit(asyncio.run, serve(app)) for _ in range(5)]
for v in concurrent.futures.as_completed(futs):
print(v.result())
def main():
app = App()
asgi_tpe_server(app)
return 0
if __name__ == "__main__":
sys.exit(main())
``` |
|
Coming from #322 (comment) I think one important guarantee is that the lifespan event and any requests that are processed are in the same event loop. If the server spins up multiple threads or multiple event loops, I think it should run the lifespan events once per event loop. #322 would help because it provides a clear way for the application to store state without caring about the execution model of the ASGI server. @graingert please correct me if I'm wrong, but if we said that lifespans must be run once per thread and that the ASGI server can spin up multiple event loops per thread, wouldn't that result in having some async resources initialized in a one event loop and then used in another? I think that would break things. So to be safe the guarantee would need to be that the lifespan and requests are run in the same event loop. |
@andrewgodwin are there to your knowledge any ASGI servers that run lifespans in different threads than requests, or even in different event loops? |
I don't have much insight into the innards of the ASGI servers around at the moment - I don't remember any that do strange things with multiple event loops, but I can't rule it out. |
I checked and both Uvicorn and Hypercorn use a single event loop. Daphne doesn't support lifespans at all but it looks like it does use multiple threads and dispatches to them. Maybe something like "ASGI servers supporting lifespans should run the lifespan event once per event loop" makes sense? Passing event loops between threads is dubious at best, so this would also mean "lifespan events should be run for each thread" (the current change this PR is proposing). |
From discussion today with @pgjones, if we take this just one step further and say "requests must be handled as child tasks/child contexts of the lifespan task/context" we'd also make sure context vars work correctly across lifespans/requests (they only currently work correctly when using hypercorn with trio, they don't work in Uvicorn), which solves #322 |
Does he recently merged #338 resolve this @graingert ? |
Yeah #338 looks great. I'd like to see an example in the docs though, eg: database = Database()
@contextlib.asynccontextmanager
async def lifespan_context(app):
async with database: yield
app = App(lifespan_context=lifespan_context)
app.database = database Good: @contextlib.asynccontextmanager
async def lifespan_context(app):
async with Database() as database:
app.database = database
yield
def create_app():
return App(lifespan_context=lifespan_context) |
I'd like to understand a bit more what's "bad" about the first version. Things I'm spotting:
I think the second example "solves" the first issue certainly because the object is created inside of an event loop. It maybe solves the second issue as long as the server calls Did I get this right? |
No description provided.