web-technical-blog

web開発に関する技術メモ

(Python)並行処理と並列処理について

並行・並列

  • 並行は英語でConcurrent、並列はParallelと表現される
  • 並行と並列は別ものである by RobPike

並行(Concurrent)

  • 複数のスレッドを使って処理を走らせる
  • スレッドは共有のメモリ領域を利用する
  • 同時にいつくかの質の異なることを扱う

並列(Parallel or Parallelism)

  • 複数のプロセスを使った処理
  • プロセスは専用のメモリ領域を利用する
  • 同時にいくつかの質の同じことを扱う

f:id:yoshitachi:20181112114605p:plain

プロセス作成とスレッド作成

f:id:yoshitachi:20181112114222p:plain

複数のスレッドを使った処理をする場合、同一のプロセス内でスレッドが切り替わりながら処理が進む。

Apacheの設定にあるpreforkとworkerは、プロセス、スレッドのどちらかで動かすかの違い

複数のプロセスを使って処理をする場合

  • コアが1つだけのCPUであれば、やはりプロセスを切り替えながら処理が進むが
  • コアが複数であったり、CPU自体が複数ある場合は、1つのコアに1つのプロセスが処理できるため、同時的に処理が進む

ライス、みそ汁、焼き魚の定食を作るとすると

  • 1人の人間が進める場合

    • 湯を沸かしている間に炊飯器をセットし、魚をやくためのグリルを温める...というように
    • 作業を切り替えながら同時進行させていくのが並行処理に相当する
  • 3人の人間が進める場合

    • それぞれの品を担当して同時進行させていくのが並列処理に相当する

1人の人間が作業を切り替えながら料理を進行していくのは3人で進める場合に比べて忙しそうだがすべて自分好みの味にできる。 これは処理対象の情報を自分の中で共有できているから。

pythonではmultithread,multiprocess,asyncioを使ったプログラミングでは細やかな多重処理の制御が行えるが、 そもそも並列処理プログラミング自体がなかなか複雑になりやすい面もある

Pythonで比較的扱いやすいライブラリであるconcurrent.Futureを使ってみる

concurrent.Futureライブラリについて

Futureとは他の言語やライブラリではpromise,delayなどとも呼ばれ、 ある処理の結果が後で取得されることを前提に処理の実装が行えるようにするもの

並列処理をマルチスレッドで行いたい場合

  • ThreadPoolExecutorメソッドを使用する
    • スレッドを使って並列タスクを実行する
    • ネットワークアクセスなどCPUに負荷がかからない処理の並列実行に適している

マルチプロセスで行いたい場合

  • ProcessPoolExcutorメソッドを使用する
    • プロセスを使って並列タスクを実行する
    • CPUに負荷がかかる計算処理などの並列実行に適している

必要なライブラリ

pip install futures
pip install requests
"""
音楽ファイルの並列ダウンロード
"""
import concurrent.futures
from os import path
from urllib import parse
import requests
import mylogger
import shutil

# ログ設定
logger = mylogger.get_my_logger(__name__)

MUSIC_URLS = ['https://archive.org/download/ThePianoMusicOfMauriceRavel/01PavanePourUneInfanteDfuntePourPianoMr19.mp3',
              'https://archive.org/download/ThePianoMusicOfMauriceRavel/02JeuxDeauPourPianoMr30.mp3',
              'https://archive.org/download/ThePianoMusicOfMauriceRavel/03SonatinePourPianoMr40-Modr.mp3',
              'https://archive.org/download/ThePianoMusicOfMauriceRavel/04MouvementDeMenuet.mp3',
              'https://archive.org/download/ThePianoMusicOfMauriceRavel/05Anim.mp3']

def download(url, timeout=180):
    # mp3のファイル名をURLから取り出す
    parse_url = parse.urlparse(url)
    file_name = path.basename(parse_url.path)

    # ダウンロード開始をログ出力
    logger.info("[download start] {file_name}".format(file_name=file_name))

    # 音楽ファイルのダウンロード
    r = requests.get(url,timeout=timeout)

    # ダウンロードの終了ログをログ出力する
    logger.info("[download finished] {file_name}".format(file_name=file_name))

    with open(file_name, "wb") as file:
        shutil.copyfileobj(r.raw, file)

    return

if __name__ == '__main__':
    # 同時に2つの処理を並行実行するためのexecutorを作成
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    ##with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        logger.info("[main start]")

        # executor.submit()によりdownload関数を並行実行する。download関数の引数にmusic_urlを与えている
        # 並行実行処理のまとまりを futures 変数に入れておく
        futures = [executor.submit(download, music_url) for music_url in MUSIC_URLS]

        # download()関数の処理が完了したものから future 変数に格納する
        for future in concurrent.futures.as_completed(futures):
            # download関数の実行結果を resultメソッドで取り出す
            music = future.result()

        logger.info("[main finished]")
"""
loggerは別パッケージ
"""
import logging
from logging import getLogger, StreamHandler, Formatter

def get_my_logger(name):
    # loggerオブジェクトの宣言
    logger = getLogger(name)

    # loggerのログレベル設定(ハンドラに渡すエラーメッセージのレベル)
    logger.setLevel(logging.DEBUG)

    # handlerの生成
    stream_handler = StreamHandler()

    # handlerのログレベル設定(ハンドラが出力するエラーメッセージのレベル)
    stream_handler.setLevel(logging.DEBUG)

    # ログ出力フォーマット設定
    handler_format = Formatter('[%(levelname)s]\t%(asctime)s\tprocess:%(process)d\tthread:%(thread)d\tmodule:%(module)s\t%(pathname)s:%(lineno)d\t%(message)s')
    stream_handler.setFormatter(handler_format)

    # loggerにhandlerをセット
    logger.addHandler(stream_handler)

    return logger
  • 出力結果(ThreadPoolExecutor)
[INFO]  2018-11-12 11:07:23,600 process:17648   thread:14680    module:multi2   multi2.py:50    [main start]
[DEBUG] 2018-11-12 11:07:23,601 process:17648   thread:16028    module:multi2   multi2.py:33    [download start] sleep: 2.0 01PavanePourUneInfanteDfuntePourPianoMr19.mp3
[DEBUG] 2018-11-12 11:07:23,601 process:17648   thread:15308    module:multi2   multi2.py:33    [download start] sleep: 3.5 02JeuxDeauPourPianoMr30.mp3
[INFO]  2018-11-12 11:07:53,104 process:17648   thread:16028    module:multi2   multi2.py:39    [download finished] 01PavanePourUneInfanteDfuntePourPianoMr19.mp3
[DEBUG] 2018-11-12 11:07:53,105 process:17648   thread:16028    module:multi2   multi2.py:33    [download start] sleep: 1.5 03SonatinePourPianoMr40-Modr.mp3
[INFO]  2018-11-12 11:07:54,921 process:17648   thread:15308    module:multi2   multi2.py:39    [download finished] 02JeuxDeauPourPianoMr30.mp3
[DEBUG] 2018-11-12 11:07:54,923 process:17648   thread:15308    module:multi2   multi2.py:33    [download start] sleep: 2.0 04MouvementDeMenuet.mp3
[INFO]  2018-11-12 11:08:05,049 process:17648   thread:16028    module:multi2   multi2.py:39    [download finished] 03SonatinePourPianoMr40-Modr.mp3
[DEBUG] 2018-11-12 11:08:05,050 process:17648   thread:16028    module:multi2   multi2.py:33    [download start] sleep: 2.0 05Anim.mp3
[INFO]  2018-11-12 11:08:13,564 process:17648   thread:15308    module:multi2   multi2.py:39    [download finished] 04MouvementDeMenuet.mp3
[INFO]  2018-11-12 11:08:17,042 process:17648   thread:16028    module:multi2   multi2.py:39    [download finished] 05Anim.mp3
[INFO]  2018-11-12 11:08:17,043 process:17648   thread:14680    module:multi2   multi2.py:61    [main finished]

ログの時刻に注目すると、最初に2つのファイルのダウンロードが開始され、 そのうち1つ終了すると、次のファイルのダウンロードが開始される

  • 出力結果(ProcessPoolExecutor)
[INFO]  2018-11-12 11:09:59,411 process:14408   thread:13500    module:multi2   multi2.py:50    [main start]
[DEBUG] 2018-11-12 11:09:59,553 process:16508   thread:948      module:multi2   multi2.py:33 [download start] sleep: 3.5 01PavanePourUneInfanteDfuntePourPianoMr19.mp3
[DEBUG] 2018-11-12 11:09:59,553 process:16928   thread:1588     module:multi2   multi2.py:33 [download start] sleep: 2.5 02JeuxDeauPourPianoMr30.mp3
[INFO]  2018-11-12 11:10:23,491 process:16508   thread:948      module:multi2   multi2.py:39 [download finished] 01PavanePourUneInfanteDfuntePourPianoMr19.mp3
[DEBUG] 2018-11-12 11:10:23,493 process:16508   thread:948      module:multi2   multi2.py:33 [download start] sleep: 2.5 03SonatinePourPianoMr40-Modr.mp3
[INFO]  2018-11-12 11:10:34,172 process:16928   thread:1588     module:multi2   multi2.py:39 [download finished] 02JeuxDeauPourPianoMr30.mp3
[DEBUG] 2018-11-12 11:10:34,173 process:16928   thread:1588     module:multi2   multi2.py:33 [download start] sleep: 2.0 04MouvementDeMenuet.mp3
[INFO]  2018-11-12 11:10:37,539 process:16508   thread:948      module:multi2   multi2.py:39 [download finished] 03SonatinePourPianoMr40-Modr.mp3
[DEBUG] 2018-11-12 11:10:37,541 process:16508   thread:948      module:multi2   multi2.py:33 [download start] sleep: 2.5 05Anim.mp3
[INFO]  2018-11-12 11:10:46,934 process:16508   thread:948      module:multi2   multi2.py:39 [download finished] 05Anim.mp3
[INFO]  2018-11-12 11:10:47,823 process:16928   thread:1588     module:multi2   multi2.py:39 [download finished] 04MouvementDeMenuet.mp3
[INFO]  2018-11-12 11:10:47,824 process:14408   thread:13500    module:multi2   multi2.py:61    [main finished]
  • ProcessPoolExecutorはプロセス間によって実現しているため、いつくか制限がある
    • 関数の引数および戻り値は、pickleを使ってシリアライズ可能なオブジェクトでなければいけない
    • 関数自体もプロセス間で渡さなければならない。インスタンスメソッドもNG。ラムダ式はOK
    • 関数の中で副作用としてグローバル変数を書き換えるなどしても、呼び元のプロセスには反映されない
補足
  • pickleモジュールはPythonのオブジェクトを直列化・非直列化するための機能を提供している 直列化(Serialize)というのはプログラミング言語においてオブジェクトをバイト配列などの表現に変換すること
参考URL