Dubbo study notes
Entry points for Dubbo publishing, registration and subscription:

@Service bean instantiation
    ServiceAnnotationBeanPostProcessor: the processor that resolves ServiceBean and registers the bean definition with the Spring container
        postProcessBeanDefinitionRegistry method
        registerServiceBeans method
            // calls the method below to build the ServiceBean BeanDefinition
            AbstractBeanDefinition buildServiceBeanDefinition(Service service, Class<?> interfaceClass, String annotatedServiceBeanName)
                BeanDefinitionBuilder builder = rootBeanDefinition(ServiceBean.class);
    ServiceBean contains the logic that publishes (exports) the service
        // two trigger points: ContextRefreshedEvent and InitializingBean.afterPropertiesSet()
        export();

@Reference bean instantiation
    ReferenceAnnotationBeanPostProcessor: the processor that resolves ReferenceBean
        Dubbo uses its own injection resolver here, AnnotationInjectedBeanPostProcessor, to handle the @Reference annotation
        getInjectedObject(A annotation, Object bean, String beanName, Class<?> injectedType, InjectionMetadata.InjectedElement injectedElement) method
            // returns the ReferenceBean dynamic proxy object. ReferenceBean is a FactoryBean: when the bean is
            // instantiated through getBean, ReferenceConfig.get() is called, which subscribes to the provider instances
            Object proxy = buildProxy(referencedBeanName, referenceBean, injectedType);
            // if the injected bean is itself a Service bean, startup is brought forward and the provider
            // instances are subscribed on ServiceBeanExportedEvent
            onServiceBeanExportEvent((ServiceBeanExportedEvent) event);
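The @Reference flow above (find the annotated field, inject a proxy, resolve the real target only on first use) can be sketched with plain JDK reflection. This is a minimal illustration, not Dubbo's or Spring's code: MyReference, LazyReference, GreetingService and Consumer are hypothetical stand-ins for @Reference, ReferenceBean/ReferenceConfig.get(), a service interface and a consuming bean.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.function.Supplier;

class ReferenceInjectionSketch {

    // Hypothetical stand-in for Dubbo's @Reference annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface MyReference {}

    // Hypothetical service interface.
    interface GreetingService {
        String greet(String name);
    }

    // Plays the role of ReferenceBean / ReferenceConfig.get():
    // the real target is created once, on first use.
    static final class LazyReference<T> {
        private final Supplier<T> factory;
        private T target;
        private int resolveCount;

        LazyReference(Supplier<T> factory) {
            this.factory = factory;
        }

        synchronized T get() {
            if (target == null) {
                target = factory.get();
                resolveCount++;
            }
            return target;
        }

        synchronized int resolveCount() {
            return resolveCount;
        }
    }

    // Hypothetical consuming bean with an annotated field.
    static class Consumer {
        @MyReference
        GreetingService greetingService;
    }

    // Plays the role of the post-processor: inject a JDK dynamic proxy
    // into every field that carries the annotation.
    static void inject(Object bean, LazyReference<?> reference) {
        for (Field field : bean.getClass().getDeclaredFields()) {
            if (!field.isAnnotationPresent(MyReference.class)) {
                continue;
            }
            // Each call is forwarded to the lazily resolved target.
            InvocationHandler handler =
                    (self, method, args) -> method.invoke(reference.get(), args);
            Object proxy = Proxy.newProxyInstance(
                    field.getType().getClassLoader(),
                    new Class<?>[] {field.getType()},
                    handler);
            try {
                field.setAccessible(true);
                field.set(bean, proxy);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    static String demo() {
        LazyReference<GreetingService> reference =
                new LazyReference<GreetingService>(() -> name -> "hello " + name);
        Consumer consumer = new Consumer();
        inject(consumer, reference);
        int before = reference.resolveCount();   // nothing resolved yet
        String greeting = consumer.greetingService.greet("dubbo");
        int after = reference.resolveCount();    // resolved by the first call
        return before + "," + greeting + "," + after;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the target is resolved on the first method call. In the notes above, resolution happens when the bean is obtained through getBean, so the timing here is a simplification.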
******************************************************************************************************************************
1. Collecting @Service and @Reference annotation metadata
    org.apache.dubbo.metadata.annotation.processing.ServiceDefinitionMetadataAnnotationProcessor
        boolean process(Set<? extends TypeElement> annotations, RoundEnvironment roundEnv) -> converted to: List<ServiceDefinition> serviceDefinitions

2. Metadata registration:
    org.apache.dubbo.metadata.report.support.AbstractMetadataReport
        Provider: void doStoreProviderMetadata(MetadataIdentifier providerMetadataIdentifier, String serviceDefinitions);
        Consumer: void doStoreConsumerMetadata(MetadataIdentifier consumerMetadataIdentifier, String serviceParameterString);
        Save: void doSaveMetadata(ServiceMetadataIdentifier metadataIdentifier, URL url);
        Remove: void doRemoveMetadata(ServiceMetadataIdentifier metadataIdentifier);
        Service addresses: List<String> doGetExportedURLs(ServiceMetadataIdentifier metadataIdentifier);
        Subscription data: void doSaveSubscriberData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, String urlListStr);
        Subscribed addresses: String doGetSubscribedURLs(SubscriberMetadataIdentifier subscriberMetadataIdentifier);

3. Registering metadata with ZooKeeper
    org.apache.dubbo.metadata.store.zookeeper.ZookeeperMetadataReport
        Cache the metadata of both sides:
            void doStoreProviderMetadata(MetadataIdentifier providerMetadataIdentifier, String serviceDefinitions)
            void doStoreConsumerMetadata(MetadataIdentifier consumerMetadataIdentifier, String value)
        Initialize fields: ZookeeperMetadataReport(URL url, ZookeeperTransporter zookeeperTransporter)

        @Override
        public String getServiceDefinition(MetadataIdentifier metadataIdentifier) {
            return zkClient.getContent(getNodePath(metadataIdentifier));
        }

        private void storeMetadata(MetadataIdentifier metadataIdentifier, String v) {
            zkClient.create(getNodePath(metadataIdentifier), v, false);
        }

        String getNodePath(BaseMetadataIdentifier metadataIdentifier) {
            return toRootDir() + metadataIdentifier.getUniqueKey(KeyTypeEnum.PATH);
        }

        Cache the registered addresses: boolean saveExportedURLs(String serviceName, String exportedServicesRevision, String exportedURLsContent)

4. Service address registration:
    org.apache.dubbo.config.ServiceConfig<T>
        void export() // export method: builds the serviceMetadata object, then converts the bean to a URL and publishes it
        // protocol adaptive extension: org.apache.dubbo.registry.integration.RegistryProtocol
        Exporter<T> export(final Invoker<T> originInvoker) // calls the Registry's register method

    Address registration, interface methods: org.apache.dubbo.registry.RegistryService
        void register(URL url);
        void unregister(URL url);
        void subscribe(URL url, NotifyListener listener);
        void unsubscribe(URL url, NotifyListener listener);
        List<URL> lookup(URL url);

    Registration listener, interface methods: org.apache.dubbo.registry.RegistryServiceListener
        void onRegister(URL url);
        void onUnregister(URL url);
        void onSubscribe(URL url);
        void onUnsubscribe(URL url);

    Concrete registration implementation: org.apache.dubbo.registry.integration.RegistryProtocol
        Exporter<T> export(final Invoker<T> originInvoker) throws RpcException

    Register and subscribe methods: org.apache.dubbo.registry.support.AbstractRegistry
        public AbstractRegistry(URL url), initializes the cache
            loadProperties();
            notify(url.getBackupUrls()); // also triggers the listeners with the cached history
        Register and subscribe: protected void recover() throws Exception

    Re-registration: void reExport(final Invoker<T> originInvoker, URL newInvokerUrl)
    Invoker<T> refer(Class<T> type, URL url)
        Created directly: return proxyFactory.getInvoker((T) registry, type, url);
        Or: Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url)
            Get the service provider: Invoker<T> invoker = cluster.join(directory); -> calls the corresponding Cluster instance to select an Invoker

5. Service subscription and discovery:
    org.apache.dubbo.registry.client.ServiceDiscoveryFactory
        Get the ServiceDiscovery for a registry address: ServiceDiscovery getServiceDiscovery(URL registryURL);
    org.apache.dubbo.registry.client.ServiceDiscovery
        void initialize(URL registryURL) throws Exception;
        void destroy() throws Exception;
        void register(ServiceInstance serviceInstance) throws RuntimeException;
        void update(ServiceInstance serviceInstance) throws RuntimeException;
        void unregister(ServiceInstance serviceInstance) throws RuntimeException;
        Service instances: List<ServiceInstance> getInstances(String serviceName) throws NullPointerException;
        Service change listener: void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener);
        Dispatch the service change event: void dispatchServiceInstancesChangedEvent(String serviceName);
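The ServiceDiscovery contract listed above (register, unregister, getInstances, change listeners) can be illustrated with an in-memory sketch. This is not the Dubbo implementation: instances are reduced to plain "host:port" strings, and ChangeListener is a hypothetical simplification of ServiceInstancesChangedListener.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

class InMemoryDiscoverySketch {

    // Hypothetical listener; the real one receives an event object.
    interface ChangeListener {
        void onChange(String serviceName, List<String> addresses);
    }

    // serviceName -> addresses ("host:port") of the registered instances
    private final Map<String, List<String>> instances = new ConcurrentHashMap<>();
    private final List<ChangeListener> listeners = new CopyOnWriteArrayList<>();

    void register(String serviceName, String address) {
        instances.computeIfAbsent(serviceName, k -> new CopyOnWriteArrayList<>()).add(address);
        dispatchChanged(serviceName);
    }

    void unregister(String serviceName, String address) {
        List<String> list = instances.get(serviceName);
        // Only notify when something was actually removed.
        if (list != null && list.remove(address)) {
            dispatchChanged(serviceName);
        }
    }

    // Returns a snapshot copy so callers cannot mutate internal state.
    List<String> getInstances(String serviceName) {
        return new ArrayList<>(instances.getOrDefault(serviceName, Collections.emptyList()));
    }

    void addListener(ChangeListener listener) {
        listeners.add(listener);
    }

    // Counterpart of dispatchServiceInstancesChangedEvent(serviceName).
    private void dispatchChanged(String serviceName) {
        List<String> snapshot = getInstances(serviceName);
        for (ChangeListener listener : listeners) {
            listener.onChange(serviceName, snapshot);
        }
    }

    static List<String> demo() {
        InMemoryDiscoverySketch discovery = new InMemoryDiscoverySketch();
        List<String> log = new ArrayList<>();
        discovery.addListener((name, addresses) -> log.add(name + "=" + addresses));
        discovery.register("demo-provider", "10.0.0.1:20880");
        discovery.register("demo-provider", "10.0.0.2:20880");
        discovery.unregister("demo-provider", "10.0.0.1:20880");
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```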
    Service registration and discovery: org.apache.dubbo.registry.client.ServiceDiscoveryRegistry
        public ServiceDiscoveryRegistry(URL registryURL) {
            this.serviceDiscovery = createServiceDiscovery(registryURL); // discovery object
            this.serviceNameMapping = ServiceNameMapping.getDefaultExtension(); // service name
            String metadataStorageType = getMetadataStorageType(registryURL); // registered service metadata
            this.writableMetadataService = WritableMetadataService.getExtension(metadataStorageType); // client-side remote service proxy object
            this.subscribedURLsSynthesizers = initSubscribedURLsSynthesizers(); // subscribed URLs
        }
        Service registration: super.register(url);
        Subscribe to URLs: subscribeURLs(url, subscribedURLs, serviceName);
        URL change listener: registerServiceInstancesChangedListener(url, new ServiceInstancesChangedListener(serviceName) {});
        Get the list of registered service instances: List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
    Service discovery using ZooKeeper:
        org.apache.dubbo.registry.zookeeper.ZookeeperServiceDiscovery implements ServiceDiscovery, EventListener<ServiceInstancesChangedEvent>
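The subscribe path of ServiceDiscoveryRegistry noted above (map the interface to a service name, fetch the instances, turn them into URLs, notify the subscriber, repeat on change) can be sketched as follows. This is a rough model under stated assumptions: UrlListener, the interface-to-application map and the instance lookup function are hypothetical stand-ins for NotifyListener, ServiceNameMapping and ServiceDiscovery.getInstances, and the URL format is simplified.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class DiscoveryRegistrySketch {

    // Hypothetical stand-in for NotifyListener.
    interface UrlListener {
        void notify(List<String> urls);
    }

    private final Map<String, String> interfaceToApp;             // stand-in for ServiceNameMapping
    private final Function<String, List<String>> instanceLookup;  // stand-in for getInstances
    private final Map<String, List<UrlListener>> subscribers = new HashMap<>();

    DiscoveryRegistrySketch(Map<String, String> interfaceToApp,
                            Function<String, List<String>> instanceLookup) {
        this.interfaceToApp = interfaceToApp;
        this.instanceLookup = instanceLookup;
    }

    // Subscribe: remember the listener and push the current URL list once.
    void subscribe(String interfaceName, UrlListener listener) {
        subscribers.computeIfAbsent(interfaceName, k -> new ArrayList<>()).add(listener);
        listener.notify(synthesizeUrls(interfaceName));
    }

    // Called when the instance list of an application changes:
    // re-notify every subscriber of an interface served by that application.
    void onInstancesChanged(String appName) {
        for (Map.Entry<String, String> entry : interfaceToApp.entrySet()) {
            if (!entry.getValue().equals(appName)) {
                continue;
            }
            List<String> urls = synthesizeUrls(entry.getKey());
            for (UrlListener listener
                    : subscribers.getOrDefault(entry.getKey(), Collections.emptyList())) {
                listener.notify(urls);
            }
        }
    }

    // Build one simplified URL per instance of the owning application.
    private List<String> synthesizeUrls(String interfaceName) {
        List<String> urls = new ArrayList<>();
        String app = interfaceToApp.get(interfaceName);
        if (app == null) {
            return urls;
        }
        for (String address : instanceLookup.apply(app)) {
            urls.add("dubbo://" + address + "/" + interfaceName);
        }
        return urls;
    }

    static List<String> demo() {
        Map<String, List<String>> instances = new HashMap<>();
        instances.put("demo-provider", new ArrayList<>(Arrays.asList("10.0.0.1:20880")));
        Map<String, String> mapping = new HashMap<>();
        mapping.put("com.example.GreetingService", "demo-provider");

        DiscoveryRegistrySketch registry = new DiscoveryRegistrySketch(
                mapping, app -> instances.getOrDefault(app, Collections.emptyList()));

        List<String> seen = new ArrayList<>();
        registry.subscribe("com.example.GreetingService", urls -> seen.add(urls.toString()));

        // A second provider instance comes online.
        instances.get("demo-provider").add("10.0.0.2:20880");
        registry.onInstancesChanged("demo-provider");
        return seen;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```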
Service discovery starts:
    org.apache.dubbo.config.ReferenceConfig<T>
        T get() // obtain the Reference object
            ref = createProxy(map) // created dynamically
                invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0)); // protocol adaptive extension
                invoker = CLUSTER.join(new StaticDirectory(url, invokers)); // cluster selection

7. Consumer side, generating the remote service proxy object:
    org.apache.dubbo.registry.client.metadata.proxy.DefaultMetadataServiceProxyFactory
        MetadataService createProxy(ServiceInstance serviceInstance)
            // Simply rely on the first metadata url, as stated in MetadataServiceURLBuilder.
            Invoker<MetadataService> invoker = protocol.refer(MetadataService.class, urls.get(0));
            return proxyFactory.getProxy(invoker);

8. Remote invocation:
    Consumer side:
        org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker<T> extends AbstractInvoker<T>
            Remote call: Result doInvoke(final Invocation invocation)
                Call the ExchangeClient and return the result: currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
    Provider side:
        org.apache.dubbo.registry.client.metadata.proxy.DefaultMetadataServiceProxyFactory
            MetadataService createProxy(ServiceInstance serviceInstance)
                // Simply rely on the first metadata url, as stated in MetadataServiceURLBuilder.
                Invoker<MetadataService> invoker = protocol.refer(MetadataService.class, urls.get(0));
                return proxyFactory.getProxy(invoker);
        Reflective call: result = method.invoke(result, r.getValue());
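The two halves of the remote call in step 8 (an asynchronous request whose raw response is cast with thenApply on the consumer side, and a reflective method.invoke on the provider side) can be sketched with the JDK alone. This is an illustration under assumptions, not Dubbo's wire protocol: Request and GreetingServiceImpl are hypothetical, and a single-thread executor stands in for the network transport.

```java
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class RemoteCallSketch {

    // Hypothetical minimal request: method name, parameter types, arguments.
    static final class Request {
        final String methodName;
        final Class<?>[] parameterTypes;
        final Object[] arguments;

        Request(String methodName, Class<?>[] parameterTypes, Object[] arguments) {
            this.methodName = methodName;
            this.parameterTypes = parameterTypes;
            this.arguments = arguments;
        }
    }

    // Hypothetical service implementation living on the provider.
    static class GreetingServiceImpl {
        public String greet(String name) {
            return "hello " + name;
        }
    }

    // Provider side: look the method up and invoke it reflectively.
    static Object handle(Object serviceImpl, Request request)
            throws ReflectiveOperationException {
        Method method = serviceImpl.getClass()
                .getMethod(request.methodName, request.parameterTypes);
        return method.invoke(serviceImpl, request.arguments);
    }

    // Consumer side: submit the request asynchronously and cast the raw
    // response once it arrives, mirroring request(...).thenApply(...).
    static CompletableFuture<String> invoke(ExecutorService transport,
                                            Object provider,
                                            Request request) {
        CompletableFuture<Object> raw = CompletableFuture.supplyAsync(() -> {
            try {
                return handle(provider, request);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        }, transport);
        return raw.thenApply(obj -> (String) obj);
    }

    static String demo() {
        ExecutorService transport = Executors.newSingleThreadExecutor();
        try {
            Request request = new Request(
                    "greet", new Class<?>[] {String.class}, new Object[] {"dubbo"});
            // join() blocks until the asynchronous response is available.
            return invoke(transport, new GreetingServiceImpl(), request).join();
        } finally {
            transport.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```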