Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

vistahigherlearning / logstash   deb

Repository URL to install this package:

/ opt / logstash / vendor / bundle / jruby / 1.9 / gems / ffi-rzmq-1.0.0 / lib / ffi-rzmq / socket.rb


module ZMQ

  module CommonSocketBehavior

    attr_reader :socket, :name

    # Allocates a socket of type +type+ for sending and receiving data.
    #
    # +type+ can be one of ZMQ::REQ, ZMQ::REP, ZMQ::PUB,
    # ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, ZMQ::XREQ, ZMQ::REP,
    # ZMQ::DEALER or ZMQ::ROUTER.
    #
    # By default, this class uses ZMQ::Message for manual
    # memory management. For automatic garbage collection of received messages,
    # it is possible to override the :receiver_class to use ZMQ::ManagedMessage.
    #
    #  sock = Socket.create(Context.create, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage)
    #
    # Advanced users may want to replace the receiver class with their
    # own custom class. The custom class must conform to the same public API
    # as ZMQ::Message.
    #
    # Creation of a new Socket object can return nil when socket creation
    # fails.
    #
    #  if (socket = Socket.new(context.pointer, ZMQ::REQ))
    #    ...
    #  else
    #    STDERR.puts "Socket creation failed"
    #  end
    #
    def self.create context_ptr, type, opts = {:receiver_class => ZMQ::Message}
      new(context_ptr, type, opts) rescue nil
    end

    # To avoid rescuing exceptions, use the factory method #create for
    # all socket creation.
    #
    # Allocates a socket of type +type+ for sending and receiving data.
    #
    # +type+ can be one of ZMQ::REQ, ZMQ::REP, ZMQ::PUB,
    # ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, ZMQ::XREQ, ZMQ::REP,
    # ZMQ::DEALER or ZMQ::ROUTER.
    #
    # By default, this class uses ZMQ::Message for manual
    # memory management. For automatic garbage collection of received messages,
    # it is possible to override the :receiver_class to use ZMQ::ManagedMessage.
    #
    #  sock = Socket.new(Context.new, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage)
    #
    # Advanced users may want to replace the receiver class with their
    # own custom class. The custom class must conform to the same public API
    # as ZMQ::Message.
    #
    # Creation of a new Socket object can raise an exception. This occurs when the
    # +context_ptr+ is null or when the allocation of the 0mq socket within the
    # context fails.
    #
    #  begin
    #    socket = Socket.new(context.pointer, ZMQ::REQ)
    #  rescue ContextError => e
    #    # error handling
    #  end
    #
    def initialize context_ptr, type, opts = {:receiver_class => ZMQ::Message}
      # users may override the classes used for receiving; class must conform to the
      # same public API as ZMQ::Message
      @receiver_klass = opts[:receiver_class]

      context_ptr = context_ptr.pointer if context_ptr.kind_of?(ZMQ::Context)

      unless context_ptr.null?
        @socket = LibZMQ.zmq_socket context_ptr, type
        if @socket && !@socket.null?
          @name = SocketTypeNameMap[type]
        else
          raise ContextError.new 'zmq_socket', 0, ETERM, "Socket pointer was null"
        end
      else
        raise ContextError.new 'zmq_socket', 0, ETERM, "Context pointer was null"
      end

      @longlong_cache = @int_cache = nil
      @more_parts_array = []
      @option_lookup = []
      populate_option_lookup

      define_finalizer
    end

    # Set the queue options on this socket.
    #
    # Valid +name+ values that take a numeric +value+ are:
    #  ZMQ::HWM
    #  ZMQ::SWAP (version 2 only)
    #  ZMQ::AFFINITY
    #  ZMQ::RATE
    #  ZMQ::RECOVERY_IVL
    #  ZMQ::MCAST_LOOP (version 2 only)
    #  ZMQ::LINGER
    #  ZMQ::RECONNECT_IVL
    #  ZMQ::BACKLOG
    #  ZMQ::RECOVER_IVL_MSEC (version 2 only)
    #  ZMQ::RECONNECT_IVL_MAX (version 3 only)
    #  ZMQ::MAXMSGSIZE (version 3 only)
    #  ZMQ::SNDHWM (version 3 only)
    #  ZMQ::RCVHWM (version 3 only)
    #  ZMQ::MULTICAST_HOPS (version 3 only)
    #  ZMQ::RCVTIMEO (version 3 only)
    #  ZMQ::SNDTIMEO (version 3 only)
    #
    # Valid +name+ values that take a string +value+ are:
    #  ZMQ::IDENTITY (version 2/3 only)
    #  ZMQ::SUBSCRIBE
    #  ZMQ::UNSUBSCRIBE
    #
    # Returns 0 when the operation completed successfully.
    # Returns -1 when this operation failed.
    #
    # With a -1 return code, the user must check ZMQ.errno to determine the
    # cause.
    #
    #  rc = socket.setsockopt(ZMQ::LINGER, 1_000)
    #  ZMQ::Util.resultcode_ok?(rc) ? puts("succeeded") : puts("failed")
    #
    def setsockopt name, value, length = nil
      if 1 == @option_lookup[name]
        length = 8
        pointer = LibC.malloc length
        pointer.write_long_long value

      elsif 0 == @option_lookup[name]
        length = 4
        pointer = LibC.malloc length
        pointer.write_int value

      elsif 2 == @option_lookup[name]
        length ||= value.size

        # note: not checking errno for failed memory allocations :(
        pointer = LibC.malloc length
        pointer.write_string value
      end

      rc = LibZMQ.zmq_setsockopt @socket, name, pointer, length
      LibC.free(pointer) unless pointer.nil? || pointer.null?
      rc
    end

    # Convenience method for checking on additional message parts.
    #
    # Equivalent to calling Socket#getsockopt with ZMQ::RCVMORE.
    #
    # Warning: if the call to #getsockopt fails, this method will return
    # false and swallow the error.
    #
    #  message_parts = []
    #  message = Message.new
    #  rc = socket.recvmsg(message)
    #  if ZMQ::Util.resultcode_ok?(rc)
    #    message_parts << message
    #    while more_parts?
    #      message = Message.new
    #      rc = socket.recvmsg(message)
    #      message_parts.push(message) if resulcode_ok?(rc)
    #    end
    #  end
    #
    def more_parts?
      rc = getsockopt ZMQ::RCVMORE, @more_parts_array

      Util.resultcode_ok?(rc) ? @more_parts_array.at(0) : false
    end

    # Binds the socket to an +address+.
    #
    #  socket.bind("tcp://127.0.0.1:5555")
    #
    def bind address
      LibZMQ.zmq_bind @socket, address
    end

    # Connects the socket to an +address+.
    #
    #  rc = socket.connect("tcp://127.0.0.1:5555")
    #
    def connect address
      rc = LibZMQ.zmq_connect @socket, address
    end

    # Closes the socket. Any unprocessed messages in queue are sent or dropped
    # depending upon the value of the socket option ZMQ::LINGER.
    #
    # Returns 0 upon success *or* when the socket has already been closed.
    # Returns -1 when the operation fails. Check ZMQ.errno for the error code.
    #
    #  rc = socket.close
    #  puts("Given socket was invalid!") unless 0 == rc
    #
    def close
      if @socket
        remove_finalizer
        rc = LibZMQ.zmq_close @socket
        @socket = nil
        release_cache
        rc
      else
        0
      end
    end

    # Queues the message for transmission. Message is assumed to conform to the
    # same public API as #Message.
    #
    # +flags+ may take two values:
    # * 0 (default) - blocking operation
    # * ZMQ::NonBlocking - non-blocking operation
    # * ZMQ::SNDMORE - this message is part of a multi-part message
    #
    # Returns 0 when the message was successfully enqueued.
    # Returns -1 under two conditions.
    # 1. The message could not be enqueued
    # 2. When +flags+ is set with ZMQ::NonBlocking and the socket returned EAGAIN.
    #
    # With a -1 return code, the user must check ZMQ.errno to determine the
    # cause.
    #
    def sendmsg message, flags = 0
      __sendmsg__(@socket, message.address, flags)
    end

    # Helper method to make a new #Message instance out of the +string+ passed
    # in for transmission.
    #
    # +flags+ may be ZMQ::NonBlocking and ZMQ::SNDMORE.
    #
    # Returns 0 when the message was successfully enqueued.
    # Returns -1 under two conditions.
    # 1. The message could not be enqueued
    # 2. When +flags+ is set with ZMQ::NonBlocking and the socket returned EAGAIN.
    #
    # With a -1 return code, the user must check ZMQ.errno to determine the
    # cause.
    #
    def send_string string, flags = 0
      message = Message.new string
      send_and_close message, flags
    end

    # Send a sequence of strings as a multipart message out of the +parts+
    # passed in for transmission. Every element of +parts+ should be
    # a String.
    #
    # +flags+ may be ZMQ::NonBlocking.
    #
    # Returns 0 when the messages were successfully enqueued.
    # Returns -1 under two conditions.
    # 1. A message could not be enqueued
    # 2. When +flags+ is set with ZMQ::NonBlocking and the socket returned EAGAIN.
    #
    # With a -1 return code, the user must check ZMQ.errno to determine the
    # cause.
    #
    def send_strings parts, flags = 0
      send_multiple(parts, flags, :send_string)
    end

    # Send a sequence of messages as a multipart message out of the +parts+
    # passed in for transmission. Every element of +parts+ should be
    # a Message (or subclass).
    #
    # +flags+ may be ZMQ::NonBlocking.
    #
    # Returns 0 when the messages were successfully enqueued.
    # Returns -1 under two conditions.
    # 1. A message could not be enqueued
    # 2. When +flags+ is set with ZMQ::NonBlocking and the socket returned EAGAIN.
    #
    # With a -1 return code, the user must check ZMQ.errno to determine the
    # cause.
    #
    def sendmsgs parts, flags = 0
      send_multiple(parts, flags, :sendmsg)
    end

    # Sends a message. This will automatically close the +message+ for both successful
    # and failed sends.
    #
    # Returns 0 when the message was successfully enqueued.
    # Returns -1 under two conditions.
    # 1. The message could not be enqueued
    # 2. When +flags+ is set with ZMQ::NonBlocking and the socket returned EAGAIN.
    #
    # With a -1 return code, the user must check ZMQ.errno to determine the
    # cause.
    #
    def send_and_close message, flags = 0
      rc = sendmsg message, flags
      message.close
      rc
    end

    # Dequeues a message from the underlying queue. By default, this is a blocking operation.
    #
    # +flags+ may take two values:
    #  0 (default) - blocking operation
    #  ZMQ::NonBlocking - non-blocking operation
    #
    # Returns 0 when the message was successfully dequeued.
    # Returns -1 under two conditions.
    # 1. The message could not be dequeued
    # 2. When +flags+ is set with ZMQ::NonBlocking and the socket returned EAGAIN.
    #
    # With a -1 return code, the user must check ZMQ.errno to determine the
    # cause.
    #
    # The application code is responsible for handling the +message+ object lifecycle
    # when #recv returns an error code.
    #
    def recvmsg message, flags = 0
      #LibZMQ.zmq_recvmsg @socket, message.address, flags
      __recvmsg__(@socket, message.address, flags)
    end

    # Helper method to make a new #Message instance and convert its payload
    # to a string.
    #
    # +flags+ may be ZMQ::NonBlocking.
    #
    # Returns 0 when the message was successfully dequeued.
    # Returns -1 under two conditions.
    # 1. The message could not be dequeued
    # 2. When +flags+ is set with ZMQ::NonBlocking and the socket returned EAGAIN.
    #
    # With a -1 return code, the user must check ZMQ.errno to determine the
    # cause.
    #
    # The application code is responsible for handling the +message+ object lifecycle
    # when #recv returns an error code.
    #
    def recv_string string, flags = 0
      message = @receiver_klass.new
      rc = recvmsg message, flags
      string.replace(message.copy_out_string) if Util.resultcode_ok?(rc)
Loading ...