JetStream

JetStream support lives in io.github.n-hass:natskt-jetstream. It builds on an existing NatsClient connection and lets you manage and use streams and consumers, and work with key-value buckets.

If you are not using the natskt-platform module, add the following to your build.gradle.kts or build.gradle:

commonMain.dependencies {
    implementation("io.github.n-hass:natskt-jetstream:1.0.0-rc.1")
}

Create A JetStream Client

import io.natskt.NatsClient
import io.natskt.jetstream.JetStreamClient

val client = NatsClient("nats://localhost:4222")
client.connect().getOrThrow()

val js = JetStreamClient(client)

Manage Streams

val stream = js.manager.createStream {
    name = "orders"
    subjects = mutableListOf("orders.>")
}

println(stream.info.value?.config?.name)

The manager also exposes update, delete, purge, listing, consumer operations, and direct message lookup APIs.
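The `orders.>` subject in the stream configuration above relies on NATS subject wildcards: `>` matches one or more trailing tokens, and `*` matches exactly one token. The sketch below illustrates those rules only. The NATS server does the real matching, and `subjectMatches` is a hypothetical helper written for this example, not part of natskt.

```kotlin
// Illustrative only: a hypothetical helper showing NATS wildcard rules.
// It is not natskt API and does not validate malformed subjects.
fun subjectMatches(pattern: String, subject: String): Boolean {
    val p = pattern.split(".")
    val s = subject.split(".")
    for ((i, token) in p.withIndex()) {
        when (token) {
            // ">" must be the last pattern token and needs at least one remaining subject token.
            ">" -> return i == p.lastIndex && s.size > i
            // "*" matches exactly one token, whatever its content.
            "*" -> if (i >= s.size) return false
            // Any other token must match literally.
            else -> if (i >= s.size || s[i] != token) return false
        }
    }
    // Without a trailing ">", both sides must have the same number of tokens.
    return p.size == s.size
}

fun main() {
    println(subjectMatches("orders.>", "orders.created"))    // true
    println(subjectMatches("orders.>", "orders.eu.created")) // true
    println(subjectMatches("orders.>", "orders"))            // false
    println(subjectMatches("orders.*", "orders.eu.created")) // false
}
```

With this filter, a message published to `orders.created` lands in the stream, while one published to the bare subject `orders` does not.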

Publish With Acknowledgement

val ack = js.publish(
    subject = "orders.created",
    message = """{"id":42}""".encodeToByteArray(),
)

println(ack.seq)

Bind To A Consumer

Attach to an existing consumer:

import io.natskt.jetstream.api.consumer.SubscribeOptions

val consumer = js.subscribe(
    SubscribeOptions.Attach(
        streamName = "orders",
        consumerName = "orders-worker",
        manualAck = true,
    ),
)

Alternatively, create or update the consumer through the manager first, then attach to it:

val info = js.manager.createOrUpdateConsumer("orders") {
    name = "orders-worker"
    deliverSubject = "orders.consumer"
}

val consumer = js.subscribe(
    SubscribeOptions.Attach("orders", info.name),
)

For push consumers, collect messages from the returned consumer. If you enable manualAck, acknowledge each message yourself.

Key-Value Buckets

Create a bucket:

val bucket = js.keyValueManager.create {
    bucket = "profiles"
}

Use a bucket:

js.keyValue("profiles").use { bucket ->
    bucket.put("alice", "admin".encodeToByteArray())

    val entry = bucket.get("alice")
    println(entry.value.decodeToString())
}

KeyValueBucket keeps an internal request subscription alive. Close it when you are finished, or wrap it in use, as in the example above, so that it is closed automatically.
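To show why `use` is the safer option, here is a minimal sketch that assumes the bucket follows the standard Kotlin `use` contract for closeable resources. `FakeBucket` is a hypothetical stand-in, not the natskt `KeyValueBucket` type. The point is that `close()` still runs when the block throws.

```kotlin
// Hypothetical stand-in for a closeable resource; not the natskt KeyValueBucket type.
class FakeBucket : AutoCloseable {
    var closed = false
        private set

    // Always fails, to simulate an error in the middle of bucket usage.
    fun get(key: String): String = error("lookup failed for $key")

    override fun close() {
        closed = true
    }
}

fun main() {
    val bucket = FakeBucket()
    // `use` rethrows the failure, but only after calling close().
    val result = runCatching { bucket.use { it.get("alice") } }
    println(result.isFailure) // true
    println(bucket.closed)    // true
}
```

If you call `close()` by hand instead, put it in a `finally` block to get the same guarantee.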