package pool
import (
"errors"
"fmt"
"github.com/gorilla/websocket"
"net/http"
"pool/pool/util/grpool"
"sync"
"time"
)
const (
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
// Maximum message size allowed from peer.
maxMessageSize = 1024 * 1024 * 20
//
closeWait = 60 * 30 * time.Second
// time for auto end
endDate = 60 * 30 * time.Second
)
var cliKey = "clients"
var convKey = "conversation"
var upgrader = websocket.Upgrader{
//ReadBufferSize: 1024 * 1024,
//WriteBufferSize: 1024 * 1024,
// 默认允许WebSocket请求跨域,权限控制可以由业务层自己负责,灵活度更高
CheckOrigin: func(r *http.Request) bool {
return true
},
}
//连接参数结构体
type Config struct {
Id string //标识连接的名称/用户组标识
Type string //连接类型或path
Channel []string //连接注册频道类型方便广播等操作。做为一个数组存储。因为一个连接可以属多个频道
Goroutine int //每个连接开启的go程数里 默认为10
User string //用户唯一标识
}
type RuntimeInfo struct {
Id string //标识连接的名称
Type string //连接类型或path
Ip string
Channel []string //连接注册频道类型方便广播等操作。做为一个数组存储。因为一个连接可以属多个频道
OpenTime time.Time //连接打开时间
LastReceiveTime time.Time //最后一次接收到数据的时间
LastSendTime time.Time //最后一次发送数据的时间
}
type SendMsg struct {
ConversationId int `json:"conversationId"`
SendTime string `json:"sendTime"` //消息发送时间
FromClientId string `json:"fromClientId"` //指令消息的来源。发送者的连接ID
ToClientId string `json:"toClientId"` //指令消息的接收者。发送给对应的客户端连接ID
CmdData []byte `json:"cmdData"` //对应指令的CmdData1的protobuf的message
Status int `json:"status"` //消息发送响应状态
Priority int `json:"priority"` //用于处理指令队列的优先级的权重值
Channel []string `json:"channel"` //指定需要广播的频道,可以指定一个或多个频道
ProcedureType int `json:"procedureType"` //会话状态 1 开始导诊 2 等待连接 3 离线 4 双方建立连接 5 结束会话 6 对话 7 已读 8 超时关闭
MsgType int `json:"msgType"` // msg类型 procedureType=6时填写 1 文本消息 2 图片消息
Msg string `json:"msg"` //一般用于json数据传递,或消息发送响应内容
Desc string `json:"desc"` //消息介绍内容,或其它数据
}
// Client is a middleman between the websocket connection and the hub.
type Client struct {
hub *hub
// The websocket connection.
conn *websocket.Conn
types string //连接类型或path
openTime time.Time //连接打开时间
CloseTime time.Time //连接断开的时间
lastReceiveTime time.Time //最后一次接收到数据的时间
lastSendTime time.Time //最后一次发送数据的时间
Id string //标识连接的名称
User string //标识连接的名称
mux *sync.Mutex
IsClose chan bool //连接的状态。true为关闭
channel []string //连接注册频道类型方便广播等操作。做为一个数组存储。因为一个连接可以属多个频道
// Buffered channel of outbound messages.
grpool *grpool.Pool
sendCh chan *SendMsg //发送消息的缓冲管首
recvCh chan *SendMsg //接收消息的缓冲管首
recvPing chan int //收到ping的存储管道,方便回复pong处理
sendPing chan int //发送ping的存储管道,方便收到pong处理下次发ping
ticker *time.Ticker //定时发送ping的定时器
onError func(error)
onOpen func() //连接成功的回调
onPing func() //收到ping
onPong func() //收到pong
onMessage func(*SendMsg)
onClose func()
pingPeriodTicker *time.Timer
closeTicker *time.Timer
//ToSendData []ToSendData //连接方列表
}
//type ToSendData struct {
// toSendTime string //开启会话时间
// toSendId string //连接id
//}
type Conversation struct {
ConversationId int `json:"conversationId"`
Promoter string `json:"promoter"`
Participant string `json:"participant"`
StartTime string `json:"startTime"`
StartReceiveDate string `json:"startReceiveDate"`
Status int `json:"status"`
}
// readPump pumps messages from the websocket connection to the hub.
//
// The application runs readPump in a per-connection goroutine. The application
// ensures that there is at most one reader on a connection by executing all
// reads from this goroutine.
func (c *Client) readPump() {
defer func() {
fmt.Println("连接己关闭或者断开,正在清理对像")
c.conn.Close()
//触发连接关闭的事件回调
c.onClose() //先执行完关闭回调,再请空所有的回调
c.OnError(nil)
c.OnOpen(nil)
c.OnMessage(nil)
c.OnClose(nil)
c.OnPong(nil)
c.OnPing(nil)
close(c.recvPing)
close(c.sendPing)
c.grpool.Close()
c.hub.RemoveClient(c)
DelClient(c.Id,c.User)
dump()
}()
Loop:
for {
select {
case <-c.IsClose:
return
default:
_, message, err := c.conn.ReadMessage()
//if string(message) == "" {
// return
//}
if err != nil {
if websocket.IsUnexpectedCloseError(err,
websocket.CloseAbnormalClosure,
websocket.CloseGoingAway,
websocket.CloseProtocolError,
websocket.CloseUnsupportedData,
websocket.CloseNoStatusReceived,
websocket.CloseAbnormalClosure,
websocket.CloseInvalidFramePayloadData,
websocket.ClosePolicyViolation,
websocket.CloseMessageTooBig,
websocket.CloseMandatoryExtension,
websocket.CloseInternalServerErr,
websocket.CloseServiceRestart,
websocket.CloseTryAgainLater,
websocket.CloseTLSHandshake) {
fmt.Println("err:", err)
c.onError(errors.New("连接ID:" + c.Id + "ReadMessage Is Unexpected Close Error:" + err.Error()))
//c.closeChan<-true;
goto Loop1
}
c.onError(errors.New("连接ID:" + c.Id + "ReadMessage other error:" + err.Error()))
//c.closeChan<-true;
goto Loop1
}
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.pingPeriodTicker.Reset(pingPeriod)
c.lastReceiveTime = time.Now()
//msg, err := unMarshal(message)
msg := &SendMsg{}
UnserislizeJson(message, msg)
if err != nil {
c.onError(errors.New("接收数据解析失败!!连接ID:" + c.Id + "原因:" + err.Error()))
break Loop
}
if msg.ProcedureType == 9 {
pingMsg := &SendMsg{ProcedureType: 10}
c.send(pingMsg)
}
SaveMsg(msg)
//rabbitMQ.PublishSimple(string(message))
c.readMessage(msg)
}
}
Loop1:
c.close()
}
// 读取消息写管道缓冲区
func (c *Client) readMessage(msg *SendMsg) {
timeout := time.NewTimer(time.Millisecond * 800)
defer timeout.Stop()
select {
case <-c.IsClose:
c.onError(errors.New("readMessage连接" + c.Id + ",连接己在关闭,不进行消息接收"))
return
case c.recvCh <- msg:
return
case <-timeout.C:
c.onError(errors.New("recvCh 消息管道blocked,写入消息超时,管道长度:" + string(len(c.recvCh))))
return
}
}
// 单个连接接收消息
func (c *Client) recvMessage() {
defer func() {
dump()
}()
loop:
for {
select {
case <-c.IsClose:
return
case data, ok := <-c.recvCh:
if !ok {
break loop
}
//ToClientId与Channel不能同时存在!!!注意!!!!
//if message.ToClientId != "" {
// Send(message)
//}
//ToClientId与Channel不能同时存在!!!注意!!!!
//if message.Channel != "" {
// Broadcast(message)
//}
//收到消息触发回调
//c.onMessage(data)
c.grpool.Add(func() {
c.onMessage(data)
})
}
}
}
// writePump pumps messages from the hub to the websocket connection.
//
// A goroutine running writePump is started for each connection. The
// application ensures that there is at most one writer to a connection by
// executing all writes from this goroutine.
func (c *Client) writePump() {
defer func() {
dump()
}()
Loop:
for {
select {
case <-c.IsClose:
return
case d, ok := <-c.sendCh:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// The hub closed the channel.
//说明管道己经关闭
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
//glog.Error("连接ID:"+c.Id,"wsServer发送消息失败,一般是连接channel已经被关闭:(此处服务端会断开连接,使客户端能够感知进行重连)")
goto Loop1
}
//message, err := marshal(d)
message := SerializeJson(d)
//if err != nil {
// c.onError(errors.New("接收数据ProtoBuf编码失败!!连接ID:" + c.Id + "原因:" + err.Error()))
// break Loop
//}
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
goto Loop1
}
c.lastSendTime = time.Now()
_, err = w.Write(message)
if err != nil {
c.onError(errors.New("连接ID:" + c.Id + "写消息进写入IO错误!连接中断" + err.Error()))
goto Loop1
}
// Add queued chat messages to the current websocket message.
//n := len(c.sendCh)
//if n > 0 {
// for i := 0; i < n; i++ {
//
// _, err = w.Write(<-c.sendCh)
// if err != nil {
// c.onError(errors.New("连接ID:" + c.Id + "写上次连接未发送的消息消息进写入IO错误!连接中断" + err.Error()))
// return
// }
// }
//}
//关闭写入io对象
if err := w.Close(); err != nil {
c.onError(errors.New("连接ID:" + c.User + "关闭写入IO对象出错,连接中断" + err.Error()))
goto Loop1
}
case p, ok := <-c.sendPing: //定时发送ping
if !ok {
break Loop
}
if p == 1 {
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
fmt.Println(err)
c.onError(errors.New("连接ID:" + c.User + "关闭写入IO对象出错,连接中断" + err.Error()))
goto Loop1
}
}
case p, ok := <-c.recvPing:
if !ok {
break Loop
}
if p == 1 {
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PongMessage, nil); err != nil {
fmt.Println(err)
c.onError(errors.New("回复客户端PongMessage出现异常:" + err.Error()))
goto Loop1
}
}
}
}
Loop1:
c.close()
}
func (c *Client) send(msg *SendMsg) {
timeout := time.NewTimer(time.Millisecond * 800)
defer timeout.Stop()
select {
case c.sendCh <- msg:
return
case <-timeout.C:
c.onError(errors.New("sendCh消息管道blocked,写入消息超时,管道长度:" + string(len(c.sendCh))))
return
}
//c.sendCh<-msg
}
//定时发送ping
func (c *Client) Tickers() {
for {
select {
case <-c.IsClose:
return
case <-c.pingPeriodTicker.C:
c.sendPing <- 1
}
}
}
func (c *Client) closeTickers() {
for {
select {
case <-c.IsClose:
return
case <-c.closeTicker.C:
return
}
}
}
func (c *Client) close() {
c.mux.Lock()
defer c.mux.Unlock()
select {
case <-c.IsClose:
return
default:
close(c.IsClose)
}
}