0%

python013

进程和线程

当今的计算机早就已经是多CPU或多核了,四核八核等等,目前的操作系统也都是支持“多任务”的操作系统。也就是说可以同时运行多个程序,也可以将一个程序分解为若干个相对独立的子任务,让多个子任务并发的执行,从而缩短程序的运行时间,让用户获得更好的体验。

概念

进程:进程是操作系统中执行的一个程序,操作系统以进程为单位分配存储空间,每个进程都有自己的地址空间、数据栈以及其他用于跟踪进程执行的辅助数据,操作系统管理所有进程的执行,为他们合理的分配资源。进程可以通过fork或spawn的方式创建新的进程来执行其他的任务,不过新的进程也有自己独立的控件,因此必须通过进程间通信机制(IPC,Inter-Process Communication)来实现数据共享,包括管道、信号、套接字、共享内存区等。
一个进程还可以拥有多个并发的执行线索,简单的说就是拥有多个可以获得CPU调度的执行单元,这就是所谓的线程。由于线程在同一进程下,它们可以共享相同的上下文,因此相对于进程而言,线程间的信息共享和通信更加容易。当然在单核CPU中,真正的并发是不可能的,因为在某个时刻能够获得CPU的只有唯一的一个线程,多个线程共享了CPU的执行时间。多线程可以提升程序的性能和改善用户体验。但是多线程因为占用了更多的CPU执行时间,导致其他程序无法获得足够的CPU执行时间,而且在开发者角度讲,编写和调试多线程程序有些困难。
Python既支持多进程又支持多线程,因此Python实现并发编程主要有三种方式:
多进程、多线程、多进程+多线程。还有协程

Python中的多进程

Unix和Linux系统有fork()函数,系统调用它来创建进程,调用fork()函数时父进程,创建出的是子进程,子进程是父进程的一个拷贝,但是子进程拥有自己的PID,fork()函数非常特殊的会返回两次,父进程中可以通过fork()函数的返回值得到子进程的PID,而子进程的返回值永远是0,Python的os模块提供了fork()函数。windows系统中要实现跨平台的多进程编程,可以使用multiprocess模块的Process类来创建子进程,而且该模块提供了更高级的封装,例如批量启动进程的进程池Pool、用于进程间通信的队列Queue和管道Pipe
例如下载文件:

    from random import random
    from time import time, sleep


    def download_task(filename):
        print("开始下载%s..." % filename)
        time_to_download = randint(5,10)
        sleep(time_to_download)
        print("%s下载完成,耗费了%d秒" % (filename, time_to_download))

    def main():
        start = time()
        download_task('python从入门到住院.pdf')
        download_task("Peking Hot.avi")
        end = time()
        print("公共耗费了%.2f秒" % (end - start))


    if __name__ == "__main__":
        main()
程序一步步的执行,只有个第一个文件下载完后才会下载下一个,显然如果有
很多文件的话,速度会很慢,我们需要他们同时下载,这就使用了多进程

多进程下载文件

from multiprocessing import Process
from os import getpid
from random import randint
from time import time, sleep

def download_task(filename):
    print("启动下载进程,进程号[%d]" % getpid())
    print("开始下载%s.." % filename)
    time_to_download = randint(1,5)
    sleep(time_to_download)
    print("%s下载完成!耗费了%d秒" % (filename, time_to_download))


def main():
    start = time()
    p1 = Process(target=download_task, args=("python.md",))
    p1.start()
    p2 = Process(target=download_task, args=("tokhot.av",))
    p2.start()
    p1.join()
    p2.join()
    end = time()
    print("总共耗费了%0.2秒" % (end - start))


if __name__ == "__main__":
    main()

Process类创建了进程对象,通过target参数传入一个函数表示进程启动后要执行的代码,后面的args是一个元组(特别注意),代表了传递给函数的参数。Process的start方法用来启动进程,而join方法表示等待进程执行结束。上面就使用了多进程,就可以同时下载两个文件了。(自行了解subprocess模块中的类和函数创建和启动子进程,通过管道和子进程进行通信)

多进程可以使多个任务同时进程,那么假如进程之间要协调通信应该如何做到呢?

from multiprocessing import Process
from time import sleep

counter = 0

def sub_task(string):
    global counter
    while counter < 10:
        print(string, end = '', flush=True)
        counter += 1
        sleep(0.01)

def main():
    p1 = Process(target=sub_task, args=("ping",))
    p1.start()
    p2 = Process(target=sub_task, args=("pong",))
    p2.start()


if __name__ == "__main__":
    main()

