ó @r`Pc@`sêdZddlmZmZddlZddlZyddlZWnek r]dZnXdZddl m Z ddl m Z ddlmZddd „ƒYZejeƒd e jfd „ƒYZd e jfd „ƒYZdS(s) Tests for L{twisted.python.threadable}. i(tdivisiontabsolute_importNsPlatform lacks thread support(t_PY3(tunittest(t threadablet TestObjectcB`s&eZdgZdZdZd„ZRS(taMethodiÿÿÿÿicC`slxetdƒD]W}|j|j|_|_|j|j|_|jdks td|jfƒ‚q WdS(Ni isz == %d, not 0 as expected(trangetytxtztAssertionError(tselfti((s@/usr/lib/python2.7/dist-packages/twisted/test/test_threadable.pyRs(t__name__t __module__t synchronizedR RR(((s@/usr/lib/python2.7/dist-packages/twisted/test/test_threadable.pyRs tSynchronizationTestCasecB`sVeZd„Zd„Zd„Zd„Zedk rKee_ee_nd„Z RS(cC`strJttddƒdk r‹|jtjtjƒƒtjdƒq‹nAttddƒdk r‹|jtjtjƒƒtjdƒndS(sì Reduce the CPython check interval so that thread switches happen much more often, hopefully exercising more possible race conditions. Also, delay actual test startup until the reactor has been started. tgetswitchintervalgH¯¼šò×z>tgetcheckintervaliN( RtgetattrtsystNonet addCleanuptsetswitchintervalRtsetcheckintervalR(R ((s@/usr/lib/python2.7/dist-packages/twisted/test/test_threadable.pytsetUp'scC`s|jdtjjƒdS(sk The name of a synchronized method is inaffected by the synchronization decorator. RN(t assertEqualRRR(R ((s@/usr/lib/python2.7/dist-packages/twisted/test/test_threadable.pyttest_synchronizedName7sc`smtjƒg‰tjd‡fd†ƒ}|jƒ|jƒ|jˆddƒ|jtjƒdƒdS(s˜ L{threadable.isInIOThread} returns C{True} if and only if it is called in the same thread as L{threadable.registerAsIOThread}. ttargetc`sˆjtjƒƒS(N(tappendRt isInIOThread((t foreignResult(s@/usr/lib/python2.7/dist-packages/twisted/test/test_threadable.pytGsis#Non-IO thread reported as IO threads#IO thread reported as not IO threadN( RtregisterAsIOThreadt threadingtThreadtstarttjoint assertFalset assertTrueR(R tt((R s@/usr/lib/python2.7/dist-packages/twisted/test/test_threadable.pyttest_isInIOThread?s    c`sžtƒ‰g‰‡‡fd†}g}x=tdƒD]/}tjd|ƒ}|j|ƒ|jƒq4Wx|D]}|jƒqnWˆrštjˆƒ‚ndS(Nc`sRy%xtdƒD]}ˆjƒqWWn&tk rM}ˆjt|ƒƒnXdS(Niè(RRR Rtstr(R te(terrorsto(s@/usr/lib/python2.7/dist-packages/twisted/test/test_threadable.pytcallMethodLotsUs iR( RRR#R$RR%R&RtFailTest(R R/tthreadsR R)((R-R.s@/usr/lib/python2.7/dist-packages/twisted/test/test_threadable.pyttestThreadedSynchronizationPs   cC`s.tƒ}xtdƒD]}|jƒqWdS(Niè(RRR(R R.R ((s@/usr/lib/python2.7/dist-packages/twisted/test/test_threadable.pyttestUnthreadedSynchronizationms N( RRRRR*R2t threadingSkipRtskipR3(((s@/usr/lib/python2.7/dist-packages/twisted/test/test_threadable.pyR&s       tSerializationTestCasecB`s2eZd„Zedk r'ee_nd„ZRS(cC`sPtjƒ}t|ƒ}tj|ƒ}tj|ƒ}|jt||ƒƒdS(N(RtXLockttypetpickletdumpstloadsR(t isinstance(R tlocktlockTypet lockPickletnewLock((s@/usr/lib/python2.7/dist-packages/twisted/test/test_threadable.pyt testPicklingus   cC`s:d}tj|ƒ}tj|dƒ}tj|ƒ}dS(Ns6ctwisted.python.threadable unpickle_lock p0 (tp1 Rp2 .i(R9R;R:(R R?R=t newPickleR@((s@/usr/lib/python2.7/dist-packages/twisted/test/test_threadable.pyttestUnpickling€sN(RRRAR4RR5RC(((s@/usr/lib/python2.7/dist-packages/twisted/test/test_threadable.pyR6ts   ((t__doc__t __future__RRRR9R#t ImportErrorR4Rttwisted.python.compatRt twisted.trialRttwisted.pythonRRt synchronizetSynchronousTestCaseRR6(((s@/usr/lib/python2.7/dist-packages/twisted/test/test_threadable.pyts   N