読者です 読者をやめる 読者になる 読者になる

Amazon SQSを利用してS3からRedshiftにデータ投入するRinというツールを書いた

fluentdで集約したログをRedshiftに投入するのに、これまでは fluent-plugin-redshift を使っていたのですが、諸々の理由でこれを置き換えるツールをGoで書きました。

Rin - Redshift data Importer by SQS messaging.

プロダクション環境に投入して、2週間ほど快調に動作しているので記事を書いておきます。

アーキテクチャと特徴

S3にデータが保存されたタイミングで、Amazon SNS または SQS にメッセージを飛ばすイベント通知機能がありますので、それを利用しています。

  • (何者か) S3 にデータを保存する (fluent-plugin-s3, その他どんな手段でも可)
  • (S3) SQS に S3 の path 等が記述されたメッセージを通知する
  • (Rin) SQS のメッセージを受信し、Redshift へ COPY を発行して取り込みを行う

S3, SQSの設定をした上で以下のような config を用意し、rin -config config.yaml として起動しておくだけで動作します。

1プロセスで、複数の S3 path(bucket) に対応した Redshift の table (schema) への投入を扱えます。

Go 製なので、バイナリをダウンロードするだけで動作可能です。

queue_name: my_queue_name    # SQS queue name

credentials:
  aws_access_key_id: AAA
  aws_secret_access_key: SSS
  aws_region: ap-northeast-1

redshift:
  host: localhost
  port: 5439
  dbname: test
  user: test_user
  password: test_pass
  schema: public
s3:
  bucket: test.bucket.test
  region: ap-northeast-1
sql_option: "JSON 'auto' GZIP"       # COPY SQL option

# define import target mappings
targets:
  - redshift:
      table: foo
    s3:
      key_prefix: test/foo
  - redshift:
      schema: $1      # expand by key_regexp captured value.
      table: $2
    s3:
      key_regexp: test/([a-z]+)/([a-z]+)/

開発動機

fluent-plugin-redshift を利用していた間、以下のような問題がありました。

アップロード時に重い

fluentdのバッファとしてmsgpack形式で保存したものを、S3へのアップロード時に取り込み用のフォーマットに変換するという処理を行うため、fluentd の CPU を相当食います。それなりの流量のデータ(数千msgs/sec程度) を Redshift に投入しようとすると、fluentd は1プロセスでは複数CPUを有効に使えないため、複数プロセスに処理を分割する必要がありました。

Redshift のメンテナンス時に面倒

Redshiftのクラスタにノードを追加、削除する場合、クラスタリサイズ中にはデータ投入ができなくなります(読み取りは可能)。

その状態で fluent-plugin-redshift のデータ投入が走ると、S3へファイルをアップロードするところまでは成功した上、その後の COPY の発行でエラーになるため、fluentdの処理は「S3へのアップロード処理から」リトライされます。

リトライされるので最終的には問題なく取り込まれるのですが、S3には投入できなかったファイルが残ったままになり、投入できたファイルとできなかったファイルには部分的に同一のログが重複して含まれる状態になります。

エラーになって取り込まれなかったファイルをきちんと消しておかないと、後日まとめて再取り込みをしようとしたときに、ログを重複して読み込んでしまうことになります。

時々死ぬ

原因は結局特定できなかったのですが、plugin-redshiftの定義を多数記述すると数日〜数週間に一度程度の頻度で fluentd ごと処理が停止していました。こうなると kill -KILL しないと再起動もできなくなります。fluentdの優秀なバッファ機構のおかげで kill してもデータロストはないようですが、停止を検知 (ログが流れてこなくなる) して、強制再起動する仕組みを作ってだましだまし動かしていました。

Rin でうれしいこと

S3へのアップロードが軽くなる

fluent-plugin-s3 はアップロードする形式で直接バッファに保存し、そのまま(圧縮して)S3に投げるだけのため、バッファからの再構築でのCPU消費がありません。

Redshiftのメンテナンス時のリトライ処理が楽

S3に上げるところまでは Redshift とは無関係のため、S3 へアップロードされたものが部分的に重複することはありません。 Rin が Redshift へ投入できなかった場合には SQS のメッセージは削除せず、不可視期間が過ぎた後に再度実行します。リトライは SQS のメッセージで担保されます。

死ににくい

fluent-plugin-s3が原因でfluentdが刺さった経験は未だありません。

fluentd以外からのデータ投入も可能になる

S3, ELB, CloudFront など、S3 にログが保存されるサービスの Redshift への取り込みも統一的に扱うことができます。 (まだやってないけどできるはず…)

FAQ

Q1 "Redshift data Importer by SQS messaging" だったら Rin じゃなくて Ris なのでは?

A1 最初、SQS ではなく SNS 通知をトリガにして取り込むようにしようと名前を決めてコードをある程度書いた後に、リトライと実行時のレスポンスを考えると SQS のほうが……となった経緯があります。

Q2 Lambda でやったらよいのでは?

A2 本記事執筆時点、Tokyoリージョンには未だに Lambda が来ていません(もうすぐ来そうな予感がひしひしとしていますが)。 また、Lamba のリトライ処理は3分間隔で3回、とのことなので、リサイズ中には比較的長時間失敗し続けることを考えると不安があります。参考: S3、Kinesis/DynamoDB StreamsでのLambdaリトライ処理