in_tail+(in|out)_forwardができるログエージェントfluent-agent-hydraをGoで書いている

タイトルが長いですが、つまりそういうものをGoで書いています。

fluent-agent-hydra - Github

(hydraっていうのは首のいっぱいあるアレです。キングヒドラとか)

特徴

  • fluent-agent-lite 的なファイルを tail -F のように追尾する機能
    • 1プロセスで複数ファイルを追跡できます
    • in_tail のような pos_file, parse 機能は今のところありません
  • in_forward 的な TCP で msgpack 形式のログを受け取る機能
    • 各種言語の logger (Ruby, Perl, Go など) から投げたログを受け取って fluentd に送り直せます
    • JSON 形式には対応していません
    • 簡易的なオンメモリバッファを持っています
  • 上記から入力されたログを fluentd に送信する out_forward 的な機能
    • 複数の送信先を登録し、primary の fluentd がダウンしたら次の fluentd に送信を試みます
    • 送信先がダウンしてる場合はファイルの追尾を進めないので、内部バッファが溜まることはありません
    • (in_forward のバッファは溢れることがあります)
  • 動作状況を JSON で返す stats monitor httpd
    • 監視に便利
  • Goで書かれているのでバイナリ1ファイル(+設定ファイル)で動作します
    • daemonize 機構はないので、何らかの supervisor (daemontools, supervisord, runitなど) 経由で動作させる必要はあります
  • Windowsでも動きました


開発背景

これまで自分の環境では、Web(App)サーバから集約 fluentd へのログ転送について

  • ファイル追尾には fluent-agent-lite
  • アプリケーションからログを受け付けるのは fluentd (in_forward)

を利用していました。

fluent-agent-lite では 1プロセスで 1ファイルを扱うため、多数のファイルを追尾する場合にはファイル数分 fluent-agent-lite + tail のプロセスが必要になります。
それ自体はメモリ消費が多少大きくなるぐらいでさほど問題ではないのですが

  • agent-lite は起動時にファイルが存在しないとプロセスが終了してしまう
    • プロセスの起動順により、追尾したいログがまだない、ということがあり得る
  • agent-lite プロセス数で監視を行うと、追跡するファイル数が増えるごとに監視設定を追従する必要がある

という点が多少不便だったのと、自分のユースケース的には in_forward も1プロセスで賄えたら便利かな、というのと、あと単純に作ってみたかったからです。

【追記】

今はファイルがなくても死なないオプションがあるそうです。


Go での fluentd 代替実装については Ik が既にありますが、本家 fluentd のように plugin アーキテクチャになっていて重厚なのと、あとドキュメントがなかったので手を出せずに、という感じです。
ただし、in_forward 機能の実装にあたって、Ikからコードを頂いた部分が多くあります。ありがとうございます。

パフォーマンス

fluentd (in_tail+out_forward), fluent-agent-lite, fluent-agent-hydra の3者で、fluentd-benchmark / one_forwardベンチマークを取りました。

ベンチマーク結果はこちらです https://github.com/fujiwara/fluent-agent-hydra#benchmark

ざっとまとめると、通常使用する領域 (秒間数百〜数万lines/secまで) において

  • CPU使用率は lite < hydra < fluentd
  • メモリ使用量は hydra < lite << fluentd

という結果です。
ピーク性能では lite が 580,000/sec、hydraでバッファサイズとGOMAXPROCSを調整すれば 700,000/sec までいけましたが、秒間50万行ファイルにログを書く人はいないと思いますので、あまり意味のある結果ではないですね。

現状

数日間、某所環境で特にメモリリークもローテート時の取りこぼしもなく快調に動いているので、一応使えるレベルではないかと思いますが、α版状態です。

今後、本番に導入を進めて安定させたいと思います。
バイナリリリースもありますので、お試しいただければ幸いです。

MHAをAWSで使うための支援ツールMHA::AWSをアップデートしてCPANに上げた

以前に作って、プロダクションでもいくつかのサービスに導入している MHA::AWS ですが、failover 方法を ENI 付け替えの他に VPC Route Table の書き換えもサポートしました。

ENI付け替えでは同一 Availability Zone 内での failover しかできませんが、VPC Route Table の書き換えによる方法では Multi-AZ 環境での failover も可能になります。

CPANにも上げましたので、 cpanm MHA::AWS でインストール可能です。

