こんにちわ
がじぇったー (@hackmylife7) | Twitter
です。
仕事でPythonを書く機会があったのでそのCodeを共有します
RDS⇄RDSのレプリケーション監視は前日書いた以下の記事で紹介しましたが、
今日はEC2→RDSなどレプリケーション元がRDS以外の場合にレプリケーションの失敗を監視する方法を記載します。
gadgeterkun.hatenablog.com
プログラム中のコメントにも書きましたが、show slave statusコマンドの結果、以下の2項目がどちらもyesになっていることでスレーブが問題なく動作していると判断します
- Slave_IO_Running: Yes (I/O スレッドが起動され、マスターに正常に接続したかどうか.ConnectingなどになっているとSlave→Master側へのアクセスがうまくいっていない)
- Slave_SQL_Running: Yes (SQL スレッドが起動されたかどうか. NoになっているとMaster⇄Slave のデータ整合が合わなくなっている)
監視構成
- Lambdaからレプリケーション先となるRDSに"SHOW SLAVE STATUS;"コマンドを実行し、その結果を判断することにより監視を行う
- LambdaはRDSと同一のVPC内で実行する必要がある
- RDSへ接続するために使用するライブラリはpymysqlを用いる
- 監視が失敗した際の通知先はslackにする

Code
動作するCodeは以下です。
プログラム上部の環境変数は自身の環境に合わせてください。
import sys
import logging
import pymysql
import traceback
import urllib.request
import json
#rds settings
rds_host = "RDSのエンドポイント"
name = "admin"
password = "test"
db_name = "mysql"
WEBHOOK_URL = "webhookのURL"
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
def _notify():
"""
slack通知を行う
Args:
None
Returns:
None
"""
logger.debug("Notify Script Start.")
color = 'danger'
payload = {
'attachments': [
{
"title": "Notify Replication Error",
"text": "レプリケーションエラーを検知しました。データベースを確認してください",
"color": "danger"
}
]
}
try:
request = urllib.request.Request(
WEBHOOK_URL,
data=json.dumps(payload).encode(),
headers={'Content-Type': 'application/json'},
method='POST'
)
with urllib.request.urlopen(request) as response:
return response.read().decode('utf-8')
except Exception as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
stack_trace = traceback.format_exception(
exc_type, exc_value, exc_traceback)
logger.error(''.join(stack_trace))
def main(event, context):
try:
logger.debug("Script Start.")
logger.debug(f"User: {name}")
logger.debug(f"Host: {rds_host}")
conn = pymysql.connect(rds_host, user=name, passwd=password, db=db_name, connect_timeout=5, cursorclass=pymysql.cursors.DictCursor)
logger.info("SUCCESS: Connection to RDS MySQL instance succeeded")
with conn.cursor() as cur:
"""
show slave statusコマンドの結果、以下の2項目がどちらもyesになっていることでスレーブが問題なく動作していると判断する
・Slave_IO_Running: Yes (I/O スレッドが起動され、マスターに正常に接続したかどうか.ConnectingなどになっているとSlave→Master側へのアクセスがうまくいっていない)
・Slave_SQL_Running: Yes (SQL スレッドが起動されたかどうか. NoになっているとMaster⇄Slave のデータ整合が合わなくなっている)
"""
cur.execute("show slave status")
result = cur.fetchall()
logger.debug(f"result: {result}")
result_list = [i for i in (result[0]["Slave_IO_Running"], result[0]["Slave_SQL_Running"])]
logger.debug(f"result_list: {result_list}")
result_list = ["NO"]
# どちらかがNoの場合、Slackに通知する
if all([x == "Yes" for x in result_list]):
logger.info(f"Message: Database is running normally")
sys.exit(0)
logger.info(f"Message: Notify Replication ERROR")
_notify()
return "succeeded"
except Exception as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
stack_trace = traceback.format_exception(
exc_type, exc_value, exc_traceback)
logger.error(''.join(stack_trace))
if __name__ == "__main__":
main()
LambdaへCodeをアップする際はpymysqlのライブラリもアップロードする必要があるので下記の手順でzip化してアップしましょう
❯ pip install pymysql -t ./ ❯ chmod -R 755 ./* ❯ zip -r zip_file ./*
↓手を動かして学べる最高の本