VerneMQ: MQTT broker

VerneMQ is a powerful MQTT broker written in Erlang that is fairly simple to set up and run, and that can be clustered easily. One of the nice aspects of the broker is that it can be extended with native Erlang/Elixir plugins, with Lua scripts, or with webhooks.

The extension that enables Lua scripts is called vmq_diversity and ships with the standard VerneMQ build.

The Lua support is limited to what luerl (a Lua implementation in Erlang) supports. On top of that you get libraries to communicate with MySQL, PostgreSQL, MongoDB, and Redis, as well as JSON and HTTP libraries.

Setup

VerneMQ uses port 8888 to serve its HTTP listeners:

  • Prometheus
  • Status page
  • Management API
  • Health check

To use WebSockets, add the listeners to vernemq.conf:

listener.ws.default = 127.0.0.1:9001
listener.wss.default = 127.0.0.1:9002

Set up VerneMQ using Docker:

$ docker run --name vernemq1 \
  -p 5883:1883 \
  -p 8888:8888 \
  -p 9001:9001 \
  -v ~/Development/vmq_plugins:/vernemq/share/lua \
  -d erlio/docker-vernemq

Open a shell in the container to create an API key:

$ docker exec -ti <CONTAINER> /bin/bash

Once inside the container:

$ vmq-admin api-key create
HhylwhuVh7ejkmQoKJsjczIBQIbVmv2D

You can now leverage the HTTP API. To check that it's working:

$ curl "http://HhylwhuVh7ejkmQoKJsjczIBQIbVmv2D@192.168.10.105:8888/api/v1/session/show"
{"table":[],"type":"table"}

The empty table means you don't have any clients connected.

The HTTP API maps vmq-admin commands to endpoints. Take the CLI command to join a cluster:

$ vmq-admin cluster join discovery-node=NodeB@10.0.0.2

It is mapped to the endpoint GET /api/v1/cluster/join?discovery-node=NodeB@10.0.0.2. Using curl:

$ curl "http://HhylwhuVh7ejkmQoKJsjczIBQIbVmv2D@192.168.10.105:8888/api/v1/cluster/join?discovery-node=NodeB@10.0.0.2"
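The mapping can be sketched as a small helper. This is only an illustration: the rule (plain words become path segments, key=value pairs become query parameters) is generalized from the single cluster join example above, and the function name vmqAdminToUrl is hypothetical. Flags such as --help are not handled.

```javascript
// Sketch: translate a vmq-admin command line into the matching HTTP API URL.
// Assumption: the mapping rule is inferred from the "cluster join" example;
// it is not taken from the VerneMQ documentation.
function vmqAdminToUrl(command, { host, port = 8888, apiKey }) {
  const words = command.trim().split(/\s+/);
  if (words[0] === 'vmq-admin') words.shift(); // tolerate the leading binary name

  const path = [];
  const query = [];
  for (const word of words) {
    const eq = word.indexOf('=');
    if (eq === -1) {
      // plain word: part of the command path
      path.push(encodeURIComponent(word));
    } else {
      // key=value: becomes a query parameter
      const key = word.slice(0, eq);
      const value = word.slice(eq + 1);
      query.push(`${encodeURIComponent(key)}=${encodeURIComponent(value)}`);
    }
  }

  const qs = query.length ? `?${query.join('&')}` : '';
  // The API key is passed as the user part of the URL, as in the curl calls above.
  return `http://${apiKey}@${host}:${port}/api/v1/${path.join('/')}${qs}`;
}

console.log(
  vmqAdminToUrl('vmq-admin cluster join discovery-node=NodeB@10.0.0.2', {
    host: '192.168.10.105',
    apiKey: 'HhylwhuVh7ejkmQoKJsjczIBQIbVmv2D',
  })
);
```

Note that the helper percent-encodes the parameter value, so the @ in the node name is emitted as %40.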

Configuration

There are some things you should change in the default configuration file, such as the default cluster cookie, a secret that Erlang nodes use to communicate with each other:

distributed_cookie = vmq

If you use a persistence backend for auth, you can configure connection details such as users and passwords in this file as well.

You can also use vmq-admin to set configuration options or to enable plugins. During development, for example, you might want to test the broker without authentication:

$ vmq-admin set allow_anonymous=on

If you want to load a plugin script:

$ vmq-admin script load path=./share/lua/custom/hello_vmq.lua

If you make changes and want to reload the plugin:

$ vmq-admin script reload path=./share/lua/custom/hello_vmq.lua

If you are running a cluster you probably also want to change the nodename so that you can distinguish individual nodes, otherwise all nodes will have the default name VerneMQ@127.0.0.1.

NOTE: If your edits to vernemq.conf are being ignored, e.g. the listeners section at the end of the configuration file is not what you edited, it is because the start script in the VerneMQ Docker image checks for a file with a given name:

if [ -f /vernemq/etc/vernemq.conf.local ]; then
  cp /vernemq/etc/vernemq.conf.local /vernemq/etc/vernemq.conf
else
  sed -i '/####### Start #######/, /######### End #########/d' /vernemq/etc/vernemq.conf
  echo ...
fi

MQTT Sessions

MQTT has a new protocol version, 5.0, which includes custom headers for metadata, request/response flows, provisioning features, shared subscriptions, and better disconnect handling, to name a few.

The provisioning features enable brokers to tell clients to (re)connect to other broker instances. Shared subscriptions might provide a way to handle messages upstream if no local handlers are found (this still needs to be verified). More details can be found in this article.

https://blog.codecentric.de/files/2017/11/Shared-Subscriptions.png

Hooks

The different lifecycle hooks vary between MQTT versions.

MQTT v3.x provides the following session hooks:

  • auth_on_register: Allow your plugin to grant or reject new client connections
  • on_client_wakeup: The client has reached a completely initialized, normal state for accepting messages
  • on_register: Allow your plugin to get informed about a newly authenticated client
  • on_client_offline: Called if a client using clean_session=false closes the connection or gets disconnected by a duplicate client.
  • on_client_gone: Called if a client using clean_session=true closes the connection or gets disconnected by a duplicate client.
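The difference between the last two hooks is easy to mix up, so here is a minimal sketch of the rule as stated in the list above. The function name disconnectHookFor is hypothetical and only restates the text; it is not part of any VerneMQ API.

```javascript
// Sketch: which "client went away" hook to expect, based on the
// clean_session flag the client connected with (per the list above).
function disconnectHookFor(cleanSession) {
  // clean_session=true  -> the session is discarded  -> on_client_gone
  // clean_session=false -> the session is kept       -> on_client_offline
  return cleanSession ? 'on_client_gone' : 'on_client_offline';
}

console.log(disconnectHookFor(true));
console.log(disconnectHookFor(false));
```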

MQTT v5 provides the following hooks on top of those provided by v3.x:

https://blobscdn.gitbook.com/v0/b/gitbook-28427.appspot.com/o/assets%2F-LOvr-CQz8jJxldqmC1L%2F-LQhetiSJ7WSAsU0I94M%2F-LQhevSFyludKgrPjRuv%2Fsession_lifecycle.svg?generation=1541585748617188&alt=media

The following subscription hooks are available:

  • auth_on_subscribe: Allow your plugin to grant or reject subscribe requests sent by a client. It also makes it possible to rewrite the subscribe topic and QoS.
  • on_subscribe: Allow your plugin to get informed about an authorized subscribe request
  • on_unsubscribe: Allow your plugin to get informed about an unsubscribe request. It also allows you to rewrite the unsubscribe topic if required.

MQTT v5 provides:

  • auth_on_subscribe_m5: Allow your plugin to grant or reject subscribe requests sent by a client. It also makes it possible to rewrite the subscribe topic and QoS.
  • on_subscribe_m5: Allow your plugin to get informed about an authorized subscribe request
  • on_unsubscribe_m5: Allow your plugin to get informed about an unsubscribe request. It also allows you to rewrite the unsubscribe topic if required.

Here we can see the publish flow diagram:

https://blobscdn.gitbook.com/v0/b/gitbook-28427.appspot.com/o/assets%2F-LOvr-CQz8jJxldqmC1L%2F-LQhetiSJ7WSAsU0I94M%2F-LQhevgl6zCd9v-6YLn7%2Fpublish_flow.svg?generation=1541585748633398&alt=media

The following publish hooks are available:

  • auth_on_publish: Allow your plugin to grant or reject publish requests sent by a client. It also lets you rewrite the publish topic, payload, and QoS.
  • on_publish: Allow your plugin to get informed about an authorized publish message.
  • on_offline_message: Allow your plugin to get notified about a newly queued message for a client that is currently offline.
  • on_deliver: Allow your plugin to get informed about outgoing publish messages, but also allows you to rewrite topic and payload of the outgoing message.

MQTT v5 provides:

  • auth_on_publish_m5: Allow your plugin to grant or reject publish requests sent by a client. It also lets you rewrite the publish topic, payload, QoS, retain flag, and properties.
  • on_publish_m5: Allow your plugin to get informed about an authorized publish message.
  • on_deliver_m5: Allow your plugin to get informed about outgoing publish messages, but also allows you to rewrite topic and payload of the outgoing message.

Sample auth_on_publish payload:

{
  "client_id": "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-43-goliatorum.local",
  "mountpoint": "",
  "payload": "{\"action\":\"ping\", \"client\":\"092FD2C0-FB42-45CB-8BCC-C086A26C74B4\"}",
  "qos": 0.0,
  "retain": false,
  "topic": "event/092FD2C0-FB42-45CB-8BCC-C086A26C74B4/action/ping"
}
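Note that the payload field is itself a JSON string, so a consumer has to decode it separately. The following is a minimal sketch of unpacking the sample above; the function name parsePublish and the assumption that every topic follows the event/<client>/action/<name> layout are mine, based only on this one sample.

```javascript
// Sketch: unpack an auth_on_publish payload like the sample above.
// Assumption: topics look like "event/<client>/action/<name>".
function parsePublish(hook) {
  // The MQTT payload arrives as a string, so it needs its own JSON.parse.
  let message = null;
  try {
    message = JSON.parse(hook.payload);
  } catch (err) {
    message = null; // not JSON; the raw string is still in hook.payload
  }

  // Skip the third segment, which is the literal word "action" in the sample.
  const [kind, client, , action] = hook.topic.split('/');

  return {
    kind,
    client,
    action,
    qos: Math.trunc(hook.qos), // the sample reports qos as 0.0
    retain: hook.retain,
    message,
  };
}

const sample = {
  client_id: '092FD2C0-FB42-45CB-8BCC-C086A26C74B4-43-goliatorum.local',
  mountpoint: '',
  payload: '{"action":"ping", "client":"092FD2C0-FB42-45CB-8BCC-C086A26C74B4"}',
  qos: 0.0,
  retain: false,
  topic: 'event/092FD2C0-FB42-45CB-8BCC-C086A26C74B4/action/ping',
};

console.log(parsePublish(sample));
```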

Sample on_client_offline payload:

{
  "client_id": "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-43-goliatorum.local",
  "mountpoint": ""
}

Lua sample plugin

The following is a sample plugin to showcase the different hooks and to see the payloads we get from each.

config = {
    pool_id = "metrics",
    size = 10,
}

http.ensure_pool(config)

function my_auth_on_register_m5(reg)
    send_data(reg, "my_auth_on_register_m5")
    return true
end

function my_auth_on_publish_m5(pub)
    send_data(pub, "my_auth_on_publish_m5")
    return true
end

function my_auth_on_subscribe_m5(sub)
    send_data(sub, "my_auth_on_subscribe_m5")
    return true
end

function my_auth_on_publish(pub)
    send_data(pub, "my_auth_on_publish")
    return true
end

function my_auth_on_subscribe(sub)
    send_data(sub, "my_auth_on_subscribe")
    return true
end

function my_on_unsubscribe(sub)
    send_data(sub, "my_on_unsubscribe")
end

function my_on_client_gone(c)
    send_data(c, "my_on_client_gone")
end

function my_on_client_offline(c)
    send_data(c, "my_on_client_offline")
end

function my_auth_on_register(reg)
    send_data(reg, "my_auth_on_register")
    return true
end

function send_data(out, label)
    out = json.encode(out)
    print("-------------------------------")
    print("===== " .. label, out)

    -- we are inside the container so we need to punch out, either use ngrok
    -- or IP
    headers = {}
    headers["x-origin"] = "vmq-plugin"
    headers["x-action"] = label
    headers["content-type"] = "application/json;charset=utf-8"
    ret = http.post("metrics", "http://192.168.10.105:9595", out, headers)

    -- guard against a failed request before reading the status
    if ret and ret.status then
        http.body(ret.client_ref)
        print("===", json.encode(ret))
    else
        print("error")
    end
    print("*******************")
end

hooks = {
    auth_on_register = my_auth_on_register,
    auth_on_publish = my_auth_on_publish,
    auth_on_subscribe = my_auth_on_subscribe,

    on_unsubscribe = my_on_unsubscribe,
    on_client_gone = my_on_client_gone,
    on_client_offline = my_on_client_offline,

    auth_on_register_m5 = my_auth_on_register_m5,
    auth_on_publish_m5 = my_auth_on_publish_m5,
    auth_on_subscribe_m5 = my_auth_on_subscribe_m5,
}

We will use a very simple Node.js server to receive the data and print it to the screen. We will use polka to handle the routing.

const polka = require('polka');

function bodyParser(req, res, next) {
    var data = '';
    req.setEncoding('utf8');
    req.on('data', function(chunk) {
        data += chunk;
    });

    req.on('end', function() {
        console.log('end', data);
        if (data.indexOf("{") === 0) data = JSON.parse(data);
        req.body = data;
        next();
    });
}

polka()
    .use(bodyParser)
    .post('/', (req, res) => {
        console.log('------------------------');
        console.log(req.headers);
        console.log('body', req.body);
        console.log('\n');
        res.end('ok');
    })
    .listen(9595, err => {
        if (err) throw err;
        console.log(`> POLKA! Running on localhost:9595`);
    });
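The body parser above calls JSON.parse on anything that starts with "{", which throws on malformed input and would take the server down. A more defensive variant could look like the following sketch; parseBody is a hypothetical helper name, not part of polka.

```javascript
// Sketch: parse a request body as JSON when possible, otherwise keep the raw text.
function parseBody(raw) {
  const text = raw.trim();
  // Only attempt to parse things that look like a JSON object or array.
  if (!text.startsWith('{') && !text.startsWith('[')) return raw;
  try {
    return JSON.parse(text);
  } catch (err) {
    return raw; // malformed JSON: hand back the original string
  }
}

console.log(parseBody('{"client_id": "abc", "mountpoint": ""}'));
console.log(parseBody('{not json'));
```

Inside the 'end' handler this would replace the indexOf check with req.body = parseBody(data).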

Notes

If you have the default vmq_acl plugin running, it will take precedence over other scripts unless you change the priority of your custom plugins.

A client that fires and forgets will generate two events:

  • auth_on_publish
  • on_client_gone

Webhooks

VerneMQ also offers webhooks as a way to execute external plugins. You register an HTTP endpoint with a plugin hook, e.g. auth_on_register, and whenever the hook is called a POST request is sent to the given endpoint.

You need to enable webhooks in the vernemq.conf file by setting plugins.vmq_webhooks = on. Note that if you start your broker with webhooks off you might not get the vmq-admin webhooks command.

Webhooks can be registered by adding them in the config file.

vmq_webhooks.mywebhook1.hook = auth_on_register
vmq_webhooks.mywebhook1.endpoint = http://192.168.10.105:8000/auth-on-register

It is also possible to load them dynamically using the admin tool:

$ vmq-admin webhooks register hook=auth_on_register endpoint="http://192.168.10.105:8000/auth-on-register"

To show which webhooks are registered:

$ vmq-admin webhooks show
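On the receiving side, an endpoint could be sketched with Node's built-in http module as follows. Two details here are assumptions from memory that should be checked against the VerneMQ webhooks documentation: that the hook name arrives in a vernemq-hook request header, and that the broker expects a 200 response with a JSON body of {"result": "ok"} to accept or {"result": {"error": "..."}} to reject. The names decide and createWebhookServer, and the client_id rule, are hypothetical.

```javascript
const http = require('http');

// Decide how to answer a hook call.
// Assumption: {"result": "ok"} accepts, {"result": {"error": ...}} rejects.
function decide(hook, body) {
  if (hook === 'auth_on_register') {
    // Hypothetical rule for illustration: require a non-empty client_id.
    return body.client_id ? { result: 'ok' } : { result: { error: 'not_allowed' } };
  }
  return { result: 'ok' };
}

// Build the server without starting it, so the caller picks the port.
function createWebhookServer() {
  return http.createServer((req, res) => {
    let raw = '';
    req.setEncoding('utf8');
    req.on('data', chunk => { raw += chunk; });
    req.on('end', () => {
      let body = {};
      try {
        body = JSON.parse(raw) || {};
      } catch (err) {
        body = {}; // unreadable body: treat it as empty
      }
      // Assumption: the hook name is sent in the "vernemq-hook" header.
      const hook = req.headers['vernemq-hook'];
      res.writeHead(200, { 'content-type': 'application/json' });
      res.end(JSON.stringify(decide(hook, body)));
    });
  });
}
```

To match the endpoint registered above, the server would be started with createWebhookServer().listen(8000).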

Debug

vmq-admin has support for a tracing mechanism, which can be used in production, to show what a client is sending and receiving.

$ vmq-admin trace client client-id=092FD2C0-FB42-45CB-8BCC-C086A26C74B4

The output includes calls to hooks.

Starting trace for 1 existing sessions for client "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local" with PIDs
    [<8263.3794.0>]
<8263.3794.0> MQTT RECV: CID: "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local" PUBLISH(d0, q0, r0, m0, "event/092FD2C0-FB42-45CB-8BCC-C086A26C74B4/action/ping") with payload:
    {"action":"ping", "client":"092FD2C0-FB42-45CB-8BCC-C086A26C74B4"}
<8263.3794.0> Calling auth_on_publish(undefined,{[],
                                                 <<"092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local">>},0,event/092FD2C0-FB42-45CB-8BCC-C086A26C74B4/action/ping,false) with payload:
    {"action":"ping", "client":"092FD2C0-FB42-45CB-8BCC-C086A26C74B4"}
<8263.3794.0> Hook returned "ok"
<8263.3794.0> MQTT RECV: CID: "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local" PINGREQ()
<8263.3794.0> MQTT SEND: CID: "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local" PINGRESP()
<8263.3794.0> MQTT RECV: CID: "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local" PUBLISH(d0, q0, r0, m0, "event/092FD2C0-FB42-45CB-8BCC-C086A26C74B4/action/ping") with payload:
    {"action":"ping", "client":"092FD2C0-FB42-45CB-8BCC-C086A26C74B4"}
<8263.3794.0> Calling auth_on_publish(undefined,{[],
                                                 <<"092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local">>},0,event/092FD2C0-FB42-45CB-8BCC-C086A26C74B4/action/ping,false) with payload:
    {"action":"ping", "client":"092FD2C0-FB42-45CB-8BCC-C086A26C74B4"}
<8263.3794.0> Hook returned "ok"
<8263.3794.0> MQTT RECV: CID: "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local" PINGREQ()
<8263.3794.0> MQTT SEND: CID: "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local" PINGRESP()
<8263.3794.0> MQTT RECV: CID: "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local" PUBLISH(d0, q0, r0, m0, "event/092FD2C0-FB42-45CB-8BCC-C086A26C74B4/action/ping") with payload:
    {"action":"ping", "client":"092FD2C0-FB42-45CB-8BCC-C086A26C74B4"}
<8263.3794.0> Calling auth_on_publish(undefined,{[],
                                                 <<"092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local">>},0,event/092FD2C0-FB42-45CB-8BCC-C086A26C74B4/action/ping,false) with payload:
    {"action":"ping", "client":"092FD2C0-FB42-45CB-8BCC-C086A26C74B4"}
<8263.3794.0> Hook returned "ok"
<8263.3794.0> MQTT RECV: CID: "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local" PINGREQ()
<8263.3794.0> MQTT RECV: CID: "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local" PUBLISH(d0, q0, r0, m0, "event/092FD2C0-FB42-45CB-8BCC-C086A26C74B4/action/ping") with payload:
    {"action":"ping", "client":"092FD2C0-FB42-45CB-8BCC-C086A26C74B4"}
<8263.3794.0> Calling auth_on_publish(undefined,{[],
                                                 <<"092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local">>},0,event/092FD2C0-FB42-45CB-8BCC-C086A26C74B4/action/ping,false) with payload:
    {"action":"ping", "client":"092FD2C0-FB42-45CB-8BCC-C086A26C74B4"}
<8263.3794.0> Hook returned "ok"
<8263.3794.0> MQTT SEND: CID: "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local" PINGRESP()
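Long traces are easier to read when summarized. The following sketch counts MQTT frames per direction and packet type; the regular expression is derived only from the line layout in the sample output above, so other trace formats may not match, and the function names are hypothetical.

```javascript
// Sketch: summarize vmq-admin trace output.
// Assumption: frame lines look like
//   <PID> MQTT RECV: CID: "<client id>" PUBLISH(...)
// as in the sample above; hook calls and payload lines are skipped.
const TRACE_LINE = /^<([\d.]+)> MQTT (RECV|SEND): CID: "([^"]*)" (\w+)\(/;

function parseTraceLine(line) {
  const m = TRACE_LINE.exec(line);
  if (!m) return null;
  const [, pid, direction, clientId, packet] = m;
  return { pid, direction, clientId, packet };
}

function countPackets(traceText) {
  const counts = {};
  for (const line of traceText.split('\n')) {
    const frame = parseTraceLine(line);
    if (!frame) continue; // not an MQTT frame line
    const key = `${frame.direction} ${frame.packet}`;
    counts[key] = (counts[key] || 0) + 1;
  }
  return counts;
}

const excerpt = [
  '<8263.3794.0> MQTT RECV: CID: "client-1" PUBLISH(d0, q0, r0, m0, "event/x/action/ping") with payload:',
  '    {"action":"ping"}',
  '<8263.3794.0> Hook returned "ok"',
  '<8263.3794.0> MQTT RECV: CID: "client-1" PINGREQ()',
  '<8263.3794.0> MQTT SEND: CID: "client-1" PINGRESP()',
].join('\n');

console.log(countPackets(excerpt));
```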

Resources