Conn
代表客户端与服务器之前的连接。
Plugins
包含了客户端启用的插件。
他有这些方法:
func (client *Client) Call(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}) error
func (client *Client) Close() error
func (c *Client) Connect(network, address string) error
func (client *Client) Go(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call
func (client *Client) IsClosing() bool
func (client *Client) IsShutdown() bool
Call
代表对服务同步调用。客户端在收到响应或错误前一直是阻塞的。 然而 Go
是异步调用。它返回一个指向 Call 的指针, 你可以检查 *Call
的值来获取返回的结果或错误。
Close
会关闭所有与服务的连接。他会立刻关闭连接,不会等待未完成的请求结束。
IsClosing
表示客户端是关闭着的并且不会接受新的调用。
IsShutdown
表示客户端不会接受服务返回的响应。
Client
uses the default CircuitBreaker (circuit.NewRateBreaker(0.95, 100)) to handle errors. This is a poplular rpc error handling style. When the error rate hits the threshold, this service is marked unavailable in 10 second window. You can implement your customzied CircuitBreaker.
Client
使用默认的 CircuitBreaker (circuit.NewRateBreaker(0.95, 100)) 来处理错误。这是rpc处理错误的普遍做法。当出错率达到阈值, 这个服务就会在接下来的10秒内被标记为不可用。你也可以实现你自己的 CircuitBreaker。
下面是客户端的例子:
client := &Client{
option: DefaultOption,
err := client.Connect("tcp", addr)
if err != nil {
t.Fatalf("failed to connect: %v", err)
defer client.Close()
args := &Args{
A: 10,
B: 20,
reply := &Reply{}
err = client.Call(context.Background(), "Arith", "Mul", args, reply)
if err != nil {
t.Fatalf("failed to call: %v", err)
if reply.C != 200 {
t.Fatalf("expect 200 but got %d", reply.C)
XClient
XClient
是对客户端的封装,增加了一些服务发现和服务治理的特性。
type XClient interface {
SetPlugins(plugins PluginContainer)
ConfigGeoSelector(latitude, longitude float64)
Auth(auth string)
Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *Call) (*Call, error)
Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
Close() error
SetPlugins
方法可以用来设置 Plugin 容器, Auth
可以用来设置鉴权token。
ConfigGeoSelector
是一个可以通过地址位置选择器来设置客户端的经纬度的特别方法。
一个XCLinet只对一个服务负责,它可以通过serviceMethod
参数来调用这个服务的所有方法。如果你想调用多个服务,你必须为每个服务创建一个XClient。
一个应用中,一个服务只需要一个共享的XClient。它可以被通过goroutine共享,并且是协程安全的。
Go
代表异步调用, Call
代表同步调用。
XClient对于一个服务节点使用单一的连接,并且它会缓存这个连接直到失效或异常。
rpcx 支持许多服务发现机制,你也可以实现自己的服务发现。
Peer to Peer: 客户端直连每个服务节点。 the client connects the single service directly. It acts like the client
type.
Peer to Multiple: 客户端可以连接多个服务。服务可以被编程式配置。
Zookeeper: 通过 zookeeper 寻找服务。
Etcd: 通过 etcd 寻找服务。
Consul: 通过 consul 寻找服务。
mDNS: 通过 mDNS 寻找服务(支持本地服务发现)。
In process: 在同一进程寻找服务。客户端通过进程调用服务,不走TCP或UDP,方便调试使用。
下面是一个同步的 rpcx 例子:
package main
import (
"context"
"flag"
"log"
example "github.com/rpcx-ecosystem/rpcx-examples3"
"github.com/smallnest/rpcx/client"
var (
addr = flag.String("addr", "localhost:8972", "server address")
func main() {
flag.Parse()
d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
args := &example.Args{
A: 10,
B: 20,
reply := &example.Reply{}
err := xclient.Call(context.Background(), "Mul", args, reply)
if err != nil {
log.Fatalf("failed to call: %v", err)
log.Printf("%d * %d = %d", args.A, args.B, reply.C)
服务治理 (失败模式与负载均衡)
在一个大规模的rpc系统中,有许多服务节点提供同一个服务。客户端如何选择最合适的节点来调用呢?如果调用失败,客户端应该选择另一个节点或者立即返回错误?这里就有了故障模式和负载均衡的问题。
rpcx 支持 故障模式:
Failfast:如果调用失败,立即返回错误
Failover:选择其他节点,直到达到最大重试次数
Failtry:选择相同节点并重试,直到达到最大重试次数
对于负载均衡,rpcx 提供了许多选择器:
Random: 随机选择节点package main
import (
"context"
"flag"
"log"
example "github.com/rpcx-ecosystem/rpcx-examples3"
"github.com/smallnest/rpcx/client"
var (
addr2 = flag.String("addr", "localhost:8972", "server address")
func main() {
flag.Parse()
d := client.NewPeer2PeerDiscovery("tcp@"+*addr2, "")
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
args := &example.Args{
A: 10,
B: 20,
reply := &example.Reply{}
call, err := xclient.Go(context.Background(), "Mul", args, reply, nil)
if err != nil {
log.Fatalf("failed to call: %v", err)
replyCall := <-call.Done
if replyCall.Error != nil {
log.Fatalf("failed to call: %v", replyCall.Error)
} else {
log.Printf("%d * %d = %d", args.A, args.B, reply.C)
客户端使用了 Failtry
模式并且随机选择节点。
广播与群发
特殊情况下,你可以使用 XClient 的 Broadcast
和 Fork
方法。
Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
Broadcast
表示向所有服务器发送请求,只有所有服务器正确返回时才会成功。此时FailMode 和 SelectMode的设置是无效的。请设置超时来避免阻塞。
Fork
表示向所有服务器发送请求,只要任意一台服务器正确返回就成功。此时FailMode 和 SelectMode的设置是无效的。
你可以使用 NewXClient
来获取一个 XClient 实例。
func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) XClient
NewXClient
必须使用服务名称作为第一个参数, 然后是 failmode、 selector、 discovery等其他选项。