MQTT Setup Vernemq Plugin Development
VerneMQ: MQTT broker
VerneMQ is a rather simple to setup and run but powerful MQTT broker writen in Erlang that can be clustered easily. One of the nice aspects of the broker is that it can be extended with native Erlang/Elixir plugins but also using Lua scripts or Webhooks.
The extension that enables Lua scripts is called vmq_diversity and is shipped with the standar VerneMQ build.
The Lua support is limited to what luerl supports. What you get is support to communicate with MySQL, PostgreSQL, MongoDB, and Redis as well as JSON and HTTP libraries.
Setup
VerneMQ uses the port 8888
to serve various HTTP listeners:
- Prometheus
- Status page
- Management API
- Health check
To use Websockets:
listener.ws.default = 127.0.0.1:9001
listener.wss.default = 127.0.0.1:9002
Setup vernemq using docker:
$ docker run --name vernemq1 \
-p 5883:1883 \
-p 8888:8888 \
-p 9001:9001 \
-v ~/Development/vmq_plugins:/vernemq/share/lua \
-d erlio/docker-vernemq
Connect using docker to create an API key:
$ docker exec -ti <CONTAINER> /bin/bash
Once inside the container:
$ vmq-admin api-key create
HhylwhuVh7ejkmQoKJsjczIBQIbVmv2D
You can now laverage the HTTP API. To check if its working:
$ curl "http://HhylwhuVh7ejkmQoKJsjczIBQIbVmv2D@192.168.10.105:8888/api/v1/session/show"
{"table":[],"type":"table"}
You don't have any clients connected.
The HTTP API maps vqm-admin
commands to endpoints. A CLI command to join a cluster:
$ vmq-admin cluster join discovery-node=NodeB@10.0.0.2
Is mapped to this endpoint GET /api/v1/cluster/join?discovery-node=NodeB@10.0.0.2
, using curl
:
$ curl "http://HhylwhuVh7ejkmQoKJsjczIBQIbVmv2D@192.168.10.105:8888/api/v1/cluster/join?discovery-node=NodeB@10.0.0.2"
Configuration
Ther are some things you should change in the default configuration file, like the default cluster cookie- a secret used for erlang nodes to intercommunicate
distributed_cookie = vmq
If you use a peristence backend for auth you can configure connection details such as passwords, users, etc in this file as well.
You can use vmq-admin
to set configuration options. During development you might want to test the broker without authentication or if you want to enable plugins:
$ vmq-admin set allow_anonymous=on
If you want to load a plugin script:
$ vmq-admin script load path=./share/lua/custom/hello_vmq.lua
If you make changes and want to reload the plugin:
$ vmq-admin script reload path=./share/lua/custom/hello_vmq.lua
If you are running a cluster you probably also want to change the nodename
so that you can distinguish individual nodes, otherwise all nodes will have the default name VerneMQ@127.0.0.1.
NOTE: If your edits on vernemq.conf are being ignored, e.g. the section of listeners at the end of the configuration file is not what you had edited then is because the Dockerfile VerneMQ start command checks for a given name:
if [ -f /vernemq/etc/vernemq.conf.local ]; then
cp /vernemq/etc/vernemq.con.local /vernemq/etc/vernemq.conf
else
sed -i '/####### Start #######/, /######### End #########/d' /vernemq/etc/vernemq/conf
echo ...
fi
MQTT Sessions
MQTT has a new protocol version: 5.0 which includes custom headers for metadata, request/response flows, provisioning features, and shared subscriptions, better diconnect handling to name a few.
The provisioning features enable brokers to tell clients to (re)connect to other broker instances. Shared subscriptions might provide a way to handle messages upstream if no local handlers are found (VERIFY!), more details can be found in this article.
Hooks
The different lifecyle hooks vary between MQTT versions.
MQTT v3.x provides the following session hooks:
auth_on_register
: Allow your plugin to grant or reject new client connectionson_client_wakeup
: The client has reached a completely initialized, normal state for accepting messageson_register
: Allow your plugin to get informed about a newly authenticated clienton_client_offline
: Called if a client usingclean_session=false
closes the connection or gets disconnected by a duplicate client.on_client_gone
: Called if a client usingclean_session=true
closes the connection or gets disconnected by a duplicate client.
MQTT v5 provides the following hooks on top of those provided by v3.x:
on_auth_m5
: SASL style authentication or enhanced authentication flows.
The following subscription hooks:
auth_on_subscribe
: Allow your plugin to grant or reject subscribe requests sent by a client. They also makes it possible to rewrite the subscribe topic and qos.on_subscribe
: Allow your plugin to get informed about an authorized subscribe requeston_unsubscribe
: Allow your plugin to get informed about an unsubscribe request. They also allow you to rewrite the unsubscribe topic if required.
MQTT v5 provides:
auth_on_subscribe_m5
: Allow your plugin to grant or reject subscribe requests sent by a client. They also makes it possible to rewrite the subscribe topic and qos.on_subscribe_m5
: Allow your plugin to get informed about an authorized subscribe requeston_unsubscribe_m5
: Allow your plugin to get informed about an unsubscribe request. They also allow you to rewrite the unsubscribe topic if required.
Here we can see the publish flow diagram
The following publish hooks:
auth_on_publish
: Allow your plugin to grant or reject publish requests sent by a client. It also enables to rewrite the publish topic, payload, qos.on_publish
: Allow your plugin to get informed about an authorized publish message.on_offline_message
: Allows your plugin to get notified about a new a queued message for a client that is currently offline.on_deliver
: Allow your plugin to get informed about outgoing publish messages, but also allows you to rewrite topic and payload of the outgoing message.
MQTT v5 provides:
auth_on_publish_m5
: Allow your plugin to grant or reject publish requests sent by a client. It also enables to rewrite the publish topic, payload, qos, or retain flag and in the case of auth_on_publish_m5 properties.on_publish_m5
: Allow your plugin to get informed about an authorized publish message.on_deliver_m5
: Allow your plugin to get informed about outgoing publish messages, but also allows you to rewrite topic and payload of the outgoing message.
Sample auth_on_publish
payload:
{
"client_id": "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-43-goliatorum.local",
"mountpoint": "",
"payload": "{\"action\":\"ping\", \"client\":\"092FD2C0-FB42-45CB-8BCC-C086A26C74B4\"}",
"qos": 0.0,
"retain": false,
"topic": "event/092FD2C0-FB42-45CB-8BCC-C086A26C74B4/action/ping"
}
Sample on_client_offline
payload:
{
"client_id": "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-43-goliatorum.local",
"mountpoint": ""
}
Lua sample plugin
The following is a sample plugin to showcase the different hooks and to see the payloads we get from each.
config = {
pool_id = "metrics",
size = 10,
}
http.ensure_pool(config)
function my_auth_on_register_m5(reg)
send_data(req, "my_auth_on_register_m5")
return true
end
function my_auth_on_publish_m5(pub)
send_data(pub, "my_auth_on_publish_m5")
return true
end
function my_auth_on_subscribe_m5(sub)
send_data(sub, "my_auth_on_subscribe_m5")
return true
end
function my_auth_on_publish(pub)
send_data(pub, "my_auth_on_publish")
return true
end
function my_auth_on_subscribe(sub)
send_data(sub, "my_auth_on_subscribe")
return true
end
function my_on_unsubscribe(sub)
send_data(sub, "my_on_unsubscribe")
end
function my_on_client_gone(c)
send_data(c, "my_on_client_gone")
end
function my_on_client_offline(c)
send_data(c, "my_on_client_offline")
end
function my_auth_on_register(reg)
send_data(reg, "my_auth_on_register")
return true
end
function my_auth_on_publish(pub)
send_data(pub, "my_auth_on_publish")
return true
end
function send_data(out, label)
out = json.encode(out)
print("-------------------------------")
print("===== " .. label, out)
-- we are inside the container so we need to punch out, either use ngrok
-- or IP
headers = {}
headers["x-origin"] = "vmq-plugin"
headers["x-action"] = label
headers["content-type"] = "applicaton/json;charset=utf-8"
ret = http.post("metrics", "http://192.168.10.105:9595", out, headers)
if (ret.status) then
http.body(ret.client_ref)
print("===", json.encode(ret))
else
print("error")
end
print("*******************")
end
hooks = {
auth_on_register = my_auth_on_register,
auth_on_publish = my_auth_on_publish,
auth_on_subscribe = my_auth_on_subscribe,
on_unsubscribe = my_on_unsubscribe,
on_client_gone = my_on_client_gone,
on_client_offline = my_on_client_offline,
auth_on_register_m5 = my_auth_on_register,
auth_on_publish_m5 = my_auth_on_publish,
auth_on_subscribe_m5 = my_auth_on_subscribe_m5,
}
We will use a very simple node server to receive the data and trace it out to the screen. We will use polka to handle the server.
const polka = require('polka');
function bodyParser(req, res, next) {
var data = '';
req.setEncoding('utf8');
req.on('data', function(chunk) {
data += chunk;
});
req.on('end', function() {
console.log('end', data);
if (data.indexOf("{") === 0) data = JSON.parse(data);
req.body = data;
next();
});
}
polka()
.use(bodyParser)
.post('/', (req, res) => {
console.log('------------------------');
console.log(req.headers);
console.log('body', req.body);
console.log('\n');
res.end('ok');
})
.listen(9595, err => {
if (err) throw err;
console.log(`> POLKA! Running on localhost:9595`);
});
Notes
If you have the default vmq-acl
plugin running it will take precedence over other scripts unless you change the priority of your custom plugins.
A client that fires and forgets will generate two events:
auth_on_publish
on_client_gone
Webhooks
VerneMQ also offers webhooks as a way to execute external plugins. You register an HTTP endpoint with a plugin hook e.g. auth_on_register
and whenever the hook is called a POST
request will be sent to the given endpoint.
You need enable webhooks in the vernemq.conf file by settting plugins.vmq_webhooks = on
. Note that if you start your broker with webhooks off you might not get the vmq-admin webhooks
command.
Webhooks can be registered by adding them in the config file.
vmq_webhooks.mywebhook1.hook = auth_on_register
vmq_webhooks.mywebhook1.endpoint = http://192.168.10.105:8000/auth-on-register
It is also possible to load them dynamically using the admin tool:
$ vmq-admin webhooks register hook=auth_on_register endpoint="http://192.168.10.105:8000/auth-on-register"
To show which webhooks are registered:
$ vmq-admin webhooks show
Debug
vmq-admin
has support for a tracing mechanism- that can be used in production- to show what a client is sending/receiving.
$ vmq-admin trace client client-id=092FD2C0-FB42-45CB-8BCC-C086A26C74B4
The output includes calls to hooks.
Starting trace for 1 existing sessions for client "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local" with PIDs
[<8263.3794.0>]
<8263.3794.0> MQTT RECV: CID: "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local" PUBLISH(d0, q0, r0, m0, "event/092FD2C0-FB42-45CB-8BCC-C086A26C74B4/action/ping") with payload:
{"action":"ping", "client":"092FD2C0-FB42-45CB-8BCC-C086A26C74B4"}
<8263.3794.0> Calling auth_on_publish(undefined,{[],
<<"092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local">>},0,event/092FD2C0-FB42-45CB-8BCC-C086A26C74B4/action/ping,false) with payload:
{"action":"ping", "client":"092FD2C0-FB42-45CB-8BCC-C086A26C74B4"}
<8263.3794.0> Hook returned "ok"
<8263.3794.0> MQTT RECV: CID: "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local" PINGREQ()
<8263.3794.0> MQTT SEND: CID: "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local" PINGRESP()
<8263.3794.0> MQTT RECV: CID: "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local" PUBLISH(d0, q0, r0, m0, "event/092FD2C0-FB42-45CB-8BCC-C086A26C74B4/action/ping") with payload:
{"action":"ping", "client":"092FD2C0-FB42-45CB-8BCC-C086A26C74B4"}
<8263.3794.0> Calling auth_on_publish(undefined,{[],
<<"092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local">>},0,event/092FD2C0-FB42-45CB-8BCC-C086A26C74B4/action/ping,false) with payload:
{"action":"ping", "client":"092FD2C0-FB42-45CB-8BCC-C086A26C74B4"}
<8263.3794.0> Hook returned "ok"
<8263.3794.0> MQTT RECV: CID: "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local" PINGREQ()
<8263.3794.0> MQTT SEND: CID: "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local" PINGRESP()
<8263.3794.0> MQTT RECV: CID: "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local" PUBLISH(d0, q0, r0, m0, "event/092FD2C0-FB42-45CB-8BCC-C086A26C74B4/action/ping") with payload:
{"action":"ping", "client":"092FD2C0-FB42-45CB-8BCC-C086A26C74B4"}
<8263.3794.0> Calling auth_on_publish(undefined,{[],
<<"092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local">>},0,event/092FD2C0-FB42-45CB-8BCC-C086A26C74B4/action/ping,false) with payload:
{"action":"ping", "client":"092FD2C0-FB42-45CB-8BCC-C086A26C74B4"}
<8263.3794.0> Hook returned "ok"
<8263.3794.0> MQTT RECV: CID: "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local" PINGREQ()
<8263.3794.0> MQTT RECV: CID: "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local" PUBLISH(d0, q0, r0, m0, "event/092FD2C0-FB42-45CB-8BCC-C086A26C74B4/action/ping") with payload:
{"action":"ping", "client":"092FD2C0-FB42-45CB-8BCC-C086A26C74B4"}
<8263.3794.0> Calling auth_on_publish(undefined,{[],
<<"092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local">>},0,event/092FD2C0-FB42-45CB-8BCC-C086A26C74B4/action/ping,false) with payload:
{"action":"ping", "client":"092FD2C0-FB42-45CB-8BCC-C086A26C74B4"}
<8263.3794.0> Hook returned "ok"
<8263.3794.0> MQTT SEND: CID: "092FD2C0-FB42-45CB-8BCC-C086A26C74B4-19-goliatorum.local" PINGRESP()