Commit 43001efb authored by haoyanbin's avatar haoyanbin

Redis

parent 76c32f9d
...@@ -21,7 +21,7 @@ func Init() (client *redis.Client) { ...@@ -21,7 +21,7 @@ func Init() (client *redis.Client) {
}) })
_, err := client.Ping().Result() _, err := client.Ping().Result()
if err != nil { if err != nil {
fmt.Println(err) fmt.Println("redis:",err)
} }
return return
} }
...@@ -29,6 +29,9 @@ const ( ...@@ -29,6 +29,9 @@ const (
endDate = 60 * 2 * time.Second endDate = 60 * 2 * time.Second
) )
var cliKey = "clients"
var convKey = "conversation"
var upgrader = websocket.Upgrader{ var upgrader = websocket.Upgrader{
//ReadBufferSize: 1024 * 1024, //ReadBufferSize: 1024 * 1024,
//WriteBufferSize: 1024 * 1024, //WriteBufferSize: 1024 * 1024,
...@@ -102,13 +105,20 @@ type Client struct { ...@@ -102,13 +105,20 @@ type Client struct {
onClose func() onClose func()
pingPeriodTicker *time.Timer pingPeriodTicker *time.Timer
closeTicker *time.Timer closeTicker *time.Timer
ToSendData []ToSendData //连接方列表 //ToSendData []ToSendData //连接方列表
} }
type ToSendData struct { //type ToSendData struct {
toSendTime string //开启会话时间 // toSendTime string //开启会话时间
toSendId string //连接id // toSendId string //连接id
//}
type Conversation struct {
CustomerClientId string `json:"customerClientId"`
UserClientId string `json:"userClientId"`
StartTime string `json:"startTime"`
StartReceiveDate string `json:"startReceiveDate"`
Status int `json:"status"`
} }
// readPump pumps messages from the websocket connection to the hub. // readPump pumps messages from the websocket connection to the hub.
...@@ -132,7 +142,7 @@ func (c *Client) readPump() { ...@@ -132,7 +142,7 @@ func (c *Client) readPump() {
close(c.sendPing) close(c.sendPing)
c.grpool.Close() c.grpool.Close()
c.hub.RemoveClient(c) c.hub.RemoveClient(c)
Redis.HDel("clients", c.Id) Redis.HDel(cliKey, c.Id)
dump() dump()
}() }()
Loop: Loop:
......
...@@ -19,6 +19,8 @@ var ( ...@@ -19,6 +19,8 @@ var (
MaxRecvChLen = 10240 MaxRecvChLen = 10240
//最大发送消息缓冲处理管道长度 //最大发送消息缓冲处理管道长度
MaxSendChLen = 10240 MaxSendChLen = 10240
//最大会话数量
MaxConversationNum = 10240
) )
// Hub maintains the set of active clients and broadcasts messages to the // Hub maintains the set of active clients and broadcasts messages to the
...@@ -40,6 +42,8 @@ type hub struct { ...@@ -40,6 +42,8 @@ type hub struct {
// Unregister requests from clients. // Unregister requests from clients.
unregister chan string unregister chan string
//conversation []*Conversation
} }
//重新连接需要处理的消息(缓存上次未来得能处理发送消息channel中的消息,60秒后原ws未连接消息失效) //重新连接需要处理的消息(缓存上次未来得能处理发送消息channel中的消息,60秒后原ws未连接消息失效)
...@@ -50,6 +54,7 @@ type oldMsg struct { ...@@ -50,6 +54,7 @@ type oldMsg struct {
func newHub() *hub { func newHub() *hub {
return &hub{ return &hub{
//conversation: make([]*Conversation, MaxConversationNum),
register: make(chan *Client, MaxClientChannelLen), register: make(chan *Client, MaxClientChannelLen),
unregister: make(chan string, MaxClientChannelLen), unregister: make(chan string, MaxClientChannelLen),
clients: make(map[string]*Client), clients: make(map[string]*Client),
......
...@@ -28,7 +28,7 @@ func InitWsPool(errfun func(err interface{})) { ...@@ -28,7 +28,7 @@ func InitWsPool(errfun func(err interface{})) {
} }
func initWsPoolData() { func initWsPoolData() {
clientsData := Redis.HGetAll("clients") clientsData := Redis.HGetAll(cliKey)
clientsAll := clientsData.Val() clientsAll := clientsData.Val()
for _, v := range clientsAll { for _, v := range clientsAll {
......
...@@ -2,7 +2,6 @@ package pool ...@@ -2,7 +2,6 @@ package pool
import ( import (
"encoding/json" "encoding/json"
"fmt"
) )
...@@ -28,8 +27,8 @@ func searchStrArray(arr []string, ch string) bool { ...@@ -28,8 +27,8 @@ func searchStrArray(arr []string, ch string) bool {
} }
func SerializeJson(data interface{}) []byte { func SerializeJson(data interface{}) []byte {
reply, err := json.Marshal(data) reply, _ := json.Marshal(data)
fmt.Println(err) //fmt.Println(err)
return reply return reply
} }
......
...@@ -10,7 +10,7 @@ import ( ...@@ -10,7 +10,7 @@ import (
"time" "time"
) )
var toSendDataLock sync.Mutex var toConverstaionLock sync.Mutex
type SetMsgReq struct { type SetMsgReq struct {
ProcedureType int `json:"procedureType"` ProcedureType int `json:"procedureType"`
...@@ -79,7 +79,7 @@ func NewClient(conf *Config) *Client { ...@@ -79,7 +79,7 @@ func NewClient(conf *Config) *Client {
client.OnPong(nil) client.OnPong(nil)
wsSever.hub.AddClient(client) wsSever.hub.AddClient(client)
jsonData := string(SerializeJson(conf)) jsonData := string(SerializeJson(conf))
Redis.HSet("clients", client.Id, jsonData) Redis.HSet(cliKey, client.Id, jsonData)
return client return client
} }
...@@ -102,7 +102,7 @@ func (c *Client) OpenClient(w http.ResponseWriter, r *http.Request, head http.He ...@@ -102,7 +102,7 @@ func (c *Client) OpenClient(w http.ResponseWriter, r *http.Request, head http.He
c.closeTicker = time.NewTimer(closeWait) c.closeTicker = time.NewTimer(closeWait)
c.conn.SetPongHandler(func(str string) error { c.conn.SetPongHandler(func(str string) error {
c.conn.SetReadDeadline(time.Now().Add(pongWait)) c.conn.SetReadDeadline(time.Now().Add(pongWait))
fmt.Println("收到pong---", c.Id, str) //fmt.Println("收到pong---", c.Id, str)
c.pingPeriodTicker.Reset(pingPeriod) c.pingPeriodTicker.Reset(pingPeriod)
c.onPong() c.onPong()
return nil return nil
...@@ -111,7 +111,7 @@ func (c *Client) OpenClient(w http.ResponseWriter, r *http.Request, head http.He ...@@ -111,7 +111,7 @@ func (c *Client) OpenClient(w http.ResponseWriter, r *http.Request, head http.He
c.conn.SetPingHandler(func(str string) error { c.conn.SetPingHandler(func(str string) error {
c.conn.SetReadDeadline(time.Now().Add(pongWait)) c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.pingPeriodTicker.Reset(pingPeriod) c.pingPeriodTicker.Reset(pingPeriod)
fmt.Println("收到ping---", c.Id, str) //fmt.Println("收到ping---", c.Id, str)
c.recvPing <- 1 c.recvPing <- 1
//if err := c.conn.WriteMessage(websocket.PongMessage, nil); err != nil { //if err := c.conn.WriteMessage(websocket.PongMessage, nil); err != nil {
// c.onError(errors.New("回复客户端PongMessage出现异常:"+err.Error())) // c.onError(errors.New("回复客户端PongMessage出现异常:"+err.Error()))
...@@ -415,8 +415,18 @@ func SaveMsg(msg *SendMsg) { ...@@ -415,8 +415,18 @@ func SaveMsg(msg *SendMsg) {
mqData.StartReceiveDate = msg.SendTime mqData.StartReceiveDate = msg.SendTime
//连接开启信息存入 //连接开启信息存入
AppendToSendData(msg.FromClientId, msg.ToClientId, msg.SendTime) conversation := &Conversation{}
AppendToSendData(msg.ToClientId, msg.FromClientId, msg.SendTime) if user.Promoter == "1" {
conversation.CustomerClientId = msg.FromClientId
conversation.UserClientId = msg.ToClientId
}
if user.Promoter == "2" {
conversation.CustomerClientId = msg.ToClientId
conversation.UserClientId = msg.FromClientId
}
conversation.Status = 3
conversation.StartReceiveDate = mqData.StartReceiveDate
AppendConversation(conversation)
} }
//结束 //结束
...@@ -430,9 +440,6 @@ func SaveMsg(msg *SendMsg) { ...@@ -430,9 +440,6 @@ func SaveMsg(msg *SendMsg) {
if user.Promoter == "2" { if user.Promoter == "2" {
mqData.Finish = "1" mqData.Finish = "1"
} }
//删除连接信息
DelToSendData(msg.FromClientId, msg.ToClientId)
} }
//离线 //离线
...@@ -470,8 +477,7 @@ func SaveMsg(msg *SendMsg) { ...@@ -470,8 +477,7 @@ func SaveMsg(msg *SendMsg) {
mqData.EndTime = msg.SendTime mqData.EndTime = msg.SendTime
mqData.Promoter = user.Promoter mqData.Promoter = user.Promoter
mqData.Finish = "5" mqData.Finish = "5"
//删除连接信息 DelConversation(msg.FromClientId,msg.ToClientId)
DelToSendData(msg.FromClientId, msg.ToClientId)
} }
if mqData.ProcedureType != 0 { if mqData.ProcedureType != 0 {
...@@ -503,62 +509,75 @@ func PublishData(mqData *SetMsgReq) { ...@@ -503,62 +509,75 @@ func PublishData(mqData *SetMsgReq) {
return return
} }
func SetEnd() { func GetConversationList() map[string]string {
for clientsId, vClients := range wsSever.hub.clients { jsonData := Redis.HGetAll(convKey)
if GetClientInfoById(clientsId).Promoter == "1" { jsonDataAll := jsonData.Val()
continue return jsonDataAll
} }
for _, vToSendData := range vClients.ToSendData {
if vToSendData.toSendTime < time.Now().Add(-endDate).Format("2006-01-02 15:04:05") {
mqData := &SendMsg{
ProcedureType: 8,
ToClientId: vToSendData.toSendId,
FromClientId: clientsId,
SendTime: time.Now().Format("2006-01-02 15:04:05"),
}
SaveMsg(mqData)
//发送结束会话给客户端
_, isSet := wsSever.hub.clients[clientsId]
if isSet == true {
wsSever.hub.clients[clientsId].readMessage(mqData)
}
toMqData := &SendMsg{
ProcedureType: 8,
ToClientId: clientsId,
FromClientId: vToSendData.toSendId,
SendTime: time.Now().Format("2006-01-02 15:04:05"),
}
_, isSet = wsSever.hub.clients[vToSendData.toSendId]
if isSet == true {
wsSever.hub.clients[vToSendData.toSendId].readMessage(toMqData)
}
}
}
}
func AppendConversation(conversation *Conversation) {
Redis.HSetNX(convKey, conversation.CustomerClientId+"^"+conversation.UserClientId, string(SerializeJson(conversation)))
return return
} }
func AppendToSendData(clientId, toClientId, sendTime string) { func DelConversation(customerClientId, userClientId string) {
sendData := ToSendData{toSendTime: sendTime, toSendId: clientId} Redis.HDel(convKey, customerClientId + "^" + userClientId)
toSendDataLock.Lock()
wsSever.hub.clients[toClientId].ToSendData = append(wsSever.hub.clients[toClientId].ToSendData, sendData)
toSendDataLock.Unlock()
return return
} }
func DelToSendData(clientId, toClientId string) { func SetEnd() {
for kclientId, vclientId := range wsSever.hub.clients[clientId].ToSendData { conversationList := GetConversationList()
if vclientId.toSendId == toClientId { for _, vClients := range conversationList {
toSendDataLock.Lock() conversationData := &Conversation{}
wsSever.hub.clients[clientId].ToSendData = append(wsSever.hub.clients[clientId].ToSendData[:kclientId], wsSever.hub.clients[clientId].ToSendData[kclientId+1:]...) UnserislizeJson([]byte(vClients), conversationData)
toSendDataLock.Unlock() if conversationData.StartReceiveDate < time.Now().Add(-endDate).Format("2006-01-02 15:04:05") {
mqData := &SendMsg{
ProcedureType: 8,
ToClientId: conversationData.CustomerClientId,
FromClientId: conversationData.UserClientId,
SendTime: time.Now().Format("2006-01-02 15:04:05"),
}
//发送结束会话给客户端
_, isSet := wsSever.hub.clients[conversationData.CustomerClientId]
if isSet == true {
wsSever.hub.clients[conversationData.CustomerClientId].readMessage(mqData)
}
toMqData := &SendMsg{
ProcedureType: 8,
ToClientId: conversationData.UserClientId,
FromClientId: conversationData.CustomerClientId,
SendTime: time.Now().Format("2006-01-02 15:04:05"),
}
_, isSet = wsSever.hub.clients[conversationData.UserClientId]
if isSet == true {
wsSever.hub.clients[conversationData.UserClientId].readMessage(toMqData)
}
SaveMsg(toMqData)
} }
} }
return return
} }
//func AppendToSendData(clientId, toClientId, sendTime string) {
// sendData := ToSendData{toSendTime: sendTime, toSendId: clientId}
// toSendDataLock.Lock()
// wsSever.hub.clients[toClientId].ToSendData = append(wsSever.hub.clients[toClientId].ToSendData, sendData)
// toSendDataLock.Unlock()
// return
//}
//
//func DelToSendData(clientId, toClientId string) {
// for kclientId, vclientId := range wsSever.hub.clients[clientId].ToSendData {
// if vclientId.toSendId == toClientId {
// toSendDataLock.Lock()
// wsSever.hub.clients[clientId].ToSendData = append(wsSever.hub.clients[clientId].ToSendData[:kclientId], wsSever.hub.clients[clientId].ToSendData[kclientId+1:]...)
// toSendDataLock.Unlock()
// }
// }
// return
//}
// 五分钟内 用户未连接、则判定为用户离线、这时候把会话置为离线状态、如果中间顾客 连线了则return、 如果中间客服离线了、则return // 五分钟内 用户未连接、则判定为用户离线、这时候把会话置为离线状态、如果中间顾客 连线了则return、 如果中间客服离线了、则return
//func (c *Client)SetOffline(user *UserInfo, customerId string) { //func (c *Client)SetOffline(user *UserInfo, customerId string) {
// //
......
...@@ -76,7 +76,7 @@ func ws(w http.ResponseWriter, r *http.Request) { ...@@ -76,7 +76,7 @@ func ws(w http.ResponseWriter, r *http.Request) {
userInfo, err := pool.GetClientInfoByToken(headData) userInfo, err := pool.GetClientInfoByToken(headData)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println("用户信息报错:",err)
return return
} }
...@@ -186,7 +186,7 @@ func GetClientList(w http.ResponseWriter, r *http.Request) { ...@@ -186,7 +186,7 @@ func GetClientList(w http.ResponseWriter, r *http.Request) {
//pool.UnserislizeJson(data, req) //pool.UnserislizeJson(data, req)
//source := data["source"][0] //source := data["source"][0]
reply := pool.GetList(data["source"][0], data["promoter"][0]) reply := pool.GetList(data["source"][0], data["promoter"][0])
fmt.Println(reply) fmt.Println("客户端列表:",reply)
return return
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment