
Streaming to Redis#

Redis Streams provides a buffer between telemetry ingest and the Stream Service implementing the WebSocket protocol.

Why Redis?

Redis Streams is similar in design to Apache Kafka.

Redis was chosen because:

  • it's generally considered to be easier to deploy and manage, with few configuration options
  • it's easier to implement lookups for the current Session JSON and time-range
  • it's then also available for caching data from the REST API (likely to be needed at some point)

Redis can be run on Windows using Docker.

Redis provides the communication layer, and this page defines a protocol layered over the top.

Info

This protocol is implemented by the MAT.OCS.RTA.StreamBuffer NuGet Package (.NET Core).

Stream Management#

Messages are sent to named Redis Streams.

As a guideline, create a new stream for each source of data, and sub-divide further as needed.

You can have as few or as many named streams as you like, but:

  • Streams need to be trimmed so the buffer doesn't grow indefinitely.
    This is done automatically by the Stream Service, but doesn't work effectively if new streams are being constantly created.
    Streams should be persistent.
  • Streams can't be read selectively.
    The Stream Service will need to consume all data from a stream, even if it has a single client subscribed to a single session.
    If there are too many concurrent sessions, this becomes inefficient.

As an example, if the source of the data is Kafka, consider creating a stream for each topic partition.

Session Metadata Updates#

ATLAS will not show a Session in the Session Browser until it is available from the REST API, along with Configuration. The session needs to be in the "open" state and have configuration bindings.

The WebSocket protocol also requires the Session JSON to be available before any data can be streamed to a client. This means the Session needs to be published twice: via the REST API, and via Redis for the WebSocket protocol.

It might be difficult to keep these consistent — particularly if the ingest pipeline is split into different processes.

Here are some guidelines:

  1. Make sure your ingest process will publish the Session via the REST API promptly.
    If it's a slow-starting batch process, consider publishing a record (e.g. to the Session Service) at the start of live streaming — which can then be updated.

  2. The Session JSON model can leave the details and configuration bindings set to null in the stream message to ensure the client uses the version in the REST API — or vice versa.

  3. The Session time-range does not need to be updated in the JSON at high frequency since there is a dedicated stream protocol message for this aspect. But it should ideally be updated in the REST API every 5-30 seconds so progress is indicated in the ATLAS Session Browser.

  4. Make sure the Session state is updated to "closed" in the stream and ultimately in the REST API. The WebSocket protocol is designed so that it doesn't matter if the two are slightly out of sync.

Protocol#

There are three Redis data structures to update:

  • stream:* keys are Streams carrying compressed StreamDataBurst messages
  • streams is a Set listing all the Streams
  • sessions is a Hash locating each Session and containing the latest JSON model and time-range

Streams#

Follow a naming convention where each Redis Stream is named with the stream: prefix.
For example: rocket-engine becomes stream:rocket-engine

A Redis Message is made up of name-value pairs. For this protocol, they are:

  • "id" — distinguishing the session
  • "data" — LZ4-compressed protobuf payload

The XADD command appends a message to a stream.
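As an illustration of the naming convention and message layout above, here is a minimal Python sketch. It assumes a redis-py style client object (anything exposing an `xadd(name, fields)` method) and a payload that has already been serialized and LZ4-compressed elsewhere. The helper names `stream_key` and `append_burst` are hypothetical and are not part of the StreamBuffer package.

```python
STREAM_PREFIX = "stream:"


def stream_key(stream_name: str) -> str:
    """Map a stream name to its Redis key, e.g. 'rocket-engine' -> 'stream:rocket-engine'."""
    return STREAM_PREFIX + stream_name


def append_burst(client, stream_name: str, stream_session_id: int, compressed: bytes):
    """Append one message carrying the protocol's "id" and "data" fields.

    Assumptions: `client` behaves like redis.Redis, and `compressed` is an
    already LZ4-compressed StreamDataBurst produced by the caller.
    """
    fields = {"id": stream_session_id, "data": compressed}
    # XADD appends to the stream and returns the message ID generated by Redis.
    return client.xadd(stream_key(stream_name), fields)
```

With a real connection this might be called as `append_burst(redis.Redis(), "rocket-engine", 7, payload)`.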

Message "id"#

A Redis Stream can carry multiple concurrent Sessions, so each message has an "id" field specifying a stream session ID.

This is a simple number. It should be tracked in a key named lastStreamSessionId, and read and incremented atomically using the INCRBY command.
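A minimal sketch of allocating a stream session ID, assuming a redis-py style client with an `incrby(name, amount)` method. The function name `next_stream_session_id` is hypothetical.

```python
COUNTER_KEY = "lastStreamSessionId"


def next_stream_session_id(client) -> int:
    """Reserve a new stream session ID.

    INCRBY increments the counter and returns the new value in a single
    atomic step, so concurrent writers never receive the same ID.
    """
    return int(client.incrby(COUNTER_KEY, 1))
```

Because the increment and the read happen in one command, no separate locking is needed around the counter.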

Message "data"#

The core message payload in the "data" field is an LZ4-compressed StreamDataBurst, as described for the WebSocket protocol.

Important

Aim for messages that are roughly 64 KiB (before compression), and do not exceed 1 MiB.

This ensures that the messages contain enough data that the protocol overhead is insignificant, but are small enough that the Stream Service can process data efficiently by reusing memory buffers.
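One way to follow this sizing guidance is to group encoded pieces of data into batches before building each message. The sketch below is only an illustration: it assumes the data arrives as a sequence of `bytes` chunks whose combined length approximates the uncompressed message size, and the name `batch_by_size` is hypothetical.

```python
from typing import Iterable, Iterator, List

TARGET_BYTES = 64 * 1024   # aim for roughly 64 KiB per message
MAX_BYTES = 1024 * 1024    # do not exceed 1 MiB per message


def batch_by_size(
    chunks: Iterable[bytes],
    target: int = TARGET_BYTES,
    limit: int = MAX_BYTES,
) -> Iterator[List[bytes]]:
    """Group chunks into batches near `target` bytes and never above `limit`."""
    batch: List[bytes] = []
    size = 0
    for chunk in chunks:
        if len(chunk) > limit:
            raise ValueError("single chunk exceeds the message size limit")
        # Flush first if adding this chunk would break the hard limit.
        if batch and size + len(chunk) > limit:
            yield batch
            batch, size = [], 0
        batch.append(chunk)
        size += len(chunk)
        # Flush as soon as the batch reaches the target size.
        if size >= target:
            yield batch
            batch, size = [], 0
    # Emit whatever is left over at the end of the input.
    if batch:
        yield batch
```

Each yielded batch would then be serialized, compressed, and appended as one stream message.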

Streams Set#

Maintain a streams Set containing all stream names (without the stream: prefix).

This enables the Stream Service to discover all streams (for trimming) without scanning for keys — which is discouraged for clustered deployments.

The SADD command adds an entry to a Set.
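A minimal sketch of registering a stream, assuming a redis-py style client with an `sadd(name, *values)` method. The function name `register_stream` is hypothetical.

```python
STREAMS_SET = "streams"


def register_stream(client, stream_name: str) -> None:
    """Record a stream name (without the 'stream:' prefix) in the streams Set.

    SADD ignores members that are already present, so it is safe to call
    this every time a writer starts using a stream.
    """
    client.sadd(STREAMS_SET, stream_name)
```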

Sessions Hash#

Maintain a sessions Hash containing a description of all sessions.

Each entry should use the Session Identity as the key, and must define the following value fields:

"streamSessionId"
  • Corresponds to the "id" field in each stream message
  • Enables the Stream Service to filter for the correct session
"streamName"
  • Corresponds to the stream name containing the session (without the stream: prefix)
  • Enables the Stream Service to subscribe to the correct stream
"streamPosition"
  • Current stream position when the first stream message was appended, or 0-0 if the stream is new
  • Enables the Stream Service to scan fewer messages when catching up to the leading edge
"lastUpdated"
  • Time (milliseconds since Unix epoch) this Hash was last updated
  • Enables a cleanup job to identify streams and sessions that haven't updated in a long time
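The required fields could be assembled along these lines. This is a sketch under assumptions: the client is redis-py style, the stream position is read with XREVRANGE (one of several ways to find the newest message ID) just before the session's first message is appended, and the function names are hypothetical.

```python
import time


def current_stream_position(client, stream_name: str) -> str:
    """Return the ID of the newest message in the stream, or '0-0' if it has none."""
    # XREVRANGE with COUNT 1 yields at most the most recent entry.
    latest = client.xrevrange("stream:" + stream_name, count=1)
    if not latest:
        return "0-0"
    entry_id = latest[0][0]
    # redis-py returns bytes unless decode_responses=True is set.
    return entry_id.decode() if isinstance(entry_id, bytes) else entry_id


def required_session_fields(stream_session_id, stream_name, stream_position, now_ms=None):
    """Build the four mandatory value fields for a sessions entry."""
    if now_ms is None:
        now_ms = time.time_ns() // 1_000_000  # milliseconds since the Unix epoch
    return {
        "streamSessionId": stream_session_id,
        "streamName": stream_name,          # without the 'stream:' prefix
        "streamPosition": stream_position,
        "lastUpdated": now_ms,
    }
```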

Every time a message is streamed to update the Session JSON, this field must be added or updated:

"json"
  • Session JSON
  • Enables the Stream Service to give the client the latest Session model when it connects

Every time a message is streamed to update the Session time-range, these fields must be added or updated as a pair:

"tStart"
  • Earliest timestamp in the Session, in nanoseconds since the Unix epoch
"tEnd"
  • Latest timestamp in the Session, in nanoseconds since the Unix epoch
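Note the units: "tStart" and "tEnd" are in nanoseconds, whereas "lastUpdated" is in milliseconds. A small sketch of producing the pair from timezone-aware `datetime` values, using integer arithmetic to avoid floating-point rounding at nanosecond scale. The helper names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_unix_ns(moment: datetime) -> int:
    """Convert a timezone-aware datetime to nanoseconds since the Unix epoch."""
    # datetime has microsecond resolution, so count whole microseconds exactly.
    microseconds = (moment - EPOCH) // timedelta(microseconds=1)
    return microseconds * 1000


def time_range_fields(t_start_ns: int, t_end_ns: int) -> dict:
    """Build the "tStart"/"tEnd" pair, which is always written together."""
    if t_end_ns < t_start_ns:
        raise ValueError("tEnd must not be earlier than tStart")
    return {"tStart": t_start_ns, "tEnd": t_end_ns}
```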

Important

A Session is not ready to be streamed via the WebSocket protocol until the "json" field is available.

It is not necessary to keep the time-range up to date in the JSON since this is covered by "tStart" and "tEnd", but the JSON should be updated periodically in the REST API so users can see an indication of activity in the ATLAS Session Browser.

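The HMSET command updates the fields for a Hash entry. HMSET has been deprecated since Redis 4.0.0; HSET accepts multiple field-value pairs and can be used instead.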
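A minimal sketch of writing several fields in one call, assuming a redis-py style client whose `hset(name, mapping=...)` method sets multiple fields at once. The Redis key under which a session's fields live is passed in by the caller and is an assumption here, as is the function name `update_session_entry`.

```python
import time


def update_session_entry(client, key: str, fields: dict) -> None:
    """Write the given session fields and refresh "lastUpdated" in one command.

    `key` is whatever Redis key holds this session's fields (an assumption of
    this sketch); `fields` may hold any of the value fields described above,
    e.g. {"json": ...} or {"tStart": ..., "tEnd": ...}.
    """
    mapping = dict(fields)  # copy so the caller's dict is not modified
    mapping["lastUpdated"] = time.time_ns() // 1_000_000  # milliseconds
    client.hset(key, mapping=mapping)
```

Sending the fields together keeps paired values such as "tStart" and "tEnd" consistent for readers.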

Entries in the sessions hash should be set to expire automatically after a reasonable interval.