Streaming with Multi

Methods that return Multi<T> (or Flow.Publisher<T>) are exposed as streaming subscriptions. Instead of a single result, the client receives a subscription ID, followed by a stream of items delivered as JSON-RPC notifications.

Creating a Stream

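import io.smallrye.mutiny.Multi;
import java.time.Duration;

@JsonRPCApi
public class TickerService {

    // Assumed collaborator that supplies the price stream (type name is
    // illustrative); provided elsewhere, for example by injection.
    PriceService priceService;

    // Emits "tick 0", "tick 1", ... once per second.
    public Multi<String> ticker() {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .onItem().transform(n -> "tick " + n);
    }

    // Streams price updates for a single symbol.
    public Multi<StockPrice> prices(String symbol) {
        return priceService.streamPrices(symbol);
    }
}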

Subscription Protocol

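When a client calls a method that returns a Multi, the exchange proceeds as follows. Steps 4 and 5 are alternatives: a stream ends with either a completion notification or an error notification.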

1. Subscribe

The client sends a normal JSON-RPC request:

{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "TickerService#ticker"
}

2. Acknowledgement

The server responds with a subscription ID:

{
  "jsonrpc": "2.0",
  "id": 1,
  "result": "sub-abc-123"
}

3. Items

Each item emitted by the Multi is sent as a JSON-RPC notification:

{
  "jsonrpc": "2.0",
  "method": "subscription",
  "params": {
    "subscription": "sub-abc-123",
    "result": "tick 0"
  }
}

4. Completion

When the stream completes, the server sends a completion notification:

{
  "jsonrpc": "2.0",
  "method": "subscription",
  "params": {
    "subscription": "sub-abc-123",
    "complete": true
  }
}

5. Error

If the stream fails, an error notification is sent instead:

{
  "jsonrpc": "2.0",
  "method": "subscription",
  "params": {
    "subscription": "sub-abc-123",
    "error": {
      "code": -32603,
      "message": "Method [...] failed: ..."
    }
  }
}
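
On the client side, the three notification shapes above can be told apart by which key is present in params. The following is a minimal sketch of that classification, assuming the notification has already been parsed into a Map by whatever JSON library the client uses. The class, enum, and record names are hypothetical and are not part of the library.

```java
import java.util.Map;

/** Hypothetical client-side helper; not part of the library. */
final class SubscriptionNotifications {

    /** The three kinds of "subscription" notification described above. */
    enum Kind { ITEM, COMPLETE, ERROR }

    /** A classified notification: subscription ID, kind, and payload (null on completion). */
    record Event(String subscriptionId, Kind kind, Object payload) {}

    /** Classifies the already-parsed "params" object of a "subscription" notification. */
    static Event classify(Map<String, ?> params) {
        Object id = params.get("subscription");
        if (!(id instanceof String subscriptionId)) {
            throw new IllegalArgumentException("notification has no subscription ID");
        }
        // An "error" key marks a failed stream.
        if (params.containsKey("error")) {
            return new Event(subscriptionId, Kind.ERROR, params.get("error"));
        }
        // "complete": true marks normal termination.
        if (Boolean.TRUE.equals(params.get("complete"))) {
            return new Event(subscriptionId, Kind.COMPLETE, null);
        }
        // Anything else is treated as an item carried in "result".
        return new Event(subscriptionId, Kind.ITEM, params.get("result"));
    }
}
```

A client would typically stop listening for a subscription ID once it sees a COMPLETE or ERROR event for it, since no further items follow.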

Unsubscribing

A client can cancel a subscription by sending an unsubscribe request with the subscription ID:

{
  "jsonrpc": "2.0",
  "id": 2,
  "method": "unsubscribe",
  "params": ["sub-abc-123"]
}

The server responds with true if the subscription was found and cancelled, or false if the subscription ID was unknown:

{
  "jsonrpc": "2.0",
  "id": 2,
  "result": true
}
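
For illustration, a client could assemble the unsubscribe request shown above as follows. This is only a sketch: the class and method names are hypothetical, and the string concatenation stands in for the JSON library a real client would normally use.

```java
/** Hypothetical client-side helper for the unsubscribe request; not part of the library. */
final class UnsubscribeClient {

    /** Builds the unsubscribe request for the given request ID and subscription ID. */
    static String buildRequest(long requestId, String subscriptionId) {
        return "{\"jsonrpc\":\"2.0\",\"id\":" + requestId
                + ",\"method\":\"unsubscribe\",\"params\":[\"" + escape(subscriptionId) + "\"]}";
    }

    /** Escapes backslashes and double quotes only; a JSON library would handle all cases. */
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}
```

Calling buildRequest(2, "sub-abc-123") yields the same request as the example above, without the whitespace. A result of false in the response means the server did not recognise the subscription ID, so the client can simply discard its local state for that ID.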

Streaming POJOs

Multi works with any type that can be serialized to JSON, not just strings:

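import io.smallrye.mutiny.Multi;
import java.time.Duration;

@JsonRPCApi
public class SensorService {

    // Emits one reading every 5 seconds. readSensor is an application-specific
    // helper (not shown) that returns the current SensorReading for the sensor.
    public Multi<SensorReading> readings(String sensorId) {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(5))
                .onItem().transform(n -> readSensor(sensorId));
    }
}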

Each item is serialized to JSON automatically via Jackson.
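
SensorReading is not defined in this guide. As an assumption for illustration only, it could be a plain Java record like the one below; the field names are hypothetical. Jackson 2.12 and later serialize records by component name without extra configuration.

```java
/** Hypothetical shape for SensorReading; the field names are illustrative only. */
record SensorReading(String sensorId, double celsius, long timestampMillis) {}
```

With this shape, each item notification would be expected to carry an object such as {"sensorId": "s-1", "celsius": 21.5, "timestampMillis": 1700000000000} in params.result, in place of the string shown in the ticker example.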