一个基于 Go 实现的企业级高性能协议分流器,具备毫秒级协议识别、智能连接池管理、DDoS 防护、内存优化和分布式架构等特性,能够自动识别并转发不同类型的网络协议到相应的目标服务器。
IdentifyMinBytes 支持多轮读取,可稳健识别 RTMP、TLS 等长握手协议。例如注册内部协议:pm.AddProtocol(protoplex.CreateInternalProtocol(protocols.NewHTTPWebDAVProtocol, HTTPServiceHandler))。调优要点:IdentifyMinBytes 控制判定协议前的最小读取字节数,配合循环读取策略确保 RTMP/TLS 等长握手协议识别成功;Priority 管理常见协议优先级,结合缓存提高命中率;在 Protocol 上设置 CustomValidator,可在业务侧实现更复杂的合法性检查。

| 配置项 | Go 选项 | 默认值 | 说明 |
|---|---|---|---|
MaxConnections | WithMaxConnections | 1024 | 同时允许的最大并发连接数 |
BufferSize | WithBufferSize | 32 * 1024 | 协议首帧读取的缓冲区大小 |
IdentifyTimeout | WithIdentifyTimeout | 15s | 协议识别阶段的超时时间 |
IdentifyMinBytes | WithIdentifyMinBytes | 1024 | 在判定协议前需要读取的最小字节数,支持多轮读取 |
CacheTTL | WithCacheTTL | 5m | 识别结果缓存生命周期 |
DialTimeout | WithDialTimeout | 5s | 代理目标连接的拨号超时 |
⚠️ 注意:
IdentifyMinBytes 必须为正且不超过 BufferSize;对 RTMP/TLS/OpenVPN 等握手较长的协议,建议设为 2048 或更高。
import (
"encoding/binary"
"github.com/darkit/protoplex/protocols"
)
// NewCustomBinary builds a Protocol definition for a simple binary framing:
// each frame starts with the magic bytes 0xAA 0x55, followed by a big-endian
// uint16 payload length at offset 2. Traffic is forwarded to target.
func NewCustomBinary(target string) *protocols.Protocol {
	// Accept only once the complete frame (4-byte header + payload) is buffered.
	validate := func(frame []byte) bool {
		const headerLen = 4
		if len(frame) < headerLen {
			return false
		}
		payloadLen := int(binary.BigEndian.Uint16(frame[2:4]))
		return payloadLen+headerLen <= len(frame)
	}
	return &protocols.Protocol{
		Name:            "CustomBinary",
		Target:          target,
		MatchStartBytes: [][]byte{{0xAA, 0x55}},
		Priority:        7,
		CustomValidator: validate,
	}
}
pm.AddProtocol(NewCustomBinary("custom-backend:9000"))
Protocol 结构体关键字段:
// Protocol describes a routable protocol: how to recognize it in the first
// bytes of a connection and where to forward matching traffic.
type Protocol struct {
Name string // display name of the protocol
Target string // forwarding target address ("host:port")
MatchStartBytes [][]byte // candidate byte prefixes the payload may start with
MatchBytes [][]byte // byte sequences matched against the payload (presumably anywhere in it — confirm with implementation)
MatchRegexes []*regexp.Regexp // regex-based matchers
NoComparisonBeforeBytes int // presumably suppresses matching before this many bytes arrive — TODO confirm
NoComparisonAfterBytes int // presumably stops matching after this many bytes — TODO confirm
EstablishConnection []EstablishConnection // custom connection establishers (type declared elsewhere)
Priority int // match priority among protocols (ordering direction not shown here — confirm)
CustomValidator func([]byte) bool // extra business-side validation of the buffered payload
}
支持 18+ 主流网络协议,覆盖 Web 服务、流媒体、远程访问、物联网等场景:
| 协议类别 | 支持协议 | 应用场景 |
|---|---|---|
| Web 服务 | HTTP、WebDAV、WebSocket | Web 应用、文件服务、实时通信 |
| 数据库 | Redis | 缓存服务、数据存储 |
| 流媒体 | RTSP、RTMP | 视频流、直播服务 |
| 远程访问 | SSH、RDP | 服务器管理、远程桌面 |
| 安全隧道 | TLS、OpenVPN、SOCKS4/5 | 加密通信、代理服务 |
| 物联网 | MQTT | IoT 设备通信 |
| 网络协议 | TURN/STUN | P2P 通信、NAT 穿透 |
| 通信协议 | FTP、DNS、SMTP、SIP | 文件传输、域名解析、邮件、VoIP |
💡 易扩展:基于插件化架构,可轻松添加自定义协议支持
go get github.com/darkit/protoplex
git clone https://github.com/darkit/protoplex.git
cd protoplex
go build -o bin/protoplex
docker run -d --name protoplex \
  -p 9090:9090 \
  -v /path/to/config.yaml:/etc/protoplex/config.yaml \
  protoplex:latest
package main
import (
"context"
"log"
"github.com/darkit/protoplex"
"github.com/darkit/protoplex/protocols"
)
// Minimal forwarding example: register three protocols and start the server.
func main() {
// Create the enterprise-grade protocol manager.
pm := protoplex.NewProtocolManager()
defer pm.Close() // automatic resource cleanup
// Register protocol support (connection pooling and security guards are enabled automatically).
pm.AddProtocol(protocols.NewHTTPWebDAVProtocol("192.168.1.5:80"))
pm.AddProtocol(protocols.NewMQTTProtocol("127.0.0.1:1883"))
pm.AddProtocol(protocols.NewRedisProtocol("192.168.1.6:6379"))
log.Printf("🚀 Protoplex 启动成功,监听端口 :9090")
log.Printf("📊 监控面板:http://localhost:9091/metrics")
// Start the high-performance server; blocks until the context is canceled or an error occurs.
if err := pm.RunServer(context.Background(), ":9090"); err != nil {
log.Fatal("启动失败:", err)
}
}
package main
import (
"context"
"net"
"github.com/darkit/protoplex"
"github.com/darkit/protoplex/protocols"
)
func HTTPServiceHandler(conn net.Conn) {
// 直接处理 HTTP 请求,无需网络转发
defer conn.Close()
conn.Write([]byte("HTTP/1.1 200 OK\r\n\r\nHello from internal service!"))
}
func RedisServiceHandler(conn net.Conn) {
// 直接处理 Redis 命令,无需网络转发
defer conn.Close()
conn.Write([]byte("+PONG\r\n"))
}
func main() {
pm := protoplex.NewProtocolManager()
defer pm.Close()
// 🎯 一行式协议注册 - 终极优雅!
pm.AddProtocol(protoplex.CreateInternalProtocol(protocols.NewHTTPWebDAVProtocol, HTTPServiceHandler))
pm.AddProtocol(protoplex.CreateInternalProtocol(protocols.NewRedisProtocol, RedisServiceHandler))
pm.AddProtocol(protoplex.CreateInternalProtocol(protocols.NewSMTPProtocol, SMTPServiceHandler))
// 🚀 零网络转发,直接内存路由!
pm.RunServer(context.Background(), ":9090")
}
package main
import (
"context"
"github.com/darkit/protoplex"
)
func main() {
// 创建分布式协议管理器
dm, err := protoplex.NewDistributedManager(protoplex.DistributedConfig{
NodeID: "protoplex-node-1",
EtcdEndpoints: []string{"etcd1:2379", "etcd2:2379", "etcd3:2379"},
ServiceDiscovery: protoplex.ServiceDiscoveryConfig{
Enable: true,
RefreshInterval: 30, // 30秒刷新一次服务列表
},
LoadBalancer: protoplex.LoadBalancerConfig{
Strategy: "weighted_round_robin",
HealthCheckInterval: 10, // 10秒健康检查间隔
},
ConfigManager: protoplex.ConfigManagerConfig{
Enable: true,
WatchPrefix: "/protoplex/config",
},
})
if err != nil {
log.Fatal("创建分布式管理器失败:", err)
}
defer dm.Close()
log.Printf("🌐 分布式 Protoplex 启动成功")
log.Printf("📊 服务发现:已发现 %d 个目标服务", dm.GetServiceCount())
// 启动分布式服务器
if err := dm.RunServer(context.Background(), ":9090"); err != nil {
log.Fatal("启动分布式服务器失败:", err)
}
}
// Build a high-performance, enterprise-grade configuration (illustrative fragment).
pm := protoplex.NewProtocolManager(
protoplex.WithMaxConnections(20000), // support 20k concurrent connections
protoplex.WithBufferSize(128 * 1024), // 128KB large buffer
protoplex.WithIdentifyTimeout(5 * time.Second), // fast identification
protoplex.WithIdentifyMinBytes(2048), // minimum bytes read before deciding; helps long-handshake protocols
protoplex.WithCacheTTL(30 * time.Minute), // long-lived identification cache
protoplex.WithDialTimeout(3 * time.Second), // fast upstream dialing
)
defer pm.Close()
// Configure protocol priority and load balancing.
httpProtocol := protocols.NewHTTPWebDAVProtocol("192.168.1.5:80")
httpProtocol.Priority = 1 // highest priority
pm.AddProtocol(httpProtocol)
// Multi-target load balancing (comma-separated target list).
redisProtocol := protocols.NewRedisProtocol("redis-cluster:6379,redis-backup:6379")
pm.AddProtocol(redisProtocol)
// Real-time performance monitoring (type-safe metrics).
go func() {
for range time.Tick(10 * time.Second) {
metrics := pm.GetMetrics()
// NOTE(review): if ConnectionsCreated is 0 this ratio is NaN/Inf — consider guarding.
log.Printf("📊 活跃连接:%d,连接池效率:%.2f%%,缓存命中率:%.2f%%",
metrics.ActiveConnections,
float64(metrics.ConnectionPool.ConnectionsReused)/float64(metrics.ConnectionPool.ConnectionsCreated)*100,
metrics.CacheHitRate * 100)
}
}()
# config.yaml - 企业级配置文件
server:
max_connections: 20000
buffer_size: 131072 # 128KB
identify_timeout: "5s"
identify_min_bytes: 2048
cache_ttl: "30m"
dial_timeout: "3s"
security:
enable_rate_limit: true
rate_limit:
requests_per_second: 1000
burst_size: 100
enable_ddos_guard: true
ip_whitelist:
- "10.0.0.0/8"
- "192.168.0.0/16"
ip_blacklist: []
protocols:
- name: "HTTP"
target: "web-server:80"
priority: 1
- name: "Redis"
target: "redis-cluster:6379"
priority: 2
- name: "MQTT"
target: "mqtt-broker:1883"
priority: 3
monitoring:
enable_metrics: true
metrics_port: 9091
enable_health_check: true
health_check_port: 9092
# Phase 4: 分布式配置
distributed:
enable: true
node_id: "protoplex-node-1"
etcd_endpoints:
- "etcd1:2379"
- "etcd2:2379"
- "etcd3:2379"
service_discovery:
enable: true
refresh_interval: 30
load_balancer:
strategy: "weighted_round_robin"
health_check_interval: 10
config_manager:
enable: true
watch_prefix: "/protoplex/config"
// Start from a configuration file (illustrative fragment; error intentionally ignored here).
configManager, _ := protoplex.NewConfigManager("config.yaml")
pm := protoplex.NewProtocolManagerWithConfig(configManager)
// Automatic hot-reload: apply new settings whenever the config file changes.
configManager.OnConfigChange(func(newConfig *protoplex.Config) {
log.Printf("🔄 配置已更新,应用新设置...")
pm.UpdateConfig(newConfig)
})
| 指标 | Phase 1 (基础版) | Phase 2 (企业版) | Phase 4 (分布式版) | 提升倍数 |
|---|---|---|---|---|
| 并发连接数 | 2,000 | 20,000+ | 100,000+ | 50x |
| 协议匹配速度 | O(n) 线性 | O(1) 常数 | O(1) 常数 | 5-10x |
| 内存使用 | 无优化 | 分层池化 | 类型安全池化 | 节省 60% |
| 缓存命中率 | 0% | 95%+ | 98%+ | ∞ |
| 故障检测 | 无 | 毫秒级 | 分布式监控 | - |
| 配置更新 | 需重启 | 热重载 | 分布式热重载 | - |
| 类型安全 | 否 | 否 | 完全类型安全 | - |
旧实现(全量重建): 23,939 ns/op, 22,976 B/op, 271 allocs/op 新实现(增量删除): 9,888 ns/op, 5,616 B/op, 70 allocs/op 性能提升: 2.42x 内存优化: 4.09x (减少 75.6% 内存分配)
旧实现(每次 ParseCIDR): O(n·parse) 复杂度,热路径性能瓶颈 新实现(预解析缓存): O(n) 复杂度,消除解析开销 特性: 支持精确 IP (O(1) 哈希) + CIDR 网段 (O(n) 前缀匹配) 混合场景
旧实现: 固定 30s 盲等,强制中断所有连接 新实现: 智能轮询(1秒间隔),活跃连接归零或超时后关闭 特性: - 新请求零中断(立即使用新连接池) - 旧连接自然完成(最长等待 60s) - 可配置 GracefulShutdownTimeout
// ProtocolIndex: O(1) first-byte indexing for protocol dispatch.
type ProtocolIndex struct {
// 256 index buckets, one per possible first byte of a connection's payload.
firstByteIndex [256][]*protocols.Protocol
// Type-safe index statistics (see ProtocolIndexStats).
stats ProtocolIndexStats
}
// ConnectionPool manages reusable upstream connections.
type ConnectionPool struct {
pools map[string]*TargetPool // sub-pools grouped by target
health *HealthChecker // target health checking
metrics PoolMetrics // type-safe pool metrics
}
// MemoryPoolManager provides tiered buffer pools sized for distinct workloads.
type MemoryPoolManager struct {
smallPool *pool[[]byte] // 4KB — protocol identification
mediumPool *pool[[]byte] // 32KB — general forwarding
largePool *pool[[]byte] // 128KB — large file transfers
identity *pool[[]byte] // 1600B — protocol headers
metrics DetailedMemoryPoolMetrics // type-safe pool metrics
}
// DistributedManager bundles the distributed-mode subsystems.
type DistributedManager struct {
serviceDiscovery *ServiceDiscovery // service discovery
loadBalancer *LoadBalancer // load balancing
configManager *DistributedConfig // configuration management — NOTE(review): field type looks like config data, not a manager; confirm it shouldn't be *ConfigManager
healthChecker *DistributedHealth // health checking
}
// ProtocolManagerMetrics is the type-safe metrics snapshot returned by GetMetrics.
type ProtocolManagerMetrics struct {
ActiveConnections int64 `json:"active_connections"`
ConnectionPool PoolMetrics `json:"connection_pool"`
Security SecurityMetrics `json:"security"`
MemoryPool DetailedMemoryPoolMetrics `json:"memory_pool"`
}
// ProtocolIndexStats summarizes the contents of a ProtocolIndex.
type ProtocolIndexStats struct {
TotalProtocols int `json:"total_protocols"`
FirstByteEntries int `json:"first_byte_entries"`
PrefixEntries int `json:"prefix_entries"`
FirstByteDistribution map[string]int `json:"first_byte_distribution"`
}
// SecurityMetrics counts security-related connection events.
type SecurityMetrics struct {
TotalConnections int64 `json:"total_connections"`
RateLimitedRequests int64 `json:"rate_limited_requests"`
BlockedConnections int64 `json:"blocked_connections"`
WhitelistedAccess int64 `json:"whitelisted_access"`
BlacklistedDenied int64 `json:"blacklisted_denied"`
DDoSAttacks int64 `json:"ddos_attacks"`
}
// DetailedMemoryPoolMetrics breaks memory-pool activity down by tier,
// alongside allocation and GC statistics.
type DetailedMemoryPoolMetrics struct {
// Per-tier pool operation statistics.
PoolStats struct {
SmallPool PoolOperationStats `json:"small_pool"`
MediumPool PoolOperationStats `json:"medium_pool"`
LargePool PoolOperationStats `json:"large_pool"`
IdentityPool PoolOperationStats `json:"identity_pool"`
} `json:"pool_stats"`
// Allocation counters (runtime-reported plus pool-tracked).
MemoryStats struct {
CurrentAllocatedBytes int64 `json:"current_allocated_bytes"`
RuntimeAllocBytes uint64 `json:"runtime_alloc_bytes"`
} `json:"memory_stats"`
// Garbage-collector activity.
GCStats struct {
GCRuns uint32 `json:"gc_runs"`
GCPauseAvgNs uint64 `json:"gc_pause_avg_ns"`
} `json:"gc_stats"`
}
// Prometheus integration.
import "github.com/prometheus/client_golang/prometheus/promhttp"
// Enable metrics export from protoplex on port 9091.
pm.EnablePrometheusMetrics(9091)
// Metrics are then exposed at http://localhost:9091/metrics
// Old style (requires type assertions on map[string]interface{} — panics on mismatch):
metrics := pm.GetMetrics()
activeConns := metrics["active_connections"].(int64)
poolEfficiency := metrics["connection_pool"].(map[string]interface{})["efficiency"].(float64)
// New style (type-safe, checked at compile time):
metrics := pm.GetMetrics()
activeConns := metrics.ActiveConnections
poolEfficiency := float64(metrics.ConnectionPool.ConnectionsReused) / float64(metrics.ConnectionPool.ConnectionsCreated) * 100
# Dockerfile
FROM golang:1.23-alpine AS builder
WORKDIR /app
COPY . .
RUN go build -o protoplex

FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/
COPY --from=builder /app/protoplex .
COPY --from=builder /app/config.yaml .
EXPOSE 9090 9091 9092
CMD ["./protoplex", "-config", "config.yaml"]
# docker-compose.yml
version: "3.8"
services:
etcd:
image: quay.io/coreos/etcd:v3.5.0
ports:
- "2379:2379"
command:
- etcd
- --name=etcd-node
- --data-dir=/etcd-data
- --listen-client-urls=http://0.0.0.0:2379
- --advertise-client-urls=http://etcd:2379
- --initial-cluster-state=new
protoplex-node-1:
image: protoplex:latest
ports:
- "9090:9090"
- "9091:9091"
volumes:
- ./config-distributed.yaml:/root/config.yaml
depends_on:
- etcd
environment:
- PROTOPLEX_NODE_ID=node-1
- ETCD_ENDPOINTS=etcd:2379
protoplex-node-2:
image: protoplex:latest
ports:
- "9093:9090"
- "9094:9091"
volumes:
- ./config-distributed.yaml:/root/config.yaml
depends_on:
- etcd
environment:
- PROTOPLEX_NODE_ID=node-2
- ETCD_ENDPOINTS=etcd:2379
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: protoplex-deployment
labels:
app: protoplex
spec:
replicas: 3
selector:
matchLabels:
app: protoplex
template:
metadata:
labels:
app: protoplex
spec:
containers:
- name: protoplex
image: protoplex:latest
ports:
- containerPort: 9090
name: service
- containerPort: 9091
name: metrics
resources:
requests:
memory: "1Gi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "1000m"
livenessProbe:
httpGet:
path: /health/live
port: 9092
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /health/ready
port: 9092
initialDelaySeconds: 5
periodSeconds: 5
env:
- name: PROTOPLEX_NODE_ID
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: ETCD_ENDPOINTS
value: "etcd-service:2379"
volumeMounts:
- name: config
mountPath: /root/config.yaml
subPath: config.yaml
volumes:
- name: config
configMap:
name: protoplex-config
---
apiVersion: v1
kind: Service
metadata:
name: protoplex-service
spec:
selector:
app: protoplex
ports:
- name: service
port: 9090
targetPort: 9090
- name: metrics
port: 9091
targetPort: 9091
type: LoadBalancer
---
apiVersion: v1
kind: Service
metadata:
name: etcd-service
spec:
selector:
app: etcd
ports:
- port: 2379
targetPort: 2379
连接池配置
缓存策略
类型安全
分布式部署
欢迎提交 PR 和 Issue!在贡献之前请:
git checkout -b feature/amazing-feature
git commit -m 'Add amazing feature'
git push origin feature/amazing-feature

MIT License - 详见 LICENSE 文件
感谢所有贡献者对 Protoplex 项目的支持!
🚀 现在就开始使用 Protoplex,体验企业级高性能协议分流的未来!