Skip to content

Commit 5c4e5f8

Browse files
authored
Merge pull request #5042 from guzzijones/client_nginx
Fix sse and nginx buffering
2 parents ce31176 + 1ebe2be commit 5c4e5f8

File tree

12 files changed

+148
-14
lines changed

12 files changed

+148
-14
lines changed

CHANGELOG.rst

+7
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@ Changed
1414
* Improve the st2-self-check script to echo to stderr and exit if it isn't run with a
1515
ST2_AUTH_TOKEN or ST2_API_KEY environment variable. (improvement) #5068
1616

17+
Fixed
18+
~~~~~~~~~
19+
* Fix nginx buffering long polling stream to client. Instead of waiting for closed connection
20+
wait for final event to be sent to client. (bug fix) #4842 #5042
21+
22+
Contributed by @guzzijones
23+
1724
3.3.0 - October 06, 2020
1825
------------------------
1926

conf/nginx/st2.conf

+2
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ server {
8585
proxy_set_header Host $host;
8686
proxy_set_header X-Real-IP $remote_addr;
8787
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
88+
proxy_read_timeout 200;
89+
proxy_connect_timeout 200;
8890

8991
sendfile on;
9092
tcp_nopush on;

st2client/st2client/base.py

+5
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ def get_client(self, args, debug=False):
8484
# Precedence order: cli arguments > environment variables > rc file variables
8585
cli_options = ['base_url', 'auth_url', 'api_url', 'stream_url', 'api_version', 'cacert']
8686
cli_options = {opt: getattr(args, opt, None) for opt in cli_options}
87+
if cli_options.get("cacert", None) is not None:
88+
if cli_options["cacert"].lower() in ['true', '1', 't', 'y', 'yes']:
89+
cli_options["cacert"] = True
90+
elif cli_options["cacert"].lower() in ['false', '0', 'f', 'no']:
91+
cli_options["cacert"] = False
8792
config_file_options = self._get_config_file_options(args=args)
8893

8994
kwargs = {}

st2client/st2client/commands/action.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -1502,7 +1502,10 @@ def tail_execution(cls, execution_manager, stream_manager, execution, output_typ
15021502
workflow_execution_ids.update(children_execution_ids)
15031503

15041504
events = ['st2.execution__update', 'st2.execution.output__create']
1505-
for event in stream_manager.listen(events, **kwargs):
1505+
for event in stream_manager.listen(events,
1506+
end_execution_id=execution_id,
1507+
end_event="st2.execution__update",
1508+
**kwargs):
15061509
status = event.get('status', None)
15071510
is_execution_event = status is not None
15081511

st2client/st2client/commands/pack.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,8 @@ def run_and_print(self, args, **kwargs):
118118

119119
with term.TaskIndicator() as indicator:
120120
events = ['st2.execution__create', 'st2.execution__update']
121-
for event in stream_mgr.listen(events, **kwargs):
121+
for event in stream_mgr.listen(events, end_execution_id=parent_id,
122+
end_event="st2.execution__update", **kwargs):
122123
execution = Execution(**event)
123124

124125
if execution.id == parent_id \

st2client/st2client/models/core.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,12 @@ def listen(self, events=None, **kwargs):
638638
if 'api_key' in kwargs:
639639
query_params['st2-api-key'] = kwargs.get('api_key')
640640

641+
if 'end_event' in kwargs:
642+
query_params['end_event'] = kwargs.get('end_event')
643+
644+
if 'end_execution_id' in kwargs:
645+
query_params['end_execution_id'] = kwargs.get('end_execution_id')
646+
641647
if events:
642648
query_params['events'] = ','.join(events)
643649

@@ -655,7 +661,6 @@ def listen(self, events=None, **kwargs):
655661
# can be empty. In this case, rerun the query.
656662
if not message.data:
657663
continue
658-
659664
yield json.loads(message.data)
660665

661666

st2common/st2common/openapi.yaml

+10
Original file line numberDiff line numberDiff line change
@@ -4485,6 +4485,16 @@ paths:
44854485
description: |
44864486
Event stream endpoint.
44874487
parameters:
4488+
- name: end_execution_id
4489+
in: query
4490+
description: execution id used to find last event in stream
4491+
type: string
4492+
required: false
4493+
- name: end_event
4494+
in: query
4495+
description: event to end sse stream
4496+
type: string
4497+
required: false
44884498
- name: events
44894499
in: query
44904500
description: List of events to listen for. If not provided, it listens to all events.

st2common/st2common/openapi.yaml.j2

+10
Original file line numberDiff line numberDiff line change
@@ -4481,6 +4481,16 @@ paths:
44814481
description: |
44824482
Event stream endpoint.
44834483
parameters:
4484+
- name: end_execution_id
4485+
in: query
4486+
description: execution id used to find last event in stream
4487+
type: string
4488+
required: false
4489+
- name: end_event
4490+
in: query
4491+
description: event to end sse stream
4492+
type: string
4493+
required: false
44844494
- name: events
44854495
in: query
44864496
description: List of events to listen for. If not provided, it listens to all events.

st2common/st2common/stream/listener.py

+11-6
Original file line numberDiff line numberDiff line change
@@ -78,22 +78,28 @@ def emit(self, event, body):
7878
for queue in self.queues:
7979
queue.put(pack)
8080

81-
def generator(self, events=None, action_refs=None, execution_ids=None):
81+
def generator(self, events=None, action_refs=None, execution_ids=None,
82+
end_event=None, end_statuses=None, end_execution_id=None):
8283
queue = eventlet.Queue()
8384
queue.put('')
8485
self.queues.append(queue)
85-
8686
try:
87-
while not self._stopped:
87+
stop = False
88+
while not self._stopped and not stop:
8889
try:
8990
# TODO: Move to common option
9091
message = queue.get(timeout=cfg.CONF.stream.heartbeat)
91-
9292
if not message:
9393
yield message
9494
continue
95-
9695
event_name, body = message
96+
# check to see if this is the last message to send.
97+
if event_name == end_event:
98+
if body is not None and \
99+
body.status in end_statuses and \
100+
end_execution_id is not None and \
101+
body.id == end_execution_id:
102+
stop = True
97103
# TODO: We now do late filtering, but this could also be performed on the
98104
# message bus level if we modified our exchange layout and utilize routing keys
99105
# Filter on event name
@@ -109,7 +115,6 @@ def generator(self, events=None, action_refs=None, execution_ids=None):
109115
LOG.debug('Skipping event "%s" with action_ref "%s"' % (event_name,
110116
action_ref))
111117
continue
112-
113118
# Filter on execution id
114119
execution_id = self._get_execution_id_for_body(body=body)
115120
if execution_ids and execution_id not in execution_ids:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
# Copyright 2020 The StackStorm Authors.
2+
# Copyright 2019 Extreme Networks, Inc.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
import mock
17+
import unittest2
18+
19+
from st2common.stream import listener
20+
21+
22+
class MockBody(object):
23+
24+
def __init__(self, id):
25+
self.id = id
26+
self.status = "succeeded"
27+
28+
29+
INCLUDE = "test"
30+
END_EVENT = "test_end_event"
31+
END_ID = "test_end_id"
32+
EVENTS = [(INCLUDE, MockBody("notend")), (END_EVENT, MockBody(END_ID))]
33+
34+
35+
class MockQueue():
36+
37+
def __init__(self):
38+
self.items = EVENTS
39+
40+
def get(self, *args, **kwargs):
41+
if len(self.items) > 0:
42+
return self.items.pop(0)
43+
return None
44+
45+
def put(self, event):
46+
self.items.append(event)
47+
48+
49+
class MockListener(listener.BaseListener):
50+
51+
def __init__(self, *args, **kwargs):
52+
super(MockListener, self).__init__(*args, **kwargs)
53+
54+
def get_consumers(self, consumer, channel):
55+
pass
56+
57+
58+
class TestStream(unittest2.TestCase):
59+
60+
@mock.patch('st2common.stream.listener.BaseListener._get_action_ref_for_body')
61+
@mock.patch('eventlet.Queue')
62+
def test_generator(self, mock_queue,
63+
get_action_ref_for_body):
64+
get_action_ref_for_body.return_value = None
65+
mock_queue.return_value = MockQueue()
66+
mock_consumer = MockListener(connection=None)
67+
mock_consumer._stopped = False
68+
app_iter = mock_consumer.generator(events=INCLUDE,
69+
end_event=END_EVENT,
70+
end_statuses=["succeeded"],
71+
end_execution_id=END_ID)
72+
events = EVENTS.append('')
73+
for index, val in enumerate(app_iter):
74+
self.assertEquals(val, events[index])

st2stream/st2stream/controllers/v1/executions.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,10 @@ def format(gen):
135135

136136
def make_response():
137137
app_iter = itertools.chain(existing_output_iter(), new_output_iter())
138-
res = Response(content_type='text/event-stream', app_iter=app_iter)
138+
res = Response(headerlist=[("X-Accel-Buffering", "no"),
139+
('Cache-Control', 'no-cache'),
140+
("Content-Type", "text/event-stream; charset=UTF-8")],
141+
app_iter=app_iter)
139142
return res
140143

141144
res = make_response()

st2stream/st2stream/controllers/v1/stream.py

+13-4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import six
1717

1818
from st2common import log as logging
19+
from st2common.constants import action as action_constants
1920
from st2common.router import Response
2021
from st2common.util.jsonify import json_encode
2122
from st2common.stream.listener import get_listener
@@ -54,16 +55,24 @@ def format(gen):
5455

5556

5657
class StreamController(object):
57-
def get_all(self, events=None, action_refs=None, execution_ids=None, requester_user=None):
58+
def get_all(self, end_execution_id=None, end_event=None,
59+
events=None, action_refs=None, execution_ids=None, requester_user=None):
5860
events = events if events else DEFAULT_EVENTS_WHITELIST
5961
action_refs = action_refs if action_refs else None
6062
execution_ids = execution_ids if execution_ids else None
6163

6264
def make_response():
6365
listener = get_listener(name='stream')
64-
app_iter = format(listener.generator(events=events, action_refs=action_refs,
65-
execution_ids=execution_ids))
66-
res = Response(content_type='text/event-stream', app_iter=app_iter)
66+
app_iter = format(listener.generator(events=events,
67+
action_refs=action_refs,
68+
end_event=end_event,
69+
end_statuses=action_constants.LIVEACTION_COMPLETED_STATES,
70+
end_execution_id=end_execution_id,
71+
execution_ids=execution_ids))
72+
res = Response(headerlist=[("X-Accel-Buffering", "no"),
73+
('Cache-Control', 'no-cache'),
74+
("Content-Type", "text/event-stream; charset=UTF-8")],
75+
app_iter=app_iter)
6776
return res
6877

6978
stream = make_response()

0 commit comments

Comments
 (0)