diff --git a/chatListener.go b/chatListener.go deleted file mode 100644 index 9b2843f..0000000 --- a/chatListener.go +++ /dev/null @@ -1,145 +0,0 @@ -package TF2RconWrapper - -import ( - "fmt" - "log" - "math/rand" - "net" - "strconv" - "sync" - "time" -) - -type RawMessage struct { - data []byte - n int //length -} - -func (r RawMessage) String() string { - return string(r.data) -} - -// RconChatListener maintains an UDP server that receives redirected chat messages from TF2 servers -type RconChatListener struct { - conn *net.UDPConn - servers map[string]*ServerListener - serversLock *sync.RWMutex - wait *sync.WaitGroup - addr *net.UDPAddr - localip string - port string - rng *rand.Rand -} - -// NewRconChatListener builds a new RconChatListener. Its arguments are localip (the ip of this server) and -// port (the port the listener will use) -func NewRconChatListener(localip, port string) (*RconChatListener, error) { - addr, err := net.ResolveUDPAddr("udp4", ":"+port) - if err != nil { - return nil, err - } - - servers := make(map[string]*ServerListener) - - rng := rand.New(rand.NewSource(time.Now().Unix())) - - listener := &RconChatListener{nil, servers, new(sync.RWMutex), new(sync.WaitGroup), addr, localip, port, rng} - listener.startListening() - return listener, nil -} - -func (r *RconChatListener) startListening() { - conn, err := net.ListenUDP("udp", r.addr) - r.conn = conn - if err != nil { - log.Fatal(err) - return - } - conn.SetReadBuffer(1048576) - go r.readStrings() -} - -func (r *RconChatListener) readStrings() { - rawMessageC := make(chan RawMessage, 500) - - go func() { - for { - raw := <-rawMessageC - r.wait.Wait() - message := raw.data[0:raw.n] - secret, err := getSecret(message) - if err != nil { - continue - } - - r.serversLock.RLock() - s, ok := r.servers[secret] - r.serversLock.RUnlock() - if ok { - s.RawMessages <- raw - } - } - }() - - for { - buff := make([]byte, 65000) - n, err := r.conn.Read(buff) - //log.Println(string(buff[0:n])) - - if err != nil { - fmt.Println("Error receiving server chat data: ", err) - continue - } - - rawMessageC <- RawMessage{buff, n} - } -} - -// Close stops the RconChatListener -func (s *ServerListener) Close(m *TF2RconConnection) { - s.listener.wait.Add(1) - delete(s.listener.servers, s.Secret) - s.listener.wait.Done() - - m.StopLogRedirection(s.listener.localip, s.listener.port) -} - -// CreateServerListener creates a ServerListener that receives chat messages from a -// particular TF2 server -func (r *RconChatListener) CreateServerListener(m *TF2RconConnection) *ServerListener { - - secret := strconv.Itoa(r.rng.Intn(999998) + 1) - - r.serversLock.RLock() - _, ok := r.servers[secret] - for ok { - secret = strconv.Itoa(r.rng.Intn(999998) + 1) - _, ok = r.servers[secret] - } - r.serversLock.RUnlock() - - return r.CreateListenerWithSecret(m, secret) -} - -func (r *RconChatListener) CreateListenerWithSecret(m *TF2RconConnection, secret string) *ServerListener { - s := &ServerListener{make(chan RawMessage, 10), m.host, secret, r} - - r.serversLock.Lock() - r.servers[secret] = s - //log.Println(r.servers) - r.serversLock.Unlock() - - m.Query("sv_logsecret " + secret) - m.RedirectLogs(r.localip, r.port) - return s -} - -// ServerListener represents a listener that receives chat messages from a particular -// TF2 server. It's built and managed by an RconChatListener instance. -type ServerListener struct { - RawMessages chan RawMessage - - host string - Secret string - listener *RconChatListener -} diff --git a/chatMessage.go b/chatMessage.go index 5e66188..07a51c2 100644 --- a/chatMessage.go +++ b/chatMessage.go @@ -62,14 +62,14 @@ type CvarData struct { const ( PlayerGlobalMessage = iota - PlayerTeamMessage = iota - PlayerChangedClass = iota - PlayerChangedTeam = iota + PlayerTeamMessage + PlayerChangedClass + PlayerChangedTeam - WorldPlayerConnected = iota - WorldPlayerDisconnected = iota - WorldGameOver = iota - ServerCvar = iota + WorldPlayerConnected + WorldPlayerDisconnected + WorldGameOver + ServerCvar LogFileClosed = iota ) @@ -83,77 +83,50 @@ var ( ErrInvalidPacket = errors.New("Invalid Packet") ) -func getSecret(data []byte) (string, error) { +func getSecret(data []byte) (string, int, error) { if !(len(data) > 6) { - return "", ErrInvalidPacket + return "", 0, ErrInvalidPacket } if data[4] != 0x53 { - return "", errors.New("Server trying to send a chat packet without a secret") + return "", 0, errors.New("Server trying to send a chat packet without a secret") } bytes := data[5:] - var pos int + pos := 5 for bytes[pos] != 0x20 { pos++ if pos >= len(bytes) { - return "", ErrInvalidPacket + return "", 0, ErrInvalidPacket } } secret := string(bytes[:pos-1]) if pos+1 >= len(data) { //No message/time data - return "", ErrInvalidPacket + return "", 0, ErrInvalidPacket } - return secret, nil + return secret, pos + 1, nil } -func ParseMessage(raw RawMessage) (LogMessage, error) { - textBytes := raw.data[0:raw.n] - if len(textBytes) <= 24 { - return LogMessage{}, ErrInvalidPacket - } - - packetType := textBytes[4] - - if packetType != 0x53 { - return LogMessage{}, errors.New("Server trying to send a chat packet without a secret") - } - - // drop the header - textBytes = textBytes[5:] - - pos := 0 - for textBytes[pos] != 0x20 { - pos++ - } - - textBytes = textBytes[pos+1:] - - text := string(textBytes) - - timeText := text[:21] - message := text[23:] +const refTime = "01/02/2006 - 15:04:05" - const refTime = "01/02/2006 - 15:04:05" +func parse(text string) LogMessage { + timeText := text[5:26] + message := text[28:] timeObj, _ := time.Parse(refTime, timeText) - - m := parse(message) - - return LogMessage{timeObj, text, m}, nil + return LogMessage{timeObj, text, ParseLine(message)} } -func parse(message string) ParsedMsg { +func ParseLine(message string) ParsedMsg { r := ParsedMsg{Type: -1} var m []string isPlayerMessage := false playerData := PlayerData{} - switch { case rPlayerGlobalMessage.MatchString(message): m = rPlayerGlobalMessage.FindStringSubmatch(message) diff --git a/chatMessage_test.go b/chatMessage_test.go index bd4fe79..575b7af 100644 --- a/chatMessage_test.go +++ b/chatMessage_test.go @@ -1,11 +1,12 @@ package TF2RconWrapper import ( - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) -var logs []string = []string{ +var logs = []string{ `"Sk1LL0<2><[U:1:198288660]>" joined team "Red"`, `"Sk1LL0<2><[U:1:198288660]>" changed role to "scout"`, `"Sk1LL0<2><[U:1:198288660]>" changed role to "soldier"`, @@ -20,7 +21,7 @@ var logs []string = []string{ func TestParse(t *testing.T) { for i := range logs { - m := parse(logs[i]) + m := ParseLine(logs[i]) switch i { diff --git a/logListen.go b/logListen.go new file mode 100644 index 0000000..8736528 --- /dev/null +++ b/logListen.go @@ -0,0 +1,203 @@ +package TF2RconWrapper + +import ( + "bytes" + "log" + "math/rand" + "net" + "strconv" + "sync" + "time" +) + +type Handler interface { + PlayerConnected(PlayerData) + PlayerDisconnected(PlayerData) + + PlayerGlobalMessage(PlayerData, string) + PlayerTeamMessage(PlayerData, string) + + PlayerClassChange(PlayerData, string) // string -> new classes + PlayerTeamChange(PlayerData, string) // string -> new team + + GameOver() + + CVarChange(variable string, value string) + LogFileClosed() +} + +type Listener struct { + mapMu *sync.RWMutex + sources map[string]*Source + channels map[string](chan string) + + listenAddr *net.UDPAddr + redirectAddr string +} + +type Source struct { + Secret string + logsMu *sync.RWMutex //protects logs + logs *bytes.Buffer + + handlerMu *sync.Mutex //protects handler and closed + handler Handler + closed bool +} + +func (s *Source) Logs() *bytes.Buffer { + s.logsMu.RLock() + b := s.logs.Bytes() + var logs []byte + copy(b, logs) + s.logsMu.RUnlock() + + return bytes.NewBuffer(logs) +} + +// NewListener returns a new Listener +func NewListener(addr string) (*Listener, error) { + return NewListenerAddr(addr, addr) +} + +func NewListenerAddr(port, redirectAddr string) (*Listener, error) { + addr, err := net.ResolveUDPAddr("udp", ":"+port) + if err != nil { + return nil, err + } + + l := &Listener{ + mapMu: new(sync.RWMutex), + sources: make(map[string]*Source), + channels: make(map[string](chan string)), + + listenAddr: addr, + redirectAddr: redirectAddr, + } + + conn, err := net.ListenUDP("udp", addr) + if err != nil { + return nil, err + } + + go l.start(conn) + return l, nil +} + +func (l *Listener) RemoveSource(s *Source, m *TF2RconConnection) { + s.handlerMu.Lock() + s.closed = true + s.handlerMu.Unlock() + + l.mapMu.Lock() + delete(l.sources, s.Secret) + l.mapMu.Unlock() + + m.StopLogRedirection(l.redirectAddr) +} + +func (l *Listener) start(conn *net.UDPConn) { + for { + buff := make([]byte, 512) + n, err := conn.Read(buff) + if err != nil { + log.Println(err) + } + + secret, pos, err := getSecret(buff[0:n]) + if err != nil { + continue + } + + l.mapMu.RLock() + source, ok := l.sources[secret] + l.mapMu.RUnlock() + + if !ok { + continue + } + + go func() { + source.handlerMu.Lock() + defer source.handlerMu.Unlock() + + if source.closed { + return + } + + source.logsMu.Lock() + source.logs.WriteString("L ") + source.logs.Write(buff[pos : n-2]) + source.logs.WriteByte('\n') + source.logsMu.Unlock() + + handler := source.handler + + m := parse(string(buff[pos : n-2])) + + switch m.Parsed.Type { + case PlayerGlobalMessage: + d := m.Parsed.Data.(PlayerData) + handler.PlayerGlobalMessage(d, d.Text) + case PlayerTeamMessage: + d := m.Parsed.Data.(PlayerData) + handler.PlayerTeamMessage(d, d.Text) + case PlayerChangedClass: + d := m.Parsed.Data.(PlayerData) + handler.PlayerClassChange(d, d.Class) + case PlayerChangedTeam: + d := m.Parsed.Data.(PlayerData) + handler.PlayerTeamChange(d, d.NewTeam) + case WorldPlayerConnected: + d := m.Parsed.Data.(PlayerData) + handler.PlayerConnected(d) + case WorldPlayerDisconnected: + d := m.Parsed.Data.(PlayerData) + handler.PlayerDisconnected(d) + case WorldGameOver: + handler.GameOver() + case ServerCvar: + d := m.Parsed.Data.(CvarData) + handler.CVarChange(d.Variable, d.Value) + } + }() + } +} + +func (l *Listener) AddSource(handler Handler, m *TF2RconConnection) *Source { + secret := strconv.Itoa(rand.Intn(999998) + 1) + rand.Seed(time.Now().Unix()) + + l.mapMu.RLock() + _, ok := l.sources[secret] + for ok { + secret = strconv.Itoa(rand.Intn(999998) + 1) + _, ok = l.sources[secret] + } + l.mapMu.RUnlock() + + return l.AddSourceSecret(secret, handler, m) +} + +func (l *Listener) AddSourceSecret(secret string, handler Handler, m *TF2RconConnection) *Source { + s := newSource(secret, handler) + + l.mapMu.Lock() + l.sources[secret] = s + l.mapMu.Unlock() + + m.Query("sv_logsecret " + secret) + m.RedirectLogs(l.redirectAddr) + return s +} + +func newSource(secret string, handler Handler) *Source { + return &Source{ + Secret: secret, + logsMu: new(sync.RWMutex), + logs: new(bytes.Buffer), + + handlerMu: new(sync.Mutex), + handler: handler, + } +} diff --git a/tf2RconConnection.go b/tf2RconConnection.go index 7b25da7..a56b1d6 100644 --- a/tf2RconConnection.go +++ b/tf2RconConnection.go @@ -24,6 +24,11 @@ var ( CVarValueRegex = regexp.MustCompile(`^"(?:.*?)" = "(.*?)"`) ) +func (c *TF2RconConnection) QueryNoResp(req string) error { + _, err := c.rc.Write(req) + return err +} + // Query executes a query and returns the server responses func (c *TF2RconConnection) Query(req string) (string, error) { reqID, reqErr := c.rc.Write(req) @@ -262,14 +267,14 @@ func (c *TF2RconConnection) RemoveTag(tagName string) error { } // RedirectLogs send the logaddress_add command -func (c *TF2RconConnection) RedirectLogs(ip string, port string) error { - query := "logaddress_add " + ip + ":" + port +func (c *TF2RconConnection) RedirectLogs(addr string) error { + query := "logaddress_add " + addr _, err := c.Query(query) return err } -func (c *TF2RconConnection) StopLogRedirection(localip string, port string) { - query := fmt.Sprintf("logaddress_del %s:%s", localip, port) +func (c *TF2RconConnection) StopLogRedirection(addr string) { + query := fmt.Sprintf("logaddress_del %s", addr) c.Query(query) }