-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.go
More file actions
133 lines (123 loc) · 2.79 KB
/
server.go
File metadata and controls
133 lines (123 loc) · 2.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package main
import (
"fmt"
"io"
"net"
"sync"
"time"
)
//server端构建
type Server struct {
// 服务器监听的端口
IP string
// 服务器监听的ip
Port int
// 在线用户map
OnlineMap map[string]*User
// 针对OnlineMap的读写锁
mapLock sync.RWMutex
// 用于广播消息的channel
MsgChan chan string
}
// 创建一个新的服务器实例
func NewServer(ip string, port int) *Server {
//新建一个server对象?
server := &Server{
IP: ip,
Port: port,
OnlineMap: make(map[string]*User),
MsgChan: make(chan string),
}
return server
}
// 处理广播消息的监听,谁发的+发的啥
func (s *Server) BroadCast(u *User, msg string) {
// 广播消息格式:[userName] say: msg
sendMsg := "[" + u.Addr + "]" + u.Name + ":" + msg
s.MsgChan <- sendMsg
}
// 监听MsgChannel,并投送用户channel的协程
func (s *Server) ListennerMessage() {
// 监听MsgChannel
for {
msg := <-s.MsgChan
// 广播消息给所有用户
s.mapLock.Lock()
for _, u := range s.OnlineMap {
u.UserChan <- msg
}
s.mapLock.Unlock()
}
}
func (s *Server) Handler(conn net.Conn) {
//...处理当前链接的业务
// 创建一个绑定user的User对象
user := NewUser(conn, s)
isAlive := make(chan bool)
//上线
user.OnLine()
// 接收客户端消息
//只处理一个任务没必要再开一个goroutine
// 🔥 多任务,修正点 1:必须开启一个匿名协程专门负责读取消息
go func() {
buf := make([]byte, 4096)
// 无限循环读取
for {
//多任务如果不单开协程,此处会阻塞
n, err := conn.Read(buf)
if n == 0 {
user.OffLine()
return
}
if err != nil && err != io.EOF {
fmt.Println("conn.Read err:", err)
return
}
msg := string(buf[:n-1])
user.DoMessage(msg)
//有发消息,则重置alive
isAlive <- true
}
}()
//使用select监听事件,触发一个就会重进select
for {
select {
case <-time.After(100 * time.Second):
// 超时处理
user.SendMsg("超时被踢\n")
//销毁资源
close(user.UserChan)
//关闭连接
conn.Close()
//退出handler
return
case <-isAlive:
//只为了重置select
}
}
}
// 启动服务器
func (s *Server) Start() {
//socket listen
//使用tcp连接
listenner, err := net.Listen("tcp", fmt.Sprintf("%s:%d", s.IP, s.Port))
if err != nil {
fmt.Println("net.Listen err:", err)
}
//最终关闭listenner
defer listenner.Close()
//启动广播监听投送程序
go s.ListennerMessage()
//不断监听新连接,所以用无限for循环
for {
//accept
//接收一个连接请求
conn, err := listenner.Accept()
if err != nil {
fmt.Println("listener.Accept err:", err)
}
//do handler
//开启一个go协程,处理连接的业务需求,将conn作为参数传递
go s.Handler(conn)
}
}