Developer Interface

Decorators

@zrq.task(func: Callable[[P], Awaitable[R]]) → Task[P, R]
@zrq.task(func: None = None, queue: Queue | str | None = None, expiry: int | None = None, retry: int | None = None, timeout: int | None = None) → Callable[[Callable[[P], Awaitable[R]]], Task[P, R]]

Creates a Task from the decorated function.

A convenient decorator to attach queueing capabilities to an async function. It can be used with or without providing additional parameters.

Without parameters:

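import asyncio
from zrq import task

@task
async def say_after(delay: int, what: str):
    await asyncio.sleep(delay)  # sleep must be awaited to actually pause
    print(what)

# enqueue() is a coroutine, so await it from async code
await say_after.enqueue(3, "hello world")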

When used without parameters, the default values from the configuration are used.

With additional parameters:

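@task(queue="high-priority", timeout=1)
async def say_after(delay: int, what: str):
    await asyncio.sleep(delay)  # sleep must be awaited to actually pause
    print(what)

# enqueue() is a coroutine, so await it from async code
await say_after.enqueue(3, "hello world")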

The decorated function can still be called as usual, without side effects. IDE completion is supported on Task.enqueue because the decorated function's parameter signature is proxied under the hood.

@zrq.Queue.task
Parameters:
  • func (t.AsyncCallable | None)

  • expiry (int | None)

  • retry (int | None)

  • timeout (int | None)

Return type:

Task[P, R] | t.Callable[[t.Callable[P, t.Awaitable[R]]], Task[P, R]]

Task

class zrq.Task(func, queue, expiry=None, retry=None, timeout=None)

A Task wraps an awaitable callable.

Parameters:
  • func (t.Callable[P, t.Awaitable[R]])

  • queue (Queue)

  • expiry (int | None)

  • retry (int | None)

  • timeout (int | None)

async enqueue(*args, **kwargs)

Enqueues the decorated callable with the given parameters.

Parameters:
  • args (P.args)

  • kwargs (P.kwargs)

Return type:

Job

Queue

class zrq.Queue(name, consumer_id=None)

Bases: ConsumableMixin, ProducableMixin

Parameters:
  • name (str)

  • consumer_id (UUID | None)

Return type:

Queue

async enqueue(obj, kwargs=None, *args, **config)

Helper method to enqueue any supported object.

Parameters:
  • obj (t.Callable[..., t.Awaitable[t.Any]] | t.Coroutine[t.Any, t.Any, t.Any] | Task[t.Any, t.Any])

  • kwargs (dict[str, object] | None)

  • args (object)

  • config (t.Unpack[JobConfig])

Return type:

Job
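Since enqueue accepts an async callable, a coroutine object, or a Task, it presumably dispatches on the kind of object it receives. The following is a minimal sketch of how the first two kinds can be told apart with the standard inspect module; classify is a hypothetical helper, and the Task case is left out because it would simply be an isinstance check against the library's own class.

```python
import inspect


def classify(obj: object) -> str:
    """Return which kind of enqueueable object `obj` is (hypothetical helper)."""
    if inspect.iscoroutine(obj):
        # An already-created coroutine object, e.g. the result of job().
        return "coroutine"
    if inspect.iscoroutinefunction(obj):
        # An `async def` function that has not been called yet.
        return "async callable"
    raise TypeError(f"unsupported object: {obj!r}")


async def job() -> int:
    return 1
```

A coroutine object already carries its arguments, which would explain why push_coroutine takes only config while push_callable also takes args and kwargs.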

async delete()

Deletes the queue along with its items. This action is irreversible.

Return type:

None

async ack(job_id)

Acknowledges a job, which removes it from the pending queue.

Parameters:

job_id (JobID)

Return type:

bool

async claim(job_id)

Claims a job, ensuring that no other worker will reclaim it while it is already running.

Parameters:

job_id (JobID)

Return type:

None

async enqueue_at(task, dt)

Enqueues a task to be run at a specific time.

Parameters:
  • task (Task[t.Any, t.Any])

  • dt (datetime)

async enqueue_in(task, dt)

Enqueues a task to be run after a specific amount of time.

Parameters:
  • task (Task[t.Any, t.Any])

  • dt (timedelta)
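The two scheduling methods differ only in how the moment is expressed: enqueue_at takes an absolute datetime, enqueue_in a relative timedelta. A relative delay can always be turned into an absolute time, as in the sketch below. Whether zrq performs this conversion internally, and whether it works in UTC, are assumptions; due_time is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def due_time(delay: timedelta, now: Optional[datetime] = None) -> datetime:
    """Absolute time at which a job delayed by `delay` becomes due."""
    if delay < timedelta(0):
        raise ValueError("delay must not be negative")
    # Default to an aware UTC timestamp so the result is unambiguous.
    start = now if now is not None else datetime.now(timezone.utc)
    return start + delay
```

With such a helper, a call like enqueue_in(task, timedelta(minutes=5)) would be equivalent to enqueue_at(task, due_time(timedelta(minutes=5))).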

async pop(count=1)

Gets the oldest jobs in the queue in bulk

Parameters:

count (int)

Return type:

AsyncIterator[Job]

async push(job)

Pushes a Job dict to the Queue

Parameters:

job (JobDict)

Return type:

JobID

async push_callable(callable, kwargs=None, *args, **config)

Pushes any callable into the Queue

Parameters:
Return type:

Job

async push_coroutine(coro, **config)

Pushes any coroutine into the Queue

Parameters:
Return type:

Job

async push_job(job)

Pushes a Job into the Queue

Parameters:

job (Job)

Return type:

JobID

async push_task(task, *args, **kwargs)

Pushes a Task object into the Queue

Parameters:
Return type:

Job

serializer

alias of JSONSerializer

class zrq.queue.BaseQueue(name)

Defines the base Queue methods.

Parameters:

name (str)

Stream

class zrq.stream.Stream(name, client)

Parameters:

static all(client=None)

Parameters:

client (Redis | None)

Return type:

AsyncGenerator[str, None]

async add(message)
Parameters:

message (dict[str, object])

Return type:

str

async create_group_with_consumer(group_name, consumer_name, id=0, mkstream=True)

Helper that creates a group together with a registered consumer in a single Redis.pipeline().

Parameters:
Return type:

bool

async read_group_oldest(group_name, consumer_name, count=1, noack=False)

Reads the oldest items in the stream.

Args:

block: Time to wait for a specific

Parameters:
Return type:

dict[str, list[list[tuple[str, dict[str, str | int | float]]]]]
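The nested return type is easier to work with once flattened. Reading it literally, it maps a stream name to a list of batches, each batch being a list of (entry id, fields) pairs. The sketch below walks that shape; iter_entries is a hypothetical helper and the sample data is invented purely for illustration.

```python
from typing import Dict, Iterator, List, Tuple, Union

Fields = Dict[str, Union[str, int, float]]
ReadResult = Dict[str, List[List[Tuple[str, Fields]]]]


def iter_entries(result: ReadResult) -> Iterator[Tuple[str, str, Fields]]:
    """Yield (stream_name, entry_id, fields) for every entry in the result."""
    for stream_name, batches in result.items():
        for batch in batches:
            for entry_id, fields in batch:
                yield stream_name, entry_id, fields


# Invented sample with the documented shape; not real zrq output.
sample: ReadResult = {"queue:default": [[("1-0", {"n": 1}), ("2-0", {"n": 2})]]}
entries = list(iter_entries(sample))
```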

async ack(group_name, *ids)

Parameters:

async destroy_group()

async groups_info()

async consumers_info()

async delete_group(group_name)

Parameters:

group_name (str)

Return type:

bool

async delete()
Return type:

bool

async len()
Return type:

int | None

async claim(group_name, consumer_name, *ids)
Parameters:
  • group_name (str)

  • consumer_name (str)

  • ids (str)

Return type:

None

async range(min_id, max_id, count)
Parameters:
Return type:

list

async get(id)
Parameters:

id (str)

Return type:

dict | None

async pending_range(group_name, min_id, max_id, count, consumer_name=None)
Parameters:
  • group_name (str)

  • min_id (str)

  • max_id (str)

  • count (int)

  • consumer_name (str | None)

Return type:

list

async get_pending(group_name, id)
Parameters:
Return type:

dict | None

Typing

Provides all the types used for annotations.

Since types can be moved from typing to collections.abc or even provided by both, this module is the single source of truth to satisfy the supported Python versions.

Any annotation types and related generics used in zrq are imported from this module.
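A compatibility module of this kind typically picks the import location once, based on the interpreter version, and re-exports the names. The following is a minimal sketch of the idea, not the actual contents of zrq.typing; the 3.9 cut-off is the version from which collections.abc.Callable supports subscripting.

```python
import asyncio
import sys
from typing import Any

if sys.version_info >= (3, 9):
    # Subscriptable ABCs live in collections.abc from Python 3.9 on.
    from collections.abc import Awaitable, Callable
else:
    from typing import Awaitable, Callable

# Alias built once here, then imported everywhere else in the package.
AsyncCallable = Callable[..., Awaitable[Any]]


async def ping() -> str:
    return "pong"


handler: AsyncCallable = ping
```

The rest of the package then imports Awaitable, Callable and AsyncCallable from this one module and never needs its own version checks.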

zrq.typing.Any = typing.Any

from typing.Any

zrq.typing.FrameType

alias of FrameType

class zrq.typing.TypedDict

from typing.TypedDict

class zrq.typing.AsyncGenerator

from collections.abc.AsyncGenerator

class zrq.typing.Mapping

from collections.abc.Mapping

class zrq.typing.ParamSpec(name, *, bound=None, covariant=False, contravariant=False)

from typing.ParamSpec

class zrq.typing.Sequence

from collections.abc.Sequence

zrq.typing.AnyCallable

Matches callable of any kind.

alias of Callable[[…], Any]

zrq.typing.AsyncCallable

Matches asynchronous callable of any kind.

alias of Callable[[…], Awaitable[Any]]

class zrq.typing.AsyncIterator

from collections.abc.AsyncIterator

class zrq.typing.Awaitable

from collections.abc.Awaitable

class zrq.typing.Callable

from collections.abc.Callable

zrq.typing.Callback

Convenient alias to match any callable used as a callback.

alias of Callable[[], Any]

class zrq.typing.Coroutine

from collections.abc.Coroutine

close()

Raise GeneratorExit inside coroutine.

abstract send(value)

Send a value into the coroutine. Return next yielded value or raise StopIteration.

abstract throw(typ, val=None, tb=None)

Raise an exception in the coroutine. Return next yielded value or raise StopIteration.

class zrq.typing.Generator

from collections.abc.Generator

close()

Raise GeneratorExit inside generator.

abstract send(value)

Send a value into the generator. Return next yielded value or raise StopIteration.

abstract throw(typ, val=None, tb=None)

Raise an exception in the generator. Return next yielded value or raise StopIteration.

class zrq.typing.Generic

from typing.Generic

zrq.typing.TracebackType

alias of TracebackType

class zrq.typing.TypeVar(name, *constraints, bound=None, covariant=False, contravariant=False)

from typing.TypeVar

zrq.typing.assert_type(val, typ, /)

Ask a static type checker to confirm that the value is of the given type.

At runtime this does nothing: it returns the first argument unchanged with no checks or side effects, no matter the actual type of the argument.

When a static type checker encounters a call to assert_type(), it emits an error if the value is not of the specified type:

def greet(name: str) -> None:
    assert_type(name, str)  # OK
    assert_type(name, int)  # type checker error
Parameters:
Return type:

object

zrq.typing.cast(typ, val)

from typing.cast

zrq.typing.get_type_hints(obj, globalns=None, localns=None, include_extras=False)

from typing.get_type_hints

zrq.typing.overload(func)

from typing.overload

Context

Provides settings and a (lazy) Redis connector for the runtime environment.

class zrq.context.Constants(ENV_PREFIX: str = 'ZRQ_', GROUP_PREFIX: str = 'group:', QUEUE_PREFIX: str = 'queue:')
Parameters:
  • ENV_PREFIX (str)

  • GROUP_PREFIX (str)

  • QUEUE_PREFIX (str)

class zrq.context.Settings(env_prefix: dataclasses.InitVar[str], REDIS_URL: str = 'redis://localhost:6379/0', DEFAULT_QUEUE: str = 'default', WORKER_SIZE: int = 100, WORKER_LATENCY: float = 0.1, JOB_MAX_RETRY: int = 3, JOB_TIMEOUT: int = 0)
Parameters:
  • env_prefix (dataclasses.InitVar[str])

  • REDIS_URL (str)

  • DEFAULT_QUEUE (str)

  • WORKER_SIZE (int)

  • WORKER_LATENCY (float)

  • JOB_MAX_RETRY (int)

  • JOB_TIMEOUT (int)

lock()

Locks updates to ensure consistency.

Return type:

None

update(**values)

Updates settings, which are otherwise frozen.

Parameters:

values (object)

Return type:

None
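The interplay of a frozen object, an explicit update() and a final lock() can be sketched with a frozen dataclass. This is an illustration under assumptions, not zrq's code: SketchSettings and its _locked flag are hypothetical, and the exception raised after locking is a plain RuntimeError here.

```python
from dataclasses import FrozenInstanceError, dataclass, field


@dataclass(frozen=True)
class SketchSettings:
    REDIS_URL: str = "redis://localhost:6379/0"
    DEFAULT_QUEUE: str = "default"
    _locked: bool = field(default=False, repr=False, compare=False)

    def update(self, **values: object) -> None:
        """Change settings through the one sanctioned entry point."""
        if self._locked:
            raise RuntimeError("settings are locked")
        for name, value in values.items():
            if name.startswith("_") or name not in self.__dataclass_fields__:
                raise AttributeError(f"unknown setting: {name}")
            # Deliberately bypass the frozen-dataclass guard.
            object.__setattr__(self, name, value)

    def lock(self) -> None:
        """Refuse any further update() calls."""
        object.__setattr__(self, "_locked", True)
```

Plain attribute assignment raises FrozenInstanceError at all times, update() works until lock() is called, and afterwards nothing can change the values.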

class zrq.context.Redis(settings)

Default Redis client lazy constructor.

Used when a client isn’t provided to Queue.

Parameters:

settings (Settings)

property client: ARedis

Returns a Redis client and locks settings for further updates.

zrq.context.constants = Constants(ENV_PREFIX='ZRQ_', GROUP_PREFIX='group:', QUEUE_PREFIX='queue:')

Constants are instantiated once and therefore cannot be updated.

zrq.context.settings = Settings(REDIS_URL='redis://localhost:6379/0', DEFAULT_QUEUE='default', WORKER_SIZE=100, WORKER_LATENCY=0.1, JOB_MAX_RETRY=3, JOB_TIMEOUT=0)

Settings are built from environment variables, falling back to default values.
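Loading from the environment with defaults can be sketched as follows. The variable naming (prefix followed by the field name, for example ZRQ_WORKER_SIZE) and the type coercion are assumptions for illustration; load_settings and DEFAULTS are hypothetical names, and only some of the documented fields are included.

```python
import os
from typing import Mapping, Optional

DEFAULTS = {
    "REDIS_URL": "redis://localhost:6379/0",
    "DEFAULT_QUEUE": "default",
    "WORKER_SIZE": 100,
    "WORKER_LATENCY": 0.1,
}


def load_settings(environ: Optional[Mapping[str, str]] = None,
                  prefix: str = "ZRQ_") -> dict:
    """Read prefixed variables, falling back to the defaults above."""
    env = os.environ if environ is None else environ
    loaded = {}
    for name, default in DEFAULTS.items():
        raw = env.get(prefix + name)
        # Environment values are strings; coerce them to the default's type.
        loaded[name] = default if raw is None else type(default)(raw)
    return loaded
```

Passing the environment as an argument keeps the function testable without touching os.environ.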

zrq.context.redis = <zrq.context.Redis object>

The default Redis client built from settings

Exceptions

Exposes all ZRQ exceptions.

exception zrq.exceptions.QueueError

Bases: Exception

Common queue error

exception zrq.exceptions.MissingConsumerIdError

Bases: QueueError

exception zrq.exceptions.NonConsumableQueueError

Bases: QueueError

exception zrq.exceptions.NonCallableTaskError

Bases: RuntimeError

exception zrq.exceptions.ConsistencyError

Bases: RuntimeError

Operational Interface

A CLI is provided for day-to-day operations.

Entrypoint

usage: zrq [-h] [-V] [-v {DEBUG,INFO,WARNING,ERROR}] [-r REDIS_URL] {queue,worker} ...

Named Arguments

-V, --version

Show program’s version number and exit

-v, --verbosity

Possible choices: DEBUG, INFO, WARNING, ERROR

Output verbosity level

Default: INFO

-r, --redis-url

Custom URL to use for connecting to Redis

Default: 'redis://localhost:6379/0'

Queue

zrq queue list

List existing queues

usage: zrq queue list [-h]

zrq queue size

Get the size of a queue

usage: zrq queue size [-h] name

Positional Arguments

name

Desired queue name

Worker

Runs a worker

usage: zrq worker [-h] [-s SLOTS] [-p] [queues ...]

Positional Arguments

queues

List of queues to consume

Default: ['default']

Named Arguments

-s, --slots

Worker max concurrent slots

Default: 100

-p, --profile

Enable profiling using cProfile

Default: False
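The usage lines above map directly onto an argparse declaration. The sketch below reproduces the documented options and defaults so their parsing behaviour can be tried in isolation. It is a reconstruction from this page, not zrq's source; build_parser is a hypothetical name and the -V/--version flag is omitted because the version string is not given here.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="zrq")
    parser.add_argument("-v", "--verbosity", default="INFO",
                        choices=["DEBUG", "INFO", "WARNING", "ERROR"])
    parser.add_argument("-r", "--redis-url", default="redis://localhost:6379/0")
    commands = parser.add_subparsers(dest="command", required=True)

    # zrq queue {list,size}
    queue = commands.add_parser("queue")
    actions = queue.add_subparsers(dest="action", required=True)
    actions.add_parser("list")
    actions.add_parser("size").add_argument("name")

    # zrq worker [-s SLOTS] [-p] [queues ...]
    worker = commands.add_parser("worker")
    worker.add_argument("-s", "--slots", type=int, default=100)
    worker.add_argument("-p", "--profile", action="store_true")
    worker.add_argument("queues", nargs="*", default=["default"])
    return parser
```

Under this declaration, zrq worker -s 10 emails reports yields slots=10 and queues=['emails', 'reports'], while a bare zrq worker consumes the 'default' queue.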