Airthings is a Norwegian IoT company that sells sensors for measuring air quality. The sensor measurements are sent to the Airthings cloud and can be retrieved by customers either through the HTTP consumer API or, for business customers, directly with an MQTT client. This article uses the latter and explains how to automatically route all the Airthings MQTT topics to local BACnet objects readable by standard BOS (Building Operating Systems).
This article explains the technical details of this MQTT to BACnet routing, and more particularly the code of the advanced routing functions. The principles discussed are a good starting point for similar projects, including projects using other protocols. Here all the routing is done by a single route, which greatly simplifies the configuration and improves the portability of the solution.
Readers who want to quickly deploy the Airthings to BACnet solution should refer directly to the Airthings article.
The configuration of the MQTT client gateway is detailed in the Airthings article. It is necessary to configure the correct MQTT client ID, select MQTTS as the protocol on port 8883, and configure the security certificates downloaded from the Airthings for Business Dashboard.
The MQTT topic of each IoT meter has the structure %mqtt:{aid}/{lid}/devices/{serialNumber}/samples, where placeholders such as {aid} are described in the table below.
Symbol | Description | Example |
---|---|---|
{aid} | Airthings account id | Obfc1033-b6a5-48c5-8aa8-4d530368ec0c |
{lid} | Airthings location id | 33bc1850-d5e2-4854-98f8-7bba5304c6cf |
{serialNumber} | Airthings meter serial number (unique to each Airthings IoT device) | 2950010267 |
Here is an example of an address name and its JSON address value containing all air quality measurements.
address name: %mqtt:Obfc1033-b6a5-48c5-8aa8-4d530368ec0c/33bc1850-d5e2-4854-98f8-7bba5304c6cf/devices/2950010267/samples
address value:
{
"serialNumber" : "2950010267", // Airthings meter serial number
"co2": 500, // parts per million
"humidity": 70, // relative humidity (%)
"light": 20, // percent
"pm1": 200, // microgramsPerCubicMeter
"pm10": 50, // microgramsPerCubicMeter
"pm25": 10, // microgramsPerCubicMeter
"pressure": 97, // pascals
"pressureDifference": 10, // pascals
"radonShortTermAvg": 148, // becquerels per cubic meters
"soundLevel": 30, // decibelsA
"temp": 23, // degreesCelsius
"voc": 0.66 // partsPerBillion
};
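The address structure described above can be decomposed with a few lines of JavaScript. This is only a sketch: parseSamplesAddress is a hypothetical helper introduced for illustration, it is not part of the gateway API.

```javascript
// Hypothetical helper (assumption, not a gateway function): splits an
// Airthings MQTT address name into its {aid}, {lid} and {serialNumber} parts.
function parseSamplesAddress(addressName) {
  const prefix = "%mqtt:";
  if (!addressName.startsWith(prefix)) return null;
  const parts = addressName.slice(prefix.length).split("/");
  // Expected layout: {aid}/{lid}/devices/{serialNumber}/samples
  if (parts.length !== 5 || parts[2] !== "devices" || parts[4] !== "samples") {
    return null;
  }
  return { aid: parts[0], lid: parts[1], serialNumber: parts[3] };
}

const parsed = parseSamplesAddress(
  "%mqtt:Obfc1033-b6a5-48c5-8aa8-4d530368ec0c/33bc1850-d5e2-4854-98f8-7bba5304c6cf/devices/2950010267/samples"
);
console.log(parsed.serialNumber); // prints "2950010267"
```

Addresses that do not follow the samples layout (for example a wildcard subscription address) yield null.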
To subscribe to all topics of a given Airthings account and location, it is possible to use an MQTT wildcard by inserting the address %mqtt:{aid}/{lid}/#. After this address is inserted, the MQTT client subscribes to it and receives the data from all the meters within that location. When new data is received by the MQTT client, the MQTT samples addresses are automatically inserted if they do not exist yet. Be aware that it can take up to 5 minutes before the first data is available after the subscription is added. The data is transmitted to the device whenever samples are received by the Airthings cloud.
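To illustrate what the # wildcard selects, here is a minimal sketch of standard MQTT topic filter matching. The function topicMatches is hypothetical and only mimics what the broker does on its side. It ignores the special rule for topics starting with $, and the account and location ids are shortened placeholders.

```javascript
// Hypothetical helper (assumption): returns true when an MQTT topic matches
// a subscription filter containing the standard wildcards "+" and "#".
function topicMatches(filter, topic) {
  const f = filter.split("/");
  const t = topic.split("/");
  for (let i = 0; i < f.length; i++) {
    if (f[i] === "#") return true;       // multi-level wildcard: matches the rest
    if (i >= t.length) return false;     // the topic is shorter than the filter
    if (f[i] !== "+" && f[i] !== t[i]) return false; // "+" matches one level
  }
  return f.length === t.length;          // no wildcard left: lengths must agree
}

console.log(topicMatches("acc/loc/#", "acc/loc/devices/2950010267/samples")); // prints true
console.log(topicMatches("acc/loc/#", "acc/other/devices/2950010267/samples")); // prints false
```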
The local BACnet device exposes the Airthings data to the BOS (other BACnet devices on the site network). Its configuration requires nothing special. If the universal IoT gateway hardware has multiple network ports, the network interface on which the device listens can be selected. When security is a concern, it can be a good idea to connect to the cloud from one network interface and to use the other network interface for the BACnet local area network. Note that 4G/5G USB routers can also be used to connect to the Airthings cloud.
In this use case, each sampled value (co2, humidity, ...) is stored in its own local BACnet analogInput object. These BACnet objects (and their corresponding addresses) are created automatically when the route tries to write a value into a BACnet object that does not exist yet. The local BACnet objects have the address format %bac:local/analogInput:{instance}. Each BACnet object has 4 properties derived from the MQTT sampled values:
- %bac:local/analogInput/{instance}/objectName: the name of the BACnet object. It is defined as the Airthings meter serial number followed by the identifier of the sample, such as {serialNumber}_co2 or {serialNumber}_humidity.
- %bac:local/analogInput/{instance}/description: the description of the BACnet object. It is a text description of the sampled property, such as CO2 from Airthings device {serialNumber} or Relative humidity from Airthings device {serialNumber}.
- %bac:local/analogInput/{instance}/presentValue: the last sampled value received by the MQTT client from the Airthings MQTT broker, such as 500 for the co2 or 70 for the humidity.
- %bac:local/analogInput/{instance}/units: the BACnet units of the sampled value, such as partsPerMillion for the co2 and percentRelativeHumidity for the humidity.
Instead of writing each property address separately each time a new sampled value is received, the route writes a single JSON object containing the objectName, description, presentValue, and units to the parent BACnet analogInput object address. Below is an example of the JSON object written by the route on the BACnet analogInput address for a co2 sample.
%bac:local/analogInput:1
{
"presentValue": 500,
"units": "partsPerMillion",
"objectName": "2950010267_co2",
"description": "CO2 from Airthings device 2950010267"
}
Note that the objectName, description, and units are only used once, at the creation of the BACnet object, and are ignored on subsequent updates (because their values stay the same). Only the presentValue property is updated regularly with new values.
The end goal of the routing is to retrieve all the sensor data and to map each value (co2, humidity, light, ...) to its own BACnet analog input object. As BACnet uses the object instance number to address each object, it is necessary to create a mapping between each Airthings sensor MQTT topic and the BACnet object instances. The mapping to achieve is described below. Note that for each Airthings meter, up to 20 BACnet object instances are reserved (one instance for co2, one instance for humidity, and so on). The starting BACnet object instance of each meter is a simple counter initialized at 0 and incremented by 20 for each additional Airthings device.
The figure below shows an example of this n → 20 n mapping between sources and destinations addresses.
The order of the mapping is arbitrary. The mapping is computed once at the routing initialization. To keep it consistent after system reboots and backups/restores, the route saves it in a file named {DATADIR}/safeFs/airthings_mqtt_bacnet_{route UUID}. The file is a JSON dictionary having the names of the MQTT addresses as keys and their corresponding starting BACnet object instance as values. Note that all the BACnet object instances in the mapping file are multiples of 20.
{
"%mqtt:{aid}/{lid}/devices/2950010267/samples" : 0,
"%mqtt:{aid}/{lid}/devices/2950010268/samples" : 20,
"%mqtt:{aid}/{lid}/devices/2950011111/samples" : 40,
//... and so on ...
"%mqtt:{aid}/{lid}/devices/2950023456/samples" : 20000
}
The table below lists the BACnet objectName, description, and units properties and defines the mapping between the BACnet object instances (modulo 20) and the air quality measurements (co2, humidity, ..., voc).
Object Instance | Object Name | Object Description | Object Units |
---|---|---|---|
Analog Input 01 | {serialNumber}_co2 | CO2 from Airthings device | Parts per million |
Analog Input 02 | {serialNumber}_humidity | Relative humidity from Airthings device | Percent relative humidity |
Analog Input 03 | {serialNumber}_light | Light from Airthings device | Percent |
Analog Input 04 | {serialNumber}_pm1 | PM1 from Airthings device | Micrograms per cubic meter |
Analog Input 05 | {serialNumber}_pm10 | PM10 from Airthings device | Micrograms per cubic meter |
Analog Input 06 | {serialNumber}_pm25 | PM2.5 from Airthings device | Micrograms per cubic meter |
Analog Input 07 | {serialNumber}_pressure | Pressure from Airthings device | Pascals |
Analog Input 08 | {serialNumber}_pressureDifference | Differential pressure from Airthings device | Pascals |
Analog Input 09 | {serialNumber}_radonShortTermAvg | Radon (rolling 24h average) from Airthings device | Becquerels |
Analog Input 10 | {serialNumber}_soundLevel | Sound level from Airthings device | Decibels A |
Analog Input 11 | {serialNumber}_temp | Temperature from Airthings device | Degrees celsius |
Analog Input 12 | {serialNumber}_voc | VOC from Airthings device | Parts per billion |
Analog Input 13..20 | N/A | N/A | N/A |
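The instance arithmetic implied by the mapping file and the table above can be sketched as follows. The names MEASUREMENTS, instanceFor and decodeInstance are hypothetical and only serve the illustration. The order of the array is assumed to follow the table rows 01 to 12.

```javascript
// Assumption: 20 instances are reserved per meter, and the measurement
// order follows the table (Analog Input 01 = co2, ..., 12 = voc).
const SLOTS_PER_DEVICE = 20;
const MEASUREMENTS = [
  "co2", "humidity", "light", "pm1", "pm10", "pm25", "pressure",
  "pressureDifference", "radonShortTermAvg", "soundLevel", "temp", "voc"
];

// Starting instance of the meter (a multiple of 20) + measurement -> instance.
function instanceFor(base, key) {
  const index = MEASUREMENTS.indexOf(key);
  if (index < 0) throw new Error("unknown measurement: " + key);
  return base + index + 1;
}

// Instance -> starting instance and measurement (undefined for reserved slots).
function decodeInstance(instance) {
  const slot = instance % SLOTS_PER_DEVICE;
  return { base: instance - slot, key: MEASUREMENTS[slot - 1] };
}

console.log(instanceFor(20, "voc"));  // prints 32
console.log(decodeInstance(21).key);  // prints "co2"
```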
The mapping described in the section above could be done by creating multiple routes (one or several routes per MQTT topic), but this approach is tedious as it requires the manual configuration and insertion of all the routes.
The single route approach, on the other hand, makes the configuration much easier. This route uses advanced functions to determine the source and destination addresses (section 3.2 and section 3.3). The mapping JSON file described in the previous section is computed and stored by the route itself at its initialization (see section 3.3).
Once the mapping is done, whenever a new JSON object is received from MQTT, its value is transformed by a function to update the presentValue, objectName, description, and units properties of the corresponding BACnet object (section 3.4).
The table below summarizes all the route parameters.
parameter | value | description |
---|---|---|
Id | 2b00a098-3a90-415b-8b80-641c899d3f7f | The id is automatically generated at the route creation. |
Name | Airthings to BACnet | The name of the route is only an informative text label. |
Source gateway | Airthings MQTT Broker | The name of the MQTT gateway. |
Source address | function of section 3.2 | The filter function selecting the MQTT addresses (one MQTT topic per Airthings meter). It is called once for each MQTT address at the route initialization. |
Source value | function of section 3.4 | Transform function called when a new value (JSON object with the samples of an Airthings meter) is received from the MQTT broker. |
Direction | right (→) | Defines that the route direction is from MQTT to BACnet (unidirectional left to right arrow). |
Destination gateway | Airthings BACnet Server | The name of the BACnet gateway. |
Destination address | function of section 3.3 | The function selecting the BACnet destination addresses. It transforms the MQTT source address into the desired BACnet object addresses. It is called once, during the route initialization, for each MQTT address that passes the Source address filter function. |
Destination value | undefined | This function is not used as the routing direction is strictly from MQTT to BACnet. It could be used, if the routing was bidirectional, to transform the BACnet values before sending them over MQTT. |
Active | 1 | Defines if the route is active or not. |
This section describes the function used for the route Source address parameter.
In routes, the Source address parameter can be either the address id (numerical database id), the address name (fixed string), a regular expression, or a JavaScript function. For this use case, a function that keeps the MQTT addresses ending with /samples is enough to select all Airthings meter addresses. That function is called for every MQTT address and returns a boolean (true or false) telling whether the address is selected.
The following arrow function expression implements this filter:
srcadrname => srcadrname.endsWith("/samples")
It selects the meter addresses, for example:
%mqtt:{aid}/{lid}/devices/2950010267/samples
%mqtt:{aid}/{lid}/devices/2950010268/samples
%mqtt:{aid}/{lid}/devices/2950011111/samples
...
%mqtt:{aid}/{lid}/devices/2950023456/samples
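The effect of the filter can be checked outside the gateway with a plain array. This is only a sketch: how the gateway iterates over its addresses is not shown in this article, so Array.prototype.filter is used here as a stand-in, and the address list is a made-up sample.

```javascript
// The filter function from the article.
const sourceAddressFilter = srcadrname => srcadrname.endsWith("/samples");

// Sample address names (assumption): the wildcard subscription address
// and two meter addresses inserted automatically by the MQTT client.
const addresses = [
  "%mqtt:{aid}/{lid}/#",
  "%mqtt:{aid}/{lid}/devices/2950010267/samples",
  "%mqtt:{aid}/{lid}/devices/2950010268/samples"
];

// Stand-in for the gateway calling the filter once per address.
const selected = addresses.filter(sourceAddressFilter);
console.log(selected.length); // prints 2
```

The wildcard subscription address itself does not end with /samples, so it is not routed.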
This section describes the function used for the route Destination address parameter.
Since all the MQTT source addresses are selected by the filter function defined above (section 3.2), the next step is to define their corresponding destination address(es). In this use case, each MQTT address is mapped to multiple BACnet addresses (one BACnet object for each meter value such as co2, humidity, etc.).
The destination address function takes one of the source addresses (MQTT address) as parameter and returns the list of the corresponding BACnet object address names (1 to n mapping). This function is called once for each MQTT source address. The snippet below is the header of the function with its parameters (the source address name, the source address id, the source address object, and the info object containing the route context and extra information).
function (srcname,srcid,srcadr,info) {
/* function code */
}
This function needs to return the destination address(es). In case of a 1→1 mapping, the Destination address function returns a single destination address string. However here a 1→n mapping is needed, so the function must return an array of strings.
Find below an example of the expected return value given a source address passed as argument :
source address argument : %mqtt:{aid}/{lid}/devices/2950010268/samples
returned destination addresses :
[ "%bac:local/analogInput:21",
"%bac:local/analogInput:22",
"%bac:local/analogInput:23",
"%bac:local/analogInput:24",
"%bac:local/analogInput:25",
"%bac:local/analogInput:26",
"%bac:local/analogInput:27",
"%bac:local/analogInput:28",
"%bac:local/analogInput:29",
"%bac:local/analogInput:30",
"%bac:local/analogInput:31",
"%bac:local/analogInput:32" ]
// Note that points 33-40 are not assigned and kept as reserve for future use.
The following code snippet is the complete Destination address function producing the result described above. In the subsequent sections (section 3.3.1, section 3.3.2, section 3.3.3) the code is split into multiple parts for detailed explanations.
function (srcname,srcid,srcadr,info){
  var self = this
  var context = info.context
  var mappingfilename = "airthings_mqtt_bacnet_"+info.route.id
  // first execution: load the stored mapping (if any) into the route cache
  if(context.counter === undefined){
    context.map = {}
    let storedObj = {}
    try{
      storedObj = JSON.parse(self._fs.readFileSync(mappingfilename)+"")
    }catch(e){
      // the mapping file does not exist yet
    }
    var max = -20;
    for(let k of Object.keys(storedObj)){
      context.map[k] = storedObj[k]
      if(storedObj[k] > max){
        max = storedObj[k]
      }
    }
    context.counter = max;
  }
  // allocate a new block of 20 instances for an unknown source address
  var bacnetInstance = context.map[srcname]
  if(bacnetInstance === undefined){
    context.counter += 20
    context.map[srcname] = context.counter
    bacnetInstance = context.counter
    clearTimeout(context.timeout)
    context.timeout = setTimeout(function(){
      self._fs.writeFileSync(mappingfilename, JSON.stringify(context.map))
    },1000)
  }
  // return the 12 destination addresses of this meter
  var destAddresses = []
  for(var i=1; i<=12; i++ ){
    destAddresses.push("%bac:local/analogInput:"+(bacnetInstance+i))
  }
  return destAddresses
}
To map the MQTT samples to the BACnet object instances, the function builds the JSON mapping file described in section 3. It keeps in info.context (an internal cache of the route) a counter (info.context.counter) containing the last BACnet instance mapped to an MQTT samples address. At the first execution of the function, when info.context.counter is not yet defined, the function reads the mapping file (if it already exists) and keeps its content in the cache variable info.context.map. It also sets info.context.counter to the highest (max) BACnet instance stored in the file, or to -20 when nothing is stored, so that the first allocated instance is 0.
Note that the this variable contains the gateway API library object. This library provides multiple functions, including this._fs, which is similar to the Node.js file system module but operates only inside the {DATADIR}/safeFs subfolder. The Destination address function therefore uses this._fs.readFileSync to read the mapping file. The code snippet below reads the mapping file and initializes the mapping at the first function execution.
/* code before */
var self = this // contains the gateway library
var context = info.context // info is the 4th argument of the function
var mappingfilename = "airthings_mqtt_bacnet_"+info.route.id
if(context.counter === undefined){
  context.map = {}
  let storedObj = {} // initialized to an empty mapping.
  try{
    // reads the file, and parses the JSON mapping.
    storedObj = JSON.parse(self._fs.readFileSync(mappingfilename)+"")
  }catch(e){
    // the mapping file does not exist yet.
  }
  // stores the mapping in info.context.map
  var max = -20;
  for(let k of Object.keys(storedObj)){
    context.map[k] = storedObj[k]
    if(storedObj[k] > max){
      max = storedObj[k]
    }
  }
  // initializes info.context.counter to the last used bacnet instance.
  context.counter = max;
}
/* code after */
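The loading step can be tried outside the gateway with an in-memory stand-in for this._fs. This is a sketch under assumptions: createMemoryFs and loadMapping are hypothetical names, and the stand-in only imitates the two calls used in this article (readFileSync and writeFileSync), not the real gateway library.

```javascript
// Hypothetical in-memory replacement for this._fs (assumption).
function createMemoryFs(files) {
  return {
    readFileSync(name) {
      if (!(name in files)) throw new Error("ENOENT: " + name);
      return Buffer.from(files[name]);
    },
    writeFileSync(name, data) { files[name] = String(data); }
  };
}

// Same logic as the snippet above, returning the cache instead of
// storing it in info.context.
function loadMapping(fsLike, filename) {
  let stored = {};
  try {
    stored = JSON.parse(fsLike.readFileSync(filename) + "");
  } catch (e) {
    // missing or unreadable file: start with an empty mapping
  }
  let max = -20;
  for (const k of Object.keys(stored)) {
    if (stored[k] > max) max = stored[k];
  }
  return { map: { ...stored }, counter: max };
}

const empty = loadMapping(createMemoryFs({}), "mapping");
const restored = loadMapping(
  createMemoryFs({ mapping: '{"a/samples":0,"b/samples":20}' }),
  "mapping"
);
console.log(empty.counter, restored.counter); // prints -20 20
```

Starting the counter at -20 means that the first increment by 20 yields instance 0 for the first meter.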
Once the mapping is loaded from the file (or left empty if the file does not exist yet) and kept in cache, new mapping entries must be added for the MQTT addresses that are not yet mapped to a BACnet object instance. To do so, the code checks whether info.context.map contains the source address name as a key. If it does not, info.context.counter is increased by 20 and its new value is stored in the mapping cache info.context.map under that key. To make the mapping persistent, the latest version of the mapping is written to the mapping file with the JSON.stringify and this._fs.writeFileSync functions. To avoid writing the mapping file multiple times (once for each source address) during the first route initialization, the write is delayed by a timeout (info.context.timeout) of 1000 ms (1 second), which is restarted each time a new entry is added. If no change is made to the info.context.map cache for 1000 ms, the cache is written to the mapping file on disk. Note that var self = this is declared and self._fs.writeFileSync(...) is used because the anonymous function called after the 1000 ms timeout has a different this variable, which does not contain this._fs.
/* code before */
var bacnetInstance = context.map[srcname]
if(bacnetInstance === undefined){ // the mapping does not contain srcname
  context.counter += 20 // increments the counter by 20.
  context.map[srcname] = context.counter
  bacnetInstance = context.counter // sets the bacnet instance
  clearTimeout(context.timeout) // clears the last timeout if it exists
  context.timeout = setTimeout(function(){
    self._fs.writeFileSync(mappingfilename, JSON.stringify(context.map))
  },1000) // waits 1000 ms before writing the mapping to the file.
}
/* code after */
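The allocation and the delayed write can be observed in isolation with the sketch below. The names allocateInstance and persist are hypothetical. The persist callback stands in for the this._fs.writeFileSync call, and the delay is shortened to 50 ms for the example.

```javascript
// Hypothetical helper (assumption): same allocation logic as above, with
// the file write replaced by an injected persist callback.
function allocateInstance(context, srcname, persist, delayMs) {
  let instance = context.map[srcname];
  if (instance === undefined) {
    context.counter += 20;            // next block of 20 instances
    instance = context.counter;
    context.map[srcname] = instance;
    clearTimeout(context.timeout);    // restart the delay on every new entry
    context.timeout = setTimeout(
      () => persist(JSON.stringify(context.map)),
      delayMs
    );
  }
  return instance;
}

const context = { map: {}, counter: -20 };
const writes = [];
const persist = json => writes.push(json);

const first = allocateInstance(context, "dev1/samples", persist, 50);
const second = allocateInstance(context, "dev2/samples", persist, 50);
const again = allocateInstance(context, "dev1/samples", persist, 50);
console.log(first, second, again); // prints 0 20 0
```

Although two new entries were added, only the second timer survives, so persist is called a single time once the 50 ms have elapsed.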
Once the MQTT source address name is mapped to its corresponding BACnet instance, the last step is to return the array containing the address names (strings) of all the BACnet analog input objects. The table of section 3 shows that the instances (modulo 20) from 13 to 20 are not assigned, so only the instances from 1 to 12 need to be returned. A simple for loop is enough to build the destAddresses return array.
/* code before */
var destAddresses = [] // initializes the array (empty array)
for(var i=1; i<=12; i++ ){
  // adds to the array the bacnet instances from the start bacnet instance + i
  destAddresses.push("%bac:local/analogInput:"+(bacnetInstance+i))
}
return destAddresses // returns the array and the function ends.
This section describes the function used for the route Source value parameter. When new samples are received over MQTT from a meter (address pattern %mqtt:{aid}/{lid}/devices/{serialNumber}/samples), the source value transform function is called once for each of its destination BACnet objects, i.e. 12 times (once for each measurement: co2, humidity, ..., and voc). The first argument of the function is the routed MQTT JSON value containing the new samples of the meter, the second argument is the source MQTT address object, and the third argument is the BACnet destination address object (unless the destination address does not exist yet, in which case destination is the address name string). The returned value written on the BACnet analog input object contains the new presentValue, but also the objectName, description, and units. The tricky part is to take the BACnet object instance of the destination address and to compute it modulo 20 (the remainder of the integer division by 20) to get the correct property index (ranging from 1 to 12 here).
The figure below shows an example of the function arguments and the returned value.
The code snippet below is the complete Source value function accomplishing the desired result.
function valueTransform (value, source, destination, info) {
  // BACnet units of each measurement
  var units = {
    "co2": "partsPerMillion",
    "humidity": "percentRelativeHumidity",
    "light": "percent",
    "pm1": "microgramsPerCubicMeter",
    "pm10": "microgramsPerCubicMeter",
    "pm25": "microgramsPerCubicMeter",
    "pressure": "pascals",
    "pressureDifference": "pascals",
    "radonShortTermAvg": "becquerels",
    "soundLevel": "decibelsA",
    "temp": "degreesCelsius",
    "voc": "partsPerBillion"
  };
  // Mapping table for renaming the keys
  var keyNameMapping = {
    "co2": "CO2",
    "humidity": "Relative humidity",
    "light": "Light",
    "pm1": "PM1",
    "pm25": "PM2.5",
    "pm10": "PM10",
    "pressure": "Pressure",
    "pressureDifference": "Differential pressure",
    "radonShortTermAvg": "Radon (rolling 24h average)",
    "soundLevel": "Sound level",
    "temp": "Temperature",
    "voc": "VOC"
  };
  // Mapping between the bacnet instance (modulo 20) and the measurement
  var keyInstanceMapping = {
    1 : "co2",
    2 : "humidity",
    3 : "light",
    4 : "pm1",
    5 : "pm10",
    6 : "pm25",
    7 : "pressure",
    8 : "pressureDifference",
    9 : "radonShortTermAvg",
    10 : "soundLevel",
    11 : "temp",
    12 : "voc"
  };
  // destination is a string when the bacnet address does not exist yet
  var destadrname = typeof destination == 'string' ? destination : destination.name;
  var destInstance = (destadrname.split(":")[2] * 1) % 20;
  var key = keyInstanceMapping[destInstance]
  if (key) {
    let v = value[key];
    // the meter does not provide this measurement: nothing is written
    if (v === null || v === undefined) return;
    let n = value.serialNumber + "_" + key;
    let d = (keyNameMapping[key] || key) + " from Airthings device " + value.serialNumber;
    return {
      "presentValue": v,
      "units": units[key],
      "objectName": n,
      "description": d
    };
  }
}
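The modulo step used by the function above can be isolated in a small helper. This is a sketch: slotOf is a hypothetical name, and the destination object is assumed to expose only the name property used in the function above.

```javascript
// Hypothetical helper (assumption): extracts the measurement slot (1 to 12
// for assigned instances) from a destination given either as an address
// name string or as an address object with a name property.
function slotOf(destination) {
  const name = typeof destination === "string" ? destination : destination.name;
  // the instance number is the text after the last ":" of the address name
  const instance = Number(name.slice(name.lastIndexOf(":") + 1));
  return instance % 20;
}

console.log(slotOf("%bac:local/analogInput:21"));           // prints 1
console.log(slotOf({ name: "%bac:local/analogInput:32" })); // prints 12
```

Slot 1 corresponds to co2 and slot 12 to voc, whatever the meter, because every meter starts at a multiple of 20.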
For reference please find below the complete route definition.
{
  "id":"2b00a098-3a90-415b-8b80-641c899d3f7f",
  "name":"Airthings to BACnet",
  "type":"value",
  "direction":"right",
  "disableDelay":"",
  "repeat":0,
  "minimumDiff":0,
  "buffer":0,
  "active":true,
  "source":{
    "gateway" : "Airthings MQTT Broker",
    "address" : srcadrname => srcadrname.endsWith("/samples"),
    "value" : function valueTransform (value, source, destination, info) {
      var units = {
        "co2": "partsPerMillion",
        "humidity": "percentRelativeHumidity",
        "light": "percent",
        "pm1": "microgramsPerCubicMeter",
        "pm10": "microgramsPerCubicMeter",
        "pm25": "microgramsPerCubicMeter",
        "pressure": "pascals",
        "pressureDifference": "pascals",
        "radonShortTermAvg": "becquerels",
        "soundLevel": "decibelsA",
        "temp": "degreesCelsius",
        "voc": "partsPerBillion"
      };
      // Mapping table for renaming the keys
      var keyNameMapping = {
        "co2": "CO2",
        "humidity": "Relative humidity",
        "light": "Light",
        "pm1": "PM1",
        "pm25": "PM2.5",
        "pm10": "PM10",
        "pressure": "Pressure",
        "pressureDifference": "Differential pressure",
        "radonShortTermAvg": "Radon (rolling 24h average)",
        "soundLevel": "Sound level",
        "temp": "Temperature",
        "voc": "VOC"
      };
      var keyInstanceMapping = {
        1 : "co2",
        2 : "humidity",
        3 : "light",
        4 : "pm1",
        5 : "pm10",
        6 : "pm25",
        7 : "pressure",
        8 : "pressureDifference",
        9 : "radonShortTermAvg",
        10 : "soundLevel",
        11 : "temp",
        12 : "voc"
      };
      var destadrname = typeof destination == 'string' ? destination : destination.name;
      var destInstance = (destadrname.split(":")[2] * 1) % 20;
      var key = keyInstanceMapping[destInstance]
      if (key) {
        let v = value[key];
        if (v === null || v === undefined) return;
        let n = value.serialNumber + "_" + key;
        let d = (keyNameMapping[key] || key) + " from Airthings device " + value.serialNumber;
        return {
          "presentValue": v,
          "units": units[key],
          "objectName": n,
          "description": d
        };
      }
    }
  },
  "destination" : {
    "gateway" : "Airthings BACnet Server",
    "address" : function (srcname,srcid,srcadr,info){
      var self = this
      var context = info.context
      var mappingfilename = "airthings_mqtt_bacnet_"+info.route.id
      if(context.counter === undefined){
        context.map = {}
        let storedObj = {}
        try{
          storedObj = JSON.parse(self._fs.readFileSync(mappingfilename)+"")
        }catch(e){
        }
        var max = -20;
        for(let k of Object.keys(storedObj)){
          context.map[k] = storedObj[k]
          if(storedObj[k] > max){
            max = storedObj[k]
          }
        }
        context.counter = max;
      }
      var bacnetInstance = context.map[srcname]
      if(bacnetInstance === undefined){
        context.counter += 20
        context.map[srcname] = context.counter
        bacnetInstance = context.counter
        clearTimeout(context.timeout)
        context.timeout = setTimeout(function(){
          self._fs.writeFileSync(mappingfilename, JSON.stringify(context.map))
        },1000)
      }
      var destAddresses = []
      for(var i=1; i<=12; i++ ){
        destAddresses.push("%bac:local/analogInput:"+(bacnetInstance+i))
      }
      return destAddresses
    }
  }
}