注意点
:进程的Queue,父进程与子进程进行交互,或者同属于一个父进程下的多个子进程进行交互。而2个独立的python程序的进程是无法交互的。比如,你开2个dom窗口,里面的py程序是无法交互的。而同一进程下的不同线程是可以交互的。
from multiprocessing import Process,Pipe,Lock,Queue,Manager,Pool
#每一个进程都有一个父进程,进程间是不共享内存空间的
#os.getppid() 获取父进程id os.getpid() 获取当前进程id
def f():
print("ppid: ",os.getppid())
print("pid: ",os.getpid())
time.sleep(1)
print(lst) #NameError,说明进程间是不共享空间的
if __name__ == "__main__":
lst = [1,2,3]
print(os.getpid())
p1 = Process(target=f,) #这个新起的进程就是由父进程9444起的
p1.start()
out-------------------------------:
ppid: 9444
pid: 8468
NameError: name 'lst' is not defined
进程锁
#进程锁:进程虽然是独立的,不像线程一样要加上线程锁来保护共享资源。但是,进程如果
#修改同一个函数的话,也是需要加上锁的,否则可能出现打某个进程在界面打印的时候,打
#印到一半,另一个进程就来打印了。
def func(lock,num):
lock.acquire()
print(str(num) * 10)
lock.release()
if __name__ == "__main__":
lock = Lock()
for num in range(10):
p = Process(target = func,args = (lock,num))
p.start()
p.join() #跟线程的join一样
进程间通信
#进程是不能相互访问的,要想实现进程间通信,可以通过以下几种方式
#1.Queue
def fun(q):
time.sleep(2)
q.put("main")
# print(q.get())
if __name__ == "__main__":
q = Queue()
p1 = Process(target=fun,args=(q,))
p1.start()
p1.join(0.2)
print(1)
try :
print(q.get(block=False))
except Exception as e:
print(e)
print(q.get()) #创建的p1进程在执行fun函数时,sleep 2s,再put元素到Queue中,
#而主进程在往下执行时遇到p1.join(0.2)只停留0.2s,就会接着往下执行,先pirnt(1),
#然后再执行q.get(block=False),发现此时没有Queue中没有元素【虽然这个不是异常,
#所以print(e)没有任何内容,不过也可以try】,最后执行q.get(),此时当然Queue中
#当然也没有元素,不过会阻塞在这里,fun函数sleep时间到,存入q.put("main")。所
#以最后打印结果如下。
out--------------------------------
[Finished in 3.0s]
#2.Pipe(duplex) :(conn1,conn2) = Pipe() conn1和conn2代表管道的2端,默认可以
#双向通信,如果duplex设置为Flase,那么只能coon1用来接收数据,coon2用来发送数据
def func(conn):
conn.send("message")
if __name__ == "__main__":
(conn1,conn2) = Pipe()
p = Process(target=func,args=(conn1,))
p.start()
print(conn2.recv())
p1 = Process(target=func,args=(conn2,))
p1.start()
print(conn1.recv())
message
message
#3. Manager
def fun(dic,lst):
dic['1'] = 1
dic['a'] = 'a'
dic['c'] = None
lst.append("me")
print("dic: ",dic)
print("lst: ",lst)
if __name__ == "__main__":
with Manager() as manager:
dic = manager.dict()
lst = manager.list()
p1 = Process(target=fun,args=(dic,lst))
p2 = Process(target=fun,args=(dic,lst))
p1.start()
p1.join()
p2.start()
p2.join()
out : 发现2个进程可以共享字典了
dic: {'1': 1, 'a': 'a', 'c': None}
lst: ['me']
dic: {'1': 1, 'a': 'a', 'c': None}
lst: ['me', 'me']
进程同步与异步
#要在linux在执行才可以输出以下的结果,进程这玩意在windows执行好多都不知道怎么解释
from multiprocessing import Pool
import os, time,random
''' 不知为啥不能给子进程添加装饰器,应该是进程的创建都是从父进程中复制,也就是pickle
不能将装饰器进行序列化或反序列化
def timer(func):
def deco(*args,**kwargs):
t1 = time.time()
func(*args,**kwargs)
t2 = time.time()
print("comsume %s s" % (t2 - t1))
return deco
@timer
def Foo(i):
s = time.time()
print("in task [%s]"%i,"process id is ",os.getpid())
time.sleep(random.random()*3)
e = time.time()
print("task [%s] runs %.2f s" % (i,e-s))
return i
def Bar(arg):
print('I am in Bar:',arg,"ppid",os.getpid(),os.getppid())
if __name__ == "__main__":
pool = Pool()
print("main process:",os.getpid(),os.getppid())
t1 = time.time()
for i in range(10):
#pool.apply_async(func=Foo, args=(i,),callback=Bar) #callback是
#回调的作用,等待Foo运行完了将Foo的返回值传入Bar进行运行
pool.apply_async(func=Foo, args=(i,)) # 异步
#pool.apply(func=Foo, args=(i,)) # 同步,只能一个一个进程执行。Pool(5)写几都没用
print('waiting for all subprocessing start')
pool.close()
pool.join() #
t2 = time.time()
print("all subprocessing end,runs [%s] " % (t2-t1))
out----------------------------------
#结果解读:测试的机器有8个cpu,所以在创建Pool()实例的时候,不加入参数(如Pool(5))的
#话,默认就可以同时并行跑8个进程,所以当这8个进程有一个执行完了,那么下面的进程就可以进入
main process: 5470 5292
waiting for all subprocessing start
in task [0] process id is 5471
in task [1] process id is 5472
in task [2] process id is 5473
in task [3] process id is 5474
in task [4] process id is 5475
in task [5] process id is 5476
in task [6] process id is 5477
in task [7] process id is 5478
task [6] runs 0.15 s
in task [8] process id is 5477
task [7] runs 0.27 s
in task [9] process id is 5478
task [2] runs 0.38 s
task [5] runs 0.76 s
task [8] runs 1.18 s
task [1] runs 1.36 s
task [4] runs 1.67 s
task [0] runs 2.26 s
task [9] runs 2.04 s
task [3] runs 2.90 s
all subprocessing end,runs [2.9068007469177246]