在上一篇我提到了dubbo的rpc是基于netty实现的,而且也按照它的原理简单的写了些代码,大致的梳理了一个整体的链路.而这一篇,我打算在上一篇的基础上,把整体的代码运用到正式项目中,看看该怎样实现
1
| 这里改一下,dubbo不仅仅只有netty的实现,还支持rmi协议,http协议等等
|
整体的项目结构如下

- 服务层链路就是简单的调用消费端接口,然后消费端再去服务端发送远端请求
- 然后接口层还是按照dubbo的服务最佳实践提到的,分离维护
先来看下消费者的代码(pom就是一个web和我手写的rpc)
1 2 3 4 5 6 7 8 9 10
| @RestController public class ConsumerController { @Autowired ApplicationContext applicationContext; @GetMapping("/{name}") public String test(@PathVariable String name){ PayService payService = applicationContext.getBean(PayService.class); return payService.pay(name); } }
|
再来看下提供者的代码(这个PayService就是interface模块里的一个接口,如下)
1 2 3 4 5 6 7
| @MyDubboService(PayService.class) public class PayserviceImpl implements PayService { @Override public String pay(String name) { return name+": 支付100元"; } }
|
interface公共接口模块
1 2 3 4
| @MyDubboRefrence public interface PayService { String pay(String name); }
|
至于这个@MyDubboRefrence和@MyDubboService是什么我接下再进行描述
重点部分-dubbo-framework模块
先看下pom
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| <dependencies> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>${jedis.version}</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>${hutool.version}</version> </dependency> <dependency> <groupId>org.reflections</groupId> <artifactId>reflections</artifactId> <version>${reflections.version}</version> </dependency> </dependencies>
|
- 还如上一篇一样,只不过我加了些反射的东西
先来看下这个模块有哪些包

annotations包

- EnableDubboConsumer注解用于注在消费方,且注意到这里我@Import的类,一旦服务标注此注解,会初始化ServiceBeanDefinitionRegistry的逻辑
1 2 3 4 5 6 7
| @Documented @Inherited @Target({ ElementType.TYPE }) @Retention(RetentionPolicy.RUNTIME) @Import(ServiceBeanDefinitionRegistry.class) public @interface EnableDubboConsumer { }
|
- EnableDubboProvider注解用于标注在提供方,同上,标注该注解的服务会依次初始化Import中的几个
1 2 3 4 5 6 7 8
| @Documented @Inherited @Target({ ElementType.TYPE }) @Retention(RetentionPolicy.RUNTIME) @EnableAsync @Import({RedisRegisterCenter.class, Url.class,NettyServer.class}) public @interface EnableDubboProvider { }
|
- 标注在公共接口上,客户端在调用之初会扫描注解下所有类,然后将其动态代理到spring中
1 2 3 4 5 6 7 8 9 10 11
|
@Documented @Inherited @Target({ ElementType.TYPE }) @Retention(RetentionPolicy.RUNTIME) @Component public @interface MyDubboRefrence { }
|
- 标注在实现类上,一样通过扫描,使其被找到并注册到注册中心
1 2 3 4 5 6 7 8 9 10 11 12
|
@Documented @Inherited @Target({ ElementType.TYPE }) @Retention(RetentionPolicy.RUNTIME) @Component public @interface MyDubboService { Class<?> value(); }
|
consumer包

