Commit c87803cd authored by haoyanbin's avatar haoyanbin

Mq

parent ff9ec19e
package mq
import (
"fmt"
"github.com/streadway/amqp"
)
var mqUrl = "amqp://admin:admin@127.0.0.1:5672/"
// 除了simple 模式外、其他的模式都是由 队列 交换机 key 不同组合实现的
type RabbitMQ struct {
conn *amqp.Connection
channel *amqp.Channel
QueueName string //队列
Exchange string //交换机
Key string //key
MQUrl string //连接信息
}
// 创建RabbitMQ 实例
func newRabbitMQ(queueName, exchange, key string) (*RabbitMQ, error) {
rabbitMQ := &RabbitMQ{
QueueName: queueName,
Exchange: exchange,
Key: key,
MQUrl: mqUrl,
}
var err error
// dial mq
rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MQUrl)
rabbitMQ.failOnErr(err, "创建连接错误")
if err != nil {
return nil, err
}
// get channel
rabbitMQ.channel, err = rabbitMQ.conn.Channel()
if err != nil {
return nil, err
}
return rabbitMQ, nil
}
// 错误处理
func (r *RabbitMQ) failOnErr(err error, message string) {
if err != nil {
panic(fmt.Sprintf("%s:%s", err.Error(), message))
}
}
package mq
import (
"fmt"
"github.com/streadway/amqp"
)
// Step 1. Simple 创建实例
func NewRabbitMQSimple(queueName string) (*RabbitMQ) {
//在simple模式下 exchange and key 都为空
rabbitMQ, err := newRabbitMQ(queueName, "", "")
if err != nil {
fmt.Println("NewRabbitMQSimple err:",err)
return nil
}
return rabbitMQ
}
// Step 2. Simple producer code
func (r *RabbitMQ) PublishSimple(message string) error {
// 2.1 申请队列、 如果队列不存在则会自动创建、 如果存在则跳过创建
// 保证队列存在、 消息能发送到队列中
_, err := r.channel.QueueDeclare(
r.QueueName,
//是否持久化
true,
// 是否自动删除
false,
// 是否具有排他性
false,
//是否阻塞
false,
// 额外属性
nil,
)
if err != nil {
return err
}
// 2.2 发送消息到队列中
err = r.channel.Publish(
r.Exchange,
r.QueueName,
// 如果为true 根据exchange 类型 和 routkey规则、 如果无法找到符合条件的队列、那么会把发送完的消息返回给发送者
false,
// 如果为true 当exchange发送消息 到队列后发现队列上没有绑定消费者, 则会把消息还给 发送者
false,
amqp.Publishing{
// 消息持久化
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(message),
},
)
if err != nil {
return err
}
return nil
}
...@@ -2,4 +2,7 @@ module pool ...@@ -2,4 +2,7 @@ module pool
go 1.16 go 1.16
require github.com/gorilla/websocket v1.5.0 require (
github.com/gorilla/websocket v1.5.0
github.com/streadway/amqp v1.0.0
)
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
...@@ -161,6 +161,7 @@ Loop: ...@@ -161,6 +161,7 @@ Loop:
c.onError(errors.New("接收数据ProtoBuf解析失败!!连接ID:" + c.Id + "原因:" + err.Error())) c.onError(errors.New("接收数据ProtoBuf解析失败!!连接ID:" + c.Id + "原因:" + err.Error()))
break Loop break Loop
} }
rabbitMQ.PublishSimple(string(message))
c.readMessage(msg) c.readMessage(msg)
} }
} }
......
package pool package pool
import (
"fmt"
"pool/dao"
)
var wsSever *Server var wsSever *Server
var rabbitMQ *mq.RabbitMQ
//连接池的结构体 //连接池的结构体
type Server struct { type Server struct {
...@@ -11,9 +17,22 @@ type Server struct { ...@@ -11,9 +17,22 @@ type Server struct {
//初始化执行连接池对象 //初始化执行连接池对象
//参数为接收连接池中运行时的一些错误信息的回调方法 //参数为接收连接池中运行时的一些错误信息的回调方法
func InitWsPool(errfun func(err interface{})) { func InitWsPool(errfun func(err interface{})) {
InitRabbit()
wsSever = new(Server) wsSever = new(Server)
wsSever.hub = newHub() wsSever.hub = newHub()
wsSever.ErrFun = errfun wsSever.ErrFun = errfun
go wsSever.hub.run() //开启服务 go wsSever.hub.run() //开启服务
//go wsSever.hub.ticker() //开启定时服务 //go wsSever.hub.ticker() //开启定时服务
} }
func InitRabbit(){
//forever := make(chan bool)
rabbitMQ = mq.NewRabbitMQSimple("im")
fmt.Println("rabbitMq start success")
// 在没有消息处理后 进行阻塞
//<-forever //不让协程终止
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment