| 模式 | 交互方向 | 描述 |
|---|---|---|
| Call | 双向: 客户端 → 服务端 服务端 → 客户端 | 请求-响应 |
| Notify | 双向: 客户端 → 服务端 服务端 → 客户端 | 无需响应 |
以下包含 ARPC 的第三方基准测试,尽管这些仓库提供了性能报告,但我建议您自己运行代码获取真实结果,而不仅仅相信他人的文档:
| 字段 | 长度 | 描述 |
|---|---|---|
| 消息体长度 | 4 字节 | 消息体总长度 |
| 保留字段 | 1 字节 | 保留用于自定义标志 |
| 命令 | 1 字节 | 命令类型(请求/响应/通知/等) |
| 标志位 | 1 字节 | 标志位(错误/异步/流) |
| 方法名长度 | 1 字节 | 方法名长度(最大 127) |
| 序列号 | 8 字节 | 序列号,用于关联请求和响应 |
| 方法名 | 方法名长度 | 方法名 |
| 消息体 | 消息体长度-方法名长度 | 序列化数据负载 |
$ go get -u github.com/lesismal/arpc
import "github.com/lesismal/arpc"
主模块内置的官方子包:
github.com/lesismal/arpc/arpchttpgithub.com/lesismal/arpc/listenergithub.com/lesismal/arpc/middleware/routergithub.com/lesismal/arpc/middleware/codergithub.com/lesismal/arpc/middleware/coder/gzippackage main
import "github.com/lesismal/arpc"
func main() {
server := arpc.NewServer()
// 注册路由
server.Handler.Handle("/echo", func(ctx *arpc.Context) {
var req string
if err := ctx.Bind(&req); err == nil {
ctx.Write(req)
}
})
server.Run("localhost:8888")
}
package main
import (
"log"
"net"
"time"
"github.com/lesismal/arpc"
)
func main() {
client, err := arpc.NewClient(func() (net.Conn, error) {
return net.DialTimeout("tcp", "localhost:8888", time.Second*3)
})
if err != nil {
panic(err)
}
defer client.Stop()
req := "hello"
rsp := ""
err = client.Call("/echo", &req, &rsp, time.Second*5)
if err != nil {
log.Fatalf("Call 失败: %v", err)
} else {
log.Printf("Call 响应: %q", rsp)
}
}
var handler arpc.Handler
// 包级别
handler = arpc.DefaultHandler
// 服务端
handler = server.Handler
// 客户端
handler = client.Handler
// 消息将在同一连接读取 goroutine 中默认顺序处理
handler.Handle("/route", func(ctx *arpc.Context) {
// ...
})
handler.Handle("/route2", func(ctx *arpc.Context) {
// ...
})
// 设置 async=true 会让消息由新的 goroutine 处理
async := true
handler.Handle("/asyncResponse", func(ctx *arpc.Context) {
// ...
}, async)
详见路由中间件,实现中间件非常容易。
github.com/lesismal/arpc/examples/middleware/router/...import "github.com/lesismal/arpc/middleware/router"
var handler arpc.Handler
// 包级别
handler = arpc.DefaultHandler
// 服务端
handler = server.Handler
// 客户端
handler = client.Handler
handler.Use(router.Recover())
handler.Use(router.Logger())
handler.Use(func(ctx *arpc.Context) {
// 前置处理中间件
})
handler.Handle("/echo", func(ctx *arpc.Context) {
// 路由处理函数
})
handler.Use(func(ctx *arpc.Context) {
// 后置处理中间件
})
gzip 编解码器已内置在主模块中github.com/lesismal/arpc/examples/middleware/coder/gzip/...import "github.com/lesismal/arpc/middleware/coder/gzip"
var handler arpc.Handler
// 包级别
handler = arpc.DefaultHandler
// 服务端
handler = server.Handler
// 客户端
handler = client.Handler
handler.UseCoder(gzip.New())
handler.Handle("/echo", func(ctx *arpc.Context) {
// ...
})
request := &Echo{...}
response := &Echo{}
timeout := time.Second * 5
err := client.Call("/call/echo", request, response, timeout)
// 使用 context:
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// err := client.CallWith(ctx, "/call/echo", request, response)
request := &Echo{...}
timeout := time.Second * 5
err := client.CallAsync("/call/echo", request, func(ctx *arpc.Context, err error) {
response := &Echo{}
ctx.Bind(response)
// ...
}, timeout)
data := &Notify{...}
client.Notify("/notify", data, time.Second)
// 使用 context:
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// client.NotifyWith(ctx, "/notify", data)
var client *arpc.Client
server.Handler.Handle("/route", func(ctx *arpc.Context) {
client = ctx.Client
// 释放客户端
client.OnDisconnected(func(c *arpc.Client) {
client = nil
})
})
go func() {
for {
time.Sleep(time.Second)
if client != nil {
client.Call(...)
client.CallAsync(...)
client.Notify(...)
}
}
}()
var mux sync.RWMutex
var clientMap = make(map[*arpc.Client]struct{})
func broadcast() {
var svr *arpc.Server = ...
msg := svr.NewMessage(arpc.CmdNotify, "/broadcast", fmt.Sprintf("broadcast msg %d", i))
mux.RLock()
for client := range clientMap {
client.PushMsg(msg, arpc.TimeZero)
}
mux.RUnlock()
}
var handler arpc.Handler
// 包级别
handler = arpc.DefaultHandler
// 服务端
handler = server.Handler
// 客户端
handler = client.Handler
asyncResponse := true // 默认为 true,或设置为 false
handler.Handle("/echo", func(ctx *arpc.Context) {
var req string
err := ctx.Bind(&req)
if err == nil {
ctx.Write(req)
}
}, asyncResponse)
*arpc.Stream 实现了 net.Conn,可直接配合 io.Copy、使用 net.Listener 风格的库使用。
如果要把某个 Stream 路由交给只接收 net.Listener 的组件,可以使用 arpc.NewStreamListener:
// 服务端:把 /rtmp stream route 暴露成 net.Listener
server := arpc.NewServer()
streamLn := arpc.NewStreamListener(server.Handler, "/rtmp")
go func() {
// 例如:rtmpServer.Serve(streamLn)
for {
conn, err := streamLn.Accept()
if err != nil {
return
}
go func(conn net.Conn) {
defer conn.Close()
// 这里可以按标准 net.Conn 方式读写
io.Copy(conn, conn)
}(conn)
}
}()
server.Run("localhost:8888")
// 客户端:把本地 TCP 连接桥接到远端 stream
localConn, _ := net.Dial("tcp", "127.0.0.1:1935")
stream := client.NewStream("/rtmp")
go func() {
defer stream.Close()
defer localConn.Close()
io.Copy(stream, localConn)
}()
go func() {
defer stream.Close()
defer localConn.Close()
io.Copy(localConn, stream)
}()
// 包级别
arpc.DefaultHandler.HandleConnected(func(c *arpc.Client) {
// ...
})
// 服务端
svr := arpc.NewServer()
svr.Handler.HandleConnected(func(c *arpc.Client) {
// ...
})
// 客户端
client, err := arpc.NewClient(...)
client.Handler.HandleConnected(func(c *arpc.Client) {
// ...
})
// 包级别
arpc.DefaultHandler.HandleDisconnected(func(c *arpc.Client) {
// ...
})
// 服务端
svr := arpc.NewServer()
svr.Handler.HandleDisconnected(func(c *arpc.Client) {
// ...
})
// 客户端
client, err := arpc.NewClient(...)
client.Handler.HandleDisconnected(func(c *arpc.Client) {
// ...
})
// 包级别
arpc.DefaultHandler.HandleOverstock(func(c *arpc.Client, m *arpc.Message) {
// ...
})
// 服务端
svr := arpc.NewServer()
svr.Handler.HandleOverstock(func(c *arpc.Client, m *arpc.Message) {
// ...
})
// 客户端
client, err := arpc.NewClient(...)
client.Handler.HandleOverstock(func(c *arpc.Client, m *arpc.Message) {
// ...
})
// 服务端
var ln net.Listener = ...
svr := arpc.NewServer()
svr.Serve(ln)
// 客户端
dialer := func() (net.Conn, error) {
return ...
}
client, err := arpc.NewClient(dialer)
import (
"net/http"
"github.com/lesismal/arpc"
"github.com/lesismal/arpc/arpchttp"
"github.com/lesismal/arpc/codec"
)
func main() {
handler := arpc.NewHandler()
handler.Handle("/echo", func(ctx *arpc.Context) {
_ = ctx.Write(ctx.Body())
})
// NewSyncHandler 会克隆一个同步副本:
// 关闭异步写/异步响应,并将已注册路由改为同步执行。
http.HandleFunc("/rpc", arpchttp.NewSyncHandler(handler, codec.DefaultCodec))
http.ListenAndServe(":8080", nil)
}
arpchttp.NewSyncHandler 适合标准的 HTTP request-response 场景,调用方无需手动修改原 Handlerarpchttp.NewHandler 保留原始异步语义,适合高级桥接或自定义传输场景github.com/lesismal/arpc/examples/arpchttp/servergithub.com/lesismal/arpcex/examples/httprpcimport "github.com/lesismal/arpc/codec"
var codec arpc.Codec = ...
// 包级别
codec.DefaultCodec = codec
// 服务端
svr := arpc.NewServer()
svr.Codec = codec
// 客户端
client, err := arpc.NewClient(...)
client.Codec = codec
package main
import (
"github.com/lesismal/arpc"
"github.com/lesismal/arpc/codec"
_ "github.com/lesismal/arpcex/codec/msgpack"
)
func main() {
// 切换为 Gob 编解码
if err := codec.Use("gob"); err != nil {
panic(err)
}
// 如需使用 Msgpack,可直接按名称切换;
// 只要导入 extension codec 包,它会自动完成注册。
// _ = codec.Use("msgpack")
server := arpc.NewServer()
client, _ := arpc.NewClient(...)
// 也可以只对单个实例设置,而不影响全局默认值:
// client.Codec = &codec.GobCodec{}
_ = server
_ = client
}
json 与 gob 两种标准库 codec;像 msgpack 这类第三方 codec 建议通过 extension 模块按需导入。codec.Register("mycodec", myCodec) 注册,再通过 codec.SetDefaultByName("mycodec") 或 WithClientCodecName("mycodec") 等方式启用。arpc.NewClientWithOptions、arpc.NewServerWithOptions 搭配 WithClientCodec、WithServerCodec、WithClientSendQueueSize 等选项一次性完成配置。client.SetCodecByName("msgpack") 或 server.SetCodecByName("gob"),内部会自动查表并原子更新实例。NewClient / NewClientWithOptions 的 dialer 不能为空;传入 nil 会返回 ErrClientInvalidDialer。NewClientPoolWithOptions 的 size 必须大于 0;否则返回 ErrClientInvalidPoolSize。NewClientPoolFromDialersWithOptions 需要非空且不包含 nil 的 dialer 列表;否则会返回对应参数错误。NewMessage / Client.NewMessage / Server.NewMessage 的附加参数仅接受 map[any]any 形式的 metadata;其他类型会被忽略而不会触发 panic。ClientPool.Get、ClientPool.Next、ClientPool.Handler 在空池场景下会安全返回 nil,调用方可以直接做空值判断。import "github.com/lesismal/arpc/log"
var logger arpc.Logger = ...
log.SetLogger(logger) // log.DefaultLogger = logger
if err := log.SetLevelString("warn"); err != nil {
panic(err)
}
// 也可以从环境变量读取,默认读取 ARPC_LOG_LEVEL:
if err := log.SetLevelFromEnv(""); err != nil {
panic(err)
}
arpc.DefaultHandler.BeforeRecv(func(conn net.Conn) error {
// ...
})
arpc.DefaultHandler.BeforeSend(func(conn net.Conn) error {
// ...
})
arpc.DefaultHandler.SetReaderWrapper(func(conn net.Conn) io.Reader {
// ...
})
arpc.DefaultHandler.SetSendQueueSize(4096)
import "github.com/lesismal/arpcex/pubsub"
var (
address = "localhost:8888"
password = "123qwe"
topicName = "Broadcast"
)
func main() {
s := pubsub.NewServer()
s.Password = password
// 服务端发布消息到所有客户端
go func() {
for i := 0; true; i++ {
time.Sleep(time.Second)
s.Publish(topicName, fmt.Sprintf("message from server %v", i))
}
}()
s.Run(address)
}
import "github.com/lesismal/arpc/log"
import "github.com/lesismal/arpcex/pubsub"
var (
address = "localhost:8888"
password = "123qwe"
topicName = "Broadcast"
)
func onTopic(topic *pubsub.Topic) {
log.Info("[OnTopic] [%v] %q, [%v]",
topic.Name,
string(topic.Data),
time.Unix(topic.Timestamp/1000000000, topic.Timestamp%1000000000).Format("2006-01-02 15:04:05.000"))
}
func main() {
client, err := pubsub.NewClient(func() (net.Conn, error) {
return net.DialTimeout("tcp", address, time.Second*3)
})
if err != nil {
panic(err)
}
client.Password = password
// 认证
err = client.Authenticate()
if err != nil {
panic(err)
}
// 订阅主题
if err := client.Subscribe(topicName, onTopic, time.Second); err != nil {
panic(err)
}
<-make(chan int)
}
import "github.com/lesismal/arpcex/pubsub"
var (
address = "localhost:8888"
password = "123qwe"
topicName = "Broadcast"
)
func main() {
client, err := pubsub.NewClient(func() (net.Conn, error) {
return net.DialTimeout("tcp", address, time.Second*3)
})
if err != nil {
panic(err)
}
client.Password = password
// 认证
err = client.Authenticate()
if err != nil {
panic(err)
}
for i := 0; true; i++ {
if i%5 == 0 {
// 发布消息到所有客户端
client.Publish(topicName, fmt.Sprintf("message from client %d", i), time.Second)
} else {
// 发布消息到单个客户端
client.PublishToOne(topicName, fmt.Sprintf("message from client %d", i), time.Second)
}
time.Sleep(time.Second)
}
}
edgehub(控制面、edge 注册、按 edgeID 定向 Call/Notify/Stream,命令入口:cmd/edgehub)examples/rtsp_tunnel(服务端下发拉流,客户端通过 ARPC stream 回传 RTSP over TCP)extension/examples/quick(Builder + 自动注册)examples/arpcgen