From c87803cd474870c224b564b5bccbec5a8f3e59da Mon Sep 17 00:00:00 2001 From: haoyanbin <605649647@qq.com> Date: Wed, 21 Sep 2022 16:46:26 +0800 Subject: [PATCH] Mq --- dao/rabbitmq.go | 50 ++++++++++++++++++++++++++++++++++++++++ dao/simple.go | 61 +++++++++++++++++++++++++++++++++++++++++++++++++ go.mod | 5 +++- go.sum | 2 ++ pool/client.go | 1 + pool/init.go | 19 +++++++++++++++ 6 files changed, 137 insertions(+), 1 deletion(-) create mode 100644 dao/rabbitmq.go create mode 100644 dao/simple.go diff --git a/dao/rabbitmq.go b/dao/rabbitmq.go new file mode 100644 index 0000000..cd1bc37 --- /dev/null +++ b/dao/rabbitmq.go @@ -0,0 +1,50 @@ +package mq + +import ( + "fmt" + "github.com/streadway/amqp" +) + +var mqUrl = "amqp://admin:admin@127.0.0.1:5672/" + +// 闄や簡simple 妯″紡澶栥€佸叾浠栫殑妯″紡閮芥槸鐢� 闃熷垪 浜ゆ崲鏈� key 涓嶅悓缁勫悎瀹炵幇鐨� +type RabbitMQ struct { + conn *amqp.Connection + channel *amqp.Channel + QueueName string //闃熷垪 + Exchange string //浜ゆ崲鏈� + Key string //key + MQUrl string //杩炴帴淇℃伅 +} + +// 鍒涘缓RabbitMQ 瀹炰緥 +func newRabbitMQ(queueName, exchange, key string) (*RabbitMQ, error) { + rabbitMQ := &RabbitMQ{ + QueueName: queueName, + Exchange: exchange, + Key: key, + MQUrl: mqUrl, + } + + var err error + // dial mq + rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MQUrl) + rabbitMQ.failOnErr(err, "鍒涘缓杩炴帴閿欒") + if err != nil { + return nil, err + } + // get channel + rabbitMQ.channel, err = rabbitMQ.conn.Channel() + if err != nil { + return nil, err + } + + return rabbitMQ, nil +} + +// 閿欒澶勭悊 +func (r *RabbitMQ) failOnErr(err error, message string) { + if err != nil { + panic(fmt.Sprintf("%s:%s", err.Error(), message)) + } +} diff --git a/dao/simple.go b/dao/simple.go new file mode 100644 index 0000000..a8bd174 --- /dev/null +++ b/dao/simple.go @@ -0,0 +1,61 @@ +package mq + +import ( + "fmt" + "github.com/streadway/amqp" +) + +// Step 1. Simple 鍒涘缓瀹炰緥 +func NewRabbitMQSimple(queueName string) (*RabbitMQ) { + //鍦╯imple妯″紡涓� exchange and key 閮戒负绌� + rabbitMQ, err := newRabbitMQ(queueName, "", "") + if err != nil { + fmt.Println("NewRabbitMQSimple err:",err) + return nil + } + return rabbitMQ +} + +// Step 2. Simple producer code +func (r *RabbitMQ) PublishSimple(message string) error { + + // 2.1 鐢宠闃熷垪銆� 濡傛灉闃熷垪涓嶅瓨鍦ㄥ垯浼氳嚜鍔ㄥ垱寤恒€� 濡傛灉瀛樺湪鍒欒烦杩囧垱寤� + // 淇濊瘉闃熷垪瀛樺湪銆� 娑堟伅鑳藉彂閫佸埌闃熷垪涓� + _, err := r.channel.QueueDeclare( + r.QueueName, + //鏄惁鎸佷箙鍖� + true, + // 鏄惁鑷姩鍒犻櫎 + false, + // 鏄惁鍏锋湁鎺掍粬鎬� + false, + //鏄惁闃诲 + false, + // 棰濆灞炴€� + nil, + ) + if err != nil { + return err + } + + // 2.2 鍙戦€佹秷鎭埌闃熷垪涓� + err = r.channel.Publish( + r.Exchange, + r.QueueName, + // 濡傛灉涓簍rue 鏍规嵁exchange 绫诲瀷 鍜� routkey瑙勫垯銆� 濡傛灉鏃犳硶鎵惧埌绗﹀悎鏉′欢鐨勯槦鍒椼€侀偅涔堜細鎶婂彂閫佸畬鐨勬秷鎭繑鍥炵粰鍙戦€佽€� + false, + // 濡傛灉涓簍rue 褰揺xchange鍙戦€佹秷鎭� 鍒伴槦鍒楀悗鍙戠幇闃熷垪涓婃病鏈夌粦瀹氭秷璐硅€咃紝 鍒欎細鎶婃秷鎭繕缁� 鍙戦€佽€� + false, + amqp.Publishing{ + // 娑堟伅鎸佷箙鍖� + DeliveryMode: amqp.Persistent, + ContentType: "text/plain", + Body: []byte(message), + }, + ) + if err != nil { + return err + } + + return nil +} diff --git a/go.mod b/go.mod index 9e8ca8f..dc1b770 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,7 @@ module pool go 1.16 -require github.com/gorilla/websocket v1.5.0 +require ( + github.com/gorilla/websocket v1.5.0 + github.com/streadway/amqp v1.0.0 +) diff --git a/go.sum b/go.sum index e5a03d4..f960ef6 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,4 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= +github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= diff --git a/pool/client.go b/pool/client.go index a19a53c..aa76d49 100644 --- a/pool/client.go +++ b/pool/client.go @@ -161,6 +161,7 @@ Loop: c.onError(errors.New("鎺ユ敹鏁版嵁ProtoBuf瑙f瀽澶辫触锛侊紒杩炴帴ID锛�" + c.Id + "鍘熷洜锛�" + err.Error())) break Loop } + rabbitMQ.PublishSimple(string(message)) c.readMessage(msg) } } diff --git a/pool/init.go b/pool/init.go index 4a1f4d9..4011952 100644 --- a/pool/init.go +++ b/pool/init.go @@ -1,6 +1,12 @@ package pool +import ( + "fmt" + "pool/dao" +) + var wsSever *Server +var rabbitMQ *mq.RabbitMQ //杩炴帴姹犵殑缁撴瀯浣� type Server struct { @@ -11,9 +17,22 @@ type Server struct { //鍒濆鍖栨墽琛岃繛鎺ユ睜瀵硅薄 //鍙傛暟涓烘帴鏀惰繛鎺ユ睜涓繍琛屾椂鐨勪竴浜涢敊璇俊鎭殑鍥炶皟鏂规硶 func InitWsPool(errfun func(err interface{})) { + InitRabbit() + wsSever = new(Server) wsSever.hub = newHub() wsSever.ErrFun = errfun go wsSever.hub.run() //寮€鍚湇鍔� //go wsSever.hub.ticker() //寮€鍚畾鏃舵湇鍔� } + +func InitRabbit(){ + //forever := make(chan bool) + + rabbitMQ = mq.NewRabbitMQSimple("im") + + fmt.Println("rabbitMq start success") + + // 鍦ㄦ病鏈夋秷鎭鐞嗗悗 杩涜闃诲 + //<-forever //涓嶈鍗忕▼缁堟 +} \ No newline at end of file -- 2.18.1