乍一看,程序没有任何问题,ping pong两个进程总共循环10次,是吗?其实会是
ping pong 各输出10次,这是因为,在程序中创建进程的时候,子进程复制了父进
程及其所有的额数据结构,每个子进程都有自己独立的内存空间,所以意味着两个子进程各有一个counter变量,所以各输出10次,要解决这个问题,简单的办法就是使用multiprocessing模块中的Queue类,它是可以被多个进程共享的队列,底层是通过管道和信号量(semaphore)机制实现的

Python中的多线程

Python早期的版本中引入了thread模块(现在名为_thread)实现多线程,
然而该模块过于底层,而且很多功能都没有提供,因此目前使用的多线程是
threading模块,该模块对多线程编程提供了更好的面向对象的封装。

多线程下载文件

from random import randint
from threading import Thread
from time import sleep

def download(filename):
    print("开始下载%s" % filename)
    time_to_download = randint(1,5)
    sleep(time_to_download)
    print("%s下载完成!共耗费了%d秒" % (filename, time_to_download))

def main():
    start = time()
    t1 = Thread(target=download, args=("mik.pdf",))
    t1.start()
    t2 = Thread(target=download, args=("tohot.av",))
    t2.start()
    t1.join()
    t2.join()
    end = time()
    print("总共耗费了%.3f秒" % (end - start))


if __name__ == "__main__":
    main()

可以直接使用threading模块Thread类来创建线程,但是也可以从已有的类创建新类,通过集成Thread类的方式来创建自定义的线程类,然后再创建线程对象并启动线程

from random import randint
from threading import Thread
from time import time, sleep


class DownloadTask(Thread):

    def __init__(self, filename):
        super().__init()
        self._filename = filename

    def run(self):
        print("开始下载%s.." % self._filename)
        time_to_download = randint(5, 10)
        sleep(time_to_download)
        print("%s下载完成,耗费%d秒" % (self._filename, time_to_download))


def main():
    start = time()
    t1 = DownloadTask("python.pdf")
    t1.start()
    t2 = DownloadTask("tokohot.av")
    t2.start()
    t1.join()
    t2.join()
    end = time()
    print("总共耗费了%.2f" % (end - start))

if __name__ == "__main__":
    main()

因为多个线程可以共享进程的内存空间,因此要实现多个进程间的通信相对简单,能想到的最直接的办法就是设置一个全局变量,多个线程共享这个全局变量即可。但是当多个线程共享同一个变量(称之为“资源”)的时候,很有可能产生不可控的结果从而导致程序失效甚至崩溃。如果一个资源被多个线程竞争使用,那么通常称之为“临界资源”,对临界资源的访问需要加上保护,否则资源就会处于“混乱”状态。做一个没有对临界资源进行保护的例子,(100个线程同时向一个银行账号转入一元钱,不保护的最后结果并不是100)

from time import sleep
from threading import Thread


class Account(object):

    def __init__(self):
        self._balance = 0

    def deposit(self, money):
        # 计算存款后的余额
        new_balance = self._balance + money
        # 模拟受理存款业务需要0.01秒的事件
        sleep(0.01)
        # 修改账户余额
        self._balance = new_balance

    @property
    def balance(self):
        return slef._balance


class AddMoneyThread(Thread):

    def __init__(self, account, money):
        super().__init__()
        self._account = account
        self._money = money

    def run(self):
        self._account.deposit(self._money)


def main():
    account = Account()
    threads = []
    # 创建100个存款的线程向同一个账户中存钱
    for _ in range(100):
        t = AddMoneyThread(account, 1)
        threads.append(t)
        t.start()
    # 等所有钱款的线程都执行完毕
    for t in threads:
        t.join()
    print("账户余额为:¥%d元" ¥ account.balance)

if __name__ == "__main__":
    main()

运行结果小于100,之所以出现这种情况,就是上面所说的没有对银行账号这个
临界资源进行保护。多个线程同时向账户存钱时,会一起执行new_balance = self._balance + money这行代码,多个线程得到的账户余额初始状态下都是0,
所以都是0+1,结果就远远小于100了,这种情况下,“锁”就可以派上用场了。可以通过“锁”来保护“临界资源”,只有获得“锁”的线程才能访问“临界资源”,其他没有得到“锁”的线程就只能被阻塞起来,直到获得“锁”的线程释放了”锁”,其他线程才有机会拿到“锁”访问“临界资源”

from time import sleep
from threading import Thread, Lock


class Account(object):
    def __init__(self):
        self._balance = 0
        self._lock = Lock()

    def deposit(self, money):
        # 先获取锁才能执行后续的代码
        self._lock.acquire()
        try:
            new_balance = self._balance + money
            sleep(0.01)
            self._balance = new_balance
        finally:
            # 在finally中执行释放锁的操作保证正常异常锁都能释放
            self._lock.release()

    @property
    def balance(self):
        return self._balance

class AddMoneyThread(Thread):

    def __init__(self, account, money):
        super().__init__()
        self._account = account
        self._money = money

    def run(self):
        self._account.deposit(self._money)


def main():
    account = Account()
    threads = []
    for _ in rnage(100):
        t = AddMoneyThread(account, 1)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    print("账户余额为:¥%d元" % account.balance)

if __name__ == "__main__":
    main()

但是Python的多线程并不能发挥CPU多核特性,这一点只要启动几个执行死循环的线程就可以得到证实,之所以如此,是因为Python解释器有一个“全局解释器锁”
(GIL)的东西,任何线程执行前都必须先获得GIL锁,然后没执行100条字节码,解释器就会自动释放GIL锁,让别的线程有机会执行。这是创建python的历史遗留问题,但是使用多线程在提升执行效率和改善用户体验方面还是有积极意义的。

多进程还是多线程

无论是多进程还是多线程,只要数量一多,效率肯定上不去,为什么呢?加入你正在备战中考,晚上需要做语文,数学,英语,物理,化学这5门的作业,每项作业耗费1小时,如果你先花1小时做语文,再1小时做数学,这样依次全部做完,总共5小时,这种方式成为单任务模型。如果你打算切换到多任务模型,可以先1分钟做语文,再1分钟做数学,以此类推,只要切换速度足够快,这种方式就和单核CPU执行多任务是一样的了,以旁观者的角度来看,你就正在同时写5科作业
但是切换作业是由代价的,比如从语文切换到数学,你要先收拾桌子上的语文课本钢笔等(这叫保存现场),然后打开数学课本,找到圆规直尺等(这叫准备环境)
才能开始做数学作业,操作系统在切换进程或者线程时也是一样的,需要先保存当前执行的现场环境(CPU寄存器状态,内存页等),然后把新任务的执行环境准备好(恢复上次的寄存器状态,切换内存页等)才能开始执行,这个切换过程虽然快,但也是需要耗费时间的。如果有几千个任务同时进行,操作系统就可能主要忙着切换任务,根本没有时间去执行任务了,这种情况最常见的就是硬盘狂响,点窗口无反应,系统处于假死状态。所以,多任务一旦多到一个限度,反而会使得系统性能急剧下降,最终导致所有任务都做不好。
是否采用多任务的第二个考虑是任务的类型,可以把任务分为计算密集型和I/O密集型。计算密集型任务的特点是进行大量的计算,消耗CPU资源,比如对视频进行编码解码或者格式转换等等,这种任务全靠CPU的运算能力,虽然可以用多任务完成,但是任务越多,花在切换任务的时间就越多,CPU执行任务的效率就越低下。计算密集型任务由于主要消耗CPU资源,这类任务用Python这样的脚本语言去执行效率通常很低,所以最能胜任这类任务的是C语言,所以Python中有嵌入C/C++代码的机制。
除了计算密集型任务,其他的涉及到网络、存储介质I/O的任务都可以视为I/O密集型任务,这类任务的特点就是CPU消耗很小,任务的大部分时间都在等待I/O操作完成(因为I/O的速度远远低于CPU和内存的速度)。对于I/O密集型任务,
如果启动多任务就可以减少I/O等待时间从而让CPU高效率的运转。有一大类的任务
都属于I/O密集型任务,这其中就包括了网络应用和Web应用

单线程+异步I/O

现代操作系统对I/O操作的改进中最为重要的就是支持异步I/O。如果充分利用操作
系统提供的异步I/O支持,就可以用单进程单线程模型来执行多任务,这种全新的
模型成为事件驱动模型。Nginx就是支持异步I/O的Web服务器,它在单核CPU上
采用单进程模型就可以高效的支持多任务。在多核CPU上,可以运行多个进程(数量
与CPU核心数相同),充分利用多核CPU,用Node.js开发的服务器端程序也使用了这种工作模式,这也是当下实现多任务编程的一种趋势。
在Python语言中单进程+异步I/O的编程模式称为协程,有了协程的支持,就可以
基于事件驱动编写高效的多任务程序。协程的最大优势就是极高的执行效率,因为子程序切换不是线程切换,而是由程序自身控制,因此没有切换线程的开销。协程的第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不用加锁,只需要判断状态就好了,所以执行效率比多线程高很多。如果想要充分利用CPU的多核特性,最简单的办法就是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。

