Skip to content

16GoRPC如何实现服务间通信?

在上一课时,我们讲解了 RPC 的相关概念和常见的 RPC 框架。其中, Go RPC 是指 Go 语言原生支持的 RPC 框架,它虽然简单但却十分经典,非常适合作为你后续深入了解 RPC 框架时的研究对象。

在本课时,我们将先通过一个字符串服务为案例简单讲解 Go RPC 是如何进行通信的,然后再具体剖析 Go RPC 的底层原理和实现,对以服务端注册服务、接收并处理客户端请求和客户端发起请求等步骤分别进行详细介绍,相信你学习后,一定会对 Go RPC 有更加全面的了解和认识。

Go 语言 RPC 过程调用实践

Go 语言原生的 RPC 过程调用实现起来非常简单。服务端只需实现对外提供的远程过程方法和结构体,然后将其注册到 RPC 服务中,客户端就可以通过其服务名称和方法名称进行 RPC 方法调用。

本课时我们就使用字符串操作的服务来展示如何使用 Go 语言原生的 RPC 来进行过程调用。

第一步,定义远程过程调用相关接口传入参数和返回参数的数据结构。如下代码所示,调用字符串操作的请求包括两个参数:字符串 A 和字符串 B。

go
type StringRequest struct {
  A string
  B string
}

第二步,定义一个服务对象。这个服务对象可以很简单,比如类型是 int 或者是 interface{},重要的是它输出的方法。这里我们定义一个字符串服务类型的 interface,其名称为 Service,它有一个字符串拼接函数 Concat;然后定义一个名为 StringService 的结构体,实现 Service 接口,并给出 Concat 具体实现。代码如下:

go
type Service interface {
 // Concat a and b
 Concat(req StringRequest, ret *string) error
}
type StringService struct {
}
func (s StringService) Concat(req StringRequest, ret *string) error {
 // test for length overflow
 if len(req.A)+len(req.B) > StrMaxSize {
   *ret = ""
   return ErrMaxSize
 }
 *ret = req.A + req.B
 return nil
}

第三步,实现 RPC 服务器。这里我们生成了一个 StringSevice 结构体,并使用 rpc.Register 注册这个服务,然后通过 net.Listen 监听对应 socket 并对外提供服务。客户端可以访问服务 StringService 以及它的方法 Concat,代码如下:

go
func main() {
 stringService := new(service.StringService)
 rpc.Register(stringService)
 rpc.HandleHTTP()
 l, e := net.Listen("tcp", "127.0.0.1:1234")
 if e != nil {
   log.Fatal("listen error:", e)
 }
 http.Serve(l, nil)
}

第四步,建立 HTTP 客户端,然后通过 Call 方法调用远程 StringService 的对应方法,比如使用同步的方式,代码如下所示。这时客户端就可以进行远程调用了。

go
func main() {
 client, err := rpc.DialHTTP("tcp", "127.0.0.1:1234")
 if err != nil {
   log.Fatal("dialing:", err)
 }
 stringReq := &service.StringRequest{"A", "B"}
 var reply string
 err = client.Call("StringService.Concat", stringReq, &reply)
 if err != nil {
   log.Fatal("Concat error:", err)
 }
 // 异步的调用方式 
 call := client.Call("StringService.Concat", stringReq, &reply)
 _ := <-call.Done
}

通过上述代码的编写就可以实现两个 Go 服务之间的 RPC 调用了。那 Go RPC 又是如何实现的呢?

Go RPC 原理解析

接下来我们将对 Go 语言的 RPC 原生实现进行源码分析,细致讲解其具体实现和原理。首先我们会对 RPC 的服务(Server)端代码进行分析,包括注册服务、反射处理和存根保存,然后讲解服务端处理 RPC 请求的流程,最后讲解客户(Client)端的 RPC 请求处理。

1. Go RPC 服务端原理

服务端的 RPC 代码主要分为两个部分:①服务方法注册,包括调用注册接口,通过反射处理将方法取出,并存到 map 中;②处理网络调用,主要是监听端口、读取数据包、解码请求和调用反射处理后的方法,将返回值编码,返回给客户端。

在上面的示例代码中,我们使用 rpc.Register 对 StringService 进行了注册,Register 是进行 RPC 服务注册的入口方法,其参数 interface{} 类型的 rcvr 就是要注册的 RPC 服务类型,该注册过程的流程如下图所示。

服务端注册 RPC 服务示意图

Register 方法中通过反射获取接口类型和值,并通过 suitableMethods 函数判断注册的 RPC 是否符合规范,最后调用 serviceMap 的 LoadOrStore(sname, s) 方法将对应 RPC 存根存放于 map 中,供之后查找。

go
func (server *Server) register(rcvr interface{}, name string, useName bool) error {
  // 如果服务为空,默认注册一个 
  if server.serviceMap == nil { 
    server.serviceMap = make(map[string]*service) 
  }
  // 获取注册服务的反射信息 
  s := new(service) 
  s.typ = reflect.TypeOf(rcvr) 
  s.rcvr = reflect.ValueOf(rcvr) 
  // 可以使用自定义名称 
  sname := reflect.Indirect(s.rcvr).Type().Name() 
  if useName { 
    sname = name 
  } 
  // 方法必须是暴露的,既服务名首字符大写;不允许重复注册。代码有省略
  if !isExported(sname) && !useName { 
  }   
  if _, present := server.serviceMap[sname]; present { 
  } 
  
  s.name = sname 
  // 开始注册 rpc struct 内部的方法存根 
  s.method = suitableMethods(s.typ, true) 
  if len(s.method) == 0 { 
     // 如果struct内部一个方法也没,那么直接报错,打印详细的错误信息
  }
  // 保存在server的serviceMap中 
  server.serviceMap[s.name] = s 
  return nil 
}

接下来,我们来看一下服务端处理 RPC 请求的实现。如下图就展示了服务端 RPC 程序处理请求的过程,它会一直循环处理接收到的客户端 RPC 请求,将其交由 ReadRequestHandler 处理,然后从之前 Register 方法保存的 map 中获取到要调用的对应方法;接着从请求中解码出对应的参数,使用反射调用其方法,获取到结果后将结果编码成响应消息返回给客户端。

RPC 服务端处理流程示意图

下面,我们来看一下服务端接收并处理 RPC 请求的具体代码实现。

(1)接收请求

Server 的 Accept 函数会无限循环地调用 net.Listener 的 Accept 函数来获取客户端建立连接的请求,获取到连接请求后,再使用协程来处理请求。代码如下:

go
func (server *Server) Accept(lis net.Listener) { 
  for { 
    conn, err := lis.Accept() 
    if err != nil { 
      log.Fatal("rpc.Serve: accept:", err.Error())
    }
    // accept连接以后,打开一个goroutine处理请求 
    go server.ServeConn(conn) 
  } 
}

(2)读取并解析请求

ServeConn 函数会从建立的连接中读取数据,然后创建一个 gobServerCodec,并将其交由 Server 的 ServeCodec 函数处理,如下所示:

go
func (server *Server) ServeConn(conn io.ReadWriteCloser) { 
  buf := bufio.NewWriter(conn) 
  srv := &gobServerCodec{ 
    rwc:  conn, 
    dec:  gob.NewDecoder(conn), 
    enc:  gob.NewEncoder(buf), 
    encBuf: buf, 
  } 
  // 根据指定的codec进行协议解析 
  server.ServeCodec(srv) 
}

ServeCodec 函数会循环地调用 readRequest 函数,读取网络连接上的字节流,解析出请求,然后开启协程执行 Server 的 call 函数,处理对应的 RPC 调用。

go
func (server *Server) ServeCodec(codec ServerCodec) { 
  sending := new(sync.Mutex) 
  for { 
    // 解析请求 
    service, mtype, req, argv, replyv, keepReading, err := server.readRequest (codec)
    if err != nil { 
      if debugLog && err != io.EOF { 
        log.Println("rpc:", err) 
      } 
      if !keepReading { 
        break 
      } 
      // send a response if we actually managed to read a header. 
      // 如果当前请求错误了,我们应该返回信息,然后继续处理 
      if req != nil { 
        server.sendResponse(sending, req, invalidRequest, codec, err.Error())
        server.freeRequest(req) 
      } 
      continue 
    } 
    // 因为需要继续处理后续请求,所以开一个gorutine处理rpc方法 
    go service.call(server, sending, mtype, req, argv, replyv, codec) 
  }
  // 如果连接关闭了需要释放资源 
  codec.Close() 
}

(3)执行远程方法并返回响应

Server 的 call 函数就是通过 Func.Call 反射调用对应 RPC 过程的方法,它还会调用 send Response 将返回值发送给 RPC 客户端,代码如下:

go
func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
  function := mtype.method.Func 
  // 这里是真正调用rpc方法的地方 
  returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv}) 
  errInter := returnValues[0].Interface() 
  errmsg := "" 
  // 处理返回请求了 
  server.sendResponse(sending, req, replyv.Interface(), codec, errmsg) 
  server.freeRequest(req) 
}

2. 客户端发送 RPC 请求原理

无论是同步调用还是异步调用,每次 RPC 请求都会生成一个 Call 对象,并使用 seq 作为 key 保存在 map 中,服务端返回响应值时再根据响应值中的 seq 从 map 中取出 Call,进行相应处理。客户端发起 RPC 调用的过程大致如下图所示。

客户端发送和接收请求流程示意图

下面我们将依次讲解同步调用和异步调用、请求参数编码和接收服务器响应三个部分的具体实现。

(1)同步调用和异步调用

本课时的案例展示了 Go 原生 RPC 的客户端支持同步和异步两种调用,下面我们来介绍一下这两种调用的函数以及调用的数据结构。

go
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
 call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
 return call.Error
}

Call 方法直接调用了 Go 方法,而 Go 方法则是先创建并初始化了 Call 对象,记录下此次调用的方法、参数和返回值,并生成 DoneChannel;然后调用 Client 的 send 方法进行真正的请求发送处理,代码如下:

go
// 异步调用实现
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
 // 初始化 Call 
 call := new(Call)
 call.ServiceMethod = serviceMethod
 call.Args = args
 call.Reply = reply
 if done == nil {
   done = make(chan *Call, 10) // buffered.
 } else {
   if cap(done) == 0 {
     log.Panic("rpc: done channel is unbuffered")
   }
 }
 call.Done = done
  // 调用 Client 的 send 方法
 client.send(call)
 return call
}
type Call struct {
  ServiceMethod string   // 服务名及方法名 格式:服务.方法
  Args     interface{} // 函数的请求参数 (*struct).
  Reply     interface{} // 函数的响应参数 (*struct).
  Error     error    // 方法完成后 error的状态.
  Done     chan *Call // 
}

(2)请求参数编码

Client 的 send 函数首先会判断客户端实例的状态,如果处于关闭状态,则直接返回结果;否则会生成唯一的 seq 值,将 Call 保存到客户端的哈希表 pending 中,然后调用客户端编码器的WriteRequest 来编码请求并发送,代码如下:

go
func (client *Client) send(call *Call) {
  // ....
  //生成seq,每次调用均生成唯一的seq,在服务端返回结果后会通过该值进行匹配
  seq := client.seq
  client.seq++
  client.pending[seq] = call
  client.mutex.Unlock()
 
  // 请求并发送请求
  client.request.Seq = seq
  client.request.ServiceMethod = call.ServiceMethod
  err := client.codec.WriteRequest(&client.request, call.Args)
  if err != nil {
    //发送请求错误时,将map中call对象删除.
    client.mutex.Lock()
    call = client.pending[seq]
    delete(client.pending, seq)
    client.mutex.Unlock()
  }
}

(3)接收返回值

接下来我们来看一下客户端是如何接收并处理服务端返回值的。客户端的 input 函数接收服务端返回的响应值,它进行无限 for 循环,不断调用 codec 也就是 gobClientCodecd 的 ReadResponseHeader 函数,然后根据其返回数据中的 seq 来判断是否是本客户端发出请求的响应值。如果是,则获取对应的 Call 对象,并将其从 pending 哈希表中删除,继续调用 codec 的 ReadReponseBody 方法获取返回值 Reply 对象,并调用 Call 对象的 done 方法,代码如下:

go
func (client *Client) input() {
  var err error
  var response Response
  for err == nil {
    response = Response{}
    //通过response中的 Seq获取call对象
    seq := response.Seq
    client.mutex.Lock()
    call := client.pending[seq]
    delete(client.pending, seq)
    client.mutex.Unlock()
 
    switch {
    case call == nil:
    case response.Error != "":
      //上述两个case,一个处理call为nil,另外处理服务端返回的错误,直接将错误返回
    default:
      //通过编码器,将Resonse的body部分解码成reply对象.
      err = client.codec.ReadResponseBody(call.Reply)
      if err != nil {
        call.Error = errors.New("reading body " + err.Error())
      }
      call.done()
    }
  }
}

上述代码中,gobClientCodecd 的 ReadResponseHeader、ReadReponseBody 方法和上文中的 WriteRequest 类似,这里不做赘述。Call 对象的 done 方法则通过 Call 的 DoneChannel,将获得返回值的结果通知到调用层,代码如下:

go
func (call *Call) done() {
 select {
 case call.Done <- call:
   // ok
 default:
   if debugLog {
     log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
   }
 }
}

客户端接收到 RPC 请求的响应后会进行其他业务逻辑操作,RPC 框架则会对进行 RPC 请求所需要的资源进行回收,下次进行 RPC 请求时则需要再次建立相应的结构体并获取对应的资源。

小结

Go 语言原生 RPC 算是个基础版本的 RPC 框架,代码精简,可扩展性高,但是只实现了 RPC 最基本的网络通信,而超时熔断、链接管理(保活与重连)、服务注册发现等功能还是欠缺的。因此还是达不到生产环境"开箱即用"的水准,不过 GitHub 就有一个基于 RPC 的功能增强版本------rpcx,支持了大部分主流 RPC 的特性。

虽然目前官方已经宣布不再添加新功能,并推荐使用 gRPC,但是作为 Go 标准库中的 RPC 框架,还是有很多地方值得我们借鉴和学习,比如注册服务时如何保存反射信息等。本课时我们从源码角度分析了 Go 语言原生 RPC 框架,希望能给你带来对 RPC 框架的整体认知。

如果 Go 官方团队准备为 Go RPC 添加新功能,那么你最希望添加哪一项目前尚未支持的功能呢?欢迎你在留言区分享你的想法。