1 minute read

Kafka INPUT를 사용하기 위해서는 플러그인을 먼저 인스톨 해야 한다.

./bin/logstash-plugin install logstash-input-kafka

이제 conf 파일을 작성해보자.

input {
    kafka {
        bootstrap_servers =>  "192.168.56.4:9092,192.168.57.3:9092,192.168.58.3:9092"
        group_id => "logstash"
        topics => ["ykkim-topic"]
        consumer_threads => 1
    }
}

filter {
}

output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
        hosts => "http://localhost:9200"
        index => "kafka-test-%{+YYYY-MM-dd}"
        document_type => "_doc"
    }
}

bootstrap_servers 에 카프카 서버리스트를 작성해주고 group_id를 설정해준다. 그리고 가져올 토픽을 지정하면 된다. consumer_threads는 사양에 맞게 적절하게 세팅하면 된다. 디폴트는 1이다.

이제 로그스테스를 가동한 후에 카프카에서 producer를 이용해 메시지(Hello Logstash1~3)를 전송하고 이를 로그스테시에서 잘 받는지 확인해보자.

>./bin/kafka-console-producer.sh --broker-list 192.168.56.4:9092,192.168.57.3:9092,192.168.58.3:9092 --topic ykkim-topic

>Hello Logstash1
>Hello Logstash2
>Hello Logstash3

총 3번의 메시지를 전송하고 로그스테시 화면에서 보면 아래와 같이 출력됨을 확인할 수 있다.

Consumer clientId=logstash-0, groupId=logstash] Setting newly assigned partitions [ykkim-topic-0]
{
       "message" => "Hello Logstash1",
    "@timestamp" => 2019-02-17T11:40:00.208Z,
      "@version" => "1"
}
{
       "message" => "Hello Logstash2",
    "@timestamp" => 2019-02-17T11:40:09.514Z,
      "@version" => "1"
}
{
       "message" => "Hello Logstash3",
    "@timestamp" => 2019-02-17T11:40:16.496Z,
      "@version" => "1"
}

Elasticsearch에서도 잘 저장되고 있음을 확인할 수 있다.

GET kafka-test-2019-02-17/_search

{
    "took": 33,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 3,
        "max_score": 1.0,
        "hits": [
            {
                "_index": "kafka-test-2019-02-17",
                "_type": "_doc",
                "_id": "7IFB-2gBFeMvUP3hhPeT",
                "_score": 1.0,
                "_source": {
                    "message": "Hello Logstash1",
                    "@timestamp": "2019-02-17T11:40:00.208Z",
                    "@version": "1"
                }
            },
            {
                "_index": "kafka-test-2019-02-17",
                "_type": "_doc",
                "_id": "7YFB-2gBFeMvUP3hpPc4",
                "_score": 1.0,
                "_source": {
                    "message": "Hello Logstash2",
                    "@timestamp": "2019-02-17T11:40:09.514Z",
                    "@version": "1"
                }
            },
            {
                "_index": "kafka-test-2019-02-17",
                "_type": "_doc",
                "_id": "7oFB-2gBFeMvUP3hv_dg",
                "_score": 1.0,
                "_source": {
                    "message": "Hello Logstash3",
                    "@timestamp": "2019-02-17T11:40:16.496Z",
                    "@version": "1"
                }
            }
        ]
    }
}