Etcd服务注册发现实例

Wednesday, December 15, 2021

1. 概述

在微服务架构中,每一个微服务都是独立部署的,各个微服务之间需要有一个方式获取自己需要服务的连接方式,这里就用到了服务注册与发现。简单点说,服务注册就是将自己的服务地址注册到服务中心,并做好健康检测,服务发现呢就是监听服务中心,动态获知自己依赖服务的变动(新增修改或删除)。

1.1 流程说明

下图中,服务A依赖服务B,传统做法中,是将服务B的访问地址配置到服务A中,但假如服务B有多个服务地址的话,就需要动态去维护,这样实际操作很不方便,这个时候转换思路,搭建一个服务仓库,服务B将自己的服务地址(endpoint)注册到服务仓库,服务A呢从服务仓库获取,并且服务B做好健康检查,服务A呢做好监听,如果有变动,可以立即做出修改:

  • 服务新增:服务 B 新增两个服务 B2 和 B3,这两个服务会注册到服务中心,服务 A 会监听得到这两个服务的 endpoint,进而可以将部分流量发送到 B2 和 B3。
  • 服务删除:服务 B1 因为某些原因宕机,此时 B1 不再保持健康,继而服务仓库中将移除它的 endpoint,服务A 监听后也会将本都 B1 的连接信息移除。

服务注册与发现

1.2 核心概念

这里有几个核心概念需要说明下:

  1. 服务注册:各个服务将自己的访问信息注册到服务中心
  2. 服务发现:从服务中心获取自己需要的服务的访问信息
  3. 健康检查:注册到服务中心的服务需要定时维护上报自己的健康信息,即 keep alive

1.3 etcd

基于上述的核心概念,我们列出 etcd 中对应实现的方法:

  • 服务注册:生成租约(lease grant)、注册入口(put –lease),健康检查(keep alive)
  • 服务发现:获取入口(get)、监听变动(watch)

2. 实例

这里我们以商城获取商品列表为例进行说明,假设我们有一个商城,包含一个网关服务和隐藏在后端的商品活动用户等服务,后端的服务都是动态扩缩服务数量的,因此我们需要将后端的服务入口注册到服务仓库,网关层从服务仓库中获取到所有的服务入口,简单架构如下:

etcd服务注册与发现demo

基于上图我们做一个简单的实例,这个实例中,后端服务只写了商品服务,并且包含一个获取商品列表数据的接口,网关层对应的也是提供一个商品列表接口,其中服务仓库我们使用 etcd 来做,所有的服务前缀使用 /service_registry_discovery,key 的格式为 /service_registry_discovery/{服务名}/{主机名},val 格式为 ip:port

完整的代码托管在 Github

2.1 服务注册

这里我们将商品服务的入口的注册到 etcd 中,流程如下:

  1. 生成一个3秒的租约
  2. 将服务的入口写入 etcd,写入的时候使用上一步生成的租约
  3. 保持租约存活

代码如下:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net"
	"net/http"
	"os"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

const (
	// ServiceGoods 商品服务
	ServiceGoods = "goods"
)

// serviceEndpointKeyPrefix 服务入口在 etcd 存储的 key 前缀
var serviceEndpointKeyPrefix = "/service_registry_discovery"

// hostname 主机名
var hostname string

// endpoint 访问入口
var endpoint string

// servicePort 服务端口
var servicePort int = 80

// etcdCfg Etcd配置
var etcdCfg = clientv3.Config{
	Endpoints: []string{
		"http://etcd-1.etcd-headless.devops.svc.cluster.local:2379",
		"http://etcd-2.etcd-headless.devops.svc.cluster.local:2379",
		"http://etcd-3.etcd-headless.devops.svc.cluster.local:2379",
	},
	DialTimeout:          time.Second * 30,
	DialKeepAliveTimeout: time.Second * 30,
	Username:             "root",
	Password:             "90CjPHPRlxw=",
}

// init 初始化
func init() {
	hostname, _ = os.Hostname()
	addrs, err := net.InterfaceAddrs()
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, address := range addrs {
		// 检查ip地址判断是否回环地址
		if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
			if ipnet.IP.To4() != nil {
				endpoint = fmt.Sprintf("%s:%d", ipnet.IP.String(), servicePort)
				break
			}
		}
	}
}

func main() {
	// 服务注册
	go func() {
		ServiceRegistry()
	}()
	http.HandleFunc(
		"/", func(w http.ResponseWriter, r *http.Request) {
			_, _ = fmt.Fprint(w, "goods")
		},
	)
	http.HandleFunc("/goods/list", GetGoodsList)
	_ = http.ListenAndServe(fmt.Sprintf(":%d", servicePort), nil)
}

// GetGoodsList 获取商品列表
func GetGoodsList(w http.ResponseWriter, r *http.Request) {
	// 生成订单信息
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		fmt.Printf("read body err, %v\n", err)
		return
	}
	params := make(map[string]interface{})
	_ = json.Unmarshal(body, &params)
	var traceID = params["trace_id"]
	var res = map[string]interface{}{
		"trace_id": traceID,
		"message":  "get goods list success",
	}
	b, _ := json.Marshal(res)
	fmt.Printf(
		"trace_id: %v, goods list result: %s\n", traceID, string(b),
	)
	_, _ = fmt.Fprint(w, string(b))
}

// ServiceRegistry 服务注册
func ServiceRegistry() {
	hostname, _ = os.Hostname()
	cli, err := clientv3.New(etcdCfg)
	if err != nil {
		panic(err)
	}
	key := fmt.Sprintf("%s/%s/%s", serviceEndpointKeyPrefix, ServiceGoods, hostname)
	ctx := context.Background()
	// 过期时间: 3秒钟
	ttl := 3
	// 创建租约
	lease, err := cli.Grant(ctx, int64(ttl))
	if err != nil {
		panic(err)
	}
	b, _ := json.Marshal(lease)
	fmt.Printf("grant lease suucess: %s\n", string(b))
	// put kv
	res, err := cli.Put(ctx, key, endpoint, clientv3.WithLease(lease.ID))
	if err != nil {
		panic(err)
	}
	b, _ = json.Marshal(res)
	fmt.Printf("put kv with lease suucess: %s\n", string(b))
	// 保持租约不过期
	klRes, err := cli.KeepAlive(ctx, lease.ID)
	if err != nil {
		panic(err)
	}
	// 监听续约情况
	for v := range klRes {
		b, _ = json.Marshal(v)
		fmt.Printf("keep lease alive suucess: %s\n", string(b))
	}
	fmt.Println("stop keeping lease alive")
}

2.2 服务发现

网关服务启动时会从 etcd 中获取所需服务的入口,并启动监听,如果有新增则写入对应服务的 endpoints 配置,如果有删除则删除 endpoints 中对应的记录,代码如下:

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"math/rand"
	"net/http"
	"sync"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

const (
	// ServiceGoods 商品服务
	ServiceGoods = "goods"
)

// serviceEndpointKeyPrefix 服务入口在 etcd 存储的 key 前缀
var serviceEndpointKeyPrefix = "/service_registry_discovery"

// serviceEndpoints 服务入口列表
var serviceEndpoints = map[string]map[string]string{
	ServiceGoods: {},
}

// 全局服务锁
var serviceLocker = sync.Mutex{}

// etcdCfg Etcd配置
var etcdCfg = clientv3.Config{
	Endpoints: []string{
		"http://etcd-1.etcd-headless.devops.svc.cluster.local:2379",
		"http://etcd-2.etcd-headless.devops.svc.cluster.local:2379",
		"http://etcd-3.etcd-headless.devops.svc.cluster.local:2379",
	},
	DialTimeout:          time.Second * 30,
	DialKeepAliveTimeout: time.Second * 30,
	Username:             "root",
	Password:             "90CjPHPRlxw=",
}

// servicePort 服务端口
var servicePort int = 80

func main() {
	// 监听服务入口
	go func() {
		ServiceDiscovery()
	}()
	http.HandleFunc(
		"/", func(w http.ResponseWriter, r *http.Request) {
			_, _ = fmt.Fprint(w, "gateway")
		},
	)
	http.HandleFunc("/goods/list", GetGoodsList)
	_ = http.ListenAndServe(fmt.Sprintf(":%d", servicePort), nil)
}

// GetGoodsList 获取商品列表
func GetGoodsList(w http.ResponseWriter, r *http.Request) {
	var traceID = time.Now().Unix()
	var res = map[string]interface{}{
		"trace_id": traceID,
		"code":     0,
	}
	client, endpoint, err := GetSvcEndpoin(ServiceGoods)
	fmt.Printf(
		"trace_id: %d, get goods endpoint result: client=%s, endpoint=%s, error=%v\n", traceID, client, endpoint, err,
	)
	if err != nil {
		res["code"] = -1
		res["message"] = err.Error()
		b, _ := json.Marshal(res)
		_, _ = fmt.Fprint(w, string(b))
		return
	}
	url := fmt.Sprintf("http://%s/goods/list", endpoint)
	m := map[string]interface{}{
		"trace_id": time.Now().Unix(),
	}
	body, _ := json.Marshal(m)
	resp, err := http.Post(url, "application/json", bytes.NewReader(body))
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	defer func(Body io.ReadCloser) {
		_ = Body.Close()
	}(resp.Body)
	b, _ := ioutil.ReadAll(resp.Body)
	var jsonResp map[string]interface{}
	_ = json.Unmarshal(b, &jsonResp)
	res["message"] = jsonResp["message"]
	b, _ = json.Marshal(res)
	fmt.Printf(
		"trace_id: %d, get goods list result: %s\n", traceID, string(b),
	)
	_, _ = fmt.Fprint(w, string(b))
}

// ServiceDiscovery 服务发现
func ServiceDiscovery() {
	cli, err := clientv3.New(etcdCfg)
	if err != nil {
		panic(err)
	}
	for k, _ := range serviceEndpoints {
		go func(svc string) {
			ctx := context.Background()
			serviceKey := fmt.Sprintf("%s/%s", serviceEndpointKeyPrefix, svc)
			// 获取当前所有服务入口
			getRes, _ := cli.Get(ctx, serviceKey, clientv3.WithPrefix())
			serviceLocker.Lock()
			for _, v := range getRes.Kvs {
				serviceEndpoints[svc][string(v.Key)] = string(v.Value)
			}
			serviceLocker.Unlock()
			fmt.Printf(
				"[service_endpoint_change] [%s] service %s get endpoints success, %v\n", svc, svc,
				serviceEndpoints[svc],
			)
			ch := cli.Watch(ctx, serviceKey, clientv3.WithPrefix(), clientv3.WithPrevKV())
			for v := range ch {
				for _, v := range v.Events {
					key := string(v.Kv.Key)
					endpoint := string(v.Kv.Value)
					preEndpoint := ""
					if v.PrevKv != nil {
						preEndpoint = string(v.PrevKv.Value)
					}
					switch v.Type {
					// PUT,新增或替换
					case 0:
						serviceLocker.Lock()
						serviceEndpoints[svc][key] = endpoint
						serviceLocker.Unlock()
						fmt.Printf(
							"[service_endpoint_change] service %s put endpoint, key: %s, endpoint: %s\n", svc,
							key, endpoint,
						)
					// DELETE
					case 1:
						serviceLocker.Lock()
						delete(serviceEndpoints[svc], key)
						serviceLocker.Unlock()
						fmt.Printf(
							"[service_endpoint_change] service %s delete endpoint, key: %s, endpoint: %s\n",
							svc, key, preEndpoint,
						)
					}
				}
			}
		}(k)
	}
}

// GetSvcEndpoin 获取服务入口
func GetSvcEndpoin(svc string) (key, endpoint string, err error) {
	endpoints := serviceEndpoints[svc]
	if len(endpoints) == 0 {
		return "", "", errors.New(fmt.Sprintf("%s服务不可用,请稍后再试", svc))
	}
	num := len(endpoints)
	keys := make([]string, num)
	for v := range endpoints {
		keys = append(keys, v)
	}
	randomKey := keys[rand.Intn(len(keys))]
	return randomKey, endpoints[randomKey], nil
}

3. 测试

这里验证使用的是现有的 k8s 集群,配置如下:

节点 角色 IP 配置 Label
master master, etcd 192.168.1.100 4核4G50G usefulness=schedule
node1 worker 192.168.1.101 8核32G100G usefulness=devops
node2 worker 192.168.1.102 8核12G100G usefulness=business
node3 worker 192.168.1.103 8核12G100G usefulness=business

其中 traefik 安装在 node1 节点上,因此我们需要先将域名解析到 node1 节点上,如下:

cat >> /etc/hosts <<EOF
192.168.1.101 etcd-demo.local.com
EOF

接着需要在 k8s 集群中部署 gateway 和 goods,计划部署在 business 节点上,并且 gateway 启动 10 个 Pod,goods 启动 3 个 Pod。

k8s 清单如下:

namespace.yaml

apiVersion: v1
kind: Namespace
metadata:
  name: etcd-demo
  annotations:
    scheduler.alpha.kubernetes.io/node-selector: usefulness=business

gateway.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: gateway
  namespace: etcd-demo
  labels:
    app: gateway
