Queues - Consume
Overview
Consuming from a queue allows your application to process background jobs published by other parts of your system. A consumer represents a worker that receives jobs, executes user-defined logic, and explicitly acknowledges completion.
In RelayX, queue consumption is explicit and stateful. You must initialize the queue first, then register a consumer on that queue. Once registered, RelayX delivers jobs to the consumer based on consumer group membership and flow control rules.
This API is used to build workers for tasks such as email delivery, background processing, data enrichment, and any work that should not block real-time request paths.
How This Works
At runtime, queue consumption follows a strict sequence:
- Your application initializes the RelayX client.
- The client connects to the RelayX backend.
- A queue is explicitly initialized using initQueue.
- A consumer is registered on that queue using consume(config, callback).
- Jobs are delivered to the callback one at a time, respecting consumer group and backpressure rules.
- Each job must be acknowledged (ack) or retried (nack) explicitly.
A job is considered successfully processed only after your code calls ack().
If a job is not acknowledged within the configured time window, RelayX treats it as failed and schedules redelivery.
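The acknowledgement window can be pictured with a small model. The sketch below is only an illustration: `JobState` and `jobState` are hypothetical names that do not exist in the RelayX SDK, and it assumes the deadline is simply the delivery time plus the configured wait, with a late ack not counting.

```kotlin
// Hypothetical model of the ack window; none of these names exist in the RelayX SDK.
enum class JobState { ACKED, PENDING, REDELIVER }

// Classifies a delivered job at time nowMs.
// Assumption: deadline = delivery time + ack wait (given in seconds).
fun jobState(deliveredAtMs: Long, ackedAtMs: Long?, ackWaitSeconds: Long, nowMs: Long): JobState {
    val deadlineMs = deliveredAtMs + ackWaitSeconds * 1000
    return when {
        // Acknowledged inside the window: the job is done.
        ackedAtMs != null && ackedAtMs <= deadlineMs -> JobState.ACKED
        // Window elapsed without a timely ack: treated as failed.
        nowMs > deadlineMs -> JobState.REDELIVER
        // Still inside the window, waiting for ack() or nack().
        else -> JobState.PENDING
    }
}

fun main() {
    println(jobState(0, 10_000, 30, 20_000)) // ACKED
    println(jobState(0, null, 30, 20_000))   // PENDING
    println(jobState(0, null, 30, 31_000))   // REDELIVER
}
```

The practical consequence is that the wait window should comfortably exceed the slowest expected job, otherwise work that is still running gets delivered a second time.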
API Usage
Initializing the Client
Before a queue can be initialized, the RelayX client must be initialized and connected.
import relay.Realtime
import relay.models.RealtimeConfig
import kotlinx.coroutines.runBlocking
import java.io.File
val realtime = Realtime()
realtime.apiKey = "<API KEY>"
realtime.secretKey = "<SECRET KEY>"
realtime.filesDir = File.createTempFile("test", "dir").parentFile!!
val config = RealtimeConfig()
realtime.init(config)
runBlocking {
    realtime.connect()
}
Initializing a Queue
Before consuming jobs, explicitly initialize the queue.
runBlocking {
    val workerQueue = realtime.initQueue("<QUEUE_ID>")
    if (workerQueue == null) {
        println("Failed to initialize queue")
        return@runBlocking
    }
}
- <QUEUE_ID> uniquely identifies the queue.
- The queue ID is found in the worker queue section of the developer console.
- initQueue() is a suspend function that returns a Queue? object (nullable).
All publish and consume operations are performed through this queue instance.
Registering a Consumer
Consumption is started using the consume method.
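runBlocking {
    // workerQueue is the non-null Queue returned by initQueue();
    // config is a ConsumerConfig (see Consumer Configuration)
    workerQueue.consume(config) { job ->
        // Process job
    }
}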
Note: consume() is a suspend function and must be called from a coroutine scope.
Consumer Configuration
The config is a ConsumerConfig object that controls how jobs are delivered and retried.
Required fields
These fields must be provided.
- name: String? A unique name for the consumer.
- group: String? The consumer group this consumer belongs to. Jobs are load-balanced across consumers in the same group.
- topic: String? The queue topic to consume jobs from.
Optional fields
- ack_wait: Long? (seconds) Maximum time allowed for the consumer to acknowledge a job. If exceeded, the job is redelivered.
- back_off: List<Long>? (array of seconds) Retry schedule used for redelivery when jobs are not acknowledged. Note: The property name is back_off (with underscore), not backoff.
- max_deliver: Long? (integer) Maximum number of delivery attempts. Default is -1, meaning the job will be retried until acknowledged.
- max_ack_pending: Long? (integer) Maximum number of unacknowledged jobs allowed at once. When reached, delivery is paused until acknowledgements arrive.
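To make the interaction between back_off and max_deliver concrete, here is a minimal sketch of how such a retry schedule could be read. It is not RelayX code: `redeliveryDelaySeconds` is a hypothetical helper, and the rule that the last back_off entry repeats once the list is exhausted is an assumption that should be confirmed against actual RelayX behavior.

```kotlin
// Hypothetical helper modelling a retry schedule; not part of the RelayX SDK.
// Assumption: failed attempt N waits for the Nth back_off entry, and the last
// entry repeats once the list is exhausted.
// Returns the delay in seconds, or null when no further delivery is allowed.
fun redeliveryDelaySeconds(backOff: List<Long>, failedAttempts: Int, maxDeliver: Long): Long? {
    require(backOff.isNotEmpty()) { "backOff must contain at least one delay" }
    require(failedAttempts >= 1) { "failedAttempts starts at 1" }
    // max_deliver = -1 means "retry until acknowledged" per the field description.
    if (maxDeliver != -1L && failedAttempts >= maxDeliver) return null
    val index = minOf(failedAttempts - 1, backOff.lastIndex)
    return backOff[index]
}

fun main() {
    val schedule = listOf(5L, 10L, 30L)
    for (attempt in 1..5) {
        val delay = redeliveryDelaySeconds(schedule, attempt, maxDeliver = 5L)
        println("failed attempt $attempt -> next delay: $delay")
    }
}
```

Under these assumptions, a schedule of 5, 10, and 30 seconds with max_deliver set to 5 gives four retries and then stops, which is a useful sanity check when sizing the schedule.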
Example Configuration
import relay.models.ConsumerConfig
val config = ConsumerConfig().apply {
    name = "email-worker-1"
    group = "email-workers"
    topic = "email-jobs"
    ack_wait = 30L
    back_off = listOf(5L, 10L, 30L)
    max_deliver = 5L
    max_ack_pending = 10L
}
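Because every ConsumerConfig field is nullable, a missing required field is not caught by the compiler. The sketch below shows one way to check a configuration before registering a consumer. `ConsumerSettings` is a local stand-in that mirrors the documented fields, not the SDK class, and the numeric rules (positive ack_wait, non-negative back_off entries, and so on) are assumptions rather than documented constraints.

```kotlin
// Local stand-in mirroring the documented ConsumerConfig fields; not the SDK class.
data class ConsumerSettings(
    val name: String?,
    val group: String?,
    val topic: String?,
    val ackWait: Long? = null,
    val backOff: List<Long>? = null,
    val maxDeliver: Long? = null,
    val maxAckPending: Long? = null
)

// Returns a list of human-readable problems; an empty list means the settings look usable.
fun validate(s: ConsumerSettings): List<String> {
    val problems = mutableListOf<String>()
    // Required fields per the documentation.
    if (s.name.isNullOrBlank()) problems += "name is required"
    if (s.group.isNullOrBlank()) problems += "group is required"
    if (s.topic.isNullOrBlank()) problems += "topic is required"
    // The numeric rules below are assumptions, not documented constraints.
    s.ackWait?.let { if (it <= 0) problems += "ack_wait must be positive" }
    s.backOff?.let { if (it.any { d -> d < 0 }) problems += "back_off entries must be non-negative" }
    s.maxDeliver?.let { if (it != -1L && it < 1) problems += "max_deliver must be -1 or at least 1" }
    s.maxAckPending?.let { if (it < 1) problems += "max_ack_pending must be at least 1" }
    return problems
}

fun main() {
    val settings = ConsumerSettings(name = "email-worker-1", group = null, topic = "email-jobs", ackWait = 0)
    validate(settings).forEach { println(it) }
}
```

Failing fast on an empty problem list check keeps a misconfigured worker from starting and silently receiving nothing.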
Code Example
import relay.Realtime
import relay.models.RealtimeConfig
import relay.models.ConsumerConfig
import relay.models.QueueMessage
import kotlinx.coroutines.runBlocking
import java.io.File
val realtime = Realtime()
realtime.apiKey = "<API KEY>"
realtime.secretKey = "<SECRET KEY>"
realtime.filesDir = File.createTempFile("test", "dir").parentFile!!
val config = RealtimeConfig()
realtime.init(config)
runBlocking {
    realtime.on(Realtime.CONNECTED) { status ->
        val connected = status as Boolean
        if (!connected) {
            println("Auth failure :(")
            return@on
        }
        runBlocking {
            val workerQueue = realtime.initQueue("<QUEUE_ID>")
            if (workerQueue == null) {
                println("Failed to initialize queue")
                return@runBlocking
            }
            val consumerConfig = ConsumerConfig().apply {
                name = "email-worker-1"
                group = "email-workers"
                topic = "email-jobs"
                ack_wait = 30L
                back_off = listOf(5L, 10L, 30L)
                max_deliver = 5L
                max_ack_pending = 10L
            }
            workerQueue.consume(consumerConfig) { job ->
                // Cast once so the success and failure paths share the same reference
                val queueJob = job as QueueMessage
                try {
                    println("Received job: ${queueJob.message}")
                    // Process job here
                    queueJob.ack()
                } catch (err: Exception) {
                    println("Job processing failed: ${err.message}")
                    // Explicitly retry after a delay (in milliseconds)
                    queueJob.nack(15000L)
                }
            }
        }
    }
    realtime.connect()
}
Job Object
The job passed to the callback is a QueueMessage object.
data class QueueMessage(
    val message: Message,
    val ack: () -> Unit,
    val nack: (Long) -> Unit
)
Available Properties
- job.message: Message Contains the job data. This is a Message object with the following properties:
  - message.id: String? Unique identifier for the job
  - message.room: String? The queue topic this job was received from
  - message.message: Any? The job payload published by the producer
  - message.start: Long? Timestamp when the job was created (milliseconds)
Available Methods
- job.ack() Marks the job as successfully processed. The job will not be delivered again.
- job.nack(delayMillis: Long) Indicates job failure and schedules redelivery after the given delay.
  - Parameter: delayMillis, the delay in milliseconds before redelivery.
  - This value overrides the configured back_off schedule for that retry.
Failure & Edge Cases
Callback Errors
If your callback throws an exception:
- The job is not acknowledged
- RelayX schedules redelivery based on retry rules
- The job may be delivered to another consumer in the same group
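One way to avoid relying on timeouts is to wrap the handler so that every job ends in exactly one explicit ack or nack. The following is a minimal sketch under stated assumptions: `FakeJob` is a stand-in with the same ack/nack shape as the QueueMessage class shown earlier, and `processSafely` is a hypothetical helper, not an SDK function.

```kotlin
// Stand-in with the same ack/nack shape as QueueMessage; not the SDK class.
class FakeJob(val payload: Any?, val ack: () -> Unit, val nack: (Long) -> Unit)

// Runs the handler, acks on success, and nacks with retryDelayMillis on any exception.
// Returns true when the job was acknowledged.
fun processSafely(job: FakeJob, retryDelayMillis: Long, handler: (Any?) -> Unit): Boolean =
    try {
        handler(job.payload)
        job.ack()
        true
    } catch (err: Exception) {
        // Signal failure explicitly instead of waiting for the ack window to expire.
        job.nack(retryDelayMillis)
        false
    }

fun main() {
    val job = FakeJob(
        payload = "send welcome email",
        ack = { println("acked") },
        nack = { delay -> println("nacked, retry in $delay ms") }
    )
    processSafely(job, 15_000L) { payload -> println("processing: $payload") }
    processSafely(job, 15_000L) { error("SMTP server unavailable") }
}
```

An explicit nack makes the retry timing a deliberate choice, whereas an uncaught exception leaves the job to be redelivered only after the acknowledgement window runs out.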
Using nack(delayMillis)
- nack(delayMillis) explicitly signals failure
- The job is retried after delayMillis (in milliseconds)
- This ignores the back_off array for that retry
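Since nack takes the delay directly, the caller decides how long to wait. The sketch below computes a capped exponential delay as one possible policy. `nackDelayMillis` is a hypothetical helper, and the attempt number is assumed to be tracked by the application (for example inside the job payload), because the job object described on this page does not expose a delivery count.

```kotlin
// Hypothetical helper: capped exponential delay for nack(delayMillis).
// Assumption: the caller tracks the attempt number itself (first failure = 1).
fun nackDelayMillis(attempt: Int, baseMillis: Long = 1_000L, capMillis: Long = 60_000L): Long {
    require(attempt >= 1) { "attempt starts at 1" }
    require(baseMillis in 1..capMillis) { "baseMillis must be between 1 and capMillis" }
    var delay = baseMillis
    // Double once per previous attempt, stopping at the cap to avoid overflow.
    repeat(attempt - 1) {
        if (delay >= capMillis) return capMillis
        delay *= 2
    }
    return minOf(delay, capMillis)
}

fun main() {
    for (attempt in 1..8) {
        println("attempt $attempt -> ${nackDelayMillis(attempt)} ms")
    }
}
```

The result is in milliseconds, matching nack, while ack_wait and back_off are configured in seconds, so the two units should not be mixed up when moving values between them.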
Consumer Crashes
If the process crashes:
- Unacknowledged jobs are considered failed
- Jobs are redelivered to another consumer
Detaching and Deleting Consumers
Detaching a Consumer
workerQueue.detachConsumer("<Topic>")
- Stops receiving jobs for the topic
- Does not delete the consumer
- Safe for temporary shutdowns
- This is a regular function (not a suspend function)
Deleting a Consumer
runBlocking {
    val deleted = workerQueue.deleteConsumer("<Consumer Name>")
    if (deleted) {
        println("Consumer deleted successfully")
    } else {
        println("Failed to delete consumer")
    }
}
- Permanently deletes the consumer
- All processes using this consumer stop receiving jobs
- Use with extreme caution
- This is a suspend function that returns a Boolean
Common Mistakes
- Forgetting to call ack()
- Assuming jobs run exactly once
- Using deleteConsumer instead of detachConsumer
- Expecting publish success to mean job completion
Notes & Limitations
- Consumers are stateful entities
- Acknowledgement timing directly affects redelivery
- Misconfigured max_ack_pending can stall delivery
- Always design handlers to be idempotent
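Because a job can be delivered more than once, an idempotent handler needs some way to recognise a job it has already completed. The sketch below deduplicates on the job ID. `IdempotentProcessor` is a hypothetical class, not part of the SDK, and the in-memory set is an assumption made for brevity: it is lost on restart and is not shared between workers, so a real deployment would likely need a durable store.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical deduplicating wrapper; not part of the RelayX SDK.
// Assumption: job IDs are stable across redeliveries of the same job.
class IdempotentProcessor(private val handler: (Any?) -> Unit) {
    // In-memory only: lost on restart and not shared across worker processes.
    private val processedIds = ConcurrentHashMap.newKeySet<String>()

    // Returns true if the handler ran, false if the job ID was already processed.
    fun process(jobId: String, payload: Any?): Boolean {
        if (!processedIds.add(jobId)) return false
        try {
            handler(payload)
        } catch (err: Exception) {
            // Forget the ID so a redelivery can attempt the work again.
            processedIds.remove(jobId)
            throw err
        }
        return true
    }
}

fun main() {
    val processor = IdempotentProcessor { payload -> println("sending: $payload") }
    println(processor.process("job-1", "welcome email")) // true
    println(processor.process("job-1", "welcome email")) // false
}
```

In a consumer callback, the job would still be acknowledged when `process` returns false, since the work is already done and only the earlier ack was lost.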