Logstash에서 Kafka Input 사용
Kafka INPUT를 사용하기 위해서는 플러그인을 먼저 인스톨 해야 한다.
./bin/logstash-plugin install logstash-input-kafka
이제 conf 파일을 작성해보자.
input {
kafka {
bootstrap_servers => "192.168.56.4:9092,192.168.57.3:9092,192.168.58.3:9092"
group_id => "logstash"
topics => ["ykkim-topic"]
consumer_threads => 1
}
}
filter {
}
output {
stdout {
codec => rubydebug
}
elasticsearch {
hosts => "http://localhost:9200"
index => "kafka-test-%{+YYYY-MM-dd}"
document_type => "_doc"
}
}
bootstrap_servers
에 카프카 서버리스트를 작성해주고 group_id를 설정해준다. 그리고 가져올 토픽을 지정하면 된다. consumer_threads는 사양에 맞게 적절하게 세팅하면 된다. 디폴트는 1이다.
이제 로그스테스를 가동한 후에 카프카에서 producer를 이용해 메시지(Hello Logstash1~3)를 전송하고 이를 로그스테시에서 잘 받는지 확인해보자.
>./bin/kafka-console-producer.sh --broker-list 192.168.56.4:9092,192.168.57.3:9092,192.168.58.3:9092 --topic ykkim-topic
>Hello Logstash1
>Hello Logstash2
>Hello Logstash3
총 3번의 메시지를 전송하고 로그스테시 화면에서 보면 아래와 같이 출력됨을 확인할 수 있다.
Consumer clientId=logstash-0, groupId=logstash] Setting newly assigned partitions [ykkim-topic-0]
{
"message" => "Hello Logstash1",
"@timestamp" => 2019-02-17T11:40:00.208Z,
"@version" => "1"
}
{
"message" => "Hello Logstash2",
"@timestamp" => 2019-02-17T11:40:09.514Z,
"@version" => "1"
}
{
"message" => "Hello Logstash3",
"@timestamp" => 2019-02-17T11:40:16.496Z,
"@version" => "1"
}
Elasticsearch에서도 잘 저장되고 있음을 확인할 수 있다.
GET kafka-test-2019-02-17/_search
{
"took": 33,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 3,
"max_score": 1.0,
"hits": [
{
"_index": "kafka-test-2019-02-17",
"_type": "_doc",
"_id": "7IFB-2gBFeMvUP3hhPeT",
"_score": 1.0,
"_source": {
"message": "Hello Logstash1",
"@timestamp": "2019-02-17T11:40:00.208Z",
"@version": "1"
}
},
{
"_index": "kafka-test-2019-02-17",
"_type": "_doc",
"_id": "7YFB-2gBFeMvUP3hpPc4",
"_score": 1.0,
"_source": {
"message": "Hello Logstash2",
"@timestamp": "2019-02-17T11:40:09.514Z",
"@version": "1"
}
},
{
"_index": "kafka-test-2019-02-17",
"_type": "_doc",
"_id": "7oFB-2gBFeMvUP3hv_dg",
"_score": 1.0,
"_source": {
"message": "Hello Logstash3",
"@timestamp": "2019-02-17T11:40:16.496Z",
"@version": "1"
}
}
]
}
}