Skip to content

Model Execution#

A basic example of a model calculating the total horizontal acceleration parameter called gTotal.

gTotal = |gLat| + |gLong|.

The sample includes two classes, ModelSample and StreamModel .

Environment setup#

You need to prepare environment variables as follows:

Environment variables

private const string DependencyUrl = "http://localhost:8180/api/dependencies/";
private const string InputTopicName = "ModelsInput";
private const string OutputTopicName = "ModelsOutput";
private const string BrokerList = "localhost";

Before you start your model, create all the necessary topics using Topic management service.

Output format#

Specify output data of the model and publish it to dependency service:

Output format

var outputDataFormat = DataFormat.DefineFeed()
                            .Parameters(new List<string> { "gTotal:vTag" })
                            .AtFrequency(100)
                            .BuildFormat();

this.dataFormatId = dataFormatClient.PutAndIdentifyDataFormat(outputDataFormat);

var atlasConfiguration = this.CreateAtlasConfiguration();
this.atlasConfId = this.acClient.PutAndIdentifyAtlasConfiguration(atlasConfiguration);

Subscribe#

Subscribe for input topic streams:

using (var client = new KafkaStreamClient(BrokerList))
using (var outputTopic = client.OpenOutputTopic(OutputTopicName))
using (var pipeline = client.StreamTopic(InputTopicName).Into(streamId => this.CreateStreamPipeline(streamId, outputTopic)))
{
    cancellationToken.WaitHandle.WaitOne();
    pipeline.Drain();
    pipeline.WaitUntilStopped(TimeSpan.FromSeconds(1), CancellationToken.None);
}

Into#

Each stream will raise callback Into() where a new instance of the model for the new stream is created.

This block of code is called for each new stream.

private IStreamInput CreateStreamPipeline(string streamId, IOutputTopic outputTopic)
{
    var streamModel = new StreamModel(this.dataFormatClient, outputTopic, this.dataFormatId, this.atlasConfId);

    return streamModel.CreateStreamInput(streamId);
}

Stream model#

For each new stream, we create a instance of StreamModel class. StreamModel

Create stream input#

At the beginning of each stream, we create new stream input and output.

CreateStreamInput

public IStreamInput CreateStreamInput(string streamId)
{
    // these templates provide commonly-combined data, but you can make your own
    var input = new SessionTelemetryDataInput(streamId, dataFormatClient);
    var output = new SessionTelemetryDataOutput(outputTopic, this.outputDataFormatId, dataFormatClient);

    this.outputFeed = output.DataOutput.BindFeed("");

    // we add output format reference to output session.
    output.SessionOutput.AddSessionDependency(DependencyTypes.DataFormat, this.outputDataFormatId);
    output.SessionOutput.AddSessionDependency(DependencyTypes.AtlasConfiguration, this.outputAtlasConfId);

    // automatically propagate session metadata and lifecycle
    input.LinkToOutput(output.SessionOutput, identifier => identifier + "_Models");

    // we simply formward laps.
    input.LapsInput.LapStarted += (s, e) => output.LapsOutput.SendLap(e.Lap);

    // we bind our models to specific feed and parameters.
    // In this example the gTotalModel() method is used for data modelling and upstreaming
    input.DataInput.BindDefaultFeed("gLat:Chassis", "gLong:Chassis").DataBuffered += this.gTotalModel;

    return input;
}

gTotal function#

In the callback, each bucket of data is calculated and the result is sent to the output topic. gTotal

private void gTotalModel(object sender, TelemetryDataFeedEventArgs e)
{
    var inputData = e.Buffer.GetData();

    var data = outputFeed.MakeTelemetryData(inputData.TimestampsNanos.Length, inputData.EpochNanos);

    data.TimestampsNanos = inputData.TimestampsNanos;

    data.Parameters[0].AvgValues = new double[inputData.TimestampsNanos.Length];
    data.Parameters[0].Statuses = new DataStatus[inputData.TimestampsNanos.Length];

    for (var index = 0; index < inputData.TimestampsNanos.Length; index++)
    {
        var gLat = inputData.Parameters[0].AvgValues[index];
        var gLong = inputData.Parameters[1].AvgValues[index];

        var gLatStatus = inputData.Parameters[0].Statuses[index];
        var gLongStatus = inputData.Parameters[1].Statuses[index];

        data.Parameters[0].AvgValues[index] = Math.Abs(gLat) + Math.Abs(gLong);
        data.Parameters[0].Statuses[index] = (gLatStatus & DataStatus.Sample) > 0 && (gLongStatus & DataStatus.Sample) > 0
                                                ? DataStatus.Sample
                                                : DataStatus.Missing;

    }
    outputFeed.EnqueueAndSendData(data);

    Console.Write(".");
}