MQTT client

MQTT client #

Client event script #

This example will publish the event value and then disconnect from the MQTT server.

Modify the broker, port, username, password and topic variables as needed.

broker = '192.168.1.101'
port = 1883
username = 'user'
password = '123456'
topic = 'lm/test/1'
payload = event.getvalue()

mqtt = require('mosquitto')
client = mqtt.new()

client.ON_CONNECT = function(status, rc, msg)
  if status then
    log('mqtt connected')
    client:publish(topic, tostring(payload))
  else
    log('mqtt connect failed ' .. tostring(msg))
    client:disconnect()
  end
end

client.ON_PUBLISH = function()
  client:disconnect()
end

client:login_set(username, password)
status, rc, msg = client:connect(broker, port)

if status then
  client:loop_forever()
else
  log('connect failed: ' .. tostring(msg))
end

Enable security for MQTT connection #

Change port as needed (secure port is usually 8883)

ca.pem is uploaded via ftp account to ftp root directory

Put the following code after client = mqtt.new():

client:tls_set('/data/ftp/ca.pem')

If you also need client certificate/private key:

client:tls_set('/data/ftp/ca.pem', nil, '/data/ftp/client.crt', '/data/ftp/client.key')

Object value exchange via MQTT. #

Create a resident script with 0 sleep interval. Change broker IP address and mapping tables as needed.

This example assumes that object name and topic name are the same.

Objects must have a correct tag assigned: mqtt-status tag is for MQTT to object mapping and mqtt-control is for object to MQTT mapping.

if not broker then
  broker = '192.168.1.101'

  function multiply(mult)
    return function(value)
      local num = tonumber(value)
      if num then
        return num * mult
      else
        return value
      end
    end
  end

  -- topic to object map
  mqtt_to_object = {}

  -- optional topic value conversion function
  mqtt_to_object_conv = {}

  -- object to topic map
  object_to_mqtt = {}

  objs = grp.tag('mqtt-control')
  for _, obj in ipairs(objs) do
    object_to_mqtt[ obj.address ] = obj.name
  end

  objs = grp.tag('mqtt-status')
  for _, obj in ipairs(objs) do
    mqtt_to_object[ obj.name ] = obj.address
  end

  datatypes = {}

  grp.sender = 'mq'
  require('socket')

  for addr, _ in pairs(object_to_mqtt) do
    local obj = grp.find(addr)
    if obj then
      datatypes[ addr ] = obj.datatype
    end
  end

  mclient = require('mosquitto').new()

  mclient.ON_CONNECT = function(res, ...)
    log('mqtt connect status', res, ...)

    if res then
      for topic, _ in pairs(mqtt_to_object) do
        mclient:subscribe(topic)
      end
    else
      mclient:disconnect()
    end
  end

  mclient.ON_MESSAGE = function(mid, topic, payload)
    local addr = mqtt_to_object[ topic ]
    if addr then
      local fn = mqtt_to_object_conv[ topic ]

      if fn then
        payload = fn(payload)
      end

      grp.write(addr, payload)
    end
  end

  mclient.ON_DISCONNECT = function(...)
    log('mqtt disconnect', ...)
    mclientfd = nil
  end

  function mconnect()
    local fd

    mclient:connect(broker)
    fd = mclient:socket()

    -- fd ref is valid
    if fd then
      mclientfd = fd
    end
  end

  mconnect()

  function publishvalue(event)
    -- message from us or client is not connected
    if event.sender == 'mq' or not mclientfd then
      return
    end

    local addr = event.dst
    local dpt = datatypes[ addr ]
    local topic = object_to_mqtt[ addr ]

    -- unknown object
    if not dpt or not topic then
      return
    end

    local value = busdatatype.decode(event.datahex, dpt)
    if value ~= nil then
      if type(value) == 'boolean' then
        value = value and 1 or 0
      end

      mclient:publish(topic, tostring(value))
    end
  end

  lbclient = require('localbus').new(1)
  lbclient:sethandler('groupwrite', publishvalue)

  lbclientfd = socket.fdmaskset(lbclient:getfd(), 'r')

  -- run timer every 5 seconds
  timer = require('timerfd').new(5)
  timerfd = socket.fdmaskset(timer:getfd(), 'r')
end

-- mqtt connected
if mclientfd then
  mclientfdset = socket.fdmaskset(mclientfd, mclient:want_write() and 'rw' or 'r')
  res, lbclientstat, timerstat, mclientstat =
      socket.selectfds(10, lbclientfd, timerfd, mclientfdset)
-- mqtt not connected
else
  res, lbclientstat, timerstat =
    socket.selectfds(10, lbclientfd, timerfd)
end

if mclientfd and mclientstat then
  if socket.fdmaskread(mclientstat) then
    mclient:loop_read()
  end

  if socket.fdmaskwrite(mclientstat) then
    mclient:loop_write()
  end
end

if lbclientstat then
  lbclient:step()
end

if timerstat then
  -- clear armed timer
  timer:read()

  if mclientfd then
    mclient:loop_misc()
  else
    mconnect()
  end
end

Further assistance can be found in this forum thread