MQTT and Nebri

Notice: This article is maintained for historical purposes. The Bixly Automate/Nebri OS platform that Bixly built, and this article features, is no longer available. Checkout Serverless Providers as an alternative.

What is MQTT?

MQTT is a network protocol used for device to device messaging. Once a server is set up, you can subscribe to topics. Once a subscription is in place, you’ll receive any messages published to that topic. You can also publish messages to said topic. These published messages are sent to all devices subscribed to that particular topic.

Using MQTT with Nebri

We recommend the package `paho-mqtt` for utilizing MQTT on a Nebri instance. This package can be pip installed over an SSH connection to your Nebri Instance. There are a few modifications that need to be taken into consideration when implementing MQTT on a Nebri instance versus a regular python application. We don’t recommend using `loop_forever()` without adding a disconnect to one of your callback functions as it will cause timeouts, which will lead to your instance being restarted. Instead, we recommend using a manual loop that will timeout and calling `loop()` within it.

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
    
client.connect("iot.eclipse.org", 1883, 60)
start_time = datetime.now()
while datetime.now() < start_time + timedelta(seconds=30):
    client.loop(timeout=1.0)
client.disconnect()

The above example shows one way to implement a loop that will not result in the internal time out being triggered. Keep in mind that any script whose functionality takes longer than 5 minutes to complete will be forced to time out. If a script times out, it will be marked as inactive and won’t rerun automatically. Here’s another way to implement this in a Nebri rule script.

import paho.mqtt.client as mqtt
import logging

logging.basicConfig(filename="mqtt.log", level=logging.DEBUG)

class mqtt_example(NebriOS):
    listens_to = ['mqtt_example']

    def check(self):
        return self.mqtt_example == True

    def action(self):
        # for debouncing purposes
        self.mqtt_example = "Ran"
        # The callback for when the client receives a CONNACK response from the server.
        def on_connect(client, userdata, rc):
            logging.debug("Connected with result code "+str(rc))
            # Subscribing in on_connect() means that if we lose the connection and
            # reconnect then subscriptions will be renewed.
            client.subscribe("$SYS/#")

        # The callback for when a PUBLISH message is received from the server.
        def on_message(client, userdata, msg):
            logging.debug(msg.topic+" "+str(msg.payload))
            if datetime.now() > self.start_time + timedelta(seconds=self.timeout):
                client.disconnect()
                logging.debug('disconnected')
        
        client = mqtt.Client()
        client.on_connect = on_connect
        client.on_message = on_message
        
        client.connect("iot.eclipse.org", 1883, 60)

        self.timeout = 60
        self.start_time = datetime.now()
        client.loop_forever(timeout=1.0)

The nice thing about using MQTT in a rule script is you can set variables when you trigger said script. In the above example, `self.timeout` can be sent via debug mode when you trigger a script.

mqtt_example := true
timeout := 60