以前の紹介記事 → #11 MySQL Master HA を AWS で動作させる場合のフェイルオーバー支援ツール MHA::AWS のご紹介 | tech.kayac.com - KAYAC engineers' blog

# /etc/masterha_default.cnf
master_ip_failover_script=mhaws master_ip_failover --interface_id=eni-xxxxxxxx
master_ip_online_change_script=mhaws master_ip_online_change --interface_id=eni-xxxxxxx
shutdown_script=mhaws shutdown --interface_id=eni-xxxxxxxx

使用方法は以前と変わらず、master_ip_(failover|online_change)_script と shutdown_script に対して引数を適切に設定した mhaws コマンドを指定すれば動きます。

Usage:
        $ mhaws [subcommand] --interface_id=ENI-id [... args passed by MHA]

        required arguments:
          1. failover method is ENI attach/detach
            --interface_id=[ENI-id for master VIP]

          2. failover method is RouteTable change destination
            --route_table_id=[RouteTable-id]
            --vip=[master VIP]

        subcommand:
          master_ip_failover
          master_ip_online_change
          shutdown

実はまだ、幸か不幸か(テスト以外で) 本番環境で事故による failover が発動したことはないのですが、一応ちゃんと動くと思います。
深遠な理由で RDS でなく EC2環境で MySQL を動作させる必要がある、MHAでフェイルオーバーしたい、というかたはどうぞご利用ください。

Goで並列実行のベンチマークを取るためのライブラリ parallel-benchmark を書いた

以前 Perl で、forkして並列実行するベンチマークを取るためのライブラリ、Parallel::Benchmark というのを書きました。

これを使うと、単に Perl コードのベンチマークだけではなく、並列に外部にアクセスして計測を行うような (たとえばApacheBenchのような) ベンチマークツールが簡単に作れるので重宝しています。(仕事では、ソーシャルゲームのサーバアプリケーションに対する負荷テストを行うために使ったりもしています)

で、思い立って Go 版を書きました。

使用例

フィボナッチ数を求めるコードを並列実行するベンチマーク
  • fib(30) を1回計算するごとにスコア1とする
  • 10個の goroutine で並列実行
  • 3秒間計測
package main

import (
	"github.com/kayac/parallel-benchmark/benchmark"
	"log"
	"time"
)

func main() {
	result := benchmark.RunFunc(
		func() (subscore int) {
			fib(30)
			return 1
		},
		time.Duration(3)*time.Second,
		10,
	)
	log.Printf("%#v", result)
}

func fib(n int) int {
	if n == 0 {
		return 0
	}
	if n == 1 {
		return 1
	}
	return (fib(n-1) + fib(n-2))
}

実行結果はこんな感じです。

$ go run fib.go
2014/07/18 14:24:52 starting benchmark: concurrency: 10, time: 3s, GOMAXPROCS: 1
2014/07/18 14:24:55 done benchmark: score 330, elapsed 3.303671587s = 99.888863 / sec
2014/07/18 14:24:55 &benchmark.Result{Score:330, Elapsed:3303671587}

使い方は簡単で、benchmark.RunFunc() に計測したい処理の func() int を渡すだけです。
渡した func が返した int 値が1回実行ごとのスコアになるので、処理内容によって違うスコアを返すこともできます。(失敗したら 0 とか)

GOMAXPROCS は適宜環境変数で渡すなどしてください。

$ GOMAXPROCS=4 go run fib.go
2014/07/18 14:28:56 starting benchmark: concurrency: 10, time: 3s, GOMAXPROCS: 4
2014/07/18 14:28:59 done benchmark: score 974, elapsed 3.097321734s = 314.465233 / sec
2014/07/18 14:28:59 &benchmark.Result{Score:974, Elapsed:3097321734}
ApacheBench のような HTTP GET を行うベンチマーク

benchmark.Worker interface を実装した、自前の Worker オブジェクトをスライスで渡すことで、状態を持ったオブジェクトを使ったベンチマークを作ることもできます。

package main

import (
	"flag"
	"github.com/kayac/parallel-benchmark/benchmark"
	"io/ioutil"
	"log"
	"net/http"
	"time"
)

type myWorker struct {
	URL    string
	client *http.Client
}

func (w *myWorker) Setup() {
	w.client = &http.Client{}
}

