author | Li Zhixin
Reading guide : With the last article 《Dubbo-go Source notes ( One )Server Open the service process 》 The bedding of , It can be compared to the starting process of the client starting from the server . One of the biggest differences is that the server through zk Registration service , Publish your own ivkURL And subscribe to events to start listening ; And the customer should be through zk Certified components , Get what you need to call serviceURL, to update invoker And rewrite the user's RPCService, So as to encapsulate the details of remote procedure call .
Configuration files and client source code
1. client The configuration file
helloworld Provided demo:profiles/client.yaml.
registries :
"demoZk":
protocol: "zookeeper"
timeout : "3s"
address: "127.0.0.1:2181"
username: ""
password: ""
references:
"UserProvider":
# You can specify multiple registry, Separated by commas ; Do not specify default registration with all registries
registry: "demoZk"
protocol : "dubbo"
interface : "com.ikurento.user.UserProvider"
cluster: "failover"
methods :
- name: "GetUser"
retries: 3
You can see that the configuration file is the same as that discussed earlier Server The end is very similar , Its refrences Some fields are the configuration of the service to be called by the current service , It details the call protocol 、 Registration Agreement 、 Interface id、 Calling method 、 Cluster strategy, etc , These configurations will interact with the registration component later 、 rewrite ivk、 In the process of calling .
2. The client uses the framework source code
user.go:
func init() {
config.SetConsumerService(userProvider)
hessian.RegisterPOJO(&User{})
}
main.go:
func main() {
hessian.RegisterPOJO(&User{})
config.Load()
time.Sleep(3e9)
println("\n\n\nstart to test dubbo")
user := &User{}
err := userProvider.GetUser(context.TODO(), []interface{}{"A001"}, user)
if err != nil {
panic(err)
}
println("response result: %v\n", user)
initSignal()
}
On the official website helloworld demo In the source code , It is similar to the service , stay user.go Registered in rpc-service, And the need for rpc Transmission structure user.
stay main Function , Also called config.Load() function , After that, we can achieve good rpc-service:userProvider Call the corresponding function directly , That is to say rpc call .
You can guess. , from hessian Registration structure 、SetConsumerService, To the calling function .GetUser() period , User defined rpc-service That is to say userProvider The corresponding function is rewritten , The rewritten GetUser The function already contains... That implements the remote call logic invoker.
Next , By reading the source code , have a look dubbo-go How to do it .
Implement remote procedure call
1. Load profile
// file: config/config_loader.go :Load()
// Load Dubbo Init
func Load() {
// init router
initRouter()
// init the global event dispatcher
extension.SetAndInitGlobalDispatcher(GetBaseConfig().EventDispatcherType)
// start the metadata report if config set
if err := startMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err != nil {
logger.Errorf("Provider starts metadata report error, and the error is {%#v}", err)
return
}
// reference config
loadConsumerConfig()
stay main Called in the function config.Load() function , And then called loadConsumerConfig, It's similar to what I said before server End configuration read in function .
stay loadConsumerConfig Function , Three steps have been taken :
// config/config_loader.go
func loadConsumerConfig() {
// 1 init other consumer config
conConfigType := consumerConfig.ConfigType
for key, value := range extension.GetDefaultConfigReader() {}
checkApplicationName(consumerConfig.ApplicationConfig)
configCenterRefreshConsumer()
checkRegistries(consumerConfig.Registries, consumerConfig.Registry)
// 2 refer-implement-reference
for key, ref := range consumerConfig.References {
if ref.Generic {
genericService := NewGenericService(key)
SetConsumerService(genericService)
}
rpcService := GetConsumerService(key)
ref.id = key
ref.Refer(rpcService)
ref.Implement(rpcService)
}
// 3 wait for invoker is available, if wait over default 3s, then panic
for {}
}
- Check the configuration file and write the configuration to memory
- stay for Internal loop , Quote... In turn (refer) And instantiate (implement) Each one is tuned reference
- Wait three seconds for all invoker be ready
The important thing is for References and instantiations in loops , Two step operation , It's going to be discussed next .
thus , The configuration has been written to the framework .
2. Get remote Service URL, Implement callable invoker
Aforementioned ref.Refer This part of the operation is completed .
chart ( One )
1) Construct registration url
and server The end is similar to , Registration exists url And the service url,dubbo Habit will serve url As a registration url Of sub.
// file: config/reference_config.go: Refer()
func (c *ReferenceConfig) Refer(_ interface{}) {
//( One ) To configure url Parameters (serviceUrl), Will be sub
cfgURL := common.NewURLWithOptions(
common.WithPath(c.id),
common.WithProtocol(c.Protocol),
common.WithParams(c.getUrlMap()),
common.WithParamsValue(constant.BEAN_NAME_KEY, c.id),
)
...
// ( Two ) Registered address can be registered through url Format given , It can also be given by configuring the format
// The meaning of this step is to configure -> Extract information to generate URL
if c.Url != "" {// Given by the user url Information , It could be a point-to-point address , It can also be the address of the registry
// 1. user specified URL, could be peer-to-peer address, or register center's address.
urlStrings := gxstrings.RegSplit(c.Url, "\\s*[;]+\\s*")
for _, urlStr := range urlStrings {
serviceUrl, err := common.NewURL(urlStr)
...
}
} else {// Configure information to read into the registry
// assemble SubURL from register center's configuration mode
// This is registration url,protocol = registry, Contains zk Username 、 password 、ip wait
c.urls = loadRegistries(c.Registry, consumerConfig.Registries, common.CONSUMER)
...
// set url to regUrls
for _, regUrl := range c.urls {
regUrl.SubURL = cfgURL// regUrl Of subURl Save the current configuration url
}
}
// thus , In whatever form , I've got all the regURL
// ( 3、 ... and ) obtain registryProtocol example , Call its Refer Method , Introduce the new structure regURL
if len(c.urls) == 1 {
// This step goes to registry/protocol/protocol.go registryProtocol.Refer
// Here is registry
c.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0])
} else {
// If there are multiple registries , There are many invoker, Cluster strategy is adopted
invokers := make([]protocol.Invoker, 0, len(c.urls))
...
}
In this function , It has been processed from Register Configuration to RegisterURL Transformation , This is the picture ( One ) Middle part :
Next , What you've got url Will be passed on to RegistryProtocol, further refer.
2)registryProtocol Get zkRegistry example , further Refer
// file: registry/protocol/protocol.go: Refer
// Refer provider service from registry center
// It's a configuration file registries Of url, He can generate a invoker = Point to the end addr, For the client to call directly .
func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {
var registryUrl = url
// What I got here is referenceConfig,serviceUrl It contains Reference All the information about , contain interfaceName、method wait
var serviceUrl = registryUrl.SubURL
if registryUrl.Protocol == constant.REGISTRY_PROTOCOL {// registryUrl.Proto = "registry"
protocol := registryUrl.GetParam(constant.REGISTRY_KEY, "")
registryUrl.Protocol = protocol// Replaced with a specific value , such as "zookeeper"
}
// Interface object
var reg registry.Registry
// ( One ) Instantiate the interface object , Cache policy
if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded {
// There is no current in the cache registry, Create a new one reg
reg = getRegistry(®istryUrl)
// cached
proto.registries.Store(registryUrl.Key(), reg)
} else {
reg = regI.(registry.Registry)
}
// Come here , Got it reg example zookeeper Of registry
//( Two ) according to Register Example zkRegistry And incoming regURL Create a new one directory
// There is a complex asynchronous logic to this step , Got the purpose from the registry service The real addr, Got invoker And put in directory,
// This step is described in detail below
// new registry directory for store service url from registry
directory, err := extension.GetDefaultRegistryDirectory(®istryUrl, reg)
if err != nil {
logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!",
serviceUrl.String(), err.Error())
return nil
}
// ( 3、 ... and )DoRegister stay zk Register on the current client service
err = reg.Register(*serviceUrl)
if err != nil {
logger.Errorf("consumer service %v register registry %v error, error message is %s",
serviceUrl.String(), registryUrl.String(), err.Error())
}
// ( Four )new cluster invoker, take directory Write to cluster , Get the invoker
cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))
invoker := cluster.Join(directory)
// invoker preservation
proto.invokers = append(proto.invokers, invoker)
return invoker
}
Please read the above notes in detail , This function completes from url To invoker The whole process of :
( One ) First get Registry object , The default is instantiated before zkRegistry, And before server obtain Registry It's very similar to .
( Two ) By constructing a new directory, Before receiving asynchronously zk Registered on server End message , Generate invoker.
( 3、 ... and ) stay zk Register on the current service.
( Four ) Clustering strategy , Get the final invoker.
This step completes the picture ( One ) Most of the remaining operations in , Next, we need to look at it in detail directory Construction process of .
3) structure directory( Contains more complex asynchronous operations )
chart ( Two )
Aforementioned extension.GetDefaultRegistryDirectory(®istryUrl, reg)
function , It essentially calls the registered NewRegistryDirectory
function :
// file: registry/directory/directory.go: NewRegistryDirectory()
// NewRegistryDirectory will create a new RegistryDirectory
// This function is used as default Registered in extension above
// url To register url,reg by zookeeper registry
func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) {
if url.SubURL == nil {
return nil, perrors.Errorf("url is invalid, suburl can not be nil")
}
dir := &RegistryDirectory{
BaseDirectory: directory.NewBaseDirectory(url),
cacheInvokers: []protocol.Invoker{},
cacheInvokersMap: &sync.Map{},
serviceType: url.SubURL.Service(),
registry: registry,
}
dir.consumerConfigurationListener = newConsumerConfigurationListener(dir)
go dir.subscribe(url.SubURL)
return dir, nil
}
First, we construct a registration directory, Open the coroutine and call it subscribe function , Pass in serviceURL.
This directory Now it contains the corresponding zkRegistry, And the incoming URL, its cacheInvokers It's partially empty .
Get into dir.subscribe(url.SubURL) This asynchronous function :
/ file: registry/directory/directory.go: subscribe()
// subscribe from registry
func (dir *RegistryDirectory) subscribe(url *common.URL) {
// Add two monitors ,
dir.consumerConfigurationListener.addNotifyListener(dir)
dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)
// subscribe call
dir.registry.Subscribe(url, dir)
}
The key is coming. , It calls for zkRegistry Of Subscribe Method , At the same time, treat yourself as ConfigListener Pass in .
I think this kind of introduction listener It's worth learning , And there's a lot of java The smell of . For waiting zk An asynchronous operation that returns subscription information , Need to pass in a Listener, This Listener Need to achieve Notify Method , Then, after being passed in as a parameter , Can be called asynchronously Notify, Asynchronous events that will be triggered internally “ Pass it on ”, Further processing . Layers of Listener Chain of events , Can bring in the original serviceURL adopt zkConn Send to zk service , Get the server registered url The corresponding binary information . and Notify Callback chain , Then the string of byte[] Step by step analysis 、 machining ; In the form of events , Finally fell to directory Last time , It's already in shape newInvokers 了 . The details are no longer shown in the form of source code , Refer to the above figure for the source code .
So far, I've got server End registration good real invoker.
Finished the picture ( One ) Part of :
4) Construct... With cluster strategy clusterinvoker
After the above operations , It's got server End Invokers, Put in directory Of cacheinvokers Cache in the array .
The following operation corresponds to this article from url To invoker The last step in the process of , from directory Generate cluster strategy with features invoker.
// ( Four )new cluster invoker, take directory Write to cluster , Get the invoker
cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))
invoker := cluster.Join(directory)
123
Join The implementation of the function is as follows :
// file: cluster/cluster_impl/failover_cluster_invokers.go: newFailoverClusterInvoker()
func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker {
return &failoverClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
}
}
12345
dubbo-go The default frame selection is failover Strategy , Now that we've returned one invoker, So let's see failoverClusterInvoker Of Invoker Method , See how it encapsulates the cluster policy into Invoker intra-function :
// file: cluster/cluster_impl/failover_cluster_invokers.go: Invoker()
// Invoker function
func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
...
// call List How to get directory All of the cache invokers
invokers := invoker.directory.List(invocation)
if err := invoker.checkInvokers(invokers, invocation); err != nil {// Check whether the call can be implemented
return &protocol.RPCResult{Err: err}
}
// Get incoming from user direction
methodName := invocation.MethodName()
retries := getRetries(invokers, methodName)
loadBalance := getLoadBalance(invokers[0], invocation)
for i := 0; i <= retries; i++ {
// important ! Here is the embodiment of cluster strategy , Try again after failure !
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if i > 0 {
if err := invoker.checkWhetherDestroyed(); err != nil {
return &protocol.RPCResult{Err: err}
}
invokers = invoker.directory.List(invocation)
if err := invoker.checkInvokers(invokers, invocation); err != nil {
return &protocol.RPCResult{Err: err}
}
}
// Here is the embodiment of the load balancing strategy ! Choose specific ivk To call .
ivk := invoker.doSelect(loadBalance, invocation, invokers, invoked)
if ivk == nil {
continue
}
invoked = append(invoked, ivk)
//DO INVOKE
result = ivk.Invoke(ctx, invocation)
if result.Error() != nil {
providers = append(providers, ivk.GetUrl().Key())
continue
}
return result
}
...
}
I saw a lot of Invoke Implementation of function , All the like Invoker Functions contain two directions : One is user oriented invcation; One is the bottom layer of the function direction invokers. And cluster strategy invoke Function itself as an operator , hold invocation Step by step , According to the call requirements and cluster strategy , Select specific invoker To execute . proxy So is the function , One is user oriented ins[] reflect.Type, One is the function direction invoker. proxy The function is responsible for ins Convert to invocation, Call the corresponding invoker Of invoker function , Connect . And out of this design , Step by step Invoker In the process of encapsulation , Every Invoker Only care about your part of the operation , The whole call stack is decoupled . Wonderful !!!
thus , We understand failoverClusterInvoker Of Invoke Function implementation , And it's with this cluster strategy Invoker Returned , Accept calls from above .
Finished drawing ( One ) Medium :
5) stay zookeeper Register on the current client
Get invokers after , You can go back to this function :
// file: config/refrence_config.go: Refer()
if len(c.urls) == 1 {
// This step goes to registry/protocol/protocol.go registryProtocol.Refer
c.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0])
// ( One ) Got the real invokers
} else {
// If there are multiple registries , There are many invoker, Cluster strategy is adopted
invokers := make([]protocol.Invoker, 0, len(c.urls))
...
cluster := extension.GetCluster(hitClu)
// If 'zone-aware' policy select, the invoker wrap sequence would be:
// ZoneAwareClusterInvoker(StaticDirectory) ->
// FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
c.invoker = cluster.Join(directory.NewStaticDirectory(invokers))
}
// ( Two )create proxy, Configure the proxy for the function
if c.Async {
callback := GetCallback(c.id)
c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetAsyncProxy(c.invoker, callback, cfgURL)
} else {
// here c.invoker It's already the purpose addr 了
c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(c.invoker, cfgURL)
}
We have something we can get through invokers, But you can't call , because invoker The entry parameter of is invocation, The calling function uses a specific parameter list , It needs to go through a layer of proxy To standardize input and output parameters .
Next, create a new default proxy, Put in c.proxy Inside , For future use .
thus , Finished the picture ( One ) The last operation in :
3. Write the calling logic as a proxy function rpc-service
It's done config.Refer operation , go back to :
config/config_loader.go: loadConsumerConfig()
The next important function is Implement, Its operation is relatively simple : Designed to use the above generated c.proxy agent , Link user defined rpcService To clusterInvoker Information transmission of .
The function is longer , Only the important parts are selected :
// file: common/proxy/proxy.go: Implement()
// Implement
// proxy implement
// In consumer, RPCService like:
// type XxxProvider struct {
// Yyy func(ctx context.Context, args []interface{}, rsp *Zzz) error
// }
// Implement Implementation process , Namely proxy According to the function name and return value , By calling invoker Construct a proxy function with remote call logic
// Will the current rpc All available functions are registered with proxy.rpc Inside
func (p *Proxy) Implement(v common.RPCService) {
// makeDubboCallProxy This is a construction proxy function , The return value of this function is zero func(in []reflect.Value) []reflect.Value Such a function
// The returned function is the carrier of the request implementation , It's up to him to initiate the call to get the result
makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value {
return func(in []reflect.Value) []reflect.Value {
// according to methodName and outs The type of , Construct such a function , This function can change in Input value Converted to output value
// This function is implemented as follows :
...
// So far I've got methodName、 All of the participating interface and value, Give parameters reply
// ( One ) Based on this, we generate a rpcinvocation
inv = invocation_impl.NewRPCInvocationWithOptions(
invocation_impl.WithMethodName(methodName),
invocation_impl.WithArguments(inIArr),
invocation_impl.WithReply(reply.Interface()),
invocation_impl.WithCallBack(p.callBack),
invocation_impl.WithParameterValues(inVArr))
for k, value := range p.attachments {
inv.SetAttachments(k, value)
}
// add user setAttachment
atm := invCtx.Value(constant.AttachmentKey) // If the incoming ctx There are attachment, Also write inv
if m, ok := atm.(map[string]string); ok {
for k, value := range m {
inv.SetAttachments(k, value)
}
}
// So far the structure inv complete
// ( Two ) Trigger Invoker I've already put cluster_invoker Put in proxy, Use Invoke Method , adopt getty Remote procedure call
result := p.invoke.Invoke(invCtx, inv)
// If there is attachment, Join in
if len(result.Attachments()) > 0 {
invCtx = context.WithValue(invCtx, constant.AttachmentKey, result.Attachments())
}
...
}
}
numField := valueOfElem.NumField()
for i := 0; i < numField; i++ {
t := typeOf.Field(i)
methodName := t.Tag.Get("dubbo")
if methodName == "" {
methodName = t.Name
}
f := valueOfElem.Field(i)
if f.Kind() == reflect.Func && f.IsValid() && f.CanSet() { // For each function
outNum := t.Type.NumOut()
// The function output can only have 1/2 individual
if outNum != 1 && outNum != 2 {
logger.Warnf("method %s of mtype %v has wrong number of in out parameters %d; needs exactly 1/2",
t.Name, t.Type.String(), outNum)
continue
}
// The latest return type of the method must be error.
// Specifies that the last return value must be error
if returnType := t.Type.Out(outNum - 1); returnType != typError {
logger.Warnf("the latest return type %s of method %q is not error", returnType, t.Name)
continue
}
// Get all the output parameter types , Put it in the array
var funcOuts = make([]reflect.Type, outNum)
for i := 0; i < outNum; i++ {
funcOuts[i] = t.Type.Out(i)
}
// do method proxy here:
// ( 3、 ... and ) call make function , Pass in the function name and return value , Get a remote call to proxy, Put this proxy Replace the original function position
f.Set(reflect.MakeFunc(f.Type(), makeDubboCallProxy(methodName, funcOuts)))
logger.Debugf("set method [%s]", methodName)
}
}
...
}
As I said before ,proxy The function is to list the user-defined function parameters , Translate into abstract invocation Pass in Invoker, To call .
Three important places have been marked :
- In the proxy function, it is generated by the parameter list Invocation The logic of
- In the proxy function implementation call Invoker The logic of
- Replace the surrogate function with the original rpc-service The corresponding function
thus , And it solved the problem at the beginning :
// file: client.go: main()
config.Load()
user := &User{}
err := userProvider.GetUser(context.TODO(), []interface{}{"A001"}, user)
The user-defined rpcService Function of GetUser, The actual call here is the rewritten function proxy , So we can implement remote call .
from client To server Of invoker Nested chains - Summary
In the reading dubbo-go In the process of source code , We can find a clear line of invoker-proxy Nested chains , I hope that it can be shown in the form of graph :
If you have any questions , Welcome to join the nail exchange group : Nailing group no. 23331795.
Author's brief introduction
Li Zhixin (GitHubID LaurenceLiZhixin), Students majoring in software engineering in Sun Yat sen University , Good at using Java/Go Language , Focus on cloud native and micro services and other technical directions .
“ Alibaba cloud native Focus on microservices 、Serverless、 Containers 、Service Mesh And other technical fields 、 The trend of primary popular technology of focus cloud 、 Large scale practice of cloud original , Official account of cloud developers .”