403Webshell
Server IP : 118.27.122.248  /  Your IP : 216.73.216.15
Web Server : Apache
System : Linux web0264.sh.tyo1 4.18.0-553.79.1.lve.el7h.x86_64 #1 SMP Wed Oct 15 16:34:46 UTC 2025 x86_64
User : c9415830 ( 11735)
PHP Version : 8.4.17
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/alt/python36/lib64/python3.6/test/__pycache__/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/alt/python36/lib64/python3.6/test/__pycache__/test_signal.cpython-36.opt-2.pyc
3

�
�^
��@s2ddlZddlmZddlmZddlZddlZddlZddlZddl	Z	ddl
Z
ddlZddlZddl
Z
ddlZddlZddlZddlZddlZddlZddlmZmZyddlZWnek
r�dZYnXyddlZWnek
r�dZYnXGdd�dej�Zejejdkd�Gd	d
�d
ej��Zejejdkd�Gdd
�d
ej��ZGdd�dej�Z ejejdkd�Gdd�dej��Z!eje"ed�d�Gdd�dej��Z#ejejdkd�Gdd�dej��Z$ejejdkd�Gdd�dej��Z%Gdd�dej�Z&Gdd�dej�Z'dd�Z(e)d k�r.ej*�dS)!�N)�support)�closing)�assert_python_ok�spawn_pythonc@s$eZdZejedkd�dd��ZdS)�GenericTestsNztest needs threading modulecCs�x�tt�D]�}tt|�}|d
kr0|j|tj�q
|dkrH|j|tj�q
|jd�rn|jd�rn|j|tj�q
|jd�r
|j|tj�|jt	j
d	�q
WdS)N�SIG_DFL�SIG_IGN�	SIG_BLOCK�SIG_UNBLOCK�SIG_SETMASKZSIGZSIG_ZCTRL_�win32>rr>r
r	r)�dir�signal�getattr�assertIsInstance�HandlersZSigmasks�
startswithZSignals�assertEqual�sys�platform)�self�name�sig�r�0/opt/alt/python36/lib64/python3.6/test_signal.py�
test_enumss

zGenericTests.test_enums)�__name__�
__module__�__qualname__�unittest�skipIf�	threadingrrrrrrsrrzNot valid on Windowsc@sFeZdZdd�Zdd�Zdd�Zdd�Zeje	j
d	kd
�dd��Zd
S)�
PosixTestscGsdS)Nr)r�argsrrr�trivial_signal_handler.sz!PosixTests.trivial_signal_handlercCs(|jttjd�|jttjd|j�dS)Ni�)�assertRaises�
ValueErrorr�	getsignalr$)rrrr�,test_out_of_range_signal_number_raises_error1sz7PosixTests.test_out_of_range_signal_number_raises_errorcCs|jttjtjd�dS)N)r%�	TypeErrorr�SIGUSR1)rrrr�0test_setting_signal_handler_to_none_raises_error7s
z;PosixTests.test_setting_signal_handler_to_none_raises_errorcCsZtjtj|j�}|j|tj�|jtjtj�|j�tjtj|�|jtjtj�|�dS)N)r�SIGHUPr$rrrr')rZhuprrr�test_getsignal;szPosixTests.test_getsignal�freebsd6zOinter process signals not reliable (do not mix well with threading) on freebsd6cCs&tjjt�}tjj|d�}t|�dS)Nzsignalinterproctester.py)�os�path�dirname�__file__�joinr)rr1Zscriptrrr�test_interprocess_signalDsz#PosixTests.test_interprocess_signalN)rrrr$r(r+r-rr rrr4rrrrr",s	r"zWindows specificc@seZdZdd�ZdS)�WindowsSignalTestscCs�dd�}t�}xTtjtjtjtjtjtjtjfD]0}tj	|�dk	r0tj|tj||��|j
|�q0W|j|�|jt
��tjd|�WdQRX|jt
��tjd|�WdQRXdS)NcSsdS)Nr)�x�yrrr�<lambda>Qsz3WindowsSignalTests.test_issue9324.<locals>.<lambda>�����)�setr�SIGABRTZSIGBREAK�SIGFPE�SIGILL�SIGINT�SIGSEGV�SIGTERMr'�add�
assertTruer%r&)r�handlerZcheckedrrrr�test_issue9324Os
z!WindowsSignalTests.test_issue9324N)rrrrFrrrrr5Msr5c@sFeZdZdd�Zdd�Zdd�Zdd�Zeje	j
d	kd
�dd��Zd
S)�
WakeupFDTestscCs tj�}|jttftj|�dS)N)rZmake_bad_fdr%r&�OSErrorr�
set_wakeup_fd)r�fdrrr�test_invalid_fdgs
zWakeupFDTests.test_invalid_fdcCs0tj�}|j�}|j�|jttftj|�dS)N)�socket�fileno�closer%r&rHrrI)rZsockrJrrr�test_invalid_socketls

z!WakeupFDTests.test_invalid_socketcCs�tj�\}}|jtj|�|jtj|�tj�\}}|jtj|�|jtj|�ttd�rrtj|d�tj|d�tj|�|jtj|�|�|jtjd�|�|jtjd�d�dS)N�set_blockingFr9r;r;r;)	r/�pipe�
addCleanuprN�hasattrrPrrIr)rZr1Zw1Zr2Zw2rrr�test_set_wakeup_fd_resultss

z'WakeupFDTests.test_set_wakeup_fd_resultcCs�tj�}|j|j�|jd�|j�}tj�}|j|j�|jd�|j�}tj|�|jtj|�|�|jtjd�|�|jtjd�d�dS)NFr9r;r;r;)rLrRrNZsetblockingrMrrIr)rZsock1Zfd1Zsock2Zfd2rrr� test_set_wakeup_fd_socket_result�s


z.WakeupFDTests.test_set_wakeup_fd_socket_resultrztests specific to POSIXcCs�tj�\}}|jtj|�|jtj|�tj|d�|jt��}tj|�WdQRX|j	t
|j�d|�tj|d�tj|�tjd�dS)NTz&the fd %s must be in non-blocking modeFr9r;)r/rQrRrNrPr%r&rrIr�strZ	exception)rZrfdZwfd�cmrrr�test_set_wakeup_fd_blocking�s

z)WakeupFDTests.test_set_wakeup_fd_blockingN)rrrrKrOrTrUrr rrrXrrrrrGes
rGc@steZdZejedkd�dd�dd��Zejedkd�dd��Zd	d
�Zdd�Z	d
d�Z
ejee
d�d�dd��ZdS)�WakeupSignalTestsNzneed _testcapiT)�orderedcGs&djttt|��||�}td|�dS)Naif 1:
        import _testcapi
        import os
        import signal
        import struct

        signals = {!r}

        def handler(signum, frame):
            pass

        def check_signum(signals):
            data = os.read(read, len(signals)+1)
            raised = struct.unpack('%uB' % len(data), data)
            if not {!r}:
                raised = set(raised)
                signals = set(signals)
            if raised != signals:
                raise Exception("%r != %r" % (raised, signals))

        {}

        signal.signal(signal.SIGALRM, handler)
        read, write = os.pipe()
        os.set_blocking(write, False)
        signal.set_wakeup_fd(write)

        test()
        check_signum(signals)

        os.close(read)
        os.close(write)
        z-c)�format�tuple�map�intr)rZ	test_bodyrZZsignals�coderrr�check_wakeup�s#zWakeupSignalTests.check_wakeupcCsjd}tj�\}}z4ytj|d�Wntk
r6YnX|jd�Wdtj|�tj|�Xtd|�dS)Na)if 1:
        import _testcapi
        import errno
        import os
        import signal
        import sys
        from test.support import captured_stderr

        def handler(signum, frame):
            1/0

        signal.signal(signal.SIGALRM, handler)
        r, w = os.pipe()
        os.set_blocking(r, False)

        # Set wakeup_fd a read-only file descriptor to trigger the error
        signal.set_wakeup_fd(r)
        try:
            with captured_stderr() as err:
                _testcapi.raise_signal(signal.SIGALRM)
        except ZeroDivisionError:
            # An ignored exception should have been printed out on stderr
            err = err.getvalue()
            if ('Exception ignored when trying to write to the signal wakeup fd'
                not in err):
                raise AssertionError(err)
            if ('OSError: [Errno %d]' % errno.EBADF) not in err:
                raise AssertionError(err)
        else:
            raise AssertionError("ZeroDivisionError not raised")

        os.close(r)
        os.close(w)
        �xz9OS doesn't report write() error on the read end of a pipez-c)r/rQ�writerH�skipTestrNr)rr_�r�wrrr�test_wakeup_write_error�s&
z)WakeupSignalTests.test_wakeup_write_errorcCs|jdtj�dS)Na�def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)

            # We attempt to get a signal during the sleep,
            # before select is called
            try:
                select.select([], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")

            before_time = time.monotonic()
            select.select([read], [], [], TIMEOUT_FULL)
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        )r`r�SIGALRM)rrrr�test_wakeup_fd_earlysz&WakeupSignalTests.test_wakeup_fd_earlycCs|jdtj�dS)Na`def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)
            before_time = time.monotonic()
            # We attempt to get a signal during the select call
            try:
                select.select([read], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        )r`rrg)rrrr�test_wakeup_fd_during(sz'WakeupSignalTests.test_wakeup_fd_duringcCs|jdtjtj�dS)Nz�def test():
            import _testcapi
            signal.signal(signal.SIGUSR1, handler)
            _testcapi.raise_signal(signal.SIGUSR1)
            _testcapi.raise_signal(signal.SIGALRM)
        )r`rr*rg)rrrr�test_signumFszWakeupSignalTests.test_signum�pthread_sigmaskzneed signal.pthread_sigmask()cCs|jdtjtjdd�dS)Na�def test():
            signum1 = signal.SIGUSR1
            signum2 = signal.SIGUSR2

            signal.signal(signum1, handler)
            signal.signal(signum2, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
            _testcapi.raise_signal(signum1)
            _testcapi.raise_signal(signum2)
            # Unblocking the 2 signals calls the C signal handler twice
            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
        F)rZ)r`rr*�SIGUSR2)rrrr�test_pendingNszWakeupSignalTests.test_pending)rrrrr �	_testcapir`rfrhrirj�
skipUnlessrSrrmrrrrrY�s&4"rYZ
socketpairzneed socket.socketpairc@s<eZdZejedkd�dd��Zejedkd�dd��ZdS)�WakeupSocketSignalTestsNzneed _testcapicCsd}td|�dS)Na�if 1:
        import signal
        import socket
        import struct
        import _testcapi

        signum = signal.SIGINT
        signals = (signum,)

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        write.setblocking(False)
        signal.set_wakeup_fd(write.fileno())

        _testcapi.raise_signal(signum)

        data = read.recv(1)
        if not data:
            raise Exception("no signum written")
        raised = struct.unpack('B', data)
        if raised != signals:
            raise Exception("%r != %r" % (raised, signals))

        read.close()
        write.close()
        z-c)r)rr_rrr�test_socketcs z#WakeupSocketSignalTests.test_socketcCs.tjdkrd}nd}dj|d�}td|�dS)N�nt�sendrba.if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        read.setblocking(False)
        write.setblocking(False)

        signal.set_wakeup_fd(write.fileno())

        # Close sockets: send() will fail
        read.close()
        write.close()

        with captured_stderr() as err:
            _testcapi.raise_signal(signum)

        err = err.getvalue()
        if ('Exception ignored when trying to {action} to the signal wakeup fd'
            not in err):
            raise AssertionError(err)
        )�actionz-c)r/rr[r)rrtr_rrr�test_send_error�s

