インストール編(Beats)では、BeatsのインストールとFilebeatの設定を行ないました。Elasticスタックのインストールと、その後の可視化システムの構築の流れを再掲します。(下図)

  1. Elasticsearch, Elasticsearch X-Packのインストール(済)
  2. Kibana, Kibana X-Packのインストール、Basicライセンスの取得と投入(済)
  3. Logstash, Logstash X-Packのインストール(済)
  4. Beats (Filebeat)のインストール(済)
  5. Logstashのログ処理ルールの作成とテストデータでの動作確認(本記事)
  6. Filebeat→Logstash→Elasticsearchでの運用開始
  7. Kibanaでの検索、ダッシュボードの作成

Elasticスタックを用いたHGWログ可視化 - 構築の流れ - Logstashルール

本記事では、テスト用のログデータを用いて、FilebeatからLogstashにデータが送信されることを確認します。その後、Logstashによるログデータ加工のための処理ルールについて説明していきます。そして最後に、再びテストデータを用いて、Logstashの処理ルールが正しく動作することを確認します。

テスト用ログファイルの作成

まず、テスト用のログデータを用意しましょう。用意といっても、新たにテストデータを作るわけではなく、すでにあるホームゲートウェイのログから適当な一行を切り出せばOKです。インストール編(Beats)で、テスト用のログファイルパスを/tmp/hgw.logと指定しましたので、例えば以下の内容のファイルを/tmp/hgw.logとして保存します。

FilebeatからLogstashへのテストデータ送信

テストファイルが作成できましたので、さっそくFilebeatからLogstashへ送信してみましょう。まず、Logstashを起動します。

/usr/local/logstash/bin/logstash --path.settings /usr/local/etc/logstash --path.logs /var/log/logstash

次に、Filebeatを起動します。

service filebeat start

一瞬の後に、以下のような出力がLogstashを起動したターミナルに表示されます。出力内容の中にmessageという要素が含まれており、用意したテスト用ログの一行がそのまま含まれていることを確認してください。このように、Logstashが受信したログデータは、特に指定を行なわなければmessageという要素に格納されます。これ以外のデータ要素はFilebeat、あるいはLogstashが付加したメタデータです。

{
        "source" => "/tmp/hgw.log",
    "@timestamp" => 2018-04-30T05:27:34.305Z,
       "message" => "Apr 28 00:15:00 filebeat hgw: 2018/4/28 00:14:51 SRC=62.210.180.80/5283 DST=203.0.113.1/5090 UDP table=spi",
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
          "host" => "filebeat.example.com",
        "fields" => {
        "logtype" => "firewall_hgw"
    },
      "@version" => "1",
          "beat" => {
            "name" => "Filebeat",
        "hostname" => "filebeat.example.com",
         "version" => "6.2.3"
    },
        "offset" => 109,
    "prospector" => {
        "type" => "log"
    }
}

Logstashを用いたログデータの加工

テストデータの送受信はうまく行きましたか? いいですね。では、いったんLogstashとFilebeatを停止させておいてください。

次は、本記事で構築しようとしているログ可視化システムの肝になる部分、Logstashによるログデータの加工について説明します。

以下の記事では、Logstashを用いてApacheログを加工する手順について詳しく紹介しています。本記事ではホームゲートウェイのログを扱いますので、ソースデータの種類は異なります。しかし、基本的な処理の流れは同じですので、こちらの記事も参考にしていただければと思います。

さて、本記事におけるログ処理の流れは以下のようになります。

  1. 入力元からログデータを受信
  2. 受信ログをパース
  3. messageフィールドを削除(パース成功時のみ)
  4. タイムスタンプを修正
  5. source_hostフィールドを追加
  6. アクセス元の地理的位置を検索
  7. アクセス元のドメイン名を逆引き
  8. 加工済みのログデータを送信

