Amazon SQS のスキーマ管理を Protobuf でやってみました。
サンプルコードはこちら。
.proto ファイル
以下のような .proto ファイルを用意します。
// proto/example.proto syntax = "proto3"; message Item { string product_id = 1; float price = 2; } message Purchase { string user_id = 1; repeated Item items = 2; // Unix time uint32 timestamp = 3; }
今回は Purchase message を binary serialize した後、 base64 encode したものを SQS に積む方針で進めます。
AWS で SQS のリソースを作る
まずはじめに Amazon SQS のリソースを準備します。
今回は Amazon SQS のキューをひとつ作って、手元の環境からアクセスする方針で進めます(EC2 からアクセスしたりとかバックエンドを Lambda にしたりとかは 面倒 本稿のテーマとは無関係なのでやらない)。
キューをひとつ作るだけなのでマネジメントコンソールをポチポチするだけでもいいのですが、なんとなく Terraform を使ってみます。
予め SQSFullAccess ポリシーを持ったユーザーを作っておき、その Access Key と Secret を使って aws-cli でプロファイルを作成します(AWSのIAMユーザーを作る部分は説明略)。
# Access Key を準備する部分は省略 # aws configure で sqs という profile を作る $ aws configure --profile sqs AWS Access Key ID [None]: XXXXXXXXXXXXXXXXX AWS Secret Access Key [None]: XXXXXXXXXXXXXXX Default region name [None]: ap-northeast-1 Default output format [None]: None # AWS_PROFILE を指定 $ export AWS_PROFILE=sqs
.tf ファイルを準備します。
# terraform/terraform.tf provider "aws" { region = "ap-northeast-1" } resource "aws_sqs_queue" "queue" { name = "example-queue" } # Queue の URL を出力する output "queue_id" { value = aws_sqs_queue.queue.id }
あとは Terraform CLI を叩くだけで、 example-queue というキューが作られて、キューの id が出力されます。
id は URL 形式になっており、キューを積んだり取り出したりする際に必要になります。
後々用いるので環境変数に追加しておきます。
# 初回だけ必要 $ terraform init # dry run。 実行後に AWS 上にどのような変更が行われるか確認できる $ terraform plan # 実際に AWS 上のリソースが作る $ terraform apply # => queue_id = "https://sqs.ap-northeast-1.amazonaws.com/xxxxxxxxxx/example-queue" # id を環境変数に登録しておく。ついでに AWS のリージョンも export QUEUE_URL=https://sqs.ap-northeast-1.amazonaws.com/xxxxxxxxxx/example-queue export AWS_REGION=ap-northeast-1
Terraformで作成したリソースは terraform destroy で削除できます。使い終わったら忘れずに消しておきましょう*1。
Protobuf.js
.proto ファイルと SQS の準備ができたので、やっと実装が始まります。
今回は TypeScript と Protobuf.js を用いて adapter を実装します。
Protobuf.js はその名の通り Protobuf を取り扱うライブラリです。あくまでも Protobuf を扱うライブラリなので、 gRPC 関連の機能ほぼサポートされていませんが、これはむしろ今回の用途に適っているので好都合。
gRPC の文脈では @grpc/proto-loader と組み合わせて dynamic codegen ((実行時に .proto ファイルを読み込む方式))を行う際によく使われているライブラリです。
しかし、今回は Protobuf.js が提供する CLI ツール pbjs と pbts を用いて static codegen ((予め .proto ファイルを読み込んでコードを生成し、それを用いる方式)) を行います。
コード生成のコマンドはそこそこ長いので shell script にまとめておきます。
# bin/protogen.sh #!/usr/bin/env bash set -eu NODE_BIN=$(dirname $0)/../node_modules/.bin PROTO_DIR=$(dirname $0)/../proto/*.proto OUTPUT_DIR=$(dirname $0)/../generated rm -f ${OUTPUT_DIR}/*.js ${OUTPUT_DIR}/*.ts ${NODE_BIN}/pbjs --target static-module --out ${OUTPUT_DIR}/index.js ${PROTO_DIR} ${NODE_BIN}/pbts --out ${OUTPUT_DIR}/index.d.ts ${OUTPUT_DIR}/index.js
ちなみに pbts は jsdoc を元に型定義を吐くライブラリ tsd-jsdoc を使っているようです。なので pbjs で --no-comment オプションを付けてコメントを省略してしまうと .d.ts ファイルが正しく生成されません(ちょっとしたハマりポイント)。
成功すると以下のような .d.ts ファイルが生成されます。
// generated/index.d.ts から抜粋
import * as $protobuf from "protobufjs";
export interface IItem {
productId?: string | null;
price?: number | null;
}
export interface IPurchase {
userId?: string | null;
items?: IItem[] | null;
timestamp?: number | null;
}
export class Purchase implements IPurchase {
// Message class を作るやつ
public static create(properties?: IPurchase): Purchase;
// Message を binary serialize するやつ
public static encode(
message: IPurchase,
writer?: $protobuf.Writer,
): $protobuf.Writer;
// binary を deserialize して Message Class を得るやつ
public static decode(
reader: $protobuf.Reader | Uint8Array,
length?: number,
): Purchase;
}
pbts は { hoge?: (T|null); } のような undefined を許容した型を吐きます。これでは少し不便なので undefined を許容しない型へ変換するヘルパーを用意します。
// clinet/types.ts
export type DropUndefined<T> = T extends undefined
? never
: {
[P in keyof T]-?: T[P] extends undefined ? never : DropUndefined<T[P]>;
};
// DropUndefined<{ hoge?: string|null }> = { hoge: string|null }
DropUndefined<T> を用いることで、値無しの項目について明示的に null を付ける必要が出るため、意図しない代入漏れを防ぐことができます*2。
メッセージを積むやつ(client)を書く
こんな感じ
// client/index.ts
import { IPurchase, Purchase } from "../generated";
import { SQS } from "aws-sdk";
import { DropUndefined } from "./types";
async function main(): Promise<void> {
const QueueUrl = process.env.QUEUE_URL;
if (QueueUrl === undefined) {
console.error("Please set env `QUEUE_URL`");
process.exit(1);
}
const region = process.env.AWS_REGION ?? "ap-northeast-1";
// TS の object
const rawMessage: DropUndefined<IPurchase> = {
userId: "hoge",
items: [{ productId: "fuga", price: 123.4 }],
timestamp: new Date(0).valueOf(),
};
// Protobuf.js が生成した Class
const message: Purchase = Purchase.create(rawMessage);
// serialize されたバイト列
const serialized: Uint8Array = Purchase.encode(message).finish();
// バイト列を base64 でエンコードして文字列にしたもの
const base64String: string = Buffer.from(serialized).toString("base64");
// SQS に積む処理
await new SQS({ region })
.sendMessage({
MessageBody: base64String,
QueueUrl,
})
.promise();
}
main()
.catch(e => {
console.error(e);
process.exit(1);
})
.then(() => {
console.log("finished");
process.exit(0);
});
実行するとキューにメッセージが積まれます。マネジメントコンソールや aws-cli から積まれているメッセージ数を確認すると値が増えていることがわかります。
$ yarn ts-node client # finished $ aws sqs get-queue-attributes --queue-url $QUEUE_URL --attribute-names ApproximateNumberOfMessages # { # "Attributes": { # "ApproximateNumberOfMessages": "1" # } #}
キューを処理するやつ(consumer)を書く
キューを処理するコードはこんな感じになります
// consumer/index.ts
import { SQS } from "aws-sdk";
import { Purchase } from "../generated";
async function main() {
const QueueUrl = process.env.QUEUE_URL;
if (QueueUrl === undefined) {
console.error("Please set env `QUEUE_URL`");
process.exit(1);
}
const region = process.env.AWS_REGION ?? "ap-northeast-1";
const sqsInstance = new SQS({
region,
});
// SQS からメッセージを取り出す
const sqsResult = await sqsInstance
.receiveMessage({
QueueUrl,
})
.promise();
if (sqsResult.Messages === undefined || sqsResult.Messages.length === 0) {
console.log("no messages found");
process.exit(0);
}
// base64 string から Protobuf.js の Message Class を生成する
const getMessage = (base64String: string): Purchase => {
// Protobuf の仕様に従って serialize されたバイト列
const serialized = Uint8Array.from(Buffer.from(base64String, "base64"));
// Protobuf.js の Message Class
return Purchase.decode(serialized);
};
const messages = sqsResult.Messages.map(m => m.Body)
.filter(<T>(x: T | undefined): x is T => x !== undefined)
.map(str => getMessage(str));
// 各 message について、何らかの処理を行う
// 今回は console.log に適当に文を吐くだけ
messages.forEach(message => {
// `message.userId` 等でプロパティにアクセスできる
const userId = message.userId;
// 型も付く Protobuf で `uint32 timestamp = 3;` と定義したので、`message.timestamp` は number 型
const date = new Date(message.timestamp);
// ただし、 Message をネストすると型が正しく付かない
// message.items[n].price は `number | null | undefined`
const sumPrice = message.items.reduce((sum, i) => sum + (i.price ?? 0), 0);
console.log(
`ユーザー(${userId})が ${date.toLocaleDateString()} に合計 ${Math.floor(
sumPrice,
)}円の買い物をしました`,
);
});
// 取得したメッセージを消す
await sqsInstance
.deleteMessageBatch({
QueueUrl,
Entries: sqsResult.Messages.map(m => ({
Id: m.MessageId!,
ReceiptHandle: m.ReceiptHandle!,
})),
})
.promise();
}
main()
.catch(e => {
console.error(e);
process.exit(1);
})
.then(() => {
console.log("finished");
process.exit(0);
});
$ yarn ts-node consumer # ユーザー(hoge)が 1970/1/1 に合計 123円の買い物をしました # finished
こんな感じで base64 string から値を復元できます。
少し気になったのは Purchase.Items の型が IItems[] になっている点でしょうか。
IItems は { productId?: (string|null); price?: (number|null); } です。
本来なら値が指定されていない項目については空文字列や 0 で初期化されているため、 { productId: string; price: number; } とみなしてよいのですが、 null や undefined を含む型として扱われてしまい不便です。 as で対処する等の工夫が要るかもしれません*3。
余談:serialize について
今回は Protobuf のメッセージをバイナリに serialize しました。 ですが、 Protobuf はバイナリ以外のフォーマットにも serialize 可能です。 例えば、 JSON に serialize すれば、base64 エンコードせずとも SQS に乗せることができますし、デバッグ目的で手動ポーリングする際に可読性のあるメッセージが拾えて便利です。 送受信双方のライブラリが対応しているのなら JSON を用いる選択肢も検討の余地がありそうです*4。
まとめ
いかがでしたか?(枕詞) 本記事では Protobuf を SQS のスキーマ管理に用いる例を紹介しました。 今回は client/consumer ともに TypeScript で実装しましたが、 Protobuf は language-neutral をウリにしているので、別の言語で書いたアプリケーションを使うこともできます。 node 製の BFF サーバーがメッセージを積んで、 Java や Go や Rust や Haskell で書いた job worker が処理するといった運用は実際に使う場面がありそうです。
*1:Amazon SQS は 100 万リクエスト/月 まで無料でキューの維持費もとくにないので本記事の内容を一通り試した後にキューを放置してもとくに課金されることは無い。何らかの手違いで悪意ある第三者に id が見つかってイタズラされた場合はその限りではないが
*2:oneof が使われている proto との相性はあまりよくないので万能では無いが
*3:このあたりの型定義の緩さは JavaScript を使う上で避けられない面もある。現状 TS の型定義がちゃんとしている Protobuf ライブラリが見当たらず、 client 側はまだしも consumer 側を JS/TS で書くのは面倒そうだという印象を覚えた。 Protobuf は多言語対応なので、 consumer 側については別の言語で書いてしまうほうがいいのかもしれない
*4:Protobuf.js は JSON への serialize/deseriualize 双方に対応しているようなので、少し書き換えれば JSON を積む実装に変更できる