idna / lib / python2.7 / site-packages / oslo_messaging / _drivers / impl_rabbit.pyc
ó
čEYc!@sØddlZddlZddlZddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlZddlZddl
Z
ddlZ
ddlZ
ddlZ
ddlmZddlmZddlmZddlmZddlZddlmZddlmZddlmZdd	lm Z dd
lm!Z"ddlm#Z#ddl$m%Z%dd
l$m&Z&ddl$m'Z'ddl$m(Z(ddl)m*Z*ddl)m+Z+dZ,ej-dde.ddddƒej/dddddddƒej/dddddddƒej/d dddd!dd"ƒej/d#dddd$dd%ƒej0d&dd'd(d)dd*ƒej/d+dd,ƒej1d-dd.dd/dd0ƒej/d1d2d3d4fdd3dd5ƒej/d6dd7d(d)d8e2d9d:dd;ƒej3d<dd=d(d)d8e2d9d:dd>ƒej4d?dd@gd(d)d8e2d9d:ddAƒej/dBddCd(d)d8e2d9d:ddDƒej/dEddCd(d)d8e2d9d:ddFdGe2ƒej/dHd2dIdJdKfddJd(d)ddLƒej/dMddNd(d)d8e2d9d:ddOƒej1dPddQddRƒej1dSddTd(d)ddUƒej1dVddWddXƒej1dYddZd8e2d(d)dd[ƒej-d\de.d(d)dd]ƒej1d^d_dQdd`ddaƒej1dbddZddcƒej1dddd/ddeƒej1dfddTddgƒej-dhde.d(d)ddiƒgZ5ej6e7ƒZ8dj„Z9dke:fdl„ƒYZ;dme<fdn„ƒYZ=doe*j>fdp„ƒYZ?dqe?fdr„ƒYZ@dse<fdt„ƒYZAduejBfdv„ƒYZCdS(wiÿÿÿÿN(tcfg(tlog(t
eventletutils(tnetutils(tparse(tamqp(t
amqpdriver(tbase(tcommon(tpool(t_(t_LE(t_LI(t_LW(t_utils(t
exceptionsitssltdefaulttdeprecated_nametrabbit_use_sslthelpsConnect over SSL.tssl_versionttkombu_ssl_versionsSSL version to use (valid only if SSL enabled). Valid values are TLSv1 and SSLv23. SSLv2, SSLv3, TLSv1_1, and TLSv1_2 may be available on some distributions.tssl_key_filetkombu_ssl_keyfiles)SSL key file (valid only if SSL enabled).t
ssl_cert_filetkombu_ssl_certfiles*SSL cert file (valid only if SSL enabled).tssl_ca_filetkombu_ssl_ca_certss=SSL certification authority file (valid only if SSL enabled).tkombu_reconnect_delaygð?tdeprecated_grouptDEFAULTsYHow long to wait before reconnecting in response to an AMQP consumer cancel notification.tkombu_compressions‹EXPERIMENTAL: Possible values are: gzip, bz2. If not set compression will not be used. This option may not be available in future versions.t$kombu_missing_consumer_retry_timeouttkombu_reconnect_timeouti<s†How long to wait a missing client before abandoning to send it its replies. This value should not be longer than rpc_response_timeout.tkombu_failover_strategytchoicessround-robintshufflesºDetermines how the next RabbitMQ node is chosen in case the one we are currently connected to becomes unavailable. Takes effect only if more than one RabbitMQ node is provided in config.trabbit_hostt	localhosttdeprecated_for_removaltdeprecated_reasons#Replaced by [DEFAULT]/transport_urls8The RabbitMQ broker address where a single node is used.trabbit_porti(s5The RabbitMQ broker port where a single node is used.trabbit_hostss$rabbit_host:$rabbit_ports$RabbitMQ HA cluster host:port pairs.t
rabbit_useridtguestsThe RabbitMQ userid.trabbit_passwordsThe RabbitMQ password.tsecrettrabbit_login_methodtPLAINtAMQPLAINsRABBIT-CR-DEMOsThe RabbitMQ login method.trabbit_virtual_hostt/sThe RabbitMQ virtual host.trabbit_retry_intervalis1How frequently to retry connecting with RabbitMQ.trabbit_retry_backoffisDHow long to backoff for between retries when connecting to RabbitMQ.trabbit_interval_maxisGMaximum interval of RabbitMQ connection retries. Default is 30 seconds.trabbit_max_retriesisSMaximum number of RabbitMQ connection retries. Default is 0 (infinite retry count).trabbit_ha_queuessTry to use HA queues in RabbitMQ (x-ha-policy: all). If you change this option, you must wipe the RabbitMQ database. In RabbitMQ 3.0, queue mirroring is no longer controlled by the x-ha-policy argument when declaring a queue. If you just want to make sure that all queues (except those with auto-generated names) are mirrored across all nodes, run: "rabbitmqctl set_policy HA '^(?!amq\.).*' '{"ha-mode": "all"}' "trabbit_transient_queues_ttltminisÏPositive integer representing duration in seconds for queue TTL (x-expires). Queues which are unused for the duration of the TTL are automatically deleted. The parameter affects only reply and fanout queues.trabbit_qos_prefetch_countsXSpecifies the number of messages to prefetch. Setting to zero allows unlimited messages.theartbeat_timeout_thresholdsŠNumber of seconds after which the Rabbit broker is considered down if heartbeat's keep-alive fails (0 disable the heartbeat). EXPERIMENTALtheartbeat_ratesNHow often times during the heartbeat_timeout_threshold we check the heartbeat.tfake_rabbits<Deprecated, use rpc_backend=kombu+memory or rpc_backend=fakecCs:i}|rd|d<n|dkr6|d|d<n|S(sÔConstruct the arguments for declaring a queue.

    If the rabbit_ha_queues option is set, we try to declare a mirrored queue
    as described here:

      http://www.rabbitmq.com/ha.html

    Setting x-ha-policy to all means that the queue will be mirrored
    to all nodes in the cluster. In RabbitMQ 3.0, queue mirroring is
    no longer controlled by the x-ha-policy argument when declaring a
    queue. If you just want to make sure that all queues (except those
    with auto-generated names) are mirrored across all nodes, run:
      rabbitmqctl set_policy HA '^(?!amq\.).*' '{"ha-mode": "all"}'

    If the rabbit_queue_ttl option is > 0, then the queue is
    declared with the "Queue TTL" value as described here:

      https://www.rabbitmq.com/ttl.html

    Setting a queue TTL causes the queue to be automatically deleted
    if it is unused for the TTL duration.  This is a helpful safeguard
    to prevent queues with zero consumers from growing without bound.
    tallsx-ha-policyiiès	x-expires((R:trabbit_queue_ttltargs((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyt_get_queue_argumentsÑs
t
RabbitMessagecBs#eZd„Zd„Zd„ZRS(cCs?tt|ƒjtj|jƒƒtjd|ƒ||_dS(NsRabbitMessage.Init: message %s(	tsuperREt__init__t
rpc_commontdeserialize_msgtpayloadtLOGttracet_raw_message(tselftraw_message((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyRGõscCs!tjd|ƒ|jjƒdS(Ns%RabbitMessage.acknowledge: message %s(RKRLRMtack(RN((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pytacknowledgeûscCs!tjd|ƒ|jjƒdS(Ns!RabbitMessage.requeue: message %s(RKRLRMtrequeue(RN((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyRRÿs(t__name__t
__module__RGRQRR(((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyREôs		tConsumercBsDeZdZeddd„Zd„Zd„Zd„Zd„Z	RS(sConsumer class.ic	Cs©||_||_||_||_||_||_||_||_|	|_t	|
|ƒ|_
d|_d|_
tjjd|d|d|jd|jƒ|_dS(sgInit the Consumer class with the exchange_name, routing_key,
        type, durable auto_delete
        tnamettypetdurabletauto_deleteN(t
queue_namet
exchange_nametrouting_keytexchange_auto_deletetqueue_auto_deleteRXtcallbackRWtnowaitRDtqueue_argumentstNonetqueuet_declared_ontkombutentitytExchangetexchange(RNR[RZR\RWRXR]R^R_R`R:RB((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyRGs$												cCsÉtjjd|jd|jd|jd|jd|jd|jd|j	ƒ|_
y*tjd|j
|jƒ|j
jƒWn;|jjk
r¸}|jd	kr²|j
jƒq¹‚nX|j|_d
S(s0Re-declare the queue after a rabbit (re)connect.RVtchannelRhRXRYR\Ras[%s] Queue.declare: %si”N(ReRftQueueRZRiRhRXR^R\RaRcRKtdebugt
connection_idtdeclaret
connectiontchannel_errorstcodeRd(RNtconntexc((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyRm!s"							cCsú|j|jkr"|j|ƒny2|jjd|jdtj|ƒd|jƒWnŸ|j	j
k
rõ}t|jj
ƒƒ|_|jdks±|jdkrï|jdkrï|j|ƒ|jjd|jdtj|ƒd|jƒqö‚nXdS(sæActually declare the consumer on the amqp channel.  This will
        start the flow of messages from the queue.  Connection.consume()
        then processes the messages, calling the appropriate callback
        for each one.
        R_tconsumer_tagR`i”i–s	Basic.ackN(RiRdRmRctconsumet	_callbacktsixt	text_typeR`RnRotsett
_consumerstvaluest	_new_tagsRptmethod_name(RNRqttagRr((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyRt=s


cCs-tjd|ƒ|jjtj|ƒƒdS(Ns!ConsumerBase.cancel: canceling %s(RKRLRctcancelRvRw(RNR}((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyR~bscCsyt|jjddƒ}|r-||ƒ}ny|jt|ƒƒWn.tk
rttjt	dƒƒ|j
ƒnXdS(sbCall callback with deserialized message.

        Messages that are processed and ack'ed.
        tmessage_to_pythons*Failed to process message ... skipping it.N(tgetattrRcRiRbR_REt	ExceptionRKt	exceptionRtreject(RNtmessagetm2p((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyRufs
N(
RSRTt__doc__tFalseRbRGRmRtR~Ru(((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyRUs		%	tDummyConnectionLockcBseZd„ZRS(cCsdS(N((RN((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pytheartbeat_acquirexs(RSRTR‰(((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyRˆwstConnectionLockcBsDeZdZd„Zd„Zd„Zd„Zejd„ƒZ	RS(sÑLock object to protect access to the kombu connection

    This is a lock object to protect access to the kombu connection
    object between the heartbeat thread and the driver thread.

    There are two ways to acquire this lock:
        * lock.acquire()
        * lock.heartbeat_acquire()

    In both cases, lock.release() releases the lock.

    The goal is that the heartbeat thread always has priority
    for acquiring the lock. This ensures we have no heartbeat
    starvation when the driver sends a lot of messages.

    So when lock.heartbeat_acquire() has been called, the next time the
    lock is released the heartbeat caller unconditionally acquires
    the lock, even if someone else asked for the lock before it.
    cCsgd|_t|_d|_tjƒ|_tj|jƒ|_	tj|jƒ|_
tjƒ|_
dS(Ni(t_workers_waitingR‡t_heartbeat_waitingRbt_lock_acquiredt	threadingtLockt_monitort	Conditiont_workers_lockst_heartbeat_lockRtfetch_current_thread_functort_get_thread_id(RN((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyRG‘s			cCs^|jOx8|jrD|jd7_|jjƒ|jd8_q
W|jƒ|_WdQXdS(Ni(RRR‹R’twaitR•(RN((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pytacquirešs

cCsX|jIx2|jdk	r>t|_|jjƒt|_q
W|jƒ|_WdQXdS(N(	RRRbtTrueRŒR“R–R‡R•(RN((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyR‰¢s
	

cCsª|j›|jdkr(tdƒ‚n|jƒ}|j|kr_td|j|fƒ‚nd|_|jr|jjƒn|jdkr |j	jƒnWdQXdS(Ns$We can't release a not acquired locksFWe can't release lock acquired by another thread/greenthread; %s vs %si(
RRRbtRuntimeErrorR•RŒR“tnotifyR‹R’(RNt	thread_id((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pytrelease«s
		ccs%|jƒz	dVWd|jƒXdS(N(R‰Rœ(RN((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyt
for_heartbeatºs
	(
RSRTR†RGR—R‰Rœt
contextlibtcontextmanagerR(((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyRŠ|s						t
ConnectioncBseZdZiZd„Ziejd6ejd6Zidd6dd6dd	6d
d6Z	x=e	D]5Z
yeee	e
ƒee
<Wq^ek
r’q^Xq^We
d„ƒZd
„Zd„Zed„ƒZed„ƒZd„Zd-d-d-ed„Zd„Zd„Zd„Zd„Zd„Zd-d„Zejd„ƒZ d„Z!d„Z"d„Z#d„Z$d„Z%d-d„Z&d „Z'd!„Z(d-d-d"„Z)d#„Z*d-d-d-d$„Z+d%„Z,d-d-d&„Z-d-d-d'„Z.d-d-d(„Z/d)„Z0d-d-d*„Z1d-d+„Z2d-d,„Z3RS(.sConnection object.cCsU|j}|j|_|j|_|j|_|j|_|j	|_
|j|_|j|_
|j|_|j|_|j|_|j|_|j|_|j|_|j|_|j|_|j|_|j|_|j|_|j|_|j|_|j|_|j|_|j|_|jrY|j|_|j|_|j |_ |j!|_!n|jdkrtd|_n|j
dk	r|j
}n	|j
}d|_#|jrÍt$j%t&dƒƒd||_#n|j'ræ|j(j)dƒrt$j%t&dƒ|j(ƒnt*|j'ƒdkr,t+j,|j'ƒnx©|j'D]©}|j(j-ddƒ}|j-dd	ƒ}|j#d
|j#r|dnd|t.j/|j0p”dƒt.j/|j1p©dƒ|j2|j3ƒpÁdt4|j5pÐdƒ|f7_#q6Wnò|j(j)dƒr#|j(j-ddƒ}d
||f|_#nµt*|jƒdkrKt+j,|jƒnxŠ|jD]}t6j7|d|jƒ\}	}
|j#d|j#r’dndt.j/|jdƒt.j/|jdƒ|j2|	ƒ|
|f7_#qUWt8j9ƒ|_:i|_;d|_<t=ƒ|_>i|_?t@jAdƒ|_Bt=ƒ|_Ct=ƒ|_DtE|_Fd|_G||_H|tIjJkrqtKƒ|_LntMƒ|_Lt4tNjOƒƒ|_Pdt8jQjRtSjTdƒt8j9ƒ|jPf|_UtVjWjX|j#d|jYƒd|j
d|jd|jditZd6iitZd6tZd6tZd6d6|jUd6d6|j[d6|j\d6ƒ|_Wt$j]d|j^ƒƒt_|jƒt_|jƒd |_`tE|_a|jL|jbƒWdQXd|_c|tIjJkrÒ|jdƒnt$j]d!|j^ƒƒ|jeƒr|j`|_fn	d|_f|j#j)d"ƒrQd#|jWj(_gd$|jW_3d%|jW_5d&|_fndS('NiRsmDeprecated: fake_rabbit option is deprecated, set rpc_backend to kombu+memory or use the fake driver instead.smemory://%s/skombu+suSelecting the kombu transport through the transport url (%s) is a experimental feature and this is not yet supported.itrabbitRs%s%s://%s:%s@%s:%s/%st;i(s%s://%stdefault_ports%samqp://%s:%s@%s:%s/%ss%s:%d:%sRtlogin_methodt	heartbeattfailover_strategyttransport_optionstconfirm_publishtauthentication_failure_closesconnection.blockedtconsumer_cancel_notifytcapabilitiestconnection_nametclient_propertiest
on_blockedton_unblockedsF[%(connection_id)s] Connecting to AMQP server on %(hostname)s:%(port)sg@s{[%(connection_id)s] Connected to AMQP server on %(hostname)s:%(port)s via [%(transport)s] client with port %(client_port)s.s	memory://gt
memory_driveriÒgš™™™™™©?(htoslo_messaging_rabbitR9tmax_retriesR6tinterval_startR7tinterval_steppingR8tinterval_maxR1R¤R@R4tvirtual_hostR,R+R-R/R:R;R=R>R?Rtamqp_durable_queuestamqp_auto_deleteRR"R$R!RRRRRbt_urlRKtwarningR
thostst	transportt
startswithtlentrandomR&treplaceRtquotetusernametpasswordt_parse_url_hostnamethostnametstrtportRtparse_host_porttostgetpidt_initial_pidRyt	_producerRxR{t_active_tagst	itertoolstcountt_tagst_declared_exchangest_declared_queuesR‡t_consume_loop_stoppedRitpurposeRHtPURPOSE_SENDRŠt_connection_lockRˆtuuidtuuid4RltpathtbasenametsystargvRVReRnR t_fetch_ssl_paramsR˜t_on_connection_blockedt_on_connection_unblockedRkt_get_connection_infotfloatt_heartbeat_wait_timeoutt_heartbeat_support_log_emittedtensure_connectiont_heartbeat_threadt_heartbeat_startt _heartbeat_supported_and_enabledt
_poll_timeouttpolling_interval(RNtconfturlRÔtdriver_confR¶thostR¼tadrRÅRÇ((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyRGÈsò						


											
	
#	
	
	
	ttlsv1tsslv23tPROTOCOL_SSLv2tsslv2tPROTOCOL_SSLv3tsslv3tPROTOCOL_TLSv1_1ttlsv1_1tPROTOCOL_TLSv1_2ttlsv1_2cCsI|jƒ}y|j|SWn'tk
rDttdƒ|ƒ‚nXdS(NsInvalid SSL version : %s(tlowert_SSL_PROTOCOLStKeyErrorR™R
(tclstversiontkey((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pytvalidate_ssl_version˜s

cCsd|krd|S|S(s—Handles hostname returned from urlparse and checks whether it's
        ipaddress. If it's ipaddress it ensures that it has brackets for IPv6.
        t:s[%s]((RNRÅ((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyRÄ scCsš|jr–tƒ}|jr4|j|jƒ|d<n|jrM|j|d<n|jrf|j|d<n|jrŒ|j|d<tj|d<n|p•tSt	S(s]Handles fetching what ssl params should be used for the connection
        (if any).
        Rtkeyfiletcertfiletca_certst	cert_reqs(
RtdictRRÿRRRt
CERT_REQUIREDR˜R‡(RNt
ssl_params((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyRݦs						

cCstjtdƒ|ƒdS(Ns)The broker has blocked the connection: %s(RKterrorR(treason((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyRÞ½scCstjtdƒƒdS(Ns'The broker has unblocked the connection(RKtinfoR(((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyRßÁscCs1|jdƒ|jd|jjƒ|jƒdS(Ntmethod(t_set_current_channelRbtensureRntconnecttset_transport_socket_timeout(RN((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyRäÅs
csùtjƒ}ˆj|kr:tjtdƒƒ|ˆ_n|dkrRˆj}n|dksj|dkrsd}n‡‡fd†}‡fd†}‡‡fd†}	yrˆjj	|	dˆj
d|d|d	ˆjpÙd
dˆjdˆj
d
|ƒ}
|
ƒ\}}ˆj|ƒ|SWnÚtjjk
r¹}
tjddtƒ|oR||
ƒˆjdƒi|
d6|d6}|jˆjjƒƒtdƒ|}tj|ƒtj|ƒ‚n<tjk
rςn&tk
rô}
|oí||
ƒ‚nXdS(sWill retry up to retry number of times.
        retry = None means use the value of rabbit_max_retries
        retry = -1 means to retry forever
        retry = 0 means no retry
        retry = N means N retries

        NOTE(sileht): Must be called within the connection lock
        sžProcess forked after connection established! This can result in unpredictable behavior. See: http://docs.openstack.org/developer/oslo.messaging/transport.htmlicsðtjdˆjdtƒˆo)ˆ|ƒˆjdkrFˆj|n|}i|d6|d6}|jˆjƒƒdtj|ƒkr¡tj	t
dƒ|ƒntj	t
dƒ|ƒˆjdkrìtjd	ˆjƒtj
ˆjƒndS(
Ns+[%s] Received recoverable error from kombu:texc_infoiterr_strt
sleep_times
Socket closedsq[%(connection_id)s] AMQP server %(hostname)s:%(port)s closed the connection. Check login credentials: %(err_str)ssš[%(connection_id)s] AMQP server on %(hostname)s:%(port)s is unreachable: %(err_str)s. Trying again in %(sleep_time)d seconds. Client port: %(client_port)ss(Delaying reconnect for %1.1f seconds ...(RKRkRlR˜RtupdateRàRvRwRRRLttimetsleep(RrtintervalR
(trecoverable_error_callbackRN(sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyton_errorås"	

	
cs7ˆj|ƒˆjƒtjtdƒˆjƒƒdS(s‡Callback invoked when the kombu reconnects and creates
            a new channel; we use it to reconfigure our consumers.
            s}[%(connection_id)s] Reconnected to AMQP server on %(hostname)s:%(port)s via [%(transport)s] client with port %(client_port)s.N(RRRKR
RRà(tnew_channel(RN(sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyton_reconnections

csˆj|ƒˆƒdS(N(R(Ri(RRN(sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pytexecute_methods
RiR²terrbackR³it
interval_stepRµt	on_revives&Received recoverable error from kombu:RRtretrys\Unable to connect to AMQP server on %(hostname)s:%(port)s after %(retry)s tries: %(err_str)sN(RÉRÊRËRKRºR
RbR²Rnt	autoretryRiR³R´RµRReRtOperationalErrorRkR˜RR
R
RtMessageDeliveryFailuretrpc_amqptAMQPDestinationNotFoundR(RNRRRterror_callbackttimeout_is_errortcurrent_pidRRRtautoretry_methodtretRiRrR
tmsg((RRRNsX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyR
ÌsN	)				

	
cCsÆ||jkrdS|jdk	rR|jjƒ|jjƒ|jj|jƒn||_|dk	rÂ|jtj	kr‰|j
|ƒntjj
|ƒ|_x!|jD]}|j|ƒq¨WndS(sdChange the channel to use.

        NOTE(sileht): Must be called within the connection lock
        N(RiRbRÒtclearRÑRntmaybe_close_channelRÔRHtPURPOSE_LISTENt_set_qosRet	messagingtProducerRÌRyRm(RNRtconsumer((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyR@s

	cCs,|jdkr(|jd|jtƒndS(s%Set QoS prefetch count on the channeliN(R=t	basic_qosR‡(RNRi((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyR.Vs	cCs“|jƒ|jrxS|jjƒD]B\}}|jdkr#tjd|jjƒ|jj	ƒq#q#W|j
dƒ|jjƒd|_ndS(sClose/release this connection.tfanouts-[connection close] Deleting fanout queue: %s N(
t_heartbeat_stopRnRytitemsRWRKRkRcRVtdeleteRRbRœ(RNR1R}((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pytclose]s
		

c	Cs¥|j–y4x-|jjƒD]\}}|jd|ƒqWWn!tjjk
ra|jƒnX|jjƒ|j	jƒ|j
jƒtjdƒ|_
WdQXdS(s+Reset a connection so it can be used again.R}iN(RÖRyR5R~ReRR!RäR+RÍR{RÎRÏRÐ(RNR1R}((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pytresetjs



cCsO|jdkrtS|jjr#tS|jsKtjtdƒƒt|_ntS(NisUHeartbeat support requested but it is not supported by the kombu driver or the broker(	R>R‡Rntsupports_heartbeatsR˜RãRKRºR
(RN((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyRçws	cCs4|j}|jƒr<|dkr*|}q<t||ƒ}ny|jjj}Wn3tk
r„}tj	d|j
t|ƒfƒn¬X|j|ƒt
jdkr0t
jdkr0yE|dk	rÉ|dnd}|jtjtttj|ƒƒƒWq0tjk
r,}|d}|tjkr-‚q-q0XndS(Ns'[%s] Failed to get socket attribute: %stwin32tdarwinièi(R>RçRbR<RiRntsocktAttributeErrorRKRkRlRÆt
settimeoutRÛtplatformt
setsockopttsockettIPPROTO_TCPtTCP_USER_TIMEOUTtinttmathtceilRterrnotENOPROTOOPT(RNttimeouttheartbeat_timeoutR<teRRp((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyRƒs*
			

ccs |j|ƒdV|jƒdS(N(R(RNRI((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyt_transport_socket_timeout±s
cCs|jjd|jƒdS(Ntrate(Rntheartbeat_checkR?(RN((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyt_heartbeat_check·scCs\|jƒrOtjƒ|_tjd|jƒ|_t|j_	|jj
ƒn	d|_dS(Nttarget(RçRtEventt_heartbeat_exit_eventRŽtThreadt_heartbeat_thread_jobRåR˜tdaemontstartRb(RN((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyRæ¼s	cCs9|jdk	r5|jjƒ|jjƒd|_ndS(N(RåRbRRRxtjoin(RN((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyR4Æs

c	Cs	xõ|jjƒs÷|jjƒ»y|y<|jƒy|jjddƒWntjk
r_nXWn9t	j
jk
rœ}tj
tdƒ|ƒ|jƒnXWn7tk
r×tjtdƒƒtjddtƒnXWdQX|jjd|jƒqW|jjƒdS(s3Thread that maintains inactive connections
        RIgü©ñÒMbP?sHA recoverable connection/channel error occurred, trying to reconnect: %ssAUnexpected error during heartbeart thread processing, retrying...RRN(RRtis_setRÖRRORntdrain_eventsRARIReRR!RKR
RRäRRºR
RkR˜R–RâR+(RNRr((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyRTÌs&

csH‡fd†}‡‡fd†}ˆjˆj|d|ƒSWdQXdS(siCreate a Consumer using the class that was passed in and
        add it to our list of consumers
        cs1iˆjd6|d6}tjtdƒ|ƒdS(NttopicRs=Failed to declare consumer for topic '%(topic)s': %(err_str)s(R\RKRR(Rrtlog_info(R1(sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyt_connect_errorñscsqˆjˆƒˆjjˆjƒ}|dkr`tˆjƒ}|ˆjˆj<ˆjj|ƒn|ˆj	ˆ<ˆS(N(
RmRÍtgetRZRbtnextRÐR{taddRy(R}(R1RN(sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyt_declare_consumerös

R%N(RÖR
(RNR1R\R`((R1RNsX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pytdeclare_consumerìs

c
sŽtjdˆƒ‰ˆjƒd„‰‡‡‡fd†‰‡fd†}‡‡‡‡fd†}ˆjˆj|dˆd|ƒWdQXdS(	s"Consume from all queues/consumers.tdurationcSs tjd|ƒtjƒ‚dS(Ns&Timed out waiting for RPC response: %s(RKRkRHtTimeout(Rr((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyt_raise_timeoutscsAt|tjƒs-tˆjjƒƒˆ_nˆjˆ|ƒdS(N(t
isinstanceRHRcRxRyRzR{tcheck_return(Rr(RdRNttimer(sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyt_recoverable_error_callbackscs$ˆ|ƒtjtdƒ|ƒdS(Ns(Failed to consume message from queue: %s(RKRR(Rr(Rh(sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyt_error_callbacks
cs}ˆjjsˆjjd‚nxbˆjrƒxRˆjjƒD]A\}}|ˆjkr;|jˆd|ƒˆjj|ƒq;q;Wq"Wˆdkr™ˆj	nt
ˆˆj	ƒ}xËtrxˆjrÁdSˆj
ƒrڈjƒnyˆjjd|ƒdSWq®tjk
r(}ˆjˆ|dˆj	ƒ}q®ˆjjk
rt}|jdkrn|jdkrnˆjjd‚n‚q®Xq®WdS(NiR}RItmaximumi–s	Basic.ack(Rnt	connectedtrecoverable_connection_errorsR{RyR5RttremoveRbRèR<R˜RÓRçRORYRARIRfRoRpR|trecoverable_channel_errors(R1R}tpoll_timeoutRr(RdRNRIRg(sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyt_consumes0		
RR%N(RHt
DecayingTimerRVRÖR
(RNRIRiRp((RdRhRNRIRgsX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyRts
	$
cCs
t|_dS(N(R˜RÓ(RN((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pytstop_consumingBscCs\td|d|d|dddtdtdtd	|d
|jd|jƒ
}|j|ƒdS(
s‡Create a 'direct' queue.
        In nova's use, this is generally a msg_id queue used for
        responses for call/multicall
        R[RZR\RWtdirectRXR]R^R_R:RBN(RUR‡R˜R:R;Ra(RNRZR_R1((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pytdeclare_direct_consumerEs	cCsbtd|d|p|d|ddd|jd|jd|jd	|d
|jƒ	}|j|ƒdS(sCreate a 'topic' consumer.R[RZR\RWRZRXR]R^R_R:N(RUR·R¸R:Ra(RNR[RZR_RZR1((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pytdeclare_topic_consumerXs			cCs…tjƒj}d|}d||f}td|d|d|dddtd	td
td|d|jd
|jƒ
}|j|ƒdS(sCreate a 'fanout' consumer.s	%s_fanouts%s_fanout_%sR[RZR\RWR3RXR]R^R_R:RBN(	R×RØthexRUR‡R˜R:R;Ra(RNRZR_tuniqueR[RZR1((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pytdeclare_fanout_consumergs
	c
sW‡fd†}tj|ˆ|||ƒ}|j|j|d|d|ƒWdQXdS(s1Send to a publisher based on the publisher class.csDiˆjd6|d6}tjtdƒ|ƒtjdd|ƒdS(NRZRs;Failed to publish message to topic '%(topic)s': %(err_str)sRR(RVRKRRRk(RrR[(Rh(sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyRisRR%N(t	functoolstpartialRÖR
(RNRRhR*R\RIRRi((RhsX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyt_ensure_publishing{s
cCs€|jjƒ}d}|jr^t|jjdƒr^|jjjr^|jjjjƒd}n|ji|d6|jd6ƒ|S(NR<itclient_portRl(	RnR
RbRithasattrR<tgetsocknameRRl(RNR
R|((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyRàŠscCsº|jp|j|jksD||jƒjƒ|jj|jƒni|d6|pWdd6|d6}tjd|ƒ|j|ƒ0|j	j
|d|d|d|d	|jƒWd
QXd
S(sPublish a message.R*RtwhoRþsPConnection._publish: sending message %(msg)s to %(who)s with routing key %(key)sRhR\t
expirationtcompressionN(tpassiveRVRÑRiRmR_RKRLRLRÌtpublishR!(RNRhR*R\RIR[((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyt_publish”s


	cCsÍ|j|f}||jkr­tjjd|jd|d|jd|jd|d|dt|j	dƒƒ}i|d	6|d6}t
jd
|ƒ|jƒ|jj
|ƒn|j||d|d|ƒdS(
såPublisher that declares a default queue

        When the exchange is missing, instead of silently creating an
        exchange that is not bound to a queue, this publisher creates a
        default queue named after the routing_key.

        This is mainly used to avoid losing notifications when nobody
        consumes them yet. If a future consumer binds the default queue, it
        can retrieve the missed messages.

        _set_current_channel is responsible for cleaning up the cache.
        RiRhRXRYRVR\RaiRþs]Connection._publish_and_creates_default_queue: declare queue %(key)s on %(exchange)s exchangeRIN(RVRÒReRfRjRiRXRYRDR:RKRLRmR_R„(RNRhR*R\RItqueue_indentifierRcR[((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyt"_publish_and_creates_default_queueªs"			
cCs‡|jstdƒ‚ny$|j||d|d|ƒdSWnD|jjk
r‚}|jdkr|tjd|jƒ‚n‚nXdS(s7Publisher that raises exception if exchange is missing.sQ_publish_and_retry_on_missing_exchange() must be called with an passive exchange.R\RINi”sexchange %s doesn't exists(	R‚R™R„RnRoRpR#R$RV(RNRhR*R\RIRr((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyt'_publish_and_raises_on_missing_exchangeÎs		cCsMtjjd|dddtdtdtƒ}|j|j||d|ƒdS(	sSend a 'direct' message.RVRWRsRXRYR‚R\N(ReRfRgR‡R˜R{R‡(RNtmsg_idR*Rh((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pytdirect_sendès	c
CsYtjjd|ddd|jd|jƒ}|j|j||d|d|d|ƒd	S(
sSend a 'topic' message.RVRWRZRXRYR\RIRN(ReRfRgR·R¸R{R„(RNR[RZR*RIRRh((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyt
topic_sendós	c	CsKtjjdd|dddtdtƒ}|j|j||d|ƒdS(	sSend a 'fanout' message.RVs	%s_fanoutRWR3RXRYRN(ReRfRgR‡R˜R{R„(RNRZR*RRh((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pytfanout_sendÿs
	c	KsStjjd|ddd|jd|jƒ}|j|j||d|d|ƒdS(	s!Send a notify message on a topic.RVRWRZRXRYR\RN(ReRfRgR·R¸R{R†(RNR[RZR*RtkwargsRh((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pytnotify_sends	N(4RSRTR†tpoolsRGRtPROTOCOL_TLSv1tPROTOCOL_SSLv23Rút_OPTIONAL_PROTOCOLStprotocolR€R=tclassmethodRÿRÄRÝtstaticmethodRÞRßRäRbR˜R
RR.R7R8RçRRžRŸRLRORæR4RTRaRtRrRtRuRxR{RàR„R†R‡R‰RŠR‹R(((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyR Ãsn	¾




			r			
	
	.		
		 	=				
$		tRabbitDrivercBs)eZdZddd„Zed„ZRS(sÞRabbitMQ Driver

    The ``rabbit`` driver is the default driver used in OpenStack's
    integration tests.

    The driver is aliased as ``kombu`` to support upgrading existing
    installations with older settings.

    c
Cstjddddƒ}|j|ƒ|jtd|ƒ|jtjd|ƒ|jtjd|ƒt	j
|||jƒ}|jj
|_|jj|_|jj}|jj}|jj}tj|||||tƒ}	tt|ƒj|||	||ƒdS(NRVR±ttitlesRabbitMQ driver optionstgroup(RtOptGrouptregister_groupt
register_optstrabbit_optsR#t	amqp_optsRt	base_optsRHtConfigOptsProxyRVR±R"t!missing_destination_retry_timeoutR=t
prefetch_sizetrpc_conn_pool_sizetconn_pool_min_sizet
conn_pool_ttlR	tConnectionPoolR RFR•RG(
RNRêRëtdefault_exchangetallowed_remote_exmodst	opt_grouptmax_sizetmin_sizetttltconnection_pool((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyRGs(	
cCsdS(N((RNRR((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pytrequire_features@sN(RSRTR†RbRGR˜R¬(((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyR•s	(DRžRGRyRÎRERÉR¿RARRÛRŽRR×Retkombu.connectiontkombu.entitytkombu.messagingtoslo_configRtoslo_logRtloggingt
oslo_utilsRRRvtsix.moves.urllibRtoslo_messaging._driversRR#RRRRHR	toslo_messaging._i18nR
RRR
toslo_messagingRRRCtBoolOptR‡tStrOpttFloatOpttIntOptR˜tPortOpttListOptR›t	getLoggerRSRKRDRREtobjectRUt	DummyLockRˆRŠR tAMQPDriverBaseR•(((sX/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.pyt<module>s:			#sGÿÿÿT
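
The ConnectionLock docstring in the dump above describes a lock on which the heartbeat thread always wins over worker threads. The following is a minimal sketch of that behaviour, written from the docstring only and assuming Python 3; it is not the library's source. The names `PriorityLock` and `demo_order` are hypothetical, and having workers also yield while a heartbeat request is pending is a choice made for this sketch.

```python
import threading
import time


class PriorityLock:
    """Sketch: a lock where one heartbeat thread outranks worker threads."""

    def __init__(self):
        self._monitor = threading.Lock()
        self._workers = threading.Condition(self._monitor)
        self._heartbeat = threading.Condition(self._monitor)
        self._owner = None  # ident of the thread currently holding the lock
        self._workers_waiting = 0
        self._heartbeat_waiting = False

    def acquire(self):
        with self._monitor:
            # Workers wait while the lock is held or the heartbeat is queued.
            while self._owner is not None or self._heartbeat_waiting:
                self._workers_waiting += 1
                self._workers.wait()
                self._workers_waiting -= 1
            self._owner = threading.get_ident()

    def heartbeat_acquire(self):
        with self._monitor:
            while self._owner is not None:
                self._heartbeat_waiting = True
                self._heartbeat.wait()
                self._heartbeat_waiting = False
            self._owner = threading.get_ident()

    def release(self):
        with self._monitor:
            if self._owner != threading.get_ident():
                raise RuntimeError("lock is not held by this thread")
            self._owner = None
            # The heartbeat is woken first; workers only if it is not waiting.
            if self._heartbeat_waiting:
                self._heartbeat.notify()
            elif self._workers_waiting > 0:
                self._workers.notify()


def demo_order():
    """Return the order in which a worker and the heartbeat get the lock."""
    lock = PriorityLock()
    order = []

    def run(acquire, label):
        acquire()
        order.append(label)
        lock.release()

    lock.acquire()  # the main thread holds the lock first
    worker = threading.Thread(target=run, args=(lock.acquire, "worker"))
    beat = threading.Thread(
        target=run, args=(lock.heartbeat_acquire, "heartbeat"))
    worker.start()
    while lock._workers_waiting == 0:  # wait until the worker is queued
        time.sleep(0.001)
    beat.start()
    while not lock._heartbeat_waiting:  # the heartbeat asks second
        time.sleep(0.001)
    lock.release()
    worker.join()
    beat.join()
    return order
```

In `demo_order()` the worker asks for the lock before the heartbeat thread does, yet the heartbeat thread is served first once the main thread releases, which is the no-starvation property the docstring describes.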