asgirefの​クラスasgiref.local.Localは​何の​ために​あるのか?

Creative Commons License This work is licensed under a Creative Commons Attribution 4.0 International License.

asgirefの​クラスasgiref.local.Localは​何の​ために​あるのか?

Ryuji Tsutsui

2024/11/16 PyCon mini 東海 2024資料

はじめに

自己紹介

  • Ryuji Tsutsui@ryu22e

  • さくらインターネット株式会社所属

  • Python歴は​13年くらい​(主に​Django)

  • Python Boot Camp、​Shonan.py、​GCPUG Shonanなど​コミュニティ活動も​しています

  • 著書​(共著)​:『Python実践レシピ

名古屋の​イベントでの​登壇は​約3年ぶり2回目

  • 前回は​「Open Source Conference 2021 Online/Nagoya

  • 前回は​コロナ禍で​オフライン開催が​できなかったのですが、​今回は​念願の​現地参加です

  • がんばるぞ!​(特に​懇親会を​)

この​トークで​話すこと

  • asgirefと​いう​Pythonパッケージの​話

  • とりわけ、​asgiref.local.Localと​いう​クラスに​ついての​解説

この​トークの​対象者

  • マルチスレッド、​コルーチンなど​非同期処理の​知識が​ある​人

  • なんとなく​知っていれば​OK

この​トークで​得られる​こと

  • asgirefの​概要

  • asgiref.local.Localの​用途、​仕組み

この​トークの​構成

  • asgirefとは​何か

  • asgiref.local.Localとは​何か

  • asgiref.local.Localと​threading.localの​違い

  • asgiref.local.Localと​contextvars.ContextVarの​違い

asgirefとは​何か

asgirefの​概要

  • ASGIアプリケーション​(非同期処理を​行う​アプリケーション)を​開発しやすく​する​Pythonライブラリ

  • Djangoコミュニティが​開発している

asgirefに​依存している​ツール、​フレームワーク

  • Daphne

  • Django

  • Connexion

参考資料

以下の​ドキュメントに​asgirefを​使った​ツール、​フレームワークの​リストが​ある。

asgiref/docs/implementations.rst at main · django/asgiref

ただし、​バージ​ョンが​上がって​asgirefに​依存しなくなった​ものも​載っている。

asgirefの​主な​機能

  • 同期処理から​非同期処理への​変換​(sync_to_async()

  • 非同期処理から​同期処理への​変換​(async_to_sync()

  • ローカルストレージ​(asgiref.local.Local)

  • サーバーの​基本機能

  • WSGIから​ASGIへの​アダプター

asgirefの​主な​機能

特に​一番上の​sync_to_async()は​Djangoの​非同期ビューを​使う際は​お世話に​なる。

Djangoで​sync_to_async()が​必要に​なる​ケース

  • Djangoでは​非同期ビューが​サポートされている​(3.1から)

  • 非同期ビューの​中では​同期処理を​呼べない​(呼ぶと​エラーに​なる​仕組み)

  • とは​いえ、​Djangoの​機能には​非同期サポートしていない​ものも​ある

  • そこで、​sync_to_async()で​同期処理を​非同期処理に​変換する

sync_to_async()の​使い方

>>> from asgiref.sync import sync_to_async
>>> # 同期処理の関数を引数として渡すと非同期関数に変換される
>>> results = await sync_to_async(sync_function)
>>> # 関数デコレータとしても使える
>>> @sync_to_async
>>> def sync_function(): ...

今回の​トークの​主役は​sync_to_async()ではなく​asgiref.local.Local

実際に​asgiref.local.Localを​使って​役立った​体験が​本トークの​モチベーションなので、​今日は​asgiref.local.Localの​話を​します。

asgiref.local.Localとは​何か

docstringに​よると

https://github.com/django/asgiref/blob/e38d3c327c01aa82c0bf2726220700c1097ea6cc/asgiref/local.py#L41

Local storage for async tasks.

非同期タスク用の​ローカルストレージ

私が​この​クラスを​使った​経緯

  • Djangoアプリケーションの​ログに​リクエストごとに​ユニークな​IDを​付与したかった

  • ミドルウェアで​ uuid.uuid4() で​生成した​IDを​設定し、​ロギングフィルターで​取得する​つもりだった

  • ところが、​ミドルウェアで​設定した​値を​ロギングフィルターで​取得する​方​法が​見当たらなかった

参考に​した​OSS

django-log-request-idを​参考にした。

django-log-request-idの​ミドルウェアの​実装​(一部​抜粋)

# (省略)
class RequestIDMiddleware(MiddlewareMixin):
    def process_request(self, request):
        request_id = self._get_request_id(request)
        local.request_id = request_id  # ここに注目
# (省略)

https://github.com/dabapps/django-log-request-id/blob/2.1.0/log_request_id/middleware.py

django-log-request-idの​ロギングフィルターの​実装​(一部​抜粋)

# (省略)
class RequestIDFilter(logging.Filter):

    def filter(self, record):
        default_request_id = getattr(settings, LOG_REQUESTS_NO_SETTING, DEFAULT_NO_REQUEST_ID)
        # ↓ここに注目
        record.request_id = getattr(local, 'request_id', default_request_id)
        return True

https://github.com/dabapps/django-log-request-id/blob/2.1.0/log_request_id/filters.py

django-log-request-idの​local変数の​定義​(一部​抜粋)

import threading

__version__ = "2.1.0"


try:
    from asgiref.local import Local  # ここに注目
except ImportError:
    from threading import local as Local  # ここにも注目


local = Local()
# (省略)

https://github.com/dabapps/django-log-request-id/blob/2.1.0/log_request_id/__init__.py

asgiref.local.Localと​threading.local

  • どうやら、​両者は​似たような​ものっぽい

  • どこが​違うのだろうか?

asgiref.local.Localと​threading.localの​違い

threading.localとは

  • threadingは​標準モジュール

  • threading.localは、​スレッドごとに​固有の​ローカルストレージ

threading.localの​サンプルコード​(マルチスレッド)

import uuid
import time
import threading
from threading import local

local_storage = local()

def test_task(wait):
    # スレッドID取得
    thread_id = threading.get_ident()

    # 1. ユニークIDをローカルストレージに設定
    step1_unique_id = uuid.uuid4().hex
    local_storage.unique_id = step1_unique_id

    # 2. wait秒待つ
    time.sleep(wait)

    # 3. wait秒待機後のユニークIDを取得
    # (他のスレッドが値を上書きしていないはず)
    step3_unique_id = getattr(local_storage, "unique_id", None)
    equal_or_not = "==" if step1_unique_id == step3_unique_id else "!="
    print(f"{thread_id=} ({step1_unique_id=}) {equal_or_not} ({step3_unique_id=})")

def main():
    # 待機時間が異なるスレッドを3つ立ち上げる
    threads = [
        threading.Thread(target=test_task, args=(3,)),
        threading.Thread(target=test_task, args=(2,)),
        threading.Thread(target=test_task, args=(1,)),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

if __name__ == "__main__":
    main()

https://gist.github.com/ryu22e/31595bbaf94aa9ec3204651c28e86841#file-threading-local-multithreads-py

threading.localの​サンプルコード​(マルチスレッド)​実行結果

threading.localに​入れた​ユニークIDが​スレッドごとに​異なる​ことがわかる。

thread_id=6173028352 (step1_unique_id='0863e8995b064f3e9c24ed1dbe926577') == (step3_unique_id='0863e8995b064f3e9c24ed1dbe926577')
thread_id=6156201984 (step1_unique_id='0fe21b299ab34f7e83fb979277ccce3a') == (step3_unique_id='0fe21b299ab34f7e83fb979277ccce3a')
thread_id=6139375616 (step1_unique_id='2e7e9d7b8b59439dbd73fc826e45cc32') == (step3_unique_id='2e7e9d7b8b59439dbd73fc826e45cc32')

https://gist.github.com/ryu22e/31595bbaf94aa9ec3204651c28e86841#file-threading-local-multithreads-md

もし、​threading.local以外の​オブジェクトを​使ったら

import uuid
import time
import threading

# もし、threading.local以外のオブジェクトを使ったら
class LocalStorage: ...

local_storage = LocalStorage()

def test_task(wait):
    # スレッドID取得
    thread_id = threading.get_ident()

    # 1. ユニークIDをローカルストレージに設定
    step1_unique_id = uuid.uuid4().hex
    local_storage.unique_id = step1_unique_id

    # 2. wait秒待つ
    time.sleep(wait)

    # 3. wait秒待機後のユニークIDを取得
    # (他のスレッドが値を上書きしていないはず)
    step3_unique_id = getattr(local_storage, "unique_id", None)
    equal_or_not = "==" if step1_unique_id == step3_unique_id else "!="
    print(f"{thread_id=} ({step1_unique_id=}) {equal_or_not} ({step3_unique_id=})")

def main():
    # 待機時間が異なるスレッドを3つ立ち上げる
    threads = [
        threading.Thread(target=test_task, args=(3,)),
        threading.Thread(target=test_task, args=(2,)),
        threading.Thread(target=test_task, args=(1,)),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

if __name__ == "__main__":
    main()

https://gist.github.com/ryu22e/31595bbaf94aa9ec3204651c28e86841#file-threading-local-multithreads2-py

実行結果

local_storageは​すべての​スレッドで​共有の​オブジェクトに​なっている。

thread_id=6187102208 (step1_unique_id='512dffda46f44e6bbd12c01bba4d4f3c') == (step3_unique_id='512dffda46f44e6bbd12c01bba4d4f3c')
thread_id=6170275840 (step1_unique_id='0f5912e47aee412f9342c2e49bf96d2c') != (step3_unique_id='512dffda46f44e6bbd12c01bba4d4f3c')
thread_id=6153449472 (step1_unique_id='b1587085778e49f789fc02fb73f1ce9b') != (step3_unique_id='512dffda46f44e6bbd12c01bba4d4f3c')

https://gist.github.com/ryu22e/31595bbaf94aa9ec3204651c28e86841#file-threading-local-multithreads2-md

threading.localの​弱点

  • コルーチンを​使った​コードでは​threading.localを​使えない

  • なぜなら、​コルーチンは​シングルスレッドで​複数の​タスクを​処理する​ため、​スレッドごとの​ローカルストレージが​使えない

threading.localの​サンプルコード​(コルーチン)

import threading
import asyncio
import uuid

local_storage = threading.local()

async def test_task(wait):
    step1_unique_id = uuid.uuid4().hex

    thread_id = threading.get_ident()
    local_storage.unique_id = step1_unique_id

    # ここで待機中に別のコルーチンでlocal_storage.unique_idを上書きしてしまう場合がある。
    await asyncio.sleep(wait)

    step3_unique_id = getattr(local_storage, "unique_id", None)
    equal_or_not = "==" if step1_unique_id == step3_unique_id else "!="
    print(f"{thread_id=} ({step1_unique_id=}) {equal_or_not} ({step3_unique_id=})")

async def main():
    tasks = (
        test_task(3),
        test_task(2),
        test_task(1),
    )
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())

https://gist.github.com/ryu22e/31595bbaf94aa9ec3204651c28e86841#file-threading-local-co-routine-py

threading.localの​サンプルコード​(コルーチン)​実行結果

wait秒待機中に​他の​コルーチンが​local_storage.unique_idを​上書きしてしまうことがある。

thread_id=8370802496 (step1_unique_id='b8e9a1f3e8714831b2aa8275fa47b8f1') == (step3_unique_id='b8e9a1f3e8714831b2aa8275fa47b8f1')
thread_id=8370802496 (step1_unique_id='cdd46248fbe44f57a2a488919add7d1e') != (step3_unique_id='b8e9a1f3e8714831b2aa8275fa47b8f1')
thread_id=8370802496 (step1_unique_id='39eb437c91e8437dae500b91e36bb3ff') != (step3_unique_id='b8e9a1f3e8714831b2aa8275fa47b8f1')

https://gist.github.com/ryu22e/31595bbaf94aa9ec3204651c28e86841#file-threading-local-co-routine-md

Pythonの​Webアプリケーションは、​マルチスレッド、​コルーチンの​両方を​使うことがある

  • マルチスレッドの​例: gunicorn + Greenlet

  • コルーチンの​例: Django、​FastAPI

  • マルチスレッド、​コルーチンの​両方で​使える​ローカルストレージが​あると​便利

そこで​asgiref.local.Localクラスの​登場

asgiref.local.Localは、​マルチスレッド、​コルーチンの​両方で​使える​ローカルストレージ

asgiref.local.Localの​サンプルコード​(マルチスレッド)

import uuid
import time
import threading

from asgiref.local import Local

local_storage = Local()  # ここを変えただけ

def test_task(wait):
    # スレッドID取得
    thread_id = threading.get_ident()

    # 1. ユニークIDをローカルストレージに設定
    step1_unique_id = uuid.uuid4().hex
    local_storage.unique_id = step1_unique_id

    # 2. wait秒待つ
    time.sleep(wait)

    # 3. wait秒待機後のユニークIDを取得
    # (他のスレッドが値を上書きしていないはず)
    step3_unique_id = getattr(local_storage, "unique_id", None)
    equal_or_not = "==" if step1_unique_id == step3_unique_id else "!="
    print(f"{thread_id=} ({step1_unique_id=}) {equal_or_not} ({step3_unique_id=})")

def main():
    # 待機時間が異なるスレッドを3つ立ち上げる
    threads = [
        threading.Thread(target=test_task, args=(3,)),
        threading.Thread(target=test_task, args=(2,)),
        threading.Thread(target=test_task, args=(1,)),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

if __name__ == "__main__":
    main()

https://gist.github.com/ryu22e/31595bbaf94aa9ec3204651c28e86841#file-asgiref-local-local-multithreads-py

asgiref.local.Localの​サンプルコード​(マルチスレッド)​実行結果

threading.localと​同じく、​asgiref.local.Localに​入れた​ユニークIDが​スレッドごとに​異なる​ことがわかる。

thread_id=6140276736 (step1_unique_id='43faa0bb3add4921b1e2649af269646e') == (step3_unique_id='43faa0bb3add4921b1e2649af269646e')
thread_id=6123450368 (step1_unique_id='d244e874e5f74940a944895c641302c3') == (step3_unique_id='d244e874e5f74940a944895c641302c3')
thread_id=6106624000 (step1_unique_id='4ed999ac3ad04dbaafa26eda3ad71a0b') == (step3_unique_id='4ed999ac3ad04dbaafa26eda3ad71a0b')

https://gist.github.com/ryu22e/31595bbaf94aa9ec3204651c28e86841#file-asgiref-local-local-multithreads-md

asgiref.local.Localの​サンプルコード​(コルーチン)

import threading
import asyncio
import uuid

from asgiref.local import Local

local_storage = Local()  # ここを変えただけ

async def test_task(wait):
    # スレッドID取得
    thread_id = threading.get_ident()

    # 1. ユニークIDをローカルストレージに設定
    step1_unique_id = uuid.uuid4().hex
    local_storage.unique_id = step1_unique_id

    # 2. wait秒待つ
    await asyncio.sleep(wait)

    # 3. wait秒待機後のユニークIDを取得
    step3_unique_id = getattr(local_storage, "unique_id", None)
    equal_or_not = "==" if step1_unique_id == step3_unique_id else "!="
    print(f"{thread_id=} ({step1_unique_id=}) {equal_or_not} ({step3_unique_id=})")

async def main():
    tasks = (
        test_task(3),
        test_task(2),
        test_task(1),
    )
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())

https://gist.github.com/ryu22e/31595bbaf94aa9ec3204651c28e86841#file-asgiref-local-local-co-routine-py

asgiref.local.Localの​サンプルコード​(コルーチン)​実行結果

コルーチンごとに​固有の​ローカルストレージが​使える​ことがわかる。

thread_id=8323698496 (step1_unique_id='9484892561164a18af996c2cf7ab6c2f') == (step3_unique_id='9484892561164a18af996c2cf7ab6c2f')
thread_id=8323698496 (step1_unique_id='2f7b73f1301648f3a6cf4a8b2d29f559') == (step3_unique_id='2f7b73f1301648f3a6cf4a8b2d29f559')
thread_id=8323698496 (step1_unique_id='9fc06c8056184fc88c1f3af56e77330d') == (step3_unique_id='9fc06c8056184fc88c1f3af56e77330d')

ここまでの​まとめ

  • threading.localは​スレッドごとに​固有の​ローカルストレージ

  • ただし、​コルーチンは​シングルスレッドなので​threading.localは​使えない

  • asgiref.local.Localは​マルチスレッド、​コルーチン両方で​使える​万能ローカルストレージ

Q. asgiref.local.Localは​なぜコルーチンでも​使えるのか?

A.内部で​contextvars.ContextVarを​使っているから​(この​あと​詳しく​説明します)

asgiref.local.Localと​contextvars.ContextVarの​違い

contextvars.ContextVarとは

  • contextvarsは​Pythonの​標準モジュール

  • contextvars.ContextVarは、​コンテキスト変数を​宣言する​ための​クラス

  • コルーチンごとに​固有の​コンテキスト変数を​使える

contextvars.ContextVarの​サンプルコード

import threading
from contextvars import ContextVar
import asyncio
import uuid

# threading.localの説明の際に見せた、コルーチンの例とほぼ同じコード。

# コンテキスト変数を宣言
local_storage = ContextVar("local_storage", default=None)

async def test_task(wait):
    step1_unique_id = uuid.uuid4().hex
    thread_id = threading.get_ident()
    # 値の設定はset()メソッドで行う(設定できる値は1個のみ)
    local_storage.set(step1_unique_id)

    await asyncio.sleep(wait)

    # 値の取得はget()メソッドで行う
    step3_unique_id = local_storage.get()
    equal_or_not = "==" if step1_unique_id == step3_unique_id else "!="
    print(f"{thread_id=} ({step1_unique_id=}) {equal_or_not} ({step3_unique_id=})")

async def main():
    tasks = (
        test_task(3),
        test_task(2),
        test_task(1),
    )
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())

https://gist.github.com/ryu22e/31595bbaf94aa9ec3204651c28e86841#file-contextvars-contextvar-py

contextvars.ContextVarの​サンプルコード実行結果

コルーチンごとに​固有の​ローカルストレージが​使える​ことがわかる。

thread_id=8308739904 (step1_unique_id='011b6db1ddca48b2a353667e9c79f34a') == (step3_unique_id='011b6db1ddca48b2a353667e9c79f34a')
thread_id=8308739904 (step1_unique_id='dcfc53f6ec9149f99838a6815608c12b') == (step3_unique_id='dcfc53f6ec9149f99838a6815608c12b')
thread_id=8308739904 (step1_unique_id='42ee7264770745a6b90b9e5e98082a57') == (step3_unique_id='42ee7264770745a6b90b9e5e98082a57')

https://gist.github.com/ryu22e/31595bbaf94aa9ec3204651c28e86841#file-contextvars-contextvar-md

contextvars.ContextVarの​弱点

  • contextvars.ContextVarは​スレッドセーフではない

  • 一応マルチスレッドでも​固有の​ローカルストレージに​なるが、​上記の​理由に​より、​実行タイミングに​よっては​予期せぬ挙動に​なる

  • 設定できる値は​1個だけ

つまり

標準モジュールでは、​マルチスレッドでは​threading.local、​コルーチンでは​contextvars.ContextVarを​使う。

asgiref.local.Localでは​どうしているのか

  • asgiref.local.Localでは、​デフォルトでは​contextvars.ContextVarを​使って​値を​設定、​取得する

  • オプションで​threading.localを​使うようにも​できる

  • 値の​取得、​設定の​コードで​排他制御の​コードを​入れてスレッドセーフに​なるように​工夫している

local_storage.unique_id = ...のような​実装を​可能に​する​仕組み

最後に

まとめ

  • threading.local、​contextvars.ContextVarは​どちらも​ローカルストレージと​して​使えるが​それぞれ弱点が​ある

  • 標準モジュールには​万能の​ローカルストレージは​ない

  • asgiref.local.Localは​内部で​contextvars.ContextVarを​使い、​弱点を​補う​工夫で​万能の​ローカルストレージを​実現している

ご清聴​ありがとう​ございました

AIが考えた「地味だけど便利なasgiref.local.Local」

AIが​考えた​「地味だけど​便利な​asgiref.local.Local」