
Queues - Consume

Overview

Consuming from a queue allows your application to process background jobs published by other parts of your system. A consumer represents a worker that receives jobs, executes user-defined logic, and explicitly acknowledges completion.

In RelayX, queue consumption is explicit and stateful. You must initialize the queue first, then register a consumer on that queue. Once registered, RelayX delivers jobs to the consumer based on consumer group membership and flow control rules.

This API is used to build workers for tasks such as email delivery, background processing, data enrichment, and any work that should not block real-time request paths.


How This Works

At runtime, queue consumption follows a strict sequence:

  1. Your application initializes the RelayX client.
  2. The client connects to the RelayX backend.
  3. A queue is explicitly initialized using initQueue.
  4. A consumer is registered on that queue using consume(config, callback).
  5. Jobs are delivered to the callback one at a time, respecting consumer group and backpressure rules.
  6. Each job must be acknowledged (ack) or retried (nack) explicitly.

A job is considered successfully processed only after your code calls ack(). If a job is not acknowledged within the configured time window, RelayX treats it as failed and schedules redelivery.
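The acknowledgement window can be illustrated with a small standalone sketch (plain Python, not part of the RelayX SDK): a delivered job tracks when it arrived, and it becomes eligible for redelivery only if the `ack_wait` window elapses without an ack.

```python
class PendingJob:
    """Minimal model of a delivered-but-unacknowledged job (illustration only)."""

    def __init__(self, job_id, delivered_at, ack_wait):
        self.job_id = job_id
        self.delivered_at = delivered_at
        self.ack_wait = ack_wait  # seconds the consumer has to ack
        self.acked = False

    def ack(self):
        self.acked = True

    def needs_redelivery(self, now):
        # Redeliver only if the ack window elapsed without an ack.
        return not self.acked and (now - self.delivered_at) > self.ack_wait


job = PendingJob("job-1", delivered_at=0, ack_wait=30)
print(job.needs_redelivery(now=10))   # within the window -> False
print(job.needs_redelivery(now=31))   # window elapsed, no ack -> True
job.ack()
print(job.needs_redelivery(now=31))   # acked -> False
```

The real broker tracks this state server-side; the sketch only shows the rule your handler must respect: call `ack()` before `ack_wait` expires, or expect the job again.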


API Usage

Initializing the Client

Before consuming, initialize the RelayX client and connect it to the backend.

from relayx_py import Realtime
import os

client = Realtime({
    "api_key": os.getenv("RELAYX_API_KEY"),
    "secret": os.getenv("RELAYX_API_SECRET")
})

client.init()

await client.connect()

Initializing a Queue

Before consuming jobs, explicitly initialize the queue.

worker_queue = await client.initQueue("<QUEUE_ID>")

  • <QUEUE_ID> uniquely identifies the queue.
  • The queue ID can be found in the worker queue section of the developer console.

All publish and subscribe operations are performed through this queue instance.


Registering a Consumer

Consumption is started using the consume method.

await worker_queue.consume(config, callback)

Consumer Configuration

The config object controls how jobs are delivered and retried.

Required fields

These fields must be provided.

  • name
    A unique name for the consumer.

  • group
    The consumer group this consumer belongs to. Jobs are load-balanced across consumers in the same group.

  • topic
    The queue topic to consume jobs from.

Optional fields
  • ack_wait (seconds)
    Maximum time allowed for the consumer to acknowledge a job. If exceeded, the job is redelivered.

  • backoff (array of seconds)
    Retry schedule used for redelivery when jobs are not acknowledged.

  • max_deliver (integer)
    Maximum number of delivery attempts. Default is -1, meaning the job will be retried until acknowledged.

  • max_ack_pending (integer)
    Maximum number of unacknowledged jobs allowed at once. When reached, delivery is paused until acknowledgements arrive.

Example Configuration

config = {
    "name": "email-worker-1",
    "group": "email-workers",
    "topic": "email-jobs",
    "ack_wait": 30,
    "backoff": [5, 10, 30],
    "max_deliver": 5,
    "max_ack_pending": 10,
}
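To make the interaction between backoff and max_deliver concrete, here is a standalone sketch of one common way such a schedule is applied (the exact clamping behaviour is an assumption for illustration, not RelayX-documented): each redelivery waits the next backoff entry, repeating the last entry once the list is exhausted, until max_deliver attempts have been made.

```python
def redelivery_delays(backoff, max_deliver):
    """Delays (in seconds) before each redelivery attempt.

    Attempt 1 is the initial delivery; redeliveries 2..max_deliver each
    wait the next backoff entry, repeating the last entry once the
    schedule runs out. Illustrative only.
    """
    delays = []
    for attempt in range(2, max_deliver + 1):
        idx = min(attempt - 2, len(backoff) - 1)
        delays.append(backoff[idx])
    return delays


# With the example configuration: 4 redeliveries after the first attempt.
print(redelivery_delays([5, 10, 30], max_deliver=5))  # [5, 10, 30, 30]
```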

Code Example

from relayx_py import Realtime
import os

client = Realtime({
    "api_key": os.getenv("RELAYX_API_KEY"),
    "secret": os.getenv("RELAYX_API_SECRET")
})

client.init()

async def onJobReceived(job):
    try:
        print("Received job:", job.message)

        # process job here

        await job.ack()
    except Exception as err:
        print("Job processing failed")
        print(err)

        # Explicitly retry after a delay
        await job.nack(15000)

async def onConnect(status):
    if not status:
        print("Auth failure :(")
        return

    worker_queue = await client.initQueue("<QUEUE_ID>")

    config = {
        "name": "email-worker-1",
        "group": "email-workers",
        "topic": "email-jobs",
        "ack_wait": 30,
        "backoff": [5, 10, 30],
        "max_deliver": 5,
        "max_ack_pending": 10,
    }

    await worker_queue.consume(config, onJobReceived)

await client.on(Realtime.CONNECTED, onConnect)
await client.connect()

Job Object

The job passed to the callback is an instance of the Message class.

Available Properties

  • job.id
    Unique identifier for the job.

  • job.message
    The job payload published by the producer.

  • job.topic
    The queue topic this job was received from.

Available Methods

  • job.ack()
    Marks the job as successfully processed. The job will not be delivered again.

  • job.nack(millis)
    Indicates job failure and schedules redelivery after the given delay. The millis value overrides the configured backoff schedule.
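The properties and methods above can be exercised against a small stand-in for the Message class (a local stub for illustration only; the real class is provided by relayx_py): the handler acks on success and nacks with a delay on failure.

```python
import asyncio

class StubJob:
    """Local stand-in for the SDK's Message class, for illustration only."""

    def __init__(self, job_id, message, topic):
        self.id = job_id
        self.message = message
        self.topic = topic
        self.outcome = None

    async def ack(self):
        self.outcome = "acked"

    async def nack(self, millis):
        self.outcome = f"nacked:{millis}"


async def handle(job):
    try:
        if not job.message.get("to"):
            raise ValueError("missing recipient")
        # ... do the actual work here (e.g. send the email) ...
        await job.ack()
    except Exception:
        await job.nack(15000)  # retry in 15 seconds


good = StubJob("j1", {"to": "a@example.com"}, "email-jobs")
bad = StubJob("j2", {}, "email-jobs")
asyncio.run(handle(good))
asyncio.run(handle(bad))
print(good.outcome, bad.outcome)  # acked nacked:15000
```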


Failure & Edge Cases

Callback Errors

If your callback throws or rejects:

  • The job is not acknowledged
  • RelayX schedules redelivery based on retry rules
  • The job may be delivered to another consumer in the same group

Using nack(millis)

  • nack(millis) explicitly signals failure
  • The job is retried after millis
  • This ignores the backoff array for that retry

Consumer Crashes

If the process crashes:

  • Unacknowledged jobs are considered failed
  • Jobs are redelivered to another consumer

Detaching and Deleting Consumers

Detaching a Consumer

await worker_queue.detachConsumer("<Topic>")

  • Stops receiving jobs for the topic
  • Does not delete the consumer
  • Safe for temporary shutdowns

Deleting a Consumer

await worker_queue.deleteConsumer("<Consumer Name>")

  • Permanently deletes the consumer
  • All processes using this consumer stop receiving jobs
  • Use with extreme caution

Common Mistakes

  • Forgetting to call ack()
  • Assuming jobs run exactly once
  • Using deleteConsumer instead of detachConsumer
  • Expecting publish success to mean job completion

Notes & Limitations

  • Consumers are stateful entities
  • Acknowledgement timing directly affects redelivery
  • Misconfigured max_ack_pending can stall delivery
  • Always design handlers to be idempotent
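Because a job can be delivered more than once, a common idempotency pattern is to deduplicate on job.id before performing any side effects. A minimal sketch (the in-memory set is for illustration only; a real worker would use a durable store such as Redis or a database):

```python
processed_ids = set()  # illustration only; use a durable store in production

def process_once(job_id, do_work):
    """Run do_work() at most once per job ID, even across redeliveries."""
    if job_id in processed_ids:
        return False          # duplicate delivery: skip the work, still ack
    do_work()
    processed_ids.add(job_id)
    return True


sent = []
print(process_once("job-42", lambda: sent.append("email")))  # True
print(process_once("job-42", lambda: sent.append("email")))  # False (duplicate)
print(sent)  # ['email']
```

Note that the duplicate delivery is still acknowledged; the dedup check only prevents the side effect from running twice.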


Need Help?

Join our Discord server and post your question; someone from our team will help you out ✌️