打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
dubbo组成原理

dubbo之所以可以提供流畅的RPC服务,和一个稳定、可靠的分布式系统的可靠协调者是分不开的


dubbo列举了支持的注册中,不过大多数使用中都会偏向zookeeper作为自己的注册中心

zookeeper是Hadoop的一个子项目是分布式系统的可靠协调者,他提供了配置维护,名字服务,分布式同步等服务。


启动工程之后,我们在zookeeper节点上会看到dubbo留下的注册信息


因为zookeeper中不能存特殊字符所以,子节点的内容做过转码。

对应于dubbo官方文档中的结构


在之前介绍服务的暴露和调用中提到过一个类RegistryProtocol

服务暴露时会export

  1. public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {  
  2.         //export invoker  
  3.         final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);  
  4.         //registry provider  
  5.         final Registry registry = getRegistry(originInvoker);  
  6.         final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);  
  7.         registry.register(registedProviderUrl);  
  8.         // 订阅override数据  
  9.         // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。  
  10.         final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);  
  11.         final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);  
  12.         overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);  
  13.         registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);  
  14.         //保证每次export都返回一个新的exporter实例  
  15.         return new Exporter<T>() {  
  16.             public Invoker<T> getInvoker() {  
  17.                 return exporter.getInvoker();  
  18.             }  
  19.             public void unexport() {  
  20.                 try {  
  21.                     exporter.unexport();  
  22.                 } catch (Throwable t) {  
  23.                     logger.warn(t.getMessage(), t);  
  24.                 }  
  25.                 try {  
  26.                     registry.unregister(registedProviderUrl);  
  27.                 } catch (Throwable t) {  
  28.                     logger.warn(t.getMessage(), t);  
  29.                 }  
  30.                 try {  
  31.                     overrideListeners.remove(overrideSubscribeUrl);  
  32.                     registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);  
  33.                 } catch (Throwable t) {  
  34.                     logger.warn(t.getMessage(), t);  
  35.                 }  
  36.             }  
  37.         };  
  38.     }  


服务调用时会refer

  1. @SuppressWarnings("unchecked")  
  2.     public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {  
  3.         url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);  
  4.         Registry registry = registryFactory.getRegistry(url);  
  5.         if (RegistryService.class.equals(type)) {  
  6.             return proxyFactory.getInvoker((T) registry, type, url);  
  7.         }  
  8.   
  9.         // group="a,b" or group="*"  
  10.         Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));  
  11.         String group = qs.get(Constants.GROUP_KEY);  
  12.         if (group != null && group.length() > 0 ) {  
  13.             if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1  
  14.                     || "*".equals( group ) ) {  
  15.                 return doRefer( getMergeableCluster(), registry, type, url );  
  16.             }  
  17.         }  
  18.         return doRefer(cluster, registry, type, url);  
  19.     }  

根据配置利用Dubbo的SPI机制获取具体注册中心注册器

Registry registry = registryFactory.getRegistry(url);

接口 RegistryFactory 的抽象方法Registry getRegistry(URL url);


其中AbstractRegistryFactory 是一个抽象类

public abstract class AbstractRegistryFactory implements RegistryFactory

  1. public Registry getRegistry(URL url) {  
  2.         url = url.setPath(RegistryService.class.getName())  
  3.                 .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())  
  4.                 .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);  
  5.         String key = url.toServiceString();  
  6.         // 锁定注册中心获取过程,保证注册中心单一实例  
  7.         LOCK.lock();  
  8.         try {  
  9.             Registry registry = REGISTRIES.get(key);  
  10.             if (registry != null) {  
  11.                 return registry;  
  12.             }  
  13.             registry = createRegistry(url);  
  14.             if (registry == null) {  
  15.                 throw new IllegalStateException("Can not create registry " + url);  
  16.             }  
  17.             REGISTRIES.put(key, registry);  
  18.             return registry;  
  19.         } finally {  
  20.             // 释放锁  
  21.             LOCK.unlock();  
  22.         }  
  23.     }  

registry = createRegistry(url);是关键


因为这里我们只介绍zookeeper所以继续分析ZookeeperRegistryFactory

  1. public class ZookeeperRegistryFactory extends AbstractRegistryFactory {  
  2.       
  3.     private ZookeeperTransporter zookeeperTransporter;  
  4.   
  5.     public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {  
  6.         this.zookeeperTransporter = zookeeperTransporter;  
  7.     }  
  8.   
  9.     public Registry createRegistry(URL url) {  
  10.         return new ZookeeperRegistry(url, zookeeperTransporter);  
  11.     }  
  12.   
  13. }  

