DEV Community

Fomalhaut Weisszwerg
Fomalhaut Weisszwerg

Posted on

1

concurrent.futures を使った並列処理

現在所属している FLINTERS が 2024 年 1 月で 10 周年らしく、ブログリレー企画が立ったので久しぶりに日本語オンリー記事を書くことにしました。

ブログリレー 98 日目らしいです。

1. Python 標準の並列処理モジュール

Python 標準モジュールには並列処理に関連した複数のモジュールが存在します。
よく知られているものとしては

  • _thread モジュール

    • マルチスレッドを実装するためのモジュール
    • Python 2 における thread モジュールとほぼ同じ
    • 直接使われることはほぼない
  • threading モジュール

    • マルチスレッドを実装するためのモジュール
    • 低レイヤー部分をラップしてある程度使いやすくしたモジュール
    • I/O 待ちが多いプログラムの効率化などで稀に使われることがある
  • _multiprocessing モジュール

    • 直接使うことはまずない
  • multiprocessing モジュール

    • マルチプロセスを実装するためのモジュール
    • 低レイヤー部分をラップしてある程度使いやすくしたモジュール
    • 標準モジュールの中では一番使用されることが多い
  • concurrent.futures モジュール

    • Future パターンに特化したモジュール
    • 今回の主題

2. subprocess モジュールはどうなの?

Python 3.10 からデフォルトでは内部処理で vfork() システムコールが使用されるようになりました。vfork() システムコールは子プロセスの処理が終了するまで親プロセスの動作を停止させます。

このため subprocess は並列処理の実装にはあまり向かないモジュールになったと言えます。

3. concurrent.futures モジュール

3.1. concurrent.futures モジュールって何?

Future パターン に特化したモジュールです。

threading, multiprocess をさらにラップして、並列処理をより簡単に実装できるようにしています。

concurrent.futures モジュールを利用した実装は OS ごとの差異をほとんど気にする必要がないので、プラットフォームポータビリティが高くメンテコストを抑えたコードを書くことが可能です。

スレッドとプロセスをほぼ同じように扱えるので、マルチスレッドとマルチプロセスどちらにしたほうがより効率的になるのかを実際に動かして比較検討するといったことも容易にできます。

また、 concurrent.futures モジュールを活用することで分散並列処理 (distributed computing) も可能になります。実際に
dask.distributed
というライブラリは concurrent.futuresdask を拡張することで実装されています。

3.2. 実装の流れ

concurrent.future を使った実装は

  1. 関数またはメソッドを実装
  2. Executor に関数またはメソッドを渡して実行させる
  3. 結果を評価

という流れになります。

3.3. 実装例

GitHub topics と dev.to rss を HTTP GET してブラウザで開くだけの単純なプログラムです。

"""
Sample with `concurrent.futures` module.
"""

import concurrent.futures
import multiprocessing
import tempfile
import webbrowser

import httpx

class SampleExecutor:
    def __init__(self) -> None:
        self.list_rss = []
        self.executor = concurrent.futures.ProcessPoolExecutor(
            max_workers=int(multiprocessing.cpu_count() / 4),
            # In Python 3.14, `ProcessPoolExecutor` should specify `multiprocessing.get_context("fork")`
            # for `mp_context` explicitly. See:
            # https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor
            mp_context=multiprocessing.get_context("fork"),
        )

    def precede_get_rss(self, urls: list[str]) -> None:
        """
        method:
            precede_get_rss(URLs)

        description:
            Get RSS from URLs.
        """
        self.futures = [self.executor.submit(httpx.get, url) for url in urls]

    def postprocess_get_rss(self) -> None:
        """
        method:
            postprocess_get_rss()

        description:
            Postprocess results of HTTP GET.
        """
        for future in concurrent.futures.as_completed(self.futures):
            if future.exception() is None:
                http_response = future.result()
                if http_response.status_code == httpx.codes.OK:
                    self.list_rss.append(http_response.text)

    def _open_default_browser(self) -> None:
        """
        method:
            _open_default_browser()

        description:
            Open RSS in the default browser.

        note:
            - On Windows, this method might not work correctly...
        """
        for rss_data in self.list_rss:
            with tempfile.NamedTemporaryFile(mode="wt", delete=False) as temp_file:
                temp_file.write(rss_data)
                temp_file.flush()

            webbrowser.open(temp_file.name)


