Commit 84a8ba8f authored by haoyanbin's avatar haoyanbin

Mq

parent 05095aae
...@@ -29,15 +29,25 @@ func newRabbitMQ(queueName, exchange, key string) (*RabbitMQ, error) { ...@@ -29,15 +29,25 @@ func newRabbitMQ(queueName, exchange, key string) (*RabbitMQ, error) {
var err error var err error
// dial mq // dial mq
rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MQUrl) rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MQUrl)
rabbitMQ.failOnErr(err, "创建连接错误")
if err != nil { if err != nil {
return nil, err return nil, err
} }
// get channel
rabbitMQ.channel, err = rabbitMQ.conn.Channel() rabbitMQ.channel, err = rabbitMQ.conn.Channel()
if err != nil { if err != nil {
return nil, err return nil, err
} }
_, err = rabbitMQ.channel.QueueDeclare(
rabbitMQ.QueueName,
true,
false,
false,
false,
nil,
)
if err != nil {
return nil, err
}
return rabbitMQ, nil return rabbitMQ, nil
} }
......
...@@ -5,43 +5,10 @@ import ( ...@@ -5,43 +5,10 @@ import (
"github.com/streadway/amqp" "github.com/streadway/amqp"
) )
// Step 1. Simple 创建实例 func (r *RabbitMQ) PublishSimple(message []byte) {
func NewRabbitMQSimple(queueName string) *RabbitMQ {
//在simple模式下 exchange and key 都为空
rabbitMQ, err := newRabbitMQ(queueName, "", "")
if err != nil {
fmt.Println("NewRabbitMQSimple err:", err)
return nil
}
return rabbitMQ
}
// Step 2. Simple producer code
func (r *RabbitMQ) PublishSimple(message []byte) error {
//fmt.Println("push:", string(message))
//fmt.Println(2)
// 2.1 申请队列、 如果队列不存在则会自动创建、 如果存在则跳过创建
// 保证队列存在、 消息能发送到队列中
_, err := r.channel.QueueDeclare(
r.QueueName,
//是否持久化
true,
// 是否自动删除
false,
// 是否具有排他性
false,
//是否阻塞
false,
// 额外属性
nil,
)
if err != nil {
return err
}
// 2.2 发送消息到队列中 err := r.channel.Publish(
err = r.channel.Publish( "",
r.Exchange,
r.QueueName, r.QueueName,
// 如果为true 根据exchange 类型 和 routkey规则、 如果无法找到符合条件的队列、那么会把发送完的消息返回给发送者 // 如果为true 根据exchange 类型 和 routkey规则、 如果无法找到符合条件的队列、那么会把发送完的消息返回给发送者
false, false,
...@@ -55,41 +22,73 @@ func (r *RabbitMQ) PublishSimple(message []byte) error { ...@@ -55,41 +22,73 @@ func (r *RabbitMQ) PublishSimple(message []byte) error {
}, },
) )
if err != nil { if err != nil {
return err fmt.Println("err publish:", err)
return
} }
return nil
} }
type SetMsgReq struct { // Step 1. Simple 创建实例
ProcedureType int `json:"procedureType"` func NewRabbitMQSimple(queueName string) *RabbitMQ {
GroupId string `json:"groupId" db:"group_id"` //在simple模式下 exchange and key 都为空
UserId string `json:"userId" db:"user_id"` rabbitMQ, err := newRabbitMQ(queueName, "", "")
BusinessId string `json:"businessId" db:"business_id"` if err != nil {
CustomerId string `json:"customerId" db:"customer_id"` fmt.Println("NewRabbitMQSimple err:", err)
Status string `json:"status" db:"status"` return nil
StartTime string `json:"startTime" db:"start_time"` }
EndTime string `json:"endTime" db:"end_time"` return rabbitMQ
Remark string `json:"remark" db:"remark"`
Promoter string `json:"promoter" db:"promoter"`
Finish string `json:"finish" db:"finish"`
ExpertUnread string `json:"expertUnread" db:"expert_unread"`
UserUnread string `json:"userUnread" db:"user_unread"`
ExpertUnreadMessage string `json:"expertUnreadMessage" db:"expert_unread_message"`
ExpertUnreadMessageTime string `json:"expertUnreadMessageTime" db:"expert_unread_message_time"`
GuideMsg string `json:"guideMsg" db:"guide_msg"`
GuideDate string `json:"guideDate" db:"guide_date"`
ReceiveDate string `json:"receiveDate" db:"receive_date"`
ConversationDate string `json:"conversationDate" db:"conversation_date"`
StartReceiveDate string `json:"startReceiveDate" db:"start_receive_date"`
ConversationId string `json:"conversationId" db:"conversation_id"`
PromoterType string `json:"promoterType" db:"promoter_type"`
SendTime string `json:"sendTime" db:"send_time"`
MsgType int `json:"msgType" db:"msg_type"`
Content string `json:"content" db:"content"`
Sender string `json:"sender" db:"sender"`
Receiver string `json:"receiver" db:"receiver"`
HandleTime string `json:"handleTime" db:"handle_time"`
HandlePersonId string `json:"handlePersonId" db:"handle_person_id"`
HandlePerson string `json:"handlePerson" db:"handle_person"`
} }
//
//// Step 2. Simple producer code
//// 2.1 申请队列、 如果队列不存在则会自动创建、 如果存在则跳过创建
//// 保证队列存在、 消息能发送到队列中
//func (r *RabbitMQ) IsQueue() error {
// _, err := r.channel.QueueDeclare(
// r.QueueName,
// //是否持久化
// true,
// // 是否自动删除
// false,
// // 是否具有排他性
// false,
// //是否阻塞
// false,
// // 额外属性
// nil,
// )
//
// if err != nil {
// fmt.Println(5)
// fmt.Println(err)
// return err
// }
// return nil
//}
// 2.2 发送消息到队列中
//func (r *RabbitMQ) PublishSimple(message []byte) {
// //fmt.Println("push:", string(message))
//
// err := r.channel.Publish(
// r.Exchange,
// r.QueueName,
// // 如果为true 根据exchange 类型 和 routkey规则、 如果无法找到符合条件的队列、那么会把发送完的消息返回给发送者
// false,
// // 如果为true 当exchange发送消息 到队列后发现队列上没有绑定消费者, 则会把消息还给 发送者
// false,
// amqp.Publishing{
// // 消息持久化
// DeliveryMode: amqp.Persistent,
// ContentType: "text/plain",
// Body: message,
// },
// )
// if err != nil {
// fmt.Println(6)
// fmt.Println(err)
// return
// }
//
// return
//}
...@@ -158,7 +158,7 @@ func (h *hub) clearOldClient(client *Client) { ...@@ -158,7 +158,7 @@ func (h *hub) clearOldClient(client *Client) {
func (h *hub) RemoveClient(client *Client) error { func (h *hub) RemoveClient(client *Client) error {
//把连接对像缓存在旧对像列表中,并设置连接断开的时间,过期未连接就会清理对像 //把连接对像缓存在旧对像列表中,并设置连接断开的时间,过期未连接就会清理对像
client.CloseTime = time.Now() client.CloseTime = time.Now()
h.oldClients[client.Id] = client //h.oldClients[client.Id] = client
timeout := time.NewTimer(time.Second * 1) timeout := time.NewTimer(time.Second * 1)
defer timeout.Stop() defer timeout.Stop()
select { select {
......
...@@ -6,7 +6,7 @@ import ( ...@@ -6,7 +6,7 @@ import (
) )
var wsSever *Server var wsSever *Server
var rabbitMQ *mq.RabbitMQ var RabbitMQ *mq.RabbitMQ
//连接池的结构体 //连接池的结构体
type Server struct { type Server struct {
...@@ -17,8 +17,6 @@ type Server struct { ...@@ -17,8 +17,6 @@ type Server struct {
//初始化执行连接池对象 //初始化执行连接池对象
//参数为接收连接池中运行时的一些错误信息的回调方法 //参数为接收连接池中运行时的一些错误信息的回调方法
func InitWsPool(errfun func(err interface{})) { func InitWsPool(errfun func(err interface{})) {
InitRabbit()
wsSever = new(Server) wsSever = new(Server)
wsSever.hub = newHub() wsSever.hub = newHub()
wsSever.ErrFun = errfun wsSever.ErrFun = errfun
...@@ -29,7 +27,7 @@ func InitWsPool(errfun func(err interface{})) { ...@@ -29,7 +27,7 @@ func InitWsPool(errfun func(err interface{})) {
func InitRabbit(){ func InitRabbit(){
//forever := make(chan bool) //forever := make(chan bool)
rabbitMQ = mq.NewRabbitMQSimple("im") //RabbitMQ = mq.NewRabbitMQSimple("ttt")
fmt.Println("rabbitMq start success") fmt.Println("rabbitMq start success")
......
...@@ -4,13 +4,45 @@ import ( ...@@ -4,13 +4,45 @@ import (
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
mq "pool/dao"
"pool/pool/util/grpool" "pool/pool/util/grpool"
"strings" "strings"
"sync" "sync"
"time" "time"
) )
type SetMsgReq struct {
ProcedureType int `json:"procedureType"`
GroupId string `json:"groupId" db:"group_id"`
UserId string `json:"userId" db:"user_id"`
BusinessId string `json:"businessId" db:"business_id"`
CustomerId string `json:"customerId" db:"customer_id"`
Status string `json:"status" db:"status"`
StartTime string `json:"startTime" db:"start_time"`
EndTime string `json:"endTime" db:"end_time"`
Remark string `json:"remark" db:"remark"`
Promoter string `json:"promoter" db:"promoter"`
Finish string `json:"finish" db:"finish"`
ExpertUnread string `json:"expertUnread" db:"expert_unread"`
UserUnread string `json:"userUnread" db:"user_unread"`
ExpertUnreadMessage string `json:"expertUnreadMessage" db:"expert_unread_message"`
ExpertUnreadMessageTime string `json:"expertUnreadMessageTime" db:"expert_unread_message_time"`
GuideMsg string `json:"guideMsg" db:"guide_msg"`
GuideDate string `json:"guideDate" db:"guide_date"`
ReceiveDate string `json:"receiveDate" db:"receive_date"`
ConversationDate string `json:"conversationDate" db:"conversation_date"`
StartReceiveDate string `json:"startReceiveDate" db:"start_receive_date"`
ConversationId string `json:"conversationId" db:"conversation_id"`
PromoterType string `json:"promoterType" db:"promoter_type"`
SendTime string `json:"sendTime" db:"send_time"`
MsgType int `json:"msgType" db:"msg_type"`
Content string `json:"content" db:"content"`
Sender string `json:"sender" db:"sender"`
Receiver string `json:"receiver" db:"receiver"`
HandleTime string `json:"handleTime" db:"handle_time"`
HandlePersonId string `json:"handlePersonId" db:"handle_person_id"`
HandlePerson string `json:"handlePerson" db:"handle_person"`
}
// 第一步,实例化连接对像 // 第一步,实例化连接对像
func NewClient(conf *Config) *Client { func NewClient(conf *Config) *Client {
if conf.Goroutine < 5 { if conf.Goroutine < 5 {
...@@ -342,7 +374,7 @@ func SaveMsg(msg *SendMsg) { ...@@ -342,7 +374,7 @@ func SaveMsg(msg *SendMsg) {
return return
} }
mqData := &mq.SetMsgReq{} mqData := &SetMsgReq{}
if user.Promoter == "1" { if user.Promoter == "1" {
mqData.BusinessId = user.Source mqData.BusinessId = user.Source
...@@ -441,7 +473,7 @@ func SaveMsg(msg *SendMsg) { ...@@ -441,7 +473,7 @@ func SaveMsg(msg *SendMsg) {
} }
if mqData.ProcedureType != 0 { if mqData.ProcedureType != 0 {
go PublishData(mqData) PublishData(mqData)
} }
return return
} }
...@@ -464,8 +496,8 @@ func GetClientInfoById(clientId string) *UserInfo { ...@@ -464,8 +496,8 @@ func GetClientInfoById(clientId string) *UserInfo {
return userData return userData
} }
func PublishData(mqData *mq.SetMsgReq) { func PublishData(mqData *SetMsgReq) {
rabbitMQ.PublishSimple(SerializeJson(mqData)) go RabbitMQ.PublishSimple(SerializeJson(mqData))
return return
} }
......
...@@ -18,7 +18,7 @@ var addr2 = flag.String("addr", "127.0.0.1:11001", "http service address") ...@@ -18,7 +18,7 @@ var addr2 = flag.String("addr", "127.0.0.1:11001", "http service address")
func main() { func main() {
runtime.GOMAXPROCS(runtime.NumCPU()) runtime.GOMAXPROCS(runtime.NumCPU())
for i := 1; i < 200; i++ { for i := 0; i < 1500; i++ {
go wsClient2(fmt.Sprintf("lA6fUNMamyUBlOokPOeiGg==_1_%d", i)) go wsClient2(fmt.Sprintf("lA6fUNMamyUBlOokPOeiGg==_1_%d", i))
} }
select {} select {}
......
...@@ -28,6 +28,9 @@ func main() { ...@@ -28,6 +28,9 @@ func main() {
runtime.GOMAXPROCS(runtime.NumCPU()) runtime.GOMAXPROCS(runtime.NumCPU())
flag.Parse() flag.Parse()
pool.RabbitMQ = mq.NewRabbitMQSimple("im")
fmt.Println("rabbitMq start success")
//初骀化连接池 //初骀化连接池
pool.InitWsPool(func(err interface{}) { pool.InitWsPool(func(err interface{}) {
//接收连接池中的运行时错误信息 //接收连接池中的运行时错误信息
...@@ -132,7 +135,7 @@ func ws(w http.ResponseWriter, r *http.Request) { ...@@ -132,7 +135,7 @@ func ws(w http.ResponseWriter, r *http.Request) {
client.OnClose(func() { client.OnClose(func() {
user := pool.GetClientInfoById(client.Id) user := pool.GetClientInfoById(client.Id)
closeMsg := &mq.SetMsgReq{} closeMsg := &pool.SetMsgReq{}
closeMsg.ProcedureType = 5 closeMsg.ProcedureType = 5
closeMsg.EndTime = time.Now().Format("2006-01-02 15:04:05") closeMsg.EndTime = time.Now().Format("2006-01-02 15:04:05")
closeMsg.Promoter = user.Promoter closeMsg.Promoter = user.Promoter
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment