package pool

import (
	"errors"
	"fmt"
	"github.com/gorilla/websocket"
	"net/http"
	"pool/pool/util/grpool"
	"sync"
	"time"
)

const (
	// Time allowed to read the next pong message from the peer.
	pongWait = 60 * time.Second

	// Send pings to peer with this period. Must be less than pongWait.
	pingPeriod = (pongWait * 9) / 10

	// Time allowed to write a message to the peer.
	writeWait = 10 * time.Second

	// Maximum message size allowed from peer.
	maxMessageSize = 1024 * 1024 * 20

	//
	closeWait = 60 * 30 * time.Second
	// time for auto end
	endDate = 60 * 30 * time.Second
)

var cliKey = "clients"
var convKey = "conversation"

var upgrader = websocket.Upgrader{
	//ReadBufferSize:  1024 * 1024,
	//WriteBufferSize: 1024 * 1024,
	// 默认允许WebSocket请求跨域,权限控制可以由业务层自己负责,灵活度更高
	CheckOrigin: func(r *http.Request) bool {
		return true
	},
}

//连接参数结构体
type Config struct {
	Id        string   //标识连接的名称
	Type      string   //连接类型或path
	Channel   []string //连接注册频道类型方便广播等操作。做为一个数组存储。因为一个连接可以属多个频道
	Goroutine int      //每个连接开启的go程数里 默认为10
}

type RuntimeInfo struct {
	Id              string //标识连接的名称
	Type            string //连接类型或path
	Ip              string
	Channel         []string  //连接注册频道类型方便广播等操作。做为一个数组存储。因为一个连接可以属多个频道
	OpenTime        time.Time //连接打开时间
	LastReceiveTime time.Time //最后一次接收到数据的时间
	LastSendTime    time.Time //最后一次发送数据的时间
}

type SendMsg struct {
	ConversationId int      `json:"conversationId"`
	SendTime       string   `json:"sendTime"`      //消息发送时间
	FromClientId   string   `json:"fromClientId"`  //指令消息的来源。发送者的连接ID
	ToClientId     string   `json:"toClientId"`    //指令消息的接收者。发送给对应的客户端连接ID
	CmdData        []byte   `json:"cmdData"`       //对应指令的CmdData1的protobuf的message
	Status         int      `json:"status"`        //消息发送响应状态
	Priority       int      `json:"priority"`      //用于处理指令队列的优先级的权重值
	Channel        []string `json:"channel"`       //指定需要广播的频道,可以指定一个或多个频道
	ProcedureType  int      `json:"procedureType"` //会话状态 1 开始导诊 2 等待连接  3 离线  4 双方建立连接 5 结束会话 6 对话 7 已读 8 超时关闭
	MsgType        int      `json:"msgType"`       // msg类型 procedureType=6时填写 1 文本消息  2 图片消息
	Msg            string   `json:"msg"`           //一般用于json数据传递,或消息发送响应内容
	Desc           string   `json:"desc"`          //消息介绍内容,或其它数据
}

// Client is a middleman between the websocket connection and the hub.
type Client struct {
	hub *hub
	// The websocket connection.
	conn            *websocket.Conn
	types           string    //连接类型或path
	openTime        time.Time //连接打开时间
	CloseTime       time.Time //连接断开的时间
	lastReceiveTime time.Time //最后一次接收到数据的时间
	lastSendTime    time.Time //最后一次发送数据的时间
	Id              string    //标识连接的名称
	mux             *sync.Mutex
	IsClose         chan bool //连接的状态。true为关闭
	channel         []string  //连接注册频道类型方便广播等操作。做为一个数组存储。因为一个连接可以属多个频道
	// Buffered channel of outbound messages.
	grpool           *grpool.Pool
	sendCh           chan *SendMsg //发送消息的缓冲管首
	recvCh           chan *SendMsg //接收消息的缓冲管首
	recvPing         chan int      //收到ping的存储管道,方便回复pong处理
	sendPing         chan int      //发送ping的存储管道,方便收到pong处理下次发ping
	ticker           *time.Ticker  //定时发送ping的定时器
	onError          func(error)
	onOpen           func() //连接成功的回调
	onPing           func() //收到ping
	onPong           func() //收到pong
	onMessage        func(*SendMsg)
	onClose          func()
	pingPeriodTicker *time.Timer
	closeTicker      *time.Timer
	//ToSendData       []ToSendData //连接方列表
}

//type ToSendData struct {
//	toSendTime string //开启会话时间
//	toSendId   string //连接id
//}

type Conversation struct {
	ConversationId   int    `json:"conversationId"`
	Promoter         string `json:"promoter"`
	Participant      string `json:"participant"`
	StartTime        string `json:"startTime"`
	StartReceiveDate string `json:"startReceiveDate"`
	Status           int    `json:"status"`
}

// readPump pumps messages from the websocket connection to the hub.
//
// The application runs readPump in a per-connection goroutine. The application
// ensures that there is at most one reader on a connection by executing all
// reads from this goroutine.
func (c *Client) readPump() {
	defer func() {
		fmt.Println("连接己关闭或者断开,正在清理对像")
		c.conn.Close()
		//触发连接关闭的事件回调
		c.onClose() //先执行完关闭回调,再请空所有的回调
		c.OnError(nil)
		c.OnOpen(nil)
		c.OnMessage(nil)
		c.OnClose(nil)
		c.OnPong(nil)
		c.OnPing(nil)
		close(c.recvPing)
		close(c.sendPing)
		c.grpool.Close()
		c.hub.RemoveClient(c)
		DelClient(c.Id)
		dump()
	}()
Loop:
	for {
		select {
		case <-c.IsClose:
			return
		default:
			_, message, err := c.conn.ReadMessage()
			//if string(message) == "" {
			//	return
			//}
			if err != nil {
				if websocket.IsUnexpectedCloseError(err,
					websocket.CloseAbnormalClosure,
					websocket.CloseGoingAway,
					websocket.CloseProtocolError,
					websocket.CloseUnsupportedData,
					websocket.CloseNoStatusReceived,
					websocket.CloseAbnormalClosure,
					websocket.CloseInvalidFramePayloadData,
					websocket.ClosePolicyViolation,
					websocket.CloseMessageTooBig,
					websocket.CloseMandatoryExtension,
					websocket.CloseInternalServerErr,
					websocket.CloseServiceRestart,
					websocket.CloseTryAgainLater,
					websocket.CloseTLSHandshake) {
					fmt.Println("err:", err)
					c.onError(errors.New("连接ID:" + c.Id + "ReadMessage Is Unexpected Close Error:" + err.Error()))
					//c.closeChan<-true;
					goto Loop1
				}
				c.onError(errors.New("连接ID:" + c.Id + "ReadMessage other error:" + err.Error()))
				//c.closeChan<-true;
				goto Loop1
			}
			c.conn.SetReadDeadline(time.Now().Add(pongWait))
			c.pingPeriodTicker.Reset(pingPeriod)
			c.lastReceiveTime = time.Now()
			//msg, err := unMarshal(message)
			msg := &SendMsg{}
			UnserislizeJson(message, msg)
			if err != nil {
				c.onError(errors.New("接收数据解析失败!!连接ID:" + c.Id + "原因:" + err.Error()))
				break Loop
			}

			if msg.ProcedureType == 9 {
				pingMsg := &SendMsg{ProcedureType: 10}
				c.send(pingMsg)
			}

			SaveMsg(msg)
			//rabbitMQ.PublishSimple(string(message))
			c.readMessage(msg)
		}
	}
Loop1:
	c.close()
}

// 读取消息写管道缓冲区
func (c *Client) readMessage(msg *SendMsg) {
	timeout := time.NewTimer(time.Millisecond * 800)
	defer timeout.Stop()
	select {
	case <-c.IsClose:
		c.onError(errors.New("readMessage连接" + c.Id + ",连接己在关闭,不进行消息接收"))
		return
	case c.recvCh <- msg:
		return
	case <-timeout.C:
		c.onError(errors.New("recvCh 消息管道blocked,写入消息超时,管道长度:" + string(len(c.recvCh))))
		return
	}
}

// 单个连接接收消息
func (c *Client) recvMessage() {
	defer func() {
		dump()
	}()
loop:
	for {
		select {
		case <-c.IsClose:
			return
		case data, ok := <-c.recvCh:
			if !ok {
				break loop
			}

			//ToClientId与Channel不能同时存在!!!注意!!!!
			//if message.ToClientId != "" {
			//	Send(message)
			//}
			//ToClientId与Channel不能同时存在!!!注意!!!!
			//if message.Channel != "" {
			//	Broadcast(message)
			//}

			//收到消息触发回调
			//c.onMessage(data)
			c.grpool.Add(func() {
				c.onMessage(data)
			})
		}
	}
}

// writePump pumps messages from the hub to the websocket connection.
//
// A goroutine running writePump is started for each connection. The
// application ensures that there is at most one writer to a connection by
// executing all writes from this goroutine.
func (c *Client) writePump() {
	defer func() {
		dump()
	}()
Loop:
	for {
		select {
		case <-c.IsClose:
			return
		case d, ok := <-c.sendCh:
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if !ok {
				// The hub closed the channel.
				//说明管道己经关闭
				c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				//glog.Error("连接ID:"+c.Id,"wsServer发送消息失败,一般是连接channel已经被关闭:(此处服务端会断开连接,使客户端能够感知进行重连)")
				goto Loop1
			}
			//message, err := marshal(d)
			message := SerializeJson(d)
			//if err != nil {
			//	c.onError(errors.New("接收数据ProtoBuf编码失败!!连接ID:" + c.Id + "原因:" + err.Error()))
			//	break Loop
			//}
			w, err := c.conn.NextWriter(websocket.TextMessage)
			if err != nil {
				goto Loop1
			}
			c.lastSendTime = time.Now()
			_, err = w.Write(message)
			if err != nil {
				c.onError(errors.New("连接ID:" + c.Id + "写消息进写入IO错误!连接中断" + err.Error()))
				goto Loop1
			}
			// Add queued chat messages to the current websocket message.
			//n := len(c.sendCh)
			//if n > 0 {
			//	for i := 0; i < n; i++ {
			//
			//		_, err = w.Write(<-c.sendCh)
			//		if err != nil {
			//			c.onError(errors.New("连接ID:" + c.Id + "写上次连接未发送的消息消息进写入IO错误!连接中断" + err.Error()))
			//			return
			//		}
			//	}
			//}

			//关闭写入io对象
			if err := w.Close(); err != nil {
				c.onError(errors.New("连接ID:" + c.Id + "关闭写入IO对象出错,连接中断" + err.Error()))
				goto Loop1
			}
		case p, ok := <-c.sendPing: //定时发送ping
			if !ok {
				break Loop
			}
			if p == 1 {
				c.conn.SetWriteDeadline(time.Now().Add(writeWait))
				if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
					fmt.Println(err)
					c.onError(errors.New("连接ID:" + c.Id + "关闭写入IO对象出错,连接中断" + err.Error()))
					goto Loop1
				}
			}
		case p, ok := <-c.recvPing:
			if !ok {
				break Loop
			}
			if p == 1 {
				c.conn.SetWriteDeadline(time.Now().Add(writeWait))
				if err := c.conn.WriteMessage(websocket.PongMessage, nil); err != nil {
					fmt.Println(err)
					c.onError(errors.New("回复客户端PongMessage出现异常:" + err.Error()))
					goto Loop1
				}
			}
		}
	}
Loop1:
	c.close()
}

func (c *Client) send(msg *SendMsg) {
	timeout := time.NewTimer(time.Millisecond * 800)
	defer timeout.Stop()
	select {
	case c.sendCh <- msg:
		return
	case <-timeout.C:
		c.onError(errors.New("sendCh消息管道blocked,写入消息超时,管道长度:" + string(len(c.sendCh))))
		return
	}
	//c.sendCh<-msg
}

//定时发送ping
func (c *Client) Tickers() {
	for {
		select {
		case <-c.IsClose:
			return
		case <-c.pingPeriodTicker.C:
			c.sendPing <- 1
		}
	}

}

func (c *Client) closeTickers() {
	for {
		select {
		case <-c.IsClose:
			return
		case <-c.closeTicker.C:
			return
		}
	}

}

func (c *Client) close() {
	c.mux.Lock()
	defer c.mux.Unlock()
	select {
	case <-c.IsClose:
		return
	default:
		close(c.IsClose)
	}
}