应用案例

  1. 将耗时间的任务放到线程中以获得更好的的用户体验
    “下载”和“关于”两个按钮,用休眠的方式模拟点击“下载”按钮会联网下载文件需要耗费10秒,如果不使用“多线程”,会发现,当点击“下载”按钮后整个程序的其他部分都被整个耗时间的任务阻塞而无法执行了,这显然是非常糟糕的用户体验

    import time
    improt tkinter
    import tkinter.messagebox        
    def download():
        # 模拟下载任务需要花费10秒
        time.sleep(10)
        tkinter.messagebox.showinfo("提示", "下载完成!")
    def show_about():
        tkinter.messagebox.showinfo("关于", "author:bayhax")
    def main():
        top = tkinter.TK()
        top.title("单线程")
        top.geometry("200x150")
        top.wm_attributes("-topmost", True)
        panel = tkinter.Frame(top)
        button1 = tkinter.Button(panel, text="下载", command=download)
        button1.pack(side="left")
        button2 = tkinter.Button(panel, text="关于", command=download)
        button2.pack(side="right")
        panel.pack(side="bottom")
        tkinter.mainloop()
    if __name__ == "__main__":
        main()

    如果使用多线程将耗时间的任务放到一个独立的线程中执行,这样就不会因为耗时间的任务而阻塞了其他主线程

    import time
    import tkinter
    import tkinter.messagebox
    from threading import Thread
    def main():
        class DownloadTaskHandler(Thread):
            def run(self):
                time.sleep(10)
                tkinter.messagebox.showinfo("提示","下载完成")
                # 启用下载按钮
                button1.config(state=tkinter.NORMAL)
        def download():
            # 禁用下载按钮
            button1.config(state=tkinter.DISABLED)
            # 通过daemon参数将线程设置为守护进程(主程序退出就不再保留执行)
            # 在线程中处理耗时间的下载任务
            DownloadTaskHandler(daemon=True).start()
        def show_about():
            tkinter.messagebox.shorinfo("关于","author:bayhax")
        top = tkinter.TK()
        top.title("单线程")
        top.geometry("200x150")
        top.wm_attributes("-topmost", 1)
        panel = tkinter.Frame(top)
        button1 = tkinter.Button(panel, text="下载", command=download)
        button1.pack("left")
        button2 = tkinter.Button(panel, text="关于", command=download)
        button2.pack("right")
        panel.pack(side="bottom")
        tkinter.mainloop()
    if __name__ == "__main__":
        main()
  2. 使用多进程对复杂任务进行“分而治之”,计算1-100000000的计算密集型任务

    from time import time
    def main():
        total = 0
        number_list = [x for x in range(1, 100000000)]
        start = time()
        for number in number_list:
            total += number
        print(total)
        end = time()
        print("execution time %.3fs" % (end - start))
    if __name__ == "__main__":
        main()

上面的代码,故意创建了一个列表容器然后填入了100000000个数,这一步是耗时间的,所以为了公平起见,我们将这个任务分解到8个进程中去执行的时候,暂时不考虑列表切片操作花费的时间,只是把做运算和合并运算结果的时间统计出来

from multiprocessing import Process, Queur
from random import randint
from time import time
def task_handler(curr_list, result_queue):
    total = 0
    for number in curr_list:
        total += number
    result_queue.put(total)
def main():
    process = []
    number_list = [x for x in range(1, 100000000)]
    result_queue = Queue()
    index = 0
    # 启动8个进程将数据切片后进行运算
    for _ in range(8):
        p = Process(target=task_handler, args=(number_list[index:index + 12500000], result_queue))
        index += 12500000
        process.append(p)
        p.start
    # 开始记录所有进程执行完成所花费的时间
    start = time()
    for p in process:
        p.join()
    # 合并执行结果
    total = 0
    while not result_queue.empty():
        total += result_queue.get()
    print(total)
    end = time()
    print("Execution time:" (end - start), 's', sep='')
if __name__ == "__main__":
    main()

使用多进程后由于获得了更多的CPU执行时间以及更好的利用利用了CPU的多核特性,明显的减少了程序的运行时间,而且计算量越大效果越明显。还可以将多个进程部署在不同的计算机上,做成分布式进程,通过multiprocessing.managers模块中提供的管理器将Queue对象通过网络共享出来(注册到网络上让其他计算机可以访问),便可进行分布式计算。