spec:
  # 期望 pod 数量
  replicas: 10
  # 新创建的 pod 在运行指定秒数后才视为运行可用,配合就绪探针可以在滚动升级失败的时候阻止升级,避免部署出错的应用
  minReadySeconds: 4
  strategy:
    rollingUpdate:
      # 滚动升级过程中最多允许超出期望副本数的数量,比如期望3,maxSurge 配置为1,则最多存在4个pod,也可以配置百分比
      maxSurge: 1
      # 滚动升级过程中最多允许存在不可用的 pod 数量,配置为0表示升级过程中所有的 pod 都必须可用,即 pod 挨个替换,也可以配置百分比
      maxUnavailable: 0
  # 匹配器,匹配 pod 的方式
  selector:
    matchLabels:
      app: gateway
  template:
    metadata:
      name: gateway
      labels:
        app: gateway
    spec:
      imagePullSecrets:
        - name: harbor-jormin
      containers:
        - name: gateway
          image: harbor.wcxst.com/etcd-demo/gateway:latest
          # 就绪探针
          readinessProbe:
            # 执行周期,单位:秒
            periodSeconds: 1
            # 初始化延迟,单位:秒
            initialDelaySeconds: 3
            httpGet:
              path: /
              port: 80
---

kind: Service
apiVersion: v1
metadata:
  name: gateway
  namespace: etcd-demo
spec:
  ports:
    - protocol: TCP
      port: 80
      targetPort: 80
  selector:
    app: gateway

---

apiVersion: traefik.containo.us/v1alpha1
kind: IngressRoute
metadata:
  name: gateway
  namespace: etcd-demo
spec:
  entryPoints:
    - web
  routes:
    - match: Host(`etcd-demo.local.com`) && PathPrefix(`/`)
      kind: Rule
      services:
        - name: gateway
          port: 80

goods.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: goods
  namespace: etcd-demo
  labels:
    app: goods
spec:
  # 期望 pod 数量
  replicas: 3
  # 新创建的 pod 在运行指定秒数后才视为运行可用,配合就绪探针可以在滚动升级失败的时候阻止升级,避免部署出错的应用
  minReadySeconds: 2
  strategy:
    rollingUpdate:
      # 滚动升级过程中最多允许超出期望副本数的数量,比如期望3,maxSurge 配置为1,则最多存在4个pod,也可以配置百分比
      maxSurge: 1
      # 滚动升级过程中最多允许存在不可用的 pod 数量,配置为0表示升级过程中所有的 pod 都必须可用,即 pod 挨个替换,也可以配置百分比
      maxUnavailable: 0
  # 匹配器,匹配 pod 的方式
  selector:
    matchLabels:
      app: goods
  template:
    metadata:
      name: goods
      labels:
        app: goods
    spec:
      imagePullSecrets:
        - name: harbor-jormin
      containers:
        - name: goods
          image: harbor.wcxst.com/etcd-demo/goods:latest
          # 就绪探针
          readinessProbe:
            # 执行周期,单位:秒
            periodSeconds: 1
            # 初始化延迟,单位:秒
            initialDelaySeconds: 3
            httpGet:
              path: /
              port: 80

测试前需要先创建命名空间:

kubectl apply -f namespace.yaml

3.1 测试服务注册与服务发现

我们的测试流程如下:

  1. 启动 goods 服务,会部署 3 个 Pod,此时会向 etcd 中注册三个 goods 服务入口
  2. 启动 gateway 服务,从 etcd 获取 goods 服务入口,并打印日志
  3. goods 服务扩容为 4 个 Pod,gateway 服务监听得知后新增该服务,并打印日志
  4. goods 服务再次缩容为 3 个 Pod,gateway 服务监听得知后删除该服务的入口,并打印日志

