Skip to content

Commit b222dc9

Browse files
authored
Merge pull request #62 from OpenMined/feature/add_listener
Feature/add listener
2 parents 8668cd2 + b62dcb4 commit b222dc9

File tree

4 files changed

+151
-14
lines changed

4 files changed

+151
-14
lines changed

.gitignore

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,4 @@ libs/om-aries-controller/demo/bob/image_received.png
1414
# exception to the rule
1515
!libs/om-aries-controller/demo/bob/received_files/.gitkeep
1616

17-
**/model.pt
18-
**/trained_model.pt
19-
**/untrained_model.pt
20-
**/part_trained.pt
17+
**/*.pt

libs/aries-basic-controller/aries_basic_controller/aries_controller.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,21 @@ def register_listeners(self, listeners, defaults=True):
9393
pub.subscribe(self.proofs.default_handler, "present_proof")
9494

9595
for listener in listeners:
96-
pub.subscribe(listener["handler"], listener["topic"])
96+
self.add_listener(listener)
97+
98+
def add_listener(self, listener):
99+
pub.subscribe(listener["handler"], listener["topic"])
100+
101+
def remove_listener(self, listener):
102+
if pub.isSubscribed(listener["handler"], listener["topic"]):
103+
pub.unsubscribe(listener["handler"], listener["topic"])
104+
else:
105+
logger.debug("Listener not subscribed", listener)
106+
107+
def remove_all_listeners(self, topic: str = None):
108+
# Note advanced use of function can include both listenerFilter and topicFilter for this
109+
# Add when needed
110+
pub.unsubAll(topicName=topic)
97111

98112
async def listen_webhooks(self):
99113
app = web.Application()
@@ -106,13 +120,13 @@ async def listen_webhooks(self):
106120
async def _receive_webhook(self, request: ClientRequest):
107121
topic = request.match_info["topic"]
108122
payload = await request.json()
109-
await self.handle_webhook(topic, payload)
123+
await self._handle_webhook(topic, payload)
110124
return web.Response(status=200)
111125

112-
async def handle_webhook(self, topic, payload):
126+
async def _handle_webhook(self, topic, payload):
113127
logging.debug(f"Handle Webhook - {topic}", payload)
114128
pub.sendMessage(topic, payload=payload)
115-
return web.Response(status=200)
129+
# return web.Response(status=200)
116130

117131
async def terminate(self):
118132
await self.client_session.close()

tutorials/aries-basic-controller/notebooks/alice/Part 2 - Aries Basic Controller.ipynb

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,85 @@
227227
"agent_controller.register_listeners([some_listener], defaults=True)"
228228
]
229229
},
230+
{
231+
"cell_type": "markdown",
232+
"metadata": {},
233+
"source": [
234+
"### You can also add listeners one at a time.\n",
235+
"\n",
236+
"And add more than one listener for the same topic. This is useful in larger applications where multiple processes need to react to a trigger from the agent.\n",
237+
"\n",
238+
"![PyPubSub figure](https://pypubsub.readthedocs.io/en/v4.0.3/_images/pubsub_concept.png)"
239+
]
240+
},
241+
{
242+
"cell_type": "code",
243+
"execution_count": 1,
244+
"metadata": {},
245+
"outputs": [
246+
{
247+
"ename": "NameError",
248+
"evalue": "name 'agent_controller' is not defined",
249+
"output_type": "error",
250+
"traceback": [
251+
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
252+
"\u001b[0;31mNameError\u001b[0m Traceback (most recent call last)",
253+
"\u001b[0;32m<ipython-input-1-e5ea8d68770d>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m 10\u001b[0m }\n\u001b[1;32m 11\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 12\u001b[0;31m \u001b[0magent_controller\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0madd_listener\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0msome_listener2\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
254+
"\u001b[0;31mNameError\u001b[0m: name 'agent_controller' is not defined"
255+
]
256+
}
257+
],
258+
"source": [
259+
"def some_handler2(payload):\n",
260+
"\n",
261+
" print(\"listener 2\")\n",
262+
" # Some other custom function\n",
263+
"\n",
264+
" \n",
265+
"some_listener2 = {\n",
266+
" \"handler\": some_handler2,\n",
267+
" \"topic\": \"sometopic\"\n",
268+
"}\n",
269+
"\n",
270+
"agent_controller.add_listener(some_listener2)"
271+
]
272+
},
273+
{
274+
"cell_type": "markdown",
275+
"metadata": {},
276+
"source": [
277+
"### Listeners can be removed individually"
278+
]
279+
},
280+
{
281+
"cell_type": "code",
282+
"execution_count": null,
283+
"metadata": {},
284+
"outputs": [],
285+
"source": [
286+
"agent_controller.remove_listener(some_listener2)"
287+
]
288+
},
289+
{
290+
"cell_type": "markdown",
291+
"metadata": {},
292+
"source": [
293+
"### And by topic\n",
294+
"\n",
295+
"If topic == None then all listeners will be removed."
296+
]
297+
},
298+
{
299+
"cell_type": "code",
300+
"execution_count": null,
301+
"metadata": {},
302+
"outputs": [],
303+
"source": [
304+
"topic = \"basicmessages\"\n",
305+
"agent_controller.remove_all(topic)\n",
306+
"\n"
307+
]
308+
},
230309
{
231310
"cell_type": "markdown",
232311
"metadata": {},

tutorials/aries-basic-controller/notebooks/alice/Part 3 - Establishing a Connection.ipynb

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,17 @@
4545
},
4646
{
4747
"cell_type": "code",
48-
"execution_count": null,
48+
"execution_count": 1,
4949
"metadata": {},
50-
"outputs": [],
50+
"outputs": [
51+
{
52+
"name": "stdout",
53+
"output_type": "stream",
54+
"text": [
55+
"IPython autoawait is `on`, and set to use `asyncio`\n"
56+
]
57+
}
58+
],
5159
"source": [
5260
"%autoawait\n",
5361
"import time\n",
@@ -69,7 +77,7 @@
6977
},
7078
{
7179
"cell_type": "code",
72-
"execution_count": null,
80+
"execution_count": 2,
7381
"metadata": {},
7482
"outputs": [],
7583
"source": [
@@ -104,7 +112,7 @@
104112
},
105113
{
106114
"cell_type": "code",
107-
"execution_count": null,
115+
"execution_count": 3,
108116
"metadata": {},
109117
"outputs": [],
110118
"source": [
@@ -141,9 +149,48 @@
141149
},
142150
{
143151
"cell_type": "code",
144-
"execution_count": null,
152+
"execution_count": 5,
145153
"metadata": {},
146-
"outputs": [],
154+
"outputs": [
155+
{
156+
"name": "stdout",
157+
"output_type": "stream",
158+
"text": [
159+
"Connection ID 0b42a8c8-d25f-4a2e-9ba1-d7aa74ebb9d4\n",
160+
"Invitation\n",
161+
"{'@type': 'did:sov:BzCbsNYhMrjHiqZDTUASHg;spec/connections/1.0/invitation', '@id': '0f6250e8-f996-4a33-b798-9d5218cfd264', 'label': 'Alice', 'recipientKeys': ['5oBk3KC87hLHjbU14oetYAm2CRrxZTtiHituhyCCvEcs'], 'serviceEndpoint': 'http://172.17.0.1:8020'}\n",
162+
"Connection Handler Called\n",
163+
"Connection 0b42a8c8-d25f-4a2e-9ba1-d7aa74ebb9d4 in State invitation\n",
164+
"Connection Handler 2 Called\n",
165+
"Connection 0b42a8c8-d25f-4a2e-9ba1-d7aa74ebb9d4 in State invitation\n"
166+
]
167+
}
168+
],
169+
"source": [
170+
"# Create Invitation\n",
171+
"invite = await agent_controller.connections.create_invitation()\n",
172+
"connection_id = invite[\"connection_id\"]\n",
173+
"invite_message = invite['invitation']\n",
174+
"print(\"Connection ID\", connection_id)\n",
175+
"print(\"Invitation\")\n",
176+
"print(invite_message)"
177+
]
178+
},
179+
{
180+
"cell_type": "code",
181+
"execution_count": 14,
182+
"metadata": {},
183+
"outputs": [
184+
{
185+
"name": "stdout",
186+
"output_type": "stream",
187+
"text": [
188+
"Connection ID b2bed5c0-09e5-42e8-afaf-93304b84c5a6\n",
189+
"Invitation\n",
190+
"{'@type': 'did:sov:BzCbsNYhMrjHiqZDTUASHg;spec/connections/1.0/invitation', '@id': '66d5629d-5ea6-4a46-8f19-a5f0b1758a58', 'label': 'Alice', 'recipientKeys': ['31qfD7BQ3i2sAQzNR3BZocynrc7vgDwWYAQ7xXcYyKtk'], 'serviceEndpoint': 'http://172.17.0.1:8020'}\n"
191+
]
192+
}
193+
],
147194
"source": [
148195
"# Create Invitation\n",
149196
"invite = await agent_controller.connections.create_invitation()\n",

0 commit comments

Comments
 (0)