"z'WakeupSocketSignalTests.test_send_error)rrrrr rnrqrurrrrrp`s$rpc@s,eZdZdd�Zdd�Zdd�Zdd�Zd	S)
�SiginterruptTestcCs�d|f}td|��t}y|jj�}|jdd�\}}Wntjk
rR|j�dSX||}|j�}|d	kr|td||f��|dkSWdQRXdS)
Na�if 1:
            import errno
            import os
            import signal
            import sys

            interrupt = %r
            r, w = os.pipe()

            def handler(signum, frame):
                1 / 0

            signal.signal(signal.SIGALRM, handler)
            if interrupt is not None:
                signal.siginterrupt(signal.SIGALRM, interrupt)

            print("ready")
            sys.stdout.flush()

            # run the test twice
            try:
                for loop in range(2):
                    # send a SIGALRM in a second (during the read)
                    signal.alarm(1)
                    try:
                        # blocking call: read from a pipe without data
                        os.read(r, 1)
                    except ZeroDivisionError:
                        pass
                    else:
                        sys.exit(2)
                sys.exit(3)
            finally:
                os.close(r)
                os.close(w)
        z-cg@)ZtimeoutF��zChild error (exit code %s): %r)rwrx)	r�stdout�readline�communicate�
subprocessZTimeoutExpired�kill�wait�	Exception)rZ	interruptr_�processZ
first_linery�stderr�exitcoderrr�readpipe_interrupted�s*

z%SiginterruptTest.readpipe_interruptedcCs|jd�}|j|�dS)N)r�rD)r�interruptedrrr�test_without_siginterrupt�s
z*SiginterruptTest.test_without_siginterruptcCs|jd�}|j|�dS)NT)r�rD)rr�rrr�test_siginterrupt_on�s
z%SiginterruptTest.test_siginterrupt_oncCs|jd�}|j|�dS)NF)r�ZassertFalse)rr�rrr�test_siginterrupt_offs
z&SiginterruptTest.test_siginterrupt_offN)rrrr�r�r�r�rrrrrv�s<rvc@s�eZdZdd�Zdd�Zdd�Zdd�Zd	d
�Zdd�Zd
d�Z	e
jej
dkd�dd��Ze
jej
dkd�dd��Zdd�ZdS)�
ItimerTestcCs(d|_d|_d|_tjtj|j�|_dS)NFr)�hndl_called�
hndl_count�itimerrrg�sig_alrm�	old_alarm)rrrr�setUp
szItimerTest.setUpcCs,tjtj|j�|jdk	r(tj|jd�dS)Nr)rrgr�r��	setitimer)rrrr�tearDowns
zItimerTest.tearDowncGs
d|_dS)NT)r�)rr#rrrr�szItimerTest.sig_alrmcGsFd|_|jdkrtjd��n|jdkr4tjtjd�|jd7_dS)NTrxz.setitimer didn't disable ITIMER_VIRTUAL timer.rr9)r�r�r�ItimerErrorr��ITIMER_VIRTUAL)rr#rrr�
sig_vtalrms

zItimerTest.sig_vtalrmcGsd|_tjtjd�dS)NTr)r�rr��ITIMER_PROF)rr#rrr�sig_prof&szItimerTest.sig_profcCs|jtjtjdd�dS)Nr9rr;)r%rr�r�)rrrr�test_itimer_exc*szItimerTest.test_itimer_exccCs0tj|_tj|jd�tj�|j|jd�dS)Ng�?T)r�ITIMER_REALr�r��pauserr�)rrrr�test_itimer_real3szItimerTest.test_itimer_realr.�netbsd5zDitimer not reliable (does not mix well with threading) on some BSDs.cCs�tj|_tjtj|j�tj|jdd�tj�}x<tj�|dkrbtddd�}tj	|j�d
kr2Pq2W|j
d�|jtj	|j�d�|j|jd	�dS)Ng333333�?g�������?gN@i90i2	i����z8timeout: likely cause: machine too slow or load too highT)r�r�)r�r�)
rr�r��	SIGVTALRMr�r��time�	monotonic�pow�	getitimerrcrr�)r�
start_time�_rrr�test_itimer_virtual:s
zItimerTest.test_itimer_virtualzBitimer not reliable (does not mix well with threading) on freebsd6cCs�tj|_tjtj|j�tj|jdd�tj�}x<tj�|dkrbtddd�}tj	|j�d	kr2Pq2W|j
d�|jtj	|j�d
�|j|jd�dS)Ng�������?gN@i90i2	i����z8timeout: likely cause: machine too slow or load too highT)r�r�)r�r�)
rr�r��SIGPROFr�r�r�r�r�r�rcrr�)rr�r�rrr�test_itimer_profQs
zItimerTest.test_itimer_profcCs2tj|_tj|jd�tjd�|j|jd�dS)Ng���ư>r9T)rr�r�r�r��sleeprr�)rrrr�test_setitimer_tinygs
zItimerTest.test_setitimer_tinyN)r.r�)rrrr�r�r�r�r�r�r�rr rrr�r�r�rrrrr�s
	r�c@s�eZdZejeed�d�dd��Zejeed�d�ejeed�d�dd���Zejeed	�d