func (w *myWorker) Teardown() {
}

func (w *myWorker) Process() (subscore int) {
	resp, err := w.client.Get(w.URL)
	if err == nil {
		defer resp.Body.Close()
		_, _ = ioutil.ReadAll(resp.Body)
		if resp.StatusCode == 200 {
			return 1
		}
	} else {
		log.Printf("err: %v, resp: %#v", err, resp)
	}
	return 0
}

func main() {
	var (
		conn     int
		duration int
	)
	flag.IntVar(&conn, "c", 1, "connections to keep open")
	flag.IntVar(&duration, "d", 1, "duration of benchmark")
	flag.Parse()
	url := flag.Args()[0]
	// benchmark.Worker interface をもった worker を作成してスライスに入れる
	workers := make([]benchmark.Worker, conn)
	for i, _ := range workers {
		workers[i] = &myWorker{URL: url}
	}
	benchmark.Run(workers, time.Duration(duration)*time.Second)
}
  • Setup() : goroutine が作成された後、各workerで呼ばれます。初期化を行うのに利用してください
  • Process() int: 全ての worker の Setup() が終了後、各 worker の Process() が指定時間に達するまでループで呼び出されます。返す int 値が1回実行ごとのスコアになります
  • Teardown(): 指定時間が経過後、各 worker で呼ばれます。後処理が必要であればここで行ってください

実装上のポイントとか

Perl版でも同様なのですが、ベンチマークを取るときにちょっと嬉しい小ネタが入っています。

  • すべての worker の初期化が終わるのを待ってから計測開始するので、重い初期化処理があっても開始が揃う
  • 途中でシグナル (INT, TERM, QUIT, HUP) を受けた場合は、そこで計測を終了してその時点での結果を返す
    • つい計測時間10分でベンチ始めたけど待つの辛いので5分にしたい、けど中断したらそれまでの計測が無駄に…というようなケースでも躊躇なく停止できます

Perl版では上記の挙動を実装するために fork した子プロセスが初期化完了するのを Parallel::Scoreborad で待ったり、子プロセスを制御するのにシグナルを送ったりしていてなかなか複雑な実装になっていたのですが、Goでは channel が使えるので大変楽に書けてすばらしいですね。

zabbix-agent で取得できる値を Mackerel の custom metrics として送り込む

Mackerel Meetup #1 Tokyo に行ってきました。鯖サンド美味しかったです。

Mackerel では custom metrics を sensu plugin 形式で出力するコマンドから送り込める (ドキュメント)、ということなので、思いついて拙作の go-zabbix-get に sensu plugin format 出力機能をつけてみました。

実行例はこんなかんじで、-f sensu をつけると key, value, unixtime をタブ区切りで出力します。

$ go-zabbix-get -k system.uptime -f sensu
system.uptime	2546472	1403230104

$ go-zabbix-get -k system.users.num -f sensu
system.users.num	1	1403230155

ということで mackerel-agent.conf に以下のような定義をするだけで

[plugin.metrics.system]
command = "go-zabbix-get -k system.users.num -f sensu"
type = "metric"

zabbix-agent で取得できる値をそのまま Mackerel の custom metrics として送信することができます。

既に zabbix-agent 側で UserParam が定義されている場合でも変更なしに Mackerel 側にもデータを送ることができるので、「いま Zabbix をメインで使っているんだけど Mackerel も試してみたいなー」というかた (私です) には大変便利じゃないかと思います!

Consul service のヘルスチェックを zabbix での監視項目と共用する

Consul での service 定義にはヘルスチェックを設定できます。Service Definition - Consul

以下のようにサービス定義に死活監視用のコマンドを登録しておくことで、一定時間ごとにコマンドを起動します。コマンドの終了ステータスが 0 : 正常、1 : warning、それ以外で critical という扱いです。このあたりは nagios, sensu 等のプラグインと互換性があるようですね。
(他に、外部から一定時間ごとに状態を API で登録する TTL 型の死活監視もあります)

{
  "Name": "nginx"
  "Check": {
    "Interval": "10s",
    "Script": "/path/to/healthcheck.sh"
  },
}

ところで、既に何らかのモニタリングツールで監視をしている場合、Node 上で動く daemon 類についてはあらかじめ監視が仕込まれていることが多いはずです。

使用しているのが nagios、sensu であればチェックスクリプトを共用できるので楽ですが、Zabbix で監視をしている場合はどのようにするのがよいか。

zabbix_get というコマンド(もしくは拙作の互換品 go-zabbix-get) を使うことで、Node 上で動いている zabbix-agent から各種情報を取ることができるため、これを Consul の死活監視にも流用できるように考えてみました。

まず、以下のような bash で書かれた wrapper script を、zabbix_get_eval という名前で用意しておきます。

#!/bin/bash
VALUES=()
# 最後の引数を除いた引数をループして zabbix_get した値を配列に入れる
for KEY in "${@:1:($#-1)}"; do
    V=`zabbix_get -k "${KEY}"`
    VALUES=("${VALUES[@]}" "${V}")
done
# 最後の引数は評価式
EXPR="${!#}"
# 評価式に値を渡して評価
bash -c "${EXPR}" -- "${VALUES[@]}"

このコマンドは引数に zabbix-agent から取得する key 名、最後の引数にそれを評価する bash script を取ります。

また、zabbix-agent の設定で、localhostからの情報取得を許可します。

Server=127.0.0.1,zabbix.example.com


以下のような状態を正常と見なす死活監視を定義してみると、

  • 動作しているnginxという名前のプロセス数が 1以上
  • かつ
  • TCP 80 を Listen している

zabbix-agent の proc.num と net.tcp.listen を以下のように使用することで、正常時には exit 0、異常時には exit 2 で終了するコマンドになります。

$ zabbix_get_eval 'proc.num[nginx]' 'net.tcp.listen[80]' '[[ $1 -ge 1 && $2 -eq 1 ]] || exit 2'

Consul に登録するサービス定義にはこれをそのまま渡せば OK です。

{
  "Name": "nginx"
  "Check": {
    "Interval": "10s",
    "Script": "zabbix_get_eval 'proc.num[nginx]' 'net.tcp.listen[80]' '[[ $1 -ge 1 && $2 -eq 1 ]] || exit 2'"
  },
}

zabbix-agent から取得できる項目は結構いろいろあり、(【参考】1 Zabbix エージェント [Zabbix Documentation 2.0]) CPUやプロセス、ネットワークの情報以外にも、面白いところでは

  • web.page.get (HTTPでURLにアクセスして内容を取得)
  • net.dns.record (DNSで名前解決した結果を取得)

などもあります。

bash での値評価は数値、文字列の一致や大小比較の他にも正規表現(=~)も使えるので、複数項目の値を使って柔軟に評価できるかと思います。

Consul の情報を Chef / Ohai から使う ohai-plugin-consul を作ったのとその周辺の話

先日とあるサービスに Consul を入れました。

内部 DNS と、たとえば nginx からアプリケーションサーバに振り分ける定義をするために service を使用しています。

そこで使うために、ohai-plugin-consul を書きました。Github にあります。

fujiwara/ohai-plugin-consul · GitHub

Ohai の version 6 と 7 で plugin の interface が変わっており、ohai-plugin-consul は Ohai 7 向けなので、Chefから使う場合は Chef-11.12.0 以上、または 11.10.4.ohai7.0 が必要です。
【参考】 Ohai, new Ohai plugins! - O'Reilly Radar

使用方法

ohai コマンドから使う場合は -d で plugin (consul.rb) を配置したディレクトリを指定して実行すると、最上位の consul というキーに API を叩いた情報が入ってきます。

ohai -d /path/to/plugin_dir | jq .consul
{
  "agent": {
    "checks": { ... },      #= /v1/agent/checks
    "members": [ ... ],     #= /v1/agent/members
    "services": [ ... ]     #= /v1/agent/services
  },
  "catalog": {
    "datacenters": [ ... ], #= /v1/catalog/datacenters
    "nodes": [ ... ],       #= /v1/catalog/nodes
    "services": [ ... ],    #= /v1/catalog/services
    "node": {
      "FOO": { },           #= /v1/catalog/node/FOO
      ...
    },
    "service": {
      "BAR": { },           #= /v1/catalog/service/BAR
      ...
    }
  }
  "status": {
    "leader": "...",        #= /v1/status/leader
    "peers": [ ... ],       #= /v1/status/peers
  }
}

Chefから使用する場合は、(client|solo).rb に plugin_path を定義してください。

