| 模式 | 交互方向 | 描述 |
|---|---|---|
| call | 双向: 客户端 -> 服务器 服务器 -> 客户端 | 请求和响应 |
| notify | 双向: 客户端 -> 服务器 服务器 -> 客户端 | 请求不响应 |
以下是包含 arpc 的第三方基准测试,尽管这些仓库提供了性能报告,但我建议你自己运行代码获取真实结果,而不仅仅相信他人的文档:
| 消息体长度 | 保留字段 | 命令 | 标志位 | 方法名长度 | 序列号 | 方法名 | 消息体 |
|---|---|---|---|---|---|---|---|
| 4 字节 | 1 字节 | 1 字节 | 1 字节 | 1 字节 | 8 字节 | methodLen 字节 | bodyLen-methodLen 字节 |
$ go get -u github.com/lesismal/arpc
import "github.com/lesismal/arpc"
package main
import "github.com/lesismal/arpc"
func main() {
server := arpc.NewServer()
// 注册路由
server.Handler.Handle("/echo", func(ctx *arpc.Context) {
str := ""
if err := ctx.Bind(&str); err == nil {
ctx.Write(str)
}
})
server.Run("localhost:8888")
}
package main
import (
"log"
"net"
"time"
"github.com/lesismal/arpc"
)
func main() {
client, err := arpc.NewClient(func() (net.Conn, error) {
return net.DialTimeout("tcp", "localhost:8888", time.Second*3)
})
if err != nil {
panic(err)
}
defer client.Stop()
req := "hello"
rsp := ""
err = client.Call("/echo", &req, &rsp, time.Second*5)
if err != nil {
log.Fatalf("Call failed: %v", err)
} else {
log.Printf("Call Response: \"%v\"", rsp)
}
}
var handler arpc.Handler
// 包级别
handler = arpc.DefaultHandler
// 服务器
handler = server.Handler
// 客户端
handler = client.Handler
// 消息将在同一连接读取 goroutine 中默认顺序处理
handler.Handle("/route", func(ctx *arpc.Context) { ... })
handler.Handle("/route2", func(ctx *arpc.Context) { ... })
// 这会让消息由新的 goroutine 处理
async := true
handler.Handle("/asyncResponse", func(ctx *arpc.Context) { ... }, async)
详见路由中间件,实现中间件非常容易
import "github.com/lesismal/arpcex/middleware/router"
var handler arpc.Handler
// 包级别
handler = arpc.DefaultHandler
// 服务器
handler = server.Handler
// 客户端
handler = client.Handler
handler.Use(router.Recover())
handler.Use(router.Logger())
handler.Use(func(ctx *arpc.Context) { ... })
handler.Handle("/echo", func(ctx *arpc.Context) { ... })
handler.Use(func(ctx *arpc.Context) { ... })
import "github.com/lesismal/arpcex/middleware/coder/gzip"
var handler arpc.Handler
// 包级别
handler = arpc.DefaultHandler
// 服务器
handler = server.Handler
// 客户端
handler = client.Handler
handler.UseCoder(gzip.New())
handler.Handle("/echo", func(ctx *arpc.Context) { ... })
request := &Echo{...}
response := &Echo{}
timeout := time.Second*5
err := client.Call("/call/echo", request, response, timeout)
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// err := client.CallWith(ctx, "/call/echo", request, response)
request := &Echo{...}
timeout := time.Second*5
err := client.CallAsync("/call/echo", request, func(ctx *arpc.Context) {
response := &Echo{}
ctx.Bind(response)
...
}, timeout)
data := &Notify{...}
client.Notify("/notify", data, time.Second)
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// client.NotifyWith(ctx, "/notify", data)
var client *arpc.Client
server.Handler.Handle("/route", func(ctx *arpc.Context) {
client = ctx.Client
// 释放客户端
client.OnDisconnected(func(c *arpc.Client){
client = nil
})
})
go func() {
for {
time.Sleep(time.Second)
if client != nil {
client.Call(...)
client.CallAsync(...)
client.Notify(...)
}
}
}()
var mux = sync.RWMutex{}
var clientMap = make(map[*arpc.Client]struct{})
func broadcast() {
var svr *arpc.Server = ...
msg := svr.NewMessage(arpc.CmdNotify, "/broadcast", fmt.Sprintf("broadcast msg %d", i))
mux.RLock()
for client := range clientMap {
client.PushMsg(msg, arpc.TimeZero)
}
mux.RUnlock()
}
var handler arpc.Handler
// 包级别
handler = arpc.DefaultHandler
// 服务器
handler = server.Handler
// 客户端
handler = client.Handler
asyncResponse := true // 默认为 true,或设置为 false
handler.Handle("/echo", func(ctx *arpc.Context) {
req := ...
err := ctx.Bind(req)
if err == nil {
ctx.Write(data)
}
}, asyncResponse)
// 包级别
arpc.DefaultHandler.HandleConnected(func(c *arpc.Client) {
...
})
// 服务器
svr := arpc.NewServer()
svr.Handler.HandleConnected(func(c *arpc.Client) {
...
})
// 客户端
client, err := arpc.NewClient(...)
client.Handler.HandleConnected(func(c *arpc.Client) {
...
})
// 包级别
arpc.DefaultHandler.HandleDisconnected(func(c *arpc.Client) {
...
})
// 服务器
svr := arpc.NewServer()
svr.Handler.HandleDisconnected(func(c *arpc.Client) {
...
})
// 客户端
client, err := arpc.NewClient(...)
client.Handler.HandleDisconnected(func(c *arpc.Client) {
...
})
// 包级别
arpc.DefaultHandler.HandleOverstock(func(c *arpc.Client) {
...
})
// 服务器
svr := arpc.NewServer()
svr.Handler.HandleOverstock(func(c *arpc.Client) {
...
})
// 客户端
client, err := arpc.NewClient(...)
client.Handler.HandleOverstock(func(c *arpc.Client) {
...
})
// 服务器
var ln net.Listener = ...
svr := arpc.NewServer()
svr.Serve(ln)
// 客户端
dialer := func() (net.Conn, error) {
return ...
}
client, err := arpc.NewClient(dialer)
import "github.com/lesismal/arpc/codec"
var codec arpc.Codec = ...
// 包级别
codec.Defaultcodec = codec
// 服务器
svr := arpc.NewServer()
svr.Codec = codec
// 客户端
client, err := arpc.NewClient(...)
client.Codec = codec
package main
import (
"github.com/lesismal/arpc"
"github.com/lesismal/arpc/codec"
)
func main() {
// 切换为 Gob 编解码
if err := codec.Use("gob"); err != nil {
panic(err)
}
// 如果需要使用 Msgpack 编解码,请改为:
// _ = codec.Use("msgpack")
server := arpc.NewServer()
client, _ := arpc.NewClient(...)
// 也可以只对单个实例设置,而不影响全局默认值:
// server.Codec = &codec.MsgpackCodec{}
// client.Codec = &codec.GobCodec{}
_ = server
_ = client
}
codec.Register("mycodec", myCodec) 注册,再通过 codec.SetDefaultByName("mycodec") 或 WithClientCodecName("mycodec") 等方式启用。arpc.NewClientWithOptions、arpc.NewServerWithOptions 搭配 WithClientCodec、WithServerCodec、WithClientSendQueueSize 等选项一次性完成配置。client.SetCodecByName("msgpack") 或 server.SetCodecByName("gob"),内部会自动查表并原子更新实例。import "github.com/lesismal/arpc/log"
var logger arpc.Logger = ...
log.SetLogger(logger) // log.DefaultLogger = logger
arpc.DefaultHandler.BeforeRecv(func(conn net.Conn) error) {
// ...
})
arpc.DefaultHandler.BeforeSend(func(conn net.Conn) error) {
// ...
})
arpc.DefaultHandler.SetReaderWrapper(func(conn net.Conn) io.Reader) {
// ...
})
arpc.DefaultHandler.SetSendQueueSize(4096)
import "github.com/lesismal/arpcex/pubsub"
var (
address = "localhost:8888"
password = "123qwe"
topicName = "Broadcast"
)
func main() {
s := pubsub.NewServer()
s.Password = password
// 服务器发布消息到所有客户端
go func() {
for i := 0; true; i++ {
time.Sleep(time.Second)
s.Publish(topicName, fmt.Sprintf("message from server %v", i))
}
}()
s.Run(address)
}
import "github.com/lesismal/arpc/log"
import "github.com/lesismal/arpcex/pubsub"
var (
address = "localhost:8888"
password = "123qwe"
topicName = "Broadcast"
)
func onTopic(topic *pubsub.Topic) {
log.Info("[OnTopic] [%v] \"%v\", [%v]",
topic.Name,
string(topic.Data),
time.Unix(topic.Timestamp/1000000000, topic.Timestamp%1000000000).Format("2006-01-02 15:04:05.000"))
}
func main() {
client, err := pubsub.NewClient(func() (net.Conn, error) {
return net.DialTimeout("tcp", address, time.Second*3)
})
if err != nil {
panic(err)
}
client.Password = password
// 认证
err = client.Authenticate()
if err != nil {
panic(err)
}
// 订阅主题
if err := client.Subscribe(topicName, onTopic, time.Second); err != nil {
panic(err)
}
<-make(chan int)
}
import "github.com/lesismal/arpcex/pubsub"
var (
address = "localhost:8888"
password = "123qwe"
topicName = "Broadcast"
)
func main() {
client, err := pubsub.NewClient(func() (net.Conn, error) {
return net.DialTimeout("tcp", address, time.Second*3)
})
if err != nil {
panic(err)
}
client.Password = password
// 认证
err = client.Authenticate()
if err != nil {
panic(err)
}
for i := 0; true; i++ {
if i%5 == 0 {
// 发布消息到所有客户端
client.Publish(topicName, fmt.Sprintf("message from client %d", i), time.Second)
} else {
// 发布消息到单个客户端
client.PublishToOne(topicName, fmt.Sprintf("message from client %d", i), time.Second)
}
time.Sleep(time.Second)
}
}
extension/examples/quick(Builder + 自动注册)、extension/examples/arpcgen(代码生成器)