Azure Data Explorer (ADX): A deeper focus on extent and ingestion properties

Riccardo Zamana
12 min readFeb 1, 2021

--

When we talk about Azure Data Explorer (ADX), the Microsoft service of data acquisition and analysis in real time of telemetry data, often the knowledge is that it is not a mature object, or too little complicated to be able to manage telemetry analysis .

On the other hand, I, like many others who believe in this service, are concentrated in the conferences to make people perceive the general value of the tool, and the rules of the game to be able to start using it.

First consideration: Ingest data correctly

I begin to realize, after having promoted and participated in a couple of real projects, that the first difficulty lies not in learning KUSTO as a querying language, but in understanding how the incoming data stream must really work for it to be ingested correctly by Azure Data Explorer (ADX).

If it is true that the power of the tool lies in the speed of execution and in the scalability, it is necessary to begin to ground the basic concepts in order to speak correctly of the crucial phase in the life of a database: the schema design.

It is therefore necessary to begin to deepen the concepts for which Azure Data Explorer (ADX) effectively becomes a scalable tool, while remaining efficient in terms of queries.

Second consideration: if you know the data, you know how to handle it

the design of a database has always been only related to itself. In the case of TimeSeries DB “As a Service” such as Azure Data Explorer (ADX), it is also possible to design the format with which the data to be acquired. Knowing the data, knowing how it arrives and how it should be handled before being included in the ADX pipeline is fundamental.

PAY ATTENTION: the opportunity is to be able to dump data on the cloud, without too many edge side changes, with very high initial acquisition frequencies, and make it possible to analyze phenomena whose acquisition frequency is between 1 and 10ms , therefore almost that found in Condition Monitoring projects, for example, in the industrial context.

Let’s take an example, on a practical use case

Let’s take an example, starting from a use case, to understand how Azure Data Explorer (ADX) must be managed properly: let’s say we want to analyze data for the purposes of Predictive Quality. I refer to the fact of being able to analyze the data flow of a production process as long as it is in progress, and, on the basis of the analysis in real time, I arrive at anticipating the output of the quality control phase of the manufactured product (also managing to interrupt phases processing of a product which in any case will be discarded). It is often required, but the recipe for which is unclear (given that the success of this objective is not systematic). But let’s pretend it is.

It is important to remember that the steps for constructing an inference using an ML model:
Let us remember that it is important to follow the necessary steps for the construction of a model (this list comes from my experience, and it is not necessarily the correct procedure):

1. acquire data series
2. tag and detect anomalies in series, building series of anomalies
3. clustering events in order to find first and second level thresholds
4. tag the phenomena with dimensions (Green, Yellow, Orange, Red) on the basis of the ranges deduced from the clustering
4. relate the product identifiers to the process data (taking into account that in the IoT, a production process corresponds to N windows of processing data, all related to the same product).
5. add the empirical data, ie derived from the statistical control, which testify whether the forecast is good, the “green” thresholds correspond to “not defective” products. These data can come from structured sources (cameras) and unstructured (manual typing)
6. select the right datasets to work with MLOPS
7. bring back the threshold / tolerance logics found in the edge
8. bring anomaly detection back to the edge
9. (I add): PATIENT AND HOPE

Which strategy to choose for step 1?

A) The programming? NO, as it would lead us towards an edge that is too pushed. Too many functions, local caches, bucket aggregation rules, etc. (which is fine, but with the phenomenon included. A classic pattern with BROKER + Executor Function (trigger “from QUEUE to Rest” + Inference API is optimal, but it can’t be do in the beginning, when nothing is certain).
B) Classical data analysis (ETL + DWH)? NO, it would lead us to load masses of data without an approach for subsequent refinements. The indexing of the aforementioned data would then be unsustainable if not with periods of offline services, and exposing us to not insignificant costs in terms of azure services. While it’s not about the article itself, it wouldn’t be a winner with the viewing part either. In cases of predictive quality, the PRIMA EXPERIMENTAL approach (with notebooks) is preferable rather than creating a POWER BI report right away.
C) A File driven approach with Azure Data Explorer (ADX), YES!
A file + adx approach on the one hand makes logic on the edge simple, and shifts the load to the cloud, paying only for synchronization from local and remote storage. Pivoting data or re-buffering them on “better” files, that’s all. Azure Data Explorer (ADX) takes care of the rest. But with a file + ADX approach (the most common, I’d say, at least for the process data pipeline) .. what should we pay attention to first?

My personal opinion is that it must be necessary to understand how to correctly divide and then manage the shards of the data coming from a world that is not perfect, and often brings the data not only with different temperatures (hot warm cold paths) but with different delay.
Fortunately Azure Data Explorer (ADX) has the ability not only to ingest data at very high frequency but also has the ability to manage its location according to a model called extent which exploits the principle of the columnar store.
The very simple principle means that small aggregations of data can be consolidated into small shards, managed according to a time slot. Initially they may correspond to the i-th file sent, unless we will see some details later.
Managing extents (data management) is dangerous for performance, as the mechanism of extents updates is based on data swapping. Below is an easy explanation of data swapping: When there is a need to update / aggregate / delete an extent, the operation is done on a copy of the starting extent, and then the reference to the existing extent is replaced . The more management operations there are, perhaps due to semantically wrong extents, the worse it is.
KUSTO’s optimization work then goes back to the compression of the columns and the indexing of the same.
It goes without saying that a very large extent is obviously not recommended. But what does big mean?
IMPORTANT: Remember that from now on we will be talking about millions of lines per extent, so what we are talking about here is to bring the high frequency (3–10ms) signals to the cloud to analyze them.

So how is the data aggregated according to the logic of extents?

The data is aggregated in the extents, starting from what is a field declared as creationTime, precisely “the data date” and not the date of ingestion of the same.
But in the case of iot acquisition from different non-synchronous sources, it is possible to archive buckets of data that are not temporally contiguous. This dynamism arises from the fact that the data sources are not ingested at the same time, they can come from sources that by their nature are asynchronous, even for whole days. I am referring for example to quality assessment in a continuous process, where laboratory data (chemical tests or mechanical tests) arrive hours if not days behind the process data.
I therefore hypothesize to do a remapping of the data BEFORE in the edge, which must locally buffer data, and then send them with congruent buckets, of a maximum time span (my advice: 5 minutes) where the field that will act as “data of generation “of the sampled values.

with (creationTime="2017–02–13T11:09:36.7992775Z")

In the meantime, it is necessary to understand that three fundamental parts of ADX are managed thanks to the creationTime parameter:
- Retention: on the basis of the creationTime, it is decided when the extent is out of retention and therefore will be deleted.
- Caching: based on the creationTime, Azure Data Explorer (ADX) decides the extents to be cached
- Sampling: when using the “take” (ex: | take 1000, ie take the first 1000) recent extents are favored
The first mistake you make is not to take this into account, therefore the data that maybe temporally then are distributed on different windows, are actually acquired with the creationTime default field (now ()) and for this reason retention / caching and sampling will work worse.

Since the creationTime is only a time stamp, how does the Extent know what the data range is?

Since the extent is regulated by a time range, it actually stores two fundamental attributes as its metadata: the minimum date (MinCreatedOn) and the maximum date (MaxCreatedOn) within which the data it contains is located.
Having an extent with a very large data range makes the search operation more difficult (especially an extent where due to a record, the overall timerange is stretched in a non-dense way).

But apart from the dates, how are the extents recognized, or perhaps selected?

Within the Tags it is certainly possible to insert two properties that suggest to ADX how to behave in case of reworking of the extents. These properties are drop-by and ingest-by.

A) Drop-by: tags starting with drop-by suggest which extents an extent can be merged with.

.ingest … with (tags = '["drop-by:2016–02–17"]')

NOTES ON DROP-BY: If drop-by is used, it is necessary to know in advance how the extents are organized on Azure Data Explorer (ADX) in such a way as to be able to indicate with the same tags, even delayed data bundles, which therefore they must be aggregated in the same Extent.

B) Ingest-by: Tags starting with ingest-by suggest that the data was ingested only once. Alternatively, you can use the ingestIfNotExists option to prevent duplicates.

.ingest … with (tags = '["ingest-by:2016–02–17"]')
.ingest … with (ingestIfNotExists = '["2016–02–17"]')

NOTES ON INGEST-BY: It is a tag that recognizes the EXTENT as unique, so if a subsequent packet arrives with the same “tag”, it is not considered. It is therefore good to use it only for here data that does not need to be subsequently managed, and that must be “sculpted” over time without data-swapping opportunities (eg: a recipe change, or a settings change, during a production process. in general, ON-CHANGE type parameters).

OTHER TAGS: Correct input of other tags also allows targeted deletion of data:.drop extents <| .show table MyTable extents where tags has "drop-by:2016–02–17"

How do I check EXTENTS?

Below we see some primary commands to understand how extents are managed:

.show cluster extents [hot]

Displays information about the extents present in the cluster. If you specify [hot] it shows those present only in the cache.

.show database DatabaseName extents [(ExtentId1,…,ExtentIdN)] [hot] [where tags (has|contains|!has|!contains) Tag1 [and tags (has|contains|!has|!contains) Tag2…]]

Displays information about the extents present in the database. You can filter by tag using the has (index comparison) / contains (value scan) clauses

We wrote that extents are partitioned according to ingestion time. But is there no partitioning in Azure Data Explorer (ADX)? What do extents have to do with the partition policy?

As for the partitioning and partitioning policies part, it defines if and how extents are to be classified (partitioned) for a table. While dividing extents by ingestion time might seem sufficient, there are cases and scenarios where further splitting is necessary.
The Microsoft documentation suggests important indications on the basis of some scenarios given by the cardinality of the partitions (low if for example we use x in multi tenant mode where each partition who corresponds to a tenant, or high if for each partition who we intend corresponds for example to a sensor ).
With reference to the example use-case (quality control), it is important to focus instead on the third type suggested, precisely to manage typical situations of quality control, where there are multiple data sources (i.e. not only sensors), which arrive in asynchronous mode with delays of up to one day.

