Developer Interface¶
Decorators¶
- @zrq.task(func: Callable[[P], Awaitable[R]]) Task[P, R]¶
- @zrq.task(func: None = None, queue: Queue | str | None = None, expiry: int | None = None, retry: int | None = None, timeout: int | None = None) Callable[[Callable[[P], Awaitable[R]]], Task[P, R]]
Creates a
Taskfrom the decorated function.A convenient decorator to attach queueing capabilities to an async function. It can be used with or without providing additional parameters.
Without parameters:
@task async def say_after(delay: int, what: str): asyncio.sleep(delay) print(what) say_after.enqueue(3, "hello world")
Used without parameters, the defaults values from configuration will be used.
With additional parameters:
@task(queue="high-priority", timeout=1) async def say_after(delay: int, what: str): asyncio.sleep(delay) print(what) say_after.enqueue(3, "hello world")
The decorated function can still be called usually without side effect. IDE completion is supported on
Task.enqueuesince the decorated function parameters signature are proxied under the wood.
Task¶
Queue¶
- class zrq.Queue(name, consumer_id=None)¶
Bases:
ConsumableMixin,ProducableMixin- async enqueue(obj, kwargs=None, *args, **config)¶
Helper method to enqueue any supported object.
- async delete()¶
Deletes the queue with its items, this action is irrevocable.
- Return type:
None
- async ack(job_id)¶
Acknowledge a job, which removes it from the pending queue
- Parameters:
job_id (JobID)
- Return type:
- async claim(job_id)¶
Claims Job, ensuring no other worker will reclaim it if they are already running.
- Parameters:
job_id (JobID)
- Return type:
None
- async enqueue_at(task, dt)¶
Enqueue a task to be runned at a specific time.
- Parameters:
task (Task[t.Any, t.Any])
dt (datetime)
- async enqueue_in(task, dt)¶
Enqueue a task to be runned in a specific amount of time.
- Parameters:
task (Task[t.Any, t.Any])
dt (timedelta)
- async pop(count=1)¶
Gets the oldest jobs in the queue in bulk
- Parameters:
count (int)
- Return type:
AsyncIterator[Job]
- async push(job)¶
Pushes a Job dict to the Queue
- Parameters:
job (JobDict)
- Return type:
JobID
- async push_callable(callable, kwargs=None, *args, **config)¶
Pushes any callable into the Queue
- async push_coroutine(coro, **config)¶
Pushes any coroutine into the Queue
- async push_job(job)¶
Pushes a Job into the Queue
- Parameters:
job (Job)
- Return type:
JobID
- async push_task(task, *args, **kwargs)¶
Pushes a Task object into the Queue
- serializer¶
alias of
JSONSerializer
Stream¶
- class zrq.stream.Stream(name, client)¶
-
- static all(client=None)¶
- Parameters:
client (Redis | None)
- Return type:
AsyncGenerator[str, None]
- async create_group_with_consumer(group_name, consumer_name, id=0, mkstream=True)¶
Helper to creates both group with a registered consumer in a
Redis.pipeline().
- async read_group_oldest(group_name, consumer_name, count=1, noack=False)¶
Reads oldest items in the stream.
- Args:
block: Time to wait for a specific
- async destroy_group()¶
- async groups_info()¶
- async consumers_info()¶
- async claim(group_name, consumer_name, *ids)¶
- async range(min_id, max_id, count)¶
- async pending_range(group_name, min_id, max_id, count, consumer_name=None)¶
Typing¶
Provides all the types used for annotations.
Since types can be moved from typing to collections.abc
or even provided by both, this module is the single source of truth
to satisfy the supported Python versions.
Any annotation types and related generics used in zrq are imported from this module.
- zrq.typing.Any = typing.Any¶
from
typing.Any
- zrq.typing.FrameType¶
alias of
FrameType
- class zrq.typing.TypedDict¶
from
typing.TypedDict
- class zrq.typing.AsyncGenerator¶
- class zrq.typing.Mapping¶
- class zrq.typing.ParamSpec(name, *, bound=None, covariant=False, contravariant=False)¶
from
typing.ParamSpec
- class zrq.typing.Sequence¶
- zrq.typing.AsyncCallable¶
Matches asynchronous callable of any kind.
- class zrq.typing.AsyncIterator¶
- class zrq.typing.Awaitable¶
- class zrq.typing.Callable¶
- zrq.typing.Callback¶
Convenient alias to match any callable used as a callback.
- class zrq.typing.Coroutine¶
from
collections.abc.Coroutine- close()¶
Raise GeneratorExit inside coroutine.
- abstract send(value)¶
Send a value into the coroutine. Return next yielded value or raise StopIteration.
- abstract throw(typ, val=None, tb=None)¶
Raise an exception in the coroutine. Return next yielded value or raise StopIteration.
- class zrq.typing.Generator¶
from
collections.abc.Generator- close()¶
Raise GeneratorExit inside generator.
- abstract send(value)¶
Send a value into the generator. Return next yielded value or raise StopIteration.
- abstract throw(typ, val=None, tb=None)¶
Raise an exception in the generator. Return next yielded value or raise StopIteration.
- class zrq.typing.Generic¶
from
typing.Generic
- zrq.typing.TracebackType¶
alias of
TracebackType
- class zrq.typing.TypeVar(name, *constraints, bound=None, covariant=False, contravariant=False)¶
from
typing.TypeVar
- zrq.typing.assert_type(val, typ, /)¶
Ask a static type checker to confirm that the value is of the given type.
At runtime this does nothing: it returns the first argument unchanged with no checks or side effects, no matter the actual type of the argument.
When a static type checker encounters a call to assert_type(), it emits an error if the value is not of the specified type:
def greet(name: str) -> None: assert_type(name, str) # OK assert_type(name, int) # type checker error
- zrq.typing.cast(typ, val)¶
from
typing.cast
- zrq.typing.get_type_hints(obj, globalns=None, localns=None, include_extras=False)¶
- zrq.typing.overload(func)¶
from
typing.overload
Context¶
Provides settings and a (lazy) Redis connector for the runtime environment.
- class zrq.context.Constants(ENV_PREFIX: str = 'ZRQ_', GROUP_PREFIX: str = 'group:', QUEUE_PREFIX: str = 'queue:')¶
- class zrq.context.Settings(env_prefix: dataclasses.InitVar[str], REDIS_URL: str = 'redis://localhost:6379/0', DEFAULT_QUEUE: str = 'default', WORKER_SIZE: int = 100, WORKER_LATENCY: float = 0.1, JOB_MAX_RETRY: int = 3, JOB_TIMEOUT: int = 0)¶
- Parameters:
- lock()¶
Locking updates to ensure consistency.
- Return type:
None
- class zrq.context.Redis(settings)¶
Default Redis client lazy constructor.
Used when a client aren’t provided on Queue.
- Parameters:
settings (Settings)
- property client: ARedis¶
Returns a Redis client and locks settings for further updates.
- zrq.context.constants = Constants(ENV_PREFIX='ZRQ_', GROUP_PREFIX='group:', QUEUE_PREFIX='queue:')¶
Constants are instancied thus not updatable at all
- zrq.context.settings = Settings(REDIS_URL='redis://localhost:6379/0', DEFAULT_QUEUE='default', WORKER_SIZE=100, WORKER_LATENCY=0.1, JOB_MAX_RETRY=3, JOB_TIMEOUT=0)¶
Settings are built from envvars with a fallback to default value
- zrq.context.redis = <zrq.context.Redis object>¶
The default Redis client built from settings
Exceptions¶
Exposes all ZRQ exceptions.
- exception zrq.exceptions.MissingConsumerIdError¶
Bases:
QueueError
- exception zrq.exceptions.NonConsumableQueueError¶
Bases:
QueueError
- exception zrq.exceptions.NonCallableTaskError¶
Bases:
RuntimeError
- exception zrq.exceptions.ConsistencyError¶
Bases:
RuntimeError
Operational Interface¶
A CLI is provided for day-to-day operations.
Entrypoint¶
usage: zrq [-h] [-V] [-v {DEBUG,INFO,WARNING,ERROR}] [-r REDIS_URL] {queue,worker} ...
Named Arguments¶
- -V, --version
Show program’s version number and exit
Default:
'==SUPPRESS=='- -v, --verbosity
Possible choices: DEBUG, INFO, WARNING, ERROR
Output verbosity level
Default:
INFO- -r, --redis-url
Custom URL to use for connecting to Redis
Default:
'redis://localhost:6379/0'
Queue¶
zrq queue list¶
List existing queues
usage: zrq queue list [-h]
zrq queue size¶
Getting size of a queue
usage: zrq queue size [-h] name
Positional Arguments¶
- name
Desired queue name
Worker¶
Runs a worker
usage: zrq worker [-h] [-s SLOTS] [-p] [queues ...]
Positional Arguments¶
- queues
List of queues to consume
Default:
['default']
Named Arguments¶
- -s, --slots
Worker max concurrent slots
Default:
100- -p, --profile
Enable profiling using cProfile
Default:
False