
Streaming protocol#

Topics, Streams and Sessions#

Topic

A topic should represent a meaningful subset of parameters, where a consumer is likely to be interested in most of the data on the topic. It can carry multiple concurrent sessions, or streams.

Stream

Streams are labelled sequences of messages within a topic. There can be many concurrent streams in one topic, which is particularly useful if there are many sources of data.

Session

Similar to the ATLAS Session concept, and necessary for interaction with the ATLAS ecosystem. Implemented on top of Streams.

Time#

Stream sessions use nanosecond time precision, relative to a specified epoch (which is also expressed in nanoseconds).

For more information, read the section It's all about Time.
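As a minimal illustration (in Python, with values borrowed from the tdata example later in this document), an absolute timestamp is recovered by adding a message's relative time offset to its epoch, both in nanoseconds:

```python
from datetime import datetime, timezone

# Epoch and offset values taken from the tdata example below.
epoch_ns = 1511091963784000000
offset_ns = 10000000  # 10 ms after the epoch

absolute_ns = epoch_ns + offset_ns

# Converting to datetime for display loses sub-microsecond precision,
# so keep the integer nanosecond value for any real processing.
print(datetime.fromtimestamp(absolute_ns / 1e9, tz=timezone.utc))
```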

Message structure#

Kafka messages have a key and a value.

Keys#

In ATLAS Advanced Streams, the key is always structured in two parts, like this:

<type>:<stream id>

The type indicates the message content, which allows a client to select an appropriate parser for the message value. This article includes a defined list of message types, but a conforming client must accept (and likely ignore) other message types to allow extensibility.

The stream id distinguishes the stream to which the message belongs. Messages with the same stream id must be sent on the same partition to maintain message sequencing; note that this means the default Kafka strategy of distributing messages by key hash must not be used.

Keys must be encoded as UTF-8 with no BOM.
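The sketch below is one possible way to honour these rules, assuming the kafka-python client; the partition count and the stream-to-partition mapping are illustrative, not part of the protocol. The key is built as `<type>:<stream id>` and the partition is chosen from the stream id alone, so every message type within a stream stays on one partition.

```python
import zlib
from kafka import KafkaProducer  # assumed client; any Kafka producer works

producer = KafkaProducer(bootstrap_servers="localhost:9092")
NUM_PARTITIONS = 4  # illustrative partition count for the topic

def make_key(message_type: str, stream_id: str) -> bytes:
    # Keys are "<type>:<stream id>", UTF-8 encoded with no BOM.
    return f"{message_type}:{stream_id}".encode("utf-8")

def partition_for(stream_id: str) -> int:
    # Partition on the stream id alone (not the full key), so all message
    # types within a stream preserve their ordering on one partition.
    return zlib.crc32(stream_id.encode("utf-8")) % NUM_PARTITIONS

def send(topic: str, message_type: str, stream_id: str, value: bytes) -> None:
    producer.send(
        topic,
        key=make_key(message_type, stream_id),
        value=value,
        partition=partition_for(stream_id),
    )
```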

Values#

The value format is determined by the message type.

Values are typically either empty or well-formed JSON - see the attached schemas.
JSON values should be encoded as UTF-8 with no BOM.

Clients must not assume that all messages are JSON; future versions of the protocol may include more compact binary encodings, with their own message types.
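A consuming client might dispatch on the type portion of the key before attempting to parse the value. The sketch below assumes the JSON message types defined later in this document; unknown types are ignored, as required for extensibility.

```python
import json

JSON_TYPES = {"session", "tdata", "tsamples", "events", "lap"}

def handle_message(key: bytes, value: bytes) -> None:
    message_type, _, stream_id = key.decode("utf-8").partition(":")

    if message_type in ("$start", "$end"):
        payload = None  # empty payloads carry no value to parse
    elif message_type in JSON_TYPES:
        payload = json.loads(value.decode("utf-8"))
    else:
        # Unknown message type (and "sync", which is not parsed here):
        # accept and ignore to allow extensibility.
        return

    print(message_type, stream_id, payload)  # hand off to real handling here
```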

Messages#

$start#

Indicates the start of a stream - enables clients to identify partial streams.
Clients must not send stream messages before sending $start.
This message has an empty payload.

$end#

Indicates the end of a stream.
Clients must not send stream messages after sending $end.
This message has an empty payload.
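Putting the lifecycle together: a producing client brackets a stream with $start and $end. The sketch below reuses the hypothetical send() helper from the Keys section; the message ordering is what the protocol requires, while the helper itself is illustrative.

```python
import json

def write_stream(topic: str, stream_id: str, session_doc: dict, tdata_docs: list) -> None:
    send(topic, "$start", stream_id, b"")  # empty payload, before any stream messages

    # Session metadata and telemetry travel between $start and $end.
    send(topic, "session", stream_id, json.dumps(session_doc).encode("utf-8"))
    for tdata_doc in tdata_docs:
        send(topic, "tdata", stream_id, json.dumps(tdata_doc).encode("utf-8"))

    send(topic, "$end", stream_id, b"")  # empty payload, after all stream messages
```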

session#

Identifies the stream as a session.
Contains descriptive metadata, traceability and references to dependencies.

The session message is repeated at intervals to ensure that a client can join a live session and establish all necessary context, and as a heartbeat so that clients can determine whether the upstream process generating the session is still alive.

Example:

{
  "id": "f2b8755c-c426-4cdc-9e51-ee7cd95a7879",
  "state": "open",
  "identifier": "random_walk",
  "dependencies": {
    "atlasConfiguration": ["e7548e787967"],
    "dataFormat": ["9a9627b7bc25"]
  },
  "activity": {
    "start": "2017-11-19T15:00:00Z",
    "durationNanos": 30000000
  }
}

Notice the declaration of both dataFormat and atlasConfiguration dependencies. This is a minimum requirement to use the ATLAS stream recorder.

session.schema.json
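As a sketch of the repetition requirement, a producer could re-publish the session document on a timer, again using the hypothetical send() helper from the Keys section; the 10-second interval is an assumption, not something this specification mandates.

```python
import json
import threading

SESSION_RESEND_SECONDS = 10  # assumed interval, not mandated by the protocol

def publish_session_periodically(topic: str, stream_id: str,
                                 session_doc: dict, stop: threading.Event) -> None:
    payload = json.dumps(session_doc).encode("utf-8")
    while not stop.is_set():
        # Re-sending the same document lets late joiners establish context
        # and lets consumers treat its arrival as a heartbeat.
        send(topic, "session", stream_id, payload)
        stop.wait(SESSION_RESEND_SECONDS)
```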

tdata#

Contains telemetry data on a common timebase.

Example:

{
  "epoch": 1511091963784000000,
  "time": [10000000, 20000000],
  "data": [
    {
      "status": [1, 1],
      "avg": [0.0, 10.0]
    }
  ],
  "feed": "",
  "format": "9a9627b7bc25"
}

Aggregate arrays can include: avg, min, max, and first.
Status values reflect results of data retrieval, and are defined as the following bitwise flags:

| Value | Name | Description |
| --- | --- | --- |
| 0 | Missing | Missing sample. |
| 1 | Sample | Valid sample; set if there is at least one sample in the interval. |
| 2 | Default | Default sample; typically due to sensor failure. |
| 4 | Before Start | Data was requested before the first available sample. |
| 8 | After End | Data was requested after the last available sample. |
| 16 | Incomplete | Interval included missing samples. Common when down-sampling. |
| 32 | Interpolated | Sample is interpolated. Also common. |
| 64 | Pending | Sample is being fetched. Unlikely to be seen in streams. |
| 128 | Gap | Gap in data. |

These statuses provide detailed information for models that require it, but in general all values are usable unless they are NaN.
See Data Feeds, Formats and Views for information about the feed and format fields.
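For illustration, the flags can be modelled directly from the table above; the enum names and the simple NaN filter below are an interpretation of this section, not part of the protocol itself.

```python
import math
from enum import IntFlag

class SampleStatus(IntFlag):
    MISSING = 0
    SAMPLE = 1
    DEFAULT = 2
    BEFORE_START = 4
    AFTER_END = 8
    INCOMPLETE = 16
    INTERPOLATED = 32
    PENDING = 64
    GAP = 128

def usable_values(statuses, values):
    """Yield (value, flags) pairs, dropping NaN values as suggested above."""
    for status, value in zip(statuses, values):
        flags = SampleStatus(status)  # e.g. 33 -> SAMPLE | INTERPOLATED
        if not math.isnan(value):
            yield value, flags
```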

tdata.schema.json

tsamples#

Contains unaggregated telemetry data. This message is primarily used for data ingest, where aggregates and statuses are not useful. The data field is a dictionary keyed by parameter, with each parameter carrying a one-to-one pairing of timestamps and values.

Example:

{
  "feed": "5hz",
  "format": "9eb6ba4140f1",
  "data": {
    "parameter1": {
      "values": [
        53.424999237060547,
        53.325000762939453
      ],
      "epoch": 1581033600000000000,
      "time": [
        37132482000000,
        37132682000000
      ]
    },
    "parameter2": {
      "values": [
        47.724998474121094,
        47.5099983215332
      ],
      "epoch": 1581033600000000000,
      "time": [
        37132882000000,
        37133082000000
      ]
    }
  }
}

tsamples.schema.json
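A producer ingesting raw samples might assemble the message like this; the parameter identifiers and helper name are illustrative, and the feed and format values are copied from the example above.

```python
import json

def build_tsamples(feed: str, data_format: str, epoch_ns: int, samples: dict) -> bytes:
    """samples maps a parameter identifier to a list of (time_ns, value) pairs."""
    data = {}
    for parameter_id, points in samples.items():
        times, values = zip(*points)
        data[parameter_id] = {
            "values": list(values),
            "epoch": epoch_ns,
            "time": list(times),
        }
    return json.dumps({"feed": feed, "format": data_format, "data": data}).encode("utf-8")

payload = build_tsamples(
    feed="5hz",
    data_format="9eb6ba4140f1",
    epoch_ns=1581033600000000000,
    samples={"parameter1": [(37132482000000, 53.424999237060547),
                            (37132682000000, 53.325000762939453)]},
)
```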

events#

A discrete event at a given timestamp, such as a gear shift or a door opening. The values array contains any numerical values associated with the event; the parameters they correspond to are described in the event definition.

Example:

{
  "epoch": 1511091963784000000,
  "time": 10000000,
  "id": "IO2-2B02:TAG320BIOS",
  "status": "Active",
  "values": [
    10.0, 20.0
  ]
}

events.schema.json

lap#

Denotes a lap trigger.

Example:

{
  "number": 0,
  "epoch": 1511049600000000000,
  "time": 50639840000000,
  "type": "outLap",
  "triggerSource": 3
}

Trigger sources include:

| Value | Name |
| --- | --- |
| 0 | Main straight |
| 1 | Pit lane |
| 2 | Default |
| 3 | Telemetry start |
| 4 | Telemetry end |

These triggers feed business logic that determines the lap type, which is one of: fastLap, pitLane, outLap, or inLap. The exact business logic is outside the scope of this specification and may vary slightly by race formula.
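For reference, the trigger sources and lap types can be captured as simple constants; the enum names below are illustrative, and the trigger-to-lap-type mapping is deliberately omitted because, as noted above, it is formula-dependent.

```python
from enum import IntEnum

class TriggerSource(IntEnum):
    MAIN_STRAIGHT = 0
    PIT_LANE = 1
    DEFAULT = 2
    TELEMETRY_START = 3
    TELEMETRY_END = 4

LAP_TYPES = ("fastLap", "pitLane", "outLap", "inLap")
```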

lap.schema.json

sync#

Sync messages create synchronization points across message types.

See It's all about Time for more information.