rpc框架之dubbo从入门到使用-源码分析
rpc框架之dubbo从入门到使用-源码分析
了解 Dubbo 核心概念和架构
以上是 Dubbo 的工作原理图,从抽象架构上分为两层:服务治理抽象控制面 和 Dubbo 数据面 。
- 服务治理控制面。服务治理控制面不是特指如注册中心类的单个具体组件,而是对 Dubbo 治理体系的抽象表达。控制面包含协调服务发现的注册中心、流量管控策略、Dubbo Admin 控制台等,如果采用了 Service Mesh 架构则还包含 Istio 等服务网格控制面。
- Dubbo 数据面。数据面代表集群部署的所有 Dubbo 进程,进程之间通过 RPC 协议实现数据交换,Dubbo 定义了微服务应用开发与调用规范并负责完成数据传输的编解码工作。
- 服务消费者 (Dubbo Consumer),发起业务调用或 RPC 通信的 Dubbo 进程
- 服务提供者 (Dubbo Provider),接收业务调用或 RPC 通信的 Dubbo 进程
Dubbo 数据面
从数据面视角,Dubbo 帮助解决了微服务实践中的以下问题:
- Dubbo 作为 服务开发框架 约束了微服务定义、开发与调用的规范,定义了服务治理流程及适配模式
- Dubbo 作为 RPC 通信协议实现 解决服务间数据传输的编解码问题
dubbo框架总体架构
节点角色说明
节点 | 角色说明 |
---|---|
Provider | 暴露服务的服务提供方 |
Consumer | 调用远程服务的服务消费方 |
Registry | 服务注册与发现的注册中心 |
Monitor | 统计服务的调用次数和调用时间的监控中心 |
Container | 服务运行容器 |
代码架构
图例说明:
- 图中左边淡蓝背景的为服务消费方使用的接口,右边淡绿色背景的为服务提供方使用的接口,位于中轴线上的为双方都用到的接口。
- 图中从下至上分为十层,各层均为单向依赖,右边的黑色箭头代表层之间的依赖关系,每一层都可以剥离上层被复用,其中,Service 和 Config 层为 API,其它各层均为 SPI。
- 图中绿色小块的为扩展接口,蓝色小块为实现类,图中只显示用于关联各层的实现类。
- 图中蓝色虚线为初始化过程,即启动时组装链,红色实线为方法调用过程,即运行时调用链,紫色三角箭头为继承,可以把子类看作父类的同一个节点,线上的文字为调用的方法。
各层说明
- Config 配置层:对外配置接口,以 ServiceConfig, ReferenceConfig 为中心,可以直接初始化配置类,也可以通过 spring 解析配置生成配置类
- Proxy 服务代理层:服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,扩展接口为 ProxyFactory
- Registry 注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService
- Cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 Cluster, Directory, Router, LoadBalance
- Monitor 监控层:RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为 MonitorFactory, Monitor, MonitorService
- Protocol 远程调用层:封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter
- Exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
- Transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
- Serialize 数据序列化层:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool
关系说明
- 在 RPC 中,Protocol 是核心层,也就是只要有 Protocol + Invoker + Exporter 就可以完成非透明的 RPC 调用,然后在 Invoker 的主过程上 Filter 拦截点。
- 图中的 Consumer 和 Provider 是抽象概念,只是想让看图者更直观的了解哪些类分属于客户端与服务器端,不用 Client 和 Server 的原因是 Dubbo 在很多场景下都使用 Provider, Consumer, Registry, Monitor 划分逻辑拓扑节点,保持统一概念。
- 而 Cluster 是外围概念,所以 Cluster 的目的是将多个 Invoker 伪装成一个 Invoker,这样其它人只要关注 Protocol 层 Invoker 即可,加上 Cluster 或者去掉 Cluster 对其它层都不会造成影响,因为只有一个提供者时,是不需要 Cluster 的。
- Proxy 层封装了所有接口的透明化代理,而在其它层都以 Invoker 为中心,只有到了暴露给用户使用时,才用 Proxy 将 Invoker 转成接口,或将接口实现转成 Invoker,也就是去掉 Proxy 层 RPC 是可以 Run 的,只是不那么透明,不那么看起来像调本地服务一样调远程服务。
- 而 Remoting 实现是 Dubbo 协议的实现,如果你选择 RMI 协议,整个 Remoting 都不会用上,Remoting 内部再划为 Transport 传输层和 Exchange 信息交换层,Transport 层只负责单向消息传输,是对 Mina, Netty, Grizzly 的抽象,它也可以扩展 UDP 传输,而 Exchange 层是在传输层之上封装了 Request-Response 语义。
- Registry 和 Monitor 实际上不算一层,而是一个独立的节点,只是为了全局概览,用层的方式画在一起。
模块分包
模块说明:
- dubbo-common 公共逻辑模块:包括 Util 类和通用模型。
- dubbo-remoting 远程通讯模块:相当于 Dubbo 协议的实现,如果 RPC 用 RMI协议则不需要使用此包。
- dubbo-rpc 远程调用模块:抽象各种协议,以及动态代理,只包含一对一的调用,不关心集群的管理。
- dubbo-cluster 集群模块:将多个服务提供方伪装为一个提供方,包括:负载均衡, 容错,路由等,集群的地址列表可以是静态配置的,也可以是由注册中心下发。
- dubbo-registry 注册中心模块:基于注册中心下发地址的集群方式,以及对各种注册中心的抽象。
- dubbo-monitor 监控模块:统计服务调用次数,调用时间的,调用链跟踪的服务。
- dubbo-config 配置模块:是 Dubbo 对外的 API,用户通过 Config 使用Dubbo,隐藏 Dubbo 所有细节。
- dubbo-container 容器模块:是一个 Standlone 的容器,以简单的 Main 加载 Spring 启动,因为服务通常不需要 Tomcat/JBoss 等 Web 容器的特性,没必要用 Web 容器去加载服务。
整体上按照分层结构进行分包,与分层的不同点在于:
- Container 为服务容器,用于部署运行服务,没有在层中画出。
- Protocol 层和 Proxy 层都放在 rpc 模块中,这两层是 rpc 的核心,在不需要集群也就是只有一个提供者时,可以只使用这两层完成 rpc 调用。
- Transport 层和 Exchange 层都放在 remoting 模块中,为 rpc 调用的通讯基础。
- Serialize 层放在 common 模块中,以便更大程度复用。
依赖关系
图例说明:
- 图中小方块 Protocol, Cluster, Proxy, Service, Container, Registry, Monitor 代表层或模块,蓝色的表示与业务有交互,绿色的表示只对 Dubbo 内部交互。
- 图中背景方块 Consumer, Provider, Registry, Monitor 代表部署逻辑拓扑节点。
- 图中蓝色虚线为初始化时调用,红色虚线为运行时异步调用,红色实线为运行时同步调用。
- 图中只包含 RPC 的层,不包含 Remoting 的层,Remoting 整体都隐含在 Protocol 中。
调用链
领域模型
在 Dubbo 的核心领域模型中:
- Protocol 是服务域,它是 Invoker 暴露和引用的主功能入口,它负责 Invoker 的生命周期管理。
- Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠拢,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。
- Invocation 是会话域,它持有调用过程中的变量,比如方法名,参数等。
RPC传输层实现原理
RPC中Protocol 远程调用层是核心层,封装 RPC 调用。
所以,只要搞清楚Protocol的底层实现就可以了。
上面介绍说, Protocol + Invoker + Exporter 就可以完成非透明的 RPC 调用,那具体是怎么实现的呢?
从dubbo给出的官方例子入手
启动一个服务端(provide)
GreetingsService.java:
public interface GreetingsService {
String sayHi(String name);
}
GreetingsServiceImpl.java:
public class GreetingsServiceImpl implements GreetingsService {
@Override
public String sayHi(String name) {
return "hi, " + name + " I am provider";
}
}
启动一个Provide
MainAppllication.java:
public class MainApplication {
public static void main(String[] args) {
// 定义具体的服务
ServiceConfig<GreetingsService> service = new ServiceConfig<>();
service.setInterface(GreetingsService.class);
service.setRef(new GreetingsServiceImpl());
// 启动 Dubbo
DubboBootstrap.getInstance()
.application("first-dubbo-provider")
.registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
.protocol(new ProtocolConfig("dubbo", -1))
.service(service)
.start()
.await();
}
}
根据启动dubbo服务端的代码,整理了调用时序图如下:
所以从上图可看出,暴露服务的工作其实主要集中在ServiceConfig.doExportUrl(local,false)这个方法上。
Protocol是服务域,也就是Invoker的入口,是门面。
所以,this.protocolSPI.export((Invoker)invoker);才是真正执行暴露服务的底层方法。
protocolSPI的填充实例是AbstractProxyProtocol对象。
AbstractProxyProtocol类图表示如下:
我们再进入到里面的实现看看,做了些什么:
进入实现层,AbstractProxyProtocol.java 是个抽象类:
public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
final String uri = serviceKey(invoker.getUrl());
Exporter<T> exporter = (Exporter)this.exporterMap.get(uri);
if (exporter != null && Objects.equals(exporter.getInvoker().getUrl(), invoker.getUrl())) {
return exporter;
} else {
final Runnable runnable = this.doExport(this.proxyFactory.getProxy(new Invoker<T>() {
public Class<T> getInterface() {
return invoker.getInterface();
}
public Result invoke(Invocation invocation) throws RpcException {
Map var10000 = RpcContext.getServiceContext().getObjectAttachments();
Objects.requireNonNull(invocation);
var10000.forEach(invocation::setObjectAttachment);
return invoker.invoke(invocation);
}
public URL getUrl() {
return invoker.getUrl();
}
public boolean isAvailable() {
return invoker.isAvailable();
}
public void destroy() {
invoker.destroy();
}
}, true), invoker.getInterface(), invoker.getUrl());
Exporter<T> exporter = new AbstractExporter<T>(invoker) {
public void afterUnExport() {
AbstractProxyProtocol.this.exporterMap.remove(uri);
if (runnable != null) {
try {
runnable.run();
} catch (Throwable var2) {
this.logger.warn("4-1", "", "", var2.getMessage(), var2);
}
}
}
};
this.exporterMap.put(uri, exporter);
return exporter;
}
}
代码分析
- final String uri = serviceKey(invoker.getUrl()); //从Map中取URL信息,用于配置客户端请求的协议信息(调用的类,方法,参数、协议等)
- final Runnable runnable = this.doExport(this.proxyFactory.getProxy(…)
这个方法是构建一个线程任务,通过调用AbstractProxyProtocol类的实例方法doExport(...)
得到,它的实现类默认是:GrpcProtocol.java
进入GrpcProtocol.java实现类:
protected <T> Runnable doExport(T proxiedImpl, Class<T> type, URL url) throws RpcException {
String key = url.getAddress();
ProtocolServer protocolServer = (ProtocolServer)this.serverMap.computeIfAbsent(key, (k) -> {
DubboHandlerRegistry registry = new DubboHandlerRegistry();
NettyServerBuilder builder = (NettyServerBuilder)NettyServerBuilder.forPort(url.getPort()).fallbackHandlerRegistry(registry);
Server originalServer = GrpcOptionsUtils.buildServerBuilder(url, builder).build();
GrpcRemotingServer remotingServer = new GrpcRemotingServer(originalServer, registry);
return new AbstractProxyProtocol.ProxyProtocolServer(this, remotingServer);
});
GrpcRemotingServer grpcServer = (GrpcRemotingServer)protocolServer.getRemotingServer();
FrameworkServiceRepository serviceRepository = this.frameworkModel.getServiceRepository();
ProviderModel providerModel = serviceRepository.lookupExportedService(url.getServiceKey());
if (providerModel == null) {
throw new IllegalStateException("Service " + url.getServiceKey() + "should have already been stored in service repository, but failed to find it.");
} else {
Object originalImpl = providerModel.getServiceInstance();
Class<?> implClass = originalImpl.getClass();
try {
Method method = implClass.getMethod("setProxiedImpl", type);
method.invoke(originalImpl, proxiedImpl);
} catch (Exception var12) {
throw new IllegalStateException("Failed to set dubbo proxied service impl to stub, please make sure your stub was generated by the dubbo-protoc-compiler.", var12);
}
grpcServer.getRegistry().addService((BindableService)originalImpl, url.getServiceKey());
if (!grpcServer.isStarted()) {
grpcServer.start();
}
return () -> {
grpcServer.getRegistry().removeService(url.getServiceKey());
};
}
}
可以看到,暴露服务的最底层实现是通过Netty实现的,这里先不讨论Netty服务的原理。
回来到抽象类的export方法,看这行代码:
生成代理,并实现一个Invoker实现,定义了invoke方法的调用(反射):
this.proxyFactory.getProxy(new Invoker<T>() {
public Class<T> getInterface() {
return invoker.getInterface();
}
public Result invoke(Invocation invocation) throws RpcException {
Map var10000 = RpcContext.getServiceContext().getObjectAttachments();
Objects.requireNonNull(invocation);
var10000.forEach(invocation::setObjectAttachment);
return invoker.invoke(invocation);
}
public URL getUrl() {
return invoker.getUrl();
}
public boolean isAvailable() {
return invoker.isAvailable();
}
public void destroy() {
invoker.destroy();
}
}, true)
那基本上就了然了,就是说,dubbo通过netty作为底层服务通信,然后从netty服务中取得client端发起的请求信息后,再通过proxy进行反射调用Provide端的实现类。(export->proxy->proxyFactory->Invoker)
以上实现类在provider端会将实现地址和、参数和服务名分别注册本地会保存在exporterMap和serverMap中,而在🕊️中,我们设置了用于注册中心的Kafka地址,所以,这些服务、参数和用于反射的类、方法等信息都存储在Kafka中。只有在client端发起rpc调用时,通过netty传送给client端,拿到后才会触发Invoker反射调用(延迟调用的callback语法)。
javassist来说执行动态生成的Wrapper#invokeMethod
使用client端调用Provider接口
@Test
public void test() {
ReferenceConfig<GreetingsService> reference = new ReferenceConfig<>();
reference.setApplication(new ApplicationConfig("first-dubbo-consumer"));
reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
reference.setInterface(GreetingsService.class);
GreetingsService service = reference.get();
String message = service.sayHi("dubbo");
Assertions.assertEquals(message, "hi, dubbo");
}
代码分析:
1)声明客户端服务名称:first-dubbo-consumer
\2) 设置注册中心地址
3)获取GreetingsService的实现类(通过反射)
第三步中调用如下代码生成反射代理类:
private T createProxy(Map<String, String> referenceParameters) {
if (this.shouldJvmRefer(referenceParameters)) {
this.createInvokerForLocal(referenceParameters);
} else {
this.urls.clear();
this.meshModeHandleUrl(referenceParameters);
if (StringUtils.isNotEmpty(this.url)) {
this.parseUrl(referenceParameters);
} else if (!"injvm".equalsIgnoreCase(this.getProtocol())) {
this.aggregateUrlFromRegistry(referenceParameters);
}
this.createInvokerForRemote();
}
if (logger.isInfoEnabled()) {
logger.info("Referred dubbo service: [" + (String)referenceParameters.get("interface") + "]." + (Boolean.parseBoolean((String)referenceParameters.get("generic")) ? " it's GenericService reference" : " it's not GenericService reference"));
}
URL consumerUrl = new ServiceConfigURL("consumer", (String)referenceParameters.get("register.ip"), 0, (String)referenceParameters.get("interface"), referenceParameters);
URL consumerUrl = consumerUrl.setScopeModel(this.getScopeModel());
consumerUrl = consumerUrl.setServiceModel(this.consumerModel);
MetadataUtils.publishServiceDefinition(consumerUrl, this.consumerModel.getServiceModel(), this.getApplicationModel());
return this.proxyFactory.getProxy(this.invoker, ProtocolUtils.isGeneric(this.generic));
}
代码分析:
这里面目测最重要的就两个方法,this.createInvokerForLocal(referenceParameters);
和 this.createInvokerForRemote();
- this.createInvokerForLocal(referenceParameters):
private void createInvokerForLocal(Map<String, String> referenceParameters) {
URL url = new ServiceConfigURL("injvm", "127.0.0.1", 0, this.interfaceClass.getName(), referenceParameters);
URL url = url.setScopeModel(this.getScopeModel());
url = url.setServiceModel(this.consumerModel);
Invoker<?> withFilter = this.protocolSPI.refer(this.interfaceClass, url);
List<Invoker<?>> invokers = new ArrayList();
invokers.add(withFilter);
this.invoker = Cluster.getCluster(url.getScopeModel(), "failover").join(new StaticDirectory(url, invokers), true);
if (logger.isInfoEnabled()) {
logger.info("Using in jvm service " + this.interfaceClass.getName());
}
}
目测Invoker<?> withFilter = this.protocolSPI.refer(this.interfaceClass, url);
这段代码是从SPI协议类获取Invoker对象,进行反射,那么,与注册中心通信并获取反射需要的必要信息一定在这里面。
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
return this.protocolBindingRefer(type, url);
}
再往下看:
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
this.checkDestroyed();
this.optimizeSerialization(url);
DubboInvoker<T> invoker = new DubboInvoker(serviceType, url, this.getClients(url), this.invokers);
this.invokers.add(invoker);
return invoker;
}
在这个方法里,就已经获取到了DubboInvoker对象,那这个是关键代码:
DubboInvoker<T> invoker = new DubboInvoker(serviceType, url, this.getClients(url), this.invokers);
这段代码目测只有this.getClients(url)
跟通信有关,故再探:
private ExchangeClient[] getClients(URL url) {
int connections = url.getParameter("connections", 0);
if (connections == 0) {
String shareConnectionsStr = StringUtils.isBlank(url.getParameter("shareconnections", (String)null)) ? ConfigurationUtils.getProperty(url.getOrDefaultApplicationModel(), "shareconnections", "1") : url.getParameter("shareconnections", (String)null);
connections = Integer.parseInt(shareConnectionsStr);
List<ReferenceCountExchangeClient> shareClients = this.getSharedClient(url, connections);
ExchangeClient[] clients = new ExchangeClient[connections];
Objects.requireNonNull(shareClients);
Arrays.setAll(clients, shareClients::get);
return clients;
} else {
ExchangeClient[] clients = new ExchangeClient[connections];
for(int i = 0; i < clients.length; ++i) {
clients[i] = this.initClient(url);
}
return clients;
}
}
嗯,再目测看看getsharedClient
方法:
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
String key = url.getAddress();
Object clients = this.referenceClientMap.get(key);
List typedClients;
if (clients instanceof List) {
typedClients = (List)clients;
if (this.checkClientCanUse(typedClients)) {
this.batchClientRefIncr(typedClients);
return typedClients;
}
}
typedClients = null;
synchronized(this.referenceClientMap) {
while(true) {
clients = this.referenceClientMap.get(key);
if (clients instanceof List) {
typedClients = (List)clients;
if (this.checkClientCanUse(typedClients)) {
this.batchClientRefIncr(typedClients);
return typedClients;
}
this.referenceClientMap.put(key, PENDING_OBJECT);
break;
}
if (clients != PENDING_OBJECT) {
this.referenceClientMap.put(key, PENDING_OBJECT);
break;
}
try {
this.referenceClientMap.wait();
} catch (InterruptedException var21) {
}
}
}
boolean var18 = false;
try {
var18 = true;
connectNum = Math.max(connectNum, 1);
if (CollectionUtils.isEmpty(typedClients)) {
typedClients = this.buildReferenceCountExchangeClientList(url, connectNum);
var18 = false;
} else {
for(int i = 0; i < typedClients.size(); ++i) {
ReferenceCountExchangeClient referenceCountExchangeClient = (ReferenceCountExchangeClient)typedClients.get(i);
if (referenceCountExchangeClient != null && !referenceCountExchangeClient.isClosed()) {
referenceCountExchangeClient.incrementAndGetCount();
} else {
typedClients.set(i, this.buildReferenceCountExchangeClient(url));
}
}
var18 = false;
}
} finally {
if (var18) {
synchronized(this.referenceClientMap) {
if (typedClients == null) {
this.referenceClientMap.remove(key);
} else {
this.referenceClientMap.put(key, typedClients);
}
this.referenceClientMap.notifyAll();
}
}
}
synchronized(this.referenceClientMap) {
if (typedClients == null) {
this.referenceClientMap.remove(key);
} else {
this.referenceClientMap.put(key, typedClients);
}
this.referenceClientMap.notifyAll();
return typedClients;
}
}
以上代码大意是,先从本地cache中拿,拿不到就从注册中心取,那只要看这段代码就行:
typedClients = this.buildReferenceCountExchangeClientList(url, connectNum);
准确的说,看这个方法就行,buildReferenceCountExchangeClient
:
private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
ExchangeClient exchangeClient = this.initClient(url);
ReferenceCountExchangeClient client = new ReferenceCountExchangeClient(exchangeClient, "dubbo");
int shutdownTimeout = ConfigurationUtils.getServerShutdownTimeout(url.getScopeModel());
client.setShutdownWaitTime(shutdownTimeout);
return client;
}
马上就要看到真相了,再看看initClient
方法:
private ExchangeClient initClient(URL url) {
String str = url.getParameter("client", url.getParameter("server", "netty"));
if (StringUtils.isNotEmpty(str) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + ", supported client type is " + StringUtils.join(url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
} else {
try {
URL url = new ServiceConfigURL("dubbo", url.getUsername(), url.getPassword(), url.getHost(), url.getPort(), url.getPath(), url.getAllParameters());
url = url.addParameter("codec", "dubbo");
url = url.addParameterIfAbsent("heartbeat", String.valueOf(60000));
return (ExchangeClient)(url.getParameter("lazy", false) ? new LazyConnectExchangeClient(url, this.requestHandler) : Exchangers.connect(url, this.requestHandler));
} catch (RemotingException var4) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + var4.getMessage(), var4);
}
}
}
这段代码说要拿netty服务器这些参数字段:String str = url.getParameter("client", url.getParameter("server", "netty"))
说明provider默认是netty服务。
再看这段代码:
return (ExchangeClient)(url.getParameter("lazy", false) ? new LazyConnectExchangeClient(url, this.requestHandler) : Exchangers.connect(url, this.requestHandler));
真相出来了。。。。。
Exchangers.connect(url, this.requestHandler)
:
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
} else if (handler == null) {
throw new IllegalArgumentException("handler == null");
} else {
return getExchanger(url).connect(url, handler);
}
}
再进入HeaderExchanger类的connect方法:
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new ChannelHandler[]{new DecodeHandler(new HeaderExchangeHandler(handler))}), true);
}
再看Transporters.connect()
方法:
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
} else {
Object handler;
if (handlers != null && handlers.length != 0) {
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
} else {
handler = new ChannelHandlerAdapter();
}
return getTransporter(url).connect(url, (ChannelHandler)handler);
}
}
最后再进去getTransporter(url).connect(url, (ChannelHandler)handler)
这个实现,发现大部分都是Netty的实现:
通过调试,发现它的实现是:org.apache.dubbo.remoting.transport.netty4.NettyClient.java
- this.createInvokerForRemote():
private void createInvokerForRemote() {
if (this.urls.size() == 1) {
URL curUrl = (URL)this.urls.get(0);
this.invoker = this.protocolSPI.refer(this.interfaceClass, curUrl);
if (!UrlUtils.isRegistry(curUrl) && !curUrl.getParameter("unloadClusterRelated", false)) {
List<Invoker<?>> invokers = new ArrayList();
invokers.add(this.invoker);
this.invoker = Cluster.getCluster(this.getScopeModel(), "failover").join(new StaticDirectory(curUrl, invokers), true);
}
} else {
List<Invoker<?>> invokers = new ArrayList();
URL registryUrl = null;
Iterator var3 = this.urls.iterator();
while(var3.hasNext()) {
URL url = (URL)var3.next();
invokers.add(this.protocolSPI.refer(this.interfaceClass, url));
if (UrlUtils.isRegistry(url)) {
registryUrl = url;
}
}
if (registryUrl != null) {
String cluster = registryUrl.getParameter("cluster", "zone-aware");
this.invoker = Cluster.getCluster(registryUrl.getScopeModel(), cluster, false).join(new StaticDirectory(registryUrl, invokers), false);
} else {
if (CollectionUtils.isEmpty(invokers)) {
throw new IllegalArgumentException("invokers == null");
}
URL curUrl = ((Invoker)invokers.get(0)).getUrl();
String cluster = curUrl.getParameter("cluster", "failover");
this.invoker = Cluster.getCluster(this.getScopeModel(), cluster).join(new StaticDirectory(curUrl, invokers), true);
}
}
}
local和remote方法大同小异,createInvoderForLocal()是创建本地调用的反射Invoker,而createInvokerForRemote()就是创建远程调用的反射Invoker。
那么,知道了通讯层是netty,那我再学习一下netty的通讯原理就行了。
netty通信原理
待更。。。。。。