Ohai::Config[:plugin_path] << '/path/to/plugins'

node[:consul] で上記と同様の情報が取得できます。

開発経緯

たとえば以下のように、app というサービスを定義して、その node に nginx からリクエストを振り分けたいとします。

$ curl localhost:8500/v1/catalog/services | jq .
{
  "app": [
    "pc",
    "mobile"
  ]
}
$ curl localhost:8500/v1/catalog/service/app | jq .
[
  {
    "ServicePort": 0,
    "ServiceTags": [
      "pc",
      "mobile"
    ],
    "ServiceName": "app",
    "ServiceID": "app",
    "Address": "192.168.1.11",
    "Node": "app001"
  },
  {
    "ServicePort": 0,
    "ServiceTags": [
      "pc",
      "mobile"
    ],
    "ServiceName": "app",
    "ServiceID": "app",
    "Address": "192.168.1.12",
    "Node": "app002"
  }
]

最初は DNS interface を使って、以下のように nginx から app.service.consul の名前解決をして振り分けようとしました。

# nginx.conf
location / {
  set $app "app.service.consul";
  proxy_pass http://$app:5000;
}

が、以下の事情により DNS による振り分けは断念。

  • Consul (v0.2.1) では DNS (UDP) でのアクセスでは、サービスに node が何台いても 3アドレスをランダムに返す
  • Consul が TTL 0 で応答を返すが、nginx は1秒間は名前解決結果を cache する
  • そのため、任意の1秒間では特定の 3 node にしか振り分けられない
  • 4 node 以上ある場合は1秒ごとに全くアクセスが行かない node ができてしまう

ということで、Chef でテンプレートから生成している nginx.conf に Consul API から取得した service を渡す形にしました。

# nginx.conf.erb
upstream pc_backend {
<% node[:consul][:catalog][:service][:app].select{|n| n[:ServiceTags].include?("pc") }.each do |n| %>
   server <%= n[:Address] %>:5000;  # <%= n[:Node] %>
<% end %>
}

このテンプレートを上記の service 定義で展開すると以下のようになります。

# nginx.conf
upstream pc_backend {
   server 192.168.1.11:5000;  # app001
   server 192.168.1.12:5000;  # app002
}

まだやってないこと

nginx.conf をファイルとして静的に展開するので、service の状態に変化 (nodeの増減など) があった場合にはそれを検知して設定ファイルを再生成、再読込する必要があります。

Consul には blocking query という仕組みがあり、状態の変化を long polling する HTTP API で検知することができます。

mizzyさんの consul-catalog というライブラリを使用すると、以下の Gist のようなコードで service の変更を検知して chef-client を実行、という形が取れるかと思います。

https://gist.github.com/fujiwara/4cdff1d718ecaa2b8294

【参考】【Consul】ブロッキング・クエリ(blokcing query)とは | Pocketstudio.jp log3

また、Consul の service にはヘルスチェック機構がありますが、他に Zabbix でやっている監視とうまく共用できないか構想中のためまだ入れていません。(backendに接続できなければ nginx が切り離すので、今はとりあえずそれで…)

zabbix-getコマンドのGo版を書いた ので、うまいこと組み合わせられないかと構想中です。

Zabbix のスクリーンを percol で快適に選択して開く open_zabbix_screen を作った

数えてみたら Zabbix のスクリーンが180枚もできていて、こうなるとブラウザのプルダウンでの選択がめんどくさいわけです。

一応、先頭一致でインクリメンタルサーチはできますが日本語が混じっているとできないし……と不満に思いつつ使っていたのですが、percol というコマンドが便利だということを知ったので percol で選択したスクリーンを開くツールを書いてみました。

App::OpenZabbixScreen

Perl製です。あらかじめ PATH が通ったところに percol をインストールしてください。

ターミナルから open_zabbix_screen コマンドを起動すると、初回のみ Config::Pit が環境変数 $EDITOR のエディタを開くので、Zabbix の URL (API エンドポイントが http://example.com/zabbix/api_jsonrpc.php なら http://example.com/zabbix/ まで) とユーザ名、パスワードを入力して保存すると、スクリーンの選択画面が開きます。

Zabbix API を叩いてスクリーン一覧を取得して、percol で選択した URL を open コマンドの引数に渡してくれるので、ブラウザでスクリーンが一発で開きます。

便利!


【参考】