Conn
是服务器与客户端之间的连接,
Plugins
包含客户端的插件。
它包含以下方法:
func (client *Client) Call(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}) error
func (client *Client) Close() error
func (c *Client) Connect(network, address string) error
func (client *Client) Go(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call
func (client *Client) IsClosing() bool
func (client *Client) IsShutdown() bool
Call
是一个同步服务调用。客户端阻塞到相应接收到或者返回一些错误。
Go
是一个异步调用,它返回一个调用指针,你可以检查
*Call
来获取结果或者错误。
Close关闭与服务器的连接。它不会等待未完成的请求,会立即关闭。
IsClosing表明客户端正在关闭,不会接受新的请求。IsShutdown指明客户端不会从服务器接收响应。
Client使用默认的
熔断器(circuit.NewRateBreakr(0.95,100))
来处理错误。这是一个流行的rpc错误处理风格。当错误速率达到阈值,服务会设置一个10s的不可用窗口。你可以实现你自定义的熔断器。
这里有一个客户端的例程:
client := &Client{
option: DefaultOption,
err := client.Connect("tcp", addr)
if err != nil {
t.Fatalf("failed to connect: %v", err)
defer client.Close()
args := &Args{
A: 10,
B: 20,
reply := &Reply{}
err = client.Call(context.Background(), "Arith", "Mul", args, reply)
if err != nil {
t.Fatalf("failed to call: %v", err)
if reply.C != 200 {
t.Fatalf("expect 200 but got %d", reply.C)
XClient
XClient是一个客户端的封装,添加了一些服务发现和服务治理特性。
type XClient interface {
SetPlugins(plugins PluginContainer)
ConfigGeoSelector(latitude, longitude float64)
Auth(auth string)
Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *Call) (*Call, error)
Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
Close() error
setPlugins
设置插件容器,Auth
设置授权token。
ConfigGeoSelector
是设置客户端的经度和维度并使用地理选择器的特定的方法。
一个XClient代表一个服务,可以通过serviceMethod
参数来的调用服务所有的方法。如果你想调用多个服务,你必须为每一个服务创建一个xclient。
应用中一个相应的服务只需要一个共享的xclient。可以通过goroutine共享,并且是goroutine安全的。
Go
是一部调用,Call
是同步调用。
XClient使用单个连接连接到一个服务节点,它会缓存该连接,直到该连接断开或者关闭。
rpcx支持多种服务发现,你也可以实现自己的服务发现。
一对一:客户端直接连接单个服务。它表现的像是 client类型。
一对多:客户端可以连接多个服务。服务可以通过程序配置。
Zookeeper:通过zookeeper发现服务。
Etcd:通过etcd发现服务。
Consul:通过Consul发现服务。
mDNS:通过mDNS发现服务,支持本地服务发现。
同进程服务:查找相同进程内的服务。客户端调用进程内的服务,他们不需要通过TCP或者UDP通讯。这种方式方便测试。
这里有一个同步rpcx的示例:
package main
import (
"context"
"flag"
"log"
example "github.com/rpcx-ecosystem/rpcx-examples3"
"github.com/smallnest/rpcx/client"
var (
addr = flag.String("addr", "localhost:8972", "server address")
func main() {
flag.Parse()
d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
args := &example.Args{
A: 10,
B: 20,
reply := &example.Reply{}
err := xclient.Call(context.Background(), "Mul", args, reply)
if err != nil {
log.Fatalf("failed to call: %v", err)
log.Printf("%d * %d = %d", args.A, args.B, reply.C)
服务治理(失败模式和负载均衡)
在大型可伸缩rpc系统中,有很多服务节点提供相同的服务。客户端如何选择最适合的一个节点去调用?如果一个调用失败了。客户端应该选择另一个节点还是立即返回错误?这些是错误模式问题和负载均衡问题。
rpcx支持以下错误模式:
快速失败:如果调用失败立即返回错误
失败转移:选择另一个节点,直到达到最大重试次数
失败重试:选择相同的节点并重试,直到达到最大重试次数
对于负载均衡,rpcx提供一系列的选择器:
随机:随机选择节点
轮询:轮询选择节点
一致性哈希:如果服务路径、服务方法和参数相同则选择同一个节点。它使用跳转一致性哈希并且非常快速。
加权:使用服务配置的权重元数据来选择(weight=xxx)。算法类似于nginx的实现(平滑加权算法)
网络质量:它使用ping结果进行选择。网络质量越好,节点被选中的可能性越大。
地理位置:如果有多个数据中心,客户端会倾向于选择相同数据中心的服务。
自定义选择器:如果以上的选择器不适用,你可以使用自定义的选择器。例如,一个rpcx用户为两个数据中心定义自己的选择器,因为一些限制而不能使用网络质量来选择。
这里有一个异步调用rpcx的示例:
package main
import (
"context"
"flag"
"log"
example "github.com/rpcx-ecosystem/rpcx-examples3"
"github.com/smallnest/rpcx/client"
var (
addr2 = flag.String("addr", "localhost:8972", "server address")
func main() {
flag.Parse()
d := client.NewPeer2PeerDiscovery("tcp@"+*addr2, "")
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
args := &example.Args{
A: 10,
B: 20,
reply := &example.Reply{}
call, err := xclient.Go(context.Background(), "Mul", args, reply, nil)
if err != nil {
log.Fatalf("failed to call: %v", err)
replyCall := <-call.Done
if replyCall.Error != nil {
log.Fatalf("failed to call: %v", replyCall.Error)
else {
log.Printf("%d * %d = %d", args.A, args.B, reply.C)
客户端使用错误重试模式,随机选择节点。
广播和分支
在一些特殊情况下,你可能需要使用XClient的Broadcast
和Fork
方法。
Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
Broadcast
发送请求给所有的服务器,只有所有的服务器返回成功时才返回成功。对于该方法,失败模式和选择模式没有意义。请设置超时以避免悬挂阻塞。
Fork
发送请求给所有的服务器,任意一个服务器返回成功则返回成功。对于该方法,失败模式和选择模式没有意义。
你可以使用NewXClient
来获取一个xclient实例。
func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) XClient
NewXClient
必须使用服务名称作为第一个参数,然后是错误模式、选择器、服务发现和其他选项。