Streaming to Redis#
Redis Streams provides a buffer between telemetry ingest and the Stream Service implementing the WebSocket protocol.
Why Redis?
Redis Streams is similar in design to Apache Kafka.
Redis was chosen because:
- it's generally considered to be easier to deploy and manage, with few configuration options
- it's easier to implement lookups for the current Session JSON and time-range
- it's then also available for caching data from the REST API (likely to be needed at some point)
Redis provides the communication layer, and this page defines a protocol layered over the top.
Info
This protocol is implemented by the MAT.OCS.RTA.StreamBuffer NuGet Package (.NET Core).
Stream Management#
Messages are sent to named Redis Streams.
As a guideline, create a new stream for each source of data, and sub-divide further as needed.
You can have as few or as many named streams as you like, but:
- Streams need to be trimmed so the buffer doesn't grow indefinitely
  - This is done automatically by the Stream Service, but doesn't work effectively if new streams are being constantly created. They should be persistent.
- Streams can't be read selectively
  - The Stream Service will need to consume all data from a stream, even if it has a single client subscribed to a single session. If there are too many concurrent sessions, this becomes inefficient.
As an example, if the source of the data is Kafka, consider creating a stream for each topic partition.
Session Metadata Updates#
ATLAS will not show a Session in the Session Browser until it is available from the REST API, along with Configuration. The session needs to be in the "open" state and have configuration bindings.
The WebSocket protocol also requires the Session JSON to be available before any data can be streamed to a client. This means the Session needs to be published twice: via the REST API, and via Redis and the WebSocket protocol.
It might be difficult to keep these consistent — particularly if the ingest pipeline is split into different processes.
Here are some guidelines:
- Make sure your ingest process will publish the Session via the REST API promptly. If it's a slow-starting batch process, consider publishing a record (e.g. to the Session Service) at the start of live streaming, which can then be updated.
- The Session JSON model can leave the details and configuration bindings set to null in the stream message to ensure the client uses the version in the REST API, or vice versa.
- The Session time-range does not need to be updated in the JSON at high frequency since there is a dedicated stream protocol message for this aspect. But it should ideally be updated in the REST API every 5-30 seconds so progress is indicated in the ATLAS Session Browser.
- Make sure the Session state is updated to "closed" in the stream and ultimately in the REST API. The WebSocket protocol is designed so that it doesn't matter if the two are slightly out of sync.
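The 5-30 second cadence for REST API updates can be enforced with a small throttle. The following is a minimal sketch and not part of any RTA library: the class name RestUpdateThrottle, the 10-second default and the injectable clock are all assumptions made for illustration.

```python
import time
from typing import Callable, Optional


class RestUpdateThrottle:
    """Decides when a Session update is due in the REST API.

    Hypothetical helper: the name and the 10 s default are illustrative only.
    """

    def __init__(self, min_interval_s: float = 10.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._min_interval_s = min_interval_s
        self._clock = clock
        self._last_sent: Optional[float] = None

    def should_send(self, force: bool = False) -> bool:
        """Return True if enough time has passed since the last update.

        Pass force=True for updates that must not be skipped, such as
        the final change of the Session state to "closed".
        """
        now = self._clock()
        due = (self._last_sent is None
               or now - self._last_sent >= self._min_interval_s)
        if force or due:
            self._last_sent = now
            return True
        return False
```

An ingest loop could call should_send() after each time-range message and only publish to the REST API when it returns True, while still updating the stream at full rate.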
Protocol#
There are three Redis data structures to update:
- stream:* keys are Streams carrying compressed StreamDataBurst messages
- streams is a Set listing all the Streams
- sessions is a Hash locating each Session and containing the latest JSON model and time-range
Streams#
Follow a naming convention where each Redis Stream is named with the stream: prefix.
For example: rocket-engine becomes stream:rocket-engine
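As a minimal sketch of the naming convention, the helper below turns a bare stream name into its Redis key. The function names are hypothetical, and the topic-partition format in kafka_stream_name is only one possible way to follow the earlier guideline of one stream per Kafka topic partition.

```python
STREAM_PREFIX = "stream:"


def stream_key(stream_name: str) -> str:
    """Return the Redis key for a bare stream name."""
    if not stream_name:
        raise ValueError("stream name must not be empty")
    if stream_name.startswith(STREAM_PREFIX):
        raise ValueError("pass the bare stream name, without the prefix")
    return STREAM_PREFIX + stream_name


def kafka_stream_name(topic: str, partition: int) -> str:
    """Hypothetical naming scheme: one stream per Kafka topic partition."""
    return f"{topic}-{partition}"
```

With these definitions, stream_key("rocket-engine") gives "stream:rocket-engine". Keeping the bare name and the prefixed key separate helps later, because the streams Set and the sessions Hash both store the name without the prefix.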
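A Redis Message is made up of name-value pairs. For this protocol, they are "id" (the stream session ID) and "data" (the compressed payload); both are described in the sections that follow.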
The XADD command appends a message to a stream.
Message "id"#
A Redis Stream can carry multiple concurrent Sessions, so each message has an "id" field specifying a stream session ID.
This is a simple number. It should be tracked in a key named lastStreamSessionId and atomically polled and incremented using the INCRBY command.
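The allocation step can be sketched as follows. This is an illustration under assumptions, not the StreamBuffer implementation: execute stands in for whichever Redis client is in use, the increment of 1 is assumed, and make_fake_redis is an in-memory substitute so the example runs without a server.

```python
from typing import Callable, Dict, Sequence

# Sends one Redis command and returns its integer reply.
# Stand-in for a real Redis client (assumption).
Execute = Callable[[Sequence[str]], int]

LAST_ID_KEY = "lastStreamSessionId"


def next_stream_session_id(execute: Execute) -> int:
    """Atomically reserve the next stream session ID.

    INCRBY returns the value after the increment, so concurrent
    writers each receive a distinct number.
    """
    return int(execute(["INCRBY", LAST_ID_KEY, "1"]))


def make_fake_redis() -> Execute:
    """In-memory stand-in that mimics INCRBY, for demonstration only."""
    counters: Dict[str, int] = {}

    def execute(command: Sequence[str]) -> int:
        name, key, amount = command
        if name != "INCRBY":
            raise ValueError(f"unsupported command: {name}")
        counters[key] = counters.get(key, 0) + int(amount)
        return counters[key]

    return execute
```

With a real client, execute would forward the arguments to the server; with redis-py, for instance, that could be a wrapper around client.execute_command(*command).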
Message "data"#
The core message payload in the "data" field is an LZ4-compressed StreamDataBurst, as described for the WebSocket protocol.
Important
Aim for messages that are roughly 64 KiB (before compression), and do not exceed 1 MiB.
This ensures that the messages contain enough data that the protocol overhead is insignificant, but are small enough that the Stream Service can process data efficiently by reusing memory buffers.
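To make the message layout and size limits concrete, here is a hedged sketch that builds the XADD arguments for one message. It assumes the burst has already been serialized and LZ4-compressed elsewhere (compression needs a third-party library and is not shown), that the "id" field is written as a decimal string, and that Redis assigns the entry ID via *. The function and constant names are hypothetical.

```python
from typing import List

TARGET_BURST_BYTES = 64 * 1024    # aim for roughly this much, uncompressed
MAX_BURST_BYTES = 1024 * 1024     # never exceed this, uncompressed


def xadd_command(stream_name: str, stream_session_id: int,
                 uncompressed_size: int, compressed_burst: bytes) -> List[bytes]:
    """Build the XADD arguments for one compressed StreamDataBurst.

    uncompressed_size is the size of the burst before compression,
    which is what the 1 MiB limit applies to.
    """
    if uncompressed_size > MAX_BURST_BYTES:
        raise ValueError("burst exceeds 1 MiB before compression; split it")
    return [
        b"XADD",
        f"stream:{stream_name}".encode(),
        b"*",                                  # let Redis assign the entry ID
        b"id", str(stream_session_id).encode(),
        b"data", compressed_burst,
    ]


def should_flush(buffered_bytes: int) -> bool:
    """True once the pending burst has reached the 64 KiB target."""
    return buffered_bytes >= TARGET_BURST_BYTES
```

A writer would typically accumulate samples until should_flush() returns True, then serialize, compress and send the burst, so that most messages land near the 64 KiB target and none approach the 1 MiB ceiling.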
Streams Set#
Maintain a streams Set containing all stream names (without the stream: prefix).
This enables the Stream Service to discover all streams (for trimming) without scanning for keys — which is discouraged for clustered deployments.
The SADD command adds an entry to a Set.
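Registering a stream can be sketched as a single SADD. The helper name below is hypothetical; it strips a stray stream: prefix defensively, since the Set is meant to hold bare names.

```python
from typing import List

STREAMS_SET_KEY = "streams"
STREAM_PREFIX = "stream:"


def register_stream_command(stream_name: str) -> List[str]:
    """Build the SADD arguments that record a stream in the streams Set."""
    if stream_name.startswith(STREAM_PREFIX):
        # The Set stores bare names, so drop the key prefix if present.
        stream_name = stream_name[len(STREAM_PREFIX):]
    return ["SADD", STREAMS_SET_KEY, stream_name]
```

Because SADD ignores members that are already present, a writer can safely send this command every time it starts, without first checking whether the stream is registered.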
Sessions Hash#
Maintain a sessions Hash containing a description of all sessions.
Each entry should use the Session Identity as the key, and must define the following value fields:
"streamSessionId"
- Corresponds to the "id" field in each stream message
- Enables the Stream Service to filter for the correct session
"streamName"
- Corresponds to the stream name containing the session (without the stream: prefix)
- Enables the Stream Service to subscribe to the correct stream
"streamPosition"
- Current stream position when the first stream message was appended, or 0-0 if the stream is new
- Enables the Stream Service to scan fewer messages when catching up to the leading edge
"lastUpdated"
- Time (milliseconds since Unix epoch) this Hash was last updated
- Enables a cleanup job to identify streams and sessions that haven't updated in a long time
Every time a message is streamed to update the Session JSON, this field must be added or updated:
"json"
- Session JSON
- Enables the Stream Service to give the client the latest Session model when it connects
Every time a message is streamed to update the Session time-range, these fields must be added or updated as a pair:
"tStart"
- Earliest timestamp in the Session, in nanoseconds since the Unix epoch
"tEnd"
- Latest timestamp in the Session, in nanoseconds since the Unix epoch
Important
A Session is not ready to be streamed via the WebSocket protocol until the "json" field is available.
It is not necessary to keep the time-range up to date in the JSON since this is covered by "tStart" and "tEnd", but the JSON should be updated periodically in the REST API so users can see an indication of activity in the ATLAS Session Browser.
The HMSET command updates the fields for a Hash entry.
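The three kinds of update to a session entry can be sketched as field mappings that are flattened into HMSET arguments. This is an illustration under assumptions: the function names are hypothetical, all values are written as strings, "lastUpdated" is refreshed on every update, and the Redis key of the entry is passed in by the caller because the exact key layout is not spelled out here.

```python
import json
from typing import Any, Dict, List


def registration_fields(stream_session_id: int, stream_name: str,
                        stream_position: str, now_ms: int) -> Dict[str, str]:
    """Fields that must be present for every session entry."""
    return {
        "streamSessionId": str(stream_session_id),
        "streamName": stream_name,            # without the stream: prefix
        "streamPosition": stream_position,    # "0-0" if the stream is new
        "lastUpdated": str(now_ms),           # milliseconds since Unix epoch
    }


def json_fields(session_model: Dict[str, Any], now_ms: int) -> Dict[str, str]:
    """Fields to set whenever a Session JSON message is streamed."""
    return {"json": json.dumps(session_model), "lastUpdated": str(now_ms)}


def time_range_fields(t_start_ns: int, t_end_ns: int,
                      now_ms: int) -> Dict[str, str]:
    """Fields to set, as a pair, whenever a time-range message is streamed."""
    if t_end_ns < t_start_ns:
        raise ValueError("tEnd must not precede tStart")
    return {"tStart": str(t_start_ns), "tEnd": str(t_end_ns),
            "lastUpdated": str(now_ms)}


def hmset_command(hash_key: str, fields: Dict[str, str]) -> List[str]:
    """Flatten a field mapping into HMSET arguments."""
    command = ["HMSET", hash_key]
    for name, value in fields.items():
        command += [name, value]
    return command
```

Note the different units: "lastUpdated" is in milliseconds while "tStart" and "tEnd" are in nanoseconds. On Redis 4.0 and later, HSET accepts the same list of field-value pairs and HMSET is considered deprecated, so either command should work for this step.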
Entries in the sessions hash should be set to expire automatically after a reasonable interval.