From 75dc954eb247400cb422844b95c3317fa1c71aa8 Mon Sep 17 00:00:00 2001 From: Vibhav Pant Date: Sun, 14 Feb 2016 00:09:44 +0530 Subject: [PATCH 1/6] listener rewrite --- chatListener.go | 145 ------------------------------- chatMessage.go | 59 ++++--------- chatMessage_test.go | 7 +- logListen.go | 202 +++++++++++++++++++++++++++++++++++++++++++ tf2RconConnection.go | 8 +- 5 files changed, 227 insertions(+), 194 deletions(-) delete mode 100644 chatListener.go create mode 100644 logListen.go diff --git a/chatListener.go b/chatListener.go deleted file mode 100644 index 9b2843f..0000000 --- a/chatListener.go +++ /dev/null @@ -1,145 +0,0 @@ -package TF2RconWrapper - -import ( - "fmt" - "log" - "math/rand" - "net" - "strconv" - "sync" - "time" -) - -type RawMessage struct { - data []byte - n int //length -} - -func (r RawMessage) String() string { - return string(r.data) -} - -// RconChatListener maintains an UDP server that receives redirected chat messages from TF2 servers -type RconChatListener struct { - conn *net.UDPConn - servers map[string]*ServerListener - serversLock *sync.RWMutex - wait *sync.WaitGroup - addr *net.UDPAddr - localip string - port string - rng *rand.Rand -} - -// NewRconChatListener builds a new RconChatListener. Its arguments are localip (the ip of this server) and -// port (the port the listener will use) -func NewRconChatListener(localip, port string) (*RconChatListener, error) { - addr, err := net.ResolveUDPAddr("udp4", ":"+port) - if err != nil { - return nil, err - } - - servers := make(map[string]*ServerListener) - - rng := rand.New(rand.NewSource(time.Now().Unix())) - - listener := &RconChatListener{nil, servers, new(sync.RWMutex), new(sync.WaitGroup), addr, localip, port, rng} - listener.startListening() - return listener, nil -} - -func (r *RconChatListener) startListening() { - conn, err := net.ListenUDP("udp", r.addr) - r.conn = conn - if err != nil { - log.Fatal(err) - return - } - conn.SetReadBuffer(1048576) - go r.readStrings() -} - -func (r *RconChatListener) readStrings() { - rawMessageC := make(chan RawMessage, 500) - - go func() { - for { - raw := <-rawMessageC - r.wait.Wait() - message := raw.data[0:raw.n] - secret, err := getSecret(message) - if err != nil { - continue - } - - r.serversLock.RLock() - s, ok := r.servers[secret] - r.serversLock.RUnlock() - if ok { - s.RawMessages <- raw - } - } - }() - - for { - buff := make([]byte, 65000) - n, err := r.conn.Read(buff) - //log.Println(string(buff[0:n])) - - if err != nil { - fmt.Println("Error receiving server chat data: ", err) - continue - } - - rawMessageC <- RawMessage{buff, n} - } -} - -// Close stops the RconChatListener -func (s *ServerListener) Close(m *TF2RconConnection) { - s.listener.wait.Add(1) - delete(s.listener.servers, s.Secret) - s.listener.wait.Done() - - m.StopLogRedirection(s.listener.localip, s.listener.port) -} - -// CreateServerListener creates a ServerListener that receives chat messages from a -// particular TF2 server -func (r *RconChatListener) CreateServerListener(m *TF2RconConnection) *ServerListener { - - secret := strconv.Itoa(r.rng.Intn(999998) + 1) - - r.serversLock.RLock() - _, ok := r.servers[secret] - for ok { - secret = strconv.Itoa(r.rng.Intn(999998) + 1) - _, ok = r.servers[secret] - } - r.serversLock.RUnlock() - - return r.CreateListenerWithSecret(m, secret) -} - -func (r *RconChatListener) CreateListenerWithSecret(m *TF2RconConnection, secret string) *ServerListener { - s := &ServerListener{make(chan RawMessage, 10), m.host, secret, r} - - r.serversLock.Lock() - r.servers[secret] = s - //log.Println(r.servers) - r.serversLock.Unlock() - - m.Query("sv_logsecret " + secret) - m.RedirectLogs(r.localip, r.port) - return s -} - -// ServerListener represents a listener that receives chat messages from a particular -// TF2 server. It's built and managed by an RconChatListener instance. -type ServerListener struct { - RawMessages chan RawMessage - - host string - Secret string - listener *RconChatListener -} diff --git a/chatMessage.go b/chatMessage.go index 5e66188..5ce6f28 100644 --- a/chatMessage.go +++ b/chatMessage.go @@ -62,14 +62,14 @@ type CvarData struct { const ( PlayerGlobalMessage = iota - PlayerTeamMessage = iota - PlayerChangedClass = iota - PlayerChangedTeam = iota + PlayerTeamMessage + PlayerChangedClass + PlayerChangedTeam - WorldPlayerConnected = iota - WorldPlayerDisconnected = iota - WorldGameOver = iota - ServerCvar = iota + WorldPlayerConnected + WorldPlayerDisconnected + WorldGameOver + ServerCvar LogFileClosed = iota ) @@ -83,13 +83,13 @@ var ( ErrInvalidPacket = errors.New("Invalid Packet") ) -func getSecret(data []byte) (string, error) { +func getSecret(data []byte) (string, int, error) { if !(len(data) > 6) { - return "", ErrInvalidPacket + return "", 0, ErrInvalidPacket } if data[4] != 0x53 { - return "", errors.New("Server trying to send a chat packet without a secret") + return "", 0, errors.New("Server trying to send a chat packet without a secret") } bytes := data[5:] @@ -98,56 +98,31 @@ func getSecret(data []byte) (string, error) { for bytes[pos] != 0x20 { pos++ if pos >= len(bytes) { - return "", ErrInvalidPacket + return "", 0, ErrInvalidPacket } } secret := string(bytes[:pos-1]) if pos+1 >= len(data) { //No message/time data - return "", ErrInvalidPacket + return "", 0, ErrInvalidPacket } - return secret, nil + return secret, pos + 1, nil } -func ParseMessage(raw RawMessage) (LogMessage, error) { - textBytes := raw.data[0:raw.n] - if len(textBytes) <= 24 { - return LogMessage{}, ErrInvalidPacket - } - - packetType := textBytes[4] - - if packetType != 0x53 { - return LogMessage{}, errors.New("Server trying to send a chat packet without a secret") - } - - // drop the header - textBytes = textBytes[5:] - - pos := 0 - for textBytes[pos] != 0x20 { - pos++ - } - - textBytes = textBytes[pos+1:] - - text := string(textBytes) +const refTime = "01/02/2006 - 15:04:05" +func parse(text string) LogMessage { timeText := text[:21] message := text[23:] - const refTime = "01/02/2006 - 15:04:05" - timeObj, _ := time.Parse(refTime, timeText) - m := parse(message) - - return LogMessage{timeObj, text, m}, nil + return LogMessage{timeObj, text, ParseLine(message)} } -func parse(message string) ParsedMsg { +func ParseLine(message string) ParsedMsg { r := ParsedMsg{Type: -1} var m []string diff --git a/chatMessage_test.go b/chatMessage_test.go index bd4fe79..575b7af 100644 --- a/chatMessage_test.go +++ b/chatMessage_test.go @@ -1,11 +1,12 @@ package TF2RconWrapper import ( - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) -var logs []string = []string{ +var logs = []string{ `"Sk1LL0<2><[U:1:198288660]>" joined team "Red"`, `"Sk1LL0<2><[U:1:198288660]>" changed role to "scout"`, `"Sk1LL0<2><[U:1:198288660]>" changed role to "soldier"`, @@ -20,7 +21,7 @@ var logs []string = []string{ func TestParse(t *testing.T) { for i := range logs { - m := parse(logs[i]) + m := ParseLine(logs[i]) switch i { diff --git a/logListen.go b/logListen.go new file mode 100644 index 0000000..6ff0a7a --- /dev/null +++ b/logListen.go @@ -0,0 +1,202 @@ +package TF2RconWrapper + +import ( + "bytes" + "log" + "math/rand" + "net" + "strconv" + "sync" + "time" +) + +type Handler interface { + PlayerConnected(PlayerData) + PlayerDisconnected(PlayerData) + + PlayerGlobalMessage(PlayerData, string) + PlayerTeamMessage(PlayerData, string) + + PlayerClassChange(PlayerData, string) // string -> new classes + PlayerTeamChange(PlayerData, string) // string -> new team + + GameOver() + + CVarChange(variable string, value string) + LogFileClosed() +} + +type Listener struct { + mapMu *sync.RWMutex + sources map[string]*Source + channels map[string](chan string) + + listenAddr *net.UDPAddr + redirectAddr string +} + +type Source struct { + Secret string + logsMu *sync.RWMutex //protects logs + logs *bytes.Buffer + + handlerMu *sync.Mutex //protects handler and closed + handler Handler + closed bool +} + +func (s *Source) Logs() *bytes.Buffer { + s.logsMu.RLock() + b := s.logs.Bytes() + var logs []byte + copy(b, logs) + s.logsMu.RUnlock() + + return bytes.NewBuffer(logs) +} + +// NewListener returns a new Listener +func NewListener(addr string) *Listener { + return NewListenerAddr(addr, addr) +} + +func NewListenerAddr(listenAddr string, redirectAddr string) *Listener { + addr, err := net.ResolveUDPAddr("udp", listenAddr) + if err != nil { + panic(err) + } + + return &Listener{ + mapMu: new(sync.RWMutex), + sources: make(map[string]*Source), + channels: make(map[string](chan string)), + + listenAddr: addr, + redirectAddr: redirectAddr, + } +} + +func (l *Listener) RemoveSource(s *Source, m *TF2RconConnection) { + s.handlerMu.Lock() + s.closed = true + s.handlerMu.Unlock() + + l.mapMu.Lock() + delete(l.sources, s.Secret) + l.mapMu.Unlock() + + m.StopLogRedirection(l.redirectAddr) +} + +func (l *Listener) start() { + log.SetFlags(log.Llongfile) + + conn, err := net.ListenUDP("udp4", l.listenAddr) + if err != nil { + panic(err) + } + + buff := make([]byte, 512) + + for { + n, err := conn.Read(buff) + if err != nil { + log.Println(err) + } + + secret, pos, err := getSecret(buff[0:n]) + if err != nil { + continue + } + + l.mapMu.RLock() + source, ok := l.sources[secret] + l.mapMu.RUnlock() + + if !ok { + continue + } + + go func() { + source.handlerMu.Lock() + defer source.handlerMu.Unlock() + + if source.closed { + return + } + + source.logsMu.Lock() + source.logs.WriteString("L ") + source.logs.Write(buff[pos : n-2]) + source.logs.WriteByte('\n') + source.logsMu.Unlock() + + handler := source.handler + + m := parse(string(buff[pos:])) + + switch m.Parsed.Type { + case PlayerGlobalMessage: + d := m.Parsed.Data.(PlayerData) + handler.PlayerGlobalMessage(d, d.Text) + case PlayerTeamMessage: + d := m.Parsed.Data.(PlayerData) + handler.PlayerTeamMessage(d, d.Text) + case PlayerChangedClass: + d := m.Parsed.Data.(PlayerData) + handler.PlayerClassChange(d, d.Class) + case PlayerChangedTeam: + d := m.Parsed.Data.(PlayerData) + handler.PlayerTeamChange(d, d.NewTeam) + case WorldPlayerConnected: + d := m.Parsed.Data.(PlayerData) + handler.PlayerConnected(d) + case WorldPlayerDisconnected: + d := m.Parsed.Data.(PlayerData) + handler.PlayerDisconnected(d) + case WorldGameOver: + handler.GameOver() + case ServerCvar: + d := m.Parsed.Data.(CvarData) + handler.CVarChange(d.Variable, d.Value) + } + }() + } +} + +func (l *Listener) AddSource(handler Handler, m *TF2RconConnection) *Source { + secret := strconv.Itoa(rand.Intn(999998) + 1) + rand.Seed(time.Now().Unix()) + + l.mapMu.RLock() + _, ok := l.sources[secret] + for ok { + _, ok = l.sources[secret] + } + l.mapMu.Unlock() + + return l.AddSourceSecret(secret, handler, m) +} + +func (l *Listener) AddSourceSecret(secret string, handler Handler, m *TF2RconConnection) *Source { + s := newSource(secret, handler) + + l.mapMu.Lock() + l.sources[secret] = s + l.mapMu.Unlock() + + m.Query("sv_logsecret " + secret) + m.RedirectLogs(l.redirectAddr) + return s +} + +func newSource(secret string, handler Handler) *Source { + return &Source{ + Secret: secret, + logsMu: new(sync.RWMutex), + logs: new(bytes.Buffer), + + handlerMu: new(sync.Mutex), + handler: handler, + } +} diff --git a/tf2RconConnection.go b/tf2RconConnection.go index 7b25da7..94c4001 100644 --- a/tf2RconConnection.go +++ b/tf2RconConnection.go @@ -262,14 +262,14 @@ func (c *TF2RconConnection) RemoveTag(tagName string) error { } // RedirectLogs send the logaddress_add command -func (c *TF2RconConnection) RedirectLogs(ip string, port string) error { - query := "logaddress_add " + ip + ":" + port +func (c *TF2RconConnection) RedirectLogs(addr string) error { + query := "logaddress_add " + addr _, err := c.Query(query) return err } -func (c *TF2RconConnection) StopLogRedirection(localip string, port string) { - query := fmt.Sprintf("logaddress_del %s:%s", localip, port) +func (c *TF2RconConnection) StopLogRedirection(addr string) { + query := fmt.Sprintf("logaddress_del %s", addr) c.Query(query) } From 92297fede2e8c0f7e3178d3dc8abd2499e41578c Mon Sep 17 00:00:00 2001 From: Vibhav Pant Date: Sun, 14 Feb 2016 00:37:10 +0530 Subject: [PATCH 2/6] Return errors. --- logListen.go | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/logListen.go b/logListen.go index 6ff0a7a..f72a464 100644 --- a/logListen.go +++ b/logListen.go @@ -56,17 +56,17 @@ func (s *Source) Logs() *bytes.Buffer { } // NewListener returns a new Listener -func NewListener(addr string) *Listener { +func NewListener(addr string) (*Listener, error) { return NewListenerAddr(addr, addr) } -func NewListenerAddr(listenAddr string, redirectAddr string) *Listener { +func NewListenerAddr(listenAddr string, redirectAddr string) (*Listener, error) { addr, err := net.ResolveUDPAddr("udp", listenAddr) if err != nil { - panic(err) + return nil, err } - return &Listener{ + l := &Listener{ mapMu: new(sync.RWMutex), sources: make(map[string]*Source), channels: make(map[string](chan string)), @@ -74,6 +74,14 @@ func NewListenerAddr(listenAddr string, redirectAddr string) *Listener { listenAddr: addr, redirectAddr: redirectAddr, } + + conn, err := net.ListenUDP("udp", l.listenAddr) + if err != nil { + return nil, err + } + + go l.start(conn) + return l, nil } func (l *Listener) RemoveSource(s *Source, m *TF2RconConnection) { @@ -88,14 +96,7 @@ func (l *Listener) RemoveSource(s *Source, m *TF2RconConnection) { m.StopLogRedirection(l.redirectAddr) } -func (l *Listener) start() { - log.SetFlags(log.Llongfile) - - conn, err := net.ListenUDP("udp4", l.listenAddr) - if err != nil { - panic(err) - } - +func (l *Listener) start(conn *net.UDPConn) { buff := make([]byte, 512) for { From e9b794d42bb56a68998ee27bfd9c116a56914a1d Mon Sep 17 00:00:00 2001 From: Vibhav Pant Date: Sun, 14 Feb 2016 01:02:09 +0530 Subject: [PATCH 3/6] NewListenerAddr: take port address --- logListen.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/logListen.go b/logListen.go index f72a464..0cc417c 100644 --- a/logListen.go +++ b/logListen.go @@ -60,8 +60,8 @@ func NewListener(addr string) (*Listener, error) { return NewListenerAddr(addr, addr) } -func NewListenerAddr(listenAddr string, redirectAddr string) (*Listener, error) { - addr, err := net.ResolveUDPAddr("udp", listenAddr) +func NewListenerAddr(port, redirectAddr string) (*Listener, error) { + addr, err := net.ResolveUDPAddr("udp", ":"+port) if err != nil { return nil, err } @@ -75,7 +75,7 @@ func NewListenerAddr(listenAddr string, redirectAddr string) (*Listener, error) redirectAddr: redirectAddr, } - conn, err := net.ListenUDP("udp", l.listenAddr) + conn, err := net.ListenUDP("udp", addr) if err != nil { return nil, err } From a0140f664a69c5f4679d49e7873f3deb12ca6c42 Mon Sep 17 00:00:00 2001 From: Vibhav Pant Date: Sun, 14 Feb 2016 01:55:00 +0530 Subject: [PATCH 4/6] AddSource: call RUnlock --- logListen.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logListen.go b/logListen.go index 0cc417c..96080c5 100644 --- a/logListen.go +++ b/logListen.go @@ -174,7 +174,7 @@ func (l *Listener) AddSource(handler Handler, m *TF2RconConnection) *Source { for ok { _, ok = l.sources[secret] } - l.mapMu.Unlock() + l.mapMu.RUnlock() return l.AddSourceSecret(secret, handler, m) } From 1ebcc83e7b5e571fda62db9912083d200d2fafe3 Mon Sep 17 00:00:00 2001 From: Vibhav Pant Date: Sun, 14 Feb 2016 01:55:14 +0530 Subject: [PATCH 5/6] Add QueryNoResp --- tf2RconConnection.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tf2RconConnection.go b/tf2RconConnection.go index 94c4001..a56b1d6 100644 --- a/tf2RconConnection.go +++ b/tf2RconConnection.go @@ -24,6 +24,11 @@ var ( CVarValueRegex = regexp.MustCompile(`^"(?:.*?)" = "(.*?)"`) ) +func (c *TF2RconConnection) QueryNoResp(req string) error { + _, err := c.rc.Write(req) + return err +} + // Query executes a query and returns the server responses func (c *TF2RconConnection) Query(req string) (string, error) { reqID, reqErr := c.rc.Write(req) From 8647948428f17445383d9716539f1bfd864bca9f Mon Sep 17 00:00:00 2001 From: Vibhav Pant Date: Sun, 14 Feb 2016 03:18:06 +0530 Subject: [PATCH 6/6] Fix indices --- chatMessage.go | 8 +++----- logListen.go | 6 +++--- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/chatMessage.go b/chatMessage.go index 5ce6f28..07a51c2 100644 --- a/chatMessage.go +++ b/chatMessage.go @@ -93,7 +93,7 @@ func getSecret(data []byte) (string, int, error) { } bytes := data[5:] - var pos int + pos := 5 for bytes[pos] != 0x20 { pos++ @@ -114,11 +114,10 @@ func getSecret(data []byte) (string, int, error) { const refTime = "01/02/2006 - 15:04:05" func parse(text string) LogMessage { - timeText := text[:21] - message := text[23:] + timeText := text[5:26] + message := text[28:] timeObj, _ := time.Parse(refTime, timeText) - return LogMessage{timeObj, text, ParseLine(message)} } @@ -128,7 +127,6 @@ func ParseLine(message string) ParsedMsg { isPlayerMessage := false playerData := PlayerData{} - switch { case rPlayerGlobalMessage.MatchString(message): m = rPlayerGlobalMessage.FindStringSubmatch(message) diff --git a/logListen.go b/logListen.go index 96080c5..8736528 100644 --- a/logListen.go +++ b/logListen.go @@ -97,9 +97,8 @@ func (l *Listener) RemoveSource(s *Source, m *TF2RconConnection) { } func (l *Listener) start(conn *net.UDPConn) { - buff := make([]byte, 512) - for { + buff := make([]byte, 512) n, err := conn.Read(buff) if err != nil { log.Println(err) @@ -134,7 +133,7 @@ func (l *Listener) start(conn *net.UDPConn) { handler := source.handler - m := parse(string(buff[pos:])) + m := parse(string(buff[pos : n-2])) switch m.Parsed.Type { case PlayerGlobalMessage: @@ -172,6 +171,7 @@ func (l *Listener) AddSource(handler Handler, m *TF2RconConnection) *Source { l.mapMu.RLock() _, ok := l.sources[secret] for ok { + secret = strconv.Itoa(rand.Intn(999998) + 1) _, ok = l.sources[secret] } l.mapMu.RUnlock()