Model Execution#
This basic example shows a model that calculates a total horizontal acceleration parameter called gTotal, defined as:
gTotal = |gLat| + |gLong|.
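The formula above can be tried in isolation before wiring it into a stream. The following is a minimal sketch, not part of the sample itself: the class name GTotalExample and the input values are hypothetical, and only the formula comes from this page. Note that this definition is a sum of absolute values, not the Euclidean magnitude of the two components.

```csharp
using System;

public static class GTotalExample
{
    // gTotal as defined in this sample: |gLat| + |gLong|.
    public static double GTotal(double gLat, double gLong)
    {
        return Math.Abs(gLat) + Math.Abs(gLong);
    }

    public static void Main()
    {
        // Hypothetical sample values: |-1.5| + |2.0| = 3.5
        Console.WriteLine(GTotal(-1.5, 2.0));
    }
}
```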
The sample includes two classes, ModelSample and StreamModel.
Environment setup#
Define the following constants to match your environment:
private const string DependencyUrl = "http://localhost:8180/api/dependencies/";
private const string InputTopicName = "ModelsInput";
private const string OutputTopicName = "ModelsOutput";
private const string BrokerList = "localhost";
Before you start your model, create all the necessary topics using the Topic management service.
Output format#
Specify the output data format of the model and publish it to the dependency service:
var outputDataFormat = DataFormat.DefineFeed()
    .Parameters(new List<string> { "gTotal:vTag" })
    .AtFrequency(100)
    .BuildFormat();
this.dataFormatId = dataFormatClient.PutAndIdentifyDataFormat(outputDataFormat);
var atlasConfiguration = this.CreateAtlasConfiguration();
this.atlasConfId = this.acClient.PutAndIdentifyAtlasConfiguration(atlasConfiguration);
Subscribe#
Subscribe to the streams on the input topic:
using (var client = new KafkaStreamClient(BrokerList))
using (var outputTopic = client.OpenOutputTopic(OutputTopicName))
using (var pipeline = client.StreamTopic(InputTopicName).Into(streamId => this.CreateStreamPipeline(streamId, outputTopic)))
{
    cancellationToken.WaitHandle.WaitOne();
    pipeline.Drain();
    pipeline.WaitUntilStopped(TimeSpan.FromSeconds(1), CancellationToken.None);
}
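The block above waits on a cancellationToken before draining the pipeline, but the page does not show where that token comes from. One possible way to supply it, sketched here with standard .NET types only, is to cancel a CancellationTokenSource when the user presses Ctrl+C. The class name ShutdownSketch and the method RunUntilCancelled are hypothetical placeholders for the method that contains the using block; they are not part of the sample.

```csharp
using System;
using System.Threading;

public static class ShutdownSketch
{
    public static void Main()
    {
        using (var cts = new CancellationTokenSource())
        {
            // Request cancellation on Ctrl+C instead of terminating immediately.
            Console.CancelKeyPress += (sender, e) =>
            {
                e.Cancel = true; // keep the process alive so cleanup can run
                cts.Cancel();
            };

            RunUntilCancelled(cts.Token);
        }
    }

    // Hypothetical stand-in for the method that opens the topics and pipeline.
    public static void RunUntilCancelled(CancellationToken cancellationToken)
    {
        // ... open the output topic and start the pipeline here ...
        cancellationToken.WaitHandle.WaitOne(); // blocks until Cancel() is called
        // ... drain and stop the pipeline here ...
    }
}
```

With this arrangement the blocking wait returns as soon as cancellation is requested, which gives the pipeline a chance to drain before the process exits.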
Into#
Each new stream raises the Into() callback, which creates a new instance of the model for that stream.
This block of code is called for each new stream.
private IStreamInput CreateStreamPipeline(string streamId, IOutputTopic outputTopic)
{
    var streamModel = new StreamModel(this.dataFormatClient, outputTopic, this.dataFormatId, this.atlasConfId);
    return streamModel.CreateStreamInput(streamId);
}
Stream model#
For each new stream, we create an instance of the StreamModel class.
Create stream input#
At the beginning of each stream, we create a new stream input and output.
public IStreamInput CreateStreamInput(string streamId)
{
    // These templates provide commonly combined data, but you can make your own.
    var input = new SessionTelemetryDataInput(streamId, dataFormatClient);
    var output = new SessionTelemetryDataOutput(outputTopic, this.outputDataFormatId, dataFormatClient);
    this.outputFeed = output.DataOutput.BindFeed("");
    // Add the output format reference to the output session.
    output.SessionOutput.AddSessionDependency(DependencyTypes.DataFormat, this.outputDataFormatId);
    output.SessionOutput.AddSessionDependency(DependencyTypes.AtlasConfiguration, this.outputAtlasConfId);
    // Automatically propagate session metadata and lifecycle.
    input.LinkToOutput(output.SessionOutput, identifier => identifier + "_Models");
    // Simply forward laps.
    input.LapsInput.LapStarted += (s, e) => output.LapsOutput.SendLap(e.Lap);
    // Bind the model to a specific feed and parameters.
    // In this example, the gTotalModel() method models the data and sends the result to the output.
    input.DataInput.BindDefaultFeed("gLat:Chassis", "gLong:Chassis").DataBuffered += this.gTotalModel;
    return input;
}
gTotal function#
In the callback, gTotal is calculated for each bucket of data and the result is sent to the output topic.
private void gTotalModel(object sender, TelemetryDataFeedEventArgs e)
{
    var inputData = e.Buffer.GetData();
    // Create an output bucket with the same number of samples and the same timestamps as the input.
    var data = outputFeed.MakeTelemetryData(inputData.TimestampsNanos.Length, inputData.EpochNanos);
    data.TimestampsNanos = inputData.TimestampsNanos;
    data.Parameters[0].AvgValues = new double[inputData.TimestampsNanos.Length];
    data.Parameters[0].Statuses = new DataStatus[inputData.TimestampsNanos.Length];
    for (var index = 0; index < inputData.TimestampsNanos.Length; index++)
    {
        // Parameters[0] is gLat and Parameters[1] is gLong, in the order they were bound.
        var gLat = inputData.Parameters[0].AvgValues[index];
        var gLong = inputData.Parameters[1].AvgValues[index];
        var gLatStatus = inputData.Parameters[0].Statuses[index];
        var gLongStatus = inputData.Parameters[1].Statuses[index];
        data.Parameters[0].AvgValues[index] = Math.Abs(gLat) + Math.Abs(gLong);
        // The output is a valid sample only when both inputs are valid samples.
        data.Parameters[0].Statuses[index] = (gLatStatus & DataStatus.Sample) > 0 && (gLongStatus & DataStatus.Sample) > 0
            ? DataStatus.Sample
            : DataStatus.Missing;
    }
    outputFeed.EnqueueAndSendData(data);
    Console.Write(".");
}
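The status handling in the callback marks an output sample as valid only when both inputs are valid. The sketch below isolates that rule so it can be checked without the streaming library. The enum SampleStatus is a hypothetical stand-in for DataStatus: its member names mirror the ones used above, but the numeric values are assumptions made for illustration and may not match the library.

```csharp
using System;

public static class StatusSketch
{
    // Hypothetical stand-in for the library's DataStatus; values are assumed.
    [Flags]
    public enum SampleStatus
    {
        Missing = 0,
        Sample = 1
    }

    // The output is a sample only if both inputs carry the Sample flag.
    public static SampleStatus Combine(SampleStatus gLat, SampleStatus gLong)
    {
        var bothSampled = (gLat & SampleStatus.Sample) != 0
            && (gLong & SampleStatus.Sample) != 0;
        return bothSampled ? SampleStatus.Sample : SampleStatus.Missing;
    }

    public static void Main()
    {
        Console.WriteLine(Combine(SampleStatus.Sample, SampleStatus.Sample));  // prints "Sample"
        Console.WriteLine(Combine(SampleStatus.Sample, SampleStatus.Missing)); // prints "Missing"
    }
}
```

Testing the flag with a bitwise AND, rather than comparing for equality, keeps the check valid if an input status carries additional flags alongside Sample.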