module ZMQ
module CommonSocketBehavior
# Read-only accessors for the values assigned in #initialize: +socket+ is the
# object returned by LibZMQ.zmq_socket and +name+ is the SocketTypeNameMap
# entry for the requested socket type.
attr_reader :socket, :name
# Allocates a socket of type +type+ for sending and receiving data.
#
# +type+ can be one of ZMQ::REQ, ZMQ::REP, ZMQ::PUB,
# ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, ZMQ::XREQ, ZMQ::XREP,
# ZMQ::DEALER or ZMQ::ROUTER.
#
# By default, this class uses ZMQ::Message for manual
# memory management. For automatic garbage collection of received messages,
# it is possible to override the :receiver_class to use ZMQ::ManagedMessage.
#
# sock = Socket.create(Context.create, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage)
#
# Advanced users may want to replace the receiver class with their
# own custom class. The custom class must conform to the same public API
# as ZMQ::Message.
#
# Creation of a new Socket object can return nil when socket creation
# fails.
#
# if (socket = Socket.create(context.pointer, ZMQ::REQ))
# ...
# else
# STDERR.puts "Socket creation failed"
# end
#
def self.create(context_ptr, type, opts = {:receiver_class => ZMQ::Message})
  # Factory wrapper around .new: any StandardError raised while building the
  # socket is swallowed and reported to the caller as nil.
  begin
    new(context_ptr, type, opts)
  rescue StandardError
    nil
  end
end
# To avoid rescuing exceptions, use the factory method #create for
# all socket creation.
#
# Allocates a socket of type +type+ for sending and receiving data.
#
# +type+ can be one of ZMQ::REQ, ZMQ::REP, ZMQ::PUB,
# ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, ZMQ::XREQ, ZMQ::XREP,
# ZMQ::DEALER or ZMQ::ROUTER.
#
# By default, this class uses ZMQ::Message for manual
# memory management. For automatic garbage collection of received messages,
# it is possible to override the :receiver_class to use ZMQ::ManagedMessage.
#
# sock = Socket.new(Context.new, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage)
#
# Advanced users may want to replace the receiver class with their
# own custom class. The custom class must conform to the same public API
# as ZMQ::Message.
#
# Creation of a new Socket object can raise an exception. This occurs when the
# +context_ptr+ is null or when the allocation of the 0mq socket within the
# context fails.
#
# begin
# socket = Socket.new(context.pointer, ZMQ::REQ)
# rescue ContextError => e
# # error handling
# end
#
def initialize context_ptr, type, opts = {:receiver_class => ZMQ::Message}
  # Users may override the class used for receiving; it must conform to the
  # same public API as ZMQ::Message. Fall back to ZMQ::Message when +opts+ is
  # supplied without a :receiver_class so the receive helpers, which call
  # @receiver_klass.new, never operate on nil.
  @receiver_klass = opts[:receiver_class] || ZMQ::Message

  # Accept either a ZMQ::Context or a raw context pointer.
  context_ptr = context_ptr.pointer if context_ptr.kind_of?(ZMQ::Context)

  # Treat nil like a null pointer so the documented ContextError is raised
  # rather than a NoMethodError from calling #null? on nil.
  if context_ptr.nil? || context_ptr.null?
    raise ContextError.new 'zmq_socket', 0, ETERM, "Context pointer was null"
  end

  @socket = LibZMQ.zmq_socket context_ptr, type
  unless @socket && !@socket.null?
    raise ContextError.new 'zmq_socket', 0, ETERM, "Socket pointer was null"
  end
  @name = SocketTypeNameMap[type]

  # Per-instance scratch state used by the option and multipart helpers.
  @longlong_cache = @int_cache = nil
  @more_parts_array = []
  @option_lookup = []
  populate_option_lookup

  define_finalizer
end
# Set the queue options on this socket.
#
# Valid +name+ values that take a numeric +value+ are:
# ZMQ::HWM
# ZMQ::SWAP (version 2 only)
# ZMQ::AFFINITY
# ZMQ::RATE
# ZMQ::RECOVERY_IVL
# ZMQ::MCAST_LOOP (version 2 only)
# ZMQ::LINGER
# ZMQ::RECONNECT_IVL
# ZMQ::BACKLOG
# ZMQ::RECOVERY_IVL_MSEC (version 2 only)
# ZMQ::RECONNECT_IVL_MAX (version 3 only)
# ZMQ::MAXMSGSIZE (version 3 only)
# ZMQ::SNDHWM (version 3 only)
# ZMQ::RCVHWM (version 3 only)
# ZMQ::MULTICAST_HOPS (version 3 only)
# ZMQ::RCVTIMEO (version 3 only)
# ZMQ::SNDTIMEO (version 3 only)
#
# Valid +name+ values that take a string +value+ are:
# ZMQ::IDENTITY (version 2/3 only)
# ZMQ::SUBSCRIBE
# ZMQ::UNSUBSCRIBE
#
# Returns 0 when the operation completed successfully.
# Returns -1 when this operation failed.
#
# With a -1 return code, the user must check ZMQ.errno to determine the
# cause.
#
# rc = socket.setsockopt(ZMQ::LINGER, 1_000)
# ZMQ::Util.resultcode_ok?(rc) ? puts("succeeded") : puts("failed")
#
def setsockopt name, value, length = nil
  # The option value is copied into a temporary native buffer whose layout is
  # selected by @option_lookup[name]: 1 => 8-byte integer, 0 => 4-byte integer,
  # 2 => string. Returns the result code of LibZMQ.zmq_setsockopt.
  pointer = nil

  begin
    case @option_lookup[name]
    when 1
      length = 8
      pointer = LibC.malloc length
      pointer.write_long_long value
    when 0
      length = 4
      pointer = LibC.malloc length
      pointer.write_int value
    when 2
      # Size the buffer in bytes, not characters, so multi-byte strings fit.
      length ||= value.bytesize
      # note: not checking errno for failed memory allocations :(
      pointer = LibC.malloc length
      # Never copy more than the buffer holds when the caller passes an
      # explicit +length+ shorter than the string.
      pointer.write_string value, [length, value.bytesize].min
    end

    LibZMQ.zmq_setsockopt @socket, name, pointer, length
  ensure
    # Release the temporary buffer even when a write or the native call raises.
    LibC.free(pointer) unless pointer.nil? || pointer.null?
  end
end
# Convenience method for checking on additional message parts.
#
# Equivalent to calling Socket#getsockopt with ZMQ::RCVMORE.
#
# Warning: if the call to #getsockopt fails, this method will return
# false and swallow the error.
#
# message_parts = []
# message = Message.new
# rc = socket.recvmsg(message)
# if ZMQ::Util.resultcode_ok?(rc)
# message_parts << message
# while more_parts?
# message = Message.new
# rc = socket.recvmsg(message)
# message_parts.push(message) if ZMQ::Util.resultcode_ok?(rc)
# end
# end
#
def more_parts?
  # Ask the socket for ZMQ::RCVMORE; the answer lands in the reusable
  # @more_parts_array. A failed lookup is reported as "no more parts".
  result = getsockopt(ZMQ::RCVMORE, @more_parts_array)
  return false unless Util.resultcode_ok?(result)

  @more_parts_array.at(0)
end
# Binds the socket to an +address+.
#
# socket.bind("tcp://127.0.0.1:5555")
#
def bind(address)
  # Delegates straight to libzmq and hands its result code back to the caller.
  rc = LibZMQ.zmq_bind(@socket, address)
  rc
end
# Connects the socket to an +address+.
#
# rc = socket.connect("tcp://127.0.0.1:5555")
#
def connect(address)
  # Delegates straight to libzmq; the call's result code is the return value.
  LibZMQ.zmq_connect(@socket, address)
end
# Closes the socket. Any unprocessed messages in queue are sent or dropped
# depending upon the value of the socket option ZMQ::LINGER.
#
# Returns 0 upon success *or* when the socket has already been closed.
# Returns -1 when the operation fails. Check ZMQ.errno for the error code.
#
# rc = socket.close
# puts("Given socket was invalid!") unless 0 == rc
#
def close
  # Nothing to tear down: an already-closed socket reports success.
  return 0 unless @socket

  remove_finalizer
  result = LibZMQ.zmq_close(@socket)
  # Drop the reference before releasing caches so later calls short-circuit.
  @socket = nil
  release_cache
  result
end
# Queues the message for transmission. Message is assumed to conform to the
# same public API as #Message.
#
# +flags+ may take the following values:
# * 0 (default) - blocking operation
# * ZMQ::NonBlocking - non-blocking operation
# * ZMQ::SNDMORE - this message is part of a multi-part message
#
# Returns 0 when the message was successfully enqueued.
# Returns -1 under two conditions.
# 1. The message could not be enqueued
# 2. When +flags+ is set with ZMQ::NonBlocking and the socket returned EAGAIN.
#
# With a -1 return code, the user must check ZMQ.errno to determine the
# cause.
#
def sendmsg(message, flags = 0)
  # Hand the message's native address to the low-level send primitive.
  message_address = message.address
  __sendmsg__(@socket, message_address, flags)
end
# Helper method to make a new #Message instance out of the +string+ passed
# in for transmission.
#
# +flags+ may be ZMQ::NonBlocking and ZMQ::SNDMORE.
#
# Returns 0 when the message was successfully enqueued.
# Returns -1 under two conditions.
# 1. The message could not be enqueued
# 2. When +flags+ is set with ZMQ::NonBlocking and the socket returned EAGAIN.
#
# With a -1 return code, the user must check ZMQ.errno to determine the
# cause.
#
def send_string(string, flags = 0)
  # Wrap the payload in a fresh Message; #send_and_close takes care of
  # closing it once the send has been attempted.
  send_and_close(Message.new(string), flags)
end
# Send a sequence of strings as a multipart message out of the +parts+
# passed in for transmission. Every element of +parts+ should be
# a String.
#
# +flags+ may be ZMQ::NonBlocking.
#
# Returns 0 when the messages were successfully enqueued.
# Returns -1 under two conditions.
# 1. A message could not be enqueued
# 2. When +flags+ is set with ZMQ::NonBlocking and the socket returned EAGAIN.
#
# With a -1 return code, the user must check ZMQ.errno to determine the
# cause.
#
def send_strings(parts, flags = 0)
  # Each element of +parts+ is sent through #send_string by the shared
  # multipart helper.
  send_multiple parts, flags, :send_string
end
# Send a sequence of messages as a multipart message out of the +parts+
# passed in for transmission. Every element of +parts+ should be
# a Message (or subclass).
#
# +flags+ may be ZMQ::NonBlocking.
#
# Returns 0 when the messages were successfully enqueued.
# Returns -1 under two conditions.
# 1. A message could not be enqueued
# 2. When +flags+ is set with ZMQ::NonBlocking and the socket returned EAGAIN.
#
# With a -1 return code, the user must check ZMQ.errno to determine the
# cause.
#
def sendmsgs(parts, flags = 0)
  # Each element of +parts+ is sent through #sendmsg by the shared
  # multipart helper.
  send_multiple parts, flags, :sendmsg
end
# Sends a message. This will automatically close the +message+ for both successful
# and failed sends.
#
# Returns 0 when the message was successfully enqueued.
# Returns -1 under two conditions.
# 1. The message could not be enqueued
# 2. When +flags+ is set with ZMQ::NonBlocking and the socket returned EAGAIN.
#
# With a -1 return code, the user must check ZMQ.errno to determine the
# cause.
#
def send_and_close(message, flags = 0)
  # Send first, then close the message regardless of the result code; the
  # send's result code is what the caller gets back.
  sendmsg(message, flags).tap { message.close }
end
# Dequeues a message from the underlying queue. By default, this is a blocking operation.
#
# +flags+ may take two values:
# 0 (default) - blocking operation
# ZMQ::NonBlocking - non-blocking operation
#
# Returns 0 when the message was successfully dequeued.
# Returns -1 under two conditions.
# 1. The message could not be dequeued
# 2. When +flags+ is set with ZMQ::NonBlocking and the socket returned EAGAIN.
#
# With a -1 return code, the user must check ZMQ.errno to determine the
# cause.
#
# The application code is responsible for handling the +message+ object lifecycle
# when #recvmsg returns an error code.
#
def recvmsg(message, flags = 0)
  # Hand the message's native address to the low-level receive primitive.
  message_address = message.address
  __recvmsg__(@socket, message_address, flags)
end
# Helper method to make a new #Message instance and convert its payload
# to a string.
#
# +flags+ may be ZMQ::NonBlocking.
#
# Returns 0 when the message was successfully dequeued.
# Returns -1 under two conditions.
# 1. The message could not be dequeued
# 2. When +flags+ is set with ZMQ::NonBlocking and the socket returned EAGAIN.
#
# With a -1 return code, the user must check ZMQ.errno to determine the
# cause.
#
# The application code is responsible for handling the +message+ object lifecycle
# when #recv returns an error code.
#
def recv_string string, flags = 0
message = @receiver_klass.new
rc = recvmsg message, flags
string.replace(message.copy_out_string) if Util.resultcode_ok?(rc)
Loading ...