if __name__ == "__main__":
    target_urls = ("https://github.com/topics/rss", "https://dev.to/t/rss")

    sample_executor = SampleExecutor()
    sample_executor.precede_get_rss(target_urls)
    sample_executor.postprocess_get_rss()
    sample_executor._open_default_browser()
Enter fullscreen mode Exit fullscreen mode

precede_get_rss() メソッド内部で concurrent.futures を使ってレスポンスを待つことなく複数の URL へアクセスしています。

Future パターンでは結果処理を任意のタイミングにできるので、URL へのアクセスに使用したメソッドとは異なる postprocess_get_rss() に分けています。

上の例では precede_get_rss の直後に postprocess_get_rss を呼び出していますが、この 2 つのメソッドの間に全く別の処理を挟んだとしても何も問題はありません。

3.4. submit() or map()

Executor にタスクを渡すメソッドとして submit()map() がありますが、これらはどうやって使い分けたらいいのでしょうか?

個人的には次のように考えています

  • submit()

    • すべてのタスクを同時に開始する必要がないとき
    • on demand でプールにタスクを入れていきたいとき
  • map()

    • 複数のタスクの開始をなるべく同時にしたいとき
    • すべてのタスクの完了を待つ必要があるとき

3.5. as_completed() or wait()

submit() メソッドは即座に Future オブジェクトを返しますが、オブジェクトが返された時点ではタスクが完了していないことがほとんどです。

結果を処理するためにはタスクの完了を待つ必要がありますが、タスクが完了するまで待つための関数として as_completed()wait() の 2 つが用意されています。

as_completed()wait() はどのように使い分ければいいでしょうか?

個人的には下記のように使い分けています

  • as_completed()

    • タスクの追加順序を考慮する必要がない場合
    • 複数のタスクの結果を同時に使用する必要がない場合
    • 完了したタスクの結果から順に処理したい場合
  • wait()

    • すべてのタスクの完了を待ってから次に進みたい場合
    • タスクのいずれか一つでもエラーが発生したら即座にエラーハンドリングしたい場合

3.6. 問題が発生した場合にプロセスを停止させるには?

concurrent.futures.ProcessPoolExecutor を使用したマルチプロセスプログラムの欠点として Future.cancel() メソッドでは「実行中のプロセスはキャンセルできない」ということが挙げられます。

デッドロックが発生してしまった場合に、そのプロセスに対応する Future オブジェクトの cancel() を呼び出してもプロセスを終了させることはできません。

プール内のプロセスでデッドロックが発生してしまった場合は ProcessPoolExecutor._processes からプロセスオブジェクトにアクセスできるので、そこからプロセスを kill します。

デッドロックなどの問題が発生してしまったなどでプール内のプロセスを終了させたい場合は下記のようになります。

pool_executor = concurrent.futures.ProcessPoolExecutor()
futures = [pool_executor.submit(task, arg) for arg in args]

try:
    for future in concurrent.futures.as_completed(futures, timeout=3600):
        # 処理結果を使ってやりたいことをここに実装

except concurrent.futures.TimeoutError:
    for process in pool_executor._processes:
        process.kill()

    # プール内のプロセスを強制終了させた場合、パイプなどが破壊されるので
    # ProcessPoolExecuter もシャットダウンさせてつくりなおしたほうがいい
    pool_executor.shutdown()
Enter fullscreen mode Exit fullscreen mode

デッドロックなどの問題が発生した際のプロセス制御に難があるものの、それを除けば concurrent.futures モジュールを使うことで比較的簡単にマルチスレッド、マルチプロセスプログラミングが可能になったと思います。

一方で細かい制御を行いたい場合はこれまで通り threading モジュール、 multiprocessing モジュールを使うことになると思います。

Top comments (0)

Billboard image

The Next Generation Developer Platform

Coherence is the first Platform-as-a-Service you can control. Unlike "black-box" platforms that are opinionated about the infra you can deploy, Coherence is powered by CNC, the open-source IaC framework, which offers limitless customization.

Learn more