Nacos初探

背景

前一段时间某个商户做营销活动,某个营销H5页面在短时间内高频率的访问,导致了生产环境的Nginx服务器的CPU直接被流量打满,在长达半小时的时间范围内,Nginx服务器的流量相对于平时流量的最高峰翻了5倍达到了150M的宽带速度,而CPU也同样直接100%满荷载。Nginx在此期间对外部请求处理不及时出现了大量的请求超时,因此引发了一系列的生产事故。

image

image

事后对故障分析发现CPU满载、流量打满的主要原因如下:

  1. 被频繁访问的H5页面的静态资源文件全部在Nginx上,商户做营销活动时H5页面的频繁访问导致Nginx服务器的宽带和CPU压力很大
  2. Nginx未对类似容易出现大量突发请求的场景做限流处理
  3. 部分后台接口的响应报文数据有大量的无用字段,浪费了大量的宽带资源和CPU资源,甚至出现了响应报文长达数十K字节长度,而实际上有用的字段仅不到百字节长度的极端情况
  4. 为了负载均衡,后端应用之间的调用都是通过Nginx做了一层代理,结果前端 - A应用 - B应用的接口调用链路实际上就变成了前端 - Nginx - A应用 - Nginx - B应用,后端应用之间的调用不仅延时增加,更加重了Nginx的压力

针对以上几种问题,各自作出了一下的优化方案:

  1. 前端H5页面资源的静态文件全部上传到CDN,Nginx服务器不再处理H5页面的静态文件资源访问
  2. 针对可能出现大量突发请求且允许失败的接口做限流处理,并返回指定HTTP协议错误码给前端,由前端展示兜底提醒
  3. 给臃肿的报文做瘦身,检查各个接口的响应报文结构,删除无用且长度过长的字段
  4. 后端应用之间的调用不再经过Nginx代理,通过服务注册中心来实现服务的动态发现

调研

引用Nacos社区中的一张热门注册中心功能对比图:

image

我们项目组为JAVA应用与Golang应用相互调用的情况,需要注册中心同时支持JAVA生态和Golang生态。经过调研后发现最符合我们项目组要求的注册中心有Nacos和Consul,两者无论是市场定位还是功能特效都非常将近,最终鉴于Nacos是阿里出品,相对Consul来说中文文档更齐全也更容易上手,选择了Nacos作为我们项目组的注册中心。

Nacos实现细节

Nacos的主要角色有注册中心、服务提供方、服务消费方。

注册中心

Nacos注册中心主要实现了服务注册发现以及配置中心的功能。

服务提供方

Nacos同时支持服务提供方节点向注册中心发送心跳包的续租模式(默认)和注册中心回调服务提供方的健康检查接口两种模式来确定注册节点的健康状况。服务提供方节点启动后,会将当前节点信息注册到注册中心中,并在注册中心保存该节点的元数据。

服务提供方向注册中心注册的节点分两种:临时节点(ephemeral=false)和持久化节点(ephemeral=true)。持久化节点不会自动下线,且不采用节点发送心跳包的方式来续租,而是由注册中心主动回调健康检查接口的方式来判断节点是否健康。临时节点默认配置下使用节点续租的模式,流程如下:

  • 服务提供方每5秒向注册中心发送心跳包
  • 服务提供方超过15秒未向注册中心发送心跳包后,注册中心则下线该节点
  • 服务提供方超过30秒未向注册中心发送心跳包后,注册中心则删除该节点

默认参数见Nacos注册中心源码com.alibaba.nacos.api.common.Constants

1
2
3
4
5
public static final long DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);

public static final long DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);

public static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);

当然,以上参数也可以由服务提供方自定义配置,通过在元数据中配置一下参数(单位毫秒)来覆盖默认的5秒/15秒/30秒的参数:

com.alibaba.nacos.api.naming.PreservedMetadataKeys

1
2
3
4
5
public static final String HEART_BEAT_TIMEOUT = "preserved.heart.beat.timeout";

public static final String IP_DELETE_TIMEOUT = "preserved.ip.delete.timeout";

public static final String HEART_BEAT_INTERVAL = "preserved.heart.beat.interval";
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class NacosNamingService implements NamingService {
...

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {

// 持久化节点不会自动下线
// 只有临时节点需要发送心跳包
if (instance.isEphemeral()) {
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
// 心跳包间隔优先读取元数据配置preserved.heart.beat.interval
// 未配置元数据时使用默认参数5秒
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());

// 注册心跳包定时任务
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}

serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}

...
}

服务消费方

服务消费方获取某个服务提供方的节点列表时,直接通过读取的本地的缓存,而该缓存的刷新则通过以下两种方式来实现本地缓存与注册中心最新的服务提供方节点列表信息同步:

  1. 服务消费方会在本地记录已经调用过的服务提供方,并通过定时任务每10秒请求注册中心更新本地缓存中服务提供方的最新节点数据
  2. 服务消费方每次通过步骤1向注册中心请求某个服务提供方的数据时,会再Nacos注册中心将自己注册成该服务提供方的消费者节点,并额外向注册中心注册一个UPD回调端口,当服务提供方的节点发生变动时,注册中心会通过UPD端口给该服务提供方的所有消费者推送最新的节点信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 默认每10秒请求注册中心更新最新的服务提供方节点信息
public class UpdateTask implements Runnable {
long lastRefTime = Long.MAX_VALUE;
private String clusters;
private String serviceName;

public UpdateTask(String serviceName, String clusters) {
this.serviceName = serviceName;
this.clusters = clusters;
}

@Override
public void run() {
try {
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));

// 第一次更新当前服务消费者订阅的服务提供方信息
if (serviceObj == null) {
updateServiceNow(serviceName, clusters);
executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
return;
}

if (serviceObj.getLastRefTime() <= lastRefTime) {
// 10秒内没有其他线程执行过该更新任务
// 则更新当前服务消费者订阅的服务提供方信息
// 同时给当前服务消费者节点续租,并注册UDP端口
updateServiceNow(serviceName, clusters);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
} else {
// 请求注册中心,给当前服务消费者节点续租,并注册UDP端口
refreshOnly(serviceName, clusters);
}

lastRefTime = serviceObj.getLastRefTime();

if (!eventDispatcher.isSubscribed(serviceName, clusters) &&
!futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
// abort the update task:
NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
return;
}

// 默认参数下10秒后再一次更新当前服务消费者订阅的服务提供方信息
executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);


} catch (Throwable e) {
NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
}

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// 开启一个UDP端口,等待注册中心主动推送服务提供方节点变动信息
public class PushReceiver implements Runnable {
...

public PushReceiver(HostReactor hostReactor) {
try {
this.hostReactor = hostReactor;
// 开启一个随机UDP端口
udpSocket = new DatagramSocket();

executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.push.receiver");
return thread;
}
});

executorService.execute(this);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] init udp socket failed", e);
}
}

@Override
public void run() {
// 监听udp端口,当注册中心中服务提供方变动/配置中心配置变动时
// Nacos注册中心通过udp端口推送最新节点数据
while (true) {
try {
byte[] buffer = new byte[UDP_MSS];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

udpSocket.receive(packet);

String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());

PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
String ack;
if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
hostReactor.processServiceJSON(pushPacket.data);

// send ack to server
ack = "{\"type\": \"push-ack\""
+ ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
} else if ("dump".equals(pushPacket.type)) {
// dump data to server
ack = "{\"type\": \"dump-ack\""
+ ", \"lastRefTime\": \"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\""
+ StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
+ "\"}";
} else {
// do nothing send ack only
ack = "{\"type\": \"unknown-ack\""
+ ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
}

udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
} catch (Exception e) {
NAMING_LOGGER.error("[NA] error while receiving push data", e);
}
}
}
...
}

由于服务消费方每次都是通过本地缓存来获取可用的服务提供方节点信息,即使突然Nacos注册中心不可用,也并不会影响服务消费方对服务提供方的调用,仅仅是影响到服务消费方不能获取到服务提供方的节点变化而已。

另外,服务消费方缓存的服务提供方节点信息还可以持久化到本地文件,Nacos注册中心不可用时即使重启了服务消费方,服务消费方虽然可以从本地文件中获取最后一次更新的服务提供方节点信息,保证服务消费方对服务提供方的调用不会因为Nacos注册中心不可用而受到影响。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class HostReactor {
...

public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir,
boolean loadCacheAtStart, int pollingThreadCount) {

executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.client.naming.updater");
return thread;
}
});

this.eventDispatcher = eventDispatcher;
this.serverProxy = serverProxy;
this.cacheDir = cacheDir;
// 本地文件缓存开启后,服务消费方启动后先从本地文件读取服务提供方的节点信息
if (loadCacheAtStart) {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
} else {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
}

this.updatingMap = new ConcurrentHashMap<String, Object>();
this.failoverReactor = new FailoverReactor(this, cacheDir);
this.pushReceiver = new PushReceiver(this);
}
...
}

使用样例

JAVA(Springboot)

1
2
3
4
5
<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>nacos-discovery-spring-boot-starter</artifactId>
<version>0.2.7</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
server:
port: 8080 # 当前节点web端口

spring:
application:
name: projectName # 当前节点应用名

nacos:
discovery:
server-addr: '127.0.0.1:8847,127.0.0.1:8848,127.0.0.1:8849'
auto-register: true
register:
ip: ${LAN_IP} # 当前节点IP从环境变量读取

