Dubbo原理和源码剖析之服务揭露,服务发表流程分
分类:计算机编程

在合法《Dubbo 客商指南》架构部分,给出了服务调用的一体化架商谈流程:

github新扩张宾馆 "dubbo-read"(点此查看),集合全部《Dubbo原理和源码分析》连串小说,后续将持续补充该连串,同时将针对Dubbo所做的功效扩大也进展分享。不定时更新,接待Follow。

正文将详细深入分析Dubbo的劳务发表流程,建议组成作品Dubbo SPI 机制深入分析一同读书。

注:文章中央银行使的dubbo源码版本为2.5.4

图片 1

 

在起先深入分析在此以前,有必得熟识一下Dubbo源码的目录结构,乃至各模块的功用。

零、服务公布的指标

服务提供者向登记中央登记服务,将劳动完成类以服务接口的格局提供出去,以便服务花费者从注册中央查阅并调用服务。

别的,在官方《Dubbo 开垦指南》框架设计某个,给出了整机设计:

一、框架设计

在官方《Dubbo 顾客指南》架构部分,给出了服务调用的完全架商谈流程:

图片 2

 

另外,在官方《Dubbo 开荒指南》框架设计有个别,给出了总体安排:

图片 3

乃至暴光服务时序图:

图片 4

 

正文将依照上述几张图,深入分析服务暴光的贯彻原理,并张开详尽的代码追踪与深入分析。

图片 5模块表明:dubbo-common 公共逻辑模块:包罗 Util 类和通用模型。dubbo-remoting 远程通信模块:也正是 Dubbo 商业事务的贯彻,假设 RPC 用 RMI合同则无需选用此包。dubbo-rpc 远程调用模块:抽象各样协商,乃至动态代理,只含有一对一的调用,不爱戴集群的治本。dubbo-cluster 集群模块:将几个劳务提供方伪装为贰个提供方,富含:负载均衡, 容错,路由等,集群的地址列表能够是静态配置的,也得以是由登记核心下发。dubbo-registry 注册中央模块:基于注册中央发出地址的集群方式,以致对各样注册中央的抽象。dubbo-monitor 监察和控制模块:总括服务调用次数,调用时间的,调用链追踪的服务。dubbo-config 配置模块:是 Dubbo 对外的 API,客户通过 Config 使用Dubbo,隐敝 Dubbo 全体细节。dubbo-container 容器模块:是二个 Standlone 的容器,以简单的 Main 加载 Spring 运转,因为劳动普通无需 汤姆cat/JBoss 等 Web 容器的特点,没供给用 Web 容器去加载服务。

一、服务公布入口

图片 6

二、原理和源码分析

Spring 对外留出的扩大

dubbo是基于spring 配置来兑现劳务的颁发的,那么确定是依据spring的扩展来写了一套本人的竹签。在dubbo配置文件中看看的<dubbo:service> ,正是属于自定义扩大标签。

要促成自定义扩充,有七个步骤(在spring中定义了三个接口,用来兑现扩充)1.NamespaceHandler: 注册一群BeanDefinitionParser,利用他们来开展深入分析2.BeanDefinitionParser:用于剖析每一个element的内容3.Spring暗许会加载jar包下的META-INF/spring.handlers文件搜索对应的NamespaceHandler。

以下是Dubbo-config模块下dubbo-config-spring的配置:

图片 7

约等于说会通过DubboNamespaceHandler去剖判dubbo自定义的标签。DubboBeanDefinitionParser用于把不一致的配备分别转化成spring容器中的bean对象。

public class DubboNamespaceHandler extends NamespaceHandlerSupport { static { Version.checkDuplicate(DubboNamespaceHandler.class); } public void init() { registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true)); registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true)); registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true)); registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true)); registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true)); registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true)); registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true)); registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true)); registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true)); }}

为了在spring运行的时候,也呼应的开发银行了发表服务和挂号服务的经过,而还要为了让客户端在开行的时候自动订阅发现服务,参与了八个bean ServiceBean、ReferenceBean。分别继承了ServiceConfig和ReferenceConfig。并分别完结了InitializingBean、DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware。

InitializingBean为bean提供了伊始化方法的不二诀要,它只包罗afterPropertiesSet方法,凡是承继该接口的类,在伊始化bean的时候会施行该方法。DisposableBean bean被销毁的时候,spring容器会活动实行destory方法,举例释放财富。ApplicationContextAware 达成了那么些接口的bean,当spring容器伊始化的时候,会自动的将ApplicationContext注入进来。ApplicationListener ApplicationEvent事件监听,spring容器运行后会发一个平地风波通报。BeanNameAware 获得自己起初化时,本人的bean的id属性。

因而能够见到,Dubbo 的劳务发布流程的达成思路是:1.使用spring的剖析收集xml中的配置音信,然后把那个布署音讯囤积到serviceConfig中。2.调用瑟维斯Config的export方法来进展服务的公布和登记。

1.1 Spring配置及ServiceBean映射

劳动公布方在工程中会有如下Spring配置

图片 8

劳动发表的spring配置

当中demoService为Spirng中布局服务的求实贯彻,即Spring中的一个Bean

<bean id="demoService"class="com.alibaba.dubbo.demo.provider.DemoServiceImpl" />

而对于下方配置,spring容器在运维的进程中会分析自定义的schema元素dubbo:service将其改换为实在的布局完结ServiceBean ,并把服务暴暴露来

<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref=”demoService”/>

以至揭露服务时序图:

2.1 标签分析

从文章《Dubbo原理和源码分析之标签深入分析》中我们知道,<dubbo:service> 标签会被深入分析成 ServiceBean。

ServiceBean 实现了 InitializingBean,在类加载成功现在会调用 afterPropertiesSet() 方法。在 afterPropertiesSet() 方法中,依次深入分析以下标签音信:

  • <dubbo:provider>
  • <dubbo:application>
  • <dubbo:module>
  • <dubbo:registry>
  • <dubbo:monitor>
  • <dubbo:protocol>

ServiceBean 还完结了 ApplicationListener,在 Spring 容器最早化的时候会调用 onApplication伊芙nt 方法。ServiceBean 重写了 onApplication伊夫nt 方法,达成了劳务揭示的作用。

ServiceBean.java

public void onApplicationEvent(ApplicationEvent event) {
    if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
        if (isDelay() && ! isExported() && ! isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: "   getInterface());
            }
            export();
        }
    }
}

Spring容器开头化调用

当Spring容器实例化bean完成,走到结尾一步发表ContextRefresh伊夫nt事件的时候,ServiceBean会实施onApplication伊夫nt方法,该办法调用ServiceConfig的export方法。

ServiceConfig起首化的时候,会先起头化静态变量protocol和proxyFactory,那七个变量初叶化的结果是由此dubbo的spi扩张机制得到的。

生成的protocol实例是:

package com.alibaba.dubbo.rpc;import com.alibaba.dubbo.common.extension.ExtensionLoader;public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol { public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws java.lang.Class { if (arg1 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg1; String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol; if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url("   url.toString use keys([protocol])"); com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension; return extension.refer(arg0, arg1); } public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl(); //根据URL配置信息获取Protocol协议,默认是dubbo String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol; if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url("   url.toString use keys([protocol])"); //根据协议名,获取Protocol的实现 //获得Protocol的实现过程中,会对Protocol先进行依赖注入,然后进行Wrapper包装,最后返回被修改过的Protocol //包装经过了ProtocolFilterWrapper,ProtocolListenerWrapper,RegistryProtocol com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension; return extension.export; } public void destroy() { throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); } public int getDefaultPort() { throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); }}

生成的proxyFactory实例:

package com.alibaba.dubbo.rpc;import com.alibaba.dubbo.common.extension.ExtensionLoader;public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory { public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws java.lang.Object { if (arg2 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg2; String extName = url.getParameter("proxy", "javassist"); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url("   url.toString use keys"); com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension; return extension.getInvoker(arg0, arg1, arg2); } public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = url.getParameter("proxy", "javassist"); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url("   url.toString use keys"); com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension; return extension.getProxy; }}

转移的代码中得以看看,暗许的Protocol达成是dubbo,默许的proxy是javassist。

1.2 ServiceBean

图片 9

ServiceBean

类协会与onApplicationEvent回调:
ServiceBean除了继续dubbo自个儿的陈设类ServiceConfig以外,还达成了一多重的spring接口用来参加到spring容器的开发银行以致bean创造进度中。个中包罗ApplicationListener。

    public void onApplicationEvent(ApplicationEvent event) {
        if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
            if (isDelay() && !isExported() && !isUnexported()) {
                if (logger.isInfoEnabled()) {
                    logger.info("The service ready on spring started. service: "   getInterface());
                }
                export();
            }
        }
    }

Spring容器ApplicationContext的起步最后一步会触发ContextRefreshed伊夫nt事件, 而ServiceBean实现了ApplicationListener接口监听那一件事件,触发onApplication伊夫nt(Application伊夫nt event)方法,在这几个情势中触发export方法来揭露服务。

含有的习性:
ServiceBean的父类ServiceConfig中隐含了相当多配置属性,这几个属性通过Spring配置注入赋值。

private transient String beanName;  //bean名称,对应xml中的id
private String interfaceName;  //接口名称,对应xml中的interface
private Class<?> interfaceClass;  //通过Class.forName(interfaceName)生成
private T ref;  //  接口实现类引用,对应xml中的ref
protected List<ProtocolConfig> protocols;  //协议列表
protected List<RegistryConfig> registries;  //注册中心列表
private final List<Exporter<?>> exporters;  //已发布服务列表
private final List<URL> urls;  //已发布服务地址列表
private static final Protocol protocol;
private static final ProxyFactory proxyFactory;

图片 10

2.2 延迟暴光

ServiceBean 扩大了 瑟维斯Config,调用 export() 方法时由 ServiceConfig 实现劳动揭发的职能达成。

ServiceConfig.java

public synchronized void export() {
    if (provider != null) {
        if (export == null) {
            export = provider.getExport();
        }
        if (delay == null) {
            delay = provider.getDelay();
        }
    }
    if (export != null && ! export.booleanValue()) {
        return;
    }
    if (delay != null && delay > 0) {
        Thread thread = new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(delay);
                } catch (Throwable e) {
                }
                doExport();
            }
        });
        thread.setDaemon(true);
        thread.setName("DelayExportServiceThread");
        thread.start();
    } else {
        doExport();
    }
}

