MQTT – A small guide

Message Queuing Telemetry Transport (MQTT) is a publish/subscribe communication protocol that runs over the TCP/IP stack. It is ideal for IoT applications where devices need a small code and battery footprint. The first version was developed at IBM in 1999 to monitor oil pipelines.
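
To get a feel for how small that footprint is on the wire, here is a rough sketch that hand-builds a QoS 0 PUBLISH packet following the MQTT 3.1.1 packet layout. The helper names (encode_remaining_length, encode_publish_qos0) are made up for this illustration and are not part of any library; in practice a client library builds these bytes for you.

```python
def encode_remaining_length(n: int) -> bytes:
    """Encode a length with MQTT's variable-length scheme (7 bits per byte)."""
    out = bytearray()
    while True:
        byte, n = n % 128, n // 128
        if n > 0:
            byte |= 0x80  # continuation bit: more length bytes follow
        out.append(byte)
        if n == 0:
            return bytes(out)


def encode_publish_qos0(topic: str, payload: bytes) -> bytes:
    """Build a QoS 0 PUBLISH packet (no DUP, no RETAIN, no packet identifier)."""
    topic_bytes = topic.encode("utf-8")
    # Variable header: 2-byte big-endian topic length followed by the topic itself
    variable_header = len(topic_bytes).to_bytes(2, "big") + topic_bytes
    body = variable_header + payload
    # Fixed header: packet type 3 (PUBLISH) in the high nibble, flags all zero
    fixed_header = bytes([0x30]) + encode_remaining_length(len(body))
    return fixed_header + body


topic = "/Home/Floor1/Room1/Temperature"
payload = b"75"
packet = encode_publish_qos0(topic, payload)
overhead = len(packet) - len(topic) - len(payload)
print(f"packet: {len(packet)} bytes, protocol overhead: {overhead} bytes")
```

For this message the protocol adds only a handful of bytes on top of the topic name and the payload, which is why short topic names and small payloads matter on constrained devices.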

Clients connect to an MQTT broker, which hosts topics. Clients publish messages to topics, and the broker forwards each message to every client that has subscribed to that topic.
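
The forwarding idea can be sketched without any network at all. The ToyBroker class below is a hypothetical in-memory stand-in, not a real MQTT broker: it only does exact topic matching and calls subscriber callbacks directly, but it shows how publishers and subscribers stay unaware of each other.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List

Handler = Callable[[str, str], None]


class ToyBroker:
    """Hypothetical in-memory stand-in for a broker (exact topic match only)."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, message: str) -> int:
        """Forward the message to every subscriber; return how many received it."""
        handlers = self._subscribers.get(topic, [])
        for handler in handlers:
            handler(topic, message)
        return len(handlers)


received = []
broker = ToyBroker()
topic = "/Home/Floor1/Room1/Temperature"
broker.subscribe(topic, lambda t, m: received.append(("display", m)))
broker.subscribe(topic, lambda t, m: received.append(("logger", m)))

delivered = broker.publish(topic, "75")  # both subscribers get a copy
ignored = broker.publish("/Home/Floor1/Room1/Pressure", "1013")  # nobody subscribed
print(delivered, ignored, received)
```

The publisher never learns who the subscribers are; it only knows the topic name.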

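QoS – Quality of Service

As with Kafka, MQTT supports message delivery QoS levels:

  • 0 – At most once (sent once, never acknowledged or retried)
  • 1 – At least once (resent until acknowledged, so duplicates are possible)
  • 2 – Exactly once (a four-step handshake prevents both loss and duplicates)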
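
The practical difference between the three levels shows up when the link misbehaves. The toy model below is an assumption-laden sketch, not the real protocol: the simulate function and its outcome labels are invented for illustration, and QoS 2 is reduced to "the receiver remembers the packet id" instead of the actual PUBREC / PUBREL / PUBCOMP exchange.

```python
from typing import List, Set


def simulate(qos: int, outcomes: List[str]) -> List[str]:
    """Return the copies of one message that the receiving application sees.

    Each entry in `outcomes` describes one transmission attempt:
      "lost"     - the PUBLISH never reaches the receiver
      "ack_lost" - the PUBLISH arrives but its acknowledgement is lost
      "ok"       - the PUBLISH arrives and is acknowledged
    """
    seen_ids: Set[int] = set()
    delivered: List[str] = []
    packet_id = 1
    for outcome in outcomes:
        if outcome != "lost":
            # QoS 2 (simplified): drop a resend whose packet id was already seen
            if qos < 2 or packet_id not in seen_ids:
                delivered.append("75")
            seen_ids.add(packet_id)
        if qos == 0 or outcome == "ok":
            break  # QoS 0 never retries; QoS 1 and 2 stop once acknowledged
    return delivered


flaky_link = ["lost", "ack_lost", "ok"]
results = {qos: simulate(qos, flaky_link) for qos in (0, 1, 2)}
for qos, copies in results.items():
    print(f"QoS {qos}: application received {len(copies)} copy/copies")
```

On this particular flaky link, QoS 0 loses the reading, QoS 1 delivers it twice, and QoS 2 delivers it once, at the cost of more round trips.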

Hierarchical topics

Since MQTT is oriented towards IoT devices, where we try to send a single element per message instead of big nested JSON documents, MQTT brokers support a hierarchy in topic names rather than relying on the message body. This is done through a topic naming convention on the publisher side and wildcards on the consumer side.

Topic name examples:

/Home/Floor1/Room1/Temperature
/Home/Floor1/Room1/Pressure
/Home/Floor1/Room2/Temperature
/Home/Floor1/Room2/Pressure
/Home/Floor2/Room1/Temperature
/Home/Floor2/Room1/Pressure
/Home/Floor2/Room2/Temperature
/Home/Floor2/Room2/Pressure

The publisher publishes a message to one of these topics from the relevant sensor. The consumer uses two types of wildcards to collect the data:

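(+) This is a single-level wildcard which replaces exactly one topic level.

/Home/Floor1/+/Temperature will give us Temperature from Room1 and Room2 on Floor1.

(#) This is a multi-level wildcard which replaces any number of remaining topic levels. It must be the last character of the topic filter, so it cannot be used in the middle of a topic.

/Home/# will give us every reading (Temperature and Pressure) from all rooms on Floor1 and Floor2. To get only Temperature from Room1 and Room2 on both floors, combine single-level wildcards instead: /Home/+/+/Temperature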
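
The matching rules are simple enough to sketch in a few lines. The topic_matches function below is an illustrative helper written for this post, not a library API: "+" stands for exactly one level, while "#" stands for all remaining levels and is only accepted as the last level of a filter. It ignores special cases such as topics starting with "$".

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches the subscription `topic_filter`."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            if i != len(filter_levels) - 1:
                raise ValueError("'#' is only valid as the last level of a filter")
            return True  # matches everything from here on, including nothing
        if i >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level does not match
    return len(filter_levels) == len(topic_levels)


topics = [
    f"/Home/{floor}/{room}/{sensor}"
    for floor in ("Floor1", "Floor2")
    for room in ("Room1", "Room2")
    for sensor in ("Temperature", "Pressure")
]

floor1_temps = [t for t in topics if topic_matches("/Home/Floor1/+/Temperature", t)]
all_temps = [t for t in topics if topic_matches("/Home/+/+/Temperature", t)]
everything = [t for t in topics if topic_matches("/Home/#", t)]
print(len(floor1_temps), len(all_temps), len(everything))
```

If you already use paho-mqtt, its topic_matches_sub(sub, topic) helper performs this kind of check for you.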

Example

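Download and install the Mosquitto broker from https://mosquitto.org/download/

The Python code in this section uses the paho-mqtt client library, which can be installed with pip install paho-mqtt.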

Publisher

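import paho.mqtt.client as mqtt

BROKER = "<BROKER_IP_ADDRESS>"  # replace with your broker's address
PORT = 1883                     # default unencrypted MQTT port
KEEP_ALIVE = 60                 # seconds between keep-alive pings

# paho-mqtt 2.x expects a callback API version, e.g. mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client = mqtt.Client()
client.connect(BROKER, PORT, KEEP_ALIVE)
client.loop_start()  # run the network loop in a background thread
info = client.publish("/Home/Floor1/Room1/temperature", "75")
info.wait_for_publish()  # block until the message has actually been sent
client.disconnect()
client.loop_stop()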

Subscriber

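import paho.mqtt.client as mqtt

BROKER = "<BROKER_IP_ADDRESS>"  # replace with your broker's address
PORT = 1883                     # default unencrypted MQTT port
KEEP_ALIVE = 60                 # seconds between keep-alive pings

# subscribe inside on_connect so the subscription is renewed after a reconnect
def on_connect(client, userdata, flags, rc):
    # topic names are case-sensitive, so this must match the publisher's topic exactly
    client.subscribe("/Home/Floor1/Room1/temperature")

# the callback function, it will be triggered when messages are received
def on_message(client, userdata, message):
    print("Message received:", message.payload.decode("utf-8"))

# with paho-mqtt 2.x, use mqtt.Client(mqtt.CallbackAPIVersion.VERSION1) to keep these callback signatures
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

# create connection
client.connect(BROKER, PORT, KEEP_ALIVE)

# run the network loop, blocking until the client disconnects
client.loop_forever()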

One question which may arise is: how is it different from Kafka, since Kafka is also pub-sub? MQTT is a lightweight IoT protocol, whereas Kafka is used for transferring and processing huge volumes of data reliably across systems in a scalable manner.

Well this was simple!!

Cheers – Amit Tomar