Skip to content

Commit acad93d

Browse files
Lita Chogarrettheel
authored andcommitted
Add support for signals via blinker (#278)
1 parent e998a19 commit acad93d

File tree

6 files changed

+231
-12
lines changed

6 files changed

+231
-12
lines changed

docs/signals.rst

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
Signals
2+
=======
3+
Starting with PynamoDB 3.1.0, there is support for signalling. This support is provided by the `blinker`_ library, which is not installed by default. In order to ensure blinker is installed, specify your PynamoDB requirement like so:
4+
5+
::
6+
7+
pynamodb[signals]==<YOUR VERSION NUMBER>
8+
9+
Signals allow certain senders to notify subscribers that something happened. PynamoDB currently sends signals before and after every DynamoDB API call.
10+
11+
.. note::
12+
13+
It is recommended to avoid business logic in signal callbacks, as this can have performance implications. To reinforce this, only the operation name and table name are available in the signal callback.
14+
15+
16+
Subscribing to Signals
17+
----------------------
18+
19+
PynamoDB fires two signal calls, `pre_dynamodb_send` before the network call and `post_dynamodb_send` after the network call to DynamoDB.
20+
21+
The callback must taking the following arguments:
22+
23+
================ ===========
24+
Arguments Description
25+
================ ===========
26+
*sender* The object that fired that method.
27+
*operation_name* The string name of the `DynamoDB action`_
28+
*table_name* The name of the table the operation is called upon.
29+
*req_uuid* A unique identifer so subscribers can correlate the before and after events.
30+
================ ===========
31+
32+
To subscribe to a signal, the user needs to import the signal object and connect your callback, like so.
33+
34+
.. code:: python
35+
36+
from pynamodb.signals import pre_dynamodb_send, post_dynamodb_send
37+
38+
def record_pre_dynamodb_send(sender, operation_name, table_name, req_uuid):
39+
pre_recorded.append((operation_name, table_name, req_uuid))
40+
41+
def record_post_dynamodb_send(sender, operation_name, table_name, req_uuid):
42+
post_recorded.append((operation_name, table_name, req_uuid))
43+
44+
pre_dynamodb_send.connect(record_pre_dynamodb_send)
45+
post_dynamodb_send.connect(record_post_dynamodb_send)
46+
47+
.. _blinker: https://pypi.python.org/pypi/blinker
48+
.. _Dynamo action: https://github.com/pynamodb/PynamoDB/blob/cd705cc4e0e3dd365c7e0773f6bc02fe071a0631/

pynamodb/connection/base.py

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,24 @@
22
Lowest level connection
33
"""
44
from __future__ import division
5-
from base64 import b64decode
5+
66
import logging
77
import math
88
import random
99
import time
10+
import uuid
11+
from base64 import b64decode
1012

1113
import six
12-
from six.moves import range
13-
from botocore.session import get_session
14-
from botocore.exceptions import BotoCoreError
1514
from botocore.client import ClientError
15+
from botocore.exceptions import BotoCoreError
16+
from botocore.session import get_session
1617
from botocore.vendored import requests
18+
from botocore.vendored.requests import Request
19+
from six.moves import range
1720

18-
from pynamodb.connection.util import pythonic
19-
from pynamodb.types import HASH, RANGE
2021
from pynamodb.compat import NullHandler
21-
from pynamodb.exceptions import (
22-
TableError, QueryError, PutError, DeleteError, UpdateError, GetError, ScanError, TableDoesNotExist,
23-
VerboseClientError
24-
)
22+
from pynamodb.connection.util import pythonic
2523
from pynamodb.constants import (
2624
RETURN_CONSUMED_CAPACITY_VALUES, RETURN_ITEM_COLL_METRICS_VALUES, COMPARISON_OPERATOR_VALUES,
2725
RETURN_ITEM_COLL_METRICS, RETURN_CONSUMED_CAPACITY, RETURN_VALUES_VALUES, ATTR_UPDATE_ACTIONS,
@@ -38,8 +36,13 @@
3836
CONDITIONAL_OPERATORS, NULL, NOT_NULL, SHORT_ATTR_TYPES, DELETE,
3937
ITEMS, DEFAULT_ENCODING, BINARY_SHORT, BINARY_SET_SHORT, LAST_EVALUATED_KEY, RESPONSES, UNPROCESSED_KEYS,
4038
UNPROCESSED_ITEMS, STREAM_SPECIFICATION, STREAM_VIEW_TYPE, STREAM_ENABLED)
39+
from pynamodb.exceptions import (
40+
TableError, QueryError, PutError, DeleteError, UpdateError, GetError, ScanError, TableDoesNotExist,
41+
VerboseClientError
42+
)
4143
from pynamodb.settings import get_settings_value
42-
from botocore.vendored.requests import Request
44+
from pynamodb.signals import pre_dynamodb_send, post_dynamodb_send
45+
from pynamodb.types import HASH, RANGE
4346

4447
BOTOCORE_EXCEPTIONS = (BotoCoreError, ClientError)
4548

@@ -261,7 +264,12 @@ def dispatch(self, operation_name, operation_kwargs):
261264
operation_kwargs.update(self.get_consumed_capacity_map(TOTAL))
262265
self._log_debug(operation_name, operation_kwargs)
263266

267+
table_name = operation_kwargs.get(TABLE_NAME)
268+
req_uuid = uuid.uuid4()
269+
270+
self.send_pre_boto_callback(operation_name, req_uuid, table_name)
264271
data = self._make_api_call(operation_name, operation_kwargs)
272+
self.send_post_boto_callback(operation_name, req_uuid, table_name)
265273

266274
if data and CONSUMED_CAPACITY in data:
267275
capacity = data.get(CONSUMED_CAPACITY)
@@ -270,6 +278,18 @@ def dispatch(self, operation_name, operation_kwargs):
270278
log.debug("%s %s consumed %s units", data.get(TABLE_NAME, ''), operation_name, capacity)
271279
return data
272280

281+
def send_post_boto_callback(self, operation_name, req_uuid, table_name):
282+
try:
283+
post_dynamodb_send.send(self, operation_name=operation_name, table_name=table_name, req_uuid=req_uuid)
284+
except Exception as e:
285+
log.exception("post_boto callback threw an exception.")
286+
287+
def send_pre_boto_callback(self, operation_name, req_uuid, table_name):
288+
try:
289+
pre_dynamodb_send.send(self, operation_name=operation_name, table_name=table_name, req_uuid=req_uuid)
290+
except Exception as e:
291+
log.exception("pre_boto callback threw an exception.")
292+
273293
def _make_api_call(self, operation_name, operation_kwargs):
274294
"""
275295
This private method is here for two reasons:
@@ -281,7 +301,6 @@ def _make_api_call(self, operation_name, operation_kwargs):
281301
operation_kwargs,
282302
operation_model
283303
)
284-
285304
prepared_request = self._create_prepared_request(request_dict, operation_model)
286305

287306
for i in range(0, self._max_retry_attempts_exception + 1):

pynamodb/signals.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
"""
2+
Implements signals based on blinker if available, otherwise
3+
falls silently back to a noop.
4+
5+
This implementation was taken from Flask:
6+
https://github.com/pallets/flask/blob/master/flask/signals.py
7+
"""
8+
signals_available = False
9+
10+
11+
class _FakeNamespace(object):
12+
def signal(self, name, doc=None):
13+
return _FakeSignal(name, doc)
14+
15+
16+
class _FakeSignal(object):
17+
"""
18+
If blinker is unavailable, create a fake class with the same
19+
interface that allows sending of signals but will fail with an
20+
error on anything else. Instead of doing anything on send, it
21+
will just ignore the arguments and do nothing instead.
22+
"""
23+
24+
def __init__(self, name, doc=None):
25+
self.name = name
26+
self.__doc__ = doc
27+
28+
def _fail(self, *args, **kwargs):
29+
raise RuntimeError('signalling support is unavailable '
30+
'because the blinker library is '
31+
'not installed.')
32+
33+
send = lambda *a, **kw: None # noqa
34+
connect = disconnect = has_receivers_for = receivers_for = \
35+
temporarily_connected_to = _fail
36+
del _fail
37+
38+
39+
try:
40+
from blinker import Namespace
41+
signals_available = True
42+
except ImportError: # pragma: no cover
43+
Namespace = _FakeNamespace
44+
45+
# The namespace for code signals. If you are not PynamoDB code, do
46+
# not put signals in here. Create your own namespace instead.
47+
_signals = Namespace()
48+
49+
pre_dynamodb_send = _signals.signal('pre_dynamodb_send')
50+
post_dynamodb_send = _signals.signal('post_dynamodb_send')

