Commit be273959 authored by haoyanbin's avatar haoyanbin

Client.user

parent 81a75d9a
......@@ -43,10 +43,11 @@ var upgrader = websocket.Upgrader{
//连接参数结构体
type Config struct {
Id string //标识连接的名称
Id string //标识连接的名称/用户组标识
Type string //连接类型或path
Channel []string //连接注册频道类型方便广播等操作。做为一个数组存储。因为一个连接可以属多个频道
Goroutine int //每个连接开启的go程数里 默认为10
User string //用户唯一标识
}
type RuntimeInfo struct {
......@@ -85,6 +86,7 @@ type Client struct {
lastReceiveTime time.Time //最后一次接收到数据的时间
lastSendTime time.Time //最后一次发送数据的时间
Id string //标识连接的名称
User string //标识连接的名称
mux *sync.Mutex
IsClose chan bool //连接的状态。true为关闭
channel []string //连接注册频道类型方便广播等操作。做为一个数组存储。因为一个连接可以属多个频道
......@@ -141,7 +143,7 @@ func (c *Client) readPump() {
close(c.sendPing)
c.grpool.Close()
c.hub.RemoveClient(c)
DelClient(c.Id)
DelClient(c.Id,c.User)
dump()
}()
Loop:
......
......@@ -27,8 +27,8 @@ var (
// clients.
type hub struct {
// Registered clients.
clients map[string]*Client //新的处理方式有没有线程效率会更高,所以SafeMap的锁处理都去掉了
oldClients map[string]*Client //缓存断开的连接消息队列
clients map[string]map[string]*Client //新的处理方式有没有线程效率会更高,所以SafeMap的锁处理都去掉了
oldClients map[string]map[string]*Client //缓存断开的连接消息队列
// Inbound messages from the clients.
//可以用于广播所有连接对象
......@@ -57,8 +57,8 @@ func newHub() *hub {
//conversation: make([]*Conversation, MaxConversationNum),
register: make(chan *Client, MaxClientChannelLen),
unregister: make(chan string, MaxClientChannelLen),
clients: make(map[string]*Client),
oldClients: make(map[string]*Client),
clients: make(map[string]map[string]*Client),
oldClients: make(map[string]map[string]*Client),
broadcastQueue: make(chan *SendMsg, MaxBroadcastQueueLen),
chanBroadcastQueue: make(chan *SendMsg, MaxChanBroadcastQueueLen),
}
......@@ -72,30 +72,40 @@ loop:
if !ok {
break loop
}
c, _ := h.clients[id]
userInfo, _ := GetClientInfo(id)
c, _ := h.clients[userInfo.ClientId][userInfo.User]
if c != nil {
delete(h.clients, id)
delete(h.clients[userInfo.ClientId], userInfo.User)
}
fmt.Println("取消注册ws连接对象:", id, "连接总数:", len(h.clients))
if len(h.clients[userInfo.ClientId]) == 0 {
delete(h.clients, userInfo.ClientId)
}
case client, ok := <-h.register:
if !ok {
break loop
}
h.clients[client.Id] = client
fmt.Println("注册ws连接对象:", client.Id, "连接总数:", len(h.clients))
if len(h.clients[client.Id]) > 0 {
h.clients[client.Id][client.User] = client
}else{
h.clients[client.Id] = map[string]*Client{client.User: client}
}
fmt.Println("注册ws连接对象:", client.User, "连接总数:", len(h.clients))
case broadcastMsg, ok := <-h.broadcastQueue:
if !ok {
break loop
}
for k, v := range h.clients {
for _, v := range h.clients {
if v != nil {
client := v
for k, vv := range v {
client := vv
broadcastMsg.ToClientId = k
client.send(broadcastMsg)
}
}
}
case chanBroadcastMsg, ok := <-h.chanBroadcastQueue:
if !ok {
......@@ -103,9 +113,10 @@ loop:
}
//广播指定频道的消息处理
//h.clients.Iterator(func(id string, v interface{}) bool {
for k, v := range h.clients {
for _, v := range h.clients {
if v != nil {
client := v
for k, vv := range v {
client := vv
for _, ch := range chanBroadcastMsg.Channel {
if searchStrArray(client.channel, ch) {
chanBroadcastMsg.ToClientId = k
......@@ -114,10 +125,10 @@ loop:
}
}
}
}
// return true
//})
}
}
}
func (h *hub) ticker() {
......@@ -156,7 +167,10 @@ func (h *hub) AddClient(client *Client) error {
func (h *hub) clearOldClient(client *Client) {
close(client.recvCh)
close(client.sendCh)
delete(h.oldClients[client.Id], client.User)
if len(h.oldClients[client.Id]) == 0 {
delete(h.oldClients, client.Id)
}
fmt.Println("己断开的ws连接缓存对象:", client.Id, "旧连接总数:", len(h.oldClients))
}
......@@ -167,7 +181,7 @@ func (h *hub) RemoveClient(client *Client) error {
timeout := time.NewTimer(time.Second * 1)
defer timeout.Stop()
select {
case h.unregister <- client.Id:
case h.unregister <- client.User:
return nil
case <-timeout.C:
return errors.New(" RemoveClient unregister消息管道blocked,写入消息超时")
......
......@@ -38,14 +38,18 @@ func NewClient(conf *Config) *Client {
conf.Goroutine = 200
}
var client *Client
oldclient := wsSever.hub.oldClients[conf.Id]
oldclient := wsSever.hub.oldClients[conf.Id][conf.User]
if oldclient != nil {
delete(wsSever.hub.oldClients[conf.Id], conf.User)
if len(wsSever.hub.oldClients[conf.Id]) == 0 {
delete(wsSever.hub.oldClients, conf.Id)
}
c := oldclient
client = c
} else {
client = &Client{
Id: conf.Id,
User: conf.User,
types: conf.Type,
hub: wsSever.hub,
sendCh: make(chan *SendMsg, MaxRecvChLen),
......@@ -65,7 +69,7 @@ func NewClient(conf *Config) *Client {
client.OnPing(nil)
client.OnPong(nil)
wsSever.hub.AddClient(client)
AppendClient(client.Id, string(SerializeJson(conf)))
AppendClient(client.Id, client.User, string(SerializeJson(conf)))
return client
}
......@@ -223,7 +227,7 @@ func (c *Client) Send(msg *SendMsg) error {
c.close()
return errors.New("连接ID:" + c.Id + "ws连接己经断开,无法发送消息")
default:
msg.ToClientId = c.Id
msg.ToClientId = c.User
fmt.Println(msg)
c.send(msg)
}
......@@ -245,20 +249,24 @@ func Send(msg *SendMsg) error {
if msg.ToClientId == "" {
return errors.New("发送消息的消息体中未指定ToClient目标!")
}
c := wsSever.hub.clients[msg.ToClientId]
userInfo, _ := GetClientInfo(msg.ToClientId)
cList := wsSever.hub.clients[userInfo.ClientId]
for _, c := range cList {
if c != nil {
client := c
if client.Id == msg.ToClientId {
msg.ToClientId = client.Id
err := client.Send(msg)
//client := c
//if client.Id == msg.ToClientId {
msg.ToClientId = c.User
err := c.Send(msg)
if err != nil {
return errors.New("发送消息出错:" + err.Error() + ",连接对象id=" + client.Id + "。")
}
return errors.New("发送消息出错:" + err.Error() + ",连接对象id=" + c.User + "。")
}
//}
} else {
fmt.Println(1)
//fmt.Println(wsSever.hub.clients)
}
}
return nil
}
......@@ -296,8 +304,8 @@ func BroadcastAll(msg *SendMsg) error {
}
// 根据来源和类型获取客户端列表
func GetList(source, sourceId, customerType string) map[string]*Client {
clientData := make(map[string]*Client, 0)
func GetList(source, sourceId, customerType string) map[string]map[string]*Client {
clientData := make(map[string]map[string]*Client, 0)
for k, v := range wsSever.hub.clients {
userInfo := strings.Split(k, "_")
if (userInfo[0] == source && userInfo[1] == sourceId) || (source == "") {
......@@ -319,13 +327,14 @@ type UserInfo struct {
CustomerType string `json:"customerType"`
CustomerId string `json:"customerId"`
ClientId string `json:"clientId"`
User string `json:"User"`
ToChannel []string `json:"toChannel"`
}
//根据token获取用户来源信息
func GetClientInfoByToken(token string) (*UserInfo, error) {
func GetClientInfo(id string) (*UserInfo, error) {
tokenData := strings.Split(token, "_")
tokenData := strings.Split(id, "_")
fmt.Println(tokenData)
if len(tokenData) < 4 {
......@@ -349,7 +358,8 @@ func GetClientInfoByToken(token string) (*UserInfo, error) {
userData.CustomerType = tokenData[2]
userData.CustomerId = tokenData[3]
userData.ClientId = userData.Source + "_" + userData.SourceId + "_" + userData.CustomerType + "_" + userData.CustomerId
userData.ClientId = userData.SourceId + "_" + userData.CustomerType + "_" + userData.CustomerId
userData.User = userData.Source + "_" + userData.SourceId + "_" + userData.CustomerType + "_" + userData.CustomerId
return userData, nil
}
......@@ -361,7 +371,7 @@ func SaveMsg(msg *SendMsg) {
}
msg.SendTime = time.Now().Format("2006-01-02 15:04:05")
user := GetClientInfoById(msg.FromClientId)
user, _ := GetClientInfo(msg.FromClientId)
mqData := &SetMsgReq{}
mqData.ProcedureType = msg.ProcedureType
......@@ -477,49 +487,49 @@ func DelConversation(conversationId int) {
return
}
func AppendClient(clientId, data string) {
Redis.HSet(cliKey, clientId, data)
func AppendClient(clientId, clientUser, data string) {
Redis.HSet(cliKey+":"+clientId, clientUser, data)
return
}
func DelClient(clientId string) {
Redis.HDel(cliKey, clientId)
func DelClient(clientId, clientUser string) {
Redis.HDel(cliKey+":"+clientId, clientUser)
return
}
func SetEnd() {
conversationList := GetAllList(convKey)
for _, vClients := range conversationList {
conversationData := &Conversation{}
UnserislizeJson([]byte(vClients), conversationData)
if conversationData.StartReceiveDate < time.Now().Add(-endDate).Format("2006-01-02 15:04:05") {
mqData := &SendMsg{
ProcedureType: 8,
ConversationId: conversationData.ConversationId,
ToClientId: conversationData.Promoter,
FromClientId: conversationData.Participant,
SendTime: time.Now().Format("2006-01-02 15:04:05"),
}
//发送结束会话给客户端
_, isSet := wsSever.hub.clients[conversationData.Promoter]
if isSet == true {
wsSever.hub.clients[conversationData.Promoter].readMessage(mqData)
}
toMqData := &SendMsg{
ProcedureType: 8,
ConversationId: conversationData.ConversationId,
ToClientId: conversationData.Participant,
FromClientId: conversationData.Promoter,
SendTime: time.Now().Format("2006-01-02 15:04:05"),
}
_, isSet = wsSever.hub.clients[conversationData.Participant]
if isSet == true {
wsSever.hub.clients[conversationData.Participant].readMessage(toMqData)
}
SaveMsg(toMqData)
}
}
//conversationList := GetAllList(convKey)
//for _, vClients := range conversationList {
// conversationData := &Conversation{}
// UnserislizeJson([]byte(vClients), conversationData)
// if conversationData.StartReceiveDate < time.Now().Add(-endDate).Format("2006-01-02 15:04:05") {
// mqData := &SendMsg{
// ProcedureType: 8,
// ConversationId: conversationData.ConversationId,
// ToClientId: conversationData.Promoter,
// FromClientId: conversationData.Participant,
// SendTime: time.Now().Format("2006-01-02 15:04:05"),
// }
// //发送结束会话给客户端
// _, isSet := wsSever.hub.clients[conversationData.Promoter]
// if isSet == true {
// wsSever.hub.clients[conversationData.Promoter].readMessage(mqData)
// }
//
// toMqData := &SendMsg{
// ProcedureType: 8,
// ConversationId: conversationData.ConversationId,
// ToClientId: conversationData.Participant,
// FromClientId: conversationData.Promoter,
// SendTime: time.Now().Format("2006-01-02 15:04:05"),
// }
// _, isSet = wsSever.hub.clients[conversationData.Participant]
// if isSet == true {
// wsSever.hub.clients[conversationData.Participant].readMessage(toMqData)
// }
// SaveMsg(toMqData)
// }
//}
return
}
......
......@@ -75,7 +75,7 @@ func ws(w http.ResponseWriter, r *http.Request) {
head := http.Header{}
head.Add("Sec-Websocket-Protocol", headData)
userInfo, err := pool.GetClientInfoByToken(headData)
userInfo, err := pool.GetClientInfo(headData)
if err != nil {
fmt.Println("用户信息报错:", err)
......@@ -85,16 +85,17 @@ func ws(w http.ResponseWriter, r *http.Request) {
//实例化连接对象
client := pool.NewClient(&pool.Config{
Id: userInfo.ClientId, //连接标识
User: userInfo.User,
Type: "ws", //连接类型
Channel: userInfo.ToChannel, //指定频道
Goroutine: 100,
})
fmt.Println(client.Id, "实例化连接对象完成")
fmt.Println(client.User, "实例化连接对象完成")
//连接成功回调
client.OnOpen(func() {
fmt.Println("连接开启回调:", client.Id)
fmt.Println("连接开启回调:", client.User)
})
//接收消息
......@@ -134,26 +135,26 @@ func ws(w http.ResponseWriter, r *http.Request) {
//连接断开回调
client.OnClose(func() {
user := pool.GetClientInfoById(client.Id)
user, _ := pool.GetClientInfo(client.User)
closeMsg := &pool.SetMsgReq{}
closeMsg.ProcedureType = 5
closeMsg.EndTime = time.Now().Format("2006-01-02 15:04:05")
if user.CustomerType == "1" {
closeMsg.Promoter = client.Id
closeMsg.Promoter = client.User
pool.PublishData(closeMsg)
fmt.Println("用户关闭连接", client.Id)
fmt.Println("用户关闭连接", client.User)
}
if user.CustomerType == "2" {
closeMsg.Participant = client.Id
closeMsg.Participant = client.User
pool.PublishData(closeMsg)
fmt.Println("专家关闭连接", client.Id)
fmt.Println("专家关闭连接", client.User)
}
pool.DelClient(client.Id)
pool.DelClient(client.Id, client.User)
fmt.Printf("连接己经关闭%s", client.Id)
})
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment