-
Notifications
You must be signed in to change notification settings - Fork 42
/
Copy pathtopology.py
62 lines (50 loc) · 2.31 KB
/
topology.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import builtins
from infra.configsettings import ConfigManager
from aws_cdk.aws_stepfunctions import StateMachineType
from infra.storage.topology import RivSharedDataStores
from infra.userportal.functions.topology import RivUserPortalFunctionSet
from infra.userportal.states.topology import RivUserPortalStateMachines
from infra.userportal.gateway.topology import RivUserPortalGateway
from json import dumps
from infra.interfaces import IVpcRivStack
from constructs import Construct
# Module-level configuration instance, created at import time. In this file
# its `use_isolated_subnets` and `use_debug_state` flags are read by
# RivUserPortal to decide which optional resources to declare.
config = ConfigManager()
class RivUserPortal(Construct):
    """Construct that assembles the user portal.

    Exposes the pieces it creates as attributes:

    * ``functions`` -- the ``RivUserPortalFunctionSet`` backing the portal.
    * ``api_gateway`` -- the ``RivUserPortalGateway`` with its bindings applied.
    * ``state_machines`` -- ``RivUserPortalStateMachines`` of type EXPRESS.
    * ``debug_state_machines`` -- ``RivUserPortalStateMachines`` of type
      STANDARD; only set when ``config.use_debug_state`` is truthy.
    """

    def __init__(
        self,
        scope: Construct,
        id: builtins.str,
        riv_stack: IVpcRivStack,
        sharedStorage,
        subnet_group_name: str = 'Default',
    ) -> None:
        """Declare every resource that makes up the user portal.

        Args:
            scope: Parent construct, forwarded to ``Construct.__init__``.
            id: Identifier of this construct, forwarded to
                ``Construct.__init__``.
            riv_stack: Stack object handed to every child construct; its
                ``networking.endpoints`` is used to register VPC endpoints.
            sharedStorage: Forwarded unchanged to the function set.
            subnet_group_name: Forwarded unchanged to the function set.
        """
        super().__init__(scope, id)

        # Declare any VPC endpoints required by this construct.
        if config.use_isolated_subnets:
            endpoints = riv_stack.networking.endpoints
            endpoints.add_lambda_support()
            endpoints.add_apigateway_support()
            endpoints.add_rekognition_support()

        # Declare the function set that powers the backend.
        functions = RivUserPortalFunctionSet(
            self,
            'Functions',
            riv_stack=riv_stack,
            subnet_group_name=subnet_group_name,
            sharedStorage=sharedStorage,
        )
        self.functions = functions

        # Create an Amazon API Gateway and register Step Function Express
        # integrations.
        gateway = RivUserPortalGateway(self, 'Gateway', riv_stack=riv_stack)
        self.api_gateway = gateway

        # Keyword arguments common to every state machine set built below.
        machine_kwargs = {'riv_stack': riv_stack, 'functions': functions}

        self.state_machines = RivUserPortalStateMachines(
            self,
            'States',
            state_machine_type=StateMachineType.EXPRESS,
            **machine_kwargs,
        )
        gateway.bind_state_machines(self.state_machines)

        # Each of these bindings takes the function set; the call order is
        # kept exactly as declared here.
        function_bindings = (
            gateway.bind_reset_user,
            gateway.bind_start_liveness_session,
            gateway.bind_liveness_session_result,
            gateway.bind_check_userid,
            gateway.bind_extract_id_card,
        )
        for bind in function_bindings:
            bind(functions)

        # Create Standard Stepfunctions to simplify developer troubleshooting.
        if config.use_debug_state:
            self.debug_state_machines = RivUserPortalStateMachines(
                self,
                'DebugStates',
                state_machine_type=StateMachineType.STANDARD,
                **machine_kwargs,
            )