dubbo之所以可以提供流畅的RPC服务,和一个稳定、可靠的分布式系统的可靠协调者是分不开的
dubbo列举了支持的注册中,不过大多数使用中都会偏向zookeeper作为自己的注册中心
zookeeper是Hadoop的一个子项目是分布式系统的可靠协调者,他提供了配置维护,名字服务,分布式同步等服务。
启动工程之后,我们在zookeeper节点上会看到dubbo留下的注册信息
因为zookeeper中不能存特殊字符所以,子节点的内容做过转码。
对应于dubbo官方文档中的结构
在之前介绍服务的暴露和调用中提到过一个类RegistryProtocol
服务暴露时会export
- public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
- //export invoker
- final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
- //registry provider
- final Registry registry = getRegistry(originInvoker);
- final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
- registry.register(registedProviderUrl);
- // 订阅override数据
- // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
- final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
- final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
- overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
- registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
- //保证每次export都返回一个新的exporter实例
- return new Exporter<T>() {
- public Invoker<T> getInvoker() {
- return exporter.getInvoker();
- }
- public void unexport() {
- try {
- exporter.unexport();
- } catch (Throwable t) {
- logger.warn(t.getMessage(), t);
- }
- try {
- registry.unregister(registedProviderUrl);
- } catch (Throwable t) {
- logger.warn(t.getMessage(), t);
- }
- try {
- overrideListeners.remove(overrideSubscribeUrl);
- registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
- } catch (Throwable t) {
- logger.warn(t.getMessage(), t);
- }
- }
- };
- }
服务调用时会refer
- @SuppressWarnings("unchecked")
- public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
- url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
- Registry registry = registryFactory.getRegistry(url);
- if (RegistryService.class.equals(type)) {
- return proxyFactory.getInvoker((T) registry, type, url);
- }
-
- // group="a,b" or group="*"
- Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
- String group = qs.get(Constants.GROUP_KEY);
- if (group != null && group.length() > 0 ) {
- if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1
- || "*".equals( group ) ) {
- return doRefer( getMergeableCluster(), registry, type, url );
- }
- }
- return doRefer(cluster, registry, type, url);
- }
根据配置利用Dubbo的SPI机制获取具体注册中心注册器Registry registry = registryFactory.getRegistry(url);
接口 RegistryFactory 的抽象方法Registry getRegistry(URL url);
其中AbstractRegistryFactory 是一个抽象类
public abstract class AbstractRegistryFactory implements RegistryFactory
- public Registry getRegistry(URL url) {
- url = url.setPath(RegistryService.class.getName())
- .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
- .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
- String key = url.toServiceString();
- // 锁定注册中心获取过程,保证注册中心单一实例
- LOCK.lock();
- try {
- Registry registry = REGISTRIES.get(key);
- if (registry != null) {
- return registry;
- }
- registry = createRegistry(url);
- if (registry == null) {
- throw new IllegalStateException("Can not create registry " + url);
- }
- REGISTRIES.put(key, registry);
- return registry;
- } finally {
- // 释放锁
- LOCK.unlock();
- }
- }
registry = createRegistry(url);是关键
因为这里我们只介绍zookeeper所以继续分析ZookeeperRegistryFactory
- public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
-
- private ZookeeperTransporter zookeeperTransporter;
-
- public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
- this.zookeeperTransporter = zookeeperTransporter;
- }
-
- public Registry createRegistry(URL url) {
- return new ZookeeperRegistry(url, zookeeperTransporter);
- }
-
- }
这里创建zookeepr注册器ZookeeperRegistryZookeeperTransporter是操作zookeepr的客户端的工厂类,用来创建zookeeper客户端,这里客户端并不是zookeeper源代码的自带的,而是采用第三方工具包,主要来简化对zookeeper的操作,例如用zookeeper做注册中心需要对zookeeper节点添加watcher做反向推送,但是每次回调后节点的watcher都会被删除,这些客户会自动维护了这些watcher,在自动添加到节点上去。
zkClient 和 curator 是zookeeper的二个开源客户端
dubbo默认使用zkclient。其实curator的使用也很不错,都封装了很多zookeeper的操作,简化了调用
- public ZkclientZookeeperClient(URL url) {
- super(url);
- client = new ZkClient(url.getBackupAddress());
- client.subscribeStateChanges(new IZkStateListener() {
- public void handleStateChanged(KeeperState state) throws Exception {
- ZkclientZookeeperClient.this.state = state;
- if (state == KeeperState.Disconnected) {
- stateChanged(StateListener.DISCONNECTED);
- } else if (state == KeeperState.SyncConnected) {
- stateChanged(StateListener.CONNECTED);
- }
- }
- public void handleNewSession() throws Exception {
- stateChanged(StateListener.RECONNECTED);
- }
- });
- }
继续回到ZookeeperRegistryFactory 的createRegistry方法
- public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
-
- private ZookeeperTransporter zookeeperTransporter;
-
- public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
- this.zookeeperTransporter = zookeeperTransporter;
- }
-
- public Registry createRegistry(URL url) {
- return new ZookeeperRegistry(url, zookeeperTransporter);
- }
-
- }
继续 看ZookeeperRegistry的实现,这里就是做监听节点的设置
- public class ZookeeperRegistry extends FailbackRegistry {
-
- private final static Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class);
-
- private final static int DEFAULT_ZOOKEEPER_PORT = 2181;
-
- private final static String DEFAULT_ROOT = "dubbo";
-
- private final String root;
-
- private final Set<String> anyServices = new ConcurrentHashSet<String>();
-
- private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();
-
- private final ZookeeperClient zkClient;
-
- public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
- super(url);
- if (url.isAnyHost()) {
- throw new IllegalStateException("registry address == null");
- }
- String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
- if (! group.startsWith(Constants.PATH_SEPARATOR)) {
- group = Constants.PATH_SEPARATOR + group;
- }
- this.root = group;
- zkClient = zookeeperTransporter.connect(url);
- zkClient.addStateListener(new StateListener() {
- public void stateChanged(int state) {
- if (state == RECONNECTED) {
- try {
- recover();
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- }
- }
- }
- });
- }
RECONNECTED 是zookeeper的一个会话状态ZookeeperRegistry 还有其他一些重要方法
销毁连接destroy
- public void destroy() {
- super.destroy();
- try {
- zkClient.close();
- } catch (Exception e) {
- logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e);
- }
- }
注册doRegister 等等
- protected void doRegister(URL url) {
- try {
- zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
- } catch (Throwable e) {
- throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
- }
- }
订阅url, 功能是服务消费端订阅服务提供方在zookeeper上注册地址
- protected void doSubscribe(final URL url, final NotifyListener listener) {
- try {
- if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
- String root = toRootPath();
- ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
- if (listeners == null) {
- zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
- listeners = zkListeners.get(url);
- }
- ChildListener zkListener = listeners.get(listener);
- if (zkListener == null) {
- listeners.putIfAbsent(listener, new ChildListener() {
- public void childChanged(String parentPath, List<String> currentChilds) {
- for (String child : currentChilds) {
- if (! anyServices.contains(child)) {
- anyServices.add(child);
- subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
- Constants.CHECK_KEY, String.valueOf(false)), listener);
- }
- }
- }
- });
- zkListener = listeners.get(listener);
- }
- zkClient.create(root, false);
- List<String> services = zkClient.addChildListener(root, zkListener);
- if (services != null && services.size() > 0) {
- anyServices.addAll(services);
- for (String service : services) {
- subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
- Constants.CHECK_KEY, String.valueOf(false)), listener);
- }
- }
- } else {
- List<URL> urls = new ArrayList<URL>();
- for (String path : toCategoriesPath(url)) {
- ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
- if (listeners == null) {
- zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
- listeners = zkListeners.get(url);
- }
- ChildListener zkListener = listeners.get(listener);
- if (zkListener == null) {
- listeners.putIfAbsent(listener, new ChildListener() {
- public void childChanged(String parentPath, List<String> currentChilds) {
- ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
- }
- });
- zkListener = listeners.get(listener);
- }
- zkClient.create(path, false);
- List<String> children = zkClient.addChildListener(path, zkListener);
- if (children != null) {
- urls.addAll(toUrlsWithEmpty(url, path, children));
- }
- }
- notify(url, listener, urls);
- }
- } catch (Throwable e) {
- throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
- }
- }
这里代码比较绕,我复制了一段解释,方便大家理解
1) 对传入url的serviceInterface是*代表订阅url目录下所有节点即所有服务,这个注册中心需要订阅所有
2) 如果指定了订阅接口通过toCategoriesPath(url)转换需要订阅的url
如传入url consumer://10.33.37.8/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=providers,configurators,routers&dubbo=2.5.4-SNAPSHOT&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=4088&side=consumer×tamp=1417405597808
转换成urls
/dubbo/com.alibaba.dubbo.demo.DemoService/providers,/dubbo/com.alibaba.dubbo.demo.DemoService/configurators, /dubbo/com.alibaba.dubbo.demo.DemoService/routers
3) 适配传入的回调接口NotifyListener,转换成dubbo对zookeeper操作的ChildListener
4)以/dubbo/com.alibaba.dubbo.demo.DemoService/providers为例创建节点zkClient.create(path, false);
但是一般情况下如果服务提供者已经提供服务,那么这个目录节点应该已经存在,Dubbo在Client层屏蔽掉了创建异常。
5) 以/dubbo/com.alibaba.dubbo.demo.DemoService/providers为例给节点添加监听器,返回所有子目录
List<String> children = zkClient.addChildListener(path, zkListener);
if (children !=null) {urls.addAll(toUrlsWithEmpty(url, path,hildren));}
toUrlsWtihEmpty用来配置是不是需要订阅的url,是加入集合
6) 主动根据得到服务提供者urls回调NotifyListener,引用服务提供者生成invoker可执行对象
5. 取消订阅url, 只是去掉url上的注册的监听器
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请
点击举报。