本処理を記述する設定ファイルは/usr/local/etc/logstash/logstash.confです。まず、ファイルの全体を示しておきます。

  1. 入力元からログデータを受信

    input {
      beats {
        port => 5044
        host => "0.0.0.0"
      }
    }

    Beatsプラグインを用いて、Filebeatからログデータを受信します。本パートはテスト用の設定から変更していません。

  2. 受信ログをパース

    if [fields][logtype] == "firewall_hgw" {
      grok {
        patterns_dir => [ "/usr/local/etc/logstash/patterns" ]
        match => {
          "message" => [
            "%{SYSLOGBASE} %{HGWLOG}",
            "%{SYSLOGBASE} %{GREEDYDATA:raw_message}"
          ]
        }
      }

    インストール編(Beats)でFilebeatの設定を行ないましたが、この際、送信データに付け加えるメタデータとして、fieldslogtype: firewall_hgwを設定したことを覚えていますか? フィルタパートの一番はじめにこの値を確認します。logtypefirewall_hgwのときにのみ、後続の処理を行なうことにご注意ください。(今後、複数種類のログを扱うことを考慮しているためです。)

    logtypefirewall_hgwに一致する場合は、Grokプラグインを用いて、ログデータ(文字列)と正規表現(以下、パターンと呼びます)とのマッチングを行ない、ログを複数のフィールド(文字列)に分解します。

    上記の例では、message要素に格納されている一行分のログと二つのパターン(%{SYSLOGBASE} %{HGWLOG}および%{SYSLOGBASE} %{GREEDYDATA:raw_message})のいずれかとのマッチングを試みます。マッチ処理が成功した場合、マッチした部分文字列がパターン内で指定したフィールドに格納されます。

    Grokプラグインを用いてログをパースする本部分は、一連の手順の中でももっとも複雑で難しいところですので、もう少し説明を加えます。

    マッチングに用いるパターンの基本的な形は以下のとおりです。

    %{SYNTAX:semantics}

    SYNTAXが「ログとのマッチング対象となる正規表現」、semanticsが「ログのマッチした部分文字列に対応させるフィールド名」です。部分文字列に対するフィールド名の対応付けが必要ない場合は:semanticsの部分を省略します。

    正規表現ときいて、おぞけをふるうかたもいらっしゃる(かくいうわたしもその一人です)でしょうが、それほど心配する必要はなさそうです。というのは、Grokプラグインには、はじめから多くの組み込みパターンが用意されていますので、ゼロから正規表現を構築する必要があるケースはかなり少ないと考えられるためです。

    組み込みパターンをできるだけ活用しつつ、既存のパターンから変更したい部分、あるいは不足しているパターンのみを作成する、というやり方が大半になるのではないかと思います。本記事の例でも、まったく新しい正規表現を書き起こすことはやっていません。

    では、パターンを作成するため、もう一度テスト用のログデータを見直してみましょう。

    Apr 28 00:15:00 filebeat hgw: 2018/4/28 00:14:51 SRC=62.210.180.80/5283 DST=203.0.113.1/5090 UDP table=spi

    以前の記事で述べましたように、本ログはlogger(1)経由でsyslogに出力しています。したがって、ログはsyslogのフォーマットにそった形式になっています。組み込みパターン(grok-patterns)にSYSLOGBASEという正規表現がすでに定義されていますので、上記メッセージのApr 28 00:15:00 filebeat hgw:までの部分については、本パターンを使ってマッチングを行ないます。

    以降の部分はホームゲートウェイRS-500KIの独自形式ですので、新しくパターンを作成する必要があります。この部分のフォーマットは以下のようになっています。

    <タイムスタンプ> SRC=<ソースIPアドレス>/<ソースポート> DST=<デスティネーションIPアドレス>/<デスティネーションポート> <プロトコル> <残りのメッセージ>
    

    これにマッチするように定義したパターンHGWLOGを以下に示します。ご覧いただければわかると思いますが、ゼロから正規表現を書き起こすことはしていません。IPアドレスやポート番号にマッチする正規表現は、すでに組み込みパターン(grok-patterns)の中で定義済みですので、適宜これを組み合わせることで目的を達しています。

    新たに定義したカスタムパターンはhgwというファイル内に記述し、patterns_dirで指定したディレクトリ/usr/local/etc/logstash/patterns以下に格納しました。

    • /usr/local/etc/logstash/patterns/hgw

      DATE_YMD %{YEAR}[/-]%{MONTHNUM}[/-]%{MONTHDAY}
      TIMESTAMP_HGW %{DATE_YMD}[- ]%{TIME}
      HGWLOG %{TIMESTAMP_HGW:hgw_timestamp} SRC=%{IPORHOST:source_ip}/%{POSINT:source_port} DST=%{IPORHOST:dest_ip}/%{POSINT:dest_port} %{DATA:protocol} %{GREEDYDATA:message}

    SYSLOGBASEHGWLOGを組み合わせて、最終的にはログデータをtimestamp, logsource, program, hgw_timestamp, source_ip, source_port, dest_ip, destport, protocol, およびmessageの各フィールドに分解しました。(下図)

    カスタムパターンを用いたログデータのパース

    ゼロから正規表現を書き起こす必要はほとんどないとはいえ、やはり、Grokプラグインで用いるパターンを作成するのがLogstashの設定における最難関であることに変わりはありません。そこで、パターンを作成するときに役立つ支援ツールを二つ紹介しておきたいと思います。

    一つめは、X-Packに含まれているGrok Debuggerです(下図)。テスト用のログデータ(一行分)とパターンを記述すると、パターンに対するマッチング処理の結果をJSON形式でアウトプットしてくれます。カスタムパターンもサポートされています。

    Kibana X-Pack - Grok Debugger

    二つめは、X-PackのGrok Debuggerとほぼ同様の機能を提供する以下のWebサイトです。こちらは、テストデータやパターンを変更すると自動的にマッチング処理を走らせてくれますので、試行錯誤しながらパターンを開発する場合にはこちらのほうが便利ですね。

  3. messageフィールドを削除(パース成功時のみ)

    if "_grokparsefailure" not in [tags] {
      mutate {
        remove_field => [ "message" ]
      }
    }

    Mutateプラグインを用いて、messageフィールドをログデータから削除します。

    ただし、tagsフィールドに_grokparsefailureが含まれていない場合に限ります。これは、Grokプラグインでのパースが成功し、messageが所定のフィールドに分解できた場合に対応します。

    もちろん、messageに格納されている、もともとの(分解前の)ログデータを保持しておいてもかまわないのですが、分解済みの各フィールドと両方保持するのは冗長であるため、本記事ではmessageフィールドを削除することにしました。

  4. タイムスタンプを修正

    date {
      match => [ "hgw_timestamp", "yyyy/M/d HH:mm:ss" ]
    }

    Dateプラグインを用いて、ログデータのタイムスタンプ(@timestampフィールド)の値を修正します。

    デフォルトでは、ログデータのタイムスタンプとして、Logstashがデータを受信した時刻が使われます。しかし、ログデータを可視化する観点では、Logstashのデータ受信時刻には関心がありません。それよりも、ホームゲートウェイがログを記録した時刻、つまりパース処理時に抽出したhgw_timestampの値をタイムスタンプとして使えるほうが便利です。

    フィールド名と、そのフィールドに含まれるタイムスタンプのフォーマットを指定してタイムスタンプの値を取り出し、@timestampフィールドに格納します。

  5. source_hostフィールドを追加

    mutate {
      add_field => { "source_host" => "%{source_ip}" }
    }

    再びMutateプラグインを用います。今度は、新たにsource_hostというフィールドを追加します。フィールドの内容についてはsource_ipフィールドからコピーします。

    こんなことをする理由は、この先で行なうドメイン名の逆引き結果を格納するフィールドをあらかじめ作っておくためです。(ドメイン名の逆引きでは、既存のフィールドの値を逆引き結果で上書きする形をとります。)

  6. アクセス元の地理的位置を検索

    geoip {
      source => "source_ip"
      target => "geoip"
    }

    Geoipプラグインを用いて、source_ipフィールドに格納されているアクセス元のIPアドレスから、その地理的位置を検索します。検索結果はgeoipフィールドに格納します。

  7. アクセス元のドメイン名を逆引き

    dns {
      reverse => [ "source_host" ]
      action => "replace"
      nameserver => [ "192.168.1.251", "192.168.1.247" ]
      failed_cache_size => 1000
      failed_cache_ttl => 300
      hit_cache_size => 1000
      hit_cache_ttl => 300
    }

    DNSプラグインを用いて、先ほどsource_ipフィールドをコピーして作成した、source_hostに格納されているアクセス元のIPアドレスの逆引き検索を行ないます。逆引きを行なうと、IPアドレスに対応するドメイン名(FQDN)が得られます。逆引きが成功した場合は、その結果を用いてsource_hostフィールドの内容を置き換えます。

    注: 手もとの環境ではnameserverを明示的に指定しないと逆引きがうまく行きませんでしたので、適切なネームサーバのIPアドレスを指定することをおすすめします。

  8. 加工済みのログデータを送信

    output {
      stdout { codec => rubydebug }
    }

    まだ動作テストの段階ですので、出力パートについては変更せず、標準出力に出力されるようにしておきます。

FilebeatからLogstashへのテストデータ送信(再)

あー、長かったですね。やっとLogstashの設定がほぼ終わりました。では、再度テスト用のログデータを用いて、Logstashの動作確認を行ないましょう。まず、Logstashを起動します。

/usr/local/logstash/bin/logstash --path.settings /usr/local/etc/logstash --path.logs /var/log/logstash

次はFilebeatの起動ですが、その前にひと手間必要です。Filebeatは、(再起動された場合などに)すでに送信済みのログを再び送信することがないよう、registryというファイルを用いて送信済みデータなどの情報を管理しています。今回は、テストのため送信済みデータを再度送信したいので、このファイルを起動前に削除しておきます。

rm /var/db/beats/filebeat/data/registry

その後、Filebeatを起動します。

service filebeat start

一瞬の後に、以下のような出力がLogstashを起動したターミナルに表示されると思います。設定したLogstashの処理がすべて問題なく行なわれているか、出力メッセージを目視して確認します。主要な確認ポイントは以下のとおりです。

{
          "dest_ip" => "203.0.113.1",
        "timestamp" => "Apr 28 00:15:00",
           "source" => "/tmp/hgw.log",
           "fields" => {
        "logtype" => "firewall_hgw"
    },
             "host" => "filebeat.example.com",
        "source_ip" => "62.210.180.80",
      "source_port" => "5283",
        "dest_port" => "5090",
           "offset" => 109,
             "beat" => {
        "hostname" => "filebeat.example.com",
            "name" => "Filebeat",
         "version" => "6.2.3"
    },
            "geoip" => {
         "country_code2" => "FR",
              "location" => {
            "lat" => 48.8582,
            "lon" => 2.3387000000000002
        },
             "longitude" => 2.3387000000000002,
                    "ip" => "62.210.180.80",
              "timezone" => "Europe/Paris",
              "latitude" => 48.8582,
         "country_code3" => "FR",
        "continent_code" => "EU",
          "country_name" => "France"
    },
    "hgw_timestamp" => "2018/4/28 00:14:51",
       "prospector" => {
        "type" => "log"
    },
       "@timestamp" => 2018-04-27T15:14:51.000Z,
          "program" => "hgw",
      "source_host" => "62-210-180-80.rev.poneytelecom.eu",
             "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
         "@version" => "1",
         "protocol" => "UDP",
        "logsource" => "filebeat"
}

以上で、Logstashの処理ルールについての説明は終了です。FilebeatおよびLogstashを停止させ、再度registryファイルを削除しておいてください。

次回の記事では、これまでに説明してきた各コンポーネントの設定ファイルの最終的な内容を示します。その後、システム全体を(再)起動して、本格運用を開始します。

参考文献

  1. Logstashを利用したApacheアクセスログのインポート, http://blog.johtani.info/blog/2014/11/21/import-apache-accesslog-using-logstash/
  2. Beats input plugin, https://www.elastic.co/guide/en/logstash/6.2/plugins-inputs-beats.html
  3. Grok filter plugin, https://www.elastic.co/guide/en/logstash/6.2/plugins-filters-grok.html
  4. RFC 3164, The BSD syslog Protocol, https://tools.ietf.org/html/rfc3164
  5. Grok Debugger, https://grokdebug.herokuapp.com/
  6. Mutate filter plugin, https://www.elastic.co/guide/en/logstash/6.2/plugins-filters-mutate.html
  7. Date filter plugin, https://www.elastic.co/guide/en/logstash/6.2/plugins-filters-date.html
  8. Geoip filter plugin, https://www.elastic.co/guide/en/logstash/6.2/plugins-filters-geoip.html
  9. Dns filter plugin, https://www.elastic.co/guide/en/logstash/6.2/plugins-filters-dns.html
  10. How Filebeat works, https://www.elastic.co/guide/en/beats/filebeat/6.2/how-filebeat-works.html