From ddbbd44a06c0f6a64ae38030066e6fe439d20756 Mon Sep 17 00:00:00 2001 From: habara keigo Date: Fri, 27 Jun 2025 19:02:13 +0900 Subject: [PATCH 1/3] Add option to skip signature verification --- examples/aiohttp-echo/app.py | 3 ++- linebot/v3/webhook.py | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/examples/aiohttp-echo/app.py b/examples/aiohttp-echo/app.py index aef68abcf..5ac593d5e 100644 --- a/examples/aiohttp-echo/app.py +++ b/examples/aiohttp-echo/app.py @@ -89,7 +89,8 @@ async def echo(self, request): async def main(port=8000): async_api_client = AsyncApiClient(configuration) line_bot_api = AsyncMessagingApi(async_api_client) - parser = WebhookParser(channel_secret) + # Dummy value is fine to skip webhook signature verification + parser = WebhookParser("Dummy Channel Secret", lambda: True) handler = Handler(line_bot_api, parser) diff --git a/linebot/v3/webhook.py b/linebot/v3/webhook.py index e02d035a0..b0ebafc14 100644 --- a/linebot/v3/webhook.py +++ b/linebot/v3/webhook.py @@ -112,12 +112,13 @@ def __init__(self, events=None, destination=None): class WebhookParser(object): """Webhook Parser.""" - def __init__(self, channel_secret): + def __init__(self, channel_secret, skip_signature_verification = lambda: False): """__init__ method. :param str channel_secret: Channel secret (as text) """ self.signature_validator = SignatureValidator(channel_secret) + self.skip_signature_verification = skip_signature_verification def parse(self, body, signature, as_payload=False): """Parse webhook request body as text. @@ -129,7 +130,7 @@ def parse(self, body, signature, as_payload=False): | :py:class:`linebot.v3.webhook.WebhookPayload` :return: Events list, or WebhookPayload instance """ - if not self.signature_validator.validate(body, signature): + if not self.skip_signature_verification() and not self.signature_validator.validate(body, signature): raise InvalidSignatureError( 'Invalid signature. signature=' + signature) From b4b5d146c9e224993ac66eec2ec7cc8a106702ad Mon Sep 17 00:00:00 2001 From: habara keigo Date: Tue, 8 Jul 2025 12:03:01 +0900 Subject: [PATCH 2/3] Update WebhookHandler, update document, and add tests --- README.rst | 18 ++- linebot/v3/webhook.py | 5 +- tests/v3/test_skip_signature_verification.py | 116 +++++++++++++++++++ 3 files changed, 133 insertions(+), 6 deletions(-) create mode 100644 tests/v3/test_skip_signature_verification.py diff --git a/README.rst b/README.rst index 2d01892b5..c8b587944 100644 --- a/README.rst +++ b/README.rst @@ -111,13 +111,18 @@ WebhookParser ※ You can use WebhookParser -\_\_init\_\_(self, channel\_secret) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +\_\_init\_\_(self, channel\_secret, skip\_signature\_verification=lambda: False) +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. code:: python parser = linebot.v3.WebhookParser('YOUR_CHANNEL_SECRET') + # or with skip_signature_verification + parser = linebot.v3.WebhookParser( + 'YOUR_CHANNEL_SECRET', + skip_signature_verification=lambda: True # or a function that returns a boolean + parse(self, body, signature, as_payload=False) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -143,13 +148,18 @@ WebhookHandler ※ You can use WebhookHandler -\_\_init\_\_(self, channel\_secret) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +\_\_init\_\_(self, channel\_secret, skip\_signature\_verification=lambda: False) +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. code:: python handler = linebot.v3.WebhookHandler('YOUR_CHANNEL_SECRET') + # or with skip_signature_verification + handler = linebot.v3.WebhookHandler( + 'YOUR_CHANNEL_SECRET', + skip_signature_verification=lambda: True # or a function that returns a boolean + handle(self, body, signature) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/linebot/v3/webhook.py b/linebot/v3/webhook.py index b0ebafc14..b8435367d 100644 --- a/linebot/v3/webhook.py +++ b/linebot/v3/webhook.py @@ -155,12 +155,13 @@ class WebhookHandler(object): Please read https://github.com/line/line-bot-sdk-python#webhookhandler """ - def __init__(self, channel_secret): + def __init__(self, channel_secret, skip_signature_verification = lambda: False): """__init__ method. :param str channel_secret: Channel secret (as text) + :param skip_signature_verification: (optional) Function that returns a boolean value whether to skip signature validation """ - self.parser = WebhookParser(channel_secret) + self.parser = WebhookParser(channel_secret, skip_signature_verification) self._handlers = {} self._default = None diff --git a/tests/v3/test_skip_signature_verification.py b/tests/v3/test_skip_signature_verification.py new file mode 100644 index 000000000..a61ac032a --- /dev/null +++ b/tests/v3/test_skip_signature_verification.py @@ -0,0 +1,116 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from __future__ import unicode_literals, absolute_import + +import unittest + +from linebot.v3 import ( + WebhookParser, WebhookHandler +) +from linebot.v3.exceptions import InvalidSignatureError + +class TestWebhookParserWithSkipSignatureVerification(unittest.TestCase): + def setUp(self): + self.parser = WebhookParser('channel_secret') + self.parser_with_skip = WebhookParser('channel_secret', skip_signature_verification=lambda: True) + + def test_parse_with_invalid_signature(self): + body = '{"events": []}' + signature = 'invalid_signature' + + # Should raise InvalidSignatureError when skip_signature_verification is False (default) + with self.assertRaises(InvalidSignatureError): + self.parser.parse(body, signature) + + # Should not raise InvalidSignatureError when skip_signature_verification is True + try: + self.parser_with_skip.parse(body, signature) + except InvalidSignatureError: + self.fail("parse() raised InvalidSignatureError unexpectedly!") + +class TestWebhookHandlerWithSkipSignatureVerification(unittest.TestCase): + def setUp(self): + self.handler = WebhookHandler('channel_secret') + self.handler_with_skip = WebhookHandler('channel_secret', skip_signature_verification=lambda: True) + self.handler_called = False + self.handler_with_skip_called = False + + @self.handler.default() + def default(event): + self.handler_called = True + + @self.handler_with_skip.default() + def default_with_skip(event): + self.handler_with_skip_called = True + + def test_handle_with_invalid_signature(self): + body = '{"events": [{"type": "message", "message": {"type": "text", "id": "123", "text": "test"}, "timestamp": 1462629479859, "source": {"type": "user", "userId": "U123"}, "replyToken": "reply_token", "mode": "active", "webhookEventId": "test_id", "deliveryContext": {"isRedelivery": false}}]}' + signature = 'invalid_signature' + + # Should raise InvalidSignatureError when skip_signature_verification is False (default) + with self.assertRaises(InvalidSignatureError): + self.handler.handle(body, signature) + + # Handler should not be called when signature verification fails + self.assertFalse(self.handler_called) + + # Should not raise InvalidSignatureError when skip_signature_verification is True + try: + self.handler_with_skip.handle(body, signature) + except InvalidSignatureError: + self.fail("handle() raised InvalidSignatureError unexpectedly!") + + # Handler should be called when signature verification is skipped + self.assertTrue(self.handler_with_skip_called) + + def test_dynamic_skip_signature_verification(self): + body = '{"events": [{"type": "message", "message": {"type": "text", "id": "123", "text": "test"}, "timestamp": 1462629479859, "source": {"type": "user", "userId": "U123"}, "replyToken": "reply_token", "mode": "active", "webhookEventId": "test_id", "deliveryContext": {"isRedelivery": false}}]}' + signature = 'invalid_signature' + skip_flag = [False] + + # Create a handler with dynamic skip flag + handler_with_dynamic_skip = WebhookHandler( + 'channel_secret', + skip_signature_verification=lambda: skip_flag[0] + ) + + dynamic_handler_called = [False] + + @handler_with_dynamic_skip.default() + def default_dynamic(event): + dynamic_handler_called[0] = True + + # Should raise InvalidSignatureError when skip_signature_verification returns False + with self.assertRaises(InvalidSignatureError): + handler_with_dynamic_skip.handle(body, signature) + + # Handler should not be called + self.assertFalse(dynamic_handler_called[0]) + + # Change the skip flag + skip_flag[0] = True + + # Should not raise InvalidSignatureError now + try: + handler_with_dynamic_skip.handle(body, signature) + except InvalidSignatureError: + self.fail("handle() raised InvalidSignatureError unexpectedly!") + + # Handler should be called now + self.assertTrue(dynamic_handler_called[0]) + +if __name__ == '__main__': + unittest.main() + From 1a31c0974db3a454aeae7f87da8e597e537016dc Mon Sep 17 00:00:00 2001 From: habara keigo Date: Tue, 8 Jul 2025 12:03:43 +0900 Subject: [PATCH 3/3] Revert examples --- examples/aiohttp-echo/app.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/examples/aiohttp-echo/app.py b/examples/aiohttp-echo/app.py index 5ac593d5e..aef68abcf 100644 --- a/examples/aiohttp-echo/app.py +++ b/examples/aiohttp-echo/app.py @@ -89,8 +89,7 @@ async def echo(self, request): async def main(port=8000): async_api_client = AsyncApiClient(configuration) line_bot_api = AsyncMessagingApi(async_api_client) - # Dummy value is fine to skip webhook signature verification - parser = WebhookParser("Dummy Channel Secret", lambda: True) + parser = WebhookParser(channel_secret) handler = Handler(line_bot_api, parser)