Skip to content

Commit 326ac0c

Browse files
committed
refactor code for webhook listener
1 parent b6b5eb6 commit 326ac0c

File tree

8 files changed

+798
-347
lines changed

8 files changed

+798
-347
lines changed

libs/aries-basic-controller/aries_basic_controller/aries_controller.py

Lines changed: 32 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
from .controllers.action_menu import ActionMenuController
2424
from .controllers.revocation import RevocationController
2525

26+
from .aries_webhook_listener import AriesWebhookListener
27+
2628
import logging
2729

2830
logger = logging.getLogger("aries_controller")
@@ -48,7 +50,7 @@ class AriesAgentController:
4850
Specify whether to create connecitons (default is True)
4951
messaging : bool
5052
Initialise the messaging interface (default is True)
51-
multitenant : bool
53+
is_multitenant : bool
5254
Initialise the multitenant interface (default is False)
5355
mediation : bool
5456
Initialise the mediation interface (default is False)
@@ -62,24 +64,26 @@ class AriesAgentController:
6264
The API key (default is None)
6365
tenant_jwt: str
6466
The tenant JW token (default is None)
67+
wallet_id : str
68+
The tenant wallet identifier
6569
"""
6670

6771
## TODO rethink how to initialise. Too many args?
6872
## is it important to let users config connections/issuer etc
69-
webhook_host: str
70-
webhook_port: int
7173
admin_url: str
74+
webhook_host: str = None
75+
webhook_port: int = None
7276
webhook_base: str = ""
7377
connections: bool = True
7478
messaging: bool = True
75-
multitenant: bool = False
79+
is_multitenant: bool = False
7680
mediation: bool = False
7781
issuer: bool = True
7882
action_menu: bool = True
7983
revocations: bool = True
8084
api_key: str = None
8185
tenant_jwt: str = None
82-
wallet_id: str = "base"
86+
wallet_id: str = None
8387

8488

8589
def __post_init__(self):
@@ -101,7 +105,9 @@ def __post_init__(self):
101105

102106

103107
self.client_session: ClientSession = ClientSession(headers=self.headers)
104-
108+
109+
self.webhook_listener: AriesWebhookListener = AriesWebhookListener(webhook_host=self.webhook_host, webhook_port=self.webhook_port, webhook_base=self.webhook_base, is_multitenant=self.is_multitenant)
110+
105111
# Instantiate controllers based on the provided attributes
106112
if self.connections:
107113
self.connections = ConnectionsController(self.admin_url, self.client_session)
@@ -115,7 +121,7 @@ def __post_init__(self):
115121
self.server = ServerController(self.admin_url, self.client_session)
116122
self.oob = OOBController(self.admin_url, self.client_session)
117123

118-
if self.multitenant:
124+
if self.is_multitenant:
119125
self.multitenant = MultitenancyController(self.admin_url, self.client_session)
120126

121127
if self.mediation:
@@ -137,17 +143,29 @@ def __post_init__(self):
137143
self.client_session
138144
)
139145

146+
def update_wallet_id(self, wallet_id: str):
147+
"""This wallet_id is used to register for webhooks specific to this sub_wallet
148+
149+
Args:
150+
----
151+
wallet_id : str
152+
The tenant wallet identifier
153+
"""
154+
self.wallet_id = wallet_id
140155

141156

142-
def update_tenant_jwt(self, tenant_jwt: str):
157+
def update_tenant_jwt(self, tenant_jwt: str, wallet_id: str):
143158
"""Update the tenant JW token attribute and the header
144159
145160
Args:
146161
----
147162
tenant_jwt : str
148163
The tenant's JW token
164+
wallet_id : str
165+
The tenant wallet identifier
149166
"""
150167
self.tenant_jwt = tenant_jwt
168+
self.update_wallet_id(wallet_id)
151169
self.headers.update({'Authorization': 'Bearer ' + tenant_jwt, 'content-type': "application/json"})
152170
self.client_session.headers.update(self.headers)
153171

@@ -210,8 +228,6 @@ def register_listeners(self, listeners, defaults=True):
210228
print(f"Register webhooks listeners failed! {exc!r} occurred.")
211229
logger.warn(f"Register webhooks listeners failed! {exc!r} occurred.")
212230

213-
214-
215231
def add_listener(self, listener):
216232
"""Subscribe to a listeners for a topic
217233
@@ -267,77 +283,15 @@ def remove_all_listeners(self, topic: str = None):
267283
pub.unsubAll(topicName=topic)
268284
except Exception as exc:
269285
print(f"Removing all webhooks listeners failed! {exc!r} occurred.")
270-
logger.warn(f"Removing all webhooks listeners failed! {exc!r} occurred.")
271-
286+
logger.warning(f"Removing all webhooks listeners failed! {exc!r} occurred.")
272287

