Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
haltdos-pro-waf / usr / local / lib / lua / 5.1 / resty / amqp / frame.lua
Size: Mime:
--
-- Copyright (C) 2016 Meng Zhang @ Yottaa,Inc
-- Copyright (C) 2018 4mig4
--
--
-- [0].https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf
-- [1].https://www.rabbitmq.com/amqp-0-9-1-reference.html
--

local c = require('resty.amqp.consts')
local buffer = require('resty.amqp.buffer')
local logger = require('resty.amqp.logger')
local bit = require('bit')

local band = bit.band
local bor = bit.bor

local byte = string.byte

local debug = logger.dbg
local is_debug_enabled = logger.is_debug_enabled

local amqp_frame = {}


local function declare_exchange_flags(method)
  local bits = 0

  if method.passive then
    bits = bor(bits,1)
  end

  if method.durable then
    bits = bor(bits, 2)
  end

  if method.auto_delete then
    bits = bor(bits, 4)
  end

  if method.internal then
    bits = bor(bits,8)
  end

  if method.no_wait then
    bits = bor(bits, 16)
  end

  return bits
end

local function toboolean(n)
  if n and n ~= 0 then
    return true
  end
  return false
end

local function parse_exchange_flags(bits)
  local flags = {}
  flags.passive = toboolean(band(bits, 1))
  flags.auto_delete = toboolean(band(bits, 4))
  flags.internal = toboolean(band(bits, 8))
  flags.no_wait = toboolean(band(bits, 16))
  return flags
end

local function declare_queue_flags(method)
  local bits = 0

  if method.passive then
    bits = bor(bits,1)
  end

  if method.durable then
    bits = bor(bits, 2)
  end

  if method.exclusive then
    bits = bor(bits, 4)
  end

  if method.auto_delete then
    bits = bor(bits, 8)
  end

  if method.no_wait then
    bits = bor(bits, 16)
  end

  return bits
end

local function parse_queue_flags(bits)
  local flags = {}
  flags.passive = toboolean(band(bits, 1))
  flags.durable = toboolean(band(bits, 2))
  flags.exclusive = toboolean(band(bits, 4))
  flags.auto_delete = toboolean(band(bits, 8))
  flags.no_wait = toboolean(band(bits, 16))
  return flags
end

local function basic_consume_flags(method)
  local bits = 0

  if method.no_local then
    bits = bor(bits,1)
  end

  if method.no_ack then
    bits = bor(bits, 2)
  end

  if method.exclusive then
    bits = bor(bits, 4)
  end

  if method.no_wait then
    bits = bor(bits, 8)
  end

  return bits
end

local function parse_consume_flags(bits)
  local flags = {}
  flags.no_local = toboolean(band(bits,1))
  flags.no_ack = toboolean(band(bits,2))
  flags.exclusive = toboolean(band(bits,4))
  flags.no_wait = toboolean(band(bits,8))
  return flags
end

local function parse_exchange_delete_flags(bits)
  local flags = {}
  flags.if_unused = toboolean(band(bits,1))
  flags.no_wait = toboolean(band(bits,2))
  return flags
end

local function parse_queue_delete_flags(bits)
  local flags = {}
  flags.if_unused = toboolean(band(bits,1))
  flags.if_empty = toboolean(band(bits,2))
  flags.no_wait = toboolean(band(bits,4))
  return flags
end

local function decode_close_reply(b)
  local frame = {}
  frame.reply_code = b:get_i16()
  frame.reply_text = b:get_short_string()
  frame.class_id = b:get_i16()
  frame.method_id = b:get_i16()
  return frame
end

local function encode_close_reply(method)
  local b = buffer.new()
  b:put_i16(method.reply_code)
  b:put_short_string(method.reply_text)
  b:put_i16(method.class_id)
  b:put_i16(method.method_id)
  return b:payload()
end

local function nop()
  return nil
end

local methods_ = {
  [c.class.CONNECTION] = {
    name = "connection",
    --[[
    major octet
    minor octet
    properties field_table
    mechanism long_string
    locales long_string
    --]]
    [c.method.connection.START] = {
      name = "start",
      r = function(b)
        local f = {}
        f.major = b:get_i8()
        f.minor = b:get_i8()
        f.props = b:get_field_table()
        f.mechanism = b:get_long_string()
        f.locales = b:get_long_string()
        return f
      end
    },
    --[[
    client_properties field_table
    mechanism short_string
    response long_string
    locale short_string
    --]]
    [c.method.connection.START_OK] = {
      name = "start_ok",
      w = function(method)
        local b = buffer.new()
        b:put_field_table(method.properties)
        b:put_short_string(method.mechanism)
        b:put_long_string(method.response)
        b:put_short_string(method.locale)
        return b:payload()
      end,
      r = function(b)
        local f = {}
        f.properties = b:get_field_table()
        f.mechanism = b:get_short_string()
        f.response = b:get_long_string()
        f.locale = b:get_short_string()
        return f
      end
    },
    --[[
    secure long_string
    --]]
    [c.method.connection.SECURE] = {
      name = "secure"
    },
    [c.method.connection.SECURE_OK] = {
      name = "secure_ok"
    },
    --[[
    channel_max i16
    frame_max i32
    beartbeat i16
    --]]
    [c.method.connection.TUNE] = {
      name = "tune",
      r = function(b)
        local f = {}
        f.channel_max = b:get_i16()
        f.frame_max = b:get_i32()
        f.heartbeat = b:get_i16()
        return f
      end
    },
    --[[
    channel_max i16
    frame_max i32
    heartbeat i16
    --]]
    [c.method.connection.TUNE_OK] = {
      name = "tune_ok",
      w = function(method)
        local b = buffer.new()
        b:put_i16(method.channel_max)
        b:put_i32(method.frame_max)
        b:put_i16(method.heartbeat or c.DEFAULT_HEARTBEAT)
        return b:payload()
      end,
      r = function(b)
        local f = {}
        f.channel_max = b:get_i16()
        f.frame_max = b:get_i32()
        f.heartbeat = b:get_i16()
        return f
      end
    },
    --[[
    virtual_host short_string,
    reserved-1(capabilities) octet
    reserved-2 octet
    --]]
    [c.method.connection.OPEN] = {
      name = "open",
      w = function(method)
        local b = buffer.new()
        b:put_short_string(method.virtual_host)
        b:put_i8(0) -- capabilities
        b:put_i8(1) -- insist?
        return b:payload()
      end,
      r = function(b)
        local f = {}
        f.virtual_host = b:get_short_string()
        f.capabilities = b:get_i8()
        f.insist = b:get_i8()
        return f
      end
    },
    --[[
    reserved-1 short_string
    --]]
    [c.method.connection.OPEN_OK] = {
      name = "open_ok",
      r = function(b)
        return { reserved1 = b:get_short_string() }
      end
    },
    --[[
    reply_code i16
    reply_text short_string
    class_id i16
    method_id i16
    --]]
    [c.method.connection.CLOSE] = {
      name = "close",
      r = decode_close_reply,
      w = encode_close_reply
    },
    --[[
    --]]
    [c.method.connection.CLOSE_OK] = {
      name = "close_ok",
      r = nop,
      w = nop
    },
    --[[
    reason short_string
    --]]
    [c.method.connection.BLOCKED] = {
      name = "blocked",
      r = function(b)
        return {reason = b:get_short_string()}
      end,
      w = function(method)
        local b = buffer.new()
        b:put_short_string(method.reason)
        return b:payload()
      end
    },
    --[[
    --]]
    [c.method.connection.UNBLOCKED] = {
      name = "unblocked",
      r = function(--[[b--]])
        return nil
      end,
      w = function(--[[method--]])
        return nil
      end
    }
  },
  [c.class.CHANNEL] = {
    name = "channel",
    [c.method.channel.OPEN] = {
      name = "open",
      w = function(--[[method--]])
        -- reserved?
        return '\0'
      end,
      r = function(--[[method--]])
        return '\0'
      end
    },
    [c.method.channel.OPEN_OK] = {
      name = "open_ok",
      r = function(b)
        return {
          reserved1 = b:get_long_string()
        }
      end
    },
    --[[
    active bit
    --]]
    [c.method.channel.FLOW] = {
      name = "flow",
      r = function(b)
        return { active = b:get_bool() }
      end,
      w = function(method)
        local b = buffer.new()
        b:put_bool(method.active)
        return b:payload()
      end

    },
    [c.method.channel.FLOW_OK] = {
      name = "flow_ok",
      r = function(b)
        local bits = b:get_i8()
        return { active = band(bits,1) }
      end,
      w = function(method)
        local b = buffer.new()
        b:put_bool(method.active)
        return b:payload()
      end
    },
    --[[
    reply_code i16
    reply_text short_string
    class_id i16
    method_id i16
    --]]
    [c.method.channel.CLOSE] = {
      name = "close",
      r = decode_close_reply,
      w = encode_close_reply
    },
    [c.method.channel.CLOSE_OK] = {
      name = "close_ok",
      r = nop,
      w = nop
    },
  },
  [c.class.EXCHANGE] = {
    name = "exchange",
    --[[
    reserved1 i16
    exchange short_string
    type short_string
    passive bit
    durable bit
    auto_delete bit
    internal bit
    no_wait bit
    arguments table
    --]]
    [c.method.exchange.DECLARE] = {
      name = "declare",
      w = function(method)
        local b = buffer.new()
        b:put_i16(method.reserved1 or 0)
        b:put_short_string(method.exchange)
        b:put_short_string(method.typ)
        b:put_i8(declare_exchange_flags(method))
        b:put_field_table(method.arguments or {})
        return b:payload()
      end,
      r = function(b)
        local f = {}
        local _,reserved1 = pcall(buffer.get_i16, b)
        f.reserved1 = reserved1 or 0
        f.exchange = b:get_short_string()
        f.typ = b:get_short_string()
        f.exchange_flags = parse_exchange_flags(b:get_i8())
        f.arguments = b:get_field_table()
        return f
      end
    },
    [c.method.exchange.DECLARE_OK] = {
      name = "declare_ok",
      r = nop
    },
    --[[
    reserved1 i16
    destination short_string
    source short_string
    routing_key short_string
    no_wait bit
    arguments table
    --]]
    [c.method.exchange.BIND] = {
      name = "bind",
      w = function(method)
        local b = buffer.new()
        b:put_i16(method.reserved1 or 0)
        b:put_short_string(method.destination)
        b:put_short_string(method.source)
        b:put_short_string(method.routing_key)
        b:put_bool(method.no_wait)
        b:put_field_table(method.arguments or {})
        return b:payload()
      end,
      r = function(b)
        local f = {}
        local _,reserved1 = pcall(buffer.get_i16, b)
        f.reserved1 = reserved1 or 0
        f.destination = b:get_short_string()
        f.source = b:get_short_string()
        f.routing_key = b:get_short_string()
        f.no_wait = b:get_bool()
        f.arguments = b:get_field_table()
        return f
      end
    },
    [c.method.exchange.BIND_OK] = {
      name = "bind_ok",
      r = nop
    },
    --[[
    --]]
    [c.method.exchange.DELETE] = {
      name = "delete",
      w = function(method)
        local b = buffer.new()
        b:put_i16(method.reserved1 or 0)
        b:put_short_string(method.exchange)
        local bits = 0
        if method.if_unused then
          bits = bor(bits,1)
        end
        if method.no_wait then
          bits = bor(bits,2)
        end
        b:put_i8(bits)
        return b:payload()
      end,
      r = function(b)
        local f = {}
        local _,reserved1 = pcall(buffer.get_i16, b)
        f.reserved1 = reserved1 or 0
        f.exchange = b:get_short_string()
        f.exchange_flags = parse_exchange_delete_flags(b:get_i8())
        return f
      end
    },
    [c.method.exchange.DELETE_OK] = {
      name = "delete_ok",
      r = nop
    },
    --[[
    --]]
    [c.method.exchange.UNBIND] = {
      name = "unbind",
      w = function(method)
        local b = buffer.new()
        b:put_i16(method.reserved1 or 0)
        b:put_short_string(method.destination)
        b:put_short_string(method.source)
        b:put_short_string(method.routing_key)
        b:put_bool(method.no_wait)
        b:put_field_table(method.arguments or {})
        return b:payload()
      end,
      r = function(b)
        local f = {}
        local _,reserved1 = pcall(buffer.get_i16, b)
        f.reserved1 = reserved1 or 0
        f.destination = b:get_short_string()
        f.source = b:get_short_string()
        f.routing_key = b:get_short_string()
        f.no_wait = b:get_bool()
        f.arguments = b:gut_field_table()
        return f
      end
    },
    [c.method.exchange.UNBIND_OK] = {
      name = "unbind_ok",
      r = nop
    },
  },
  [c.class.QUEUE] = {
    name = "queue",
    [c.method.queue.DECLARE] = {
      name = "declare",
      w = function(method)
        local b = buffer.new()
        b:put_i16(method.ticket or 0)
        b:put_short_string(method.queue)
        local bits = declare_queue_flags(method)
        b:put_i8(bits)
        b:put_field_table(method.arguments or {})
        return b:payload()
      end,
      r = function(b)
        local f = {}
        f.ticket = b:get_i16()
        f.queue =  b:get_short_string()
        f.queue_flags = parse_queue_flags(b:get_i8())
        f.arguments = b:get_field_table()
        return f
      end
    },
    [c.method.queue.DECLARE_OK] = {
      name = "declare_ok",
      r = function(b)
        local f = {}
        f.queue = b:get_short_string()
        f.message_count = b:get_i32()
        f.consumer_count = b:get_i32()
        return f
      end
    },
    [c.method.queue.BIND] = {
      name = "bind",
      w = function(method)
        local b = buffer.new()
        b:put_i16(method.reserved1 or 0)
        b:put_short_string(method.queue)
        b:put_short_string(method.exchange)
        b:put_short_string(method.routing_key or "")
        b:put_bool(method.no_wait)
        b:put_field_table(method.arguments or {})
        return b:payload()
      end,
      r = function(b)
        local f = {}
        local _,reserved1 = pcall(buffer.get_i16, b)
        f.reserved1 = reserved1 or 0
        f.queue = b:get_short_string()
        f.exchange = b:get_short_string()
        f.routing_key = b:get_short_string()
        f.no_wait = b:get_bool()
        f.arguments = b:get_field_table()
        return f
      end
    },
    [c.method.queue.BIND_OK] = {
      name = "bind_ok",
      r = nop
    },
    --[[
    reserved1 i16
    queue short_string
    if_unused bit
    if_empty bit
    no_wait bit
    --]]
    [c.method.queue.DELETE] = {
      name = "delete",
      w = function(method)
        local b = buffer.new()
        b:put_i16(method.reserved1 or 0)
        b:put_short_string(method.queue)
        local bits = 0
        if method.if_unused then
          bits = bor(bits,1)
        end
        if method.if_empty then
          bits = bor(bits,2)
        end
        if method.no_wait then
          bits = bor(bits, 4)
        end
        b:put_i8(bits)
        return b:payload()
      end,
      r = function(b)
        local f = {}
        local _,reserved1 = pcall(buffer.get_i16, b)
        f.reserved1 = reserved1 or 0
        f.queue = b:get_short_string()
        f.queue_flags = parse_queue_delete_flags(b:get_i8())
        return f
      end
    },
    --[[
    message_count i32
    --]]
    [c.method.queue.DELETE_OK] = {
      name = "delete_ok",
      r = function(b)
        return {
          message_count = b:get_i32()
        }
      end
    },
    [c.method.queue.UNBIND] = {
      name = "unbind",
      w = function(method)
        local b = buffer.new()
        b:put_i16(method.reserved1 or 0)
        b:put_short_string(method.queue)
        b:put_short_string(method.exchange)
        b:put_short_string(method.routing_key or "")
        b:put_field_table(method.arguments or {})
        return b:payload()
      end,
      r = function(b)
        local f = {}
        local _,reserved1 = pcall(buffer.get_i16, b)
        f.reserved1 = reserved1 or 0
        f.queue = b:get_short_string()
        f.exchange = b:get_short_string()
        f.routing_key = b:get_short_string()
        f.arguments = b:get_field_table()
        return f
      end
    },
    [c.method.queue.UNBIND_OK] = {
      name = "unbind_ok",
      r = nop
    },
    --[[
    reserved1 i16
    queue short_string
    no_wait bit
    --]]
    [c.method.queue.PURGE] = {
      name = "queue_purge",
      w = function(method)
        local b = buffer.new()
        b:put_i16(method.reserved1 or 0)
        b:put_short_string(method.queue)
        local bits = 0
        if method.no_wait then
          bits = bor(bits, 1)
        end
        b:put_i8(bits)
        return b:payload()
      end,
      r = function(b)
        local f = {}
        local _,reserved1 = pcall(buffer.get_i16, b)
        f.reserved1 = reserved1 or 0
        f.queue = b:get_short_string()
        f.queue_flags = { no_wait = toboolean(b:get_i8(), 1) }
        return f
      end
    },
    --[[
    message_count i32
    --]]
    [c.method.queue.PURGE_OK] = {
      name = "purge_ok",
      r = function(b)
        return {
          message_count = b:get_i32()
        }
      end
    },


  },
  [c.class.BASIC] = {
    name = "basic",
    [c.method.basic.CONSUME] = {
      name = "consume",
      w = function(method)
        local b = buffer.new()
        b:put_i16(method.ticket or 0)
        b:put_short_string(method.queue)
        b:put_short_string(method.consumer_tag or "")
        b:put_i8(basic_consume_flags(method))
        b:put_field_table(method.arguments or {})
        return b:payload()
      end,
      r = function(b)
        local f = {}
        f.ticket = b:get_i16()
        f.queue = b:get_short_string()
        f.consumer_tag = b:get_short_string()
        f.consume_flags = parse_consume_flags(b:get_i8())
        f.arguments = b:get_field_table()
        return f
      end
    },
    [c.method.basic.CONSUME_OK] = {
      name = "consume_ok",
      r = function(b)
        return {
          consumer_tag = b:get_short_string()
        }
      end
    },
    --[[
    consumer_tag short_string
    delivery_tag i64
    redelivered bool
    exchange short_string
    routing_key short_string
    --]]
    [c.method.basic.DELIVER] = {
      name = "deliver",
      r = function(b)
        local f = {}
        f.consumer_tag = b:get_short_string()
        f.delivery_tag = b:get_i64()
        f.redelivered = b:get_i8()
        f.exchange = b:get_short_string()
        f.routing_key = b:get_short_string()
        return f
      end,
      w = function(method)
        local b = buffer.new()
        b:put_short_string(method.consumer_tag)
        b:put_i64(method.delivery_tag)
        b:put_bool(method.redelivered)
        b:put_short_string(method.exchange)
        b:put_short_string(method.routing_key)
        return b:payload()
      end
    },
    --[[
    prefectch_size i32
    prefetch_count i16
    global bit
    --]]
    [c.method.basic.QOS] = {
      name = "qos",
      w = function(method)
        local b = buffer.new()
        b:put_i32(method.prefetch_size)
        b:put_i16(method.prefetch_count)
        b:put_bool(method.global)
        return b:payload()
      end,
      r = function(b)
        local f = {}
        f.prefetch_size = b:get_i32()
        f.prefetch_count = b:get_i16()
        f.global = b:get_bool()
        return f
      end
    },
    [c.method.basic.QOS_OK] = {
      name = "qos_ok",
      r = nop
    },
    --[[
    consumer_tag short_string
    no_wait bit
    --]]
    [c.method.basic.CANCEL] = {
      name = "cancel",
      w = function(method)
        local b = buffer.new()
        b:put_short_string(method.consumer_tag)
        b:put_bool(method.no_wait)
        return b:payload()
      end,
      r = function(b)
        local f = {}
        f.consumer_tag = b:get_short_string()
        f.no_wait = b:get_bool()
        return f
      end
    },
    [c.method.basic.CANCEL_OK] = {
      name = "cancel_ok",
      r = function(b)
        return {
          consumer_tag = b:get_short_string()
        }
      end
    },
    --[[
    reserved1 i16
    queue short_string
    no_ack bit
    --]]
    [c.method.basic.GET] = {
      name = "get",
      w = function(method)
        local b = buffer.new()
        b:put_i16(method.reserved1 or 0)
        b:put_short_string(method.queue)
        b:put_bool(method.no_ack)
        return b:payload()
      end,
      r = function(b)
        local f = {}
        local _,reserved1 = pcall(buffer.get_i16, b)
        f.reserved1 = reserved1 or 0
        f.queue = b:get_short_string()
        f.no_ack = b:get_bool()
        return f
      end
    },
    --[[
    delivery_tag i64
    redelivered bool
    exchange short_string
    routing_key short_string
    message_count i32
    --]]
    [c.method.basic.GET_OK] = {
      name = "get_ok",
      r = function(b)
        local f = {}
        f.delivery_tag = b:get_i64()
        f.redelivered = b:get_bool()
        f.exchange = b:get_short_string()
        f.routing_key = b:get_short_string()
        f.message_count = b:get_i32()
        return f
      end
    },
    --[[
    requeue bit
    --]]
    [c.method.basic.RECOVER] = {
      name = "recover",
      w = function(method)
        local b = buffer.new()
        b:put_bool(method.requeue)
        return b:payload()
      end,
      r = function(b)
        return {
          requeue = b:get_bool()
        }
      end
    },
    [c.method.basic.RECOVER_OK] = {
      name = "recover_ok",
      r = nop
    },
    [c.method.basic.RECOVER_ASYNC] = {
      name = "recover_async",
      w = function(method)
        local b = buffer.new()
        b:put_bool(method.requeue)
        return b:payload()
      end,
      r = function(b)
        return {
          requeue = b:get_bool()
        }
      end
    },
    --[[
    reserved1 i16
    exchange short_string
    routing_key short_string
    mandatory bit
    immediate bit
    --]]
    [c.method.basic.PUBLISH] = {
      name = "publish",
      w = function(method)
        local b = buffer.new()
        b:put_i16(method.reserved1 or 0)
        b:put_short_string(method.exchange)
        b:put_short_string(method.routing_key)
        local bits = 0
        if method.mandatory then
          bits = bor(bits,1)
        end
        if method.immediate then
          bits = bor(bits,2)
        end
        b:put_i8(bits)
        return b:payload()
      end,
      r = function(b)
        local f = {}
        local _,reserved1 = pcall(buffer.get_i16, b)
        f.reserved1 = reserved1 or 0
        f.exchange = b:get_short_string()
        f.routing_key = b:get_short_string()
        local bits = b:get_i8()
        f.mandatory = band(bits,1)
        f.immediate = band(bits,2)
        return f
      end
    },
    --[[
    reply_code i16
    reply_text short_string
    exchange short_string
    routing_key short_string
    --]]
    [c.method.basic.RETURN] = {
      name = "return",
      w = function(method)
        local b = buffer.new()
        b:put_i16(method.reply_code)
        b:put_short_string(method.reply_text)
        b:put_short_string(method.exchange)
        b:put_short_string(method.routing_key)
        return b:payload()
      end,
      r = function(b)
        local f = {}
        f.reply_code = b:get_i16()
        f.reply_text = b:get_short_string()
        f.exchange = b:get_short_string()
        f.routing_key = b:get_short_string()
        return f
      end
    },
    --[[
    reserved1 i16
    --]]
    [c.method.basic.GET_EMPTY] = {
      name = "get_emtpy",
      r = function(b)
        local _,reserved1 = pcall(buffer.get_i16, b)
        return {
          reserved1 = reserved1 or 0
        }
      end
    },
    --[[
    delivery_tag i64
    multiple bit
    --]]
    [c.method.basic.ACK] = {
      name = "ack",
      r = function(b)
        local method = {}
        method.delivery_tag = b:get_i64()
        method.multiple = b:get_bool()
        return method
      end,
      w = function(method)
        local b = buffer.new()
        b:put_i64(method.delivery_tag)
        b:put_bool(method.multiple)
        return b:payload()
      end
    },
    --[[
    delivery_tag i64
    multiple bit
    requeue bit
    --]]
    [c.method.basic.NACK] = {
      name = "nack",
      r = function(b)
        local f = {}
        f.delivery_tag = b:get_i64()
        local v = b:get_i8()
        f.multiple = (band(v,0x1) ~= 0)
        f.requeue = (band(v,0x2) ~= 0)
        return f
      end,
      w = function(method)
        local b = buffer.new()
        b:put_i64(method.delivery_tag)
        local bits = 0
        if method.multiple and method.multiple ~= 0 then
          bits = bor(bits, 1)
        end
        if method.requeue and method.requeue ~= 0 then
          bits = bor(bit, 2)
        end
        b:put_i16(bits)
        return b:payload()
      end
    },
    --[[
    delivery_tag i64
    requeue bit
    --]]
    [c.method.basic.REJECT] = {
      name = "reject",
      r = function(b)
        local f = {}
        f.delivery_tag = b:get_i64()
        f.requeue = b:get_bool()
        return f
      end,
      w = function(method)
        local b = buffer.new()
        b:put_i64(method.delivery_tag)
        b:put_bool(method.requeue)
        return b:payload()
      end
    },

  },
  [c.class.TX] = {
    name = "tx",
    [c.method.tx.SELECT] = {
      name = "select",
      r = nop,
      w = nop
    },
    [c.method.tx.SELECT_OK] = {
      name = "select_ok",
      r = nop,
      w = nop
    },
    [c.method.tx.COMMIT] = {
      name = "commit",
      r = nop,
      w = nop
    },
    [c.method.tx.COMMIT_OK] = {
      name = "commit_ok",
      r = nop,
      w = nop
    },
    [c.method.tx.ROLLBACK] = {
      name = "rollback",
      r = nop,
      w = nop
    },
    [c.method.tx.ROLLBACK_OK] = {
      name = "rollback_ok",
      r = nop,
      w = nop
    },

  },
  [c.class.CONFIRM] = {
    name = "confirm",
    --[[
    no_wait bit
    --]]
    [c.method.confirm.SELECT] = {
      name = "select",
      w = function(method)
        local b = buffer.new()
        b:put_bool(method.no_wait)
        return b:payload()
      end,
      r = function(b)
        return {
          no_wait = b:get_bool()
        }
      end
    },
    [c.method.confirm.SELECT_OK] = {
      name = "select_ok",
      r = nop
    },
  }
}

--
-- decoder
--

local function method_frame(data,channel)
  local frame = { channel = channel }
  local b = buffer.new(data)
  if is_debug_enabled() then
    debug("[method_frame]",b:hex_dump())
  end
  local class_id = b:get_i16()
  local method_id = b:get_i16()
  frame.class_id = class_id
  frame.method_id = method_id
  debug("[method_frame] class_id:",class_id, "method_id:", method_id)
  local codec = methods_[class_id][method_id]
  if not codec then
    local err = "[method_frame]: no codec for class: " .. class_id .. " method: " .. method_id
    return nil,err
  end

  if not codec.r then
    local err = "[method_frame]: no decoder for class: " .. class_id .. " method: " .. method_id
    return nil, err
  end
  debug("[method_frame] class:",methods_[class_id].name, "method:", codec.name)
  frame.method = codec.r(b)
  return frame
end

local function header_frame(data,channel)

  local frame = { channel = channel, properties = {} }
  local b = buffer.new(data)

  if is_debug_enabled() then
    debug("[header_frame]",b:hex_dump())
  end

  frame.class_id = b:get_i16()
  frame.weight = b:get_i16()
  frame.body_size = b:get_i64()

  local flag = b:get_i16()

  frame.flag = flag

  if band(flag,c.flag.CONTENT_TYPE) ~= 0 then
    frame.properties.content_type= b:get_short_string()
  end

  if band(flag,c.flag.CONTENT_ENCODING) ~= 0 then
    frame.properties.content_encoding = b:get_short_string()
  end

  if band(flag,c.flag.HEADERS) ~= 0 then
    frame.properties.headers = b:get_field_table()
  end

  if band(flag,c.flag.DELIVERY_MODE) ~= 0 then
    frame.properties.delivery_mode = b:get_i8()
  end

  if band(flag,c.flag.PRIORITY) ~= 0 then
    frame.properties.priority = b:get_i8()
  end

  if band(flag,c.flag.CORRELATION_ID) ~= 0 then
    frame.properties.correlation_id = b:get_short_string()
  end

  if band(flag,c.flag.REPLY_TO) ~= 0 then
    frame.properties.reply_to = b:get_short_string()
  end

  if band(flag,c.flag.EXPIRATION) ~= 0 then
    frame.properties.expiration = b:get_short_string()
  end

  if band(flag,c.flag.MESSAGE_ID) ~= 0 then
    frame.properties.message_id = b:get_short_string()
  end

  if band(flag,c.flag.TIMESTAMP) ~= 0 then
    frame.properties.timestamp = b:get_timestamp()
  end

  if band(flag,c.flag.TYPE) ~= 0 then
    frame.properties.type = b:get_short_string()
  end

  if band(flag,c.flag.USER_ID) ~= 0 then
    frame.properties.user_id = b:get_short_string()
  end

  if band(flag,c.flag.APP_ID) ~= 0 then
    frame.properties.app_id = b:get_short_string()
  end

  if band(flag,c.flag.RESERVED1) ~= 0 then
    frame.properties.reserved1 = b:get_short_string()
  end

  return frame
end

local function body_frame(data,channel)
  local frame = { channel = channel }
  local b = buffer.new(data)
  if is_debug_enabled() then
    debug("[body_frame]",b:hex_dump())
  end
  frame.body = b:payload()
  return frame
end


local function heartbeat_frame(channel,size)
  local frame = { channel = channel }
  if size > 0 then
    return nil
  end
  return frame
end

local match = string.match

if _G.ngx and _G.ngx.match then
  match = _G.ngx.match
end

function amqp_frame.consume_frame(ctx)

  local ok
  local err
  local fe
  local data


  data,err = ctx:receive(7)

  if not data then
    return nil, err
  end

  local b = buffer.new(data)
  if is_debug_enabled() then
    debug("[frame] take the first 7 octets: ",b:hex_dump())
  end

  local typ = b:get_i8()
  local channel = b:get_i16()
  local size = b:get_i32()

  data, err = ctx:receive(size)

  if not data then
    return nil, err
  end
  if typ == c.frame.METHOD_FRAME then
    ok,fe,err = pcall(method_frame,data,channel)
  elseif typ == c.frame.HEADER_FRAME then
    ok,fe,err = pcall(header_frame,data,channel)
  elseif typ == c.frame.BODY_FRAME then
    ok,fe,err = pcall(body_frame,data,channel)
  elseif typ == c.frame.HEARTBEAT_FRAME then
    ok,fe,err = pcall(heartbeat_frame,channel,size)
  else
    ok = nil
    err = "invalid frame type"
  end

  -- THE END --
  local ok0,err0 = ctx:receive(1)
  if not ok0 then
    return nil,err0
  end

  local tk = byte(ok0,1)
  if tk ~= c.frame.FRAME_END then
    if match(data, "^AMQP") then
      return nil, "connect event"
    end
    return nil,"malformed frame: no frame_end"
  end

  -- err captured by pcall, most likely, due to malformed frames
  if not ok then
    return nil, fe
  end

  -- other errors
  if not fe then
    return nil, err
  end

  fe.type = typ
  return fe, nil
end

--
-- encoder
--

local function encode_frame(typ,channel,payload)
  payload = payload or ""
  local size = #payload
  local b = buffer.new()
  b:put_i8(typ)
  b:put_i16(channel)
  b:put_i32(size)
  b:put_payload(payload)
  b:put_i8(c.frame.FRAME_END)
  return b:payload()
end

local function encode_method_frame(frame)
  local b = buffer.new()
  b:put_i16(frame.class_id)
  b:put_i16(frame.method_id)
  local payload = methods_[frame.class_id][frame.method_id].w(frame.method)
  if payload then
    b:put_payload(payload)
  end
  return encode_frame(c.frame.METHOD_FRAME,frame.channel,b:payload())
end

local function flags_mask(frame)
  local mask = 0
  if not frame.properties then
    return mask
  end
  if frame.properties.content_type ~= nil then
    mask = bor(mask,c.flag.CONTENT_TYPE)
  end
  if frame.properties.content_encoding ~= nil then
    mask = bor(mask,c.flag.CONTENT_ENCODING)
  end
  if frame.properties.headers ~= nil then
    mask = bor(mask,c.flag.HEADERS)
  end
  if frame.properties.delivery_mode ~= nil then
    mask = bor(mask,c.flag.DELIVERY_MODE)
  end
  if frame.properties.priority ~= nil then
    mask = bor(mask,c.flag.PRIORITY)
  end
  if frame.properties.correlation_id ~= nil then
    mask = bor(mask,c.flag.CORRELATION_ID)
  end
  if frame.properties.reply_to ~= nil then
    mask = bor(mask,c.flag.REPLY_TO)
  end
  if frame.properties.expiration ~= nil then
    mask = bor(mask,c.flag.EXPIRATION)
  end
  if frame.properties.timestamp ~= nil then
    mask = bor(mask,c.flag.TIMESTAMP)
  end
  if frame.properties.type ~= nil then
    mask = bor(mask,c.flag.TYPE)
  end
  if frame.properties.user_id ~= nil then
    mask = bor(mask,c.flag.USER_ID)
  end
  if frame.properties.app_id ~= nil then
    mask = bor(mask,c.flag.APP_ID)
  end
  return mask
end

local function encode_header_frame(frame)
  local b = buffer.new()
  b:put_i16(frame.class_id)
  b:put_i16(frame.weight)
  b:put_i64(frame.size)

  local flags = flags_mask(frame)
  b:put_i16(flags)
  if band(flags,c.flag.CONTENT_TYPE) ~= 0 then
    b:put_short_string(frame.properties.content_type)
  end

  if band(flags,c.flag.CONTENT_ENCODING) ~= 0 then
    b:put_short_string(frame.properties.content_encoding)
  end

  if band(flags,c.flag.HEADERS) ~= 0 then
    b:put_field_table(frame.properties.headers)
  end

  if band(flags,c.flag.DELIVERY_MODE) ~= 0 then
    b:put_i8(frame.properties.delivery_mode)
  end

  if band(flags,c.flag.PRIORITY) ~= 0 then
    b:put_i8(frame.properties.priority)
  end

  if band(flags,c.flag.CORRELATION_ID) ~= 0 then
    b:put_short_string(frame.properties.correlation_id)
  end

  if band(flags,c.flag.REPLY_TO) ~= 0 then
    b:put_short_string(frame.properties.reply_to)
  end

  if band(flags,c.flag.EXPIRATION) ~= 0 then
    b:put_short_string(frame.properties.expiration)
  end

  if band(flags,c.flag.MESSAGE_ID) ~= 0 then
    b:put_short_string(frame.properties.message_id)
  end

  if band(flags,c.flag.TIMESTAMP) ~= 0 then
    b:put_time_stamp(frame.properties.timestamp)
  end

  if band(flags,c.flag.TYPE) ~= 0 then
    b:put_short_string(frame.properties.type)
  end

  if band(flags,c.flag.USER_ID) ~= 0 then
    b:put_short_string(frame.properties.user_id)
  end

  if band(flags,c.flag.APP_ID) ~= 0 then
    b:put_short_string(frame.properties.app_id)
  end

  return encode_frame(c.frame.HEADER_FRAME,frame.channel,b:payload())
end

local function encode_body_frame(frame)
  return encode_frame(c.frame.BODY_FRAME,frame.channel,frame.body)
end

local function encode_heartbeat_frame(frame)
  return encode_frame(c.frame.HEARTBEAT_FRAME,frame.channel,nil)
end

local mt = { __index = amqp_frame }

--
-- new a frame
--

function amqp_frame.new(typ,channel)
  return setmetatable({ typ = typ, channel = channel }, mt)
end

function amqp_frame.new_method_frame(channel,class_id,method_id)
  local frame = amqp_frame.new(c.frame.METHOD_FRAME, channel)
  frame.class_id = class_id
  frame.method_id = method_id
  return frame
end

function amqp_frame:encode()
  local typ = self.typ
  if not typ then
    local err = "no frame type specified."
    logger.error("[frame.encode] " .. err)
    return nil,err
  end

  if typ == c.frame.METHOD_FRAME then
    return encode_method_frame(self)
  elseif typ == c.frame.HEADER_FRAME then
    return encode_header_frame(self)
  elseif typ == c.frame.BODY_FRAME then
    return encode_body_frame(self)
  elseif typ == c.frame.HEARTBEAT_FRAME then
    return encode_heartbeat_frame(self)
  else
    local err = "invalid frame type" .. tostring(typ)
    logger.error("[frame.encode]" .. err)
    return nil, err
  end
end

--
-- protocol
--

function amqp_frame.wire_protocol_header(ctx)
  local bytes, err = ctx:send("AMQP\0\0\9\1")
  if not bytes then
    return nil, err
  end
  return amqp_frame.consume_frame(ctx)
end

function amqp_frame.wire_heartbeat(ctx)
  local frame = amqp_frame.new(c.frame.HEARTBEAT_FRAME,c.DEFAULT_CHANNEL)
  local msg = frame:encode()
  local bytes, err = ctx:send(msg)
  if not bytes then
    return nil,"[heartbeat]" .. err
  end

  return bytes
end

function amqp_frame.wire_header_frame(ctx,body_size,properties)
  local frame = amqp_frame.new(c.frame.HEADER_FRAME,ctx.opts.channel or 1)
  frame.class_id = c.class.BASIC
  frame.weight = 0
  frame.size = body_size
  frame.properties = properties
  local msg = frame:encode()
  local bytes, err = ctx:send(msg)
  if not bytes then
    return nil,"[wire_header_frame]" .. err
  end

  return bytes
end

function amqp_frame.wire_body_frame(ctx,payload)
  local frame = amqp_frame.new(c.frame.BODY_FRAME,ctx.opts.channel or 1)
  frame.class_id = c.class.BASIC
  frame.body = payload
  local msg = frame:encode()
  local bytes, err = ctx:send(msg)
  if not bytes then
    return nil,"[wire_body_frame]" .. err
  end
  return bytes
end


local function is_channel_close_received(frame)
  return frame ~= nil and frame.class_id == c.class.CHANNEL and frame.method_id == c.method.channel.CLOSE
end

local function is_connection_close_received(frame)
  return frame ~= nil and frame.class_id == c.class.CONNECTION and frame.method_id == c.method.connection.CLOSE
end


local function ongoing(ctx,frame)
  ctx.ongoing = ctx.ongoing or {}
  ctx.ongoing.class_id = frame.class_id
  ctx.ongoing.method_id = frame.method_id
end

function amqp_frame.wire_method_frame(ctx,frame)
  local f

  local msg = frame:encode()
  local bytes,err = ctx:send(msg)

  if not bytes then
    return nil,"[wire_method_frame]" .. err
  end

  debug("[wire_method_frame] wired a frame.", "[class_id]: ", frame.class_id, "[method_id]: ", frame.method_id)

  if frame.method ~= nil and not frame.method.no_wait then

    f, err = amqp_frame.consume_frame(ctx)

    if f then
      debug("[wire_method_frame] channel: ",f.channel)
      if f.method then
        debug("[wire_method_frame] method: ",f.method)
      end

      if is_channel_close_received(f) then
        ctx.channel_state = c.state.CLOSE_WAIT
        ongoing(ctx,frame)
        return nil, f.method.reply_code, f.method.reply_text
      end

      if is_connection_close_received(f) then
        ctx.channel_state = c.state.CLOSED
        ctx.connection_state = c.state.CLOSE_WAIT
        ongoing(ctx,frame)
        return nil, f.method.reply_code, f.method.reply_text
      end
    end
    return f, err
  end
  return true
end

return amqp_frame