Kibana + Elasticsearch + Logstash を使って Netflow を可視化する

Kibana + Elasticsearch + logstash を使って Netflow を可視化する

ELK Stack (Kibana + Elasticsearch + logstash) を使って Netflow を可視化する方法のメモです。

netflow dashboard

Netflow とは

  • 1996年にシスコシステムズによって開発された、通信の流れを収集するためのネットワーク・プロトコル
  • パケットの共通属性から通信フローの統計情報を生成(src ip /dst ip , src port /dst port, protocol number, etc..)

パケットキャプチャは、リアルタイムで詳細な情報が取れますが、情報量が多いので定量的な時系列データの収集には不向きです。 SNMPは、時系列にトラフィック量をモニタリングできますが、より詳細なフローの収集はできません。

Netflowは、データの量だけを監視するだけでなく、トラフィックの発信元、発信先、サービスまたはアプリケーションの種類を把握できるので、パケットキャプチャ と SNMP の「いいとこ取り」というのは納得できます。

www.atmarkit.co.jp

今回使ったもの

Tools Role
Vyos(1.1.7) Netflow compatible device
Logstash(5.5.0) Netflow collector
Elasticsearch(5.5.0) Search engine
Kibana(5.5.0) Virtualizer

データの流れ

  1. Vyos で収集した Flow set を Logstash に送る
  2. Logstash で 解析して Elasticsearch に送る 
  3. Elasticsearch にデータを格納する
  4. kibana で可視化する

howitworks

設定

Logstash configuration example

Netflowコレクターとなる Logstashの設定です。 基本的には公式ドキュメント通りに設定して、input, filter, output の設定を追加します。

  • netflow codec plugin

https://www.elastic.co/guide/en/logstash/current/plugins-codecs-netflow.html

  • How Logstash works

https://www.elastic.co/guide/en/logstash/current/pipeline.html

input

netflow code (plugin) を指定

/etc/logstash/conf.d/netflow.conf

input {
  udp {
    port => 2055
    codec => netflow {
      versions => [5, 9]
    }
    type => netflow
  }
}

filter

/etc/logstash/conf.d/netflow.conf

Download the GeoIP database

  curl -O http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.tar.gz
  tar zxvf GeoLite2-City.tar.gz
  cp -p GeoLite2-City_*/GeoLite2-City.mmdb /etc/logstash/.

GeoIP から 宛先IPの ロケーションを取得

filter {
  geoip {
    target => "geoip"
    source => "[netflow][ipv4_dst_addr]"
    database => "/etc/logstash/GeoLite2-City.mmdb"
        }
}

output

/etc/logstash/conf.d/netflow.conf

elasticsearch に送る

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
  }
  stdout { codec => rubydebug }
}

デプロイ

docker 上にデプロイします。

  • build and run
git clone https://github.com/kodamap/kibana-elasticsearch/
cd kibana-elasticsearch
docker-compose build
docker-compute up -d
  • docker ps
    Name        Command   State                Ports
------------------------------------------------------------------
elasticsearch   /run.sh   Up      0.0.0.0:9200->9200/tcp
kibana          /run.sh   Up      0.0.0.0:5601->5601/tcp
logstash        /run.sh   Up      2055/tcp, 0.0.0.0:2055->2055/udp
nginx           /run.sh   Up      0.0.0.0:5681->5681/tcp

Vyos configuration サンプル

Interfaceの着信パケットについてフローが生成されるので双方向のフローを取得するにはすべてのインターフェースで有効化します。

set system flow-accounting interface eth0
set system flow-accounting interface eth1
set system flow-accounting netflow engine-id 100
set system flow-accounting netflow version 9
set system flow-accounting netflow timeout expiry-interval 60
set system flow-accounting netflow server <logstash ip> port 2055
set system flow-accounting netflow sampling-rate 60

まとめ(まとまってない)

ELK のお蔭で手軽にNetflowの可視化ができました。まずは日々のデータを蓄積しようと思います。 次のステップはこれをどう活用するのか(「異常」や「脅威」を検出)ですが、何をもって「異常」とするのか ? 「通常時」のデータと比較して把握することが必要です。

参考サイト

http://www.infraexpert.com/study/netflow1.html

http://komeiy.hatenablog.com/entry/2014/10/24/234353

https://www.elastic.co/guide/en/logstash/current/plugins-codecs-netflow.html

http://enog.jp/wp-content/uploads/2015/09/enog43_elk_0904.pdf