�dd��Z	ejeed�d�d
d��Z
ejeed�d�dd��Zejeed�d�dd��Zejeed�d�dd��Z
ejeed�d�dd��Zejeed�d�dd��Zejeed�d�dd ��Zejeed�d�ejeed�d�ejed!kd"�d#d$����Zejeed�d�d%d&��Zejeed�d�d'd(��Zejejd)kd*�ejeed	�d
�d+d,���Zd!S)-�PendingSignalsTests�
sigpendingzneed signal.sigpending()cCs|jtj�t��dS)N)rrr�r<)rrrr�test_sigpending_emptyvsz)PendingSignalsTests.test_sigpending_emptyrkzneed signal.pthread_sigmask()cCsd}td|�dS)Na
if 1:
            import os
            import signal

            def handler(signum, frame):
                1/0

            signum = signal.SIGUSR1
            signal.signal(signum, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            os.kill(os.getpid(), signum)
            pending = signal.sigpending()
            for sig in pending:
                assert isinstance(sig, signal.Signals), repr(pending)
            if pending != {signum}:
                raise Exception('%s != {%s}' % (pending, signum))
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        z-c)r)rr_rrr�test_sigpending{sz#PendingSignalsTests.test_sigpending�pthread_killzneed signal.pthread_kill()cCsd}td|�dS)Naiif 1:
            import signal
            import threading
            import sys

            signum = signal.SIGUSR1

            def handler(signum, frame):
                1/0

            signal.signal(signum, handler)

            if sys.platform == 'freebsd6':
                # Issue #12392 and #12469: send a signal to the main thread
                # doesn't work before the creation of the first thread on
                # FreeBSD 6
                def noop():
                    pass
                thread = threading.Thread(target=noop)
                thread.start()
                thread.join()

            tid = threading.get_ident()
            try:
                signal.pthread_kill(tid, signum)
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        z-c)r)rr_rrr�test_pthread_kill�s z%PendingSignalsTests.test_pthread_killcCsd|j�|f}td|�dS)Nawif 1:
        import signal
        import sys
        from signal import Signals

        def handler(signum, frame):
            1/0

        %s

        blocked = %s
        signum = signal.SIGALRM

        # child: block and wait the signal
        try:
            signal.signal(signum, handler)
            signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])

            # Do the tests
            test(signum)

            # The handler must not be called on unblock
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
            except ZeroDivisionError:
                print("the signal handler has been called",
                      file=sys.stderr)
                sys.exit(1)
        except BaseException as err:
            print("error: {}".format(err), file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)
        z-c)�stripr)rZblocked�testr_rrr�wait_helper�s'zPendingSignalsTests.wait_helper�sigwaitzneed signal.sigwait()cCs|jtjd�dS)Na 
        def test(signum):
            signal.alarm(1)
            received = signal.sigwait([signum])
            assert isinstance(received, signal.Signals), received
            if received != signum:
                raise Exception('received %s, not %s' % (received, signum))
        )r�rrg)rrrr�test_sigwait�sz PendingSignalsTests.test_sigwait�sigwaitinfozneed signal.sigwaitinfo()cCs|jtjd�dS)Nz�
        def test(signum):
            signal.alarm(1)
            info = signal.sigwaitinfo([signum])
            if info.si_signo != signum:
                raise Exception("info.si_signo != %s" % signum)
        )r�rrg)rrrr�test_sigwaitinfo�sz$PendingSignalsTests.test_sigwaitinfo�sigtimedwaitzneed signal.sigtimedwait()cCs|jtjd�dS)Nz�
        def test(signum):
            signal.alarm(1)
            info = signal.sigtimedwait([signum], 10.1000)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        )r�rrg)rrrr�test_sigtimedwaitsz%PendingSignalsTests.test_sigtimedwaitcCs|jtjd�dS)Nz�
        def test(signum):
            import os
            os.kill(os.getpid(), signum)
            info = signal.sigtimedwait([signum], 0)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        )r�rrg)rrrr�test_sigtimedwait_poll
