Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
idna / lib / python2.7 / site-packages / oslo_messaging / _drivers / amqpdriver.pyc
Size: Mime:
ó
čEYc@s£dgZddlZddlZddlZddlZddlZddlmZddlm	Z	ddl
Z
ddlmZ
ddlmZddlmZddlmZdd	lmZdd
lmZddlmZejeƒZdZd
Zdefd„ƒYZdejfd„ƒYZdefd„ƒYZdej fd„ƒYZ!defd„ƒYZ"defd„ƒYZ#dej$fd„ƒYZ%dS(tAMQPDriverBaseiÿÿÿÿN(t	timeutils(tmoves(tamqp(tbase(tcommon(t_(t_LE(t_LI(t_LWgü©ñÒMbP?gð?tMessageOperationsHandlercBsDeZdZd„Zd„Zd„Zd„Zd„Zd„ZRS(sªQueue used by message operations to ensure that all tasks are
    serialized and run in the same thread, since underlying drivers like kombu
    are not thread safe.
    cCsqd|tt|ƒƒf|_tjjƒ|_tjƒ|_	tj
d|jƒ|_t
|j_d|_dS(Ns%s (%s)ttarget(thextidtnameRtqueuetQueuet_taskst	threadingtEventt	_shutdowntThreadt_process_in_backgroundt_shutdown_threadtTruetdaemontNonet	_executor(tselfR((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyt__init__2s	cCs|jjƒdS(N(Rtset(R((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pytstopBscCs|jjƒdS(s_Run all pending tasks queued by do() in an thread during the
        shutdown process.
        N(Rtstart(R((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pytprocess_in_backgroundEscCs1x*|jjƒs,|jƒtjtƒqWdS(N(Rtis_settprocessttimetsleeptACK_REQUEUE_EVERY_SECONDS_MIN(R((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyRKs
cCsdx]tr_y|jjdtƒ\}}Wntjjk
rBPnXz|ƒWd|jƒXqWdS(s%Run all pending tasks queued by do().tblockN(RRtgettFalseRRtEmptyR(Rttasktevent((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyR#Ps	cCsg|jdkrtdƒ‚nE|jdkr7|ƒn,tjƒ}|jj||fƒ|jƒdS(s@Put the task in the queue and waits until the task is completed.s(Unexpected error, no executor is setupedtblockingN(RRtRuntimeErrorRRRtputtwait(RR+R,((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pytdo]s
(	t__name__t
__module__t__doc__RRR!RR#R1(((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyR
-s					
tAMQPIncomingMessagecBsAeZd„Zddd„Zddd„Zd„Zd„ZRS(c		Csott|ƒj||ƒ||_||_||_||_||_||_t	j
ƒ|_|jjƒdS(N(
tsuperR5Rtlistenert	unique_idtmsg_idtreply_qt_obsolete_reply_queuest_message_operations_handlerRt	StopWatcht	stopwatchR (	RR7tctxttmessageR8R9R:tobsolete_reply_queuestmessage_operations_handler((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyRms						cCsÑ|jj|j|jƒsdS|r7tj|ƒ}ni|d6|d6td6|jd6}tj|ƒ|tj	}t
jdi|jd6|d6|jd6|jj
ƒd	6ƒ|j|jtj|ƒƒdS(
Ntresulttfailuretendingt_msg_idsTsending reply msg_id: %(msg_id)s reply queue: %(reply_q)s time elapsed: %(elapsed)ssR9R8R:telapsed(R;t
reply_q_validR:R9t
rpc_commontserialize_remote_exceptionRtrpc_amqpt_add_unique_idt	UNIQUE_IDtLOGtdebugR>RGtdirect_sendt
serialize_msg(RtconntreplyRDtmsgR8((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyt_send_replyzs 


	

cCsO|js
dS|jj|j|jƒs,dS|jjj}tjd|ƒ}|j	ƒxñt
rJy<|jjjtjƒ}|j
|||ƒWdQXdSWqZtjk
rF|jƒdkrõtjdi|jd6|jd6ƒtjdƒqG|jj|j|jƒtjtdƒi|jd6|jd6|d6ƒdSqZXqZWdS(NtdurationisUThe reply %(msg_id)s cannot be sent  %(reply_q)s reply queue don't exist, retrying...R9R:gÐ?smThe reply %(msg_id)s cannot be sent  %(reply_q)s reply queue don't exist after %(duration)s sec abandoning...(R9R;RHR:R7tdrivert!missing_destination_retry_timeoutRIt
DecayingTimerR Rt_get_connectiontPURPOSE_SENDRURKtAMQPDestinationNotFoundtcheck_returnRNROR$R%taddtinfoR(RRSRDRVttimerRR((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyRS‘s8	
		


cCs0|jj|jjƒ|jjj|jƒdS(N(R<R1R@tacknowledgeR7tmsg_id_cacheR^R8(R((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyRa»scCs|jj|jjƒdS(N(R<R1R@trequeue(R((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyRc¿sN(R2R3RRRURSRaRc(((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyR5ks
	
*	tObsoleteReplyQueuesCachecBs>eZdZdZdZd„Zd„Zd„Zd„ZRS(sLCache of reply queue id that doesn't exists anymore.

    NOTE(sileht): In case of a broker restart/failover
    a reply queue can be unreachable for short period
    the IncomingMessage.send_reply will block for 60 seconds
    in this case or until rabbit recovers.

    But in case of the reply queue is unreachable because the
    rpc client is really gone, we can have a ton of reply to send
    waiting 60 seconds.
    This leads to a starvation of connection of the pool
    The rpc server take to much time to send reply, other rpc client will
    raise TimeoutError because their don't receive their replies in time.

    This object cache stores already known gone client to not wait 60 seconds
    and hold a connection of the pool.
    Keeping 200 last gone rpc client for 1 minute is enough
    and doesn't hold to much memory.
    iÈi<cCs.tjƒ|_tj|j|jƒ|_dS(N(RtRLockt_lockt
cachetoolstTTLCachetSIZEtTTLt_cache(R((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyRáscCs'||jkr#|j||ƒtStS(N(Rkt
_no_reply_logR)R(RR:R9((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyRHåscCs;|j|jji||6ƒWdQX|j||ƒdS(N(RfRktupdateRl(RR:R9((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyR^ës
cCs(tjtdƒi|d6|d6ƒdS(Ns8%(reply_queue)s doesn't exists, drop reply to %(msg_id)streply_queueR9(RNtwarningR	(RR:R9((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyRlðs
(	R2R3R4RiRjRRHR^Rl(((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyRdÉs			tAMQPListenercBsAeZd„Zd„Zejdd„ƒZd„Zd„Z	RS(cCs‰tt|ƒj|jƒ||_||_tjƒ|_g|_	t
jƒ|_t
jƒ|_
tƒ|_tdƒ|_t|_dS(NRp(R6RpRt
prefetch_sizeRWRRRKt_MsgIdCacheRbtincomingRRRt_shutoffRdR;R
R<R&t_current_timeout(RRWRR((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyRøs			c
Cs¢tj|ƒ}|jj|ƒ}|jrQtjdi|jd6|jd6ƒntjd|ƒ|jj	t
||jƒ|||j|j|j|j
ƒƒdS(Ns6received message msg_id: %(msg_id)s reply to %(queue)sRR9s#received message with unique_id: %s(RKtunpack_contextRbtcheck_duplicate_messageR9RNROR:RstappendR5tto_dictR;R<(RR@R?R8((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyt__call__s 		
	cCs!tjd|ƒjƒ}xÏ|jjƒsé|jjƒ|jrP|jjdƒS|j	dt
ƒ}|dkrz|j}n|dkrŠdSy#|j
jdt|j|ƒƒWn-tjk
rÜt|jdtƒ|_qXt|_qW|jjƒ|jr|jjdƒS|jjƒdS(NRVitreturn_nonettimeouti(RR=R RR"R<R#RstpoptleftoverRRRuRRtconsumetminRItTimeouttmaxtACK_REQUEUE_EVERY_SECONDS_MAXR&RtR(RR|R>tleft((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pytpolls(
	#


	cCs8|jjƒ|jjƒ|jjƒ|jjƒdS(N(RRRRtstop_consumingRtR0R<R!(R((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyR9s


cCs|jjƒ|jjƒdS(N(R<RRRtclose(R((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pytcleanupDs
N(
R2R3RRzRtbatch_poll_helperRR…RRˆ(((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyRpös	
		tReplyWaiterscBs>eZeƒZd„Zd„Zd„Zd„Zd„ZRS(cCsi|_d|_dS(Ni
(t_queuest_wrn_threshold(R((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyRQs	cCsRy!|j|jdtd|ƒSWn*tjjk
rMtjd|ƒ‚nXdS(NR'R|s.Timed out waiting for a reply to message ID %s(R‹R(RRRR*toslo_messagingtMessagingTimeout(RR9R|((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyR(Us!cCsi|jj|ƒ}|sXtjtdƒ|ƒtjdit|jƒd6|d6ƒn
|j|ƒdS(Ns*No calling threads waiting for msg_id : %ss) queues: %(queues)s, message: %(message)stqueuesR@(R‹R(RNR_RROtlenR/(RR9tmessage_dataR((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyR/]s	!cCstjjƒ|j|<t|jƒ}||jkr{tjtdƒi|d6|jd6|jdd6ƒ|jd9_ndS(NsœNumber of call queues is %(queues_length)s, greater than warning threshold: %(old_threshold)s. There could be a leak. Increasing threshold to: %(threshold)st
queues_lengtht
old_thresholdit	threshold(	RRRR‹RRŒRNRoR	(RR9R’((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyR^fs

cCs|j|=dS(N(R‹(RR9((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pytremovess(	R2R3tobjecttWAKE_UPRR(R/R^R•(((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyRŠMs						
tReplyWaitercBs_eZd„Zd„Zd„Zd„Zd„Zd„Zed„ƒZ	d„Z
d„ZRS(	cCs„||_||_tjƒ|_tƒ|_|jj||ƒtj	ƒ|_
tjd|jƒ|_
t|j
_|j
jƒdS(NR(RRtallowed_remote_exmodsRKRrRbRŠtwaiterstdeclare_direct_consumerRRt_thread_exit_eventRR…t_threadRRR (RR:RRR™((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyRxs		cCs@|jr<|jjƒ|jjƒ|jjƒd|_ndS(N(RRœRRRR†tjoinR(R((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyR…s
	


cCsŠt}x}|jjƒs…y|jjd|ƒWnJtjk
rXt|dtƒ}q	t	k
r{t
jtdƒƒq	Xt}q	WdS(NR|is/Failed to process incoming message, retrying...(
R&RœR"RRRRIRR‚Rƒt	ExceptionRNt	exceptionR(Rtcurrent_timeout((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyR…Œs

cCsU|jƒ|jddƒ}|jdƒr>tjd|ƒn|jj||ƒdS(NRFREsreceived reply msg_id: %s(RaR}RR(RNRORšR/(RR@tincoming_msg_id((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyRz›s

cCs|jj|ƒdS(N(RšR^(RR9((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pytlisten¦scCs|jj|ƒdS(N(RšR•(RR9((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pytunlisten©scCstjtdƒ|ƒ‚dS(Ns/Timed out waiting for a reply to message ID %s.(RRŽR(R9((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyt_raise_timeout_exception¬scCsj|jj|ƒ|dr<|d}tj||jƒ}n|jddƒ}|jdtƒ}||fS(NRDRCRE(RbRwRItdeserialize_remote_exceptionR™R(RR)(RtdataRDRCRE((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyt_process_reply±s

cCs»tjd|ƒ}|jƒd}t}xŒ|s¶|j|j|ƒ}y|jj|d|ƒ}Wn$t	j
jk
rˆ|j|ƒnX|j|ƒ\}}|dk	r+|}q+q+W|S(NRVR|(
RIRYR RR)R]R¥RšR(RRR*R¨(RR9R|R`tfinal_replyRER@RS((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyR0½s
	
(R2R3RRR…RzR£R¤tstaticmethodR¥R¨R0(((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyR˜ws	
						cBseZdZddd„Zd„Zejd„Zd„Z	dde
edd„Zdddd„Z
dd„Zd„Zd	„Zd
„ZRS(icCs_tt|ƒj||||ƒ||_||_tjƒ|_d|_	d|_
d|_dS(N(R6RRt_default_exchanget_connection_poolRtLockt
_reply_q_lockRt_reply_qt
_reply_q_connt_waiter(Rtconfturltconnection_pooltdefault_exchangeR™((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyRÛs				cCs|jp|jS(N(texchangeR«(RR((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyt
_get_exchangeéscCstj|jd|ƒS(Ntpurpose(RItConnectionContextR¬(RR¸((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyRZìscCs||jj|jdk	r |jSdtjƒj}|jtjƒ}t	|||j
ƒ|_||_||_WdQX|jS(Ntreply_(
R®R¯Rtuuidtuuid4RRZRItPURPOSE_LISTENR˜t_allowed_remote_exmodsR±R°(RR:RR((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyt_get_reply_qðs
		c	Csg|}	|rLtjƒj}
|	ji|
d6ƒ|	ji|jƒd6ƒntj|	ƒ|	tj}tj|	|ƒ|rŽt	j
|	ƒ}	n|r±|jj|
ƒd|
}n
d|}z‹|j
t	jƒ<}
|r3|j|ƒ}|di|d6|jd67}tj|ƒ|
j||j|	d|ƒnÕ|jr}|d	i|jd67}tj|ƒ|
j|j|	d|ƒn‹|j}|j|ƒ}|jr·d
|j|jf}n|di|d6|d67}tj|ƒ|
jd|d|d
|	d|d|ƒWdQX|rE|jj|
|ƒ}t|tƒrA|‚n|SWd|rb|jj|
ƒnXdS(NRFR¯sCALL msg_id: %s sCAST unique_id: %s s0NOTIFY exchange '%(exchange)s' topic '%(topic)s'R¶ttopictretrysFANOUT topic '%(topic)s's%s.%ss)exchange '%(exchange)s' topic '%(topic)s't
exchange_nameRTR|(R»R¼RRmR¿RKRLRMtpack_contextRIRQR±R£RZR[R·RÀRNROtnotify_sendtfanouttfanout_sendtservert
topic_sendR0t
isinstanceRŸR¤(RRR?R@twait_for_replyR|tenvelopetnotifyRÁRTR9R8tlog_msgRRR¶RÀRC((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyt_sends^




		
		
	cCs|j|||||d|ƒS(NRÁ(RÎ(RRR?R@RÊR|RÁ((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pytsend?sc
Cs+|j|||d|dkdtd|ƒS(NRËg@RÌRÁ(RÎR(RRR?R@tversionRÁ((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pytsend_notificationDscCs¤|jtjƒ}t||ƒ}|jd|j|ƒd|jd|ƒ|jd|j|ƒdd|j|jfd|ƒ|j|j|ƒt	j
|||ƒS(NRÂRÀtcallbacks%s.%s(RZRIR½Rptdeclare_topic_consumerR·RÀRÇtdeclare_fanout_consumerRtPollStyleListenerAdapter(RRt
batch_sizet
batch_timeoutRRR7((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyR£Hs		
c	
Cs|jtjƒ}||j_t||ƒ}xL|D]D\}}|jd|j|ƒdd|j|fd|d|ƒq4Wt	j
|||ƒS(NRÂRÀs%s.%sRÒt
queue_name(RZRIR½t
connectiontrabbit_qos_prefetch_countRpRÓR·RÀRRÕ(	Rttargets_and_prioritiestpoolRÖR×RRR7Rtpriority((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pytlisten_for_notificationsYs	cCs}|jr|jjƒnd|_|jL|jdk	rs|jjƒ|jjƒd|_d|_d|_nWdQXdS(N(	R¬temptyRR®R¯R±RR°R‡(R((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyRˆls		


		N(R2R3RXRRR·RIR[RZR¿RR)RÎRÏRÑR£RÞRˆ(((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyRØs
		<		(&t__all__tloggingRR$R»Rgt
oslo_utilsRtsixRRtoslo_messaging._driversRRKRRRItoslo_messaging._i18nRRRR	t	getLoggerR2RNR&RƒR–R
tRpcIncomingMessageR5RdtPollStyleListenerRpRŠR˜t
BaseDriverR(((sW/home/tvault/.virtenv/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.pyt<module>s2	>^-W*a