文章目录
一、什么是并行和并发?
首先我们来先说一下一个简单的共同点,并行和并发都是完成多任务更加有效率的工具。我们下面用一张图来说明它们的不同点
线程和进程的概念
在很多教科书上都有一句话:进程时资源分配的最小单位,线程时CPU调度的最小单位。
线程是程序中一个单一的顺序控制流程,是进程内一个相对独立的、可调度的执行单元,是系统独立调度和分配CPU的基本单位中的调度单位。
进程和线程的区别
进程是资源分配的基本单位。所有与该进程有关的资源,都被记录在进程控制块PCB中。以表示该进程拥有这些资源。另外进程也是抢占处理机的调度单位,它拥有一个完整的虚拟地址空间。但进程发生调度的时候,不同的进程拥有不同的虚拟你地址空间,而同一进程内的不同线程共享同一地址空间。
与进程相对应,线程和资源分配无关,它属于某一进程,并与进程内的其他线程一起共享进程的资源。线程只由相关堆栈寄存器和线程控制表TCB组成。
通常在一个进程中可以包含若干个线程,他们可以利用进程所拥有的资源。在引入线程的操作系统中,通常都是把进程作为资源费配的基本单位,而把线程作为独立运行和独立调度的基本单位。由于线程比进程更小。进本上不拥有系统资源,故对它的调度所付出的开销就会小得多,能更搞笑得提高系统内多个程序间得并发执行程度,提高系统资源的利用率。
我们总结下来有以下四项:
多进程和多线程比较
对比维度 | 多进程 | 多线程 | 总结 |
---|---|---|---|
数据共享、同步 | 数据共享复杂,同步简单 | 数据共享简单,同步复杂 | 各有优劣 |
内存、CPU | 占用内存多,切换复杂,CPU利用率低 | 占用内存少,切换简单,CPU利用率高 | 线程占优 |
创建、销毁、切换 | 复杂,速度慢 | 简单,速度快 | 线程占优 |
编程、调试 | 编程简单,调试简单 | 编程复杂,调试复杂 | 进程占优 |
可靠性 | 进程间不会互相影响 | 一个线程挂掉将导致整个进程挂掉 | 进程占优 |
GIL锁(全局解释器锁)
每次谈到Python的多线程的时候,我们都会说到GIL锁的问题。
在正式的讲GIL锁之前,我们有一点是需要明确的,就是GIL并不是Python的特性,他是在实现Python解析器(CPython)时引入的概念。而在别的解析器中比如,Jython,IronPython等都是没有全局解释锁的,但CPython时大部分环境下的默认Python执行环境,所以在很多人的概念中CPython就是Python,但这个想法是错的。
GIL本质就是一把互斥锁,既然是互斥锁,所有的互斥锁的本质都是一样的,都是会将并发运行变成串行,以此来控制同一时间内数据只能被一个任务修改,进而保证数据安全。保护不同的数据的安全,就应该加不同的锁。
GIL的版本也在不断的改善之中,在python3.2前,GIL的释放逻辑是当前线程遇见IO操作或者ticks计数达到100(ticks可以看作是python自身的一个计数器,专门做用于GIL,每次释放后归零,这个计数可以通过 sys.setcheckinterval 来调整),进行释放。因为计算密集型线程在释放GIL之后又会立即去申请GIL,并且通常在其它线程还没有调度完之前它就已经重新获取到了GIL,就会导致一旦计算密集型线程获得了GIL,那么它在很长一段时间内都将占据GIL,甚至一直到该线程执行结束。
而在Python3.2之后开始使用新的GIL。新的GIL实现中用一个固定的超时时间来指示当前的线程放弃全局锁。在当前线程保持这个锁,且其他线程请求这个锁时,当前线程就会在5毫秒后被强制释放该锁。该改进在单核的情况下,对于单个线程长期占用GIL的情况有所好转。
总结以下:
有了GIL的存在,Python有这两个特点:
也就是说,在Python中的多线程是假的多线程,Python解释器虽然可以开启多个线程,但在同一时间只有一个线程在解释器中运行,而做到这一点的正式由于GIL锁的存在,它的存在使得CPU的资源统一时间只会给一个线程使用,而由于开启线程的开销小,所以多线程才能有一篇用武之地。
但Python的多线程还是由用处的,从上面的分析我们可以知道:
多线程编程
在Python的多线程编程中,我们主要的是借助于threading模块,而在threading模块中最核心的内容是Thread这个类。我们要创建Thread对象,然后让他们运行,每一个Thread对象代表一个线程,在每一个线程中我们可以让程序处理不同的任务。这就是多线程编程。
创建Thread对象,有两种手段。
我们主要介绍第一种方式
class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
None
;为了日后扩展 ThreadGroup
类实现而保留。run()
方法调用的可调用对象。默认是 None
,表示不需要调用任何方法。()
。{}
。None
(默认值),线程将继承当前线程的守护模式属性。在Thread中有以下方法
start
()开始线程活动。run
()代表线程活动的方法。join
(timeout=None)等待,直到线程终结。这会阻塞调用这个方法的线程,直到被调用 join()
的线程终结 – 不管是正常终结还是抛出未处理异常 – 或者直到发生超时,超时选项是可选的。is_alive
()返回线程是否存活。我们列一下简单的代码实例:
import threading
import time
def test():
for i in range(5):
print(threading.current_thread().name+' test ',i) # 获取子线程的名字
time.sleep(0.5)
thread = threading.Thread(target=test,name='TestThread') # 实例化线程
thread.start() # 开始线程
thread.join() # 添加阻塞,可以改变线程的运行顺序
for i in range(5):
print(threading.current_thread().name+' main ', i)
print(thread.name+' is alive ', thread.isAlive()) # 判断线程是狗准确
time.sleep(1)
结果
TestThread test 0
TestThread test 1
TestThread test 2
TestThread test 3
TestThread test 4
MainThread main 0
TestThread is alive False
MainThread main 1
TestThread is alive False
MainThread main 2
TestThread is alive False
MainThread main 3
TestThread is alive False
MainThread main 4
TestThread is alive False
如果我门想要主进程结束的时候,子进程也要跟着结果,要怎么做呢,这里就可以用到daemon了。
我们可以设置主线程的执行时间比子线程慢,来检测一下
#-*-coding:utf-8-*-
import threading
import time
def test():
for i in range(10):
print(threading.current_thread().name+' test ',i)
time.sleep(1)
thread = threading.Thread(target=test,name='TestThread', daemon=True) # 设置守护线程
thread.start()
for i in range(10):
print(threading.current_thread().name+' main ', i)
print(thread.name+' is alive ', thread.isAlive())
time.sleep(0.5)
结果:
MainThread main 0
TestThread is alive True
TestThread test 0
MainThread main 1
TestThread is alive True
TestThread test 1
MainThread main 2
TestThread is alive True
MainThread main 3
TestThread is alive True
TestThread test 2
MainThread main 4
TestThread is alive True
MainThread main 5
TestThread is alive True
TestThread test 3
MainThread main 6
TestThread is alive True
MainThread main 7
TestThread is alive True
TestThread test 4
MainThread main 8
TestThread is alive True
MainThread main 9
TestThread is alive True
我们简单的来看一下第二种实现方式(继承Thread类):
import threading
import time
class TestThread(threading.Thread):
def __init__(self,name=None):
threading.Thread.__init__(self,name=name)
def run(self):
for i in range(5):
print(threading.current_thread().name + ' test ', i)
time.sleep(1)
## 重写run方法
thread = TestThread(name='TestThread')
thread.start()
for i in range(5):
print(threading.current_thread().name+' main ', i)
print(thread.name+' is alive ', thread.isAlive())
time.sleep(1)
介绍完基础部分就要来讲一个比较困难的一部分了,就是多线程中的锁的机制,如何实现多个线程之间的通信和数据同步。
我们用一个经典的问题来解释这个问题:
卖票问题,买票有多个窗口,我们假设有三个,窗口之间共享一个票池,每个窗口都可以买票,直至票池里面没有票可以卖。
代码如下:
import threading
import random
class WindowThread(threading.Thread):
def __init__(self,name):
threading.Thread.__init__(self,name=name)
self.name = name
self.tickts = 0
def run(self):
global tickt_count
while tickt_count > 0:
print('%s notice:There has %d tickts remain ' %(self.name,tickt_count))
if tickt_count > 2:
number = random.randint(1,2)
else:
number = 1
tickt_count -= number
self.tickts += number
print('%s have buy %d tickt,the remain tickt\'t count is %d .Already buy %d \n'
% (self.name, number, tickt_count, self.tickts))
print('%s notice:There is no tickt can sold! Already sold %d'%(self.name,self.tickts))
tickt_count = 10
window1 = WindowThread('window1')
window2 = WindowThread('window2')
window3 = WindowThread('window3')
window1.start()
window2.start()
window3.start()
window1.join()
window2.join()
window3.join()
print('tickt count ',tickt_count)
结果:
window1 notice:There has 10 tickts remain
window1 have buy 1 tickt,the remain tickt't count is 9 .Already buy 1
window1 notice:There has 9 tickts remain
window1 have buy 1 tickt,the remain tickt't count is 8 .Already buy 2
window2 notice:There has 8 tickts remain
window2 have buy 2 tickt,the remain tickt't count is 6 .Already buy 2
window1 notice:There has 6 tickts remain
window2 notice:There has 6 tickts remain
window2 have buy 1 tickt,the remain tickt't count is 5 .Already buy 3
window1 have buy 2 tickt,the remain tickt't count is 3 .Already buy 4
window1 notice:There has 3 tickts remain
window1 have buy 1 tickt,the remain tickt't count is 2 .Already buy 5
window2 notice:There has 2 tickts remain
window2 have buy 1 tickt,the remain tickt't count is 1 .Already buy 4
window2 notice:There has 1 tickts remain window3 notice:There has 1 tickts remain
window3 have buy 1 tickt,the remain tickt't count is 0 .Already buy 1
window1 notice:There is no tickt can sold! Already sold 5
window2 have buy 1 tickt,the remain tickt't count is -1 .Already buy 5
window3 notice:There is no tickt can sold! Already sold 1
window2 notice:There is no tickt can sold! Already sold 5
tickt count -1
我们惊奇的发现居然会有 -1出现,这就是数据共享出现了错误,如果我们加上锁:
如下:(自行添加测试噢)
self.lock.acquire()
if tickt_count > 0:
if tickt_count > 2:
number = random.randint(1,2)
else:
number = 1
tickt_count -= number
self.tickts += number
print('%s have buy %d tickt,the remain tickt\'t count is %d .Already buy %d \n'
% (self.name, number, tickt_count, self.tickts))
self.lock.release()
执行代码片段
lock = threading.Lock()
window1 = WindowThread('window1',lock)
window2 = WindowThread('window2',lock)
window3 = WindowThread('window3',lock)
使用锁后就不会出现上面的问题了。
lock中有两个方法
多进程编程
由于GIL锁的存在,所以如果我们为了使用多核的优势,我们通常会使用多进程编程。在python多进程编程之中,我们常用的是multiprocessing包。
创建管理进程模块:
我们先来了解一下Process吧:
class multiprocessing.``Process
(group=None, target=None, name=None, args=(), kwargs={}, ***, daemon=None)
其实Process和Thread很相像,两者的使用也是非常的类似。
实例方法:
实例:(Windows下的Process()必须放在 if _name_ == '_main_' )下运行
from multiprocessing import Process
import os
def run_proc(name):
print('Run child process {}({})'.format(name, os.getpid()))
if __name__ == '__main__':
print('Parent process {}'.format(os.getpid()))
p = Process(target=run_proc, args=('test',))
print('Child process will start')
p.start()
p.join()
print("Child process end")
-----------------------------------------------------------------------------
Parent process 27808
Child process will start
Run child process test(28372)
Child process end
Pool(用于创建管理进程池)
Pool类用于需要执行的目标很多,而手动限制进程数量又太繁琐时,如果目标少且不用控制进程数量则可以用Process类。Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。
class multiprocessing.pool.``Pool
([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
实例方法:
from multiprocessing import Pool
def test(i):
print(i)
if __name__ == '__main__':
lists = range(100)
pool = Pool(10)
pool.map(test, lists)
pool.close()
pool.join()
# 异步进程池
#-*-coding:utf-8-*-
from multiprocessing import Pool
def test(i):
print(i)
if __name__ == '__main__':
pool = Pool(8)
for i in range(100):
'''
For循环中执行步骤:
(1)循环遍历,将100个子进程添加到进程池(相对父进程会阻塞)
(2)每次执行8个子进程,等一个子进程执行完后,立马启动新的子进程。(相对父进程不阻塞)
apply_async为异步进程池写法。异步指的是启动子进程的过程,与父进程本身的执行(print)是异步的,而For循环中往进程池添加子进程的过程,与父进程本身的执行却是同步的。
'''
pool.apply_async(test, args=(i,))
print("test")
pool.close()
pool.join()
Queue(用于进程通信,资源共享)
在使用多进程的过程中,最好不要使用共享资源。普通的全局变量是不能被子进程所共享的,只有通过Multiprocessing组件构造的数据结构可以被共享。
Queue是用来创建进程间资源共享的队列的类,使用Queue可以达到多进程间数据传递的功能(缺点:只适用Process类,不能在Pool进程池中使用)。
class multiprocessing.``Queue
([maxsize])
实例方法:
#-*-coding:utf-8-*-
from multiprocessing import Process, Queue
import os, time, random
def write(q):
print('Process to write {}'.format(os.getpid()))
for value in ['A', 'B', 'C']:
print('Put {} to queue'.format(value))
q.put(value)
time.sleep(random.random())
def read(q):
print('Process to read: {}'.format(os.getpid()))
while True:
value = q.get(True)
print('Get {} from queue'.format(value))
if __name__ == '__main__':
q = Queue()
pw = Process(target=write, args=(q, ))
pr = Process(target=read, args=(q, ))
pw.start()
pr.start()
pw.join() # 等待pw借宿
pr.terminate() # pr进程是死循环,无法等待器借宿,只能强行终止
多进程还有一种数据传递方式叫管道原理和 Queue相同。Pipe可以在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道。
multiprocessing.``Pipe
([duplex])
实例方法:
from multiprocessing import Process, Pipe
import time
def f(subconn):
time.sleep(1)
subconn.send("吃了吗?")
print("来自父亲的问候:", subconn.recv())
subconn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe() # 创建管道两端
p = Process(target=f, args=(child_conn, )) # 创建子进程
p.start()
print("来自儿子的问候:", parent_conn.recv())
parent_conn.send("嗯")
Lock锁的作用是当多个进程需要访问共享资源的时候,避免访问的冲突。加锁保证了多个进程修改同一块数据时,同一时间只能有一个修改,即串行的修改,牺牲了速度但保证了数据安全。Lock包含两种状态——锁定和非锁定,以及两个基本的方法。
构造方法:Lock()
实例方法:
from multiprocessing import Process, Lock
def l(lock, num):
lock.acquire()
print("Hello Num: %s" % (num))
lock.release()
if __name__ == '__main__':
lock = Lock() # 这个一定要定义为全局
for num in range(20):
Process(target=l, args=(lock, num)).start()
下期的内容将会介绍:协程和异步编程!
哈喽,我是海森堡,如果觉得文章对你有帮助,欢迎分享给你的朋友,也给我点个在看,这对我非常重要,给各位哥哥姐姐们抱拳,我们下次见
联系客服