Creating a Custom Data Adapter — Review#
Info
Complete the Walkthrough first.
This demo project is another example of the data adapter pattern, where data is served to ATLAS without copying or format-shifting.
The initial indexing step is essentially the same for any data adapter.
Generating Sample SQLite Files#
The data generator writes files to the working directory:
- When running from Visual Studio, this will be the bin directory
- When running from the terminal, this will be the current directory
Each file contains three tables:
- properties has the session timestamp, identifier, and details
- samples_100 has three channels of 100Hz data, with Unix timestamps
- samples_50 has three channels of 50Hz data, with Unix timestamps
The timestamps are all at millisecond precision, with perfect spacing — so they can easily be represented with Periodic Data.
Tip
You can open SQLite files using a database browser, such as DB Browser for SQLite.
Each file contains 1 hour of data:
| Field | Frequency | Samples | Running Total |
|---|---|---|---|
| alpha | 100Hz | 360,000 | 360,000 |
| beta | 100Hz | 360,000 | 720,000 |
| gamma | 100Hz | 360,000 | 1,080,000 |
| delta | 50Hz | 180,000 | 1,260,000 |
| epsilon | 50Hz | 180,000 | 1,440,000 |
| zeta | 50Hz | 180,000 | 1,620,000 |
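For reference, the implied schema can be sketched with Microsoft.Data.Sqlite. The column names follow the fields above, but the generator's actual column types and constraints are assumptions here:

using Microsoft.Data.Sqlite;

// Sketch only: column names follow the fields described above;
// the generator's actual types and constraints may differ.
using var connection = new SqliteConnection("Data Source=001.sqlite");
connection.Open();

using var command = connection.CreateCommand();
command.CommandText = @"
    CREATE TABLE properties  (timestamp TEXT, identifier TEXT, details TEXT);
    CREATE TABLE samples_100 (timestamp INTEGER, alpha REAL, beta REAL, gamma REAL);
    CREATE TABLE samples_50  (timestamp INTEGER, delta REAL, epsilon REAL, zeta REAL);";
command.ExecuteNonQuery();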
Indexing the SQLite Files#
The indexing process is completely independent of data generation.
In this example, it could take place after an ETL pipeline run, after new files are detected, or when integrating ATLAS with existing data for the first time.
Configuration#
The Configuration should look familiar from the other tutorials.
Channels are described as Periodic, with an interval of 10ms or 20ms (100Hz and 50Hz).
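The interval is just the reciprocal of the channel frequency. A minimal illustration (not from the demo code), expressed in nanoseconds to match the intervalNanos value used by the chunk-encoding code later on this page:

// Illustration only: a periodic interval is the reciprocal of the frequency.
// Expressed in nanoseconds, as used by the chunk-encoding code below.
const long NanosPerSecond = 1_000_000_000L;

static long IntervalNanos(long frequencyHz) => NanosPerSecond / frequencyHz;

// IntervalNanos(100) == 10_000_000 (10ms)
// IntervalNanos(50)  == 20_000_000 (20ms)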
Schema Mappings#
Schema Mappings contain the information needed by a Data Adapter to handle requests.
In this case, each field has an extra property to map it onto one of the two tables.
Example
{
  "properties": {},
  "field_mappings": [
    {
      "source_field": "alpha",
      "target_channel": 0,
      "properties": {
        "table": "samples_100"
      }
    },
    {
      "source_field": "beta",
      "target_channel": 1,
      "properties": {
        "table": "samples_100"
      }
    },
    {
      "source_field": "gamma",
      "target_channel": 2,
      "properties": {
        "table": "samples_100"
      }
    },
    {
      "source_field": "delta",
      "target_channel": 3,
      "properties": {
        "table": "samples_50"
      }
    },
    {
      "source_field": "epsilon",
      "target_channel": 4,
      "properties": {
        "table": "samples_50"
      }
    },
    {
      "source_field": "zeta",
      "target_channel": 5,
      "properties": {
        "table": "samples_50"
      }
    }
  ],
  "event_mappings": []
}
Sessions#
Sessions are published as follows:
- identity is just the filename (e.g. 001.sqlite)
- identifier, timestamp and details are taken from the properties table in each SQLite file (read as sketched below)
- configuration binding to the shared configuration
- data binding:
  - data source is sqlite to match the Gateway Service configuration
  - data identity is the full path to the file
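To make the properties lookup concrete, here is a minimal sketch using Microsoft.Data.Sqlite; the column names and types are assumptions based on the table description above, and the demo's actual reader may differ:

using Microsoft.Data.Sqlite;

// Sketch only: column names and types are assumptions.
var path = "001.sqlite";

using var connection = new SqliteConnection($"Data Source={path}");
connection.Open();

using var command = connection.CreateCommand();
command.CommandText = "SELECT timestamp, identifier, details FROM properties";

using var reader = command.ExecuteReader();
if (reader.Read())
{
    var timestamp = reader.GetString(0);   // session timestamp
    var identifier = reader.GetString(1);  // human-readable identifier
    var details = reader.GetString(2);     // session details
}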
Important
It's bad practice to expose internal details like file paths through the public API.
The Data Bindings are not available via the Gateway Service, and are treated as both trusted and private.
Data Adapter Service#
This service is based on the ASP.NET Core Web Application template in Visual Studio 2019, using the McLaren MAT.OCS.RTA.Services.AspNetCore NuGet Package to provide common functionality. This library is also used in all the Toolkit Services.
The key modifications are:
- Controllers\DemoDataController.cs — handles the incoming API requests
- DemoSampleDataStore.cs — reads data from SQLite and returns it in Chunks
- Startup.cs — ties it all together
Controller#
The MAT.OCS.RTA.Services.AspNetCore package provides a controller base class: BaseDataServiceControllerV2. This just needs sub-classing and annotating as shown:
[Route("rta/v2")]
[ApiController]
public class DemoDataController : BaseDataServiceControllerV2
{
public DemoDataController(IEventStore eventStore, ISampleDataStore sampleDataStore) :
base(eventStore, sampleDataStore)
{
}
}
Info
There are two sets of Controller base classes.
- MAT.OCS.RTA.Services.AspNetCore.Controllers.API provides controllers for the RTA public API, like the Gateway Service.
- MAT.OCS.RTA.Services.AspNetCore.Controllers.Services provides controllers for microservices like this data adapter.
There are small differences. For example, the request route for data requests:
- RTA Public API: /rta/v2/sessions/{sessionIdentity}/data/{type}/{channels}
- Data microservice: /rta/v2/data/{dataIdentity}/{type}/{channels}
This emphasizes the difference between the session identity — which is public — and the data identity — which is private and might be bound to multiple sessions.
The base controller class depends on services implementing IEventStore and ISampleDataStore (defined in the MAT.OCS.RTA.Services package).
Sample Data Store#
Interface#
The ISampleDataStore interface works with the MAT.OCS.RTA.Services.AspNetCore library to handle the REST API responses.
The library also provides a DefaultSampleDataStore with a no-op implementation of this interface, so the demo extends this class and adds just one method, handling Periodic Data:
public class DemoSampleDataStore : DefaultSampleDataStore
{
public override async Task<ChunkedResult> GetPeriodicDataChunksAsync(
string dataIdentity,
long? startTime,
long? endTime,
ChannelRangeSet channels,
RequestContext context,
CancellationToken cancellationToken)
{
// ...
return new ChunkedResult(/*...*/);
}
}
ChunkedResult wraps an IEnumerable<Chunk>, or IAsyncEnumerable<Chunk> on more recent versions of .NET.
The NuGet package provides formatters which recognise and handle this response automatically.
Calling the Schema Mapping Service#
We'll need an instance of SchemaMappingStoreClient to retrieve the Schema Mapping.
In ASP.NET Core, this is done by "injecting" the dependency in the constructor.
The framework will instantiate DemoSampleDataStore and automatically pass in the client.
public class DemoSampleDataStore : DefaultSampleDataStore
{
private readonly SchemaMappingStore.SchemaMappingStoreClient schemaMappingClient;
public DemoSampleDataStore(SchemaMappingStore.SchemaMappingStoreClient schemaMappingClient)
{
this.schemaMappingClient = schemaMappingClient;
}
// ...
}
Once injected, the first step in handling a request is to query the Schema Mapping Service:
SchemaMapping sm;
try
{
sm = await schemaMappingClient.QuerySchemaMappingAsync(
new QuerySchemaMappingRequest
{
DataSource = DataSource,
DataIdentity = dataIdentity,
SelectChannels = channels.ToString()
},
new CallOptions(cancellationToken: cancellationToken));
}
catch (RpcException ex)
{
if (ex.StatusCode == StatusCode.NotFound)
return ChunkedResult.Empty();
throw;
}
A query request (vs. get request) filters the SchemaMapping using the specified Channels, so the result is ready for use.
In this demo, the data is split between two tables:
- samples_100 — 100Hz
- samples_50 — 50Hz

Each FieldMapping has a table property, so we can apply a little more business logic to query each of the tables using only the relevant fields:
schemaMapping.FieldMappings.Where(field => field.Properties["table"] == table)
These fields are then put together into a simple SQL query.
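The demo doesn't prescribe the exact SQL, but a sketch of such a query builder might look like this; BuildQuery is a hypothetical helper, and the timestamp column name is an assumption about the sample tables:

using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not the demo's actual code. SourceField corresponds
// to source_field in the Schema Mapping example above; the timestamp column
// name is an assumption about the sample tables.
static string BuildQuery(string table, IReadOnlyList<FieldMapping> fields) =>
    $"SELECT timestamp, {string.Join(", ", fields.Select(f => f.SourceField))} " +
    $"FROM {table} " +
    "WHERE timestamp >= @start AND timestamp < @end " +
    "ORDER BY timestamp";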
Encoding Chunks#
Encoding Chunks involves:
- Widening the time bounds to achieve stable chunk boundaries:

  var (startChunkTime, endChunkTime) = ChunkTime.FromSessionTimeRange(startTime, endTime);
- Buffering the row-oriented database records into column-oriented chunk data:

  This is done in the ChunkingBuffer class, following a time-based chunking strategy, in two steps.

  First, collating the samples into PeriodicData results:

  for (var i = 0; i < buffers.Length; i++)
  {
      var result = new PeriodicData
      {
          ChannelId = fields[i].TargetChannel,
          Samples = sampleCount,
          Interval = intervalNanos,
          StartTimestamp = bufferStartTime,
          Buffer = ByteString.CopyFrom(MemoryMarshal.AsBytes(buffers[i].AsSpan(0, sampleCount)))
      };
      results[i].Add(result);
  }
  Second, encoding the results into chunks:

  var chunks = new Chunk[fields.Count];
  for (var i = 0; i < chunks.Length; i++)
  {
      // each Chunk contains a compressed PeriodicDataList
      var data = new PeriodicDataList
      {
          PeriodicData = { results[i] }
      };

      // one periodic channel per chunk
      var chId = fields[i].TargetChannel;
      var chunkData = ChunkData.EncodePooled(ChunkDataMemoryPool.Shared, data, new[] {chId});
      var chunk = new Chunk(unflushedStartTime, unflushedEndTime, chunkData);
      chunks[i] = chunk;
      results[i].Clear();
  }
The steps are triggered based on the chunk size limits and known maximum data frequency.
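The Buffer assignment in the buffering step above uses MemoryMarshal.AsBytes to reinterpret each channel's sample buffer as raw bytes without copying. A standalone illustration (the float sample type is an assumption):

using System;
using System.Runtime.InteropServices;

// Reinterpret a span of samples as raw bytes, with no copying.
float[] samples = { 1.0f, 2.0f, 3.0f };
Span<byte> bytes = MemoryMarshal.AsBytes(samples.AsSpan());

Console.WriteLine(bytes.Length); // 12 (3 samples x 4 bytes each)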
Startup#
The Startup.cs class is where an ASP.NET Core application handles configuration and sets up the "services" that provide internal functionality.
Schema Mapping Service#
Setting up the connection to the Schema Mapping Service:
services.AddSingleton<ChannelBase>(_ => GrpcChannel.ForAddress("http://localhost:2682"));
services.AddSingleton<SchemaMappingStore.SchemaMappingStoreClient>();
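The address is hard-coded for the demo. As a sketch of the configuration-driven approach suggested under Improvements below, it could be read from the ASP.NET Core configuration framework instead; the SchemaMappingService:Address key is an assumption:

// Sketch: read the address from configuration, with the demo default as fallback.
services.AddSingleton<ChannelBase>(provider =>
{
    var config = provider.GetRequiredService<IConfiguration>();
    var address = config["SchemaMappingService:Address"] ?? "http://localhost:2682";
    return GrpcChannel.ForAddress(address);
});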
Registering Services#
Registering Sample Data Store:
services.AddTransient<IEventStore, DefaultEventStore>();
services.AddTransient<ISampleDataStore, DemoSampleDataStore>();
Boiler-plate to enable RTA library support:
services.AddRTA();
Respond to GET /#
Add a response for a top-level GET request, so that the Gateway Service can see that the Data Adapter is reachable:
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
endpoints.MapGet("/",
async context =>
{
context.Response.ContentType = "text/plain";
await context.Response.WriteAsync("RTA.Examples.DataAdapter.Service");
});
});
Improvements#
Writing Data Adapter services in C# and ASP.NET Core shortens development using the RTA NuGet packages — and even without this advantage, ASP.NET Core is a mature, powerful, cross-platform framework, with a rich ecosystem.
The package ecosystem offers some really useful packages to bring the service up to production standard:
- Configure the service with the ASP.NET Configuration framework
- Add a /health endpoint with the AspNetCore.HealthChecks NuGet package (sketched below)
- Add a /metrics endpoint with the prometheus-net.AspNetCore NuGet package (sketched below)
- Add structured logging with the Serilog.AspNetCore NuGet package
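A minimal sketch of the health and metrics endpoints, assuming the AspNetCore.HealthChecks and prometheus-net.AspNetCore packages are installed:

public void ConfigureServices(IServiceCollection services)
{
    // register health checks; individual checks can be chained here
    services.AddHealthChecks();
}

public void Configure(IApplicationBuilder app)
{
    app.UseRouting();
    app.UseEndpoints(endpoints =>
    {
        endpoints.MapControllers();
        endpoints.MapHealthChecks("/health"); // liveness/readiness probe
        endpoints.MapMetrics();               // Prometheus metrics at /metrics
    });
}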