Skip to content

Commit 8d3f053

Browse files
authored
Merge pull request #512 from nats-io/js-fixes
JetStream updates and fixes
2 parents 12fe022 + 47b5838 commit 8d3f053

File tree

8 files changed

+336
-23
lines changed

8 files changed

+336
-23
lines changed

nats/aio/client.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
from nats.protocol.parser import (
5151
AUTHORIZATION_VIOLATION,
5252
PERMISSIONS_ERR,
53+
PING,
5354
PONG,
5455
STALE_CONNECTION,
5556
Parser,
@@ -64,7 +65,7 @@
6465
)
6566
from .transport import TcpTransport, Transport, WebSocketTransport
6667

67-
__version__ = '2.5.0'
68+
__version__ = '2.6.0'
6869
__lang__ = 'python3'
6970
_logger = logging.getLogger(__name__)
7071
PROTOCOL = 1
@@ -1705,22 +1706,23 @@ async def _process_msg(
17051706

17061707
# Process flow control messages in case of using a JetStream context.
17071708
ctrl_msg = None
1708-
fcReply = None
1709+
fc_reply = None
17091710
if sub._jsi:
17101711
#########################################
17111712
# #
17121713
# JetStream Control Messages Processing #
17131714
# #
17141715
#########################################
17151716
jsi = sub._jsi
1717+
jsi._active = True
17161718
if hdr:
17171719
ctrl_msg = self._is_control_message(data, hdr)
17181720

17191721
# Check if the heartbeat has a "Consumer Stalled" header, if
17201722
# so, the value is the FC reply to send a nil message to.
17211723
# We will send it at the end of this function.
17221724
if ctrl_msg and ctrl_msg.startswith("Idle"):
1723-
fcReply = hdr.get(nats.js.api.Header.CONSUMER_STALLED)
1725+
fc_reply = hdr.get(nats.js.api.Header.CONSUMER_STALLED)
17241726

17251727
# OrderedConsumer: checkOrderedMsgs
17261728
if not ctrl_msg and jsi._ordered and msg.reply:
@@ -1788,15 +1790,15 @@ async def _process_msg(
17881790
# DATA message that was received before this flow control message, which
17891791
# has sequence `jsi.fciseq`. However, it is possible that this message
17901792
# has already been delivered, in that case, we need to send the FC reply now.
1791-
if sub.delivered >= sub._jsi._fciseq:
1792-
fcReply = msg.reply
1793+
if sub._jsi.get_js_delivered() >= sub._jsi._fciseq:
1794+
fc_reply = msg.reply
17931795
else:
17941796
# Schedule a reply after the previous message is delivered.
17951797
sub._jsi.schedule_flow_control_response(msg.reply)
17961798

17971799
# Handle flow control response.
1798-
if fcReply:
1799-
await self.publish(fcReply)
1800+
if fc_reply:
1801+
await self.publish(fc_reply)
18001802

18011803
if ctrl_msg and not msg.reply and ctrl_msg.startswith("Idle"):
18021804
if sub._jsi:

nats/js/client.py

Lines changed: 127 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,15 @@ async def subscribe_bind(
358358
sub=sub,
359359
ccreq=config,
360360
)
361+
362+
if config.idle_heartbeat:
363+
sub._jsi._hbtask = asyncio.create_task(sub._jsi.activity_check())
364+
365+
if ordered_consumer:
366+
sub._jsi._fctask = asyncio.create_task(
367+
sub._jsi.check_flow_control_response()
368+
)
369+
361370
return psub
362371

363372
@staticmethod
@@ -375,7 +384,7 @@ async def new_callback(msg: Msg) -> None:
375384
async def pull_subscribe(
376385
self,
377386
subject: str,
378-
durable: str,
387+
durable: Optional[str] = None,
379388
stream: Optional[str] = None,
380389
config: Optional[api.ConsumerConfig] = None,
381390
pending_msgs_limit: int = DEFAULT_JS_SUB_PENDING_MSGS_LIMIT,
@@ -415,16 +424,30 @@ async def main():
415424
if stream is None:
416425
stream = await self._jsm.find_stream_name_by_subject(subject)
417426

427+
should_create = True
418428
try:
419-
# TODO: Detect configuration drift with the consumer.
420-
await self._jsm.consumer_info(stream, durable)
429+
if durable:
430+
await self._jsm.consumer_info(stream, durable)
431+
should_create = False
421432
except nats.js.errors.NotFoundError:
433+
pass
434+
435+
consumer_name = durable
436+
if should_create:
422437
# If not found then attempt to create with the defaults.
423438
if config is None:
424439
config = api.ConsumerConfig()
440+
425441
# Auto created consumers use the filter subject.
442+
# config.name = durable
426443
config.filter_subject = subject
427-
config.durable_name = durable
444+
if durable:
445+
config.name = durable
446+
config.durable_name = durable
447+
else:
448+
consumer_name = self._nc._nuid.next().decode()
449+
config.name = consumer_name
450+
428451
await self._jsm.add_consumer(stream, config=config)
429452

430453
return await self.pull_subscribe_bind(
@@ -433,6 +456,7 @@ async def main():
433456
inbox_prefix=inbox_prefix,
434457
pending_bytes_limit=pending_bytes_limit,
435458
pending_msgs_limit=pending_msgs_limit,
459+
name=consumer_name,
436460
)
437461

438462
async def pull_subscribe_bind(
@@ -442,6 +466,7 @@ async def pull_subscribe_bind(
442466
inbox_prefix: bytes = api.INBOX_PREFIX,
443467
pending_msgs_limit: int = DEFAULT_JS_SUB_PENDING_MSGS_LIMIT,
444468
pending_bytes_limit: int = DEFAULT_JS_SUB_PENDING_BYTES_LIMIT,
469+
name: Optional[str] = None,
445470
) -> JetStreamContext.PullSubscription:
446471
"""
447472
pull_subscribe returns a `PullSubscription` that can be delivered messages
@@ -475,11 +500,16 @@ async def main():
475500
pending_msgs_limit=pending_msgs_limit,
476501
pending_bytes_limit=pending_bytes_limit
477502
)
503+
consumer = None
504+
if durable:
505+
consumer = durable
506+
else:
507+
consumer = name
478508
return JetStreamContext.PullSubscription(
479509
js=self,
480510
sub=sub,
481511
stream=stream,
482-
consumer=durable,
512+
consumer=consumer,
483513
deliver=deliver,
484514
)
485515

@@ -533,18 +563,76 @@ def __init__(
533563
self._sub = sub
534564
self._ccreq = ccreq
535565

566+
# Heartbeat
567+
self._hbtask = None
568+
self._hbi = None
569+
if ccreq and ccreq.idle_heartbeat:
570+
self._hbi = ccreq.idle_heartbeat
571+
572+
# Ordered Consumer implementation.
536573
self._dseq = 1
537574
self._sseq = 0
538575
self._cmeta: Optional[str] = None
576+
self._fcr: Optional[str] = None
577+
self._fcd = 0
539578
self._fciseq = 0
540-
self._active: Optional[bool] = None
579+
self._active: Optional[bool] = True
580+
self._fctask = None
541581

542582
def track_sequences(self, reply: str) -> None:
543583
self._fciseq += 1
544584
self._cmeta = reply
545585

546586
def schedule_flow_control_response(self, reply: str) -> None:
547-
pass
587+
self._active = True
588+
self._fcr = reply
589+
self._fcd = self._fciseq
590+
591+
def get_js_delivered(self):
592+
if self._sub._cb:
593+
return self._sub.delivered
594+
return self._fciseq - self._sub._pending_queue.qsize()
595+
596+
async def activity_check(self):
597+
# Can at most miss two heartbeats.
598+
hbc_threshold = 2
599+
while True:
600+
try:
601+
if self._conn.is_closed:
602+
break
603+
604+
# Wait for all idle heartbeats to be received,
605+
# one of them would have toggled the state of the
606+
# consumer back to being active.
607+
await asyncio.sleep(self._hbi * hbc_threshold)
608+
active = self._active
609+
self._active = False
610+
if not active:
611+
if self._ordered:
612+
await self.reset_ordered_consumer(
613+
self._sseq + 1
614+
)
615+
except asyncio.CancelledError:
616+
break
617+
618+
async def check_flow_control_response(self):
619+
while True:
620+
try:
621+
if self._conn.is_closed:
622+
break
623+
624+
if (self._fciseq - self._psub._pending_queue.qsize()) >= self._fcd:
625+
fc_reply = self._fcr
626+
try:
627+
if fc_reply:
628+
await self._conn.publish(fc_reply)
629+
except Exception:
630+
pass
631+
self._fcr = None
632+
self._fcd = 0
633+
await asyncio.sleep(0.25)
634+
except asyncio.CancelledError:
635+
break
548636

549637
async def check_for_sequence_mismatch(self,
550638
msg: Msg) -> Optional[bool]:
@@ -684,6 +772,38 @@ def delivered(self) -> int:
684772
"""
685773
return self._sub._received
686774

775+
@delivered.setter
776+
def delivered(self, value):
777+
self._sub._received = value
778+
779+
@property
780+
def _pending_size(self):
781+
return self._sub._pending_size
782+
783+
@_pending_size.setter
784+
def _pending_size(self, value):
785+
self._sub._pending_size = value
786+
787+
async def next_msg(self, timeout: Optional[float] = 1.0) -> Msg:
788+
"""
789+
:params timeout: Time in seconds to wait for next message before timing out.
790+
:raises nats.errors.TimeoutError:
791+
792+
next_msg can be used to retrieve the next message from a stream of messages using
793+
await syntax, this only works when not passing a callback on `subscribe`::
794+
"""
795+
msg = await self._sub.next_msg(timeout)
796+
797+
# In case there is a flow control reply present need to handle here.
798+
if self._sub and self._sub._jsi:
799+
self._sub._jsi._active = True
800+
if self._sub._jsi.get_js_delivered() >= self._sub._jsi._fciseq:
801+
fc_reply = self._sub._jsi._fcr
802+
if fc_reply:
803+
await self._conn.publish(fc_reply)
804+
self._sub._jsi._fcr = None
805+
return msg
806+
687807
class PullSubscription:
688808
"""
689809
PullSubscription is a subscription that can fetch messages.

nats/js/manager.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ async def find_stream_name_by_subject(self, subject: str) -> str:
6464
info = await self._api_request(
6565
req_sub, req_data.encode(), timeout=self._timeout
6666
)
67+
if not info['streams']:
68+
raise NotFoundError
6769
return info['streams'][0]
6870

6971
async def stream_info(self, name: str) -> api.StreamInfo:

nats/protocol/parser.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2016-2021 The NATS Authors
1+
# Copyright 2016-2023 The NATS Authors
22
# Licensed under the Apache License, Version 2.0 (the "License");
33
# you may not use this file except in compliance with the License.
44
# You may obtain a copy of the License at
@@ -63,7 +63,7 @@
6363
# States
6464
AWAITING_CONTROL_LINE = 1
6565
AWAITING_MSG_PAYLOAD = 2
66-
MAX_CONTROL_LINE_SIZE = 1024
66+
MAX_CONTROL_LINE_SIZE = 4096
6767

6868
# Protocol Errors
6969
STALE_CONNECTION = "stale connection"
@@ -165,10 +165,9 @@ async def parse(self, data: bytes = b''):
165165
del self.buf[:info.end()]
166166
continue
167167

168-
if len(self.buf
169-
) < MAX_CONTROL_LINE_SIZE and _CRLF_ in self.buf:
168+
if len(self.buf) < MAX_CONTROL_LINE_SIZE and _CRLF_ in self.buf:
170169
# FIXME: By default server uses a max protocol
171-
# line of 1024 bytes but it can be tuned in latest
170+
# line of 4096 bytes but it can be tuned in latest
172171
# releases, in that case we won't reach here but
173172
# client ping/pong interval would disconnect
174173
# eventually.

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
# These are here for GitHub's dependency graph and help with setuptools support in some environments.
55
setup(
66
name="nats-py",
7-
version='2.5.0',
7+
version='2.6.0',
88
license='Apache 2 License',
99
extras_require={
1010
'nkeys': ['nkeys'],

0 commit comments

Comments
 (0)