1. 分别用多进程和多线程两种方式来计算斐波那契数列,两种方式都依赖 concurrent.futures 模块提供的线程池/进程池。
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
def fib(n):
    """Return the n-th Fibonacci number by naive double recursion.

    Any n <= 2 (including 0 and negative values) yields 1, so
    fib(1) == fib(2) == 1. The exponential recursion makes this a
    CPU-heavy workload for comparing thread pools and process pools.
    """
    if n > 2:
        return fib(n - 1) + fib(n - 2)
    return 1
if __name__ == '__main__':
    # Time a CPU-bound batch (fib(25) .. fib(34)) on a pool of 3 workers.
    # Swap the two `with` lines (and the two print lines) to time processes.
    # Start the clock before submitting: submit() starts work immediately,
    # so timing only after submission would miss part of it.
    start_time = time.perf_counter()
    # with ProcessPoolExecutor(3) as executor:
    with ThreadPoolExecutor(3) as executor:
        all_task = [executor.submit(fib, n) for n in range(25, 35)]
        for future in as_completed(all_task):
            # result() must be called; it returns the value (or re-raises
            # the exception raised inside the worker).
            data = future.result()
            # todo
    end_time = time.perf_counter()
    print('time consuming by threads: {0}s'.format(end_time - start_time))
    # print('time consuming by processes: {0}s'.format(end_time - start_time))
两种方式的运行结果对比:
# result:
# time consuming by threads: 4.823292016983032s
# time consuming by processes: 3.3890748023986816s
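可以看到,对于计算量大的(CPU 密集型)任务,多进程要比多线程更加高效:CPython 的全局解释器锁(GIL)使同一时刻只有一个线程能执行 Python 字节码,而多个进程可以真正并行地利用多个 CPU 核心。同时,从这个例子中还能看出,通过 concurrent.futures 模块使用线程池和进程池的接口和使用逻辑是一样的。不过在使用多进程时,相关逻辑一定要放在 if __name__ == '__main__': 中:Windows(以及 Python 3.8 起的 macOS)默认以 spawn 方式启动子进程,子进程会重新导入主模块,没有这个保护就会重复创建进程;Linux 上传统的 fork 启动方式不受此约束,但为了可移植性仍建议这样写。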
2. 分别用多进程和多线程两种方式来模拟 I/O 密集型操作。I/O 操作的特点是 CPU 大部分时间都在等待数据,这里用 sleep 模拟即可。
整体的操作方式不变,修改过的逻辑如下:
def random_sleep(n):
    """Block for ``n`` seconds to stand in for an I/O wait, then return ``n``."""
    duration = n
    time.sleep(duration)  # the CPU sits idle here, like waiting on I/O
    return duration
...  # elided: the rest of the script is unchanged from the Fibonacci example
# 8 workers; each of the 30 tasks sleeps 2 seconds to simulate I/O.
# 30 tasks / 8 workers -> 4 rounds of 2 s, i.e. roughly 8 s either way.
# Swap the two `with` lines to compare processes against threads.
with ProcessPoolExecutor(8) as executor:
    # with ThreadPoolExecutor(8) as executor:
    all_task = [executor.submit(random_sleep, 2) for i in range(30)]
# result:
# time consuming by threads: 8.002903699874878s
# time consuming by processes: 8.34946894645691s
直接使用 multiprocessing.Process
import time
import multiprocessing
def read(times):
    """Pretend to read for ``times`` seconds, then describe what was done.

    Prints a progress line and returns a short summary string.
    """
    time.sleep(times)  # stand-in for a blocking read
    print('process reading...')
    summary = 'read for {0}s'.format(times)
    return summary
def write(times):
    """Pretend to write for ``times`` seconds, then describe what was done.

    Prints a progress line and returns a short summary string.
    """
    time.sleep(times)  # stand-in for a blocking write
    print('process writing...')
    summary = 'write for {0}s'.format(times)
    return summary
if __name__ == '__main__':
    read_process = multiprocessing.Process(target=read, args=(1,))
    write_process = multiprocessing.Process(target=write, args=(2,))
    # start() must actually be called: until a process is started it does
    # not run and its pid is None.
    read_process.start()
    write_process.start()
    print('read_process id {rid}'.format(rid=read_process.pid))
    print('write_process id {wid}'.format(wid=write_process.pid))
    # Wait for both children to finish before reporting completion.
    read_process.join()
    write_process.join()
    print('done')
# result:
# read_process id 7064
# write_process id 836
# process reading...
# process writing...
# done
可以看出,多进程的使用方式和多线程是类似的。要注意在 Windows 操作系统上,和进程有关的逻辑要写在 if __name__ == '__main__': 中。其他的一些方法请参阅官方文档。
使用原生进程池(multiprocessing.Pool)
import time
import multiprocessing
def read(times):
    """Simulate a read lasting ``times`` seconds and return a summary.

    A progress line is printed once the wait is over.
    """
    time.sleep(times)  # emulate the blocking part of a read
    print('process reading...')
    result = 'read for {0}s'.format(times)
    return result
def write(times):
    """Simulate a write lasting ``times`` seconds and return a summary.

    A progress line is printed once the wait is over.
    """
    time.sleep(times)  # emulate the blocking part of a write
    print('process writing...')
    result = 'write for {0}s'.format(times)
    return result
if __name__ == '__main__':
    # multiprocessing.cpu_count() returns the number of CPUs; it must be
    # called -- passing the function object itself makes Pool() fail.
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    read_result = pool.apply_async(read, args=(2,))
    write_result = pool.apply_async(write, args=(3,))
    # Close the pool so it accepts no new tasks; join() raises ValueError
    # while the pool is still running.
    pool.close()
    # Wait for every submitted task to finish.
    pool.join()
    # get() returns each task's return value (and re-raises any exception
    # the task raised in the worker process).
    print(read_result.get())
    print(write_result.get())
# result:
# process reading...
# process writing...
# read for 2s
# write for 3s
使用 imap:任务在进程池中并发执行,但结果严格按照提交的顺序依次返回:
# One worker per CPU; cpu_count() must be called, not passed as a function.
pool = multiprocessing.Pool(multiprocessing.cpu_count())
# imap yields results in submission order (2 s, 1 s, 3 s), even though the
# tasks themselves run concurrently in the pool. The pool is left open
# because the imap_unordered example reuses it.
for result in pool.imap(read, [2, 1, 3]):
    print(result)
# result:
# process reading...
# process reading...
# read for 2s
# read for 1s
# process reading...
# read for 3s
使用imap_unordered,哪个任务先完成就先返回结果:
# Reuses `pool` from the imap example. imap_unordered yields results in
# completion order, so the 1 s task prints first, then 3 s, then 5 s.
for result in pool.imap_unordered(read, [1, 5, 3]):
    print(result)
# process reading...
# read for 1s
# process reading...
# read for 3s
# process reading...
# read for 5s
使用concurrent.futures中的ProcessPoolExecutor
这个在前面对比多线程和多进程时已经提到过,因为它和线程池的使用方式一样,这里就不再赘述,可以参阅官方文档给出的例子。
进程间通信和线程间通信有些区别:每个进程都有独立的内存空间,线程通信中使用的各种锁机制和全局变量在这里不再适用,我们要选取新的工具来完成进程间通信任务。
使用multiprocessing.Queue
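使用方式和多线程中的 queue.Queue 一样,详细方法请参阅官方文档。注意这种通信方式不能用于通过 Pool 进程池创建的进程:multiprocessing.Queue 只能在创建 Process 时传入、通过继承在进程间共享,不能作为参数传给进程池的任务,这时需要使用 Manager 中的 Queue。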
import multiprocessing
import time
def plus(queue, delay=1):
    """Six times: take a number from *queue*, add 1, put it back and print it.

    Args:
        queue: queue shared with the other worker; the demo uses a
            ``multiprocessing.Queue(1)`` holding a single number.
        delay: seconds to sleep after each round (default 1, as in the
            original demo) so the two workers visibly interleave.
    """
    for _ in range(6):
        # get() must be called; it blocks until the other worker puts a value.
        num = queue.get() + 1
        queue.put(num)
        print(num)
        time.sleep(delay)
def subtract(queue, delay=2):
    """Six times: take a number from *queue*, subtract 1, put it back and print it.

    Args:
        queue: queue shared with the other worker; the demo uses a
            ``multiprocessing.Queue(1)`` holding a single number.
        delay: seconds to sleep after each round (default 2, as in the
            original demo) so the two workers visibly interleave.
    """
    for _ in range(6):
        # get() must be called; it blocks until the other worker puts a value.
        num = queue.get() - 1
        queue.put(num)
        print(num)
        time.sleep(delay)
if __name__ == '__main__':
    # maxsize=1: the queue holds the single number the two workers pass
    # back and forth.
    queue = multiprocessing.Queue(1)
    queue.put(0)
    plus_process = multiprocessing.Process(target=plus, args=(queue,))
    subtract_process = multiprocessing.Process(target=subtract, args=(queue,))
    # start() must be called, otherwise neither worker ever runs.
    plus_process.start()
    subtract_process.start()
    # Wait for both workers to finish their six rounds.
    plus_process.join()
    subtract_process.join()
# result:
# 1
# 1
# 2
# 2
# 3
# 3
# 0
# 1
# 2
# 2
# 1
# 0
使用Manager中的Queue
multiprocessing.Manager() 会返回一个管理器对象,它启动一个服务进程来保存共享对象,其他进程通过代理对象访问这些共享对象,从而提供了多种在进程间共享数据的形式。
import multiprocessing
import time
def plus(queue, delay=1):
    """Six times: take a number from *queue*, add 1, put it back and print it.

    Args:
        queue: queue shared with the other worker; the demo passes a
            ``Manager().Queue(1)`` proxy holding a single number.
        delay: seconds to sleep after each round (default 1, as in the
            original demo) so the two workers visibly interleave.
    """
    for _ in range(6):
        # get() must be called; it blocks until the other worker puts a value.
        num = queue.get() + 1
        queue.put(num)
        print(num)
        time.sleep(delay)
def subtract(queue, delay=2):
    """Six times: take a number from *queue*, subtract 1, put it back and print it.

    Args:
        queue: queue shared with the other worker; the demo passes a
            ``Manager().Queue(1)`` proxy holding a single number.
        delay: seconds to sleep after each round (default 2, as in the
            original demo) so the two workers visibly interleave.
    """
    for _ in range(6):
        # get() must be called; it blocks until the other worker puts a value.
        num = queue.get() - 1
        queue.put(num)
        print(num)
        time.sleep(delay)
if __name__ == '__main__':
    # Manager() must be called: it starts a server process, and its Queue()
    # returns a proxy that can be passed to Pool workers. The with-block
    # shuts the manager down afterwards.
    with multiprocessing.Manager() as manager:
        queue = manager.Queue(1)
        # queue = multiprocessing.Queue(1)  # does not work here: a plain
        # multiprocessing.Queue cannot be passed as an argument to Pool tasks
        pool = multiprocessing.Pool(2)
        queue.put(0)
        plus_result = pool.apply_async(plus, args=(queue,))
        subtract_result = pool.apply_async(subtract, args=(queue,))
        # No more tasks; then wait for both to finish.
        pool.close()
        pool.join()
        # apply_async keeps a task's exception until get() is called;
        # call it so failures are not silently ignored.
        plus_result.get()
        subtract_result.get()
# result:
# 0
# 1
# 1
# 2
# 2
# 3
# -1
# 0
# 1
# 2
# 1
# 0
使用Manager中的list
多个进程可以共享同一个 list。由于是进程间共享,需要用锁来保证访问的安全性。这里的 Manager().Lock() 不是前面线程级别的 Lock,它可以保证进程间的同步。
import multiprocessing as mp
import time
def add_person(waiting_list, name_list, lock, delay=1):
    """Append every name in *name_list* to the shared *waiting_list*.

    The whole batch is appended while *lock* is held, so another worker
    that uses the same lock cannot touch the list mid-batch.

    Args:
        waiting_list: shared list (a ``Manager().list()`` proxy in the demo).
        name_list: names to append, in order.
        lock: lock guarding *waiting_list* (``Manager().Lock()`` in the demo).
        delay: seconds to sleep after each append (default 1, as in the
            original demo).
    """
    # `with` actually acquires the lock and guarantees it is released,
    # even if an append raises.
    with lock:
        for name in name_list:
            waiting_list.append(name)
            time.sleep(delay)
            print(waiting_list)
def get_person(waiting_list, lock):
    """Pop and print the first name in *waiting_list*, if any, while holding *lock*.

    Does nothing (prints nothing) when the list is empty.
    """
    # `with` actually acquires the lock and guarantees it is released.
    with lock:
        if waiting_list:
            name = waiting_list.pop(0)
            print('get {0}'.format(name))
if __name__ == '__main__':
    # A single Manager (one server process) hosts both shared objects;
    # Manager must be called before .list()/.Lock() can be used.
    with mp.Manager() as manager:
        waiting_list = manager.list()
        lock = manager.Lock()  # serializes access to waiting_list across processes
        name_list = ['MetaTian', 'Rity', 'Anonymous']
        add_process = mp.Process(target=add_person, args=(waiting_list, name_list, lock))
        get_process = mp.Process(target=get_person, args=(waiting_list, lock))
        add_process.start()
        get_process.start()
        add_process.join()
        get_process.join()
        # Print inside the with-block: the proxies stop working once the
        # manager has shut down.
        print(waiting_list)
# result:
# ['MetaTian']
# ['MetaTian', 'Rity']
# ['MetaTian', 'Rity', 'Anonymous']
# get MetaTian
# ['Rity', 'Anonymous']
Manager中还有更多的进程间通信的工具,可以参阅官方文档。
使用Pipe
Pipe 只适用于两个进程之间的通信,它的性能高于 Queue。multiprocessing.Pipe() 会返回两个 Connection 对象(默认是双向的),使用这两个对象可以在进程间发送和接收数据,用法非常像前面讲过的 socket 对象。关于 Connection 的更多方法请参阅官方文档。
import multiprocessing
def plus(conn):
    """Run 3 rounds: send (received number + 1) over *conn* and print it.

    The first round starts the exchange from 0 instead of waiting for the
    peer; later rounds block on conn.recv() for the peer's reply.
    """
    for i in range(3):
        # recv() must be called; it blocks until the other end sends.
        num = 0 if i == 0 else conn.recv()
        conn.send(num + 1)
        print('plus send: {0}'.format(num + 1))
def subtract(conn):
    """Run 3 rounds: receive a number over *conn*, send it back minus 1 and print it."""
    for _ in range(3):
        # recv() must be called; it blocks until the other end sends.
        num = conn.recv()
        conn.send(num - 1)
        print('subtract send: {0}'.format(num - 1))
if __name__ == '__main__':
    # Pipe() must be called; it returns two connected Connection objects
    # (duplex by default), one for each process.
    conn_plus, conn_subtract = multiprocessing.Pipe()
    plus_process = multiprocessing.Process(target=plus, args=(conn_plus,))
    subtract_process = multiprocessing.Process(target=subtract, args=(conn_subtract,))
    plus_process.start()
    subtract_process.start()
    # Wait for both ends to finish their three rounds.
    plus_process.join()
    subtract_process.join()
# result:
# plus send: 1
# subtract send: 0
# plus send: 1
# subtract send: 0
# plus send: 1
# subtract send: 0
send可以连续发送数据,recv将另一端发送的数据陆续取出,如果没有取到数据,则进入等待状态。
注:喜欢python + qun:839383765 可以获取Python各类免费最新入门学习资料!
(完)
联系客服