-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbasic_example.py
More file actions
76 lines (64 loc) · 2.77 KB
/
basic_example.py
File metadata and controls
76 lines (64 loc) · 2.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import asyncio
import random
from typing import Optional, Tuple
from dotenv import load_dotenv
from fsmworkflow import DBConfig, WorkflowContext, WorkflowDefinition, WorkflowStep, PersistentWorkflowStateMachine, Transition, Condition
async def task1(context: WorkflowContext) -> Tuple[WorkflowContext, Optional[str]]:
    """Run the first step of the example workflow.

    Records ``'completed'`` under the ``'task1_result'`` key of
    ``context.data``.

    Args:
        context: Workflow context; its ``data`` mapping is updated in place.

    Returns:
        A ``(context, None)`` tuple holding the same context object that was
        passed in.
        NOTE(review): the meaning of the second element is defined by
        fsmworkflow (presumably an optional error/trigger string) -- confirm.
    """
    context.data['task1_result'] = 'completed'
    return context, None
async def task2(context: WorkflowContext) -> Tuple[WorkflowContext, Optional[str]]:
    """Run the second step, picking a random outcome for the workflow to branch on.

    Stores either ``'success'`` or ``'failure'`` (chosen with
    ``random.choice``) under the ``'task2_result'`` key of ``context.data``.

    Args:
        context: Workflow context; its ``data`` mapping is updated in place.

    Returns:
        A ``(context, None)`` tuple holding the same context object that was
        passed in.
        NOTE(review): the meaning of the second element is defined by
        fsmworkflow (presumably an optional error/trigger string) -- confirm.
    """
    # The outcome is deliberately non-deterministic: this is demo code that
    # exercises both branches across runs.
    outcome = random.choice(['success', 'failure'])
    context.data['task2_result'] = outcome
    return context, None
async def task3a(context: WorkflowContext) -> Tuple[WorkflowContext, Optional[str]]:
    """Run the third step on the success branch.

    Records ``'success_path_completed'`` under the ``'task3_result'`` key of
    ``context.data``.

    Args:
        context: Workflow context; its ``data`` mapping is updated in place.

    Returns:
        A ``(context, None)`` tuple holding the same context object that was
        passed in.
        NOTE(review): the meaning of the second element is defined by
        fsmworkflow (presumably an optional error/trigger string) -- confirm.
    """
    context.data['task3_result'] = 'success_path_completed'
    return context, None
async def task3b(context: WorkflowContext) -> Tuple[WorkflowContext, Optional[str]]:
    """Run the third step on the failure branch.

    Records ``'failure_path_completed'`` under the ``'task3_result'`` key of
    ``context.data`` (the same key the success-branch step writes).

    Args:
        context: Workflow context; its ``data`` mapping is updated in place.

    Returns:
        A ``(context, None)`` tuple holding the same context object that was
        passed in.
        NOTE(review): the meaning of the second element is defined by
        fsmworkflow (presumably an optional error/trigger string) -- confirm.
    """
    context.data['task3_result'] = 'failure_path_completed'
    return context, None
async def task4(context: WorkflowContext) -> Tuple[WorkflowContext, Optional[str]]:
    """Run the final step of the example workflow.

    Records ``'completed'`` under the ``'task4_result'`` key of
    ``context.data``.

    Args:
        context: Workflow context; its ``data`` mapping is updated in place.

    Returns:
        A ``(context, None)`` tuple holding the same context object that was
        passed in.
        NOTE(review): the meaning of the second element is defined by
        fsmworkflow (presumably an optional error/trigger string) -- confirm.
    """
    context.data['task4_result'] = 'completed'
    return context, None
async def main() -> None:
    """Define the example workflow, run it once, and release the DB pool.

    The workflow has five steps: ``task1`` -> ``task2`` -> (``task3a`` or
    ``task3b``) -> ``task4``. The branch after ``task2`` is guarded by
    conditions on the ``task2_result`` field.

    Any ``Exception`` raised by ``run_workflow()`` is reported on stdout and
    not re-raised. The pool on ``db_config`` is closed afterwards, if one is
    set, whether or not the run succeeded.
    """
    # NOTE(review): presumably called first so DBConfig() can read connection
    # settings from a .env file -- confirm against fsmworkflow.
    load_dotenv()
    db_config = DBConfig()

    workflow_definition = WorkflowDefinition(
        name="Generic Workflow",
        steps=["task1", "task2", "task3a", "task3b", "task4"],
        transitions=[
            Transition(trigger="next", source="task1", dest="task2"),
            # Two guarded transitions leave task2; each condition compares
            # the "task2_result" field against one of the two values that
            # task2 can store.
            Transition(
                trigger="success",
                source="task2",
                dest="task3a",
                conditions=[
                    Condition(field="task2_result", operator="eq", value="success"),
                ],
            ),
            Transition(
                trigger="failure",
                source="task2",
                dest="task3b",
                conditions=[
                    Condition(field="task2_result", operator="eq", value="failure"),
                ],
            ),
            # Both branches rejoin at task4.
            Transition(trigger="next", source="task3a", dest="task4"),
            Transition(trigger="next", source="task3b", dest="task4"),
            # NOTE(review): dest=None presumably marks the end of the
            # workflow -- confirm against fsmworkflow.
            Transition(trigger="complete", source="task4", dest=None),
        ],
    )

    workflow = PersistentWorkflowStateMachine(db_config, workflow_definition)
    # Bind each step name used in the definition to its coroutine function.
    workflow.steps = {
        "task1": WorkflowStep(name="task1", execute=task1),
        "task2": WorkflowStep(name="task2", execute=task2),
        "task3a": WorkflowStep(name="task3a", execute=task3a),
        "task3b": WorkflowStep(name="task3b", execute=task3b),
        "task4": WorkflowStep(name="task4", execute=task4),
    }

    try:
        await workflow.run_workflow()
    except Exception as exc:
        # Top-level boundary of the example script: report the failure
        # instead of letting the traceback propagate.
        print(f"An error occurred while running the workflow: {exc}")
    else:
        print("Workflow completed successfully.")
    finally:
        # Close the pool only when one is set on the config.
        if db_config.pool:
            await db_config.pool.close()
if __name__ == "__main__":
    # Run the example only when executed as a script, not when imported.
    asyncio.run(main())