logo
Public
0
0
WeChat Login

ARPC - 更高效的网络通信框架

Slack

Mentioned in Awesome Go MIT licensed Build Status Go Report Card Coverage Status

目录

功能特性

  • 双向调用(客户端 ↔ 服务端)
  • 双向通知(无需响应)
  • 同步和异步调用
  • 同步和异步响应
  • 批量写入 | writev | net.Buffers
  • 广播
  • 中间件(路由中间件、编解码中间件)
  • 发布/订阅
  • OpenTracing 支持
模式交互方向描述
Call双向:
客户端 → 服务端
服务端 → 客户端
请求-响应
Notify双向:
客户端 → 服务端
服务端 → 客户端
无需响应

性能

以下包含 ARPC 的第三方基准测试,尽管这些仓库提供了性能报告,但我建议您自己运行代码获取真实结果,而不仅仅相信他人的文档

消息头布局

  • 小端字节序
字段长度描述
消息体长度4 字节消息体总长度
保留字段1 字节保留用于自定义标志
命令1 字节命令类型(请求/响应/通知/等)
标志位1 字节标志位(错误/异步/流)
方法名长度1 字节方法名长度(最大 127)
序列号8 字节序列号,用于关联请求和响应
方法名方法名长度方法名
消息体消息体长度-方法名长度序列化数据负载

安装

  1. 获取并安装 ARPC:
$ go get -u github.com/lesismal/arpc
  1. 在代码中导入:
import "github.com/lesismal/arpc"

主模块内置的官方子包:

  • github.com/lesismal/arpc/arpchttp
  • github.com/lesismal/arpc/listener
  • github.com/lesismal/arpc/middleware/router
  • github.com/lesismal/arpc/middleware/coder
  • github.com/lesismal/arpc/middleware/coder/gzip

快速开始

服务端

package main import "github.com/lesismal/arpc" func main() { server := arpc.NewServer() // 注册路由 server.Handler.Handle("/echo", func(ctx *arpc.Context) { var req string if err := ctx.Bind(&req); err == nil { ctx.Write(req) } }) server.Run("localhost:8888") }

客户端

package main import ( "log" "net" "time" "github.com/lesismal/arpc" ) func main() { client, err := arpc.NewClient(func() (net.Conn, error) { return net.DialTimeout("tcp", "localhost:8888", time.Second*3) }) if err != nil { panic(err) } defer client.Stop() req := "hello" rsp := "" err = client.Call("/echo", &req, &rsp, time.Second*5) if err != nil { log.Fatalf("Call 失败: %v", err) } else { log.Printf("Call 响应: %q", rsp) } }

API 示例

注册路由

var handler arpc.Handler // 包级别 handler = arpc.DefaultHandler // 服务端 handler = server.Handler // 客户端 handler = client.Handler // 消息将在同一连接读取 goroutine 中默认顺序处理 handler.Handle("/route", func(ctx *arpc.Context) { // ... }) handler.Handle("/route2", func(ctx *arpc.Context) { // ... }) // 设置 async=true 会让消息由新的 goroutine 处理 async := true handler.Handle("/asyncResponse", func(ctx *arpc.Context) { // ... }, async)

路由中间件

详见路由中间件,实现中间件非常容易。

  • 包路径已内置在主模块中
  • 对应示例:github.com/lesismal/arpc/examples/middleware/router/...
import "github.com/lesismal/arpc/middleware/router" var handler arpc.Handler // 包级别 handler = arpc.DefaultHandler // 服务端 handler = server.Handler // 客户端 handler = client.Handler handler.Use(router.Recover()) handler.Use(router.Logger()) handler.Use(func(ctx *arpc.Context) { // 前置处理中间件 }) handler.Handle("/echo", func(ctx *arpc.Context) { // 路由处理函数 }) handler.Use(func(ctx *arpc.Context) { // 后置处理中间件 })

编解码中间件

  • 编解码中间件用于将消息数据转换为您设计的格式,如加密/解密和压缩/解压缩
  • gzip 编解码器已内置在主模块中
  • 对应示例:github.com/lesismal/arpc/examples/middleware/coder/gzip/...
import "github.com/lesismal/arpc/middleware/coder/gzip" var handler arpc.Handler // 包级别 handler = arpc.DefaultHandler // 服务端 handler = server.Handler // 客户端 handler = client.Handler handler.UseCoder(gzip.New()) handler.Handle("/echo", func(ctx *arpc.Context) { // ... })

客户端 Call、CallAsync、Notify

  1. Call(阻塞,支持超时/context)
request := &Echo{...} response := &Echo{} timeout := time.Second * 5 err := client.Call("/call/echo", request, response, timeout) // 使用 context: // ctx, cancel := context.WithTimeout(context.Background(), time.Second) // defer cancel() // err := client.CallWith(ctx, "/call/echo", request, response)
  1. CallAsync(非阻塞,支持回调和超时/context)
request := &Echo{...} timeout := time.Second * 5 err := client.CallAsync("/call/echo", request, func(ctx *arpc.Context, err error) { response := &Echo{} ctx.Bind(response) // ... }, timeout)
  1. Notify(与 CallAsync 相同,支持超时/context,但没有回调)
data := &Notify{...} client.Notify("/notify", data, time.Second) // 使用 context: // ctx, cancel := context.WithTimeout(context.Background(), time.Second) // defer cancel() // client.NotifyWith(ctx, "/notify", data)

服务端 Call、CallAsync、Notify

  1. 获取客户端并在应用中保存它:
var client *arpc.Client server.Handler.Handle("/route", func(ctx *arpc.Context) { client = ctx.Client // 释放客户端 client.OnDisconnected(func(c *arpc.Client) { client = nil }) }) go func() { for { time.Sleep(time.Second) if client != nil { client.Call(...) client.CallAsync(...) client.Notify(...) } } }()
  1. 然后调用 Call/CallAsync/Notify

广播 - Notify

var mux sync.RWMutex var clientMap = make(map[*arpc.Client]struct{}) func broadcast() { var svr *arpc.Server = ... msg := svr.NewMessage(arpc.CmdNotify, "/broadcast", fmt.Sprintf("broadcast msg %d", i)) mux.RLock() for client := range clientMap { client.PushMsg(msg, arpc.TimeZero) } mux.RUnlock() }

异步响应

var handler arpc.Handler // 包级别 handler = arpc.DefaultHandler // 服务端 handler = server.Handler // 客户端 handler = client.Handler asyncResponse := true // 默认为 true,或设置为 false handler.Handle("/echo", func(ctx *arpc.Context) { var req string err := ctx.Bind(&req) if err == nil { ctx.Write(req) } }, asyncResponse)

标准库风格 Stream

*arpc.Stream 实现了 net.Conn,可直接配合 io.Copy、使用 net.Listener 风格的库使用。 如果要把某个 Stream 路由交给只接收 net.Listener 的组件,可以使用 arpc.NewStreamListener

// 服务端:把 /rtmp stream route 暴露成 net.Listener server := arpc.NewServer() streamLn := arpc.NewStreamListener(server.Handler, "/rtmp") go func() { // 例如:rtmpServer.Serve(streamLn) for { conn, err := streamLn.Accept() if err != nil { return } go func(conn net.Conn) { defer conn.Close() // 这里可以按标准 net.Conn 方式读写 io.Copy(conn, conn) }(conn) } }() server.Run("localhost:8888")
// 客户端:把本地 TCP 连接桥接到远端 stream localConn, _ := net.Dial("tcp", "127.0.0.1:1935") stream := client.NewStream("/rtmp") go func() { defer stream.Close() defer localConn.Close() io.Copy(stream, localConn) }() go func() { defer stream.Close() defer localConn.Close() io.Copy(localConn, stream) }()

处理新连接

// 包级别 arpc.DefaultHandler.HandleConnected(func(c *arpc.Client) { // ... }) // 服务端 svr := arpc.NewServer() svr.Handler.HandleConnected(func(c *arpc.Client) { // ... }) // 客户端 client, err := arpc.NewClient(...) client.Handler.HandleConnected(func(c *arpc.Client) { // ... })

处理断开连接

// 包级别 arpc.DefaultHandler.HandleDisconnected(func(c *arpc.Client) { // ... }) // 服务端 svr := arpc.NewServer() svr.Handler.HandleDisconnected(func(c *arpc.Client) { // ... }) // 客户端 client, err := arpc.NewClient(...) client.Handler.HandleDisconnected(func(c *arpc.Client) { // ... })

处理客户端发送队列溢出

// 包级别 arpc.DefaultHandler.HandleOverstock(func(c *arpc.Client, m *arpc.Message) { // ... }) // 服务端 svr := arpc.NewServer() svr.Handler.HandleOverstock(func(c *arpc.Client, m *arpc.Message) { // ... }) // 客户端 client, err := arpc.NewClient(...) client.Handler.HandleOverstock(func(c *arpc.Client, m *arpc.Message) { // ... })

自定义网络协议

// 服务端 var ln net.Listener = ... svr := arpc.NewServer() svr.Serve(ln) // 客户端 dialer := func() (net.Conn, error) { return ... } client, err := arpc.NewClient(dialer)

通过 HTTP 暴露 ARPC Handler

import ( "net/http" "github.com/lesismal/arpc" "github.com/lesismal/arpc/arpchttp" "github.com/lesismal/arpc/codec" ) func main() { handler := arpc.NewHandler() handler.Handle("/echo", func(ctx *arpc.Context) { _ = ctx.Write(ctx.Body()) }) // NewSyncHandler 会克隆一个同步副本: // 关闭异步写/异步响应,并将已注册路由改为同步执行。 http.HandleFunc("/rpc", arpchttp.NewSyncHandler(handler, codec.DefaultCodec)) http.ListenAndServe(":8080", nil) }
  • arpchttp.NewSyncHandler 适合标准的 HTTP request-response 场景,调用方无需手动修改原 Handler
  • arpchttp.NewHandler 保留原始异步语义,适合高级桥接或自定义传输场景
  • 纯 HTTP bridge 示例:github.com/lesismal/arpc/examples/arpchttp/server
  • 若需要 WebSocket + 浏览器页面的混合演示,参见:github.com/lesismal/arpcex/examples/httprpc

自定义编解码器

import "github.com/lesismal/arpc/codec" var codec arpc.Codec = ... // 包级别 codec.DefaultCodec = codec // 服务端 svr := arpc.NewServer() svr.Codec = codec // 客户端 client, err := arpc.NewClient(...) client.Codec = codec

示例:使用 Gob 或 Msgpack 编解码器

package main import ( "github.com/lesismal/arpc" "github.com/lesismal/arpc/codec" _ "github.com/lesismal/arpcex/codec/msgpack" ) func main() { // 切换为 Gob 编解码 if err := codec.Use("gob"); err != nil { panic(err) } // 如需使用 Msgpack,可直接按名称切换; // 只要导入 extension codec 包,它会自动完成注册。 // _ = codec.Use("msgpack") server := arpc.NewServer() client, _ := arpc.NewClient(...) // 也可以只对单个实例设置,而不影响全局默认值: // client.Codec = &codec.GobCodec{} _ = server _ = client }
  • 根包默认仅内置 jsongob 两种标准库 codec;像 msgpack 这类第三方 codec 建议通过 extension 模块按需导入。
  • 如果需要基于名称切换,可以先调用 codec.Register("mycodec", myCodec) 注册,再通过 codec.SetDefaultByName("mycodec")WithClientCodecName("mycodec") 等方式启用。
  • 创建客户端或服务端时,可使用 arpc.NewClientWithOptionsarpc.NewServerWithOptions 搭配 WithClientCodecWithServerCodecWithClientSendQueueSize 等选项一次性完成配置。
  • 运行中如需热切换,可直接调用 client.SetCodecByName("msgpack")server.SetCodecByName("gob"),内部会自动查表并原子更新实例。

构造参数与边界约束

  • NewClient / NewClientWithOptionsdialer 不能为空;传入 nil 会返回 ErrClientInvalidDialer
  • NewClientPoolWithOptionssize 必须大于 0;否则返回 ErrClientInvalidPoolSize
  • NewClientPoolFromDialersWithOptions 需要非空且不包含 nil 的 dialer 列表;否则会返回对应参数错误。
  • NewMessage / Client.NewMessage / Server.NewMessage 的附加参数仅接受 map[any]any 形式的 metadata;其他类型会被忽略而不会触发 panic。
  • ClientPool.GetClientPool.NextClientPool.Handler 在空池场景下会安全返回 nil,调用方可以直接做空值判断。

自定义日志记录

import "github.com/lesismal/arpc/log" var logger arpc.Logger = ... log.SetLogger(logger) // log.DefaultLogger = logger if err := log.SetLevelString("warn"); err != nil { panic(err) } // 也可以从环境变量读取,默认读取 ARPC_LOG_LEVEL: if err := log.SetLevelFromEnv(""); err != nil { panic(err) }

连接收发前的自定义操作

arpc.DefaultHandler.BeforeRecv(func(conn net.Conn) error { // ... }) arpc.DefaultHandler.BeforeSend(func(conn net.Conn) error { // ... })

通过包装 net.Conn 自定义 arpc.Client 的 Reader

arpc.DefaultHandler.SetReaderWrapper(func(conn net.Conn) io.Reader { // ... })

自定义 arpc.Client 的发送队列容量

arpc.DefaultHandler.SetSendQueueSize(4096)

JS 客户端

Web 聊天示例

发布/订阅示例

  • 启动一个服务端
import "github.com/lesismal/arpcex/pubsub" var ( address = "localhost:8888" password = "123qwe" topicName = "Broadcast" ) func main() { s := pubsub.NewServer() s.Password = password // 服务端发布消息到所有客户端 go func() { for i := 0; true; i++ { time.Sleep(time.Second) s.Publish(topicName, fmt.Sprintf("message from server %v", i)) } }() s.Run(address) }
  • 启动一个订阅客户端
import "github.com/lesismal/arpc/log" import "github.com/lesismal/arpcex/pubsub" var ( address = "localhost:8888" password = "123qwe" topicName = "Broadcast" ) func onTopic(topic *pubsub.Topic) { log.Info("[OnTopic] [%v] %q, [%v]", topic.Name, string(topic.Data), time.Unix(topic.Timestamp/1000000000, topic.Timestamp%1000000000).Format("2006-01-02 15:04:05.000")) } func main() { client, err := pubsub.NewClient(func() (net.Conn, error) { return net.DialTimeout("tcp", address, time.Second*3) }) if err != nil { panic(err) } client.Password = password // 认证 err = client.Authenticate() if err != nil { panic(err) } // 订阅主题 if err := client.Subscribe(topicName, onTopic, time.Second); err != nil { panic(err) } <-make(chan int) }
  • 启动一个发布客户端
import "github.com/lesismal/arpcex/pubsub" var ( address = "localhost:8888" password = "123qwe" topicName = "Broadcast" ) func main() { client, err := pubsub.NewClient(func() (net.Conn, error) { return net.DialTimeout("tcp", address, time.Second*3) }) if err != nil { panic(err) } client.Password = password // 认证 err = client.Authenticate() if err != nil { panic(err) } for i := 0; true; i++ { if i%5 == 0 { // 发布消息到所有客户端 client.Publish(topicName, fmt.Sprintf("message from client %d", i), time.Second) } else { // 发布消息到单个客户端 client.PublishToOne(topicName, fmt.Sprintf("message from client %d", i), time.Second) } time.Sleep(time.Second) } }

更多示例

  • 主模块示例:参见 examples
  • 扩展示例:参见 extension/examples
  • 单机 service-edge 网关:edgehub(控制面、edge 注册、按 edgeID 定向 Call/Notify/Stream,命令入口:cmd/edgehub
  • RTSP 隧道示例:examples/rtsp_tunnel(服务端下发拉流,客户端通过 ARPC stream 回传 RTSP over TCP)
  • 本仓库新增扩展示例:extension/examples/quick(Builder + 自动注册)
  • 代码生成器示例:examples/arpcgen

About

更有效的网络通信框架

11.59 MiB
0 forks0 stars10 branches33 TagREADMEMIT license
Language
Go77.9%
JavaScript19.4%
HTML1.7%
Makefile0.3%
Others0.7%