package pool import ( "errors" "fmt" "github.com/gorilla/websocket" "net/http" "pool/pool/util/grpool" "sync" "time" ) const ( // Time allowed to read the next pong message from the peer. pongWait = 60 * time.Second // Send pings to peer with this period. Must be less than pongWait. pingPeriod = (pongWait * 9) / 10 // Time allowed to write a message to the peer. writeWait = 10 * time.Second // Maximum message size allowed from peer. maxMessageSize = 1024 * 1024 * 20 // closeWait = 60 * 30 * time.Second // time for auto end endDate = 60 * 30 * time.Second ) var cliKey = "clients" var convKey = "conversation" var upgrader = websocket.Upgrader{ //ReadBufferSize: 1024 * 1024, //WriteBufferSize: 1024 * 1024, // 默认允许WebSocket请求跨域,权限控制可以由业务层自己负责,灵活度更高 CheckOrigin: func(r *http.Request) bool { return true }, } //连接参数结构体 type Config struct { Id string //标识连接的名称 Type string //连接类型或path Channel []string //连接注册频道类型方便广播等操作。做为一个数组存储。因为一个连接可以属多个频道 Goroutine int //每个连接开启的go程数里 默认为10 } type RuntimeInfo struct { Id string //标识连接的名称 Type string //连接类型或path Ip string Channel []string //连接注册频道类型方便广播等操作。做为一个数组存储。因为一个连接可以属多个频道 OpenTime time.Time //连接打开时间 LastReceiveTime time.Time //最后一次接收到数据的时间 LastSendTime time.Time //最后一次发送数据的时间 } type SendMsg struct { ConversationId int `json:"conversationId"` SendTime string `json:"sendTime"` //消息发送时间 FromClientId string `json:"fromClientId"` //指令消息的来源。发送者的连接ID ToClientId string `json:"toClientId"` //指令消息的接收者。发送给对应的客户端连接ID CmdData []byte `json:"cmdData"` //对应指令的CmdData1的protobuf的message Status int `json:"status"` //消息发送响应状态 Priority int `json:"priority"` //用于处理指令队列的优先级的权重值 BusinessId string `json:"businessId"` //客户端标识消息的id,主要区分相同cmd的不同消息,方便收到回复分发处理等效果,考虑长度问题定义成string GroupId string `json:"groupId"` //服务端发送消息的ID,主要区分相同cmd的不同消息,方便服务端收到回复分发处理等效果,考虑长度问题定义成string Channel []string `json:"channel"` //指定需要广播的频道,可以指定一个或多个频道 ProcedureType int `json:"procedureType"` //会话状态 1 开始导诊 2 等待连接 3 离线 4 双方建立连接 5 结束会话 6 对话 7 已读 8 超时关闭 MsgType int `json:"msgType"` // msg类型 procedureType=6时填写 1 文本消息 2 图片消息 Msg string `json:"msg"` //一般用于json数据传递,或消息发送响应内容 Desc string `json:"desc"` //消息介绍内容,或其它数据 } // Client is a middleman between the websocket connection and the hub. type Client struct { hub *hub // The websocket connection. conn *websocket.Conn types string //连接类型或path openTime time.Time //连接打开时间 CloseTime time.Time //连接断开的时间 lastReceiveTime time.Time //最后一次接收到数据的时间 lastSendTime time.Time //最后一次发送数据的时间 Id string //标识连接的名称 mux *sync.Mutex IsClose chan bool //连接的状态。true为关闭 channel []string //连接注册频道类型方便广播等操作。做为一个数组存储。因为一个连接可以属多个频道 // Buffered channel of outbound messages. grpool *grpool.Pool sendCh chan *SendMsg //发送消息的缓冲管首 recvCh chan *SendMsg //接收消息的缓冲管首 recvPing chan int //收到ping的存储管道,方便回复pong处理 sendPing chan int //发送ping的存储管道,方便收到pong处理下次发ping ticker *time.Ticker //定时发送ping的定时器 onError func(error) onOpen func() //连接成功的回调 onPing func() //收到ping onPong func() //收到pong onMessage func(*SendMsg) onClose func() pingPeriodTicker *time.Timer closeTicker *time.Timer //ToSendData []ToSendData //连接方列表 } //type ToSendData struct { // toSendTime string //开启会话时间 // toSendId string //连接id //} type Conversation struct { ConversationId int `json:"conversationId"` Promoter string `json:"promoter"` Participant string `json:"participant"` StartTime string `json:"startTime"` StartReceiveDate string `json:"startReceiveDate"` Status int `json:"status"` } // readPump pumps messages from the websocket connection to the hub. // // The application runs readPump in a per-connection goroutine. The application // ensures that there is at most one reader on a connection by executing all // reads from this goroutine. func (c *Client) readPump() { defer func() { fmt.Println("连接己关闭或者断开,正在清理对像") c.conn.Close() //触发连接关闭的事件回调 c.onClose() //先执行完关闭回调,再请空所有的回调 c.OnError(nil) c.OnOpen(nil) c.OnMessage(nil) c.OnClose(nil) c.OnPong(nil) c.OnPing(nil) close(c.recvPing) close(c.sendPing) c.grpool.Close() c.hub.RemoveClient(c) DelClient(c.Id) dump() }() Loop: for { select { case <-c.IsClose: return default: _, message, err := c.conn.ReadMessage() //if string(message) == "" { // return //} if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseAbnormalClosure, websocket.CloseGoingAway, websocket.CloseProtocolError, websocket.CloseUnsupportedData, websocket.CloseNoStatusReceived, websocket.CloseAbnormalClosure, websocket.CloseInvalidFramePayloadData, websocket.ClosePolicyViolation, websocket.CloseMessageTooBig, websocket.CloseMandatoryExtension, websocket.CloseInternalServerErr, websocket.CloseServiceRestart, websocket.CloseTryAgainLater, websocket.CloseTLSHandshake) { fmt.Println("err:", err) c.onError(errors.New("连接ID:" + c.Id + "ReadMessage Is Unexpected Close Error:" + err.Error())) //c.closeChan<-true; goto Loop1 } c.onError(errors.New("连接ID:" + c.Id + "ReadMessage other error:" + err.Error())) //c.closeChan<-true; goto Loop1 } c.conn.SetReadDeadline(time.Now().Add(pongWait)) c.pingPeriodTicker.Reset(pingPeriod) c.lastReceiveTime = time.Now() //msg, err := unMarshal(message) msg := &SendMsg{} UnserislizeJson(message, msg) if err != nil { c.onError(errors.New("接收数据解析失败!!连接ID:" + c.Id + "原因:" + err.Error())) break Loop } if msg.ProcedureType == 9 { pingMsg := &SendMsg{ProcedureType: 10} c.send(pingMsg) } SaveMsg(msg) //rabbitMQ.PublishSimple(string(message)) c.readMessage(msg) } } Loop1: c.close() } // 读取消息写管道缓冲区 func (c *Client) readMessage(msg *SendMsg) { timeout := time.NewTimer(time.Millisecond * 800) defer timeout.Stop() select { case <-c.IsClose: c.onError(errors.New("readMessage连接" + c.Id + ",连接己在关闭,不进行消息接收")) return case c.recvCh <- msg: return case <-timeout.C: c.onError(errors.New("recvCh 消息管道blocked,写入消息超时,管道长度:" + string(len(c.recvCh)))) return } } // 单个连接接收消息 func (c *Client) recvMessage() { defer func() { dump() }() loop: for { select { case <-c.IsClose: return case data, ok := <-c.recvCh: if !ok { break loop } //ToClientId与Channel不能同时存在!!!注意!!!! //if message.ToClientId != "" { // Send(message) //} //ToClientId与Channel不能同时存在!!!注意!!!! //if message.Channel != "" { // Broadcast(message) //} //收到消息触发回调 //c.onMessage(data) c.grpool.Add(func() { c.onMessage(data) }) } } } // writePump pumps messages from the hub to the websocket connection. // // A goroutine running writePump is started for each connection. The // application ensures that there is at most one writer to a connection by // executing all writes from this goroutine. func (c *Client) writePump() { defer func() { dump() }() Loop: for { select { case <-c.IsClose: return case d, ok := <-c.sendCh: c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if !ok { // The hub closed the channel. //说明管道己经关闭 c.conn.WriteMessage(websocket.CloseMessage, []byte{}) //glog.Error("连接ID:"+c.Id,"wsServer发送消息失败,一般是连接channel已经被关闭:(此处服务端会断开连接,使客户端能够感知进行重连)") goto Loop1 } //message, err := marshal(d) message := SerializeJson(d) //if err != nil { // c.onError(errors.New("接收数据ProtoBuf编码失败!!连接ID:" + c.Id + "原因:" + err.Error())) // break Loop //} w, err := c.conn.NextWriter(websocket.TextMessage) if err != nil { goto Loop1 } c.lastSendTime = time.Now() _, err = w.Write(message) if err != nil { c.onError(errors.New("连接ID:" + c.Id + "写消息进写入IO错误!连接中断" + err.Error())) goto Loop1 } // Add queued chat messages to the current websocket message. //n := len(c.sendCh) //if n > 0 { // for i := 0; i < n; i++ { // // _, err = w.Write(<-c.sendCh) // if err != nil { // c.onError(errors.New("连接ID:" + c.Id + "写上次连接未发送的消息消息进写入IO错误!连接中断" + err.Error())) // return // } // } //} //关闭写入io对象 if err := w.Close(); err != nil { c.onError(errors.New("连接ID:" + c.Id + "关闭写入IO对象出错,连接中断" + err.Error())) goto Loop1 } case p, ok := <-c.sendPing: //定时发送ping if !ok { break Loop } if p == 1 { c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { fmt.Println(err) c.onError(errors.New("连接ID:" + c.Id + "关闭写入IO对象出错,连接中断" + err.Error())) goto Loop1 } } case p, ok := <-c.recvPing: if !ok { break Loop } if p == 1 { c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if err := c.conn.WriteMessage(websocket.PongMessage, nil); err != nil { fmt.Println(err) c.onError(errors.New("回复客户端PongMessage出现异常:" + err.Error())) goto Loop1 } } } } Loop1: c.close() } func (c *Client) send(msg *SendMsg) { timeout := time.NewTimer(time.Millisecond * 800) defer timeout.Stop() select { case c.sendCh <- msg: return case <-timeout.C: c.onError(errors.New("sendCh消息管道blocked,写入消息超时,管道长度:" + string(len(c.sendCh)))) return } //c.sendCh<-msg } //定时发送ping func (c *Client) Tickers() { for { select { case <-c.IsClose: return case <-c.pingPeriodTicker.C: c.sendPing <- 1 } } } func (c *Client) closeTickers() { for { select { case <-c.IsClose: return case <-c.closeTicker.C: return } } } func (c *Client) close() { c.mux.Lock() defer c.mux.Unlock() select { case <-c.IsClose: return default: close(c.IsClose) } }