Let's first look at the usual way of integrating Dubbo with Spring Boot.
Below is the call chain across three services.

Project structure

The service provider (for example pay-service) exposes the implementation of the service interface (@Service).

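The service consumer (for example user-service) declares which service interface it depends on (@Reference). If an interface has several implementations, the version attribute may also be needed to pick one.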

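That completes a basic call from an upstream service to a downstream one.
Now let's explore the underlying implementation.
Dubbo's default transport is built on Netty, so compared with alternatives such as Feign over HTTP, its RPC calls are better suited to high-concurrency projects that exchange small payloads over long-lived connections. Next, we will use Netty code to reproduce a basic Dubbo-style application.

Correction: Dubbo is not limited to the Netty-based implementation. It also supports the RMI protocol, the HTTP protocol, and others.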
For reference, Dubbo's service registration and discovery flow diagram:

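Let's sort out the idea and see which pieces we need:
- A shared container that stores service addresses (IP, port, and so on). When a service has several instances, it also needs load balancing, plus a way to notify consumers after service information changes.
- To identify a service method uniquely, each request must carry the interface name (as in Dubbo, multiple implementations may also need a version), the method name, the list of parameter types, and the list of argument values.
- Netty, which handles the communication between the provider and the caller (the RPC call itself).
With these we can build a simple Dubbo. Let's get started.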
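The first bullet asks for a shared container with change notification, which the Redis-based version later in this article does not implement. As a rough illustration only, the sketch below keeps everything in memory. `InMemoryRegistry`, its `key` format (`interface:version`), and the `host:port` address strings are all hypothetical names chosen for this example and are not Dubbo APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class RegistryDemo {
    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        String key = InMemoryRegistry.key("HelloService", "1.0");
        // A consumer subscribes so it hears about every change to the address list.
        registry.subscribe(key, urls -> System.out.println("providers changed: " + urls));
        registry.register(key, "127.0.0.1:8080");
        registry.register(key, "127.0.0.1:8081");
        registry.unregister(key, "127.0.0.1:8080");
    }
}

// Hypothetical in-memory stand-in for the shared registry (assumption, not a Dubbo class).
class InMemoryRegistry {
    // service key -> provider addresses in "host:port" form
    private final Map<String, List<String>> providers = new ConcurrentHashMap<>();
    // service key -> callbacks invoked whenever that key's address list changes
    private final Map<String, List<Consumer<List<String>>>> listeners = new ConcurrentHashMap<>();

    // The version part distinguishes several implementations of one interface.
    static String key(String interfaceName, String version) {
        return interfaceName + ":" + version;
    }

    void register(String key, String address) {
        providers.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(address);
        notifyListeners(key);
    }

    void unregister(String key, String address) {
        List<String> addresses = providers.get(key);
        if (addresses != null && addresses.remove(address)) {
            notifyListeners(key);
        }
    }

    void subscribe(String key, Consumer<List<String>> listener) {
        listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Returns an unmodifiable snapshot of the current providers for the key.
    List<String> lookup(String key) {
        List<String> current = providers.getOrDefault(key, Collections.emptyList());
        return Collections.unmodifiableList(new ArrayList<>(current));
    }

    private void notifyListeners(String key) {
        List<String> snapshot = lookup(key);
        for (Consumer<List<String>> listener : listeners.getOrDefault(key, Collections.emptyList())) {
            listener.accept(snapshot);
        }
    }
}
```

A real registry would have to push these notifications across processes; the in-memory callbacks here only show the shape of the contract.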
    <dependencies>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.5.2</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.5.8</version>
        </dependency>
    </dependencies>
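    import java.io.Serializable;
    import lombok.AllArgsConstructor;
    import lombok.Data;

    // Describes one remote call; sent from the consumer to the provider.
    @AllArgsConstructor
    @Data
    public class Invocation implements Serializable {
        String interfaceName;
        String methodName;
        Class<?>[] paramTypes;
        Object[] params;
    }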
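    import java.io.Serializable;
    import lombok.AllArgsConstructor;
    import lombok.Data;

    // Address of one provider instance.
    @Data
    @AllArgsConstructor
    public class Url implements Serializable {
        String hostName;
        Integer port;
    }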
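- The container for registering services. Service interfaces are put into it when the Netty server starts. (This is not the shared container that stores IPs and ports; it is the local container the Netty server uses to map service interfaces to implementations.)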
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class LocalRegister {

        // interface name -> implementation class
        private static final Map<String, Class<?>> map = new ConcurrentHashMap<>();

        public static void register(String interfaceName, Class<?> implClass) {
            map.put(interfaceName, implClass);
        }

        public static Class<?> get(String interfaceName) {
            return map.get(interfaceName);
        }
    }
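- The container that stores each service's IP and port. If there are several instances (a clustered deployment), it also needs to support load balancing.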
    import java.util.ArrayList;
    import java.util.List;
    import cn.hutool.json.JSONArray;
    import cn.hutool.json.JSONUtil;
    import lombok.experimental.UtilityClass;
    import redis.clients.jedis.Jedis;

    @UtilityClass
    public class RedisRegister {

        static Jedis jedis;

        static {
            jedis = new Jedis("127.0.0.1", 6379);
        }

        // Appends the url to the JSON list stored under the interface name.
        // Note: this read-modify-write is not atomic across several providers.
        public void regist(String interfaceName, Url url) {
            try {
                String s = jedis.get(interfaceName);
                List<Url> urls;
                if (s == null) {
                    urls = new ArrayList<>();
                } else {
                    JSONArray objects = JSONUtil.parseArray(s);
                    urls = objects.toList(Url.class);
                }
                urls.add(url);
                jedis.set(interfaceName, JSONUtil.toJsonStr(urls));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        // Picks one registered provider for the interface.
        public Url get(String interfaceName) throws Exception {
            String s = jedis.get(interfaceName);
            if (s == null) {
                throw new IllegalStateException("no provider registered for " + interfaceName);
            }
            JSONArray objects = JSONUtil.parseArray(s);
            List<Url> urls = objects.toList(Url.class);
            return LoadBalance.random(urls);
        }
    }
- The load-balancing mechanism (Dubbo supports four strategies; here we simply use random, which is Dubbo's default).
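    import java.util.List;
    import java.util.Random;
    import lombok.experimental.UtilityClass;

    @UtilityClass
    public class LoadBalance {

        // Returns a uniformly random element of urls (urls must not be empty).
        public Url random(List<Url> urls) {
            Random random = new Random();
            return urls.get(random.nextInt(urls.size()));
        }
    }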
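For comparison with the random strategy above, here is a minimal sketch of a plain round-robin selector. It is deliberately simplified: it ignores weights, which Dubbo's own round-robin takes into account, and `RoundRobinBalance` is a hypothetical class name used only for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinDemo {
    public static void main(String[] args) {
        RoundRobinBalance<String> balance = new RoundRobinBalance<>();
        List<String> urls = Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080", "10.0.0.3:8080");
        // The fourth call wraps around to the first address again.
        for (int i = 0; i < 4; i++) {
            System.out.println(balance.select(urls));
        }
    }
}

// Hypothetical unweighted round-robin selector (assumption, not Dubbo's implementation).
class RoundRobinBalance<T> {
    private final AtomicInteger counter = new AtomicInteger();

    T select(List<T> candidates) {
        if (candidates == null || candidates.isEmpty()) {
            throw new IllegalArgumentException("no providers available");
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), candidates.size());
        return candidates.get(index);
    }
}
```

Because the counter is an `AtomicInteger`, one selector instance can be shared by several calling threads.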
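    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.serialization.ClassResolvers;
    import io.netty.handler.codec.serialization.ObjectDecoder;
    import io.netty.handler.codec.serialization.ObjectEncoder;

    public class NettyServer {
        public static void start(String hostName, Integer port) {
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                final ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                // Java-serialization codec for Invocation objects and results
                                pipeline.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                                pipeline.addLast(new ObjectEncoder());
                                pipeline.addLast(new NettyServerHandler());
                            }
                        });
                ChannelFuture channelFuture = bootstrap.bind(hostName, port).sync();
                System.out.println("Service provider is now serving");
                // Block until the server channel is closed.
                channelFuture.channel().closeFuture().sync();
            } catch (Exception e) {
                // Report bind or startup failures instead of swallowing them silently.
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }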
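    import java.lang.reflect.Method;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;

    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Invocation invocation = (Invocation) msg;
            // Look up the implementation class registered for the requested interface.
            Class<?> aClass = LocalRegister.get(invocation.getInterfaceName());
            Method method = aClass.getMethod(invocation.getMethodName(), invocation.getParamTypes());
            // A new instance is created per request; Class.newInstance() is deprecated.
            Object res = method.invoke(aClass.getDeclaredConstructor().newInstance(), invocation.getParams());
            ctx.writeAndFlush("Netty: " + res);
        }
    }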
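The handler above resolves the target method from the interface name, method name, and parameter types. The parameter types matter as soon as a method is overloaded, which the following network-free sketch tries to show. `Dispatcher`, `Greeter`, and `GreeterImpl` are hypothetical names made up for this example, and unlike the handler above the sketch registers one shared instance per interface instead of creating a new object for every request.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class DispatchDemo {
    public static void main(String[] args) {
        Dispatcher dispatcher = new Dispatcher();
        dispatcher.register(Greeter.class.getName(), new GreeterImpl());
        // Same method name, different parameter types: two different targets.
        System.out.println(dispatcher.dispatch(Greeter.class.getName(), "greet",
                new Class<?>[]{String.class}, new Object[]{"dubbo"}));
        System.out.println(dispatcher.dispatch(Greeter.class.getName(), "greet",
                new Class<?>[]{String.class, int.class}, new Object[]{"hi", 2}));
    }
}

// Hypothetical service with an overloaded method.
interface Greeter {
    String greet(String name);
    String greet(String word, int times);
}

class GreeterImpl implements Greeter {
    @Override
    public String greet(String name) {
        return "hello " + name;
    }

    @Override
    public String greet(String word, int times) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < times; i++) {
            sb.append(word);
        }
        return sb.toString();
    }
}

// Hypothetical provider-side dispatcher: interface name -> shared service instance.
class Dispatcher {
    private final Map<String, Object> services = new ConcurrentHashMap<>();

    void register(String interfaceName, Object impl) {
        services.put(interfaceName, impl);
    }

    Object dispatch(String interfaceName, String methodName, Class<?>[] paramTypes, Object[] args) {
        Object target = services.get(interfaceName);
        if (target == null) {
            throw new IllegalStateException("no service registered for " + interfaceName);
        }
        try {
            // The parameter types select the right overload.
            Method method = target.getClass().getMethod(methodName, paramTypes);
            return method.invoke(target, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot invoke " + methodName, e);
        }
    }
}
```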
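- Define the interface and its implementation class (following the service best practices in the official Dubbo documentation, the interface should be kept separate from the business code).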
    public interface HelloService {
        String sayHello();
    }

    public class HelloServiceImpl implements HelloService {
        @Override
        public String sayHello() {
            return "Hello";
        }
    }
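    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.serialization.ClassResolvers;
    import io.netty.handler.codec.serialization.ObjectDecoder;
    import io.netty.handler.codec.serialization.ObjectEncoder;

    public class NettyClient {

        // Shared by ProxyFactory; stays null until initClient has run.
        public static NettyClientHandler clientHandler = null;

        public static void initClient(String hostName, Integer port) {
            clientHandler = new NettyClientHandler();
            final Bootstrap bootstrap = new Bootstrap();
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                bootstrap.group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                                pipeline.addLast(new ObjectEncoder());
                                pipeline.addLast(clientHandler);
                            }
                        });
                // Wait until the connection is established.
                bootstrap.connect(hostName, port).sync();
            } catch (InterruptedException e) {
                // Restore the interrupt flag rather than dropping it.
                Thread.currentThread().interrupt();
            }
        }
    }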
    import java.util.concurrent.Callable;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;

    public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable<Object> {

        private ChannelHandlerContext context;
        private Invocation invocation;
        private String result;

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // Keep the context so call() can write to the channel later.
            this.context = ctx;
        }

        @Override
        public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            result = msg.toString();
            // Wake up the thread blocked in call().
            notify();
        }

        public void setInvocation(Invocation invocation) {
            this.invocation = invocation;
        }

        @Override
        public synchronized Object call() throws Exception {
            result = null;
            context.writeAndFlush(invocation);
            // Loop to guard against spurious wakeups.
            while (result == null) {
                wait();
            }
            return result;
        }
    }
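The handler above pairs one request with one response through wait/notify, so it can only have a single call in flight. One common alternative, sketched here without any Netty code, is to tag each request with an id and complete a matching `CompletableFuture` when the response arrives. This assumes the request and response messages would carry such an id, which the `Invocation` class in this article does not; `PendingRequests` is a hypothetical helper for illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class PendingDemo {
    public static void main(String[] args) throws Exception {
        PendingRequests pending = new PendingRequests();
        long id = pending.nextId();
        CompletableFuture<Object> future = pending.register(id);
        // Simulates the I/O thread delivering the response some time later.
        new Thread(() -> pending.complete(id, "Netty: Hello")).start();
        // The caller blocks with a timeout instead of waiting forever.
        System.out.println(future.get(1, TimeUnit.SECONDS));
    }
}

// Hypothetical request/response correlation table (assumption, not part of the article's code).
class PendingRequests {
    private final AtomicLong ids = new AtomicLong();
    private final Map<Long, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    long nextId() {
        return ids.incrementAndGet();
    }

    // Called by the caller thread before the request is written to the channel.
    CompletableFuture<Object> register(long requestId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // Called from the read handler; returns false if no caller is waiting for this id.
    boolean complete(long requestId, Object result) {
        CompletableFuture<Object> future = pending.remove(requestId);
        return future != null && future.complete(result);
    }

    int size() {
        return pending.size();
    }
}
```

With this shape, several calls can share one connection, and a timeout on `get` keeps a lost response from blocking the caller indefinitely.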
- The basic logic is now in place. Next we need a proxy object for HelloService through which to call the sayHello method.
- Create a factory that hands out proxy objects.
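    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ProxyFactory {

        static final ExecutorService executorService =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        @SuppressWarnings("unchecked")
        public static <T> T getProxy(final Class<T> interfaceClass) {
            return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    // Package the call so the provider can locate and invoke the method.
                    Invocation invocation = new Invocation(interfaceClass.getName(), method.getName(), method.getParameterTypes(), args);
                    // Connect lazily on the first call, using an address from the registry.
                    if (NettyClient.clientHandler == null) {
                        Url url = RedisRegister.get(interfaceClass.getName());
                        NettyClient.initClient(url.getHostName(), url.getPort());
                    }
                    NettyClient.clientHandler.setInvocation(invocation);
                    // Block until the handler has received the response.
                    return executorService.submit(NettyClient.clientHandler).get();
                }
            });
        }
    }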
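To see the proxy mechanism in isolation, the sketch below uses the same `Proxy.newProxyInstance` call but replaces the network with a plain function, so it runs without Netty or Redis. `RpcProxy`, `Call`, and `Echo` are hypothetical names for this illustration; `Call` plays the role of `Invocation`.

```java
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.function.Function;

class ProxyDemo {
    public static void main(String[] args) {
        // Fake transport: describes the intercepted call instead of sending it anywhere.
        Echo echo = RpcProxy.create(Echo.class,
                call -> call.interfaceName + "#" + call.methodName + Arrays.toString(call.args));
        System.out.println(echo.say("hi"));
    }
}

// Hypothetical service interface used only by this demo.
interface Echo {
    String say(String text);
}

// Hypothetical call descriptor with the same four fields as Invocation.
class Call {
    final String interfaceName;
    final String methodName;
    final Class<?>[] paramTypes;
    final Object[] args;

    Call(String interfaceName, String methodName, Class<?>[] paramTypes, Object[] args) {
        this.interfaceName = interfaceName;
        this.methodName = methodName;
        this.paramTypes = paramTypes;
        this.args = args;
    }
}

class RpcProxy {
    // Every method call on the returned proxy is turned into a Call and handed to the transport.
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> iface, Function<Call, Object> transport) {
        return (T) Proxy.newProxyInstance(
                iface.getClassLoader(),
                new Class<?>[]{iface},
                (proxy, method, args) -> transport.apply(
                        new Call(iface.getName(), method.getName(), method.getParameterTypes(), args)));
    }
}
```

Passing the transport in as a function keeps the proxy logic testable on its own; in the article's version that role is played by the Netty client handler.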
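    import java.io.IOException;
    import java.net.InetAddress;

    public class Provider {

        public static void main(String[] args) throws IOException {
            // Map the interface to its implementation for local dispatch.
            LocalRegister.register(HelloService.class.getName(), HelloServiceImpl.class);
            // Publish this provider's address to the shared registry.
            Url url = new Url(InetAddress.getLocalHost().getHostAddress(), 8080);
            RedisRegister.regist(HelloService.class.getName(), url);
            // Start serving; this call blocks until the server shuts down.
            NettyServer.start(url.getHostName(), url.getPort());
        }
    }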
    public class Consumer {

        public static void main(String[] args) {
            HelloService service = ProxyFactory.getProxy(HelloService.class);
            System.out.println(service.sayHello());
        }
    }
With that, a simple Netty-based Dubbo is complete.
Code repository:
References