273288

274289
async def listen_webhooks(self):
275-
"""Create a server to listen to webhooks"""
276-
try:
277-
app = web.Application()
278-
app.add_routes([web.post(self.webhook_base + "/{wallet}/topic/{topic}/", self._receive_webhook)])
279-
runner = web.AppRunner(app)
280-
await runner.setup()
281-
self.webhook_site = web.TCPSite(runner, self.webhook_host, self.webhook_port)
282-
await self.webhook_site.start()
283-
except Exception as exc:
284-
print(f"Listening webhooks failed! {exc!r} occurred.")
285-
logger.warn(f"Listening webhooks failed! {exc!r} occurred.")
286-
287-
288-
289-
async def _receive_webhook(self, request: ClientRequest):
290-
"""Helper to receive webhooks by requesting it
291-
292-
Args:
293-
----
294-
request : ClientRequest
295-
The client request to which the corresponding webhooks shall be received
296-
297-
Returns:
298-
-------
299-
Response:
300-
A response with status 200
301-
"""
302-
topic = request.match_info["topic"]
303-
wallet = request.match_info["wallet"]
304-
print("wallet", wallet)
305-
try:
306-
payload = await request.json()
307-
await self._handle_webhook(wallet, topic, payload)
308-
return web.Response(status=200)
309-
except Exception as exc:
310-
logger.warn(f"Receiving webhooks failed! {exc!r} occurred.")
311-
312-
313-
314-
async def _handle_webhook(self, wallet, topic, payload):
315-
"""Helper handling a webhook
316-
317-
Args:
318-
----
319-
topic : str
320-
The topic to handle webhooks for
321-
payload : dict
322-
A JSON-like dictionary representation of the payload
323-
"""
324-
try:
325-
pub_topic_path = f"{wallet}.{topic}"
326-
print(f"Handle Webhook - {pub_topic_path}", payload)
327-
logging.debug(f"Handle Webhook - {pub_topic_path}", payload)
328-
pub.sendMessage(pub_topic_path, payload=payload)
329-
# return web.Response(status=200)
330-
except Exception as exc:
331-
logger.warn(f"Handling webhooks failed! {exc!r} occurred when trying to handle this topic: {topic}")
332-
290+
# self.webhook_listener: AriesWebhookListener = AriesWebhookListener(webhook_host=webhook_host, webhook_port=webhook_port, webhook_base=webhook_base, is_multitenant=is_multitenant)
333291

292+
if self.webhook_listener:
293+
await self.webhook_listener.listen_webhooks()
334294

