package pool import ( "errors" "fmt" "pool/pool/util/queue" "time" ) var ( //最大连接池缓冲处理连接对像管道长度 MaxClientChannelLen = 10240 //最大全局广播缓冲处理管道长度 MaxBroadcastQueueLen = 4096 //最大频道广播缓冲处理管道长度 MaxChanBroadcastQueueLen = 4096 //最大接收消息缓冲处理管道长度 MaxRecvChLen = 10240 //最大发送消息缓冲处理管道长度 MaxSendChLen = 10240 //最大会话数量 MaxConversationNum = 10240 ) // Hub maintains the set of active clients and broadcasts messages to the // clients. type hub struct { // Registered clients. clients map[string]*Client //新的处理方式有没有线程效率会更高,所以SafeMap的锁处理都去掉了 oldClients map[string]*Client //缓存断开的连接消息队列 // Inbound messages from the clients. //可以用于广播所有连接对象 broadcastQueue chan *SendMsg //广播指定频道的管道 chanBroadcastQueue chan *SendMsg // Register requests from the clients. register chan *Client // Unregister requests from clients. unregister chan string //conversation []*Conversation } //重新连接需要处理的消息(缓存上次未来得能处理发送消息channel中的消息,60秒后原ws未连接消息失效) type oldMsg struct { list *queue.PriorityQueue Expiration time.Time //过期时间 } func newHub() *hub { return &hub{ //conversation: make([]*Conversation, MaxConversationNum), register: make(chan *Client, MaxClientChannelLen), unregister: make(chan string, MaxClientChannelLen), clients: make(map[string]*Client), oldClients: make(map[string]*Client), broadcastQueue: make(chan *SendMsg, MaxBroadcastQueueLen), chanBroadcastQueue: make(chan *SendMsg, MaxChanBroadcastQueueLen), } } func (h *hub) run() { loop: for { select { case id, ok := <-h.unregister: if !ok { break loop } c, _ := h.clients[id] if c != nil { delete(h.clients, id) } fmt.Println("取消注册ws连接对象:", id, "连接总数:", len(h.clients)) case client, ok := <-h.register: if !ok { break loop } h.clients[client.Id] = client fmt.Println("注册ws连接对象:", client.Id, "连接总数:", len(h.clients)) case broadcastMsg, ok := <-h.broadcastQueue: if !ok { break loop } for k, v := range h.clients { if v != nil { client := v broadcastMsg.ToClientId = k client.send(broadcastMsg) } } case chanBroadcastMsg, ok := <-h.chanBroadcastQueue: if !ok { break loop } //广播指定频道的消息处理 //h.clients.Iterator(func(id string, v interface{}) bool { for k, v := range h.clients { if v != nil { client := v for _, ch := range chanBroadcastMsg.Channel { if searchStrArray(client.channel, ch) { chanBroadcastMsg.ToClientId = k client.send(chanBroadcastMsg) } } } } // return true //}) } } } func (h *hub) ticker() { //定时清理清理缓存的旧的连接对像 //gtimer.AddSingleton(30*time.Second, func() { // if len(h.oldClients) > 0 { // for _, v := range h.oldClients { // //h.oldClients.Iterator(func(k string, v interface{}) bool { // if v != nil { // client := v // if time.Now().Add(-180 * time.Second).After(client.CloseTime) { // //3分钟后清理组存中的旧连接对像 // h.clearOldClient(client) // /// h.clearOldClient <- client // } // } // // return true // //}) // } // } //}) } func (h *hub) AddClient(client *Client) error { timeout := time.NewTimer(time.Second * 3) defer timeout.Stop() select { case h.register <- client: return nil case <-timeout.C: return errors.New("AddClient register消息管道blocked,写入消息超时") } } func (h *hub) clearOldClient(client *Client) { close(client.recvCh) close(client.sendCh) delete(h.oldClients, client.Id) fmt.Println("己断开的ws连接缓存对象:", client.Id, "旧连接总数:", len(h.oldClients)) } func (h *hub) RemoveClient(client *Client) error { //把连接对像缓存在旧对像列表中,并设置连接断开的时间,过期未连接就会清理对像 client.CloseTime = time.Now() //h.oldClients[client.Id] = client timeout := time.NewTimer(time.Second * 1) defer timeout.Stop() select { case h.unregister <- client.Id: return nil case <-timeout.C: return errors.New(" RemoveClient unregister消息管道blocked,写入消息超时") } }