
Track IoT Devices with MQTT and Elastic Stack

IoT Devices (Internet of Things Devices)

IoT devices are nonstandard computing devices that connect wirelessly to a network and have the ability to transmit data, such as the many devices on the internet of things (IoT).

IoT involves extending internet connectivity beyond standard devices, such as desktops, laptops, smartphones, and tablets, to a range of traditionally "dumb" or non-internet-enabled physical devices and everyday objects. Embedded with technology, these devices can communicate and interact over the internet. They can also be remotely monitored and controlled.

MQTT: The Standard for IoT Messaging

MQTT is an OASIS standard messaging protocol for the Internet of Things (IoT). It is designed as an extremely lightweight publish/subscribe messaging transport, ideal for connecting remote devices with a small code footprint and minimal network bandwidth. MQTT today is used in a wide variety of industries, such as automotive, manufacturing, telecommunications, and oil and gas.

Why MQTT?

1. Lightweight and Efficient

MQTT clients are very small and require minimal resources, so they can be used on small microcontrollers. MQTT message headers are small to optimize network bandwidth.

2. Bi-directional Communications

MQTT allows for messaging both from device to cloud and from cloud to device. This makes it easy to broadcast messages to groups of things.

3. Scale to Millions of Things

MQTT can scale to connect with millions of IoT devices.

4. Reliable Message Delivery

Reliable message delivery is important for many IoT use cases. This is why MQTT defines three quality of service (QoS) levels: 0 (at most once), 1 (at least once), and 2 (exactly once); a short publish sketch after this list illustrates them.

5. Support for Unreliable Networks

Many IoT devices connect over unreliable cellular networks. MQTT’s support for persistent sessions reduces the time to reconnect the client with the broker.

6. Security Enabled

MQTT makes it easy to encrypt messages using TLS and authenticate clients using modern authentication protocols, such as OAuth.
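
As a quick illustration of the three QoS levels, here is a minimal publish sketch using paho-mqtt; the broker address and topic name are placeholder assumptions:

# A minimal QoS sketch (pip install paho-mqtt); broker and topic are assumptions.
import paho.mqtt.client as mqtt

client = mqtt.Client()
client.connect("127.0.0.1", 1883, keepalive=45)
client.loop_start()  # run the network loop in a background thread

# publish() returns an MQTTMessageInfo; wait_for_publish() blocks until
# the QoS handshake for that message has completed.
for qos, note in [(0, "at most once"), (1, "at least once"), (2, "exactly once")]:
    info = client.publish("sensor-data", f"delivered {note}", qos=qos)
    info.wait_for_publish()

client.loop_stop()
client.disconnect()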

MQTT Publish / Subscribe Architecture
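
In this model, clients never exchange messages directly: publishers send messages to named topics on a central broker, and the broker delivers each message to every client subscribed to that topic. Publishers and subscribers are therefore fully decoupled and do not need to know each other's addresses or whether the other side is currently online.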

Simulation & Case Study

In this article we will cover the following topics:

  1. An example of the data sent by an IoT device to an MQTT broker
  2. ActiveMQ as an MQTT broker, and how to spin it up from a Docker image
  3. How sensors (IoT devices) send data to ActiveMQ (simulated with a Python MQTT client)
  4. How Filebeat can be used to consume MQTT messages and ship them to an Elasticsearch instance
  5. How the data is processed and stored in Elasticsearch
  6. How sensor (IoT device) data can be visualized with Kibana and Elastic Maps

Sensor Data

Following are examples of the JSON payloads sent by a sensor that periodically reports temperature and humidity readings for the position where the device is installed (the device has an embedded GPS tracker). IoT device data are a natural fit for time-series metrics and can be stored in a datastore like Elasticsearch.

Any Raspberry Pi device can be programmed in Python using paho.mqtt.python (for more information about software that implements MQTT, check this link) to send data to an MQTT broker; a minimal publishing sketch follows the payload example below.

[
  {
    "@timestamp": "2020-10-20T10:00:00.000",
    "metric": "temperature",
    "value": 23.1,
    "asset_id": 225,
    "latitude": 10,
    "longitude": -7
  },
  {
    "@timestamp": "2020-10-20T10:00:00.000",
    "metric": "humidity",
    "value": 34.65,
    "asset_id": 225,
    "latitude": 10,
    "longitude": -7
  }
]
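
As a sketch of how a device might produce these payloads, the following assumes a broker on 127.0.0.1:1883, a topic named sensor-data, and a read_sensor() helper that is purely hypothetical (standing in for the real GPIO/GPS reads):

# Hypothetical device-side publisher (pip install paho-mqtt).
import json
import time
import paho.mqtt.client as mqtt

def read_sensor():
    # Placeholder for the actual sensor/GPS reads on the device.
    return {"temperature": 23.1, "humidity": 34.65, "lat": 10, "lon": -7}

client = mqtt.Client(client_id="asset-225")
client.connect("127.0.0.1", 1883, keepalive=45)
client.loop_start()

reading = read_sensor()
timestamp = time.strftime("%Y-%m-%dT%H:%M:%S.000", time.gmtime())
for metric in ("temperature", "humidity"):
    payload = {
        "@timestamp": timestamp,
        "metric": metric,
        "value": reading[metric],
        "asset_id": 225,
        "latitude": reading["lat"],
        "longitude": reading["lon"],
    }
    # One message per metric, mirroring the payloads above.
    client.publish("sensor-data", json.dumps(payload), qos=1).wait_for_publish()

client.loop_stop()
client.disconnect()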

ActiveMQ as an MQTT broker

ActiveMQ supports the MQTT protocol, automatically maps between JMS/NMS and MQTT clients, and can easily be set up using the scripts that ship with it. On Linux it's just a matter of executing activemq console. Using the admin console at http://127.0.0.1:8161/admin/ you can create new queues or topics and even enqueue messages for testing.

For this article we will use a Docker image to quickly spin up an ActiveMQ instance. Use the following docker-compose.yml for your testing:

version: '3.3'
services:
  activemq:
    image: synapticiel/activemq:5.16.0
    container_name: activemq-5.16.0
    # Published port mappings for each protocol (these would be ignored
    # under network_mode: "host", so host networking is not used here).
    ports:
      # mqtt
      - "1883:1883"
      # amqp
      - "5672:5672"
      # ui
      - "8161:8161"
      # stomp
      - "61613:61613"
      # ws
      - "61614:61614"
      # jms/openwire
      - "61616:61616"
    volumes:
      - "/opt/active-mq/data:/opt/apache-activemq/data"
      - "/opt/active-mq/conf:/opt/apache-activemq/conf"
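
With this file in place, docker-compose up -d starts the broker, and the admin console should then be reachable at http://127.0.0.1:8161/admin/ (the stock ActiveMQ distribution ships with admin/admin as default console credentials; this particular image may differ).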

Send sensor data to the MQTT broker

For this part, we will use a Python script that uses paho.mqtt.python to send some data to ActiveMQ as a simulation. Most of the data will be pulled from the dataset “Vehicle GPS Data: Department of Public Services” provided by the City of Cincinnati.

## https://pypi.org/project/paho-mqtt/
import paho.mqtt.client as mqtt
import json
import requests
# Define Variables
MQTT_HOST = "127.0.0.1"
MQTT_PORT = 1883
MQTT_KEEPALIVE_INTERVAL = 45
MQTT_TOPIC = "sensor-data"

# Get a sample dataset from a Rest API
response = requests.get("https://data.cincinnati-oh.gov/resource/b56d-ydmm.json?$limit=10&$offset=0&$order=time")
MQTT_DATA = response.json()

# Define on_publish event function
def on_publish(client, userdata, mid):
    print ("Message published with mid", mid)

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected to MQTT Broker with result code :: ", str(rc))
    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    # client.subscribe(MQTT_TOPIC)
    for MQTT_MSG in MQTT_DATA:
        client.publish(MQTT_TOPIC, json.dumps(MQTT_MSG))

# The callback for when a PUBLISH message is received from the server
# (only fires if the client subscribes to a topic).
def on_message(client, userdata, msg):
    print(msg.topic)
    print(msg.payload)                 # raw payload bytes
    payload = json.loads(msg.payload)  # parse the JSON string into a dict
    print(payload['asset'])            # inspect a single field
    client.disconnect()                # got the message, disconnect

# Initiate MQTT Client
mqttc = mqtt.Client()

# Register publish callback function
mqttc.on_publish = on_publish
mqttc.on_connect = on_connect
mqttc.on_message = on_message

# Connect with MQTT Broker
mqttc.connect(MQTT_HOST, MQTT_PORT, MQTT_KEEPALIVE_INTERVAL)

# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
# Loop forever
mqttc.loop_forever()
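
After the script runs, the sensor-data topic should appear in the ActiveMQ admin console with the ten records pulled from the API ($limit=10) enqueued as messages. Note that since the subscribe call is commented out, on_message never fires and loop_forever() keeps running; stop the script with Ctrl+C once the messages are published.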

Filebeat to bring sensor data into Elasticsearch

The following config (filebeat.yml) is helpful for streaming the sensor data from the MQTT topic into Elasticsearch.

filebeat.inputs:
  - type: mqtt
    hosts:
      - 'tcp://127.0.0.1:1883'
    topics:
      - sensor-data
    qos: 2
    client_id: filebeat_mqtt
    username: system
    password: manager
    tags:
      - mqtt
      - iot
      - filebeat
      - broker
      - gps
    enabled: true
    fields_under_root: true
    fields:
      event.dataset: sensor-data
      event.module: filebeat-mqtt
      event.outcome: iot-sensors
      event.category: sensor
    processors:
      - decode_json_fields:
          fields:
            - message
          process_array: false
          max_depth: 1
          target: ''
          overwrite_keys: false
          add_error_key: true
      - add_host_metadata:
          when.not.contains.tags: forwarded
      - add_cloud_metadata: null
    keep_null: false
    index: 'sensor-data-%{+yyyy-MM-dd}'
    pipeline: sensor-data-geo
setup.template.settings:
  index.number_of_shards: 1
  index.number_of_replicas: 0
  index.codec: best_compression
output.elasticsearch:
  hosts:
    - '127.0.0.1:9200'
  protocol: http
  username: elastic
  password: changeme
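
Starting Filebeat with this configuration (for example, filebeat -e -c filebeat.yml) subscribes it to the sensor-data topic. Each MQTT message arrives in the message field, which the decode_json_fields processor expands into top-level fields before the event is routed to the sensor-data-geo ingest pipeline and the daily sensor-data-* index.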

The data contains the following fields (check the API):

                   "asset" => "440",
                    "time" => "2020-10-18T00:29:29.000",
                "latitude" => "39.103951",
               "longitude" => "-84.527099",
              "streetfrom" => "WESTERN AV TO I-75 SB EXWY RAMP",
              "ppolylabel" => "I-75 SB EXWY",
              "assetnhood" => "WEST END",
      "id_ham_pvmnt_plygn" => "S-HAM52730",
       "distance_traveled" => "2.6",
            "reasons_text" => "Standard Event",
	        "odometer" => "7867.3",
                 "heading" => "SEE",
                   "speed" => "45.2",
                 "reasons" => "11",
                  "loadts" => "2020-10-18T00:30:26.000",
                "streetto" => "FORT WASHINGTON EB WY"

To take advantage of the power of EQL, each document must contain the @timestamp and event.category fields; a sample query is sketched below.
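
As a sketch, a minimal EQL query over these indexes might look like the following (event.category is set to sensor by the Filebeat config above, and asset 440 comes from the sample record):

GET /sensor-data-*/_eql/search
{
  "query": """
    sensor where asset == "440" and speed != null
  """
}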

The following ingest pipeline will be used to map latitude and longitude into a geo_point field:

PUT _ingest/pipeline/sensor-data-geo
{
  "description": "Create geo_point field form latitude and latitude",
  "processors": [
    {
      "set": {
        "field": "device_position",
        "value": "{{latitude}},{{longitude}}"
      }
    },
    {
      "rename": {
        "field": "@timestamp",
        "target_field": "processing_date"
      }
    },
    {
      "rename": {
        "field": "time",
        "target_field": "@timestamp"
      }
    },
    {
      "set": {
        "field": "ingest_time",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
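
You can dry-run the pipeline against a sample document with the _simulate API before wiring it into Filebeat (the field values below are taken from the sample record):

POST _ingest/pipeline/sensor-data-geo/_simulate
{
  "docs": [
    {
      "_source": {
        "@timestamp": "2020-10-18T00:30:26.000",
        "time": "2020-10-18T00:29:29.000",
        "latitude": "39.103951",
        "longitude": "-84.527099",
        "asset": "440"
      }
    }
  ]
}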

The following index template (settings and mappings) has been used to store the data in time-series indexes in Elasticsearch:

PUT /_index_template/sensor-data
{
  "index_patterns": ["sensor-data-*"],
  "version": 1,
  "priority": 1,
  "template": {
    "settings": {
      "number_of_shards": 1,
      "number_of_replicas": 0,
      "index.codec": "best_compression",
      "refresh_interval": "5s"
    },
    "mappings": {
      "dynamic_templates": [
        {
          "string_fields": {
            "mapping": {
              "norms": false,
              "type": "keyword"
            },
            "match_mapping_type": "string",
            "match": "*"
          }
        }
      ],
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "device_position": {
          "type": "geo_point"
        },
        "message": {
          "type": "text"
        }
      }
    }
  }
}

So let’s recap all the steps:

  1. Run your components (ActiveMQ, Elasticsearch, Kibana)
  2. Run the Python script to stream data from the REST API into the MQTT broker (sensor-data topic)
  3. Configure the Elasticsearch ingest pipeline and the index template
  4. Run Filebeat to stream data from the MQTT broker into Elasticsearch

Now let’s see what we can do with the data inside Elasticsearch; for that, we will rely on the Kibana UI. If everything is running smoothly, you should see a series of indexes in Elasticsearch matching the pattern sensor-data-*.
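
You can confirm this from Kibana Dev Tools with a quick _cat call:

GET _cat/indices/sensor-data-*?v&s=index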


You can easily configure the Logs app (part of the Elastic Observability solution) to stream data from the set of indexes sensor-data-*.

Now let’s create an index pattern for the set of indexes sensor-data-*. Go to Kibana >> Stack Management >> Index patterns and create a new index pattern.

Now let’s play a little bit with these data inside Elastic Maps. Go to Kibana >> Maps, create a new map, then add a new layer.

Select the layer type “Documents” and choose your index pattern sensor-data-*, then choose “Show top hits per entity”. Our entity is the ID of the IoT device, which is the field “asset”; we want to display an aggregation of top hits (only 1 hit) per device, which will show us the latest position of each device on the map (check this link for more information about the top hits aggregation in Elastic Maps). Then click “Add layer”. The equivalent query is sketched below.
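
Under the hood, this corresponds roughly to a terms aggregation on asset with a one-document top_hits sub-aggregation sorted by @timestamp, which you can reproduce in Dev Tools (the terms size of 10 is an arbitrary choice):

GET /sensor-data-*/_search
{
  "size": 0,
  "aggs": {
    "per_device": {
      "terms": { "field": "asset", "size": 10 },
      "aggs": {
        "latest_position": {
          "top_hits": {
            "size": 1,
            "sort": [{ "@timestamp": { "order": "desc" } }],
            "_source": ["asset", "device_position", "@timestamp"]
          }
        }
      }
    }
  }
}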

In Amazon AWS, the Elastic Maps Service is restricted to zoom level 10. With Elastic’s default distribution (Basic), which is available from Elastic Cloud and on-prem, the Elastic Maps Service has no restrictions and supports zoom levels up to 24.

All configurations and scripts used in this blog post can be found in our GitHub repository.

Posted on October 21, 2020 by Yassine LASRI