Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions examples/distributed/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
__pycache__
*.pyc
*.pyo
*.pyd
.Python
*.so
*.egg
*.egg-info
dist
build
.git
.gitignore
README.md
27 changes: 27 additions & 0 deletions examples/distributed/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Distributed Events

Events are automatically shared across all NiceGUI instances when you enable distributed mode (`ui.run(distributed=True)`).

## Demo

This demo uses docker swarm to deploy multiple NiceGUI instances with a Traefik load balancer in front.

```bash
docker swarm init
docker stack deploy -c docker-compose.yml distributed
```

Open http://localhost:8080 in multiple browser tabs/windows.

Update:

```bash
docker stack deploy -c docker-compose.yml distributed
```

Clean up:

```bash
docker stack rm distributed
docker swarm leave --force
```
47 changes: 47 additions & 0 deletions examples/distributed/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
version: "3.8"

services:
app:
image: zauberzeug/nicegui:3.0.2
volumes:
- ../../nicegui:/app/nicegui
- ./main.py:/app/main.py
entrypoint: ["/bin/sh", "-c"]
command: ["pip3 install eclipse-zenoh && python3 main.py"] # TODO: remove pip3 after this PR is released and nicegui is upgraded
networks:
- distributed
deploy:
replicas: 3
restart_policy:
condition: on-failure
labels:
- "traefik.enable=true"
- "traefik.http.routers.app.rule=PathPrefix(`/`)"
- "traefik.http.services.app.loadbalancer.server.port=8080"
- "traefik.http.services.app.loadbalancer.sticky.cookie=true"
- "traefik.http.services.app.loadbalancer.sticky.cookie.name=nicegui_server"

traefik:
image: traefik:v2.10
command:
- "--providers.docker=true"
- "--providers.docker.swarmMode=true"
- "--providers.docker.exposedbydefault=false"
- "--entrypoints.web.address=:80"
ports:
- "8080:80"
volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro
networks:
- distributed
deploy:
replicas: 1
placement:
constraints:
- node.role == manager
restart_policy:
condition: on-failure

networks:
distributed:
driver: overlay
33 changes: 33 additions & 0 deletions examples/distributed/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/usr/bin/env python3
import random
import socket

from nicegui import Event, context, ui

reaction_event = Event[str]()


@ui.page('/')
def index():
ui.add_css('''@keyframes float-up { to { transform: translateY(-500px) scale(1.5); opacity: 0; } }''')

with ui.column().classes('items-center justify-center h-screen w-screen gap-8'):
ui.label(f'This is instance {socket.gethostname()}').classes('text-3xl font-bold')
ui.label('Click an emoji - watch it appear on all instances (open multiple tabs)!').classes('text-gray-600')

emojis = ['🎉', '❤️', '👍', '😂', '🔥', '✨', '🚀', '👏']
with ui.row().classes('gap-4'):
for emoji in emojis:
ui.button(emoji, on_click=lambda e=emoji: reaction_event.emit(e)).props('flat size=lg')

def show_reaction(emoji: str):
left = random.randint(10, 90)
with context.client.layout:
floating = ui.html(emoji, sanitize=False).classes('fixed text-5xl pointer-events-none z-[9999]')
floating.style(f'animation: float-up 3s ease-out forwards; left: {left}%; bottom: 100px')
ui.timer(3.0, floating.delete, once=True)

reaction_event.subscribe(show_reaction)


ui.run(distributed=True)
122 changes: 122 additions & 0 deletions nicegui/distributed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import json
import uuid
from typing import Any, Callable, Union

from .logging import log

try:
import zenoh
from zenoh import Sample
ZENOH_AVAILABLE = True
except ImportError:
ZENOH_AVAILABLE = False
zenoh = None # type: ignore
Sample = Any # type: ignore


class DistributedSession:
"""Manages distributed event communication via Zenoh pub/sub.

This is an internal class used by the Event system to handle distributed messaging.
Publishers and subscribers are kept alive for the lifetime of the session.
Event instances are expected to be long-lived (typically module-level),
so no automatic cleanup of unused topics is performed.
"""

_instance: Union['DistributedSession', None] = None

def __init__(self, config: Union[dict, bool]) -> None:
"""Initialize a Zenoh session for distributed events.

:param config: Zenoh configuration (True for defaults, dict for custom config)
"""
if not ZENOH_AVAILABLE:
raise ImportError('zenoh is required for distributed events. '
'Install with: pip install "nicegui[distributed]"')

