1. 概述
在微服务架构中,每一个微服务都是独立部署的,各个微服务之间需要有一个方式获取自己需要服务的连接方式,这里就用到了服务注册与发现。简单点说,服务注册就是将自己的服务地址注册到服务中心,并做好健康检测,服务发现呢就是监听服务中心,动态获知自己依赖服务的变动(新增修改或删除)。
1.1 流程说明
下图中,服务A依赖服务B,传统做法中,是将服务B的访问地址配置到服务A中,但假如服务B有多个服务地址的话,就需要动态去维护,这样实际操作很不方便,这个时候转换思路,搭建一个服务仓库,服务B将自己的服务地址(endpoint)注册到服务仓库,服务A呢从服务仓库获取,并且服务B做好健康检查,服务A呢做好监听,如果有变动,可以立即做出修改:
- 服务新增:服务 B 新增两个服务 B2 和 B3,这两个服务会注册到服务中心,服务 A 会监听得到这两个服务的 endpoint,进而可以将部分流量发送到 B2 和 B3。
- 服务删除:服务 B1 因为某些原因宕机,此时 B1 不再保持健康,继而服务仓库中将移除它的 endpoint,服务A 监听后也会将本都 B1 的连接信息移除。
1.2 核心概念
这里有几个核心概念需要说明下:
- 服务注册:各个服务将自己的访问信息注册到服务中心
- 服务发现:从服务中心获取自己需要的服务的访问信息
- 健康检查:注册到服务中心的服务需要定时维护上报自己的健康信息,即 keep alive
1.3 etcd
基于上述的核心概念,我们列出 etcd 中对应实现的方法:
- 服务注册:生成租约(lease grant)、注册入口(put –lease),健康检查(keep alive)
- 服务发现:获取入口(get)、监听变动(watch)
2. 实例
这里我们以商城获取商品列表为例进行说明,假设我们有一个商城,包含一个网关服务和隐藏在后端的商品活动用户等服务,后端的服务都是动态扩缩服务数量的,因此我们需要将后端的服务入口注册到服务仓库,网关层从服务仓库中获取到所有的服务入口,简单架构如下:
基于上图我们做一个简单的实例,这个实例中,后端服务只写了商品服务,并且包含一个获取商品列表数据的接口,网关层对应的也是提供一个商品列表接口,其中服务仓库我们使用 etcd 来做,所有的服务前缀使用 /service_registry_discovery,key 的格式为 /service_registry_discovery/{服务名}/{主机名},val 格式为 ip:port。
完整的代码托管在 Github 。
2.1 服务注册
这里我们将商品服务的入口的注册到 etcd 中,流程如下:
- 生成一个3秒的租约
- 将服务的入口写入 etcd,写入的时候使用上一步生成的租约
- 保持租约存活
代码如下:
package main
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
const (
// ServiceGoods 商品服务
ServiceGoods = "goods"
)
// serviceEndpointKeyPrefix 服务入口在 etcd 存储的 key 前缀
var serviceEndpointKeyPrefix = "/service_registry_discovery"
// hostname 主机名
var hostname string
// endpoint 访问入口
var endpoint string
// servicePort 服务端口
var servicePort int = 80
// etcdCfg Etcd配置
var etcdCfg = clientv3.Config{
Endpoints: []string{
"http://etcd-1.etcd-headless.devops.svc.cluster.local:2379",
"http://etcd-2.etcd-headless.devops.svc.cluster.local:2379",
"http://etcd-3.etcd-headless.devops.svc.cluster.local:2379",
},
DialTimeout: time.Second * 30,
DialKeepAliveTimeout: time.Second * 30,
Username: "root",
Password: "90CjPHPRlxw=",
}
// init 初始化
func init() {
hostname, _ = os.Hostname()
addrs, err := net.InterfaceAddrs()
if err != nil {
fmt.Println(err)
return
}
for _, address := range addrs {
// 检查ip地址判断是否回环地址
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
endpoint = fmt.Sprintf("%s:%d", ipnet.IP.String(), servicePort)
break
}
}
}
}
func main() {
// 服务注册
go func() {
ServiceRegistry()
}()
http.HandleFunc(
"/", func(w http.ResponseWriter, r *http.Request) {
_, _ = fmt.Fprint(w, "goods")
},
)
http.HandleFunc("/goods/list", GetGoodsList)
_ = http.ListenAndServe(fmt.Sprintf(":%d", servicePort), nil)
}
// GetGoodsList 获取商品列表
func GetGoodsList(w http.ResponseWriter, r *http.Request) {
// 生成订单信息
body, err := ioutil.ReadAll(r.Body)
if err != nil {
fmt.Printf("read body err, %v\n", err)
return
}
params := make(map[string]interface{})
_ = json.Unmarshal(body, ¶ms)
var traceID = params["trace_id"]
var res = map[string]interface{}{
"trace_id": traceID,
"message": "get goods list success",
}
b, _ := json.Marshal(res)
fmt.Printf(
"trace_id: %v, goods list result: %s\n", traceID, string(b),
)
_, _ = fmt.Fprint(w, string(b))
}
// ServiceRegistry 服务注册
func ServiceRegistry() {
hostname, _ = os.Hostname()
cli, err := clientv3.New(etcdCfg)
if err != nil {
panic(err)
}
key := fmt.Sprintf("%s/%s/%s", serviceEndpointKeyPrefix, ServiceGoods, hostname)
ctx := context.Background()
// 过期时间: 3秒钟
ttl := 3
// 创建租约
lease, err := cli.Grant(ctx, int64(ttl))
if err != nil {
panic(err)
}
b, _ := json.Marshal(lease)
fmt.Printf("grant lease suucess: %s\n", string(b))
// put kv
res, err := cli.Put(ctx, key, endpoint, clientv3.WithLease(lease.ID))
if err != nil {
panic(err)
}
b, _ = json.Marshal(res)
fmt.Printf("put kv with lease suucess: %s\n", string(b))
// 保持租约不过期
klRes, err := cli.KeepAlive(ctx, lease.ID)
if err != nil {
panic(err)
}
// 监听续约情况
for v := range klRes {
b, _ = json.Marshal(v)
fmt.Printf("keep lease alive suucess: %s\n", string(b))
}
fmt.Println("stop keeping lease alive")
}
2.2 服务发现
网关服务启动时会从 etcd 中获取所需服务的入口,并启动监听,如果有新增则写入对应服务的 endpoints 配置,如果有删除则删除 endpoints 中对应的记录,代码如下:
package main
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"math/rand"
"net/http"
"sync"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
const (
// ServiceGoods 商品服务
ServiceGoods = "goods"
)
// serviceEndpointKeyPrefix 服务入口在 etcd 存储的 key 前缀
var serviceEndpointKeyPrefix = "/service_registry_discovery"
// serviceEndpoints 服务入口列表
var serviceEndpoints = map[string]map[string]string{
ServiceGoods: {},
}
// 全局服务锁
var serviceLocker = sync.Mutex{}
// etcdCfg Etcd配置
var etcdCfg = clientv3.Config{
Endpoints: []string{
"http://etcd-1.etcd-headless.devops.svc.cluster.local:2379",
"http://etcd-2.etcd-headless.devops.svc.cluster.local:2379",
"http://etcd-3.etcd-headless.devops.svc.cluster.local:2379",
},
DialTimeout: time.Second * 30,
DialKeepAliveTimeout: time.Second * 30,
Username: "root",
Password: "90CjPHPRlxw=",
}
// servicePort 服务端口
var servicePort int = 80
func main() {
// 监听服务入口
go func() {
ServiceDiscovery()
}()
http.HandleFunc(
"/", func(w http.ResponseWriter, r *http.Request) {
_, _ = fmt.Fprint(w, "gateway")
},
)
http.HandleFunc("/goods/list", GetGoodsList)
_ = http.ListenAndServe(fmt.Sprintf(":%d", servicePort), nil)
}
// GetGoodsList 获取商品列表
func GetGoodsList(w http.ResponseWriter, r *http.Request) {
var traceID = time.Now().Unix()
var res = map[string]interface{}{
"trace_id": traceID,
"code": 0,
}
client, endpoint, err := GetSvcEndpoin(ServiceGoods)
fmt.Printf(
"trace_id: %d, get goods endpoint result: client=%s, endpoint=%s, error=%v\n", traceID, client, endpoint, err,
)
if err != nil {
res["code"] = -1
res["message"] = err.Error()
b, _ := json.Marshal(res)
_, _ = fmt.Fprint(w, string(b))
return
}
url := fmt.Sprintf("http://%s/goods/list", endpoint)
m := map[string]interface{}{
"trace_id": time.Now().Unix(),
}
body, _ := json.Marshal(m)
resp, err := http.Post(url, "application/json", bytes.NewReader(body))
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
defer func(Body io.ReadCloser) {
_ = Body.Close()
}(resp.Body)
b, _ := ioutil.ReadAll(resp.Body)
var jsonResp map[string]interface{}
_ = json.Unmarshal(b, &jsonResp)
res["message"] = jsonResp["message"]
b, _ = json.Marshal(res)
fmt.Printf(
"trace_id: %d, get goods list result: %s\n", traceID, string(b),
)
_, _ = fmt.Fprint(w, string(b))
}
// ServiceDiscovery 服务发现
func ServiceDiscovery() {
cli, err := clientv3.New(etcdCfg)
if err != nil {
panic(err)
}
for k, _ := range serviceEndpoints {
go func(svc string) {
ctx := context.Background()
serviceKey := fmt.Sprintf("%s/%s", serviceEndpointKeyPrefix, svc)
// 获取当前所有服务入口
getRes, _ := cli.Get(ctx, serviceKey, clientv3.WithPrefix())
serviceLocker.Lock()
for _, v := range getRes.Kvs {
serviceEndpoints[svc][string(v.Key)] = string(v.Value)
}
serviceLocker.Unlock()
fmt.Printf(
"[service_endpoint_change] [%s] service %s get endpoints success, %v\n", svc, svc,
serviceEndpoints[svc],
)
ch := cli.Watch(ctx, serviceKey, clientv3.WithPrefix(), clientv3.WithPrevKV())
for v := range ch {
for _, v := range v.Events {
key := string(v.Kv.Key)
endpoint := string(v.Kv.Value)
preEndpoint := ""
if v.PrevKv != nil {
preEndpoint = string(v.PrevKv.Value)
}
switch v.Type {
// PUT,新增或替换
case 0:
serviceLocker.Lock()
serviceEndpoints[svc][key] = endpoint
serviceLocker.Unlock()
fmt.Printf(
"[service_endpoint_change] service %s put endpoint, key: %s, endpoint: %s\n", svc,
key, endpoint,
)
// DELETE
case 1:
serviceLocker.Lock()
delete(serviceEndpoints[svc], key)
serviceLocker.Unlock()
fmt.Printf(
"[service_endpoint_change] service %s delete endpoint, key: %s, endpoint: %s\n",
svc, key, preEndpoint,
)
}
}
}
}(k)
}
}
// GetSvcEndpoin 获取服务入口
func GetSvcEndpoin(svc string) (key, endpoint string, err error) {
endpoints := serviceEndpoints[svc]
if len(endpoints) == 0 {
return "", "", errors.New(fmt.Sprintf("%s服务不可用,请稍后再试", svc))
}
num := len(endpoints)
keys := make([]string, num)
for v := range endpoints {
keys = append(keys, v)
}
randomKey := keys[rand.Intn(len(keys))]
return randomKey, endpoints[randomKey], nil
}
3. 测试
这里验证使用的是现有的 k8s 集群,配置如下:
节点 | 角色 | IP | 配置 | Label |
---|---|---|---|---|
master | master, etcd | 192.168.1.100 | 4核4G50G | usefulness=schedule |
node1 | worker | 192.168.1.101 | 8核32G100G | usefulness=devops |
node2 | worker | 192.168.1.102 | 8核12G100G | usefulness=business |
node3 | worker | 192.168.1.103 | 8核12G100G | usefulness=business |
其中 traefik 安装在 node1 节点上,因此我们需要先将域名解析到 node1 节点上,如下:
cat >> /etc/hosts <<EOF
192.168.1.101 etcd-demo.local.com
EOF
接着需要在 k8s 集群中部署 gateway 和 goods,计划部署在 business 节点上,并且 gateway 启动 10 个 Pod,goods 启动 3 个 Pod。
k8s 清单如下:
namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
name: etcd-demo
annotations:
scheduler.alpha.kubernetes.io/node-selector: usefulness=business
gateway.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: gateway
namespace: etcd-demo
labels:
app: gateway
spec:
# 期望 pod 数量
replicas: 10
# 新创建的 pod 在运行指定秒数后才视为运行可用,配合就绪探针可以在滚动升级失败的时候阻止升级,避免部署出错的应用
minReadySeconds: 4
strategy:
rollingUpdate:
# 滚动升级过程中最多允许超出期望副本数的数量,比如期望3,maxSurge 配置为1,则最多存在4个pod,也可以配置百分比
maxSurge: 1
# 滚动升级过程中最多允许存在不可用的 pod 数量,配置为0表示升级过程中所有的 pod 都必须可用,即 pod 挨个替换,也可以配置百分比
maxUnavailable: 0
# 匹配器,匹配 pod 的方式
selector:
matchLabels:
app: gateway
template:
metadata:
name: gateway
labels:
app: gateway
spec:
imagePullSecrets:
- name: harbor-jormin
containers:
- name: gateway
image: harbor.wcxst.com/etcd-demo/gateway:latest
# 就绪探针
readinessProbe:
# 执行周期,单位:秒
periodSeconds: 1
# 初始化延迟,单位:秒
initialDelaySeconds: 3
httpGet:
path: /
port: 80
---
kind: Service
apiVersion: v1
metadata:
name: gateway
namespace: etcd-demo
spec:
ports:
- protocol: TCP
port: 80
targetPort: 80
selector:
app: gateway
---
apiVersion: traefik.containo.us/v1alpha1
kind: IngressRoute
metadata:
name: gateway
namespace: etcd-demo
spec:
entryPoints:
- web
routes:
- match: Host(`etcd-demo.local.com`) && PathPrefix(`/`)
kind: Rule
services:
- name: gateway
port: 80
goods.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: goods
namespace: etcd-demo
labels:
app: goods
spec:
# 期望 pod 数量
replicas: 3
# 新创建的 pod 在运行指定秒数后才视为运行可用,配合就绪探针可以在滚动升级失败的时候阻止升级,避免部署出错的应用
minReadySeconds: 2
strategy:
rollingUpdate:
# 滚动升级过程中最多允许超出期望副本数的数量,比如期望3,maxSurge 配置为1,则最多存在4个pod,也可以配置百分比
maxSurge: 1
# 滚动升级过程中最多允许存在不可用的 pod 数量,配置为0表示升级过程中所有的 pod 都必须可用,即 pod 挨个替换,也可以配置百分比
maxUnavailable: 0
# 匹配器,匹配 pod 的方式
selector:
matchLabels:
app: goods
template:
metadata:
name: goods
labels:
app: goods
spec:
imagePullSecrets:
- name: harbor-jormin
containers:
- name: goods
image: harbor.wcxst.com/etcd-demo/goods:latest
# 就绪探针
readinessProbe:
# 执行周期,单位:秒
periodSeconds: 1
# 初始化延迟,单位:秒
initialDelaySeconds: 3
httpGet:
path: /
port: 80
测试前需要先创建命名空间:
kubectl apply -f namespace.yaml
3.1 测试服务注册与服务发现
我们的测试流程如下:
- 启动 goods 服务,会部署 3 个 Pod,此时会向 etcd 中注册三个 goods 服务入口
- 启动 gateway 服务,从 etcd 获取 goods 服务入口,并打印日志
- goods 服务扩容为 4 个 Pod,gateway 服务监听得知后新增该服务,并打印日志
- goods 服务再次缩容为 3 个 Pod,gateway 服务监听得知后删除该服务的入口,并打印日志
开始测试:
-
启动 goods 服务
kubectl apply -f goods.yaml
启动结果如下:
root@master:~# kubectl get pod -n etcd-demo -o wide | grep goods goods-7466dd7d4f-dwdrr 1/1 Running 0 7m21s 10.233.92.161 node3 <none> <none> goods-7466dd7d4f-pzmc7 1/1 Running 0 7m21s 10.233.96.155 node2 <none> <none> goods-7466dd7d4f-vh5qg 1/1 Running 0 7m21s 10.233.96.156 node2 <none> <none>
-
goods 服务全部启动成功后启动 gateway 服务
kubectl apply -f gateway.yaml
启动结果如下:
root@master:~# kubectl get pod -n etcd-demo -o wide | grep gateway gateway-65b95d8857-2xvss 1/1 Running 0 5m43s 10.233.92.165 node3 <none> <none> gateway-65b95d8857-64qrv 1/1 Running 0 5m43s 10.233.96.158 node2 <none> <none> gateway-65b95d8857-67tws 1/1 Running 0 5m43s 10.233.92.164 node3 <none> <none> gateway-65b95d8857-7g2rr 1/1 Running 0 5m43s 10.233.92.166 node3 <none> <none> gateway-65b95d8857-9dj6n 1/1 Running 0 5m43s 10.233.96.161 node2 <none> <none> gateway-65b95d8857-9sd9d 1/1 Running 0 5m43s 10.233.96.159 node2 <none> <none> gateway-65b95d8857-gfqdj 1/1 Running 0 5m43s 10.233.96.157 node2 <none> <none> gateway-65b95d8857-q7bg7 1/1 Running 0 5m43s 10.233.92.162 node3 <none> <none> gateway-65b95d8857-tw6t9 1/1 Running 0 5m43s 10.233.96.160 node2 <none> <none> gateway-65b95d8857-zdqkm 1/1 Running 0 5m43s 10.233.92.163 node3 <none> <none>
-
gateway 服务全部启动后 kibana 上查看启动日志,可以看到每个 gateway pod 都获取到了 3 个商品的 endpoint:
10.233.92.161:80 10.233.96.155:80 10.233.96.156:80
如下图:
-
goods 服务扩容为 4,观察 gateway 日志是否都增加了对应的 endpoint
goods 扩容命令:
kubectl scale deployment goods -n etcd-demo --replicas 4
扩容后查看 goods Pod,新增的 Pod IP 为 10.233.92.168:
root@master:~# kubectl get pod -n etcd-demo -o wide | grep goods goods-7466dd7d4f-dwdrr 1/1 Running 0 12m 10.233.92.161 node3 <none> <none> goods-7466dd7d4f-pzmc7 1/1 Running 0 12m 10.233.96.155 node2 <none> <none> goods-7466dd7d4f-tkb6n 0/1 Running 0 9s 10.233.92.168 node3 <none> <none> goods-7466dd7d4f-vh5qg 1/1 Running 0 12m 10.233.96.156 node2 <none> <none>
此时查看 gateway 的监听日志,发现每个 gateway 服务都增加了一个 goods endpoint:10.233.92.168:80
如下图:
-
缩减 goods Pod 为 3,观察 gateway 日志
缩减命令:
kubectl scale deployment goods -n etcd-demo --replicas 3
缩减后查看 goods 的最新 Pod 信息,可以看到移除了 IP 为 10.233.92.168,如下:
root@master:~# kubectl get pod -n etcd-demo -o wide | grep goods goods-7466dd7d4f-dwdrr 1/1 Running 0 16m 10.233.92.161 node3 <none> <none> goods-7466dd7d4f-pzmc7 1/1 Running 0 16m 10.233.96.155 node2 <none> <none> goods-7466dd7d4f-vh5qg 1/1 Running 0 16m 10.233.96.156 node2 <none> <none>
观察 gateway 日志,发现每个 Pod 都移除了 goods endpoint:10.233.92.168:80
如下图:
3.2 测试业务接口
这部分测试的时候,我们访问 gateway 的 /goods/list 接口,返回的数据包含 trace_id 字段,之后根据 trace_id 查看对应的调用日志,测试如下:
root@master:~# curl http://etcd-demo.local.com/goods/list && echo ''
{"code":0,"message":"get goods list success","trace_id":1639626420}
查看 trace_id: 1639626420 的日志
gateway 日志:
# 从 goods 服务获取结果,goods endpoint 为 10.233.96.155:80
Dec 16, 2021 @ 11:47:00.532 gateway-5b679b6fb-hpqbc trace_id: 1639626420, get goods endpoint result: client=/service_registry_discovery/goods/goods-7466dd7d4f-pzmc7, endpoint=10.233.96.155:80, error=<nil>
# 最终响应结果
Dec 16, 2021 @ 11:47:00.535 gateway-5b679b6fb-hpqbc trace_id: 1639626420, get goods list result: {"code":0,"message":"get goods list success","trace_id":1639626420}
goods 日志:
Dec 16, 2021 @ 11:47:00.472 goods-7466dd7d4f-pzmc7 trace_id: 1.63962642e+09, goods list result: {"message":"get goods list success","trace_id":1639626420}
如下图:
4. 总结
这篇文章简单介绍了服务注册与发现的流程、核心概念以及 etcd 实现对应功能的命令,最后结合商城实现了一个简单的实例并进行简单测试,结果符合预期。