pynamodb/tests/test_signals.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
import mock
2+
import pytest
3+
4+
from pynamodb.connection import Connection
5+
from pynamodb.signals import _FakeNamespace
6+
from pynamodb.signals import pre_dynamodb_send, post_dynamodb_send
7+
8+
try:
9+
import blinker
10+
except ImportError:
11+
blinker = None
12+
13+
PATCH_METHOD = 'pynamodb.connection.Connection._make_api_call'
14+
15+
16+
@mock.patch(PATCH_METHOD)
17+
@mock.patch('pynamodb.connection.base.uuid')
18+
def test_signal(mock_uuid, mock_req):
19+
pre_recorded = []
20+
post_recorded = []
21+
UUID = '123-abc'
22+
23+
def record_pre_dynamodb_send(sender, operation_name, table_name, req_uuid):
24+
pre_recorded.append((operation_name, table_name, req_uuid))
25+
26+
def record_post_dynamodb_send(sender, operation_name, table_name, req_uuid):
27+
post_recorded.append((operation_name, table_name, req_uuid))
28+
29+
pre_dynamodb_send.connect(record_pre_dynamodb_send)
30+
post_dynamodb_send.connect(record_post_dynamodb_send)
31+
try:
32+
mock_uuid.uuid4.return_value = UUID
33+
mock_req.return_value = {'TableDescription': {'TableName': 'table', 'TableStatus': 'Creating'}}
34+
c = Connection()
35+
c.dispatch('CreateTable', {'TableName': 'MyTable'})
36+
assert ('CreateTable', 'MyTable', UUID) == pre_recorded[0]
37+
assert ('CreateTable', 'MyTable', UUID) == post_recorded[0]
38+
finally:
39+
pre_dynamodb_send.disconnect(record_pre_dynamodb_send)
40+
post_dynamodb_send.disconnect(record_post_dynamodb_send)
41+
42+
43+
@mock.patch(PATCH_METHOD)
44+
@mock.patch('pynamodb.connection.base.uuid')
45+
def test_signal_exception_pre_signal(mock_uuid, mock_req):
46+
post_recorded = []
47+
UUID = '123-abc'
48+
49+
def record_pre_dynamodb_send(sender, operation_name, table_name, req_uuid):
50+
raise ValueError()
51+
52+
def record_post_dynamodb_send(sender, operation_name, table_name, req_uuid):
53+
post_recorded.append((operation_name, table_name, req_uuid))
54+
55+
pre_dynamodb_send.connect(record_pre_dynamodb_send)
56+
post_dynamodb_send.connect(record_post_dynamodb_send)
57+
try:
58+
mock_uuid.uuid4.return_value = UUID
59+
mock_req.return_value = {'TableDescription': {'TableName': 'table', 'TableStatus': 'Creating'}}
60+
c = Connection()
61+
c.dispatch('CreateTable', {'TableName': 'MyTable'})
62+
assert ('CreateTable', 'MyTable', UUID) == post_recorded[0]
63+
finally:
64+
pre_dynamodb_send.disconnect(record_pre_dynamodb_send)
65+
post_dynamodb_send.disconnect(record_post_dynamodb_send)
66+
67+
68+
@mock.patch(PATCH_METHOD)
69+
@mock.patch('pynamodb.connection.base.uuid')
70+
def test_signal_exception_post_signal(mock_uuid, mock_req):
71+
pre_recorded = []
72+
UUID = '123-abc'
73+
74+
def record_pre_dynamodb_send(sender, operation_name, table_name, req_uuid):
75+
pre_recorded.append((operation_name, table_name, req_uuid))
76+
77+
def record_post_dynamodb_send(sender, operation_name, table_name, req_uuid):
78+
raise ValueError()
79+
80+
pre_dynamodb_send.connect(record_pre_dynamodb_send)
81+
post_dynamodb_send.connect(record_post_dynamodb_send)
82+
try:
83+
mock_uuid.uuid4.return_value = UUID
84+
mock_req.return_value = {'TableDescription': {'TableName': 'table', 'TableStatus': 'Creating'}}
85+
c = Connection()
86+
c.dispatch('CreateTable', {'TableName': 'MyTable'})
87+
assert ('CreateTable', 'MyTable', UUID) == pre_recorded[0]
88+
finally:
89+
pre_dynamodb_send.disconnect(record_pre_dynamodb_send)
90+
post_dynamodb_send.disconnect(record_post_dynamodb_send)
91+
92+
93+
def test_fake_signals():
94+
_signals = _FakeNamespace()
95+
pre_dynamodb_send = _signals.signal('pre_dynamodb_send')
96+
with pytest.raises(RuntimeError):
97+
pre_dynamodb_send.connect(lambda x: x)
98+
pre_dynamodb_send.send(object, operation_name="UPDATE", table_name="TEST", req_uuid="something")

requirements-dev.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ pytest-cov==2.4.0
66
pytest-mock==1.6.0
77
python-coveralls==2.5.0
88
tox==2.1.1
9+
blinker==1.4
910
flake8==2.4.1
1011
requests==2.7.0
1112
# Pinned because latest version requires Python 2.7+ (https://github.com/sphinx-doc/sphinx/issues/2919).

setup.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,7 @@
3939
'Programming Language :: Python :: 3.3',
4040
'License :: OSI Approved :: MIT License',
4141
],
42+
extras_require={
43+
'signals': ['blinker>=1.3,<2.0']
44+
},
4245
)

0 commit comments

Comments
 (0)