Commit 76c32f9d authored by haoyanbin's avatar haoyanbin

Redis

parent 84a8ba8f
...@@ -6,7 +6,7 @@ import ( ...@@ -6,7 +6,7 @@ import (
) )
func (r *RabbitMQ) PublishSimple(message []byte) { func (r *RabbitMQ) PublishSimple(message []byte) {
err := r.channel.Publish( err := r.channel.Publish(
"", "",
r.QueueName, r.QueueName,
......
package redis
import (
"fmt"
"github.com/go-redis/redis"
)
const (
host = "39.96.85.45"
port = 6382
password = "saas123456"
db = 6
)
// 初始化连接
func Init() (client *redis.Client) {
client = redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", host, port),
Password: password,
DB: db, // use default DB
})
_, err := client.Ping().Result()
if err != nil {
fmt.Println(err)
}
return
}
...@@ -3,6 +3,9 @@ module pool ...@@ -3,6 +3,9 @@ module pool
go 1.16 go 1.16
require ( require (
github.com/go-redis/redis v6.15.8+incompatible
github.com/gorilla/websocket v1.5.0 github.com/gorilla/websocket v1.5.0
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/gomega v1.23.0 // indirect
github.com/streadway/amqp v1.0.0 github.com/streadway/amqp v1.0.0
) )
This diff is collapsed.
...@@ -26,7 +26,7 @@ const ( ...@@ -26,7 +26,7 @@ const (
// //
closeWait = 60 * 30 * time.Second closeWait = 60 * 30 * time.Second
// //
endDate = 60 * 30 * time.Second endDate = 60 * 2 * time.Second
) )
var upgrader = websocket.Upgrader{ var upgrader = websocket.Upgrader{
...@@ -132,6 +132,7 @@ func (c *Client) readPump() { ...@@ -132,6 +132,7 @@ func (c *Client) readPump() {
close(c.sendPing) close(c.sendPing)
c.grpool.Close() c.grpool.Close()
c.hub.RemoveClient(c) c.hub.RemoveClient(c)
Redis.HDel("clients", c.Id)
dump() dump()
}() }()
Loop: Loop:
......
...@@ -2,11 +2,13 @@ package pool ...@@ -2,11 +2,13 @@ package pool
import ( import (
"fmt" "fmt"
"pool/dao" "github.com/go-redis/redis"
"pool/dao/mq"
) )
var wsSever *Server var wsSever *Server
var RabbitMQ *mq.RabbitMQ var RabbitMQ *mq.RabbitMQ
var Redis *redis.Client
//连接池的结构体 //连接池的结构体
type Server struct { type Server struct {
...@@ -22,15 +24,20 @@ func InitWsPool(errfun func(err interface{})) { ...@@ -22,15 +24,20 @@ func InitWsPool(errfun func(err interface{})) {
wsSever.ErrFun = errfun wsSever.ErrFun = errfun
go wsSever.hub.run() //开启服务 go wsSever.hub.run() //开启服务
//go wsSever.hub.ticker() //开启定时服务 //go wsSever.hub.ticker() //开启定时服务
initWsPoolData()
} }
func InitRabbit(){ func initWsPoolData() {
//forever := make(chan bool) clientsData := Redis.HGetAll("clients")
clientsAll := clientsData.Val()
//RabbitMQ = mq.NewRabbitMQSimple("ttt")
for _, v := range clientsAll {
fmt.Println("rabbitMq start success") if v == "" {
continue
// 在没有消息处理后 进行阻塞 }
//<-forever //不让协程终止 client := new(Config)
} UnserislizeJson([]byte(v), client)
\ No newline at end of file NewClient(client)
}
fmt.Println(wsSever.hub.clients)
}
...@@ -2,6 +2,7 @@ package pool ...@@ -2,6 +2,7 @@ package pool
import ( import (
"encoding/json" "encoding/json"
"fmt"
) )
...@@ -27,7 +28,8 @@ func searchStrArray(arr []string, ch string) bool { ...@@ -27,7 +28,8 @@ func searchStrArray(arr []string, ch string) bool {
} }
func SerializeJson(data interface{}) []byte { func SerializeJson(data interface{}) []byte {
reply, _ := json.Marshal(data) reply, err := json.Marshal(data)
fmt.Println(err)
return reply return reply
} }
......
...@@ -10,6 +10,8 @@ import ( ...@@ -10,6 +10,8 @@ import (
"time" "time"
) )
var toSendDataLock sync.Mutex
type SetMsgReq struct { type SetMsgReq struct {
ProcedureType int `json:"procedureType"` ProcedureType int `json:"procedureType"`
GroupId string `json:"groupId" db:"group_id"` GroupId string `json:"groupId" db:"group_id"`
...@@ -76,6 +78,8 @@ func NewClient(conf *Config) *Client { ...@@ -76,6 +78,8 @@ func NewClient(conf *Config) *Client {
client.OnPing(nil) client.OnPing(nil)
client.OnPong(nil) client.OnPong(nil)
wsSever.hub.AddClient(client) wsSever.hub.AddClient(client)
jsonData := string(SerializeJson(conf))
Redis.HSet("clients", client.Id, jsonData)
return client return client
} }
...@@ -429,7 +433,6 @@ func SaveMsg(msg *SendMsg) { ...@@ -429,7 +433,6 @@ func SaveMsg(msg *SendMsg) {
//删除连接信息 //删除连接信息
DelToSendData(msg.FromClientId, msg.ToClientId) DelToSendData(msg.FromClientId, msg.ToClientId)
DelToSendData(msg.ToClientId, msg.FromClientId)
} }
//离线 //离线
...@@ -469,7 +472,6 @@ func SaveMsg(msg *SendMsg) { ...@@ -469,7 +472,6 @@ func SaveMsg(msg *SendMsg) {
mqData.Finish = "5" mqData.Finish = "5"
//删除连接信息 //删除连接信息
DelToSendData(msg.FromClientId, msg.ToClientId) DelToSendData(msg.FromClientId, msg.ToClientId)
DelToSendData(msg.ToClientId, msg.FromClientId)
} }
if mqData.ProcedureType != 0 { if mqData.ProcedureType != 0 {
...@@ -516,14 +518,21 @@ func SetEnd() { ...@@ -516,14 +518,21 @@ func SetEnd() {
} }
SaveMsg(mqData) SaveMsg(mqData)
//发送结束会话给客户端 //发送结束会话给客户端
wsSever.hub.clients[clientsId].readMessage(mqData) _, isSet := wsSever.hub.clients[clientsId]
if isSet == true {
wsSever.hub.clients[clientsId].readMessage(mqData)
}
toMqData := &SendMsg{ toMqData := &SendMsg{
ProcedureType: 8, ProcedureType: 8,
ToClientId: clientsId, ToClientId: clientsId,
FromClientId: vToSendData.toSendId, FromClientId: vToSendData.toSendId,
SendTime: time.Now().Format("2006-01-02 15:04:05"), SendTime: time.Now().Format("2006-01-02 15:04:05"),
} }
wsSever.hub.clients[vToSendData.toSendId].readMessage(toMqData) _, isSet = wsSever.hub.clients[vToSendData.toSendId]
if isSet == true {
wsSever.hub.clients[vToSendData.toSendId].readMessage(toMqData)
}
} }
} }
} }
...@@ -532,15 +541,19 @@ func SetEnd() { ...@@ -532,15 +541,19 @@ func SetEnd() {
} }
func AppendToSendData(clientId, toClientId, sendTime string) { func AppendToSendData(clientId, toClientId, sendTime string) {
toSendData := ToSendData{toSendTime: sendTime, toSendId: clientId} sendData := ToSendData{toSendTime: sendTime, toSendId: clientId}
wsSever.hub.clients[toClientId].ToSendData = append(wsSever.hub.clients[toClientId].ToSendData, toSendData) toSendDataLock.Lock()
wsSever.hub.clients[toClientId].ToSendData = append(wsSever.hub.clients[toClientId].ToSendData, sendData)
toSendDataLock.Unlock()
return return
} }
func DelToSendData(clientId, toClientId string) { func DelToSendData(clientId, toClientId string) {
for k, v := range wsSever.hub.clients[clientId].ToSendData { for kclientId, vclientId := range wsSever.hub.clients[clientId].ToSendData {
if v.toSendId == toClientId { if vclientId.toSendId == toClientId {
wsSever.hub.clients[clientId].ToSendData = append(wsSever.hub.clients[clientId].ToSendData[:k], wsSever.hub.clients[clientId].ToSendData[k+1:]...) toSendDataLock.Lock()
wsSever.hub.clients[clientId].ToSendData = append(wsSever.hub.clients[clientId].ToSendData[:kclientId], wsSever.hub.clients[clientId].ToSendData[kclientId+1:]...)
toSendDataLock.Unlock()
} }
} }
return return
......
...@@ -18,7 +18,7 @@ var addr2 = flag.String("addr", "127.0.0.1:11001", "http service address") ...@@ -18,7 +18,7 @@ var addr2 = flag.String("addr", "127.0.0.1:11001", "http service address")
func main() { func main() {
runtime.GOMAXPROCS(runtime.NumCPU()) runtime.GOMAXPROCS(runtime.NumCPU())
for i := 0; i < 1500; i++ { for i := 1; i < 5; i++ {
go wsClient2(fmt.Sprintf("lA6fUNMamyUBlOokPOeiGg==_1_%d", i)) go wsClient2(fmt.Sprintf("lA6fUNMamyUBlOokPOeiGg==_1_%d", i))
} }
select {} select {}
...@@ -56,28 +56,55 @@ func wsClient2(id string) { ...@@ -56,28 +56,55 @@ func wsClient2(id string) {
}) })
done := make(chan struct{}) done := make(chan struct{})
//t := grand.N(30, 90)
//go func() { msg1 := &pool.SendMsg{
// ticker1 := time.NewTicker(time.Duration(t) * time.Second) ToClientId: "",
// defer func() { FromClientId: id,
// ticker1.Stop() ProcedureType: 1,
// close(done) SendTime: time.Now().Format("2006-01-02 15:04:05"),
// }() Msg: "test" + time.Now().Format("2006-01-02 15:04:05"),
// for { }
// _, message, err := c.ReadMessage() m1 := pool.SerializeJson(msg1)
// if err != nil {
// log.Printf("read:%s", err.Error()) fmt.Println(1)
// return err = c.WriteMessage(websocket.BinaryMessage, m1)
// } if err != nil {
// log.Printf("recv: %s", message) fmt.Println("write:", err.Error())
// /*select { return
// case <-ticker1.C: }
//
// return time.Sleep(5*time.Second)
// }*/
// } msg2 := &pool.SendMsg{
// ToClientId: "",
//}() FromClientId: id,
ProcedureType: 2,
SendTime: time.Now().Format("2006-01-02 15:04:05"),
Msg: "{\"petName\":\"gogogo\",\"petAge\":\"1.1\",\"question\":\"eye\"}",
}
m2 := pool.SerializeJson(msg2)
fmt.Println(2)
err = c.WriteMessage(websocket.BinaryMessage, m2)
if err != nil {
fmt.Println("write:", err.Error())
return
}
time.Sleep(5*time.Second)
msg3 := &pool.SendMsg{
ToClientId: id,
FromClientId: "6_2_14",
ProcedureType: 3,
SendTime: time.Now().Format("2006-01-02 15:04:05"),
}
m3 := pool.SerializeJson(msg3)
fmt.Println(3)
err = c.WriteMessage(websocket.BinaryMessage, m3)
if err != nil {
fmt.Println("write:", err.Error())
return
}
ticker := time.NewTicker(10 * time.Second) ticker := time.NewTicker(10 * time.Second)
ticker1 := time.NewTicker(20 * time.Second) ticker1 := time.NewTicker(20 * time.Second)
...@@ -92,24 +119,24 @@ func wsClient2(id string) { ...@@ -92,24 +119,24 @@ func wsClient2(id string) {
ToClientId: "6_2_14", ToClientId: "6_2_14",
FromClientId: id, FromClientId: id,
ProcedureType: 6, ProcedureType: 6,
SendTime: time.Now().Format("2006-01-02 15:04:05"),
Msg: "test" + time.Now().Format("2006-01-02 15:04:05"), Msg: "test" + time.Now().Format("2006-01-02 15:04:05"),
//Channel: chann,
} }
m := pool.SerializeJson(msg) m := pool.SerializeJson(msg)
fmt.Println(msg) fmt.Println(6)
err := c.WriteMessage(websocket.BinaryMessage, m) err := c.WriteMessage(websocket.BinaryMessage, m)
if err != nil { if err != nil {
log.Printf("write:%s", err.Error()) fmt.Println("write:", err.Error())
return return
} }
case <-interrupt: case <-interrupt:
log.Printf("interrupt") fmt.Println("interrupt")
// Cleanly close the connection by sending a close message and then // Cleanly close the connection by sending a close message and then
// waiting (with timeout) for the server to close the connection. // waiting (with timeout) for the server to close the connection.
err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil { if err != nil {
log.Printf("write close:%s", err.Error()) fmt.Println("write close:", err.Error())
return return
} }
select { select {
...@@ -120,13 +147,13 @@ func wsClient2(id string) { ...@@ -120,13 +147,13 @@ func wsClient2(id string) {
case <-ping: case <-ping:
err := c.WriteMessage(websocket.PongMessage, nil) err := c.WriteMessage(websocket.PongMessage, nil)
if err != nil { if err != nil {
log.Printf("write pong:%s", err.Error()) fmt.Println("write pong:", err.Error())
return return
} }
case <-ticker1.C: case <-ticker1.C:
err := c.WriteMessage(websocket.PingMessage, nil) err := c.WriteMessage(websocket.PingMessage, nil)
if err != nil { if err != nil {
log.Printf("write pong:%s", err.Error()) fmt.Println("write pong:", err.Error())
return return
} }
//return //return
......
...@@ -5,7 +5,8 @@ import ( ...@@ -5,7 +5,8 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"net/http/pprof" "net/http/pprof"
mq "pool/dao" "pool/dao/mq"
"pool/dao/redis"
"pool/pool" "pool/pool"
"runtime" "runtime"
"time" "time"
...@@ -31,6 +32,10 @@ func main() { ...@@ -31,6 +32,10 @@ func main() {
pool.RabbitMQ = mq.NewRabbitMQSimple("im") pool.RabbitMQ = mq.NewRabbitMQSimple("im")
fmt.Println("rabbitMq start success") fmt.Println("rabbitMq start success")
pool.Redis = redis.Init()
fmt.Println("redis start success")
//初骀化连接池 //初骀化连接池
pool.InitWsPool(func(err interface{}) { pool.InitWsPool(func(err interface{}) {
//接收连接池中的运行时错误信息 //接收连接池中的运行时错误信息
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment