python中进程、线程、协程

Python 进程、线程、协程

进程和线程

计算机是由硬件和软件组成的。硬件中的CPU是计算机的核心,它承担计算机的所有任务。 操作系统是运行在硬件之上的软件,是计算机的管理者,它负责资源的管理和分配、任务的调度。 程序是运行在系统上的具有某种功能的软件,比如说浏览器,音乐播放器等。 每次执行程序的时候,都会完成一定的功能,比如说浏览器帮我们打开网页,为了保证其独立性,就需要一个专门的管理和控制执行程序的数据结构——进程控制块。 进程就是一个程序在一个数据集上的一次动态执行过程。 进程一般由程序、数据集、进程控制块三部分组成。我们编写的程序用来描述进程要完成哪些功能以及如何完成;数据集则是程序在执行过程中所需要使用的资源;进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。

在早期的操作系统里,计算机只有一个核心,进程执行程序的最小单位,任务调度采用时间片轮转的抢占式方式进行进程调度。每个进程都有各自的一块独立的内存,保证进程彼此间的内存地址空间的隔离。 随着计算机技术的发展,进程出现了很多弊端,一是进程的创建、撤销和切换的开销比较大,二是由于对称多处理机(对称多处理机(SymmetricalMulti-Processing)又叫SMP,是指在一个计算机上汇集了一组处理器(多CPU),各CPU之间共享内存子系统以及总线结构)的出现,可以满足多个运行单位,而多进程并行开销过大。 这个时候就引入了线程的概念。 线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID、程序计数器、寄存器集合 和堆栈共同组成。线程的引入减小了程序并发执行时的开销,提高了操作系统的并发性能。 线程没有自己的系统资源,只拥有在运行时必不可少的资源。但线程可以与同属与同一进程的其他线程共享进程所拥有的其他资源。

进程与线程之间的关系

  • 线程:是操作系统能够进行运算的调度的最小单位,它被包含在进程中,是进程中实际的运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程可以并发多个线程,每条线程并行执行不同的任务。
    线程是一串指令的集合
  • 进程:程序要以一个整体的形式暴露给操作系统管理,里面包含对各种资源的调用,内存的管理,网络接口的调用等….即对各种资源的集合。
  • 进程要操作cpu,必须要先创建一个线程
  • 所有在同一个进程里的线程是共享同一块内存空间的,线程共享内存空间,进程的内存是独立的
  • 同一个进程的线程之间可以直接交流,两个进程想要通信,必须通过一个中间代理来实现
  • 创建新线程很简单,创建新进程需要对其父进程进行一次克隆

一个线程可以控制和操作同一个进程里的其他线程,但是进程只能操作子进程
线程是属于进程的,线程运行在进程空间内,同一进程所产生的线程共享同一内存空间,当进程退出时该进程所产生的线程都会被强制退出并清除。线程可与属于同一进程的其它线程共享进程所拥有的全部资源,但是其本身基本上不拥有系统资源,只拥有一点在运行中必不可少的信息(如程序计数器、一组寄存器和栈)。

threading模块

threading 模块建立在 _thread 模块之上。thread 模块以低级、原始的方式来处理和控制线程,而 threading 模块通过对 thread 进行二次封装,提供了更方便的 api 来处理线程。
Thread方法说明:

  • t.start() : 激活线程,
  • t.getName() : 获取线程的名称
  • t.setName() : 设置线程的名称
  • t.name : 获取或设置线程的名称
  • t.is_alive() : 判断线程是否为激活状态
  • t.isAlive() :判断线程是否为激活状态
  • t.setDaemon() 设置为后台线程或前台线程(默认:False);通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之后才可以使用。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
  • t.isDaemon() : 判断是否为守护线程
  • t.ident :获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。
  • t.join() :逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
  • t.run() :线程被cpu调度后自动执行线程对象的run方法
import threading
import time

def worker(num):
    time.sleep(1)
    print("the num is %d" %num)
    return
for i in range(20):
    t = threading.Thread(target=worker,args=(i,),name="t.%d" %i)
    t.start()
------------------------
the num is 1
the num is 0
the num is 4the num is 3the num is 2


the num is 7the num is 5
the num is 6

the num is 8
the num is 10the num is 9

the num is 12
the num is 11
the num is 14the num is 13

the num is 15
the num is 17the num is 19
the num is 16the num is 18

这段代码创建20个前台线程,然后由CPU根据一些算法,分片执行指令

线程锁threading.RLock和threading.Lock

由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,CPU接着执行其他线程。为了保证数据的准确性,引入了锁的概念。所以,可能出现如下问题:

例:假设列表A的所有元素就为0,当一个线程从前向后打印列表的所有元素,另外一个线程则从后向前修改列表的元素为1,那么输出的时候,列表的元素就会一部分为0,一部分为1,这就导致了数据的不一致。锁的出现解决了这个问题。
eg

#!/usr/bin/python
# -*-coding:utf-8-*-
# Author:Gsuhy

import threading
import time

globals_num = 0
lock = threading.RLock()

def func():
    lock.acquire() #获得锁
    global globals_num
    globals_num += 1
    time.sleep(1)
    print(globals_num)
    lock.release() #释放锁

for i in range(10):
    t = threading.Thread(target=func)
    t.start()
----------------
1
2
3
4
5
6
7
8
9
10

threading.RLock和threading.Lock 的区别

RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。 如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐。

import threading
lock = threading.Lock()    #Lock对象
lock.acquire()
lock.acquire()  #产生了死琐。
lock.release()
lock.release() 

import threading
rLock = threading.RLock()  #RLock对象
rLock.acquire()
rLock.acquire()    #在同一线程内,程序不会堵塞。
rLock.release()
rLock.release()

threading.Event

python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

  • clear:将“Flag”设置为False
  • set:将“Flag”设置为True
  • Event.isSet() :判断标识位是否为Ture。

当线程执行的时候,如果flag为False,则线程会阻塞,当flag为True的时候,线程不会阻塞。它提供了本地和远程的并发性。

import threading

def do(event):
    print("start")
    event.wait()
    print("execute")

event_obj = threading.Event()

for i in range(10):
    t = threading.Thread(target=do,args=(event_obj,))
    t.start()

event_obj.clear()
inp = input(">>>")
if inp == 'true':
    event_obj.set()
-----------------
如果输入的是true 那么就输出10个execute
如果输入的是False,或者其他杂七杂八的,就阻塞

当线程执行的时候,如果flag为False,则线程会阻塞,当flag为True的时候,线程不会阻塞。它提供了本地和远程的并发性。

threading.Condition

一个condition变量总是与某些类型的锁相联系,这个可以使用默认的情况或创建一个,当几个condition变量必须共享和同一个锁的时候,是很有用的。锁是conditon对象的一部分:没有必要分别跟踪。
condition变量服从上下文管理协议:with语句块封闭之前可以获取与锁的联系。acquire()release()会调用与锁相关联的相应的方法。
其他和锁关联的方法必须被调用,wait()方法会释放锁,当另外一个线程使用 notify() or notify_all()唤醒它之前会一直阻塞。一旦被唤醒,wait()会重新获得锁并返回,
Condition类实现了一个conditon变量。 这个conditiaon变量允许一个或多个线程等待,直到他们被另一个线程通知。 如果lock参数,被给定一个非空的值,,那么他必须是一个lock或者Rlock对象,它用来做底层锁。否则,会创建一个新的Rlock对象,用来做底层锁。

wait(timeout=None)

等待通知,或者等到设定的超时时间。当调用这wait()方法时,如果调用它的线程没有得到锁,那么会抛出一个RuntimeError 异常。 wati()释放锁以后,在被调用相同条件的另一个进程用notify() or notify_all() 叫醒之前 会一直阻塞。wait() 还可以指定一个超时时间。
如果有等待的线程,notify()方法会唤醒一个在等待conditon变量的线程。notify_all()则会唤醒所有在等待conditon变量的线程。

注意: notify()notify_all()不会释放锁,也就是说,线程被唤醒后不会立刻返回他们的wait()调用。除非线程调用notify()notify_all()之后放弃了锁的所有权。

进程

multiprocessing模块

multiprocessing是python的多进程管理包,和threading.Thread类似。
直接从侧面用subprocesses替换线程使用GIL的方式,由于这一点,multiprocessing模块可以让程序员在给定的机器上充分的利用CPU。在multiprocessing中,通过创建Process对象生成进程,然后调用它的start()方法,

multiprocessing,Array,Value

数据可以用Value或Array存储在一个共享内存地图里

多进程

使用场景

IO操作不占用CPU
计算占用cpu
python多线程不适合cpu密集型操作的任务,适合IO操作密集型的任务
看个例子:

#!/usr/bin/python
# -*-coding:utf-8-*-
# Author:Gsuhy


from multiprocessing import Process
import os

def info(title):
    print(title)
    print("module name:",__name__)
    print("parent process:",os.getppid())
    print("process id:",os.getpid())
    print("\n")

def f(name):
    info('\033[31;1mcalled from child process function f\033[0m')
    print("hello",name)

if __name__ == "__main__":
    info('\033[32;1mmain process line\033[0m')
    p = Process(target=f,args=('bobo',))
    p.start()
---------------------
output:
main process line
module name: __main__
parent process: 9608
process id: 7188


called from child process function f
module name: __mp_main__
parent process: 7188
process id: 10232


hello bobo

这个程序run以后,会发现主函数的进程就是后台的pycharm.exe的进程id9608,而且不论运行多少次都是这个

进程间数据的交互

通过Queue和Pipe可以实现进程间数据的传递,但是不同进程内存不能实现共享

Queue

使用方法和threading里面的queue差不多

线程之间的数据共享是这样的

import threading
import queue

def func():
    q.put([22,"dean",'hello'])

if __name__=="__main__":
    q = queue.Queue()
    t = threading.Thread(target=func)
    t.start()
    print(q.get(q))
-----------------
[22, 'dean', 'hello']

进程之间的共享:

from multiprocessing import Process,Queue

def f(data):
    data.put([11,None,"Gsuhy"])

if __name__ == "__main__":
    q = Queue() #进程q
    p = Process(target=f,args=(q,))
    p.start()
    print(q.get())
-----------------------
[11, None, 'Gsuhy']

这里可以看到父进程可以调用到子程序放入的数据
这里的q其实是被克隆了一个q,然后将子线程序列化的内容传入的克隆q,然后再反序列化给q从而海鲜进程之间的数据的传递。
如果这里吧q = Queue()改成q = queue.Queue()线程q的话是不行的。

Pipe

直接上代码

from multiprocessing import Process,Pipe

def f(conn):
    conn.send([22,None,"Gsuhy"])
    conn.send([21, None, "w1mps"])
    conn.send([20, None, "mmmGsuhy"])
    print(conn.recv())
    conn.close()


if __name__ == "__main__":
    left_conn,right_conn = Pipe()
    p = Process(target=f,args=(right_conn,))
    p.start()
    print(left_conn.recv())
    print(left_conn.recv())
    print(left_conn.recv())
    left_conn.send("I am Gsuhy")
--------------------------------
[22, None, 'Gsuhy']
[21, None, 'w1mps']
[20, None, 'mmmGsuhy']
I am Gsuhy

开始Pipe()生成两个值,一个用来发送,一个用来接收,上面f函数send多少下面就可以接受多少,也可以接受少于send的数量

Manager实现不同进程间的数据共享

上代码

from multiprocessing import Process,Manager
import os

def f(d,l):
    d[1] = "1"
    d["2"] = 2
    d[0.25] = None
    l.append(os.getpid())
    print(l)

if __name__ == "__main__":
    with Manager() as manager:#这种方式等价于 manager=Manager()
        d = manager.dict() #生成一个可以在多个进程间共享的字典
        l = manager.list(range(5)) #生成一个可以在多个进程间共享的列表
        p_list = []
        for i in range(10):
            p = Process(target=f,args=(d,1))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()

        print(d)
        print(l)
-------------------------
[0, 1, 2, 3, 4, 16404]
[0, 1, 2, 3, 4, 16404, 13160]
[0, 1, 2, 3, 4, 16404, 13160, 12440]
[0, 1, 2, 3, 4, 16404, 13160, 12440, 15180]
[0, 1, 2, 3, 4, 16404, 13160, 12440, 15180, 17936]
[0, 1, 2, 3, 4, 16404, 13160, 12440, 15180, 17936, 2876]
[0, 1, 2, 3, 4, 16404, 13160, 12440, 15180, 17936, 2876, 14388]
[0, 1, 2, 3, 4, 16404, 13160, 12440, 15180, 17936, 2876, 14388, 1840]
[0, 1, 2, 3, 4, 16404, 13160, 12440, 15180, 17936, 2876, 14388, 1840, 13604]
[0, 1, 2, 3, 4, 16404, 13160, 12440, 15180, 17936, 2876, 14388, 1840, 13604, 17732]
{1: '1', '2': 2, 0.25: None}
[0, 1, 2, 3, 4, 16404, 13160, 12440, 15180, 17936, 2876, 14388, 1840, 13604, 17732]

协程

协程,又称微线程,纤程(Coroutine)。协程是一种用户态的轻量级线程。
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。
故,协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,也就是进入上一次离开时所处逻辑流的位置。

协程的好处

  • 无需线程上下文切换的开销
  • 无需原子操作锁定及同步的开销
  • 方便切换控制流,简化编程模型
  • 高并发,高扩展性,低成本:一个CPU支持上完的协程,适合高并发处理

缺点

无法利用多核资源,协程的本质是单线程,不能同时将单个CPU的多个核用上,协程需要和进程配合才能运行在多CPU上。
进行阻塞(Blocking)操作会阻塞掉整个程序

yield实现简单的协程

#!/usr/bin/python
# -*-coding:utf-8-*-
# Author:Gsuhy


import time
import queue

def consumer(name):
     print("--->starting eating baozi...")
     while True:
         new_baozi = yield
         print("[%s] is eating baozi %s" % (name, new_baozi))
         time.sleep(2)
def producer():

    r = con.__next__()
    r = con2.__next__()
    n = 0
    n += 1
    con.send(n)
    con2.send(n)
    print("\033[32;1m[producer]\033[0m is making baozi %s" % n)
if __name__ == '__main__':

    con = consumer("Gsuhy")
    con2 = consumer("嘤嘤嘤")
    p = producer()
-----------------------

--->starting eating baozi...
--->starting eating baozi...
[Gsuhy] is eating baozi 1
[嘤嘤嘤] is eating baozi 1
[producer] is making baozi 1

Gevent

Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度.

用协程gevent写一个简单并发爬网页

#!/usr/bin/python
# -*-coding:utf-8-*-
# Author:Gsuhy

from urllib import request
import gevent,time
from gevent import monkey

monkey.patch_all()
def f(url):
    print("get:%s" %url)
    resp = request.urlopen(url)
    data = resp.read()
    print("%d bytes received from %s" %(len(data),url))

urls = [
    "http://sina.com.cn",
    "http://www.cnblogs.com/",
    "https://news.cnblogs.com/"
]
time_start = time.time()
for url in urls:
    f(url)

print("同步串行cost:",time.time()-time_start)

async_time = time.time()

gevent.joinall([
    gevent.spawn(f,"http://sina.com.cn"),
    gevent.spawn(f, "http://www.cnblogs.com/"),
    gevent.spawn(f, "https://news.cnblogs.com/")
])
print("异步cost:",time.time()-async_time)
---------------------------
get:http://sina.com.cn
572279 bytes received from http://sina.com.cn
get:http://www.cnblogs.com/
48414 bytes received from http://www.cnblogs.com/
get:https://news.cnblogs.com/
78889 bytes received from https://news.cnblogs.com/
同步串行cost: 0.9885585308074951
get:http://sina.com.cn
get:http://www.cnblogs.com/
get:https://news.cnblogs.com/
572279 bytes received from http://sina.com.cn
78889 bytes received from https://news.cnblogs.com/
48414 bytes received from http://www.cnblogs.com/
异步cost: 0.4312920570373535

可以看出异步的确比同步要快一点
这里要注意一下,在开头我导入了from gevent import monkey,如果不加这个模块的话,运行这个py那么同步和异步时间看起来是差不多的。


   转载规则


《python中进程、线程、协程》 Gsuhy 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录