Amazon CloudSearch にドキュメントを取りこむ Lambda 関数 s32cs のご紹介

このエントリは OSS紹介 Advent Calendar 2017 - Qiita 16日目の記事です。穴が空いていたので拙作の紹介で穴埋めを。

s32cs という Amazon CloudSearch に対して S3 からのイベントドリブンでドキュメントを投入する Lambda を書いたのでご紹介します。

github.com


3行で

  • Amazon CloudSearch にドキュメントを登録する Lambda を作った
  • アプリケーションからは Fluentd にログを送るだけ
  • Kinesis Firehose + S3 を利用することで、ストリームデータを 一定時間/一定サイズ に区切っての継続的なバッチ処理が容易かつ堅牢に

基礎知識

Amazon CloudSearch は AWS が提供している、フルマネージドな検索エンジンです。ドキュメント数が多くなったりすると勝手にスケール (アップ|アウト) してくれるので、キャパシティプランニングにそれほど気を遣うことなく全文検索を実装できます。

業務では数億(5より大きい)レコードを投入しているものもありますが、おおむね元気に動いてくれています。

CloudSearch へのデータの投入方法あれこれ

Amazon CloudSearch ドメインにデータをアップロード - Amazon CloudSearch

ここがちょっと癖があるところで、逐次レコードを登録するような、いわゆるストリーミングアップロード的なことはできず、投入するレコードが複数まとまった 5MBまでの JSON / XML ファイル (SDF) を用意してバッチ的に投げ込む API になっています。

素朴な方法

最初はものすごく素朴に、「5分ごとに DB から直近 5分のレコードを取得し、JSON に加工した上でアップロードする cron 処理」などで投入していました。

しかし、このような頻度が高い定期処理は、いろいろ面倒なことがあるのでできれば避けたいものです。

  • 前回の処理が何らかの原因で終わってない場合の排他処理
    • 特に流量が多くなると、並列処理で分散させるのが面倒になります
  • 失敗した場合のリカバリ
    • 前回処理が終わった最後のレコードを覚えていないといけない

また、レコードが削除されたというドキュメントも CloudSearch 側に反映しないといけないため、DBから物理削除してしまうと何らかの削除済みフラグを他に保存しておく必要があり、処理が複雑になります。

ken39arg/fluent-plugin-cloudsearch

そこで同僚の ken39arg が開発したのが fluent-plugin-cloudsearch です。Fluentd の output plugin として動作し、指定した時間/サイズごとにバッファを CloudSearch にアップロードするものです。

github.com

これによって、アプリケーションからはドキュメントの情報をログとして Fluentd に送信するだけでインデックスの更新ができるようになりました。ストリーミング処理ですね!

ただ、これもいくつか問題がありました。

  • 複数台で動作させると、ある1レコードの作成と削除が別々の Fluentd のバッファに保存される可能性がある
    • バッファのフラッシュタイミングによっては、作成→削除 ではなく 削除(空振り)→作成 という順で処理され、削除されたはずのレコードが消えてくれない
  • Fluentd のホストがダウンすると、バッファが失われる可能性がある
    • コンテナで動作させると、落ちた瞬間に失われるのでロストするリスクが大
  • 投入した SDF はどこにも保存されずに消えてしまうので、問題があった場合のリカバリが難しい

s32cs

ということで開発したのが s32cs です。名前はまったくいいキラキラネームが思いつかなかったので、「S3 to CloudSearch」という意味です。 Go + Apex で実装されています。

S3 にオブジェクトが配置された時のトリガで Lambda を実行し、

  • S3 からオブジェクトを取得
  • 以下のような 1行 1レコードの JSON をアップロード用の SDF 形式に加工
{"id":"123","type":"add","fields":{"foo":"bar","bar":["A","B"]}}
{"id":"123","type":"delete"}

[
  {"id":"123","type":"add","fields":{"foo":"bar","bar":["A","B"]}},
  {"id":"123","type":"delete"}
]
  • CloudSearch にアップロード

という動作を行います。

S3 に JSON を配置するときに fluent-plugin-s3 で行うと、fluent-plugin-cloudsearch 同様に作成と削除が入れ替わる可能性があるため、Firehose を経由することでレコードの発生順序を(できるだけ)直列化するのがお薦めです。

  1. アプリケーションは Fluentd に送信
  2. Fluentd は Kinesis Firehose に逐次送信
  3. Firehose は 5分 / 5MB 単位などで S3 にオブジェクトを保存
  4. s32cs が S3 のオブジェクトを加工してアップロード

これによって、fluent-plugin-cloudsearch であったいくつかの問題も解消しました。

  • 複数台で動作させると、ある1レコードの作成と削除が別々の Fluentd のバッファに保存される可能性がある
    • → Firehose へなるべく間隔をおかずに送信することでイベントの順序を保つ
  • Fluentd のホストがダウンすると、バッファが失われる可能性がある
    • → 短時間で Firehose へ送り出されるので、ダウン時の影響が小さい
  • 投入した SDF はどこにも保存されずに消えてしまうので、問題があった場合のリカバリが難しい
    • → S3 に元データが残っているので、再度別の箇所に投入したり、内容を確認することが容易

まとめ

Kinesis Firehose + S3 を利用することで、ストリームデータを 一定時間/一定サイズ に区切っての継続的なバッチ処理が Lambda で容易かつ堅牢に行えるようになります。

使いでのあるアーキテクチャパターンなので、今後も多用していきたいと思います。