Commit 05095aae authored by haoyanbin's avatar haoyanbin

Test

parent bd8767f2
......@@ -15,11 +15,11 @@ import (
var addr = flag.String("addr", "localhost:8080", "http service address")
func main() {
go wsClient("1001_1_2")
go WsClient("1001_1_2")
select {}
}
func wsClient(id string) {
func WsClient(id string) {
flag.Parse()
log.SetFlags(0)
......
......@@ -18,7 +18,8 @@ func NewRabbitMQSimple(queueName string) *RabbitMQ {
// Step 2. Simple producer code
func (r *RabbitMQ) PublishSimple(message []byte) error {
fmt.Println("push:", string(message))
//fmt.Println("push:", string(message))
//fmt.Println(2)
// 2.1 申请队列、 如果队列不存在则会自动创建、 如果存在则跳过创建
// 保证队列存在、 消息能发送到队列中
_, err := r.channel.QueueDeclare(
......
......@@ -25,6 +25,8 @@ const (
//
closeWait = 60 * 30 * time.Second
//
endDate = 60 * 30 * time.Second
)
var upgrader = websocket.Upgrader{
......@@ -65,7 +67,7 @@ type SendMsg struct {
BusinessId string `json:"businessId"` //客户端标识消息的id,主要区分相同cmd的不同消息,方便收到回复分发处理等效果,考虑长度问题定义成string
GroupId string `json:"groupId"` //服务端发送消息的ID,主要区分相同cmd的不同消息,方便服务端收到回复分发处理等效果,考虑长度问题定义成string
Channel []string `json:"channel"` //指定需要广播的频道,可以指定一个或多个频道
ProcedureType int `json:"procedureType"` //会话状态 1 开始导诊 2 等待连接 3 离线 4 双方建立连接 5 结束会话 6 对话 7 已读
ProcedureType int `json:"procedureType"` //会话状态 1 开始导诊 2 等待连接 3 离线 4 双方建立连接 5 结束会话 6 对话 7 已读 8 超时关闭
MsgType int `json:"msgType"` // msg类型 procedureType=6时填写 1 文本消息 2 图片消息
Msg string `json:"msg"` //一般用于json数据传递,或消息发送响应内容
Desc string `json:"desc"` //消息介绍内容,或其它数据
......@@ -100,6 +102,13 @@ type Client struct {
onClose func()
pingPeriodTicker *time.Timer
closeTicker *time.Timer
ToSendData []ToSendData //连接方列表
}
type ToSendData struct {
toSendTime string //开启会话时间
toSendId string //连接id
}
// readPump pumps messages from the websocket connection to the hub.
......@@ -151,6 +160,7 @@ Loop:
websocket.CloseServiceRestart,
websocket.CloseTryAgainLater,
websocket.CloseTLSHandshake) {
fmt.Println("err:", err)
c.onError(errors.New("连接ID:" + c.Id + "ReadMessage Is Unexpected Close Error:" + err.Error()))
//c.closeChan<-true;
goto Loop1
......
......@@ -18,8 +18,8 @@ func NewClient(conf *Config) *Client {
}
var client *Client
oldclient := wsSever.hub.oldClients[conf.Id]
delete(wsSever.hub.oldClients, conf.Id)
if oldclient != nil {
delete(wsSever.hub.oldClients, conf.Id)
c := oldclient
client = c
} else {
......@@ -218,6 +218,7 @@ func (c *Client) Close() {
// 在此方法中sendMsg.Channel指定的值不会处理
func Send(msg *SendMsg) error {
//log.Info("发送指令:",msg.Cmd,msg.ToClientId)
//fmt.Println(msg)
if msg.ToClientId == "" {
return errors.New("发送消息的消息体中未指定ToClient目标!")
}
......@@ -233,7 +234,7 @@ func Send(msg *SendMsg) error {
}
} else {
fmt.Println(1)
fmt.Println(wsSever.hub.clients)
//fmt.Println(wsSever.hub.clients)
}
return nil
......@@ -327,7 +328,7 @@ func GetClientInfoByToken(token string) (*UserInfo, error) {
return userData, nil
}
//会话状态 1 开始导诊 2 等待连接 3 离线 4 双方建立连接 5 结束会话 6 对话 7 已读
// 会话状态 1 开始导诊 2 等待连接 3 双方建立连接 4 结束会话 5 离线 6 对话 7 已读 8 超时自动结束
func SaveMsg(msg *SendMsg) {
user := GetClientInfoById(msg.FromClientId)
......@@ -336,6 +337,11 @@ func SaveMsg(msg *SendMsg) {
toUser = GetClientInfoById(msg.ToClientId)
}
if user.Promoter == "" {
fmt.Println("用户数据有误")
return
}
mqData := &mq.SetMsgReq{}
if user.Promoter == "1" {
......@@ -352,12 +358,14 @@ func SaveMsg(msg *SendMsg) {
mqData.UserId = user.CustomerId
}
if mqData.BusinessId != "" {
mqData.BusinessId, _ = DataAesDecrypt(mqData.BusinessId)
}
mqData.ProcedureType = msg.ProcedureType
mqData.StartTime = msg.SendTime
if msg.ProcedureType == 1 {
mqData.StartTime = msg.SendTime
mqData.Promoter = user.Promoter
}
......@@ -366,20 +374,17 @@ func SaveMsg(msg *SendMsg) {
mqData.GuideDate = msg.SendTime
}
//建立连接
if msg.ProcedureType == 3 {
if user.Promoter == "1" {
mqData.Finish = "1"
}
if user.Promoter == "2" {
mqData.Finish = "2"
}
}
if msg.ProcedureType == 4 {
mqData.StartReceiveDate = msg.SendTime
//连接开启信息存入
AppendToSendData(msg.FromClientId, msg.ToClientId, msg.SendTime)
AppendToSendData(msg.ToClientId, msg.FromClientId, msg.SendTime)
}
if msg.ProcedureType == 5 {
//结束
if msg.ProcedureType == 4 {
mqData.EndTime = msg.SendTime
mqData.Promoter = user.Promoter
if user.Promoter == "1" {
......@@ -389,8 +394,23 @@ func SaveMsg(msg *SendMsg) {
if user.Promoter == "2" {
mqData.Finish = "1"
}
//删除连接信息
DelToSendData(msg.FromClientId, msg.ToClientId)
DelToSendData(msg.ToClientId, msg.FromClientId)
}
//离线
if msg.ProcedureType == 5 {
if user.Promoter == "1" {
mqData.Finish = "1"
}
if user.Promoter == "2" {
mqData.Finish = "2"
}
}
//对话
if msg.ProcedureType == 6 {
mqData.PromoterType = user.Promoter
mqData.SendTime = msg.SendTime
......@@ -411,8 +431,17 @@ func SaveMsg(msg *SendMsg) {
mqData.SendTime = msg.SendTime
}
if msg.ProcedureType == 8 {
mqData.EndTime = msg.SendTime
mqData.Promoter = user.Promoter
mqData.Finish = "5"
//删除连接信息
DelToSendData(msg.FromClientId, msg.ToClientId)
DelToSendData(msg.ToClientId, msg.FromClientId)
}
if mqData.ProcedureType != 0 {
PublishData(mqData)
go PublishData(mqData)
}
return
}
......@@ -422,6 +451,12 @@ func GetClientInfoById(clientId string) *UserInfo {
data := strings.Split(clientId, "_")
userData := new(UserInfo)
if len(data) < 3 {
fmt.Println("用户数据有误")
return userData
}
userData.Source = data[0]
userData.Promoter = data[1]
userData.CustomerId = data[2]
......@@ -434,11 +469,48 @@ func PublishData(mqData *mq.SetMsgReq) {
return
}
func SetEndTime(){
mqData := &mq.SetMsgReq{
ProcedureType:8,
func SetEnd() {
for clientsId, vClients := range wsSever.hub.clients {
if GetClientInfoById(clientsId).Promoter == "1" {
continue
}
for _, vToSendData := range vClients.ToSendData {
if vToSendData.toSendTime < time.Now().Add(-endDate).Format("2006-01-02 15:04:05") {
mqData := &SendMsg{
ProcedureType: 8,
ToClientId: vToSendData.toSendId,
FromClientId: clientsId,
SendTime: time.Now().Format("2006-01-02 15:04:05"),
}
SaveMsg(mqData)
//发送结束会话给客户端
wsSever.hub.clients[clientsId].readMessage(mqData)
toMqData := &SendMsg{
ProcedureType: 8,
ToClientId: clientsId,
FromClientId: vToSendData.toSendId,
SendTime: time.Now().Format("2006-01-02 15:04:05"),
}
wsSever.hub.clients[vToSendData.toSendId].readMessage(toMqData)
}
}
}
return
}
func AppendToSendData(clientId, toClientId, sendTime string) {
toSendData := ToSendData{toSendTime: sendTime, toSendId: clientId}
wsSever.hub.clients[toClientId].ToSendData = append(wsSever.hub.clients[toClientId].ToSendData, toSendData)
return
}
func DelToSendData(clientId, toClientId string) {
for k, v := range wsSever.hub.clients[clientId].ToSendData {
if v.toSendId == toClientId {
wsSever.hub.clients[clientId].ToSendData = append(wsSever.hub.clients[clientId].ToSendData[:k], wsSever.hub.clients[clientId].ToSendData[k+1:]...)
}
}
PublishData(mqData)
return
}
......
......@@ -11,16 +11,15 @@ import (
"os/signal"
"pool/pool"
"runtime"
"strings"
"time"
)
var addr2 = flag.String("addr", "localhost:8081", "http service address")
var addr2 = flag.String("addr", "127.0.0.1:11001", "http service address")
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
for i := 1; i < 100; i++ {
go wsClient2(fmt.Sprintf("%d_1_3", i))
for i := 1; i < 200; i++ {
go wsClient2(fmt.Sprintf("lA6fUNMamyUBlOokPOeiGg==_1_%d", i))
}
select {}
}
......@@ -28,9 +27,9 @@ func main() {
func wsClient2(id string) {
flag.Parse()
log.SetFlags(0)
list := strings.Split(id, "_")
Id := list[0]
chann := list[1:]
//list := strings.Split(id, "_")
//Id := "lA6fUNMamyUBlOokPOeiGg==_1_" + list[2]
//chann := list[1:]
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
......@@ -48,7 +47,7 @@ func wsClient2(id string) {
//重新连接
//t := grand.N(10, 20)
//time.Sleep(time.Duration(t) * time.Second)
go wsClient(id)
//go WsClient(id)
}()
ping := make(chan int)
c.SetPingHandler(func(appData string) error {
......@@ -80,7 +79,7 @@ func wsClient2(id string) {
//
//}()
ticker := time.NewTicker(time.Second)
ticker := time.NewTicker(10 * time.Second)
ticker1 := time.NewTicker(20 * time.Second)
defer ticker.Stop()
......@@ -90,12 +89,14 @@ func wsClient2(id string) {
return
case <-ticker.C:
msg := &pool.SendMsg{
ToClientId: Id,
FromClientId: Id,
Msg: "test" + time.Now().String(),
Channel: chann,
ToClientId: "6_2_14",
FromClientId: id,
ProcedureType: 6,
Msg: "test" + time.Now().Format("2006-01-02 15:04:05"),
//Channel: chann,
}
m := pool.SerializeJson(msg)
fmt.Println(msg)
err := c.WriteMessage(websocket.BinaryMessage, m)
if err != nil {
log.Printf("write:%s", err.Error())
......
......@@ -34,14 +34,14 @@ func main() {
fmt.Println("wsPool.InitWsPool error-------------", err)
})
ticker := time.NewTicker(time.Minute)
//自动结束会话
ticker := time.NewTicker(time.Second * 10)
go func() {
for range ticker.C {
pool.SetEndTime()
pool.SetEnd()
}
}()
mux := http.NewServeMux()
mux.HandleFunc("/", serveHome)
......@@ -139,7 +139,11 @@ func ws(w http.ResponseWriter, r *http.Request) {
if user.Promoter == "1" {
closeMsg.Finish = "3"
closeMsg.BusinessId,_ = pool.DataAesDecrypt(userInfo.Source)
if userInfo.Source != "" {
closeMsg.BusinessId, _ = pool.DataAesDecrypt(userInfo.Source)
}
closeMsg.CustomerId = userInfo.CustomerId
pool.PublishData(closeMsg)
fmt.Println("用户关闭连接", client.Id)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment