dubbo3

文章参考博客:风祈的时光录

前期准备

dubbo3 源码

官方文档 源码构建

1
2
下载源码 git clone https://github.com/apache/dubbo.git
编译打包 mvn clean source:jar install -Dmaven.test.skip

dubbo sample 源码

dubbo admin 安装

前提条件是需要先安装 zookeeper

1
2
3
docker pull zookeeper

docker run -d -e TZ="Asia/Shanghai" -p 2181:2181 --name zookeeper zookeeper
1
2
3
获取docker容器:docker pull docker.io/apache/dubbo-admin:0.4.0

运行:docker run -d --name dubbo-admin -p 8080:8080 -e admin.registry.address=zookeeper://ip:2181 -e admin.config-center=zookeeper://ip:2181 -e admin.metadata-report.address=zookeeper://ip:2181 docker.io/apache/dubbo-admin:0.4.0

整体设计

官方文档 框架设计

基本介绍

什么是RPC

分布式计算中,远程过程调用(英语:Remote Procedure Call,RPC)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一个地址空间(通常为一个开放网络的一台计算机)的子程序,而程序员就像调用本地程序一样,无需额外地为这个交互作用编程(无需关注细节)。RPC是一种服务器-客户端(Client/Server)模式,经典实现是一个通过发送请求-接受回应进行信息交互的系统。

如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用远程方法调用,例:Java RMI

RPC是一种进程间通信的模式,程序分布在不同的地址空间里。如果在同一主机里,RPC可以通过不同的虚拟地址空间(即便使用相同的物理地址)进行通讯,而在不同的主机间,则通过不同的物理地址进行交互。许多技术(通常是不兼容)都是基于这种概念而实现的

–来至自维基百科


远程方法调用本地方法调用是相对的两个概念,本地方法调用指的是进程内部的方法调用,而远程方法调用指的是两个进程内的方法相互调用

如果实现远程方法调用,基本的就是通过网络,通过传输数据来进行调用

所以就有了:

  1. RPC over Http:基于Http协议来传输数据
  2. PRC over Tcp:基于Tcp协议来传输数据

对于所传输的数据,可以交由RPC的双方来协商定义,但基本都会包括:

  1. 调用的是哪个类或接口
  2. 调用的是哪个方法,方法名和方法参数类型(考虑方法重载)
  3. 调用方法的入参

所以,我们其实可以看到RPC的自定义性是很高的,各个公司内部都可以实现自己的一套RPC框架,而Dubbo就是阿里所开源出来的一套RPC框架,根据上面所描述的条件很容易联想到将类名,方法名称,参数,参数列表通过网络传输到服务提供方,通过反射等方式执行服务提供方代码逻辑后将返回值原路返回,这就是整个Dubbo实现RPC调用的基本原理

什么是Dubbo

官网地址:http://dubbo.apache.org/zh/

目前,官网上是这么介绍的:Apache Dubbo 是一款高性能、轻量级的开源 Java服务框架

在几个月前,官网的介绍是:Apache Dubbo 是一款高性能、轻量级的开源 JavaRPC框架

为什么会将RPC改为服务

Dubbo一开始的定位就是RPC,专注于两个服务之间的调用。但随着微服务的盛行,除开服务调用之外,Dubbo也在逐步的涉猎 服务治理服务监控服务网关 等等,所以现在的Dubbo目标已经不止是RPC框架了,而是和Spring Cloud类似想成为了一个服务框架

dubbo RPC主要用于两个dubbo系统之间作远程调用,特别适合高并发、小数据的互联网场景

Dubbo网关参考:https://github.com/apache/dubbo-proxy(社区不是很活跃)

基本原理

相关文档

值得注意的是,dubbo3服务注册粒度由接口级改为应用级

开源RPC框架对比

功能HessianMontanrpcxgRPCThriftDubboDubboxSpring Cloud
开发语言跨语言JavaGo跨语言跨语言JavaJavaJava
分布式(服务治理)×××
多序列化框架支持hessian√(支持Hessian2、Json,可扩展)× 只支持(protobuf)×(thrift格式)
多种注册中心×××
管理中心×××
跨编程语言×(支持php client和C server)××××
支持REST××××××
关注度
上手难度
运维成本
开源机构CauchoWeiboApacheGoogleApacheAlibabaDangdangApache

基本与高级应用

启动时检查

使用文档 启动时检查

消费端启动时检查依赖的服务是否可用,不可用时会抛出异常,阻止 Spring 初始化完成,默认开启

  • 服务级别配置

    1
    @DubboReference(check = true)
  • 全局配置

    1
    2
    3
    dubbo:  
    consumer:
    check: true
  • XML 配置参考官方文档

线程模型

使用文档 线程模型

配置 Dubbo 中的线程模型

如果事件处理的逻辑能迅速完成,并且不会发起新的 IO 请求,比如只是在内存中记个标识,则直接在 IO 线程上处理更快,因为减少了线程池调度。

但如果事件处理逻辑较慢,或者需要发起新的 IO 请求,比如需要查询数据库,则必须派发到线程池,否则 IO 线程阻塞,将导致不能接收其它请求。

如果用 IO 线程处理事件,又在事件处理过程中发起新的 IO 请求,比如在连接事件中发起登录请求,会报“可能引发死锁”异常,但不会真死锁。

  • XML 配置 (缺省配置)

    1
    <dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="200" />
  • yml 配置

    1
    2
    3
    4
    5
    dubbo:  
    protocol:
    dispatcher: all
    threadpool: fixed
    threads: 200

Dispatcher

  • all 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。(默认配置)
  • direct 所有消息都不派发到线程池,全部在 IO 线程上直接执行。
  • message 只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
  • execution 只有请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
  • connection 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。

ThreadPool

  • fixed 固定大小线程池,启动时建立线程,不关闭,一直持有。(默认配置)
  • cached 缓存线程池,空闲一分钟自动删除,需要时重建。
  • limited 可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。
  • eager 优先创建Worker线程池。在任务数量大于corePoolSize但是小于maximumPoolSize时,优先创建Worker来处理任务。当任务数量大于maximumPoolSize时,将任务放入阻塞队列中。阻塞队列充满时抛出RejectedExecutionException。(相比于cached:cached在任务数量超过maximumPoolSize时直接抛出异常而不是将任务放入阻塞队列)

消费端线程模型

官方介绍

直接提供者

使用文档 直连提供者

Dubbo 中点对点的直连方式

在开发及测试环境下,经常需要绕过注册中心,只测试指定服务提供者,这时候可能需要点对点直连,点对点直连方式,将以服务接口为单位,忽略注册中心的提供者列表,A 接口配置点对点,不影响 B 接口从注册中心获取列表

主要用于开发阶段本地调试

  • XML

    1
    <dubbo:reference id="xxxService" interface="com.alibaba.xxx.XxxService" url="dubbo://localhost:20890" />
  • -D 参数

    1
    java -Dcom.alibaba.xxx.XxxService=dubbo://localhost:20890
  • 注解

    1
    @DubboReference(url = "dubbo://localhost:20880")

只订阅

使用文档 只订阅

只订阅不注册

为方便开发测试,经常会在线下共用一个所有服务可用的注册中心,这时,如果一个正在开发中的服务提供者注册,可能会影响消费者不能正常运行

可以让服务提供者开发方,只订阅服务(开发的服务可能依赖其它服务),而不注册正在开发的服务,通过直连测试正在开发的服务

主要用于开发阶段本地调试

  • XML

    1
    2
    3
    <dubbo:registry address="10.20.153.10:9090" register="false" />
    或者
    <dubbo:registry address="10.20.153.10:9090?register=false" />
  • yml

    1
    2
    3
    4
    dubbo:
    registry:
    address: zookeeper://my-server:2181
    register: false

多注册中心

在 Dubbo 中把同一个服务注册到多个注册中心上

Dubbo 支持同一服务向多注册中心同时注册,或者不同服务分别注册到不同的注册中心上去,甚至可以同时引用注册在不同注册中心上的同名服务。另外,注册中心是支持自定义扩展的

在没有配置 default=false 的注册中心都是默认的注册中心(默认值是 true),不管服务怎么指定注册中心都一定会注册到配置 default=false 的注册中心


多注册中心

比如:中文站有些服务来不及在青岛部署,只在杭州部署,而青岛的其它应用需要引用此服务,就可以将服务同时注册到两个注册中心

注意:3.0.9的 default=false 有bug, 默认还是会向 default=false 的注册中心注册

  • XML

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
    <dubbo:application name="world" />
    <!-- 多注册中心配置 -->
    <dubbo:registry id="hangzhouRegistry" address="10.20.141.150:9090" />
    <dubbo:registry id="qingdaoRegistry" address="10.20.141.151:9010" default="false" />
    <!-- 向多个注册中心注册 -->
    <dubbo:service interface="com.alibaba.hello.api.HelloService" version="1.0.0" ref="helloService" registry="hangzhouRegistry,qingdaoRegistry" />
    </beans>
  • 注解

    1
    2
    3
    4
    5
    6
    7
    8
    # 多注册中心配置
    dubbo:
    registries:
    hangzhouRegistry:
    address: zookeeper://10.20.141.150:9090:9090
    qingdaoRegistry:
    default: false #默认不向这个注册中心注册,除非指定
    address: zookeeper://10.20.141.151:9090
    1
    @DubboService(registry = {"hangzhouRegistry","qingdaoRegistry"})

不同服务使用不同注册中心

比如:CRM 有些服务是专门为国际站设计的,有些服务是专门为中文站设计的

  • XML

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
    <dubbo:application name="world" />
    <!-- 多注册中心配置 -->
    <dubbo:registry id="chinaRegistry" address="10.20.141.150:9090" />
    <dubbo:registry id="intlRegistry" address="10.20.154.177:9010" default="false" />
    <!-- 向中文站注册中心注册 -->
    <dubbo:service interface="com.alibaba.hello.api.HelloService" version="1.0.0" ref="helloService" registry="chinaRegistry" />
    <!-- 向国际站注册中心注册 -->
    <dubbo:service interface="com.alibaba.hello.api.DemoService" version="1.0.0" ref="demoService" registry="intlRegistry" />
    </beans>
  • 注解

    yml 需要先配置多注册中心

    1
    2
    3
    4
    5
    // 这个服务会注册到 intlRegistry 和 chinaRegistry
    @DubboService(registry = "chinaRegistry")
    public class HelloServiceImpl implements HelloService {
    ...
    }
    1
    2
    3
    4
    @DubboService(registry = "intlRegistry")
    public class DemoServiceImpl implements DemoService {
    ...
    }

多注册中心引用

比如:CRM 需同时调用中文站和国际站的 PC2 服务,PC2 在中文站和国际站均有部署,接口及版本号都一样,但连的数据库不一样

  • XML

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
    <dubbo:application name="world" />
    <!-- 多注册中心配置 -->
    <dubbo:registry id="chinaRegistry" address="10.20.141.150:9090" />
    <dubbo:registry id="intlRegistry" address="10.20.154.177:9010" default="false" />
    <!-- 引用中文站服务 -->
    <dubbo:reference id="chinaHelloService" interface="com.alibaba.hello.api.HelloService" version="1.0.0" registry="chinaRegistry" />
    <!-- 引用国际站站服务 -->
    <dubbo:reference id="intlHelloService" interface="com.alibaba.hello.api.HelloService" version="1.0.0" registry="intlRegistry" />
    </beans>
  • 注解

    1
    2
    @DubboReference(registry = "chinaHelloService")
    HelloService helloService;

    如果只是测试环境临时需要连接两个不同注册中心,使用竖号分隔多个不同注册中心地址:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
    <dubbo:application name="world" />
    <!-- 多注册中心配置,竖号分隔表示同时连接多个不同注册中心,同一注册中心的多个集群地址用逗号分隔 -->
    <dubbo:registry address="10.20.141.150:9090|10.20.154.177:9010" />
    <!-- 引用服务 -->
    <dubbo:reference id="helloService" interface="com.alibaba.hello.api.HelloService" version="1.0.0" />
    </beans>

服务分组

使用文档 服务分组

当一个接口有多种实现时,可以用 group 区分

  • XML

    服务

    1
    2
    <dubbo:service group="feedback" interface="com.xxx.IndexService" />
    <dubbo:service group="member" interface="com.xxx.IndexService" />

    引用

    1
    2
    <dubbo:reference id="feedbackIndexService" group="feedback" interface="com.xxx.IndexService" />
    <dubbo:reference id="memberIndexService" group="member" interface="com.xxx.IndexService" />

    任意组

    1
    <dubbo:reference id="barService" interface="com.foo.BarService" group="*" />

  • 注解

    实现一个服务,服务端使用服务分组实现加法和减法两种实现,客户端根据需要引用

    接口项目

    1
    2
    3
    public interface GroupService {
    int calculate(int a, int b);
    }

    服务端

    1
    2
    3
    4
    5
    6
    7
    @DubboService(group = "add")
    public class GroupAddServiceImpl implements GroupService {
    @Override
    public int calculate(int a, int b) {
    return a + b;
    }
    }
    1
    2
    3
    4
    5
    6
    7
    @DubboService(group = "sub")
    public class GroupSubServiceImpl implements GroupService {
    @Override
    public int calculate(int a, int b) {
    return a - b;
    }
    }

    消费端

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @SpringBootTest
    public class GroupTest {
    @DubboReference(group = "sub")
    GroupService groupService;

    /* @DubboReference(group = "add")
    GroupService groupService;*/

    @Test
    public void test() {
    // 根据引入不同的分组实现不同的计算逻辑
    System.out.println(groupService.calculate(10, 5));
    }
    }

静态服务

使用文档 静态服务

将 Dubbo 服务标识为非动态管理模式

有时候希望人工管理服务提供者的上线和下线,此时需将注册中心标识为非动态管理模式,模块的上下线


  • XML

    1
    <dubbo:registry address="10.20.141.150:9090" dynamic="false" />

    或者

    1
    <dubbo:registry address="10.20.141.150:9090?dynamic=false" />

  • 注解

    1
    @DubboService(dynamic = false)

多版本

使用文档 多版本

在 Dubbo 中为同一个服务配置多个版本

当一个接口实现,出现不兼容升级时,可以用版本号过渡,版本号不同的服务相互间不引用

可以按照以下的步骤进行版本迁移:

  1. 在低压力时间段,先升级一半提供者为新版本
  2. 再将所有消费者升级为新版本
  3. 然后将剩下的一半提供者升级为新版本

老版本服务提供者配置

1
2
3
4
5
6
7
@DubboService(version = "1.0.0")
public class VersionServerImpl implements VersionServer {
@Override
public String version() {
return "1.0.0";
}
}

新版本服务提供者配置

1
2
3
4
5
6
7
@DubboService(version = "2.0.0")
public class VersionServerImpl implements VersionServer {
@Override
public String version() {
return "2.0.0";
}
}

老版本服务消费者配置

1
2
@DubboReference(version = "1.0.0")
VersionServer versionServer;

新版本服务消费者配置

1
2
@DubboReference(version = "2.0.0")
VersionServer versionServer;

如果不需要区分版本,可以按照以下的方式配置

1
2
@DubboReference(version = "*")
VersionServer versionServer;

分组聚合

通过分组对结果进行聚合并返回聚合后的结果

分组合并可以指定 服务级别 方法级别 指定方法合并 指定方法不合并需要合并的方法返回值必须是个 集合

通过分组对结果进行聚合并返回聚合后的结果,比如菜单服务,用group区分同一接口的多种实现,现在消费方需从每种group中调用一次并返回结果,对结果进行合并之后返回,这样就可以实现聚合菜单项


接口项目

