diff --git a/README.md b/README.md index 9cf3136f..13a126d3 100644 --- a/README.md +++ b/README.md @@ -94,6 +94,632 @@ Upper-layer callers need to determine whether to incorporate a retry mechanism b In summary, the data transmission interface of getty does not come with an inherent retry mechanism; instead, it is up to upper-layer callers to decide whether to implement retry logic based on specific situations. This design approach provides developers with greater flexibility in controlling the behavior of data transmission. +## Session Management + +Session is the core component of the Getty framework, responsible for managing connection sessions between clients and servers. Each connection corresponds to a Session instance, providing complete connection lifecycle management. + +### Session Interface + +```go +type Session interface { + Connection + Reset() + Conn() net.Conn + Stat() string + IsClosed() bool + EndPoint() EndPoint + SetMaxMsgLen(int) + SetName(string) + SetEventListener(EventListener) + SetPkgHandler(ReadWriter) + SetReader(Reader) + SetWriter(Writer) + SetCronPeriod(int) + SetWaitTime(time.Duration) + GetAttribute(any) any + SetAttribute(any, any) + RemoveAttribute(any) + WritePkg(pkg any, timeout time.Duration) (totalBytesLength int, sendBytesLength int, err error) + WriteBytes([]byte) (int, error) + WriteBytesArray(...[]byte) (int, error) + Close() + AddCloseCallback(handler, key any, callback CallBackFunc) + RemoveCloseCallback(handler, key any) +} +``` + +### Key Methods + +#### Connection Management +- **`Conn()`**: Get the underlying network connection object +- **`IsClosed()`**: Check if the session is closed +- **`Close()`**: Close the session connection +- **`Reset()`**: Reset session state + +#### Configuration Settings +- **`SetName(string)`**: Set session name +- **`SetMaxMsgLen(int)`**: Set maximum message length +- **`SetCronPeriod(int)`**: Set heartbeat detection period (milliseconds) +- **`SetWaitTime(time.Duration)`**: Set wait timeout + +#### Handler Settings +- **`SetEventListener(EventListener)`**: Set event listener for handling connection lifecycle events +- **`SetPkgHandler(ReadWriter)`**: Set packet handler for parsing and serializing network data +- **`SetReader(Reader)`**: Set data reader for custom data parsing +- **`SetWriter(Writer)`**: Set data writer for custom data serialization + +#### Detailed Method Usage + +**SetPkgHandler - Packet Handler** +```go +// SetPkgHandler is responsible for parsing and serializing network packets +session.SetPkgHandler(&EchoPackageHandler{}) + +// The handler must implement ReadWriter interface +type EchoPackageHandler struct{} + +func (h *EchoPackageHandler) Read(session transport.Session, data []byte) (interface{}, int, error) { + // Parse incoming data into application packets + // Return: (parsedPacket, bytesConsumed, error) + if len(data) < 4 { + return nil, 0, nil // Need more data + } + length := int(data[0])<<24 | int(data[1])<<16 | int(data[2])<<8 | int(data[3]) + if len(data) < 4+length { + return nil, 0, nil // Incomplete packet + } + return data[4:4+length], 4 + length, nil +} + +func (h *EchoPackageHandler) Write(session transport.Session, pkg interface{}) ([]byte, error) { + // Serialize application packet into network bytes + data := []byte(fmt.Sprintf("%v", pkg)) + length := len(data) + header := []byte{ + byte(length >> 24), byte(length >> 16), + byte(length >> 8), byte(length), + } + return append(header, data...), nil +} +``` + +**SetEventListener - Event Handler** +```go +// SetEventListener handles connection lifecycle events +session.SetEventListener(&EchoMessageHandler{}) + +type EchoMessageHandler struct{} + +func (h *EchoMessageHandler) OnOpen(session transport.Session) error { + log.Printf("Connection established: %s", session.RemoteAddr()) + return nil +} + +func (h *EchoMessageHandler) OnClose(session transport.Session) { + log.Printf("Connection closed: %s", session.RemoteAddr()) +} + +func (h *EchoMessageHandler) OnError(session transport.Session, err error) { + log.Printf("Connection error: %s, error: %v", session.RemoteAddr(), err) +} + +func (h *EchoMessageHandler) OnCron(session transport.Session) { + // Heartbeat detection - called periodically + activeTime := session.GetActive() + if time.Since(activeTime) > 30*time.Second { + log.Printf("Connection timeout: %s", session.RemoteAddr()) + session.Close() + } +} + +func (h *EchoMessageHandler) OnMessage(session transport.Session, pkg interface{}) { + // Handle incoming messages + messageData := pkg.([]byte) + log.Printf("Received message: %s", string(messageData)) + + // Process business logic and send response + response := fmt.Sprintf("Echo: %s", string(messageData)) + session.WritePkg(response, time.Second*5) +} +``` + +**SetCronPeriod - Heartbeat Configuration** +```go +// SetCronPeriod sets heartbeat detection period in milliseconds +// conf.heartbeatPeriod is time.Duration (e.g., 5 * time.Second) +cronPeriod := int(conf.heartbeatPeriod.Nanoseconds() / 1e6) // Convert to milliseconds +session.SetCronPeriod(cronPeriod) // Set 5000ms = 5 seconds + +// The OnCron method will be called every 5 seconds for heartbeat detection +func (h *EchoMessageHandler) OnCron(session transport.Session) { + // Check if connection is still active + activeTime := session.GetActive() + if time.Since(activeTime) > 30*time.Second { + // Connection timeout, close it + session.Close() + } + + // Optional: Send heartbeat packet + // session.WritePkg("ping", time.Second) +} +``` + +#### Active Time Update Mechanism + +**Automatic Active Time Updates** +```go +// Getty automatically updates session active time when: +// 1. Receiving data from network +func (t *gettyTCPConn) recv(p []byte) (int, error) { + // ... receive data logic + t.UpdateActive() // Automatically called - updates GetActive() value + return length, err +} + +// 2. WebSocket ping/pong frames (WebSocket only) +func (w *gettyWSConn) handlePing(message string) error { + w.UpdateActive() // Updates when receiving ping + return w.writePong([]byte(message)) +} + +func (w *gettyWSConn) handlePong(string) error { + w.UpdateActive() // Updates when receiving pong + return nil +} + +// Note: TCP/UDP Send() methods do NOT automatically call UpdateActive() +// Only data reception and WebSocket ping/pong update active time +``` + +**Server-Side Heartbeat Detection** +```go +// Server automatically calls OnCron periodically for each session +func (h *ServerMessageHandler) OnCron(session transport.Session) { + // Get last active time (automatically updated on data receive/send) + activeTime := session.GetActive() + idleTime := time.Since(activeTime) + + log.Printf("Heartbeat check: %s, last active: %v, idle: %v", + session.RemoteAddr(), activeTime, idleTime) + + // Check for timeout + if idleTime > 30*time.Second { + log.Printf("Client timeout, closing connection: %s", session.RemoteAddr()) + session.Close() + } +} +``` + +**Active Time Update Timeline** +```go +// Example timeline showing when GetActive() values change: +// 00:00:00 - Connection established, GetActive() = 2024-01-01 10:00:00 +// 00:00:05 - Client sends data, GetActive() = 2024-01-01 10:00:05 +// 00:00:10 - Server sends response, GetActive() = 2024-01-01 10:00:10 +// 00:00:15 - OnCron called, checks idle time: 5 seconds +// 00:00:20 - OnCron called, checks idle time: 10 seconds +// 00:00:30 - OnCron called, detects timeout, closes connection +``` + +**Key Points:** +- **Automatic Updates**: Active time is updated automatically on data receive/send +- **Server-Side Detection**: Server calls OnCron periodically to check client activity +- **No Client Request Needed**: Heartbeat detection is server-initiated, not client-requested +- **Real-Time Monitoring**: GetActive() reflects actual network activity + +#### Data Flow Processing + +**Incoming Data Flow** +```go +// 1. Network data received +Network Data → Getty Framework → PkgHandler.Read() → EventListener.OnMessage() → Business Logic + +// 2. Detailed flow: +// Step 1: Raw network data received +func (t *gettyTCPConn) recv(p []byte) (int, error) { + // Raw bytes from network + t.UpdateActive() // Update active time + return length, err +} + +// Step 2: PkgHandler parses data into application packets +func (h *EchoPackageHandler) Read(session transport.Session, data []byte) (interface{}, int, error) { + // Parse raw bytes into application packets + // Return: (parsedPacket, bytesConsumed, error) + return parsedPacket, bytesConsumed, nil +} + +// Step 3: EventListener.OnMessage() processes the parsed packet +func (h *EchoMessageHandler) OnMessage(session transport.Session, pkg interface{}) { + // Process business logic with parsed packet + // Send response back to client +} +``` + +**Outgoing Data Flow** +```go +// 1. Business logic generates response +Business Logic → PkgHandler.Write() → Getty Framework → Network + +// 2. Detailed flow: +// Step 1: Business logic calls WritePkg +session.WritePkg(response, timeout) + +// Step 2: PkgHandler serializes application data +func (h *EchoPackageHandler) Write(session transport.Session, pkg interface{}) ([]byte, error) { + // Serialize application packet to network bytes + return serializedBytes, nil +} + +// Step 3: Getty sends to network +func (t *gettyTCPConn) Send(pkg any) (int, error) { + // Send serialized bytes to network + return length, err +} +``` + +**Complete Data Flow Diagram** +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ Incoming Data Flow │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ Network → Getty → PkgHandler.Read() → EventListener.OnMessage() → Logic │ +└─────────────────────────────────────────────────────────────────────────────┘ +┌─────────────────────────────────────────────────────────────────────────────┐ +│ Outgoing Data Flow │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ Logic → WritePkg() → PkgHandler.Write() → Getty → Network │ +└─────────────────────────────────────────────────────────────────────────────┘ +``` + +**Processing Order:** +1. **PkgHandler first**: Handles protocol-level parsing/serialization +2. **EventListener second**: Handles business logic and events +3. **Two separate goroutines**: One for reading, one for processing + +**Key Components:** +- **PkgHandler**: Implements `ReadWriter` interface for data parsing/serialization +- **EventListener**: Implements `EventListener` interface for business logic +- **OnMessage()**: Method of `EventListener` interface for processing parsed packets + +#### Data Transmission +- **`WritePkg(pkg any, timeout time.Duration)`**: Send data packet, returns total bytes and successfully sent bytes +- **`WriteBytes([]byte)`**: Send byte data +- **`WriteBytesArray(...[]byte)`**: Send multiple byte arrays + +#### Attribute Management +- **`GetAttribute(key any)`**: Get session attribute +- **`SetAttribute(key any, value any)`**: Set session attribute +- **`RemoveAttribute(key any)`**: Remove session attribute + +#### Statistics +- **`Stat()`**: Get session statistics (connection status, read/write bytes, packet count, etc.) + +## Server Management + +Getty provides multiple types of server implementations, supporting TCP, UDP, WebSocket, and WSS protocols. + +### Server Types + +#### TCP Server +```go +// Create TCP server +server := getty.NewTCPServer( + getty.WithLocalAddress(":8080"), + getty.WithServerTaskPool(taskPool), +) +``` + +#### UDP Endpoint +```go +// Create UDP endpoint +server := getty.NewUDPEndPoint( + getty.WithLocalAddress(":8080"), + getty.WithServerTaskPool(taskPool), +) +``` + +#### WebSocket Server +```go +// Create WebSocket server +server := getty.NewWSServer( + getty.WithLocalAddress(":8080"), + getty.WithWebsocketServerPath("/ws"), + getty.WithServerTaskPool(taskPool), +) +``` + +#### Secure WebSocket Server +```go +// Create WSS server +server := getty.NewWSSServer( + getty.WithLocalAddress(":8443"), + getty.WithWebsocketServerPath("/wss"), + getty.WithWebsocketServerCert("cert.pem"), + getty.WithWebsocketServerPrivateKey("key.pem"), + getty.WithServerTaskPool(taskPool), +) +``` + +### Server Interface + +```go +type Server interface { + EndPoint +} + +type StreamServer interface { + Server + Listener() net.Listener +} + +type PacketServer interface { + Server + PacketConn() net.PacketConn +} +``` + +### Key Methods + +- **`RunEventLoop(newSession NewSessionCallback)`**: Start event loop to handle client connections +- **`Close()`**: Close the server +- **`IsClosed()`**: Check if the server is closed +- **`ID()`**: Get server ID +- **`EndPointType()`**: Get endpoint type + +### Event Loop + +The server starts the event loop through the `RunEventLoop` method: + +```go +func (s *server) RunEventLoop(newSession NewSessionCallback) { + if err := s.listen(); err != nil { + panic(fmt.Errorf("server.listen() = error:%+v", perrors.WithStack(err))) + } + + switch s.endPointType { + case TCP_SERVER: + s.runTCPEventLoop(newSession) + case UDP_ENDPOINT: + s.runUDPEventLoop(newSession) + case WS_SERVER: + s.runWSEventLoop(newSession) + case WSS_SERVER: + s.runWSSEventLoop(newSession) + default: + panic(fmt.Sprintf("illegal server type %s", s.endPointType.String())) + } +} +``` + +## Options Configuration System + +Getty uses functional options pattern to configure servers and clients, providing flexible configuration. + +### Server Options + +#### Basic Configuration +- **`WithLocalAddress(addr string)`**: Set server listen address +- **`WithServerTaskPool(pool GenericTaskPool)`**: Set server task pool + +#### WebSocket Configuration +- **`WithWebsocketServerPath(path string)`**: Set WebSocket request path +- **`WithWebsocketServerCert(cert string)`**: Set server certificate file +- **`WithWebsocketServerPrivateKey(key string)`**: Set server private key file +- **`WithWebsocketServerRootCert(cert string)`**: Set root certificate file + +#### TLS Configuration +- **`WithServerSslEnabled(sslEnabled bool)`**: Enable/disable SSL +- **`WithServerTlsConfigBuilder(builder TlsConfigBuilder)`**: Set TLS config builder + +### Client Options + +#### Basic Configuration +- **`WithServerAddress(addr string)`**: Set server address +- **`WithConnectionNumber(num int)`**: Set connection number +- **`WithClientTaskPool(pool GenericTaskPool)`**: Set client task pool + +#### Reconnection Configuration +- **`WithReconnectInterval(interval int)`**: Set reconnection interval (nanoseconds) +- **`WithReconnectAttempts(maxAttempts int)`**: Set maximum reconnection attempts + +#### Certificate Configuration +- **`WithRootCertificateFile(cert string)`**: Set root certificate file +- **`WithClientSslEnabled(sslEnabled bool)`**: Enable/disable client SSL +- **`WithClientTlsConfigBuilder(builder TlsConfigBuilder)`**: Set client TLS config + +### Configuration Examples + +#### Server Configuration Examples +```go +// TCP server configuration +server := getty.NewTCPServer( + getty.WithLocalAddress(":8080"), + getty.WithServerTaskPool(taskPool), +) + +// WebSocket server configuration +wsServer := getty.NewWSServer( + getty.WithLocalAddress(":8080"), + getty.WithWebsocketServerPath("/ws"), + getty.WithServerTaskPool(taskPool), +) + +// WSS server configuration +wssServer := getty.NewWSSServer( + getty.WithLocalAddress(":8443"), + getty.WithWebsocketServerPath("/wss"), + getty.WithWebsocketServerCert("server.crt"), + getty.WithWebsocketServerPrivateKey("server.key"), + getty.WithServerTaskPool(taskPool), +) +``` + +#### Client Configuration Examples +```go +// TCP client configuration +client := getty.NewTCPClient( + getty.WithServerAddress("127.0.0.1:8080"), + getty.WithConnectionNumber(1), + getty.WithReconnectInterval(5e8), // 500ms + getty.WithReconnectAttempts(3), + getty.WithClientTaskPool(taskPool), +) + +// WebSocket client configuration +wsClient := getty.NewWSClient( + getty.WithServerAddress("127.0.0.1:8080"), + getty.WithConnectionNumber(1), + getty.WithReconnectInterval(5e8), + getty.WithClientTaskPool(taskPool), +) +``` + +## TCP Server Example + +Here's a complete TCP server example demonstrating how to use the Getty framework: + +```go +package main + +import ( + "fmt" + "log" + "time" + "github.com/AlexStocks/getty/transport" + gxsync "github.com/dubbogo/gost/sync" +) + +// EchoPackageHandler implements packet handling +type EchoPackageHandler struct{} + +func (h *EchoPackageHandler) Read(session transport.Session, data []byte) (interface{}, int, error) { + // Simple length-prefixed protocol + if len(data) < 4 { + return nil, 0, nil + } + + length := int(data[0])<<24 | int(data[1])<<16 | int(data[2])<<8 | int(data[3]) + if len(data) < 4+length { + return nil, 0, nil + } + + return data[4:4+length], 4 + length, nil +} + +func (h *EchoPackageHandler) Write(session transport.Session, pkg interface{}) ([]byte, error) { + data := []byte(fmt.Sprintf("%v", pkg)) + length := len(data) + header := []byte{ + byte(length >> 24), + byte(length >> 16), + byte(length >> 8), + byte(length), + } + return append(header, data...), nil +} + +// EchoMessageHandler implements event handling +type EchoMessageHandler struct{} + +func (h *EchoMessageHandler) OnOpen(session transport.Session) error { + log.Printf("New connection: %s", session.RemoteAddr()) + return nil +} + +func (h *EchoMessageHandler) OnClose(session transport.Session) { + log.Printf("Connection closed: %s", session.RemoteAddr()) +} + +func (h *EchoMessageHandler) OnError(session transport.Session, err error) { + log.Printf("Connection error: %s, error: %v", session.RemoteAddr(), err) +} + +func (h *EchoMessageHandler) OnCron(session transport.Session) { + // Heartbeat detection + activeTime := session.GetActive() + if time.Since(activeTime) > 30*time.Second { + session.Close() + } +} + +func (h *EchoMessageHandler) OnMessage(session transport.Session, pkg interface{}) { + messageData := pkg.([]byte) + log.Printf("Received message: %s", string(messageData)) + + // Echo message + response := fmt.Sprintf("Echo: %s", string(messageData)) + session.WritePkg(response, time.Second*5) +} + +// New connection callback +func newSession(session transport.Session) error { + session.SetName("tcp-echo-session") + session.SetMaxMsgLen(4096) + session.SetReadTimeout(time.Second * 10) + session.SetWriteTimeout(time.Second * 10) + session.SetCronPeriod(5) + session.SetWaitTime(time.Second * 3) + + session.SetPkgHandler(&EchoPackageHandler{}) + session.SetEventListener(&EchoMessageHandler{}) + + session.AddCloseCallback("cleanup", "resources", func() { + log.Printf("Cleaning up resources: %s", session.RemoteAddr()) + }) + + return nil +} + +func main() { + // Create task pool + taskPool := gxsync.NewTaskPoolSimple(0) + defer taskPool.Close() + + // Create TCP server + server := transport.NewTCPServer( + transport.WithLocalAddress(":8080"), + transport.WithServerTaskPool(taskPool), + ) + + // Start server + log.Println("TCP server starting on :8080") + server.RunEventLoop(newSession) +} +``` + +## Framework Architecture + +Getty framework adopts a layered architecture design, from top to bottom: Application Layer, Getty Core Layer, and Network Layer: + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Application Layer │ +├─────────────────────────────────────────────────────────────┤ +│ Application Code │ Message Handler │ Codec/ReadWriter │ +└─────────────────────────────────────────────────────────────┘ +┌─────────────────────────────────────────────────────────────┐ +│ Getty Core Layer │ +├─────────────────────────────────────────────────────────────┤ +│ Session Management │ Server Management │ Client Management │ +│ Connection Mgmt │ Event System │ Options & Config │ +└─────────────────────────────────────────────────────────────┘ +┌─────────────────────────────────────────────────────────────┐ +│ Network Layer │ +├─────────────────────────────────────────────────────────────┤ +│ TCP Protocol │ UDP Protocol │ WebSocket Protocol │ +│ TLS/SSL │ │ │ +└─────────────────────────────────────────────────────────────┘ +``` + +### Core Component Relationships + +1. **Session** is the core component, managing connection lifecycle +2. **Server/Client** provides endpoint implementations for different protocols +3. **Connection** encapsulates underlying network connections +4. **EventListener** handles various events +5. **Options** provides flexible configuration + ## LICENCE Apache License 2.0 diff --git a/README_CN.md b/README_CN.md index c88ab627..f09fffef 100644 --- a/README_CN.md +++ b/README_CN.md @@ -94,6 +94,632 @@ session.AddCloseCallback([]int{1, 2, 3}, "key", callback) // 记录日 总之,Getty 的数据传输接口并不自带内部的重试机制;相反,是否在特定情况下实现重试逻辑由上层调用者决定。这种设计方法为开发者在控制数据传输行为方面提供了更大的灵活性。 +## Session 会话管理 + +Session 是 Getty 框架的核心组件,负责管理客户端与服务器之间的连接会话。每个连接对应一个 Session 实例,提供完整的连接生命周期管理。 + +### Session 接口 + +```go +type Session interface { + Connection + Reset() + Conn() net.Conn + Stat() string + IsClosed() bool + EndPoint() EndPoint + SetMaxMsgLen(int) + SetName(string) + SetEventListener(EventListener) + SetPkgHandler(ReadWriter) + SetReader(Reader) + SetWriter(Writer) + SetCronPeriod(int) + SetWaitTime(time.Duration) + GetAttribute(any) any + SetAttribute(any, any) + RemoveAttribute(any) + WritePkg(pkg any, timeout time.Duration) (totalBytesLength int, sendBytesLength int, err error) + WriteBytes([]byte) (int, error) + WriteBytesArray(...[]byte) (int, error) + Close() + AddCloseCallback(handler, key any, callback CallBackFunc) + RemoveCloseCallback(handler, key any) +} +``` + +### 主要方法说明 + +#### 连接管理 +- **`Conn()`**: 获取底层网络连接对象 +- **`IsClosed()`**: 检查会话是否已关闭 +- **`Close()`**: 关闭会话连接 +- **`Reset()`**: 重置会话状态 + +#### 配置设置 +- **`SetName(string)`**: 设置会话名称 +- **`SetMaxMsgLen(int)`**: 设置最大消息长度 +- **`SetCronPeriod(int)`**: 设置心跳检测周期(毫秒) +- **`SetWaitTime(time.Duration)`**: 设置等待超时时间 + +#### 处理器设置 +- **`SetEventListener(EventListener)`**: 设置事件监听器,处理连接生命周期事件 +- **`SetPkgHandler(ReadWriter)`**: 设置数据包处理器,负责解析和序列化网络数据 +- **`SetReader(Reader)`**: 设置数据读取器,用于自定义数据解析 +- **`SetWriter(Writer)`**: 设置数据写入器,用于自定义数据序列化 + +#### 详细方法使用说明 + +**SetPkgHandler - 数据包处理器** +```go +// SetPkgHandler 负责解析和序列化网络数据包 +session.SetPkgHandler(&EchoPackageHandler{}) + +// 处理器必须实现 ReadWriter 接口 +type EchoPackageHandler struct{} + +func (h *EchoPackageHandler) Read(session transport.Session, data []byte) (interface{}, int, error) { + // 解析接收到的数据为应用层数据包 + // 返回: (解析出的数据包, 消费的字节数, 错误) + if len(data) < 4 { + return nil, 0, nil // 数据不足,等待更多数据 + } + length := int(data[0])<<24 | int(data[1])<<16 | int(data[2])<<8 | int(data[3]) + if len(data) < 4+length { + return nil, 0, nil // 数据包不完整 + } + return data[4:4+length], 4 + length, nil +} + +func (h *EchoPackageHandler) Write(session transport.Session, pkg interface{}) ([]byte, error) { + // 将应用层数据包序列化为网络字节 + data := []byte(fmt.Sprintf("%v", pkg)) + length := len(data) + header := []byte{ + byte(length >> 24), byte(length >> 16), + byte(length >> 8), byte(length), + } + return append(header, data...), nil +} +``` + +**SetEventListener - 事件处理器** +```go +// SetEventListener 处理连接生命周期事件 +session.SetEventListener(&EchoMessageHandler{}) + +type EchoMessageHandler struct{} + +func (h *EchoMessageHandler) OnOpen(session transport.Session) error { + log.Printf("连接建立: %s", session.RemoteAddr()) + return nil +} + +func (h *EchoMessageHandler) OnClose(session transport.Session) { + log.Printf("连接关闭: %s", session.RemoteAddr()) +} + +func (h *EchoMessageHandler) OnError(session transport.Session, err error) { + log.Printf("连接错误: %s, 错误: %v", session.RemoteAddr(), err) +} + +func (h *EchoMessageHandler) OnCron(session transport.Session) { + // 心跳检测 - 定期调用 + activeTime := session.GetActive() + if time.Since(activeTime) > 30*time.Second { + log.Printf("连接超时: %s", session.RemoteAddr()) + session.Close() + } +} + +func (h *EchoMessageHandler) OnMessage(session transport.Session, pkg interface{}) { + // 处理接收到的消息 + messageData := pkg.([]byte) + log.Printf("收到消息: %s", string(messageData)) + + // 处理业务逻辑并发送响应 + response := fmt.Sprintf("Echo: %s", string(messageData)) + session.WritePkg(response, time.Second*5) +} +``` + +**SetCronPeriod - 心跳配置** +```go +// SetCronPeriod 设置心跳检测周期,单位是毫秒 +// conf.heartbeatPeriod 是 time.Duration 类型(例如 5 * time.Second) +cronPeriod := int(conf.heartbeatPeriod.Nanoseconds() / 1e6) // 转换为毫秒 +session.SetCronPeriod(cronPeriod) // 设置 5000ms = 5 秒 + +// OnCron 方法将每 5 秒被调用一次进行心跳检测 +func (h *EchoMessageHandler) OnCron(session transport.Session) { + // 检查连接是否仍然活跃 + activeTime := session.GetActive() + if time.Since(activeTime) > 30*time.Second { + // 连接超时,关闭它 + session.Close() + } + + // 可选:发送心跳包 + // session.WritePkg("ping", time.Second) +} +``` + +#### 活跃时间更新机制 + +**自动活跃时间更新** +```go +// Getty 在以下情况下自动更新会话活跃时间: +// 1. 从网络接收数据时 +func (t *gettyTCPConn) recv(p []byte) (int, error) { + // ... 接收数据逻辑 + t.UpdateActive() // 自动调用 - 更新 GetActive() 值 + return length, err +} + +// 2. WebSocket ping/pong 帧(仅 WebSocket) +func (w *gettyWSConn) handlePing(message string) error { + w.UpdateActive() // 收到 ping 时更新 + return w.writePong([]byte(message)) +} + +func (w *gettyWSConn) handlePong(string) error { + w.UpdateActive() // 收到 pong 时更新 + return nil +} + +// 注意:TCP/UDP 的 Send() 方法不会自动调用 UpdateActive() +// 只有数据接收和 WebSocket ping/pong 会更新活跃时间 +``` + +**服务端心跳检测** +```go +// 服务端定期为每个会话自动调用 OnCron +func (h *ServerMessageHandler) OnCron(session transport.Session) { + // 获取最后活跃时间(在数据接收/发送时自动更新) + activeTime := session.GetActive() + idleTime := time.Since(activeTime) + + log.Printf("心跳检测: %s, 最后活跃: %v, 空闲: %v", + session.RemoteAddr(), activeTime, idleTime) + + // 检查是否超时 + if idleTime > 30*time.Second { + log.Printf("客户端超时,关闭连接: %s", session.RemoteAddr()) + session.Close() + } +} +``` + +**活跃时间更新时间线** +```go +// 示例时间线,显示 GetActive() 值何时变化: +// 00:00:00 - 连接建立,GetActive() = 2024-01-01 10:00:00 +// 00:00:05 - 客户端发送数据,GetActive() = 2024-01-01 10:00:05 +// 00:00:10 - 服务端发送响应,GetActive() = 2024-01-01 10:00:10 +// 00:00:15 - OnCron 被调用,检查空闲时间:5 秒 +// 00:00:20 - OnCron 被调用,检查空闲时间:10 秒 +// 00:00:30 - OnCron 被调用,检测到超时,关闭连接 +``` + +**关键要点:** +- **自动更新**:活跃时间在数据接收/发送时自动更新 +- **服务端检测**:服务端定期调用 OnCron 检查客户端活动 +- **无需客户端请求**:心跳检测是服务端发起的,不需要客户端请求 +- **实时监控**:GetActive() 反映真实的网络活动 + +#### 数据流处理 + +**接收数据流** +```go +// 1. 网络数据接收 +网络数据 → Getty 框架 → PkgHandler.Read() → EventListener.OnMessage() → 业务逻辑 + +// 2. 详细流程: +// 步骤 1: 接收原始网络数据 +func (t *gettyTCPConn) recv(p []byte) (int, error) { + // 来自网络的原始字节 + t.UpdateActive() // 更新活跃时间 + return length, err +} + +// 步骤 2: PkgHandler 解析数据为应用层数据包 +func (h *EchoPackageHandler) Read(session transport.Session, data []byte) (interface{}, int, error) { + // 将原始字节解析为应用层数据包 + // 返回: (解析出的数据包, 消费的字节数, 错误) + return parsedPacket, bytesConsumed, nil +} + +// 步骤 3: EventListener.OnMessage() 处理解析后的数据包 +func (h *EchoMessageHandler) OnMessage(session transport.Session, pkg interface{}) { + // 使用解析后的数据包处理业务逻辑 + // 发送响应回客户端 +} +``` + +**发送数据流** +```go +// 1. 业务逻辑生成响应 +业务逻辑 → PkgHandler.Write() → Getty 框架 → 网络 + +// 2. 详细流程: +// 步骤 1: 业务逻辑调用 WritePkg +session.WritePkg(response, timeout) + +// 步骤 2: PkgHandler 序列化应用数据 +func (h *EchoPackageHandler) Write(session transport.Session, pkg interface{}) ([]byte, error) { + // 将应用层数据包序列化为网络字节 + return serializedBytes, nil +} + +// 步骤 3: Getty 发送到网络 +func (t *gettyTCPConn) Send(pkg any) (int, error) { + // 发送序列化字节到网络 + return length, err +} +``` + +**完整数据流图** +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ 接收数据流 │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ 网络 → Getty → PkgHandler.Read() → EventListener.OnMessage() → 业务逻辑 │ +└─────────────────────────────────────────────────────────────────────────────┘ +┌─────────────────────────────────────────────────────────────────────────────┐ +│ 发送数据流 │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ 业务逻辑 → WritePkg() → PkgHandler.Write() → Getty → 网络 │ +└─────────────────────────────────────────────────────────────────────────────┘ +``` + +**处理顺序:** +1. **PkgHandler 优先**:处理协议层解析/序列化 +2. **EventListener 其次**:处理业务逻辑和事件 +3. **两个独立 goroutine**:一个负责读取,一个负责处理 + +**关键组件:** +- **PkgHandler**:实现 `ReadWriter` 接口,负责数据解析/序列化 +- **EventListener**:实现 `EventListener` 接口,负责业务逻辑 +- **OnMessage()**:`EventListener` 接口的方法,用于处理解析后的数据包 + +#### 数据发送 +- **`WritePkg(pkg any, timeout time.Duration)`**: 发送数据包,返回总字节数和成功发送字节数 +- **`WriteBytes([]byte)`**: 发送字节数据 +- **`WriteBytesArray(...[]byte)`**: 发送多个字节数组 + +#### 属性管理 +- **`GetAttribute(key any)`**: 获取会话属性 +- **`SetAttribute(key any, value any)`**: 设置会话属性 +- **`RemoveAttribute(key any)`**: 删除会话属性 + +#### 统计信息 +- **`Stat()`**: 获取会话统计信息(连接状态、读写字节数、包数量等) + +## Server 服务器管理 + +Getty 提供了多种类型的服务器实现,支持 TCP、UDP、WebSocket 和 WSS 协议。 + +### 服务器类型 + +#### TCP 服务器 +```go +// 创建 TCP 服务器 +server := getty.NewTCPServer( + getty.WithLocalAddress(":8080"), + getty.WithServerTaskPool(taskPool), +) +``` + +#### UDP 端点 +```go +// 创建 UDP 端点 +server := getty.NewUDPEndPoint( + getty.WithLocalAddress(":8080"), + getty.WithServerTaskPool(taskPool), +) +``` + +#### WebSocket 服务器 +```go +// 创建 WebSocket 服务器 +server := getty.NewWSServer( + getty.WithLocalAddress(":8080"), + getty.WithWebsocketServerPath("/ws"), + getty.WithServerTaskPool(taskPool), +) +``` + +#### 安全 WebSocket 服务器 +```go +// 创建 WSS 服务器 +server := getty.NewWSSServer( + getty.WithLocalAddress(":8443"), + getty.WithWebsocketServerPath("/wss"), + getty.WithWebsocketServerCert("cert.pem"), + getty.WithWebsocketServerPrivateKey("key.pem"), + getty.WithServerTaskPool(taskPool), +) +``` + +### 服务器接口 + +```go +type Server interface { + EndPoint +} + +type StreamServer interface { + Server + Listener() net.Listener +} + +type PacketServer interface { + Server + PacketConn() net.PacketConn +} +``` + +### 主要方法 + +- **`RunEventLoop(newSession NewSessionCallback)`**: 启动事件循环,处理客户端连接 +- **`Close()`**: 关闭服务器 +- **`IsClosed()`**: 检查服务器是否已关闭 +- **`ID()`**: 获取服务器ID +- **`EndPointType()`**: 获取端点类型 + +### 事件循环 + +服务器通过 `RunEventLoop` 方法启动事件循环: + +```go +func (s *server) RunEventLoop(newSession NewSessionCallback) { + if err := s.listen(); err != nil { + panic(fmt.Errorf("server.listen() = error:%+v", perrors.WithStack(err))) + } + + switch s.endPointType { + case TCP_SERVER: + s.runTCPEventLoop(newSession) + case UDP_ENDPOINT: + s.runUDPEventLoop(newSession) + case WS_SERVER: + s.runWSEventLoop(newSession) + case WSS_SERVER: + s.runWSSEventLoop(newSession) + default: + panic(fmt.Sprintf("illegal server type %s", s.endPointType.String())) + } +} +``` + +## Options 配置系统 + +Getty 使用函数式选项模式来配置服务器和客户端,提供了灵活的配置方式。 + +### Server Options 服务器选项 + +#### 基础配置 +- **`WithLocalAddress(addr string)`**: 设置服务器监听地址 +- **`WithServerTaskPool(pool GenericTaskPool)`**: 设置服务器任务池 + +#### WebSocket 配置 +- **`WithWebsocketServerPath(path string)`**: 设置 WebSocket 请求路径 +- **`WithWebsocketServerCert(cert string)`**: 设置服务器证书文件 +- **`WithWebsocketServerPrivateKey(key string)`**: 设置服务器私钥文件 +- **`WithWebsocketServerRootCert(cert string)`**: 设置根证书文件 + +#### TLS 配置 +- **`WithServerSslEnabled(sslEnabled bool)`**: 启用/禁用 SSL +- **`WithServerTlsConfigBuilder(builder TlsConfigBuilder)`**: 设置 TLS 配置构建器 + +### Client Options 客户端选项 + +#### 基础配置 +- **`WithServerAddress(addr string)`**: 设置服务器地址 +- **`WithConnectionNumber(num int)`**: 设置连接数量 +- **`WithClientTaskPool(pool GenericTaskPool)`**: 设置客户端任务池 + +#### 重连配置 +- **`WithReconnectInterval(interval int)`**: 设置重连间隔(纳秒) +- **`WithReconnectAttempts(maxAttempts int)`**: 设置最大重连次数 + +#### 证书配置 +- **`WithRootCertificateFile(cert string)`**: 设置根证书文件 +- **`WithClientSslEnabled(sslEnabled bool)`**: 启用/禁用客户端 SSL +- **`WithClientTlsConfigBuilder(builder TlsConfigBuilder)`**: 设置客户端 TLS 配置 + +### 配置示例 + +#### 服务器配置示例 +```go +// TCP 服务器配置 +server := getty.NewTCPServer( + getty.WithLocalAddress(":8080"), + getty.WithServerTaskPool(taskPool), +) + +// WebSocket 服务器配置 +wsServer := getty.NewWSServer( + getty.WithLocalAddress(":8080"), + getty.WithWebsocketServerPath("/ws"), + getty.WithServerTaskPool(taskPool), +) + +// WSS 服务器配置 +wssServer := getty.NewWSSServer( + getty.WithLocalAddress(":8443"), + getty.WithWebsocketServerPath("/wss"), + getty.WithWebsocketServerCert("server.crt"), + getty.WithWebsocketServerPrivateKey("server.key"), + getty.WithServerTaskPool(taskPool), +) +``` + +#### 客户端配置示例 +```go +// TCP 客户端配置 +client := getty.NewTCPClient( + getty.WithServerAddress("127.0.0.1:8080"), + getty.WithConnectionNumber(1), + getty.WithReconnectInterval(5e8), // 500ms + getty.WithReconnectAttempts(3), + getty.WithClientTaskPool(taskPool), +) + +// WebSocket 客户端配置 +wsClient := getty.NewWSClient( + getty.WithServerAddress("127.0.0.1:8080"), + getty.WithConnectionNumber(1), + getty.WithReconnectInterval(5e8), + getty.WithClientTaskPool(taskPool), +) +``` + +## TCP 服务器示例 + +以下是一个完整的 TCP 服务器示例,展示了如何使用 Getty 框架: + +```go +package main + +import ( + "fmt" + "log" + "time" + "github.com/AlexStocks/getty/transport" + gxsync "github.com/dubbogo/gost/sync" +) + +// EchoPackageHandler 实现数据包处理 +type EchoPackageHandler struct{} + +func (h *EchoPackageHandler) Read(session transport.Session, data []byte) (interface{}, int, error) { + // 简单的长度前缀协议 + if len(data) < 4 { + return nil, 0, nil + } + + length := int(data[0])<<24 | int(data[1])<<16 | int(data[2])<<8 | int(data[3]) + if len(data) < 4+length { + return nil, 0, nil + } + + return data[4:4+length], 4 + length, nil +} + +func (h *EchoPackageHandler) Write(session transport.Session, pkg interface{}) ([]byte, error) { + data := []byte(fmt.Sprintf("%v", pkg)) + length := len(data) + header := []byte{ + byte(length >> 24), + byte(length >> 16), + byte(length >> 8), + byte(length), + } + return append(header, data...), nil +} + +// EchoMessageHandler 实现事件处理 +type EchoMessageHandler struct{} + +func (h *EchoMessageHandler) OnOpen(session transport.Session) error { + log.Printf("新连接: %s", session.RemoteAddr()) + return nil +} + +func (h *EchoMessageHandler) OnClose(session transport.Session) { + log.Printf("连接关闭: %s", session.RemoteAddr()) +} + +func (h *EchoMessageHandler) OnError(session transport.Session, err error) { + log.Printf("连接错误: %s, 错误: %v", session.RemoteAddr(), err) +} + +func (h *EchoMessageHandler) OnCron(session transport.Session) { + // 心跳检测 + activeTime := session.GetActive() + if time.Since(activeTime) > 30*time.Second { + session.Close() + } +} + +func (h *EchoMessageHandler) OnMessage(session transport.Session, pkg interface{}) { + messageData := pkg.([]byte) + log.Printf("收到消息: %s", string(messageData)) + + // 回显消息 + response := fmt.Sprintf("Echo: %s", string(messageData)) + session.WritePkg(response, time.Second*5) +} + +// 新连接回调 +func newSession(session transport.Session) error { + session.SetName("tcp-echo-session") + session.SetMaxMsgLen(4096) + session.SetReadTimeout(time.Second * 10) + session.SetWriteTimeout(time.Second * 10) + session.SetCronPeriod(5) + session.SetWaitTime(time.Second * 3) + + session.SetPkgHandler(&EchoPackageHandler{}) + session.SetEventListener(&EchoMessageHandler{}) + + session.AddCloseCallback("cleanup", "resources", func() { + log.Printf("清理资源: %s", session.RemoteAddr()) + }) + + return nil +} + +func main() { + // 创建任务池 + taskPool := gxsync.NewTaskPoolSimple(0) + defer taskPool.Close() + + // 创建 TCP 服务器 + server := transport.NewTCPServer( + transport.WithLocalAddress(":8080"), + transport.WithServerTaskPool(taskPool), + ) + + // 启动服务器 + log.Println("TCP 服务器启动在 :8080") + server.RunEventLoop(newSession) +} +``` + +## 框架架构图 + +Getty 框架采用分层架构设计,从上到下分为应用层、Getty 核心层和网络层: + +``` +┌─────────────────────────────────────────────────────────────┐ +│ 应用层 (Application Layer) │ +├─────────────────────────────────────────────────────────────┤ +│ Application Code │ Message Handler │ Codec/ReadWriter │ +└─────────────────────────────────────────────────────────────┘ +┌─────────────────────────────────────────────────────────────┐ +│ Getty 核心层 (Core Layer) │ +├─────────────────────────────────────────────────────────────┤ +│ Session Management │ Server Management │ Client Management │ +│ Connection Mgmt │ Event System │ Options & Config │ +└─────────────────────────────────────────────────────────────┘ +┌─────────────────────────────────────────────────────────────┐ +│ 网络层 (Network Layer) │ +├─────────────────────────────────────────────────────────────┤ +│ TCP Protocol │ UDP Protocol │ WebSocket Protocol │ +│ TLS/SSL │ │ │ +└─────────────────────────────────────────────────────────────┘ +``` + +### 核心组件关系 + +1. **Session** 是核心组件,管理连接生命周期 +2. **Server/Client** 提供不同协议的端点实现 +3. **Connection** 封装底层网络连接 +4. **EventListener** 处理各种事件 +5. **Options** 提供灵活的配置方式 + ## 许可证 Apache 许可证 2.0