Dubbo-go Client端调用服务过程

InfoQ 2020-11-11 14:13:47
Dubbo 服务 调用 dubbo-go client


{"type":"doc","content":[{"type":"heading","attrs":{"align":null,"level":1},"content":[{"type":"text","text":"导读:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"有了上一篇文章"},{"type":"link","attrs":{"href":"https://xie.infoq.cn/article/1eaa3531f5e37cc35e6eabcbchttps://blog.csdn.net/forevermoonlight/article/details/108962115","title":""},"content":[{"type":"text","text":"《Dubbo-go Server 端开启服务过程》"}]},{"type":"text","text":"的铺垫,可以类比客户端启动于服务端的启动过程。其中最大的区别是服务端通过 zk 注册服务,发布自己的ivkURL并订阅事件开启监听;而客户应该是通过zk注册组件,拿到需要调用的serviceURL,更新invoker并重写用户的RPCService,从而实现对远程过程调用细节的封装。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"heading","attrs":{"align":null,"level":1},"content":[{"type":"text","text":"1. 配置文件和客户端源码"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"heading","attrs":{"align":null,"level":3},"content":[{"type":"text","text":"1.1 client配置文件"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"helloworld提供的demo:profiles/client.yaml"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"registries :\n \"demoZk\":\n protocol: \"zookeeper\"\n timeout : \"3s\"\n address: \"127.0.0.1:2181\"\n username: \"\"\n password: \"\"\nreferences:\n \"UserProvider\":\n # 可以指定多个registry,使用逗号隔开;不指定默认向所有注册中心注册\n registry: \"demoZk\"\n protocol : \"dubbo\"\n interface : \"com.ikurento.user.UserProvider\"\n cluster: \"failover\"\n methods :\n - name: \"GetUser\"\n retries: 3"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"可看到配置文件与之前讨论过的server端非常类似,其refrences部分字段就是对当前服务要主调的服务的配置,其中详细说明了调用协议、注册协议、接口id、调用方法、集群策略等,这些配置都会在之后与注册组件交互,重写ivk、调用的过程中使用到。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"heading","attrs":{"align":null,"level":3},"content":[{"type":"text","text":"1.2 客户端使用框架源码"}]},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: user.go\nfunc init() {\n config.SetConsumerService(userProvider)\n hessian.RegisterPOJO(&User{})\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: main.go\nfunc main() {\n hessian.RegisterPOJO(&User{})\n config.Load()\n time.Sleep(3e9)\n println(\"\\n\\n\\nstart to test dubbo\")\n user := &User{}\n err := userProvider.GetUser(context.TODO(), []interface{}{\"A001\"}, user)\n if err != nil {\n panic(err)\n }\n println(\"response result: %v\\n\", user)\n initSignal()\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"官网提供的helloworld demo的源码。可看到与服务端类似,在user.go内注册了rpc-service,以及需要rpc传输的结构体user。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"在main函数中,同样调用了config.Load()函数,之后就可以直接通过实现好的rpc-service:userProvider 直接调用对应的功能函数,即可实现rpc调用。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"可以猜到,从hessian注册结构、SetConsumerService,到调用函数.GetUser()期间,用户定义的rpc-service也就是userProvider对应的函数被重写,重写后的GetUser函数已经包含了实现了远程调用逻辑的invoker。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"接下来,就要通过阅读源码,看看dubbo-go是如何做到的。"}]},{"type":"heading","attrs":{"align":null,"level":1},"content":[{"type":"text","text":"2. 实现远程过程调用"}]},{"type":"heading","attrs":{"align":null,"level":3},"content":[{"type":"text","text":"2.1 加载配置文件"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file:config/config_loader.go :Load()\n\n// Load Dubbo Init\nfunc Load() {\n // init router\n initRouter()\n // init the global event dispatcher\n extension.SetAndInitGlobalDispatcher(GetBaseConfig().EventDispatcherType)\n // start the metadata report if config set\n if err := startMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err != nil {\n logger.Errorf(\"Provider starts metadata report error, and the error is {%#v}\", err)\n return\n }\n // reference config\n loadConsumerConfig()"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"在main函数中调用的config.Load()函数,进而调用了loadConsumerConfig,类似于之前讲到的server端配置读入函数。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"在loadConsumerConfig函数中,进行了三步操作:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: config/config_loader.go\n\nfunc loadConsumerConfig() {\n // 1 init other consumer config\n conConfigType := consumerConfig.ConfigType\n for key, value := range extension.GetDefaultConfigReader() {}\n checkApplicationName(consumerConfig.ApplicationConfig)\n configCenterRefreshConsumer()\n checkRegistries(consumerConfig.Registries, consumerConfig.Registry)\n \n // 2 refer-implement-reference\n for key, ref := range consumerConfig.References {\n if ref.Generic {\n genericService := NewGenericService(key)\n SetConsumerService(genericService)\n }\n rpcService := GetConsumerService(key)\n ref.id = key\n ref.Refer(rpcService)\n ref.Implement(rpcService)\n }\n\n // 3 wait for invoker is available, if wait over default 3s, then panic\n for {}\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"1. 检查配置文件并将配置写入内存"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"2. "},{"type":"text","marks":[{"type":"strong"}],"text":"在for循环内部"},{"type":"text","text":",依次引用(refer)并且实例化(implement)每个被调reference。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"3. 等待三秒钟所有invoker就绪"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"其中重要的就是for循环里面的引用和实例化,两步操作,会在接下来展开讨论。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"至此,配置已经被写入了框架。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"heading","attrs":{"align":null,"level":3},"content":[{"type":"text","text":"2.2 获取远程Service URL,实现可供调用的invoker"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"上述的ref.Refer完成的就是这部分的操作。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/a7/a710e0d54df9ba92d24228b021237fca.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"图(一)"}]},{"type":"heading","attrs":{"align":null,"level":4},"content":[{"type":"text","text":"2.2.1 构造注册url"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"和server端类似,存在注册url和服务url,dubbo习惯将服务url作为注册url的sub。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: config/reference_config.go: Refer()\n\nfunc (c *ReferenceConfig) Refer(_ interface{}) {\n //(一)配置url参数(serviceUrl),将会作为sub\n cfgURL := common.NewURLWithOptions(\n common.WithPath(c.id),\n common.WithProtocol(c.Protocol),\n common.WithParams(c.getUrlMap()),\n common.WithParamsValue(constant.BEAN_NAME_KEY, c.id),\n )\n ...\n // (二)注册地址可以通过url格式给定,也可以通过配置格式给定\n // 这一步的意义就是配置->提取信息生成URL\n if c.Url != \"\" {// 用户给定url信息,可以是点对点的地址,也可以是注册中心的地址\n // 1. user specified URL, could be peer-to-peer address, or register center's address.\n urlStrings := gxstrings.RegSplit(c.Url, \"\\\\s*[;]+\\\\s*\")\n for _, urlStr := range urlStrings {\n serviceUrl, err := common.NewURL(urlStr)\n ...\n }\n } else {// 配置读入注册中心的信息\n // assemble SubURL from register center's configuration mode\n // 这是注册url,protocol = registry,包含了zk的用户名、密码、ip等等\n c.urls = loadRegistries(c.Registry, consumerConfig.Registries, common.CONSUMER)\n ...\n // set url to regUrls\n for _, regUrl := range c.urls {\n regUrl.SubURL = cfgURL// regUrl的subURl存当前配置url\n }\n }\n //至此,无论通过什么形式,已经拿到了全部的regURL\n // (三)获取registryProtocol实例,调用其Refer方法,传入新构建好的regURL\n if len(c.urls) == 1 {\n // 这一步访问到registry/protocol/protocol.go registryProtocol.Refer\n // 这里是registry\n c.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0])\n } else {\n // 如果有多个注册中心,即有多个invoker,则采取集群策略\n invokers := make([]protocol.Invoker, 0, len(c.urls))\n ...\n }"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"这个函数中,已经处理完从Register配置到RegisterURL的转换,即图(一)中部分:"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/b8/b893dd65aa320c2901a222660d6fa904.png","alt":null,"title":"","style":[{"key":"width","value":"50%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"接下来,已经拿到的url将被传递给RegistryProtocol,进一步refer。"}]},{"type":"heading","attrs":{"align":null,"level":4},"content":[{"type":"text","text":"2.2.2 registryProtocol获取到zkRegistry实例,进一步Refer"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: registry/protocol/protocol.go: Refer\n\n// Refer provider service from registry center\n// 拿到的是配置文件registries的url,他能够生成一个invoker = 指向目的addr,以供客户端直接调用。\nfunc (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {\n var registryUrl = url\n // 这里拿到的是referenceConfig,serviceUrl里面包含了Reference的所有信息,包含interfaceName、method等等\n var serviceUrl = registryUrl.SubURL\n if registryUrl.Protocol == constant.REGISTRY_PROTOCOL {// registryUrl.Proto = \"registry\"\n protocol := registryUrl.GetParam(constant.REGISTRY_KEY, \"\")\n registryUrl.Protocol = protocol//替换成了具体的值,比如\"zookeeper\"\n }\n // 接口对象\n var reg registry.Registry\n // (一)实例化接口对象,缓存策略\n if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded {\n // 缓存中不存在当前registry,新建一个reg\n reg = getRegistry(®istryUrl)\n // 缓存起来\n proto.registries.Store(registryUrl.Key(), reg)\n } else {\n reg = regI.(registry.Registry)\n }\n // 到这里,获取到了reg实例 zookeeper的registry\n //(二)根据Register的实例zkRegistry和传入的regURL新建一个directory\n // 这一步存在复杂的异步逻辑,从注册中心拿到了目的service的真实addr,获取了invoker并放入directory,\n // 这一步将在下面详细给出步骤\n // new registry directory for store service url from registry\n directory, err := extension.GetDefaultRegistryDirectory(®istryUrl, reg)\n if err != nil {\n logger.Errorf(\"consumer service %v create registry directory error, error message is %s, and will return nil invoker!\",\n serviceUrl.String(), err.Error())\n return nil\n }\n // (三)DoRegister 在zk上注册当前client service\n err = reg.Register(*serviceUrl)\n if err != nil {\n logger.Errorf(\"consumer service %v register registry %v error, error message is %s\",\n serviceUrl.String(), registryUrl.String(), err.Error())\n }\n // (四)new cluster invoker,将directory写入集群,获得具有集群策略的invoker\n cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))\n invoker := cluster.Join(directory)\n // invoker保存\n proto.invokers = append(proto.invokers, invoker)\n return invoker\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"可详细阅读上述注释,这个函数完成了从url到invoker的全部过程"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"(一)首先获得Registry对象,默认是之前实例化的zkRegistry,和之前server获取Registry的处理很类似。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"(二)通过构造一个新的directory,异步拿到之前在zk上注册的server端信息,生成invoker"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"(三)在zk上注册当前service"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"(四)集群策略,获得最终invoker"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"这一步完成了图(一)中所有余下的绝大多数操作,接下来就需要详细的查看directory的构造过程:"}]},{"type":"heading","attrs":{"align":null,"level":4},"content":[{"type":"text","text":"2.2.3 构造directory(包含较复杂的异步操作)"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/44/443a79e991df95bb099da308faa04c29.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"图(二)"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"上述的 "},{"type":"codeinline","content":[{"type":"text","text":"extension.GetDefaultRegistryDirectory(®istryUrl, reg)"}]},{"type":"text","text":"函数,本质上调用了已经注册好的"},{"type":"codeinline","content":[{"type":"text","text":"NewRegistryDirectory"}]},{"type":"text","text":"函数:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: registry/directory/directory.go: NewRegistryDirectory()\n\n// NewRegistryDirectory will create a new RegistryDirectory\n// 这个函数作为default注册在extension上面\n// url为注册url,reg为zookeeper registry\nfunc NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) {\n if url.SubURL == nil {\n return nil, perrors.Errorf(\"url is invalid, suburl can not be nil\")\n }\n dir := &RegistryDirectory{\n BaseDirectory: directory.NewBaseDirectory(url),\n cacheInvokers: []protocol.Invoker{},\n cacheInvokersMap: &sync.Map{},\n serviceType: url.SubURL.Service(),\n registry: registry,\n }\n dir.consumerConfigurationListener = newConsumerConfigurationListener(dir)\n go dir.subscribe(url.SubURL)\n return dir, nil\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"首先构造了一个注册directory,开启协程调用其subscribe函数,传入serviceURL。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"这个directory目前包含了对应的zkRegistry,以及传入的URL,他cacheInvokers的部分是空的。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"进入dir.subscribe(url.SubURL)这个异步函数:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: registry/directory/directory.go: subscribe()\n\n// subscribe from registry\nfunc (dir *RegistryDirectory) subscribe(url *common.URL) {\n // 增加两个监听,\n dir.consumerConfigurationListener.addNotifyListener(dir)\n dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)\n // subscribe调用\n dir.registry.Subscribe(url, dir)\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"重点来了,他调用了zkRegistry的Subscribe方法,与此同时将自己作为ConfigListener传入"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"blockquote","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"我认为这种传入listener的设计模式非常值得学习,而且很有java的味道。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"针对等待zk返回订阅信息这样的异步操作,需要传入一个Listener,这个Listener需要实现Notify方法,进而在作为参数传入内部之后,可以被异步地调用Notify,将内部触发的异步事件“传递出来”,再进一步处理加工。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"层层的Listener事件链,能将传入的原始serviceURL通过zkConn发送给zk服务,获取到服务端注册好的url对应的二进制信息。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"而Notify回调链,则将这串byte[]一步一步解析、加工;以事件的形式向外传递,最终落到directory上的时候,已经是成型的newInvokers了。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"具体细节不再以源码形式展示,可参照上图查阅源码。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"至此已经拿到了server端注册好的真实invoker。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"完成了图(一)中的部分:"}]}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/21/21ef1360c04a762b87c4729c675aa3ee.png","alt":null,"title":"","style":[{"key":"width","value":"75%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"heading","attrs":{"align":null,"level":4},"content":[{"type":"text","text":"2.2.4 构造带有集群策略的clusterinvoker"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"经过上述操作,已经拿到了server端Invokers,放入了directory的cacheinvokers数组里面缓存。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"后续的操作对应本文2.2.2的第四步,由directory生成带有特性集群策略的invoker"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// (四)new cluster invoker,将directory写入集群,获得具有集群策略的invoker\n cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))\n invoker := cluster.Join(directory)\n123"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Join函数的实现就是如下函数:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: cluster/clusterimpl/failovercluster_invokers.go: newFailoverClusterInvoker()\n\nfunc newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker {\n return &failoverClusterInvoker{\n baseClusterInvoker: newBaseClusterInvoker(directory),\n }\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"dubbo-go框架默认选择failover策略,既然返回了一个invoker,我们查看一下failoverClusterInvoker的Invoker方法,看他是如何将集群策略封装到Invoker函数内部的:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: cluster/cluster_impl/failover_cluster_invokers.go: Invoker()\n\n// Invoker 函数\nfunc (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {\n ...\n //调用List方法拿到directory缓存的所有invokers\n invokers := invoker.directory.List(invocation)\n if err := invoker.checkInvokers(invokers, invocation); err != nil {// 检查是否可以实现调用\n return &protocol.RPCResult{Err: err}\n }\n // 获取来自用户方向传入的\n methodName := invocation.MethodName()\n retries := getRetries(invokers, methodName)\n loadBalance := getLoadBalance(invokers[0], invocation)\n for i := 0; i <= retries; i++ {\n // 重要!这里是集群策略的体现,失败后重试!\n //Reselect before retry to avoid a change of candidate `invokers`.\n //NOTE: if `invokers` changed, then `invoked` also lose accuracy.\n if i > 0 {\n if err := invoker.checkWhetherDestroyed(); err != nil {\n return &protocol.RPCResult{Err: err}\n }\n invokers = invoker.directory.List(invocation)\n if err := invoker.checkInvokers(invokers, invocation); err != nil {\n return &protocol.RPCResult{Err: err}\n }\n }\n // 这里是负载均衡策略的体现!选择特定ivk进行调用。\n ivk := invoker.doSelect(loadBalance, invocation, invokers, invoked)\n if ivk == nil {\n continue\n }\n invoked = append(invoked, ivk)\n //DO INVOKE\n result = ivk.Invoke(ctx, invocation)\n if result.Error() != nil {\n providers = append(providers, ivk.GetUrl().Key())\n continue\n }\n return result\n }\n ...\n}"}]},{"type":"blockquote","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"看了很多Invoke函数的实现,所有类似的Invoker函数都包含两个方向,一个是用户方向的invcation,一个是函数方向的底层invokers。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"而集群策略的invoke函数本身作为接线员,把invocation一步步解析,根据调用需求和集群策略,选择特定的invoker来执行"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"proxy函数也是这样,一个是用户方向的ins[] reflect.Type, 一个是函数方向的invoker。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"proxy函数负责将ins转换为invocation,调用对应invoker的invoker函数,实现连通。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"而出于这样的设计,可以在一步步Invoker封装的过程中,每个Invoker只关心自己负责操作的部分,从而使整个调用栈解耦。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"秒啊!!!"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"至此,我们理解了failoverClusterInvoker 的Invoke函数实现,也正是和这个集群策略Invoker被返回,接受来自上方的调用。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"已完成图(一)中的:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/b5/b59ea1bddfdd392fb303c99f85fd4ab7.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"2.2.5 在zookeeper上注册当前client"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"拿到invokers后,可以回到这个函数了:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: config/refrence_config.go: Refer()\n\nif len(c.urls) == 1 {\n // 这一步访问到registry/protocol/protocol.go registryProtocol.Refer\n c.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0])\n // (一)拿到了真实的invokers\n } else {\n // 如果有多个注册中心,即有多个invoker,则采取集群策略\n invokers := make([]protocol.Invoker, 0, len(c.urls))\n ...\n cluster := extension.GetCluster(hitClu)\n // If 'zone-aware' policy select, the invoker wrap sequence would be:\n // ZoneAwareClusterInvoker(StaticDirectory) ->\n // FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker\n c.invoker = cluster.Join(directory.NewStaticDirectory(invokers))\n }\n // (二)create proxy,为函数配置代理\n if c.Async {\n callback := GetCallback(c.id)\n c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetAsyncProxy(c.invoker, callback, cfgURL)\n } else {\n // 这里c.invoker已经是目的addr了\n c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(c.invoker, cfgURL)\n }"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"我们有了可以打通的invokers,但还不能直接调用,因为invoker的入参是invocation,而调用函数使用的是具体的参数列表。需要通过一层proxy来规范入参和出参。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"接下来新建一个默认proxy,放置在c.proxy内,以供后续使用"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"至此,完成了图(一)中最后的操作"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/28/2818614794c118b396ab004f8412ff6f.png","alt":null,"title":"","style":[{"key":"width","value":"50%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"heading","attrs":{"align":null,"level":3},"content":[{"type":"text","text":"2.3 将调用逻辑以代理函数的形式写入rpc-service"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"上面完成了config.Refer操作"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"回到"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"config/config_loader.go: loadConsumerConfig()"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/88/881256613ed8dbe6e37f46188d34e15b.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"下一个重要的函数是Implement,他完的操作较为简单:旨在使用上面生成的c.proxy代理,链接用户自己定义的rpcService到clusterInvoker的信息传输。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"函数较长,只选取了重要的部分:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: common/proxy/proxy.go: Implement()\n\n// Implement\n// proxy implement\n// In consumer, RPCService like:\n// type XxxProvider struct {\n// Yyy func(ctx context.Context, args []interface{}, rsp *Zzz) error\n// }\n// Implement 实现的过程,就是proxy根据函数名和返回值,通过调用invoker 构造出拥有远程调用逻辑的代理函数\n// 将当前rpc所有可供调用的函数注册到proxy.rpc内\nfunc (p *Proxy) Implement(v common.RPCService) {\n // makeDubboCallProxy 这是一个构造代理函数,这个函数的返回值是func(in []reflect.Value) []reflect.Value 这样一个函数\n // 这个被返回的函数是请求实现的载体,由他来发起调用获取结果\n makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value {\n return func(in []reflect.Value) []reflect.Value {\n // 根据methodName和outs的类型,构造这样一个函数,这个函数能将in 输入的value转换为输出的value\n // 这个函数具体的实现如下:\n ...\n // 目前拿到了 methodName、所有入参的interface和value,出参数reply\n // (一)根据这些生成一个 rpcinvocation\n inv = invocation_impl.NewRPCInvocationWithOptions(\n invocation_impl.WithMethodName(methodName),\n invocation_impl.WithArguments(inIArr),\n invocation_impl.WithReply(reply.Interface()),\n invocation_impl.WithCallBack(p.callBack),\n invocation_impl.WithParameterValues(inVArr))\n for k, value := range p.attachments {\n inv.SetAttachments(k, value)\n }\n // add user setAttachment\n atm := invCtx.Value(constant.AttachmentKey) // 如果传入的ctx里面有attachment,也要写入inv\n if m, ok := atm.(map[string]string); ok {\n for k, value := range m {\n inv.SetAttachments(k, value)\n }\n }\n // 至此构造inv完毕\n // (二)触发Invoker 之前已经将cluster_invoker放入proxy,使用Invoke方法,通过getty远程过程调用\n result := p.invoke.Invoke(invCtx, inv)\n // 如果有attachment,则加入\n if len(result.Attachments()) > 0 {\n invCtx = context.WithValue(invCtx, constant.AttachmentKey, result.Attachments())\n }\n ...\n }\n }\n numField := valueOfElem.NumField()\n for i := 0; i < numField; i++ {\n t := typeOf.Field(i)\n methodName := t.Tag.Get(\"dubbo\")\n if methodName == \"\" {\n methodName = t.Name\n }\n f := valueOfElem.Field(i)\n if f.Kind() == reflect.Func && f.IsValid() && f.CanSet() { // 针对于每个函数\n outNum := t.Type.NumOut()\n // 规定函数输出只能有1/2个\n if outNum != 1 && outNum != 2 {\n logger.Warnf(\"method %s of mtype %v has wrong number of in out parameters %d; needs exactly 1/2\",\n t.Name, t.Type.String(), outNum)\n continue\n }\n // The latest return type of the method must be error.\n // 规定最后一个返回值一定是error\n if returnType := t.Type.Out(outNum - 1); returnType != typError {\n logger.Warnf(\"the latest return type %s of method %q is not error\", returnType, t.Name)\n continue\n }\n // 获取到所有的出参类型,放到数组里\n var funcOuts = make([]reflect.Type, outNum)\n for i := 0; i < outNum; i++ {\n funcOuts[i] = t.Type.Out(i)\n }\n // do method proxy here:\n // (三)调用make函数,传入函数名和返回值,获得能调用远程的proxy,将这个proxy替换掉原来的函数位置\n f.Set(reflect.MakeFunc(f.Type(), makeDubboCallProxy(methodName, funcOuts)))\n logger.Debugf(\"set method [%s]\", methodName)\n }\n }\n ...\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"正如之前所说,proxy的作用是将用户定义的函数参数列表,转化为抽象的invocation传入Invoker,进行调用。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"其中已标明有三处较为重要的地方:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"numberedlist","attrs":{"start":"","normalizeStart":1},"content":[{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":1,"align":null,"origin":null},"content":[{"type":"text","text":"在代理函数中实现由参数列表生成Invocation的逻辑"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":2,"align":null,"origin":null},"content":[{"type":"text","text":"在代理函数实现调用Invoker的逻辑"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":3,"align":null,"origin":null},"content":[{"type":"text","text":"将代理函数替换为原始rpc-service对应函数"}]}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"至此,也就解决了一开始的问题:"}]},{"type":"paragraph","attrs":{"indent":1,"number":0,"align":null,"origin":null}},{"type":"codeblock","attrs":{"lang":"go"},"content":[{"type":"text","text":"// file: client.go: main()\n\nconfig.Load()\nuser := &User{}\nerr := userProvider.GetUser(context.TODO(), []interface{}{\"A001\"}, user)"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"这里直接调用用户定义的rpcService的函数GetUser,这里实际调用的是经过重写入的函数代理,所以就能实现远程调用了。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"heading","attrs":{"align":null,"level":1},"content":[{"type":"text","text":"3. 从client到server的invoker嵌套链 - 小结"}]},{"type":"blockquote","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"在阅读dubbo-go源码的过程中,我能发现一条清晰的invoker-proxy嵌套链,我希望通过图的形式来展现:"}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/5f/5ff59ddeb3dc646ee3de5bbd22e91dc0.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}}]}
版权声明
本文为[InfoQ]所创,转载请带上原文链接,感谢
https://xie.infoq.cn/article/c1cfd7aa3ac77380a7a6f635f?utm_source=rss&utm_medium=article

  1. 【计算机网络 12(1),尚学堂马士兵Java视频教程
  2. 【程序猿历程,史上最全的Java面试题集锦在这里
  3. 【程序猿历程(1),Javaweb视频教程百度云
  4. Notes on MySQL 45 lectures (1-7)
  5. [computer network 12 (1), Shang Xuetang Ma soldier java video tutorial
  6. The most complete collection of Java interview questions in history is here
  7. [process of program ape (1), JavaWeb video tutorial, baidu cloud
  8. Notes on MySQL 45 lectures (1-7)
  9. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  10. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  11. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  12. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  13. 【递归,Java传智播客笔记
  14. [recursion, Java intelligence podcast notes
  15. [adhere to painting for 386 days] the beginning of spring of 24 solar terms
  16. K8S系列第八篇(Service、EndPoints以及高可用kubeadm部署)
  17. K8s Series Part 8 (service, endpoints and high availability kubeadm deployment)
  18. 【重识 HTML (3),350道Java面试真题分享
  19. 【重识 HTML (2),Java并发编程必会的多线程你竟然还不会
  20. 【重识 HTML (1),二本Java小菜鸟4面字节跳动被秒成渣渣
  21. [re recognize HTML (3) and share 350 real Java interview questions
  22. [re recognize HTML (2). Multithreading is a must for Java Concurrent Programming. How dare you not
  23. [re recognize HTML (1), two Java rookies' 4-sided bytes beat and become slag in seconds
  24. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  25. RPC 1: how to develop RPC framework from scratch
  26. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  27. RPC 1: how to develop RPC framework from scratch
  28. 一次性捋清楚吧,对乱糟糟的,Spring事务扩展机制
  29. 一文彻底弄懂如何选择抽象类还是接口,连续四年百度Java岗必问面试题
  30. Redis常用命令
  31. 一双拖鞋引发的血案,狂神说Java系列笔记
  32. 一、mysql基础安装
  33. 一位程序员的独白:尽管我一生坎坷,Java框架面试基础
  34. Clear it all at once. For the messy, spring transaction extension mechanism
  35. A thorough understanding of how to choose abstract classes or interfaces, baidu Java post must ask interview questions for four consecutive years
  36. Redis common commands
  37. A pair of slippers triggered the murder, crazy God said java series notes
  38. 1、 MySQL basic installation
  39. Monologue of a programmer: despite my ups and downs in my life, Java framework is the foundation of interview
  40. 【大厂面试】三面三问Spring循环依赖,请一定要把这篇看完(建议收藏)
  41. 一线互联网企业中,springboot入门项目
  42. 一篇文带你入门SSM框架Spring开发,帮你快速拿Offer
  43. 【面试资料】Java全集、微服务、大数据、数据结构与算法、机器学习知识最全总结,283页pdf
  44. 【leetcode刷题】24.数组中重复的数字——Java版
  45. 【leetcode刷题】23.对称二叉树——Java版
  46. 【leetcode刷题】22.二叉树的中序遍历——Java版
  47. 【leetcode刷题】21.三数之和——Java版
  48. 【leetcode刷题】20.最长回文子串——Java版
  49. 【leetcode刷题】19.回文链表——Java版
  50. 【leetcode刷题】18.反转链表——Java版
  51. 【leetcode刷题】17.相交链表——Java&python版
  52. 【leetcode刷题】16.环形链表——Java版
  53. 【leetcode刷题】15.汉明距离——Java版
  54. 【leetcode刷题】14.找到所有数组中消失的数字——Java版
  55. 【leetcode刷题】13.比特位计数——Java版
  56. oracle控制用户权限命令
  57. 三年Java开发,继阿里,鲁班二期Java架构师
  58. Oracle必须要启动的服务
  59. 万字长文!深入剖析HashMap,Java基础笔试题大全带答案
  60. 一问Kafka就心慌?我却凭着这份,图灵学院vip课程百度云