
はじめに
enechain データプラットフォームデスク エンジニアの鳥山です。
enechainでは電力取引に必要な情報(例えば電力需給や燃料価格)を外部から収集し、社内外に公開しています。
このデータ収集はAPI経由で有償購入しているものもあれば、オープンデータとしてwebサイトに公開されているものもあります。電力自由化以降、電力需給に関する情報の公開が進んでおり色々なデータが参照可能となっていますが、それらは必ずしもマシンフレンドリーなデータばかりであるとは限りません。これは電力に限らず、特定の業界特化のドメインデータにおいてはよくある状態です。
本稿では、このような不確実性の高いデータを収集・活用するにあたってのノウハウの一部をご紹介させて頂きます。
データ取得における問題
基本的な考え方
APIや外部サイトからデータを機械的に取得しデータベースへ保管すること自体は特に難しくなく、初学者でも対応可能なタスクといえます。例えばPythonを用いれば簡単に実装できます。(サンプルデータとして東京都のオープンデータAPIを利用させて頂きました1)
import requests # 東京都 新型コロナウイルス感染症新規陽性者数 東京都 新型コロナウイルス感染症新規陽性者数 res = requests.post( "https://service.api.metro.tokyo.lg.jp/api/t000001d0000000011-819fb24a2e74a5f2ea848d548c5cff7d-0/json?limit=5", headers={"accept": "application/json", "Content-Type": "application/json"}, json={"column": ["公表_年月日", "日別陽性者数"]}, ) data = res.json()["hits"] # DBへの投入は使用している言語やフレームワークに依存するので詳細割愛 # df = pd.DataFrame(data) # insert_to_db(df)
ですが、実際にこの処理を定期実行し、データの提供を開始すると様々な問題が発生することに気付きます。
問題
- リクエスト時のエラーによりデータが取得できない
- データ形式が予告なく変更される
- データの公開が遅延する
- データの値が誤っていることがある
上記のような要因で処理が失敗すると、後続のサービス提供にそのまま影響が出てしまいます。
対処
方針として、全ての外部データ取得は失敗する可能性が必ずあると考えるべきです。
リトライとタイムアウトを設定する
- 取得処理後には必ずステータスコードを確認し、必要に応じ再取得するようにしましょう。
- 再取得に関してはAPIやwebサイトの規約を参照し、過度なアクセスをしないことに注意が必要です。リトライの間隔を空けたり、一定回数失敗したら諦めるなどの策を取りましょう。後述しますが、リトライ戦略にはデータを利用する側のニーズも加味して設定することが望ましいといえます。
- 処理に対する応答がないことを考慮しタイムアウトも必ず設定します。多くのライブラリにはタイムアウト機能が含まれており、簡単に設定可能です。
データを検査する
- ステータスコードが正常だとしても、データが正常とは限りません。json/csv/xmlなどのファイル形式、そのkeyやcolumnが想定通りかを検査すべきです。
💡 Tips: このとき、想定と異なる状況ならば処理を無理に継続するよりも異常終了させてしまった方が良いでしょう。間違ったデータを提供することはデータを提供できないよりなお悪いことです。
- データ形式が正しくても遅延、すなわち期待する時刻にデータが揃っていないケースが考えられます。APIを用いてデータ取得範囲をパラメータで指定できればステータスコード404で検知できることもありますが、1つのファイルにデータが追記されていく形式の場合は遅延を見逃す可能性があります。データに含まれる時系列のキーが最新値であることを確かめましょう。
- 形式が正しく遅延していないように見えても値そのものが誤っている可能性は残ります。軽微な誤りは利用側からは判断できませんが、桁数が異なるような重大な差については値の範囲や前日差/前日比といったヒューリスティックな判断により異常値を検知してエラー扱いとすることで取り込みを抑止する策もあります。高い信頼性が求められるデータについてはこのような検査することが望ましいでしょう。
- 有償APIを利用する場合はエッジケースにおける振る舞いがドキュメントに明文化されていることが多いですが、オープンデータ(特に電力業界に関わるもの)はwebサイトに掲載されているものをAs-Isで利用するしかないケースが多いです。この場合にどのような検査が必要かは実際の挙動を見ながら徐々に改善していくしかありません。
rawデータを永続化し、履歴を残す
- 前項のような対策をするためには、データ提供元から実際にどのような異常データが提供されたのかを調べる必要があります。しかし、冒頭に記述した簡単なサンプルにおいては失敗の原因となったデータは実行時のメモリ上にしか存在せず事後検証が難しいです。この問題に対処するため、GCSやS3といったオブジェクトストレージを利用し取得されたrawデータをそのまま格納するのが良いでしょう。
💡 Tips: このとき、格納前にデータの整形を試みないことが重要です。整形はファイル構造に対し何らかの前提を置いているものであり、整形をすることは必ず失敗の可能性を持つためです。取得が成功したら「最初に」オブジェクトストレージに格納することで失敗のリスクを減らしましょう。
- オブジェクトストレージに格納するデータは破棄や上書きをせず、処理の実行ごとに履歴保管しましょう。外部データに対して冪等性を期待することは難しく、利用側としては「この瞬間にこうだった」の積み重ねから取得・利用方法を考えていくしかありません。そのための証跡として、各タイミングの処理結果は個別に蓄積すべきです。
💡 Tips: データが大容量になるとコストの問題が気になるかもしれません。この問題に対しては過去データの圧縮やオブジェクトストレージのストレージクラスを安価なものに変更するなどで対処することも検討しましょう。
問題に対処した実装例
問題に対処した実装例です。
import json import time import requests # リトライ制御は素朴な実装にしているが、実務ではライブラリを使うとスマート # (Pythonではurllib3.util.retryやTenacityなど) for i in range(4): try: res = requests.post( "https://service.api.metro.tokyo.lg.jp/api/t000001d0000000011-819fb24a2e74a5f2ea848d548c5cff7d-0/json?limit=5", headers={"accept": "application/json", "Content-Type": "application/json"}, json={"column": ["公表_年月日", "日別陽性者数"]}, timeout=60, # 応答がないと失敗にも気付けないため、タイムアウトは必ず設定する ) # Pythonのrequestsのステータスコードチェック機能を利用 # ステータスコードによって処理を分岐したい場合はres.status_codeを見て分岐実装 res.raise_for_status() except Exception as e: if i < 3: print(e) time.sleep(600) # 過度な連続リトライは相手方に迷惑を掛けるため、調整する else: raise # rawデータをそのままで一旦物理ファイルに保存。これが失敗時の証跡となる # これは簡易なサンプルで、実務ではオブジェクトストレージの類を利用する(GCS, S3, etc) with open("data.json", "wb") as f: f.write(res.content) # -----(ここで処理分割すると問題の発生時に失敗箇所の特定が容易になる。今回は割愛)----- # 物理ファイルからの読み込み with open("data.json", "r") as f: data = json.load(f) # データ形式が想定通りか確認 # APIやwebサイトの挙動を観察して状況に応じた実装をする # (挙動が保証されるAPIならやらなくていいこともある) if "hits" not in data: raise ValueError("Expected key is not found in json") records = data["hits"] # 件数やデータ遅延のチェック # APIやwebサイトの挙動を観察して状況に応じた実装をする(0件や遅延の際の挙動に着目) if len(records) == 0: raise ValueError("No records found") # サンプルデータが過去のものであるため過去日程を指定しているが、 # 実際には実行時点で期待するデータの最新日が含まれるかを確認する if max([r["公表_年月日"] for r in records]) < "2020-01-24": raise ValueError("The latest date is not found") # DBへの投入は使用している言語やフレームワークに依存するので詳細割愛 # df = pd.DataFrame(records) # insert_to_db(df)
データ利用における問題
基本的な考え方
データの取得ができたら、次はそのデータをどのように利用するかを検討します。毎日何時にAPIで、 BigQueryで、SpreadSheetで、社内サーバ上のExcelファイルで…といった様々な方法が存在します。方法は多様で、公開すること自体に大きな問題は生じないでしょう。 しかし、運用を続けていくとこの断面においても課題が発生します。
問題
- データの精度、特に遅延に関する期待値の認識が利用側と合わないことがある。特定のデータがないと業務が進まないが、データ提供元の公開が遅延していて対応不能になってしまう
- データが当初想定していた利用目的外に流用される。特にSpreadSheetや社内サーバなど多数の人間が見られる場所に公開されたデータは本来想定している業務とは異なる業務に流用されることがある。これに付随し、トラブルや改修対応が発生した際の告知範囲が特定しづらくなったり、データライセンスに関する統制が取れなくなったりする
対処
データの不確実性について事前に認識を合わせる
外部データ利用を開始する際は、データの取得時刻やリトライ戦略および遅延が発生した際の取り扱いの認識を合わせておきましょう。
データXは平日9時に参照可能になりますとだけ伝えることも可能ですが、例えば以下のように情報を提供できます。データXは平日9時に取得を行い、遅延があった場合は最大1時間までリトライを試みます。 リトライ期間を超過した場合はトラブルとして社内告知を行います。 過去◯ヶ月で実際に遅延が発生した回数は◯回で、最遅でN時まで取得できないケースがありました。 利用においてはデータが取得できなかった場合のサイドプランを業務フローに組み込むようにしてください。外部データの取得においては失敗や遅延の可能性をなくすことはできず、プロダクト内部のデータのみを参照する処理と比べ高い確率で失敗することになります。取得処理が保証する範囲を定め、失敗前提の利用方法を構築してもらうことが重要です。
公開範囲を制限する
- 全てのデータを全員に公開するのではなく、データごとに参照可能なユーザを制限しましょう。これにより誰がデータを見ているかを提供側で把握し、トラブルや改修対応時の告知対象が明確になったり、ライセンス違反を抑制できるようになります。
- データ公開方法の選定の段階で権限制御が行えるものを選ぶことも重要です。スモールスタートとしてまずは利用者全員が参照できるサーバやクラウドストレージに置くといったことはよくあるケースですが、利用が拡大するならば早期に移行を検討すべきです。
- enechainでは主にBigQueryを用いてデータ公開設計をしています。これについては以下の記事で詳しく触れていますのであわせてご覧ください。 techblog.enechain.com
最後に
enechainのプロダクトとビジネスの土台を支えるデータ収集・活用についてご紹介させて頂きました。外部データを取ってきて公開するというシンプルなタスクではありますが、プロダクト機能や社内業務フローの一部として組み込むには考慮すべき事項が多くあります。本稿で挙げたようなノウハウが、不確実な外部データを安全に取り扱う際の参考になれば幸いです。
enechainでは、一緒に働く仲間を募集しています。 ご興味があれば、以下のリンクからぜひ詳細をご覧ください。 tech.enechain.com herp.careers
- 東京都 新型コロナウイルス感染症新規陽性者数 https://spec.api.metro.tokyo.lg.jp/spec/t000001d0000000011-819fb24a2e74a5f2ea848d548c5cff7d-0 東京都、クリエイティブ・コモンズ・ライセンス 表示4.0国際 (https://creativecommons.org/licenses/by/4.0/deed.ja)↩