在上一篇我提到了dubbo的rpc是基于netty实现的,而且也按照它的原理简单的写了些代码,大致的梳理了一个整体的链路.而这一篇,我打算在上一篇的基础上,把整体的代码运用到正式项目中,看看该怎样实现

1
这里改一下,dubbo不仅仅只有netty的实现,还支持rmi协议,http协议等等

整体的项目结构如下

  • 服务层链路就是简单的调用消费端接口,然后消费端再去服务端发送远端请求
  • 然后接口层还是按照dubbo的服务最佳实践提到的,分离维护

先来看下消费者的代码(pom就是一个web和我手写的rpc)

1
2
3
4
5
6
7
8
9
10
@RestController
public class ConsumerController {
@Autowired
ApplicationContext applicationContext;
@GetMapping("/{name}")
public String test(@PathVariable String name){
PayService payService = applicationContext.getBean(PayService.class);
return payService.pay(name);
}
}

再来看下提供者的代码(这个PayService就是interface模块里的一个接口,如下)

1
2
3
4
5
6
7
@MyDubboService(PayService.class)
public class PayserviceImpl implements PayService {
@Override
public String pay(String name) {
return name+": 支付100元";
}
}

interface公共接口模块

1
2
3
4
@MyDubboRefrence
public interface PayService {
String pay(String name);
}

至于这个@MyDubboRefrence和@MyDubboService是什么我接下再进行描述

重点部分-dubbo-framework模块

先看下pom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<dependencies>
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.reflections/reflections -->
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>${reflections.version}</version>
</dependency>
</dependencies>
  • 还如上一篇一样,只不过我加了些反射的东西

    先来看下这个模块有哪些包

annotations包

  • EnableDubboConsumer注解用于注在消费方,且注意到这里我@Import的类,一旦服务标注此注解,会初始化ServiceBeanDefinitionRegistry的逻辑
1
2
3
4
5
6
7
@Documented
@Inherited
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Import(ServiceBeanDefinitionRegistry.class)
public @interface EnableDubboConsumer {
}
  • EnableDubboProvider注解用于标注在提供方,同上,标注该注解的服务会依次初始化Import中的几个
1
2
3
4
5
6
7
8
@Documented
@Inherited
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@EnableAsync
@Import({RedisRegisterCenter.class, Url.class,NettyServer.class})
public @interface EnableDubboProvider {
}
  • 标注在公共接口上,客户端在调用之初会扫描注解下所有类,然后将其动态代理到spring中
1
2
3
4
5
6
7
8
9
10
11
/**
* @author 小五
* 标注在接口上,使客户端在调用之初将该接口动态代理到spring中
*/
@Documented
@Inherited
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)//VM将在运行期也保留注释,因此可以通过反射机制读取注解的信息
@Component
public @interface MyDubboRefrence {
}
  • 标注在实现类上,一样通过扫描,使其被找到并注册到注册中心
1
2
3
4
5
6
7
8
9
10
11
12
/**
* @author 小五
* 标注在实现类上,使其被找到并注册到注册中心
*/
@Documented
@Inherited
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)//VM将在运行期也保留注释,因此可以通过反射机制读取注解的信息
@Component
public @interface MyDubboService {
Class<?> value();
}

consumer包

  • 就是初始化netty-client的,发送远程调用通知使用,这段我还是像上一篇一样会在代理模式中用到
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class NettyClient {

public static NettyClientHandler clientHandler = null;

public static void initClient(String hostName,Integer port){
clientHandler = new NettyClientHandler();
final Bootstrap bootstrap = new Bootstrap();
EventLoopGroup group = new NioEventLoopGroup();
try {
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(clientHandler);

}
});
bootstrap.connect(hostName, port).sync();
} catch (InterruptedException ignored) {
}
}
}
  • netty-client handler,消费端发送请求处理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 因为调用的时候,需要等待调用结果,再将结果返回,这需要一个过程,所以需要用到线程等待 wait notify方法
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {

private ChannelHandlerContext context;

private Invocation invocation;

private String result;


@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//在run方法中会用得到
this.context = ctx;
}

@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
result = msg.toString();
notify();
}

public void setInvocation(Invocation invocation) {
this.invocation = invocation;
}

@Override
public synchronized Object call() throws Exception {
context.writeAndFlush(invocation);
wait(10000);
return result;
}
}

discovery包

  • 服务发现中心,这里还是基于redis实现的一个处理器,当消费端发送请求时会从redis中获取到提供方的地址信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class RedisDiscoveryCenter {

private Jedis jedis;

// random
// 服务发现
public Url get(String interfaceName) throws Exception{
String s = jedis.get(interfaceName);
JSONArray objects = JSONUtil.parseArray(s);
List<Url> urls = objects.toList(Url.class);
return urls.get(ThreadLocalRandom.current().nextInt(urls.size()));
}

public void jedis(String host, Integer port, String password){
Jedis jedis = new Jedis(host, port);
if (StrUtil.isNotEmpty(password)){
jedis.auth(password);
}
this.jedis = jedis;
}
}

provider包

  • netty-server,这里我实现了ApplicationContextAware和CommandLineRunner,重写ApplicationContextAware的setApplicationContext方法,其在spring初始化对象时会调用到这里,对redisRegisterCenter和url进行赋值操作,CommandLineRunner的run方法会在容器启动完毕时执行,然后异步启动netty-server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@Slf4j
@Component
public class NettyServer implements ApplicationContextAware, CommandLineRunner {

private RedisRegisterCenter redisRegisterCenter;

private Url url;

//用于存储业务接口和实现类的实例对象
private Map<String, Class> handlerMap = new HashMap<>();

// spring构造对象时会调用setApplicationContext方法,从而可以在方法中通过自定义注解获得用户的业务接口和实现
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
redisRegisterCenter = applicationContext.getBean(RedisRegisterCenter.class);
url = applicationContext.getBean(Url.class);
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(MyDubboService.class);
if (CollectionUtil.isNotEmpty(serviceBeanMap)) {
for (Object serviceBean : serviceBeanMap.values()) {
//从业务实现类上的自定义注解中获取到value,从来获取到业务接口的全名
String interfaceName = serviceBean.getClass()
.getAnnotation(MyDubboService.class).value().getName();
handlerMap.put(interfaceName, serviceBean.getClass());
}
}
}

/**
* 组件启动时会执行run,启动netty服务
* @throws Exception
*/
@Async
@Override
public void run(String... args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel)
throws Exception {
channel.pipeline()
.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())))
.addLast(new ObjectEncoder())
.addLast(new NettyServerHandler(handlerMap));
}
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);


ChannelFuture future = bootstrap.bind(url.getHost(), url.getPort()).sync();
for (String clazzName : handlerMap.keySet()) {
redisRegisterCenter.register(clazzName,url);
}
log.info("mydubbo提供方启动");
future.channel().closeFuture().sync();
} catch (Exception e){
log.error("mydubbo提供方启动error: {}",e.getMessage());
}finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
  • netty-server-handler 提供方接收到请求的处理器,解析Invocation,利用反射执行方法得到结果并返回给消费端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

private final Map<String, Class> handlerMap;

public NettyServerHandler(Map<String, Class> handlerMap) {
this.handlerMap = handlerMap;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 接收到请求,处理请求
Invocation invocation = (Invocation) msg;
Class aClass = handlerMap.get(invocation.getInterfaceName());
// 利用反射执行方法得到res
Method method = aClass.getMethod(invocation.getMethodName(), invocation.getParamTypes());
Object res = method.invoke(aClass.newInstance(), invocation.getParams());
// 写回netty,让client端监听到
ctx.writeAndFlush(res);
}
}

proxy包

  • BeanDefinitionRegistryPostProcessor,其父类是BeanFactoryPostProcessor,很熟悉,在aac类-refresh方法-invokeBeanFactoryPostProcessors方法中能够看到,简单说,这玩意是动态注入bean的,而这里我就是利用它将PayService动态注入到spring中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* 用于Spring动态注入自定义接口
*/
public class ServiceBeanDefinitionRegistry implements BeanDefinitionRegistryPostProcessor {

@Autowired
ApplicationContext applicationContext;

@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
Properties properties = new Properties();
InputStream is = this.getClass().getResourceAsStream("/application.properties");
try {
if (is == null){
is = this.getClass().getResourceAsStream("/application.yml");
}
properties.load(is);
} catch (IOException ignored) {}
Set<Class<?>> typesAnnotatedWith = new Reflections(properties.getProperty("dubbo.interface.path"), Arrays.asList(
new SubTypesScanner(false)//允许getAllTypes获取所有Object的子类, 不设置为false则 getAllTypes 会报错.默认为true.
,new MethodParameterNamesScanner()//设置方法参数名称 扫描器,否则调用getConstructorParamNames 会报错
,new MethodAnnotationsScanner() //设置方法注解 扫描器, 否则getConstructorsAnnotatedWith,getMethodsAnnotatedWith 会报错
,new MemberUsageScanner() //设置 member 扫描器,否则 getMethodUsage 会报错, 不推荐使用,有可能会报错 Caused by: java.lang.ClassCastException: javassist.bytecode.InterfaceMethodrefInfo cannot be cast to javassist.bytecode.MethodrefInfo
,new TypeAnnotationsScanner()//设置类注解 扫描器 ,否则 getTypesAnnotatedWith 会报错
)).getTypesAnnotatedWith(MyDubboRefrence.class);
for (Class beanClazz : typesAnnotatedWith) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(beanClazz);
GenericBeanDefinition definition = (GenericBeanDefinition) builder.getRawBeanDefinition();
//在这里,我们可以给该对象的属性注入对应的实例。
//比如mybatis,就在这里注入了dataSource和sqlSessionFactory,
// 注意,如果采用definition.getPropertyValues()方式的话,
// 类似definition.getPropertyValues().add("interfaceType", beanClazz);
// 则要求在FactoryBean(本应用中即ServiceFactory)提供setter方法,否则会注入失败
// 如果采用definition.getConstructorArgumentValues(),
// 则FactoryBean中需要提供包含该属性的构造方法,否则会注入失败

String host = properties.getProperty("dubbo.redis.host");
Integer port = Integer.valueOf(properties.getProperty("dubbo.redis.port"));
String password = properties.getProperty("dubbo.redis.password");
RedisDiscoveryCenter redisDiscoveryCenter = new RedisDiscoveryCenter();
redisDiscoveryCenter.jedis(host,port,password);
AsyncTaskExecutor executor = ExecutorServicePool.executor();
definition.getPropertyValues().addPropertyValue("redisDiscoveryCenter", redisDiscoveryCenter);
definition.getPropertyValues().addPropertyValue("executor", executor);
definition.getConstructorArgumentValues().addGenericArgumentValue(beanClazz);

//注意,这里的BeanClass是生成Bean实例的工厂,不是Bean本身。
// FactoryBean是一种特殊的Bean,其返回的对象不是指定类的一个实例,
// 其返回的是该工厂Bean的getObject方法所返回的对象。
definition.setBeanClass(ProxyFactory.class);

//这里采用的是byType方式注入,类似的还有byName等
definition.setAutowireMode(GenericBeanDefinition.AUTOWIRE_BY_TYPE);
registry.registerBeanDefinition(beanClazz.getSimpleName(), definition);
}
}

@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

}
}
  • ProxyFactory-FactoryBean:自定义bean的创建过程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* @author 小五
*/
@RequiredArgsConstructor
public class ProxyFactory<T> implements FactoryBean<T> {

@Setter
private AsyncTaskExecutor executor;

@Setter
private RedisDiscoveryCenter redisDiscoveryCenter;

private Class<T> interfaceClass;

@Override
public T getObject() throws Exception {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Invocation invocation = new Invocation(interfaceClass.getName(), method.getName(), method.getParameterTypes(), args);
if(clientHandler == null){
Url url = redisDiscoveryCenter.get(interfaceClass.getName());
initClient(url.getHost(),url.getPort());
}
clientHandler.setInvocation(invocation);
return executor.submit(clientHandler).get();
}
});
}

public ProxyFactory(Class<T> interfaceClass) {
this.interfaceClass = interfaceClass;
}

@Override
public Class<?> getObjectType() {
return interfaceClass;
}

@Override
public boolean isSingleton() {
return true;
}
}
  • 还有一个注册中心
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

/**
* @author 小五
*/
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
@Configuration
@EnableConfigurationProperties
@ConfigurationProperties(prefix = "dubbo.redis")
public class RedisRegisterCenter {

private String host;

private int port;

private String password;

@Bean
public Jedis jedis(){
Jedis jedis = new Jedis(host, port);
if (StrUtil.isNotEmpty(password)){
jedis.auth(password);
}
return jedis;
}

// 服务注册
public void register(String interfaceName, Url url){
try {
Jedis jedis = jedis();
String s = jedis.get(interfaceName);
List<Url> urls = null;
if (s == null){
urls = new ArrayList<>();
}else {
JSONArray objects = JSONUtil.parseArray(s);
urls = objects.toList(Url.class);
}
urls.add(url);
jedis.set(interfaceName,JSONUtil.toJsonStr(urls));
} catch (Exception e) {
e.printStackTrace();
}
}
}

整个核心包的开发并没过多的代码,整个流程是这样的

提供者在开启@EnableDubboProvider后,获取所有标注@MyDubboService的实现类,放到handlerMap中,key为其value值(所实现接口全路径),value为实现类的class,,供netty-server-handler使用.然后调用注册中心,将提供者项目的ip和port注册到redis中,然后启动netty-server等待请求过来,当请求发送给来,走handler的read方法,解析,反射,返回response

消费者在开启@EnableDubboConsumer后,走ServiceBeanDefinitionRegistry的逻辑,扫描dubbo.interface.path下所有标注了@MyDubboRefrence的接口,将其动态注入到spring中,以便项目可以对其进行依赖注入,(autowired,这里用的是byType),然后定义了具体创建该bean过程的ProxyFactory

当客户端发送请求后会走到ProxyFactory的getObject方法,然后组装Invocation,从服务发现中心获取到该服务的地址和port,发送到netty-server端,然后阻塞等待结果通知,

代码仓库地址(https://gitee.com/xiaowu_wang/mydubbo.git)