PythonのasyncioライブラリのHigh Level APIで非同期処理する
Pythonで非同期処理がしたい
昨今のLLMブーム、MCPブームでまたREST APIを叩く機会が増えてきました。
そしてREST APIのようなネットワーク処理は非同期処理したくなります。
AIではPythonを使うことが多いので、「Pythonでも非同期処理したい」と感じることが多くなりました。
これまでのPython非同期処理事情
これまで、Pythonの非同期は未成熟で洗練されていないイメージがありました。
例えば、一昔前、Pythonで非同期処理をしたい場合はマルチスレッドかマルチプロセスでした。
しかしマルチプロセスはREST APIをちょっと叩くという目的に対しては過剰に感じます。
マルチスレッドに関しては、実は内部ではシングルスレッドで動きますがIO処理中はGILが解放されるのでIO処理が多い場合はマルチスレッドっぽく動きます。 でも、そこまでPythonで頑張るくらいなら他の言語使った方がいいや、となります。
その後、asyncioが現れました。 しかし下記のように、event_loopというLow Levelな仕組みが丸見えのAPIは好きになれませんでした。
loop = asyncio.get_event_loop() loop.run_until_complete(hello())
ところが久し振りに調べてみたところ、最近はHigh LevelなAPIが用意され、上記のようなLow Level APIは使わなくてもよくなりました。 いい感じに仕上がった感じがします。
そこで今回はREST APIを非同期に叩く上でお世話になりそうなHigh Level APIに焦点を絞って使い方をまとめたいと思います。
備忘録としてサンプルコードを多めに掲載したので超長文になってしまいました。。。
用語
asyncio
Pythonで非同期処理するためのライブラリです。
標準ライブラリなのでpip install不要です。
Coroutine
async def foo():で定義した関数です。
簡単に言えば非同期処理可能な部品です。
まだ「部品」の段階なので、そのまま呼び出すと同期的・直列に動きます。
Coroutneを非同期・並列に実行するには後述するTaskが必要です。
Task
TaskにCoroutineをラップすることで非同期・並列に動かすことができるようになります。
await
Task化したCoroutine全体が一斉に並列実行されるわけではありません。
まずはTask化したCoroutineが直列に実行されます。
そしてそのTaskがawait付きの非同期IO処理に入るとCPUが空くので、その空いたCPUを使って他のTaskが実行されます。
そのTaskが完了するか、await付きの非同期IO処理に入るとまたCPUが空いて別のTaskが動きます。
そのTaskは新しいTaskかもしれないですし、await付きの非同期IO処理が終わって復帰のタイミングを待っている最初のTaskかもしれません。
このように複数のTaskがawait付きIO処理をトリガーにして空いたCPUを譲り合う形で並列に実行されます。
awaitとは簡単に言えば、「これからIO処理するから他の処理にCPUを譲ってもいいよ」という目印です。
Semaphore
同時に並列処理可能なCoroutineの数を制限できる仕組みです。
REST APIを大量に叩く可能性があるときは利用した方が親切でしょう。
他にもLock、Event、Conditionといった仕組みもありますが、REST APIをちょっと叩くくらいなら使わないと思うので説明は省略します。
サンプルコード
1. 同期処理
まずは普通のコードです。
データ取得処理はダミーです。 このコードは同期実行されます。
import time def main(): print("Fetching data") time.sleep(2) print("Data fetched") main()
2. Coroutine作成+呼び出し
Coroutine mainを作成しました。
Coroutineの前にはasyncが必要です。
asyncio.sleepは非同期に動作するCoroutineです。
Coroutine(main)の中でCoroutine(sleep)を呼ぶときはawait します。
asyncio.run(<Coroutine>)が非同期処理のエントリポイントです。
この時点ではまだ並列実行可能な部品を作っただけです。 Taskを使っていないので、このコードは直列に実行されます。
import asyncio async def main(): print("Fetching data") await asyncio.sleep(2) print("Data fetched") aws = main() asyncio.run(aws)
3. Coroutine 2つを同時実行
下の例ではCoroutine(main)の中でCoroutineを2つ(fetch(1)、fetch(2))呼び出しています。
先ほど同様にfetchをawaitしています。
このコードは一見、並列実行されそうに見えます。 しかし、まだTaskを使っていないので、直列に実行されます。
import asyncio async def fetch(delay: int): print("Fetching data") await asyncio.sleep(delay) print("Data fetched") return {"data": "Some data"} async def main(): aws1 = fetch(1) aws2 = fetch(2) result1 = await aws1 result2 = await aws2 print(f"Received data: {result1}") print(f"Received data: {result2}") print("End of main coroutine") aws_main = main() asyncio.run(aws_main)
4. Coroutineを2つを非同期・並列に実行
asyncio.create_taskでTaskを作成し、作成したTaskをawaitして実行します。
このコードは非同期・並列に実行されます。
具体的にはawait asyncio.sleep(delay)で待っている間、他のCoroutineに処理を譲ります。
import asyncio async def fetch(delay: int): print("Fetching data") await asyncio.sleep(delay) print("Data fetched") return {"data": "Some data"} async def main(): task1 = asyncio.create_task(fetch(2)) task2 = asyncio.create_task(fetch(3)) result1 = await task1 result2 = await task2 print(result1, result2) aws_main = main() asyncio.run(aws_main)
5. High Level APIの利用例1
実際にasyncio.create_taskを使うことはほとんどないでしょう。
High LevelなAPIを利用することが推奨されています。
まずはよく見るasyncio.gatherを使った例です。
asyncio.gatherはTaskを作成し、すぐに並列に実行し、結果を配列にまとめて返してくれます。 短いコードで色々まとめてやってくれて便利ですが、細かいエラー処理は苦手です。 あるCoroutineで例外が発生しても、他のCoroutineを止めてくれません。 細かいエラー処理が必要な場合は次のTaskGroupを使いましょう。
import asyncio async def fetch(delay: int): print("Fetching data") await asyncio.sleep(delay) print("Data fetched") return {"data": "Some data"} async def main(): results = await asyncio.gather( fetch(2), fetch(3) ) print(results[0]) print(results[1]) aws_main = main() asyncio.run(aws_main)
6. High Level APIの利用例2
asyncio.TaskGroupを使うとより細かい制御が可能です。
実践においてはこちらに慣れておいた方がよさそうです。
tg.create_taskでTaskを作成し、task.resultで結果を取得します。
task.resultはasync context managerの外で実施する必要があります。
import asyncio async def fetch(delay: int): print("Fetching data") await asyncio.sleep(delay) print("Data fetched") return {"data": "Some data"} async def main(): async with asyncio.TaskGroup() as tg: task1 = tg.create_task(fetch(2)) task2 = tg.create_task(fetch(3)) print(task1.result()) print(task2.result()) aws_main = main() asyncio.run(aws_main)
REST APIを叩いてみる
ここまでデータ取得部分がダミーだったので最後に実際にREST APIを叩いてみます。
requestsライブラリは非同期に対応しているとは言い難いのでhttpxライブラリを使います。
対向サーバに負荷をかけ過ぎないよう、Semaphoreを利用して2つずつ処理します。
Pythonで開発する場合、REST API処理のみ非同期処理にして他は同期処理にすることが多そうなので、同期関数mainから非同期処理を呼んで結果を受け取っています。
fetchの中にasyncio.sleepを入れているのは処理の流れを見やすくするためです。
import asyncio import httpx url = "https://jsonplaceholder.typicode.com" semaphore = asyncio.Semaphore(2) async def fetch(endpoint: str): async with semaphore: print(f"Start fetching data from {endpoint}") async with httpx.AsyncClient() as client: response = await client.get(url + endpoint) await asyncio.sleep(2) print(f"Done fetching data from {endpoint}") # エラー処理はサンプルなので省略 return response.raise_for_status().json() async def gather_data(endpoints: list[str]): tasks = [] async with asyncio.TaskGroup() as tg: for endpoint in endpoints: task = tg.create_task(fetch(endpoint)) tasks.append(task) results = [] for task in tasks: results.append(task.result()) return results def main(): endpoints = [ "/users/1", "/users/2", "/users/3", "/users/4", "/users/5", "/users/6", ] results = asyncio.run(gather_data(endpoints)) for result in results: print(result["name"]) main()
感想
これまでと比較してとても扱いやすいAPIです。
便利になったと思いました。