- 就是初始化netty-client的,发送远程调用通知使用,这段我还是像上一篇一样会在代理模式中用到
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class NettyClient {
public static NettyClientHandler clientHandler = null;
public static void initClient(String hostName,Integer port){ clientHandler = new NettyClientHandler(); final Bootstrap bootstrap = new Bootstrap(); EventLoopGroup group = new NioEventLoopGroup(); try { bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() {
@Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))); pipeline.addLast(new ObjectEncoder()); pipeline.addLast(clientHandler);
} }); bootstrap.connect(hostName, port).sync(); } catch (InterruptedException ignored) { } } }
|
- netty-client handler,消费端发送请求处理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
|
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
private ChannelHandlerContext context;
private Invocation invocation;
private String result;
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { this.context = ctx; }
@Override public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { result = msg.toString(); notify(); }
public void setInvocation(Invocation invocation) { this.invocation = invocation; }
@Override public synchronized Object call() throws Exception { context.writeAndFlush(invocation); wait(10000); return result; } }
|
discovery包
- 服务发现中心,这里还是基于redis实现的一个处理器,当消费端发送请求时会从redis中获取到提供方的地址信息

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class RedisDiscoveryCenter {
private Jedis jedis;
public Url get(String interfaceName) throws Exception{ String s = jedis.get(interfaceName); JSONArray objects = JSONUtil.parseArray(s); List<Url> urls = objects.toList(Url.class); return urls.get(ThreadLocalRandom.current().nextInt(urls.size())); }
public void jedis(String host, Integer port, String password){ Jedis jedis = new Jedis(host, port); if (StrUtil.isNotEmpty(password)){ jedis.auth(password); } this.jedis = jedis; } }
|
provider包

