Pythonで並行/並列処理 concurrent.futuresの紹介

concurrent.futures

Pythonで非同期実行を行うためのモジュールです。

標準ライブラリに含まれているのでインストールの必要はありません。

なお、concurrentパッケージに含まれるモジュールは現時点でfuturesのみです。

実装

マルチスレッドの場合、ThreadPoolExecutorを用います。

1秒かかる処理funcを8回実行したいとしましょう。

1
2
3
4
5
6
7
8
9
from concurrent.futures import ThreadPoolExecutor
import time
def func():
time.sleep(1)
start = time.time()
with ThreadPoolExecutor(max_workers=4) as e:
for i in range(8):
e.submit(func)
print (time.time()-start)

ThreadPoolExecutor(max_workers=4)としてワーカーを4つ作成しています。
そのため処理が8/4=2で約2秒で終わります。

では、ワーカーを6つに増やすとどうでしょうか。考えてから実行してみてください。

マルチプロセスの場合、ProcessPoolExecutorを用います。

1
2
3
4
5
6
7
8
9
from concurrent.futures import ProcessPoolExecutor
import time
def func():
time.sleep(1)
start = time.time()
with ProcessPoolExecutor(max_workers=4) as e:
for i in range(8):
e.submit(func)
print (time.time()-start)

マルチプロセスでも同様の高速化ができましたね。

今回のケースでは、速度面では違いはありません。

解説

funcと呼ばれるI/Oバウンドな処理を模擬した関数を実装しています。

I/Oバウンドとは、I/Oに負荷がかかることを意味しており、
ディスクの読み書きが遅い場合や通信の遅延が大きい場合に当たります。

I/Oバウンドに対しては、マルチスレッドとマルチプロセスはいずれも有効です。
(サンプルコードの通り)

一方、CPUバウンドな処理については、マルチスレッドによる高速化は期待できませんので使い分けに注意してください。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from concurrent.futures import ThreadPoolExecutor
import time
def func():
s = 0
for i in range(10000000):
s += 1
# 逐次処理
start = time.time()
for i in range(8):
func()
print (time.time()-start)

# マルチスレッド
start = time.time()
with ThreadPoolExecutor(max_workers=4) as e:
for i in range(8):
e.submit(func)
print (time.time()-start)

# マルチプロセス
start = time.time()
with ProcessPoolExecutor(max_workers=4) as e:
for i in range(8):
e.submit(func)
print (time.time()-start)

逐次処理とマルチスレッドの違いはあまりなく、マルチプロセスが最も速いはずです。

記事情報

  • 投稿日:2020年4月2日
  • 最終更新日:2020年5月4日