logo
Public
0
0
WeChat Login

ARPC - 更高效的网络通信

Slack

Mentioned in Awesome Go MIT licensed Build Status Go Report Card Coverage Statusd

目录

功能特性

  • 双向调用
  • 双向通知
  • 同步和异步调用
  • 同步和异步响应
  • 批量写入 | Writev | net.Buffers
  • 广播
  • 中间件
  • 发布/订阅
  • Opentracing
模式交互方向描述
call双向:
客户端 -> 服务器
服务器 -> 客户端
请求和响应
notify双向:
客户端 -> 服务器
服务器 -> 客户端
请求不响应

性能

以下是包含 arpc 的第三方基准测试,尽管这些仓库提供了性能报告,但我建议你自己运行代码获取真实结果,而不仅仅相信他人的文档

消息头布局

  • 小端字节序
消息体长度保留字段命令标志位方法名长度序列号方法名消息体
4 字节1 字节1 字节1 字节1 字节8 字节methodLen 字节bodyLen-methodLen 字节

安装

  1. 获取并安装 arpc
$ go get -u github.com/lesismal/arpc
  1. 在你的代码中导入:
import "github.com/lesismal/arpc"

快速开始

package main import "github.com/lesismal/arpc" func main() { server := arpc.NewServer() // 注册路由 server.Handler.Handle("/echo", func(ctx *arpc.Context) { str := "" if err := ctx.Bind(&str); err == nil { ctx.Write(str) } }) server.Run("localhost:8888") }
package main import ( "log" "net" "time" "github.com/lesismal/arpc" ) func main() { client, err := arpc.NewClient(func() (net.Conn, error) { return net.DialTimeout("tcp", "localhost:8888", time.Second*3) }) if err != nil { panic(err) } defer client.Stop() req := "hello" rsp := "" err = client.Call("/echo", &req, &rsp, time.Second*5) if err != nil { log.Fatalf("Call failed: %v", err) } else { log.Printf("Call Response: \"%v\"", rsp) } }

API 示例

注册路由

var handler arpc.Handler // 包级别 handler = arpc.DefaultHandler // 服务器 handler = server.Handler // 客户端 handler = client.Handler // 消息将在同一连接读取 goroutine 中默认顺序处理 handler.Handle("/route", func(ctx *arpc.Context) { ... }) handler.Handle("/route2", func(ctx *arpc.Context) { ... }) // 这会让消息由新的 goroutine 处理 async := true handler.Handle("/asyncResponse", func(ctx *arpc.Context) { ... }, async)

路由中间件

详见路由中间件,实现中间件非常容易

import "github.com/lesismal/arpcex/middleware/router" var handler arpc.Handler // 包级别 handler = arpc.DefaultHandler // 服务器 handler = server.Handler // 客户端 handler = client.Handler handler.Use(router.Recover()) handler.Use(router.Logger()) handler.Use(func(ctx *arpc.Context) { ... }) handler.Handle("/echo", func(ctx *arpc.Context) { ... }) handler.Use(func(ctx *arpc.Context) { ... })

编码器中间件

  • 编码器中间件用于将消息数据转换为你设计的格式,如加密/解密和压缩/解压缩
import "github.com/lesismal/arpcex/middleware/coder/gzip" var handler arpc.Handler // 包级别 handler = arpc.DefaultHandler // 服务器 handler = server.Handler // 客户端 handler = client.Handler handler.UseCoder(gzip.New()) handler.Handle("/echo", func(ctx *arpc.Context) { ... })

客户端 Call, CallAsync, Notify

  1. Call(阻塞,支持超时/context)
request := &Echo{...} response := &Echo{} timeout := time.Second*5 err := client.Call("/call/echo", request, response, timeout) // ctx, cancel := context.WithTimeout(context.Background(), time.Second) // defer cancel() // err := client.CallWith(ctx, "/call/echo", request, response)
  1. CallAsync(非阻塞,支持回调和超时/context)
request := &Echo{...} timeout := time.Second*5 err := client.CallAsync("/call/echo", request, func(ctx *arpc.Context) { response := &Echo{} ctx.Bind(response) ... }, timeout)
  1. Notify(与 CallAsync 相同,支持超时/context,但没有回调)
data := &Notify{...} client.Notify("/notify", data, time.Second) // ctx, cancel := context.WithTimeout(context.Background(), time.Second) // defer cancel() // client.NotifyWith(ctx, "/notify", data)

服务器 Call, CallAsync, Notify

  1. 获取客户端并在应用中保存它
var client *arpc.Client server.Handler.Handle("/route", func(ctx *arpc.Context) { client = ctx.Client // 释放客户端 client.OnDisconnected(func(c *arpc.Client){ client = nil }) }) go func() { for { time.Sleep(time.Second) if client != nil { client.Call(...) client.CallAsync(...) client.Notify(...) } } }()
  1. 然后调用 Call/CallAsync/Notify

广播 - Notify

var mux = sync.RWMutex{} var clientMap = make(map[*arpc.Client]struct{}) func broadcast() { var svr *arpc.Server = ... msg := svr.NewMessage(arpc.CmdNotify, "/broadcast", fmt.Sprintf("broadcast msg %d", i)) mux.RLock() for client := range clientMap { client.PushMsg(msg, arpc.TimeZero) } mux.RUnlock() }

异步响应

var handler arpc.Handler // 包级别 handler = arpc.DefaultHandler // 服务器 handler = server.Handler // 客户端 handler = client.Handler asyncResponse := true // 默认为 true,或设置为 false handler.Handle("/echo", func(ctx *arpc.Context) { req := ... err := ctx.Bind(req) if err == nil { ctx.Write(data) } }, asyncResponse)

处理新连接

// 包级别 arpc.DefaultHandler.HandleConnected(func(c *arpc.Client) { ... }) // 服务器 svr := arpc.NewServer() svr.Handler.HandleConnected(func(c *arpc.Client) { ... }) // 客户端 client, err := arpc.NewClient(...) client.Handler.HandleConnected(func(c *arpc.Client) { ... })

处理断开连接

// 包级别 arpc.DefaultHandler.HandleDisconnected(func(c *arpc.Client) { ... }) // 服务器 svr := arpc.NewServer() svr.Handler.HandleDisconnected(func(c *arpc.Client) { ... }) // 客户端 client, err := arpc.NewClient(...) client.Handler.HandleDisconnected(func(c *arpc.Client) { ... })

处理客户端发送队列溢出

// 包级别 arpc.DefaultHandler.HandleOverstock(func(c *arpc.Client) { ... }) // 服务器 svr := arpc.NewServer() svr.Handler.HandleOverstock(func(c *arpc.Client) { ... }) // 客户端 client, err := arpc.NewClient(...) client.Handler.HandleOverstock(func(c *arpc.Client) { ... })

自定义网络协议

// 服务器 var ln net.Listener = ... svr := arpc.NewServer() svr.Serve(ln) // 客户端 dialer := func() (net.Conn, error) { return ... } client, err := arpc.NewClient(dialer)

自定义编解码器

import "github.com/lesismal/arpc/codec" var codec arpc.Codec = ... // 包级别 codec.Defaultcodec = codec // 服务器 svr := arpc.NewServer() svr.Codec = codec // 客户端 client, err := arpc.NewClient(...) client.Codec = codec

示例:使用 Gob 或 Msgpack 编解码器

package main import ( "github.com/lesismal/arpc" "github.com/lesismal/arpc/codec" ) func main() { // 切换为 Gob 编解码 if err := codec.Use("gob"); err != nil { panic(err) } // 如果需要使用 Msgpack 编解码,请改为: // _ = codec.Use("msgpack") server := arpc.NewServer() client, _ := arpc.NewClient(...) // 也可以只对单个实例设置,而不影响全局默认值: // server.Codec = &codec.MsgpackCodec{} // client.Codec = &codec.GobCodec{} _ = server _ = client }
  • 如果需要基于名称切换,可以先调用 codec.Register("mycodec", myCodec) 注册,再通过 codec.SetDefaultByName("mycodec")WithClientCodecName("mycodec") 等方式启用。
  • 创建客户端或服务器时,可使用 arpc.NewClientWithOptionsarpc.NewServerWithOptions 搭配 WithClientCodecWithServerCodecWithClientSendQueueSize 等选项一次性完成配置。
  • 运行中若需要热切换,可直接调用 client.SetCodecByName("msgpack")server.SetCodecByName("gob"),内部会自动查表并原子更新实例。

自定义日志记录

import "github.com/lesismal/arpc/log" var logger arpc.Logger = ... log.SetLogger(logger) // log.DefaultLogger = logger

连接收发前的自定义操作

arpc.DefaultHandler.BeforeRecv(func(conn net.Conn) error) { // ... }) arpc.DefaultHandler.BeforeSend(func(conn net.Conn) error) { // ... })

通过包装 net.Conn 自定义 arpc.Client 的 Reader

arpc.DefaultHandler.SetReaderWrapper(func(conn net.Conn) io.Reader) { // ... })

自定义 arpc.Client 的发送队列容量

arpc.DefaultHandler.SetSendQueueSize(4096)

JS 客户端

Web 聊天示例

发布/订阅示例

  • 启动一个服务器
import "github.com/lesismal/arpcex/pubsub" var ( address = "localhost:8888" password = "123qwe" topicName = "Broadcast" ) func main() { s := pubsub.NewServer() s.Password = password // 服务器发布消息到所有客户端 go func() { for i := 0; true; i++ { time.Sleep(time.Second) s.Publish(topicName, fmt.Sprintf("message from server %v", i)) } }() s.Run(address) }
  • 启动一个订阅客户端
import "github.com/lesismal/arpc/log" import "github.com/lesismal/arpcex/pubsub" var ( address = "localhost:8888" password = "123qwe" topicName = "Broadcast" ) func onTopic(topic *pubsub.Topic) { log.Info("[OnTopic] [%v] \"%v\", [%v]", topic.Name, string(topic.Data), time.Unix(topic.Timestamp/1000000000, topic.Timestamp%1000000000).Format("2006-01-02 15:04:05.000")) } func main() { client, err := pubsub.NewClient(func() (net.Conn, error) { return net.DialTimeout("tcp", address, time.Second*3) }) if err != nil { panic(err) } client.Password = password // 认证 err = client.Authenticate() if err != nil { panic(err) } // 订阅主题 if err := client.Subscribe(topicName, onTopic, time.Second); err != nil { panic(err) } <-make(chan int) }
  • 启动一个发布客户端
import "github.com/lesismal/arpcex/pubsub" var ( address = "localhost:8888" password = "123qwe" topicName = "Broadcast" ) func main() { client, err := pubsub.NewClient(func() (net.Conn, error) { return net.DialTimeout("tcp", address, time.Second*3) }) if err != nil { panic(err) } client.Password = password // 认证 err = client.Authenticate() if err != nil { panic(err) } for i := 0; true; i++ { if i%5 == 0 { // 发布消息到所有客户端 client.Publish(topicName, fmt.Sprintf("message from client %d", i), time.Second) } else { // 发布消息到单个客户端 client.PublishToOne(topicName, fmt.Sprintf("message from client %d", i), time.Second) } time.Sleep(time.Second) } }

更多示例

  • 参见 examples
  • 本仓库新增示例:extension/examples/quick(Builder + 自动注册)、extension/examples/arpcgen(代码生成器)

About

更有效的网络通信框架

1.65 MiB
0 forks0 stars10 branches33 TagREADMEMIT license
Language
Go83.2%
JavaScript12.9%
HTML2.4%
Makefile0.5%
Others1%