开始测试:

  1. 启动 goods 服务

    kubectl apply -f goods.yaml
    

    启动结果如下:

    root@master:~# kubectl get pod -n etcd-demo -o wide | grep goods
    goods-7466dd7d4f-dwdrr     1/1     Running   0          7m21s   10.233.92.161   node3   <none>           <none>
    goods-7466dd7d4f-pzmc7     1/1     Running   0          7m21s   10.233.96.155   node2   <none>           <none>
    goods-7466dd7d4f-vh5qg     1/1     Running   0          7m21s   10.233.96.156   node2   <none>           <none>
    
  2. goods 服务全部启动成功后启动 gateway 服务

    kubectl apply -f gateway.yaml
    

    启动结果如下:

    root@master:~# kubectl get pod -n etcd-demo -o wide | grep gateway
    gateway-65b95d8857-2xvss   1/1     Running   0          5m43s   10.233.92.165   node3   <none>           <none>
    gateway-65b95d8857-64qrv   1/1     Running   0          5m43s   10.233.96.158   node2   <none>           <none>
    gateway-65b95d8857-67tws   1/1     Running   0          5m43s   10.233.92.164   node3   <none>           <none>
    gateway-65b95d8857-7g2rr   1/1     Running   0          5m43s   10.233.92.166   node3   <none>           <none>
    gateway-65b95d8857-9dj6n   1/1     Running   0          5m43s   10.233.96.161   node2   <none>           <none>
    gateway-65b95d8857-9sd9d   1/1     Running   0          5m43s   10.233.96.159   node2   <none>           <none>
    gateway-65b95d8857-gfqdj   1/1     Running   0          5m43s   10.233.96.157   node2   <none>           <none>
    gateway-65b95d8857-q7bg7   1/1     Running   0          5m43s   10.233.92.162   node3   <none>           <none>
    gateway-65b95d8857-tw6t9   1/1     Running   0          5m43s   10.233.96.160   node2   <none>           <none>
    gateway-65b95d8857-zdqkm   1/1     Running   0          5m43s   10.233.92.163   node3   <none>           <none>
    
  3. gateway 服务全部启动后 kibana 上查看启动日志,可以看到每个 gateway pod 都获取到了 3 个商品的 endpoint:

    10.233.92.161:80
    10.233.96.155:80
    10.233.96.156:80
    

    如下图:

    image-20211216112703916

  4. goods 服务扩容为 4,观察 gateway 日志是否都增加了对应的 endpoint

    goods 扩容命令:

    kubectl scale deployment goods -n etcd-demo --replicas 4
    

    扩容后查看 goods Pod,新增的 Pod IP 为 10.233.92.168

    root@master:~# kubectl get pod -n etcd-demo -o wide | grep goods
    goods-7466dd7d4f-dwdrr     1/1     Running   0          12m   10.233.92.161   node3   <none>           <none>
    goods-7466dd7d4f-pzmc7     1/1     Running   0          12m   10.233.96.155   node2   <none>           <none>
    goods-7466dd7d4f-tkb6n     0/1     Running   0          9s    10.233.92.168   node3   <none>           <none>
    goods-7466dd7d4f-vh5qg     1/1     Running   0          12m   10.233.96.156   node2   <none>           <none>
    

    此时查看 gateway 的监听日志,发现每个 gateway 服务都增加了一个 goods endpoint:10.233.92.168:80

    如下图:

    image-20211216113325356

  5. 缩减 goods Pod 为 3,观察 gateway 日志

    缩减命令:

    kubectl scale deployment goods -n etcd-demo --replicas 3
    

    缩减后查看 goods 的最新 Pod 信息,可以看到移除了 IP 为 10.233.92.168,如下:

    root@master:~# kubectl get pod -n etcd-demo -o wide | grep goods
    goods-7466dd7d4f-dwdrr     1/1     Running   0          16m     10.233.92.161   node3   <none>           <none>
    goods-7466dd7d4f-pzmc7     1/1     Running   0          16m     10.233.96.155   node2   <none>           <none>
    goods-7466dd7d4f-vh5qg     1/1     Running   0          16m     10.233.96.156   node2   <none>           <none>
    

    观察 gateway 日志,发现每个 Pod 都移除了 goods endpoint:10.233.92.168:80

    如下图:

    image-20211216113700977

3.2 测试业务接口

这部分测试的时候,我们访问 gateway 的 /goods/list 接口,返回的数据包含 trace_id 字段,之后根据 trace_id 查看对应的调用日志,测试如下:

root@master:~# curl http://etcd-demo.local.com/goods/list && echo ''
{"code":0,"message":"get goods list success","trace_id":1639626420}

查看 trace_id: 1639626420 的日志

gateway 日志:

# 从 goods 服务获取结果,goods endpoint 为 10.233.96.155:80
Dec 16, 2021 @ 11:47:00.532	gateway-5b679b6fb-hpqbc	trace_id: 1639626420, get goods endpoint result: client=/service_registry_discovery/goods/goods-7466dd7d4f-pzmc7, endpoint=10.233.96.155:80, error=<nil>

# 最终响应结果
Dec 16, 2021 @ 11:47:00.535	gateway-5b679b6fb-hpqbc	trace_id: 1639626420, get goods list result: {"code":0,"message":"get goods list success","trace_id":1639626420}

goods 日志:

Dec 16, 2021 @ 11:47:00.472	goods-7466dd7d4f-pzmc7	trace_id: 1.63962642e+09, goods list result: {"message":"get goods list success","trace_id":1639626420}

如下图:

image-20211216114917447

4. 总结

这篇文章简单介绍了服务注册与发现的流程、核心概念以及 etcd 实现对应功能的命令,最后结合商城实现了一个简单的实例并进行简单测试,结果符合预期。

Etcd Etcd Kubernetes Golang

k8s搭建MySQLGo使用etcd