(Python)並行処理と並列処理について
並行・並列
- 並行は英語でConcurrent、並列はParallelと表現される
- 並行と並列は別ものである by RobPike
並行(Concurrent)
- 複数のスレッドを使って処理を走らせる
- スレッドは共有のメモリ領域を利用する
- 同時にいつくかの質の異なることを扱う
並列(Parallel or Parallelism)
- 複数のプロセスを使った処理
- プロセスは専用のメモリ領域を利用する
- 同時にいくつかの質の同じことを扱う
プロセス作成とスレッド作成
複数のスレッドを使った処理をする場合、同一のプロセス内でスレッドが切り替わりながら処理が進む。
Apacheの設定にあるpreforkとworkerは、プロセス、スレッドのどちらかで動かすかの違い
複数のプロセスを使って処理をする場合
- コアが1つだけのCPUであれば、やはりプロセスを切り替えながら処理が進むが
- コアが複数であったり、CPU自体が複数ある場合は、1つのコアに1つのプロセスが処理できるため、同時的に処理が進む
ライス、みそ汁、焼き魚の定食を作るとすると
1人の人間が進める場合
- 湯を沸かしている間に炊飯器をセットし、魚をやくためのグリルを温める...というように
- 作業を切り替えながら同時進行させていくのが並行処理に相当する
3人の人間が進める場合
- それぞれの品を担当して同時進行させていくのが並列処理に相当する
1人の人間が作業を切り替えながら料理を進行していくのは3人で進める場合に比べて忙しそうだがすべて自分好みの味にできる。 これは処理対象の情報を自分の中で共有できているから。
pythonではmultithread,multiprocess,asyncioを使ったプログラミングでは細やかな多重処理の制御が行えるが、 そもそも並列処理プログラミング自体がなかなか複雑になりやすい面もある
Pythonで比較的扱いやすいライブラリであるconcurrent.Futureを使ってみる
concurrent.Futureライブラリについて
Futureとは他の言語やライブラリではpromise,delayなどとも呼ばれ、 ある処理の結果が後で取得されることを前提に処理の実装が行えるようにするもの
並列処理をマルチスレッドで行いたい場合
- ThreadPoolExecutorメソッドを使用する
- スレッドを使って並列タスクを実行する
- ネットワークアクセスなどCPUに負荷がかからない処理の並列実行に適している
マルチプロセスで行いたい場合
- ProcessPoolExcutorメソッドを使用する
- プロセスを使って並列タスクを実行する
- CPUに負荷がかかる計算処理などの並列実行に適している
必要なライブラリ
pip install futures pip install requests
""" 音楽ファイルの並列ダウンロード """ import concurrent.futures from os import path from urllib import parse import requests import mylogger import shutil # ログ設定 logger = mylogger.get_my_logger(__name__) MUSIC_URLS = ['https://archive.org/download/ThePianoMusicOfMauriceRavel/01PavanePourUneInfanteDfuntePourPianoMr19.mp3', 'https://archive.org/download/ThePianoMusicOfMauriceRavel/02JeuxDeauPourPianoMr30.mp3', 'https://archive.org/download/ThePianoMusicOfMauriceRavel/03SonatinePourPianoMr40-Modr.mp3', 'https://archive.org/download/ThePianoMusicOfMauriceRavel/04MouvementDeMenuet.mp3', 'https://archive.org/download/ThePianoMusicOfMauriceRavel/05Anim.mp3'] def download(url, timeout=180): # mp3のファイル名をURLから取り出す parse_url = parse.urlparse(url) file_name = path.basename(parse_url.path) # ダウンロード開始をログ出力 logger.info("[download start] {file_name}".format(file_name=file_name)) # 音楽ファイルのダウンロード r = requests.get(url,timeout=timeout) # ダウンロードの終了ログをログ出力する logger.info("[download finished] {file_name}".format(file_name=file_name)) with open(file_name, "wb") as file: shutil.copyfileobj(r.raw, file) return if __name__ == '__main__': # 同時に2つの処理を並行実行するためのexecutorを作成 with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: ##with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor: logger.info("[main start]") # executor.submit()によりdownload関数を並行実行する。download関数の引数にmusic_urlを与えている # 並行実行処理のまとまりを futures 変数に入れておく futures = [executor.submit(download, music_url) for music_url in MUSIC_URLS] # download()関数の処理が完了したものから future 変数に格納する for future in concurrent.futures.as_completed(futures): # download関数の実行結果を resultメソッドで取り出す music = future.result() logger.info("[main finished]")
""" loggerは別パッケージ """ import logging from logging import getLogger, StreamHandler, Formatter def get_my_logger(name): # loggerオブジェクトの宣言 logger = getLogger(name) # loggerのログレベル設定(ハンドラに渡すエラーメッセージのレベル) logger.setLevel(logging.DEBUG) # handlerの生成 stream_handler = StreamHandler() # handlerのログレベル設定(ハンドラが出力するエラーメッセージのレベル) stream_handler.setLevel(logging.DEBUG) # ログ出力フォーマット設定 handler_format = Formatter('[%(levelname)s]\t%(asctime)s\tprocess:%(process)d\tthread:%(thread)d\tmodule:%(module)s\t%(pathname)s:%(lineno)d\t%(message)s') stream_handler.setFormatter(handler_format) # loggerにhandlerをセット logger.addHandler(stream_handler) return logger
- 出力結果(ThreadPoolExecutor)
[INFO] 2018-11-12 11:07:23,600 process:17648 thread:14680 module:multi2 multi2.py:50 [main start] [DEBUG] 2018-11-12 11:07:23,601 process:17648 thread:16028 module:multi2 multi2.py:33 [download start] sleep: 2.0 01PavanePourUneInfanteDfuntePourPianoMr19.mp3 [DEBUG] 2018-11-12 11:07:23,601 process:17648 thread:15308 module:multi2 multi2.py:33 [download start] sleep: 3.5 02JeuxDeauPourPianoMr30.mp3 [INFO] 2018-11-12 11:07:53,104 process:17648 thread:16028 module:multi2 multi2.py:39 [download finished] 01PavanePourUneInfanteDfuntePourPianoMr19.mp3 [DEBUG] 2018-11-12 11:07:53,105 process:17648 thread:16028 module:multi2 multi2.py:33 [download start] sleep: 1.5 03SonatinePourPianoMr40-Modr.mp3 [INFO] 2018-11-12 11:07:54,921 process:17648 thread:15308 module:multi2 multi2.py:39 [download finished] 02JeuxDeauPourPianoMr30.mp3 [DEBUG] 2018-11-12 11:07:54,923 process:17648 thread:15308 module:multi2 multi2.py:33 [download start] sleep: 2.0 04MouvementDeMenuet.mp3 [INFO] 2018-11-12 11:08:05,049 process:17648 thread:16028 module:multi2 multi2.py:39 [download finished] 03SonatinePourPianoMr40-Modr.mp3 [DEBUG] 2018-11-12 11:08:05,050 process:17648 thread:16028 module:multi2 multi2.py:33 [download start] sleep: 2.0 05Anim.mp3 [INFO] 2018-11-12 11:08:13,564 process:17648 thread:15308 module:multi2 multi2.py:39 [download finished] 04MouvementDeMenuet.mp3 [INFO] 2018-11-12 11:08:17,042 process:17648 thread:16028 module:multi2 multi2.py:39 [download finished] 05Anim.mp3 [INFO] 2018-11-12 11:08:17,043 process:17648 thread:14680 module:multi2 multi2.py:61 [main finished]
ログの時刻に注目すると、最初に2つのファイルのダウンロードが開始され、 そのうち1つ終了すると、次のファイルのダウンロードが開始される
- 出力結果(ProcessPoolExecutor)
[INFO] 2018-11-12 11:09:59,411 process:14408 thread:13500 module:multi2 multi2.py:50 [main start] [DEBUG] 2018-11-12 11:09:59,553 process:16508 thread:948 module:multi2 multi2.py:33 [download start] sleep: 3.5 01PavanePourUneInfanteDfuntePourPianoMr19.mp3 [DEBUG] 2018-11-12 11:09:59,553 process:16928 thread:1588 module:multi2 multi2.py:33 [download start] sleep: 2.5 02JeuxDeauPourPianoMr30.mp3 [INFO] 2018-11-12 11:10:23,491 process:16508 thread:948 module:multi2 multi2.py:39 [download finished] 01PavanePourUneInfanteDfuntePourPianoMr19.mp3 [DEBUG] 2018-11-12 11:10:23,493 process:16508 thread:948 module:multi2 multi2.py:33 [download start] sleep: 2.5 03SonatinePourPianoMr40-Modr.mp3 [INFO] 2018-11-12 11:10:34,172 process:16928 thread:1588 module:multi2 multi2.py:39 [download finished] 02JeuxDeauPourPianoMr30.mp3 [DEBUG] 2018-11-12 11:10:34,173 process:16928 thread:1588 module:multi2 multi2.py:33 [download start] sleep: 2.0 04MouvementDeMenuet.mp3 [INFO] 2018-11-12 11:10:37,539 process:16508 thread:948 module:multi2 multi2.py:39 [download finished] 03SonatinePourPianoMr40-Modr.mp3 [DEBUG] 2018-11-12 11:10:37,541 process:16508 thread:948 module:multi2 multi2.py:33 [download start] sleep: 2.5 05Anim.mp3 [INFO] 2018-11-12 11:10:46,934 process:16508 thread:948 module:multi2 multi2.py:39 [download finished] 05Anim.mp3 [INFO] 2018-11-12 11:10:47,823 process:16928 thread:1588 module:multi2 multi2.py:39 [download finished] 04MouvementDeMenuet.mp3 [INFO] 2018-11-12 11:10:47,824 process:14408 thread:13500 module:multi2 multi2.py:61 [main finished]
- ProcessPoolExecutorはプロセス間によって実現しているため、いつくか制限がある
補足
- pickleモジュールはPythonのオブジェクトを直列化・非直列化するための機能を提供している 直列化(Serialize)というのはプログラミング言語においてオブジェクトをバイト配列などの表現に変換すること
参考URL
- https://www.slideshare.net/takuyaueda967/go-77689475
- https://qiita.com/mimitaro/items/9fa7e054d60290d13bfc
- https://qiita.com/kokumura/items/2e3afc1034d5aa7c6012
- https://blog.amedama.jp/entry/2015/12/05/132520
- https://blog.amedama.jp/entry/2016/02/15/223227
- https://moro-archive.hatenablog.com/entry/2014/09/11/013520