Springboot引入nacos依赖后,默认只会自动创建服务自动注册的实现,并不会自动创建服务自动销毁的实现。因此建议创建服务自动销毁的实现,当应用节点正常退出时主动请求注册中心下线当前节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Configuration
@EnableConfigurationProperties(value = NacosDiscoveryProperties.class)
public class NacosConfig {

/**
* 启动项目时注册服务发现
*/
@Bean
public NacosDiscoveryAutoRegister discoveryAutoRegister() {
return new NacosDiscoveryAutoRegister();
}

/**
* 关闭项目时销毁服务发现
*/
@Bean
public NacosDiscoveryAutoDeregister discoveryAutoDeregister(@Value("${server.port}") Integer serverPort,
NacosDiscoveryProperties discoveryProperties) {
return new NacosDiscoveryAutoDeregister(discoveryProperties, new WebServer() {
@Override
public void start() throws WebServerException {
}
@Override
public void stop() throws WebServerException {
}
@Override
public int getPort() {
return serverPort;
}
});
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@NacosInjected
private NamingService namingService;

public String selectServiceHost(String serviceName) {
String ip;
int port;
try {
// 按权重获取一个健康的服务节点
Instance instance = namingService.selectOneHealthyInstance(serviceName);
ip = instance.getIp();
port = instance.getPort();
} catch (Exception e){
log.error(e.getMessage(), e);
return null;
}
return String.format("http://%s:%s", ip, port);
}

Golang

1
2
3
[nacos]
server_addr = "127.0.0.1:8847,127.0.0.1:8848,127.0.0.1:8849"
auto_register = true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
var client naming_client.INamingClient

func init() {
// 从配置文件中server_addr参数解析nacos的服务地址
serverAddr := ""
splits := strings.Split(serverAddr, ",")
var sc []constant.ServerConfig
for _, split := range splits {
ipAddr := strings.Split(split, ":")[0]
port, err := strconv.ParseUint(strings.Split(split, ":")[1], 10, 64)
if err != nil {
panic(err)
}
sc = append(sc, constant.ServerConfig{
IpAddr: ipAddr,
Port: port,
})
}

cc := constant.ClientConfig{
TimeoutMs: 5000,
NotLoadCacheAtStart: true,
RotateTime: "1h",
MaxAge: 3,
}

// 创建Nacos注册中心连接配置
var err error
client, err = clients.CreateNamingClient(map[string]interface{}{
"serverConfigs": sc,
"clientConfig": cc,
})
if err != nil {
panic(err)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// 启动项目时注册服务发现
func (this *NacosJob) Start() {
// 获取配置文件中auto_register参数
// 开发环境不需要服务注册
if !this.config.Nacos.AutoRegister {
return
}

registerInstance := vo.RegisterInstanceParam{
Ip: this.config.Nacos.LanIp, // 当前节点IP
Port: uint64(this.config.Web.Listen), // 当前节点web端口
ServiceName: this.serviceName, // 当前节点应用名
Weight: 1,
Enable: true,
Healthy: true,
Ephemeral: true,
}
// 向Nacos注册中心注册当前应用节点
// 如果是临时节点即Ephemeral=true时,自动创建协程定时发送心跳包给当前节点续租
_, err := client.RegisterInstance(registerInstance)
if err != nil {
logrus.Errorf("failed to register nacos instance: %s, instance: %v", err, registerInstance)
panic(err)
}
logrus.Infof("register into nacos: %s, ip: %s, port: %d", registerInstance.ServiceName, registerInstance.Ip, registerInstance.Port)
}

// 正常关闭项目时销毁服务发现
func (this *NacosJob) Close() error {
// 获取配置文件中auto_register参数
// 开发环境没有注册服务发现,不需要销毁服务发现
if !this.config.Nacos.AutoRegister {
return nil
}

deregisterInstance := vo.DeregisterInstanceParam{
Ip: this.config.Nacos.LanIp,
Port: uint64(this.config.Web.Listen),
ServiceName: this.serviceName,
Ephemeral: true, // it must be true
}
// 向Nacos注册中心销毁当前应用节点
_, err := client.DeregisterInstance(deregisterInstance)
if err != nil {
logrus.Errorf("failed to deregister nacos instance: %s", err)
return err
}
logrus.Infof("deregister nacos instance: %s, ip: %s, port: %d", deregisterInstance.ServiceName, deregisterInstance.Ip, deregisterInstance.Port)
return nil
}
1
2
3
4
5
6
7
8
9
10
const serviceName = "coupon"
// 按权重获取一个健康的服务节点
instance, err := client.SelectOneHealthyInstance(vo.SelectOneHealthInstanceParam{
ServiceName: serviceName,
})
if err != nil {
logrus.Errorf("failed to select healthy instance, serviceName: %s, err: %s", serviceName, err)
return err
}
logrus.Infof("select healthy instance: %s, ip: %s, port: %d", serviceName, instance.Ip, instance.Port)

备注

  1. Nacos严禁暴露公网,如必须开放公网则务必添加IP白名单限制
  2. Nacos版本务必在1.4.1+,1.4.1旧版本存在严重安全漏洞issues4701,并于最新1.4.1版本修复