Releases: nats-io/nats.py

Release v2.0.0rc1

23 Nov 00:08
83763fc
Pre-release

nats.py v2.0.0rc1

Major upgrade to the APIs of the Python3 client! For this release the client has also been renamed from asyncio-nats-client to nats-py. It can now be installed with:

pip install nats-py

# With NKEYS / JWT support
pip install nats-py[nkeys]

This version of the client is not completely compatible with previous versions
of the client and it is designed to be used with Python 3.7.

Overall, the API of the client should now more closely resemble the API of the Go client:

import asyncio
import nats

async def main():
  nc = await nats.connect("demo.nats.io")

  sub = await nc.subscribe("hello")

  await nc.publish("hello")

  msg = await sub.next_msg()
  print(f"Received [{msg.subject}]: {msg.data}")

  await nc.close()

if __name__ == '__main__':
    asyncio.run(main())

There is support for NATS Headers ⚡

import asyncio
import nats
from nats.errors import TimeoutError

async def main():
  nc = await nats.connect("demo.nats.io")

  async def help_request(msg):
      print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
      print("Headers", msg.header)
      await msg.respond(b'OK')

  # Subscribe on the same subject that the request below is sent to.
  sub = await nc.subscribe("help", "workers", help_request)

  try:
      response = await nc.request("help", b'help me', timeout=0.5)
      print("Received response: {message}".format(
          message=response.data.decode()))
  except TimeoutError:
      print("Request timed out")

  await nc.close()

if __name__ == '__main__':
    asyncio.run(main())

It also now includes JetStream support:

import asyncio
import nats

async def main():
  nc = await nats.connect("demo.nats.io")

  # Create JetStream context.
  js = nc.jetstream()

  # Persist messages on 'foo' subject.
  await js.add_stream(name="sample-stream", subjects=["foo"])

  for i in range(0, 10):
      ack = await js.publish("foo", f"hello world: {i}".encode())
      print(ack)

  # Create pull based consumer on 'foo'.
  psub = await js.pull_subscribe("foo", "psub")

  # Fetch and ack messages from consumer.
  for i in range(0, 10):
      msgs = await psub.fetch()
      for msg in msgs:
          await msg.ack()
          print(msg)

  await nc.close()

if __name__ == '__main__':
  asyncio.run(main())

As well as JetStream KV support:

import asyncio
import nats

async def main():
  nc = await nats.connect()
  js = nc.jetstream()

  # Create a KV
  kv = await js.create_key_value(bucket='MY_KV')

  # Set and retrieve a value
  await kv.put('hello', b'world')
  entry = await kv.get('hello')
  print(f'KeyValue.Entry: key={entry.key}, value={entry.value}')
  # KeyValue.Entry: key=hello, value=world

  await nc.close()

if __name__ == '__main__':
    asyncio.run(main())

New Documentation site:

The following site has been created to host the API of the Python3 client: https://nats-io.github.io/nats.py/
The contents of the doc site can be found in the following branch from this same repo: https://github.com/nats-io/nats.py/tree/docs/source

Breaking changes

  • Changed the return type of subscribe: it now returns a subscription object, as in the examples above, instead of a sid.

  • Changed suffix of most errors to follow PEP-8 style, so they now use the Error suffix. For example, ErrSlowConsumer is now SlowConsumerError. Old style errors are subclasses of the new ones, so exceptions under try...except blocks would still be caught.
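
The subclass relationship described for the renamed errors can be sketched as follows. The two classes below are illustrative stand-ins that only borrow the names from the note above; they are not the library's actual definitions, and caught_by_new_handler is a hypothetical helper.

```python
# Illustrative stand-ins, NOT the library's real class definitions.
class SlowConsumerError(Exception):
    """New-style name (PEP 8 'Error' suffix)."""


class ErrSlowConsumer(SlowConsumerError):
    """Old-style name, kept as a subclass of the new one."""


def caught_by_new_handler(exc: Exception) -> bool:
    """Return True if a handler written for SlowConsumerError catches exc."""
    try:
        raise exc
    except SlowConsumerError:
        return True
    except Exception:
        return False


# An error raised under the old name is still caught by the new-style handler.
print(caught_by_new_handler(ErrSlowConsumer()))
print(caught_by_new_handler(SlowConsumerError()))
```

Both calls print True, since an instance of a subclass always matches an except clause for its base class.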

Deprecated

Several areas of the client got deprecated in this release:

  • Deprecated is_async parameter for subscribe

  • Deprecated Client.timed_request

  • Deprecated passing loop parameter to most functions

Release v0.11.4

12 Nov 05:03
9f69047

Fixed

  • Fixed issue of NATS client not reconnecting to NATS Server nodes behind LB #189

Release v0.11.2

14 Sep 15:35
7360df6

Added

  • Error callback is now added by default (#170)

Fixed

  • Fixed drain timeout error after using request (#171)

Release v0.11.0

09 Sep 21:40
624e63b

Added

  • Added ability to access the connection identifier / client id (#146)

    print(nc.client_id)

  • Added option to specify tls_hostname (#157)

    await nc.connect(servers=["tls://127.0.0.1:4443"], loop=loop, tls=ssl_ctx, tls_hostname="localhost")

Fixed

  • Fixed drain non-callback subscriptions correctly (#147)
  • Fixed cleaning up subscriptions in the event of a timeout (#156)
  • Fixed TLS upgrade method, uvloop TLS now supported (#164)
  • Fixed waiting for all messages to be processed while draining (#165)
  • Fixed pending byte/msg limits so they can be disabled with -1, as in other clients ()
  • Fixed following connect_timeout in TLS connections (5d7e01c)
  • Fixed set outstanding pings to zero when a pong is received (84c6d9a)
  • Fixed support for auto unsubscribe (bf881ae)

Improved

  • Raise ErrInvalidCallbackType error when client callback functions are not coroutines (#128)
  • Many yapf formatting fixes (#152)
  • Fixed deprecation warnings. Remove unnecessary coverage runs. (#149)
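
The callback check mentioned in the first item above (#128) could look roughly like the sketch below. It is only an illustration of the idea: InvalidCallbackTypeError and check_callback are hypothetical names, and the client's own validation may differ.

```python
import inspect


class InvalidCallbackTypeError(Exception):
    """Hypothetical stand-in for the client's ErrInvalidCallbackType."""


def check_callback(cb):
    """Accept cb only if it is a coroutine function (defined with async def)."""
    if not inspect.iscoroutinefunction(cb):
        raise InvalidCallbackTypeError(f"{cb!r} is not a coroutine function")
    return cb


async def good_handler(msg):
    return msg


def bad_handler(msg):
    return msg


check_callback(good_handler)  # accepted

try:
    check_callback(bad_handler)
except InvalidCallbackTypeError as e:
    print("rejected:", e)
```

Failing early at registration time makes the mistake visible immediately, instead of when the first message arrives.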

Changed

  • Testing against NATS 2.1.8
  • Updated examples to async/await syntax
  • Client uses f strings (#152)

Deprecated

  • Deprecated support for Python 3.5. Next release will drop support for Python 3.6.

Release v0.9.2

31 May 20:45
7ad8e54

Added

  • Added nkeys_seed option to be able to connect to a server that supports nkeys based auth.

    await nc.connect(
        "tls://127.0.0.1:4222",
        nkeys_seed="./path/to/nkeys/user.nk",
    )

Fixed

  • Fixed handling of permissions violation errors; the error callback, if configured, is now called when they occur.

Release v0.9.0

14 May 19:25
1acca35

Added

  • Added support for NKEYS/JWT for NATS v2 new auth features. The nkeys dependency is optional and can be installed via pip as an extra package: pip install asyncio-nats-client[nkeys].

Usage:

    await nc.connect(
        "tls://connect.ngs.global:4222",
        user_credentials="./tests/nkeys/user.creds"
    )

    await nc.connect(
        "tls://connect.ngs.global:4222",
        user_credentials=(
            "./tests/nkeys/user.jwt", "./tests/nkeys/user.nk"
        )
    )

  • Added support for functools partials to subscriptions

    async def subscription_handler(arg1, msg):
        print(arg1)
        msgs.append(msg)

    partial_sub_handler = functools.partial(subscription_handler, "example")
    await nc.subscribe("foo", cb=partial_sub_handler)

  • Added Pipfile to the repo to adopt pipenv based flow

  • Added support to connect securely with TLS to implicit ips (c5b1f4e)

Fixed

  • Fixed issue when using verbose mode (#93)

Changed

  • Changed repo name: it is now nats.py, like other NATS clients.
  • Adopted yapf for formatting
  • Changed testing: pytest is now used to run the tests

Deprecated

  • Removed tests/test.py since it is no longer used

Release v0.8.2

17 Sep 18:38
a370dcc

Added

  • Support using tls scheme to setup default context to connect securely (#88)
    await nc.connect("tls://demo.nats.io:4443")
    await nc.connect(servers=["tls://demo.nats.io:4443"])

Example using nats-pub & nats-sub:

$ python3.7 examples/nats-pub -s tls://demo.nats.io:4443 hello -d world
...

$ python3.7 examples/nats-sub -s tls://demo.nats.io:4443 hello 
Connected to NATS at demo.nats.io:4443...
Received a message on 'hello ': world

If using Python 3.7 on OS X and getting SSL errors, first run the following to install the certifi default certs:

/Applications/Python\ 3.7/Install\ Certificates.command

Release v0.8.0

24 Aug 20:17
30ee515

Added

  • Support for drain mode (#82)

    This feature allows clients to gracefully disconnect, letting the subscribers
    handle any inflight messages that may have been sent by the server already.

    async def handler(msg):
      print("[Received] ", msg)
      await nc.publish(msg.reply, b'I can help')
    
      # Can check whether client is in draining state
      if nc.is_draining:
        print("Connection is draining")
    
    await nc.subscribe("help", "workers", cb=handler)
    
    requests = []
    for i in range(0, 1000):
        request = nc.request("help", b'help!', 0.2)
        requests.append(request)
    
    # Wait for all the responses
    responses = await asyncio.gather(*requests)
    print("Received {} responses".format(len(responses)))
    
    # Gracefully close the connection.
    await nc.drain()

    Example usage can be found at:
    https://github.com/nats-io/asyncio-nats/blob/e1996e7c4ae30daa63c49af20700d42fad1bd2f2/examples/drain-sub.py

  • Support for no_echo mode (#74)

    When connected to a NATS Server v1.2.0 or above, a client can now opt to avoid
    receiving messages that it itself has published.

    await ncA.connect(no_echo=True)
    await ncB.connect()
    
    async def handler(msg):
      # Messages sent by 'ncA' will not be received.
      print("[Received] ", msg)
    
    await ncA.subscribe("greetings", cb=handler)
    await ncA.publish("greetings", b'Hello World!')
    await ncB.publish("greetings", b'Hello World!')
  • Added connect_timeout option to disconnect from unhealthy servers in the pool (https://github.com/nats-io/asyncio-nats/pull/83/files)

    # Give up if can't connect to a server in 5 seconds
    await nc.connect("demo.nats.io:4222", connect_timeout=5)
  • Added loop parameter to connect to set the event loop like in asyncio APIs

    await nc.connect("demo.nats.io", loop=asyncio.new_event_loop())
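
To make the drain behaviour described above more concrete, here is a toy model built only on asyncio. It is a sketch of the general idea (stop accepting new messages, let the handler finish the in-flight ones, then shut down) and not the client's implementation; DrainableSub and its methods are hypothetical names.

```python
import asyncio


class DrainableSub:
    """Toy subscription: a queue plus a worker task (not the client's real class)."""

    def __init__(self, handler):
        self._queue = asyncio.Queue()
        self._handler = handler
        self._draining = False
        self._task = asyncio.create_task(self._run())

    @property
    def is_draining(self):
        return self._draining

    def deliver(self, msg):
        """Queue a message unless draining has started; return True if accepted."""
        if self._draining:
            return False
        self._queue.put_nowait(msg)
        return True

    async def _run(self):
        while True:
            msg = await self._queue.get()
            try:
                await self._handler(msg)
            except Exception as exc:
                print("handler error:", exc)
            finally:
                self._queue.task_done()

    async def drain(self):
        """Stop accepting messages, finish the in-flight ones, then stop the worker."""
        self._draining = True
        await self._queue.join()
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass


async def main():
    processed = []

    async def handler(msg):
        await asyncio.sleep(0)  # simulate asynchronous work
        processed.append(msg)

    sub = DrainableSub(handler)
    for i in range(5):
        sub.deliver(i)
    await sub.drain()
    accepted_after_drain = sub.deliver(99)
    return processed, accepted_after_drain


processed, accepted_after_drain = asyncio.run(main())
print(processed, accepted_after_drain)
```

In this model all five queued messages are handled before drain returns, and a message delivered afterwards is refused.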

Improved

  • connect API is now modeled to work more closely to how the Go client works:

    # Assume 'nats://' scheme
    await nc.connect("demo.nats.io:4222")
    
    # Complete with scheme a la Go client classic usage.
    await nc.connect("nats://demo.nats.io:4222")
    
    # Use default 4222 port.
    await nc.connect("demo.nats.io")
    
    # Explicit cluster list
    await nc.connect(servers="demo.nats.io")
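
The accepted forms above suggest a normalization step along the following lines. This is a sketch under assumptions, not the client's actual parsing code: normalize_server and the two constants are hypothetical names, with the default scheme and port taken from the comments above.

```python
from urllib.parse import urlparse

DEFAULT_SCHEME = "nats"  # assumed when the URL has no scheme
DEFAULT_PORT = 4222      # assumed when the URL has no port


def normalize_server(url: str):
    """Return (scheme, host, port), filling in the default scheme and port."""
    if "://" not in url:
        url = f"{DEFAULT_SCHEME}://{url}"
    parsed = urlparse(url)
    return parsed.scheme, parsed.hostname, parsed.port or DEFAULT_PORT


for server in ("demo.nats.io:4222", "nats://demo.nats.io:4222", "demo.nats.io"):
    print(normalize_server(server))
```

All three inputs resolve to the same scheme, host and port, which is what makes the shorter forms convenient.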

Fixed

  • Examples and tests now support Python 3.7 (#71)

  • Added user, password, token parameters to set the auth credentials for servers that were discovered implicitly. (f8c28b3)

    # Set user and password credentials
    await nc.connect("127.0.0.1:4222", user="foo", password="bar")
    
    # Token
    await nc.connect("127.0.0.1:4222", token="secretoken")

Changed

  • Examples were changed to use async/await instead of asyncio.coroutine and yield from (e1996e7)

Deprecated

  • Removed Python 3.4 from build (b2ee929)

  • Tests are now using async/await syntax throughout (#78)
    In the next v1.0 release the client will use only async/await syntax (#68)

Release v0.7.2

19 Jul 15:55

Fixed

  • Improve behavior of reconnection logic (#67)

  • Issue with tasks left pending after calling close() (#69)

Release v0.7.0

04 Apr 22:44

Added

  • New style request/response implementation (#59)

  • The ErrSlowConsumer now includes sid and subject when a message is dropped (#58)

  • New options pending_msgs_limit and pending_bytes_limit were added to subscribe API
    for controlling limits of pending data waiting to be processed before dropping messages.
    (#58)

  • Msg type now uses __slots__ (#58)
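
How the pending limits and the richer slow consumer error fit together can be sketched with a small in-memory buffer. This is an illustration only: PendingBuffer and SlowConsumer are hypothetical stand-ins, the limits are passed explicitly rather than assuming any defaults, and the real client reports the condition through its own error types.

```python
from collections import deque


class SlowConsumer(Exception):
    """Hypothetical stand-in for ErrSlowConsumer, carrying sid and subject."""

    def __init__(self, sid, subject):
        super().__init__(f"slow consumer: sid={sid}, subject={subject}")
        self.sid = sid
        self.subject = subject


class PendingBuffer:
    """Toy per-subscription buffer that drops messages once a limit is hit."""

    def __init__(self, sid, subject, pending_msgs_limit, pending_bytes_limit):
        self.sid = sid
        self.subject = subject
        self.pending_msgs_limit = pending_msgs_limit
        self.pending_bytes_limit = pending_bytes_limit
        self._msgs = deque()
        self._size = 0  # total bytes currently pending

    def push(self, payload: bytes):
        """Queue a payload, or drop it and raise SlowConsumer if over a limit."""
        too_many = len(self._msgs) + 1 > self.pending_msgs_limit
        too_big = self._size + len(payload) > self.pending_bytes_limit
        if too_many or too_big:
            raise SlowConsumer(self.sid, self.subject)
        self._msgs.append(payload)
        self._size += len(payload)

    def pop(self) -> bytes:
        """Hand the oldest pending payload to the handler and free its space."""
        payload = self._msgs.popleft()
        self._size -= len(payload)
        return payload


buf = PendingBuffer(sid=1, subject="foo", pending_msgs_limit=2, pending_bytes_limit=1024)
buf.push(b"a")
buf.push(b"b")
try:
    buf.push(b"c")  # a third pending message exceeds pending_msgs_limit=2
except SlowConsumer as e:
    print(e)
```

Carrying sid and subject on the error lets an error callback tell which subscription is falling behind.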

Improved

  • More performant inbox generation via NUID approach (#57)

Fixed

  • Each Subscription now has a task so coroutines no longer
    cause head of line blocking to each other.
    (#58)
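
The effect of giving each subscription its own task can be seen in a small asyncio model. This is a sketch of the general technique, not the client's code; consume, the subject names and the handlers are all made up for illustration.

```python
import asyncio


async def consume(queue, handler):
    """Per-subscription worker: handles this subscription's messages in order."""
    while True:
        msg = await queue.get()
        await handler(msg)
        queue.task_done()


async def main():
    done = []

    async def slow(msg):
        await asyncio.sleep(0.05)  # a handler that takes a while
        done.append(("slow", msg))

    async def fast(msg):
        done.append(("fast", msg))

    queues = {"slow.subject": asyncio.Queue(), "fast.subject": asyncio.Queue()}
    tasks = [
        asyncio.create_task(consume(queues["slow.subject"], slow)),
        asyncio.create_task(consume(queues["fast.subject"], fast)),
    ]

    # The slow subscription's message is dispatched first.
    queues["slow.subject"].put_nowait("m1")
    queues["fast.subject"].put_nowait("m2")

    # Wait until both subscriptions have handled their messages.
    await asyncio.gather(*(q.join() for q in queues.values()))
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return done


order = asyncio.run(main())
print(order)
```

Although m1 is dispatched first, the fast handler finishes first: the slow handler only holds up its own subscription.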

Changed

  • request API when a callback is not passed now blocks and waits for response
    using the new style request/response API which is less chatty over
    the network.
    (#59)

Deprecated

  • subscribe_async will be deprecated in the next release. As a workaround,
    a task can be created inside of a handler, which gives more control
    than the library can provide.
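
The workaround mentioned above, creating a task inside a handler, might look like the following sketch. It uses only asyncio; process, handler and the background set are hypothetical names, and the handler signature is simplified to take a plain string instead of a client message.

```python
import asyncio

background = set()  # references to running tasks, so they are not garbage collected


async def process(msg):
    """The slow part of the work, run outside the handler."""
    await asyncio.sleep(0.01)
    return msg.upper()


async def handler(msg):
    """Return right away; the slow work continues in its own task."""
    task = asyncio.create_task(process(msg))
    background.add(task)
    task.add_done_callback(background.discard)
    return task


async def main():
    tasks = [await handler(m) for m in ("a", "b", "c")]
    # The caller decides when (and whether) to wait for the results.
    return await asyncio.gather(*tasks)


results = asyncio.run(main())
print(results)
```

Because the handler owns the task, it can also attach callbacks, apply timeouts or cancel the work, which is the extra control the note refers to.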