Code listing for mqtt.lua
The code listing below, mqtt.lua, gives the code to implement the MQTT protocol on NetScaler using protocol extensions. The code only has the TCP client data callback function defined - client.on_data(). For server data, it does not add a callback function and the server to client takes the fast native path. For client data, the code parses the CONNECT MQTT protocol message and extracts the ClientID. It then uses the ClientID for user_token value, which is used to load balance all the client traffic for the connection based on the ClientID by setting LB method for the LB vserver as USER_TOKEN. It uses the ClientID also for user_session value, which can be used for LB persistence by setting persistence type for the LB vserver as USERSESSION. The code uses the ns.send() to do LB and send the initial data. It uses the ns.pipe() API to send the rest of the client traffic directly to server connection, bypassing calls to extension callback handler.
--[[
  MQTT event handler for TCP client data
    ctxt - TCP client side App processing context.
    data - TCP Data stream received.
    - parse the client ID from the connect message - the first message should be connect
    - send the data to LB with ClientID as user token and session
    - pipe the subsequent data to LB directly. This way the subsequent MQTT traffic will
      bypass the tcp client on_data handler
    - if a parse error is seen, throw an error so the connection is reset
--]]
function client.on_data(ctxt, payload)
    local data = payload.data
    local data_len = data:len()
    local offset = 1
    local byte = nil
    local utf8_str_len = 0
    local msg_type = 0
    local multiplier = 1
    local max_multiplier = 128 * 128 * 128
    local rem_length = 0
    local clientID = nil
    -- check if MQTT fixed header is present (fixed header length is atleast 2 bytes)
    if (data_len < 2) then
        goto need_more_data
    end
    byte = data:byte(offset)
    offset = offset + 1
       -- check for connect packet - type value 1
    msg_type = bit32.rshift(byte, 4)
    if (msg_type ~= 1) then
        error("Missing MQTT Connect packet.")
    end
    -- parse the remaining length
    repeat
        if (multiplier > max_multiplier) then
           error("MQTT CONNECT packet parse error - invalid Remaining Length.")
       end
       if (data_len < offset) then
          goto need_more_data
       end
       byte = data:byte(offset)
       offset = offset + 1
       rem_length = rem_length + (bit32.band(byte, 0x7F) * multiplier)
       multiplier = multiplier * 128
    until (bit32.band(byte, 0x80) == 0)
    -- protocol name
    -- check if protocol name length is present
    if (data_len < offset + 1) then
       goto need_more_data
    end
    -- protocol name length MSB
    byte = data:byte(offset)
    offset = offset + 1
    utf8_str_len = byte * 256
    -- length LSB
    byte = data:byte(offset)
    offset = offset + 1
    utf8_str_len = utf8_str_len + byte
       -- skip the variable header for connect message
    -- the four required fields (protocol name, protocol level, connect flags, keep alive)
    offset = offset + utf8_str_len + 4
    -- parse the client ID
    --
    -- check if client ID len is present
    if (data_len < offset + 1) then
       goto need_more_data
    end
       -- client ID length MSB
    byte = data:byte(offset)
    offset = offset + 1
    utf8_str_len = byte * 256
    -- length LSB
    byte = data:byte(offset)
    offset = offset + 1
    utf8_str_len = utf8_str_len + byte
    if (data_len < (offset + utf8_str_len - 1)) then
        goto need_more_data
    end
    clientID = data:sub(offset, offset + utf8_str_len - 1)
    -- send the data so far to lb, user_token is set to do LB based on clientID
    -- user_session is set to clientID as well (it will be used to persist session)
    ns.send(ctxt.output, "DATA", {data = data,
                               user_token = clientID,
                               user_session = clientID})
       -- pipe the subsequent traffic to the lb - to bypass the extension handler
    ns.pipe(ctxt.input, ctxt.output)
    goto parse_done
    ::need_more_data::
    ctxt:hold(data)
    ::parse_done::
    return
end
<!--NeedCopy-->