Skip to content

Commit

Permalink
refactor: update python workflow example with daprWorkflowClient (#1101)
Browse files Browse the repository at this point in the history
Signed-off-by: Eileen Yu <eileenylj@gmail.com>
  • Loading branch information
Eileen-Yu authored Nov 29, 2024
1 parent 9e9cb5e commit c32f421
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 47 deletions.
2 changes: 1 addition & 1 deletion workflows/python/sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ cd ..
name: Running this example
expected_stdout_lines:
- "There are now 90 cars left in stock"
- "Workflow completed! Result: Completed"
- "Workflow completed!"
output_match_mode: substring
background: true
timeout_seconds: 120
Expand Down
72 changes: 33 additions & 39 deletions workflows/python/sdk/order-processor/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,54 +4,48 @@

from dapr.clients import DaprClient
from dapr.conf import settings
from dapr.ext.workflow import WorkflowRuntime
from dapr.ext.workflow import DaprWorkflowClient, WorkflowStatus

from workflow import order_processing_workflow, notify_activity, process_payment_activity, \
verify_inventory_activity, update_inventory_activity, requst_approval_activity
from workflow import wfr, order_processing_workflow
from model import InventoryItem, OrderPayload

store_name = "statestore"
workflow_component = "dapr"
workflow_name = "order_processing_workflow"
default_item_name = "cars"

class WorkflowConsoleApp:
def main(self):
print("*** Welcome to the Dapr Workflow console app sample!", flush=True)
print("*** Using this app, you can place orders that start workflows.", flush=True)

wfr.start()
# Wait for the sidecar to become available
sleep(5)

workflowRuntime = WorkflowRuntime(settings.DAPR_RUNTIME_HOST, settings.DAPR_GRPC_PORT)
workflowRuntime.register_workflow(order_processing_workflow)
workflowRuntime.register_activity(notify_activity)
workflowRuntime.register_activity(requst_approval_activity)
workflowRuntime.register_activity(verify_inventory_activity)
workflowRuntime.register_activity(process_payment_activity)
workflowRuntime.register_activity(update_inventory_activity)
workflowRuntime.start()
wfClient = DaprWorkflowClient()

baseInventory = {
"paperclip": InventoryItem("Paperclip", 5, 100),
"cars": InventoryItem("Cars", 15000, 100),
"computers": InventoryItem("Computers", 500, 100),
}

daprClient = DaprClient(address=f'{settings.DAPR_RUNTIME_HOST}:{settings.DAPR_GRPC_PORT}')
baseInventory = {}
baseInventory["paperclip"] = InventoryItem("Paperclip", 5, 100)
baseInventory["cars"] = InventoryItem("Cars", 15000, 100)
baseInventory["computers"] = InventoryItem("Computers", 500, 100)

daprClient = DaprClient(address=f'{settings.DAPR_RUNTIME_HOST}:{settings.DAPR_GRPC_PORT}')
self.restock_inventory(daprClient, baseInventory)

print("==========Begin the purchase of item:==========", flush=True)
item_name = default_item_name
order_quantity = 10

total_cost = int(order_quantity) * baseInventory[item_name].per_item_cost
order = OrderPayload(item_name=item_name, quantity=int(order_quantity), total_cost=total_cost)

print(f'Starting order workflow, purchasing {order_quantity} of {item_name}', flush=True)
start_resp = daprClient.start_workflow(workflow_component=workflow_component,
workflow_name=workflow_name,
input=order)
_id = start_resp.instance_id
instance_id = wfClient.schedule_new_workflow(
workflow=order_processing_workflow, input=order.to_json())
_id = instance_id

def prompt_for_approval(daprClient: DaprClient):
def prompt_for_approval(wfClient: DaprWorkflowClient):
"""This is a helper function to prompt for approval.
Not using the prompt here ACTUALLY, as quickstart validation is required to be automated.
Expand All @@ -65,9 +59,9 @@ def prompt_for_approval(daprClient: DaprClient):
if state.runtime_status.name == "COMPLETED":
return
if approved.lower() == "y":
client.raise_workflow_event(instance_id=_id, event_name="manager_approval", data={'approval': True})
wfClient.raise_workflow_event(instance_id=_id, event_name="manager_approval", data={'approval': True})
else:
client.raise_workflow_event(instance_id=_id, event_name="manager_approval", data={'approval': False})
wfClient.raise_workflow_event(instance_id=_id, event_name="manager_approval", data={'approval': False})
## Additionally, you would need to import signal and define timeout_error:
# import signal
Expand All @@ -76,32 +70,32 @@ def prompt_for_approval(daprClient: DaprClient):
# signal.signal(signal.SIGALRM, timeout_error)
"""
daprClient.raise_workflow_event(instance_id=_id, workflow_component=workflow_component,
event_name="manager_approval", event_data={'approval': True})
wfClient.raise_workflow_event(instance_id=_id, event_name="manager_approval", data={'approval': True})

approval_seeked = False
start_time = datetime.now()
while True:
time_delta = datetime.now() - start_time
state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component)
state = wfClient.get_workflow_state(instance_id=_id)

