インストール編(Beats)では、BeatsのインストールとFilebeatの設定を行ないました。Elasticスタックのインストールと、その後の可視化システムの構築の流れを再掲します。(下図)
- Elasticsearch, Elasticsearch X-Packのインストール(済)
- Kibana, Kibana X-Packのインストール、Basicライセンスの取得と投入(済)
- Logstash, Logstash X-Packのインストール(済)
- Beats (Filebeat)のインストール(済)
- Logstashのログ処理ルールの作成とテストデータでの動作確認(本記事)
- Filebeat→Logstash→Elasticsearchでの運用開始
- Kibanaでの検索、ダッシュボードの作成
本記事では、テスト用のログデータを用いて、FilebeatからLogstashにデータが送信されることを確認します。その後、Logstashによるログデータ加工のための処理ルールについて説明していきます。そして最後に、再びテストデータを用いて、Logstashの処理ルールが正しく動作することを確認します。
テスト用ログファイルの作成
まず、テスト用のログデータを用意しましょう。用意といっても、新たにテストデータを作るわけではなく、すでにあるホームゲートウェイのログから適当な一行を切り出せばOKです。インストール編(Beats)で、テスト用のログファイルパスを/tmp/hgw.log
と指定しましたので、例えば以下の内容のファイルを/tmp/hgw.log
として保存します。
/tmp/hgw.log
Apr 28 00:15:00 filebeat hgw: 2018/4/28 00:14:51 SRC=62.210.180.80/5283 DST=203.0.113.1/5090 UDP table=spi
FilebeatからLogstashへのテストデータ送信
テストファイルが作成できましたので、さっそくFilebeatからLogstashへ送信してみましょう。まず、Logstashを起動します。
/usr/local/logstash/bin/logstash --path.settings /usr/local/etc/logstash --path.logs /var/log/logstash
次に、Filebeatを起動します。
service filebeat start
一瞬の後に、以下のような出力がLogstashを起動したターミナルに表示されます。出力内容の中にmessage
という要素が含まれており、用意したテスト用ログの一行がそのまま含まれていることを確認してください。このように、Logstashが受信したログデータは、特に指定を行なわなければmessage
という要素に格納されます。これ以外のデータ要素はFilebeat、あるいはLogstashが付加したメタデータです。
{
"source" => "/tmp/hgw.log",
"@timestamp" => 2018-04-30T05:27:34.305Z,
"message" => "Apr 28 00:15:00 filebeat hgw: 2018/4/28 00:14:51 SRC=62.210.180.80/5283 DST=203.0.113.1/5090 UDP table=spi",
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"host" => "filebeat.example.com",
"fields" => {
"logtype" => "firewall_hgw"
},
"@version" => "1",
"beat" => {
"name" => "Filebeat",
"hostname" => "filebeat.example.com",
"version" => "6.2.3"
},
"offset" => 109,
"prospector" => {
"type" => "log"
}
}
Logstashを用いたログデータの加工
テストデータの送受信はうまく行きましたか? いいですね。では、いったんLogstashとFilebeatを停止させておいてください。
次は、本記事で構築しようとしているログ可視化システムの肝になる部分、Logstashによるログデータの加工について説明します。
以下の記事では、Logstashを用いてApacheログを加工する手順について詳しく紹介しています。本記事ではホームゲートウェイのログを扱いますので、ソースデータの種類は異なります。しかし、基本的な処理の流れは同じですので、こちらの記事も参考にしていただければと思います。
さて、本記事におけるログ処理の流れは以下のようになります。
- 入力元からログデータを受信
- 受信ログをパース
message
フィールドを削除(パース成功時のみ)- タイムスタンプを修正
source_host
フィールドを追加- アクセス元の地理的位置を検索
- アクセス元のドメイン名を逆引き
- 加工済みのログデータを送信
本処理を記述する設定ファイルは/usr/local/etc/logstash/logstash.conf
です。まず、ファイルの全体を示しておきます。
/usr/local/etc/logstash/logstash.conf
input { # 1. 入力元からログデータを受信 beats { port => 5044 host => "0.0.0.0" } } filter { if [fields][logtype] == "firewall_hgw" { grok { # 2. 受信ログをパース patterns_dir => [ "/usr/local/etc/logstash/patterns" ] match => { "message" => [ "%{SYSLOGBASE} %{HGWLOG}", "%{SYSLOGBASE} %{GREEDYDATA:raw_message}" ] } } if "_grokparsefailure" not in [tags] { mutate { # 3. messageフィールドを削除(パース成功時のみ) remove_field => [ "message" ] } } date { # 4. タイムスタンプを修正 match => [ "hgw_timestamp", "yyyy/M/d HH:mm:ss" ] } mutate { # 5. source_hostフィールドを追加 add_field => { "source_host" => "%{source_ip}" } } geoip { # 6. アクセス元の地理的位置を検索 source => "source_ip" target => "geoip" } dns { # 7. アクセス元のドメイン名を逆引き reverse => [ "source_host" ] action => "replace" nameserver => [ "192.168.1.251", "192.168.1.247" ] failed_cache_size => 1000 failed_cache_ttl => 300 hit_cache_size => 1000 hit_cache_ttl => 300 } } } output { # 8. 加工済みのログデータを送信 stdout { codec => rubydebug } }
入力元からログデータを受信
input { beats { port => 5044 host => "0.0.0.0" } }
Beatsプラグインを用いて、Filebeatからログデータを受信します。本パートはテスト用の設定から変更していません。
受信ログをパース
if [fields][logtype] == "firewall_hgw" { grok { patterns_dir => [ "/usr/local/etc/logstash/patterns" ] match => { "message" => [ "%{SYSLOGBASE} %{HGWLOG}", "%{SYSLOGBASE} %{GREEDYDATA:raw_message}" ] } }
インストール編(Beats)でFilebeatの設定を行ないましたが、この際、送信データに付け加えるメタデータとして、
fields
にlogtype: firewall_hgw
を設定したことを覚えていますか? フィルタパートの一番はじめにこの値を確認します。logtype
がfirewall_hgw
のときにのみ、後続の処理を行なうことにご注意ください。(今後、複数種類のログを扱うことを考慮しているためです。)logtype
がfirewall_hgw
に一致する場合は、Grokプラグインを用いて、ログデータ(文字列)と正規表現(以下、パターンと呼びます)とのマッチングを行ない、ログを複数のフィールド(文字列)に分解します。上記の例では、
message
要素に格納されている一行分のログと二つのパターン(%{SYSLOGBASE} %{HGWLOG}
および%{SYSLOGBASE} %{GREEDYDATA:raw_message}
)のいずれかとのマッチングを試みます。マッチ処理が成功した場合、マッチした部分文字列がパターン内で指定したフィールドに格納されます。Grokプラグインを用いてログをパースする本部分は、一連の手順の中でももっとも複雑で難しいところですので、もう少し説明を加えます。
マッチングに用いるパターンの基本的な形は以下のとおりです。
%{SYNTAX:semantics}
SYNTAX
が「ログとのマッチング対象となる正規表現」、semantics
が「ログのマッチした部分文字列に対応させるフィールド名」です。部分文字列に対するフィールド名の対応付けが必要ない場合は:semantics
の部分を省略します。正規表現ときいて、おぞけをふるうかたもいらっしゃる(かくいうわたしもその一人です)でしょうが、それほど心配する必要はなさそうです。というのは、Grokプラグインには、はじめから多くの組み込みパターンが用意されていますので、ゼロから正規表現を構築する必要があるケースはかなり少ないと考えられるためです。
組み込みパターンをできるだけ活用しつつ、既存のパターンから変更したい部分、あるいは不足しているパターンのみを作成する、というやり方が大半になるのではないかと思います。本記事の例でも、まったく新しい正規表現を書き起こすことはやっていません。
では、パターンを作成するため、もう一度テスト用のログデータを見直してみましょう。
Apr 28 00:15:00 filebeat hgw: 2018/4/28 00:14:51 SRC=62.210.180.80/5283 DST=203.0.113.1/5090 UDP table=spi
以前の記事で述べましたように、本ログは
logger(1)
経由でsyslogに出力しています。したがって、ログはsyslogのフォーマットにそった形式になっています。組み込みパターン(grok-patterns)にSYSLOGBASE
という正規表現がすでに定義されていますので、上記メッセージのApr 28 00:15:00 filebeat hgw:
までの部分については、本パターンを使ってマッチングを行ないます。以降の部分はホームゲートウェイRS-500KIの独自形式ですので、新しくパターンを作成する必要があります。この部分のフォーマットは以下のようになっています。
<タイムスタンプ> SRC=<ソースIPアドレス>/<ソースポート> DST=<デスティネーションIPアドレス>/<デスティネーションポート> <プロトコル> <残りのメッセージ>
これにマッチするように定義したパターン
HGWLOG
を以下に示します。ご覧いただければわかると思いますが、ゼロから正規表現を書き起こすことはしていません。IPアドレスやポート番号にマッチする正規表現は、すでに組み込みパターン(grok-patterns)の中で定義済みですので、適宜これを組み合わせることで目的を達しています。新たに定義したカスタムパターンは
hgw
というファイル内に記述し、patterns_dir
で指定したディレクトリ/usr/local/etc/logstash/patterns
以下に格納しました。/usr/local/etc/logstash/patterns/hgw
DATE_YMD %{YEAR}[/-]%{MONTHNUM}[/-]%{MONTHDAY} TIMESTAMP_HGW %{DATE_YMD}[- ]%{TIME} HGWLOG %{TIMESTAMP_HGW:hgw_timestamp} SRC=%{IPORHOST:source_ip}/%{POSINT:source_port} DST=%{IPORHOST:dest_ip}/%{POSINT:dest_port} %{DATA:protocol} %{GREEDYDATA:message}
SYSLOGBASE
とHGWLOG
を組み合わせて、最終的にはログデータをtimestamp
,logsource
,program
,hgw_timestamp
,source_ip
,source_port
,dest_ip
,destport
,protocol
, およびmessage
の各フィールドに分解しました。(下図)ゼロから正規表現を書き起こす必要はほとんどないとはいえ、やはり、Grokプラグインで用いるパターンを作成するのがLogstashの設定における最難関であることに変わりはありません。そこで、パターンを作成するときに役立つ支援ツールを二つ紹介しておきたいと思います。
一つめは、X-Packに含まれているGrok Debuggerです(下図)。テスト用のログデータ(一行分)とパターンを記述すると、パターンに対するマッチング処理の結果をJSON形式でアウトプットしてくれます。カスタムパターンもサポートされています。
二つめは、X-PackのGrok Debuggerとほぼ同様の機能を提供する以下のWebサイトです。こちらは、テストデータやパターンを変更すると自動的にマッチング処理を走らせてくれますので、試行錯誤しながらパターンを開発する場合にはこちらのほうが便利ですね。
message
フィールドを削除(パース成功時のみ)
if "_grokparsefailure" not in [tags] { mutate { remove_field => [ "message" ] } }
Mutateプラグインを用いて、
message
フィールドをログデータから削除します。ただし、
tags
フィールドに_grokparsefailure
が含まれていない場合に限ります。これは、Grokプラグインでのパースが成功し、message
が所定のフィールドに分解できた場合に対応します。もちろん、
message
に格納されている、もともとの(分解前の)ログデータを保持しておいてもかまわないのですが、分解済みの各フィールドと両方保持するのは冗長であるため、本記事ではmessage
フィールドを削除することにしました。- タイムスタンプを修正
date { match => [ "hgw_timestamp", "yyyy/M/d HH:mm:ss" ] }
Dateプラグインを用いて、ログデータのタイムスタンプ(
@timestamp
フィールド)の値を修正します。デフォルトでは、ログデータのタイムスタンプとして、Logstashがデータを受信した時刻が使われます。しかし、ログデータを可視化する観点では、Logstashのデータ受信時刻には関心がありません。それよりも、ホームゲートウェイがログを記録した時刻、つまりパース処理時に抽出した
hgw_timestamp
の値をタイムスタンプとして使えるほうが便利です。フィールド名と、そのフィールドに含まれるタイムスタンプのフォーマットを指定してタイムスタンプの値を取り出し、
@timestamp
フィールドに格納します。source_host
フィールドを追加
mutate { add_field => { "source_host" => "%{source_ip}" } }
再びMutateプラグインを用います。今度は、新たに
source_host
というフィールドを追加します。フィールドの内容についてはsource_ip
フィールドからコピーします。こんなことをする理由は、この先で行なうドメイン名の逆引き結果を格納するフィールドをあらかじめ作っておくためです。(ドメイン名の逆引きでは、既存のフィールドの値を逆引き結果で上書きする形をとります。)
- アクセス元の地理的位置を検索
geoip { source => "source_ip" target => "geoip" }
Geoipプラグインを用いて、
source_ip
フィールドに格納されているアクセス元のIPアドレスから、その地理的位置を検索します。検索結果はgeoip
フィールドに格納します。- アクセス元のドメイン名を逆引き
dns { reverse => [ "source_host" ] action => "replace" nameserver => [ "192.168.1.251", "192.168.1.247" ] failed_cache_size => 1000 failed_cache_ttl => 300 hit_cache_size => 1000 hit_cache_ttl => 300 }
DNSプラグインを用いて、先ほど
source_ip
フィールドをコピーして作成した、source_host
に格納されているアクセス元のIPアドレスの逆引き検索を行ないます。逆引きを行なうと、IPアドレスに対応するドメイン名(FQDN)が得られます。逆引きが成功した場合は、その結果を用いてsource_host
フィールドの内容を置き換えます。注: 手もとの環境では
nameserver
を明示的に指定しないと逆引きがうまく行きませんでしたので、適切なネームサーバのIPアドレスを指定することをおすすめします。- 加工済みのログデータを送信
output { stdout { codec => rubydebug } }
まだ動作テストの段階ですので、出力パートについては変更せず、標準出力に出力されるようにしておきます。
FilebeatからLogstashへのテストデータ送信(再)
あー、長かったですね。やっとLogstashの設定がほぼ終わりました。では、再度テスト用のログデータを用いて、Logstashの動作確認を行ないましょう。まず、Logstashを起動します。
/usr/local/logstash/bin/logstash --path.settings /usr/local/etc/logstash --path.logs /var/log/logstash
次はFilebeatの起動ですが、その前にひと手間必要です。Filebeatは、(再起動された場合などに)すでに送信済みのログを再び送信することがないよう、registry
というファイルを用いて送信済みデータなどの情報を管理しています。今回は、テストのため送信済みデータを再度送信したいので、このファイルを起動前に削除しておきます。
rm /var/db/beats/filebeat/data/registry
その後、Filebeatを起動します。
service filebeat start
一瞬の後に、以下のような出力がLogstashを起動したターミナルに表示されると思います。設定したLogstashの処理がすべて問題なく行なわれているか、出力メッセージを目視して確認します。主要な確認ポイントは以下のとおりです。
timestamp
,logsource
,program
,hgw_timestamp
,source_ip
,source_port
,dest_ip
,dest_port
, およびprotocol
の各フィールドが存在することmessage
フィールドが削除されていて存在しないこと@timestmp
とhgw_timestamp
が同一の時刻を指していること(おそらく@timestamp
はUTCで、hgw_timestamp
はJSTで表示されていると思います)geoip
フィールドが存在し、地理的位置情報が格納されていることsource_host
フィールドが存在し、FQDNあるいは(逆引きが失敗した場合)IPアドレスが格納されていること
{
"dest_ip" => "203.0.113.1",
"timestamp" => "Apr 28 00:15:00",
"source" => "/tmp/hgw.log",
"fields" => {
"logtype" => "firewall_hgw"
},
"host" => "filebeat.example.com",
"source_ip" => "62.210.180.80",
"source_port" => "5283",
"dest_port" => "5090",
"offset" => 109,
"beat" => {
"hostname" => "filebeat.example.com",
"name" => "Filebeat",
"version" => "6.2.3"
},
"geoip" => {
"country_code2" => "FR",
"location" => {
"lat" => 48.8582,
"lon" => 2.3387000000000002
},
"longitude" => 2.3387000000000002,
"ip" => "62.210.180.80",
"timezone" => "Europe/Paris",
"latitude" => 48.8582,
"country_code3" => "FR",
"continent_code" => "EU",
"country_name" => "France"
},
"hgw_timestamp" => "2018/4/28 00:14:51",
"prospector" => {
"type" => "log"
},
"@timestamp" => 2018-04-27T15:14:51.000Z,
"program" => "hgw",
"source_host" => "62-210-180-80.rev.poneytelecom.eu",
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"@version" => "1",
"protocol" => "UDP",
"logsource" => "filebeat"
}
以上で、Logstashの処理ルールについての説明は終了です。FilebeatおよびLogstashを停止させ、再度registryファイルを削除しておいてください。
次回の記事では、これまでに説明してきた各コンポーネントの設定ファイルの最終的な内容を示します。その後、システム全体を(再)起動して、本格運用を開始します。
参考文献
- Logstashを利用したApacheアクセスログのインポート, http://blog.johtani.info/blog/2014/11/21/import-apache-accesslog-using-logstash/
- Beats input plugin, https://www.elastic.co/guide/en/logstash/6.2/plugins-inputs-beats.html
- Grok filter plugin, https://www.elastic.co/guide/en/logstash/6.2/plugins-filters-grok.html
- RFC 3164, The BSD syslog Protocol, https://tools.ietf.org/html/rfc3164
- Grok Debugger, https://grokdebug.herokuapp.com/
- Mutate filter plugin, https://www.elastic.co/guide/en/logstash/6.2/plugins-filters-mutate.html
- Date filter plugin, https://www.elastic.co/guide/en/logstash/6.2/plugins-filters-date.html
- Geoip filter plugin, https://www.elastic.co/guide/en/logstash/6.2/plugins-filters-geoip.html
- Dns filter plugin, https://www.elastic.co/guide/en/logstash/6.2/plugins-filters-dns.html
- How Filebeat works, https://www.elastic.co/guide/en/beats/filebeat/6.2/how-filebeat-works.html