Kafka Replicator Source & Quix Environment Source #448

Merged · 15 commits · Sep 19, 2024
3 changes: 3 additions & 0 deletions docs/build/build.py
@@ -123,6 +123,9 @@
k: None
for k in [
"quixstreams.sources.base",
"quixstreams.sources.csv",
"quixstreams.sources.kafka.kafka",
"quixstreams.sources.kafka.quix",
]
},
}
2 changes: 2 additions & 0 deletions docs/connectors/sources/README.md
@@ -26,6 +26,8 @@ if __name__ == "__main__":
Quix Streams provides the following sources out of the box:

* [CSVSource](./csv-source.md): A source that reads data from a single CSV file.
* [KafkaReplicatorSource](./kafka-source.md): A source that replicates a topic from a Kafka broker to your application broker.
* [QuixEnvironmentSource](./quix-source.md): A source that replicates a topic from a Quix Cloud environment to your application broker.

You can also implement your own; see [Creating a Custom Source](custom-sources.md) for documentation on how to do that, and the sketch below for a quick flavour of the API.
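
As a rough illustration of what a custom source looks like, here is a minimal sketch. It assumes the `Source` base class is importable from `quixstreams.sources` and exposes the `running`, `serialize`, and `produce` members described in the custom sources guide; `CounterSource` and the topic wiring are placeholders, not part of this PR.

```python
import time

from quixstreams import Application
from quixstreams.sources import Source


class CounterSource(Source):
    """Toy source emitting an incrementing counter once per second."""

    def run(self):
        count = 0
        while self.running:
            # Serialize the record with the source's configured serializers,
            # then produce it to the source's output topic.
            msg = self.serialize(key="counter", value=count)
            self.produce(key=msg.key, value=msg.value)
            count += 1
            time.sleep(1)


def main():
    app = Application()
    sdf = app.dataframe(source=CounterSource(name="counter-source"))
    sdf.print(metadata=True)
    app.run(sdf)


if __name__ == "__main__":
    main()
```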

36 changes: 36 additions & 0 deletions docs/connectors/sources/kafka-source.md
@@ -0,0 +1,36 @@
# Kafka Replicator Source

A source that reads data from a Kafka topic and produces it to another Kafka topic. The two topics can be located on different Kafka clusters.

This source supports exactly-once guarantees.

## How to use the Kafka Replicator Source

To use a Kafka Replicator source, you need to create an instance of `KafkaReplicatorSource` and pass it to the `app.dataframe()` method.

```python
from quixstreams import Application
from quixstreams.sources import KafkaReplicatorSource

def main():
app = Application()
source = KafkaReplicatorSource(
name="my-source",
app_config=app.config,
topic="source-topic",
broker_address="source-broker-address"
)

sdf = app.dataframe(source=source)
sdf.print(metadata=True)

app.run(sdf)

if __name__ == "__main__":
main()
```

## Topic

The Kafka Replicator source only deals with bytes: it reads the remote keys and values as bytes and produces them downstream unchanged.
You can configure the key and value deserializers used by the StreamingDataFrame with the `key_deserializer` and `value_deserializer` parameters, as shown in the sketch below.
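
For example, to work with JSON payloads instead of raw bytes, the deserializers can be set when the source is created. This sketch assumes `key_deserializer` and `value_deserializer` are accepted by the `KafkaReplicatorSource` constructor; the broker address and topic names are placeholders.

```python
from quixstreams import Application
from quixstreams.sources import KafkaReplicatorSource


def main():
    app = Application()
    source = KafkaReplicatorSource(
        name="my-source",
        app_config=app.config,
        topic="source-topic",
        broker_address="source-broker-address",
        # Deserialize replicated messages as JSON instead of raw bytes.
        # The "json" names are assumptions; use any deserializer supported
        # by Quix Streams topics.
        key_deserializer="json",
        value_deserializer="json",
    )

    sdf = app.dataframe(source=source)
    sdf.print(metadata=True)

    app.run(sdf)


if __name__ == "__main__":
    main()
```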
34 changes: 34 additions & 0 deletions docs/connectors/sources/quix-source.md
@@ -0,0 +1,34 @@
# Quix Environment Source

A specialised [Kafka Source](kafka-source.md) that simplifies copying data from a Quix Cloud environment.

## How to use the Quix Environment Source

To use a Quix Environment source, you need to create an instance of `QuixEnvironmentSource` and pass it to the `app.dataframe()` method.

```python
from quixstreams import Application
from quixstreams.sources import QuixEnvironmentSource

def main():
app = Application()
source = QuixEnvironmentSource(
name="my-source",
app_config=app.config,
topic="source-topic",
quix_sdk_token="quix-sdk-token",
quix_workspace_id="quix-workspace-id",
)

sdf = app.dataframe(source=source)
sdf.print(metadata=True)

app.run(sdf)

if __name__ == "__main__":
main()
```

## Token

The Quix Environment Source requires the SDK token of the source environment. See [SDK tokens](../../../develop/authentication/streaming-token.md) for more information. A sketch of supplying the token without hard-coding it follows.
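
A small sketch of wiring the token in from the environment rather than hard-coding it; the environment variable names and this configuration mechanism are assumptions, not part of the source's API.

```python
import os

from quixstreams import Application
from quixstreams.sources import QuixEnvironmentSource


def main():
    app = Application()
    source = QuixEnvironmentSource(
        name="my-source",
        app_config=app.config,
        topic="source-topic",
        # Environment variable names are placeholders; any secure
        # configuration mechanism works.
        quix_sdk_token=os.environ["SOURCE_SDK_TOKEN"],
        quix_workspace_id=os.environ["SOURCE_WORKSPACE_ID"],
    )

    sdf = app.dataframe(source=source)
    app.run(sdf)


if __name__ == "__main__":
    main()
```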
2 changes: 2 additions & 0 deletions mkdocs.yml
@@ -52,6 +52,8 @@ nav:
- Sources:
- 'connectors/sources/README.md'
- CSV Source: connectors/sources/csv-source.md
- Kafka Replicator Source: connectors/sources/kafka-source.md
- Quix Source: connectors/sources/quix-source.md
- Creating a Custom Source: connectors/sources/custom-sources.md
- Upgrading Guide:
- Upgrading from Quix Streams v0.5: upgrading-legacy.md
15 changes: 11 additions & 4 deletions quixstreams/app.py
@@ -682,7 +682,7 @@ def get_producer(self) -> Producer:
producer.produce(topic=topic.name, key=b"key", value=b"value")
```
"""
self._setup_topics()
self.setup_topics()

return Producer(
broker_address=self._config.broker_address,
@@ -725,8 +725,11 @@ def get_consumer(self, auto_commit_enable: bool = True) -> Consumer:
# consumer.store_offsets(msg)

```

:param auto_commit_enable: Enable or disable auto commit
Default - True
"""
self._setup_topics()
self.setup_topics()

return Consumer(
broker_address=self._config.broker_address,
@@ -817,7 +820,7 @@ def _run(self, dataframe: Optional[StreamingDataFrame] = None):
if self.is_quix_app:
self._quix_runtime_init()

self._setup_topics()
self.setup_topics()

exit_stack = contextlib.ExitStack()
exit_stack.enter_context(self._processing_context)
@@ -881,7 +884,11 @@ def _quix_runtime_init(self):
if self._state_manager.stores:
check_state_management_enabled()

def _setup_topics(self):
def setup_topics(self):
"""
Validate and create the topics
"""

topics_list = ", ".join(
f'"{topic}"' for topic in self._topic_manager.all_topics
)
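
Since `setup_topics()` is now public, it can also be called directly, for example to validate and create topics up front before handing configuration to a separate process. A minimal sketch; the broker address and topic name are placeholders.

```python
from quixstreams import Application

app = Application(broker_address="localhost:9092")
app.topic("my-topic")

# Previously private (`_setup_topics`); now callable on demand to
# validate and create every topic registered with the application.
app.setup_topics()
```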
2 changes: 1 addition & 1 deletion quixstreams/checkpointing/__init__.py
@@ -1,2 +1,2 @@
from .checkpoint import Checkpoint
from .checkpoint import Checkpoint, BaseCheckpoint
from .exceptions import InvalidStoredOffset
70 changes: 53 additions & 17 deletions quixstreams/checkpointing/checkpoint.py
@@ -1,5 +1,6 @@
import logging
import time
from abc import abstractmethod
from typing import Dict, Tuple

from confluent_kafka import TopicPartition, KafkaException
@@ -24,21 +25,19 @@
logger = logging.getLogger(__name__)


class Checkpoint:
class BaseCheckpoint:
"""
Class to keep track of state updates and consumer offsets and to checkpoint these
Base class to keep track of state updates and consumer offsets and to checkpoint these
updates on schedule.

Two implementations exist:
* one for checkpointing the Application in quixstreams/checkpointing/checkpoint.py
* one for checkpointing the kafka source in quixstreams/sources/kafka/checkpoint.py
"""

def __init__(
self,
commit_interval: float,
producer: RowProducer,
consumer: Consumer,
state_manager: StateStoreManager,
sink_manager: SinkManager,
pausing_manager: PausingManager,
exactly_once: bool = False,
commit_every: int = 0,
):
self._created_at = time.monotonic()
@@ -52,18 +51,9 @@ def __init__(
# Passing zero or lower will flush the checkpoint after each processed message
self._commit_interval = max(commit_interval, 0)

self._state_manager = state_manager
self._consumer = consumer
self._producer = producer
self._exactly_once = exactly_once
self._sink_manager = sink_manager
self._pausing_manager = pausing_manager
self._commit_every = commit_every
self._total_offsets_processed = 0

if self._exactly_once:
self._producer.begin_transaction()

def expired(self) -> bool:
"""
Returns `True` if checkpoint deadline has expired OR
@@ -107,6 +97,52 @@ def store_offset(self, topic: str, partition: int, offset: int):
self._starting_tp_offsets[tp] = offset
self._total_offsets_processed += 1

@abstractmethod
def close(self):
"""
Perform cleanup (when the checkpoint is empty) instead of committing.

Needed for exactly-once, as Kafka transactions are timeboxed.
"""

@abstractmethod
def commit(self):
"""
Commit the checkpoint.
"""
pass


class Checkpoint(BaseCheckpoint):
"""
Checkpoint implementation used by the application
"""

def __init__(
self,
commit_interval: float,
producer: RowProducer,
consumer: Consumer,
state_manager: StateStoreManager,
sink_manager: SinkManager,
pausing_manager: PausingManager,
exactly_once: bool = False,
commit_every: int = 0,
):
super().__init__(
commit_interval=commit_interval,
commit_every=commit_every,
)

self._state_manager = state_manager
self._consumer = consumer
self._producer = producer
self._sink_manager = sink_manager
self._pausing_manager = pausing_manager
self._exactly_once = exactly_once
if self._exactly_once:
self._producer.begin_transaction()

def get_store_transaction(
self, topic: str, partition: int, store_name: str = DEFAULT_STATE_STORE_NAME
) -> PartitionTransaction:
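
To illustrate the new split, a subclass only has to provide `close()` and `commit()` on top of the offset tracking inherited from `BaseCheckpoint`. A toy sketch, not the actual source checkpoint from quixstreams/sources/kafka/checkpoint.py; it assumes the base constructor keeps only `commit_interval` and `commit_every`, as the `super().__init__()` call above suggests.

```python
from quixstreams.checkpointing import BaseCheckpoint


class LoggingCheckpoint(BaseCheckpoint):
    """Toy checkpoint that logs instead of committing anything."""

    def close(self):
        # Called when the checkpoint is empty and there is nothing to commit.
        print("checkpoint closed with no processed offsets")

    def commit(self):
        # Called when the checkpoint has offsets to commit.
        print("committing checkpoint")


checkpoint = LoggingCheckpoint(commit_interval=5.0, commit_every=1000)
```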
10 changes: 10 additions & 0 deletions quixstreams/kafka/consumer.py
@@ -556,3 +556,13 @@ def __enter__(self):

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

# support pickling by dropping the inner consumer
def __getstate__(self) -> object:
state = self.__dict__.copy()
state.pop("_inner_consumer", None)
return state

def __setstate__(self, state):
self.__dict__.update(state)
self._inner_consumer = None
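
The effect of these hooks is that a configured `Consumer` (and, below, `Producer` and `TopicAdmin`) can be pickled, for example to cross a process boundary, with the non-picklable librdkafka client dropped and re-initialised on the other side. A rough sketch, assuming `Consumer` is importable from `quixstreams.kafka`; the constructor arguments are placeholders.

```python
import pickle

from quixstreams.kafka import Consumer

consumer = Consumer(
    broker_address="localhost:9092",
    consumer_group="my-group",
    auto_offset_reset="earliest",
)

# __getstate__ drops the inner librdkafka consumer before pickling...
clone = pickle.loads(pickle.dumps(consumer))

# ...and __setstate__ resets it to None so it can be re-created
# in the receiving process.
assert clone._inner_consumer is None
```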
10 changes: 10 additions & 0 deletions quixstreams/kafka/producer.py
@@ -174,6 +174,16 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.flush()
logger.debug("Kafka producer flushed")

# support pickling by dropping the inner producer
def __getstate__(self) -> object:
state = self.__dict__.copy()
state.pop("_inner_producer", None)
return state

def __setstate__(self, state):
self.__dict__.update(state)
self._inner_producer = None


class TransactionalProducer(Producer):
"""
10 changes: 10 additions & 0 deletions quixstreams/models/topics/admin.py
@@ -209,3 +209,13 @@ def create_topics(
),
finalize_timeout=finalize_timeout,
)

# support pickling by dropping the inner admin
def __getstate__(self) -> object:
state = self.__dict__.copy()
state.pop("_inner_admin", None)
return state

def __setstate__(self, state):
self.__dict__.update(state)
self._inner_admin = None
2 changes: 1 addition & 1 deletion quixstreams/rowproducer.py
@@ -158,7 +158,7 @@ def produce_row(
return
raise

def poll(self, timeout: float = None):
def poll(self, timeout: float = 0):
"""
Polls the producer for events and calls `on_delivery` callbacks.

1 change: 1 addition & 0 deletions quixstreams/sources/__init__.py
@@ -2,3 +2,4 @@
from .manager import SourceException
from .multiprocessing import multiprocessing
from .csv import CSVSource
from .kafka import KafkaReplicatorSource, QuixEnvironmentSource
2 changes: 2 additions & 0 deletions quixstreams/sources/kafka/__init__.py
@@ -0,0 +1,2 @@
from .kafka import *
from .quix import *