ごく簡単な Python multiprocessing の使い方の話です。

multiprocessing before/after

例えば次のようなコードがあります。

import time


def calc(x):
    a = x * 2
    print(a)
    time.sleep(2)

if __name__ == '__main__':
    data = [1, 2, 3, 4, 5]

    [calc(x) for x in data]

この処理を、後から並列で実行したくなったとします。

import time
import multiprocessing


def worker(data):
    [calc(x) for x in data]


def calc(x):
    a = x * 2
    print(a)
    time.sleep(2)

if __name__ == '__main__':
    split_data = [[1, 2, 3], [4, 5]]

    jobs = []
    for data in split_data:
        job = multiprocessing.Process(target=worker, args=(data, ))
        jobs.append(job)
        job.start()

    [job.join() for job in jobs]

    print('Finish')

このコードは、子プロセスを2つ生成して処理を実行します。

処理対象のリスト = data を2分割したものが split_data です。分割は CPU のコア数などに応じて動的に行われるべきですが、本題ではないので手動で分割しています。for文の中身はごく簡単な multiprocessing の使い方です。target で指定している worker() は Wrapper的な役割を持たせたメソッドで、新規に追加したものです。

worker() 経由で実行すると calc() に変更を加えなくて良いため、お手軽感があります。

multiprocessingの間違いあるある

join() を省略するとどうなるでしょうか。

for data in split_data:
    job = multiprocessing.Process(target=worker, args=(data, ))
    jobs.append(job)
    job.start()

print('Finish')

並列処理が完了する前に print() が実行されてしまい、後続の処理がある場合問題になります。

join() の位置を以下のようにするとどうなるでしょうか。

for data in split_data:
    job = multiprocessing.Process(target=worker, args=(data, ))
    job.start()
    job.join()

print('Finish')

start() したプロセスの完了をfor文の中で待つことになるので、シングルプロセスでの実行と同じになってしまいます。

まとめ

  • multiprocessing で直列に行っていた繰り返し処理を簡単に並列化できます
  • join() は適切に行いましょう