快速搞定docker部署Filebeat、ELK全家桶

本文介绍使用Docker安装部署Filebeat与Elasticsearch、Logstash、Kibana(简称ELK)全家桶7.5.1 。如果熟悉框架的话,直接copy配置文件与docker命令,简单删减和修改路径,即可快速启动整个链路 。
简单的ELK数据平台是这样的流程:

快速搞定docker部署Filebeat、ELK全家桶

文章插图
 
由于Kafka并不是ELK中必须的组件,所以本博客跳过部署Kafka,直接将Filebeat数据打到logstash上 。关于Kafka的相关知识,以后再作详细讲解 。
本文中的配置环境是这样的:
Filebeat在A机器上采集日志,ELK部署在B机器上 。其中Elasticsearch与Logstash都采用的是单节点安装 。
版本选择写此文的时候,Filebeat和ELK最新的版本为7.6.1,不过笔者看了Filebeat 7.6.1的发布日志,发现该版本属于一个紧急bugfix版本——因为7.6.0的bug导致k8s pods的标签/注解存在问题 。但是7.6.1的稳定性依旧不敢保证,毕竟笔者的项目接下来service mesh化是要结合k8s的,为了减小风险,退而求其次,选择7.5.2 。后来因为踩坑又换到7.5.1,原因在下面说明 。
filebeat7.5.2在搭建之后运行几小时之后,会出现filebeat io timeout的问题 。官方论坛有人提出从7.5.1升级到7.5.2之后也出现了同样的问题,但是笔者并没有找到官方给出有效的解决方案 。另外出于时间成本考虑没有时间去查找问题,所以选择版本为7.5.1 。目前数据流已经稳定运行两周了 。
按照官方说明,只要全家桶不是升级到8.0,都是可以轻松升级的 。所以系统稳定后随时可以升级 。
安装ElasticsearchElasticsearch是一个分布式、高扩展、高实时的搜索与数据分析引擎 。它能很方便地使大量数据具有搜索、分析和探索的能力 。充分利用Elasticsearch的水平伸缩性,能使数据在生产环境变得更有价值 。
# 创建自定义的网络(用于连接到连接到同一网络的其他服务(例如Logstash和Kibana))docker network create somenetwork # 运行 elasticsearchdocker run --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 -d --name elasticsearch --net somenetwork -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.5.1# 查看容器状态docker ps# 检测 elasticsearch 是否启动成功curl 127.0.0.1:9200安装LogstashLogstash作为Elasicsearch常用的实时数据采集引擎,可以采集来自不同数据源的数据,并对数据进行处理后输出到多种输出源,是Elasticstack的重要组成部分 。
增加logstash的声明文件,logstash.yml 。编辑内容如下
# 配置文件读取路径path.config: /home/ec2-user/docker_config/logstash/conf.d/*.conf# 日志输出路径path.logs: /var/log/logstash在上面配置文件读取路径中,增加一个测试文件test.conf,内容如下:
input {beats {port => 5044}}filter {if "JAVA-logs" in [tags]{grok {# grok正则化会按照顺序许匹配送进来的日志,当碰到第一个匹配成功的日志就break掉这个循环 。match => {# 分别是服务器记录行为日志,不带参数参数的请求,带参的get请求,带参的post请求,兼容旧版的记录行为日志 。这里的格式就跟具体业务的日志有关了"message" => ["%{TIMESTAMP_ISO8601:logtime} %{DATA} %{LOGLEVEL:loglevel}%{DATA:classpath} - %{DATA:logcontent} %{DATA:logcontent} %{DATA:logcontent} %{DATA:logcontent} %{DATA:logcontent} %{word:logdata}","%{TIMESTAMP_ISO8601:logtime} %{DATA} %{LOGLEVEL:loglevel}%{DATA:classpath} - REQUEST DATA : uri=%{URIPATHPARAM:request};client=%{IP:clientIp}];&&&&%{DATA:common_param}&&&&;耗时:%{NUMBER:costtime}ms","%{TIMESTAMP_ISO8601:logtime} %{DATA} %{LOGLEVEL:loglevel}%{DATA:classpath} - REQUEST DATA : uri=%{URIPATH:request}?%{DATA:get_param};client=%{IP:clientIp}];&&&&%{DATA:common_param}&&&&;耗时:%{NUMBER:costtime}ms","%{TIMESTAMP_ISO8601:logtime} %{DATA} %{LOGLEVEL:loglevel}%{DATA:classpath} - REQUEST DATA : uri=%{URIPATHPARAM:request};client=%{IP:clientIp};payload=%{DATA:payload}];&&&&%{DATA:common_param}&&&&;耗时:%{NUMBER:costtime}ms","%{TIMESTAMP_ISO8601:logtime}%{LOGLEVEL:loglevel} %{DATA} %{DATA:classpath}: %{DATA:logcontent} %{DATA:logcontent} %{DATA:logcontent} %{DATA:logcontent} %{DATA:logcontent} %{WORD:logdata}"]}# 过滤完成之后删除多余字段,节约磁盘空间 。remove_field => ["@version","host.name"]}# 拆分common_param字段(采用ruby插件)ruby {code => "if !event.get('common_param').nil? thenarray1 = event.get('common_param').split('%')array1.each do |temp1|if temp1.nil? thennextendarray2 = temp1.split('=')key = array2[0]value = https://www.isolves.com/it/cxkf/rongqi/2021-03-22/array2[1]if key.nil? thennextendevent.set(key, value)endend"# 移除message字段remove_field => [ "message" ]}# 如果是debug级别日志,直接抛弃掉这条日志if [loglevel] !~ "(ERROR|WARN|INFO)" {drop { }}# 抛弃掉grok解析失败的日志if "_grokparsefailure" in [tags] {drop { }}# 使用日志时间替换logstash的时间戳date {match => ["logtime", "yyyy-MM-dd HH:mm:ss.SSS"]target => "@timestamp"}}}output {elasticsearch {hosts => ["elasticsearch:9200"]index => "filebeat-logstash-%{+YYYY.MM.dd}"}stdout { codec => rubydebug }}


推荐阅读