ó i$VQc@`s"dZddlmZmZddlZddlZddlZddlZddlZddl m Z ddl m Z ddl mZddlmZmZmZmZddlmZdd lmZd ejfd „ƒYZd ejfd „ƒYZdefd„ƒYZe r[ndS(sD Test running processes with the APIs in L{twisted.internet.utils}. i(tdivisiontabsolute_importN(t_PY3(tplatform(tunittest(terrortreactortutilst interfaces(tDeferred(tSuppressedWarningsTeststProcessUtilsTestscB`sãeZdZejedƒdkr-dZndZdZ e j Z d„Z d„Zd„Zd„Zd„Zd„Zd„Zejƒr™d e_nd „Zd „Zd „Zd „Zd„Zd„Zd„Zd„ZRS(st Test running a process using L{getProcessOutput}, L{getProcessValue}, and L{getProcessOutputAndValue}. s)reactor doesn't implement IReactorProcesscC`sU|jƒ}t|dƒ}|jtjj|ƒtjƒ|jƒtjj|ƒS(sj Write the given list of lines to a text file and return the absolute path to it. twt( tmktemptfiletwritetostlineseptjointclosetpathtabspath(tselft sourceLinestscriptt scriptFile((s</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pytmakeSourceFile!s    cC`sI|jddddgƒ}tj|jd|gƒ}|j|jdƒS(s™ L{getProcessOutput} returns a L{Deferred} which fires with the complete output of the process it runs after that process exits. s import syssfor s in 'hello world\n':s sys.stdout.write(s)s sys.stdout.flush()s-us hello world (RRtgetProcessOutputtexet addCallbackt assertEqual(RRtd((s</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyt test_output-s c`sbˆjddgƒ}tjˆjd|gƒ}ˆj|tƒ}‡fd†}|j|ƒ|S(s— The L{Deferred} returned by L{getProcessOutput} is fired with an L{IOError} L{Failure} if the child process writes to stderr. s import syss!sys.stderr.write("hello world\n")s-uc`sˆj|jtjƒS(N(t assertFailuret processEndedRt ProcessDone(terr(R(s</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pytcbFailedHs(RRRRR!tIOErrorR(RRRR%((Rs</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyttest_outputWithErrorIgnored;s  cC`sR|jdddddgƒ}tj|jd|gdtƒ}|j|jdƒS( sÒ If a C{True} value is supplied for the C{errortoo} parameter to L{getProcessOutput}, the returned L{Deferred} fires with the child's stderr output as well as its stdout output. s import sysssys.stdout.write("foo")ssys.stdout.flush()ssys.stderr.write("foo")ssys.stderr.flush()s-uterrortootfoofoo(RRRRtTrueRR(RRR((s</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyttest_outputWithErrorCollectedNs !cC`s@|jdgƒ}tj|jd|gƒ}|j|jdƒS(s| The L{Deferred} returned by L{getProcessValue} is fired with the exit status of the child process. sraise SystemExit(1)s-ui(RRtgetProcessValueRRR(RRR((s</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyt test_valueasc`s[tj}ˆjddddgƒ}‡fd†}tjˆjd|gƒ}|j|ƒS(s The L{Deferred} returned by L{getProcessOutputAndValue} fires with a three-tuple, the elements of which give the data written to the child's stdout, the data written to the child's stderr, and the exit status of the child. s import syss"sys.stdout.write('hello world!\n')s$sys.stderr.write('goodbye world!\n')s sys.exit(1)c`sJ|\}}}ˆj|dƒˆj|dtjƒˆj|dƒdS(Ns hello world! sgoodbye world!i(RRR(t out_err_codetoutR$tcode(R(s</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pytgotOutputAndValue{ss-u(tsyst executableRRtgetProcessOutputAndValueRR(RRRR1R((Rs</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyttest_outputAndValuels  c`sjˆjddddddgƒ}‡fd†}tjˆjd|gƒ}ˆj|tƒ}|j|ƒS( s If the child process exits because of a signal, the L{Deferred} returned by L{getProcessOutputAndValue} fires a L{Failure} of a tuple containing the the child's stdout, stderr, and the signal which caused it to exit. simport sys, os, signals"sys.stdout.write('stdout bytes\n')s"sys.stderr.write('stderr bytes\n')ssys.stdout.flush()ssys.stderr.flush()s$os.kill(os.getpid(), signal.SIGKILL)c`sF|\}}}ˆj|dƒˆj|dƒˆj|tjƒdS(Ns stdout bytes s stderr bytes (RtsignaltSIGKILL(t out_err_sigR/R$tsig(R(s</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyR1–ss-u(RRR4RR!ttupleR(RRR1R((Rs</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyttest_outputSignal„s  s"Windows doesn't have real signals.cC`sltjj|jƒƒ}tj|ƒ|jddgƒ}||jd|gd|ƒ}|j||ƒ|S(Nsimport os, sysssys.stdout.write(os.getcwd())s-uR(RRRR tmakedirsRRR(RtutilFunctchecktdirRR((s</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyt _pathTest¤s  cC`s|jtj|jƒS(s L{getProcessOutput} runs the given command with the working directory given by the C{path} parameter. (R@RRR(R((s</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyttest_getProcessOutputPath¯sc`s"‡fd†}ˆjtj|ƒS(s~ L{getProcessValue} runs the given command with the working directory given by the C{path} parameter. c`sˆj|dƒdS(Ni(R(tresulttignored(R(s</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyR>¼s(R@RR,(RR>((Rs</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyttest_getProcessValuePath·sc`s"‡fd†}ˆjtj|ƒS(s‡ L{getProcessOutputAndValue} runs the given command with the working directory given by the C{path} parameter. c`s3|\}}}ˆj||ƒˆj|dƒdS(Ni(R(tout_err_statusR?R/R$tstatus(R(s</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyR>Æs(R@RR4(RR>((Rs</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyt!test_getProcessOutputAndValuePathÁscC`sÑtjj|jƒƒ}tj|ƒ|jdd|fdgƒ}|jtjtjƒƒtj|ƒ|jtj |t j tj dƒj ƒƒtj |dƒ||j d|gƒ}|j||ƒ|S(Nsimport os, sys, statsos.chmod(%r, stat.S_IXUSR)ssys.stdout.write(os.getcwd())t.is-u(RRRR R<Rt addCleanuptchdirtgetcwdtchmodtstattS_IMODEtst_modeRR(RR=R>R?RR((s</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyt_defaultPathTestÍs    %cC`s|jtj|jƒS(s If no value is supplied for the C{path} parameter, L{getProcessOutput} runs the given command in the same working directory as the parent process and succeeds even if the current working directory is not accessible. (RPRRR(R((s</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyt test_getProcessOutputDefaultPathêsc`s"‡fd†}ˆjtj|ƒS(s If no value is supplied for the C{path} parameter, L{getProcessValue} runs the given command in the same working directory as the parent process and succeeds even if the current working directory is not accessible. c`sˆj|dƒdS(Ni(R(RBRC(R(s</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyR>ûs(RPRR,(RR>((Rs</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyttest_getProcessValueDefaultPathôsc`s"‡fd†}ˆjtj|ƒS(s  If no value is supplied for the C{path} parameter, L{getProcessOutputAndValue} runs the given command in the same working directory as the parent process and succeeds even if the current working directory is not accessible. c`s3|\}}}ˆj||ƒˆj|dƒdS(Ni(R(RER?R/R$RF(R(s</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyR>s(RPRR4(RR>((Rs</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyt(test_getProcessOutputAndValueDefaultPathsN(t__name__t __module__t__doc__RtIReactorProcessRtNonetskiptoutputtvalueR2R3RRR R'R+R-R5R;Rt isWindowsR@RARDRGRPRQRRRS(((s</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyR s.            tSuppressWarningsTestscB`seZdZd„ZRS(s. Tests for L{utils.suppressWarnings}. c`s¶g‰‡fd†}|jtd|ƒd„}tj|d tddƒfƒ}|dƒ|jtˆƒdƒ|dƒ|jtˆƒdƒ|d ƒ|jtˆƒd ƒd S( ss L{utils.suppressWarnings} decorates a function so that the given warnings are suppressed. c`sˆj||fƒdS(N(tappend(Rtatkw(RB(s</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyt showwarningsRacS`stj|ƒdS(N(twarningstwarn(tmsg((s</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pytfstignoretmessagesThis is messagesSanity check messageisUnignored messageiN(signore(tpatchRbRtsuppressWarningstdictRtlen(RRaRetg((RBs</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyttest_suppressWarningss !   (RTRURVRm(((s</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyR]stDeferredSuppressedWarningsTestscB`s/eZdZeejƒZd„Zd„ZRS(s` Tests for L{utils.runWithWarningsSuppressed}, the version that supports Deferreds. c`s•d ifd ifg}tƒ‰|j|‡fd†ƒtjdƒˆjdƒtjdƒ|jdgg|jƒD]}|d^qzƒd S( s² If the function called by L{utils.runWithWarningsSuppressed} returns a C{Deferred}, the warning filters aren't removed until the Deferred fires. Rfs.*foo.*s.*bar.*c`sˆS(N(((RB(s</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pytEss ignore foois ignore foo 2RgN(signores.*foo.*(signores.*bar.*(R trunWithWarningsSuppressedRbRctcallbackRt flushWarnings(Rtfilterstw((RBs</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyttest_deferredCallback<s     c`sªd ifd ifg}tƒ‰|j|‡fd†ƒ}tjdƒˆjtƒƒ|jd„ƒtjdƒ|jdgg|jƒD]}|d^qƒd S( s If the function called by L{utils.runWithWarningsSuppressed} returns a C{Deferred}, the warning filters aren't removed until the Deferred fires with an errback. Rfs.*foo.*s.*bar.*c`sˆS(N(((RB(s</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyRoVss ignore foocS`s |jtƒS(N(ttraptZeroDivisionError(Re((s</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyRoYss ignore foo 2RgN(signores.*foo.*(signores.*bar.*( R RpRbRcterrbackRwt addErrbackRRr(RRsRRt((RBs</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyttest_deferredErrbackMs    (RTRURVt staticmethodRRpRuRz(((s</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyRn3s (RVt __future__RRRbRRMR2R6ttwisted.python.compatRttwisted.python.runtimeRt twisted.trialRttwisted.internetRRRRttwisted.internet.deferR ttwisted.python.test.test_utilR tTestCaseR tSynchronousTestCaseR]Rn(((s</usr/lib/python2.7/dist-packages/twisted/test/test_iutils.pyts<"ü#+