速くて軽量なAPI、Starletteを使ってGCPにデプロイする Part3
今回すすめるトピック
今回はバックグラウンドでAPIを参照した回数を記録する非同期処理を作ってみましょう。
実装内容は参照回数をカウントするためのプロパティを追加、データベースのスキーマ変更、参照した回数を更新するメソッド追加、バックグラウンドタスクの実装が必要です。
順を追って実装していきましょう。
カウントするプロパティと対応するDBテーブルのカラム追加
GUIのデータベースアプリケーションがない方は、psqlコマンドでクエリを実行してください。
psql -U docker -h 127.0.0.1 docker
enter password:
ALTER TABLE robots ADD COLUMN count bigint default 0;
\q
追加できたらモデルのプロパティを追加します。
@@ -41,6 +41,7 @@ class ModelInterface(metaclass=ABCMeta):
class Robot(ModelInterface):
id: int
name: str
+ count: int
created_at: datetime = datetime.utcnow().replace(tzinfo=pytz.utc)
これでデータの整合はとれるので、次に参照するたびに参照された回数をインクリメントするメソッドを実装します。
@@ -54,3 +55,11 @@ class Robot(ModelInterface):
'values': values
})
return cls(**res)
+
+ async def count_ref(self) -> None:
+ await super().execute({
+ 'query': """
+ UPDATE robots SET count=(count+1) where id = :id
+ """,
+ 'values': {'id': self.id}
+ })
このようにプライマリキーでフィルタしたレコードに対してcountの値を+1してあげる更新クエリを実行すれば実現できますね。
カウントするタスクを作る
参照されたモデルを受け取って、カウントするメソッドを実行してあげるタスクを作ります。更新だけだと0.7秒位で完了してしまい、実感できないのでわかりやすくするために、12秒後に更新処理を行うようにしました。
--- a/src/task.py
+++ b/src/task.py
@@ -1,5 +1,6 @@
import logging
import time
+import asyncio
from . import store
@@ -8,5 +9,10 @@ logger = logging.getLogger(__name__)
async def repair_robot(robot: store.Robot) -> None:
# FIX ROBOT.
- time.sleep(4)
- logger.debug("TASK HAS DONE")
+ pass
+
+
+async def count_ref(robot: store.Robot) -> None:
+ # FIX ROBOT.
+ await asyncio.sleep(12)
+ await robot.count_ref()
HTTPリクエストにレスポンスを返してバックグラウンドタスクを実行する
前回作ったretrieve_robot関数内に今回追加したtask_count_refの関数をBackgroundTaskで作成し、レスポンスにバックグラウンドで実行するtaskを引数に追加しました。
task_count_ref内では、さきほど実装したタスクを実行して何も返さないVoid関数にします。
--- a/main.py
+++ b/main.py
@@ -65,19 +65,21 @@ async def repair(request: Request) -> str:
@app.route('/v1/robots/{robot_id:int}')
async def retrieve_robot(request: Request) -> str:
robot_id = request.path_params["robot_id"]
- logger.debug(robot_id)
values = {'robot_id': robot_id}
robot = await store.Robot.retrieve(values)
- logger.debug(robot)
- # task = BackgroundTask(task_repair, robot)
+ task = BackgroundTask(task_count_ref, robot)
message = {'message': f'Robot is {robot.name}.'}
- return JSONResponse(message)
+ return JSONResponse(message, background=task)
async def task_repair(robot: store.Robot) -> None:
- task.repair_robot(robot)
+ await task.repair_robot(robot)
+
+
+async def task_count_ref(robot: store.Robot) -> None:
+ await task.count_ref(robot)
Starletteでは簡単にバックグラウンドタスクが実装できるクラスが用意してあります。今回は1つのタスクを実行するタスクを実装しましたが、マルチタスクもリストで渡してあげれば実行できる複数形のクラスが用意されていますので、以下を参照してもらえば分かるかと思います。
# starlette/background.py
class BackgroundTask:
def __init__(
self, func: typing.Callable, *args: typing.Any, **kwargs: typing.Any
) -> None:
self.func = func
self.args = args
self.kwargs = kwargs
self.is_async = asyncio.iscoroutinefunction(func)
async def __call__(self) -> None:
if self.is_async:
await self.func(*self.args, **self.kwargs)
else:
await run_in_threadpool(self.func, *self.args, **self.kwargs)
class BackgroundTasks(BackgroundTask):
def __init__(self, tasks: typing.Sequence[BackgroundTask] = []):
self.tasks = list(tasks)
def add_task(
self, func: typing.Callable, *args: typing.Any, **kwargs: typing.Any
) -> None:
task = BackgroundTask(func, *args, **kwargs)
self.tasks.append(task)
async def __call__(self) -> None:
for task in self.task
実行して12秒後に値が+1されているのが確認できたかと思います。
このように非同期で実装するメリットはノンブロッキングで、並列実行することで処理の依頼者にストレス無く使えるシステムを構築できます。
私がStarletteを使う場合のほとんどのケースは、フルスタックなアプリケーション(たとえばDjango)があって、形態素解析、Keras、スクレイピングなどを非同期処理で作っています。
余談ですが、私の構築しているインフラはKubernetes、IstioもしくはCloudRun,Appengineでマイクロサービス化されているため、パフォーマンスを変えたいときや、限定的な機能開発を外注でエンジニアさんに依頼するときも全体は見なくていいようにしているため、システムの部品化が実現できていて、処理が向いてる言語、開発者が得意な言語で作れるようになっており、小さく置き換えが可能なようにしてあります。そういった設計指針が好みの方であればStarletteは気に入っていただけかと思いますので、ライブラリのコードを十分探索してみてください、きっと楽しいと思います🤓
今回の内容はこちらにコミットしています。