[Python] 処理のタイムアウトを実装する方法

[Python] 処理のタイムアウトを実装する方法

Python でタイムアウト

Python で実装した処理が一定時間以上経過しても終了しない場合、強制的に終了するような制御(タイムアウト)を実装する方法をまとめます。実装方法は幾つかありますが、非同期処理なのか同期処理なのかで実装方法や挙動が変わってくるので注意が必要です。

非同期処理をタイムアウト制御する方法

高水準の API インデックス — Python 3.9.1 ドキュメント

タイムアウトになるとタスクがキャンセルされる実装

まず非同期処理(awaitできる処理)については簡単です。標準ライブラリの asynciowait_for というタイムアウトが制御できる機能が提供されています。これを使えば指定時間内で非同期処理が終わらない場合、タイムアウト例外が発生し、実行中の処理が中断されます。

例えば次のような時間のかかる見込みのある非同期関数(heavy_work)が存在するとします。これを1秒以内に完了しない場合、強制的にタイムアウトとして終了させるには次のように実装できます。

import asyncio

async def main():
    try:
        print('main() start ...')
        
        # タイムアウト時間を決めて実行する
        await asyncio.wait_for(heavy_work(), timeout=1)
        
        print('main() finished ...')
    except asyncio.TimeoutError:
        # Timeoutが発生したとき
        print('heavy_work() time out ...')

async def heavy_work():
    print('heavy_work() start ...')
    await asyncio.sleep(5)
    print('heavy_work() finished ...')


asyncio.run(main())
print('end ...')

# 実行結果
# main() start ...
# heavy_work() start ...
# heavy_work() time out ...
# end ...

実行結果を確認すると、main() から heavy_work() か呼ばれ、タイムアウト時間経過後に TimeoutError が発生し、heavy_work() が終了していることがわかります。

asyncio.wait_for の説明を見ると次のようにあります。

coroutine asyncio.wait_for(aw, timeout, *, loop=None)

aw awaitable が、完了するかタイムアウトになるのを待ちます。

aw がコルーチンだった場合、自動的に Task としてスケジュールされます。

timeout には None もしくは待つ秒数の浮動小数点数か整数を指定できます。 timeout が None の場合、 Future が完了するまで待ちます。

タイムアウトが起きた場合は、 Task をキャンセルし asyncio.TimeoutError を送出します。

Task の キャンセル を避けるためには、 shield() の中にラップしてください。

つまり awaitable な引数が完了するか指定された秒数が経過するのを待機するのが wait_for であり、タイムアウト発生時には引数の Task はキャンセルされます。

タイムアウトになってもタスクがキャンセルされない実装

タイムアウトして欲しいが実行中のタスクがキャンセルされるのは困る場合、以下のように shield() でラップすることで処理が中断されずに最後まで実行されます。この場合でももちろん例外は発生します。

import asyncio

async def main():
    # 実行をタスクにしておく
    task = asyncio.create_task(heavy_work())
    try:
        print('main() start ...')
        
        await asyncio.wait_for(asyncio.shield(task), timeout=1)
        
        print('main() finished ...')
    except asyncio.TimeoutError:
        print('heavy_work() time out ...')
    
    # タイムアウト後も実行されているので、終了を待機する
    await task

async def heavy_work():
    print('heavy_work() start ...')
    await asyncio.sleep(5)
    print('heavy_work() finished ...')


asyncio.run(main())
print('end ...')

# 実行結果
# main() start ...
# heavy_work() start ...
# heavy_work() time out ...
# heavy_work() finished ...
# end ...

heavy_work の実行を Task にしておき、そのタスクを asyncio.shield でラップすることでキャンセルされずに処理が続いているため、そのタスクの完了を待機すれば後続の処理の実行が確認できます。

試しに、asyncio.shield を外してみると、await task の箇所ではすでにタスクがキャンセルされているので asyncio.exceptions.CancelledError が発生してエラーになります。

同期処理を無理やり非同期にして制御してみるとうまくいかない

Python の async はキーワードを付けてマークするだけでいい感じに並行処理してくれるわけではありません。例えば time.sleepasyncio.sleep と違い完全に処理をブロッキングする同期的な処理なのでタイムアウトがうまくいきません。

import asyncio
import time

async def main():
    try:
        print('main() start ...')
        
        await asyncio.wait_for(heavy_work(), timeout=1)
        
        print('main() finished ...')
    except asyncio.TimeoutError:
        print('heavy_work() time out ...')

async def heavy_work():
    print('heavy_work() start ...')
    time.sleep(5)
    print('heavy_work() finished ...')


asyncio.run(main())
print('end ...')

# 実行結果
# main() start ...
# heavy_work() start ...
# heavy_work() finished ...
# main() finished ...
# end ...

上記の結果の通り、タイムアウト例外が発生することなく処理がすべて完了してしまいます。

もう少し動作を見てみるために、3秒 time.sleep して、同じく3秒 asyncio.sleep してみると、タイムアウト1秒だとどうなるか見てみます。

import asyncio
import time

async def main():
    try:
        print('main() start ...')
        
        await asyncio.wait_for(heavy_work(), timeout=1)
        
        print('main() finished ...')
    except asyncio.TimeoutError:
        print('heavy_work() time out ...')

async def heavy_work():
    print('heavy_work() start ...')
    time.sleep(3)
    print('time.sleep finished ...')
    await asyncio.sleep(3)
    print('asyncio.sleep finished ...')
    print('heavy_work() finished ...')


start = time.time()

asyncio.run(main())

end = time.time()
print(end - start)
print('end ...')

# 実行結果
# main() start ...
# heavy_work() start ...
# time.sleep finished ...
# heavy_work() time out ...
# 3.0012667179107666
# end ...

実行結果を見るとちょうど time.sleep が終了したタイミングで(3秒後に)タイムアウトしています。つまり同期処理の部分ではタイムアウトの確認はされず、await するタイミングでタイムアウトが発生しています。

ある処理を await しているときに Python が別の処理に制御を戻しているようです。タイムアウトの確認も await 時に行われるため同期処理だとタイムアウトが思うように機能しません。

以上より、同期処理については別の方法を考えなければいけません。

同期処理をタイムアウト制御する方法

単純に同期処理を await するだけではうまくいきません。以下のように一工夫すると同期処理を非同期処理にできます。

run_in_executor で非同期処理にしてタイムアウト制御

それが run_in_executor で実行することです。これは指定されたエグゼキューターで引数の関数と引数を使って処理を実行してくれます。実行結果は await きるので asyncio.wait_for でタイムアウトを制御できます。

import asyncio
import time

async def main():
    try:
        print('main() start ...')

        # time.sleep(5) をタイムアウト1秒で実行する
        loop = asyncio.get_running_loop()
        await asyncio.wait_for(
            loop.run_in_executor(None, heavy_work, 5), 
            timeout=1
        )
        
        print('main() finished ...')
    except asyncio.TimeoutError:
        print('heavy_work() time out ...')

def heavy_work(s):
    print('heavy_work() start ...')
    time.sleep(s)
    print('heavy_work() finished ...')

start = time.time()
asyncio.run(main())
end = time.time()

print(end - start)
print('end ...')

# 実行結果
# main() start ...
# heavy_work() start ...
# heavy_work() time out ...
# 1.0034291744232178
# end ...
# heavy_work() finished ...

実行結果を見るといい感じに見えます。ちゃんと1秒でタイムアウト例外が発生していることが確認できます。

ただし最終行の出力処理が呼ばれた後も処理が終了せず、heavy_work がキャンセルされません。つまり time.sleep(5) が終了するまでプログラム自体が終了していません。

非同期処理であればキャンセルできたのに、同期処理ではキャンセルできずにタイムアウト発生後も処理が終了しません。問題なければこれでもよいのですが、処理をキャンセルしてほしいのであればまた別の方法を試す必要があります。

以下のページが参考になります。

asynchronous – Python asyncio wait_for synchronous – Stack Overflow

マルチプロセスでタイムアウト実装

マルチプロセスを使えば簡単にタイムアウトできます。

from multiprocessing import Pool, TimeoutError
import time


def main():
    try:
        print('main() start ...')

        # マルチプロセスで実行してタイムアウト1秒で制御
        with Pool(processes=1) as p:
            apply_result = p.apply_async(heavy_work, (5,))
            apply_result.get(timeout=1)

        print('main() finished ...')
    except TimeoutError:
        print('heavy_work() time out ...')


def heavy_work(s):
    print('heavy_work() start ...')
    time.sleep(s)
    print('heavy_work() finished ...')


main()

# 実行結果
# main() start ...
# heavy_work() start ...
# heavy_work() time out ...

マルチプロセスで実行すればタイムアウトも、タスクのキャンセルもうまく実行できます。ちゃんと実行結果をみても開始からタイムアウトで終了できています。

マルチスレッドでタイムアウト実装

別スレッドで処理を並列実行し、タイムアウト制御することも可能です。

import time
import threading

def main():
    print(f'main() start ... {threading.get_ident()}')

    # 別スレッドで実行する
    t = threading.Thread(target=heavy_work, args=(5,))
    t.setDaemon(True)
    t.start()
    t.join(timeout=1)
    
    # タイムアウトしていればスレッドがまだ生きている
    if t.is_alive:
        print('heavy_work() time out ...')

    print('main() finished ...')

def heavy_work(s):
    print(f'heavy_work() start ... {threading.get_ident()}')
    time.sleep(s)
    print('heavy_work() finished ...')

main()

# 実行結果
# main() start ... 140523118137472
# heavy_work() start ... 140523099457280
# heavy_work() time out ...
# main() finished ...

スレッドを作ってそこで実行しています。t.join(timeout=1) でタイムアウトを1秒に設定して待機しています。処理の完了かタイムアウトまで待機し、待機後にまだ別スレッドが生きていればまだ処理中なのでタイムアウトしたと判定できます。

必要があれば何かしらの例外を自前で raise してもよいでしょう。

参考URL

以上。

Pythonカテゴリの最新記事