if not state:
print("Workflow not found!") # not expected
elif state.runtime_status == "Completed" or\
state.runtime_status == "Failed" or\
state.runtime_status == "Terminated":
break

if state.runtime_status in {WorkflowStatus.COMPLETED, WorkflowStatus.FAILED, WorkflowStatus.TERMINATED}:
print(f'Workflow completed! Result: {state.runtime_status}', flush=True)
break


if time_delta.total_seconds() >= 10:
state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component)
if total_cost > 50000 and (
state.runtime_status != "Completed" or
state.runtime_status != "Failed" or
state.runtime_status != "Terminated"
) and not approval_seeked:
state = wfClient.get_workflow_state(instance_id=_id)
if total_cost > 50000 and state not in {WorkflowStatus.COMPLETED, WorkflowStatus.FAILED, WorkflowStatus.TERMINATED} and not approval_seeked:
approval_seeked = True
threading.Thread(target=prompt_for_approval(daprClient), daemon=True).start()
threading.Thread(target=prompt_for_approval(wfClient), daemon=True).start()

wfr.shutdown()


print("Purchase of item is ", state.runtime_status, flush=True)

def restock_inventory(self, daprClient: DaprClient, baseInventory):
for key, item in baseInventory.items():
Expand Down
21 changes: 14 additions & 7 deletions workflows/python/sdk/order-processor/workflow.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@

from datetime import timedelta
import logging
import json

from dapr.ext.workflow import DaprWorkflowContext, WorkflowActivityContext, when_any
from dapr.ext.workflow import DaprWorkflowContext, WorkflowActivityContext, WorkflowRuntime, when_any
from dapr.clients import DaprClient
from dapr.conf import settings

Expand All @@ -12,10 +11,13 @@

store_name = "statestore"

wfr = WorkflowRuntime()

logging.basicConfig(level=logging.INFO)


def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: OrderPayload):
@wfr.workflow(name="order_processing_workflow")
def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: str):
"""Defines the order processing workflow.
When the order is received, the inventory is checked to see if there is enough inventory to
fulfill the order. If there is enough inventory, the payment is processed and the inventory is
Expand All @@ -39,7 +41,7 @@ def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: Order
return OrderResult(processed=False)

if order_payload["total_cost"] > 50000:
yield ctx.call_activity(requst_approval_activity, input=order_payload)
yield ctx.call_activity(request_approval_activity, input=order_payload)
approval_task = ctx.wait_for_external_event("manager_approval")
timeout_event = ctx.create_timer(timedelta(seconds=200))
winner = yield when_any([approval_task, timeout_event])
Expand Down Expand Up @@ -76,15 +78,15 @@ def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: Order
message=f'Order {order_id} has completed!'))
return OrderResult(processed=True)


@wfr.activity(name="notify_activity")
def notify_activity(ctx: WorkflowActivityContext, input: Notification):
"""Defines Notify Activity. This is used by the workflow to send out a notification"""
# Create a logger
logger = logging.getLogger('NotifyActivity')
logger.info(input.message)



@wfr.activity(name="process_payment_activity")
def process_payment_activity(ctx: WorkflowActivityContext, input: PaymentRequest):
"""Defines Process Payment Activity.This is used by the workflow to process a payment"""
logger = logging.getLogger('ProcessPaymentActivity')
Expand All @@ -94,6 +96,7 @@ def process_payment_activity(ctx: WorkflowActivityContext, input: PaymentRequest
logger.info(f'Payment for request ID {input.request_id} processed successfully')


@wfr.activity(name="verify_inventory_activity")
def verify_inventory_activity(ctx: WorkflowActivityContext,
input: InventoryRequest) -> InventoryResult:
"""Defines Verify Inventory Activity. This is used by the workflow to verify if inventory
Expand All @@ -117,6 +120,8 @@ def verify_inventory_activity(ctx: WorkflowActivityContext,
return InventoryResult(False, None)



@wfr.activity(name="update_inventory_activity")
def update_inventory_activity(ctx: WorkflowActivityContext,
input: PaymentRequest) -> InventoryResult:
"""Defines Update Inventory Activity. This is used by the workflow to check if inventory
Expand All @@ -139,7 +144,9 @@ def update_inventory_activity(ctx: WorkflowActivityContext,
logger.info(f'There are now {new_quantity} {input.item_being_purchased} left in stock')


def requst_approval_activity(ctx: WorkflowActivityContext,

@wfr.activity(name="request_approval_activity")
def request_approval_activity(ctx: WorkflowActivityContext,
input: OrderPayload):
"""Defines Request Approval Activity. This is used by the workflow to request approval
for payment of an order. This activity is used only if the order total cost is greater than
Expand Down

0 comments on commit c32f421

Please sign in to comment.