ó ®$¤Oc@sdZddlZddlZddlZddlmZddlmZmZddl m Z ddl m Z m Z mZmZmZddlmZdZdZe jƒrÝyddlZWqÝek rÙd ZqÝXnd ejfd „ƒYZd ejfd „ƒYZdS(s& Tests for L{twisted.internet.stdio}. iÿÿÿÿN(tunittest(tfilepathtlog(tplatform(terrortdefertprotocoltstdiotreactor(tConnectionLostNotifyingProtocolsxyz123abc Twisted is great!sIOn windows, spawnProcess is not available in the absence of win32process.tStandardIOTestProcessProtocolcBs8eZdZdZd„Zd„Zd„Zd„ZRS(sÕ Test helper for collecting output from a child process and notifying something when it exits. @ivar onConnection: A L{defer.Deferred} which will be called back with C{None} when the connection to the child process is established. @ivar onCompletion: A L{defer.Deferred} which will be errbacked with the failure associated with the child process exiting when it exits. @ivar onDataReceived: A L{defer.Deferred} which will be called back with this instance whenever C{childDataReceived} is called, or C{None} to suppress these callbacks. @ivar data: A C{dict} mapping file descriptors to strings containing all bytes received from the child process on each file descriptor. cCs+tjƒ|_tjƒ|_i|_dS(N(RtDeferredt onConnectiont onCompletiontdata(tself((s;/usr/lib/python2.7/dist-packages/twisted/test/test_stdio.pyt__init__5scCs|jjdƒdS(N(R tcallbacktNone(R((s;/usr/lib/python2.7/dist-packages/twisted/test/test_stdio.pytconnectionMade;scCsV|jj|dƒ||j|<|jdk rR|jd}|_|j|ƒndS(s“ Record all bytes received from the child process in the C{data} dictionary. Fire C{onDataReceived} if it is not C{None}. tN(RtgettonDataReceivedRR(Rtnametbytestd((s;/usr/lib/python2.7/dist-packages/twisted/test/test_stdio.pytchildDataReceived?s cCs|jj|ƒdS(N(R R(Rtreason((s;/usr/lib/python2.7/dist-packages/twisted/test/test_stdio.pyt processEndedJsN( t__name__t __module__t__doc__RRRRRR(((s;/usr/lib/python2.7/dist-packages/twisted/test/test_stdio.pyR !s    tStandardInputOutputTestCasecBs’eZeZd„Zd„Zd„Zd„Zd„Zd„Z d„Z d„Z d„Z d „Z d „Zd „Zejƒrd e_nRS( cOsÀddl}ttjƒ}tjjtjjtjjtjj|j ƒƒƒ|j ddƒgƒ|dˆjˆjdjtƒdˆjfƒ|jtjƒdS(s‰ Asserts that the parent received the bytes written by the child immediately after the child starts. is4Received %r from child, did not find expected bytes.N(t assertTrueRtendswithtUNIQUE_LAST_WRITE_STRINGRDRRE(R(RHR(s;/usr/lib/python2.7/dist-packages/twisted/test/test_stdio.pyRÈs ( R R8RTtTruet ValueErrorRtSkipTesttstrR?R (RteR((RHRs;/usr/lib/python2.7/dist-packages/twisted/test/test_stdio.pyttest_lastWriteReceived®s    csDtƒ‰ˆj}ˆjˆdƒ‡‡fd†}ˆj||ƒS(sƒ Verify that the transport of a protocol connected to L{StandardIO} has C{getHost} and C{getPeer} methods. sstdio_test_hostpeer.pycsGˆjdjƒ\}}ˆj|ƒˆj|ƒ|jtjƒdS(Ni(Rt splitlinest failUnlessRDRRE(Rthosttpeer(RHR(s;/usr/lib/python2.7/dist-packages/twisted/test/test_stdio.pyRÞs  (R R R8R?(RRR((RHRs;/usr/lib/python2.7/dist-packages/twisted/test/test_stdio.pyttest_hostAndPeerÕs   csDtƒ‰ˆj}ˆjˆdƒ‡‡fd†}ˆj||ƒS(s Verify that the C{write} method of the transport of a protocol connected to L{StandardIO} sends bytes to standard out. sstdio_test_write.pycs+ˆjˆjddƒ|jtjƒdS(Nisok!(t assertEqualRRDRRE(R(RHR(s;/usr/lib/python2.7/dist-packages/twisted/test/test_stdio.pyRðs(R R R8R?(RRR((RHRs;/usr/lib/python2.7/dist-packages/twisted/test/test_stdio.pyt test_writeæs   csDtƒ‰ˆj}ˆjˆdƒ‡‡fd†}ˆj||ƒS(s˜ Verify that the C{writeSequence} method of the transport of a protocol connected to L{StandardIO} sends bytes to standard out. sstdio_test_writeseq.pycs+ˆjˆjddƒ|jtjƒdS(Nisok!(R`RRDRRE(R(RHR(s;/usr/lib/python2.7/dist-packages/twisted/test/test_stdio.pyRs(R R R8R?(RRR((RHRs;/usr/lib/python2.7/dist-packages/twisted/test/test_stdio.pyttest_writeSequenceös   cCsW|jƒ}t|dƒ}x+tdƒD]}|jt|ƒdƒq(W|jƒ|S(Ntwis (RIR@txrangetwriteRXtclose(RtjunkPathtjunkFileti((s;/usr/lib/python2.7/dist-packages/twisted/test/test_stdio.pyt _junkPaths   cs†tƒ‰ˆj}g‰tdƒ‰‡‡‡‡fd†‰ˆjˆdƒ‰ˆjjˆƒ‡‡‡‡fd†}ˆj||ƒS(s€ Verify that the transport of a protocol connected to L{StandardIO} is a working L{IProducer} provider. idcsNˆrJˆjtˆjƒƒdƒˆjˆdƒtjdˆdƒndS(Ns iÿÿÿÿg{®Gáz„?(tappendRXtpopReRt callLaterR(tign(RtprocttoWritetwritten(s;/usr/lib/python2.7/dist-packages/twisted/test/test_stdio.pyRssstdio_test_producer.pycsQˆjˆjddjˆƒƒˆjˆdtˆƒfƒ|jtjƒdS(NiRs*Connection lost with %d writes left to go.(R`RR(tfailIftlenRDRRE(R(RHRRpRq(s;/usr/lib/python2.7/dist-packages/twisted/test/test_stdio.pyR$s (R R trangeR8R ROR?(RRR((RRHRoRRpRqs;/usr/lib/python2.7/dist-packages/twisted/test/test_stdio.pyt test_producers   csVtƒ‰ˆj}ˆjƒ‰ˆjˆdˆƒ‡‡‡fd†}ˆj||ƒS(s€ Verify that the transport of a protocol connected to L{StandardIO} is a working L{IConsumer} provider. sstdio_test_consumer.pycs7ˆjˆjdtˆƒjƒƒ|jtjƒdS(Ni(R`RR@treadRDRRE(R(RgRHR(s;/usr/lib/python2.7/dist-packages/twisted/test/test_stdio.pyR7s#(R R RjR8R?(RRR((RgRHRs;/usr/lib/python2.7/dist-packages/twisted/test/test_stdio.pyt test_consumer+s    cs.tjƒ}t|ƒ}tjˆjƒƒ‰ˆjdƒˆ_}ˆj|j ƒt d|j ƒƒ}t j ƒs¹tjƒ\}}ˆjtj |ƒˆjtj |ƒ||ds$(   .