問題描述
我想在 Python 中使用 multiprocessing
庫.遺憾的是,multiprocessing
使用了 pickle
,它不支持帶有閉包的函數、lambdas 或 __main__
中的函數.這三個對我來說都很重要
I would like to use the multiprocessing
library in Python. Sadly multiprocessing
uses pickle
which doesn't support functions with closures, lambdas, or functions in __main__
. All three of these are important to me
In [1]: import pickle
In [2]: pickle.dumps(lambda x: x)
PicklingError: Can't pickle <function <lambda> at 0x23c0e60>: it's not found as __main__.<lambda>
幸運的是,dill
是一種更強大的泡菜.顯然 dill
在導入時執行魔術以使泡菜工作
Fortunately there is dill
a more robust pickle. Apparently dill
performs magic on import to make pickle work
In [3]: import dill
In [4]: pickle.dumps(lambda x: x)
Out[4]: "cdill.dill
_load_type
p0
(S'FunctionType'
p1 ...
這非常令人鼓舞,特別是因為我無法訪問多處理源代碼.可悲的是,我仍然無法讓這個非?;镜氖纠ぷ?em class="showen">
This is very encouraging, particularly because I don't have access to the multiprocessing source code. Sadly, I still can't get this very basic example to work
import multiprocessing as mp
import dill
p = mp.Pool(4)
print p.map(lambda x: x**2, range(10))
這是為什么?我錯過了什么?multiprocessing
+dill
組合到底有什么限制?
Why is this? What am I missing? Exactly what are the limitations on the multiprocessing
+dill
combination?
mrockli@mrockli-notebook:~/workspace/toolz$ python testmp.py
Temporary Edit for J.F Sebastian
mrockli@mrockli-notebook:~/workspace/toolz$ python testmp.py
Exception in thread Thread-2:
Traceback (most recent call last):
File "/home/mrockli/Software/anaconda/lib/python2.7/threading.py", line 808, in __bootstrap_inner
self.run()
File "/home/mrockli/Software/anaconda/lib/python2.7/threading.py", line 761, in run
self.__target(*self.__args, **self.__kwargs)
File "/home/mrockli/Software/anaconda/lib/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
^C
...lots of junk...
[DEBUG/MainProcess] cleaning up worker 3
[DEBUG/MainProcess] cleaning up worker 2
[DEBUG/MainProcess] cleaning up worker 1
[DEBUG/MainProcess] cleaning up worker 0
[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-7] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-8] child process calling self.run()Exception in thread Thread-2:
Traceback (most recent call last):
File "/home/mrockli/Software/anaconda/lib/python2.7/threading.py", line 808, in __bootstrap_inner
self.run()
File "/home/mrockli/Software/anaconda/lib/python2.7/threading.py", line 761, in run
self.__target(*self.__args, **self.__kwargs)
File "/home/mrockli/Software/anaconda/lib/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
^C
...lots of junk...
[DEBUG/MainProcess] cleaning up worker 3
[DEBUG/MainProcess] cleaning up worker 2
[DEBUG/MainProcess] cleaning up worker 1
[DEBUG/MainProcess] cleaning up worker 0
[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-7] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-8] child process calling self.run()
推薦答案
multiprocessing
對酸洗做了一些錯誤的選擇.不要誤會我的意思,它做出了一些不錯的選擇,使其能夠腌制某些類型,以便它們可以在池的地圖功能中使用.然而,由于我們有 dill
可以進行酸洗,多處理自己的酸洗變得有點限制.實際上,如果 multiprocessing
要使用 pickle
而不是 cPickle
... 并且還放棄一些它自己的酸洗覆蓋,那么 dill
可以接管并為 multiprocessing
提供更完整的序列化.
multiprocessing
makes some bad choices about pickling. Don't get me wrong, it makes some good choices that enable it to pickle certain types so they can be used in a pool's map function. However, since we have dill
that can do the pickling, multiprocessing's own pickling becomes a bit limiting. Actually, if multiprocessing
were to use pickle
instead of cPickle
... and also drop some of it's own pickling overrides, then dill
could take over and give a much more full serialization for multiprocessing
.
在此之前,會有一個名為 pathos 的 multiprocessing
分支(the不幸的是,發布版本有點陳舊)消除了上述限制.Pathos 還添加了多處理沒有的一些不錯的功能,例如 map 函數中的多參數.Pathos 即將發布,經過一些溫和的更新——主要是轉換為 python 3.x.
Until that happens, there's a fork of multiprocessing
called pathos (the release version is a bit stale, unfortunately) that removes the above limitations. Pathos also adds some nice features that multiprocessing doesn't have, like multi-args in the map function. Pathos is due for a release, after some mild updating -- mostly conversion to python 3.x.
Python 2.7.5 (default, Sep 30 2013, 20:15:49)
[GCC 4.2.1 (Apple Inc. build 5566)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import dill
>>> from pathos.multiprocessing import ProcessingPool
>>> pool = ProcessingPool(nodes=4)
>>> result = pool.map(lambda x: x**2, range(10))
>>> result
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
只是為了展示一下 pathos.multiprocessing
可以做什么...
and just to show off a little of what pathos.multiprocessing
can do...
>>> def busy_add(x,y, delay=0.01):
... for n in range(x):
... x += n
... for n in range(y):
... y -= n
... import time
... time.sleep(delay)
... return x + y
...
>>> def busy_squared(x):
... import time, random
... time.sleep(2*random.random())
... return x*x
...
>>> def squared(x):
... return x*x
...
>>> def quad_factory(a=1, b=1, c=0):
... def quad(x):
... return a*x**2 + b*x + c
... return quad
...
>>> square_plus_one = quad_factory(2,0,1)
>>>
>>> def test1(pool):
... print pool
... print "x: %s
" % str(x)
... print pool.map.__name__
... start = time.time()
... res = pool.map(squared, x)
... print "time to results:", time.time() - start
... print "y: %s
" % str(res)
... print pool.imap.__name__
... start = time.time()
... res = pool.imap(squared, x)
... print "time to queue:", time.time() - start
... start = time.time()
... res = list(res)
... print "time to results:", time.time() - start
... print "y: %s
" % str(res)
... print pool.amap.__name__
... start = time.time()
... res = pool.amap(squared, x)
... print "time to queue:", time.time() - start
... start = time.time()
... res = res.get()
... print "time to results:", time.time() - start
... print "y: %s
" % str(res)
...
>>> def test2(pool, items=4, delay=0):
... _x = range(-items/2,items/2,2)
... _y = range(len(_x))
... _d = [delay]*len(_x)
... print map
... res1 = map(busy_squared, _x)
... res2 = map(busy_add, _x, _y, _d)
... print pool.map
... _res1 = pool.map(busy_squared, _x)
... _res2 = pool.map(busy_add, _x, _y, _d)
... assert _res1 == res1
... assert _res2 == res2
... print pool.imap
... _res1 = pool.imap(busy_squared, _x)
... _res2 = pool.imap(busy_add, _x, _y, _d)
... assert list(_res1) == res1
... assert list(_res2) == res2
... print pool.amap
... _res1 = pool.amap(busy_squared, _x)
... _res2 = pool.amap(busy_add, _x, _y, _d)
... assert _res1.get() == res1
... assert _res2.get() == res2
... print ""
...
>>> def test3(pool): # test against a function that should fail in pickle
... print pool
... print "x: %s
" % str(x)
... print pool.map.__name__
... start = time.time()
... res = pool.map(square_plus_one, x)
... print "time to results:", time.time() - start
... print "y: %s
" % str(res)
...
>>> def test4(pool, maxtries, delay):
... print pool
... m = pool.amap(busy_add, x, x)
... tries = 0
... while not m.ready():
... time.sleep(delay)
... tries += 1
... print "TRY: %s" % tries
... if tries >= maxtries:
... print "TIMEOUT"
... break
... print m.get()
...
>>> import time
>>> x = range(18)
>>> delay = 0.01
>>> items = 20
>>> maxtries = 20
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> pool = Pool(nodes=4)
>>> test1(pool)
<pool ProcessingPool(ncpus=4)>
x: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]
map
time to results: 0.0553691387177
y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]
imap
time to queue: 7.91549682617e-05
time to results: 0.102381229401
y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]
amap
time to queue: 7.08103179932e-05
time to results: 0.0489699840546
y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]
>>> test2(pool, items, delay)
<built-in function map>
<bound method ProcessingPool.map of <pool ProcessingPool(ncpus=4)>>
<bound method ProcessingPool.imap of <pool ProcessingPool(ncpus=4)>>
<bound method ProcessingPool.amap of <pool ProcessingPool(ncpus=4)>>
>>> test3(pool)
<pool ProcessingPool(ncpus=4)>
x: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]
map
time to results: 0.0523059368134
y: [1, 3, 9, 19, 33, 51, 73, 99, 129, 163, 201, 243, 289, 339, 393, 451, 513, 579]
>>> test4(pool, maxtries, delay)
<pool ProcessingPool(ncpus=4)>
TRY: 1
TRY: 2
TRY: 3
TRY: 4
TRY: 5
TRY: 6
TRY: 7
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34]
這篇關于多處理和蒔蘿可以一起做什么?的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,也希望大家多多支持html5模板網!