sz*PendingSignalsTests.test_sigtimedwait_pollcCs|jtjd�dS)Nz�
        def test(signum):
            received = signal.sigtimedwait([signum], 1.0)
            if received is not None:
                raise Exception("received=%r" % (received,))
        )r�rrg)rrrr�test_sigtimedwait_timeoutsz-PendingSignalsTests.test_sigtimedwait_timeoutcCstj}|jttj|gd�dS)Ng�?g�)rrgr%r&r�)r�signumrrr�"test_sigtimedwait_negative_timeout$sz6PendingSignalsTests.test_sigtimedwait_negative_timeoutNztest needs threading modulecCstdd�dS)Nz-ca�if True:
            import os, threading, sys, time, signal

            # the default handler terminates the process
            signum = signal.SIGUSR1

            def kill_later():
                # wait until the main thread is waiting in sigwait()
                time.sleep(1)
                os.kill(os.getpid(), signum)

            # the signal must be blocked by all the threads
            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            killer = threading.Thread(target=kill_later)
            killer.start()
            received = signal.sigwait([signum])
            if received != signum:
                print("sigwait() received %s, not %s" % (received, signum),
                      file=sys.stderr)
                sys.exit(1)
            killer.join()
            # unblock the signal, which should have been cleared by sigwait()
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        )r)rrrr�test_sigwait_thread*s
z'PendingSignalsTests.test_sigwait_threadcCsH|jttj�|jttjd�|jttjddd�|jttjdg�dS)Nr9rwrxi�)r%r)rrkrH)rrrr�test_pthread_sigmask_argumentsMsz2PendingSignalsTests.test_pthread_sigmask_argumentscCsd}td|�dS)Na-	if 1:
        import signal
        import os; import threading

        def handler(signum, frame):
            1/0

        def kill(signum):
            os.kill(os.getpid(), signum)

        def check_mask(mask):
            for sig in mask:
                assert isinstance(sig, signal.Signals), repr(sig)

        def read_sigmask():
            sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
            check_mask(sigmask)
            return sigmask

        signum = signal.SIGUSR1

        # Install our signal handler
        old_handler = signal.signal(signum, handler)

        # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
        old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        check_mask(old_mask)
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Block and then raise SIGUSR1. The signal is blocked: the signal
        # handler is not called, and the signal is now pending
        mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
        check_mask(mask)
        kill(signum)

        # Check the new mask
        blocked = read_sigmask()
        check_mask(blocked)
        if signum not in blocked:
            raise Exception("%s not in %s" % (signum, blocked))
        if old_mask ^ blocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))

        # Unblock SIGUSR1
        try:
            # unblock the pending signal calls immediately the signal handler
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Check the new mask
        unblocked = read_sigmask()
        if signum in unblocked:
            raise Exception("%s in %s" % (signum, unblocked))
        if blocked ^ unblocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
        if old_mask != unblocked:
            raise Exception("%s != %s" % (old_mask, unblocked))
        z-c)r)rr_rrr�test_pthread_sigmaskUsJz(PendingSignalsTests.test_pthread_sigmaskr.zpissue #12392: send a signal to the main thread doesn't work before the creation of the first thread on FreeBSD 6cCsJd}td|��2}|j�\}}|j�}|dkr<td||f��WdQRXdS)Na7if True:
            import threading
            import signal
            import sys

            def handler(signum, frame):
                sys.exit(3)

            signal.signal(signal.SIGUSR1, handler)
            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
            sys.exit(2)
        z-crxzChild error (exit code %s): %s)rr{r~r)rr_r�ryr�r�rrr�test_pthread_kill_main_thread�sz1PendingSignalsTests.test_pthread_kill_main_thread)rrrrrorSrr�r�r�r�r�r�r�r�r�r�r r!r�r�r�rrr�rrrrr�qsF"-

	Lr�c@sXeZdZdd�Zdd�Zdd�Zejee	d�d�d	d
