Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
zookeeper / java / java_base.rb
Size: Mime:
require 'java'
require 'thread'

# require 'rubygems'
# gem 'slyphon-log4j', '= 1.2.15'
# gem 'zk-ruby-zookeeper_jar', "= #{Zookeeper::DRIVER_VERSION}"

require 'log4j'
require 'zookeeper_jar'


# XXX: reindent this WHOLE FILE later
module Zookeeper

# The low-level wrapper-specific methods for the Java lib,
# subclassed by the top-level Zookeeper class
class JavaBase
  include Java
  include Common
  include Constants
  include Callbacks
  include Exceptions
  include ACLs
  include Logger

  JZK   = org.apache.zookeeper
  JZKD  = org.apache.zookeeper.data
  Code  = JZK::KeeperException::Code

  ANY_VERSION = -1
  DEFAULT_SESSION_TIMEOUT = 10_000

  JZKD::Stat.class_eval do
    MEMBERS = [:version, :czxid, :mzxid, :ctime, :mtime, :cversion, :aversion, :ephemeralOwner, :dataLength, :numChildren, :pzxid]
    def to_hash
      MEMBERS.inject({}) { |h,k| h[k] = __send__(k); h }
    end
  end

  JZKD::Id.class_eval do
    def to_hash
      { :scheme => getScheme, :id => getId }
    end
  end

  JZKD::ACL.class_eval do
    def self.from_ruby_acl(acl)
      raise TypeError, "acl must be a Zookeeper::ACLs::ACL not #{acl.inspect}" unless acl.kind_of?(Zookeeper::ACLs::ACL)
      id = org.apache.zookeeper.data.Id.new(acl.id.scheme.to_s, acl.id.id.to_s)
      new(acl.perms.to_i, id)
    end

    def to_hash
      { :perms => getPerms, :id => getId.to_hash }
    end
  end

  JZK::WatchedEvent.class_eval do
    def to_hash
      { :type => getType.getIntValue, :state => getState.getIntValue, :path => getPath }
    end
  end

  # used for internal dispatching
  # @private
  module JavaCB
    class Callback
      include Logger

      attr_reader :req_id

      def initialize(req_id)
        @req_id = req_id
      end
    end

    class DataCallback < Callback
      include JZK::AsyncCallback::DataCallback

      def processResult(rc, path, queue, data, stat)
        logger.debug { "#{self.class.name}#processResult rc: #{rc}, req_id: #{req_id}, path: #{path}, queue: #{queue.inspect}, data: #{data.inspect}, stat: #{stat.inspect}" }

        hash = {
          :rc     => rc,
          :req_id => req_id,
          :path   => path,
          :data   => (data && String.from_java_bytes(data)),
          :stat   => (stat && stat.to_hash),
        }

#         if rc == Zookeeper::ZOK
#           hash.merge!({
#             :data   => String.from_java_bytes(data),
#             :stat   => stat.to_hash,
#           })
#         end

        queue.push(hash)
      end
    end

    class StringCallback < Callback
      include JZK::AsyncCallback::StringCallback

      def processResult(rc, path, queue, str)
        logger.debug { "#{self.class.name}#processResult rc: #{rc}, req_id: #{req_id}, path: #{path}, queue: #{queue.inspect}, str: #{str.inspect}" }
        queue.push(:rc => rc, :req_id => req_id, :path => path, :string => str)
      end
    end

    class StatCallback < Callback
      include JZK::AsyncCallback::StatCallback

      def processResult(rc, path, queue, stat)
        logger.debug { "#{self.class.name}#processResult rc: #{rc.inspect}, req_id: #{req_id}, path: #{path.inspect}, queue: #{queue.inspect}, stat: #{stat.inspect}" }
        queue.push(:rc => rc, :req_id => req_id, :stat => (stat and stat.to_hash), :path => path)
      end
    end

    class Children2Callback < Callback
      include JZK::AsyncCallback::Children2Callback

      def processResult(rc, path, queue, children, stat)
        logger.debug { "#{self.class.name}#processResult rc: #{rc}, req_id: #{req_id}, path: #{path}, queue: #{queue.inspect}, children: #{children.inspect}, stat: #{stat.inspect}" }
        hash = {
          :rc       => rc, 
          :req_id   => req_id, 
          :path     => path, 
          :strings  => (children && children.to_a), 
          :stat     => (stat and stat.to_hash),
        }

        queue.push(hash)
      end
    end

    class ACLCallback < Callback
      include JZK::AsyncCallback::ACLCallback
      
      def processResult(rc, path, queue, acl, stat)
        logger.debug { "ACLCallback#processResult rc: #{rc.inspect}, req_id: #{req_id}, path: #{path.inspect}, queue: #{queue.inspect}, acl: #{acl.inspect}, stat: #{stat.inspect}" }
        a = Array(acl).map { |a| a.to_hash }
        queue.push(:rc => rc, :req_id => req_id, :path => path, :acl => a, :stat => (stat && stat.to_hash))
      end
    end

    class VoidCallback < Callback
      include JZK::AsyncCallback::VoidCallback

      def processResult(rc, path, queue)
        logger.debug { "#{self.class.name}#processResult rc: #{rc}, req_id: #{req_id}, queue: #{queue.inspect}" }
        queue.push(:rc => rc, :req_id => req_id, :path => path)
      end
    end

    class WatcherCallback < Callback
      include JZK::Watcher
      include Zookeeper::Constants

      attr_reader :client

      def initialize(event_queue, opts={})
        @event_queue = event_queue
        @client = opts[:client]
        super(ZKRB_GLOBAL_CB_REQ)
      end

      def process(event)
        hash = event.to_hash
        logger.debug { "WatcherCallback got event: #{hash.inspect}" }

        if client && (hash[:type] == ZOO_SESSION_EVENT) && (hash[:state] == ZOO_CONNECTED_STATE)
          logger.debug { "#{self.class}##{__method__} notifying client of connected state!" }
          client.notify_connected! 
        end
        
        hash = event.to_hash.merge(:req_id => req_id)
        @event_queue.push(hash)
      end
    end
  end

  attr_reader :event_queue

  def reopen(timeout=10, watcher=nil, opts = {})
#     watcher ||= @default_watcher

    @mutex.synchronize do
      @req_registry.clear_watchers!

      replace_jzk!(opts)
      wait_until_connected
    end

    state
  end

  def wait_until_connected(timeout=10)
    @connected_latch.await(timeout) unless connected?
    connected?
  end

  def initialize(host, timeout=10, watcher=nil, options={})
    @host = host
    @event_queue = QueueWithPipe.new

    watcher ||= get_default_global_watcher()

    @req_registry = RequestRegistry.new(watcher)

    @mutex = Monitor.new
    @dispatch_shutdown_cond = @mutex.new_cond
    @connected_latch = Latch.new

    @_running = nil
    @_closed  = false
    @options = {}


    # allows connected-state handlers to be registered before 
    yield self if block_given?

    reopen(timeout, nil, options)
    return nil unless connected?
    @_running = true
    setup_dispatch_thread!
  end

  def close
    shutdown_thread = Thread.new do
      @mutex.synchronize do
        unless @_closed
          @_closed = true    # these are probably unnecessary
          @_running = false

          stop_dispatch_thread!
          @jzk.close if @jzk
        end
      end
    end

    shutdown_thread.join unless event_dispatch_thread?
  end

  def state
    @mutex.synchronize { @jzk.state }
  end

  def connected?
    state == JZK::ZooKeeper::States::CONNECTED
  end

  def connecting?
    state == JZK::ZooKeeper::States::CONNECTING
  end

  def associating?
    state == JZK::ZooKeeper::States::ASSOCIATING
  end

  def running?
    @_running
  end

  def closed?
    @_closed
  end

  def self.set_debug_level(*a)
    # IGNORED IN JRUBY
  end

  def set_debug_level(*a)
    # IGNORED IN JRUBY
  end

  def get(req_id, path, callback, watcher)
    handle_keeper_exception do
      watch_cb = watcher ? create_watcher(req_id, path) : false

      if callback
        jzk.getData(path, watch_cb, JavaCB::DataCallback.new(req_id), event_queue)
        [Code::Ok, nil, nil]    # the 'nil, nil' isn't strictly necessary here
      else # sync
        stat = JZKD::Stat.new

        value = jzk.getData(path, watch_cb, stat)
        data = String.from_java_bytes(value) unless value.nil?
        [Code::Ok, data, stat.to_hash]
      end
    end
  end

  def set(req_id, path, data, callback, version)
    handle_keeper_exception do
      version ||= ANY_VERSION

      if callback
        jzk.setData(path, data.to_java_bytes, version, JavaCB::StatCallback.new(req_id), event_queue)
        [Code::Ok, nil]
      else
        stat = jzk.setData(path, data.to_java_bytes, version).to_hash
        [Code::Ok, stat]
      end
    end
  end

  def get_children(req_id, path, callback, watcher)
    handle_keeper_exception do
      watch_cb = watcher ? create_watcher(req_id, path) : false

      if callback
        jzk.getChildren(path, watch_cb, JavaCB::Children2Callback.new(req_id), event_queue)
        [Code::Ok, nil, nil]
      else
        stat = JZKD::Stat.new
        children = jzk.getChildren(path, watch_cb, stat)
        [Code::Ok, children.to_a, stat.to_hash]
      end
    end
  end

  def add_auth(req_id, scheme, cert)
    handle_keeper_exception do
      jzk.addAuthInfo(scheme, cert.to_java_bytes)
      Code::Ok
    end
  end

  def create(req_id, path, data, callback, acl, flags)
    handle_keeper_exception do
      acl   = Array(acl).map{ |a| JZKD::ACL.from_ruby_acl(a) }
      mode  = JZK::CreateMode.fromFlag(flags)

      data ||= ''

      if callback
        jzk.create(path, data.to_java_bytes, acl, mode, JavaCB::StringCallback.new(req_id), event_queue)
        [Code::Ok, nil]
      else
        new_path = jzk.create(path, data.to_java_bytes, acl, mode)
        [Code::Ok, new_path]
      end
    end
  end

  def sync(req_id, path)
    handle_keeper_exception do
      jzk.sync(path, JavaCB::VoidCallback.new(req_id), event_queue)
      Code::Ok
    end
  end

  def delete(req_id, path, version, callback)
    handle_keeper_exception do
      if callback
        jzk.delete(path, version, JavaCB::VoidCallback.new(req_id), event_queue)
      else
        jzk.delete(path, version)
      end

      Code::Ok
    end
  end

  def set_acl(req_id, path, acl, callback, version)
    handle_keeper_exception do
      logger.debug { "set_acl: acl #{acl.inspect}" }
      acl = Array(acl).flatten.map { |a| JZKD::ACL.from_ruby_acl(a) }
      logger.debug { "set_acl: converted #{acl.inspect}" }

      if callback
        jzk.setACL(path, acl, version, JavaCB::ACLCallback.new(req_id), event_queue)
      else
        jzk.setACL(path, acl, version)
      end

      Code::Ok
    end
  end

  def exists(req_id, path, callback, watcher)
    handle_keeper_exception do
      watch_cb = watcher ? create_watcher(req_id, path) : false

      if callback
        jzk.exists(path, watch_cb, JavaCB::StatCallback.new(req_id), event_queue)
        [Code::Ok, nil, nil]
      else
        stat = jzk.exists(path, watch_cb)
        [Code::Ok, (stat and stat.to_hash)]
      end
    end
  end

  def get_acl(req_id, path, callback)
    handle_keeper_exception do
      stat = JZKD::Stat.new

      if callback
        logger.debug { "calling getACL, path: #{path.inspect}, stat: #{stat.inspect}" } 
        jzk.getACL(path, stat, JavaCB::ACLCallback.new(req_id), event_queue)
        [Code::Ok, nil, nil]
      else
        acls = jzk.getACL(path, stat).map { |a| a.to_hash }
        
        [Code::Ok, Array(acls).map{|m| m.to_hash}, stat.to_hash]
      end
    end
  end

  def assert_open
    # XXX don't know how to check for valid session state!
    raise NotConnected unless connected?
  end

  def session_id
    jzk.session_id
  end

  def session_passwd
    jzk.session_passwd.to_s
  end

  # called from watcher when we are connected
  # @private
  def notify_connected!
    @connected_latch.release
  end

  def pause_before_fork_in_parent
    # this is a no-op in java-land
  end

  def resume_after_fork_in_parent
    # this is a no-op in java-land
  end

  private
    def jzk
      @mutex.synchronize { @jzk }
    end

    # java exceptions are not wrapped anymore in JRuby 1.7+
    if JRUBY_VERSION >= '1.7.0'
      def handle_keeper_exception
        yield
      rescue JZK::KeeperException => e
        # code is an enum and always set -> we don't need any additional checks here
        e.code.intValue
      end
    else
      def handle_keeper_exception
        yield
      rescue JZK::KeeperException => e
        if e.respond_to?(:cause) and e.cause and e.cause.respond_to?(:code) and e.cause.code and e.cause.code.respond_to?(:intValue)
          e.cause.code.intValue
        else
          raise e # dunno what happened, just raise it
        end
      end
    end

    def call_type(callback, watcher)
      if callback
        watcher ? :async_watch : :async
      else
        watcher ? :sync_watch : :sync
      end
    end
   
    def create_watcher(req_id, path)
      logger.debug { "creating watcher for req_id: #{req_id} path: #{path}" }
      lambda do |event|
        ev_type, ev_state = event.type.int_value, event.state.int_value

        logger.debug { "watcher for req_id #{req_id}, path: #{path} called back" }

        h = { :req_id => req_id, :type => ev_type, :state => ev_state, :path => path }
        event_queue.push(h)
      end
    end

    def get_default_global_watcher
      Proc.new { |args|
        logger.debug { "Ruby ZK Global CB called type=#{event_by_value(args[:type])} state=#{state_by_value(args[:state])}" }
        true
      }
    end

    def replace_jzk!(opts = {})
      orig_jzk = @jzk
      if opts.has_key?(:session_id) && opts.has_key(:session_passwd)
        @jzk = JZK::ZooKeeper.new(@host, DEFAULT_SESSION_TIMEOUT, JavaCB::WatcherCallback.new(event_queue, :client => self), opts.fetch(:session_id), opts.fetch(:session_passwd).to_java_bytes)
      else
        @jzk = JZK::ZooKeeper.new(@host, DEFAULT_SESSION_TIMEOUT, JavaCB::WatcherCallback.new(event_queue, :client => self))
      end
    ensure
      orig_jzk.close if orig_jzk
    end
end
end