Skip to main content

Queues - Consume

Overview

Consuming from a queue allows your application to process background jobs published by other parts of your system. A consumer represents a worker that receives jobs, executes user-defined logic, and explicitly acknowledges completion.

In RelayX, queue consumption is explicit and stateful. You must initialize the queue first, then register a consumer on that queue. Once registered, RelayX delivers jobs to the consumer based on consumer group membership and flow control rules.

This API is used to build workers for tasks such as email delivery, background processing, data enrichment, and any work that should not block real-time request paths.


How This Works

At runtime, queue consumption follows a strict sequence:

  1. Your application initializes the RelayX client.
  2. The client connects to the RelayX backend.
  3. A queue is explicitly initialized using initQueue.
  4. A consumer is registered on that queue using consume(config, callback).
  5. Jobs are delivered to the callback one at a time, respecting consumer group and backpressure rules.
  6. Each job must be acknowledged (ack) or retried (nack) explicitly.

A job is considered successfully processed only after your code calls ack(). If a job is not acknowledged within the configured time window, RelayX treats it as failed and schedules redelivery.


API Usage

Initializing the Queue

Before consuming, the queue must be initialized.

import relay.Realtime
import relay.models.RealtimeConfig
import kotlinx.coroutines.runBlocking
import java.io.File

val realtime = Realtime()
realtime.apiKey = "<API KEY>"
realtime.secretKey = "<SECRET KEY>"
realtime.filesDir = File.createTempFile("test", "dir").parentFile!!

val config = RealtimeConfig()
realtime.init(config)

runBlocking {
realtime.connect()
}

Initializing a Queue

Before consuming jobs, explicitly initialize the queue.

runBlocking {
val workerQueue = realtime.initQueue("<QUEUE_ID>")

if (workerQueue == null) {
println("Failed to initialize queue")
return@runBlocking
}
}
  • <QUEUE_ID> uniquely identifies the queue.
  • The queue ID is found in the worker queue section of the developer console.
  • initQueue() is a suspend function that returns a Queue? object (nullable).

All publish and subscribe operations are performed through this queue instance.


Registering a Consumer

Consumption is started using the consume method.

runBlocking {
workerQueue.consume(config) { job ->
// Process job
}
}

Note: consume() is a suspend function and must be called from a coroutine scope.

Consumer Configuration

The config is a ConsumerConfig object that controls how jobs are delivered and retried.

Required fields

These fields must be provided.

  • name: String? A unique name for the consumer.

  • group: String? The consumer group this consumer belongs to. Jobs are load-balanced across consumers in the same group.

  • topic: String? The queue topic to consume jobs from.

Optional fields
  • ack_wait: Long? (seconds) Maximum time allowed for the consumer to acknowledge a job. If exceeded, the job is redelivered.

  • back_off: List<Long>? (array of seconds) Retry schedule used for redelivery when jobs are not acknowledged. Note: The property name is back_off (with underscore), not backoff.

  • max_deliver: Long? (integer) Maximum number of delivery attempts. Default is -1, meaning the job will be retried until acknowledged.

  • max_ack_pending: Long? (integer) Maximum number of unacknowledged jobs allowed at once. When reached, delivery is paused until acknowledgements arrive.

Example Configuration

import relay.models.ConsumerConfig

val config = ConsumerConfig().apply {
name = "email-worker-1"
group = "email-workers"
topic = "email-jobs"
ack_wait = 30L
back_off = listOf(5L, 10L, 30L)
max_deliver = 5L
max_ack_pending = 10L
}

Code Example

import relay.Realtime
import relay.models.RealtimeConfig
import relay.models.ConsumerConfig
import relay.models.QueueMessage
import kotlinx.coroutines.runBlocking
import java.io.File
import java.time.Duration

val realtime = Realtime()
realtime.apiKey = "<API KEY>"
realtime.secretKey = "<SECRET KEY>"
realtime.filesDir = File.createTempFile("test", "dir").parentFile!!

val config = RealtimeConfig()
realtime.init(config)

runBlocking {
realtime.on(Realtime.CONNECTED) { status ->
val connected = status as Boolean
if (!connected) {
println("Auth failure :(")
return@on
}

runBlocking {
val workerQueue = realtime.initQueue("<QUEUE_ID>")

if (workerQueue == null) {
println("Failed to initialize queue")
return@runBlocking
}

val consumerConfig = ConsumerConfig().apply {
name = "email-worker-1"
group = "email-workers"
topic = "email-jobs"
ack_wait = 30L
back_off = listOf(5L, 10L, 30L)
max_deliver = 5L
max_ack_pending = 10L
}

workerQueue.consume(consumerConfig) { job ->
try {
val queueJob = job as QueueMessage
println("Received job: ${queueJob.message}")

// Process job here

queueJob.ack()
} catch (err: Exception) {
println("Job processing failed: ${err.message}")

// Explicitly retry after a delay (in milliseconds)
val queueJob = job as QueueMessage
queueJob.nack(15000L)
}
}
}
}

realtime.connect()
}

Job Object

The job passed to the callback is a QueueMessage object.

data class QueueMessage(
val message: Message,
val ack: () -> Unit,
val nack: (Long) -> Unit
)

Available Properties

  • job.message: Message Contains the job data. This is a Message object with the following properties:
    • message.id: String? — Unique identifier for the job
    • message.room: String? — The queue topic this job was received from
    • message.message: Any? — The job payload published by the producer
    • message.start: Long? — Timestamp when the job was created (milliseconds)

Available Methods

  • job.ack() Marks the job as successfully processed. The job will not be delivered again.

  • job.nack(delayMillis: Long) Indicates job failure and schedules redelivery after the given delay.

    • Parameter: delayMillis — delay in milliseconds before redelivery
    • This value overrides the configured back_off schedule for that retry.

Failure & Edge Cases

Callback Errors

If your callback throws or rejects:

  • The job is not acknowledged
  • RelayX schedules redelivery based on retry rules
  • The job may be delivered to another consumer in the same group

Using nack(delayMillis)

  • nack(delayMillis) explicitly signals failure
  • The job is retried after delayMillis (in milliseconds)
  • This ignores the back_off array for that retry

Consumer Crashes

If the process crashes:

  • Unacknowledged jobs are considered failed
  • Jobs are redelivered to another consumer

Detaching and Deleting Consumers

Detaching a Consumer

workerQueue.detachConsumer("<Topic>")
  • Stops receiving jobs for the topic
  • Does not delete the consumer
  • Safe for temporary shutdowns
  • This is a regular function (not a suspend function)

Deleting a Consumer

runBlocking {
val deleted = workerQueue.deleteConsumer("<Consumer Name>")

if (deleted) {
println("Consumer deleted successfully")
} else {
println("Failed to delete consumer")
}
}
  • Permanently deletes the consumer
  • All processes using this consumer stop receiving jobs
  • Use with extreme caution
  • This is a suspend function that returns a Boolean

Common Mistakes

  • Forgetting to call ack()
  • Assuming jobs run exactly once
  • Using deleteConsumer instead of detachConsumer
  • Expecting publish success to mean job completion

Notes & Limitations

  • Consumers are stateful entities
  • Acknowledgement timing directly affects redelivery
  • Misconfigured max_ack_pending can stall delivery
  • Always design handlers to be idempotent


Need Help?

Join our Discord server, post your concern & someone from our team will help you out ✌️