Streaming with Multi
Methods that return Multi<T> (or Flow.Publisher<T>) enable streaming subscriptions. Instead of a single response, the client receives a stream of items as JSON-RPC notifications.
Creating a Stream
@JsonRPCApi
public class TickerService {
public Multi<String> ticker() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.onItem().transform(n -> "tick " + n);
}
public Multi<StockPrice> prices(String symbol) {
return priceService.streamPrices(symbol);
}
}
Subscription Protocol
When a client calls a Multi method, the following exchange happens:
1. Subscribe
The client sends a normal JSON-RPC request:
{
"jsonrpc": "2.0",
"id": 1,
"method": "TickerService#ticker"
}
2. Acknowledgement
The server responds with a subscription ID:
{
"jsonrpc": "2.0",
"id": 1,
"result": "sub-abc-123"
}
3. Items
Each item emitted by the Multi is sent as a JSON-RPC notification:
{
"jsonrpc": "2.0",
"method": "subscription",
"params": {
"subscription": "sub-abc-123",
"result": "tick 0"
}
}
Unsubscribing
A client can cancel a subscription by sending an unsubscribe request with the subscription ID:
{
"jsonrpc": "2.0",
"id": 2,
"method": "unsubscribe",
"params": ["sub-abc-123"]
}
The server responds with true if the subscription was found and cancelled, or false if the subscription ID was unknown:
{
"jsonrpc": "2.0",
"id": 2,
"result": true
}
Streaming POJOs
Multi works with any serializable type, not just strings:
@JsonRPCApi
public class SensorService {
public Multi<SensorReading> readings(String sensorId) {
return Multi.createFrom().ticks().every(Duration.ofSeconds(5))
.onItem().transform(n -> readSensor(sensorId));
}
}
Each item is serialized to JSON automatically via Jackson.