Queues - Consume
Overview
Consuming from a queue allows your application to process background jobs published by other parts of your system. A consumer represents a worker that receives jobs, executes user-defined logic, and explicitly acknowledges completion.
In RelayX, queue consumption is explicit and stateful. You must initialize the queue first, then register a consumer on that queue. Once registered, RelayX delivers jobs to the consumer based on consumer group membership and flow control rules.
This API is used to build workers for tasks such as email delivery, background processing, data enrichment, and any work that should not block real-time request paths.
How This Works
At runtime, queue consumption follows a strict sequence:
- Your application initializes the RelayX client.
- The client connects to the RelayX backend.
- A queue is explicitly initialized using initQueue.
- A consumer is registered on that queue using consume(config, callback).
- Jobs are delivered to the callback one at a time, respecting consumer group and backpressure rules.
- Each job must be acknowledged (ack) or retried (nack) explicitly.
A job is considered successfully processed only after your code calls ack().
If a job is not acknowledged within the configured time window, RelayX treats it as failed and schedules redelivery.
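For example, a handler only completes a job once ack() is called. The minimal sketch below assumes the workerQueue instance and config object introduced in the sections that follow, and uses a hypothetical handleEmail helper for the actual work.

await workerQueue.consume(config, async (job) => {
  await handleEmail(job.message); // hypothetical helper doing the real work
  await job.ack();                // without this call, the job is redelivered after the ack window
});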
API Usage
Initializing the Client
Before consuming, initialize the RelayX client and connect to the backend.
import { Realtime } from "relayx-js";
const client = new Realtime({
  api_key: process.env.RELAYX_API_KEY,
  secret: process.env.RELAYX_API_SECRET,
});
await client.init();
await client.connect();
Initializing a Queue
Before consuming jobs, explicitly initialize the queue.
const workerQueue = await client.initQueue("<QUEUE_ID>");
- <QUEUE_ID> uniquely identifies the queue.
- The queue ID is found in the worker queue section of the developer console.
All publish and consume operations are performed through this queue instance.
Registering a Consumer
Consumption is started using the consume method.
await workerQueue.consume(config, callback);
Consumer Configuration
The config object controls how jobs are delivered and retried.
Required fields
These fields must be provided.
- name: A unique name for the consumer.
- group: The consumer group this consumer belongs to. Jobs are load-balanced across consumers in the same group.
- topic: The queue topic to consume jobs from.
Optional fields
- ack_wait (seconds): Maximum time allowed for the consumer to acknowledge a job. If exceeded, the job is redelivered.
- backoff (array of seconds): Retry schedule used for redelivery when jobs are not acknowledged.
- max_deliver (integer): Maximum number of delivery attempts. Default is -1, meaning the job will be retried until acknowledged.
- max_ack_pending (integer): Maximum number of unacknowledged jobs allowed at once. When reached, delivery is paused until acknowledgements arrive.
Example Configuration
const config = {
  name: "email-worker-1",   // unique consumer name
  group: "email-workers",   // consumers in this group share the workload
  topic: "email-jobs",      // topic to consume jobs from
  ack_wait: 30,             // seconds to acknowledge before redelivery
  backoff: [5, 10, 30],     // redelivery schedule in seconds
  max_deliver: 5,           // stop retrying after 5 attempts
  max_ack_pending: 10,      // pause delivery once 10 jobs are unacknowledged
};
Code Example
import { Realtime, CONNECTED } from "relayx-js"; // assumes the SDK exports a CONNECTED event constant

const client = new Realtime({
  api_key: process.env.RELAYX_API_KEY,
  secret: process.env.RELAYX_API_SECRET,
});

client.on(CONNECTED, async (status) => {
  if (!status) {
    console.log("Auth failure :(");
    return;
  }

  const workerQueue = await client.initQueue("<QUEUE_ID>");

  const config = {
    name: "email-worker-1",
    group: "email-workers",
    topic: "email-jobs",
    ack_wait: 30,
    backoff: [5, 10, 30],
    max_deliver: 5,
    max_ack_pending: 10,
  };

  await workerQueue.consume(config, async (job) => {
    try {
      console.log("Received job:", job.message);
      // process job here
      await job.ack();
    } catch (err) {
      console.error("Job processing failed", err);
      // Explicitly retry after a delay
      job.nack(15000);
    }
  });
});

await client.init();
await client.connect();
Job Object
The job passed to the callback is an instance of the Message class.
Available Properties
- job.id: Unique identifier for the job.
- job.message: The job payload published by the producer.
- job.topic: The queue topic this job was received from.
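As a quick illustration, these properties can be read directly inside the consume callback; a minimal sketch, assuming the workerQueue and config from the earlier sections:

await workerQueue.consume(config, async (job) => {
  console.log("id:", job.id);           // unique job identifier
  console.log("topic:", job.topic);     // topic the job was received from
  console.log("payload:", job.message); // payload published by the producer
  await job.ack();
});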
Available Methods
- job.ack(): Marks the job as successfully processed. The job will not be delivered again.
- job.nack(millis): Indicates job failure and schedules redelivery after the given delay. The millis value overrides the configured backoff schedule.
Failure & Edge Cases
Callback Errors
If your callback throws or rejects:
- The job is not acknowledged
- RelayX schedules redelivery based on retry rules
- The job may be delivered to another consumer in the same group
Using nack(millis)
- nack(millis) explicitly signals failure
- The job is retried after millis
- This ignores the backoff array for that retry
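A minimal sketch of this override, assuming hypothetical processJob and retryDelayFor helpers:

await workerQueue.consume(config, async (job) => {
  try {
    await processJob(job.message);    // hypothetical processing logic
    await job.ack();
  } catch (err) {
    const delay = retryDelayFor(err); // hypothetical helper choosing a delay in ms
    job.nack(delay);                  // redelivered after `delay`, ignoring the backoff array for this retry
  }
});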
Consumer Crashes
If the process crashes:
- Unacknowledged jobs are considered failed
- Jobs are redelivered to another consumer
Detaching and Deleting Consumers
Detaching a Consumer
workerQueue.detachConsumer("<Topic>");
- Stops receiving jobs for the topic
- Does not delete the consumer
- Safe for temporary shutdowns
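For example, a worker might detach on shutdown so it stops pulling new jobs without removing the consumer itself; a minimal sketch for a Node.js process, assuming the topic name from the earlier examples:

process.on("SIGTERM", () => {
  workerQueue.detachConsumer("email-jobs"); // stop receiving jobs for this topic
  // The consumer still exists; calling consume() again later resumes delivery.
});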
Deleting a Consumer
await workerQueue.deleteConsumer("<Consumer Name>");
- Permanently deletes the consumer
- All processes using this consumer stop receiving jobs
- Use with extreme caution
Common Mistakes
- Forgetting to call ack()
- Assuming jobs run exactly once
- Using deleteConsumer instead of detachConsumer
- Expecting publish success to mean job completion
Notes & Limitations
- Consumers are stateful entities
- Acknowledgement timing directly affects redelivery
- Misconfigured max_ack_pending can stall delivery
- Always design handlers to be idempotent
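Because a job can be delivered more than once, an idempotent handler should recognize duplicates, for example by deduplicating on job.id. The sketch below keeps the seen set in memory purely for illustration; a real worker would use a durable store, and sendEmail is a hypothetical helper.

const processed = new Set(); // illustration only; use a durable store in production
await workerQueue.consume(config, async (job) => {
  if (processed.has(job.id)) {
    await job.ack(); // duplicate delivery: acknowledge without repeating side effects
    return;
  }
  await sendEmail(job.message); // hypothetical helper with side effects
  processed.add(job.id);
  await job.ack();
});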
Join our Discord server, post your concern & someone from our team will help you out ✌️