The pipeline that transforms raw telemetry data and associates it with the analysis dimensions is a topic that is rarely dealt with.
Azure Data Explorer and the Bronze-to-Silver data stage for Process Control
A well-defined approach to transforming IoT data within ADX
--
Introduction and Context
One of the most important aspects of IoT projects is the ability to acquire data while paying close attention to input throughput, so as not to saturate the plant's internet connection. But it is even more important to have a standard methodology for turning raw data into structured data, so that it can be enriched with the identifiers associated with the search dimensions.
This article discusses how to approach the transformation of raw data, enriching it with the GUIDs needed to consume it, whether the data is queried in a DWH or cataloged in a data lake.
It is important to focus on this because the impact, in terms of both cloud infrastructure costs and development costs, can be very high; that is something you cannot afford if you want to achieve quick results, especially in an initial MVP phase with a customer.
What follows is a step-by-step tutorial on how to structure four essential parts of an IoT project:
1. The process data of an asset (continuously sampled process readings) — [This Post]
2. Production / performance data (machine status data, operational mode) — [Next Post]
3. The dimensions of asset monitoring (CMMS) — [Next Post]
4. The dimensions of production performance monitoring (MES) — [Next Post]
The result is a standard, structured approach to implementing this part of an IoT project. This way, the problem to solve becomes the customer's functional problem again, rather than a purely technological problem tied to the implementation details.
Sending the data
There are many schools of thought. Based on past experience, if no near-real-time (NRT) intervention on the process data is required, the smartest thing to do is to buffer the data locally and send it to ADX through a blob storage connection. For example, with IoT Edge it is possible to write bulk data to Blob Storage on the edge, converted by a custom module into Parquet format (which saves a lot of throughput).
For the purpose of this tutorial, let's consider a CSV file, as it is immediately readable to the naked eye, such as the sample below.
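A hypothetical sample of such a file, consistent with the five-column mapping defined further down (the tag identifiers and the asset ID are placeholders), could look like this:

2023-05-12T08:00:00.000Z,TAG-001,132.4,Double,ASSET-01
2023-05-12T08:00:00.000Z,TAG-002,98.7,Double,ASSET-01
2023-05-12T08:00:01.000Z,TAG-003,true,Boolean,ASSET-01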
The first table we meet is the bronze stage:
// Create the raw table "TKVvT"
.create-merge table bronzestage_rawData (Timestamp:datetime, TagId:string, TagValue:string, Tag_Datatype:string, AssetId:string)

// Create the table mapping with the Asset ID
.create-or-alter table bronzestage_rawData ingestion csv mapping "MAPPING-bronzestage_rawData"
'['
'  { "column" : "Timestamp", "DataType":"datetime", "Properties":{"Ordinal":"0"}},'
'  { "column" : "TagId", "DataType":"string", "Properties":{"Ordinal":"1"}},'
'  { "column" : "TagValue", "DataType":"string", "Properties":{"Ordinal":"2"}},'
'  { "column" : "Tag_Datatype", "DataType":"string", "Properties":{"Ordinal":"3"}},'
'  { "column" : "AssetId", "DataType":"string", "Properties":{"Ordinal":"4"}}'
']'

// Manage the retention policy
.alter-merge table bronzestage_rawData policy retention softdelete = 7d recoverability = disabled
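Once the table and its mapping exist, a landed blob can be ingested manually for a quick test (in production the ingestion would normally be triggered by an Event Grid data connection). A minimal sketch, where the blob URL and SAS token are placeholders:

// One-off test ingestion of a landed CSV blob, using the mapping defined above
// (the blob URL and the SAS token are placeholders)
.ingest into table bronzestage_rawData
  (h'https://plantstorageaccount.blob.core.windows.net/asset-01/2023/05/12/telemetry.csv;<SAS token>')
  with (format = 'csv', ingestionMappingReference = 'MAPPING-bronzestage_rawData')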
I prefer to aim for minimum throughput, so what differentiates the assets in a multi-asset / multi-plant solution is the portion of storage where the various edge blob modules put the data. For this reason I prefer not to use the native OPC UA PubSub format, but a Parquet file that is already clean.
NB: In my case I prefer to use one storage account per PLANT and one CONTAINER per ASSET, organized in YYYY/MM/DD folders, in order to spot any gaps.
To avoid losing data windows, I prefer a soft-delete retention of no less than the canonical 7 days. We will talk in another article about how to manage data black holes and tsunami data.
Now comes the most important part from the functional point of view: how to structure the variable tree. Here the most sensible thing is to start from a technical drawing of the machine and logically group the variables, so that you can show, in a widget or in an ad hoc page (in ADX, of course), the trend of the exact variables of a given machine area.
The configuration table can be extended with a dynamic field containing, for each operational state of the machine, a series of indicators that can determine an anomaly or an explicit outlier. In that case, use the ADX parse_json() function to distill the JSON and then navigate it with the JSON-path syntax.
// Create the variable configuration table, drawing mappings, and rules
.create-merge table asset_mapping_configuration (TagId:string, TagValueUnits:string, TwinId:string, TagGroupId:string, TagSubGroupId:string, Metrics:dynamic)
The Metrics field is of the dynamic type because it can have a variable internal structure. In the case reported here it only contains the tolerance limits of the variables in the various operational states of the machine. This field usually changes often, so it is not always advisable to keep it in an ADX table; a blob file or a SQL table may be preferable. Often this table is actually a frontier table of the QMS (Quality Management System), where the operating constraints are reported, depending on the product recipe.
{"running":{"min":100.0,"max":150.0}, "notrunning":{"min":0.0,"max":15.0}}
Below, instead, we see how to structure the silver table, which merges the enrichment properties with the acquired variables.
Create Silver Table
.create-or-alter function with (folder = "Policies", skipvalidation = "true") UpdatePolicy_bronzestage_rawData() {
    let IngestionTime = now();
    bronzestage_rawData
    | join kind=leftouter asset_mapping_configuration on TagId
    | extend Tag_value_long = case(Tag_Datatype == "Int64", tolong(TagValue), long(null))
    | extend Tag_value_double = case(Tag_Datatype == "Double", todouble(TagValue), double(null))
    | extend Tag_value_boolean = case(Tag_Datatype == "Boolean", tobool(TagValue), bool(null))
    | extend Tag_value_string = case(Tag_Datatype == "String", tostring(TagValue), "")
    | extend Value_limits = parse_json(Metrics)
    | extend Running_min = Value_limits.running.min
    | extend Running_max = Value_limits.running.max
    | extend NotRunning_min = Value_limits.notrunning.min
    | extend NotRunning_max = Value_limits.notrunning.max
    | project
        DataTimestamp = Timestamp,
        IngestionTimeStamp = IngestionTime,
        TagId,
        TagValueUnits,
        Tag_value_long,
        Tag_value_double,
        Tag_value_boolean,
        Tag_value_string,
        AssetId,
        TwinId,
        TagGroupId,
        TagSubGroupId,
        Running_min,
        Running_max,
        NotRunning_min,
        NotRunning_max
}

// Create the silver table without an explicit CREATE: take the structure from the update/population query, but declare no rows
.set-or-append silverstage_rawData <| UpdatePolicy_bronzestage_rawData() | limit 0

// Manage the update policy
.alter table silverstage_rawData policy update
@'[{"IsEnabled": true, "Source": "bronzestage_rawData", "Query": "UpdatePolicy_bronzestage_rawData()", "IsTransactional": false, "PropagateIngestionProperties": true}]'
The Result
The general result is that all the ingested CSV files are "converted" into an intermediate table that is "FACT COMPATIBLE", and therefore ready to be filtered through a dimension table (a table that is well defined in terms of dimensions, such as SHIFTS or RECIPES_EXECUTIONS, for example), as in the sketch below.
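As a minimal sketch, assuming a hypothetical SHIFTS dimension table with AssetId, ShiftId, ShiftStart, and ShiftEnd columns, the fact-compatible silver table can be sliced per shift like this:

// SHIFTS is a hypothetical dimension table: one row per asset and shift
// SHIFTS (AssetId:string, ShiftId:string, ShiftStart:datetime, ShiftEnd:datetime)
silverstage_rawData
| join kind=inner SHIFTS on AssetId
| where DataTimestamp between (ShiftStart .. ShiftEnd)
| summarize AvgValue = avg(Tag_value_double) by ShiftId, TagGroupId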
With this approach it is easy to add more value with little effort. For example, it is possible to extend the dataset of a machine area by selecting (or deriving) a "PROBE" variable that gives the operating status at a high level. This is a column derived by mixing the operational state of the machine (running / not running) with the metrics declared in the configuration phase. Potentially, each variable can generate its own probe. A SubGroup (a motor, for example) is a mix of multiple evaluations of the limits of the variables belonging to it, within the execution context (running status).
In this way you can associate, for example, the "PROBE STATUS" of a machine with production variables (the amount of rejects), as sketched below.
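As a minimal sketch of the probe idea, assuming a hypothetical IsRunning flag (in practice it would come from the machine status data covered in the next post), a per-sample probe status could be derived like this:

// Derive a hypothetical PROBE STATUS per sample: pick the limit set for the
// (assumed) running state and flag values that fall outside it
silverstage_rawData
| where isnotnull(Tag_value_double)
| extend IsRunning = true  // placeholder: in practice joined from machine status data
| extend LowLimit = toreal(iff(IsRunning, Running_min, NotRunning_min)),
         HighLimit = toreal(iff(IsRunning, Running_max, NotRunning_max))
| extend ProbeStatus = iff(Tag_value_double between (LowLimit .. HighLimit), "OK", "OUT_OF_RANGE")
| summarize OutOfRangeSamples = countif(ProbeStatus == "OUT_OF_RANGE") by AssetId, TagSubGroupId, bin(DataTimestamp, 5m)

Aggregated per SubGroup, this is what lets you put the probe status of a motor next to production variables such as the amount of rejects.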
Quick recap
In this post I explained how to approach the data arriving from the field, with preprocessing at the edge and an ingestion pipeline implemented on ADX, using the bronze-to-silver stage approach typical of a delta lake architecture.