-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathuser_feed.py
38 lines (27 loc) · 1.67 KB
/
user_feed.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from datetime import timedelta
import restate
from restate import VirtualObject, ObjectContext
from utils import create_post, get_post_status, update_user_feed, SocialMediaPost, Status
# Processing events (from Kafka) to update various downstream systems
# - Journaling actions in Restate and driving retries from Restate, recovering
# partial progress
# - Preserving the order-per-key, but otherwise allowing high-fanout, because
# processing of events does not block other events.
# - Ability to delay events when the downstream systems are busy, without blocking
# entire partitions.
# Virtual Object definition registered under the service name "UserFeed";
# handlers are attached to it with the @user_feed.handler decorator.
user_feed = VirtualObject("UserFeed")
# The Kafka key routes events to the correct Virtual Object.
# Events with the same key are processed one after the other.
@user_feed.handler("processPost")
async def process_post(ctx: ObjectContext, post: SocialMediaPost):
    """Create a post for the keyed user and publish it to their feed once moderated.

    The handler runs three journaled steps through ``ctx.run``: create the
    post, poll its status until it is no longer ``Status.PENDING``, and then
    update the user's feed.

    Args:
        ctx: Object context of the invocation; ``ctx.key()`` supplies the user id.
        post: The incoming post event to process.
    """
    owner_id = ctx.key()
    poll_interval = timedelta(seconds=5)

    # The event handler is a durably executed function, so every Restate
    # feature (journaled steps, durable sleeps) is available here.
    # NOTE(review): the step label says "profile update" although the action
    # creates a post -- confirm whether the label is intentional before renaming.
    created_post_id = await ctx.run(
        "profile update",
        lambda: create_post(owner_id, post),
    )

    # Hold back until content moderation has finished (the handler suspends
    # when running on FaaS). Only further posts of this user (this Virtual
    # Object) wait behind it; other users are unaffected.
    while True:
        status = await ctx.run(
            "post status",
            lambda: get_post_status(created_post_id),
        )
        if status != Status.PENDING:
            break
        await ctx.sleep(poll_interval)

    await ctx.run(
        "update feed",
        lambda: update_user_feed(owner_id, created_post_id),
    )
# Application object built by restate.app with user_feed as its only registered service.
app = restate.app(services=[user_feed])
# Process new posts for users via Kafka or by calling the endpoint over HTTP:
# curl localhost:8080/UserFeed/userid1/processPost --json '{"content": "Hi! This is my first post!", "metadata": "public"}'