zenoh_config = zenoh.Config() if config is True else zenoh.Config.from_obj(config)
self.session = zenoh.open(zenoh_config)
self.instance_id = str(uuid.uuid4())
self.publishers: dict[str, Any] = {}
self.subscribers: dict[str, Any] = {}
log.info(f'Distributed events enabled via Zenoh (instance: {self.instance_id[:8]}...)')

@classmethod
def get(cls) -> Union['DistributedSession', None]:
"""Get the active distributed session, if any."""
return cls._instance

@classmethod
def initialize(cls, config: Union[dict, bool]) -> None:
"""Initialize the global distributed session.

:param config: Zenoh configuration (True for defaults, dict for custom config)
"""
if cls._instance is None:
cls._instance = cls(config)
cls._setup_existing_events()

@classmethod
def _setup_existing_events(cls) -> None:
"""Set up distributed mode for all existing Event instances."""
from .event import Event
for event in Event.instances:
event._setup_distributed() # pylint: disable=protected-access

def publish(self, topic: str, data: Any) -> None:
"""Publish data to a topic.

:param topic: topic name
:param data: data to publish (must be JSON-serializable)
"""
try:
payload = json.dumps({
'instance_id': self.instance_id,
'data': data,
}).encode('utf-8')
if topic not in self.publishers:
self.publishers[topic] = self.session.declare_publisher(f'nicegui/events/{topic}')
self.publishers[topic].put(payload)
except (TypeError, ValueError) as e:
log.error(f'Failed to serialize event data for topic {topic}: {e}. '
'Event data must be JSON-serializable (str, int, float, bool, list, dict, None).')
raise
except Exception as e:
log.exception(f'Failed to publish event to topic {topic}: {e}')
raise

def subscribe(self, topic: str, callback: Callable[[Any], None]) -> None:
"""Subscribe to a topic.

:param topic: topic name
:param callback: function to call when data arrives
"""
def handler(sample: Sample) -> None:
try:
payload = json.loads(bytes(sample.payload).decode('utf-8'))
# NOTE: Ignore events from our own instance (deduplication)
if payload['instance_id'] == self.instance_id:
return
callback(payload['data'])
except (json.JSONDecodeError, UnicodeDecodeError) as e:
log.error(f'Failed to deserialize event from topic {topic}: {e}')
except Exception as e:
log.exception(f'Failed to handle event from topic {topic}: {e}')

if topic not in self.subscribers:
self.subscribers[topic] = self.session.declare_subscriber(
f'nicegui/events/{topic}',
handler
)

def shutdown(self) -> None:
"""Clean up the Zenoh session."""
try:
for pub in self.publishers.values():
pub.undeclare()
for sub in self.subscribers.values():
sub.undeclare()
self.session.close()
log.info('Distributed session closed')
except Exception:
log.exception('Error during distributed session shutdown')
60 changes: 59 additions & 1 deletion nicegui/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from .client import Client, ClientConnectionTimeout
from .context import context
from .dataclasses import KWONLY_SLOTS
from .distributed import DistributedSession
from .logging import log
from .slot import Slot

