From 794a031863b3c6e93f2d5951af4e63cf87002a54 Mon Sep 17 00:00:00 2001 From: mazheng Date: Tue, 23 Sep 2025 18:25:40 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 473 ++++++++++++++++++++++++++++++++++++++++++++++++-- README_CN.md | 474 +++++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 919 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 9cf3136f..74215623 100644 --- a/README.md +++ b/README.md @@ -20,18 +20,472 @@ If you're using WebSocket, you don't need to worry about heartbeat request/respo For code examples, you can refer to [getty-examples](https://github.com/AlexStocks/getty-examples). -## Callback System +## Network Transmission + +In network communication, the data transmission interface of getty does not guarantee that data will be sent successfully; it lacks an internal retry mechanism. Instead, getty delegates the outcome of data transmission to the underlying operating system mechanism. Under this mechanism, if data is successfully transmitted, it is considered a success; if transmission fails, it is regarded as a failure. These outcomes are then communicated back to the upper-layer caller. + +Upper-layer callers need to determine whether to incorporate a retry mechanism based on these outcomes. This implies that when data transmission fails, upper-layer callers must handle the situation differently depending on the circumstances. For instance, if the failure is due to a disconnect in the connection, upper-layer callers can attempt to resend the data based on the result of getty's automatic reconnection. Alternatively, if the failure is caused by the sending buffer of the underlying operating system being full, the sender can implement its own retry mechanism to wait for the sending buffer to become available before attempting another transmission. + +In summary, the data transmission interface of getty does not come with an inherent retry mechanism; instead, it is up to upper-layer callers to decide whether to implement retry logic based on specific situations. This design approach provides developers with greater flexibility in controlling the behavior of data transmission. + +## Framework Architecture + +Getty framework adopts a layered architecture design, from top to bottom: Application Layer, Getty Core Layer, and Network Layer: + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Application Layer │ +├─────────────────────────────────────────────────────────────┤ +│ Application Code │ Message Handler │ Codec/ReadWriter │ +└─────────────────────────────────────────────────────────────┘ +┌─────────────────────────────────────────────────────────────┐ +│ Getty Core Layer │ +├─────────────────────────────────────────────────────────────┤ +│ Session Management │ Server Management │ Client Management │ +│ Connection Mgmt │ Event System │ Options & Config │ +└─────────────────────────────────────────────────────────────┘ +┌─────────────────────────────────────────────────────────────┐ +│ Network Layer │ +├─────────────────────────────────────────────────────────────┤ +│ TCP Protocol │ UDP Protocol │ WebSocket Protocol │ +│ TLS/SSL │ │ │ +└─────────────────────────────────────────────────────────────┘ +``` + +### Core Component Relationships + +1. **Session** is the core component, managing connection lifecycle +2. **Server/Client** provides endpoint implementations for different protocols +3. **Connection** encapsulates underlying network connections +4. **EventListener** handles various events +5. **Options** provides flexible configuration + +## Data Flow Processing + +#### Complete Data Flow Diagram + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ Incoming Data Flow │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ Network → Getty → PkgHandler.Read() → EventListener.OnMessage() → Logic │ +└─────────────────────────────────────────────────────────────────────────────┘ +┌─────────────────────────────────────────────────────────────────────────────┐ +│ Outgoing Data Flow │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ Logic → WritePkg() → PkgHandler.Write() → Getty → Network │ +└─────────────────────────────────────────────────────────────────────────────┘ +``` + +**Processing Order:** +1. **PkgHandler first**: Handles protocol-level parsing/serialization +2. **EventListener second**: Handles business logic and events +3. **Two separate goroutines**: One for reading, one for processing + +**Key Components:** +- **PkgHandler**: Implements `ReadWriter` interface for data parsing/serialization +- **EventListener**: Implements `EventListener` interface for business logic +- **OnMessage()**: Method of `EventListener` interface for processing parsed packets + +## Quick Start + +### TCP Server Example + +Here's a simplified TCP server example demonstrating Getty framework's core usage: + +```go +package main + +import ( + "fmt" + "log" + "time" + "github.com/AlexStocks/getty/transport" + gxsync "github.com/dubbogo/gost/sync" +) + +// Packet handler - responsible for packet serialization/deserialization +type EchoPackageHandler struct{} + +// Deserialize: parse network byte stream into application packets +func (h *EchoPackageHandler) Read(session transport.Session, data []byte) (interface{}, int, error) { + // Pseudo code: implement length-prefixed protocol + // 1. Check if there's enough data to read length header (4 bytes) + if len(data) < 4 { + return nil, 0, nil // Need more data + } + + // 2. Parse packet length + length := int(data[0])<<24 | int(data[1])<<16 | int(data[2])<<8 | int(data[3]) + + // 3. Check if we have complete packet + if len(data) < 4+length { + return nil, 0, nil // Incomplete packet, wait for more data + } + + // 4. Return parsed packet and bytes consumed + return data[4:4+length], 4 + length, nil +} + +// Serialize: convert application packets to network byte stream +func (h *EchoPackageHandler) Write(session transport.Session, pkg interface{}) ([]byte, error) { + // Pseudo code: implement length-prefixed protocol + // 1. Convert application data to bytes + data := []byte(fmt.Sprintf("%v", pkg)) + + // 2. Build length header (4 bytes) + length := len(data) + header := []byte{ + byte(length >> 24), byte(length >> 16), + byte(length >> 8), byte(length), + } + + // 3. Return complete network packet + return append(header, data...), nil +} + +// Event handler - responsible for business logic +type EchoMessageHandler struct{} + +// Called when connection is established +func (h *EchoMessageHandler) OnOpen(session transport.Session) error { + log.Printf("New connection: %s", session.RemoteAddr()) + return nil +} + +// Called when connection is closed +func (h *EchoMessageHandler) OnClose(session transport.Session) { + log.Printf("Connection closed: %s", session.RemoteAddr()) +} + +// Called when error occurs +func (h *EchoMessageHandler) OnError(session transport.Session, err error) { + log.Printf("Connection error: %s, error: %v", session.RemoteAddr(), err) +} + +// Heartbeat detection - called periodically +func (h *EchoMessageHandler) OnCron(session transport.Session) { + activeTime := session.GetActive() + if time.Since(activeTime) > 30*time.Second { + log.Printf("Connection timeout, closing: %s", session.RemoteAddr()) + session.Close() + } +} + +// Called when message is received - core business logic +func (h *EchoMessageHandler) OnMessage(session transport.Session, pkg interface{}) { + messageData := pkg.([]byte) + log.Printf("Received message: %s", string(messageData)) + + // Business logic: echo message + response := fmt.Sprintf("Echo: %s", string(messageData)) + session.WritePkg(response, time.Second*5) +} + +// New connection callback - configure session +func newSession(session transport.Session) error { + // Basic configuration + session.SetName("tcp-echo-session") + session.SetMaxMsgLen(4096) + session.SetReadTimeout(time.Second * 10) + session.SetWriteTimeout(time.Second * 10) + session.SetCronPeriod(5) // 5 second heartbeat detection + session.SetWaitTime(time.Second * 3) + + // Set handlers + session.SetPkgHandler(&EchoPackageHandler{}) // Packet handler + session.SetEventListener(&EchoMessageHandler{}) // Event handler + + // Add close callback + session.AddCloseCallback("cleanup", "resources", func() { + log.Printf("Cleaning up resources: %s", session.RemoteAddr()) + }) + + return nil +} + +func main() { + // Create task pool (for concurrent message processing) + taskPool := gxsync.NewTaskPoolSimple(0) + defer taskPool.Close() + + // Create TCP server + server := transport.NewTCPServer( + transport.WithLocalAddress(":8080"), // Listen address + transport.WithServerTaskPool(taskPool), // Task pool + ) + + // Start server + log.Println("TCP server starting on :8080") + server.RunEventLoop(newSession) // Start event loop +} +``` + +## Core Concepts + +### Session Management + +Session is the core component of the Getty framework, responsible for managing connection sessions between clients and servers. Each connection corresponds to a Session instance, providing complete connection lifecycle management. + +#### Session Interface + +```go +type Session interface { + Connection + Reset() + Conn() net.Conn + Stat() string + IsClosed() bool + EndPoint() EndPoint + SetMaxMsgLen(int) + SetName(string) + SetEventListener(EventListener) + SetPkgHandler(ReadWriter) + SetReader(Reader) + SetWriter(Writer) + SetCronPeriod(int) + SetWaitTime(time.Duration) + GetAttribute(any) any + SetAttribute(any, any) + RemoveAttribute(any) + WritePkg(pkg any, timeout time.Duration) (totalBytesLength int, sendBytesLength int, err error) + WriteBytes([]byte) (int, error) + WriteBytesArray(...[]byte) (int, error) + Close() + AddCloseCallback(handler, key any, callback CallBackFunc) + RemoveCloseCallback(handler, key any) +} +``` + +#### Key Methods + +**Connection Management** +- **`Conn()`**: Get the underlying network connection object +- **`IsClosed()`**: Check if the session is closed +- **`Close()`**: Close the session connection +- **`Reset()`**: Reset session state + +**Configuration Settings** +- **`SetName(string)`**: Set session name +- **`SetMaxMsgLen(int)`**: Set maximum message length +- **`SetCronPeriod(int)`**: Set heartbeat detection period (milliseconds) +- **`SetWaitTime(time.Duration)`**: Set wait timeout + +**Handler Settings** +- **`SetEventListener(EventListener)`**: Set event listener for handling connection lifecycle events +- **`SetPkgHandler(ReadWriter)`**: Set packet handler for parsing and serializing network data +- **`SetReader(Reader)`**: Set data reader for custom data parsing +- **`SetWriter(Writer)`**: Set data writer for custom data serialization + +**Data Transmission** +- **`WritePkg(pkg any, timeout time.Duration)`**: Send data packet, returns total bytes and successfully sent bytes +- **`WriteBytes([]byte)`**: Send byte data +- **`WriteBytesArray(...[]byte)`**: Send multiple byte arrays + +**Attribute Management** +- **`GetAttribute(key any)`**: Get session attribute +- **`SetAttribute(key any, value any)`**: Set session attribute +- **`RemoveAttribute(key any)`**: Remove session attribute + +**Statistics** +- **`Stat()`**: Get session statistics (connection status, read/write bytes, packet count, etc.) + +#### Active Time Update Mechanism + +**Automatic Active Time Updates** +```go +// Getty automatically updates session active time when: +// 1. Receiving data from network +func (t *gettyTCPConn) recv(p []byte) (int, error) { + // ... receive data logic + t.UpdateActive() // Automatically called - updates GetActive() value + return length, err +} + +// 2. WebSocket ping/pong frames (WebSocket only) +func (w *gettyWSConn) handlePing(message string) error { + w.UpdateActive() // Updates when receiving ping + return w.writePong([]byte(message)) +} + +func (w *gettyWSConn) handlePong(string) error { + w.UpdateActive() // Updates when receiving pong + return nil +} + +// Note: TCP/UDP Send() methods do NOT automatically call UpdateActive() +// Only data reception and WebSocket ping/pong update active time +``` + +**Server-Side Heartbeat Detection** +```go +// Server automatically calls OnCron periodically for each session +func (h *ServerMessageHandler) OnCron(session transport.Session) { + // Get last active time (automatically updated on data receive/send) + activeTime := session.GetActive() + idleTime := time.Since(activeTime) + + log.Printf("Heartbeat check: %s, last active: %v, idle: %v", + session.RemoteAddr(), activeTime, idleTime) + + // Check for timeout + if idleTime > 30*time.Second { + log.Printf("Client timeout, closing connection: %s", session.RemoteAddr()) + session.Close() + } +} +``` + +**Active Time Update Timeline** +```go +// Example timeline showing when GetActive() values change: +// 00:00:00 - Connection established, GetActive() = 2024-01-01 10:00:00 +// 00:00:05 - Client sends data, GetActive() = 2024-01-01 10:00:05 +// 00:00:10 - Server sends response, GetActive() = 2024-01-01 10:00:10 +// 00:00:15 - OnCron called, checks idle time: 5 seconds +// 00:00:20 - OnCron called, checks idle time: 10 seconds +// 00:00:30 - OnCron called, detects timeout, closes connection +``` + +**Key Points:** +- **Automatic Updates**: Active time is updated automatically on data receive/send +- **Server-Side Detection**: Server calls OnCron periodically to check client activity +- **No Client Request Needed**: Heartbeat detection is server-initiated, not client-requested +- **Real-Time Monitoring**: GetActive() reflects actual network activity + +### Server Management + +Getty provides multiple types of server implementations, supporting TCP, UDP, WebSocket, and WSS protocols. + +#### TCP Server + +```go +// Create TCP server +server := getty.NewTCPServer( + getty.WithLocalAddress(":8080"), // Listen address + getty.WithServerTaskPool(taskPool), // Task pool +) +``` + +#### Server Interface + +```go +type Server interface { + EndPoint +} + +type StreamServer interface { + Server + Listener() net.Listener +} + +type PacketServer interface { + Server + PacketConn() net.PacketConn +} +``` + +#### Key Methods + +- **`RunEventLoop(newSession NewSessionCallback)`**: Start event loop to handle client connections +- **`Close()`**: Close the server +- **`IsClosed()`**: Check if the server is closed +- **`ID()`**: Get server ID +- **`EndPointType()`**: Get endpoint type + +#### Event Loop + +The server starts the event loop through the `RunEventLoop` method: + +```go +func (s *server) RunEventLoop(newSession NewSessionCallback) { + if err := s.listen(); err != nil { + panic(fmt.Errorf("server.listen() = error:%+v", perrors.WithStack(err))) + } + + switch s.endPointType { + case TCP_SERVER: + s.runTCPEventLoop(newSession) + case UDP_ENDPOINT: + s.runUDPEventLoop(newSession) + case WS_SERVER: + s.runWSEventLoop(newSession) + case WSS_SERVER: + s.runWSSEventLoop(newSession) + default: + panic(fmt.Sprintf("illegal server type %s", s.endPointType.String())) + } +} +``` + +### Options Configuration System + +Getty uses functional options pattern to configure servers and clients, providing flexible configuration. + +#### Server Options + +**Basic Configuration** +- **`WithLocalAddress(addr string)`**: Set server listen address +- **`WithServerTaskPool(pool GenericTaskPool)`**: Set server task pool + +**WebSocket Configuration** +- **`WithWebsocketServerPath(path string)`**: Set WebSocket request path +- **`WithWebsocketServerCert(cert string)`**: Set server certificate file +- **`WithWebsocketServerPrivateKey(key string)`**: Set server private key file +- **`WithWebsocketServerRootCert(cert string)`**: Set root certificate file + +**TLS Configuration** +- **`WithServerSslEnabled(sslEnabled bool)`**: Enable/disable SSL +- **`WithServerTlsConfigBuilder(builder TlsConfigBuilder)`**: Set TLS config builder + +#### Client Options + +**Basic Configuration** +- **`WithServerAddress(addr string)`**: Set server address +- **`WithConnectionNumber(num int)`**: Set connection number +- **`WithClientTaskPool(pool GenericTaskPool)`**: Set client task pool + +**Reconnection Configuration** +- **`WithReconnectInterval(interval int)`**: Set reconnection interval (nanoseconds) +- **`WithReconnectAttempts(maxAttempts int)`**: Set maximum reconnection attempts + +**Certificate Configuration** +- **`WithRootCertificateFile(cert string)`**: Set root certificate file +- **`WithClientSslEnabled(sslEnabled bool)`**: Enable/disable client SSL +- **`WithClientTlsConfigBuilder(builder TlsConfigBuilder)`**: Set client TLS config + +#### Configuration Examples + +**TCP Server Configuration** +```go +// Create task pool +taskPool := gxsync.NewTaskPoolSimple(0) +defer taskPool.Close() + +// TCP server configuration +server := getty.NewTCPServer( + getty.WithLocalAddress(":8080"), // Listen address + getty.WithServerTaskPool(taskPool), // Task pool +) + +// Start server +server.RunEventLoop(newSession) +``` + +## Advanced Features + +### Callback System Getty provides a robust callback system that allows you to register and manage callback functions for session lifecycle events. This is particularly useful for cleanup operations, resource management, and custom event handling. -### Key Features +#### Key Features - **Thread-safe operations**: All callback operations are protected by mutex locks - **Replace semantics**: Adding with the same (handler, key) replaces the existing callback in place (position preserved) - **Panic safety**: During session close, callbacks run in a dedicated goroutine with defer/recover; panics are logged with stack traces and do not escape the close path - **Ordered execution**: Callbacks are executed in the order they were added -### Usage Example +#### Usage Example ```go // Add a close callback @@ -49,13 +503,13 @@ session.RemoveCloseCallback("cleanup", "resources") **Note**: During session shutdown, callbacks are executed sequentially in a dedicated goroutine to preserve add-order, with defer/recover to log panics without letting them escape the close path. -### Callback Management +#### Callback Management - **AddCloseCallback**: Register a callback to be executed when the session closes - **RemoveCloseCallback**: Remove a previously registered callback (no-op if not found; safe to call multiple times) - **Thread Safety**: All operations are thread-safe and can be called concurrently -### Type Requirements +#### Type Requirements The `handler` and `key` parameters must be **comparable types** that support the `==` operator: @@ -86,15 +540,6 @@ session.AddCloseCallback(map[string]int{"a": 1}, "key", callback) // Logged and session.AddCloseCallback([]int{1, 2, 3}, "key", callback) // Logged and ignored ``` -## About network transmission in getty - -In network communication, the data transmission interface of getty does not guarantee that data will be sent successfully; it lacks an internal retry mechanism. Instead, getty delegates the outcome of data transmission to the underlying operating system mechanism. Under this mechanism, if data is successfully transmitted, it is considered a success; if transmission fails, it is regarded as a failure. These outcomes are then communicated back to the upper-layer caller. - -Upper-layer callers need to determine whether to incorporate a retry mechanism based on these outcomes. This implies that when data transmission fails, upper-layer callers must handle the situation differently depending on the circumstances. For instance, if the failure is due to a disconnect in the connection, upper-layer callers can attempt to resend the data based on the result of getty's automatic reconnection. Alternatively, if the failure is caused by the sending buffer of the underlying operating system being full, the sender can implement its own retry mechanism to wait for the sending buffer to become available before attempting another transmission. - -In summary, the data transmission interface of getty does not come with an inherent retry mechanism; instead, it is up to upper-layer callers to decide whether to implement retry logic based on specific situations. This design approach provides developers with greater flexibility in controlling the behavior of data transmission. - ## LICENCE Apache License 2.0 - diff --git a/README_CN.md b/README_CN.md index c88ab627..da526aa8 100644 --- a/README_CN.md +++ b/README_CN.md @@ -20,18 +20,472 @@ Getty 是一个使用 Golang 开发的异步网络 I/O 库。它适用于 TCP、 有关代码示例,请参阅 [AlexStocks/getty-examples](https://github.com/AlexStocks/getty-examples)。 -## 回调系统 +## 网络传输机制 + +在网络通信中,Getty 的数据传输接口并不保证数据一定会成功发送,它缺乏内部的重试机制。相反,Getty 将数据传输的结果委托给底层操作系统机制处理。在这种机制下,如果数据成功传输,将被视为成功;如果传输失败,则被视为失败。这些结果随后会传递给上层调用者。 + +上层调用者需要根据这些结果决定是否加入重试机制。这意味着当数据传输失败时,上层调用者必须根据情况采取不同的处理方式。例如,如果失败是由于连接断开导致的,上层调用者可以尝试基于 Getty 的自动重新连接结果重新发送数据。另外,如果失败是因为底层操作系统的发送缓冲区已满,发送者可以自行实现重试机制,在再次尝试传输之前等待发送缓冲区可用。 + +总之,Getty 的数据传输接口并不自带内部的重试机制;相反,是否在特定情况下实现重试逻辑由上层调用者决定。这种设计方法为开发者在控制数据传输行为方面提供了更大的灵活性。 + +## 框架架构图 + +Getty 框架采用分层架构设计,从上到下分为应用层、Getty 核心层和网络层: + +``` +┌─────────────────────────────────────────────────────────────┐ +│ 应用层 (Application Layer) │ +├─────────────────────────────────────────────────────────────┤ +│ Application Code │ Message Handler │ Codec/ReadWriter │ +└─────────────────────────────────────────────────────────────┘ +┌─────────────────────────────────────────────────────────────┐ +│ Getty 核心层 (Core Layer) │ +├─────────────────────────────────────────────────────────────┤ +│ Session Management │ Server Management │ Client Management │ +│ Connection Mgmt │ Event System │ Options & Config │ +└─────────────────────────────────────────────────────────────┘ +┌─────────────────────────────────────────────────────────────┐ +│ 网络层 (Network Layer) │ +├─────────────────────────────────────────────────────────────┤ +│ TCP Protocol │ UDP Protocol │ WebSocket Protocol │ +│ TLS/SSL │ │ │ +└─────────────────────────────────────────────────────────────┘ +``` + +### 核心组件关系 + +1. **Session** 是核心组件,管理连接生命周期 +2. **Server/Client** 提供不同协议的端点实现 +3. **Connection** 封装底层网络连接 +4. **EventListener** 处理各种事件 +5. **Options** 提供灵活的配置方式 + +## 数据流处理 + +#### 完整数据流图 + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ 接收数据流 │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ 网络 → Getty → PkgHandler.Read() → EventListener.OnMessage() → 业务逻辑 │ +└─────────────────────────────────────────────────────────────────────────────┘ +┌─────────────────────────────────────────────────────────────────────────────┐ +│ 发送数据流 │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ 业务逻辑 → WritePkg() → PkgHandler.Write() → Getty → 网络 │ +└─────────────────────────────────────────────────────────────────────────────┘ +``` + +**处理顺序:** +1. **PkgHandler 优先**:处理协议层解析/序列化 +2. **EventListener 其次**:处理业务逻辑和事件 +3. **两个独立 goroutine**:一个负责读取,一个负责处理 + +**关键组件:** +- **PkgHandler**:实现 `ReadWriter` 接口,负责数据解析/序列化 +- **EventListener**:实现 `EventListener` 接口,负责业务逻辑 +- **OnMessage()**:`EventListener` 接口的方法,用于处理解析后的数据包 + +## 快速开始 + +### TCP 服务器示例 + +以下是一个简化的 TCP 服务器示例,展示 Getty 框架的核心用法: + +```go +package main + +import ( + "fmt" + "log" + "time" + "github.com/AlexStocks/getty/transport" + gxsync "github.com/dubbogo/gost/sync" +) + +// 数据包处理器 - 负责封包/解包 +type EchoPackageHandler struct{} + +// 解包:将网络字节流解析为应用层数据包 +func (h *EchoPackageHandler) Read(session transport.Session, data []byte) (interface{}, int, error) { + // 伪代码:实现长度前缀协议 + // 1. 检查是否有足够的数据读取长度头(4字节) + if len(data) < 4 { + return nil, 0, nil // 数据不足,等待更多数据 + } + + // 2. 解析数据包长度 + length := int(data[0])<<24 | int(data[1])<<16 | int(data[2])<<8 | int(data[3]) + + // 3. 检查是否有完整的数据包 + if len(data) < 4+length { + return nil, 0, nil // 数据包不完整,等待更多数据 + } + + // 4. 返回解析出的数据包和消费的字节数 + return data[4:4+length], 4 + length, nil +} + +// 封包:将应用层数据包序列化为网络字节流 +func (h *EchoPackageHandler) Write(session transport.Session, pkg interface{}) ([]byte, error) { + // 伪代码:实现长度前缀协议 + // 1. 将应用数据转换为字节 + data := []byte(fmt.Sprintf("%v", pkg)) + + // 2. 构建长度头(4字节) + length := len(data) + header := []byte{ + byte(length >> 24), byte(length >> 16), + byte(length >> 8), byte(length), + } + + // 3. 返回完整的网络数据包 + return append(header, data...), nil +} + +// 事件处理器 - 负责业务逻辑 +type EchoMessageHandler struct{} + +// 连接建立时调用 +func (h *EchoMessageHandler) OnOpen(session transport.Session) error { + log.Printf("新连接: %s", session.RemoteAddr()) + return nil +} + +// 连接关闭时调用 +func (h *EchoMessageHandler) OnClose(session transport.Session) { + log.Printf("连接关闭: %s", session.RemoteAddr()) +} + +// 发生错误时调用 +func (h *EchoMessageHandler) OnError(session transport.Session, err error) { + log.Printf("连接错误: %s, 错误: %v", session.RemoteAddr(), err) +} + +// 心跳检测 - 定期调用 +func (h *EchoMessageHandler) OnCron(session transport.Session) { + activeTime := session.GetActive() + if time.Since(activeTime) > 30*time.Second { + log.Printf("连接超时,关闭: %s", session.RemoteAddr()) + session.Close() + } +} + +// 收到消息时调用 - 核心业务逻辑 +func (h *EchoMessageHandler) OnMessage(session transport.Session, pkg interface{}) { + messageData := pkg.([]byte) + log.Printf("收到消息: %s", string(messageData)) + + // 业务逻辑:回显消息 + response := fmt.Sprintf("Echo: %s", string(messageData)) + session.WritePkg(response, time.Second*5) +} + +// 新连接回调 - 配置会话 +func newSession(session transport.Session) error { + // 基础配置 + session.SetName("tcp-echo-session") + session.SetMaxMsgLen(4096) + session.SetReadTimeout(time.Second * 10) + session.SetWriteTimeout(time.Second * 10) + session.SetCronPeriod(5) // 5秒心跳检测 + session.SetWaitTime(time.Second * 3) + + // 设置处理器 + session.SetPkgHandler(&EchoPackageHandler{}) // 数据包处理器 + session.SetEventListener(&EchoMessageHandler{}) // 事件处理器 + + // 添加关闭回调 + session.AddCloseCallback("cleanup", "resources", func() { + log.Printf("清理资源: %s", session.RemoteAddr()) + }) + + return nil +} + +func main() { + // 创建任务池(用于并发处理消息) + taskPool := gxsync.NewTaskPoolSimple(0) + defer taskPool.Close() + + // 创建 TCP 服务器 + server := transport.NewTCPServer( + transport.WithLocalAddress(":8080"), // 监听地址 + transport.WithServerTaskPool(taskPool), // 任务池 + ) + + // 启动服务器 + log.Println("TCP 服务器启动在 :8080") + server.RunEventLoop(newSession) // 开始事件循环 +} +``` + +## 核心概念 + +### Session 会话管理 + +Session 是 Getty 框架的核心组件,负责管理客户端与服务器之间的连接会话。每个连接对应一个 Session 实例,提供完整的连接生命周期管理。 + +#### Session 接口 + +```go +type Session interface { + Connection + Reset() + Conn() net.Conn + Stat() string + IsClosed() bool + EndPoint() EndPoint + SetMaxMsgLen(int) + SetName(string) + SetEventListener(EventListener) + SetPkgHandler(ReadWriter) + SetReader(Reader) + SetWriter(Writer) + SetCronPeriod(int) + SetWaitTime(time.Duration) + GetAttribute(any) any + SetAttribute(any, any) + RemoveAttribute(any) + WritePkg(pkg any, timeout time.Duration) (totalBytesLength int, sendBytesLength int, err error) + WriteBytes([]byte) (int, error) + WriteBytesArray(...[]byte) (int, error) + Close() + AddCloseCallback(handler, key any, callback CallBackFunc) + RemoveCloseCallback(handler, key any) +} +``` + +#### 主要方法说明 + +**连接管理** +- **`Conn()`**: 获取底层网络连接对象 +- **`IsClosed()`**: 检查会话是否已关闭 +- **`Close()`**: 关闭会话连接 +- **`Reset()`**: 重置会话状态 + +**配置设置** +- **`SetName(string)`**: 设置会话名称 +- **`SetMaxMsgLen(int)`**: 设置最大消息长度 +- **`SetCronPeriod(int)`**: 设置心跳检测周期(毫秒) +- **`SetWaitTime(time.Duration)`**: 设置等待超时时间 + +**处理器设置** +- **`SetEventListener(EventListener)`**: 设置事件监听器,处理连接生命周期事件 +- **`SetPkgHandler(ReadWriter)`**: 设置数据包处理器,负责解析和序列化网络数据 +- **`SetReader(Reader)`**: 设置数据读取器,用于自定义数据解析 +- **`SetWriter(Writer)`**: 设置数据写入器,用于自定义数据序列化 + +**数据发送** +- **`WritePkg(pkg any, timeout time.Duration)`**: 发送数据包,返回总字节数和成功发送字节数 +- **`WriteBytes([]byte)`**: 发送字节数据 +- **`WriteBytesArray(...[]byte)`**: 发送多个字节数组 + +**属性管理** +- **`GetAttribute(key any)`**: 获取会话属性 +- **`SetAttribute(key any, value any)`**: 设置会话属性 +- **`RemoveAttribute(key any)`**: 删除会话属性 + +**统计信息** +- **`Stat()`**: 获取会话统计信息(连接状态、读写字节数、包数量等) + +#### 活跃时间更新机制 + +**自动活跃时间更新** +```go +// Getty 在以下情况下自动更新会话活跃时间: +// 1. 从网络接收数据时 +func (t *gettyTCPConn) recv(p []byte) (int, error) { + // ... 接收数据逻辑 + t.UpdateActive() // 自动调用 - 更新 GetActive() 值 + return length, err +} + +// 2. WebSocket ping/pong 帧(仅 WebSocket) +func (w *gettyWSConn) handlePing(message string) error { + w.UpdateActive() // 收到 ping 时更新 + return w.writePong([]byte(message)) +} + +func (w *gettyWSConn) handlePong(string) error { + w.UpdateActive() // 收到 pong 时更新 + return nil +} + +// 注意:TCP/UDP 的 Send() 方法不会自动调用 UpdateActive() +// 只有数据接收和 WebSocket ping/pong 会更新活跃时间 +``` + +**服务端心跳检测** +```go +// 服务端定期为每个会话自动调用 OnCron +func (h *ServerMessageHandler) OnCron(session transport.Session) { + // 获取最后活跃时间(在数据接收/发送时自动更新) + activeTime := session.GetActive() + idleTime := time.Since(activeTime) + + log.Printf("心跳检测: %s, 最后活跃: %v, 空闲: %v", + session.RemoteAddr(), activeTime, idleTime) + + // 检查是否超时 + if idleTime > 30*time.Second { + log.Printf("客户端超时,关闭连接: %s", session.RemoteAddr()) + session.Close() + } +} +``` + +**活跃时间更新时间线** +```go +// 示例时间线,显示 GetActive() 值何时变化: +// 00:00:00 - 连接建立,GetActive() = 2024-01-01 10:00:00 +// 00:00:05 - 客户端发送数据,GetActive() = 2024-01-01 10:00:05 +// 00:00:10 - 服务端发送响应,GetActive() = 2024-01-01 10:00:10 +// 00:00:15 - OnCron 被调用,检查空闲时间:5 秒 +// 00:00:20 - OnCron 被调用,检查空闲时间:10 秒 +// 00:00:30 - OnCron 被调用,检测到超时,关闭连接 +``` + +**关键要点:** +- **自动更新**:活跃时间在数据接收/发送时自动更新 +- **服务端检测**:服务端定期调用 OnCron 检查客户端活动 +- **无需客户端请求**:心跳检测是服务端发起的,不需要客户端请求 +- **实时监控**:GetActive() 反映真实的网络活动 + +### Server 服务器管理 + +Getty 提供了多种类型的服务器实现,支持 TCP、UDP、WebSocket 和 WSS 协议。 + +#### TCP 服务器 + +```go +// 创建 TCP 服务器 +server := getty.NewTCPServer( + getty.WithLocalAddress(":8080"), // 监听地址 + getty.WithServerTaskPool(taskPool), // 任务池 +) +``` + +#### 服务器接口 + +```go +type Server interface { + EndPoint +} + +type StreamServer interface { + Server + Listener() net.Listener +} + +type PacketServer interface { + Server + PacketConn() net.PacketConn +} +``` + +#### 主要方法 + +- **`RunEventLoop(newSession NewSessionCallback)`**: 启动事件循环,处理客户端连接 +- **`Close()`**: 关闭服务器 +- **`IsClosed()`**: 检查服务器是否已关闭 +- **`ID()`**: 获取服务器ID +- **`EndPointType()`**: 获取端点类型 + +#### 事件循环 + +服务器通过 `RunEventLoop` 方法启动事件循环: + +```go +func (s *server) RunEventLoop(newSession NewSessionCallback) { + if err := s.listen(); err != nil { + panic(fmt.Errorf("server.listen() = error:%+v", perrors.WithStack(err))) + } + + switch s.endPointType { + case TCP_SERVER: + s.runTCPEventLoop(newSession) + case UDP_ENDPOINT: + s.runUDPEventLoop(newSession) + case WS_SERVER: + s.runWSEventLoop(newSession) + case WSS_SERVER: + s.runWSSEventLoop(newSession) + default: + panic(fmt.Sprintf("illegal server type %s", s.endPointType.String())) + } +} +``` + +### Options 配置系统 + +Getty 使用函数式选项模式来配置服务器和客户端,提供了灵活的配置方式。 + +#### Server Options 服务器选项 + +**基础配置** +- **`WithLocalAddress(addr string)`**: 设置服务器监听地址 +- **`WithServerTaskPool(pool GenericTaskPool)`**: 设置服务器任务池 + +**WebSocket 配置** +- **`WithWebsocketServerPath(path string)`**: 设置 WebSocket 请求路径 +- **`WithWebsocketServerCert(cert string)`**: 设置服务器证书文件 +- **`WithWebsocketServerPrivateKey(key string)`**: 设置服务器私钥文件 +- **`WithWebsocketServerRootCert(cert string)`**: 设置根证书文件 + +**TLS 配置** +- **`WithServerSslEnabled(sslEnabled bool)`**: 启用/禁用 SSL +- **`WithServerTlsConfigBuilder(builder TlsConfigBuilder)`**: 设置 TLS 配置构建器 + +#### Client Options 客户端选项 + +**基础配置** +- **`WithServerAddress(addr string)`**: 设置服务器地址 +- **`WithConnectionNumber(num int)`**: 设置连接数量 +- **`WithClientTaskPool(pool GenericTaskPool)`**: 设置客户端任务池 + +**重连配置** +- **`WithReconnectInterval(interval int)`**: 设置重连间隔(纳秒) +- **`WithReconnectAttempts(maxAttempts int)`**: 设置最大重连次数 + +**证书配置** +- **`WithRootCertificateFile(cert string)`**: 设置根证书文件 +- **`WithClientSslEnabled(sslEnabled bool)`**: 启用/禁用客户端 SSL +- **`WithClientTlsConfigBuilder(builder TlsConfigBuilder)`**: 设置客户端 TLS 配置 + +#### 配置示例 + +**TCP 服务器配置** +```go +// 创建任务池 +taskPool := gxsync.NewTaskPoolSimple(0) +defer taskPool.Close() + +// TCP 服务器配置 +server := getty.NewTCPServer( + getty.WithLocalAddress(":8080"), // 监听地址 + getty.WithServerTaskPool(taskPool), // 任务池 +) + +// 启动服务器 +server.RunEventLoop(newSession) +``` + +## 高级特性 + +### 回调系统 Getty 提供了一个强大的回调系统,允许您为会话生命周期事件注册和管理回调函数。这对于清理操作、资源管理和自定义事件处理特别有用。 -### 主要特性 +#### 主要特性 - **线程安全操作**:所有回调操作都受到互斥锁保护 - **替换语义**:使用相同的 (handler, key) 添加会替换现有回调并保持位置不变 - **Panic 安全性**:在会话关闭期间,回调在专用 goroutine 中运行,带有 defer/recover;panic 会被记录堆栈跟踪且不会逃逸出关闭路径 - **有序执行**:回调按照添加的顺序执行 -### 使用示例 +#### 使用示例 ```go // 添加关闭回调 @@ -49,13 +503,13 @@ session.RemoveCloseCallback("cleanup", "resources") **注意**:在会话关闭期间,回调在专用 goroutine 中顺序执行以保持添加顺序,带有 defer/recover 来记录 panic 而不让它们逃逸出关闭路径。 -### 回调管理 +#### 回调管理 - **AddCloseCallback**:注册一个在会话关闭时执行的回调 - **RemoveCloseCallback**:移除之前注册的回调(未找到时无操作;可安全多次调用) - **线程安全**:所有操作都是线程安全的,可以并发调用 -### 类型要求 +#### 类型要求 `handler` 和 `key` 参数必须是**可比较的类型**,支持 `==` 操作符: @@ -86,14 +540,6 @@ session.AddCloseCallback(map[string]int{"a": 1}, "key", callback) // 记录日 session.AddCloseCallback([]int{1, 2, 3}, "key", callback) // 记录日志并忽略 ``` -## 关于 Getty 中的网络传输 - -在网络通信中,Getty 的数据传输接口并不保证数据一定会成功发送,它缺乏内部的重试机制。相反,Getty 将数据传输的结果委托给底层操作系统机制处理。在这种机制下,如果数据成功传输,将被视为成功;如果传输失败,则被视为失败。这些结果随后会传递给上层调用者。 - -上层调用者需要根据这些结果决定是否加入重试机制。这意味着当数据传输失败时,上层调用者必须根据情况采取不同的处理方式。例如,如果失败是由于连接断开导致的,上层调用者可以尝试基于 Getty 的自动重新连接结果重新发送数据。另外,如果失败是因为底层操作系统的发送缓冲区已满,发送者可以自行实现重试机制,在再次尝试传输之前等待发送缓冲区可用。 - -总之,Getty 的数据传输接口并不自带内部的重试机制;相反,是否在特定情况下实现重试逻辑由上层调用者决定。这种设计方法为开发者在控制数据传输行为方面提供了更大的灵活性。 - ## 许可证 -Apache 许可证 2.0 +Apache 许可证 2.0 \ No newline at end of file From 9170d8221d6a80bf34b67fb669d81ff1fe6fe6a8 Mon Sep 17 00:00:00 2001 From: mazheng Date: Tue, 23 Sep 2025 19:50:03 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E6=A0=B9=E6=8D=AEai=E7=9A=84=E5=BB=BA?= =?UTF-8?q?=E8=AE=AE=E8=BF=9B=E8=A1=8C=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 18 +++++++++++------- README_CN.md | 18 +++++++++++------- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 74215623..8dbfcfb2 100644 --- a/README.md +++ b/README.md @@ -189,7 +189,7 @@ func newSession(session transport.Session) error { session.SetMaxMsgLen(4096) session.SetReadTimeout(time.Second * 10) session.SetWriteTimeout(time.Second * 10) - session.SetCronPeriod(5) // 5 second heartbeat detection + session.SetCronPeriod(5000) // 5 second heartbeat detection session.SetWaitTime(time.Second * 3) // Set handlers @@ -245,6 +245,8 @@ type Session interface { SetWriter(Writer) SetCronPeriod(int) SetWaitTime(time.Duration) + SetReadTimeout(time.Duration) + SetWriteTimeout(time.Duration) GetAttribute(any) any SetAttribute(any, any) RemoveAttribute(any) @@ -270,6 +272,8 @@ type Session interface { - **`SetMaxMsgLen(int)`**: Set maximum message length - **`SetCronPeriod(int)`**: Set heartbeat detection period (milliseconds) - **`SetWaitTime(time.Duration)`**: Set wait timeout +- **`SetReadTimeout(time.Duration)`**: Set read timeout +- **`SetWriteTimeout(time.Duration)`**: Set write timeout **Handler Settings** - **`SetEventListener(EventListener)`**: Set event listener for handling connection lifecycle events @@ -313,15 +317,15 @@ func (w *gettyWSConn) handlePong(string) error { return nil } -// Note: TCP/UDP Send() methods do NOT automatically call UpdateActive() -// Only data reception and WebSocket ping/pong update active time +// Note: TCP/UDP send does NOT automatically call UpdateActive() +// Only "data reception" and WebSocket ping/pong update active time ``` **Server-Side Heartbeat Detection** ```go // Server automatically calls OnCron periodically for each session func (h *ServerMessageHandler) OnCron(session transport.Session) { - // Get last active time (automatically updated on data receive/send) + // Get last active time (automatically updated on data reception or WS ping/pong) activeTime := session.GetActive() idleTime := time.Since(activeTime) @@ -340,15 +344,15 @@ func (h *ServerMessageHandler) OnCron(session transport.Session) { ```go // Example timeline showing when GetActive() values change: // 00:00:00 - Connection established, GetActive() = 2024-01-01 10:00:00 -// 00:00:05 - Client sends data, GetActive() = 2024-01-01 10:00:05 -// 00:00:10 - Server sends response, GetActive() = 2024-01-01 10:00:10 +// 00:00:05 - Server receives client data, GetActive() = 2024-01-01 10:00:05 +// 00:00:10 - Server receives client data, GetActive() = 2024-01-01 10:00:10 // 00:00:15 - OnCron called, checks idle time: 5 seconds // 00:00:20 - OnCron called, checks idle time: 10 seconds // 00:00:30 - OnCron called, detects timeout, closes connection ``` **Key Points:** -- **Automatic Updates**: Active time is updated automatically on data receive/send +- **Automatic Updates**: Active time updates only on data reception or WebSocket ping/pong - **Server-Side Detection**: Server calls OnCron periodically to check client activity - **No Client Request Needed**: Heartbeat detection is server-initiated, not client-requested - **Real-Time Monitoring**: GetActive() reflects actual network activity diff --git a/README_CN.md b/README_CN.md index da526aa8..489d7400 100644 --- a/README_CN.md +++ b/README_CN.md @@ -189,7 +189,7 @@ func newSession(session transport.Session) error { session.SetMaxMsgLen(4096) session.SetReadTimeout(time.Second * 10) session.SetWriteTimeout(time.Second * 10) - session.SetCronPeriod(5) // 5秒心跳检测 + session.SetCronPeriod(5000) // 5秒心跳检测 session.SetWaitTime(time.Second * 3) // 设置处理器 @@ -245,6 +245,8 @@ type Session interface { SetWriter(Writer) SetCronPeriod(int) SetWaitTime(time.Duration) + SetReadTimeout(time.Duration) + SetWriteTimeout(time.Duration) GetAttribute(any) any SetAttribute(any, any) RemoveAttribute(any) @@ -270,6 +272,8 @@ type Session interface { - **`SetMaxMsgLen(int)`**: 设置最大消息长度 - **`SetCronPeriod(int)`**: 设置心跳检测周期(毫秒) - **`SetWaitTime(time.Duration)`**: 设置等待超时时间 +- **`SetReadTimeout(time.Duration)`**: 设置读取超时时间 +- **`SetWriteTimeout(time.Duration)`**: 设置写入超时时间 **处理器设置** - **`SetEventListener(EventListener)`**: 设置事件监听器,处理连接生命周期事件 @@ -313,15 +317,15 @@ func (w *gettyWSConn) handlePong(string) error { return nil } -// 注意:TCP/UDP 的 Send() 方法不会自动调用 UpdateActive() -// 只有数据接收和 WebSocket ping/pong 会更新活跃时间 +// 注意:TCP/UDP send 不会自动调用 UpdateActive() +// 只有"数据接收"和 WebSocket ping/pong 会更新活跃时间 ``` **服务端心跳检测** ```go // 服务端定期为每个会话自动调用 OnCron func (h *ServerMessageHandler) OnCron(session transport.Session) { - // 获取最后活跃时间(在数据接收/发送时自动更新) + // 获取最后活跃时间(在数据接收或 WS ping/pong 时自动更新) activeTime := session.GetActive() idleTime := time.Since(activeTime) @@ -340,15 +344,15 @@ func (h *ServerMessageHandler) OnCron(session transport.Session) { ```go // 示例时间线,显示 GetActive() 值何时变化: // 00:00:00 - 连接建立,GetActive() = 2024-01-01 10:00:00 -// 00:00:05 - 客户端发送数据,GetActive() = 2024-01-01 10:00:05 -// 00:00:10 - 服务端发送响应,GetActive() = 2024-01-01 10:00:10 +// 00:00:05 - 服务端接收客户端数据,GetActive() = 2024-01-01 10:00:05 +// 00:00:10 - 服务端接收客户端数据,GetActive() = 2024-01-01 10:00:10 // 00:00:15 - OnCron 被调用,检查空闲时间:5 秒 // 00:00:20 - OnCron 被调用,检查空闲时间:10 秒 // 00:00:30 - OnCron 被调用,检测到超时,关闭连接 ``` **关键要点:** -- **自动更新**:活跃时间在数据接收/发送时自动更新 +- **自动更新**:活跃时间仅在数据接收或 WebSocket ping/pong 时更新 - **服务端检测**:服务端定期调用 OnCron 检查客户端活动 - **无需客户端请求**:心跳检测是服务端发起的,不需要客户端请求 - **实时监控**:GetActive() 反映真实的网络活动 From d21535c2975c6514cb6aa1c49e6389ca0c00fb53 Mon Sep 17 00:00:00 2001 From: mazheng Date: Tue, 23 Sep 2025 19:57:29 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E6=A0=B9=E6=8D=AEai=E7=9A=84=E5=BB=BA?= =?UTF-8?q?=E8=AE=AE=E8=BF=9B=E8=A1=8C=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 44 ++++++++++++++++++++++++-------------------- README_CN.md | 44 ++++++++++++++++++++++++-------------------- 2 files changed, 48 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index 8dbfcfb2..b94095d1 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ In summary, the data transmission interface of getty does not come with an inher Getty framework adopts a layered architecture design, from top to bottom: Application Layer, Getty Core Layer, and Network Layer: -``` +```text ┌─────────────────────────────────────────────────────────────┐ │ Application Layer │ ├─────────────────────────────────────────────────────────────┤ @@ -62,9 +62,9 @@ Getty framework adopts a layered architecture design, from top to bottom: Applic ## Data Flow Processing -#### Complete Data Flow Diagram +### Complete Data Flow Diagram -``` +```text ┌─────────────────────────────────────────────────────────────────────────────┐ │ Incoming Data Flow │ ├─────────────────────────────────────────────────────────────────────────────┤ @@ -77,12 +77,12 @@ Getty framework adopts a layered architecture design, from top to bottom: Applic └─────────────────────────────────────────────────────────────────────────────┘ ``` -**Processing Order:** +### Processing Order 1. **PkgHandler first**: Handles protocol-level parsing/serialization 2. **EventListener second**: Handles business logic and events 3. **Two separate goroutines**: One for reading, one for processing -**Key Components:** +### Key Components - **PkgHandler**: Implements `ReadWriter` interface for data parsing/serialization - **EventListener**: Implements `EventListener` interface for business logic - **OnMessage()**: Method of `EventListener` interface for processing parsed packets @@ -108,7 +108,7 @@ import ( type EchoPackageHandler struct{} // Deserialize: parse network byte stream into application packets -func (h *EchoPackageHandler) Read(session transport.Session, data []byte) (interface{}, int, error) { +func (h *EchoPackageHandler) Read(session getty.Session, data []byte) (interface{}, int, error) { // Pseudo code: implement length-prefixed protocol // 1. Check if there's enough data to read length header (4 bytes) if len(data) < 4 { @@ -128,7 +128,7 @@ func (h *EchoPackageHandler) Read(session transport.Session, data []byte) (inter } // Serialize: convert application packets to network byte stream -func (h *EchoPackageHandler) Write(session transport.Session, pkg interface{}) ([]byte, error) { +func (h *EchoPackageHandler) Write(session getty.Session, pkg interface{}) ([]byte, error) { // Pseudo code: implement length-prefixed protocol // 1. Convert application data to bytes data := []byte(fmt.Sprintf("%v", pkg)) @@ -148,23 +148,23 @@ func (h *EchoPackageHandler) Write(session transport.Session, pkg interface{}) ( type EchoMessageHandler struct{} // Called when connection is established -func (h *EchoMessageHandler) OnOpen(session transport.Session) error { +func (h *EchoMessageHandler) OnOpen(session getty.Session) error { log.Printf("New connection: %s", session.RemoteAddr()) return nil } // Called when connection is closed -func (h *EchoMessageHandler) OnClose(session transport.Session) { +func (h *EchoMessageHandler) OnClose(session getty.Session) { log.Printf("Connection closed: %s", session.RemoteAddr()) } // Called when error occurs -func (h *EchoMessageHandler) OnError(session transport.Session, err error) { +func (h *EchoMessageHandler) OnError(session getty.Session, err error) { log.Printf("Connection error: %s, error: %v", session.RemoteAddr(), err) } // Heartbeat detection - called periodically -func (h *EchoMessageHandler) OnCron(session transport.Session) { +func (h *EchoMessageHandler) OnCron(session getty.Session) { activeTime := session.GetActive() if time.Since(activeTime) > 30*time.Second { log.Printf("Connection timeout, closing: %s", session.RemoteAddr()) @@ -173,8 +173,12 @@ func (h *EchoMessageHandler) OnCron(session transport.Session) { } // Called when message is received - core business logic -func (h *EchoMessageHandler) OnMessage(session transport.Session, pkg interface{}) { - messageData := pkg.([]byte) +func (h *EchoMessageHandler) OnMessage(session getty.Session, pkg interface{}) { + messageData, ok := pkg.([]byte) + if !ok { + log.Printf("invalid packet type: %T", pkg) + return + } log.Printf("Received message: %s", string(messageData)) // Business logic: echo message @@ -183,7 +187,7 @@ func (h *EchoMessageHandler) OnMessage(session transport.Session, pkg interface{ } // New connection callback - configure session -func newSession(session transport.Session) error { +func newSession(session getty.Session) error { // Basic configuration session.SetName("tcp-echo-session") session.SetMaxMsgLen(4096) @@ -210,9 +214,9 @@ func main() { defer taskPool.Close() // Create TCP server - server := transport.NewTCPServer( - transport.WithLocalAddress(":8080"), // Listen address - transport.WithServerTaskPool(taskPool), // Task pool + server := getty.NewTCPServer( + getty.WithLocalAddress(":8080"), // Listen address + getty.WithServerTaskPool(taskPool), // Task pool ) // Start server @@ -324,7 +328,7 @@ func (w *gettyWSConn) handlePong(string) error { **Server-Side Heartbeat Detection** ```go // Server automatically calls OnCron periodically for each session -func (h *ServerMessageHandler) OnCron(session transport.Session) { +func (h *ServerMessageHandler) OnCron(session getty.Session) { // Get last active time (automatically updated on data reception or WS ping/pong) activeTime := session.GetActive() idleTime := time.Since(activeTime) @@ -366,7 +370,7 @@ Getty provides multiple types of server implementations, supporting TCP, UDP, We ```go // Create TCP server server := getty.NewTCPServer( - getty.WithLocalAddress(":8080"), // Listen address + getty.WithLocalAddress(":8080"), // Listen address getty.WithServerTaskPool(taskPool), // Task pool ) ``` @@ -468,7 +472,7 @@ defer taskPool.Close() // TCP server configuration server := getty.NewTCPServer( - getty.WithLocalAddress(":8080"), // Listen address + getty.WithLocalAddress(":8080"), // Listen address getty.WithServerTaskPool(taskPool), // Task pool ) diff --git a/README_CN.md b/README_CN.md index 489d7400..16ee67e7 100644 --- a/README_CN.md +++ b/README_CN.md @@ -32,7 +32,7 @@ Getty 是一个使用 Golang 开发的异步网络 I/O 库。它适用于 TCP、 Getty 框架采用分层架构设计,从上到下分为应用层、Getty 核心层和网络层: -``` +```text ┌─────────────────────────────────────────────────────────────┐ │ 应用层 (Application Layer) │ ├─────────────────────────────────────────────────────────────┤ @@ -62,9 +62,9 @@ Getty 框架采用分层架构设计,从上到下分为应用层、Getty 核 ## 数据流处理 -#### 完整数据流图 +### 完整数据流图 -``` +```text ┌─────────────────────────────────────────────────────────────────────────────┐ │ 接收数据流 │ ├─────────────────────────────────────────────────────────────────────────────┤ @@ -77,12 +77,12 @@ Getty 框架采用分层架构设计,从上到下分为应用层、Getty 核 └─────────────────────────────────────────────────────────────────────────────┘ ``` -**处理顺序:** +### 处理顺序 1. **PkgHandler 优先**:处理协议层解析/序列化 2. **EventListener 其次**:处理业务逻辑和事件 3. **两个独立 goroutine**:一个负责读取,一个负责处理 -**关键组件:** +### 关键组件 - **PkgHandler**:实现 `ReadWriter` 接口,负责数据解析/序列化 - **EventListener**:实现 `EventListener` 接口,负责业务逻辑 - **OnMessage()**:`EventListener` 接口的方法,用于处理解析后的数据包 @@ -108,7 +108,7 @@ import ( type EchoPackageHandler struct{} // 解包:将网络字节流解析为应用层数据包 -func (h *EchoPackageHandler) Read(session transport.Session, data []byte) (interface{}, int, error) { +func (h *EchoPackageHandler) Read(session getty.Session, data []byte) (interface{}, int, error) { // 伪代码:实现长度前缀协议 // 1. 检查是否有足够的数据读取长度头(4字节) if len(data) < 4 { @@ -128,7 +128,7 @@ func (h *EchoPackageHandler) Read(session transport.Session, data []byte) (inter } // 封包:将应用层数据包序列化为网络字节流 -func (h *EchoPackageHandler) Write(session transport.Session, pkg interface{}) ([]byte, error) { +func (h *EchoPackageHandler) Write(session getty.Session, pkg interface{}) ([]byte, error) { // 伪代码:实现长度前缀协议 // 1. 将应用数据转换为字节 data := []byte(fmt.Sprintf("%v", pkg)) @@ -148,23 +148,23 @@ func (h *EchoPackageHandler) Write(session transport.Session, pkg interface{}) ( type EchoMessageHandler struct{} // 连接建立时调用 -func (h *EchoMessageHandler) OnOpen(session transport.Session) error { +func (h *EchoMessageHandler) OnOpen(session getty.Session) error { log.Printf("新连接: %s", session.RemoteAddr()) return nil } // 连接关闭时调用 -func (h *EchoMessageHandler) OnClose(session transport.Session) { +func (h *EchoMessageHandler) OnClose(session getty.Session) { log.Printf("连接关闭: %s", session.RemoteAddr()) } // 发生错误时调用 -func (h *EchoMessageHandler) OnError(session transport.Session, err error) { +func (h *EchoMessageHandler) OnError(session getty.Session, err error) { log.Printf("连接错误: %s, 错误: %v", session.RemoteAddr(), err) } // 心跳检测 - 定期调用 -func (h *EchoMessageHandler) OnCron(session transport.Session) { +func (h *EchoMessageHandler) OnCron(session getty.Session) { activeTime := session.GetActive() if time.Since(activeTime) > 30*time.Second { log.Printf("连接超时,关闭: %s", session.RemoteAddr()) @@ -173,8 +173,12 @@ func (h *EchoMessageHandler) OnCron(session transport.Session) { } // 收到消息时调用 - 核心业务逻辑 -func (h *EchoMessageHandler) OnMessage(session transport.Session, pkg interface{}) { - messageData := pkg.([]byte) +func (h *EchoMessageHandler) OnMessage(session getty.Session, pkg interface{}) { + messageData, ok := pkg.([]byte) + if !ok { + log.Printf("invalid packet type: %T", pkg) + return + } log.Printf("收到消息: %s", string(messageData)) // 业务逻辑:回显消息 @@ -183,7 +187,7 @@ func (h *EchoMessageHandler) OnMessage(session transport.Session, pkg interface{ } // 新连接回调 - 配置会话 -func newSession(session transport.Session) error { +func newSession(session getty.Session) error { // 基础配置 session.SetName("tcp-echo-session") session.SetMaxMsgLen(4096) @@ -210,9 +214,9 @@ func main() { defer taskPool.Close() // 创建 TCP 服务器 - server := transport.NewTCPServer( - transport.WithLocalAddress(":8080"), // 监听地址 - transport.WithServerTaskPool(taskPool), // 任务池 + server := getty.NewTCPServer( + getty.WithLocalAddress(":8080"), // 监听地址 + getty.WithServerTaskPool(taskPool), // 任务池 ) // 启动服务器 @@ -324,7 +328,7 @@ func (w *gettyWSConn) handlePong(string) error { **服务端心跳检测** ```go // 服务端定期为每个会话自动调用 OnCron -func (h *ServerMessageHandler) OnCron(session transport.Session) { +func (h *ServerMessageHandler) OnCron(session getty.Session) { // 获取最后活跃时间(在数据接收或 WS ping/pong 时自动更新) activeTime := session.GetActive() idleTime := time.Since(activeTime) @@ -366,7 +370,7 @@ Getty 提供了多种类型的服务器实现,支持 TCP、UDP、WebSocket 和 ```go // 创建 TCP 服务器 server := getty.NewTCPServer( - getty.WithLocalAddress(":8080"), // 监听地址 + getty.WithLocalAddress(":8080"), // 监听地址 getty.WithServerTaskPool(taskPool), // 任务池 ) ``` @@ -468,7 +472,7 @@ defer taskPool.Close() // TCP 服务器配置 server := getty.NewTCPServer( - getty.WithLocalAddress(":8080"), // 监听地址 + getty.WithLocalAddress(":8080"), // 监听地址 getty.WithServerTaskPool(taskPool), // 任务池 ) From 7317960691b54e5b5033295076c473b12c27a98f Mon Sep 17 00:00:00 2001 From: mazheng Date: Tue, 23 Sep 2025 20:08:19 +0800 Subject: [PATCH 4/5] ai robot --- README.md | 6 ++++-- README_CN.md | 26 ++++++++++++++------------ 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index b94095d1..2e978497 100644 --- a/README.md +++ b/README.md @@ -100,7 +100,7 @@ import ( "fmt" "log" "time" - "github.com/AlexStocks/getty/transport" + getty "github.com/AlexStocks/getty/transport" gxsync "github.com/dubbogo/gost/sync" ) @@ -183,7 +183,9 @@ func (h *EchoMessageHandler) OnMessage(session getty.Session, pkg interface{}) { // Business logic: echo message response := fmt.Sprintf("Echo: %s", string(messageData)) - session.WritePkg(response, time.Second*5) + if _, _, err := session.WritePkg(response, 5*time.Second); err != nil { + log.Printf("send failed: %v", err) + } } // New connection callback - configure session diff --git a/README_CN.md b/README_CN.md index 16ee67e7..2523452d 100644 --- a/README_CN.md +++ b/README_CN.md @@ -100,7 +100,7 @@ import ( "fmt" "log" "time" - "github.com/AlexStocks/getty/transport" + getty "github.com/AlexStocks/getty/transport" gxsync "github.com/dubbogo/gost/sync" ) @@ -183,7 +183,9 @@ func (h *EchoMessageHandler) OnMessage(session getty.Session, pkg interface{}) { // 业务逻辑:回显消息 response := fmt.Sprintf("Echo: %s", string(messageData)) - session.WritePkg(response, time.Second*5) + if _, _, err := session.WritePkg(response, 5*time.Second); err != nil { + log.Printf("发送失败: %v", err) + } } // 新连接回调 - 配置会话 @@ -265,13 +267,13 @@ type Session interface { #### 主要方法说明 -**连接管理** +#### 连接管理 - **`Conn()`**: 获取底层网络连接对象 - **`IsClosed()`**: 检查会话是否已关闭 - **`Close()`**: 关闭会话连接 - **`Reset()`**: 重置会话状态 -**配置设置** +#### 配置设置 - **`SetName(string)`**: 设置会话名称 - **`SetMaxMsgLen(int)`**: 设置最大消息长度 - **`SetCronPeriod(int)`**: 设置心跳检测周期(毫秒) @@ -279,28 +281,28 @@ type Session interface { - **`SetReadTimeout(time.Duration)`**: 设置读取超时时间 - **`SetWriteTimeout(time.Duration)`**: 设置写入超时时间 -**处理器设置** +#### 处理器设置 - **`SetEventListener(EventListener)`**: 设置事件监听器,处理连接生命周期事件 - **`SetPkgHandler(ReadWriter)`**: 设置数据包处理器,负责解析和序列化网络数据 - **`SetReader(Reader)`**: 设置数据读取器,用于自定义数据解析 - **`SetWriter(Writer)`**: 设置数据写入器,用于自定义数据序列化 -**数据发送** +#### 数据发送 - **`WritePkg(pkg any, timeout time.Duration)`**: 发送数据包,返回总字节数和成功发送字节数 - **`WriteBytes([]byte)`**: 发送字节数据 - **`WriteBytesArray(...[]byte)`**: 发送多个字节数组 -**属性管理** +#### 属性管理 - **`GetAttribute(key any)`**: 获取会话属性 - **`SetAttribute(key any, value any)`**: 设置会话属性 - **`RemoveAttribute(key any)`**: 删除会话属性 -**统计信息** +#### 统计信息 - **`Stat()`**: 获取会话统计信息(连接状态、读写字节数、包数量等) #### 活跃时间更新机制 -**自动活跃时间更新** +##### 自动活跃时间更新 ```go // Getty 在以下情况下自动更新会话活跃时间: // 1. 从网络接收数据时 @@ -325,7 +327,7 @@ func (w *gettyWSConn) handlePong(string) error { // 只有"数据接收"和 WebSocket ping/pong 会更新活跃时间 ``` -**服务端心跳检测** +##### 服务端心跳检测 ```go // 服务端定期为每个会话自动调用 OnCron func (h *ServerMessageHandler) OnCron(session getty.Session) { @@ -344,7 +346,7 @@ func (h *ServerMessageHandler) OnCron(session getty.Session) { } ``` -**活跃时间更新时间线** +##### 活跃时间更新时间线 ```go // 示例时间线,显示 GetActive() 值何时变化: // 00:00:00 - 连接建立,GetActive() = 2024-01-01 10:00:00 @@ -355,7 +357,7 @@ func (h *ServerMessageHandler) OnCron(session getty.Session) { // 00:00:30 - OnCron 被调用,检测到超时,关闭连接 ``` -**关键要点:** +##### 关键要点 - **自动更新**:活跃时间仅在数据接收或 WebSocket ping/pong 时更新 - **服务端检测**:服务端定期调用 OnCron 检查客户端活动 - **无需客户端请求**:心跳检测是服务端发起的,不需要客户端请求 From 28976508578b8102eee35a59f96ea3d8a3f9ceb1 Mon Sep 17 00:00:00 2001 From: mazheng Date: Tue, 23 Sep 2025 20:16:47 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E5=8F=82=E8=80=83ai=20robot=E5=BB=BA?= =?UTF-8?q?=E8=AE=AE=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 14 ++++++++------ README_CN.md | 14 ++++++++------ 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 2e978497..265833a1 100644 --- a/README.md +++ b/README.md @@ -249,6 +249,8 @@ type Session interface { SetPkgHandler(ReadWriter) SetReader(Reader) SetWriter(Writer) + GetActive() time.Time + UpdateActive() SetCronPeriod(int) SetWaitTime(time.Duration) SetReadTimeout(time.Duration) @@ -267,13 +269,13 @@ type Session interface { #### Key Methods -**Connection Management** +##### Connection Management - **`Conn()`**: Get the underlying network connection object - **`IsClosed()`**: Check if the session is closed - **`Close()`**: Close the session connection - **`Reset()`**: Reset session state -**Configuration Settings** +##### Configuration Settings - **`SetName(string)`**: Set session name - **`SetMaxMsgLen(int)`**: Set maximum message length - **`SetCronPeriod(int)`**: Set heartbeat detection period (milliseconds) @@ -281,23 +283,23 @@ type Session interface { - **`SetReadTimeout(time.Duration)`**: Set read timeout - **`SetWriteTimeout(time.Duration)`**: Set write timeout -**Handler Settings** +##### Handler Settings - **`SetEventListener(EventListener)`**: Set event listener for handling connection lifecycle events - **`SetPkgHandler(ReadWriter)`**: Set packet handler for parsing and serializing network data - **`SetReader(Reader)`**: Set data reader for custom data parsing - **`SetWriter(Writer)`**: Set data writer for custom data serialization -**Data Transmission** +##### Data Transmission - **`WritePkg(pkg any, timeout time.Duration)`**: Send data packet, returns total bytes and successfully sent bytes - **`WriteBytes([]byte)`**: Send byte data - **`WriteBytesArray(...[]byte)`**: Send multiple byte arrays -**Attribute Management** +##### Attribute Management - **`GetAttribute(key any)`**: Get session attribute - **`SetAttribute(key any, value any)`**: Set session attribute - **`RemoveAttribute(key any)`**: Remove session attribute -**Statistics** +##### Statistics - **`Stat()`**: Get session statistics (connection status, read/write bytes, packet count, etc.) #### Active Time Update Mechanism diff --git a/README_CN.md b/README_CN.md index 2523452d..2732a767 100644 --- a/README_CN.md +++ b/README_CN.md @@ -249,6 +249,8 @@ type Session interface { SetPkgHandler(ReadWriter) SetReader(Reader) SetWriter(Writer) + GetActive() time.Time + UpdateActive() SetCronPeriod(int) SetWaitTime(time.Duration) SetReadTimeout(time.Duration) @@ -267,13 +269,13 @@ type Session interface { #### 主要方法说明 -#### 连接管理 +##### 连接管理 - **`Conn()`**: 获取底层网络连接对象 - **`IsClosed()`**: 检查会话是否已关闭 - **`Close()`**: 关闭会话连接 - **`Reset()`**: 重置会话状态 -#### 配置设置 +##### 配置设置 - **`SetName(string)`**: 设置会话名称 - **`SetMaxMsgLen(int)`**: 设置最大消息长度 - **`SetCronPeriod(int)`**: 设置心跳检测周期(毫秒) @@ -281,23 +283,23 @@ type Session interface { - **`SetReadTimeout(time.Duration)`**: 设置读取超时时间 - **`SetWriteTimeout(time.Duration)`**: 设置写入超时时间 -#### 处理器设置 +##### 处理器设置 - **`SetEventListener(EventListener)`**: 设置事件监听器,处理连接生命周期事件 - **`SetPkgHandler(ReadWriter)`**: 设置数据包处理器,负责解析和序列化网络数据 - **`SetReader(Reader)`**: 设置数据读取器,用于自定义数据解析 - **`SetWriter(Writer)`**: 设置数据写入器,用于自定义数据序列化 -#### 数据发送 +##### 数据发送 - **`WritePkg(pkg any, timeout time.Duration)`**: 发送数据包,返回总字节数和成功发送字节数 - **`WriteBytes([]byte)`**: 发送字节数据 - **`WriteBytesArray(...[]byte)`**: 发送多个字节数组 -#### 属性管理 +##### 属性管理 - **`GetAttribute(key any)`**: 获取会话属性 - **`SetAttribute(key any, value any)`**: 设置会话属性 - **`RemoveAttribute(key any)`**: 删除会话属性 -#### 统计信息 +##### 统计信息 - **`Stat()`**: 获取会话统计信息(连接状态、读写字节数、包数量等) #### 活跃时间更新机制