Kafka INPUT를 사용하기 위해서는 플러그인을 먼저 인스톨 해야 한다.

./bin/logstash-plugin install logstash-input-kafka

이제 conf 파일을 작성해보자.

input {
    kafka {
        bootstrap_servers =>  "192.168.56.4:9092,192.168.57.3:9092,192.168.58.3:9092"
        group_id => "logstash"
        topics => ["ykkim-topic"]
        consumer_threads => 1
    }
}

filter {
}

output {
    stdout {
        codec => rubydebug
    }    
    elasticsearch {
        hosts => "http://localhost:9200"
        index => "kafka-test-%{+YYYY-MM-dd}"
        document_type => "_doc"
    }
}

bootstrap_servers 에 카프카 서버리스트를 작성해주고 group_id를 설정해준다. 그리고 가져올 토픽을 지정하면 된다. consumer_threads는 사양에 맞게 적절하게 세팅하면 된다. 디폴트는 1이다.

이제 로그스테스를 가동한 후에 카프카에서 producer를 이용해 메시지(Hello Logstash1~3)를 전송하고 이를 로그스테시에서 잘 받는지 확인해보자.

>./bin/kafka-console-producer.sh --broker-list 192.168.56.4:9092,192.168.57.3:9092,192.168.58.3:9092 --topic ykkim-topic

>Hello Logstash1
>Hello Logstash2
>Hello Logstash3

총 3번의 메시지를 전송하고 로그스테시 화면에서 보면 아래와 같이 출력됨을 확인할 수 있다.

Consumer clientId=logstash-0, groupId=logstash] Setting newly assigned partitions [ykkim-topic-0]
{
       "message" => "Hello Logstash1",
    "@timestamp" => 2019-02-17T11:40:00.208Z,
      "@version" => "1"
}
{
       "message" => "Hello Logstash2",
    "@timestamp" => 2019-02-17T11:40:09.514Z,
      "@version" => "1"
}
{
       "message" => "Hello Logstash3",
    "@timestamp" => 2019-02-17T11:40:16.496Z,
      "@version" => "1"
}

Elasticsearch에서도 잘 저장되고 있음을 확인할 수 있다.

GET kafka-test-2019-02-17/_search

{
  "took" : 33,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 3,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "kafka-test-2019-02-17",
        "_type" : "_doc",
        "_id" : "7IFB-2gBFeMvUP3hhPeT",
        "_score" : 1.0,
        "_source" : {
          "message" : "Hello Logstash1",
          "@timestamp" : "2019-02-17T11:40:00.208Z",
          "@version" : "1"
        }
      },
      {
        "_index" : "kafka-test-2019-02-17",
        "_type" : "_doc",
        "_id" : "7YFB-2gBFeMvUP3hpPc4",
        "_score" : 1.0,
        "_source" : {
          "message" : "Hello Logstash2",
          "@timestamp" : "2019-02-17T11:40:09.514Z",
          "@version" : "1"
        }
      },
      {
        "_index" : "kafka-test-2019-02-17",
        "_type" : "_doc",
        "_id" : "7oFB-2gBFeMvUP3hv_dg",
        "_score" : 1.0,
        "_source" : {
          "message" : "Hello Logstash3",
          "@timestamp" : "2019-02-17T11:40:16.496Z",
          "@version" : "1"
        }
      }
    ]
  }
}