Expand Down Expand Up @@ -44,7 +45,7 @@ async def await_result(self, result: Awaitable | AwaitableResponse | asyncio.Tas
class Event(Generic[P]):
instances: ClassVar[WeakSet[Event]] = WeakSet()

def __init__(self) -> None:
def __init__(self, local: bool = False) -> None:
"""Event

Events are a powerful tool distribute information between different parts of your code,
Expand All @@ -53,10 +54,30 @@ def __init__(self) -> None:
Handlers can be synchronous or asynchronous.
They can also take arguments if the event contains arguments.

When distributed mode is enabled via `ui.run(distributed=True)`, events are automatically shared
across all instances in the network unless `local=True` is specified.

*Added in version 3.0.0*

:param local: if True, event will not be distributed even when distributed mode is active (default: False)
"""
self.callbacks: list[Callback[P]] = []
self.local = local
# NOTE: Use creation location for topic to ensure same Event in different processes shares the same topic
frame = inspect.currentframe()
assert frame is not None
frame = frame.f_back
assert frame is not None
# NOTE: Skip frames from typing module (when using Event[T]() syntax)
while frame and 'typing.py' in frame.f_code.co_filename:
frame = frame.f_back
assert frame is not None
module = inspect.getmodule(frame)
module_name = module.__name__ if module else 'unknown'
self.topic = f'event_{module_name}:{frame.f_code.co_filename}:{frame.f_lineno}'
self._zenoh_setup_done = False
self.instances.add(self)
self._setup_distributed()

def subscribe(self, callback: Callable[P, Any] | Callable[[], Any], *,
unsubscribe_on_disconnect: bool | None = None) -> None:
Expand Down Expand Up @@ -104,15 +125,52 @@ def unsubscribe(self, callback: Callable[P, Any] | Callable[[], Any]) -> None:
"""
self.callbacks[:] = [c for c in self.callbacks if c.func != callback]

def _setup_distributed(self) -> None:
"""Set up distributed event handling if enabled.

This method is safe to call multiple times due to the _zenoh_setup_done guard.
It's called during Event initialization and retroactively when DistributedSession.initialize() is called.
Events emitted before distributed mode is initialized will only be local.
"""
if self.local or self._zenoh_setup_done:
return
session = DistributedSession.get()
if session is None:
return

def remote_handler(data: dict) -> None:
"""Handle events received from remote instances."""
for callback in self.callbacks:
_invoke_and_forget(callback, *data.get('args', ()), **data.get('kwargs', {}))

session.subscribe(self.topic, remote_handler)
self._zenoh_setup_done = True

def emit(self, *args: P.args, **kwargs: P.kwargs) -> None:
"""Fire the event without waiting for the subscribed callbacks to complete."""
if not self.local and not self._zenoh_setup_done:
self._setup_distributed()

for callback in self.callbacks:
_invoke_and_forget(callback, *args, **kwargs)

if not self.local:
session = DistributedSession.get()
if session is not None:
session.publish(self.topic, {'args': args, 'kwargs': kwargs})

async def call(self, *args: P.args, **kwargs: P.kwargs) -> None:
"""Fire the event and wait asynchronously until all subscribed callbacks are completed."""
if not self.local and not self._zenoh_setup_done:
self._setup_distributed()

await asyncio.gather(*[_invoke_and_await(callback, *args, **kwargs) for callback in self.callbacks])

if not self.local:
session = DistributedSession.get()
if session is not None:
session.publish(self.topic, {'args': args, 'kwargs': kwargs})

async def emitted(self, timeout: float | None = None) -> Any:
"""Wait for an event to be fired and return its arguments.

Expand Down
4 changes: 4 additions & 0 deletions nicegui/nicegui.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ async def _shutdown() -> None:
if app.native.main_window:
app.native.main_window.signal_server_shutdown()
air.disconnect()
from .distributed import DistributedSession
session = DistributedSession.get()
if session is not None:
session.shutdown()
await app.stop()
run.tear_down()

Expand Down
10 changes: 10 additions & 0 deletions nicegui/ui_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def run(root: Optional[Callable] = None, *,
endpoint_documentation: Literal['none', 'internal', 'page', 'all'] = 'none',
storage_secret: Optional[str] = None,
show_welcome_message: bool = True,
distributed: Optional[Union[dict, bool]] = None,
**kwargs: Any,
) -> None:
"""ui.run
Expand Down Expand Up @@ -112,6 +113,7 @@ def run(root: Optional[Callable] = None, *,
:param endpoint_documentation: control what endpoints appear in the autogenerated OpenAPI docs (default: 'none', options: 'none', 'internal', 'page', 'all')
:param storage_secret: secret key for browser-based storage (default: `None`, a value is required to enable ui.storage.individual and ui.storage.browser)
:param show_welcome_message: whether to show the welcome message (default: `True`)
:param distributed: enable distributed events across network instances (default: `None`, use `True` for defaults or pass a dict with Zenoh config)
:param kwargs: additional keyword arguments are passed to `uvicorn.run`
"""
if core.script_mode:
Expand Down Expand Up @@ -184,6 +186,14 @@ def run_script() -> None:
set_storage_secret(storage_secret)
return

if distributed is not None:
from .distributed import ZENOH_AVAILABLE, DistributedSession
if not ZENOH_AVAILABLE:
log.warning('zenoh is not installed. Distributed events disabled. '
'Install with: pip install "nicegui[distributed]"')
else:
DistributedSession.initialize(distributed)

if on_air:
core.air = Air('' if on_air is True else on_air)

Expand Down
Loading
Loading