# -*- test-case-name: twisted.trial._dist.test.test_disttrial -*- # Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details. """ This module containts the trial distributed runner, the management class responsible for coordinating all of trial's behavior at the highest level. @since: 12.3 """ import os import sys from twisted.python.filepath import FilePath from twisted.python.modules import theSystemPath from twisted.internet.defer import DeferredList from twisted.internet.task import cooperate from twisted.trial.util import _unusedTestDirectory from twisted.trial.unittest import _iterateTests from twisted.trial._dist.worker import LocalWorker, LocalWorkerAMP from twisted.trial._dist.distreporter import DistReporter from twisted.trial.reporter import UncleanWarningsReporterWrapper from twisted.trial._dist import _WORKER_AMP_STDIN, _WORKER_AMP_STDOUT class DistTrialRunner(object): """ A specialized runner for distributed trial. The runner launches a number of local worker processes which will run tests. @ivar _workerNumber: the number of workers to be spawned. @type _workerNumber: C{int} @ivar _stream: stream which the reporter will use. @ivar _reporterFactory: the reporter class to be used. """ _distReporterFactory = DistReporter def _makeResult(self): """ Make reporter factory, and wrap it with a L{DistReporter}. """ reporter = self._reporterFactory(self._stream, self._tbformat, realtime=self._rterrors) if self._uncleanWarnings: reporter = UncleanWarningsReporterWrapper(reporter) return self._distReporterFactory(reporter) def __init__(self, reporterFactory, workerNumber, workerArguments, stream=None, tracebackFormat='default', realTimeErrors=False, uncleanWarnings=False, logfile='test.log', workingDirectory='_trial_temp'): self._workerNumber = workerNumber self._workerArguments = workerArguments self._reporterFactory = reporterFactory if stream is None: stream = sys.stdout self._stream = stream self._tbformat = tracebackFormat self._rterrors = realTimeErrors self._uncleanWarnings = uncleanWarnings self._result = None self._workingDirectory = workingDirectory self._logFile = logfile self._logFileObserver = None self._logFileObject = None self._logWarnings = False def writeResults(self, result): """ Write test run final outcome to result. @param result: A C{TestResult} which will print errors and the summary. """ result.done() def createLocalWorkers(self, protocols, workingDirectory): """ Create local worker protocol instances and return them. @param protocols: An iterable of L{LocalWorkerAMP} instances. @param workingDirectory: The base path in which we should run the workers. @type workingDirectory: C{str} @return: A list of C{quantity} C{LocalWorker} instances. """ return [LocalWorker(protocol, os.path.join(workingDirectory, str(x)), self._logFile) for x, protocol in enumerate(protocols)] def launchWorkerProcesses(self, spawner, protocols, arguments): """ Spawn processes from a list of process protocols. @param spawner: A C{IReactorProcess.spawnProcess} implementation. @param protocols: An iterable of C{ProcessProtocol} instances. @param arguments: Extra arguments passed to the processes. """ workertrialPath = theSystemPath[ 'twisted.trial._dist.workertrial'].filePath.path childFDs = {0: 'w', 1: 'r', 2: 'r', _WORKER_AMP_STDIN: 'w', _WORKER_AMP_STDOUT: 'r'} environ = os.environ.copy() # Add a environment variable containing the raw sys.path, to be used by # subprocesses to make sure it's identical to the parent. See # workertrial._setupPath. environ['TRIAL_PYTHONPATH'] = os.pathsep.join(sys.path) for worker in protocols: args = [sys.executable, workertrialPath] args.extend(arguments) spawner(worker, sys.executable, args=args, childFDs=childFDs, env=environ) def _driveWorker(self, worker, result, testCases, cooperate): """ Drive a L{LocalWorkerAMP} instance, iterating the tests and calling C{run} for every one of them. @param worker: The L{LocalWorkerAMP} to drive. @param result: The global L{DistReporter} instance. @param testCases: The global list of tests to iterate. @param cooperate: The cooperate function to use, to be customized in tests. @type cooperate: C{function} @return: A C{Deferred} firing when all the tests are finished. """ def resultErrback(error, case): result.original.addFailure(case, error) return error def task(case): d = worker.run(case, result) d.addErrback(resultErrback, case) return d return cooperate(task(case) for case in testCases).whenDone() def run(self, suite, reactor=None, cooperate=cooperate, untilFailure=False): """ Spawn local worker processes and load tests. After that, run them. @param suite: A tests suite to be run. @param reactor: The reactor to use, to be customized in tests. @type reactor: A provider of L{twisted.internet.interfaces.IReactorProcess} @param cooperate: The cooperate function to use, to be customized in tests. @type cooperate: C{function} @param untilFailure: If C{True}, continue to run the tests until they fail. @type untilFailure: C{bool}. @return: The test result. @rtype: L{DistReporter} """ if reactor is None: from twisted.internet import reactor result = self._makeResult() count = suite.countTestCases() self._stream.write("Running %d tests.\n" % (count,)) if not count: # Take a shortcut if there is no test suite.run(result.original) self.writeResults(result) return result testDir, testDirLock = _unusedTestDirectory( FilePath(self._workingDirectory)) workerNumber = min(count, self._workerNumber) ampWorkers = [LocalWorkerAMP() for x in xrange(workerNumber)] workers = self.createLocalWorkers(ampWorkers, testDir.path) processEndDeferreds = [worker.endDeferred for worker in workers] self.launchWorkerProcesses(reactor.spawnProcess, workers, self._workerArguments) def runTests(): testCases = iter(list(_iterateTests(suite))) workerDeferreds = [] for worker in ampWorkers: workerDeferreds.append( self._driveWorker(worker, result, testCases, cooperate=cooperate)) return DeferredList(workerDeferreds, consumeErrors=True, fireOnOneErrback=True) stopping = [] def nextRun(ign): self.writeResults(result) if not untilFailure: return if not result.wasSuccessful(): return d = runTests() return d.addCallback(nextRun) def stop(ign): testDirLock.unlock() if not stopping: stopping.append(None) reactor.stop() def beforeShutDown(): if not stopping: stopping.append(None) d = DeferredList(processEndDeferreds, consumeErrors=True) return d.addCallback(continueShutdown) def continueShutdown(ign): self.writeResults(result) return ign d = runTests() d.addCallback(nextRun) d.addBoth(stop) reactor.addSystemEventTrigger('before', 'shutdown', beforeShutDown) reactor.run() return result def runUntilFailure(self, suite): """ Run the tests with local worker processes until they fail. @param suite: A tests suite to be run. """ return self.run(suite, untilFailure=True)