diff --git a/README.rst b/README.rst index 2d01892b..c8b58794 100644 --- a/README.rst +++ b/README.rst @@ -111,13 +111,18 @@ WebhookParser ※ You can use WebhookParser -\_\_init\_\_(self, channel\_secret) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +\_\_init\_\_(self, channel\_secret, skip\_signature\_verification=lambda: False) +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. code:: python parser = linebot.v3.WebhookParser('YOUR_CHANNEL_SECRET') + # or with skip_signature_verification + parser = linebot.v3.WebhookParser( + 'YOUR_CHANNEL_SECRET', + skip_signature_verification=lambda: True # or a function that returns a boolean + parse(self, body, signature, as_payload=False) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -143,13 +148,18 @@ WebhookHandler ※ You can use WebhookHandler -\_\_init\_\_(self, channel\_secret) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +\_\_init\_\_(self, channel\_secret, skip\_signature\_verification=lambda: False) +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. code:: python handler = linebot.v3.WebhookHandler('YOUR_CHANNEL_SECRET') + # or with skip_signature_verification + handler = linebot.v3.WebhookHandler( + 'YOUR_CHANNEL_SECRET', + skip_signature_verification=lambda: True # or a function that returns a boolean + handle(self, body, signature) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/linebot/v3/webhook.py b/linebot/v3/webhook.py index e02d035a..b8435367 100644 --- a/linebot/v3/webhook.py +++ b/linebot/v3/webhook.py @@ -112,12 +112,13 @@ def __init__(self, events=None, destination=None): class WebhookParser(object): """Webhook Parser.""" - def __init__(self, channel_secret): + def __init__(self, channel_secret, skip_signature_verification = lambda: False): """__init__ method. :param str channel_secret: Channel secret (as text) """ self.signature_validator = SignatureValidator(channel_secret) + self.skip_signature_verification = skip_signature_verification def parse(self, body, signature, as_payload=False): """Parse webhook request body as text. @@ -129,7 +130,7 @@ def parse(self, body, signature, as_payload=False): | :py:class:`linebot.v3.webhook.WebhookPayload` :return: Events list, or WebhookPayload instance """ - if not self.signature_validator.validate(body, signature): + if not self.skip_signature_verification() and not self.signature_validator.validate(body, signature): raise InvalidSignatureError( 'Invalid signature. signature=' + signature) @@ -154,12 +155,13 @@ class WebhookHandler(object): Please read https://github.com/line/line-bot-sdk-python#webhookhandler """ - def __init__(self, channel_secret): + def __init__(self, channel_secret, skip_signature_verification = lambda: False): """__init__ method. :param str channel_secret: Channel secret (as text) + :param skip_signature_verification: (optional) Function that returns a boolean value whether to skip signature validation """ - self.parser = WebhookParser(channel_secret) + self.parser = WebhookParser(channel_secret, skip_signature_verification) self._handlers = {} self._default = None diff --git a/tests/v3/test_skip_signature_verification.py b/tests/v3/test_skip_signature_verification.py new file mode 100644 index 00000000..a61ac032 --- /dev/null +++ b/tests/v3/test_skip_signature_verification.py @@ -0,0 +1,116 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from __future__ import unicode_literals, absolute_import + +import unittest + +from linebot.v3 import ( + WebhookParser, WebhookHandler +) +from linebot.v3.exceptions import InvalidSignatureError + +class TestWebhookParserWithSkipSignatureVerification(unittest.TestCase): + def setUp(self): + self.parser = WebhookParser('channel_secret') + self.parser_with_skip = WebhookParser('channel_secret', skip_signature_verification=lambda: True) + + def test_parse_with_invalid_signature(self): + body = '{"events": []}' + signature = 'invalid_signature' + + # Should raise InvalidSignatureError when skip_signature_verification is False (default) + with self.assertRaises(InvalidSignatureError): + self.parser.parse(body, signature) + + # Should not raise InvalidSignatureError when skip_signature_verification is True + try: + self.parser_with_skip.parse(body, signature) + except InvalidSignatureError: + self.fail("parse() raised InvalidSignatureError unexpectedly!") + +class TestWebhookHandlerWithSkipSignatureVerification(unittest.TestCase): + def setUp(self): + self.handler = WebhookHandler('channel_secret') + self.handler_with_skip = WebhookHandler('channel_secret', skip_signature_verification=lambda: True) + self.handler_called = False + self.handler_with_skip_called = False + + @self.handler.default() + def default(event): + self.handler_called = True + + @self.handler_with_skip.default() + def default_with_skip(event): + self.handler_with_skip_called = True + + def test_handle_with_invalid_signature(self): + body = '{"events": [{"type": "message", "message": {"type": "text", "id": "123", "text": "test"}, "timestamp": 1462629479859, "source": {"type": "user", "userId": "U123"}, "replyToken": "reply_token", "mode": "active", "webhookEventId": "test_id", "deliveryContext": {"isRedelivery": false}}]}' + signature = 'invalid_signature' + + # Should raise InvalidSignatureError when skip_signature_verification is False (default) + with self.assertRaises(InvalidSignatureError): + self.handler.handle(body, signature) + + # Handler should not be called when signature verification fails + self.assertFalse(self.handler_called) + + # Should not raise InvalidSignatureError when skip_signature_verification is True + try: + self.handler_with_skip.handle(body, signature) + except InvalidSignatureError: + self.fail("handle() raised InvalidSignatureError unexpectedly!") + + # Handler should be called when signature verification is skipped + self.assertTrue(self.handler_with_skip_called) + + def test_dynamic_skip_signature_verification(self): + body = '{"events": [{"type": "message", "message": {"type": "text", "id": "123", "text": "test"}, "timestamp": 1462629479859, "source": {"type": "user", "userId": "U123"}, "replyToken": "reply_token", "mode": "active", "webhookEventId": "test_id", "deliveryContext": {"isRedelivery": false}}]}' + signature = 'invalid_signature' + skip_flag = [False] + + # Create a handler with dynamic skip flag + handler_with_dynamic_skip = WebhookHandler( + 'channel_secret', + skip_signature_verification=lambda: skip_flag[0] + ) + + dynamic_handler_called = [False] + + @handler_with_dynamic_skip.default() + def default_dynamic(event): + dynamic_handler_called[0] = True + + # Should raise InvalidSignatureError when skip_signature_verification returns False + with self.assertRaises(InvalidSignatureError): + handler_with_dynamic_skip.handle(body, signature) + + # Handler should not be called + self.assertFalse(dynamic_handler_called[0]) + + # Change the skip flag + skip_flag[0] = True + + # Should not raise InvalidSignatureError now + try: + handler_with_dynamic_skip.handle(body, signature) + except InvalidSignatureError: + self.fail("handle() raised InvalidSignatureError unexpectedly!") + + # Handler should be called now + self.assertTrue(dynamic_handler_called[0]) + +if __name__ == '__main__': + unittest.main() +