添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

Conn 是服务器与客户端之间的连接, Plugins 包含客户端的插件。

它包含以下方法:

func (client *Client) Call(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}) error    
func (client *Client) Close() error    
func (c *Client) Connect(network, address string) error
func (client *Client) Go(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call    
func (client *Client) IsClosing() bool
func (client *Client) IsShutdown() bool

Call 是一个同步服务调用。客户端阻塞到相应接收到或者返回一些错误。 Go 是一个异步调用,它返回一个调用指针,你可以检查 *Call 来获取结果或者错误。

Close关闭与服务器的连接。它不会等待未完成的请求,会立即关闭。

IsClosing表明客户端正在关闭,不会接受新的请求。IsShutdown指明客户端不会从服务器接收响应。

Client使用默认的 熔断器(circuit.NewRateBreakr(0.95,100)) 来处理错误。这是一个流行的rpc错误处理风格。当错误速率达到阈值,服务会设置一个10s的不可用窗口。你可以实现你自定义的熔断器。

这里有一个客户端的例程:

client := &Client{
    option: DefaultOption,
err := client.Connect("tcp", addr)    
if err != nil {
        t.Fatalf("failed to connect: %v", err)
defer client.Close()
args := &Args{
    A: 10,
    B: 20,
reply := &Reply{}
err = client.Call(context.Background(), "Arith", "Mul", args, reply)
if err != nil {
    t.Fatalf("failed to call: %v", err)
if reply.C != 200 {
    t.Fatalf("expect 200 but got %d", reply.C)

XClient

XClient是一个客户端的封装,添加了一些服务发现和服务治理特性。

type XClient interface {
    SetPlugins(plugins PluginContainer)
    ConfigGeoSelector(latitude, longitude float64)
    Auth(auth string)
    Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *Call) (*Call, error)
    Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
    Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
    Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
    Close() error

setPlugins设置插件容器,Auth设置授权token。

ConfigGeoSelector是设置客户端的经度和维度并使用地理选择器的特定的方法。

一个XClient代表一个服务,可以通过serviceMethod参数来的调用服务所有的方法。如果你想调用多个服务,你必须为每一个服务创建一个xclient。

应用中一个相应的服务只需要一个共享的xclient。可以通过goroutine共享,并且是goroutine安全的。

Go是一部调用,Call是同步调用。

XClient使用单个连接连接到一个服务节点,它会缓存该连接,直到该连接断开或者关闭。

rpcx支持多种服务发现,你也可以实现自己的服务发现。

  • 一对一:客户端直接连接单个服务。它表现的像是 client类型。
  • 一对多:客户端可以连接多个服务。服务可以通过程序配置。
  • Zookeeper:通过zookeeper发现服务。
  • Etcd:通过etcd发现服务。
  • Consul:通过Consul发现服务。
  • mDNS:通过mDNS发现服务,支持本地服务发现。
  • 同进程服务:查找相同进程内的服务。客户端调用进程内的服务,他们不需要通过TCP或者UDP通讯。这种方式方便测试。
  • 这里有一个同步rpcx的示例:

    package main
    import (    
        "context"
        "flag"
        "log"
        example "github.com/rpcx-ecosystem/rpcx-examples3"
        "github.com/smallnest/rpcx/client"
    var (
        addr = flag.String("addr", "localhost:8972", "server address")
    func main() {
        flag.Parse()
        d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
        xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
        defer xclient.Close()
        args := &example.Args{
            A: 10,
            B: 20,
        reply := &example.Reply{}
        err := xclient.Call(context.Background(), "Mul", args, reply)
        if err != nil {
            log.Fatalf("failed to call: %v", err)
        log.Printf("%d * %d = %d", args.A, args.B, reply.C)
    

    服务治理(失败模式和负载均衡)

    在大型可伸缩rpc系统中,有很多服务节点提供相同的服务。客户端如何选择最适合的一个节点去调用?如果一个调用失败了。客户端应该选择另一个节点还是立即返回错误?这些是错误模式问题和负载均衡问题。

    rpcx支持以下错误模式:

  • 快速失败:如果调用失败立即返回错误
  • 失败转移:选择另一个节点,直到达到最大重试次数
  • 失败重试:选择相同的节点并重试,直到达到最大重试次数
  • 对于负载均衡,rpcx提供一系列的选择器:

  • 随机:随机选择节点
  • 轮询:轮询选择节点
  • 一致性哈希:如果服务路径、服务方法和参数相同则选择同一个节点。它使用跳转一致性哈希并且非常快速。
  • 加权:使用服务配置的权重元数据来选择(weight=xxx)。算法类似于nginx的实现(平滑加权算法)
  • 网络质量:它使用ping结果进行选择。网络质量越好,节点被选中的可能性越大。
  • 地理位置:如果有多个数据中心,客户端会倾向于选择相同数据中心的服务。
  • 自定义选择器:如果以上的选择器不适用,你可以使用自定义的选择器。例如,一个rpcx用户为两个数据中心定义自己的选择器,因为一些限制而不能使用网络质量来选择。
  • 这里有一个异步调用rpcx的示例:

    package main
    import (
        "context"
        "flag"
        "log"
        example "github.com/rpcx-ecosystem/rpcx-examples3"
        "github.com/smallnest/rpcx/client"
    var (
        addr2 = flag.String("addr", "localhost:8972", "server address")
    func main() {
        flag.Parse()
        d := client.NewPeer2PeerDiscovery("tcp@"+*addr2, "")
        xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
        defer xclient.Close()
        args := &example.Args{
            A: 10,
            B: 20,
        reply := &example.Reply{}
        call, err := xclient.Go(context.Background(), "Mul", args, reply, nil)
        if err != nil {
            log.Fatalf("failed to call: %v", err)
        replyCall := <-call.Done    
        if replyCall.Error != nil {
            log.Fatalf("failed to call: %v", replyCall.Error)
        else {
            log.Printf("%d * %d = %d", args.A, args.B, reply.C)
    

    客户端使用错误重试模式,随机选择节点。

    广播和分支

    在一些特殊情况下,你可能需要使用XClient的BroadcastFork方法。

    Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
    Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
    

    Broadcast发送请求给所有的服务器,只有所有的服务器返回成功时才返回成功。对于该方法,失败模式和选择模式没有意义。请设置超时以避免悬挂阻塞。

    Fork发送请求给所有的服务器,任意一个服务器返回成功则返回成功。对于该方法,失败模式和选择模式没有意义。

    你可以使用NewXClient来获取一个xclient实例。

    func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) XClient
    

    NewXClient必须使用服务名称作为第一个参数,然后是错误模式、选择器、服务发现和其他选项。