最近 AWS AppSync にふれる機会がありました。
そこで今回は、AWS AppSyncのGraphQLインタフェースを使って、Pythonでquery・mutation・subscriptionを試してみましたので、メモを残します。
目次
環境
Python 3.7.3
- awscli 1.16.189
- graphqlclient 0.2.4
- paho-mqtt 1.4.0
AWS AppSync
- データソース:DynamoDB
- 既存のDynamoDBを使用
- ストリームは無効
- 認証は、
API Key
- データソース:DynamoDB
既存のDynamoDBは
- title (key)
- content
という2列を持つAppSyncToDoテーブルです。
$ aws dynamodb describe-table --table-name AppSyncToDo
{
"Table": {
"AttributeDefinitions": [
{
"AttributeName": "title",
"AttributeType": "S"
}
],
"TableName": "AppSyncToDo",
"KeySchema": [
{
"AttributeName": "title",
"KeyType": "HASH"
}
],
"TableStatus": "ACTIVE",
"CreationDateTime": xxx,
"ProvisionedThroughput": {
"NumberOfDecreasesToday": 0,
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
},
"TableSizeBytes": 49,
"ItemCount": 2,
"TableArn": "arn:aws:dynamodb:xxx",
"TableId": "xxx",
"LatestStreamLabel": "xxx",
"LatestStreamArn": "arn:aws:dynamodb:xxx"
}
}
長いのでまとめ
Web上ではJavaScriptのサンプルが多いですが、Pythonでも問題なく動作しました。
以降は、試した時の流れです。
AWS AppSyncでAPIを作る
AppSyncのコンソールに入り、 Create API します。
- Getting Started
Import DynamoDB tableを選択
- Import DynamoDB Table
- RegionとTable nameは、既存のものを選択
- Create or use an existing roleは、
New roleを選択 - Name the model は、適当に付ける
- Configure model fieldsは、Keyのtitleの他、
contentをStringで用意 - API nameは、適当に付ける
すると、以下のようなSchemeが自動的にできました。
type AppSyncToDo {
title: String!
content: String
}
type AppSyncToDoConnection {
items: [AppSyncToDo]
nextToken: String
}
input CreateAppSyncToDoInput {
title: String!
content: String
}
input DeleteAppSyncToDoInput {
title: String!
}
type Mutation {
createAppSyncToDo(input: CreateAppSyncToDoInput!): AppSyncToDo
updateAppSyncToDo(input: UpdateAppSyncToDoInput!): AppSyncToDo
deleteAppSyncToDo(input: DeleteAppSyncToDoInput!): AppSyncToDo
}
type Query {
getAppSyncToDo(title: String!): AppSyncToDo
listAppSyncToDos(filter: TableAppSyncToDoFilterInput, limit: Int, nextToken: String): AppSyncToDoConnection
}
type Subscription {
onCreateAppSyncToDo(title: String, content: String): AppSyncToDo
@aws_subscribe(mutations: ["createAppSyncToDo"])
onUpdateAppSyncToDo(title: String, content: String): AppSyncToDo
@aws_subscribe(mutations: ["updateAppSyncToDo"])
onDeleteAppSyncToDo(title: String, content: String): AppSyncToDo
@aws_subscribe(mutations: ["deleteAppSyncToDo"])
}
input TableAppSyncToDoFilterInput {
title: TableStringFilterInput
content: TableStringFilterInput
}
input TableBooleanFilterInput {
ne: Boolean
eq: Boolean
}
input TableFloatFilterInput {
ne: Float
eq: Float
le: Float
lt: Float
ge: Float
gt: Float
contains: Float
notContains: Float
between: [Float]
}
input TableIDFilterInput {
ne: ID
eq: ID
le: ID
lt: ID
ge: ID
gt: ID
contains: ID
notContains: ID
between: [ID]
beginsWith: ID
}
input TableIntFilterInput {
ne: Int
eq: Int
le: Int
lt: Int
ge: Int
gt: Int
contains: Int
notContains: Int
between: [Int]
}
input TableStringFilterInput {
ne: String
eq: String
le: String
lt: String
ge: String
gt: String
contains: String
notContains: String
between: [String]
beginsWith: String
}
input UpdateAppSyncToDoInput {
title: String!
content: String
}
また、Queriesも自動生成されました。queryとmutationの2つができています。
# Click the orange "Play" button and select the createAppSyncToDo
# mutation to create an object in DynamoDB.
# If you see an error that starts with "Unable to assume role",
# wait a moment and try again.
mutation createAppSyncToDo($createappsynctodoinput: CreateAppSyncToDoInput!) {
createAppSyncToDo(input: $createappsynctodoinput) {
title
content
}
}
# After running createAppSyncToDo, try running the listAppSyncToDos query.
query listAppSyncToDos {
listAppSyncToDos {
items {
title
content
}
}
}
mutation用の QUERY VARIABLES にも、初期値が設定されています。
{ "createappsynctodoinput": { "title": "Hello, world!", "content": "Hello, world!" } }
試しに createAppSyncToDo を実行してみると、結果が右に表示されました。
{ "data": { "createAppSyncToDo": { "title": "Hello, world!", "content": "Hello, world!" } } }
DynamoDBにもデータが追加されています。
$ aws dynamodb get-item --table-name AppSyncToDo --key '{"title": {"S": "Hello, world!"}}'
{
"Item": {
"content": {
"S": "Hello, world!"
},
"title": {
"S": "Hello, world!"
}
}
}
これで、AppSyncとDynamoDBが連携できていることが分かりました。
mutationの実行
APIができたため、次はPythonのGraphQLクライアントライブラリを使って、AppSyncにmutationを投げてデータを登録してみます。
PythonのGraphQLクライアントライブラリはGraphQLサイトにまとめられているものの他、Githubで公開されているものがあります。
https://graphql.org/code/#python
今回は、READMEにAPI Keyの渡し方が書いてあった、 python-graphql-client を使って試してみます。
prisma/python-graphql-client: Simple GraphQL client for Python 2.7+
pip install graphqlclient でインストール後、こんな感じでmutationを実行するスクリプトを作成します。
from graphqlclient import GraphQLClient def execute_mutation_api(gql_client, title, content): # AWS AppSyncのQueriesをそのまま貼って動作する mutation = """ mutation createAppSyncToDo($createappsynctodoinput: CreateAppSyncToDoInput!) { createAppSyncToDo(input: $createappsynctodoinput) { title content } } """ variables = { "createappsynctodoinput": { "title": title, "content": content, } } result = gql_client.execute(mutation, variables=variables) print(result) if __name__ == '__main__': c = GraphQLClient(API_URL) c.inject_token(API_KEY, 'X-Api-Key') # 登録する execute_mutation_api(c, 'ham', 'spam')
execute_mutation_api 関数の mutation および variables は、Queriesに記載されている内容をそのまま貼っています。
また、 API_URL と API_KEY については、AppSyncのSettingsに記載されている内容を使います。
準備ができたため、スクリプトを実行してみると、ログに以下が出力されました。
{"data":{"createAppSyncToDo":{"title":"ham","content":"spam"}}}
awscliで、DynamoDBの内容を確認します。
$ aws dynamodb get-item --table-name AppSyncToDo --key '{"title": {"S": "ham"}}'
{
"Item": {
"content": {
"S": "spam"
},
"title": {
"S": "ham"
}
}
}
データが登録されており、mutationは成功したようです。
queryの実行
続いて、DynamoDBのデータを query を使って取得してみます。
query内容は、AppSyncで自動的に作成された listAppSyncToDos をそのまま使います。
def execute_query_api(gql_client): query = """ query listAppSyncToDos { listAppSyncToDos { items { title content } } } """ result = gql_client.execute(query) print(result) if __name__ == '__main__': c = GraphQLClient(API_URL) c.inject_token(API_KEY, 'X-Api-Key') # 登録した情報を取得する execute_query_api(c)
このスクリプトを実行してみます。
{"data":{"listAppSyncToDos":{"items":[ {"title":"ham","content":"spam"}, {"title":"Hello, world!","content":"Hello, world!"} ]}}}
AppSyncのコンソールから入力した内容、および、mutationで登録した内容を取得できました*1。
subscriptionの実行
onCreate系のsubscription
最後にsubscriptionを実行してみます。
AppSyncのSettingsにはhttpsのエンドポイントはあるものの、subscriptionで使われると思われるWebSocketのエンドポイントが見当たりませんでした。
いろいろ試してみたところ、AppSyncでのsubscriptionの流れは
- httpsのエンドポイントにsubscriptionをHTTPリクエストする
- MQTTのエンドポイントやその他の情報が返ってくる
- WebSocket(wss)を使って、MQTTのエンドポイントに接続
- DynamoDBでイベントが発生した時に、通知を受け取る
となるようです。
そこで、onCreate系のsubscriptionを例に、順に試してみます。
httpsのエンドポイントにsubscriptionをHTTPリクエスト & レスポンス
DynamoDBで新規作成イベントが発生した場合にtitleとcontentを受け取るsubscriptionを用意します。
def execute_subscription_api(gql_client, subscription): # Subscription APIに投げると、MQTTの接続情報が返ってくる r = gql_client.execute(subscription) # JSON文字列なので、デシリアライズしてPythonオブジェクトにする response = json.loads(r) # 中身を見てみる print(response) if __name__ == '__main__': c = GraphQLClient(API_URL) c.inject_token(API_KEY, 'X-Api-Key') # DynamoDBが更新された時の通知を1回だけ受け取る # Subscription API用のGraphQL (onCreate系) subscription = """ subscription { onCreateAppSyncToDo { title content } } """ execute_subscription_api(c, subscription)
実行してみると次のようなレスポンスが返ってきます。
{'extensions':
{'subscription':
{
'mqttConnections': [
{'url': 'wss://<host>.iot.<region>.amazonaws.com/mqtt?<v4_credential>',
'topics': ['path/to/onCreateAppSyncToDo/'],
'client': '<client_id>'}],
'newSubscriptions': {
'onCreateAppSyncToDo':
{'topic': 'path/to/onCreateAppSyncToDo/',
'expireTime': None}}}},
'data': {'onCreateAppSyncToDo': None}}
キー mqttConnections の中に、MQTTのエンドポイントやtopics、Client ID が入っていました。
WebSocket(wss)を使って、MQTTのエンドポイントに接続
次はPythonのMQTTクライアントを使って、MQTTのエンドポイントに接続してみます。
今回は、MQTTクライアントとして paho.mqtt.python (paho-mqtt) を使います。
eclipse/paho.mqtt.python: paho.mqtt.python
続いて、MQTTエンドポイント接続についてです。
レスポンスの url を見ると、プロトコルが wss と、セキュアなTLSによるWebSocketを使っています。また、エンドポイントはAWS IoTのようです。
TLS & AWS IoTを使うということは、接続用の証明書などを用意しないといけないのかなと思いました。
例:X.509 証明書と AWS IoT - AWS IoT
しかし、レスポンスのurlを見ると、以下にある AWS 署名バージョン 4 がクエリ文字列としてすでに追加されていました。
MQTT over WebSocket プロトコル - AWS IoT
また、AWS署名バージョン4が追加済の場合に、 paho-mqtt を使って AWS IoTのMQTTエンドポイントと接続している例が、以下に記載されていました。
https://github.com/eclipse/paho.mqtt.python/issues/277#issuecomment-372019123
実際に試してみたところ、たしかにAWS IoTの証明書まわりは不要でした。
そこで、subscription関数に、
paho-mqttを使って、MQTTエンドポイントに接続- 接続できたら、レスポンスにあった
topicをsubscribeする - topicからメッセージが送られてきたら、メッセージ内容を出力して、接続を切断(disconnect)する
という実装を追加してみました。
def execute_subscription_api(gql_client): ... def on_connect(client, userdata, flags, respons_code): print('connected') # 接続できたのでsubscribeする client.subscribe(topic) def on_message(client, userdata, msg): # メッセージを表示する print(f'{msg.topic} {str(msg.payload)}') # メッセージを受信したので、今回は切断してみる # これがないと、再びメッセージを待ち続ける client.disconnect() # Subscribeするのに必要な情報を取得する client_id = response['extensions']['subscription']['mqttConnections'][0]['client'] topic = response['extensions']['subscription']['mqttConnections'][0]['topics'][0] # URLはparseして、扱いやすくする url = response['extensions']['subscription']['mqttConnections'][0]['url'] urlparts = urlparse(url) # ヘッダーとして、netloc(ネットワーク上の位置)を設定 headers = { 'Host': '{0:s}'.format(urlparts.netloc), } # 送信時、ClientIDを指定した上でWebSocketで送信しないと、通信できないので注意 mqtt_client = MQTTClient(client_id=client_id, transport='websockets') # 接続時のコールバックメソッドを登録する mqtt_client.on_connect = on_connect # データ受信時のコールバックメソッドを登録する mqtt_client.on_message = on_message # ヘッダやパスを指定する mqtt_client.ws_set_options(path=f'{urlparts.path}?{urlparts.query}', headers=headers) # TLSを有効にする mqtt_client.tls_set() # wssで接続するため、443ポートに投げる mqtt_client.connect(urlparts.netloc, port=443) # 受信するのを待つ mqtt_client.loop_forever()
ポイントは、MQTTのクライアントを生成する際
MQTTClient(client_id=client_id, transport='websockets')
と、
- client_idに、レスポンスの
clientの値を指定 - transportとして、
websocketsを指定
の2つとなります。
次に、スクリプトを実行してみると、コンソールに connected が表示されたままになりました。うまくいっているようです。
DynamoDBでイベントが発生した時に、通知を受け取る
最後に、AppSyncのコンソールから以下のデータを1件登録してみます。
{ "createappsynctodoinput": { "title": "new", "content": "new content" } }
すると、コンソールが進み、以下のログを出して終了しました*2。
path/to/onCreateAppSyncToDo/ b'{"data":{"onCreateAppSyncToDo":
{"title":"new",
"content":"new content",
"__typename":"AppSyncToDo"}}}'
onCreate系のsubscriptionができているようです。
ちなみに、一番最初で見たとおり、DynamoDBのストリームは無効化してあります。しかし、AppSyncではDynamoDBの変更を検知し、クライアント側に通知が来ました。これは更新系・削除系でも同じでした。
onUpdate系のsubscription
同様にして、onUpdate系を試してみます。subscriptionはこんな感じです。
update_subscription = """ subscription { onUpdateAppSyncToDo { title content } } """ execute_subscription_api(c, update_subscription)
MQTTで接続後、AppSyncコンソールのQueriesを更新系のmutationに差し替えて実行します。
mutation updateAppSyncToDo($updateappsynctodoinput: UpdateAppSyncToDoInput!) {
updateAppSyncToDo(input: $updateappsynctodoinput) {
title
content
}
}
QUERY VARIABLESも変更します。
{
"updateappsynctodoinput": {
"title": "new",
"content": "update"
}
}
AppSync上で上記のmutationを実行します。すると、MQTTを実行していたコンソールに以下が表示され、更新系のsubscriptionも受信できました。
path/to/onUpdateAppSyncToDo/ b'{"data":{"onUpdateAppSyncToDo":
{"title":"new",
"content":"update",
"__typename":"AppSyncToDo"}}}'
onDelete系のsubscription
onDelete系も試してみます。Python上では以下のsubscriptionを作成します。
delete_subscription = """ subscription { onDeleteAppSyncToDo { title content } } """ execute_subscription_api(c, delete_subscription)
MQTTで接続後、AppSyncコンソールのQueriesを削除系のmutationに差し替えて実行します。
mutation deleteAppSyncToDo($deleteappsynctodoinput: DeleteAppSyncToDoInput!) {
deleteAppSyncToDo(input: $deleteappsynctodoinput) {
title
content
}
}
QUERY VARIABLESも変更します。
{
"deleteappsynctodoinput": {
"title": "new"
}
}
AppSync上で上記のmutationを実行します。すると、MQTTを実行していたコンソールに以下が表示され、削除系のsubscriptionも受信できました。
{
"data": {
"deleteAppSyncToDo": {
"title": "new",
"content": "update"
}
}
}
以上より、Pythonで、 AWS AppSyncのquery・mutation・subscriptionをすべて試すことができました。
ソースコード
GitHubにあげました。 query_mutation_subscription ディレクトリの中が今回のソースコードです。
https://github.com/thinkAmi-sandbox/AWS_AppSync_python_client-sample
その他
PythonのGraphQLクライアントのみでsubscription
今回、subscriptionではMQTTクライアントも併用していました。
GraphQLクライアントだけでできればいいなーとは思ったのですが、今のところ対応しているクライアントは無さそうです。
- What is the chance to add Subscription ? · Issue #11 · prisma/python-graphql-client
- websocket / subscriptions support? · Issue #26 · profusion/sgqlc
- Are websockets supported? · Issue #42 · graphql-python/gql
- websocket / subscriptions support? · Issue #21 · graphql-python/gql-next
- python-graphql-client/client.py at master · hsdp/python-graphql-client
もしくは、WebSocketだけのクライアント実装がありました(HTTPは話せない)
イベント情報
来週(2019/7/4)、ぎーらぼで AWS Expert Online (AWS AppSync関連) のイベントがあります。
AWS Expert Online at JAWS-UG長野 - connpass