Collecting Logs in K8s with the Elastic Stack and Kafka

Tuesday, December 7, 2021

1. Overview

In the previous post Golang结合K8s验证Kafka主题分区, the verification step collected each Pod's logs with kubectl logs. That works while the log volume is small, but it quickly becomes impractical once the volume grows, so a dedicated log collection stack is needed. Here I use ELK + Filebeat + Kafka.

We use the official Helm charts, listed below:

  1. Elasticsearch
  2. Kibana
  3. Logstash
  4. Filebeat

The overall collection flow:

  1. Filebeat collects logs on the business node; in my K8s cluster node3 is the business node, and the collection path is /var/log/containers/*.log
  2. Filebeat ships the collected logs to Kafka
  3. Logstash reads the data from Kafka, filters and enriches it, and writes it to Elasticsearch
  4. An index pattern is created in Kibana for the collected logs
  5. Users browse the logs through Kibana

As shown in the diagram below:

(Diagram: K8s log collection architecture)

A small question here: why use Kafka at all?

Because log production and log consumption rarely proceed at the same rate, and network issues can cause data loss, a message broker is placed in between; Kafka is an excellent, high-throughput choice for this role. Kafka is a distributed publish-subscribe messaging system: a distributed, partitioned, multi-subscriber, replicated, and durable log service with the following characteristics:

  • High throughput for both publishing and subscribing. Reportedly, Kafka can produce around 250,000 messages per second (50 MB) and process around 550,000 messages per second (110 MB).

  • Durable storage. Messages are persisted to disk, so they can be used for batch consumption such as ETL as well as for real-time applications. Persisting to disk together with replication prevents data loss.

  • A distributed system that is easy to scale out. Producers, brokers, and consumers all run as multiple distributed instances, and machines can be added without downtime.

  • The processing state of a message is maintained on the consumer side rather than on the server, and consumption rebalances automatically on failure.

  • Supports both online and offline scenarios.

2. Preparation

Before collecting any logs, let's list the current server environment and plan where each component will run. The K8s nodes are listed below. Kibana, Elasticsearch, and Logstash will be installed on node2 (label: usefulness=elastic), while Filebeat must run on node3 because that is the node whose logs it collects. Kafka is already installed and is not covered again here; see Helm安装ZooKeeper和Kafka for the installation guide.

Node    Role          IP             Spec                     Label
master  master, etcd  192.168.1.100  4 cores / 4 GB / 50 GB   usefulness=schedule
node1   worker        192.168.1.101  8 cores / 8 GB / 100 GB  usefulness=devops
node2   worker        192.168.1.102  8 cores / 8 GB / 100 GB  usefulness=elastic
node3   worker        192.168.1.103  8 cores / 8 GB / 100 GB  usefulness=business
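
The usefulness labels above are what the nodeSelector settings later in this post target. If your nodes are not labeled yet, they can be applied with kubectl label; a quick sketch, using the node names from the table:

# Label the nodes so that nodeSelector can schedule Pods onto them
kubectl label node node2 usefulness=elastic
kubectl label node node3 usefulness=business
# Verify the labels
kubectl get nodes --show-labels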

After deployment, Elasticsearch and Kibana will be accessed by domain name, so two local domains are prepared in advance.

Service        Domain
Elasticsearch  es.local.com
Kibana         kibana.local.com

3. Deployment

This deployment uses Elastic's official charts, so first add the official repository:

helm repo add elastic https://helm.elastic.co
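
After adding the repository, it is worth refreshing the local index and confirming that the 7.15.0 charts are available; a quick check:

# Refresh the chart index
helm repo update
# List the charts published in the elastic repo
helm search repo elastic
# Confirm that version 7.15.0 of the elasticsearch chart exists
helm search repo elastic/elasticsearch --versions | grep 7.15.0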

3.1 Domain names

Since traefik runs on node1 (192.168.1.101) in my cluster, both gateway domains need to resolve to node1.

cat >> /etc/hosts <<EOF
192.168.1.101 es.local.com
192.168.1.101 kibana.local.com
EOF

3.2 Namespace

Create the elastic namespace, which will contain Elasticsearch, Kibana, Logstash, and Filebeat.

apiVersion: v1
kind: Namespace
metadata:
  name: elastic
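
Save the manifest (assumed here to be namespace.yaml) and apply it:

# Create the namespace and confirm it exists
kubectl apply -f namespace.yaml
kubectl get namespace elastic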

3.3 Deploying Elasticsearch

The Elasticsearch cluster will be deployed as three node groups: master, data, and client:

  • master: manages the Elasticsearch cluster
  • data: stores the data
  • client: load balancing (a coordinating-only node)

Creating the Secrets

Before installing we need to create two Secrets: one holding the certificate used for inter-node communication, and one holding the Elasticsearch username and password.

The transport certificate is generated with the official elasticsearch-certutil tool: we start an Elasticsearch container with docker, generate the certificate inside it, copy it out, and then import it into k8s using the script below:

#!/bin/bash

# Start a temporary Elasticsearch container
docker run -d --name es-cert docker.elastic.co/elasticsearch/elasticsearch:7.15.0 sleep 3600
# Generate the certificate
docker exec es-cert /usr/share/elasticsearch/bin/elasticsearch-certutil cert -out /elastic-certificates.p12 -pass ""
# Copy the certificate out of the container
docker cp es-cert:/elastic-certificates.p12 ./
# Stop and remove the container
docker stop es-cert && docker rm es-cert
# Import the certificate into k8s as a Secret
kubectl create secret generic elastic-certificates --from-file=./elastic-certificates.p12 -n elastic
# Create the credentials Secret
kubectl create secret generic elastic-credentials --from-literal=username=elastic --from-literal=password=eYTmelHfmA4= -n elastic
# List the Secrets
kubectl get secret -n elastic
# Inspect the certificate Secret
kubectl describe secret elastic-certificates -n elastic
# Inspect the credentials Secret
kubectl describe secret elastic-credentials -n elastic

Execution output:

root@master:~/elastic/es# bash es.sh
39c9c8dac1bf5622a2373afcf614c18a2bb07c14e8fe2500f4b5d6a87157a385
This tool assists you in the generation of X.509 certificates and certificate
signing requests for use with SSL/TLS in the Elastic stack.

The 'cert' mode generates X.509 certificate and private keys.
   .....
secret/elastic-certificates created
secret/elastic-credentials created
NAME                   TYPE                                  DATA   AGE
default-token-69c5n    kubernetes.io/service-account-token   3      24s
elastic-certificates   Opaque                                1      1s
elastic-credentials    Opaque                                2      1s
Name:         elastic-certificates
Namespace:    elastic
Labels:       <none>
Annotations:  <none>

Type:  Opaque

Data
====
elastic-certificates.p12:  3596 bytes
Name:         elastic-credentials
Namespace:    elastic
Labels:       <none>
Annotations:  <none>

Type:  Opaque

Data
====
username:  7 bytes
password:  12 bytes
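
If you later need to read the stored credentials back (for example when configuring another client), they can be decoded from the Secret; a small sketch:

# Decode the credentials stored in the elastic-credentials Secret
kubectl get secret elastic-credentials -n elastic -o jsonpath='{.data.username}' | base64 -d; echo
kubectl get secret elastic-credentials -n elastic -o jsonpath='{.data.password}' | base64 -d; echo
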
Helm installation

Master node: master.yaml
# Cluster name
clusterName: "elasticsearch"
# Node group name
nodeGroup: "master"
# Node roles
roles:
  master: "true"
  ingest: "false"
  data: "false"
  remote_cluster_client: "false"
  ml: "false"

# Image and tag
image: "docker.elastic.co/elasticsearch/elasticsearch"
imageTag: "7.15.0"

# Hardware is limited, so run only one replica per role
replicas: 1

# Schedule the Pod on node2
nodeSelector:
  usefulness: elastic

# Inject the credentials as environment variables
extraEnvs:
  - name: ELASTIC_USERNAME
    valueFrom:
      secretKeyRef:
        name: elastic-credentials
        key: username
  - name: ELASTIC_PASSWORD
    valueFrom:
      secretKeyRef:
        name: elastic-credentials
        key: password

# Certificate for inter-node transport; required once X-Pack security is enabled
secretMounts:
  - name: elastic-certificates
    secretName: elastic-certificates
    path: /usr/share/elasticsearch/config/certs
    
# Elasticsearch configuration; HTTPS is left disabled
esConfig:
  elasticsearch.yml: |
    xpack.security.enabled: true
    xpack.security.transport.ssl.enabled: true
    xpack.security.transport.ssl.verification_mode: certificate
    xpack.security.transport.ssl.keystore.path: /usr/share/elasticsearch/config/certs/elastic-certificates.p12
    xpack.security.transport.ssl.truststore.path: /usr/share/elasticsearch/config/certs/elastic-certificates.p12
    # xpack.security.http.ssl.enabled: true
    # xpack.security.http.ssl.truststore.path: /usr/share/elasticsearch/config/certs/elastic-certificates.p12
    # xpack.security.http.ssl.keystore.path: /usr/share/elasticsearch/config/certs/elastic-certificates.p12
        
# Limited resources, so keep requests and limits small
resources:
  requests:
    cpu: "500m"
    memory: "1Gi"
  limits:
    cpu: "500m"
    memory: "1Gi"
Data node: data.yaml
# Cluster name
clusterName: "elasticsearch"
# Node group name
nodeGroup: "data"
# Node roles
roles:
  master: "false"
  ingest: "false"
  data: "true"
  remote_cluster_client: "false"
  ml: "false"

# Image and tag
image: "docker.elastic.co/elasticsearch/elasticsearch"
imageTag: "7.15.0"

# Hardware is limited, so run only one replica per role
replicas: 1

# Schedule the Pod on node2
nodeSelector:
  usefulness: elastic

# Inject the credentials as environment variables
extraEnvs:
  - name: ELASTIC_USERNAME
    valueFrom:
      secretKeyRef:
        name: elastic-credentials
        key: username
  - name: ELASTIC_PASSWORD
    valueFrom:
      secretKeyRef:
        name: elastic-credentials
        key: password

# Certificate for inter-node transport; required once X-Pack security is enabled
secretMounts:
  - name: elastic-certificates
    secretName: elastic-certificates
    path: /usr/share/elasticsearch/config/certs
    
# Elasticsearch configuration; HTTPS is left disabled
esConfig:
  elasticsearch.yml: |
    xpack.security.enabled: true
    xpack.security.transport.ssl.enabled: true
    xpack.security.transport.ssl.verification_mode: certificate
    xpack.security.transport.ssl.keystore.path: /usr/share/elasticsearch/config/certs/elastic-certificates.p12
    xpack.security.transport.ssl.truststore.path: /usr/share/elasticsearch/config/certs/elastic-certificates.p12
    # xpack.security.http.ssl.enabled: true
    # xpack.security.http.ssl.truststore.path: /usr/share/elasticsearch/config/certs/elastic-certificates.p12
    # xpack.security.http.ssl.keystore.path: /usr/share/elasticsearch/config/certs/elastic-certificates.p12
        
# Limited resources, so keep requests and limits small
resources:
  requests:
    cpu: "500m"
    memory: "1Gi"
  limits:
    cpu: "500m"
    memory: "1Gi"

Load-balancing node: client.yaml
# Cluster name
clusterName: "elasticsearch"
# Node group name
nodeGroup: "client"
# Node roles
roles:
  master: "false"
  ingest: "false"
  data: "false"
  remote_cluster_client: "false"
  ml: "false"

# Image and tag
image: "docker.elastic.co/elasticsearch/elasticsearch"
imageTag: "7.15.0"

# Hardware is limited, so run only one replica per role
replicas: 1

# Schedule the Pod on node2
nodeSelector:
  usefulness: elastic
  
# Persistence is not needed for this node group
persistence:
  enabled: false

# Inject the credentials as environment variables
extraEnvs:
  - name: ELASTIC_USERNAME
    valueFrom:
      secretKeyRef:
        name: elastic-credentials
        key: username
  - name: ELASTIC_PASSWORD
    valueFrom:
      secretKeyRef:
        name: elastic-credentials
        key: password

# Certificate for inter-node transport; required once X-Pack security is enabled
secretMounts:
  - name: elastic-certificates
    secretName: elastic-certificates
    path: /usr/share/elasticsearch/config/certs
    
# Elasticsearch configuration; HTTPS is left disabled
esConfig:
  elasticsearch.yml: |
    xpack.security.enabled: true
    xpack.security.transport.ssl.enabled: true
    xpack.security.transport.ssl.verification_mode: certificate
    xpack.security.transport.ssl.keystore.path: /usr/share/elasticsearch/config/certs/elastic-certificates.p12
    xpack.security.transport.ssl.truststore.path: /usr/share/elasticsearch/config/certs/elastic-certificates.p12
    # xpack.security.http.ssl.enabled: true
    # xpack.security.http.ssl.truststore.path: /usr/share/elasticsearch/config/certs/elastic-certificates.p12
    # xpack.security.http.ssl.keystore.path: /usr/share/elasticsearch/config/certs/elastic-certificates.p12
        
# Limited resources, so keep requests and limits small
resources:
  requests:
    cpu: "500m"
    memory: "1Gi"
  limits:
    cpu: "500m"
    memory: "1Gi"

Installation commands:

# Install the master node group
helm install elasticsearch-master elastic/elasticsearch -n elastic --version 7.15.0 -f master.yaml
# Install the data node group
helm install elasticsearch-data elastic/elasticsearch -n elastic --version 7.15.0 -f data.yaml
# Install the client node group
helm install elasticsearch-client elastic/elasticsearch -n elastic --version 7.15.0 -f client.yaml

Installation output:

root@master:~/elastic/es# helm install elasticsearch-master elastic/elasticsearch -n elastic --version 7.15.0 -f master.yaml
W1208 20:29:30.573206 1520614 warnings.go:70] policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+, unavailable in v1.25+; use policy/v1 PodDisruptionBudget
W1208 20:29:30.702251 1520614 warnings.go:70] policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+, unavailable in v1.25+; use policy/v1 PodDisruptionBudget
NAME: elasticsearch-master
LAST DEPLOYED: Wed Dec  8 20:29:29 2021
NAMESPACE: elastic
STATUS: deployed
REVISION: 1
NOTES:
1. Watch all cluster members come up.
  $ kubectl get pods --namespace=elastic -l app=elasticsearch-master -w
2. Test cluster health using Helm test.
  $ helm --namespace=elastic test elasticsearch-master
root@master:~/elastic/es# helm install elasticsearch-data elastic/elasticsearch -n elastic --version 7.15.0 -f data.yaml
W1208 20:29:40.397263 1520939 warnings.go:70] policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+, unavailable in v1.25+; use policy/v1 PodDisruptionBudget
W1208 20:29:40.493907 1520939 warnings.go:70] policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+, unavailable in v1.25+; use policy/v1 PodDisruptionBudget
NAME: elasticsearch-data
LAST DEPLOYED: Wed Dec  8 20:29:38 2021
NAMESPACE: elastic
STATUS: deployed
REVISION: 1
NOTES:
1. Watch all cluster members come up.
  $ kubectl get pods --namespace=elastic -l app=elasticsearch-data -w
2. Test cluster health using Helm test.
  $ helm --namespace=elastic test elasticsearch-data
root@master:~/elastic/es# helm install elasticsearch-client elastic/elasticsearch -n elastic --version 7.15.0 -f client.yaml
W1208 20:29:52.816127 1521372 warnings.go:70] policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+, unavailable in v1.25+; use policy/v1 PodDisruptionBudget
W1208 20:29:53.030267 1521372 warnings.go:70] policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+, unavailable in v1.25+; use policy/v1 PodDisruptionBudget
NAME: elasticsearch-client
LAST DEPLOYED: Wed Dec  8 20:29:51 2021
NAMESPACE: elastic
STATUS: deployed
REVISION: 1
NOTES:
1. Watch all cluster members come up.
  $ kubectl get pods --namespace=elastic -l app=elasticsearch-client -w
2. Test cluster health using Helm test.
  $ helm --namespace=elastic test elasticsearch-client
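
Before exposing the cluster, wait for the Pods to become Ready and check cluster health from inside the cluster; a sketch (elasticsearch-client-0 is the Pod name generated by the chart, and the password is the one stored in the Secret above):

# All Elasticsearch Pods should end up Running on node2
kubectl get pods -n elastic -o wide
# Query cluster health via the client node; expect "status" : "green"
kubectl exec -n elastic elasticsearch-client-0 -- \
  curl -s -u elastic:eYTmelHfmA4= http://localhost:9200/_cluster/health?pretty
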
Configuring the IngressRoute

Apply it with kubectl apply -f xxx.yaml.

apiVersion: traefik.containo.us/v1alpha1
kind: IngressRoute
metadata:
  name: elasticsearch
  namespace: elastic
spec:
  entryPoints:
    - web
  routes:
    - match: Host(`es.local.com`)
      kind: Rule
      services:
        - name: elasticsearch-client
          port: 9200

Testing

At this point the Elasticsearch cluster is up. Let's test it with curl:

➜  ~ curl http://es.local.com/ --user elastic:eYTmelHfmA4=
{
  "name" : "elasticsearch-client-0",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "d02v2L9AS4WdVsb82iqcBw",
  "version" : {
    "number" : "7.15.0",
    "build_flavor" : "default",
    "build_type" : "docker",
    "build_hash" : "79d65f6e357953a5b3cbcc5e2c7c21073d89aa29",
    "build_date" : "2021-09-16T03:05:29.143308416Z",
    "build_snapshot" : false,
    "lucene_version" : "8.9.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
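
To confirm that all three node groups joined the same cluster, the _cat/nodes API can also be queried through the ingress:

# List cluster nodes with their roles and which one is the elected master
curl 'http://es.local.com/_cat/nodes?v' --user elastic:eYTmelHfmA4=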

3.4 Deploying Kibana

Install with the following values file:

# Connect to the Elasticsearch client node
elasticsearchHosts: "http://elasticsearch-client:9200"

# Inject the credentials as environment variables
extraEnvs:
  - name: ELASTICSEARCH_USERNAME
    valueFrom:
      secretKeyRef:
        name: elastic-credentials
        key: username
  - name: ELASTICSEARCH_PASSWORD
    valueFrom:
      secretKeyRef:
        name: elastic-credentials
        key: password

# Image and tag
image: "docker.elastic.co/kibana/kibana"
imageTag: "7.15.0"

# Schedule the Pod on node2
nodeSelector:
  usefulness: elastic
  
# Resources
resources:
  requests:
    cpu: "500m"
    memory: "1Gi"
  limits:
    cpu: "500m"
    memory: "1Gi"
    
# Kibana configuration; set the UI language to Chinese
kibanaConfig:
  kibana.yml: |
    i18n.locale: "zh-CN"

Installation command:

helm install kibana elastic/kibana -n elastic --version 7.15.0 -f kibana.yaml

Installation output:

root@master:~/elastic/es# helm install kibana elastic/kibana -n elastic --version 7.15.0 -f kibana.yaml
NAME: kibana
LAST DEPLOYED: Wed Dec  8 20:53:41 2021
NAMESPACE: elastic
STATUS: deployed
REVISION: 1
TEST SUITE: None

Configure the IngressRoute and apply it with kubectl apply -f xxx.yaml.

apiVersion: traefik.containo.us/v1alpha1
kind: IngressRoute
metadata:
  name: kibana
  namespace: elastic
spec:
  entryPoints:
    - web
  routes:
    - match: Host(`kibana.local.com`)
      kind: Rule
      services:
        - name: kibana-kibana
          port: 5601

Now open http://kibana.local.com in a browser:

(Screenshot: Kibana web UI)
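
For a command-line check, Kibana's status API can be queried through the same ingress; a quick sketch:

# An HTTP 200 means Kibana is up and can reach Elasticsearch
curl -s -o /dev/null -w "%{http_code}\n" 'http://kibana.local.com/api/status' --user elastic:eYTmelHfmA4=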

3.5 Deploying Filebeat

Creating the Kafka Topic

Since the logs collected by Filebeat will be written to Kafka, we first create a Topic named filebeat with three partitions and a replication factor of two.

  • Create and connect to a kafka client Pod

    root@master:~/kafka-demo/k8s/deploy# kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:2.8.1-debian-10-r57 --namespace devops --command -- sleep infinity
    pod/kafka-client created
    
    root@master:~/kafka-demo/k8s/deploy# kubectl exec --tty -i kafka-client --namespace devops -- bash
    I have no name!@kafka-client:/$
    
  • Once connected to the kafka client Pod, manage the Topic

    # List Topics
    I have no name!@kafka-client:/$ kafka-topics.sh --bootstrap-server kafka.devops.svc.cluster.local:9092 --list
    __consumer_offsets
    
    # Create the Topic
    I have no name!@kafka-client:/$ kafka-topics.sh --bootstrap-server kafka.devops.svc.cluster.local:9092 --create --topic filebeat --partitions 3 --replication-factor 2
    Created topic filebeat.
    
    # Delete the Topic
    I have no name!@kafka-client:/$ kafka-topics.sh --bootstrap-server kafka.devops.svc.cluster.local:9092 --delete --topic filebeat
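
After the topic is created, its partition and replica layout can be confirmed with --describe, run from the same kafka-client Pod:

# Shows 3 partitions with a replication factor of 2 and their leader brokers
kafka-topics.sh --bootstrap-server kafka.devops.svc.cluster.local:9092 --describe --topic filebeat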
    
Helm installation

Install with the following values file:

# Run as a DaemonSet on node3, the business node
daemonset:
  enabled: true
  # Filebeat configuration
  filebeatConfig:
    filebeat.yml: |
      filebeat.inputs:
      - type: container
        paths:
          - /var/log/containers/*.log
        processors:
        - add_kubernetes_metadata:
            host: ${NODE_NAME}
            matchers:
            - logs_path:
                logs_path: "/var/log/containers/"
      output:
        kafka:
          enabled: true # enable the Kafka output
          hosts: ["kafka-0.kafka-headless.devops.svc.cluster.local:9092","kafka-1.kafka-headless.devops.svc.cluster.local:9092","kafka-2.kafka-headless.devops.svc.cluster.local:9092"]
          topic: filebeat
          max_message_bytes: 5242880
          partition.round_robin:
            reachable_only: true
          keep_alive: 120
          required_acks: 1      
  # Schedule the Pod on node3 (label usefulness=business)
  nodeSelector:
    usefulness: business
# Image and tag
image: "docker.elastic.co/beats/filebeat"
imageTag: "7.15.0"

Installation command:

helm install filebeat elastic/filebeat -n elastic --version 7.15.0 -f filebeat.yaml

Installation output:

root@master:~/elastic/filebeat# helm install filebeat elastic/filebeat -n elastic --version 7.15.0 -f filebeat.yaml
NAME: filebeat
LAST DEPLOYED: Wed Dec  8 21:55:19 2021
NAMESPACE: elastic
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
1. Watch all containers come up.
  $ kubectl get pods --namespace=elastic -l app=filebeat-filebeat -w
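
It is also worth confirming that the DaemonSet Pod landed on node3 rather than on any other node; a quick sketch using the label from the NOTES above:

# The NODE column should show node3 only
kubectl get pods -n elastic -l app=filebeat-filebeat -o wide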

Once the installation completes, we enter the kafka client container to check whether messages are showing up on the filebeat topic. As shown below, the logs are being written to Kafka correctly.

root@master:~/elastic/filebeat# kubectl exec --tty -i kafka-client --namespace devops -- bash
I have no name!@kafka-client:/$ kafka-console-consumer.sh --bootstrap-server kafka.devops.svc.cluster.local:9092 --topic filebeat --from-beginning
{"@timestamp":"2021-12-04T09:26:18.777Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.15.0"},"container":{"runtime":"docker","image":{"name":"kubesphere/notification-tenant-sidecar:v3.2.0"},"id":"ec8ad16d0e2355f91cf75f147e80d9b798b920204c08afcc473ef15ac0a7effb"},"ecs":{"version":"1.11.0"},"host":{"name":"filebeat-filebeat-tl7m9"},"stream":"stderr","message":"I1204 17:26:18.777437       1 tenant.go:66] reload tenant","log":{"offset":2109526,"file":{"path":"/var/log/containers/notification-manager-deployment-78664576cb-grhdn_kubesphere-monitoring-system_tenant-ec8ad16d0e2355f91cf75f147e80d9b798b920204c08afcc473ef15ac0a7effb.log"}},"input":{"type":"container"},"kubernetes":{"pod":{"name":"notification-manager-deployment-78664576cb-grhdn","uid":"2b699ea2-a5ed-4adc-bb14-44f2064d0f7b","ip":"10.233.92.5"},"namespace":"kubesphere-monitoring-system","replicaset":{"name":"notification-manager-deployment-78664576cb"},"labels":{"notification-manager":"notification-manager","pod-template-hash":"78664576cb","app":"notification-manager"},"container":{"name":"tenant"},"node":{"name":"node3","uid":"b7afac83-9d97-4136-8263-4c0dfb5336c7","labels":{"beta_kubernetes_io/os":"linux","kubernetes_io/arch":"amd64","kubernetes_io/hostname":"node3","kubernetes_io/os":"linux","node-role_kubernetes_io/worker":"","usefulness":"business","beta_kubernetes_io/arch":"amd64"},"hostname":"node3"},"namespace_uid":"348859b8-96c7-4e44-abaf-762d4d35c7a5","namespace_labels":{"kubesphere_io/workspace":"system-workspace","kubernetes_io/metadata_name":"kubesphere-monitoring-system","kubesphere_io/namespace":"kubesphere-monitoring-system"}},"agent":{"type":"filebeat","version":"7.15.0","hostname":"filebeat-filebeat-tl7m9","ephemeral_id":"13b31a8c-e673-40c0-8d8d-2db9df6f9584","id":"c95eace6-19d6-421c-8fd8-a40df0e69921","name":"filebeat-filebeat-tl7m9"}}
......

3.6 Deploying Logstash

Install with the following values file:

# Connect to the Elasticsearch client node
elasticsearchHosts: "http://elasticsearch-client:9200"

# Image and tag
image: "docker.elastic.co/logstash/logstash"
imageTag: "7.15.0"

# Schedule the Pod on node2
nodeSelector:
  usefulness: elastic
  
# Resources
resources:
  requests:
    cpu: "500m"
    memory: "1Gi"
  limits:
    cpu: "500m"
    memory: "1Gi"
    
persistence:
  enabled: true
    
logstashPipeline:
  logstash.conf: |
    input {
      kafka {
        bootstrap_servers => "kafka-0.kafka-headless.devops.svc.cluster.local:9092,kafka-1.kafka-headless.devops.svc.cluster.local:9092,kafka-2.kafka-headless.devops.svc.cluster.local:9092"
        client_id => "${LOGSTASH_ID}"
        topics => ["filebeat"]
        group_id => "logstash"
        decorate_events => true
        codec => "json"
      }
    }

    output {
      elasticsearch {
        hosts => ["http://elasticsearch-client:9200"]
        index => "filebeat-kafka-%{[kubernetes][namespace]}-%{[kubernetes][pod][name]}-%{+YYYY.MM.dd}"
        user => "${XPACK_MONITORING_ELASTICSEARCH_USERNAME}"
        password => "${XPACK_MONITORING_ELASTICSEARCH_PASSWORD}"
      }
    }    

# Inject the credentials as environment variables
extraEnvs:
  - name: XPACK_MONITORING_ELASTICSEARCH_HOSTS
    value: '"http://elasticsearch-client:9200"'
  - name: XPACK_MONITORING_ELASTICSEARCH_USERNAME
    valueFrom:
      secretKeyRef:
        name: elastic-credentials
        key: username
  - name: XPACK_MONITORING_ELASTICSEARCH_PASSWORD
    valueFrom:
      secretKeyRef:
        name: elastic-credentials
        key: password
  - name: LOGSTASH_ID
    valueFrom:
      fieldRef:
        fieldPath: metadata.name

livenessProbe: null

readinessProbe:
  httpGet: null
  exec:
    command:
      - curl
      - localhost:9600

Installation command:

helm install logstash elastic/logstash -n elastic --version 7.15.0 -f logstash.yaml

Installation output:

root@master:~/elastic/logstash# helm install logstash elastic/logstash -n elastic --version 7.15.0 -f logstash.yaml
W1208 22:35:33.824061 1718593 warnings.go:70] policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+, unavailable in v1.25+; use policy/v1 PodDisruptionBudget
W1208 22:35:33.914598 1718593 warnings.go:70] policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+, unavailable in v1.25+; use policy/v1 PodDisruptionBudget
NAME: logstash
LAST DEPLOYED: Wed Dec  8 22:35:32 2021
NAMESPACE: elastic
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
1. Watch all cluster members come up.
  $ kubectl get pods --namespace=elastic -l app=logstash-logstash -w
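
Once Logstash starts consuming from Kafka, the indices defined in the pipeline above should appear in Elasticsearch; a quick check through the ingress:

# List the indices written by the Logstash pipeline
curl 'http://es.local.com/_cat/indices/filebeat-kafka-*?v' --user elastic:eYTmelHfmA4=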

4. Verification

At this point all of the Elastic Stack components are installed, and logs from node3 are being collected into Kafka and Elasticsearch. Now let's run an actual test using the example from Golang结合K8s验证Kafka主题分区. The test script:

#!/bin/bash

# Place orders on partition 0
curl http://kafka-demo.local.com/submit-order -d "partition=0" && echo ""
curl http://kafka-demo.local.com/submit-order -d "partition=0" && echo ""
curl http://kafka-demo.local.com/submit-order -d "partition=0" && echo ""

# Place orders on partition 1
curl http://kafka-demo.local.com/submit-order -d "partition=1" && echo ""
curl http://kafka-demo.local.com/submit-order -d "partition=1" && echo ""
curl http://kafka-demo.local.com/submit-order -d "partition=1" && echo ""

# Place orders on partition 2
curl http://kafka-demo.local.com/submit-order -d "partition=2" && echo ""
curl http://kafka-demo.local.com/submit-order -d "partition=2" && echo ""
curl http://kafka-demo.local.com/submit-order -d "partition=2" && echo ""

The test output:

➜  ~ bash test.sh
{"money":887,"order_id":"1468618182632673280","time":1638980834,"user_id":"1468618182280351744"}
{"money":887,"order_id":"1468618184075513856","time":1638980835,"user_id":"1468618183731580928"}
{"money":887,"order_id":"1468618184922763264","time":1638980835,"user_id":"1468618184603996160"}
{"money":887,"order_id":"1468618185765818368","time":1638980835,"user_id":"1468618185438662656"}
{"money":887,"order_id":"1468618187359653888","time":1638980835,"user_id":"1468618187036692480"}
{"money":887,"order_id":"1468618188185931776","time":1638980836,"user_id":"1468618187841998848"}
{"money":887,"order_id":"1468618189104484352","time":1638980836,"user_id":"1468618188718608384"}
{"money":887,"order_id":"1468618190874480640","time":1638980836,"user_id":"1468618190534742016"}
{"money":887,"order_id":"1468618191721730048","time":1638980837,"user_id":"1468618191386185728"}

Next, looking at the indices in Kibana, we can see that logs from every Pod in the kafka-demo namespace have been collected:

(Screenshot: indices in Kibana)

Next we create an index pattern, *-kafka-demo-*, as shown below:

(Screenshot: creating the *-kafka-demo-* index pattern)

Viewing the logs afterwards, we can see they have been filtered as expected:

(Screenshot: collected logs in the Discover view)

Taking order number 1468618187359653888 from the output above as an example (this order went to partition 1), we search for it and then narrow the displayed fields to just the following:

  • Time
  • kubernetes.pod.name
  • message

The results show that the order request hit a random gateway instance, was then received by the order-1 partition consumer, packed and shipped by repository-1, and finally counted by the single statistics instance, as shown below:

(Screenshot: log entries for order 1468618187359653888)
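
The same lookup can also be run directly against Elasticsearch with a query-string search, bypassing Kibana; a sketch reusing the ingress and credentials above:

# Returns the matching log documents, including kubernetes.pod.name and message
curl -s 'http://es.local.com/filebeat-kafka-kafka-demo-*/_search?q=message:1468618187359653888&size=20&pretty' \
  --user elastic:eYTmelHfmA4=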

5. Summary

In this post we installed the Elastic Stack with Helm and, together with Kafka, built a log collection system, then verified it with the example from Golang结合K8s验证Kafka主题分区, completing the whole logging setup.

Kubernetes ElasticStack ELK Filebeat Kafka