由地点代码可以知道,若是设置了 delay 参数,Dubbo 的管理模式是运营多个照应线程在 sleep 钦赐时期后再 doExport。

ServiceConfig的export

export的步骤简单介绍

  • 先是会检查各样配置音讯,填充各种质量,同理可得就是保险小编在最早展露服务在此之前,所有事物都策动好了,何况是不易的。
  • 加载全体的注册焦点,因为大家揭穿服务供给注册到注册中央中去。
  • 基于布署的全体合同和登记中央url分别实行导出。
  • 开展导出的时候,又是一波属性的取得设置检查等操作。
  • 一旦布置的不是remote,则做本地导出。
  • 只要安排的不是local,则暴露为远程服务。
  • 甭管是当地照旧长途服务暴露,首先都会取得Invoker。
  • 获得完Invoker之后,调换到对外的Exporter,缓存起来。

export方法先剖断是不是须求延期揭破(Thread.sleep,然后实践doExport方法。

 public synchronized void export() { if (provider != null) { if (export == null) { export = provider.getExport(); } if (delay == null) { delay = provider.getDelay(); } } if (delay != null && delay > 0) { Thread thread = new Thread(new Runnable() { public void run() { try { Thread.sleep; } catch (Throwable e) { } doExport; thread.setDaemon; thread.setName("DelayExportServiceThread"); thread.start(); } else { doExport(); }

doExport方法先实行一连串的检讨措施,然后调用doExportUrls方法。检查方法会检验dubbo的安顿是不是在Spring配置文件中声称,未有的话读取properties文件初叶化。

doExportUrls方法先调用loadRegistries获取具备的挂号中央url,然后遍历调用doExportUrlsFor1Protocol形式。对于在标签中钦命了registry属性的Bean,会在加载BeanDefinition的时候就加载了注册中央。

收获注册主题url,会把注册的音讯都放在八个U奥迪Q7L对象中,多个U瑞鹰L内容如下:

registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-provider&application.version=1.0&dubbo=2.5.3&environment=product&organization=&owner=&pid=2939&registry=zookeeper&timestamp=1488898049284

doExportUrlsFor1Protocol基于区别的情商将劳动以U凯雷德L格局暴光。如若scope配置为none则不暴露,如若服务未配备成remote,则地面暴露exportLocal,要是未布署成local,则注册服务registryProcotol。

这里的URL是:

dubbo://192.168.1.100:20880/dubbo.common.hello.service.HelloService?anyhost=true&application=dubbo-provider&application.version=1.0&delay=5000&dubbo=2.5.3&environment=product&interface=dubbo.common.hello.service.HelloService&methods=sayHello&organization=&owner=&pid=2939&side=provider&timestamp=1488898464953

二、多少个主要概念:

本文将依照上述几张图,深入分析服务暴光的贯彻原理,并展开详细的代码追踪与深入分析。

2.3 参数检查

在 瑟维斯Config 的 doExport() 方法中会举行参数检查和装置,包罗:

  • 泛化调用
  • 地面达成
  • 本地存根
  • 本地伪装
  • 配置(application、registry、protocol等)

ServiceConfig.java

protected synchronized void doExport() {
    if (unexported) {
        throw new IllegalStateException("Already unexported!");
    }
    if (exported) {
        return;
    }
    exported = true;
    if (interfaceName == null || interfaceName.length() == 0) {
        throw new IllegalStateException("<dubbo:service interface="" /> interface not allow null!");
    }
    checkDefault();
    //省略
    if (ref instanceof GenericService) {
        interfaceClass = GenericService.class;
        generic = true;
    } else {
        try {
            interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                    .getContextClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        checkInterfaceAndMethods(interfaceClass, methods);
        checkRef();
        generic = false;
    }
    if(local !=null){
        if(local=="true"){
            local=interfaceName "Local";
        }
        Class<?> localClass;
        try {
            localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        if(!interfaceClass.isAssignableFrom(localClass)){
            throw new IllegalStateException("The local implemention class "   localClass.getName()   " not implement interface "   interfaceName);
        }
    }
    if(stub !=null){
        if(stub=="true"){
            stub=interfaceName "Stub";
        }
        Class<?> stubClass;
        try {
            stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        if(!interfaceClass.isAssignableFrom(stubClass)){
            throw new IllegalStateException("The stub implemention class "   stubClass.getName()   " not implement interface "   interfaceName);
        }
    }
    //此处省略:检查并设置相关参数
    doExportUrls();
}

地方揭穿

此时会先做当地揭破,exportLocal:

private void exportLocal { if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol { //这时候转成本地暴露的url:injvm://127.0.0.1/dubbo.common.hello.service.HelloService?anyhost=true& //application=dubbo-provider&application.version=1.0&dubbo=2.5.3&environment=product& //interface=dubbo.common.hello.service.HelloService&methods=sayHello URL local = URL.valueOf(url.toFullString .setProtocol(Constants.LOCAL_PROTOCOL) .setHost(NetUtils.LOCALHOST) .setPort; //首先还是先获得Invoker //然后导出成Exporter,并缓存 //这里的proxyFactory实际是JavassistProxyFactory //有关详细的获得Invoke以及exporter会在下面的流程解析,在本地暴露这个流程就不再说明。 Exporter<?> exporter = protocol.export( proxyFactory.getInvoker(ref,  interfaceClass, local)); exporters.add; logger.info("Export dubbo service "   interfaceClass.getName()  " to local registry"); }}

2.1 Invoker

public interface Invoker<T> extends Node {

    /**
     * get service interface.
     *
     * @return service interface.
     */
    Class<T> getInterface();

    /**
     * invoke.
     *
     * @param invocation
     * @return result
     * @throws RpcException
     */
    Result invoke(Invocation invocation) throws RpcException;

}

可施行对象的画饼充饥,能够依照办法的名称、参数拿到相应的进行结果。
Invoker可分为三类:

  • AbstractProxyInvoker:本地推行类的Invoker,实际通过Java反射的点子推行原始对象的主意
  • AbstractInvoker:远程通讯类的Invoker,实际通过通讯合同发起远程调用诉求并收取响应
  • AbstractClusterInvoker:三个长途通讯类的Invoker聚合成的集群版Invoker,参与了集群容错和负载均衡计谋

    图片 11

    Invoker继承图

Invocation:富含了须求推行的主意和参数等入眼音讯,他有多少个达成类WranglerpcInvocation和MockInvocation。

从文章《Dubbo原理和源码剖判之标签深入分析》中大家知道,<dubbo:service> 标签会被分析成 ServiceBean。

2.4 多左券、多注册中央

在检讨完参数之后,开头展露服务。Dubbo 扶持多协议和多注册中央:

ServiceConfig.java

private void doExportUrls() {
    List<URL> registryURLs = loadRegistries(true);
    for (ProtocolConfig protocolConfig : protocols) {
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

暴光为远程服务

接下去是揭露为远程服务,跟本地暴光的流水生产线同样依旧先取得Invoker,然后导出成Exporter:

//根据服务具体实现,实现接口,以及registryUrl通过ProxyFactory将HelloServiceImpl封装成一个本地执行的Invoker//invoker是对具体实现的一种代理。//这里proxyFactory是上面列出的生成的代码 Invoker<?> invoker = proxyFactory.getInvoker(ref,  interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString; //使用Protocol将invoker导出成一个Exporter //暴露封装服务invoker //调用Protocol生成的适配类的export方法 //这里的protocol是上面列出的生成的代码 Exporter<?> exporter = protocol.export;

至于Invoker,Exporter等的演说参见最上面包车型地铁内容。

2.2 ProxyFactory

public interface ProxyFactory {

    /**
     * create proxy.
     *
     * @param invoker
     * @return proxy
     */
    @Adaptive({Constants.PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker) throws RpcException;

    /**
     * create invoker.
     *
     * @param <T>
     * @param proxy
     * @param type
     * @param url
     * @return invoker
     */
    @Adaptive({Constants.PROXY_KEY})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;

}

劳务接口代理抽象,用于转移贰个接口的代理类。
getInvoker方法:针对Server端,将劳动对象(如德姆oServiceImpl)包装成两个Invoker对象。
getProxy方法:针对Client端,创立接口(如德姆o瑟维斯)的代理对象。

瑟维斯Bean 实现了 InitializingBean,在类加载成功之后会调用 afterPropertiesSet() 方法。在 afterPropertiesSet() 方法中,依次解析以下标签音信:

2.5 组装URL

本着各种合同、各样注册中央,初阶创建 U宝马X5L。

ServiceConfig.java

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    String name = protocolConfig.getName();
    if (name == null || name.length() == 0) {
        name = "dubbo";
    }

    //处理host

    //处理port

    Map<String, String> map = new HashMap<String, String>();
    //设置参数到map

    // 导出服务
    String contextPath = protocolConfig.getContextpath();
    if ((contextPath == null || contextPath.length() == 0) && provider != null) {
        contextPath = provider.getContextpath();
    }
    URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath   "/")   path, map);

    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
            .hasExtension(url.getProtocol())) {
        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }

    //此处省略:服务暴露(详见 2.6 和 2.7)

    this.urls.add(url);
}

纸包不住火远程服务时的拿走Invoker进度

服务完成类转变到Invoker,大约的步子是:

  • 依据上面生成的proxyFactory方法调用具体的ProxyFactory达成类的getInvoker方法获取Invoker。
  • getInvoker的历程是,首先对贯彻类做多个打包,生成八个卷入后的类。
  • 下一场新创造三个Invoker实例,那么些Invoker中隐含着调换的Wrapper类,Wrapper类中有切实可行的贯彻类。
Invoker<?> invoker = proxyFactory.getInvoker(ref,  interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString;

那行代码中含有服务完成类调换到Invoker的历程,在那之中proxyFactory是地点列出的动态变化的代码,当中getInvoker的代码为:

public Invoker getInvoker(Object arg0, Class arg1, URL arg2) throws Object { if (arg2 == null) throw new IllegalArgumentException("url == null"); //传进来的url是dubbo://192.168.110.197:20880/dubbo.common.hello.service.HelloService?anyhost=true&application=dubbo-provider //&application.version=1.0&dubbo=2.5.3&environment=product&interface=dubbo.common.hello.service.HelloService&methods=sayHello URL url = arg2; //没有proxy参数配置,默认使用javassist String extName = url.getParameter("proxy", "javassist"); if(extName == null) throw new IllegalStateException("Fail to get extension(ProxyFactory) name from url("   url.toString use keys"); //这一步就使用javassist来获取ProxyFactory的实现类JavassistProxyFactory ProxyFactory extension = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension; //JavassistProxyFactory的getInvoker方法 return extension.getInvoker(arg0, arg1, arg2);}

2.3 Exporter

public interface Exporter<T> {

    /**
     * get invoker.
     *
     * @return invoker
     */
    Invoker<T> getInvoker();

    /**
     * unexport.
     * <p>
     * <code>
     * getInvoker().destroy();
     * </code>
     */
    void unexport();

}

保证Invoker的生命周期,内部含有Invoker大概ExporterMap。

  • <dubbo:provider>
  • <dubbo:application>
  • <dubbo:module>
  • <dubbo:registry>
  • <dubbo:monitor>
  • <dubbo:protocol>

2.6 本地暴光

假诺布署 scope=none, 则不会进展服务暴光;若无配置 scope 或许scope=local,则会开展本地暴露。

ServiceConfig.java

//public static final String LOCAL_PROTOCOL = "injvm";
//public static final String LOCALHOST = "127.0.0.1";

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    //......
    String scope = url.getParameter(Constants.SCOPE_KEY);
    //配置为none不暴露
    if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
        //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
        if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
        //......
    }
    //......
}

private void exportLocal(URL url) {
    if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
        URL local = URL.valueOf(url.toFullString())
                .setProtocol(Constants.LOCAL_PROTOCOL)
                .setHost(NetUtils.LOCALHOST)
                .setPort(0);
        Exporter<?> exporter = protocol.export(
                proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
        exporters.add(exporter);
        logger.info("Export dubbo service "   interfaceClass.getName()  " to local registry");
    }
}

class="token md md-strong"> class="token strong ace_keyword ace_strong"> class="token md md-strong"> class="token strong ace_keyword ace_strong"> class="token md md-strong"> class="token strong ace_keyword ace_strong"> class="token md md-strong"> class="token li"> class="token code ace_support ace_function"> class="token md md-code"> class="token md md-code"> class="token lf"> class="token md md-li"> class="token code ace_support ace_function"> class="token md md-code"> class="token md md-code"> class="token lf"> class="token md md-li"> class="token code ace_support ace_function"> class="token md md-code"> class="token md md-code"> class="token lf"> class="token md md-li"> class="token code ace_support ace_function"> class="token md md-code"> class="token md md-code"> class="token lf"> class="token md md-li"> class="token code ace_support ace_function"> class="token md md-code"> class="token md md-code"> class="token lf"> class="token md md-li"> class="token code ace_support ace_function"> class="token md md-code"> class="token strong ace_keyword ace_strong"> class="token md md-strong"> class="token blockquote ace_string ace_constant ace_other"> class="token md md-gt"> class="token md md-li">1. 揭露服务的时候,会因而代理创设 class="token strong ace_keyword ace_strong"> class="token md md-strong">Invoker class="token md md-strong">;

class="token md md-strong"> class="token strong ace_keyword ace_strong"> class="token md md-strong"> class="token strong ace_keyword ace_strong"> class="token md md-strong"> class="token strong ace_keyword ace_strong"> class="token md md-strong"> class="token li"> class="token code ace_support ace_function"> class="token md md-code"> class="token md md-code"> class="token lf"> class="token md md-li"> class="token code ace_support ace_function"> class="token md md-code"> class="token md md-code"> class="token lf"> class="token md md-li"> class="token code ace_support ace_function"> class="token md md-code"> class="token md md-code"> class="token lf"> class="token md md-li"> class="token code ace_support ace_function"> class="token md md-code"> class="token md md-code"> class="token lf"> class="token md md-li"> class="token code ace_support ace_function"> class="token md md-code"> class="token md md-code"> class="token lf"> class="token md md-li"> class="token code ace_support ace_function"> class="token md md-code"> class="token strong ace_keyword ace_strong"> class="token md md-strong"> class="token blockquote ace_string ace_constant ace_other"> class="token md md-gt"> class="token md md-li"> class="token strong ace_keyword ace_strong"> class="token md md-strong"> class="token lf"> class="token blockquote ace_string ace_constant ace_other"> class="token md md-gt"> class="token md md-li">2. 地点暴光时选用 class="token strong ace_keyword ace_strong"> class="token md md-strong">injvm class="token md md-strong"> 协议, class="token strong ace_keyword ace_strong"> class="token md md-strong">injvm class="token md md-strong"> 公约是贰个伪合同,它不开启端口,无法被远程调用,只在 class="token md md-strong">JVM class="token md md-strong"> 内平昔关联,但施行 class="token strong ace_keyword ace_strong"> class="token md md-strong">Dubbo 的 class="token md md-strong">Filter class="token md md-strong"> 链。

使用JavassistProxyFactory获取Invoker

JavassistProxyFactory的getInvoker方法:

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper类不能正确处理带$的类名 //第一步封装一个Wrapper类 //该类是手动生成的 //如果类是以$开头,就使用接口类型获取,其他的使用实现类获取 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf < 0 ? proxy.getClass; //返回一个Invoker实例,doInvoke方法中直接返回上面wrapper的invokeMethod //关于生成的wrapper,请看下面列出的生成的代码,其中invokeMethod方法中就有实现类对实际方法的调用 return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } };}

生成wrapper类的长河,首先看getWrapper方法:

public static Wrapper getWrapper(Class<?> c){ while( ClassGenerator.isDynamicClass // can not wrapper on dynamic class. c = c.getSuperclass(); //Object类型的 if( c == Object.class ) return OBJECT_WRAPPER; //先去Wrapper缓存中查找 Wrapper ret = WRAPPER_MAP.get; if( ret == null ) { //缓存中不存在,生成Wrapper类,放到缓存 ret = makeWrapper; WRAPPER_MAP.put; } return ret;}

makeWrapper方法代码不在列出,太长了。正是生成二个继承自Wrapper的类,最终的结果差相当少是:

public class Wrapper1 extends Wrapper { public static String[] pns; public static Map pts; public static String[] mns; // all method name array. public static String[] dmns; public static Class[] mts0; public String[] getPropertyNames() { return pns; } public boolean hasProperty { return pts.containsKey; } public Class getPropertyType { return  pts.get; } public String[] getMethodNames() { return mns; } public String[] getDeclaredMethodNames() { return dmns; } public void setPropertyValue(Object o, String n, Object v) { dubbo.provider.hello.service.impl.HelloServiceImpl w; try { w = ((dubbo.provider.hello.service.impl.HelloServiceImpl) $1); } catch (Throwable e) { throw new IllegalArgumentException; } throw new com.alibaba.dubbo.common.bytecode.NoSuchPropertyException("Not found property ""   $2   "" filed or setter method in class dubbo.provider.hello.service.impl.HelloServiceImpl."); } public Object getPropertyValue(Object o, String n) { dubbo.provider.hello.service.impl.HelloServiceImpl w; try { w = ((dubbo.provider.hello.service.impl.HelloServiceImpl) $1); } catch (Throwable e) { throw new IllegalArgumentException; } throw new com.alibaba.dubbo.common.bytecode.NoSuchPropertyException("Not found property ""   $2   "" filed or setter method in class dubbo.provider.hello.service.impl.HelloServiceImpl."); } public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException { dubbo.provider.hello.service.impl.HelloServiceImpl w; try { w = ((dubbo.provider.hello.service.impl.HelloServiceImpl) $1); } catch (Throwable e) { throw new IllegalArgumentException; } try { if ("sayHello".equals && $3.length == 0) { w.sayHello(); return null; } } catch (Throwable e) { throw new java.lang.reflect.InvocationTargetException; } throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method ""   $2   "" in class dubbo.provider.hello.service.impl.HelloServiceImpl."); }}

变动完Wrapper现在,再次回到三个AbstractProxyInvoker实例。至此生成Invoker的手续就成功了。能够看看Invoker推行办法的时候,会调用Wrapper的invoke(),那个措施中会有忠实的兑现类调用真实方法的代码。

2.4 Protocol

@SPI("dubbo")
public interface Protocol {

    /**
     * 获取缺省端口,当用户没有配置端口时使用。
     * @return 缺省端口
     */
    int getDefaultPort();

    /**
     * 暴露远程服务:
     * @param <T>     服务的类型
     * @param invoker 服务的执行体
     * @return exporter 暴露服务的引用,用于取消暴露
     * @throws RpcException 当暴露服务出错时抛出,比如端口已占用
     */
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    /**
     * 引用远程服务:
     * @param <T>  服务的类型
     * @param type 服务的类型
     * @param url  远程服务的URL地址
     * @return invoker 服务的本地代理
     * @throws RpcException 当连接服务提供方失败时抛出
     */
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    /**
     * 释放协议
     */
    void destroy();

}

合计抽象接口。封装RPC调用。
exporter方法:揭露远程服务(用于服务端),便是将Invoker对象通过协商揭发给外界。
refer方法:引用远程服务(用于顾客端),通过Clazz、url等音讯创立远程的动态代理Invoker。

ServiceBean 还落到实处了 ApplicationListener,在 Spring 容器伊始化的时候会调用 onApplication伊夫nt 方法。ServiceBean 重写了 onApplicationEvent 方法,达成了劳动暴露的功力。

2.7 远程揭破

假设未有安顿scope 只怕scope=remote,则会进行远程揭露。

ServiceConfig.java

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    String scope = url.getParameter(Constants.SCOPE_KEY);
    //配置为none不暴露
    if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
        //......
        //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务)
        if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
            if (logger.isInfoEnabled()) {
                logger.info("Export dubbo service "   interfaceClass.getName()   " to url "   url);
            }
            if (registryURLs != null && registryURLs.size() > 0
                    && url.getParameter("register", true)) {
                for (URL registryURL : registryURLs) {
                    url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                        url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                    }
                    if (logger.isInfoEnabled()) {
                        logger.info("Register dubbo service "   interfaceClass.getName()   " url "   url   " to registry "   registryURL);
                    }
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

                    Exporter<?> exporter = protocol.export(invoker);
                    exporters.add(exporter);
                }
            } else {
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

                Exporter<?> exporter = protocol.export(invoker);
                exporters.add(exporter);
            }
        }
    }
}

在劳务揭示时,有二种情状:

  • 不使用登记核心:直接暴光对应合同的服务,引用服务时只可以因而直连格局援用
  • 采纳注册中央:暴光对应公约的劳动后,会将服务节点注册到注册主旨,援用服务时方可透过注册主题动态获取服务提供者列表,也能够因此直连情势援用

使用JdkProxyFactory获取invoker

JdkProxyFactory的getInvoker方法

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { Method method = proxy.getClass().getMethod(methodName, parameterTypes); return method.invoke(proxy, arguments); } };}

直接重临八个AbstractProxyInvoker实例,未有做处理,只是利用反射调用具体的办法。

JdkProxyFactory的getProxy方法:

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return  Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler;}

利用Java的反射机制生成叁个代理类。

2.5 关系图

图片 12

服务发表有关接口关系图

1)ServiceConfig包含ProxyFactoryProtocol,通过SPI的情势注入生成;
2)ProxyFactory担任成立Invoker
3)Protocol担当通过Invoker生成Exporter,将服务运行并揭穿;

ServiceBean.java

2.8 揭穿服务

不利用登记中心时,直接调用对应协议(如 Dubbo 合同)的 export() 揭露服务。以 Dubbo 左券为例:

DubboProtocol.java

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // export service.
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    //export an stub service for dispaching event
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice){
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
            if (logger.isWarnEnabled()){
                logger.warn(new IllegalStateException("consumer ["  url.getParameter(Constants.INTERFACE_KEY)  
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }

    openServer(url);

    return exporter;
}

调用 openServer() 方法创建并运转 Server:

DubboProtocol.java

private void openServer(URL url) {
    // find server.
    String key = url.getAddress();
    //client 也可以暴露一个只有server可以调用的服务。
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
    if (isServer) {
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            serverMap.put(key, createServer(url));
        } else {
            //server支持reset,配合override功能使用
            server.reset(url);
        }
    }
}

private ExchangeServer createServer(URL url) {
    //默认开启server关闭时发送readonly事件
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    //默认开启heartbeat
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

    if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: "   str   ", url: "   url);

    url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
    ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: "   url   ") "   e.getMessage(), e);
    }
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: "   str);
        }
    }
    return server;
}

Exchanger (默许 HeaderExchanger)封装央浼响应情势,同步转异步,以 Request、Response 为核心:

HeaderExchager.java

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

Transporters.java

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handlers == null || handlers.length == 0) {
        throw new IllegalArgumentException("handlers == null");
    }
    ChannelHandler handler;
    if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        handler = new ChannelHandlerDispatcher(handlers);
    }
    return getTransporter().bind(url, handler);
}

底层传输暗中同意使用 NettyTransporter,最后是制造 NettyServer:

NettyTransporter.java

public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyServer(url, listener);
}

NettyServer.java

public class NettyServer extends AbstractServer implements Server {
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
}

AbstractServer.java

public abstract class AbstractServer extends AbstractEndpoint implements Server {
    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();
        String host = url.getParameter(Constants.ANYHOST_KEY, false) 
                        || NetUtils.isInvalidLocalHost(getUrl().getHost()) 
                        ? NetUtils.ANYHOST : getUrl().getHost();
        bindAddress = new InetSocketAddress(host, getUrl().getPort());
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
        try {
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start "   getClass().getSimpleName()   " bind "   getBindAddress()   ", export "   getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind "   getClass().getSimpleName() 
                                          " on "   getLocalAddress()   ", cause: "   t.getMessage(), t);
        }
        if (handler instanceof WrappedChannelHandler ){
            executor = ((WrappedChannelHandler)handler).getExecutor();
        }
    }
}

NettyServer.java

public class NettyServer extends AbstractServer implements Server {
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }
}

揭示远程服务时导出Invoker为Exporter

Invoker导出为Exporter分为三种情景,第一种是Registry类型的Invoker,第二种是别的协商项目标Invoker,分开深入分析。

代码入口:

Exporter<?> exporter = protocol.export;

三、服务发布流程详解:

public void onApplicationEvent(ApplicationEvent event) { if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName { if  && ! isExported() && ! isUnexported { if (logger.isInfoEnabled { logger.info("The service ready on spring started. service: "   getInterface; } export(); } }}

2.9 服务注册

即便应用了注册中央,则在经过切实磋商(如 Dubbo 公约)揭露服务之后(即在 2.8 基础之上)走入劳动注册流程,将劳动节点注册到注册主旨。

RegistryProtocol.java

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    //registry provider
    final Registry registry = getRegistry(originInvoker);
    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
    //向Zookeeper注册节点
    registry.register(registedProviderUrl);
    // 订阅override数据
    // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    //保证每次export都返回一个新的exporter实例
    return new Exporter<T>() {
        public Invoker<T> getInvoker() {
            return exporter.getInvoker();
        }
        public void unexport() {
            try {
                exporter.unexport();
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
            try {
                registry.unregister(registedProviderUrl);
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
            try {
                overrideListeners.remove(overrideSubscribeUrl);
                registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    };
}

private Registry getRegistry(final Invoker<?> originInvoker){
    URL registryUrl = originInvoker.getUrl();
    if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
        String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
        registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
    }
    return registryFactory.getRegistry(registryUrl);
}

getRegistry() 方法依照登记中央项目(私下认可 Zookeeper)获取注册中央客商端,由注册中央客商端实例来举行真正的服务登记。

挂号中央顾客端将节点注册到注册中央,相同的时间订阅对应的 override 数据,实时监听服务的习性变动完成动态配置效应。

最后回到的 Exporter 达成了 unexport() 方法,那样在劳务下线时清理相关财富。

 

至此,服务暴露流程结束。

 

 

 

 

Registry类型的Invoker管理进度

差不离的步调是:

  • 由此三个决不做其余处理的Wrapper类,然后达到RegistryProtocol中。
  • 因此实际的构和导出Invoker为Exporter。
  • 挂号服务到注册大旨。
  • 订阅注册中央的劳务。
  • 转移三个新的Exporter实例,将方面包车型客车Exporter进行引进,然后回来。protocol是地点列出的动态变化的代码,会先调用ProtocolListenerWrapper,那些Wrapper担负开始化揭发和引用服务的监听器。对于Registry类型的不做拍卖,代码如下:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { //registry类型的Invoker,不需要做处理 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol { return protocol.export; } //非Registry类型的Invoker,需要被监听器包装 return new ListenerExporterWrapper<T>(protocol.export, Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));}

接着调用ProtocolFilterWrapper中的export方法,ProtocolFilterWrapper负担初步化invoker全数的Filter。那一个类特别关键,dubbo机制里面日志记录、超时等等效能都是在这里一片段实现的。那一个类有3个特征:1.它有二个参数为Protocol protocol的构造函数;2.它完毕了Protocol接口;3.它利用义务链方式,对export和refer函数进行了包装。

echo=com.alibaba.dubbo.rpc.filter.EchoFiltergeneric=com.alibaba.dubbo.rpc.filter.GenericFiltergenericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFiltertoken=com.alibaba.dubbo.rpc.filter.TokenFilteraccesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilteractivelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilterclassloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFiltercontext=com.alibaba.dubbo.rpc.filter.ContextFilterconsumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilterexception=com.alibaba.dubbo.rpc.filter.ExceptionFilterexecutelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilterdeprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFiltercompatible=com.alibaba.dubbo.rpc.filter.CompatibleFiltertimeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter

那其间提到到无数作用,包括权力验证、至极、超时、总结调用时间等都在这个类达成。

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { //Registry类型的Invoker不做处理 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol { return protocol.export; } //非Registry类型的Invoker需要先构建调用链,然后再导出 return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));}

此间大家先深入分析的是Registry类型的Invoker,接着就能调用RegistryProtocol的export方法,RegistryProtocol担当登记服务到注册中央和向登记中央订阅服务。代码如下:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //export invoker //这里就交给了具体的协议去暴露服务(先不解析,留在后面,可以先去后面看下导出过程) final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); //registry provider //根据invoker中的url获取Registry实例 //并且连接到注册中心 //此时提供者作为消费者引用注册中心核心服务RegistryService final Registry registry = getRegistry(originInvoker); //注册到注册中心的URL final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); //调用远端注册中心的register方法进行服务注册 //若有消费者订阅此服务,则推送消息让消费者引用此服务。 //注册中心缓存了所有提供者注册的服务以供消费者发现。 registry.register(registedProviderUrl); // 订阅override数据 // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); //提供者向注册中心订阅所有注册服务的覆盖配置 //当注册中心有此服务的覆盖配置注册进来时,推送消息给提供者,重新暴露服务,这由管理页面完成。 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //保证每次export都返回一个新的exporter实例 //返回暴露后的Exporter给上层ServiceConfig进行缓存,便于后期撤销暴露。 return new Exporter<T>() { public Invoker<T> getInvoker() { return exporter.getInvoker(); } public void unexport() { try { exporter.unexport(); } catch (Throwable t) { logger.warn(t.getMessage; } try { registry.unregister(registedProviderUrl); } catch (Throwable t) { logger.warn(t.getMessage; } try { overrideListeners.remove(overrideSubscribeUrl); registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); } catch (Throwable t) { logger.warn(t.getMessage; } } };}

3.1 简洁流程图

图片 13

劳动发表简洁流程图

ServiceBean 增加了 ServiceConfig,调用 export() 方法时由 ServiceConfig 完毕服务暴露的意义完毕。

付出具体的说道去暴光服务

先不分析,留在前面,能够先去后面看下导出进度,然后再回来接着看登记到注册中央的历程。具体磋商暴露服务重大是开采服务器和端口,举办监听。

3.2 公布入口

ServiceBean监听入口:

    public void onApplicationEvent(ApplicationEvent event) {
        if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
            if (isDelay() && !isExported() && !isUnexported()) {
                if (logger.isInfoEnabled()) {
                    logger.info("The service ready on spring started. service: "   getInterface());
                }
                export();
            }
        }
    }

ServiceBean的export()方法内部最后会实施到ServiceConfig的doExportUrls()方法-->

ServiceConfig试行业发布布:
1)加载全体注册中心UENCOREL
2)遍历全体Protocol,进行发布

    private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

doExportUrls()方法内部最终施行下边多个关键步骤,即 “本地发布”“远程公布” -->

ServiceConfig.java

连接注册宗旨并收获Registry实例

实际的左券举办揭破况兼再次回到了一个ExporterChangeableWrapper之后,接下去看下一步连接注册中央并注册到注册宗旨,代码是在RegistryProtocol的export方法:

//先假装此步已经分析完final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);//得到具体的注册中心,连接注册中心,此时提供者作为消费者引用注册中心核心服务RegistryServicefinal Registry registry = getRegistry(originInvoker);final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);//调用远端注册中心的register方法进行服务注册//若有消费者订阅此服务,则推送消息让消费者引用此服务registry.register(registedProviderUrl);//提供者向注册中心订阅所有注册服务的覆盖配置registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);//返回暴露后的Exporter给上层ServiceConfig进行缓存return new Exporter<T>() {。。。}

getRegistry(originInvoker)方法:

//根据invoker的地址获取registry实例private Registry getRegistry(final Invoker<?> originInvoker){ //获取invoker中的registryUrl URL registryUrl = originInvoker.getUrl(); if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol { //获取registry的值,这里获得是zookeeper,默认值是dubbo String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY); //这里获取到的url为: //zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService? //application=dubbo-provider&application.version=1.0&dubbo=2.5.3& //environment=product&export=dubbo://192.168.1.100:20880/ //dubbo.common.hello.service.HelloService?anyhost=true&application=dubbo-provider& //application.version=1.0&dubbo=2.5.3&environment=product& //interface=dubbo.common.hello.service.HelloService&methods=sayHello& //organization=china&owner=cheng.xi&pid=9457&side=provider×tamp=1489807681627 registryUrl = registryUrl.setProtocol.removeParameter(Constants.REGISTRY_KEY); } //根据SPI机制获取具体的Registry实例,这里获取到的是ZookeeperRegistry return registryFactory.getRegistry(registryUrl);}

那边的registryFactory是动态变化的代码,如下:

import com.alibaba.dubbo.common.extension.ExtensionLoader;public class RegistryFactory$Adpative implements com.alibaba.dubbo.registry.RegistryFactory { public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) { if (arg0 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg0; String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol; if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url("   url.toString use keys([protocol])"); com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.RegistryFactory.class).getExtension; return extension.getRegistry; }}

于是这里registryFactory.getRegistry(registryUrl)用的是ZookeeperRegistryFactory。

先看下getRegistry方法,会开采该方法会在AbstractRegistryFactory中落到实处:

public Registry getRegistry { url = url.setPath(RegistryService.class.getName .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); //这里key为: //zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService String key = url.toServiceString(); // 锁定注册中心获取过程,保证注册中心单一实例 LOCK.lock(); try { //先从缓存中获取Registry实例 Registry registry = REGISTRIES.get; if (registry != null) { return registry; } //创建registry,会直接new一个ZookeeperRegistry返回 //具体创建实例是子类来实现的 registry = createRegistry; if (registry == null) { throw new IllegalStateException("Can not create registry "   url); } //放到缓存中 REGISTRIES.put(key, registry); return registry; } finally { // 释放锁 LOCK.unlock(); }}

createRegistry;是在子类中落到实处的,这里是ZookeeperRegistry,首先必要通过AbstractRegistry的组织:

public AbstractRegistry { //url保存起来 setUrl; // 启动文件保存定时器 // syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false); //保存的文件为: ///home/xxx/.dubbo/dubbo-registry-127.0.0.1.cache String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home")   "/.dubbo/dubbo-registry-"   url.getHost()   ".cache"); File file = null; if (ConfigUtils.isNotEmpty) { file = new File; if(! file.exists() && file.getParentFile() != null && ! file.getParentFile().exists{ if(! file.getParentFile().mkdirs{ throw new IllegalArgumentException("Invalid registry store file "   file   ", cause: Failed to create directory "   file.getParentFile; } } } this.file = file; //加载文件中的属性 loadProperties(); //通知订阅 notify(url.getBackupUrls;}

3.3 本地发布:

就要服务宣布花费地可调用的劳务。

            //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                exportLocal(url);
            }

    private void exportLocal(URL url) {
        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
            URL local = URL.valueOf(url.toFullString())
                    .setProtocol(Constants.LOCAL_PROTOCOL)
                    .setHost(NetUtils.LOCALHOST)
                    .setPort(0);
            Exporter<?> exporter = protocol.export(
                    proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
            exporters.add(exporter);
            logger.info("Export dubbo service "   interfaceClass.getName()   " to local registry");
        }
    }

重点:Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
代码中的proxyFactory为通过ExtensionLoader动态生成的JavassistProxyFactory
代码中的protocol为通过ExtensionLoader动态生成的InjvmProtocol

...ExtensionLoader相关原理会在三番五次小说特别说解...

JavassistProxyFactory创建Invoker:
通过JavassistProxyFactory创建(new)了一个AbstractProxyInvoker的完成,在那之中间通过Java反射的艺术推行原始对象proxy的不二法门。

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper类不能正确处理带$的类名
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

InjvmProtocol.export:
new了一个InjvmExporter。就是唯有的将url、Exporter归入exporterMap中。

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
    }
public synchronized void export() { if (provider != null) { if (export == null) { export = provider.getExport(); } if (delay == null) { delay = provider.getDelay(); } } if (export != null && ! export.booleanValue { return; } if (delay != null && delay > 0) { Thread thread = new Thread(new Runnable() { public void run() { try { Thread.sleep; } catch (Throwable e) { } doExport; thread.setDaemon; thread.setName("DelayExportServiceThread"); thread.start(); } else { doExport(); }}

得到Registry时的订阅

notify()方法:

protected void notify(List<URL> urls) { if(urls == null || urls.isEmpty return; //getSubscribed()方法获取订阅者列表 //订阅者Entry里每个URL都对应着n个NotifyListener for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet { URL url = entry.getKey(); if(! UrlUtils.isMatch(url, urls.get { continue; } Set<NotifyListener> listeners = entry.getValue(); if (listeners != null) { for (NotifyListener listener : listeners) { try { //通知每个监听器 notify(url, listener, filterEmpty(url, urls)); } catch (Throwable t) {} } } }}

notify(url, listener, filterEmpty(url, urls));代码:

protected void notify(URL url, NotifyListener listener, List<URL> urls) { Map<String, List<URL>> result = new HashMap<String, List<URL>>(); for (URL u : urls) { if (UrlUtils.isMatch { //分类 String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); List<URL> categoryList = result.get; if (categoryList == null) { categoryList = new ArrayList<URL>(); result.put(category, categoryList); } categoryList.add; } } if (result.size { return; } Map<String, List<URL>> categoryNotified = notified.get; if (categoryNotified == null) { notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>; categoryNotified = notified.get; } for (Map.Entry<String, List<URL>> entry : result.entrySet { String category = entry.getKey(); List<URL> categoryList = entry.getValue(); categoryNotified.put(category, categoryList); //保存到主目录下的.dubbo目录下 saveProperties; //上面获取到的监听器进行通知 listener.notify(categoryList); }}

AbstractRegistry构造器初阶化完,接着调用FailbackRegistry构造器最早化:

public FailbackRegistry { super; //重试时间,默认5000ms int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); //启动失败重试定时器 this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { public void run() { // 检测并连接注册中心 try { //重试方法由每个具体子类实现 //获取到注册失败的,然后尝试注册 retry(); } catch (Throwable t) { // 防御性容错} } }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);}

最终回到ZookeeperRegistry的布局早先化:

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { super; if (url.isAnyHost { throw new IllegalStateException("registry address == null"); } //获得到注册中心中的分组,默认dubbo String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); if (! group.startsWith(Constants.PATH_SEPARATOR)) { group = Constants.PATH_SEPARATOR   group; } //注册到注册中心的节点 this.root = group; //使用zookeeperTansporter去连接 //ZookeeperTransport这里是生成的自适应实现,默认使用ZkClientZookeeperTransporter //ZkClientZookeeperTransporter的connect去实例化一个ZkClient实例 //并且订阅状态变化的监听器subscribeStateChanges //然后返回一个ZkClientZookeeperClient实例 zkClient = zookeeperTransporter.connect; //ZkClientZookeeperClient添加状态改变监听器 zkClient.addStateListener(new StateListener() { public void stateChanged(int state) { if (state == RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage; } } } });}

3.4 远程宣布:

遍历全体注册宗旨URL,实行远程发表:

                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service "   interfaceClass.getName()   " url "   url   " to registry "   registryURL);
                        }
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

                        Exporter<?> exporter = protocol.export(invoker);
                        exporters.add(exporter);
                    }

代码中的proxyFactory为通过ExtensionLoader动态变化的JavassistProxyFactory
代码中的protocol为通过ExtensionLoader动态变化的RegistryProtocol

JavassistProxyFactory创建Invoker:
同本地发表,不赘述。

                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

RegistryProtocol.export:
通过RegistryProtocol将Invoker发布成Dubbo服务。
1)doLocalExport 所做的事情,就是调用DubboProtocol生成DubboExporter,并发布Dubbo服务;
2)后续代码所做的作业,正是创办注册宗旨,将公布的劳务注册到注册中央(zk),并监听注册主题(zk)的退换;

    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        //registry provider
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        registry.register(registedProviderUrl);
        // 订阅override数据
        // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //保证每次export都返回一个新的exporter实例
        return new Exporter<T>() {
            public Invoker<T> getInvoker() {
                return exporter.getInvoker();
            }

            public void unexport() {
                try {
                    exporter.unexport();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    registry.unregister(registedProviderUrl);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    overrideListeners.remove(overrideSubscribeUrl);
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        };
    }

doLocalExport:
bounds为providerurl <--> exporter的映照,要是exporter未被创建,则调用DubboProtocol成立exporter。

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return (ExporterChangeableWrapper<T>) exporter;
    }

DubboProtocol.export:
完全做的业务便是:
1)依照传入的Invoker创造(new)多个DubboExporter并再次来到;
2)运行互连网服务,监听服务U瑞虎L对应的劳动端口;

...对于DubboProtocol的求实完毕原理后续会有专文解说...

向注册宗旨登记/监听:

        //registry provider
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        registry.register(registedProviderUrl);
        // 订阅override数据
        // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

...不举行,后续会有专文疏解...

由位置代码可以知道,就算设置了 delay 参数,Dubbo 的管理形式是开发银行一个守护线程在 sleep 指定期期后再 doExport。

赢得注册到注册宗旨的url

获得到了Registry,Registry实例中保留着连连到了zookeeper的zkClient实例之后,下一步获取要注册到注册中央的url(在RegistryProtocol中)。

final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);//得到的URL是://dubbo://192.168.1.100:20880/dubbo.common.hello.service.HelloService?//anyhost=true&application=dubbo-provider&application.version=1.0&dubbo=2.5.3&environment=product&//interface=dubbo.common.hello.service.HelloService&methods=sayHello

四、全体流程图总结

图片 14

全体流程图总计

小结起来,Dubbo的服务揭橥进度:
1)通过Spring配置开头化ServiceBean并流入属性;
2)通过监听Spring事件触发服务发布过程;
3)ServiceConfig中的ProxyFactoryProtocol由Dubbo的spi动态变化;
4)对具有的构和,全体的注册中央开展遍历,通过JavassistProxyFactory变化可实行对象Invoker;
5)通过RegistryProtocol将Invoker对象转变为Exporter,同一时候做到服务的开发银行监听和注册;
6)最终由ServiceConfig尊崇有着宣布的Exporter与服务U大切诺基L到本地内部存储器;

在 瑟维斯Config 的 doExport() 方法中会进行参数检查和设置,富含:

注册到注册宗旨

下一场调用registry.register(registedProviderUrl)注册到注册大旨(在RegistryProtocol中)。register方法的兑以往FailbackRegistry中:

public void register { super.register; failedRegistered.remove; failedUnregistered.remove; try { // 向服务器端发送注册请求 //调用子类具体实现,发送注册请求 doRegister; } catch (Exception e) { Throwable t = e; // 如果开启了启动时检测,则直接抛出异常 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol; boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if(skipFailback) { t = t.getCause(); } throw 。。。 } else { } // 将失败的注册请求记录到失败列表,定时重试 failedRegistered.add; }}

doRegister;在这里处是ZookeeperRegistry中实际落到实处的,这里将会登记到注册主旨:

protected void doRegister { try { //这里zkClient就是我们上面调用构造的时候生成的 //ZkClientZookeeperClient //保存着连接到Zookeeper的zkClient实例 //开始注册,也就是在Zookeeper中创建节点 //这里toUrlPath获取到的path为: ///dubbo/dubbo.common.hello.service.HelloService/providers/dubbo://192.168.1.100:20880/ //dubbo.common.hello.service.HelloService?anyhost=true&application=dubbo-provider& //application.version=1.0&dubbo=2.5.3&environment=product&interface= //dubbo.common.hello.service.HelloService&methods=sayHello //默认创建的节点是临时节点 zkClient.create(toUrlPath, url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { }}

通过这一步之后,Zookeeper中就有节点存在了,具体节点为

/dubbo dubbo.common.hello.service.HelloService providers /dubbo/dubbo.common.hello.service.HelloService/providers/ dubbo://192.168.1.100:20880/dubbo.common.hello.service.HelloService? anyhost=true&application=dubbo-provider& application.version=1.0&dubbo=2.5.3&environment=product& interface=dubbo.common.hello.service.HelloService&methods=sayHello

五、后续三番两次串小说预先报告

  • 泛化调用
  • 本地完毕
  • 本土存根
  • 地方伪装
  • 配置(application、registry、protocol等)

订阅注册中央的劳动

在注册到注册中央之后,registry会去订阅覆盖配置的服务,这一步之后就能够在/dubbo/dubbo.common.hello.service/Hello瑟维斯节点下多三个configurators节点。

ServiceConfig.java

返回新Exporter实例

最后重返Exporter新实例,重回到ServiceConfig中。服务的宣布固然实现了。

protected synchronized void doExport() { if (unexported) { throw new IllegalStateException("Already unexported!"); } if  { return; } exported = true; if (interfaceName == null || interfaceName.length { throw new IllegalStateException("<dubbo:service interface="" /> interface not allow null!"); } checkDefault(); //省略 if (ref instanceof GenericService) { interfaceClass = GenericService.class; generic = true; } else { try { interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() .getContextClassLoader; } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage; } checkInterfaceAndMethods(interfaceClass, methods); checkRef(); generic = false; } if(local !=null){ if(local=="true"){ local=interfaceName "Local"; } Class<?> localClass; try { localClass = ClassHelper.forNameWithThreadContextClassLoader; } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage; } if(!interfaceClass.isAssignableFrom(localClass)){ throw new IllegalStateException("The local implemention class "   localClass.getName()   " not implement interface "   interfaceName); } } if(stub !=null){ if(stub=="true"){ stub=interfaceName "Stub"; } Class<?> stubClass; try { stubClass = ClassHelper.forNameWithThreadContextClassLoader; } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage; } if(!interfaceClass.isAssignableFrom(stubClass)){ throw new IllegalStateException("The stub implemention class "   stubClass.getName()   " not implement interface "   interfaceName); } } //此处省略:检查并设置相关参数 doExportUrls();}

付给具体的协商实行劳动暴光

这里也正是非Registry类型的Invoker的导出进程。首要的步骤是将本地ip和20880端口开拓,进行监听。最终打包成exporter重回。doLocalExport:

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker){ //原始的invoker中的url: //registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService? //application=dubbo-provider&application.version=1.0&dubbo=2.5.3 //&environment=product&export=dubbo://10.42.0.1:20880/ //dubbo.common.hello.service.HelloService?anyhost=true&application=dubbo-provider& //application.version=1.0&dubbo=2.5.3&environment=product& //interface=dubbo.common.hello.service.HelloService&methods=sayHello& //organization=china&owner=cheng.xi&pid=7876&side=provider×tamp=1489057305001& //organization=china&owner=cheng.xi&pid=7876&registry=zookeeper&timestamp=1489057304900 //从原始的invoker中得到的key: //dubbo://10.42.0.1:20880/dubbo.common.hello.service.HelloService?anyhost=true&application=dubbo-provider& //application.version=1.0&dubbo=2.5.3&environment=product&interface=dubbo.common.hello.service.HelloService&//methods=sayHello String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get; if (exporter == null) { synchronized  { exporter = (ExporterChangeableWrapper<T>) bounds.get; if (exporter == null) { //得到一个Invoker代理,里面包含原来的Invoker final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); //此处protocol还是最上面生成的代码,调用代码中的export方法,会根据协议名选择调用具体的实现类 //这里我们需要调用DubboProtocol的export方法 //这里的使用具体协议进行导出的invoker是个代理invoker //导出完之后,返回一个新的ExporterChangeableWrapper实例 exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return (ExporterChangeableWrapper<T>) exporter;}

在检讨完参数之后,发轫展露服务。Dubbo 帮衬多合同和多注册大旨:

运用dubbo合同导出

此地protocol.export(invokerDelegete)将在去具体的DubboProtocol中执行了,DubboProtocol的外侧包裹着ProtocolFilterWrapper,再外面还包裹着ProtocolListenerWrapper。会先经过ProtocolListenerWrapper:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { //Registry类型的Invoker if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol { return protocol.export; } //其他具体协议类型的Invoker //先进行导出protocol.export //然后获取自适应的监听器 //最后返回的是包装了监听器的Exporter //这里监听器的获取是getActivateExtension,如果指定了listener就加载实现,没有指定就不加载 return new ListenerExporterWrapper<T>(protocol.export, Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));}

再经过ProtocolFilterWrapper:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { //Registry类型的Invoker if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol { return protocol.export; } //其他具体协议类型的Invoker //先构建Filter链,然后再导出 return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));}

查阅下构建Invoker链的不二等秘书技:

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { //我们要处理的那个Invoker作为处理链的最后一个 Invoker<T> last = invoker; //根据key和group获取自动激活的Filter List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (filters.size { //把所有的过滤器都挨个连接起来,最后一个是我们真正的Invoker for (int i = filters.size() - 1; i >= 0; i --) { final Filter filter = filters.get; final Invoker<T> next = last; last = new Invoker<T>() { public Class<T> getInterface() { return invoker.getInterface(); } public URL getUrl() { return invoker.getUrl(); } public boolean isAvailable() { return invoker.isAvailable(); } public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last;}

继之就到了DubboProtocol的export方法,这里展开暴露服务:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { //dubbo://10.42.0.1:20880/dubbo.common.hello.service.HelloService? //anyhost=true&application=dubbo-provider& //application.version=1.0&dubbo=2.5.3&environment=product& //interface=dubbo.common.hello.service.HelloService& //methods=sayHello URL url = invoker.getUrl(); // export service. //key由serviceName,port,version,group组成 //当nio客户端发起远程调用时,nio服务端通过此key来决定调用哪个Exporter,也就是执行的Invoker。 //dubbo.common.hello.service.HelloService:20880 String key = serviceKey; //将Invoker转换成Exporter //直接new一个新实例 //没做啥处理,就是做一些赋值操作 //这里的exporter就包含了invoker DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); //缓存要暴露的服务,key是上面生成的 exporterMap.put(key, exporter); //export an stub service for dispaching event //是否支持本地存根 //远程服务后,客户端通常只剩下接口,而实现全在服务器端, //但提供方有些时候想在客户端也执行部分逻辑,比如:做ThreadLocal缓存, //提前验证参数,调用失败后伪造容错数据等等,此时就需要在API中带上Stub, //客户端生成Proxy实,会把Proxy通过构造函数传给Stub, //然后把Stub暴露组给用户,Stub可以决定要不要去调Proxy。 Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice){ String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length{ } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } //根据URL绑定IP与端口,建立NIO框架的Server openServer; return exporter;}

地点得到的Exporter会被平放慢存中去,key正是下面生成的,客商端就足以发需要依据key找到Exporter,然后找到invoker实行调用了。接下来是创设服务器并监听端口。

继之调用openServer方法成立NIO Server实行监听:

private void openServer { // find server. //key是IP:PORT //192.168.110.197:20880 String key = url.getAddress(); //client 也可以暴露一个只有server可以调用的服务。 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true); if  { ExchangeServer server = serverMap.get; //同一JVM中,同协议的服务,共享同一个Server, //第一个暴露服务的时候创建server, //以后相同协议的服务都使用同一个server if (server == null) { serverMap.put(key, createServer; } else { //同协议的服务后来暴露服务的则使用第一次创建的同一Server //server支持reset,配合override功能使用 //accept、idleTimeout、threads、heartbeat参数的变化会引起Server的属性发生变化 //这时需要重新设置Server server.reset; } }}

继续看createServer方法:

//url为://dubbo://192.168.110.197:20880/dubbo.common.hello.service.HelloService?//anyhost=true&application=dubbo-provider&//application.version=1.0&dubbo=2.5.3&environment=product&//interface=dubbo.common.hello.service.HelloService&//methods=sayHelloprivate ExchangeServer createServer { //默认开启server关闭时发送readonly事件 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString; //默认开启heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); //默认使用netty String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension throw new RpcException("Unsupported server type: "   str   ", url: "   url); url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); ExchangeServer server; try { //Exchangers是门面类,里面封装的是Exchanger的逻辑。 //Exchanger默认只有一个实现HeaderExchanger. //Exchanger负责数据交换和网络通信。 //从Protocol进入Exchanger,标志着程序进入了remote层。 //这里requestHandler是ExchangeHandlerAdapter server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { } str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains { throw new RpcException("Unsupported client type: "   str); } } return server;}

Exchangers.bind方法:

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); //getExchanger方法根据url获取到一个默认的实现HeaderExchanger //调用HeaderExchanger的bind方法 return getExchanger.bind(url, handler);}

HeaderExchanger的bind方法:

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { //直接返回一个HeaderExchangeServer //先创建一个HeaderExchangeHandler //再创建一个DecodeHandler //最后调用Transporters.bind return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler));}

此处会先创立两个HeaderExchangerHandler,包罗着ExchangeHandlerAdapter,接着成立二个DecodeHandler,会包蕴后边的handler,接下去调用Transporters的bind方法,重返二个Server,接着用HeaderExchangeServer包装一下,就赶回给Protocol层了。在HeaderExchangerServer包装的时候会运维心跳沙漏startHeatbeatTimer();

Transports的bind方法:

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { //如果有多个handler的话,需要使用分发器包装下 handler = new ChannelHandlerDispatcher; } //getTransporter()获取一个Adaptive的Transporter //然后调用bind方法(默认是NettyTransporter的bind方法) return getTransporter().bind(url, handler);}

getTransporter()生成的Transporter的代码如下:

import com.alibaba.dubbo.common.extension.ExtensionLoader;public class Transporter$Adpative implements com.alibaba.dubbo.remoting.Transporter { public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL { if (arg0 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg0; //Server默认使用netty String extName = url.getParameter("server", url.getParameter("transporter", "netty")); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url("   url.toString use keys([server, transporter])"); //获取到一个NettyTransporter com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension; //调用NettyTransporter的bind方法 return extension.bind(arg0, arg1); } public com.alibaba.dubbo.remoting.Client connect(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL { if (arg0 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg0; String extName = url.getParameter("client", url.getParameter("transporter", "netty")); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url("   url.toString use keys([client, transporter])"); com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension; return extension.connect(arg0, arg1);}}

NettyTransporter的bind方法:

 public Server bind(URL url, ChannelHandler listener) throws RemotingException { //创建一个Server return new NettyServer(url, listener);}

public NettyServer(URL url, ChannelHandler handler) throws RemotingException{ //handler先经过ChannelHandlers的包装方法 //然后再初始化 super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));}

ChannelHandlers.wrap方法中会依照SPI扩充机制动态生成Dispatcher的自适应类,生成的代码不在列出,暗中认可使用AllDispatcher管理,会重回二个AllChannelHandler,会把线程池和DataStore都起先化了。然后经过HeartbeatHandler封装,再经过MultiMessageHandler封装后回到。

NettyServer构造,会挨个通过AbstractPeer,AbstractEndpoint,AbstractServer,NettyServer的开头化。重视看下AbstractServer的构造方法:

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); localAddress = getUrl().toInetSocketAddress(); String host = url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost.getHost ? NetUtils.ANYHOST : getUrl().getHost(); bindAddress = new InetSocketAddress(host, getUrl().getPort; this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); try { //初始化的时候会打开Server //具体实现这里是NettyServer中 doOpen(); } catch (Throwable t) { } if (handler instanceof WrappedChannelHandler ){ executor = ((WrappedChannelHandler)handler).getExecutor(); }}

接下来调用doOpen方法:

protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); //boss线程池 ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); //worker线程池 ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); //ChannelFactory,没有指定工作者线程数量,就使用cpu 1 ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler, this); channels = nettyHandler.getChannels(); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", adapter.getDecoder; pipeline.addLast("encoder", adapter.getEncoder; pipeline.addLast("handler", nettyHandler); return pipeline; } }); // bind之后返回一个Channel channel = bootstrap.bind(getBindAddress;}

doOpen方法创造Netty的Server端并开垦,具体的作业就交由Netty去处理了。

ServiceConfig.java

private void doExportUrls() { List<URL> registryURLs = loadRegistries; for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); }}

本着各个合同、每种注册中心,开端创建 U昂科拉L。

ServiceConfig.java

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { String name = protocolConfig.getName(); if (name == null || name.length { name = "dubbo"; } //处理host //处理port Map<String, String> map = new HashMap<String, String>(); //设置参数到map // 导出服务 String contextPath = protocolConfig.getContextpath(); if ((contextPath == null || contextPath.length && provider != null) { contextPath = provider.getContextpath(); } URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath   "/")   path, map); if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol.getConfigurator.configure; } //此处省略:服务暴露(详见 2.6 和 2.7) this.urls.add;}

倘诺安顿 scope=none, 则不会举行劳动揭露;若无配置 scope 恐怕scope=local,则会进展地面暴露。

ServiceConfig.java

//public static final String LOCAL_PROTOCOL = "injvm";//public static final String LOCALHOST = "127.0.0.1";private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { //...... String scope = url.getParameter(Constants.SCOPE_KEY); //配置为none不暴露 if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase { //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务) if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase { exportLocal; } //...... } //......}private void exportLocal { if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol { URL local = URL.valueOf(url.toFullString .setProtocol(Constants.LOCAL_PROTOCOL) .setHost(NetUtils.LOCALHOST) .setPort; Exporter<?> exporter = protocol.export( proxyFactory.getInvoker(ref,  interfaceClass, local)); exporters.add; logger.info("Export dubbo service "   interfaceClass.getName()  " to local registry"); }}
  1. 揭发服务的时候,会通过代理成立 Invoker;

  2. 本地揭发时采纳 injvm 公约,injvm 左券是三个伪公约,它不开启端口,无法被远程调用,只在 JVM 内直接关系,但施行 Dubbo 的 Filter 链。

若无配备 scope 也许 scope=remote,则交易会开远程揭穿。

ServiceConfig.java

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { String scope = url.getParameter(Constants.SCOPE_KEY); //配置为none不暴露 if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase { //...... //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务) if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase{ if (logger.isInfoEnabled { logger.info("Export dubbo service "   interfaceClass.getName()   " to url "   url); } if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register", true)) { for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString; } if (logger.isInfoEnabled { logger.info("Register dubbo service "   interfaceClass.getName()   " url "   url   " to registry "   registryURL); } Invoker<?> invoker = proxyFactory.getInvoker(ref,  interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString; Exporter<?> exporter = protocol.export; exporters.add; } } else { Invoker<?> invoker = proxyFactory.getInvoker(ref,  interfaceClass, url); Exporter<?> exporter = protocol.export; exporters.add; } } }}

在劳务揭示时,有二种境况:

  • 不利用登记中央:直接揭露对应公约的劳动,援引服务时只可以透过直连格局援引
  • 利用登记中央:暴光对应公约的劳动后,会将服务节点注册到注册中央,援用服务时得以通过注册中央动态获取服务提供者列表,也足以经过直连形式援用

不选取注册中央时,直接调用对应公约(如 Dubbo 左券)的 export() 揭露服务。以 Dubbo 协议为例:

DubboProtocol.java

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. String key = serviceKey; DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispaching event Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice){ String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length{ if (logger.isWarnEnabled{ logger.warn(new IllegalStateException("consumer ["  url.getParameter(Constants.INTERFACE_KEY)   "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } openServer; return exporter;}

调用 openServer() 方法创立并运行 Server:

DubboProtocol.java

private void openServer { // find server. String key = url.getAddress(); //client 也可以暴露一个只有server可以调用的服务。 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true); if  { ExchangeServer server = serverMap.get; if (server == null) { serverMap.put(key, createServer; } else { //server支持reset,配合override功能使用 server.reset; } }}private ExchangeServer createServer { //默认开启server关闭时发送readonly事件 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString; //默认开启heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension throw new RpcException("Unsupported server type: "   str   ", url: "   url); url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: "   url   ") "   e.getMessage; } str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains { throw new RpcException("Unsupported client type: "   str); } } return server;}

Exchanger (暗中认可 HeaderExchanger)封装央浼响应形式,同步转异步,以 Request、Response 为主干:

HeaderExchager.java

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler));}

Transporters.java

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher; } return getTransporter().bind(url, handler);}

底层传输暗中同意使用 NettyTransporter,最后是创立 NettyServer:

NettyTransporter.java

public Server bind(URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener);}

NettyServer.java

public class NettyServer extends AbstractServer implements Server { public NettyServer(URL url, ChannelHandler handler) throws RemotingException{ super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); }}

AbstractServer.java

public abstract class AbstractServer extends AbstractEndpoint implements Server { public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); localAddress = getUrl().toInetSocketAddress(); String host = url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost.getHost ? NetUtils.ANYHOST : getUrl().getHost(); bindAddress = new InetSocketAddress(host, getUrl().getPort; this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); try { doOpen(); if (logger.isInfoEnabled { logger.info("Start "   getClass().getSimpleName()   " bind "   getBindAddress()   ", export "   getLocalAddress; } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind "   getClass().getSimpleName()   " on "   getLocalAddress()   ", cause: "   t.getMessage; } if (handler instanceof WrappedChannelHandler ){ executor = ((WrappedChannelHandler)handler).getExecutor(); } }}

NettyServer.java

public class NettyServer extends AbstractServer implements Server { protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler, this); channels = nettyHandler.getChannels(); // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true)); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/ pipeline.addLast("decoder", adapter.getDecoder; pipeline.addLast("encoder", adapter.getEncoder; pipeline.addLast("handler", nettyHandler); return pipeline; } }); // bind channel = bootstrap.bind(getBindAddress; }}

假若应用了注册宗旨,则在经过具体磋商(如 Dubbo 公约)曝光服务之后(即在 2.8 基础之上)踏入劳动登记流程,将劳动节点注册到注册主题。

RegistryProtocol.java

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //export invoker final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); //registry provider final Registry registry = getRegistry(originInvoker); final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); //向Zookeeper注册节点 registry.register(registedProviderUrl); // 订阅override数据 // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //保证每次export都返回一个新的exporter实例 return new Exporter<T>() { public Invoker<T> getInvoker() { return exporter.getInvoker(); } public void unexport() { try { exporter.unexport(); } catch (Throwable t) { logger.warn(t.getMessage; } try { registry.unregister(registedProviderUrl); } catch (Throwable t) { logger.warn(t.getMessage; } try { overrideListeners.remove(overrideSubscribeUrl); registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); } catch (Throwable t) { logger.warn(t.getMessage; } } };}private Registry getRegistry(final Invoker<?> originInvoker){ URL registryUrl = originInvoker.getUrl(); if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol { String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY); registryUrl = registryUrl.setProtocol.removeParameter(Constants.REGISTRY_KEY); } return registryFactory.getRegistry(registryUrl);}

getRegistry() 方法依照登记中央项目(默许Zookeeper)获取注册宗旨客商端,由登记中央顾客端实例来开展真正的劳务注册。

挂号焦点客商端将节点注册到注册主题,同有时候订阅对应的 override 数据,实时监听服务的属性别变化动完结动态配置成效。

末尾回到的 Exporter 实现了 unexport() 方法,那样在劳动下线时清理相关财富。

时至前日,服务暴光流程结束。

看完源码,我们已经掌握了dubbo的严重性发表进度,未来大家回过头来结合dubbo的完好架商谈源码的解析,总计一下dubbo服务公布。服务发布进度一共三个步骤:

  • 事情方将服务接口和贯彻编写定义好,增添dubbo相关配置文件。
  • Config层加载配置文件形成上下文,Config层包罗:ServiceConfig、ProviderConfig、RegistryConfig等。
  • ServiceConfig根据Protocol类型,依据ProtocolConfig、ProviderConfig加载registry,根据加载的registry创建dubbo的UCR-VL。
  • 防微杜渐干活做完后ProxyFactory登台,dubbo中有三种代理方式,JDK代理和Javassist代理,暗中同意使用Javassist代理,Proxy代理类依据dubbo配置新闻获取到接口音讯、通过动态代理格局将接口的拥有办法交给Proxy代理类进行代理,并封装进Invoker里面。
  • 将拥有供给揭发的service封装的Invoker组成贰个list传给音讯交流层提须求花费方举行调用。

本文由pc28.am发布于计算机编程,转载请注明出处:Dubbo原理和源码剖析之服务揭露,服务发表流程分

上一篇:Dubbo原理和源码深入分析之标签分析,Schema扩张深 下一篇:没有了
猜你喜欢
热门排行
精彩图文