1
2
3
4
5
6
7
public interface GroupMergerService {
// A模块菜单
List<String> menuA();

// B模块菜单
String menuB();
}

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@DubboService(group = "user")
public class GroupMergerServiceImpl implements GroupMergerService {
@Override
public List<String> menuA() {
List<String> menus = new ArrayList<>();
menus.add("user.1");
menus.add("user.2");
return menus;
}

@Override
public String menuB() {
return "user";
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@DubboService(group = "order")
public class GroupMergerServiceImpl2 implements GroupMergerService {
@Override
public List<String> menuA() {
List<String> menus = new ArrayList<>();
menus.add("order.1");
menus.add("order.2");
return menus;
}

@Override
public String menuB() {
return "order";
}
}

消费端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
@SpringBootTest
public class GroupMergerTest {
@DubboReference(group = "*", merger = "true")
GroupMergerService groupMergerService;

@Test
public void test() {
// user.1
// user.2
// order.1
// order.2
List<String> menuList = groupMergerService.menuA();
menuList.forEach(line -> log.info(line));

// 报错 There is no merger to merge result (返回值需要集合)
String s = groupMergerService.menuB();
System.out.println(s);
}
}

参数验证

在 Dubbo 中进行参数验证

参数验证功能是基于 JSR303 实现的,用户只需标识 JSR303 标准的验证 annotation,并通过声明 filter 来实现验证, 验证逻辑建议放在消费端,这能能够在验证异常后不进行RPC调用


接口项目

添加 maven 依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>

通用返回实体

1
2
3
4
5
6
7
@Data
@Accessors(chain = true)
public class R<T> implements Serializable {
private Integer code;
private T data;
private String message;
}

实体对象

1
2
3
4
5
6
7
8
9
10
@Data
@Accessors(chain = true)
public class User implements Serializable {
@Min(value = 0, message = "最小值要大于0")
@Max(value = 1000, message = "最大值不能超过1000")
private Integer id;

@NotNull(message = "name 不能为空")
private String name;
}

服务接口

1
2
3
4
5
public interface ValidationService {
R<Void> addUser(User user);

R<Void> select(@NotNull(message = "id 不能为空") Integer id);
}

服务端

1
2
3
4
5
6
7
8
9
10
11
12
@DubboService
public class ValidationServiceImpl implements ValidationService {
@Override
public R<Void> addUser(User user) {
return new R().setCode(200);
}

@Override
public R<Void> select(Integer id) {
return new R().setCode(200);
}
}

消费端

验证扩展 参数校验器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* @author: ken 😃
* @date: 2022-07-22
* @description: 参数校验器
*
* 首先编写自定义的校验器。实现Validation接口。再编写自己的校验者
**/
public class ParamValidation extends JValidation {
@Override
public Validator getValidator(URL url) {
// 自定义校验者
return new ParamValidator(url);
}
}

参数校验者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* @author: ken 😃
* @date: 2022-07-22
* @description: 参数校验者 拷贝 org.apache.dubbo.validation.support.jvalidation.JValidator
**/
public class ParamValidator implements Validator {

public void validate(String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Exception {
...
} catch (Exception e) {
// 修改点
// 如果是 ConstraintViolationException 直接返回,3.0.9会重新封装成 ValidationException
if (e instanceof ConstraintViolationException) {
throw e;
}
throw new ValidationException(e.getMessage());
}
}
}

扩展验证拦截器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* @author: ken 😃
* @date: 2022-07-22
* @description: 参考 org.apache.dubbo.validation.filter.ValidationFilter
**/
@Activate(group = {CONSUMER, PROVIDER}, value = VALIDATION_KEY, order = 10000)
public class DubboValidationFilter implements Filter {
private Validation validation;

public void setValidation(Validation validation) {
this.validation = validation;
}

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (validation != null && !invocation.getMethodName().startsWith("$")
&& ConfigUtils.isNotEmpty(invoker.getUrl().getMethodParameter(invocation.getMethodName(), VALIDATION_KEY))) {
try {
Validator validator = validation.getValidator(invoker.getUrl());
if (validator != null) {
validator.validate(invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
}
} catch (ConstraintViolationException e) {
// 处理参数校验异常 返回通用 VO
ConstraintViolation<?> constraintViolation = e.getConstraintViolations().stream().findFirst().get();
R result = new R()
.setCode(1000)
.setMessage(constraintViolation.getMessage());
return AsyncRpcResult.newDefaultAsyncResult(result, invocation);
} catch (RpcException e) {
throw e;
} catch (Throwable t) {
return AsyncRpcResult.newDefaultAsyncResult(t, invocation);
}
}
return invoker.invoke(invocation);
}
}

添加SPI配置

  • Filter

    1
    2
    3
    4
    |-resources
    |-META-INF
    |-dubbo
    |-org.apache.dubbo.rpc.Filter

    org.apache.dubbo.rpc.Filter

    1
    dubboValidation=com.wgf.springboot.filter.DubboValidationFilter
  • Validation

    1
    2
    3
    4
    |-resources
    |-META-INF
    |-dubbo
    |-org.apache.dubbo.validation.Validation

    org.apache.dubbo.validation.Validation

    1
    paramValidation=com.wgf.springboot.validation.ParamValidation

修改 yml 配置

1
2
3
4
dubbo:
consumer:
validation: 'paramValidation' # 启用自定义验证
filter: 'dubboValidation, -validation' # 启用自定义参数校验拦截器,禁用默认的参数校验拦截器

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@SpringBootTest
public class ValidationTest {
@DubboReference
ValidationService validationService;

@Test
public void addTest() {
User user = new User()
.setId(10);
R<Void> r = validationService.addUser(user);
Assertions.assertEquals(r.getCode(), 200);
}

@Test
public void selectTest() {
R<Void> r = validationService.select(null);
Assertions.assertEquals(r.getCode(), 200);
}
}

3.0.9 版本如果需要实现验证异常后返回自定义的VO, 需要扩展 JValidationJValidator, 因为 ConstraintViolationException 异常被 JValidator 捕获重新抛出 ValidationException , 并且需要重写 (取消默认实现用 -)ValidationFilter, 验证异常后返回自定义的VO

负载均衡

相关文档

LoadBalance 中文意思为负载均衡,它的职责是将网络请求,或者其他形式的负载“均摊”到不同的机器上。避免集群中部分服务器压力过大,而另一些服务器比较空闲的情况。通过负载均衡,可以让每台服务器获取到适合自己处理能力的负载。在为高负载服务器分流的同时,还可以避免资源浪费,一举两得


Random LoadBalance

  • 随机,按权重设置随机概率。
  • 在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。

RoundRobin LoadBalance

  • 轮询,按公约后的权重设置轮询比率。
  • 存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。

LeastActive LoadBalance

  • 最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。
  • 使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大

最少活跃数应该是在服务提供者端进行统计的,服务提供者统计有多少个请求正在执行中。但在Dubbo中,就是不讲道理,它是在消费端进行统计的,为什么能在消费端进行统计?

  1. 每个消费者都会从注册中心(常用的是Zookeeper)缓存所调用服务的所有提供者信息到本地,比如记为p1、p2、p3三个服务提供者,每个提供者内都有一个属性记为active,默认位0
  2. 消费者在调用服务时,如果负载均衡策略是leastactive
  3. 消费者端会判断缓存的所有服务提供者的active,选择最小的,如果都相同,则随机
  4. 选出某一个服务提供者后,假设是p2,Dubbo就会对p2.active+1
  5. 然后真正发出请求调用该服务
  6. 消费端收到响应结果后,对p2.active-1
  7. 这样就完成了对某个服务提供者当前活跃调用数进行了统计,并且并不影响服务调用的性能
  8. 如果由服务提供者来统计调用数反而不好统计,因为服务提供者有多个,你无法确定是哪个服务提供者统计调用数,除非你放到zookeeper这种分布式共享的数据中心,但是这样的话,每个消费者都要请求zookeeper找到需要调用的那一台服务提供者机器然后加1,在调用结束后,还要在zookeeper上进行减1操作,zookeeper明显扛不住。
  9. 由服务消费者来统计调用数的话,虽然每个消费者都有自己的一套调用数数据,调用数数据可能不一样,但是经过长时间的调用后,每个消费者自己本地存的调用数据还是能够有差不多的趋势(这里的趋势不是指数据相等)。比如说p2响应很慢,堆积了很多请求,那么每个消费者在请求多次后,短时间内都不会再请求p2

总结:如果由服务提供者来统计调用次数,那么就需要共享每个服务接口的调用次数,不管是使用那种共享中间件都会有额外的开销。最节省性能的算法是在服务消费者端的jvm内存维护一份本地的服务调用次数表,再选择空闲服务调用,这样可以避免数据共享的复杂性


ConsistentHash LoadBalance

  • 一致性 Hash,相同参数的请求总是发到同一提供者。
  • 当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
  • 算法参见:http://en.wikipedia.org/wiki/Consistent_hashing
  • 缺省只对第一个参数 Hash,如果要修改,请配置 <dubbo:parameter key="hash.arguments" value="0,1" />
  • 缺省用 160 份虚拟节点,如果要修改,请配置 <dubbo:parameter key="hash.nodes" value="320" />

配置

服务端

  • XML

    • 服务级别

      1
      <dubbo:service interface="..." loadbalance="roundrobin" />
    • 方法级别

      1
      2
      3
      <dubbo:service interface="...">
      <dubbo:method name="hello" loadbalance="roundrobin"/>
      </dubbo:service>
  • 注解

    • 服务级别

      1
      @DubboService(loadbalance = "roundrobin")
    • 方法级别

      1
      2
      3
      @DubboService(methods = {
      @Method(name = "...", loadbalance = "roundrobin")
      })

客户端

  • XML

    • 服务级别

      1
      <dubbo:reference interface="..." loadbalance="roundrobin" />
    • 方法级别

      1
      2
      3
      <dubbo:reference interface="...">
      <dubbo:method name="..." loadbalance="roundrobin"/>
      </dubbo:reference>
  • 注解

    • 服务级别

      1
      @DubboReference(loadbalance = "roundrobin")
    • 方法级别

      1
      2
      3
      @DubboReference(methods = {
      @Method(name = "...", loadbalance = "roundrobin")
      })
  • yml

    1
    2
    3
    dubbo:
    provider:
    loadbalance: roundrobin

官方已提供扩展

  • org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
  • org.apache.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance
  • org.apache.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance
  • org.apache.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance

可取值: random roundrobin leastactive consistenthash

在集群负载均衡时,Dubbo 提供了多种均衡策略,缺省为 random 随机调用


如果在消费端服务端都配置了负载均衡策略,以消费端为准

如果在服务级别方法级别都配置了负载均衡策略,以方法级别为准,遵循配置就近原则

服务超时

默认超时时间:1000ms

超时会触发重试机制:默认重试两次,不包含第一次

在服务提供者和服务消费者上都可以配置服务超时时间,这两者是不一样的

服务端

服务级别 xml<dubbo:service interface=“…” timeout=“4000” />
方法级别 xml<dubbo:service interface=“…”>
<dubbo:method name=“…” loadbalance=“roundrobin”/>
</dubbo:reference>
服务级别 注解@DubboService(timeout = 3000)
方法级别 注解@DubboService(methods = {
@Method(name = “methodTimeout”, timeout = 5000)
})

消费端

服务级别 xml<dubbo:reference interface=“…” timeout=“4000” />
方法级别 xml<dubbo:reference interface=“…”>
<dubbo:method name=“…” loadbalance=“roundrobin”/>
</dubbo:reference>
服务级别 注解@DubboReference(timeout = 3000)
方法级别 注解@DubboReference(methods = {
@Method(name = “xxx”, timeout = 5000)
})
全局配置 xml<dubbo:provider timeout=“5000”/>

yml

1
2
3
4
5
6
7
8
9
10
# 按服务配置
dubbo:
application:
xxx:
timeout: 3000

# 全局配置
dubbo:
provider:
timeout: 3000

消费者调用一个服务,分为三步:

sequenceDiagram
	消费端 ->> 服务端 : 1. 消费者发送请求(网络传输)
	服务端 ->> 服务端 : 2. 执行逻辑
	服务端 ->> 消费端 : 3. 服务端返回响应(网络请求)

如果在服务端和消费端只在其中一方配置了 timeout,那么没有歧义,表示消费端调用服务的超时时间消费端如果超过时间还没有收到响应结果,则消费端会抛超时异常。但服务端不会抛异常,服务端在执行服务后,会检查执行该服务的时间,如果超过timeout,则会打印一个超时日志。服务会正常的执行完

如果在服务端和消费端各配了一个timeout,那就比较复杂了,假设

  1. 服务执行为2s
  2. 消费端timeout=1s
  3. 服务端timeout=3s
1
2
3
4
5
6
7
8
9
// 服务端
@DubboService(timeout = 3000)
public class TimeOutServiceImpl implements TimeOutService {
...
}

// 消费端
@DubboReference(timeout = 1000)
TimeOutService timeOutService;
  • 那么消费端调用服务时,消费端会收到超时异常(因为消费端超时了),服务端一切正常(服务端没有超时),实际服务端正常执行业务流程
  • 无论何种情况,服务端的timeout配置的作用是:如果服务执行时间超过这个timeout,仅仅只是打印一个超时日志

provider超时打断

Dubbo provider执行超时释放执行线程

支持provider根据超时时间进行业务打断

适用场景:对于一个provider,如果某个操作执行超时,则打断(释放)该执行线程,而不是仅仅打印超时日志

提示

支持版本:2.7.12 之后版本,3.0.x 暂不支持

核心实现类 org.apache.dubbo.remoting.transport.dispatcher.all2.AllChannelHandler2

接口项目

1
2
3
public interface TimeoutReleaseService {
void hello();
}

服务端

yml配置

1
2
3
dubbo:
protocol:
dispatcher: all2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Slf4j
@DubboService(timeout = 1000, retries = 0)
public class TimeoutReleaseServiceImpl implements TimeoutReleaseService {

@Override
public void hello() {

try {
for (int i = 0; i < 10; i++) {
TimeUnit.MILLISECONDS.sleep(200);
log.info(i + "");
}
}catch (Exception e) {
log.error("线程被打断");
}
}
}

消费端

1
2
3
4
5
6
7
8
9
10
@SpringBootTest
public class TimeoutReleaseTest {
@DubboReference
TimeoutReleaseService timeoutReleaseService;

@Test
public void test() {
timeoutReleaseService.hello();
}
}

运行结果,服务端超时

1
2
3
4
5
6
2022-07-23 16:21:56.977  INFO 3344 --- [:20880-thread-2] c.w.s.p.TimeoutReleaseServiceImpl        : 0
2022-07-23 16:21:57.180 INFO 3344 --- [:20880-thread-2] c.w.s.p.TimeoutReleaseServiceImpl : 1
2022-07-23 16:21:57.383 INFO 3344 --- [:20880-thread-2] c.w.s.p.TimeoutReleaseServiceImpl : 2
2022-07-23 16:21:57.586 INFO 3344 --- [:20880-thread-2] c.w.s.p.TimeoutReleaseServiceImpl : 3
2022-07-23 16:21:57.790 INFO 3344 --- [:20880-thread-2] c.w.s.p.TimeoutReleaseServiceImpl : 4
2022-07-23 16:21:57.868 ERROR 3344 --- [:20880-thread-2] c.w.s.p.TimeoutReleaseServiceImpl : 线程被打断

服务重试

使用文档 重试次数配置

超时和重试一般情况下是成对配置的

Dubbo 服务在尝试调用一次之后,如出现非业务异常(服务突然不可用、超时等),Dubbo 默认会进行额外的最多2次重试

服务端

目前 3.0.9 版本使用注解 @DubboService(timeout = 3000, retries = 1) 重试次数不生效,只有客户端配置才生效,原因是服务端注册时注册URL没有加上注解的重试配置

服务级别 xml<dubbo:service interface=“…” timeout=“4000” retries=“2”/>
方法级别 xml<dubbo:service interface=“…”>
<dubbo:method name="…"timeout=“4000” retries=“2”/>
</dubbo:service>
服务级别 注解@DubboService(timeout = 3000, retries = 2) 3.0.9 不生效
方法级别 注解@DubboService(methods = {
@Method(name = “globalTimeout”, timeout = 3000, retries = 0)
}) 3.0.9 不生效

客户端

服务级别 xml<dubbo:reference interface=“…” timeout=“4000” retries=“2”/>
方法级别 xml<dubbo:reference interface=“…”>
<dubbo:method name="…"timeout=“4000” retries=“2”/>
</dubbo:reference>
服务级别 注解@DubboReference(timeout = 2000, retries = 2)
方法级别 注解@DubboReference(methods = {
@Method(name = “…”, timeout = 2000, retries = 2)}
)

yml

1
2
3
4
# 客户端全局配置
dubbo:
consumer:
retries: 0

集群容错

使用文档 集群容错

扩展文档 集群扩展

目前 3.0.9 版本在服务端配置不生效,消费端正常

集群容错表示服务消费者在调用某个服务时,这个服务有多个服务提供者,在经过负载均衡后选出其中一个服务提供者之后进行调用,但调用报错后,Dubbo所采取的后续处理策略。

集群容错模式

  • Failover Cluster (默认)

  • 取值:failover

    失败自动切换,当出现失败,重试其它服务器。通常用于读操作,但重试会带来更长延迟。可通过 retries="2" 来设置重试次数(不含第一次)


  • Failfast Cluster

  • 取值:failfast

    快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录


  • Failsafe Cluster

  • 取值:failsafe

    失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作


  • Failback Cluster

  • 取值:failback

    失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作


  • Forking Cluster

  • 取值:forking

    并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks="2" 来设置最大并行数


  • Broadcast Cluster

  • 取值:broadcast

    广播调用所有提供者,逐个调用,任意一台报错则报错。通常用于通知所有提供者更新缓存或日志等本地资源信息

    现在广播调用中,可以通过 broadcast.fail.percent 配置节点调用失败的比例,当达到这个比例后,BroadcastClusterInvoker 将不再调用其他节点,直接抛出异常。 broadcast.fail.percent 取值在 0~100 范围内。默认情况下当全部调用失败后,才会抛出异常。 broadcast.fail.percent 只是控制的当失败后是否继续调用其他节点,并不改变结果(任意一台报错则报错)。broadcast.fail.percent 参数 在 dubbo2.7.10 及以上版本生效

    Broadcast Cluster 配置 broadcast.fail.percent

    broadcast.fail.percent=20 代表了当 20% 的节点调用失败就抛出异常,不再调用其他节点

    1
    @DubboReference(cluster = "broadcast", parameters = {"broadcast.fail.percent", "20"})

yml

1
2
3
4
# 客户端全局配置
dubbo:
consumer:
cluster: failfast

客户端的服务级别配置

1
@DubboReference(cluster = "forking")

服务降级

使用文档 服务降级

更复杂的服务降级参考 本地伪装

  • 服务降级表示:服务消费者在调用某个服务提供者时,如果该服务提供者报错了,所采取的备选措施

  • 集群容错和服务降级的区别在于:

    • 集群容错是整个集群范围内的容错
    • 服务降级是单个服务提供者的自身容错

用法

  • mock=force:return+null 表示消费方对该服务的方法调用都直接返回 null 值,不发起远程调用。用来屏蔽不重要服务不可用时对调用方的影响

  • mock=fail:return+null 表示消费方对该服务的方法调用在失败后,再返回 null 值,不抛异常。用来容忍不重要服务不稳定时对调用方的影响


1
@DubboReference(mock = "force:return 123")

不发起远程调用,直接返回 mork 的值


1
@DubboReference(timeout = 1000, mock = "fail: return 123")

尝试远程调用,如果超时则返回 mork 的值

注意

  • 如果服务调用超时会返回 mork 的值,但是服务端默认会被调用三次,因为默认的 集群容错failover
  • 如果服务端抛出异常会将异常传递到消费端,而不是返回 mork 的值

指定IP

使用文档 指定IP

提示

支持版本:2.7.12 之后, 3.0.x 暂不支持

对于Provider集群中注册的多个实例,指定Ip:Port进行调用

当多个Provider注册到注册中心时,可以通过在RpcContext中动态的指定其中一个实例的Ip,Port进行Dubbo调用


接口项目

1
2
3
public interface SpecifiedIpService {
String hello();
}

服务 1

  • ip 192.168.1.106
  • port 20880
1
2
3
4
5
6
7
@DubboService
public class SpecifiedIpServiceImpl implements SpecifiedIpService {
@Override
public String hello() {
return "i am provider";
}
}

服务 2

  • ip 192.168.1.106
  • port 20881
1
2
3
4
5
6
7
@DubboService
public class SpecifiedIpServiceImpl implements SpecifiedIpService {
@Override
public String hello() {
return "i am provider2";
}
}

消费端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
@SpringBootTest
public class SpecifiedIpTest {
@DubboReference(parameters = {"router","address"})
SpecifiedIpService specifiedIpService;

@Test
public void test() {
// 根据provider的ip,port创建Address实例
Address address = new Address("192.168.1.106", 20880);
RpcContext.getContext().setObjectAttachment("address", address);
log.info(specifiedIpService.hello());

address = new Address("192.168.1.106", 20881);
RpcContext.getContext().setObjectAttachment("address", address);
log.info(specifiedIpService.hello());
}
}

2022-07-23 16:46:14.352 INFO 3988 --- [ main] com.wgf.springboot.SpecifiedIpTest : i am provider
2022-07-23 16:46:14.431 INFO 3988 --- [ main] com.wgf.springboot.SpecifiedIpTest : i am provider2

收集广播响应

Dubbo broadcast2 广播模式收集所有服务提供者的接口响应

适用场景:对于一个dubbo消费者,广播调用多个dubbo 提供者,该消费者可以收集所有服务提供者的响应结果

提示

支持版本:2.7.12 后,3.0.x 无效


接口项目

1
2
3
public interface BroadcastService {
String hello();
}

服务端 1

1
2
3
4
5
6
7
@DubboService
public class BroadcastServiceImpl implements BroadcastService {
@Override
public String hello() {
return "hello";
}
}

服务端 2

1
2
3
4
5
6
7
8
@DubboService
public class BroadcastServiceImpl implements BroadcastService {
@Override
public String hello() {
int a = 10 / 0;
return "hello";
}
}

消费端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
@SpringBootTest
public class BroadcastTest {
@DubboReference(cluster = "broadcast2")
BroadcastService broadcastService;

@Test
public void test() {
try{
broadcastService.hello();
}catch (Exception e){
Map<String, String> m = RpcContext.getServerContext().getAttachments();
log.info(m.toString()+"|"+"fail");
}
Map<String, String> m = RpcContext.getServerContext().getAttachments();
log.info(m.toString()+"|"+"success");
}
}

2022-07-23 23:19:38.571 INFO 712 --- [ main] com.wgf.springboot.BroadcastTest : {broadcast.results=[{"ip":"192.168.1.106","port":20880,"exceptionMsg":"/ by zero"},{"ip":"192.168.1.106","port":20881,"exceptionMsg":"/ by zero"}]}|success

结果缓存

Dubbo broadcast2 广播模式收集所有服务提供者的接口响应

结果缓存,用于加速热门数据的访问速度,Dubbo 提供声明式缓存,以减少用户加缓存的工作量

提示

2.1.0 以上版本支持

支持 服务级别 方法级别


缓存类型

  • lru 基于最近最少使用原则删除多余缓存,保持最热的数据被缓存
  • threadlocal 当前线程缓存,比如一个页面渲染,用到很多 portal,每个 portal 都要去查用户信息,通过线程缓存,可以减少这种多余访问
  • jcache 与 JSR107 集成,可以桥接各种缓存实现

接口项目

1
2
3
public interface ResultCacheService {
int increment();
}

服务端

1
2
3
4
5
6
7
8
9
@DubboService
public class ResultCacheServiceImpl implements ResultCacheService {
private AtomicInteger seq = new AtomicInteger(0);

@Override
public int increment() {
return seq.incrementAndGet();
}
}

消费端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Slf4j
@SpringBootTest
public class ResultCacheTest {
@DubboReference(cache = "lru")
ResultCacheService resultCacheService;

@Test
public void test() {
IntStream.range(0, 10).forEach(line -> log.info(resultCacheService.increment() + ""));
}
}

INFO 11740 --- [ main] com.wgf.springboot.ResultCacheTest : 1
INFO 11740 --- [ main] com.wgf.springboot.ResultCacheTest : 2
INFO 11740 --- [ main] com.wgf.springboot.ResultCacheTest : 2
INFO 11740 --- [ main] com.wgf.springboot.ResultCacheTest : 2
INFO 11740 --- [ main] com.wgf.springboot.ResultCacheTest : 2
INFO 11740 --- [ main] com.wgf.springboot.ResultCacheTest : 2
INFO 11740 --- [ main] com.wgf.springboot.ResultCacheTest : 2
INFO 11740 --- [ main] com.wgf.springboot.ResultCacheTest : 2
INFO 11740 --- [ main] com.wgf.springboot.ResultCacheTest : 2
INFO 11740 --- [ main] com.wgf.springboot.ResultCacheTest : 2

事件通知

使用文档 事件通知

在调用之前、调用之后、出现异常时的事件通知

在调用之前、调用之后、出现异常时,会触发 oninvokeonreturnonthrow 三个事件,可以配置当事件发生时,通知哪个类的哪个方法

提示

支持版本:2.0.7 之后


接口项目

1
2
3
public interface EventsNotifyService {
String hello(String name);
}

服务端

1
2
3
4
5
6
7
8
9
10
11
@DubboService
public class EventsNotifyServiceImpl implements EventsNotifyService {
@Override
public String hello(String name) {
if ("test".equals(name)) {
throw new RuntimeException("调用异常");
}

return "hello !!";
}
}

消费端

定义事件通知接口

1
2
3
4
5
public interface EventsNotify {
void onReturn(String result, String name);

void onThrow(Throwable ex, String name);
}

实现事件通知接口并注册到Spring 容器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Slf4j
@Service("eventsNotify")
public class EventsNotifyImpl implements EventsNotify {
@Override
public void onReturn(String result, String name) {
log.info("result : {}", result);
log.info("name : {}", name);
}

@Override
public void onThrow(Throwable ex, String name) {
log.error(ex.getMessage());
log.info("name : {}", name);
}
}

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@SpringBootTest
public class EventsNotifyTest {
@DubboReference(methods = {
@Method(name = "hello", async = true,
onreturn = "eventsNotify.onReturn",
onthrow = "eventsNotify.onThrow"
)
})
EventsNotifyService eventsNotifyService;

@Test
public void test() {
eventsNotifyService.hello("wgf");
eventsNotifyService.hello("test");
}
}

//INFO 16480 --- [andler-thread-1] c.w.springboot.notify.EventsNotifyImpl : result : hello !!
//INFO 16480 --- [andler-thread-1] c.w.springboot.notify.EventsNotifyImpl : name : wgf
//ERROR 16480 --- [andler-thread-1] c.w.springboot.notify.EventsNotifyImpl : 调用异常
//INFO 16480 --- [andler-thread-1] c.w.springboot.notify.EventsNotifyImpl : name : test

本地伪装

使用文档 本地伪装

注意:服务降级 和 本地伪装 只能处理Dubbo框架自身的 RpcException,业务异常熔断需要集成 其他中间件

Mock 是 Stub 的一个子集,便于服务提供方在客户端执行容错逻辑

本地伪装就是Mock,Dubbo中Mock的功能相对于本地存根更简单一点,Mock其实就是Dubbo中的 服务降级,不同的名词罢了


接口项目

1
2
3
public interface LocalMockService {
String hello();
}

mock实现

1
2
3
4
5
6
public class LocalMockServiceMock implements LocalMockService {
@Override
public String hello() {
return "服务降级数据";
}
}

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@DubboService
public class LocalMockServiceImpl implements LocalMockService {

@Override
public String hello() {
try {
TimeUnit.SECONDS.sleep(1100);
} catch (InterruptedException e) {
e.printStackTrace();
}

return "hello";
}
}

消费端

1
2
3
4
5
6
7
8
9
10
11
12
@Slf4j
@SpringBootTest
public class LocalMockTest {
// 本地伪装只能处理Dubbo框架自身调用异常,无法处理业务异常
@DubboReference(mock = "com.wgf.springboot.connector.mock.LocalMockServiceMock", timeout = 1000)
LocalMockService localMockService;

@Test
public void test() {
log.info(localMockService.hello());
}
}

本地存根

使用文档 本地存根

在 Dubbo 中利用本地存根在客户端执行部分逻辑

本地存根,名字很抽象,但实际上不难理解,本地存根就是一段逻辑,这段逻辑是在服务消费端执行的,这段逻辑一般都是由服务提供者提供,服务提供者可以利用这种机制在服务消费者远程调用服务提供者之前或之后再做一些其他事情,比如结果缓存,请求参数验证等等


接口项目

1
2
3
public interface LocalStubService {
String hello();
}

同包下本地存根实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* @author: ken 😃
* @date: 2022-07-19
* @description: 本地存根实现
* 本地存根其实就是使用装饰者模式在本地增强一些功能
**/
public class LocalStubServiceStub implements LocalStubService {
// 真正的远端调用对象
private LocalStubService localStubService;

public LocalStubServiceStub(LocalStubService localStubService) {
this.localStubService = localStubService;
}

@Override
public String hello() {
try {
String hello = this.localStubService.hello();
return "hi " + hello;
} catch (Exception e) {
return "";
}
}
}

服务端

1
2
3
4
5
6
7
@DubboService
public class LocalStubServiceImpl implements LocalStubService {
@Override
public String hello() {
return "wgf";
}
}

消费端

1
2
3
4
5
6
7
8
9
10
11
12
@Slf4j
@SpringBootTest
public class LocalStubTest {
// stub=true的是个默认配置,默认用接口的全限定类名+Stub去调用
@DubboReference(stub = "true")
LocalStubService localStubService;

@Test
public void test() {
log.info(localStubService.hello());
}
}

  1. stub=true的是个默认配置,默认用接口的全限定类名+Stub去调用
  2. 也可以用 stub=权限定类名 进行调用
  3. stub实现必须传入一个远程的调用对象,也就是暴露的服务接口
  4. stub实现必须要有一个传入远程调用对象的构造参数
  5. 说白了就是使用装饰者模式在消费端本地进行功能增强

延迟暴露

使用文档 延迟暴露

延迟暴露 Dubbo 服务

如果你的服务需要预热时间,比如初始化缓存,等待相关资源就位等,可以使用 delay 进行延迟暴露。我们在 Dubbo 2.6.5 版本中对服务延迟暴露逻辑进行了细微的调整,将需要延迟暴露(delay > 0)服务的倒计时动作推迟到了 Spring 初始化完成后进行。你在使用 Dubbo 的过程中,并不会感知到此变化,因此请放心使用

如果依赖服务开启启动时检查则如果服务同时重启依赖服务可能会启动失败


1
2
3
4
5
6
7
@DubboService(delay = 60000)
public class DelayPublishServiceImpl implements DelayPublishService {
@Override
public String hello() {
return "hello";
}
}

延迟连接

使用文档 延迟连接

在 Dubbo 中配置延迟连接

提示

该配置只对使用长连接的 dubbo 协议生效

延迟连接用于减少长连接数。当有调用发起时,再创建长连接

1
2
3
// 默认值就是 false
@DubboReference(lazy = false)
xxxService xxxService;

参数回调

使用文档 参数回调

参数回调方式与调用本地 callback 或 listener 相同,只需要在 Spring 的配置文件中声明哪个参数是 callback 类型即可。Dubbo 将基于长连接生成反向代理,这样就可以从服务器端调用客户端逻辑


接口项目

1
2
3
4
5
6
7
8
/**
* @author: ken.😊
* @create: 2022-07-19 22:28
* @description: 定义回调参数接口
**/
public interface CallbackListener {
String getFirstName();
}
1
2
3
4
5
6
7
8
9
public interface CallbackService {
/**
* 第二个参数是回调参数
* @param lastName
* @param callbackListener
* @return
*/
String hello(String lastName, CallbackListener callbackListener);
}

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* @author: ken.😊
* @create: 2022-07-19 22:33
* @description: 参数回调
*
* 这个回调对象CallbackListener是Dubbo给我们生成的代理对象
* 通过 @Argument 注解指定哪个参数是回调参数
* callbacks = 3 支持3个回调,数目超过了会报错
**/
@DubboService(methods = {
@Method(name = "hello", arguments = {
@Argument(index = 1, callback = true)
})
}, callbacks = 3)
public class CallbackServiceImpl implements CallbackService {
@Override
public String hello(String lastName, CallbackListener callbackListener) {
// 回调客户端逻辑
String firstName = callbackListener.getFirstName();
return String.format("hello %s %s", firstName, lastName);
}
}

消费端

1
2
3
4
5
6
7
8
9
// 回调接口实现
@Slf4j
public class MyCallback implements CallbackListener {
@Override
public String getFirstName() {
log.info("参数回调");
return "wu";
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Slf4j
@SpringBootTest
public class CallbackTest {
@DubboReference
CallbackService callbackService;

@Test
public void test() {
// 创建回调参数
CallbackListener callbackListener = new MyCallback();
log.info(callbackService.hello("gengfeng", callbackListener));
}
}

// 输出
//2022-07-19 22:51:45.791 INFO 1885 --- [andler-thread-2] com.wgf.springboot.callback.MyCallback : 参数回调
//2022-07-19 22:51:45.799 INFO 1885 --- [ main] com.wgf.springboot.CallbackTest : hello wu gengfeng

异步执行

使用文档 异步执行

Dubbo 服务提供方的异步执行

Provider端异步执行将阻塞的业务从Dubbo内部线程池切换到业务自定义线程,避免Dubbo线程池的过度占用,有助于避免不同服务间的互相影响。异步执行无益于节省资源或提升RPC响应性能,因为如果业务执行需要阻塞,则始终还是要有线程来负责执行

注意

Provider 端 异步执行 和 Consumer 端 异步调用 是相互独立的,你可以任意正交组合两端配置

  • Consumer同步 - Provider同步
  • Consumer异步 - Provider同步
  • Consumer同步 - Provider异步
  • Consumer异步 - Provider异步
  • 用法一 参考 异步调用 用的 AsyncCallServiceImpl

  • 用法二 使用AsyncContext

    Dubbo 提供了一个类似 Serverlet 3.0 的异步接口AsyncContext在没有 CompletableFuture 签名接口的情况下,也可以实现 Provider 端的异步执行


接口项目

1
2
3
public interface AsyncExecuteService {
String hello();
}

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Slf4j
@DubboService
public class AsyncExecuteServiceImpl implements AsyncExecuteService {
@Override
public String hello() {
log.info(Thread.currentThread().getName());
final AsyncContext asyncContext = RpcContext.startAsync();

new Thread(() -> {
// 如果要使用上下文,则必须要放在第一句执行
asyncContext.signalContextSwitch();
log.info(Thread.currentThread().getName());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 写回响应
asyncContext.write("Hello");
}).start();
return null;
}
}

//INFO 9340 --- [:20880-thread-5] c.w.s.provider.AsyncExecuteServiceImpl : DubboServerHandler-192.168.1.106:20880-thread-5
//INFO 9340 --- [ Thread-17] c.w.s.provider.AsyncExecuteServiceImpl : Thread-17

客户端

1
2
3
4
5
6
7
8
9
10
11
@Slf4j
@SpringBootTest
public class AsyncExecuteTest {
@DubboReference
AsyncExecuteService asyncExecuteService;

@Test
public void test() {
Assertions.assertEquals(asyncExecuteService.hello(), "Hello");
}
}

异步调用

使用文档 异步调用

其他异步调用方式

理解起来比较容易,主要要理解 CompletableFuture。只是说dubbo也可以支持java的CompletableFuture

sent 属性

  • sent="true" 等待消息发出,消息发送失败将抛出异常。
  • sent="false" 不等待消息发出,将消息放入 IO 队列,即刻返回。
  • 如果你只是想异步,完全忽略返回值,可以配置 return="false",以减少 Future 对象的创建和管理成本
1
2
3
@DubboReference(methods = {
@Method(name = "hello", async = true, sent = false)
})

接口项目

1
2
3
4
public interface AsyncCallService {
// 有多种扩展模式, 默认方法扩展、RpcContext
CompletableFuture<String> hello();
}

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Slf4j
@DubboService
public class AsyncCallServiceImpl implements AsyncCallService {
@Override
public CompletableFuture<String> hello() {
log.info("hello");
// 建议为supplyAsync提供自定义线程池,避免使用JDK公用线程池
return CompletableFuture.supplyAsync(() -> {
// 将同步逻辑包装成异步
return getName();
});
}

private String getName() {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("getName");
return "wgf";
}
}

// 输出
// 2022-07-19 23:47:30.242 INFO 2216 --- [:20880-thread-5] c.w.s.provider.AsyncCallServiceImpl : hello
// 2022-07-19 23:47:30.749 INFO 2216 --- [onPool-worker-1] c.w.s.provider.AsyncCallServiceImpl : getName

消费端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
@SpringBootTest
public class AsyncCallTest {
@DubboReference(methods = {
@Method(name = "hello", async = true)
})
AsyncCallService asyncCallService;

@Test
public void test() throws InterruptedException {
CompletableFuture<String> future = asyncCallService.hello();

// future 可以使用其他 Api 等待结果返回
future.whenComplete((result, exception) -> {
if (exception == null) {
log.info("异步调用返回值:{}", result);
} else {
exception.printStackTrace();
}
});

log.info("测试执行完毕");

// 等待异步返回
TimeUnit.SECONDS.sleep(1);
}
}

// 输出
// 2022-07-19 23:42:47.311 INFO 2194 --- [ main] com.wgf.springboot.AsyncCallTest : 测试执行完毕
// 2022-07-19 23:42:48.320 INFO 2194 --- [andler-thread-1] com.wgf.springboot.AsyncCallTest : 异步调用返回值:wgf

并发控制

使用文档 并发控制

相关博客

Dubbo 中的并发控制,可控制 服务级别 方法级别 服务端 消费端


用法一 服务级别 限制服务器端并发执行线程数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@DubboService(executes = 1)
public class ConcurrencyServiceImpl implements ConcurrencyService {
@Override
public String hello() {
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}
}

// 并发超过会引发Rpc异常

用法二 方法级别 限制服务器端并发执行线程数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@DubboService(methods = {
@Method(name = "hello", executes = 1)
})
public class ConcurrencyServiceImpl implements ConcurrencyService {
@Override
public String hello() {
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}
}

用法三 服务级别 限制每客户端并发执行请求数

说明:2.7.x 版本生效,3.0.x版本不生效

1
2
@DubboReference(actives = 1)
ConcurrencyService concurrencyService;

用法四 方法级别 限制每客户端并发执行请求数

说明:2.7.x 版本生效,3.0.x版本不生效

1
2
3
4
@DubboReference(methods = {
@Method(name = "hello", actives = 1)
})
ConcurrencyService concurrencyService;

连接控制

使用文档 连接控制

Dubbo 中服务端和客户端的连接控制


服务端连接控制

限制服务器端接受的连接不能超过 N 个

1
2
3
4
dubbo:
provider:
protocol: dubbo
accepts: 10

或者

1
2
3
4
dubbo:
protocol:
name: dubbo
accepts: 10

或者

1
2
3
4
@DubboService(connections = 1)
public class ConnectionsServiceImpl implements ConnectionsService {
...
}

或者

1
2
@DubboReference(connections = 1)
ConnectionsService connectionsService;

粘滞连接

使用文档 粘滞连接

为有状态服务配置粘滞连接

粘滞连接用于有状态服务(当有多个服务提供者),尽可能让客户端总是向同一提供者发起调用,除非该提供者挂了,再连另一台。

粘滞连接将自动开启延迟连接,以减少长连接数。

支持消费端 服务级别 方法级别 配置


1
2
@DubboReference(sticky = true)
StickyService stickyService;

泛化调用

使用文档 使用泛化调用

泛化接口调用方式主要用于客户端没有 API 接口及模型类元的情况,参数及返回值中的所有 POJO 均用 Map 表示,通常用于框架集成,比如:实现一个通用的服务测试框架,可通过 GenericService 调用所有服务实现,GenericService 是 Dubbo 提供的


消费端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@SpringBootTest
public class GenericTest {

// 泛化 LoadBalanceService
@DubboReference(interfaceName = "com.wgf.springboot.connector.LoadBalanceService", generic = true)
GenericService genericService;

@Test
public void test() {
// 方法名称
// 参数类型
// 参数
Object hello = genericService.$invoke("hello", new String[]{"java.lang.String"}, new Object[]{"wgf"});
System.out.println(hello);
}
}

json泛化调用

使用文档 json泛化调用

提示

支持版本:2.7.12之后, 3.0.x 暂时不支持

对于Dubbo泛化调用,提供一种新的方式:直接传递字符串来完成一次调用。即用户可以直接传递参数对象的json字符串来完成一次Dubbo泛化调用


接口项目

1
2
3
public interface GenericJsonService {
User add(User user);
}

服务端

1
2
3
4
5
6
7
8
9
@Slf4j
@DubboService
public class GenericJsonServiceImpl implements GenericJsonService {
@Override
public User add(User user) {
log.info(user.toString());
return user;
}
}

消费端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Slf4j
@SpringBootTest
public class GenericJsonTest {
@DubboReference(interfaceName = "com.wgf.springboot.connector.GenericJsonService", generic = true)
GenericService genericService;

@Test
public void test() {
String json = "{'id':10,'name':'wgf'}";

// 3.0.x 版本不支持 2.7.12 之后支持
// RpcContext中设置generic=gson
RpcContext.getContext().setAttachment("generic","gson");
Object result = genericService.$invoke("add", new String[]{"com.wgf.springboot.entity.User"},
new Object[]{json});
log.info(result.toString());
}
}

泛化服务

使用文档 实现泛化调用

泛化接口实现方式主要用于服务器端没有 API 接口及模型类元的情况,参数及返回值中的所有 POJO 均用 Map 表示

实现一个通用的远程服务 Mock 框架,可通过实现 GenericService 接口处理所有服务请求


服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* @author: ken 😃
* @date: 2022-07-20
* @description:
* 泛化 LoadBalanceService 服务 实现一个mock服务,需要通过version和原版本区分
**/
@DubboService(interfaceName = "com.wgf.springboot.connector.LoadBalanceService", version = "mock")
public class GenericServiceImpl implements GenericService {


/**
* @param method 方法名字
* @param parameterTypes 参数类型数组
* @param args 参数值数组
*/
@Override
public Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException {
return String.format("调用 %s 方法", method);
}
}

消费端

1
2
3
4
5
6
7
8
9
10
11
12
13
@Slf4j
@SpringBootTest
public class GenericServiceTest {

// 指定为泛化版本
@DubboReference(version = "mock")
LoadBalanceService loadBalanceService;

@Test
public void test() {
log.info(loadBalanceService.hello("test"));
}
}

  • 服务端会暴露原版和泛化的 LoadBalanceService 服务
  • 如果没有指定 mock 版本,默认执行 LoadBalanceService 实现原逻辑
  • 指定 mock 版本,则会进入泛化服务的 $invoke 方法中

REST支持

使用文档 REST 支持

使用文档 多协议

注意Dubbo的REST也是Dubbo所支持的一种协议

当我们用Dubbo提供了一个服务后,如果消费者没有使用Dubbo也想调用服务,那么这个时候我们就可以让我们的服务支持REST协议,这样消费者就可以通过REST形式调用我们的服务了

注意:如果某个服务只有REST协议可用,那么该服务必须用@Path注解定义访问路径

接口项目

1
2
3
public interface RestService {
Map<Integer, Object> selectUser(Integer id);
}

服务端

添加依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-rpc-rest</artifactId>
<version>3.0.9</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* @author: ken 😃
* @date: 2022-07-20
* @description: REST 应用,服务可以同时支持多个协议
**/
@Path("test")
@DubboService(protocol = {"rest", "dubbo"})
public class RestServiceImpl implements RestService {

@GET
@Path("selectUser")
@Produces({ContentType.APPLICATION_JSON_UTF_8})
@Override
public Map<Integer, Object> selectUser(@QueryParam("id") Integer id) {
Map<Integer, Object> map = new HashMap<>();
map.put(id, "test");
return map;
}
}

多协议配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
dubbo:
application:
name: dubbo-springboot-provider
protocols:
dubbo:
name: dubbo
port: -1
rest:
name: rest
port: 8080
server: jetty
registry:
id: zk-registry
address: zookeeper://my-server:2181
config-center:
address: zookeeper://my-server:2181
metadata-report:
address: zookeeper://my-server:2181

客户端

1
2
3
4
5
6
7
8
9
10
11
12
@Slf4j
@SpringBootTest
public class RestTest {
// 使用dubbo 协议
@DubboReference
RestService restService;

@Test
public void test() {
Assertions.assertEquals(restService.selectUser(1).get(1), "test");
}
}

浏览器访问http://localhost:8080/test/selectUser?id=2

msgpack序列化

使用文档 msgpack序列化

msgpack 官网

msgpack 用法

MessagePack 是一种高效的二进制序列化格式。它允许您在 JSON 等多种语言之间交换数据。但它更快更小。小整数被编码为一个字节,典型的短字符串除了字符串本身之外只需要一个额外的字节

需要注意的是解包顺序必须与打包顺序一致(字段顺序),否则会出错。也就是说协议格式的维护要靠两端手写代码进行保证,而这是很不安全的

提示

支持版本:2.7.12 之后

回声测试

使用文档 回声测试

通过回声测试检测 Dubbo 服务是否可用

回声测试用于检测服务是否可用,回声测试按照正常请求流程执行,能够测试整个调用是否通畅,可用于监控。

所有服务自动实现 EchoService 接口,只需将任意服务引用强制转型为 EchoService,即可使用。


接口项目

1
2
3
public interface TestEchoService {
String hello();
}

服务端

1
2
3
4
5
6
7
@DubboService
public class TestEchoServiceImpl implements TestEchoService {
@Override
public String hello() {
return "HELLO";
}
}

消费端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Slf4j
@SpringBootTest
public class EchoTest {
@DubboReference
TestEchoService testEchoService;

@Test
public void test() {
EchoService echoService = (EchoService) testEchoService;
log.info(echoService.$echo("ok").toString());
}
}

INFO 3448 --- [ main] com.wgf.springboot.EchoTest : ok

上下文信息

通过上下文存放当前调用过程中所需的环境信息

上下文中存放的是当前调用过程中所需的环境信息。所有配置信息都将转换为 URL 的参数

RpcContext 是一个 ThreadLocal 的临时状态记录器,当接收到 RPC 请求,或发起 RPC 请求时,RpcContext 的状态都会变化。比如:A 调 B,B 再调 C,则 B 机器上,在 B 调 C 之前,RpcContext 记录的是 A 调 B 的信息,在 B 调 C 之后,RpcContext 记录的是 B 调 C 的信息。


服务端

1
2
3
4
5
6
7
8
9
@DubboService
public class RpcContextServiceImpl implements RpcContextService {
@Override
public void hello() {
final RpcContext context = RpcContext.getContext();
// 反射输出
RpcContextService.print(context);
}
}

消费端

1
2
3
4
5
6
7
8
9
10
11
12
13
@SpringBootTest
public class RpcContextTest {
@DubboReference
RpcContextService rpcContextService;

@Test
public void test() {
rpcContextService.hello();
// 注意 调用后获取上下文
final RpcContext context = RpcContext.getContext();
RpcContextService.print(context);
}
}

隐式参数

使用文档 隐式参数

通过 Dubbo 中的 Attachment 在服务消费方和提供方之间隐式传递参数

可以通过 RpcContext 上的 setAttachment 和 getAttachment 在服务消费方和提供方之间进行参数的隐式传递

注意

path, group, version, dubbo, token, timeout 几个 key 是保留字段,请使用其它值


接口项目

1
2
3
public interface AttachmentService {
void hello();
}

服务端

1
2
3
4
5
6
7
8
9
10
11
@Slf4j
@DubboService
public class AttachmentServiceImpl implements AttachmentService {
@Override
public void hello() {
// 隐式参数获取 用于框架集成,不建议常规业务使用
log.info(RpcContext.getContext().getAttachment("param"));
}
}

INFO 11172 --- [:20880-thread-5] c.w.s.provider.AttachmentServiceImpl : test

消费端

1
2
3
4
5
6
7
8
9
10
11
12
13
@SpringBootTest
public class AttachmentTest {

@DubboReference
AttachmentService attachmentService;

@Test
public void test() {
// 传递隐式参数
RpcContext.getContext().setAttachment("param", "test");
attachmentService.hello();
}
}

TLS

使用文档 TLS

传输安全协议 TLS

SSL 证书配置博客

通过 TLS 保证传输安全

2.7.5 版本在传输链路的安全性上做了很多工作,对于内置的 Dubbo Netty Server 和新引入的 gRPC 协议都提供了基于 TLS 的安全链路传输机制

TLS 的配置都有统一的入口,如下所示:


Provider 端

1
2
3
4
5
6
7
8
9
10
SslConfig sslConfig = new SslConfig();
sslConfig.setServerKeyCertChainPath("path to cert");
sslConfig.setServerPrivateKeyPath(args[1]);
// 如果开启双向 cert 认证
if (mutualTls) {
sslConfig.setServerTrustCertCollectionPath(args[2]);
}

ProtocolConfig protocolConfig = new ProtocolConfig("dubbo/grpc");
protocolConfig.setSslEnabled(true);

Consumer 端

1
2
3
4
5
6
7
if (!mutualTls) {}
sslConfig.setClientTrustCertCollectionPath(args[0]);
} else {
sslConfig.setClientTrustCertCollectionPath(args[0]);
sslConfig.setClientKeyCertChainPath(args[1]);
sslConfig.setClientPrivateKeyPath(args[2]);
}

令牌验证

使用文档 令牌验证

通过令牌验证在注册中心控制权限

通过令牌验证在注册中心控制权限,以决定要不要下发令牌给消费者,可以防止消费者绕过注册中心访问提供者,另外通过注册中心可灵活改变授权方式,而不需修改或升级提供者


服务端 全局配置

1
2
3
dubbo:
provider:
token: 'true' # 随机token令牌,使用UUID生成

1
2
3
dubbo:
provider:
token: '123456' # 固定token令牌,相当于密码

服务端 服务级别 配置

1
2
3
4
@DubboService(token = "true")
public class xxxServiceImpl implements xxxService{
...
}

1
2
3
4
@DubboService(token = "123456")
public class xxxServiceImpl implements xxxService{
...
}

测试 本地直连会抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
@SpringBootTest
public class LoadBalanceTest {
// 绕过注册中心 本地直连方式
@DubboReference(url = "dubbo://localhost:20880")
LoadBalanceService loadBalanceService;

@Test
public void test() {
IntStream.range(0, 20). forEach(line -> log.info(loadBalanceService.hello(String.valueOf(line))));
}
}

// org.apache.dubbo.rpc.RpcException: Invalid token! Forbid invoke remote service in\terface ...

路由规则

使用文档 路由规则

通过 Dubbo 中的路由规则做服务治理

路由规则在发起一次RPC调用前起到过滤目标服务器地址的作用,过滤后的地址列表,将作为消费端最终发起RPC调用的备选地址

  • 条件路由。支持以服务或 Consumer 应用为粒度配置路由规则。
  • 标签路由。以 Provider 应用为粒度配置路由规则。

条件路由

您可以随时在服务治理控制台 Dubbo-Admin 写入路由规则

admin 控制台具体配置参考使用文档

配置规则

使用文档 规则配置

在 Dubbo 中配置应用级治理规则和服务级治理规则

提示

本文描述的是新版本规则配置,而不是老版本配置规则

覆盖规则是 Dubbo 设计的在无需重启应用的情况下,动态调整 RPC 调用行为的一种能力。2.7.0 版本开始,支持从服务应用两个粒度来调整动态配置

请在服务治理控制台查看或修改覆盖规则 「动态配置」

使用请查看使用文档

优雅停机

使用文档 优雅停机

让 Dubbo 服务完成优雅停机

Dubbo 是通过 JDK 的 ShutdownHook 来完成优雅停机的,所以如果用户使用 kill -9 PID 等强制关闭指令,是不会执行优雅停机的,只有通过 kill PID 时,才会执行


原理

服务提供方

  • 停止时,先标记为不接收新请求,新请求过来时直接报错,让客户端重试其它机器(默认的集群容错 Failover)
  • 然后,检测线程池中的线程是否正在运行,如果有,等待所有线程执行完成,除非超时,则强制关闭

服务消费方

  • 停止时,不再发起新的调用请求,所有新的调用在客户端即报错
  • 然后,检测有没有请求的响应还没有返回,等待响应返回,除非超时,则强制关闭

设置方式

设置优雅停机超时时间,缺省超时时间是 10 秒,如果超时则强制关闭

1
2
3
dubbo:
application:
shutwait: 15000

如果 ShutdownHook 不能生效,可以自行调用

1
DubboShutdownHook.destroyAll();

建议

使用 tomcat 等容器部署的场景,建议通过扩展 ContextListener 等自行调用以下代码实现优雅停机

主机绑定

使用文档 主机绑定

在 Dubbo 中绑定主机名


查找顺序

缺省主机 IP 查找顺序:

  • 通过 LocalHost.getLocalHost() 获取本机地址
  • 如果是 127.* 等 loopback 地址,则扫描各网卡,获取网卡 IP

主机配置

注册的地址如果获取不正确,比如需要注册公网地址,可以:

  1. 可以在 /etc/hosts 中加入:机器名 公网 IP,比如:

    1
    test1 205.182.23.201
  2. dubbo.xml 中加入主机地址的配置:

    1
    <dubbo:protocol host="205.182.23.201">
  3. Yml 文件:

    1
    2
    3
    dubbo:  
    protocol:
    host: '205.182.23.201'
  4. 或在 dubbo.properties 中加入主机地址的配置:

    1
    dubbo.protocol.host=205.182.23.201

端口配置

缺省主机端口与协议相关:

协议端口
dubbo20880
rmi1099
http80
hessian80
webservice80
memcached11211
redis6379

可以按照下面的方式配置端口:

  1. dubbo.xml 中加入主机地址的配置:

    1
    <dubbo:protocol name="dubbo" port="20880">
  2. dubbo.properties 中加入主机地址的配置:

    1
    dubbo.protocol.dubbo.port=20880
  3. yml 配置文件:

    1
    2
    3
    dubbo:  
    protocol:
    port: 20880

主机配置

使用文档 主机配置

自定义 Dubbo 服务对外暴露的主机地址

服务地址默认取值:InetAddress.getLocalHost().getHostAddress() 获取默认 host

主要解决服务提供者暴露服务的IP地址,可以指定 内网地址公网地址 或者是 一个域名

同时也可以解决 docker 映射

具体使用参考使用文档

<<<<<<< Updated upstream

序列化框架

=======

序列化漫谈

Stashed changes

官方文档 序列化漫谈

框架优点缺点
Kryo速度快,序列化后体积小跨语言支持较复杂,字段增、减,序列化和反序列化时无法兼容
Hessian默认支持跨语言较慢,字段增、减,序列化和反序列化时可以兼容
Protostuff速度快,基于protobuf需静态编译,只能在末尾添加新字段
Protostuff-Runtime无需静态编译,但序列化前需预先传入schema不支持无默认构造函数的类,反序列化时需用户自己初始化序列化后的对象,其只负责将该对象进行赋值
Java使用方便,可序列化所有类速度慢,占空间

源码

**源码部分只提供 DEMO 及运行结果,源码的解析请看最权威的官方文档 **

Dubbo SPI

官方文档 Dubbo SPI

SPI 全称为 Service Provider Interface,是一种服务发现机制。SPI 的本质是将接口实现类的全限定名配置在文件中,并由服务加载器读取配置文件,加载实现类。这样可以在运行时,动态为接口替换实现类。正因此特性,我们可以很容易的通过 SPI 机制为我们的程序提供拓展功能。SPI 机制在第三方框架中也有所应用,比如 Dubbo 就是通过 SPI 机制加载所有的组件。不过,Dubbo 并未使用 Java 原生的 SPI 机制,而是对其进行了增强,使其能够更好的满足需求。在 Dubbo 中,SPI 是一个非常重要的模块。基于 SPI,我们可以很容易的对 Dubbo 进行拓展。如果大家想要学习 Dubbo 的源码,SPI 机制务必弄懂。接下来,我们先来了解一下 Java SPI 与 Dubbo SPI 的用法,然后再来分析 Dubbo SPI 的源码

Java SPI

定义 SPI 接口

1
2
3
4
5
6
/**
* SPI 接口
*/
public interface Animal {
void name();
}

接口实现

1
2
3
4
5
6
public class Cat implements Animal{
@Override
public void name() {
System.out.println("cat");
}
}
1
2
3
4
5
6
public class Dog implements Animal{
@Override
public void name() {
System.out.println("dog");
}
}

resources META-INF\services 添加接口全限定名配置文件

com.wgf.springboot.spi.java.Animal

1
2
com.wgf.springboot.spi.java.Cat
com.wgf.springboot.spi.java.Dog

使用 ServiceLoader 加载配置的实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Slf4j
public class SpiTest {

/**
* java spi 机制
*/
@Test
public void spiTest() {
// 加载 META-INF/services/com.wgf.springboot.spi.java.Animal 文件下配置的SPI接口实现类
ServiceLoader<Animal> serviceLoader = ServiceLoader.load(Animal.class);
log.info("Java SPI");
serviceLoader.forEach( line -> line.name());
}
}

// cat
// dog

Dubbo 扩展SPI

Dubbo 并未使用 Java SPI,而是重新实现了一套功能更强的 SPI 机制。Dubbo SPI 的相关逻辑被封装在了 ExtensionLoader 类中,通过 ExtensionLoader,我们可以加载指定的实现类。Dubbo SPI 所需的配置文件需放置在 META-INF/dubbo 路径下

Java SPI 默认会加载配置文件下的所有配置类,在实际场景中很多 SPI 可能都不会被使用,造成资源浪费

与 Java SPI 实现类配置不同,Dubbo SPI 是通过键值对的方式进行配置,这样我们可以按需加载指定的实现类。另外,在测试 Dubbo SPI 时,需要在 Robot 接口上标注 @SPI 注解


定义 SPI 接口

1
2
3
4
@SPI
public interface Robot {
void sayHello();
}

SPI 接口实现类

注意:SPI 接口的实现类必须有无参构造函数,因为 ExtensionLoader 底层是通过 Class.getDeclaredConstructor() 获取构造函数反射创建 SPI 实现类的

1
2
3
4
5
6
public class OptimusPrime implements Robot {
@Override
public void sayHello() {
System.out.println("Hello, I am Optimus Prime");
}
}
1
2
3
4
5
6
public class Bumblebee implements Robot {
@Override
public void sayHello() {
System.out.println("Hello, I am Bumblebee");
}
}

SPI 文件配置 resources\META-INF\dubbo\com.wgf.springboot.spi.dubbo.Robot

1
2
optimusPrime = com.wgf.springboot.spi.dubbo.OptimusPrime
bumblebee = com.wgf.springboot.spi.dubbo.Bumblebee

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class SpiTest {
@Test
public void test() throws InterruptedException {
// 扩展加载器, 类似 java spi 的 ServiceLoader
ExtensionLoader<Robot> extensionLoader =
ExtensionLoader.getExtensionLoader(Robot.class);

// 需要用到哪个 SPI 的实现类用对应的 key 进行加载
// 第一次调用 extensionLoader 方法时,默认会去解析 resources\META-INF\dubbo\com.wgf.springboot.spi.dubbo.Robot
// 将所有 SPI 配置类加载到 cachedNames 中缓存
Robot optimusPrime = extensionLoader.getExtension("optimusPrime");
optimusPrime.sayHello();

Robot bumblebee = extensionLoader.getExtension("bumblebee");
bumblebee.sayHello();
}
}

//Hello, I am Optimus Prime
//Hello, I am Bumblebee

Dubbo SPI实现AOP

AOP 具体实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* dubbo SPI 的 AOP 使用 装饰者模式实现
*/
@Slf4j
public class RobotWrapper implements Robot {
private Robot robot;

public RobotWrapper(Robot robot) {
this.robot = robot;
}

@Override
public void sayHello() {
log.info("包装方法开始");
robot.sayHello();
log.info("包装方法结束");
}
}

SPI 文件配置 resources\META-INF\dubbo\com.wgf.springboot.spi.dubbo.Robot 添加AOP实现类全限定类名

1
2
3
optimusPrime = com.wgf.springboot.spi.dubbo.OptimusPrime
bumblebee = com.wgf.springboot.spi.dubbo.Bumblebee
com.wgf.springboot.spi.dubbo.RobotWrapper

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class SpiTest {
@Test
public void test() throws InterruptedException {
// 扩展加载器, 类似 java spi 的 ServiceLoader
ExtensionLoader<Robot> extensionLoader =
ExtensionLoader.getExtensionLoader(Robot.class);

// 需要用到哪个 SPI 的实现类用对应的 key 进行加载
// 第一次调用 extensionLoader 方法时,默认会去解析 resources\META-INF\dubbo\com.wgf.springboot.spi.dubbo.Robot
// 将所有 SPI 配置类加载到 cachedNames 中缓存
Robot optimusPrime = extensionLoader.getExtension("optimusPrime");
optimusPrime.sayHello();

Robot bumblebee = extensionLoader.getExtension("bumblebee");
bumblebee.sayHello();
}
}
//17:10:18.053 [main] INFO com.wgf.springboot.spi.dubbo.RobotWrapper - 包装方法开始
//Hello, I am Optimus Prime
//17:10:18.053 [main] INFO com.wgf.springboot.spi.dubbo.RobotWrapper - 包装方法结束
//17:10:18.053 [main] INFO com.wgf.springboot.spi.dubbo.RobotWrapper - 包装方法开始
//Hello, I am Bumblebee
//17:10:18.053 [main] INFO com.wgf.springboot.spi.dubbo.RobotWrapper - 包装方法结束

Dubbo SPI实现IOC

官方文档 SPI 自适应拓展

对应于Adaptive机制,Dubbo提供了一个注解@Adaptive,该注解可以用于接口的某个子类上,也可以用于接口方法上。如果用在接口的子类上,则表示Adaptive机制的实现会按照该子类的方式进行自定义实现;如果用在方法上,则表示Dubbo会为该接口自动生成一个子类,并且按照一定的格式重写该方法,而其余没有标注@Adaptive注解的方法将会默认抛出异常


SPI 接口

1
2
3
4
5
6
7
8
9
10
@SPI
public interface LoadBalance {

/**
* 自适应扩展,value用于指定 URL 的属性,通过解析URL 参数(loadbalance) 对应的 value 完成SPI注入
* @param url
*/
@Adaptive("loadbalance")
void balance(URL url);
}

SPI 实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 装饰者模式
* IOC 注入实现类,注入只能是注入SPI 实现类
*/
@Slf4j
public class CommonLoadBalance implements LoadBalance {
private LoadBalance loadBalance;

@Override
public void balance(URL url) {
log.info("IOC 调用前");
loadBalance.balance(url);
log.info("IOC 调用后");
}

/**
* 提供注入 set 方法
* @param loadBalance
*/
public void setLoadBalance(LoadBalance loadBalance) {
this.loadBalance = loadBalance;
}
}
1
2
3
4
5
6
public class RandomLoadBalance implements LoadBalance {
@Override
public void balance(URL url) {
System.out.println("随机负载均衡策略");
}
}
1
2
3
4
5
6
public class WeightsLoadBalance implements LoadBalance{
@Override
public void balance(URL url) {
System.out.println("权重负载均衡策略");
}
}

**SPI 文件配置 resources\META-INF\dubbo\SPI 文件配置 resources\META-INF\dubbo\com.wgf.springboot.spi.dubbo.Robot 添加AOP实现类全限定类名 **

1
2
3
randomLoadBalance=com.wgf.springboot.spi.dubbo.ioc.RandomLoadBalance
weightsLoadBalance=com.wgf.springboot.spi.dubbo.ioc.WeightsLoadBalance
commonLoadBalance=com.wgf.springboot.spi.dubbo.ioc.CommonLoadBalance

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class IocTest {
@Test
public void test() {
ExtensionLoader<LoadBalance> extensionLoader = ExtensionLoader.getExtensionLoader(LoadBalance.class);
LoadBalance loadBalance = extensionLoader.getExtension("commonLoadBalance");
URL url = URL.valueOf("test://localhost/test?loadbalance=randomLoadBalance");
loadBalance.balance(url);

url = URL.valueOf("test://localhost/test?loadbalance=weightsLoadBalance");
loadBalance.balance(url);
}
}
//18:05:21.306 [main] INFO com.wgf.springboot.spi.dubbo.ioc.CommonLoadBalance - IOC 调用前
//随机负载均衡策略
//18:05:21.321 [main] INFO com.wgf.springboot.spi.dubbo.ioc.CommonLoadBalance - IOC 调用后
//18:05:21.321 [main] INFO com.wgf.springboot.spi.dubbo.ioc.CommonLoadBalance - IOC 调用前
//权重负载均衡策略
//18:05:21.321 [main] INFO com.wgf.springboot.spi.dubbo.ioc.CommonLoadBalance - IOC 调用后

dubbo3
https://wugengfeng.cn/2022/07/13/dubbo3/
作者
wugengfeng
发布于
2022年7月13日
许可协议