- netty-server,这里我实现了ApplicationContextAware和CommandLineRunner,重写ApplicationContextAware的setApplicationContext方法,其在spring初始化对象时会调用到这里,对redisRegisterCenter和url进行赋值操作,CommandLineRunner的run方法会在容器启动完毕时执行,然后异步启动netty-server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| @Slf4j @Component public class NettyServer implements ApplicationContextAware, CommandLineRunner {
private RedisRegisterCenter redisRegisterCenter;
private Url url;
private Map<String, Class> handlerMap = new HashMap<>();
@Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { redisRegisterCenter = applicationContext.getBean(RedisRegisterCenter.class); url = applicationContext.getBean(Url.class); Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(MyDubboService.class); if (CollectionUtil.isNotEmpty(serviceBeanMap)) { for (Object serviceBean : serviceBeanMap.values()) { String interfaceName = serviceBean.getClass() .getAnnotation(MyDubboService.class).value().getName(); handlerMap.put(interfaceName, serviceBean.getClass()); } } }
@Async @Override public void run(String... args) { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { channel.pipeline() .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))) .addLast(new ObjectEncoder()) .addLast(new NettyServerHandler(handlerMap)); } }).option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.bind(url.getHost(), url.getPort()).sync(); for (String clazzName : handlerMap.keySet()) { redisRegisterCenter.register(clazzName,url); } log.info("mydubbo提供方启动"); future.channel().closeFuture().sync(); } catch (Exception e){ log.error("mydubbo提供方启动error: {}",e.getMessage()); }finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
|
- netty-server-handler 提供方接收到请求的处理器,解析Invocation,利用反射执行方法得到结果并返回给消费端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private final Map<String, Class> handlerMap;
public NettyServerHandler(Map<String, Class> handlerMap) { this.handlerMap = handlerMap; }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Invocation invocation = (Invocation) msg; Class aClass = handlerMap.get(invocation.getInterfaceName()); Method method = aClass.getMethod(invocation.getMethodName(), invocation.getParamTypes()); Object res = method.invoke(aClass.newInstance(), invocation.getParams()); ctx.writeAndFlush(res); } }
|
proxy包
- BeanDefinitionRegistryPostProcessor,其父类是BeanFactoryPostProcessor,很熟悉,在aac类-refresh方法-invokeBeanFactoryPostProcessors方法中能够看到,简单说,这玩意是动态注入bean的,而这里我就是利用它将PayService动态注入到spring中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
|
public class ServiceBeanDefinitionRegistry implements BeanDefinitionRegistryPostProcessor {
@Autowired ApplicationContext applicationContext;
@Override public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException { Properties properties = new Properties(); InputStream is = this.getClass().getResourceAsStream("/application.properties"); try { if (is == null){ is = this.getClass().getResourceAsStream("/application.yml"); } properties.load(is); } catch (IOException ignored) {} Set<Class<?>> typesAnnotatedWith = new Reflections(properties.getProperty("dubbo.interface.path"), Arrays.asList( new SubTypesScanner(false) ,new MethodParameterNamesScanner() ,new MethodAnnotationsScanner() ,new MemberUsageScanner() ,new TypeAnnotationsScanner() )).getTypesAnnotatedWith(MyDubboRefrence.class); for (Class beanClazz : typesAnnotatedWith) { BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(beanClazz); GenericBeanDefinition definition = (GenericBeanDefinition) builder.getRawBeanDefinition();
String host = properties.getProperty("dubbo.redis.host"); Integer port = Integer.valueOf(properties.getProperty("dubbo.redis.port")); String password = properties.getProperty("dubbo.redis.password"); RedisDiscoveryCenter redisDiscoveryCenter = new RedisDiscoveryCenter(); redisDiscoveryCenter.jedis(host,port,password); AsyncTaskExecutor executor = ExecutorServicePool.executor(); definition.getPropertyValues().addPropertyValue("redisDiscoveryCenter", redisDiscoveryCenter); definition.getPropertyValues().addPropertyValue("executor", executor); definition.getConstructorArgumentValues().addGenericArgumentValue(beanClazz);
definition.setBeanClass(ProxyFactory.class);
definition.setAutowireMode(GenericBeanDefinition.AUTOWIRE_BY_TYPE); registry.registerBeanDefinition(beanClazz.getSimpleName(), definition); } }
@Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
} }
|
- ProxyFactory-FactoryBean:自定义bean的创建过程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
|
@RequiredArgsConstructor public class ProxyFactory<T> implements FactoryBean<T> {
@Setter private AsyncTaskExecutor executor;
@Setter private RedisDiscoveryCenter redisDiscoveryCenter;
private Class<T> interfaceClass;
@Override public T getObject() throws Exception { return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Invocation invocation = new Invocation(interfaceClass.getName(), method.getName(), method.getParameterTypes(), args); if(clientHandler == null){ Url url = redisDiscoveryCenter.get(interfaceClass.getName()); initClient(url.getHost(),url.getPort()); } clientHandler.setInvocation(invocation); return executor.submit(clientHandler).get(); } }); }
public ProxyFactory(Class<T> interfaceClass) { this.interfaceClass = interfaceClass; }
@Override public Class<?> getObjectType() { return interfaceClass; }
@Override public boolean isSingleton() { return true; } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
|
@NoArgsConstructor @AllArgsConstructor @Getter @Setter @Configuration @EnableConfigurationProperties @ConfigurationProperties(prefix = "dubbo.redis") public class RedisRegisterCenter {
private String host;
private int port;
private String password;
@Bean public Jedis jedis(){ Jedis jedis = new Jedis(host, port); if (StrUtil.isNotEmpty(password)){ jedis.auth(password); } return jedis; }
public void register(String interfaceName, Url url){ try { Jedis jedis = jedis(); String s = jedis.get(interfaceName); List<Url> urls = null; if (s == null){ urls = new ArrayList<>(); }else { JSONArray objects = JSONUtil.parseArray(s); urls = objects.toList(Url.class); } urls.add(url); jedis.set(interfaceName,JSONUtil.toJsonStr(urls)); } catch (Exception e) { e.printStackTrace(); } } }
|
整个核心包的开发并没过多的代码,整个流程是这样的
提供者在开启@EnableDubboProvider后,获取所有标注@MyDubboService的实现类,放到handlerMap中,key为其value值(所实现接口全路径),value为实现类的class,,供netty-server-handler使用.然后调用注册中心,将提供者项目的ip和port注册到redis中,然后启动netty-server等待请求过来,当请求发送给来,走handler的read方法,解析,反射,返回response
消费者在开启@EnableDubboConsumer后,走ServiceBeanDefinitionRegistry的逻辑,扫描dubbo.interface.path下所有标注了@MyDubboRefrence的接口,将其动态注入到spring中,以便项目可以对其进行依赖注入,(autowired,这里用的是byType),然后定义了具体创建该bean过程的ProxyFactory
当客户端发送请求后会走到ProxyFactory的getObject方法,然后组装Invocation,从服务发现中心获取到该服务的地址和port,发送到netty-server端,然后阻塞等待结果通知,
代码仓库地址(https://gitee.com/xiaowu_wang/mydubbo.git)