��Z
ejee	d�d�dd��Zd
S)�
StressTestcCs tj||�}|jtj||�dS)N)rrR)rr�rEZold_handlerrrr�setsig�szStressTest.setsigcs�d�g�d
��fdd�	}|jtjtjd�|jtj|�|�xt���krXtjd�q@W�fdd�t	t��d�D�}t
j|�}tj
r�td	|f�|S)N�cs,t���kr(�jtj��tjtjd�dS)Ng���ư>)�len�appendr�Zperf_counterrr�r�)r��frame)�N�timesrrrE�sz5StressTest.measure_itimer_resolution.<locals>.handlerrg����MbP?cs g|]}�|d�|�qS)r9r)�.0�i)r�rr�
<listcomp>�sz8StressTest.measure_itimer_resolution.<locals>.<listcomp>r9z,detected median itimer() resolution: %.6f s.)NN)rRrr�r�r�rgr�r�r��range�
statisticsZmedianr�verbose�print)rrEZ	durationsZmedr)r�r�r�measure_itimer_resolution�s
z$StressTest.measure_itimer_resolutioncCs4|j�}|dkrdS|dkr dS|jd|f�dS)Ng-C��6?i'g{�G�z�?�dz^detected itimer resolution (%.3f s.) too high (> 10 ms.) on this platform (or system too busy))r�rc)rZresorrr�decide_itimer_count�szStressTest.decide_itimer_countr�ztest needs setitimer()cs|j�}g�dd�}d
�fdd�	}|jtj|�|jtj|�|jtj|�d}tj�d}x�||kr�tjtj	�tj�|d7}x&t
��|kr�tj�|kr�tjd�q�Wtjtj	�tj�|d7}x&t
��|kr�tj�|kr�tjd�q�Wq^W|jt
��|d	�dS)NcSstjtjdtj�d�dS)Ng���ư>g�h㈵��>)rr�r��random)r�r�rrr�
first_handler�sz@StressTest.test_stress_delivery_dependent.<locals>.first_handlercs�j|�dS)N)r�)r�r�)�sigsrr�second_handlerszAStressTest.test_stress_delivery_dependent.<locals>.second_handlerrg.@r9g�h㈵��>zSome signals were lost)NN)
r�r�rr�r*rgr�r/r}�getpidr�r�r)rr�r�r��
expected_sigs�deadliner)r�r�test_stress_delivery_dependent�s&
z)StressTest.test_stress_delivery_dependentcs�|j�}g��fdd�}|jtj|�|jtj|�d}tj�d}xh||kr�tjtjdtj�d�t	j
t	j�tj�|d7}x&t��|kr�tj�|kr�tj
d�q�WqFW|jt��|d�dS)	Ncs�j|�dS)N)r�)r�r�)r�rrrE'sz=StressTest.test_stress_delivery_simultaneous.<locals>.handlerrg.@g���ư>g�h㈵��>rwzSome signals were lost)r�r�rr*rgr�r�r�r�r/r}r�r�r�r)rr�rEr�r�r)r�r�!test_stress_delivery_simultaneouss
z,StressTest.test_stress_delivery_simultaneousN)rrrr�r�r�rrorSrr�r�rrrrr��s-r�cCstj�dS)N)rZ
reap_childrenrrrr�tearDownModule@sr��__main__)+rr�r�
contextlibr�enum�gcr/�pickler�ZselectrrLr�r|�	tracebackrr��errnoZtest.support.script_helperrrr!�ImportErrorrnZTestCaserr rr"ror5rGrYrSrprvr�r�r�r�r�mainrrrr�<module>s\ 


 
D
7
R
T
hP


Youez - 2016 - github.com/yon3zu
LinuXploit