335295
async def terminate(self):
336-
"""Terminate the controller client session and webhook listeners"""
337-
try:
338-
await self.client_session.close()
339-
if self.webhook_site:
340-
await self.webhook_site.stop()
341-
except Exception as exc:
342-
print(f"Terminating webhooks listener failed! {exc!r} occurred.")
343-
logger.warn(f"Terminating webhooks listener failed! {exc!r} occurred.")
296+
await self.client_session.close()
297+
await self.webhook_listener.terminate()
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
from aiohttp import (
2+
web,
3+
ClientSession,
4+
ClientRequest,
5+
)
6+
from dataclasses import dataclass
7+
from pubsub import pub
8+
import logging
9+
10+
logger = logging.getLogger("aries_webhook_listener")
11+
12+
@dataclass
13+
class AriesWebhookListener:
14+
"""The Aries Webhook Listener class
15+
16+
This class allows you to interact with Aries by exposing the aca-py API.
17+
18+
Attributes
19+
----------
20+
webhook_host : str
21+
The url of the webhook host
22+
webhook_port : int
23+
The exposed port for webhooks on the host
24+
admin_url : str
25+
The URL for the Admin API
26+
webhook_base : str
27+
The base url for webhooks (default is "")
28+
wallet_id : str
29+
The tenant wallet identifier
30+
is_multitenant : bool
31+
Initialise the multitenant interface (default is False)
32+
"""
33+
34+
35+
webhook_host: str
36+
webhook_port: int
37+
webhook_base: str = ""
38+
is_multitenant: bool = False
39+
40+
async def listen_webhooks(self):
41+
"""Create a server to listen to webhooks"""
42+
try:
43+
app = web.Application()
44+
app.add_routes([web.post(self.webhook_base + "/topic/{topic}/", self._receive_webhook)])
45+
if self.is_multitenant:
46+
app.add_routes([web.post(self.webhook_base + "/{wallet_id}/topic/{topic}/", self._receive_webhook)])
47+
runner = web.AppRunner(app)
48+
await runner.setup()
49+
self.webhook_site = web.TCPSite(runner, self.webhook_host, self.webhook_port)
50+
await self.webhook_site.start()
51+
except Exception as exc:
52+
print(f"Listening webhooks failed! {exc!r} occurred.")
53+
logger.warn(f"Listening webhooks failed! {exc!r} occurred.")
54+
55+
async def _receive_webhook(self, request: ClientRequest):
56+
"""Helper to receive webhooks by requesting it
57+
58+
Args:
59+
----
60+
request : ClientRequest
61+
The client request to which the corresponding webhooks shall be received
62+
63+
Returns:
64+
-------
65+
Response:
66+
A response with status 200
67+
"""
68+
topic = request.match_info["topic"]
69+
wallet_id = None
70+
if self.is_multitenant:
71+
wallet_id = request.match_info["wallet_id"]
72+
print("wallet", wallet_id)
73+
try:
74+
payload = await request.json()
75+
await self._handle_webhook(wallet_id, topic, payload)
76+
return web.Response(status=200)
77+
except Exception as exc:
78+
logger.warning(f"Receiving webhooks failed! {exc!r} occurred.")
79+
80+
async def _handle_webhook(self, wallet_id, topic, payload):
81+
"""Helper handling a webhook
82+
83+
Args:
84+
----
85+
wallet_id: str
86+
The identifier for the wallet the webhook is for
87+
topic : str
88+
The topic to handle webhooks for
89+
payload : dict
90+
A JSON-like dictionary representation of the payload
91+
"""
92+
try:
93+
pub_topic_path = topic
94+
if wallet_id:
95+
pub_topic_path = f"{wallet_id}.{topic}"
96+
logging.debug(f"Handle Webhook - {pub_topic_path}", payload)
97+
pub.sendMessage(pub_topic_path, payload=payload)
98+
# return web.Response(status=200)
99+
except Exception as exc:
100+
logger.warn(f"Handling webhooks failed! {exc!r} occurred when trying to handle this topic: {topic}")
101+
102+
async def terminate(self):
103+
"""Terminate the controller client session and webhook listeners"""
104+
try:
105+
if self.webhook_site:
106+
await self.webhook_site.stop()
107+
except Exception as exc:
108+
print(f"Terminating webhooks listener failed! {exc!r} occurred.")
109+
logger.warn(f"Terminating webhooks listener failed! {exc!r} occurred.")

scripts/startup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
if [[ -n "${MULTITENANT_TUTORIAL}" ]]; then
1+
If [ “$MULTITENANT_TUTORIAL== “MULTITENANT_TUTORIAL” ]; then
22
echo $MULTITENANT_TUTORIAL
33
python3 ./scripts/parse_yml_env_variables.py -c ./configuration/aries-args-multitenant.yaml;aca-py start --arg-file /tmp/agent_conf.yml
44
else

tutorials/4. Multitenancy/docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ services:
9999
- AGENT_NAME=${MULTITENANT_AGENT_NAME}
100100
- ADMIN_PORT=${MULTITENANT_ADMIN_PORT}
101101
- HTTP_PORT=${MULTITENANT_HTTP_PORT}
102-
- ACAPY_WEBHOOK_URL=${MULTITENANT_WEBHOOK_URL}/base
102+
- ACAPY_WEBHOOK_URL=${MULTITENANT_WEBHOOK_URL}
103103
- AGENT_ENDPOINT=${MULTITENANT_AGENT_ENDPOINT}
104104
- WALLET_SEED=${MULTITENANT_WALLET_SEED}
105105
- WALLET_NAME=${MULTITENANT_WALLET_NAME}

tutorials/4. Multitenancy/notebooks/mediator/Configure Mediator.ipynb

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -109,14 +109,14 @@
109109
"name": "stdout",
110110
"output_type": "stream",
111111
"text": [
112-
"base.connections\n"
112+
"Subscribing too: connections\n"
113113
]
114114
}
115115
],
116116
"source": [
117117
"\n",
118118
"loop = asyncio.get_event_loop()\n",
119-
"loop.create_task(agent_controller.listen_webhooks())\n",
119+
"loop.create_task(agent_controller.webhook_listener.listen_webhooks())\n",
120120
"\n",
121121
"def connection_handler(payload):\n",
122122
" print(\"Connection Handler Called\")\n",
@@ -154,9 +154,29 @@
154154
"name": "stdout",
155155
"output_type": "stream",
156156
"text": [
157-
"Connection ID aa9a800e-9e65-4070-98d8-4f56971d6292\n",
157+
"Connection ID a347eb87-8018-4d3e-a051-a945fcf52ff1\n",
158158
"Invitation\n",
159-
"{'@type': 'did:sov:BzCbsNYhMrjHiqZDTUASHg;spec/connections/1.0/invitation', '@id': '61520ae1-80f8-473c-ae56-31472c252ceb', 'label': 'MEDIATOR', 'recipientKeys': ['EZ1hdmdntvBTMDW7SHFmQrL28hSV2U3Ctj83ERCzL34C'], 'serviceEndpoint': 'https://7976bda13d4b.ngrok.io'}\n"
159+
"{'@type': 'did:sov:BzCbsNYhMrjHiqZDTUASHg;spec/connections/1.0/invitation', '@id': 'e08a7e80-d96b-43ba-9ea8-368e5e61eeb6', 'serviceEndpoint': 'https://7eceb3b5d5a4.ngrok.io', 'label': 'MEDIATOR', 'recipientKeys': ['69hmbswFgse281vL3Z7eAzv8mc6m5RwCYuTgZxhGk4GU']}\n",
160+
"wallet None\n",
161+
"Handle Webhook - connections {'connection_id': 'a347eb87-8018-4d3e-a051-a945fcf52ff1', 'accept': 'auto', 'invitation_key': '69hmbswFgse281vL3Z7eAzv8mc6m5RwCYuTgZxhGk4GU', 'created_at': '2021-03-31 17:42:19.462303Z', 'invitation_mode': 'multi', 'their_role': 'invitee', 'routing_state': 'none', 'state': 'invitation', 'updated_at': '2021-03-31 17:42:19.462303Z', 'rfc23_state': 'invitation-sent'}\n",
162+
"Connection Handler Called\n",
163+
"Connection a347eb87-8018-4d3e-a051-a945fcf52ff1 in State invitation\n",
164+
"wallet None\n",
165+
"Handle Webhook - connections {'connection_id': '49431565-9d12-42e9-8341-8699834fc785', 'accept': 'auto', 'invitation_key': '69hmbswFgse281vL3Z7eAzv8mc6m5RwCYuTgZxhGk4GU', 'created_at': '2021-03-31 17:43:17.833726Z', 'invitation_mode': 'once', 'their_role': 'invitee', 'routing_state': 'none', 'state': 'invitation', 'updated_at': '2021-03-31 17:43:17.833726Z', 'my_did': 'ArH29nQAn4GYa3RCTLJehD', 'rfc23_state': 'invitation-sent'}\n",
166+
"Connection Handler Called\n",
167+
"Connection 49431565-9d12-42e9-8341-8699834fc785 in State invitation\n",
168+
"wallet None\n",
169+
"Handle Webhook - connections {'connection_id': '49431565-9d12-42e9-8341-8699834fc785', 'accept': 'auto', 'invitation_key': '69hmbswFgse281vL3Z7eAzv8mc6m5RwCYuTgZxhGk4GU', 'created_at': '2021-03-31 17:43:17.833726Z', 'their_label': 'Alice', 'invitation_mode': 'once', 'their_role': 'invitee', 'routing_state': 'none', 'state': 'request', 'updated_at': '2021-03-31 17:43:17.851433Z', 'their_did': '9RUfRQwTJ68tZeuE38HXRj', 'my_did': 'ArH29nQAn4GYa3RCTLJehD', 'rfc23_state': 'request-received'}\n",
170+
"Connection Handler Called\n",
171+
"Connection 49431565-9d12-42e9-8341-8699834fc785 in State request\n",
172+
"wallet None\n",
173+
"Handle Webhook - connections {'connection_id': '49431565-9d12-42e9-8341-8699834fc785', 'accept': 'auto', 'invitation_key': '69hmbswFgse281vL3Z7eAzv8mc6m5RwCYuTgZxhGk4GU', 'created_at': '2021-03-31 17:43:17.833726Z', 'their_label': 'Alice', 'invitation_mode': 'once', 'their_role': 'invitee', 'routing_state': 'none', 'state': 'response', 'updated_at': '2021-03-31 17:43:17.866994Z', 'their_did': '9RUfRQwTJ68tZeuE38HXRj', 'my_did': 'ArH29nQAn4GYa3RCTLJehD', 'rfc23_state': 'response-sent'}\n",
174+
"Connection Handler Called\n",
175+
"Connection 49431565-9d12-42e9-8341-8699834fc785 in State response\n",
176+
"wallet None\n",
177+
"Handle Webhook - connections {'connection_id': '49431565-9d12-42e9-8341-8699834fc785', 'accept': 'auto', 'invitation_key': '69hmbswFgse281vL3Z7eAzv8mc6m5RwCYuTgZxhGk4GU', 'created_at': '2021-03-31 17:43:17.833726Z', 'their_label': 'Alice', 'invitation_mode': 'once', 'their_role': 'invitee', 'routing_state': 'none', 'state': 'active', 'updated_at': '2021-03-31 17:43:18.488996Z', 'their_did': '9RUfRQwTJ68tZeuE38HXRj', 'my_did': 'ArH29nQAn4GYa3RCTLJehD', 'rfc23_state': 'completed'}\n",
178+
"Connection Handler Called\n",
179+
"Connection 49431565-9d12-42e9-8341-8699834fc785 in State active\n"
160180
]
161181
}
162182
],
@@ -202,7 +222,7 @@
202222
},
203223
{
204224
"cell_type": "code",
205-
"execution_count": 7,
225+
"execution_count": 6,
206226
"metadata": {
207227
"pycharm": {
208228
"name": "#%%\n"
@@ -214,8 +234,8 @@
214234
"output_type": "stream",
215235
"text": [
216236
"Mediation Record\n",
217-
"connection_id ab02b489-033d-4378-8202-ad31751d64f0\n",
218-
"State granted\n"
237+
"connection_id 49431565-9d12-42e9-8341-8699834fc785\n",
238+
"State request\n"
219239
]
220240
}
221241
],

tutorials/4. Multitenancy/notebooks/mediator/Untitled.ipynb

Lines changed: 0 additions & 6 deletions
This file was deleted.

0 commit comments

Comments
 (0)