danfimov/taskiq-ydb

taskiq + ydb

Plugin for taskiq that adds a new result backend and broker based on YDB.

Installation

This project can be installed using pip/poetry/uv (choose your preferred package manager):

pip install taskiq-ydb

Usage

The following example uses the YDB broker together with the YDB result backend:

# example.py
import asyncio

from ydb.aio.driver import DriverConfig

from taskiq_ydb import YdbBroker, YdbResultBackend


driver_config = DriverConfig(
    endpoint='grpc://localhost:2136',
    database='/local',
)

broker = YdbBroker(
    driver_config=driver_config,
).with_result_backend(YdbResultBackend(driver_config=driver_config))


@broker.task(task_name='best_task_ever')
async def best_task_ever() -> str:
    """Solve all problems in the world."""
    return 'Problems solved!'


async def main() -> None:
    """Start the application with broker."""
    await broker.startup()
    task = await best_task_ever.kiq()
    result = await task.wait_result()
    print(f'Task result: {result.return_value}')
    await broker.shutdown()


if __name__ == '__main__':
    # asyncio.run creates and closes the event loop; calling
    # get_event_loop() without a running loop is deprecated.
    asyncio.run(main())

The example can be run with the following commands:

# Run the script: starts the broker, sends the task and waits for the result
python3 -m example
# Start a worker to execute the task
taskiq worker example:broker

Configuration

Broker:

  • driver_config: connection config for the YDB client (see the YDB documentation for details);
  • topic_path: path to the topic where tasks are stored, default is /taskiq-tasks;
  • connection_timeout: timeout for connecting to the database during startup, default is 5 seconds;
  • read_timeout: timeout for a topic read operation, default is 5 seconds.
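
The broker options above can be collected in one place before the broker is constructed. The sketch below reads them from environment variables. The TASKIQ_YDB_* and YDB_* variable names and both helper functions are hypothetical, and the timeouts are assumed to be accepted as integer seconds; only the keyword names and default values come from the list above.

```python
import os
from typing import Any, Mapping


def broker_options(env: Mapping[str, str] = os.environ) -> dict[str, Any]:
    """Collect YdbBroker keyword arguments from environment variables.

    The TASKIQ_YDB_* names are hypothetical; the fallback values mirror
    the defaults listed in this README.
    """
    return {
        'topic_path': env.get('TASKIQ_YDB_TOPIC_PATH', '/taskiq-tasks'),
        'connection_timeout': int(env.get('TASKIQ_YDB_CONNECTION_TIMEOUT', '5')),
        'read_timeout': int(env.get('TASKIQ_YDB_READ_TIMEOUT', '5')),
    }


def make_broker(env: Mapping[str, str] = os.environ):
    """Build a YdbBroker from the collected options."""
    # Local imports keep broker_options() usable without YDB installed.
    from ydb.aio.driver import DriverConfig
    from taskiq_ydb import YdbBroker

    driver_config = DriverConfig(
        endpoint=env.get('YDB_ENDPOINT', 'grpc://localhost:2136'),
        database=env.get('YDB_DATABASE', '/local'),
    )
    return YdbBroker(driver_config=driver_config, **broker_options(env))
```

Keeping the option parsing separate from the broker construction makes it possible to check the parsed values without a running YDB instance.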

Result backend:

  • driver_config: connection config for the YDB client (see the YDB documentation for details);
  • table_name: name of the table that stores task results;
  • table_primary_key_type: type of the primary key in the task results table, default is uuid;
  • serializer: the TaskiqSerializer to use, default is PickleSerializer;
  • pool_size: size of the connection pool for the YDB client, default is 5;
  • connection_timeout: timeout for connecting to the database during startup, default is 5 seconds.
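
A subset of the result backend options can be validated up front and then passed as keyword arguments. This is a minimal sketch: the ResultBackendOptions class, the make_result_backend helper and the validation rules are illustrative assumptions rather than part of taskiq-ydb, and the table name in the usage note is a placeholder. Only the keyword names and the defaults for pool_size and connection_timeout come from the list above.

```python
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class ResultBackendOptions:
    """Hypothetical container for some YdbResultBackend keyword arguments."""

    table_name: str
    pool_size: int = 5           # default listed in this README
    connection_timeout: int = 5  # seconds, default listed in this README

    def __post_init__(self) -> None:
        # These checks are assumptions made for this sketch,
        # not rules enforced by the library.
        if not self.table_name:
            raise ValueError('table_name must not be empty')
        if self.pool_size < 1:
            raise ValueError('pool_size must be at least 1')
        if self.connection_timeout <= 0:
            raise ValueError('connection_timeout must be positive')


def make_result_backend(driver_config, options: ResultBackendOptions):
    """Build a YdbResultBackend from validated options."""
    # Local import keeps the dataclass usable without taskiq-ydb installed.
    from taskiq_ydb import YdbResultBackend

    return YdbResultBackend(driver_config=driver_config, **asdict(options))
```

With a driver_config as in the usage example, make_result_backend(driver_config, ResultBackendOptions(table_name='taskiq_results')) would produce a backend that can be passed to with_result_backend.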
