• HOME
  • /
  • ARTICLES
  • /
  • 速くて軽量なAPI、Starletteを使ってGCPにデプロイする Part3
python / Starlette
Created at:3/2/2020 Updated at:3/3/2020

速くて軽量なAPI、Starletteを使ってGCPにデプロイする Part3

今回は実装したAPIにバックグラウンドで実行する非同期処理を実装します。 APIで受けとったリクエストにレスポンスを返した後、async/awaitを使って、非同期で処理を実行します。

今回すすめるトピック

Gitでリポジトリをクローンする
docker-composeで依存しているミドルウェアを起動する
python main.pyでStarletteを起動する
APIのエンドポイントを追加する
バックグラウンドタスクを追加する
pytestで非同期処理をテストする
CI/CDはCloudBuildで利用して、継続的にテスト、CloudRunへデプロイできるようにする
CloudRunが起動しているのを確認したら削除する
SQL PRO for Postgres

今回はバックグラウンドでAPIを参照した回数を記録する非同期処理を作ってみましょう。
実装内容は参照回数をカウントするためのプロパティを追加、データベースのスキーマ変更、参照した回数を更新するメソッド追加、バックグラウンドタスクの実装が必要です。
順を追って実装していきましょう。


カウントするプロパティと対応するDBテーブルのカラム追加

GUIのデータベースアプリケーションがない方は、psqlコマンドでクエリを実行してください。


psql -U docker -h 127.0.0.1 docker
enter password:
ALTER TABLE robots ADD COLUMN count bigint default 0;
\q

追加できたらモデルのプロパティを追加します。


@@ -41,6 +41,7 @@ class ModelInterface(metaclass=ABCMeta):
 class Robot(ModelInterface):
     id: int
     name: str
+    count: int
     created_at: datetime = datetime.utcnow().replace(tzinfo=pytz.utc)

これでデータの整合はとれるので、次に参照するたびに参照された回数をインクリメントするメソッドを実装します。


@@ -54,3 +55,11 @@ class Robot(ModelInterface):
             'values': values
         })
         return cls(**res)
+
+    async def count_ref(self) -> None:
+        await super().execute({
+            'query': """
+                UPDATE robots SET count=(count+1) where id = :id
+            """,
+            'values': {'id': self.id}
+        })

このようにプライマリキーでフィルタしたレコードに対してcountの値を+1してあげる更新クエリを実行すれば実現できますね。

カウントするタスクを作る

参照されたモデルを受け取って、カウントするメソッドを実行してあげるタスクを作ります。更新だけだと0.7秒位で完了してしまい、実感できないのでわかりやすくするために、12秒後に更新処理を行うようにしました。


--- a/src/task.py
+++ b/src/task.py
@@ -1,5 +1,6 @@
 import logging
 import time
+import asyncio
 
 from . import store
 
@@ -8,5 +9,10 @@ logger = logging.getLogger(__name__)
 
 async def repair_robot(robot: store.Robot) -> None:
     # FIX ROBOT.
-    time.sleep(4)
-    logger.debug("TASK HAS DONE")
+    pass
+
+
+async def count_ref(robot: store.Robot) -> None:
+    # FIX ROBOT.
+    await asyncio.sleep(12)
+    await robot.count_ref()

HTTPリクエストにレスポンスを返してバックグラウンドタスクを実行する

前回作ったretrieve_robot関数内に今回追加したtask_count_refの関数をBackgroundTaskで作成し、レスポンスにバックグラウンドで実行するtaskを引数に追加しました。
task_count_ref内では、さきほど実装したタスクを実行して何も返さないVoid関数にします。


--- a/main.py
+++ b/main.py
@@ -65,19 +65,21 @@ async def repair(request: Request) -> str:
 @app.route('/v1/robots/{robot_id:int}')
 async def retrieve_robot(request: Request) -> str:
     robot_id = request.path_params["robot_id"]
-    logger.debug(robot_id)
 
     values = {'robot_id': robot_id}
     robot = await store.Robot.retrieve(values)
-    logger.debug(robot)
 
-    # task = BackgroundTask(task_repair, robot)
+    task = BackgroundTask(task_count_ref, robot)
     message = {'message': f'Robot is {robot.name}.'}
-    return JSONResponse(message)
+    return JSONResponse(message,  background=task)
 
 
 async def task_repair(robot: store.Robot) -> None:
-    task.repair_robot(robot)
+    await task.repair_robot(robot)
+
+
+async def task_count_ref(robot: store.Robot) -> None:
+    await task.count_ref(robot)

Starletteでは簡単にバックグラウンドタスクが実装できるクラスが用意してあります。今回は1つのタスクを実行するタスクを実装しましたが、マルチタスクもリストで渡してあげれば実行できる複数形のクラスが用意されていますので、以下を参照してもらえば分かるかと思います。


# starlette/background.py

class BackgroundTask:
    def __init__(
        self, func: typing.Callable, *args: typing.Any, **kwargs: typing.Any
    ) -> None:
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.is_async = asyncio.iscoroutinefunction(func)

    async def __call__(self) -> None:
        if self.is_async:
            await self.func(*self.args, **self.kwargs)
        else:
            await run_in_threadpool(self.func, *self.args, **self.kwargs)


class BackgroundTasks(BackgroundTask):
    def __init__(self, tasks: typing.Sequence[BackgroundTask] = []):
        self.tasks = list(tasks)

    def add_task(
        self, func: typing.Callable, *args: typing.Any, **kwargs: typing.Any
    ) -> None:
        task = BackgroundTask(func, *args, **kwargs)
        self.tasks.append(task)

    async def __call__(self) -> None:
        for task in self.task

実行して12秒後に値が+1されているのが確認できたかと思います。
このように非同期で実装するメリットはノンブロッキングで、並列実行することで処理の依頼者にストレス無く使えるシステムを構築できます。
私がStarletteを使う場合のほとんどのケースは、フルスタックなアプリケーション(たとえばDjango)があって、形態素解析、Keras、スクレイピングなどを非同期処理で作っています。

余談ですが、私の構築しているインフラはKubernetes、IstioもしくはCloudRun,Appengineでマイクロサービス化されているため、パフォーマンスを変えたいときや、限定的な機能開発を外注でエンジニアさんに依頼するときも全体は見なくていいようにしているため、システムの部品化が実現できていて、処理が向いてる言語、開発者が得意な言語で作れるようになっており、小さく置き換えが可能なようにしてあります。そういった設計指針が好みの方であればStarletteは気に入っていただけかと思いますので、ライブラリのコードを十分探索してみてください、きっと楽しいと思います🤓

ksh3/starlette-cloudrun
Starlette framework on GCP CloudRun. Contribute to ksh3/starlette-cloudrun development by creating an account on GitHub.

今回の内容はこちらにコミットしています。

Related Articles

python / Starlette
Created at:3/1/2020 Updated at:3/3/2020
速くて軽量なAPI、Starletteを使ってGCPにデプロイする Part2
前回、起動の確認まで行いましたので、今回はPythonで実装するAPIとバックグラウンドタスクの処理を実装します。また、開発をスムーズに行うための便利なツールなどを紹介しています。
python / Starlette
Created at:3/2/2020 Updated at:3/4/2020
速くて軽量なAPI、Starletteを使ってGCPにデプロイする Part4
今回はPythonの便利なテストライブラリpytestを使ったテストを作成します。Starletteにテスト用のクライアントは実装されているのでそれを使って、前回追加した機能のテストを行い、テストが無事に通ったらGCPのCloudBuildを使ってCI/CDを行い、CloudRunへデプロイするまでを解説しています。

Policy

Reproduction or reproduction of all or part of this article is prohibited without the consent of the copyright holder. However, citations are only accepted if the author and article URLs are displayed.