Python3 - 开始python编程(十五)

Python3 - 开始python编程(十五)

上一篇文章中,我们介绍了多线程。在这里,我们将介绍 Python 提供的另一个名为”multiprocessing”的库。像“线程”一样,“多进程”允许我们同时运行代码。但是,此代码可在多个处理器上运行。让我们从定义开始。

  • 全局解释器锁定(GIL)-用于确保 locked 时,一次只能在一个线程上运行 Python 字节码。如果为 released,则允许使用多个内核进行处理。

到目前为止,我们已经能够避免这种复杂性,因为我们只处理单核计算。 GIL 对于确保并发访问的安全至关重要;用外行的话来说,多个任务同时运行不会引起问题。

在前面的示例中,我们最终同时在控制台上打印了两行。通过确保一次只能将一个进程写入控制台,可以使用 GIL 清除此问题。

尽管 GIL 致力于保护我们,但它限制了我们使用多核进行并行处理的能力,从而导致性能问题。这并不意味着“从不使用多重处理”,而只是意味着我们应该在完全知道我们希望代码如何执行时才使用多重处理。

多进程

当我们执行基于 CPU 的计算时,最好使用”multiprocessing”。 CPU 计算可能是比特币挖掘(缓慢的方式),或者是为一些极端示例创建了彩虹表。一些更实际的示例是照片和视频编辑,音乐创作或创建哈希表(数据挖掘的东西)。

这是您从上一篇文章中发现的示例,该示例稍作更改即可用于多进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import multiprocessing
import time


numbers = range(100000, 1000000, 10000)


def task(number):
    print(f"Executing new Task with process {multiprocessing.current_process().pid}")

    result = 0
    for i in range(number):
        result = result + i

    print(f"Result: {result}")
    print(f"Task Executed with process {multiprocessing.current_process().pid}")


def main():
    executor = multiprocessing.Pool(processes=1)
    executor.map(task, numbers)
    executor.close()


if __name__ == "__main__":
    start_time = time.time()
    main()

    print("\n\nCOMPLETE\n--- Took %s seconds ---" % (time.time() - start_time))

除了导入threadingconcurrent.futures,我们只需要导入multiprocessing。接下来发生的变化是我使用了 multiprocessing.current_process()。pid 而不是 threading.active_count()。name [-3:]。这将获得从我们的主流程中纺出的流程的流程 ID,并将其报告回控制台,因此,如果以后需要终止它,我们知道在哪里可以找到我们的流程。我们的“任务”功能中的所有其他内容都是相同的。

main中,我们更改了执行器,该执行器曾作为ThreadPoolExecutor的实例,但将其更改为multiprocessing.Pool()。在这里,我们指定仅应使用一个过程。如果我将其留空,它将使用 os.cpu_count()返回机器中可用的 CPU 总数。

像线程一样,我使用map将每个number映射到它自己的task。如果我的任务多于进程,则它们将排队等待上一个任务完成。最后,我添加了 executor.close(),因为我有一个失控的过程。 close()清理池并确保其正确停止。

在“如果name ==”main"”中,我们删除了”current_threads = threading.active_count()”行,而没有用等效的行代替它来进行多进程,即”current_children = multiprocessing.active_children()”,因为我们的执行器将阻塞主线程直到完成,我们可以使用更真实的apply_async调用(类似于start)来运行它,但是出于我们的目的,我们在这里不需要它。

我也像以前一样继续使用“时间”来检查执行运行时间。

这是测试多个过程的结果:

使用的进程总数的运行时间(以秒为单位)。

与线程处理相比,这里的运行时间更好。并非总是如此,因此请务必先运行自己的测试,然后再决定。我们的时间是 3. 05 秒。以前,我们使用 50 个线程达到 3. 09。这必须意味着多进程知道如何利用多个线程。

像线程处理一样,当我们使用两个进程时,我们看到了最大的飞跃。在达到 4 个核心之前,我们还会看到其他明显的下降。在 4 到 7 个流程之间,我们看不出太大的区别;但是,在 8 个流程中,我们看到另一个明显的下降。这种差异的缺乏可能是由于我的计算机做了其他事情(哎呀,写这篇文章),它们已经通过 Chrome,Safari,PyCharm(用于代码质量)和 Mail 占用了 CPU 时间,更不用说所有这些东西了。在后台运行。

我添加了 10 和 16 个进程,以显示当您使用的进程多于 CPU 时会发生什么。我们看到这个数字回升,而不是改善。

您可能想要添加这样的代码,以防止将代码放置在可能不知道有多少 CPU 的系统上。

我知道还有其他方法,可以使用elif或三元运算符,但这很简单,而且可以正常工作。 os.cpu_count()可能很昂贵,这就是为什么我将其存储在变量中。这样,我不需要每次进行比较或分配时都要求 Python 查找 CPU 计数。


由于我们有机会一次将两个不同的进程打印到控制台,因此我们可能希望在程序中包含锁。

我将从上面使用相同的示例来说明锁的工作方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import multiprocessing
import time

numbers = range(100000, 1000000, 10000)
lock = multiprocessing.Lock()


def task(number):
    global lock
    print(f"Executing new Task with process {multiprocessing.current_process().pid}")

    result = 0
    for i in range(number):
        result = result + i

    lock.acquire()
    try:
        print(f"Result: {result}")
        print(f"Task Executed with process {multiprocessing.current_process().pid}")

    finally:
        lock.release()


