前回・前々回と、単一のデータソースから値を取得するAWS AppSync APIを作成しました。
- Pythonで、 AWS AppSyncのquery・mutation・subscriptionを試してみた - メモ的な思考的な
- Pythonで、AWS Lambda をData sourceに持つ AWS AppSync API を呼んでみた - メモ的な思考的な
ただ、単一ではなく、「複数のDynamoDBから値を取得し、その結果をマージして返す」ようなAPIが作れないかが気になりました。
調べてみたところ、通常のResolverの代わりにPipeline Resolverを使えば良さそうでした。
チュートリアル: パイプラインリゾルバー - AWS AppSync
公式ドキュメントによると、今回のAPIに必要なものは
- Data source
- Schema
- Function
- Pipeline Resolver
- Before mapping template
- After mapping template
でした。
そこで、公式チュートリアルよりももう少し単純な例を使って、理解を深めてみることにしました。
目次
- 環境
- AWS AppSync APIのひな形作成
- Data sourceの作成
- Data source Schemaの作成
- Functionの作成
- Pipeline Resolverの作成
- AppSync APIクライアントの作成
- 実行結果
- ソースコード
環境
- Python 3.7.3
- graphqlclient 0.2.4
- AWS AppSync
- データソース:Amazon DynamoDB x 2
- 両DynamoDBとも作成済
- API Key 認証
- データ取得(Query)のみ、APIとして実装
- データソース:Amazon DynamoDB x 2
DynamoDB
事前に用意した2つのDynamoDBは、以下の通りです*1。
Blog table
| id (key) | title | author_id |
|---|---|---|
| 1 | ham | 100 |
| 2 | spam | 200 |
Author table
Blog tableの author_id に紐づく id を持つテーブルです。
| id (key) | name |
|---|---|
| 100 | foo |
| 200 | bar |
APIの仕様
Blog.id == "1" となるようなQueryを投げると
| Blog.id | Blog.title | Author.id | Author.name |
|---|---|---|---|
| 1 | ham | 100 | foo |
が返るものとします。
AWS AppSync APIのひな形作成
Build from scratch より作成します。
API nameは Pipeline Resolver API とします。
Data sourceの作成
Data Sources のCreate Data sourceより作成します。今回は2つ作成します。
なお、作成する際 、両方ともAutomatically generate GraphQL を on にすると、2つのData source分がマージされた Schema が作成されます。便利。
ただ、今回はSchemaのResolverではなく、FunctionでDatasourceから値を取得することから、自分でSchemaを作成することになります。そのため、 off でも良いです。
また、ロールは新規作成としますが、必要な権限を持ったロールがある場合はそちらを利用しても良さそうです。
BlogテーブルのData source
| 項目 | 値 |
|---|---|
| Data source name | Blog_DataSource |
| Data source type | Amazon DynamoDB table |
| Region | DynamoDB Blog のあるRegion |
| Table name | Blog |
| Create or use an existing role | New role |
AuthorテーブルのData source
Data source name・Table nameを変えるくらいで、あとは同じです。
| 項目 | 値 |
|---|---|
| Data source name | Author_DataSource |
| Data source type | Amazon DynamoDB table |
| Region | DynamoDB Author のあるRegion |
| Table name | Author |
| Create or use an existing role | New role |
Data source Schemaの作成
今回は、
- リクエストを受け付けるQueryの型:
getBlogWithAuthor - 戻り値の型:
BlogWithAuthor
のみ定義します。
前述のとおり、データの取得はFunctionで行うため、schema { query: Query } の定義は不要です。
type BlogWithAuthor {
id: String!
title: String
author_id: String
author_name: String
}
type Query {
getBlogWithAuthor(id: String!): BlogWithAuthor
}
Functionの作成
Functionを作成してからでないとPipeline Resolverで使えなかったため、まずはFunctionから作成します。
今回、Functionは2つ用意します。
- Blogテーブルからデータを取得
- Function名:
GetBlog
- Function名:
- Blogテーブルの
author_idと等しいidを持つAuthor tableデータを取得し、Blog table とAuthor tableをマージ- Function名:
GetBlogWithAuthor
- Function名:
Functionを作るには
- Data source name
- Function name
- request mapping template
- response mapping template
が必要ですので、それぞれ作成していきます。
なお、mapping template系の書き方については、以下が参考になりました。ありがとうございました。
AWS AppSyncのPipeline Resolverで複数データリソースを扱う場合のVTLの書き方 | Takumon Blog
Function: GetBlog
Data sourceとFunction nameです。
| 項目 | 値 |
|---|---|
| Data source name | Blog_DataSource |
| Function name | GetBlog |
request mapping template
右にあるテンプレート GetItem を使うことで、Blog tableから指定した id のデータを取得しています。
{ "operation": "GetItem", "key": { "id": $util.dynamodb.toDynamoDBJson($ctx.args.id), } }
response mapping template
デフォルトのままです。
## Raise a GraphQL field error in case of a datasource invocation error
#if($ctx.error)
$util.error($ctx.error.message, $ctx.error.type)
#end
## Pass back the result from DynamoDB. **
$util.toJson($ctx.result)
Function: GetBlogWithAuthor
Data sourceとFunction nameです。
| 項目 | 値 |
|---|---|
| Data source name | Author_DataSource |
| Function name | GetBlogWithAuthor |
request mapping template
GetBlogと似ていますが、1点異なるのは、toDynamoDBJsonの引数 $ctx.prev.result.author_id です。
Function: GetBlogWithAuthorは、GetBlogの後に動作させますが、その際、GetBlogで取得した author_id を使ってAuthorテーブルからデータを取得します。
Pipeline Resolverでは、 $ctx.prev.result で、そのFunctionの直前に動作したFunctionの結果を参照できます(今回だと、GetBlog Functionの結果)。
{ "operation": "GetItem", "key": { "id": $util.dynamodb.toDynamoDBJson($ctx.prev.result.author_id), } }
response mapping template
GetBlogとは異なり、結果をマージする処理を追加しています。
今回は、GetBlogで取得したBlogデータに、Authorテーブルから取得したnameをマージして返します。
なお、マージする際のキー author_name は、Schemaの type BlogWithAuthor で定義した author_name と一致させます。
type BlogWithAuthor {
...
author_name: String
}
以上より、response mapping templateはこんな感じになります。
## Raise a GraphQL field error in case of a datasource invocation error
#if($ctx.error)
$util.error($ctx.error.message, $ctx.error.type)
#end
## Pass back the result from DynamoDB. **
## GetBlogの結果に、author_nameとして今回取得したAuthorテーブルのnameの値をマージ
$util.qr($ctx.prev.result.put("author_name", $ctx.result.name))
## マージしたデータを返す
$util.toJson($ctx.prev.result)
Pipeline Resolverの作成
Schemaページの右側にあるResolverのうち、Query: getBlogWithAuthorの Attach をクリックします。
Data source nameの下に Convert to pipeline resolver があるため、何も入力しない状態でそのリンクをクリックします。
すると
- Before mapping template
- Function
- After mapping template
を定義できるようになります。
Functionの配置
以下の順番でFunctionを配置します。
- Before mapping template
- GetBlog
- GetBlogWithAuthor
- After mapping template
Before mapping templateの定義
Queryから呼び出される、Before mapping template を定義します。
## Query: getBlogWithAuthorから渡された `id` を `$result` 変数に入れる
#set($result = { "id": $ctx.args.id })
## JSON化して、次のFunction: GetBlog へと渡す
$util.toJson($result)
参考:Queryの内容
type Query {
getBlogWithAuthor(id: String!): BlogWithAuthor
}
After mapping templateの定義
Function: GetBlogWithAuthorから呼び出される、After mapping template を定義します。
$util.toJson($ctx.result)
こちらはFunctionの値をそのままJSON化するだけです。
AppSync APIクライアントの作成
今回もPythonで作ります。
- データがある場合 (Blog.id == "1")
- データが無い場合 (Blog.id == "x")
をそれぞれ取得するようなQueryを実行します。
from graphqlclient import GraphQLClient from secret import API_KEY, API_URL def execute_query_api(gql_client): # pipeline resolverを使ったqueryを呼ぶ:データがある場合 query = """ query { getBlogWithAuthor(id: "1") { id title author_id author_name } } """ result = gql_client.execute(query) print(result) # pipeline resolverを使ったqueryを呼ぶ:データがない場合 query = """ query { getBlogWithAuthor(id: "x") { id title author_id author_name } } """ result = gql_client.execute(query) print(result) if __name__ == '__main__': c = GraphQLClient(API_URL) c.inject_token(API_KEY, 'X-Api-Key') execute_query_api(c)
実行結果
データがある場合の結果です。期待した結果となりました。
{"data":{"getBlogWithAuthor":{"id":"1","title":"ham","author_id":"100","author_name":"foo"}}}
一方、データがない場合はこのようになります。
{ "data":{"getBlogWithAuthor":null}, "errors":[{"path":["getBlogWithAuthor"], "data":null, "errorType":"DynamoDB:AmazonDynamoDBException", "errorInfo":null, "locations":[{"line":3,"column":11,"sourceName":null}], "message":"The provided key element does not match the schema (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ValidationException; Request ID: xxx)"}]}
ソースコード
Githubに上げました。 pipeline_resolver ディレクトリの中が、今回のソースコードとなります。
https://github.com/thinkAmi-sandbox/AWS_AppSync_python_client-sample
*1:あまり良いDynamoDBの構造ではないかもしれませんが、とりあえず...