Commit ab2296fa authored by haoyanbin's avatar haoyanbin

1

parent c87803cd
...@@ -6,19 +6,19 @@ import ( ...@@ -6,19 +6,19 @@ import (
) )
// Step 1. Simple 创建实例 // Step 1. Simple 创建实例
func NewRabbitMQSimple(queueName string) (*RabbitMQ) { func NewRabbitMQSimple(queueName string) *RabbitMQ {
//在simple模式下 exchange and key 都为空 //在simple模式下 exchange and key 都为空
rabbitMQ, err := newRabbitMQ(queueName, "", "") rabbitMQ, err := newRabbitMQ(queueName, "", "")
if err != nil { if err != nil {
fmt.Println("NewRabbitMQSimple err:",err) fmt.Println("NewRabbitMQSimple err:", err)
return nil return nil
} }
return rabbitMQ return rabbitMQ
} }
// Step 2. Simple producer code // Step 2. Simple producer code
func (r *RabbitMQ) PublishSimple(message string) error { func (r *RabbitMQ) PublishSimple(message []byte) error {
fmt.Println("push:", string(message))
// 2.1 申请队列、 如果队列不存在则会自动创建、 如果存在则跳过创建 // 2.1 申请队列、 如果队列不存在则会自动创建、 如果存在则跳过创建
// 保证队列存在、 消息能发送到队列中 // 保证队列存在、 消息能发送到队列中
_, err := r.channel.QueueDeclare( _, err := r.channel.QueueDeclare(
...@@ -50,7 +50,7 @@ func (r *RabbitMQ) PublishSimple(message string) error { ...@@ -50,7 +50,7 @@ func (r *RabbitMQ) PublishSimple(message string) error {
// 消息持久化 // 消息持久化
DeliveryMode: amqp.Persistent, DeliveryMode: amqp.Persistent,
ContentType: "text/plain", ContentType: "text/plain",
Body: []byte(message), Body: message,
}, },
) )
if err != nil { if err != nil {
...@@ -59,3 +59,36 @@ func (r *RabbitMQ) PublishSimple(message string) error { ...@@ -59,3 +59,36 @@ func (r *RabbitMQ) PublishSimple(message string) error {
return nil return nil
} }
type SetMsgReq struct {
ProcedureType int `json:"procedureType"`
GroupId string `json:"groupId" db:"group_id"`
UserId string `json:"userId" db:"user_id"`
BusinessId string `json:"businessId" db:"business_id"`
CustomerId string `json:"customerId" db:"customer_id"`
Status string `json:"status" db:"status"`
StartTime string `json:"startTime" db:"start_time"`
EndTime string `json:"endTime" db:"end_time"`
Remark string `json:"remark" db:"remark"`
Promoter string `json:"promoter" db:"promoter"`
Finish string `json:"finish" db:"finish"`
ExpertUnread string `json:"expertUnread" db:"expert_unread"`
UserUnread string `json:"userUnread" db:"user_unread"`
ExpertUnreadMessage string `json:"expertUnreadMessage" db:"expert_unread_message"`
ExpertUnreadMessageTime string `json:"expertUnreadMessageTime" db:"expert_unread_message_time"`
GuideMsg string `json:"guideMsg" db:"guide_msg"`
GuideDate string `json:"guideDate" db:"guide_date"`
ReceiveDate string `json:"receiveDate" db:"receive_date"`
ConversationDate string `json:"conversationDate" db:"conversation_date"`
StartReceiveDate string `json:"startReceiveDate" db:"start_receive_date"`
ConversationId string `json:"conversationId" db:"conversation_id"`
PromoterType string `json:"promoterType" db:"promoter_type"`
SendTime string `json:"sendTime" db:"send_time"`
MsgType int `json:"msgType" db:"msg_type"`
Content string `json:"content" db:"content"`
Sender string `json:"sender" db:"sender"`
Receiver string `json:"receiver" db:"receiver"`
HandleTime string `json:"handleTime" db:"handle_time"`
HandlePersonId string `json:"handlePersonId" db:"handle_person_id"`
HandlePerson string `json:"handlePerson" db:"handle_person"`
}
...@@ -22,6 +22,9 @@ const ( ...@@ -22,6 +22,9 @@ const (
writeWait = 30 * time.Second writeWait = 30 * time.Second
// Maximum message size allowed from peer. // Maximum message size allowed from peer.
maxMessageSize = 1024 * 1024 * 20 maxMessageSize = 1024 * 1024 * 20
//
closeWait = 60 * 30 * time.Second
) )
var upgrader = websocket.Upgrader{ var upgrader = websocket.Upgrader{
...@@ -52,16 +55,18 @@ type RuntimeInfo struct { ...@@ -52,16 +55,18 @@ type RuntimeInfo struct {
} }
type SendMsg struct { type SendMsg struct {
Cmd int `json:"cmd"` MsgId string `json:"msgId"`
Timestamp int `json:"timestamp"` //消息发送时间 SendTime string `json:"sendTime"` //消息发送时间
FromClientId string `json:"fromClientId"` //指令消息的来源。发送者的连接ID FromClientId string `json:"fromClientId"` //指令消息的来源。发送者的连接ID
ToClientId string `json:"toClientId"` //指令消息的接收者。发送给对应的客户端连接ID ToClientId string `json:"toClientId"` //指令消息的接收者。发送给对应的客户端连接ID
CmdData []byte `json:"cmdData"` //对应指令的CmdData1的protobuf的message CmdData []byte `json:"cmdData"` //对应指令的CmdData1的protobuf的message
Status int `json:"status"` //消息发送响应状态 Status int `json:"status"` //消息发送响应状态
Priority int `json:"priority"` //用于处理指令队列的优先级的权重值 Priority int `json:"priority"` //用于处理指令队列的优先级的权重值
LocalId string `json:"localId"` //客户端标识消息的id,主要区分相同cmd的不同消息,方便收到回复分发处理等效果,考虑长度问题定义成string BusinessId string `json:"businessId"` //客户端标识消息的id,主要区分相同cmd的不同消息,方便收到回复分发处理等效果,考虑长度问题定义成string
ServerId string `json:"serverId"` //服务端发送消息的ID,主要区分相同cmd的不同消息,方便服务端收到回复分发处理等效果,考虑长度问题定义成string GroupId string `json:"groupId"` //服务端发送消息的ID,主要区分相同cmd的不同消息,方便服务端收到回复分发处理等效果,考虑长度问题定义成string
Channel []string `json:"channel"` //指定需要广播的频道,可以指定一个或多个频道 Channel []string `json:"channel"` //指定需要广播的频道,可以指定一个或多个频道
ProcedureType int `json:"procedureType"` // 会话类型 1 用户连接,开始导诊 2 导诊完成,等待接诊 3 专家接诊 4 会话 5 结束会话
MsgType int `json:"msgType"` // msg类型 procedureType=4时填写 1 文本消息 2 图片消息
Msg string `json:"msg"` //一般用于json数据传递,或消息发送响应内容 Msg string `json:"msg"` //一般用于json数据传递,或消息发送响应内容
Desc string `json:"desc"` //消息介绍内容,或其它数据 Desc string `json:"desc"` //消息介绍内容,或其它数据
} }
...@@ -86,7 +91,7 @@ type Client struct { ...@@ -86,7 +91,7 @@ type Client struct {
recvCh chan *SendMsg //接收消息的缓冲管首 recvCh chan *SendMsg //接收消息的缓冲管首
recvPing chan int //收到ping的存储管道,方便回复pong处理 recvPing chan int //收到ping的存储管道,方便回复pong处理
sendPing chan int //发送ping的存储管道,方便收到pong处理下次发ping sendPing chan int //发送ping的存储管道,方便收到pong处理下次发ping
//ticker *time.Ticker //定时发送ping的定时器 ticker *time.Ticker //定时发送ping的定时器
onError func(error) onError func(error)
onOpen func() //连接成功的回调 onOpen func() //连接成功的回调
onPing func() //收到ping onPing func() //收到ping
...@@ -94,6 +99,7 @@ type Client struct { ...@@ -94,6 +99,7 @@ type Client struct {
onMessage func(*SendMsg) onMessage func(*SendMsg)
onClose func() onClose func()
pingPeriodTicker *time.Timer pingPeriodTicker *time.Timer
closeTicker *time.Timer
} }
// readPump pumps messages from the websocket connection to the hub. // readPump pumps messages from the websocket connection to the hub.
...@@ -126,7 +132,9 @@ Loop: ...@@ -126,7 +132,9 @@ Loop:
return return
default: default:
_, message, err := c.conn.ReadMessage() _, message, err := c.conn.ReadMessage()
fmt.Println(string(message)) //if string(message) == "" {
// return
//}
if err != nil { if err != nil {
if websocket.IsUnexpectedCloseError(err, if websocket.IsUnexpectedCloseError(err,
websocket.CloseAbnormalClosure, websocket.CloseAbnormalClosure,
...@@ -158,10 +166,11 @@ Loop: ...@@ -158,10 +166,11 @@ Loop:
msg := &SendMsg{} msg := &SendMsg{}
UnserislizeJson(message, msg) UnserislizeJson(message, msg)
if err != nil { if err != nil {
c.onError(errors.New("接收数据ProtoBuf解析失败!!连接ID:" + c.Id + "原因:" + err.Error())) c.onError(errors.New("接收数据解析失败!!连接ID:" + c.Id + "原因:" + err.Error()))
break Loop break Loop
} }
rabbitMQ.PublishSimple(string(message)) SaveMsg(msg)
//rabbitMQ.PublishSimple(string(message))
c.readMessage(msg) c.readMessage(msg)
} }
} }
...@@ -329,6 +338,18 @@ func (c *Client) Tickers() { ...@@ -329,6 +338,18 @@ func (c *Client) Tickers() {
} }
func (c *Client) closeTickers() {
for {
select {
case <-c.IsClose:
return
case <-c.closeTicker.C:
return
}
}
}
func (c *Client) close() { func (c *Client) close() {
c.mux.Lock() c.mux.Lock()
defer c.mux.Unlock() defer c.mux.Unlock()
......
...@@ -9,23 +9,23 @@ import ( ...@@ -9,23 +9,23 @@ import (
var ( var (
//最大连接池缓冲处理连接对像管道长度 //最大连接池缓冲处理连接对像管道长度
Max_client_channel_len = 10240 MaxClientChannelLen = 10240
//最大全局广播缓冲处理管道长度 //最大全局广播缓冲处理管道长度
Max_broadcastQueue_len = 4096 MaxBroadcastQueueLen = 4096
//最大频道广播缓冲处理管道长度 //最大频道广播缓冲处理管道长度
Max_chanBroadcastQueue_len = 4096 MaxChanBroadcastQueueLen = 4096
//最大接收消息缓冲处理管道长度 //最大接收消息缓冲处理管道长度
Max_recvCh_len = 10240 MaxRecvChLen = 10240
//最大发送消息缓冲处理管道长度 //最大发送消息缓冲处理管道长度
Max_sendCh_len = 10240 MaxSendChLen = 10240
) )
// Hub maintains the set of active clients and broadcasts messages to the // Hub maintains the set of active clients and broadcasts messages to the
// clients. // clients.
type hub struct { type hub struct {
// Registered clients. // Registered clients.
clients map[string]*Client// //新的处理方式有没有线程效率会更高,所以SafeMap的锁处理都去掉了 clients map[string]*Client // //新的处理方式有没有线程效率会更高,所以SafeMap的锁处理都去掉了
oldClients map[string]*Client //缓存断开的连接消息队列 oldClients map[string]*Client //缓存断开的连接消息队列
// Inbound messages from the clients. // Inbound messages from the clients.
...@@ -50,12 +50,12 @@ type oldMsg struct { ...@@ -50,12 +50,12 @@ type oldMsg struct {
func newHub() *hub { func newHub() *hub {
return &hub{ return &hub{
register: make(chan *Client, Max_client_channel_len), register: make(chan *Client, MaxClientChannelLen),
unregister: make(chan string, Max_client_channel_len), unregister: make(chan string, MaxClientChannelLen),
clients: make(map[string]*Client), clients: make(map[string]*Client),
oldClients: make(map[string]*Client), oldClients: make(map[string]*Client),
broadcastQueue: make(chan *SendMsg, Max_broadcastQueue_len), broadcastQueue: make(chan *SendMsg, MaxBroadcastQueueLen),
chanBroadcastQueue: make(chan *SendMsg, Max_chanBroadcastQueue_len), chanBroadcastQueue: make(chan *SendMsg, MaxChanBroadcastQueueLen),
} }
} }
...@@ -67,9 +67,9 @@ loop: ...@@ -67,9 +67,9 @@ loop:
if !ok { if !ok {
break loop break loop
} }
c,_ := h.clients[id] c, _ := h.clients[id]
if c != nil { if c != nil {
delete(h.clients,id) delete(h.clients, id)
} }
fmt.Println("取消注册ws连接对象:", id, "连接总数:", len(h.clients)) fmt.Println("取消注册ws连接对象:", id, "连接总数:", len(h.clients))
...@@ -77,15 +77,14 @@ loop: ...@@ -77,15 +77,14 @@ loop:
if !ok { if !ok {
break loop break loop
} }
fmt.Println("注册ws连接对象:", client.Id, "连接总数:", len(h.clients))
h.clients[client.Id] = client h.clients[client.Id] = client
fmt.Println(h.clients) fmt.Println("注册ws连接对象:", client.Id, "连接总数:", len(h.clients))
case broadcastMsg, ok := <-h.broadcastQueue: case broadcastMsg, ok := <-h.broadcastQueue:
if !ok { if !ok {
break loop break loop
} }
for k,v := range h.clients{ for k, v := range h.clients {
if v != nil { if v != nil {
client := v client := v
broadcastMsg.ToClientId = k broadcastMsg.ToClientId = k
...@@ -117,7 +116,6 @@ loop: ...@@ -117,7 +116,6 @@ loop:
} }
} }
func (h *hub) ticker() { func (h *hub) ticker() {
//定时清理清理缓存的旧的连接对像 //定时清理清理缓存的旧的连接对像
//gtimer.AddSingleton(30*time.Second, func() { //gtimer.AddSingleton(30*time.Second, func() {
// if len(h.oldClients) > 0 { // if len(h.oldClients) > 0 {
...@@ -153,14 +151,14 @@ func (h *hub) AddClient(client *Client) error { ...@@ -153,14 +151,14 @@ func (h *hub) AddClient(client *Client) error {
func (h *hub) clearOldClient(client *Client) { func (h *hub) clearOldClient(client *Client) {
close(client.recvCh) close(client.recvCh)
close(client.sendCh) close(client.sendCh)
delete(h.oldClients,client.Id) delete(h.oldClients, client.Id)
fmt.Println("己断开的ws连接缓存对象:", client.Id, "旧连接总数:", len(h.oldClients)) fmt.Println("己断开的ws连接缓存对象:", client.Id, "旧连接总数:", len(h.oldClients))
} }
func (h *hub) RemoveClient(client *Client) error { func (h *hub) RemoveClient(client *Client) error {
//把连接对像缓存在旧对像列表中,并设置连接断开的时间,过期未连接就会清理对像 //把连接对像缓存在旧对像列表中,并设置连接断开的时间,过期未连接就会清理对像
//client.CloseTime = time.Now() client.CloseTime = time.Now()
//h.oldClients.Set(client.Id, client) h.oldClients[client.Id] = client
timeout := time.NewTimer(time.Second * 1) timeout := time.NewTimer(time.Second * 1)
defer timeout.Stop() defer timeout.Stop()
select { select {
......
...@@ -4,7 +4,9 @@ import ( ...@@ -4,7 +4,9 @@ import (
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
mq "pool/dao"
"pool/pool/util/grpool" "pool/pool/util/grpool"
"strings"
"sync" "sync"
"time" "time"
) )
...@@ -16,7 +18,7 @@ func NewClient(conf *Config) *Client { ...@@ -16,7 +18,7 @@ func NewClient(conf *Config) *Client {
} }
var client *Client var client *Client
oldclient := wsSever.hub.oldClients[conf.Id] oldclient := wsSever.hub.oldClients[conf.Id]
delete(wsSever.hub.oldClients,conf.Id) delete(wsSever.hub.oldClients, conf.Id)
if oldclient != nil { if oldclient != nil {
c := oldclient c := oldclient
client = c client = c
...@@ -25,8 +27,8 @@ func NewClient(conf *Config) *Client { ...@@ -25,8 +27,8 @@ func NewClient(conf *Config) *Client {
Id: conf.Id, Id: conf.Id,
types: conf.Type, types: conf.Type,
hub: wsSever.hub, hub: wsSever.hub,
sendCh: make(chan *SendMsg, Max_recvCh_len), sendCh: make(chan *SendMsg, MaxRecvChLen),
recvCh: make(chan *SendMsg, Max_sendCh_len), recvCh: make(chan *SendMsg, MaxSendChLen),
mux: new(sync.Mutex), mux: new(sync.Mutex),
} }
} }
...@@ -61,6 +63,7 @@ func (c *Client) OpenClient(w http.ResponseWriter, r *http.Request, head http.He ...@@ -61,6 +63,7 @@ func (c *Client) OpenClient(w http.ResponseWriter, r *http.Request, head http.He
c.conn.SetReadLimit(maxMessageSize) c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait)) c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.pingPeriodTicker = time.NewTimer(pingPeriod) c.pingPeriodTicker = time.NewTimer(pingPeriod)
c.closeTicker = time.NewTimer(closeWait)
c.conn.SetPongHandler(func(str string) error { c.conn.SetPongHandler(func(str string) error {
c.conn.SetReadDeadline(time.Now().Add(pongWait)) c.conn.SetReadDeadline(time.Now().Add(pongWait))
fmt.Println("收到pong---", c.Id, str) fmt.Println("收到pong---", c.Id, str)
...@@ -80,11 +83,12 @@ func (c *Client) OpenClient(w http.ResponseWriter, r *http.Request, head http.He ...@@ -80,11 +83,12 @@ func (c *Client) OpenClient(w http.ResponseWriter, r *http.Request, head http.He
c.onPing() c.onPing()
return nil return nil
}) })
//c.conn.SetCloseHandler(func(code int, str string) error {
// //收到客户端连接关闭时的回调 c.conn.SetCloseHandler(func(code int, str string) error {
// glog.Error("连接ID:"+c.Id,"SetCloseHandler接收到连接关闭状态:",code,"关闭信息:",str) //收到客户端连接关闭时的回调
// return nil fmt.Println("连接ID:"+c.Id, "SetCloseHandler接收到连接关闭状态:", code, "关闭信息:", str)
//}) return nil
})
// Allow collection of memory referenced by the caller by doing all work in // Allow collection of memory referenced by the caller by doing all work in
// new goroutines. // new goroutines.
...@@ -95,9 +99,23 @@ func (c *Client) OpenClient(w http.ResponseWriter, r *http.Request, head http.He ...@@ -95,9 +99,23 @@ func (c *Client) OpenClient(w http.ResponseWriter, r *http.Request, head http.He
//开始接收消息 //开始接收消息
c.grpool.Add(c.recvMessage) c.grpool.Add(c.recvMessage)
c.grpool.Add(c.Tickers) c.grpool.Add(c.Tickers)
c.grpool.Add(c.closeTickers)
c.onOpen() c.onOpen()
}
//客户端建立连接
//user := GetClientInfoById(c.Id)
//if user.Promoter == "1" {
// mqData := &mq.SetMsgReq{
// ProcedureType: 1,
// BusinessId: user.Source,
// CustomerId: user.CustomerId,
// Status: "0",
// StartTime: time.Now().Format("2006-01-02 15:04:05"),
// Promoter: user.Promoter,
// }
// PublishData(mqData)
//}
}
//获取连接对像运行过程中的信息 //获取连接对像运行过程中的信息
func (c *Client) GetRuntimeInfo() *RuntimeInfo { func (c *Client) GetRuntimeInfo() *RuntimeInfo {
...@@ -180,6 +198,7 @@ func (c *Client) OnError(h func(err error)) { ...@@ -180,6 +198,7 @@ func (c *Client) OnError(h func(err error)) {
func (c *Client) Send(msg *SendMsg) error { func (c *Client) Send(msg *SendMsg) error {
select { select {
case <-c.IsClose: case <-c.IsClose:
c.close()
return errors.New("连接ID:" + c.Id + "ws连接己经断开,无法发送消息") return errors.New("连接ID:" + c.Id + "ws连接己经断开,无法发送消息")
default: default:
msg.ToClientId = c.Id msg.ToClientId = c.Id
...@@ -208,12 +227,15 @@ func Send(msg *SendMsg) error { ...@@ -208,12 +227,15 @@ func Send(msg *SendMsg) error {
if client.Id == msg.ToClientId { if client.Id == msg.ToClientId {
msg.ToClientId = client.Id msg.ToClientId = client.Id
err := client.Send(msg) err := client.Send(msg)
if err != nil { if err != nil {
return errors.New("发送消息出错:" + err.Error() + ",连接对象id=" + client.Id + "。") return errors.New("发送消息出错:" + err.Error() + ",连接对象id=" + client.Id + "。")
} }
} }
} else {
fmt.Println(1)
fmt.Println(wsSever.hub.clients)
} }
return nil return nil
} }
...@@ -248,3 +270,185 @@ func BroadcastAll(msg *SendMsg) error { ...@@ -248,3 +270,185 @@ func BroadcastAll(msg *SendMsg) error {
return errors.New("hub.broadcastQueue消息管道blocked,写入消息超时,管道长度:" + string(len(wsSever.hub.broadcastQueue))) return errors.New("hub.broadcastQueue消息管道blocked,写入消息超时,管道长度:" + string(len(wsSever.hub.broadcastQueue)))
} }
} }
// 根据来源和类型获取客户端列表
func GetList(source, userType string) map[string]*Client {
clientData := make(map[string]*Client, 0)
for k, v := range wsSever.hub.clients {
userInfo := strings.Split(k, "_")
if userInfo[0] == source {
if userType != "" {
if userInfo[1] == userType {
clientData[k] = v
}
} else {
clientData[k] = v
}
}
}
return clientData
}
type UserInfo struct {
Source string `json:"source"` //
Promoter string `json:"promoter"`
CustomerId string `json:"customerId"`
ClientId string `json:"clientId"`
ToChannel []string `json:"toChannel"`
}
//根据token获取用户来源信息
func GetClientInfoByToken(token string) (*UserInfo, error) {
tokenData := strings.Split(token, "_")
if len(tokenData) < 3 {
return nil, errors.New("用户数据有误")
}
userData := new(UserInfo)
userData.Source = tokenData[0]
userData.Promoter = tokenData[1]
userData.CustomerId = tokenData[2]
userData.ClientId = userData.Source + "_" + userData.Promoter + "_" + userData.CustomerId
return userData, nil
}
//会话状态 0 新建会话 2 等待连接 3 离线 4 双方建立连接 5 结束会话 6 对话 7 已读
func SaveMsg(msg *SendMsg) {
user := GetClientInfoById(msg.FromClientId)
toUser := &UserInfo{}
if msg.ToClientId != "" {
toUser = GetClientInfoById(msg.ToClientId)
}
mqData := &mq.SetMsgReq{}
if user.Promoter == "1" {
mqData.BusinessId = user.Source
mqData.CustomerId = user.CustomerId
mqData.GroupId = toUser.Source
mqData.UserId = toUser.CustomerId
}
if user.Promoter == "2" {
mqData.BusinessId = toUser.Source
mqData.CustomerId = toUser.CustomerId
mqData.GroupId = user.Source
mqData.UserId = user.CustomerId
}
mqData.ProcedureType = msg.ProcedureType
mqData.StartTime = msg.SendTime
if msg.ProcedureType == 1 {
mqData.Promoter = user.Promoter
}
if msg.ProcedureType == 2 {
mqData.GuideMsg = msg.Msg
mqData.GuideDate = msg.SendTime
}
if msg.ProcedureType == 4 {
mqData.StartReceiveDate = msg.SendTime
}
if msg.ProcedureType == 5 {
mqData.EndTime = msg.SendTime
mqData.Promoter = user.Promoter
if user.Promoter == "1" {
mqData.Finish = "3"
}
if user.Promoter == "2" {
mqData.Finish = "1"
}
}
if msg.ProcedureType == 6 {
mqData.PromoterType = user.Promoter
mqData.SendTime = msg.SendTime
mqData.MsgType = msg.MsgType
mqData.Content = msg.Msg
mqData.Sender = msg.FromClientId
mqData.Receiver = msg.ToClientId
}
//已读状态
if msg.ProcedureType == 7 {
if user.Promoter == "1" {
mqData.PromoterType = "2"
}
if user.Promoter == "2" {
mqData.PromoterType = "1"
}
mqData.SendTime = msg.SendTime
}
if mqData.ProcedureType != 0 {
PublishData(mqData)
}
return
}
//clientId转结构体
func GetClientInfoById(clientId string) *UserInfo {
data := strings.Split(clientId, "_")
userData := new(UserInfo)
userData.Source = data[0]
userData.Promoter = data[1]
userData.CustomerId = data[2]
return userData
}
func PublishData(mqData *mq.SetMsgReq) {
rabbitMQ.PublishSimple(SerializeJson(mqData))
return
}
// 五分钟内 用户未连接、则判定为用户离线、这时候把会话置为离线状态、如果中间顾客 连线了则return、 如果中间客服离线了、则return
//func (c *Client)SetOffline(user *UserInfo, customerId string) {
//
// fmt.Println("开始执行定时任务")
// count := 0
// for {
// // 客服离线则 goroutine结束
// if _, ok := wsSever.hub.clients[customerId]; !ok {
// //客户端建立连接后创建数据
// if user.Promoter == "3" {
// mqData := &mq.SetMsgReq{
// ProcedureType: 1,
// BusinessId: user.Source,
// CustomerId: user.CustomerId,
// Status: "0",
// StartTime: time.Now().Format("2006-01-02 15:04:05"),
// Promoter: user.Promoter,
// }
// PublishData(mqData)
// }
// return
// }
//
// if count > 300 {
//
// var conversation ConversationList
// global.GVA_DB.Where("id = ?", conversationId).First(&conversation)
// if conversation.State == 0 {
// global.GVA_DB.Exec(`update conversation_list set state = 1, finish = 3, end_time = ? where id = ?`, time.Now().Format("2006-01-02 15:04:05"), conversationId)
// }
// return
// }
//
// count++
//
// time.Sleep(1 * time.Second)
// fmt.Println("开始计时关闭会话: ", count)
// }
//
//}
...@@ -5,13 +5,12 @@ import ( ...@@ -5,13 +5,12 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"net/http/pprof" "net/http/pprof"
mq "pool/dao"
"pool/pool" "pool/pool"
"runtime" "runtime"
"strings" "time"
) )
//var addr = flag.String("12", ":8081", "http service address")
func serveHome(w http.ResponseWriter, r *http.Request) { func serveHome(w http.ResponseWriter, r *http.Request) {
fmt.Println(r.URL) fmt.Println(r.URL)
if r.URL.Path != "/" { if r.URL.Path != "/" {
...@@ -25,40 +24,9 @@ func serveHome(w http.ResponseWriter, r *http.Request) { ...@@ -25,40 +24,9 @@ func serveHome(w http.ResponseWriter, r *http.Request) {
http.ServeFile(w, r, "home.html") http.ServeFile(w, r, "home.html")
} }
var ch = make(chan int, 10)
func chfun(i int) {
fmt.Println("写入管道i的值%d", i)
//ch<-i
select {
case ch <- i:
return
default:
fmt.Println("管道己经锁定;i的值" + string(i))
}
}
func main() { func main() {
runtime.GOMAXPROCS(runtime.NumCPU()) runtime.GOMAXPROCS(runtime.NumCPU())
/* for i:=0;i<10000 ;i++ {
go chfun(i)
}
close(ch)
*/
/*for {
select {
case i,ok:=<-ch:
if !ok{
log.Println("管道己经关闭%d",i)
}
log.Println("读取i的值%d",i)
}
}
*/
flag.Parse() flag.Parse()
//初骀化连接池 //初骀化连接池
pool.InitWsPool(func(err interface{}) { pool.InitWsPool(func(err interface{}) {
...@@ -76,6 +44,7 @@ func main() { ...@@ -76,6 +44,7 @@ func main() {
mux.HandleFunc("/debug/pprof/trace", pprof.Trace) mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
mux.HandleFunc("/ws", ws) mux.HandleFunc("/ws", ws)
mux.HandleFunc("/ws-list", GetClientList)
err := http.ListenAndServe(":11001", mux) err := http.ListenAndServe(":11001", mux)
if err != nil { if err != nil {
fmt.Printf("ListenAndServe: %s", err.Error()) fmt.Printf("ListenAndServe: %s", err.Error())
...@@ -84,36 +53,51 @@ func main() { ...@@ -84,36 +53,51 @@ func main() {
func ws(w http.ResponseWriter, r *http.Request) { func ws(w http.ResponseWriter, r *http.Request) {
headData := r.Header.Get("Sec-Websocket-Protocol") headData := r.Header.Get("Sec-Websocket-Protocol")
list := strings.Split(headData, "-")
head := http.Header{} head := http.Header{}
head.Add("Sec-Websocket-Protocol", headData) head.Add("Sec-Websocket-Protocol", headData)
userInfo, err := pool.GetClientInfoByToken(headData)
if err != nil {
fmt.Println(err)
return
}
//实例化连接对象 //实例化连接对象
client := pool.NewClient(&pool.Config{ client := pool.NewClient(&pool.Config{
Id: list[0], //连接标识 Id: userInfo.ClientId, //连接标识
Type: "ws", //连接类型 Type: "ws", //连接类型
Channel: list[1:], //指定频道 Channel: userInfo.ToChannel, //指定频道
Goroutine: 100, Goroutine: 100,
}) })
fmt.Println(client.Id, "实例化连接对象完成") fmt.Println(client.Id, "实例化连接对象完成")
//连接成功回调 //连接成功回调
client.OnOpen(func() { client.OnOpen(func() {
fmt.Printf("连接己开启%s", client.Id) fmt.Println("连接开启回调:", client.Id)
}) })
//接收消息 //接收消息
client.OnMessage(func(msg *pool.SendMsg) { client.OnMessage(func(msg *pool.SendMsg) {
user := pool.GetClientInfoById(client.Id)
if user.Promoter == "1" {
msg.BusinessId = user.Source
} else if user.Promoter == "2" {
msg.GroupId = user.Source
}
if msg.Status == 3 { if msg.Status == 3 {
fmt.Println("OnMessage:收到出错消息=》", client.Id, msg.Desc) fmt.Println("OnMessage:收到出错消息=》", client.Id, msg.Desc)
return return
} }
fmt.Println(msg.Msg) //fmt.Println(msg.Msg)
if msg.ToClientId != "" { if msg.ToClientId != "" {
//发送消息给指定的ToClientID连接 //发送消息给指定的ToClientID连接
err := pool.Send(msg) err := pool.Send(msg)
if err != nil { if err != nil {
fmt.Println("wsPool.Send(msg):", err.Error()) fmt.Println("pool.Send(msg):", err.Error())
} }
//发送消息给当前连接对象 //发送消息给当前连接对象
//err = client.Send(msg) //err = client.Send(msg)
...@@ -123,20 +107,44 @@ func ws(w http.ResponseWriter, r *http.Request) { ...@@ -123,20 +107,44 @@ func ws(w http.ResponseWriter, r *http.Request) {
} }
/*if len(msg.Channel)>0{ /*if len(msg.Channel)>0{
//按频道广播,可指定多个频道[]string //按频道广播,可指定多个频道[]string
err:=wsPool.Broadcast(msg) //或者 wsPool.Broadcast(msg) err:=pool.Broadcast(msg) //或者 wsPool.Broadcast(msg)
if err!=nil { if err!=nil {
log.Println("wsPool.Broadcast(msg)", err.Error()) fmt.Println("pool.Broadcast(msg)", err.Error())
} }
} }
//或都全局广播,所有连接都进行发送 //或都全局广播,所有连接都进行发送
err:=wsPool.BroadcastAll(msg) err:=pool.BroadcastAll(msg)
if err!=nil { if err!=nil {
log.Println("wsPool.BroadcastAll(msg)", err.Error()) fmt.Println("pool.BroadcastAll(msg)", err.Error())
}*/ }*/
}) })
//连接断开回调 //连接断开回调
client.OnClose(func() { client.OnClose(func() {
user := pool.GetClientInfoById(client.Id)
closeMsg := &mq.SetMsgReq{}
closeMsg.ProcedureType = 5
closeMsg.EndTime = time.Now().Format("2006-01-02 15:04:05")
closeMsg.Promoter = user.Promoter
if user.Promoter == "1" {
closeMsg.Finish = "3"
closeMsg.BusinessId = userInfo.Source
closeMsg.CustomerId = userInfo.CustomerId
//pool.PublishData(closeMsg)
fmt.Println("用户关闭连接",client.Id)
}
if user.Promoter == "2" {
//closeMsg.Finish = "4"
//closeMsg.GroupId = userInfo.Source
//closeMsg.UserId = userInfo.CustomerId
//pool.PublishData(closeMsg)
fmt.Println("专家关闭连接",client.Id)
}
fmt.Printf("连接己经关闭%s", client.Id) fmt.Printf("连接己经关闭%s", client.Id)
}) })
client.OnError(func(err error) { client.OnError(func(err error) {
...@@ -150,3 +158,19 @@ func ws(w http.ResponseWriter, r *http.Request) { ...@@ -150,3 +158,19 @@ func ws(w http.ResponseWriter, r *http.Request) {
r.Close = true r.Close = true
return return
} }
func GetClientList(w http.ResponseWriter, r *http.Request) {
//fmt.Println(wsSever.hub.clients)
data := r.URL.Query()
//req := new(GetClientListReq)
//pool.UnserislizeJson(data, req)
//source := data["source"][0]
reply := pool.GetList(data["source"][0], data["promoter"][0])
fmt.Println(reply)
return
}
type GetClientListReq struct {
Source string `json:"source" form:"source"`
Promoter string `json:"promoter" form:"promoter"`
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment