diff --git a/pool/client.go b/pool/client.go index 9817442266be37f0477b605e610ad598bff8f398..ed90171172a23f8b1d5e358ab9b816241937cab5 100755 --- a/pool/client.go +++ b/pool/client.go @@ -43,10 +43,11 @@ var upgrader = websocket.Upgrader{ //杩炴帴鍙傛暟缁撴瀯浣� type Config struct { - Id string //鏍囪瘑杩炴帴鐨勫悕绉� + Id string //鏍囪瘑杩炴帴鐨勫悕绉�/鐢ㄦ埛缁勬爣璇� Type string //杩炴帴绫诲瀷鎴杙ath Channel []string //杩炴帴娉ㄥ唽棰戦亾绫诲瀷鏂逛究骞挎挱绛夋搷浣溿€傚仛涓轰竴涓暟缁勫瓨鍌ㄣ€傚洜涓轰竴涓繛鎺ュ彲浠ュ睘澶氫釜棰戦亾 Goroutine int //姣忎釜杩炴帴寮€鍚殑go绋嬫暟閲� 榛樿涓�10 + User string //鐢ㄦ埛鍞竴鏍囪瘑 } type RuntimeInfo struct { @@ -85,6 +86,7 @@ type Client struct { lastReceiveTime time.Time //鏈€鍚庝竴娆℃帴鏀跺埌鏁版嵁鐨勬椂闂� lastSendTime time.Time //鏈€鍚庝竴娆″彂閫佹暟鎹殑鏃堕棿 Id string //鏍囪瘑杩炴帴鐨勫悕绉� + User string //鏍囪瘑杩炴帴鐨勫悕绉� mux *sync.Mutex IsClose chan bool //杩炴帴鐨勭姸鎬併€倀rue涓哄叧闂� channel []string //杩炴帴娉ㄥ唽棰戦亾绫诲瀷鏂逛究骞挎挱绛夋搷浣溿€傚仛涓轰竴涓暟缁勫瓨鍌ㄣ€傚洜涓轰竴涓繛鎺ュ彲浠ュ睘澶氫釜棰戦亾 @@ -141,7 +143,7 @@ func (c *Client) readPump() { close(c.sendPing) c.grpool.Close() c.hub.RemoveClient(c) - DelClient(c.Id) + DelClient(c.Id,c.User) dump() }() Loop: diff --git a/pool/hub.go b/pool/hub.go index 2e4626de41da0c993bfc0bd23589d56766044f6d..d6c4982ae28e28e84913aa59b080a176c5cf4953 100755 --- a/pool/hub.go +++ b/pool/hub.go @@ -27,8 +27,8 @@ var ( // clients. type hub struct { // Registered clients. - clients map[string]*Client //鏂扮殑澶勭悊鏂瑰紡鏈夋病鏈夌嚎绋嬫晥鐜囦細鏇撮珮,鎵€浠afeMap鐨勯攣澶勭悊閮藉幓鎺変簡 - oldClients map[string]*Client //缂撳瓨鏂紑鐨勮繛鎺ユ秷鎭槦鍒� + clients map[string]map[string]*Client //鏂扮殑澶勭悊鏂瑰紡鏈夋病鏈夌嚎绋嬫晥鐜囦細鏇撮珮,鎵€浠afeMap鐨勯攣澶勭悊閮藉幓鎺変簡 + oldClients map[string]map[string]*Client //缂撳瓨鏂紑鐨勮繛鎺ユ秷鎭槦鍒� // Inbound messages from the clients. //鍙互鐢ㄤ簬骞挎挱鎵€鏈夎繛鎺ュ璞� @@ -57,8 +57,8 @@ func newHub() *hub { //conversation: make([]*Conversation, MaxConversationNum), register: make(chan *Client, MaxClientChannelLen), unregister: make(chan string, MaxClientChannelLen), - clients: make(map[string]*Client), - oldClients: make(map[string]*Client), + clients: make(map[string]map[string]*Client), + oldClients: make(map[string]map[string]*Client), broadcastQueue: make(chan *SendMsg, MaxBroadcastQueueLen), chanBroadcastQueue: make(chan *SendMsg, MaxChanBroadcastQueueLen), } @@ -72,28 +72,38 @@ loop: if !ok { break loop } - c, _ := h.clients[id] + userInfo, _ := GetClientInfo(id) + c, _ := h.clients[userInfo.ClientId][userInfo.User] if c != nil { - delete(h.clients, id) + delete(h.clients[userInfo.ClientId], userInfo.User) } fmt.Println("鍙栨秷娉ㄥ唽ws杩炴帴瀵硅薄锛�", id, "杩炴帴鎬绘暟锛�", len(h.clients)) + if len(h.clients[userInfo.ClientId]) == 0 { + delete(h.clients, userInfo.ClientId) + } case client, ok := <-h.register: if !ok { break loop } - h.clients[client.Id] = client - fmt.Println("娉ㄥ唽ws杩炴帴瀵硅薄锛�", client.Id, "杩炴帴鎬绘暟锛�", len(h.clients)) + if len(h.clients[client.Id]) > 0 { + h.clients[client.Id][client.User] = client + }else{ + h.clients[client.Id] = map[string]*Client{client.User: client} + } + fmt.Println("娉ㄥ唽ws杩炴帴瀵硅薄锛�", client.User, "杩炴帴鎬绘暟锛�", len(h.clients)) case broadcastMsg, ok := <-h.broadcastQueue: if !ok { break loop } - for k, v := range h.clients { + for _, v := range h.clients { if v != nil { - client := v - broadcastMsg.ToClientId = k - client.send(broadcastMsg) + for k, vv := range v { + client := vv + broadcastMsg.ToClientId = k + client.send(broadcastMsg) + } } } @@ -103,13 +113,15 @@ loop: } //骞挎挱鎸囧畾棰戦亾鐨勬秷鎭鐞� //h.clients.Iterator(func(id string, v interface{}) bool { - for k, v := range h.clients { + for _, v := range h.clients { if v != nil { - client := v - for _, ch := range chanBroadcastMsg.Channel { - if searchStrArray(client.channel, ch) { - chanBroadcastMsg.ToClientId = k - client.send(chanBroadcastMsg) + for k, vv := range v { + client := vv + for _, ch := range chanBroadcastMsg.Channel { + if searchStrArray(client.channel, ch) { + chanBroadcastMsg.ToClientId = k + client.send(chanBroadcastMsg) + } } } } @@ -117,7 +129,6 @@ loop: // return true //}) } - } } func (h *hub) ticker() { @@ -156,7 +167,10 @@ func (h *hub) AddClient(client *Client) error { func (h *hub) clearOldClient(client *Client) { close(client.recvCh) close(client.sendCh) - delete(h.oldClients, client.Id) + delete(h.oldClients[client.Id], client.User) + if len(h.oldClients[client.Id]) == 0 { + delete(h.oldClients, client.Id) + } fmt.Println("宸辨柇寮€鐨剋s杩炴帴缂撳瓨瀵硅薄锛�", client.Id, "鏃ц繛鎺ユ€绘暟锛�", len(h.oldClients)) } @@ -167,7 +181,7 @@ func (h *hub) RemoveClient(client *Client) error { timeout := time.NewTimer(time.Second * 1) defer timeout.Stop() select { - case h.unregister <- client.Id: + case h.unregister <- client.User: return nil case <-timeout.C: return errors.New(" RemoveClient unregister娑堟伅绠¢亾blocked,鍐欏叆娑堟伅瓒呮椂") diff --git a/pool/publicApi.go b/pool/publicApi.go index 3c8274b4526337ad0d3a8f6e4a79b03d7fb72fd1..9b2c639deb502d547da64f47a6217addb5672c7d 100755 --- a/pool/publicApi.go +++ b/pool/publicApi.go @@ -38,14 +38,18 @@ func NewClient(conf *Config) *Client { conf.Goroutine = 200 } var client *Client - oldclient := wsSever.hub.oldClients[conf.Id] + oldclient := wsSever.hub.oldClients[conf.Id][conf.User] if oldclient != nil { - delete(wsSever.hub.oldClients, conf.Id) + delete(wsSever.hub.oldClients[conf.Id], conf.User) + if len(wsSever.hub.oldClients[conf.Id]) == 0 { + delete(wsSever.hub.oldClients, conf.Id) + } c := oldclient client = c } else { client = &Client{ Id: conf.Id, + User: conf.User, types: conf.Type, hub: wsSever.hub, sendCh: make(chan *SendMsg, MaxRecvChLen), @@ -65,7 +69,7 @@ func NewClient(conf *Config) *Client { client.OnPing(nil) client.OnPong(nil) wsSever.hub.AddClient(client) - AppendClient(client.Id, string(SerializeJson(conf))) + AppendClient(client.Id, client.User, string(SerializeJson(conf))) return client } @@ -223,7 +227,7 @@ func (c *Client) Send(msg *SendMsg) error { c.close() return errors.New("杩炴帴ID锛�" + c.Id + "ws杩炴帴宸辩粡鏂紑锛屾棤娉曞彂閫佹秷鎭�") default: - msg.ToClientId = c.Id + msg.ToClientId = c.User fmt.Println(msg) c.send(msg) } @@ -245,19 +249,23 @@ func Send(msg *SendMsg) error { if msg.ToClientId == "" { return errors.New("鍙戦€佹秷鎭殑娑堟伅浣撲腑鏈寚瀹歍oClient鐩爣锛�") } - c := wsSever.hub.clients[msg.ToClientId] - if c != nil { - client := c - if client.Id == msg.ToClientId { - msg.ToClientId = client.Id - err := client.Send(msg) + + userInfo, _ := GetClientInfo(msg.ToClientId) + cList := wsSever.hub.clients[userInfo.ClientId] + for _, c := range cList { + if c != nil { + //client := c + //if client.Id == msg.ToClientId { + msg.ToClientId = c.User + err := c.Send(msg) if err != nil { - return errors.New("鍙戦€佹秷鎭嚭閿欙細" + err.Error() + ",杩炴帴瀵硅薄id=" + client.Id + "銆�") + return errors.New("鍙戦€佹秷鎭嚭閿欙細" + err.Error() + ",杩炴帴瀵硅薄id=" + c.User + "銆�") } + //} + } else { + fmt.Println(1) + //fmt.Println(wsSever.hub.clients) } - } else { - fmt.Println(1) - //fmt.Println(wsSever.hub.clients) } return nil @@ -296,8 +304,8 @@ func BroadcastAll(msg *SendMsg) error { } // 鏍规嵁鏉ユ簮鍜岀被鍨嬭幏鍙栧鎴风鍒楄〃 -func GetList(source, sourceId, customerType string) map[string]*Client { - clientData := make(map[string]*Client, 0) +func GetList(source, sourceId, customerType string) map[string]map[string]*Client { + clientData := make(map[string]map[string]*Client, 0) for k, v := range wsSever.hub.clients { userInfo := strings.Split(k, "_") if (userInfo[0] == source && userInfo[1] == sourceId) || (source == "") { @@ -319,13 +327,14 @@ type UserInfo struct { CustomerType string `json:"customerType"` CustomerId string `json:"customerId"` ClientId string `json:"clientId"` + User string `json:"User"` ToChannel []string `json:"toChannel"` } //鏍规嵁token鑾峰彇鐢ㄦ埛鏉ユ簮淇℃伅 -func GetClientInfoByToken(token string) (*UserInfo, error) { +func GetClientInfo(id string) (*UserInfo, error) { - tokenData := strings.Split(token, "_") + tokenData := strings.Split(id, "_") fmt.Println(tokenData) if len(tokenData) < 4 { @@ -349,7 +358,8 @@ func GetClientInfoByToken(token string) (*UserInfo, error) { userData.CustomerType = tokenData[2] userData.CustomerId = tokenData[3] - userData.ClientId = userData.Source + "_" + userData.SourceId + "_" + userData.CustomerType + "_" + userData.CustomerId + userData.ClientId = userData.SourceId + "_" + userData.CustomerType + "_" + userData.CustomerId + userData.User = userData.Source + "_" + userData.SourceId + "_" + userData.CustomerType + "_" + userData.CustomerId return userData, nil } @@ -361,7 +371,7 @@ func SaveMsg(msg *SendMsg) { } msg.SendTime = time.Now().Format("2006-01-02 15:04:05") - user := GetClientInfoById(msg.FromClientId) + user, _ := GetClientInfo(msg.FromClientId) mqData := &SetMsgReq{} mqData.ProcedureType = msg.ProcedureType @@ -477,49 +487,49 @@ func DelConversation(conversationId int) { return } -func AppendClient(clientId, data string) { - Redis.HSet(cliKey, clientId, data) +func AppendClient(clientId, clientUser, data string) { + Redis.HSet(cliKey+":"+clientId, clientUser, data) return } -func DelClient(clientId string) { - Redis.HDel(cliKey, clientId) +func DelClient(clientId, clientUser string) { + Redis.HDel(cliKey+":"+clientId, clientUser) return } func SetEnd() { - conversationList := GetAllList(convKey) - for _, vClients := range conversationList { - conversationData := &Conversation{} - UnserislizeJson([]byte(vClients), conversationData) - if conversationData.StartReceiveDate < time.Now().Add(-endDate).Format("2006-01-02 15:04:05") { - mqData := &SendMsg{ - ProcedureType: 8, - ConversationId: conversationData.ConversationId, - ToClientId: conversationData.Promoter, - FromClientId: conversationData.Participant, - SendTime: time.Now().Format("2006-01-02 15:04:05"), - } - //鍙戦€佺粨鏉熶細璇濈粰瀹㈡埛绔� - _, isSet := wsSever.hub.clients[conversationData.Promoter] - if isSet == true { - wsSever.hub.clients[conversationData.Promoter].readMessage(mqData) - } - - toMqData := &SendMsg{ - ProcedureType: 8, - ConversationId: conversationData.ConversationId, - ToClientId: conversationData.Participant, - FromClientId: conversationData.Promoter, - SendTime: time.Now().Format("2006-01-02 15:04:05"), - } - _, isSet = wsSever.hub.clients[conversationData.Participant] - if isSet == true { - wsSever.hub.clients[conversationData.Participant].readMessage(toMqData) - } - SaveMsg(toMqData) - } - } + //conversationList := GetAllList(convKey) + //for _, vClients := range conversationList { + // conversationData := &Conversation{} + // UnserislizeJson([]byte(vClients), conversationData) + // if conversationData.StartReceiveDate < time.Now().Add(-endDate).Format("2006-01-02 15:04:05") { + // mqData := &SendMsg{ + // ProcedureType: 8, + // ConversationId: conversationData.ConversationId, + // ToClientId: conversationData.Promoter, + // FromClientId: conversationData.Participant, + // SendTime: time.Now().Format("2006-01-02 15:04:05"), + // } + // //鍙戦€佺粨鏉熶細璇濈粰瀹㈡埛绔� + // _, isSet := wsSever.hub.clients[conversationData.Promoter] + // if isSet == true { + // wsSever.hub.clients[conversationData.Promoter].readMessage(mqData) + // } + // + // toMqData := &SendMsg{ + // ProcedureType: 8, + // ConversationId: conversationData.ConversationId, + // ToClientId: conversationData.Participant, + // FromClientId: conversationData.Promoter, + // SendTime: time.Now().Format("2006-01-02 15:04:05"), + // } + // _, isSet = wsSever.hub.clients[conversationData.Participant] + // if isSet == true { + // wsSever.hub.clients[conversationData.Participant].readMessage(toMqData) + // } + // SaveMsg(toMqData) + // } + //} return } diff --git a/ws_server.go b/ws_server.go index cd851bb50fadddd0c10ac2fda10109c7f4455509..146685be2bedb385abca3da8c8c6bd8895662b73 100755 --- a/ws_server.go +++ b/ws_server.go @@ -75,7 +75,7 @@ func ws(w http.ResponseWriter, r *http.Request) { head := http.Header{} head.Add("Sec-Websocket-Protocol", headData) - userInfo, err := pool.GetClientInfoByToken(headData) + userInfo, err := pool.GetClientInfo(headData) if err != nil { fmt.Println("鐢ㄦ埛淇℃伅鎶ラ敊锛�", err) @@ -84,17 +84,18 @@ func ws(w http.ResponseWriter, r *http.Request) { //瀹炰緥鍖栬繛鎺ュ璞� client := pool.NewClient(&pool.Config{ - Id: userInfo.ClientId, //杩炴帴鏍囪瘑 + Id: userInfo.ClientId, //杩炴帴鏍囪瘑 + User: userInfo.User, Type: "ws", //杩炴帴绫诲瀷 Channel: userInfo.ToChannel, //鎸囧畾棰戦亾 Goroutine: 100, }) - fmt.Println(client.Id, "瀹炰緥鍖栬繛鎺ュ璞″畬鎴�") + fmt.Println(client.User, "瀹炰緥鍖栬繛鎺ュ璞″畬鎴�") //杩炴帴鎴愬姛鍥炶皟 client.OnOpen(func() { - fmt.Println("杩炴帴寮€鍚洖璋冿細", client.Id) + fmt.Println("杩炴帴寮€鍚洖璋冿細", client.User) }) //鎺ユ敹娑堟伅 @@ -134,26 +135,26 @@ func ws(w http.ResponseWriter, r *http.Request) { //杩炴帴鏂紑鍥炶皟 client.OnClose(func() { - user := pool.GetClientInfoById(client.Id) + user, _ := pool.GetClientInfo(client.User) closeMsg := &pool.SetMsgReq{} closeMsg.ProcedureType = 5 closeMsg.EndTime = time.Now().Format("2006-01-02 15:04:05") if user.CustomerType == "1" { - closeMsg.Promoter = client.Id + closeMsg.Promoter = client.User pool.PublishData(closeMsg) - fmt.Println("鐢ㄦ埛鍏抽棴杩炴帴", client.Id) + fmt.Println("鐢ㄦ埛鍏抽棴杩炴帴", client.User) } if user.CustomerType == "2" { - closeMsg.Participant = client.Id + closeMsg.Participant = client.User pool.PublishData(closeMsg) - fmt.Println("涓撳鍏抽棴杩炴帴", client.Id) + fmt.Println("涓撳鍏抽棴杩炴帴", client.User) } - pool.DelClient(client.Id) + pool.DelClient(client.Id, client.User) fmt.Printf("杩炴帴宸辩粡鍏抽棴%s", client.Id) })