def main():
    executor = multiprocessing.Pool(8)
    executor.map(task, numbers)
    executor.close()


if __name__ == "__main__":
    start_time = time.time()
    main()

    print("\n\nCOMPLETE\n--- Took %s seconds ---" % (time.time() - start_time))

    exit(0)

“锁”的作用是获得控制台进程上的锁。锁定处于活动状态时,没有其他进程可以使用此锁定。编辑文件时会使用“锁定”,并且我确定您已经看到警告,some user已打开some file,请要求他们关闭它或稍后重试。这与我们在此处讨论的锁相同。锁定可防止其他任何人访问控制台,直到该进程释放了控制台上的锁定。

锁确实会导致我们的运行时间增加,但不会增加太多。在此示例中,我的运行时间从 0.87 秒增加到 1.01 秒。我需要约 0.2 秒的时间来确保数据输出的安全性。

共享

有时我们需要在流程之间共享数据。有几种方法可以做到这一点,但这并不是那么简单。刚开始进行多进程时,我看到了许多新的错误消息,这些消息似乎很神秘,因为我不熟悉套接字编程。尽管这超出了本教程的范围,但我将尝试使其足够简单,以便您可以在进程之间共享数据。您可以自行学习高级知识。

在这里,我们有上一篇线程文章的修改示例。我们从一些直接导入开始:”Process”,”Queue”和”current_process”。

我稍微修改了我们的count函数。在详细介绍之前,我想介绍一下多进程队列。队列也存在于Threading库中;但是,共享值并没有那么简单的方法,因为进程在单独的内存空间中运行。这是队列起作用的地方。您将队列附加到流程,以允许将值从一个流程传输到另一个流程。

要将值插入队列,我们 ​​ 使用queue.put(item)。要获取值,我们使用”queue.get()”。排队工作的方式是先进先出或 FIFO,这意味着您将“放入”队列中的第一个项目是您第一次调用”get()”时将获得的东西。

回到我们的例子。 “count”以前需要四个参数。因为 current_process()。name 返回的是 process-1,process-2 等,所以我们可以忽略传递当前的进程计数。其他所有内容都是相同的,直到我们将结果放入队列中。

我们程序的第一步是创建一个“队列”,我们可以使用它在进程之间共享数据。接下来,我们创建一个空列表来保存我们的结果和过程。最后,我们使用 process_count 来创建一个变量,以保存要用于此计算的进程数。我们这样做的目的是,如果我们需要加强工作或执行某些流程,则可以在一处完成所有工作,以轻松进行更新。

我们通过传递“队列”而不是“结果”列表来更新创建新流程的方式。在第二个循环中启动每个进程仍然与之前相同。在第三个循环中,我们进行了一些更改。在使用 process.join()之前,我们需要从队列中取出每个值并将其存储在结果列表中。

最后,我们使用”queue.close()”来确保自己清理后,然后通过打印出长度“结果”列表中的第一个元素。

异步方法的差异

现在,我们已经介绍了可用的各种异步选项,下面让我们看一下最重要的方面:何时使用它们。

线程

  • 数据可以在线程之间轻松共享。
  • 当您受 I / O 限制(例如磁盘读/写)但连接很多时,最适合使用。
  • 仅使用一个 CPU 内核。

多进程

  • 使用多个 CPU 内核。
  • 能够一次写入许多磁盘。
  • 当您需要非常快速地执行许多计算时最好。
  • 除非另有说明,否则在其他内核中仅使用一个线程。 (如果您没有计划好的设计,代码很快就会变得混乱)
  • 进程之间共享数据很困难。

异步

  • 依赖线程,但是线程将任务委托给系统以执行工作和委托任务产生的结果。
  • 与多进程程序配对时,可以有效地将所有内核与多个线程委派给系统使用。
  • 最适用于 I / O 速度慢但需要许多连接(例如 Web 服务器或 Web 搜寻器)的情况。
  • 由于它依赖于线程,因此不适合基于 CPU 的任务。

摘要

今天,我们了解了多进程以及如何将其用于快速执行计算。我们做了更多的数据科学工作,并看到在示例中从线程切换到多进程时,它大大减少了处理时间,这是基于 CPU 的任务。

我们还研究了如何在流程之间共享数据。虽然看起来很容易,因为这里有示例,但是我第一次尝试使用多进程时花了一些时间才使它正确。当您编写更多的多进程代码时,它将变得更加容易。所以继续练习。

最后,我们回顾了编写异步代码的所有不同方法,并查看了一些技巧来帮助我们记住哪种工具最适合工作。

建议阅读

来自 Python 文档的多进程。

多进程—基于进程的并行性— Python 3.7.4rc1 文档

Python Wiki 上的 Global Interpreter Lock。

GlobalInterpreterLock-Python Wiki

下一步是什么?

毕竟,我们的主题不足。我们已经介绍了 Python 教程中的大部分内容,还有一些未包含的内容。我已经决定不想覆盖标准库。在您编写代码时自然会发生这种情况,如果这样做的话会使您超负荷。

我们确实需要涵盖所有编程语言的一个关键方面。测试!

是的,您可以编写程序而无需编写测试,并且该程序可以工作(请参见此处的先前示例)。到目前为止,测试是您为应用程序要做的最重要的事情。

但是,让我们面对现实吧,即使您的应用程序旨在执行某些有趣的事情,但如果它无法完成任何事,则完全没有用。因此,暂时不要发布您的应用。测试一下。

Rating: