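1. Overview
This article records how I installed ZooKeeper and Kafka with Helm. ZooKeeper is installed separately because other software, such as ClickHouse, also needs it.
I used the charts from the repository maintained by Bitnami; see the official chart documentation for detailed installation instructions.
Before installing, here is the current environment and the deployment plan.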
Node | Role | IP | Spec | Label |
---|---|---|---|---|
master | master, etcd | 192.168.1.100 | 4 cores, 4 GB RAM, 50 GB disk | usefulness=schedule |
node1 | worker | 192.168.1.101 | 8 cores, 8 GB RAM, 100 GB disk | usefulness=devops |
node2 | worker | 192.168.1.102 | 8 cores, 8 GB RAM, 100 GB disk | usefulness=demo |
node3 | worker | 192.168.1.103 | 8 cores, 8 GB RAM, 100 GB disk | usefulness=business |
I plan to run both Kafka and ZooKeeper on node1, so the charts' default values need to be overridden during installation.
2. Installation
Before installing, add the test domain names to /etc/hosts:
cat >> /etc/hosts <<EOF
192.168.1.101 kafka.local.com
192.168.1.101 zookeeper.local.com
EOF
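2.1 ZooKeeper
Helm installation
I changed the following settings:
- Set the replica count to 3
- Add a nodeSelector so that the pods are scheduled onto node1
- Change the service type to NodePort for easier local debugging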
replicaCount: 3
nodeSelector:
  usefulness: devops
service:
  type: NodePort
  nodePorts:
    client: 2181
Save the above as zookeeper.yaml and install with the following commands:
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
helm install zookeeper bitnami/zookeeper -f zookeeper.yaml --namespace devops
After the installation completes, the output is as follows:
root@master:~/k8s/traefik/zookeeper# helm install zookeeper bitnami/zookeeper -f zookeeper.yaml --namespace devops
NAME: zookeeper
LAST DEPLOYED: Sun Dec 5 16:17:53 2021
NAMESPACE: devops
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
CHART NAME: zookeeper
CHART VERSION: 7.4.13
APP VERSION: 3.7.0
** Please be patient while the chart is being deployed **
ZooKeeper can be accessed via port 2181 on the following DNS name from within your cluster:
zookeeper.devops.svc.cluster.local
To connect to your ZooKeeper server run the following commands:
export POD_NAME=$(kubectl get pods --namespace devops -l "app.kubernetes.io/name=zookeeper,app.kubernetes.io/instance=zookeeper,app.kubernetes.io/component=zookeeper" -o jsonpath="{.items[0].metadata.name}")
kubectl exec -it $POD_NAME -- zkCli.sh
To connect to your ZooKeeper server from outside the cluster execute the following commands:
export NODE_IP=$(kubectl get nodes --namespace devops -o jsonpath="{.items[0].status.addresses[0].address}")
export NODE_PORT=$(kubectl get --namespace devops -o jsonpath="{.spec.ports[0].nodePort}" services zookeeper)
zkCli.sh $NODE_IP:$NODE_PORT
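From the output above, the in-cluster DNS address of ZooKeeper is zookeeper.devops.svc.cluster.local:2181. The two groups of commands that follow show how to open a zkCli.sh session in a ZooKeeper pod and how to connect from outside the cluster.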
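The address follows the usual Kubernetes pattern of service name, namespace, and cluster domain. As a minimal sketch, assuming the cluster domain is cluster.local (as in the output above) and using a hypothetical helper name serviceAddr, it can be assembled like this:

```go
package main

import "fmt"

// serviceAddr builds the in-cluster address of a Service.
// Hypothetical helper; it assumes the cluster domain is "cluster.local".
func serviceAddr(service, namespace string, port int) string {
	return fmt.Sprintf("%s.%s.svc.cluster.local:%d", service, namespace, port)
}

func main() {
	// Service "zookeeper" in namespace "devops", client port 2181.
	fmt.Println(serviceAddr("zookeeper", "devops", 2181))
}
```

The same helper gives the Kafka bootstrap address later on when called with "kafka", "devops" and 9092.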
Next, let's look at the deployed resources:
# List the StatefulSet
root@master:~/k8s/traefik# kubectl get statefulset -n devops -o wide
NAME READY AGE CONTAINERS IMAGES
zookeeper 3/3 8m10s zookeeper docker.io/bitnami/zookeeper:3.7.0-debian-10-r215
# List the Pods
root@master:~/k8s/traefik# kubectl get pod -n devops -o wide
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
zookeeper-0 1/1 Running 0 9m7s 10.233.90.19 node1 <none> <none>
zookeeper-1 1/1 Running 0 9m7s 10.233.90.20 node1 <none> <none>
zookeeper-2 1/1 Running 0 9m7s 10.233.90.18 node1 <none> <none>
# List the Services
root@master:~/k8s/traefik/zookeeper# kubectl get svc -n devops -o wide
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE SELECTOR
zookeeper NodePort 10.233.3.58 <none> 2181:2181/TCP,2888:12521/TCP,3888:26982/TCP 95s app.kubernetes.io/component=zookeeper,app.kubernetes.io/instance=zookeeper,app.kubernetes.io/name=zookeeper
zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 95s app.kubernetes.io/component=zookeeper,app.kubernetes.io/instance=zookeeper,app.kubernetes.io/name=zookeeper
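Note that the Service exposes the client port on node port 2181, which lies outside Kubernetes' default NodePort range of 30000-32767. A value like this is only accepted when kube-apiserver runs with a wider --service-node-port-range, which appears to be the case on this cluster. The sketch below (the function name is hypothetical) checks whether a chosen port would need that setting:

```go
package main

import "fmt"

// Default NodePort range used by kube-apiserver unless
// --service-node-port-range overrides it.
const (
	defaultNodePortMin = 30000
	defaultNodePortMax = 32767
)

// inDefaultNodePortRange reports whether port can be used as a nodePort
// on a cluster that keeps the default range.
func inDefaultNodePortRange(port int) bool {
	return port >= defaultNodePortMin && port <= defaultNodePortMax
}

func main() {
	// Ports used in this article plus one value inside the default range.
	for _, p := range []int{2181, 9092, 19090, 30181} {
		fmt.Printf("%d in default range: %v\n", p, inDefaultNodePortRange(p))
	}
}
```

On a cluster with the default range, the values files in this article would have to use ports between 30000 and 32767 instead.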
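Traefik
Here we use traefik to route external traffic to the ZooKeeper service. Keep in mind that IngressRoute is traefik's HTTP router, while ZooKeeper clients speak a binary TCP protocol, so the tests in section 3 actually reach ZooKeeper through node port 2181; carrying ZooKeeper traffic through traefik would normally require an IngressRouteTCP bound to a TCP entry point. The IngressRoute manifest is as follows: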
apiVersion: traefik.containo.us/v1alpha1
kind: IngressRoute
metadata:
  name: zookeeper
  namespace: devops
spec:
  entryPoints:
    - web
  routes:
    - match: Host(`zookeeper.local.com`)
      kind: Rule
      services:
        - name: zookeeper
          port: 2181
Save the above as ingress-route-zookeeper.yaml and create it with:
kubectl apply -f ingress-route-zookeeper.yaml
Once it is created, inspect the route:
root@master:~/k8s/traefik/zookeeper# kubectl get ingressroute -n devops
NAME AGE
zookeeper 24s
root@master:~/k8s/traefik/zookeeper# kubectl describe ingressroute zookeeper -n devops
Name: zookeeper
Namespace: devops
Labels: <none>
Annotations: <none>
API Version: traefik.containo.us/v1alpha1
Kind: IngressRoute
Metadata:
Creation Timestamp: 2021-12-05T07:54:05Z
Generation: 1
Managed Fields:
API Version: traefik.containo.us/v1alpha1
Fields Type: FieldsV1
fieldsV1:
f:metadata:
f:annotations:
.:
f:kubectl.kubernetes.io/last-applied-configuration:
f:spec:
.:
f:entryPoints:
f:routes:
Manager: kubectl-client-side-apply
Operation: Update
Time: 2021-12-05T07:54:05Z
Resource Version: 341404
UID: 95a50c4d-26e5-4835-875c-7fff6ff6d635
Spec:
Entry Points:
web
Routes:
Kind: Rule
Match: Host(`zookeeper.local.com`)
Services:
Name: zookeeper
Port: 2181
Events: <none>
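2.2 Kafka
Helm installation
I changed the following settings:
- Set the replica count to 3
- Add a nodeSelector so that the pods are scheduled onto node1
- Allow topics to be deleted
- Change the service type to NodePort for easier local debugging
- Disable the bundled ZooKeeper and use an external one, namely the ZooKeeper installed above
- Enable access from outside the Kubernetes cluster and give each broker its own external port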
replicaCount: 3
nodeSelector:
  usefulness: devops
deleteTopicEnable: true
service:
  type: NodePort
  nodePorts:
    client: 9092
zookeeper:
  enabled: false
externalZookeeper:
  servers: ["zookeeper.devops.svc.cluster.local:2181"]
externalAccess:
  enabled: true
  service:
    type: NodePort
    nodePorts:
      - 19090
      - 19091
      - 19092
    useHostIPs: true
Save the above as kafka.yaml and install with the following commands:
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
helm install kafka bitnami/kafka -f kafka.yaml --namespace devops
After the installation completes, the output is as follows:
root@master:~/k8s/traefik/kafka# helm install kafka bitnami/kafka -f kafka.yaml --namespace devops
NAME: kafka
LAST DEPLOYED: Sun Dec 5 17:10:59 2021
NAMESPACE: devops
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
CHART NAME: kafka
CHART VERSION: 14.4.3
APP VERSION: 2.8.1
---------------------------------------------------------------------------------------------
WARNING
By specifying "serviceType=LoadBalancer" and not configuring the authentication
you have most likely exposed the Kafka service externally without any
authentication mechanism.
For security reasons, we strongly suggest that you switch to "ClusterIP" or
"NodePort". As alternative, you can also configure the Kafka authentication.
---------------------------------------------------------------------------------------------
** Please be patient while the chart is being deployed **
Kafka can be accessed by consumers via port 9092 on the following DNS name from within your cluster:
kafka.devops.svc.cluster.local
Each Kafka broker can be accessed by producers via port 9092 on the following DNS name(s) from within your cluster:
kafka-0.kafka-headless.devops.svc.cluster.local:9092
kafka-1.kafka-headless.devops.svc.cluster.local:9092
kafka-2.kafka-headless.devops.svc.cluster.local:9092
To create a pod that you can use as a Kafka client run the following commands:
kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:2.8.1-debian-10-r57 --namespace devops --command -- sleep infinity
kubectl exec --tty -i kafka-client --namespace devops -- bash
PRODUCER:
kafka-console-producer.sh \
--broker-list kafka-0.kafka-headless.devops.svc.cluster.local:9092,kafka-1.kafka-headless.devops.svc.cluster.local:9092,kafka-2.kafka-headless.devops.svc.cluster.local:9092 \
--topic test
CONSUMER:
kafka-console-consumer.sh \
--bootstrap-server kafka.devops.svc.cluster.local:9092 \
--topic test \
--from-beginning
To connect to your Kafka server from outside the cluster, follow the instructions below:
Kafka brokers domain: You can get the external node IP from the Kafka configuration file with the following commands (Check the EXTERNAL listener)
1. Obtain the pod name:
kubectl get pods --namespace devops -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=kafka,app.kubernetes.io/component=kafka"
2. Obtain pod configuration:
kubectl exec -it KAFKA_POD -- cat /opt/bitnami/kafka/config/server.properties | grep advertised.listeners
Kafka brokers port: You will have a different node port for each Kafka broker. You can get the list of configured node ports using the command below:
echo "$(kubectl get svc --namespace devops -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=kafka,app.kubernetes.io/component=kafka,pod" -o jsonpath='{.items[*].spec.ports[0].nodePort}' | tr ' ' '\n')"
From the output above, the in-cluster DNS address of the Kafka cluster is kafka.devops.svc.cluster.local:9092, and the in-cluster addresses of the three brokers are:
- kafka-0.kafka-headless.devops.svc.cluster.local:9092
- kafka-1.kafka-headless.devops.svc.cluster.local:9092
- kafka-2.kafka-headless.devops.svc.cluster.local:9092
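Both address families are predictable from the values file. The following sketch derives them; the function names are hypothetical, and it assumes the headless Service is named after the release with a -headless suffix (as in the chart output above) and that every broker runs on the same node, so one node IP is paired with each configured node port:

```go
package main

import "fmt"

// internalBrokers lists the per-pod DNS names behind the headless Service.
// Assumes pods are named <release>-<ordinal> and the headless Service is
// <release>-headless, as printed by the chart notes.
func internalBrokers(release, namespace string, replicas, port int) []string {
	addrs := make([]string, 0, replicas)
	for i := 0; i < replicas; i++ {
		addrs = append(addrs, fmt.Sprintf(
			"%s-%d.%s-headless.%s.svc.cluster.local:%d",
			release, i, release, namespace, port))
	}
	return addrs
}

// externalBrokers pairs one node IP with the node port of each broker.
// This matches the article's setup only because all pods are on node1.
func externalBrokers(nodeIP string, nodePorts []int) []string {
	addrs := make([]string, 0, len(nodePorts))
	for _, p := range nodePorts {
		addrs = append(addrs, fmt.Sprintf("%s:%d", nodeIP, p))
	}
	return addrs
}

func main() {
	fmt.Println(internalBrokers("kafka", "devops", 3, 9092))
	fmt.Println(externalBrokers("192.168.1.101", []int{19090, 19091, 19092}))
}
```

The external list is what clients outside the cluster should use as bootstrap servers, which is why the test programs in section 3 connect to ports 19090 to 19092.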
3. Testing
3.1 ZooKeeper
The test code is as follows:
package main

import (
	"encoding/json"
	"fmt"
	"time"

	// Assumed import path; the original snippet does not show its imports.
	"github.com/go-zookeeper/zk"
)

// ZKFlagPersistent creates a persistent, non-sequential znode (flags = 0).
const ZKFlagPersistent int32 = 0

func main() {
	// Connect to the ZooKeeper cluster
	hosts := []string{"zookeeper.local.com"}
	option := zk.WithEventCallback(
		func(event zk.Event) {
			b, _ := json.Marshal(event)
			fmt.Printf("[all event] %s\n", b)
		},
	)
	conn, _, err := zk.Connect(hosts, time.Second*5, option)
	if err != nil {
		fmt.Printf("[connect zookeeper server error] %v\n", err)
		return
	}
	defer conn.Close()

	fmt.Println("List children of the root node")
	nodes, stat, err := conn.Children("/")
	fmt.Printf("[root children] %+v, %+v, %v\n", nodes, stat, err)

	// World-readable and world-writable ACL for the new znode
	acl := zk.WorldACL(zk.PermAll)
	fmt.Println("-----------------------------------")
	fmt.Println("Create the golang znode")
	res, err := conn.Create("/golang", []byte("this is golang znode"), ZKFlagPersistent, acl)
	if err != nil {
		fmt.Printf("create znode [%s] error: %v\n", "/golang", err)
		return
	}
	fmt.Printf("create znode [%s] success: %v\n", "/golang", res)

	fmt.Println("-----------------------------------")
	fmt.Println("List children of the root node again")
	nodes, stat, err = conn.Children("/")
	fmt.Printf("[root children] %+v, %+v, %v\n", nodes, stat, err)
}
The test result is shown below; both listing and creating znodes work as expected:
List children of the root node
[all event] {"Type":-1,"State":1,"Path":"","Err":null,"Server":"192.168.1.101:2181"}
[all event] {"Type":-1,"State":100,"Path":"","Err":null,"Server":"192.168.1.101:2181"}
2021/12/05 16:23:44 Connected to 192.168.1.101:2181
[all event] {"Type":-1,"State":101,"Path":"","Err":null,"Server":"192.168.1.101:2181"}
2021/12/05 16:23:44 authenticated: id=216182479996452864, timeout=5000
2021/12/05 16:23:44 re-submitting `0` credentials after reconnect
[root children] [zookeeper], &{Czxid:0 Mzxid:0 Ctime:0 Mtime:0 Version:0 Cversion:-1 Aversion:0 EphemeralOwner:0 DataLength:0 NumChildren:1 Pzxid:0}, <nil>
-----------------------------------
Create the golang znode
create znode [/golang] success: /golang
-----------------------------------
List children of the root node again
[root children] [zookeeper golang], &{Czxid:0 Mzxid:0 Ctime:0 Mtime:0 Version:0 Cversion:0 Aversion:0 EphemeralOwner:0 DataLength:0 NumChildren:2 Pzxid:8589934594}, <nil>
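The Pzxid value 8589934594 in the last line looks arbitrary, but a ZooKeeper transaction id (zxid) is a 64-bit number whose high 32 bits hold the leader epoch and whose low 32 bits hold a counter within that epoch. A small sketch (the helper name splitZxid is my own) decodes it:

```go
package main

import "fmt"

// splitZxid splits a ZooKeeper transaction id into its epoch (high 32 bits)
// and its counter (low 32 bits).
func splitZxid(zxid int64) (epoch, counter uint32) {
	return uint32(zxid >> 32), uint32(zxid & 0xffffffff)
}

func main() {
	// Pzxid reported for "/" after the golang znode was created.
	epoch, counter := splitZxid(8589934594)
	fmt.Printf("epoch=%d counter=%d\n", epoch, counter) // epoch=2 counter=2
}
```

So the change to the root node's children was the second transaction of epoch 2.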
3.2 Kafka
3.2.1 Topics
First we create a topic named test_topic with 3 partitions and a replication factor of 2, then list all topics:
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"os"

	"github.com/Shopify/sarama"
)

func main() {
	// External broker addresses (node IP plus per-broker node port)
	servers := []string{
		"kafka.local.com:19090",
		"kafka.local.com:19091",
		"kafka.local.com:19092",
	}
	topic := "test_topic"
	sarama.Logger = log.New(os.Stderr, "[Sarama] ", log.LstdFlags)
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_8_0_0

	// Connect to the cluster
	admin, err := sarama.NewClusterAdmin(servers, cfg)
	if err != nil {
		panic(err)
	}
	defer admin.Close()

	// Create the topic
	fmt.Println("----------------------------------")
	fmt.Println("Create topic")
	err = admin.CreateTopic(
		topic, &sarama.TopicDetail{
			NumPartitions:     3,
			ReplicationFactor: 2,
		}, false,
	)
	if err != nil {
		panic(err)
	}

	// List all topics
	fmt.Println("----------------------------------")
	fmt.Println("List topics")
	res, err := admin.ListTopics()
	if err != nil {
		panic(err)
	}
	for name, detail := range res {
		b, _ := json.Marshal(detail)
		fmt.Printf("topic: %s, detail: %s\n", name, b)
	}
}
The result is as follows:
[Sarama] 2021/12/05 18:41:22 Initializing new client
[Sarama] 2021/12/05 18:41:22 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:41:22 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:41:22 client/metadata fetching metadata for all topics from broker kafka.local.com:19091
[Sarama] 2021/12/05 18:41:22 Connected to broker at kafka.local.com:19091 (unregistered)
[Sarama] 2021/12/05 18:41:22 client/brokers registered new broker #0 at 192.168.1.101:19090
[Sarama] 2021/12/05 18:41:22 client/brokers registered new broker #2 at 192.168.1.101:19092
[Sarama] 2021/12/05 18:41:22 client/brokers registered new broker #1 at 192.168.1.101:19091
[Sarama] 2021/12/05 18:41:22 Successfully initialized new client
[Sarama] 2021/12/05 18:41:22 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
----------------------------------
[Sarama] 2021/12/05 18:41:22 Connected to broker at 192.168.1.101:19090 (registered as #0)
Create topic
----------------------------------
List topics
[Sarama] 2021/12/05 18:41:23 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:41:23 Connected to broker at 192.168.1.101:19091 (registered as #1)
topic: test_topic, detail: {"NumPartitions":3,"ReplicationFactor":2,"ReplicaAssignment":{"0":[0,1],"1":[2,0],"2":[1,2]},"ConfigEntries":{"flush.messages":"10000","flush.ms":"1000","max.message.bytes":"1000012","retention.bytes":"1073741824","segment.bytes":"1073741824"}}
topic: __consumer_offsets, detail: {"NumPartitions":50,"ReplicationFactor":1,"ReplicaAssignment":{"0":[2],"1":[1],"10":[1],"11":[0],"12":[2],"13":[1],"14":[0],"15":[2],"16":[1],"17":[0],"18":[2],"19":[1],"2":[0],"20":[0],"21":[2],"22":[1],"23":[0],"24":[2],"25":[1],"26":[0],"27":[2],"28":[1],"29":[0],"3":[2],"30":[2],"31":[1],"32":[0],"33":[2],"34":[1],"35":[0],"36":[2],"37":[1],"38":[0],"39":[2],"4":[1],"40":[1],"41":[0],"42":[2],"43":[1],"44":[0],"45":[2],"46":[1],"47":[0],"48":[2],"49":[1],"5":[0],"6":[2],"7":[1],"8":[0],"9":[2]},"ConfigEntries":{"cleanup.policy":"compact","compression.type":"producer","flush.messages":"10000","flush.ms":"1000","max.message.bytes":"1000012","retention.bytes":"1073741824","segment.bytes":"104857600"}}
[Sarama] 2021/12/05 18:41:23 Closing Client
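The ReplicaAssignment printed for test_topic can be sanity-checked against the requested replication factor. The sketch below is my own helper, not part of sarama; it only uses the same map shape as sarama's TopicDetail.ReplicaAssignment and verifies that each partition has exactly the requested number of replicas on distinct brokers:

```go
package main

import "fmt"

// checkAssignment verifies that every partition has exactly rf replicas and
// that no broker appears twice for the same partition.
func checkAssignment(assignment map[int32][]int32, rf int) error {
	for partition, replicas := range assignment {
		if len(replicas) != rf {
			return fmt.Errorf("partition %d: want %d replicas, got %d",
				partition, rf, len(replicas))
		}
		seen := map[int32]bool{}
		for _, broker := range replicas {
			if seen[broker] {
				return fmt.Errorf("partition %d: broker %d listed twice",
					partition, broker)
			}
			seen[broker] = true
		}
	}
	return nil
}

func main() {
	// ReplicaAssignment of test_topic as printed in the output above.
	assignment := map[int32][]int32{0: {0, 1}, 1: {2, 0}, 2: {1, 2}}
	if err := checkAssignment(assignment, 2); err != nil {
		fmt.Println("invalid assignment:", err)
		return
	}
	fmt.Println("assignment is consistent with replication factor 2")
}
```

A replication factor larger than the number of brokers could never pass this check, which is also why Kafka rejects such a topic at creation time.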
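3.2.2 Producer
Here three goroutines are started, each writing 10 messages to one of the three partitions of test_topic. The message body has the form [partition number]-[message number].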
package main

import (
	"fmt"
	"log"
	"os"
	"sync"

	"github.com/Shopify/sarama"
)

func main() {
	servers := []string{
		"kafka.local.com:19090",
		"kafka.local.com:19091",
		"kafka.local.com:19092",
	}
	topic := "test_topic"
	sarama.Logger = log.New(os.Stderr, "[Sarama] ", log.LstdFlags)
	cfg := sarama.NewConfig()
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	// A SyncProducer requires Return.Successes to be true
	cfg.Producer.Return.Successes = true
	// The manual partitioner honors msg.Partition as set by the caller
	cfg.Producer.Partitioner = sarama.NewManualPartitioner

	// Create the producer
	producer, err := sarama.NewSyncProducer(servers, cfg)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	wg := sync.WaitGroup{}
	for p := 0; p < 3; p++ {
		wg.Add(1)
		// One goroutine per partition
		go func(p int32) {
			defer wg.Done()
			for i := 0; i < 10; i++ {
				// Build the message
				msg := &sarama.ProducerMessage{}
				msg.Topic = topic
				msg.Partition = p
				msg.Value = sarama.StringEncoder(fmt.Sprintf("this is test message: %d-%d", p, i))
				// Report success only when the broker acknowledged the message
				if _, _, err := producer.SendMessage(msg); err != nil {
					fmt.Printf("produce message [%s] to [Partition %d] failed: %v\n", msg.Value, p, err)
					continue
				}
				fmt.Printf("produce message [%s] to [Partition %d] success.\n", msg.Value, p)
			}
		}(int32(p))
	}
	wg.Wait()
}
The result is shown below; the messages were written to the corresponding partitions of test_topic.
[Sarama] 2021/12/05 18:47:05 Initializing new client
[Sarama] 2021/12/05 18:47:05 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:47:05 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:47:05 client/metadata fetching metadata for all topics from broker kafka.local.com:19090
[Sarama] 2021/12/05 18:47:05 Connected to broker at kafka.local.com:19090 (unregistered)
[Sarama] 2021/12/05 18:47:05 client/brokers registered new broker #0 at 192.168.1.101:19090
[Sarama] 2021/12/05 18:47:05 client/brokers registered new broker #2 at 192.168.1.101:19092
[Sarama] 2021/12/05 18:47:05 client/brokers registered new broker #1 at 192.168.1.101:19091
[Sarama] 2021/12/05 18:47:05 Successfully initialized new client
[Sarama] 2021/12/05 18:47:05 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:47:05 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:47:05 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:47:05 producer/broker/1 starting up
[Sarama] 2021/12/05 18:47:05 producer/broker/2 starting up
[Sarama] 2021/12/05 18:47:05 Connected to broker at 192.168.1.101:19092 (registered as #2)
[Sarama] 2021/12/05 18:47:05 producer/broker/1 state change to [open] on test_topic/2
[Sarama] 2021/12/05 18:47:05 producer/broker/0 starting up
[Sarama] 2021/12/05 18:47:05 producer/broker/2 state change to [open] on test_topic/1
[Sarama] 2021/12/05 18:47:05 producer/broker/0 state change to [open] on test_topic/0
[Sarama] 2021/12/05 18:47:05 Connected to broker at 192.168.1.101:19091 (registered as #1)
[Sarama] 2021/12/05 18:47:05 Connected to broker at 192.168.1.101:19090 (registered as #0)
produce message [this is test message: 1-0] to [Partition 1] success.
produce message [this is test message: 1-1] to [Partition 1] success.
produce message [this is test message: 1-2] to [Partition 1] success.
produce message [this is test message: 1-3] to [Partition 1] success.
produce message [this is test message: 0-0] to [Partition 0] success.
produce message [this is test message: 0-1] to [Partition 0] success.
produce message [this is test message: 1-4] to [Partition 1] success.
produce message [this is test message: 2-0] to [Partition 2] success.
produce message [this is test message: 0-2] to [Partition 0] success.
produce message [this is test message: 1-5] to [Partition 1] success.
produce message [this is test message: 0-3] to [Partition 0] success.
produce message [this is test message: 1-6] to [Partition 1] success.
produce message [this is test message: 2-1] to [Partition 2] success.
produce message [this is test message: 0-4] to [Partition 0] success.
produce message [this is test message: 1-7] to [Partition 1] success.
produce message [this is test message: 2-2] to [Partition 2] success.
produce message [this is test message: 1-8] to [Partition 1] success.
produce message [this is test message: 0-5] to [Partition 0] success.
produce message [this is test message: 2-3] to [Partition 2] success.
produce message [this is test message: 1-9] to [Partition 1] success.
produce message [this is test message: 0-6] to [Partition 0] success.
produce message [this is test message: 2-4] to [Partition 2] success.
produce message [this is test message: 0-7] to [Partition 0] success.
produce message [this is test message: 2-5] to [Partition 2] success.
produce message [this is test message: 0-8] to [Partition 0] success.
produce message [this is test message: 2-6] to [Partition 2] success.
produce message [this is test message: 0-9] to [Partition 0] success.
produce message [this is test message: 2-7] to [Partition 2] success.
produce message [this is test message: 2-8] to [Partition 2] success.
produce message [this is test message: 2-9] to [Partition 2] success.
[Sarama] 2021/12/05 18:47:06 Producer shutting down.
[Sarama] 2021/12/05 18:47:06 Closing Client
[Sarama] 2021/12/05 18:47:06 producer/broker/1 input chan closed
[Sarama] 2021/12/05 18:47:06 producer/broker/1 shut down
[Sarama] 2021/12/05 18:47:06 producer/broker/2 input chan closed
[Sarama] 2021/12/05 18:47:06 producer/broker/2 shut down
[Sarama] 2021/12/05 18:47:06 producer/broker/0 input chan closed
[Sarama] 2021/12/05 18:47:06 producer/broker/0 shut down
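Because the payload encodes both the partition and the sequence number, a consumer can recover them and compare the embedded partition with the one it actually read from. A minimal sketch of that round trip, with helper names of my own choosing and the same format string as the producer:

```go
package main

import "fmt"

// payloadFormat is the message body format used by the producer above.
const payloadFormat = "this is test message: %d-%d"

// encodePayload renders the body for a given partition and sequence number.
func encodePayload(partition int32, seq int) string {
	return fmt.Sprintf(payloadFormat, partition, seq)
}

// decodePayload extracts the partition and sequence number from a body.
func decodePayload(s string) (partition int32, seq int, err error) {
	_, err = fmt.Sscanf(s, payloadFormat, &partition, &seq)
	return partition, seq, err
}

func main() {
	body := encodePayload(1, 4)
	p, n, err := decodePayload(body)
	fmt.Println(body, p, n, err)
}
```

In a real consumer the decoded partition would be compared with the Partition field of the received message to confirm that the manual partitioner routed it as intended.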
3.2.3 Consumer
Next, start three partition consumers, one per partition, to read the messages back. The code is as follows:
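package main

import (
	"fmt"
	"log"
	"os"

	"github.com/Shopify/sarama"
)

func main() {
	servers := []string{
		"kafka.local.com:19090",
		"kafka.local.com:19091",
		"kafka.local.com:19092",
	}
	topic := "test_topic"
	sarama.Logger = log.New(os.Stderr, "[Sarama] ", log.LstdFlags)

	// A nil config makes sarama fall back to its defaults
	consumer, err := sarama.NewConsumer(servers, nil)
	if err != nil {
		panic(err)
	}
	partitions, err := consumer.Partitions(topic)
	if err != nil {
		panic(err)
	}
	// Range over the partition IDs themselves, not the slice indexes
	for _, partition := range partitions {
		go func(partition int32) {
			// Read each partition from its oldest available offset
			pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
			if err != nil {
				log.Printf("consume partition %d error: %v", partition, err)
				return
			}
			defer pc.Close()
			for msg := range pc.Messages() {
				fmt.Printf(
					"PartitionConsumer [%d] consume one message: Partition=%d, Offset=%d, Key=%v, Value=%s\n",
					partition, msg.Partition,
					msg.Offset, msg.Key, msg.Value,
				)
			}
		}(partition)
	}
	// Block forever so the partition consumers keep running
	select {}
}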
The result is shown below; each partition consumer correctly consumed its own 10 messages:
[Sarama] 2021/12/05 18:54:17 Initializing new client
[Sarama] 2021/12/05 18:54:17 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:54:17 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:54:17 client/metadata fetching metadata for all topics from broker kafka.local.com:19092
[Sarama] 2021/12/05 18:54:17 Connected to broker at kafka.local.com:19092 (unregistered)
[Sarama] 2021/12/05 18:54:17 client/brokers registered new broker #0 at 192.168.1.101:19090
[Sarama] 2021/12/05 18:54:17 client/brokers registered new broker #2 at 192.168.1.101:19092
[Sarama] 2021/12/05 18:54:17 client/brokers registered new broker #1 at 192.168.1.101:19091
[Sarama] 2021/12/05 18:54:17 Successfully initialized new client
[Sarama] 2021/12/05 18:54:17 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:54:17 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:54:17 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:54:17 Connected to broker at 192.168.1.101:19090 (registered as #0)
[Sarama] 2021/12/05 18:54:17 Connected to broker at 192.168.1.101:19092 (registered as #2)
[Sarama] 2021/12/05 18:54:17 Connected to broker at 192.168.1.101:19091 (registered as #1)
[Sarama] 2021/12/05 18:54:17 consumer/broker/0 added subscription to test_topic/0
[Sarama] 2021/12/05 18:54:17 consumer/broker/1 added subscription to test_topic/2
[Sarama] 2021/12/05 18:54:17 consumer/broker/2 added subscription to test_topic/1
PartitionConsumer [0] consume one message: Partition=0, Offset=0, Key=[], Value=this is test message: 0-0
PartitionConsumer [0] consume one message: Partition=0, Offset=1, Key=[], Value=this is test message: 0-1
PartitionConsumer [0] consume one message: Partition=0, Offset=2, Key=[], Value=this is test message: 0-2
PartitionConsumer [0] consume one message: Partition=0, Offset=3, Key=[], Value=this is test message: 0-3
PartitionConsumer [0] consume one message: Partition=0, Offset=4, Key=[], Value=this is test message: 0-4
PartitionConsumer [0] consume one message: Partition=0, Offset=5, Key=[], Value=this is test message: 0-5
PartitionConsumer [0] consume one message: Partition=0, Offset=6, Key=[], Value=this is test message: 0-6
PartitionConsumer [0] consume one message: Partition=0, Offset=7, Key=[], Value=this is test message: 0-7
PartitionConsumer [0] consume one message: Partition=0, Offset=8, Key=[], Value=this is test message: 0-8
PartitionConsumer [0] consume one message: Partition=0, Offset=9, Key=[], Value=this is test message: 0-9
PartitionConsumer [1] consume one message: Partition=1, Offset=0, Key=[], Value=this is test message: 1-0
PartitionConsumer [1] consume one message: Partition=1, Offset=1, Key=[], Value=this is test message: 1-1
PartitionConsumer [1] consume one message: Partition=1, Offset=2, Key=[], Value=this is test message: 1-2
PartitionConsumer [1] consume one message: Partition=1, Offset=3, Key=[], Value=this is test message: 1-3
PartitionConsumer [1] consume one message: Partition=1, Offset=4, Key=[], Value=this is test message: 1-4
PartitionConsumer [1] consume one message: Partition=1, Offset=5, Key=[], Value=this is test message: 1-5
PartitionConsumer [1] consume one message: Partition=1, Offset=6, Key=[], Value=this is test message: 1-6
PartitionConsumer [1] consume one message: Partition=1, Offset=7, Key=[], Value=this is test message: 1-7
PartitionConsumer [1] consume one message: Partition=1, Offset=8, Key=[], Value=this is test message: 1-8
PartitionConsumer [1] consume one message: Partition=1, Offset=9, Key=[], Value=this is test message: 1-9
PartitionConsumer [2] consume one message: Partition=2, Offset=0, Key=[], Value=this is test message: 2-0
PartitionConsumer [2] consume one message: Partition=2, Offset=1, Key=[], Value=this is test message: 2-1
PartitionConsumer [2] consume one message: Partition=2, Offset=2, Key=[], Value=this is test message: 2-2
PartitionConsumer [2] consume one message: Partition=2, Offset=3, Key=[], Value=this is test message: 2-3
PartitionConsumer [2] consume one message: Partition=2, Offset=4, Key=[], Value=this is test message: 2-4
PartitionConsumer [2] consume one message: Partition=2, Offset=5, Key=[], Value=this is test message: 2-5
PartitionConsumer [2] consume one message: Partition=2, Offset=6, Key=[], Value=this is test message: 2-6
PartitionConsumer [2] consume one message: Partition=2, Offset=7, Key=[], Value=this is test message: 2-7
PartitionConsumer [2] consume one message: Partition=2, Offset=8, Key=[], Value=this is test message: 2-8
PartitionConsumer [2] consume one message: Partition=2, Offset=9, Key=[], Value=this is test message: 2-9
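Reading 30 lines of output by eye is error-prone, so the same check can be automated. The sketch below uses a hypothetical record type holding only the partition and offset of each consumed message, and assumes that the records of each partition arrive in order starting from offset 0, as they do when consuming from sarama.OffsetOldest on a fresh topic:

```go
package main

import "fmt"

// record is a hypothetical, trimmed-down view of a consumed message.
type record struct {
	Partition int32
	Offset    int64
}

// checkContiguous counts messages per partition and returns an error at the
// first offset that does not directly follow the previous one.
func checkContiguous(recs []record) (map[int32]int, error) {
	next := map[int32]int64{}
	counts := map[int32]int{}
	for _, r := range recs {
		if r.Offset != next[r.Partition] {
			return counts, fmt.Errorf("partition %d: expected offset %d, got %d",
				r.Partition, next[r.Partition], r.Offset)
		}
		next[r.Partition]++
		counts[r.Partition]++
	}
	return counts, nil
}

func main() {
	// Rebuild the 3 x 10 records shown in the output above.
	var recs []record
	for p := int32(0); p < 3; p++ {
		for o := int64(0); o < 10; o++ {
			recs = append(recs, record{Partition: p, Offset: o})
		}
	}
	counts, err := checkContiguous(recs)
	fmt.Println(counts, err)
}
```

In the consumer program, the records would be collected from msg.Partition and msg.Offset instead of being generated.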
4. Summary
In this article I installed ZooKeeper and Kafka with Helm and then tested both from Go. All test results matched expectations.