这里创建zookeepr注册器ZookeeperRegistry

ZookeeperTransporter是操作zookeepr的客户端的工厂类,用来创建zookeeper客户端,这里客户端并不是zookeeper源代码的自带的,而是采用第三方工具包,主要来简化对zookeeper的操作,例如用zookeeper做注册中心需要对zookeeper节点添加watcher做反向推送,但是每次回调后节点的watcher都会被删除,这些客户会自动维护了这些watcher,在自动添加到节点上去。

zkClient 和 curator 是zookeeper的二个开源客户端

dubbo默认使用zkclient。其实curator的使用也很不错,都封装了很多zookeeper的操作,简化了调用


  1. public ZkclientZookeeperClient(URL url) {  
  2.         super(url);  
  3.         client = new ZkClient(url.getBackupAddress());  
  4.         client.subscribeStateChanges(new IZkStateListener() {  
  5.             public void handleStateChanged(KeeperState state) throws Exception {  
  6.                 ZkclientZookeeperClient.this.state = state;  
  7.                 if (state == KeeperState.Disconnected) {  
  8.                     stateChanged(StateListener.DISCONNECTED);  
  9.                 } else if (state == KeeperState.SyncConnected) {  
  10.                     stateChanged(StateListener.CONNECTED);  
  11.                 }  
  12.             }  
  13.             public void handleNewSession() throws Exception {  
  14.                 stateChanged(StateListener.RECONNECTED);  
  15.             }  
  16.         });  
  17.     }  

继续回到ZookeeperRegistryFactory 的createRegistry方法

  1. public class ZookeeperRegistryFactory extends AbstractRegistryFactory {  
  2.       
  3.     private ZookeeperTransporter zookeeperTransporter;  
  4.   
  5.     public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {  
  6.         this.zookeeperTransporter = zookeeperTransporter;  
  7.     }  
  8.   
  9.     public Registry createRegistry(URL url) {  
  10.         return new ZookeeperRegistry(url, zookeeperTransporter);  
  11.     }  
  12.   
  13. }  
继续 看ZookeeperRegistry的实现,这里就是做监听节点的设置
  1. public class ZookeeperRegistry extends FailbackRegistry {  
  2.   
  3.     private final static Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class);  
  4.   
  5.     private final static int DEFAULT_ZOOKEEPER_PORT = 2181;  
  6.       
  7.     private final static String DEFAULT_ROOT = "dubbo";  
  8.   
  9.     private final String        root;  
  10.       
  11.     private final Set<String> anyServices = new ConcurrentHashSet<String>();  
  12.   
  13.     private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();  
  14.       
  15.     private final ZookeeperClient zkClient;  
  16.       
  17.     public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {  
  18.         super(url);  
  19.         if (url.isAnyHost()) {  
  20.             throw new IllegalStateException("registry address == null");  
  21.         }  
  22.         String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);  
  23.         if (! group.startsWith(Constants.PATH_SEPARATOR)) {  
  24.             group = Constants.PATH_SEPARATOR + group;  
  25.         }  
  26.         this.root = group;  
  27.         zkClient = zookeeperTransporter.connect(url);  
  28.         zkClient.addStateListener(new StateListener() {  
  29.             public void stateChanged(int state) {  
  30.                 if (state == RECONNECTED) {  
  31.                     try {  
  32.                         recover();  
  33.                     } catch (Exception e) {  
  34.                         logger.error(e.getMessage(), e);  
  35.                     }  
  36.                 }  
  37.             }  
  38.         });  
  39.     }  

RECONNECTED 是zookeeper的一个会话状态

ZookeeperRegistry 还有其他一些重要方法

销毁连接destroy

  1. public void destroy() {  
  2.         super.destroy();  
  3.         try {  
  4.             zkClient.close();  
  5.         } catch (Exception e) {  
  6.             logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e);  
  7.         }  
  8.     }  
注册doRegister  等等

  1. protected void doRegister(URL url) {  
  2.         try {  
  3.             zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));  
  4.         } catch (Throwable e) {  
  5.             throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);  
  6.         }  
  7.     }  


 订阅url, 功能是服务消费端订阅服务提供方在zookeeper上注册地址

  1. protected void doSubscribe(final URL url, final NotifyListener listener) {  
  2.        try {  
  3.            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {  
  4.                String root = toRootPath();  
  5.                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);  
  6.                if (listeners == null) {  
  7.                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());  
  8.                    listeners = zkListeners.get(url);  
  9.                }  
  10.                ChildListener zkListener = listeners.get(listener);  
  11.                if (zkListener == null) {  
  12.                    listeners.putIfAbsent(listener, new ChildListener() {  
  13.                        public void childChanged(String parentPath, List<String> currentChilds) {  
  14.                            for (String child : currentChilds) {  
  15.                                if (! anyServices.contains(child)) {  
  16.                                    anyServices.add(child);  
  17.                                    subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,   
  18.                                            Constants.CHECK_KEY, String.valueOf(false)), listener);  
  19.                                }  
  20.                            }  
  21.                        }  
  22.                    });  
  23.                    zkListener = listeners.get(listener);  
  24.                }  
  25.                zkClient.create(root, false);  
  26.                List<String> services = zkClient.addChildListener(root, zkListener);  
  27.                if (services != null && services.size() > 0) {  
  28.                    anyServices.addAll(services);  
  29.                    for (String service : services) {  
  30.                        subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,   
  31.                                Constants.CHECK_KEY, String.valueOf(false)), listener);  
  32.                    }  
  33.                }  
  34.            } else {  
  35.                List<URL> urls = new ArrayList<URL>();  
  36.                for (String path : toCategoriesPath(url)) {  
  37.                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);  
  38.                    if (listeners == null) {  
  39.                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());  
  40.                        listeners = zkListeners.get(url);  
  41.                    }  
  42.                    ChildListener zkListener = listeners.get(listener);  
  43.                    if (zkListener == null) {  
  44.                        listeners.putIfAbsent(listener, new ChildListener() {  
  45.                            public void childChanged(String parentPath, List<String> currentChilds) {  
  46.                             ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));  
  47.                            }  
  48.                        });  
  49.                        zkListener = listeners.get(listener);  
  50.                    }  
  51.                    zkClient.create(path, false);  
  52.                    List<String> children = zkClient.addChildListener(path, zkListener);  
  53.                    if (children != null) {  
  54.                     urls.addAll(toUrlsWithEmpty(url, path, children));  
  55.                    }  
  56.                }  
  57.                notify(url, listener, urls);  
  58.            }  
  59.        } catch (Throwable e) {  
  60.            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);  
  61.        }  
  62.    }  

这里代码比较绕,我复制了一段解释,方便大家理解

1) 对传入url的serviceInterface是*代表订阅url目录下所有节点即所有服务,这个注册中心需要订阅所有

2) 如果指定了订阅接口通过toCategoriesPath(url)转换需要订阅的url

如传入url consumer://10.33.37.8/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=providers,configurators,routers&dubbo=2.5.4-SNAPSHOT&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=4088&side=consumer&timestamp=1417405597808

转换成urls

/dubbo/com.alibaba.dubbo.demo.DemoService/providers,/dubbo/com.alibaba.dubbo.demo.DemoService/configurators, /dubbo/com.alibaba.dubbo.demo.DemoService/routers

3) 适配传入的回调接口NotifyListener,转换成dubbo对zookeeper操作的ChildListener

4)以/dubbo/com.alibaba.dubbo.demo.DemoService/providers为例创建节点zkClient.create(path, false);

但是一般情况下如果服务提供者已经提供服务,那么这个目录节点应该已经存在,Dubbo在Client层屏蔽掉了创建异常。

5) 以/dubbo/com.alibaba.dubbo.demo.DemoService/providers为例给节点添加监听器,返回所有子目录

List<String> children = zkClient.addChildListener(path, zkListener);

        if (children !=null) {urls.addAll(toUrlsWithEmpty(url, path,hildren));}

        toUrlsWtihEmpty用来配置是不是需要订阅的url,是加入集合

6) 主动根据得到服务提供者urls回调NotifyListener,引用服务提供者生成invoker可执行对象

5. 取消订阅url, 只是去掉url上的注册的监听器



本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
Dubbo源码分析(四):Dubbo之Registry
Dubbo原理解析-注册中心之Zookeeper协议注册中心
Dubbo源码解析
Dubbo源码分析:小白入门篇
dubbo源码分析
基于ZooKeeper的Dubbo注册中心
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服