KinesisはLambdaのイベントリソースとして設定することができます。
Lambda関数にKinesisイベントリソースを設定する
Kinesisのストリームにデータが積まれた時にLambda関数を実行する
$ aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kinesis:ap-northeast-1:****:stream/streamName \ --function-name lambdaFunctionName \ --enabled --starting-position LATEST
Lambda関数の定義
引数のeventからストリームのレコードが取得できる。
import boto3 import json import base64 def lambda_handler(event, context): try: # eventにKinesisのstream recordが渡される(record数はイベントソースの設定上限まで) for record in event['Records']: #recordはbase64されて運ばれる payload = base64.b64decode(record['kinesis']['data']) data = json.loads(payload)
Lambda実行ロールのポリシーについて
なお、Lambdaの実行ロールにKinesisへのアクセスポリシーが必要。多分、Lambdaコンテナ起動時にストリームアクセスしているからだと思う。
ロールに付与するポリシー例。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:*",
"kinesis:ListStreams",
"kinesis:GetRecords"
"kinesis:GetShardIterator",
"kinesis:DescribeStream"
],
"Resource": "*"
}
]
}
リンク