Reference:

https://docs.microsoft.com/en-us/azure/data-explorer/kusto/management/partitioningpolicy

An alternative technique to the preventive rehash in the bucket edge is to act precisely on partitioning, preferring a uniform partitioning, according to the temporal logic of the data contained in the buckets, and not referring to the creation date.

OverrideCreationTime: is a Boolean parameter that indicates whether the minimum and maximum creation times of the extent should be overridden by the range of values ​​in the partition key. The default is false.

Set to true if data is not imported based on arrival time (e.g. a single source file may include very distant datetime values) and / or you want to force retention / caching based on datetime and non-datetime values based on the time of ingestion.

In the case of quality control, where the arrival is discontinuous, it is important to indicate the OverrideCreationTime property to true option.
Two types of partitioning keys can be created: through hash functions, and through temporal uniformity (equal time slices). Here are two examples:

{
"PartitionKeys": [{
"ColumnName": "tenant_id",
"Kind": "Hash",
"Properties": {
"Function": "XxHash64",
"MaxPartitionCount": 256,
"Seed": 1,
"PartitionAssignmentMode": "Default"
}
},
{
"ColumnName": "timestamp",
"Kind": "UniformRange",
"Properties": {
"Reference": "1970–01–01T00:00:00",
"RangeSize": "1.00:00:00",
"OverrideCreationTime": false
}
}
]
}

In the example above we are talking about a partitioning with HASH by tenant_id, combined with a partitioning by slice of 1 day, starting from 01.01.1970
But how are extents managed in terms of union and division?
In this case too, we act on certain policies: merge policy and sharding policy.

Merge and Sharding policy
Extents can clearly be merged together by Azure Data Explorer (ADX) or they can split into multiple extents in turn, according to merge and shard policies.

Merge policy
Let’s start with an example:

.alter table [table_name] policy merge '{' ' "RowCountUpperBoundForMerge": 16000000,' ' "OriginalSizeMBUpperBoundForMerge": 0,' ' "MaxExtentsToMerge": 100,' ' "LoopPeriod": "01:00:00",' ' "MaxRangeInHours": 24,' ' "AllowRebuild": true,' ' "AllowMerge": true,' ' "Lookback": {' ' "Kind": "Default"' ' }' '}'

Remembering that extents with different “drop-by” tags cannot be merged together, the main merge parameters set by default are described below in order to understand if they can always make sense or not.

  • Allow merge: defines if it is possible to merge or not (does it make sense to divide or merge extents that MUST remain separate?)
  • MaxRangeInHours: Defines how long a single extend can be
  • Loopback: defines how extents are considered for rebuild or merge. Very interesting among the allowed values ​​is the CUSTOM option, which de facto indicates a period of time beyond which the extents can be merged together.
  • MaxExtentsToMerge: defines the number of extenders that can be merged togethere

There are two types of merge operations: Merge, which recompiles the indexes, and Rebuild, which completely reinserts the data. Both types of operations generate a single extent that replaces the source extents. If the extent is small, ADX is better off re-ingesting the data during a merge, otherwise the preferred route is to just rearrange the indexes.

Sharding policy

It is used to decide if and how extents are “packaged / merged / divided”, with respect to certain parameters. It therefore serves to close the circle as it manages the parameters for which a rebuild will act or not.

  • MaxRowCount: maximum rows created by ingestion
  • MaxExtentSizeInMb: maximum mb for an extent created by a merge operation
  • MaxOriginalSizeInMb: maximum mb for an extent created by a rebuild operation

Final Thoughts:

At the moment I don’t have any truth in my pocket, yet, but I’m starting to reflect. I hope they are legitimate.
For example: might it make sense to have different Azure Data Explorer (ADX) clusters? the former with the most recent data could be divided into very small extents, the latter could instead be data similar to the long term archiviation, however searchable, divided into very large extents, perhaps partitioned with different logics (produced batch for example?).

Thinking that the Azure Data Explorer update-policy (which allows you to reposition the incoming data from a starting scheme, to an arrival scheme) can act as a load re-distributor is wrong as the effort made is enormous compared to to what is possible by acting correctly on the policies and on the inbound buckets.

And then the questions come, such as: when is it correct to manually act on the re-building part? since at the moment, although commands are made available to control the compression and indexing of the tables, it is not so clear when a rehash of what is the data part is necessary.

What is clear to me instead, and which I hope I have passed on, is to understand how important it is to say that Kusto how to manage the creation of databases and tables, before assigning data to them through extents.

--

--

Riccardo Zamana
Riccardo Zamana

Written by Riccardo Zamana

I’m a management professional with 20 years of experience, skilled in delivering business results by creating tailor made Cloud and IOT based solutions.

Responses (1)