高性能 Go 语言服务器发送事件 (Server-Sent Events, SSE) 实现,提供完整的服务端和客户端支持,专为企业级应用设计。
topics,支持按主题广播并自动去重BenchmarkSSEPushOneClient-4 1000000 3575 ns/op 279,699 events/sec BenchmarkSSEServerBroadcast-4 120523 26731 ns/op 37,410 events/sec (10 客户端)
性能优化:
sync.Pool 对象池减少 GC 压力go get cnb.cool/zishuo/sse
依赖要求:
package main
import (
"context"
"encoding/json"
"log/slog"
"net/http"
"time"
"cnb.cool/zishuo/sse"
)
func main() {
// 1. 创建 SSE Hub (事件中心)
hub := sse.NewHub(
sse.WithLogger(slog.Default()), // 自定义日志
sse.WithPushBufferSize(2000), // 推送缓冲区大小
sse.WithWorkerNum(20), // 异步工作池大小
sse.WithPushFailedHandleFn(handleFailed), // 推送失败回调
sse.WithMaxConnections(1000), // 最大并发连接数(可选)
sse.WithAuthCallback(authenticateRequest), // 认证后直接产出 canonical uid(推荐)
sse.WithHMACAuth("your-secret", time.Minute), // HMAC 鉴权(可选)
)
defer hub.Close()
// 2. 创建 HTTP 路由
mux := http.NewServeMux()
// 3. SSE 事件流接口(默认只从 context 获取 uid)
// topics 仍可通过 query/header 传递,如 /events?topics=news,sports
mux.Handle("/events", sse.WithUserIDFromHeader("X-User-ID")(http.HandlerFunc(hub.ServeHandler(sse.WithServeCORSOrigin("https://app.example.com")))))
// 或者手动处理 uid
// mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
// uid := r.Header.Get("X-User-ID")
// if uid == "" {
// http.Error(w, "未认证", http.StatusUnauthorized)
// return
// }
// hub.Serve(w, r, uid)
// })
// 4. 推送接口
mux.HandleFunc("/push", hub.PushEventHandler())
// 5. 统计接口
mux.HandleFunc("/stats", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]interface{}{
"online_users": hub.OnlineClientsNum(),
})
})
// 6. 模拟事件推送
go simulateEvents(hub)
// 7. 启动服务器
if err := http.ListenAndServe(":8080", mux); err != nil {
panic(err)
}
}
// 模拟事件推送
func simulateEvents(hub *sse.Hub) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
i := 0
for range ticker.C {
i++
event := &sse.Event{
Event: sse.DefaultEventType, // "message"
Data: map[string]interface{}{
"id": i,
"message": "Hello World",
"time": time.Now().Unix(),
},
}
// 广播给所有在线用户 (使用便捷方法)
if err := hub.Broadcast(event); err != nil {
slog.Error("推送失败", "error", err)
}
// 或者推送给单个用户
// hub.PushOne("user1", event)
// 或者推送给多个用户
// hub.PushTo(event, "user1", "user2", "user3")
// 或者按主题推送(topics 在客户端连接时声明)
// hub.PushTopic("news", event)
}
}
// 推送失败处理
func handleFailed(uid string, event *sse.Event) {
slog.Warn("推送失败",
"uid", uid,
"event_id", event.ID,
"event_type", event.Event,
)
// 可以在这里实现重试逻辑、数据库记录等
}
package main
import (
"fmt"
"log/slog"
"time"
"cnb.cool/zishuo/sse"
)
func main() {
// 1. 创建 SSE 客户端
client := sse.NewClient(
"http://localhost:8080/events",
sse.WithClientHeaders(map[string]string{
"Authorization": "Bearer your-token",
// HMAC 示例: 计算签名后放入 Header
// "X-Timestamp": ts,
// "X-Signature": sig,
}),
sse.WithClientReconnectTimeInterval(3 * time.Second),
sse.WithClientLogger(slog.Default()),
)
// 2. 注册事件处理器
client.OnEvent(sse.DefaultEventType, func(event *sse.Event) {
fmt.Printf("收到消息: %+v\n", event.Data)
})
client.OnEvent("notification", func(event *sse.Event) {
fmt.Printf("收到通知: %+v\n", event.Data)
})
// 3. 连接到服务器
if err := client.Connect(); err != nil {
fmt.Printf("连接失败: %v\n", err)
return
}
fmt.Println("SSE 客户端已启动,按 Ctrl+C 退出")
// 4. 等待连接关闭
<-client.Wait()
}
# 1. 连接 SSE 服务器
curl -N http://localhost:8080/events
# 2. 推送消息给指定用户
curl -X POST http://localhost:8080/push \
-H "Content-Type: application/json" \
-d '{
"uids": ["user1", "user2"],
"events": [{
"event": "message",
"data": {"text": "Hello"}
}]
}'
# 3. 广播消息给所有用户
curl -X POST http://localhost:8080/push \
-H "Content-Type: application/json" \
-d '{
"events": [{
"event": "notification",
"data": {"title": "系统通知", "content": "新版本发布"}
}]
}'
// 使用原生 EventSource
const eventSource = new EventSource("http://localhost:8080/events");
// 监听默认消息事件
eventSource.addEventListener("message", (e) => {
const data = JSON.parse(e.data);
console.log("收到消息:", data);
});
// 监听自定义事件
eventSource.addEventListener("notification", (e) => {
const data = JSON.parse(e.data);
console.log("收到通知:", data);
});
// 监听连接打开
eventSource.onopen = () => {
console.log("SSE 连接已建立");
};
// 监听错误
eventSource.onerror = (error) => {
console.error("SSE 错误:", error);
};
// 关闭连接
// eventSource.close();
实现 Store 接口以支持事件持久化和断线重连后的事件重放:
// Store 接口定义
type Store interface {
// 保存事件
Save(ctx context.Context, e *Event) error
// 根据事件类型和最后事件 ID 查询事件列表
// 返回: 事件列表、下一个事件 ID、错误
ListByLastID(ctx context.Context, eventType string, lastID string, pageSize int) ([]*Event, string, error)
}
// 示例: 基于内存的 Store 实现
type MemoryStore struct {
mu sync.RWMutex
events map[string][]*sse.Event // eventType -> events
}
func NewMemoryStore() *MemoryStore {
return &MemoryStore{
events: make(map[string][]*sse.Event),
}
}
func (s *MemoryStore) Save(ctx context.Context, e *sse.Event) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.events[e.Event] == nil {
s.events[e.Event] = make([]*sse.Event, 0)
}
s.events[e.Event] = append(s.events[e.Event], e)
return nil
}
func (s *MemoryStore) ListByLastID(ctx context.Context, eventType string, lastID string, pageSize int) ([]*sse.Event, string, error) {
s.mu.RLock()
defer s.mu.RUnlock()
events, ok := s.events[eventType]
if !ok {
return nil, "", nil
}
// 查找起始位置
start := 0
if lastID != "" {
found := false
for i, e := range events {
if e.ID == lastID {
start = i + 1
found = true
break
}
}
if !found {
return nil, "", nil
}
}
// 分页返回
end := start + pageSize
if end > len(events) {
end = len(events)
}
result := events[start:end]
nextID := ""
if end < len(events) && len(result) > 0 {
nextID = result[len(result)-1].ID
}
return result, nextID, nil
}
// 创建带持久化的 Hub
func main() {
store := NewMemoryStore()
hub := sse.NewHub(
sse.WithStore(store),
sse.WithEnableResendEvents(), // 启用断线重连后的事件重传
)
defer hub.Close()
// ... 其他代码
}
Redis Store 示例:
import (
"context"
"encoding/json"
"fmt"
"github.com/redis/go-redis/v9"
"cnb.cool/zishuo/sse"
)
type RedisStore struct {
client *redis.Client
}
func NewRedisStore(addr string) *RedisStore {
return &RedisStore{
client: redis.NewClient(&redis.Options{
Addr: addr,
}),
}
}
func (s *RedisStore) Save(ctx context.Context, e *sse.Event) error {
key := fmt.Sprintf("sse:%s", e.Event)
data, err := json.Marshal(e)
if err != nil {
return err
}
// 使用 List 存储,按时间顺序
return s.client.RPush(ctx, key, data).Err()
}
func (s *RedisStore) ListByLastID(ctx context.Context, eventType string, lastID string, pageSize int) ([]*sse.Event, string, error) {
key := fmt.Sprintf("sse:%s", eventType)
// 获取总数
total, err := s.client.LLen(ctx, key).Result()
if err != nil {
return nil, "", err
}
// 查找起始位置
start := int64(0)
if lastID != "" {
// 查找 lastID 的位置 (需要遍历或使用更高效的索引方案)
// 这里简化处理
}
// 分页获取
end := start + int64(pageSize) - 1
if end >= total {
end = total - 1
}
values, err := s.client.LRange(ctx, key, start, end).Result()
if err != nil {
return nil, "", err
}
var events []*sse.Event
for _, v := range values {
var e sse.Event
if err := json.Unmarshal([]byte(v), &e); err != nil {
continue
}
events = append(events, &e)
}
nextID := ""
if int64(len(events)) == int64(pageSize) && end < total-1 {
nextID = events[len(events)-1].ID
}
return events, nextID, nil
}
// 获取在线用户数
onlineNum := hub.OnlineClientsNum()
// 打印推送统计
hub.PrintPushStats()
// 输出示例:
// PushStats: Total: 1000, Success: 950, Failed: 30, Timeout: 20
// 获取统计数据
stats := hub.PushStats
total := atomic.LoadInt64(&stats.total)
success := atomic.LoadInt64(&stats.success)
failed := atomic.LoadInt64(&stats.failed)
timeout := atomic.LoadInt64(&stats.timeout)
// 主动推送心跳 (通常由 Hub 自动管理)
hub.PushHeartBeat("user123")
// 优雅关闭 Hub
hub.Close()
// 或者关闭但允许客户端继续重连
// hub.CloseAllowReconnect()
为了简化常见推送场景,Hub 提供了三个便捷方法:
// 1. 广播给所有用户 (最常用)
hub.Broadcast(event)
// 等价于: hub.Push(nil, event)
// 2. 推送给单个用户
hub.PushOne("user123", event)
// 等价于: hub.Push([]string{"user123"}, event)
// 3. 推送给多个指定用户
hub.PushTo(event, "user1", "user2", "user3")
// 等价于: hub.Push([]string{"user1", "user2", "user3"}, event)
// 4. 批量推送多个事件 (使用通用方法)
hub.Push(uids, event1, event2, event3)
使用建议:
Push() 通用方法Broadcast() 语义更清晰使用事件对象池减少 GC 压力 (适用于高频推送场景):
// 从对象池获取事件对象
event := sse.GetEvent()
event.Event = "message"
event.Data = map[string]string{"text": "Hello"}
// 推送事件 (使用便捷方法)
hub.Broadcast(event)
// 使用完毕后归还对象池
sse.PutEvent(event)
// 服务端设置额外响应头
mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
uid := getUserID(r)
hub.Serve(w, r, uid, sse.WithServeExtraHeaders(map[string]string{
"X-Custom-Header": "value",
"X-Request-ID": generateRequestID(),
}))
})
// 客户端设置请求头
client := sse.NewClient(url,
sse.WithClientHeaders(map[string]string{
"Authorization": "Bearer token",
"X-Client-ID": "client-123",
}),
)
// 自定义推送失败处理函数
failedHandler := func(uid string, event *sse.Event) {
// 记录到日志
slog.Error("推送失败",
"uid", uid,
"event_id", event.ID,
"event_type", event.Event,
)
// 保存到数据库用于后续重试
db.SaveFailedEvent(uid, event)
// 发送告警通知
alerting.Send("SSE 推送失败", uid, event.ID)
}
hub := sse.NewHub(
sse.WithPushFailedHandleFn(failedHandler),
)
// 使用自定义 Context 控制 Hub 生命周期
ctx, cancel := context.WithCancel(context.Background())
hub := sse.NewHub(
sse.WithContext(ctx, cancel),
)
// 在需要时取消
cancel()
| 选项 | 说明 | 默认值 |
|---|---|---|
WithStore(store Store) | 设置事件持久化存储 | nil (不持久化) |
WithEnableResendEvents() | 启用断线重连后的事件重传 | false |
WithLogger(logger *slog.Logger) | 设置日志记录器 | slog.Default() |
WithPushBufferSize(size int) | 设置推送缓冲区大小 | 1000 |
WithWorkerNum(num int) | 设置异步工作池 goroutine 数量 | 10 |
WithPushFailedHandleFn(fn) | 设置推送失败回调函数 | nil |
WithContext(ctx, cancel) | 设置 Hub 的上下文 | context.Background() |
WithAuthCallback(fn) | 认证请求并返回 canonical uid | nil |
| 选项 | 说明 | 默认值 |
|---|---|---|
WithServeExtraHeaders(headers map[string]string) | 设置额外的 HTTP 响应头 | nil |
WithServeCORSOrigin(origin string) | 显式设置 CORS Origin | nil |
| 选项 | 说明 | 默认值 |
|---|---|---|
WithClientHeaders(headers map[string]string) | 设置 HTTP 请求头 | nil |
WithClientLogger(logger *slog.Logger) | 设置日志记录器 | slog.Default() |
WithClientReconnectTimeInterval(d time.Duration) | 设置重连间隔 | 5s |
// 创建新的事件中心
func NewHub(opts ...HubOption) *Hub
// 推送事件 (通用方法)
// uids: 用户 ID 列表,为 nil 时广播给所有在线用户
// events: 事件列表
func (h *Hub) Push(uids []string, events ...*Event) error
// 广播单个事件给所有在线用户 (便捷方法)
func (h *Hub) Broadcast(event *Event) error
// 推送单个事件给单个用户 (便捷方法)
func (h *Hub) PushOne(uid string, event *Event) error
// 推送单个事件给多个指定用户 (便捷方法)
func (h *Hub) PushTo(event *Event, uids ...string) error
// 为客户端提供 SSE 服务 (需要手动获取 uid)
func (h *Hub) Serve(w http.ResponseWriter, r *http.Request, uid string, opts ...ServeOption)
// 返回标准 http.HandlerFunc (默认只从 context 获取用户 ID)
func (h *Hub) ServeHandler(opts ...ServeOption) http.HandlerFunc
// 返回推送事件的 http.HandlerFunc
func (h *Hub) PushEventHandler() http.HandlerFunc
// 获取在线客户端数量
func (h *Hub) OnlineClientsNum() int
// 打印推送统计信息
func (h *Hub) PrintPushStats()
// 推送心跳给指定用户
func (h *Hub) PushHeartBeat(uid string)
// 关闭 Hub,并通知客户端停止重连
func (h *Hub) Close()
// 关闭 Hub,但允许客户端按自己的重试策略继续重连
func (h *Hub) CloseAllowReconnect()
// 创建新的 SSE 客户端
func NewClient(url string, opts ...ClientOption) *SSEClient
// 注册事件回调函数
func (c *SSEClient) OnEvent(eventType string, callback EventCallback)
// 连接到服务器
func (c *SSEClient) Connect() error
// 断开连接
func (c *SSEClient) Disconnect()
// 返回等待通道 (用于阻塞主 goroutine)
func (c *SSEClient) Wait() <-chan struct{}
// 获取连接状态
func (c *SSEClient) GetConnectStatus() bool
type Event struct {
ID string `json:"id"` // 事件 ID (唯一,自动生成)
Event string `json:"event"` // 事件类型
Data interface{} `json:"data"` // 事件数据 (任意 JSON 可序列化类型)
}
// 检查事件是否有效
func (e *Event) CheckValid() error
// 创建关闭事件 (通知客户端不要重连)
func CloseEvent() *Event
// 从对象池获取事件对象
func GetEvent() *Event
// 归还事件对象到对象池
func PutEvent(event *Event)
type Store interface {
// 保存事件
Save(ctx context.Context, e *Event) error
// 根据事件类型和最后事件 ID 查询事件列表
// 返回: 事件列表、下一次请求应继续使用的 lastID cursor、错误
ListByLastID(ctx context.Context, eventType string, lastID string, pageSize int) ([]*Event, string, error)
}
type PushStats struct {
// 使用 atomic 操作访问
}
// 递增总推送数
func (s *PushStats) IncTotal()
// 递增成功推送数
func (s *PushStats) IncSuccess()
// 递增失败推送数
func (s *PushStats) IncFailed()
// 递增超时推送数
func (s *PushStats) IncTimeout()
// 使用 HTTP 中间件进行认证
func AuthMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
token := r.Header.Get("Authorization")
if token == "" {
http.Error(w, "未授权", http.StatusUnauthorized)
return
}
// 验证 token 并获取 uid
uid, err := validateToken(token)
if err != nil {
http.Error(w, "无效 token", http.StatusUnauthorized)
return
}
sse.WithUserID(uid)(next).ServeHTTP(w, r)
})
}
// 使用中间件
mux.Handle("/events", AuthMiddleware(http.HandlerFunc(hub.ServeHandler())))
// 或直接让 Hub 在认证阶段产出 canonical uid
hub := sse.NewHub(
sse.WithAuthCallback(func(r *http.Request) (string, error) {
return validateToken(r.Header.Get("Authorization"))
}),
)
// 或者使用内置的中间件辅助函数
mux.Handle("/events", sse.WithUserID(uid)(http.HandlerFunc(hub.ServeHandler())))
// 若必须从请求中提取 uid,请显式声明可信来源
mux.Handle("/events", sse.WithUserIDFromHeader("X-User-ID")(http.HandlerFunc(hub.ServeHandler())))
const (
EventTypeMessage = "message" // 普通消息
EventTypeNotification = "notification" // 系统通知
EventTypeAlert = "alert" // 告警
EventTypeStatus = "status" // 状态更新
EventTypeClose = "close" // 连接关闭
)
// 定义事件数据结构
type MessageData struct {
From string `json:"from"`
Content string `json:"content"`
Time int64 `json:"time"`
}
type NotificationData struct {
Title string `json:"title"`
Content string `json:"content"`
Level string `json:"level"` // info/warning/error
}
// 推送时使用
event := &sse.Event{
Event: EventTypeNotification,
Data: NotificationData{
Title: "系统升级",
Content: "系统将于今晚 22:00 进行维护",
Level: "warning",
},
}
// 服务端
hub := sse.NewHub(
sse.WithPushFailedHandleFn(func(uid string, event *sse.Event) {
// 记录失败日志
slog.Error("推送失败",
"uid", uid,
"event", event.Event,
"event_id", event.ID,
)
// 保存到失败队列用于重试
failedQueue.Push(uid, event)
}),
)
// 客户端
client := sse.NewClient(url)
client.OnEvent("error", func(event *sse.Event) {
slog.Error("收到错误事件", "data", event.Data)
})
// 1. 根据并发量调整缓冲区大小
hub := sse.NewHub(
sse.WithPushBufferSize(5000), // 高并发场景
sse.WithWorkerNum(50), // 增加工作协程数
)
// 2. 使用事件对象池 + 便捷方法 (高频推送场景)
event := sse.GetEvent()
defer sse.PutEvent(event)
event.Event = "message"
event.Data = data
hub.Broadcast(event)
// 3. 批量推送 (使用通用方法)
events := []*sse.Event{event1, event2, event3}
hub.Push(uids, events...)
// 4. 使用便捷方法简化代码
hub.PushOne("user1", event) // 单用户
hub.PushTo(event, "u1", "u2", "u3") // 多用户
hub.Broadcast(event) // 广播
// 5. 避免在回调中执行耗时操作
client.OnEvent("message", func(event *sse.Event) {
// ❌ 错误: 阻塞回调
// processHeavyTask(event.Data)
// ✅ 正确: 异步处理
go processHeavyTask(event.Data)
})
// 使用结构化日志
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
}))
// 配置 Hub
hub := sse.NewHub(
sse.WithLogger(logger),
sse.WithStore(redisStore),
sse.WithEnableResendEvents(),
sse.WithPushBufferSize(2000),
sse.WithWorkerNum(20),
sse.WithPushFailedHandleFn(handleFailedPush),
)
// 优雅关闭
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigChan
logger.Info("收到关闭信号,开始优雅关闭...")
// 关闭 Hub,但允许客户端继续按自身策略重连
hub.CloseAllowReconnect()
// 等待正在处理的请求完成
time.Sleep(5 * time.Second)
os.Exit(0)
}()
Q: 客户端连接后立即断开
A: 检查以下几点:
Content-Type: text/event-stream)proxy_buffering off)Q: 事件推送失败
A:
hub.PushStats 统计信息hub.OnlineClientsNum()Q: 内存占用持续增长
A:
GetEvent() / PutEvent()WithPushBufferSize()Q: 断线重连后收不到历史消息
A:
WithEnableResendEvents()Last-Event-ID 头ListByLastID 方法的分页逻辑建议监控以下指标:
// Prometheus 示例
import "github.com/prometheus/client_golang/prometheus"
var (
onlineUsers = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "sse_online_users",
Help: "当前在线用户数",
})
pushTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "sse_push_total",
Help: "总推送次数",
})
pushSuccess = prometheus.NewCounter(prometheus.CounterOpts{
Name: "sse_push_success",
Help: "成功推送次数",
})
pushFailed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "sse_push_failed",
Help: "失败推送次数",
})
)
// 定期更新指标
go func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for range ticker.C {
onlineUsers.Set(float64(hub.OnlineClientsNum()))
stats := hub.PushStats
pushTotal.Add(float64(atomic.LoadInt64(&stats.total)))
pushSuccess.Add(float64(atomic.LoadInt64(&stats.success)))
pushFailed.Add(float64(atomic.LoadInt64(&stats.failed)))
}
}()
欢迎提交 Issue 和 Pull Request!
在提交 PR 前,请确保:
go fmt 格式化go test ./...MIT License
如有问题或建议,请通过以下方式联系: