InfluxDB Data Adapter — Review

Info

Complete the Walkthrough first.

This demo project illustrates the data adapter pattern, where data is served to ATLAS without copying and format-shifting. In this tutorial, the store is InfluxDB — but since the data retrieval mechanics are quite simple, it could be any of a wide range of technologies.

Sample Code Structure

The sample project has four key steps:

var dataIdentity = await WriteDataAsync(influxUri, sessionTag, startNanos, durationNanos, intervalNanos);
await WriteSchemaMappingAsync(schemaMappingClient, dataIdentity);
var configIdentifier = await WriteConfigAsync(configClient);
await WriteSessionAsync(sessionClient, sessionIdentity, dataIdentity, timestamp, startNanos, durationNanos, configIdentifier);

These steps are:

  1. Write data to InfluxDB;
    This step just provides example data; nothing about it is RTA-specific.
  2. Describe how the database fields will map onto Timestamped Data requests;
    The data adapter (Influx Data Service) will use this to handle the requests.
  3. Describe the fields as Configuration;
    ATLAS will use this to show the user what parameters (fields) are available.
  4. Publish the Session;
    ATLAS will use this to provide data browsing and search.

Writing Data to InfluxDB

The first stage is just creating sample data.

The data adapter pattern only cares about reading data: it does not consider how the data arrives in storage.

In this demo, the sample project writes data using the InfluxDB line protocol, but it could just as easily be batch-loaded from a CSV file, or streamed from a broker.
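For reference, InfluxDB line protocol encodes one point per line: a measurement, optional tags, comma-separated fields, and a nanosecond timestamp. A couple of points using the demo's field names might look like this (the measurement name `demo` and the `session` tag are illustrative, not taken from the sample project):

```
demo,session=cd15a409 alpha=0.42,beta=1.7,gamma=-0.3 1609459200000000000
demo,session=cd15a409 alpha=0.44,beta=1.6,gamma=-0.2 1609459200010000000
```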

Defining the Schema Mapping

ATLAS does not know anything about how the data is stored. Configuration provides the client with a catalog of parameters and all the information it needs to make requests to the REST API — but it doesn't contain any information about database tables and fields, or file paths and offsets.

When the data adapter receives a request, it is expressed in terms of:

  • data identity
  • channels
  • time range (optional)

The adapter must map this onto the data source schema to retrieve the data.
Schema Mappings provide a generalizable way to do this.
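As a sketch of what this mapping enables: a Timestamped Data request for channels 0–2 over a time range could be translated, via the dialect, database, measurement and field mappings, into an InfluxQL query roughly like the following (field and identity values are illustrative; the exact query the Influx Data Service generates may differ):

```sql
SELECT alpha, beta, gamma
FROM demo
WHERE trip_id='cd15a409'
  AND time >= 1609459200000000000
  AND time < 1609459260000000000
```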

In this demo, the sample code defines and publishes a schema mapping like this:

var schemaMapping = new SchemaMapping
{
    Properties =
    {
        ["dialect"] = "influxql",
        ["database"] = Database,
        ["measurement"] = Measurement
    },
    FieldMappings =
    {
        Fields.Select((field, f) => new FieldMapping
        {
            SourceField = field,
            TargetChannel = (uint) f
        })
    }
};

await schemaMappingClient.PutSchemaMappingAsync(new PutSchemaMappingRequest
{
    DataSource = DataBindingSource,
    DataIdentity = dataIdentity,
    SchemaMapping = schemaMapping
});

Here is the demo schema mapping with the Fields, DataSource and DataIdentity unrolled:

var schemaMapping = new SchemaMapping
{
    Properties =
    {
        ["dialect"] = "influxql",
        ["database"] = Database,
        ["measurement"] = Measurement
    },
    FieldMappings =
    {
        new FieldMapping {SourceField = "alpha", TargetChannel = 0u},
        new FieldMapping {SourceField = "beta", TargetChannel = 1u},
        new FieldMapping {SourceField = "gamma", TargetChannel = 2u}
    }
};

await schemaMappingClient.PutSchemaMappingAsync(new PutSchemaMappingRequest
{
    DataSource = "rta-influxdatasvc",
    DataIdentity = "session=<guid>",
    SchemaMapping = schemaMapping
});

The data source needs to match the Gateway Service configuration and the Data Binding written to the Session Service.

The data identity and schema mapping properties are specific to the data adapter.

The Influx Data Service README describes its requirements:

  • The data identity must be a filter expression to select data that belongs to the session
    e.g. car_reg='EA04 LFG' AND trip_id='cd15a409'
  • The properties indicate the query dialect ("influxql"), and the database and measurement to query.

Security Note

Take care to avoid query injection vulnerabilities with this data identity.

Un-escaped user-supplied data could allow attackers to manipulate the data retrieval query.
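For instance, if the filter expression were assembled by naive concatenation of user input, a crafted value could widen the query far beyond the intended session (a hypothetical illustration):

```sql
-- intended identity, scoped to one trip:
car_reg='EA04 LFG' AND trip_id='cd15a409'

-- injected identity, matching every row in the measurement:
car_reg='EA04 LFG' OR ''=''
```

Validating or escaping the identity before it is interpolated into a query guards against this.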

Configuration and Session

The sample project also publishes the Configuration and Session — which will look familiar if you completed the Quick-Start Tutorial.

Data Pipeline Integration

Now that we have an example where ATLAS can connect to existing data, it's worth thinking about data pipeline integration.

The schema mapping, configuration and session could be published at any stage, including:

  • As the data is being written
  • At the end of each ingest job
  • From a script, to index existing data

These are all very similar — but the first option enables live-streaming to ATLAS.

To see a live-stream version of this InfluxDB setup, check out the next tutorial.