[C`@sddgZddlZddlZddlZddlZddlZddlZddlZddlm Z ddlm Z m Z dZ dZ dZejZdd Zd d ZGd d d eZGdddZddZGdddeZdfddddZGdddeZGdddeZeZGdddeZGdddeZGdddeZGd ddeZ dS)!Pool ThreadPoolN)util) get_context TimeoutErrorcCstt|S)N)listmap)argsr */usr/lib/python3.4/multiprocessing/pool.pymapstar+srcCsttj|d|dS)Nrr)r itertoolsstarmap)r r r r starmapstar.src@s(eZdZddZddZdS)RemoteTracebackcCs ||_dS)N)tb)selfrr r r __init__6szRemoteTraceback.__init__cCs|jS)N)r)rr r r __str__8szRemoteTraceback.__str__N)__name__ __module__ __qualname__rrr r r r r5s  rc@s(eZdZddZddZdS)ExceptionWithTracebackcCsDtjt|||}dj|}||_d||_dS)Nz """ %s""") tracebackformat_exceptiontypejoinexcr)rr rr r r r<s zExceptionWithTraceback.__init__cCst|j|jffS)N) rebuild_excr r)rr r r __reduce__Asz!ExceptionWithTraceback.__reduce__N)rrrrr"r r r r r;s  rcCst||_|S)N)r __cause__)r rr r r r!Dsr!cs@eZdZdZfddZddZddZS)MaybeEncodingErrorzVWraps possible unpickleable errors, so they can be safely sent through the socket.csAt||_t||_tt|j|j|jdS)N)reprr valuesuperr$r)rr r&) __class__r r rPszMaybeEncodingError.__init__cCsd|j|jfS)Nz(Error sending result: '%s'. Reason: '%s')r&r )rr r r rUs zMaybeEncodingError.__str__cCsdt|S)Nz)str)rr r r __repr__YszMaybeEncodingError.__repr__)rrr__doc__rrr*r r )r(r r$Ls  r$Fc'Cs|dks0t|tkr*|dks0t|j}|j}t|drn|jj|jjn|dk r||nd}xx|dks|r||kry |} Wn&t t fk rt j dPYnX| dkrt j dPn| \} } } } }yd| | |f}WnLt k r}}z,|r_t||j}nd|f}WYdd}~XnXy|| | |fWnbt k r}zBt||d}t j d||| | d|ffWYdd}~XnX|d7}qWt j d |dS) Nr_writerz)worker got EOFError or OSError -- exitingzworker got sentinel -- exitingTFrz0Possible encoding error while sending result: %szworker exiting after %d tasks)rintAssertionErrorputgethasattrr,close_readerEOFErrorOSErrorrdebug Exceptionr __traceback__r$)inqueueoutqueue initializerinitargsZmaxtasksZwrap_exceptionr/r0Z completedtaskjobifuncr kwdsresultewrappedr r r worker]sD0     !     ,rEc@seZdZdZdZddZddfddddZdd Zd d Zd d Z ddZ fiddZ dddZ dddZ dddddZdddZdddZfiddddZddddd Zdddd!d"Zed#d$Zed%d&Zed'd(Zed)d*Zd+d,Zd-d.Zd/d0Zd1d2Zed3d4Zed5d6Zd7d8Z d9d:Z!dS);rzS Class which supports an async version of applying functions to arguments. TcOs|jj||S)N)_ctxProcess)rr rAr r r rGsz Pool.ProcessNc Cs,|p t|_|jtj|_i|_t|_||_ ||_ ||_ |dkryt j psd}n|dkrtdn|dk rt| rtdn||_g|_|jtjdtjd|f|_d|j_t|j_|jjtjdtjd|j|j|j|j|jf|_d|j_t|j_|jjtjdtjd|j|j |jf|_!d|j!_t|j!_|j!jt"j#||j$d|j|j%|j|j|j|j|j!|jfdd|_&dS) Nrz&Number of processes must be at least 1zinitializer must be a callabletargetr TZ exitpriority)'rrF _setup_queuesqueueQueue _taskqueue_cacheRUN_state_maxtasksperchild _initializer _initargsos cpu_count ValueErrorcallable TypeError _processes_pool_repopulate_pool threadingZThreadr_handle_workers_worker_handlerdaemonstart _handle_tasks _quick_put _outqueue _task_handler_handle_results _quick_get_result_handlerrZFinalize_terminate_pool_inqueue _terminate)r processesr;r<Zmaxtasksperchildcontextr r r rsT                            z Pool.__init__cCszd}xmttt|jD]P}|j|}|jdk r"tjd||jd}|j|=q"q"W|S)zCleanup after any worker processes which have exited due to reaching their specified lifetime. Returns True if any workers were cleaned up. FNzcleaning up worker %dT)reversedrangelenrZexitcoderr6r)rZcleanedr?rEr r r _join_exited_workerss"  zPool._join_exited_workersc Csxt|jt|jD]}|jdtd|j|j|j|j |j |j f}|jj ||j jdd|_ d|_|jtjdqWdS)zBring the number of pool processes up to the specified number, for use after reaping workers which have exited. rHr rGZ PoolWorkerTz added workerN)rnrYrorZrGrErircrRrSrQ_wrap_exceptionappendnamereplacer_r`rr6)rr?wr r r r[s#    zPool._repopulate_poolcCs|jr|jndS)zEClean up any exited workers and start replacements for them. N)rqr[)rr r r _maintain_pools zPool._maintain_poolcCsL|jj|_|jj|_|jjj|_|jjj|_ dS)N) rFZ SimpleQueuerircr,sendrbr3recvrf)rr r r rJszPool._setup_queuescCs.|jtkst|j|||jS)z6 Equivalent of `func(*args, **kwds)`. )rPrOr. apply_asyncr0)rr@r rAr r r applysz Pool.applycCs|j||t|jS)zx Apply `func` to each element in `iterable`, collecting the results in a list that is returned. ) _map_asyncrr0)rr@iterable chunksizer r r r szPool.mapcCs|j||t|jS)z Like `map()` method but the elements of the `iterable` are expected to be iterables as well and will be unpacked as arguments. Hence `func` and (a, b) becomes func(a, b). )r|rr0)rr@r}r~r r r rsz Pool.starmapcCs|j||t|||S)z= Asynchronous version of `starmap()` method. )r|r)rr@r}r~callbackerror_callbackr r r starmap_asyncszPool.starmap_asyncrcs|jtkrtdn|dkrrt|j|jjfddt|DjfS|dkst t j ||}t|j|jjfddt|DjfddDSdS)zP Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. zPool not runningrc3s0|]&\}}j||fifVqdS)N)_job).0r?x)r@rBr r szPool.imap..c3s0|]&\}}j|t|fifVqdS)N)rr)rr?r)rBr r r%scss"|]}|D] }|Vq qdS)Nr )rchunkitemr r r r'sN) rPrOrV IMapIteratorrNrMr/ enumerate _set_lengthr.r _get_tasks)rr@r}r~ task_batchesr )r@rBr imaps z Pool.imapcs|jtkrtdn|dkrrt|j|jjfddt|DjfS|dkst t j ||}t|j|jjfddt|DjfddDSdS)zL Like `imap()` method but ordering of results is arbitrary. zPool not runningrc3s0|]&\}}j||fifVqdS)N)r)rr?r)r@rBr r r1sz&Pool.imap_unordered..c3s0|]&\}}j|t|fifVqdS)N)rr)rr?r)rBr r r8scss"|]}|D] }|Vq qdS)Nr )rrrr r r r:sN) rPrOrVIMapUnorderedIteratorrNrMr/rrr.rr)rr@r}r~rr )r@rBr imap_unordered)s zPool.imap_unorderedcCsb|jtkrtdnt|j||}|jj|jd|||fgdf|S)z; Asynchronous version of `apply()` method. zPool not runningN)rPrOrV ApplyResultrNrMr/r)rr@r rArrrBr r r rz<s +zPool.apply_asynccCs|j||t|||S)z9 Asynchronous version of `map()` method. )r|r)rr@r}r~rrr r r map_asyncGszPool.map_asyncc s|jtkrtdnt|ds<t|}n|dkrtt|t|jd\}}|r|d7}qnt|dkrd}ntj |||}t |j |t||d||j j fdd t|DdfS) zY Helper function to implement map, starmap and their async counterparts. zPool not running__len__Nrrrc3s0|]&\}}j||fifVqdS)N)r)rr?r)mapperrBr r rcsz"Pool._map_async..)rPrOrVr1r divmodrorZrr MapResultrNrMr/r) rr@r}rr~rrZextrarr )rrBr r|Os  (  zPool._map_asynccCsrtj}xB|jtks6|jrP|jtkrP|jtjdqW|j j dt j ddS)Ng?zworker handler exiting) r\current_threadrPrOrN TERMINATErwtimesleeprMr/rr6)poolthreadr r r r]gs  * zPool._handle_workerscCstj}xt|jdD]\}}d }xt|D]\}} |jrdtjdPny|| Wq>tk r} zN| dd\} } y|| j | d| fWnt k rYnXWYdd} ~ Xq>Xq>W|rtjd||dqqPqWtjdyFtjd|j dtjdx|D]} |dqQWWnt k rtjd YnXtjd dS) Nrz'task handler found thread._state != RUNrFzdoing set_length()ztask handler got sentinelz/task handler sending sentinel to result handlerz(task handler sending sentinel to workersz/task handler got OSError when sending sentinelsztask handler exiting) r\riterr0rrPrr6r7_setKeyErrorr/r5) taskqueuer/r:rcacherZtaskseqZ set_lengthr?r=rCr>Zindpr r r rats<           zPool._handle_taskscCstj}xy |}Wn)ttfk rGtjddSYnX|jrw|jtksfttjdPn|dkrtjdPn|\}}}y||j ||Wqt k rYqXqWx|r|jtkry |}Wn)ttfk r$tjddSYnX|dkrDtjdqn|\}}}y||j ||Wqt k rYqXqWt |drtjdy5x.t dD] }|j jsPn|qWWqttfk rYqXntjdt||jdS) Nz.result handler got EOFError/OSError -- exitingz,result handler found thread._state=TERMINATEzresult handler got sentinelz&result handler ignoring extra sentinelr3z"ensuring that outqueue is not full z7result handler exiting: len(cache)=%s, thread._state=%s)r\rr5r4rr6rPrr.rrr1rnr3pollro)r:r0rrr=r>r?objr r r resX                   zPool._handle_resultsccsDt|}x1ttj||}|s1dS||fVqWdS)N)rtuplerislice)r@itsizerr r r rs  zPool._get_taskscCstddS)Nz:pool objects cannot be passed between processes or pickled)NotImplementedError)rr r r r"szPool.__reduce__cCs8tjd|jtkr4t|_t|j_ndS)Nz closing pool)rr6rPrOCLOSEr^)rr r r r2s  z Pool.closecCs0tjdt|_t|j_|jdS)Nzterminating pool)rr6rrPr^rj)rr r r terminates   zPool.terminatecCsqtjd|jttfks(t|jj|jj|j jx|j D]}|jqYWdS)Nz joining pool) rr6rPrrr.r^rrdrgrZ)rrr r r rs    z Pool.joincCsZtjd|jjx9|jrU|jjrU|jjtj dqWdS)Nz7removing tasks from inqueue until task handler finishedr) rr6Z_rlockacquireis_aliver3rryrr)r9 task_handlerrr r r _help_stuff_finishs    zPool._help_stuff_finishc Cstjdt|_t|_tjd|j||t||jsit|dksitt|_|jdtjdt j |k r|j n|rt |ddrtjdx-|D]"} | j dkr| jqqWntjdt j |k r-|j ntjdt j |k rY|j n|rt |ddrtjd x>|D]3} | jrtjd | j| j qqWndS) Nzfinalizing poolz&helping task handler/workers to finishrzjoining worker handlerrzterminating workerszjoining task handlerzjoining result handlerzjoining pool workerszcleaning up worker %d)rr6rrPrrorr.r/r\rrr1rprpid) clsrr9r:rZworker_handlerrZresult_handlerrrr r r rhs8    $             zPool._terminate_poolcCs|S)Nr )rr r r __enter__2szPool.__enter__cCs|jdS)N)r)rexc_typeZexc_valZexc_tbr r r __exit__5sz Pool.__exit__)"rrrr+rrrGrrqr[rwrJr{r rrrrrzrr| staticmethodr]rarerr"r2rrr classmethodrhrrr r r r rsD   8       *:     . c@s^eZdZddZddZddZddd Zdd d Zd d ZdS)rcCsJtj|_tt|_||_||_||_|||js    zApplyResult.__init__cCs |jjS)N)rZis_set)rr r r readyFszApplyResult.readycCs|jst|jS)N)rr._success)rr r r successfulIszApplyResult.successfulNcCs|jj|dS)N)rwait)rtimeoutr r r rMszApplyResult.waitcCs?|j||js"tn|jr2|jS|jdS)N)rrrr_value)rrr r r r0Ps     zApplyResult.getcCs{|\|_|_|jr7|jr7|j|jn|jr]|j r]|j|jn|jj|j|j=dS)N)rrrrrsetrNr)rr?rr r r rYs zApplyResult._set) rrrrrrrr0rr r r r r<s     rc@s(eZdZddZddZdS)rcCstj|||d|d|_dg||_||_|dkrjd|_|jj||j=n||t |||_dS)NrTr) rrrr _chunksize _number_leftrrrbool)rrr~lengthrrr r r rjs      zMapResult.__init__cCs|\}}|r||j||j|d|j<|jd8_|jdkr|jrq|j|jn|j|j=|jjqnHd|_||_|j r|j |jn|j|j=|jjdS)NrrF) rrrrrNrrrrr)rr?Zsuccess_resultsuccessrBr r r rws %      zMapResult._setN)rrrrrr r r r rhs  rc@sUeZdZddZddZdddZeZdd Zd d ZdS) rcCsktjtj|_tt|_||_tj |_ d|_ d|_ i|_ |||j s:             ,&%I