InfluxDB

InfluxDB #

Task #

Export object values into InfluxDB v2 either in real-time or periodically.

Resident script #

Set sleep time to 0. Adjust url and token variables as needed.

All objects that have the influxdb tag will have their values sent to InfluxDB in real-time (once for each received telegram).

-- LuaSocket HTTP client used for the write requests
local http = require('socket.http')
-- request timeout (LuaSocket documents this value as seconds)
http.TIMEOUT = 5

-- InfluxDB v2 write endpoint; replace the host and the bucket/org placeholders
local url = 'http://192.168.1.200:8086/api/v2/write?bucket=XXX&org=XXX'
-- API token, sent in the Authorization header of every request
local token = 'XXXXX'
-- first element of every line sent to InfluxDB (the measurement name)
local influxtable = 'rawdata'
-- objects returned for the 'influxdb' tag
local tagobjs = grp.tag('influxdb')

--- Escape a value for use as an InfluxDB line-protocol tag value.
-- The value is converted with tostring() and every backslash, space, comma
-- and equals sign is prefixed with a backslash.
-- @param value any value; converted to a string first
-- @return the escaped string (exactly one return value)
local function escape(value)
  -- the extra parentheses drop gsub's second result (the substitution
  -- count) so it can never leak into a caller's argument or value list
  return (tostring(value):gsub('[\\ ,=]', '\\%1'))
end

-- Lookup table of tagged objects keyed by object id; each entry keeps the
-- escaped object name and the object's datatype
local objects = {}

for _, tagged in ipairs(tagobjs) do
  local entry = {}
  entry.name = escape(tagged.name)
  entry.datatype = tagged.datatype
  objects[ tagged.id ] = entry
end

--- Send a single value to InfluxDB as one line-protocol point.
-- The field key depends on the Lua type of the value: booleans are written
-- to `state`, numbers to `value` and strings to `string`. Any other type is
-- logged and skipped. A response code other than 204 is logged together
-- with the request body.
-- @param name tag value for `name`; must already be escaped for line protocol
-- @param addr tag value for `addr`
-- @param value boolean, number or string to store
local function send(name, addr, value)
  local kind = type(value)
  local field

  if kind == 'boolean' then
    -- tostring() is explicit: string.format('%s') in stock Lua 5.1 raises
    -- an error when given a boolean argument
    field = 'state=' .. tostring(value)
  elseif kind == 'number' then
    field = 'value=' .. tostring(value)
  elseif kind == 'string' then
    -- line protocol only needs backslash and double quote escaped inside a
    -- string field value; %q would add Lua source-code escapes instead
    -- (backslash-newline, \r, \000)
    local text = value:gsub('[\\"]', '\\%1')
    field = 'string="' .. text .. '"'
  else
    log('invalid data type', addr, kind)
    return
  end

  local body = string.format('%s,name=%s,addr=%s ', influxtable, name, addr) .. field

  local res, code = http.request({
    url = url,
    method = 'POST',
    body = body,
    headers = {
      Authorization = 'Token ' .. token
    }
  })

  -- any code other than 204 is treated as a failure; when the request
  -- itself fails, code is not 204 either, so that case is logged too
  if code ~= 204 then
    log('error sending to influx', res, code, body)
  end
end

-- Group write handler: looks up the destination in the objects table,
-- decodes the telegram payload with the stored datatype and forwards the
-- decoded value to InfluxDB. Unknown destinations and undecodable payloads
-- are ignored.
local function onwrite(event)
  local info = objects[ event.dstraw ]
  if not info then
    return
  end

  local decoded = busdatatype.decode(event.datahex, info.datatype)
  if decoded == nil then
    return
  end

  send(info.name, event.dst, decoded)
end

local busclient = require('localbus').new(60)
busclient:sethandler('groupwrite', onwrite)

-- process bus events forever
repeat
  busclient:loop(60)
until false

Scheduled script #

Adjust url and token variables as needed.

All objects that have the influxdb tag will have their values sent to InfluxDB periodically (depending on how often the script is set to run).

-- LuaSocket HTTP client used for the write requests
local http = require('socket.http')
-- request timeout (LuaSocket documents this value as seconds)
http.TIMEOUT = 5

-- InfluxDB v2 write endpoint; replace the host and the bucket/org placeholders
local url = 'http://192.168.1.200:8086/api/v2/write?bucket=XXX&org=XXX'
-- API token, sent in the Authorization header of every request
local token = 'XXXXX'
-- first element of every line sent to InfluxDB (the measurement name)
local influxtable = 'rawdata'
-- objects returned for the 'influxdb' tag
local tagobjs = grp.tag('influxdb')

--- Escape a value for use as an InfluxDB line-protocol tag value.
-- The value is converted with tostring() and every backslash, space, comma
-- and equals sign is prefixed with a backslash.
-- @param value any value; converted to a string first
-- @return the escaped string (exactly one return value)
local function escape(value)
  -- the extra parentheses drop gsub's second result (the substitution
  -- count) so it can never leak into a caller's argument or value list
  return (tostring(value):gsub('[\\ ,=]', '\\%1'))
end

--- Send a single value to InfluxDB as one line-protocol point.
-- The field key depends on the Lua type of the value: booleans are written
-- to `state`, numbers to `value` and strings to `string`. Any other type is
-- logged and skipped. A response code other than 204 is logged together
-- with the request body.
-- @param name tag value for `name`; must already be escaped for line protocol
-- @param addr tag value for `addr`
-- @param value boolean, number or string to store
local function send(name, addr, value)
  local kind = type(value)
  local field

  if kind == 'boolean' then
    -- tostring() is explicit: string.format('%s') in stock Lua 5.1 raises
    -- an error when given a boolean argument
    field = 'state=' .. tostring(value)
  elseif kind == 'number' then
    field = 'value=' .. tostring(value)
  elseif kind == 'string' then
    -- line protocol only needs backslash and double quote escaped inside a
    -- string field value; %q would add Lua source-code escapes instead
    -- (backslash-newline, \r, \000)
    local text = value:gsub('[\\"]', '\\%1')
    field = 'string="' .. text .. '"'
  else
    log('invalid data type', addr, kind)
    return
  end

  local body = string.format('%s,name=%s,addr=%s ', influxtable, name, addr) .. field

  local res, code = http.request({
    url = url,
    method = 'POST',
    body = body,
    headers = {
      Authorization = 'Token ' .. token
    }
  })

  -- any code other than 204 is treated as a failure; when the request
  -- itself fails, code is not 204 either, so that case is logged too
  if code ~= 204 then
    log('error sending to influx', res, code, body)
  end
end

-- Push the current value of every tagged object to InfluxDB, one request
-- per object
for _, tagged in ipairs(tagobjs) do
  local name = escape(tagged.name)
  send(name, tagged.address, tagged.value)
end

Further assistance can be found in this forum thread