本教程中使用的ELK和filebeat
版本是 6.2.4
1. elk安装
关于elk的配置参考我之前的一篇文章,不在累述:
elk安装地址:
https://jjlu521016.github.io/2018/05/01/springboot-logback-log4j-elk.html#2-elk%E9%85%8D%E7%BD%AE
2. kafka及zookeeper安装
参考我之前的文章、把对应的配置改成单机即可:
2.1. 安装kafka:
2.2. 安装zookeeper:
2.3 启动zookeeper
我本地zookeeper配置了环境变量
1 | zkServer.sh start |
2.4 启动kafka
启动kafka
1 | # bin/kafka-server-start.sh config/server.properties |
创建topic
1 | # bin/kafka-topics.sh --create --zookeeper 192.168.188.110:2181 --replication-factor 1 --partitions 1 --topic test |
创建生产者
1 | # bin/kafka-console-producer.sh --broker-list 192.168.188.110:9092 --topic test |
创建消费者
1 | bin/kafka-console-consumer.sh --zookeeper 192.168.188.110:2181 --topic test --from-beginning |
此时生产者输入的内容可以在消费者输出,证明kafka调用通了。
3. 集成logstash与kafka
参考:
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
vi logstash_agent.conf
1 | input { |
启动elk之后,在kafka生产者输入一条信息之后logstash打印出对应的内容,说明集成成功了
。在kibana设置对应的日志。
4. 安装filebeat
解压
1 | tar -zxvf filebeat-6.2.3-linux-x86_64.tar.gz |
配置filebeat.yml
1 | filebeat.prospectors: |
运行
1 | ./filebeat -e -c filebeat.yml |
把logstash停止,修改配置文件,添加filebeat配置的topic。重启logstash。打开kibana刷新页面,看到filebeat收集到的日志。
注意:
在logstash中,当input里面有多个kafka输入源时,client_id => “xxx”必须添加且需要不同,否则会报错javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=logstash-0。
1 | input { |
v1.5.2