Commit ff9ec19e authored by haoyanbin's avatar haoyanbin

first

parents
Pipeline #347 failed with stages
package main
import (
"flag"
"github.com/gorilla/websocket"
"log"
"net/http"
"net/url"
"os"
"os/signal"
"pool/pool"
"time"
)
var addr = flag.String("addr", "localhost:8080", "http service address")
func main() {
go wsClient("1001_1_2")
select {}
}
func wsClient(id string) {
flag.Parse()
log.SetFlags(0)
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
u := url.URL{Scheme: "ws", Host: *addr, Path: "/ws"}
log.Printf("connecting to %s", u.String())
head := http.Header{}
log.Printf("connecting info: %s", id)
head.Add("Sec-Websocket-Protocol", id)
c, _, err := websocket.DefaultDialer.Dial(u.String(), head)
if err != nil {
log.Fatal("dial:", err)
}
defer c.Close()
ping := make(chan int)
c.SetPingHandler(func(appData string) error {
ping <- 1
return nil
})
done := make(chan struct{})
go func() {
ticker1 := time.NewTicker(10 * time.Second)
defer func() {
ticker1.Stop()
close(done)
}()
for {
_, message, err := c.ReadMessage()
if err != nil {
log.Printf("read:%s", err.Error())
return
}
log.Printf("recv: %s", message)
select {
case <-ticker1.C:
return
}
}
}()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-done:
//重新连接
go wsClient(id)
return
case <-ticker.C:
msg := &pool.SendMsg{
ToClientId: "12",
FromClientId: "13",
Msg: "test" + time.Now().String(),
Channel: []string{"1", "2"},
}
//m,_:=proto.Marshal(msg)
m := pool.SerializeJson(msg)
err := c.WriteMessage(websocket.BinaryMessage, m)
if err != nil {
log.Printf("write:%s", err.Error())
return
}
case <-interrupt:
log.Printf("interrupt")
// Cleanly close the connection by sending a close message and then
// waiting (with timeout) for the server to close the connection.
err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
log.Printf("write close:%s", err.Error())
return
}
select {
case <-done:
case <-time.After(time.Second):
}
return
case <-ping:
err := c.WriteMessage(websocket.PongMessage, nil)
if err != nil {
log.Printf("write pong:%s", err.Error())
return
}
}
}
}
module pool
go 1.16
require github.com/gorilla/websocket v1.5.0
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
This diff is collapsed.
package pool
import (
"container/list"
"errors"
)
// Goroutine Pool
type Pool struct {
limit int // Max goroutine count limit.
count int // Current running goroutine count.
list *list.List // Job list for asynchronous job adding purpose.
closed bool // Is pool closed or not.
wJobsChan chan func() //写工作方法
rJobsChan chan func() //读工作方法
}
// New creates and returns a new goroutine pool object.
// The parameter <limit> is used to limit the max goroutine count,
// which is not limited in default.
func New(limit ...int) *Pool {
p := &Pool{
limit: -1,
count: 0,
list: list.New(),
closed: false,
wJobsChan:make(chan func()),
rJobsChan:make(chan func()),
}
if len(limit) > 0 && limit[0] > 0 {
p.limit = limit[0]
}
//开始工作
go p.runWrite()
go p.runRead()
return p
}
/*
// Default goroutine pool.
var pool = New()
// Add pushes a new job to the pool using default goroutine pool.
// The job will be executed asynchronously.
func Add(f func()) error {
return pool.Add(f)
}
// Size returns current goroutine count of default goroutine pool.
func Size() int {
return pool.Size()
}
// Jobs returns current job count of default goroutine pool.
func Jobs() int {
return pool.Jobs()
}
*/
func (p *Pool) runWrite(){
for !p.closed {
select {
case f,ok:=<-p.wJobsChan:
if !ok {
break
}
p.list.PushFront(f)
}
}
}
func (p *Pool) runRead(){
for !p.closed {
if job := p.list.Back(); job != nil {
value := p.list.Remove(job)
p.rJobsChan<-value.(func())
} else {
return
}
}
}
// Add pushes a new job to the pool.
// The job will be executed asynchronously.
func (p *Pool) Add(f func()) error {
for p.closed{
return errors.New("pool closed")
}
p.wJobsChan<-f
var n int
n = p.count
if p.limit != -1 && n >= p.limit {
return nil
}
p.count=n+1
p.fork()
return nil
}
// Cap returns the capacity of the pool.
// This capacity is defined when pool is created.
// If it returns -1 means no limit.
func (p *Pool) Cap() int {
return p.limit
}
// Size returns current goroutine count of the pool.
func (p *Pool) Size() int {
return p.count
}
// Jobs returns current job count of the pool.
func (p *Pool) Jobs() int {
return p.list.Len()
}
// fork creates a new goroutine pool.
func (p *Pool) fork() {
go func() {
defer func() {
p.count--
}()
for !p.closed {
select {
case job,ok:=<-p.rJobsChan:
if !ok {
break
}
job()
}
}
}()
}
// IsClosed returns if pool is closed.
func (p *Pool) IsClosed() bool {
return p.closed
}
// Close closes the goroutine pool, which makes all goroutines exit.
func (p *Pool) Close() {
p.closed=true
close(p.wJobsChan)
close(p.rJobsChan)
}
package pool
import (
"errors"
"fmt"
"pool/pool/util/queue"
"time"
)
var (
//最大连接池缓冲处理连接对像管道长度
Max_client_channel_len = 10240
//最大全局广播缓冲处理管道长度
Max_broadcastQueue_len = 4096
//最大频道广播缓冲处理管道长度
Max_chanBroadcastQueue_len = 4096
//最大接收消息缓冲处理管道长度
Max_recvCh_len = 10240
//最大发送消息缓冲处理管道长度
Max_sendCh_len = 10240
)
// Hub maintains the set of active clients and broadcasts messages to the
// clients.
type hub struct {
// Registered clients.
clients map[string]*Client// //新的处理方式有没有线程效率会更高,所以SafeMap的锁处理都去掉了
oldClients map[string]*Client //缓存断开的连接消息队列
// Inbound messages from the clients.
//可以用于广播所有连接对象
broadcastQueue chan *SendMsg
//广播指定频道的管道
chanBroadcastQueue chan *SendMsg
// Register requests from the clients.
register chan *Client
// Unregister requests from clients.
unregister chan string
}
//重新连接需要处理的消息(缓存上次未来得能处理发送消息channel中的消息,60秒后原ws未连接消息失效)
type oldMsg struct {
list *queue.PriorityQueue
Expiration time.Time //过期时间
}
func newHub() *hub {
return &hub{
register: make(chan *Client, Max_client_channel_len),
unregister: make(chan string, Max_client_channel_len),
clients: make(map[string]*Client),
oldClients: make(map[string]*Client),
broadcastQueue: make(chan *SendMsg, Max_broadcastQueue_len),
chanBroadcastQueue: make(chan *SendMsg, Max_chanBroadcastQueue_len),
}
}
func (h *hub) run() {
loop:
for {
select {
case id, ok := <-h.unregister:
if !ok {
break loop
}
c,_ := h.clients[id]
if c != nil {
delete(h.clients,id)
}
fmt.Println("取消注册ws连接对象:", id, "连接总数:", len(h.clients))
case client, ok := <-h.register:
if !ok {
break loop
}
fmt.Println("注册ws连接对象:", client.Id, "连接总数:", len(h.clients))
h.clients[client.Id] = client
fmt.Println(h.clients)
case broadcastMsg, ok := <-h.broadcastQueue:
if !ok {
break loop
}
for k,v := range h.clients{
if v != nil {
client := v
broadcastMsg.ToClientId = k
client.send(broadcastMsg)
}
}
case chanBroadcastMsg, ok := <-h.chanBroadcastQueue:
if !ok {
break loop
}
//广播指定频道的消息处理
//h.clients.Iterator(func(id string, v interface{}) bool {
for k, v := range h.clients {
if v != nil {
client := v
for _, ch := range chanBroadcastMsg.Channel {
if searchStrArray(client.channel, ch) {
chanBroadcastMsg.ToClientId = k
client.send(chanBroadcastMsg)
}
}
}
}
// return true
//})
}
}
}
func (h *hub) ticker() {
//定时清理清理缓存的旧的连接对像
//gtimer.AddSingleton(30*time.Second, func() {
// if len(h.oldClients) > 0 {
// for _, v := range h.oldClients {
// //h.oldClients.Iterator(func(k string, v interface{}) bool {
// if v != nil {
// client := v
// if time.Now().Add(-180 * time.Second).After(client.CloseTime) {
// //3分钟后清理组存中的旧连接对像
// h.clearOldClient(client)
// /// h.clearOldClient <- client
// }
// }
// // return true
// //})
// }
// }
//})
}
func (h *hub) AddClient(client *Client) error {
timeout := time.NewTimer(time.Second * 3)
defer timeout.Stop()
select {
case h.register <- client:
return nil
case <-timeout.C:
return errors.New("AddClient register消息管道blocked,写入消息超时")
}
}
func (h *hub) clearOldClient(client *Client) {
close(client.recvCh)
close(client.sendCh)
delete(h.oldClients,client.Id)
fmt.Println("己断开的ws连接缓存对象:", client.Id, "旧连接总数:", len(h.oldClients))
}
func (h *hub) RemoveClient(client *Client) error {
//把连接对像缓存在旧对像列表中,并设置连接断开的时间,过期未连接就会清理对像
//client.CloseTime = time.Now()
//h.oldClients.Set(client.Id, client)
timeout := time.NewTimer(time.Second * 1)
defer timeout.Stop()
select {
case h.unregister <- client.Id:
return nil
case <-timeout.C:
return errors.New(" RemoveClient unregister消息管道blocked,写入消息超时")
}
}
package pool
var wsSever *Server
//连接池的结构体
type Server struct {
hub *hub
ErrFun func(err interface{}) //用于接收ws连接池内代码运行时错误信息
}
//初始化执行连接池对象
//参数为接收连接池中运行时的一些错误信息的回调方法
func InitWsPool(errfun func(err interface{})) {
wsSever = new(Server)
wsSever.hub = newHub()
wsSever.ErrFun = errfun
go wsSever.hub.run() //开启服务
//go wsSever.hub.ticker() //开启定时服务
}
package pool
import (
"encoding/json"
)
//client连接对象的私有方法
func dump() {
rcv_err := recover()
if rcv_err == nil {
return
}
wsSever.ErrFun(rcv_err)
//log.Error("运行过程中出现异常,错误信息如下:",rcv_err)
}
func searchStrArray(arr []string, ch string) bool {
result := -1
for index, v := range arr {
if v == ch {
result = index
break
}
}
return result != -1
}
func SerializeJson(data interface{}) []byte {
reply, _ := json.Marshal(data)
return reply
}
func UnserislizeJson(str []byte, data interface{}) {
_ = json.Unmarshal(str, data)
}
\ No newline at end of file
package pool
import (
"errors"
"fmt"
"net/http"
"pool/pool/util/grpool"
"sync"
"time"
)
// 第一步,实例化连接对像
func NewClient(conf *Config) *Client {
if conf.Goroutine < 5 {
conf.Goroutine = 200
}
var client *Client
oldclient := wsSever.hub.oldClients[conf.Id]
delete(wsSever.hub.oldClients,conf.Id)
if oldclient != nil {
c := oldclient
client = c
} else {
client = &Client{
Id: conf.Id,
types: conf.Type,
hub: wsSever.hub,
sendCh: make(chan *SendMsg, Max_recvCh_len),
recvCh: make(chan *SendMsg, Max_sendCh_len),
mux: new(sync.Mutex),
}
}
client.recvPing = make(chan int)
client.sendPing = make(chan int)
client.channel = conf.Channel
client.grpool = grpool.NewPool(conf.Goroutine)
client.IsClose = make(chan bool)
client.OnError(nil)
client.OnOpen(nil)
client.OnMessage(nil)
client.OnClose(nil)
client.OnPing(nil)
client.OnPong(nil)
wsSever.hub.AddClient(client)
return client
}
// 开启连接
// serveWs handles websocket requests from the peer.
func (c *Client) OpenClient(w http.ResponseWriter, r *http.Request, head http.Header) {
defer dump()
conn, err := upgrader.Upgrade(w, r, head)
if err != nil {
if c.onError != nil {
c.onError(err)
}
return
}
r.Close = true
c.conn = conn
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.pingPeriodTicker = time.NewTimer(pingPeriod)
c.conn.SetPongHandler(func(str string) error {
c.conn.SetReadDeadline(time.Now().Add(pongWait))
fmt.Println("收到pong---", c.Id, str)
c.pingPeriodTicker.Reset(pingPeriod)
c.onPong()
return nil
})
c.conn.SetPingHandler(func(str string) error {
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.pingPeriodTicker.Reset(pingPeriod)
fmt.Println("收到ping---", c.Id, str)
c.recvPing <- 1
//if err := c.conn.WriteMessage(websocket.PongMessage, nil); err != nil {
// c.onError(errors.New("回复客户端PongMessage出现异常:"+err.Error()))
//}
c.onPing()
return nil
})
//c.conn.SetCloseHandler(func(code int, str string) error {
// //收到客户端连接关闭时的回调
// glog.Error("连接ID:"+c.Id,"SetCloseHandler接收到连接关闭状态:",code,"关闭信息:",str)
// return nil
//})
// Allow collection of memory referenced by the caller by doing all work in
// new goroutines.
//连接开启后瑞添加连接池中
c.openTime = time.Now()
c.grpool.Add(c.writePump)
c.grpool.Add(c.readPump)
//开始接收消息
c.grpool.Add(c.recvMessage)
c.grpool.Add(c.Tickers)
c.onOpen()
}
//获取连接对像运行过程中的信息
func (c *Client) GetRuntimeInfo() *RuntimeInfo {
return &RuntimeInfo{
Id: c.Id,
Type: c.types,
Channel: c.channel,
OpenTime: c.openTime,
LastReceiveTime: c.lastSendTime,
LastSendTime: c.lastSendTime,
Ip: c.conn.RemoteAddr().String(),
}
}
//回调添加方法
//监听连接对象的连接open成功的事件
func (c *Client) OnOpen(h func()) {
if h == nil {
c.onOpen = func() {
}
return
}
c.onOpen = h
}
func (c *Client) OnPing(h func()) {
if h == nil {
c.onPing = func() {
}
return
}
c.onPing = h
}
func (c *Client) OnPong(h func()) {
if h == nil {
c.onPong = func() {
}
return
}
c.onPong = h
}
//监听连接对象的连接open成功的事件
func (c *Client) OnMessage(h func(msg *SendMsg)) {
if h == nil {
c.onMessage = func(msg *SendMsg) {
}
return
}
c.onMessage = h
}
//监听连接对象的连接open成功的事件
func (c *Client) OnClose(h func()) {
if h == nil {
c.onClose = func() {
}
return
}
c.onClose = h
}
//监听连接对象的错误信息
func (c *Client) OnError(h func(err error)) {
if h == nil {
c.onError = func(err error) {
}
return
}
c.onError = h
}
// 单个连接发送消息
func (c *Client) Send(msg *SendMsg) error {
select {
case <-c.IsClose:
return errors.New("连接ID:" + c.Id + "ws连接己经断开,无法发送消息")
default:
msg.ToClientId = c.Id
c.send(msg)
}
return nil
}
//服务主动关闭连接
func (c *Client) Close() {
c.close()
}
// 包级的公开方法
// 所有包级的发送如果连接断开,消息会丢失
// 发送消息 只从连接池中按指定的toClientId的连接对象发送出消息
// 在此方法中sendMsg.Channel指定的值不会处理
func Send(msg *SendMsg) error {
//log.Info("发送指令:",msg.Cmd,msg.ToClientId)
if msg.ToClientId == "" {
return errors.New("发送消息的消息体中未指定ToClient目标!")
}
c := wsSever.hub.clients[msg.ToClientId]
if c != nil {
client := c
if client.Id == msg.ToClientId {
msg.ToClientId = client.Id
err := client.Send(msg)
if err != nil {
return errors.New("发送消息出错:" + err.Error() + ",连接对象id=" + client.Id + "。")
}
}
}
return nil
}
// 通过连接池广播消息,每次广播只能指定一个类型下的一个频道
// 广播一定要设置toClientId,这个对象可以确定广播超时消息回复对像
// 并且只针对频道内的连接进行处理
func Broadcast(msg *SendMsg) error {
if len(msg.Channel) == 0 {
return errors.New("广播消息的消息体中未指定Channel频道!")
}
timeout := time.NewTimer(time.Millisecond * 800)
defer timeout.Stop()
select {
case wsSever.hub.chanBroadcastQueue <- msg:
return nil
case <-timeout.C:
return errors.New("hub.chanBroadcastQueue消息管道blocked,写入消息超时,管道长度:" + string(len(wsSever.hub.chanBroadcastQueue)))
}
}
//全局广播
//广播一定要设置toClientId,这个对象可以确定广播超时消息回复对像
//通过此方法进行广播的消息体,会对所有的类型和频道都进行广播
func BroadcastAll(msg *SendMsg) error {
timeout := time.NewTimer(time.Millisecond * 800)
defer timeout.Stop()
select {
case wsSever.hub.broadcastQueue <- msg:
return nil
case <-timeout.C:
return errors.New("hub.broadcastQueue消息管道blocked,写入消息超时,管道长度:" + string(len(wsSever.hub.broadcastQueue)))
}
}
package grpool
import (
"sync/atomic"
)
type Bool struct {
value int32
}
var (
bytesTrue = []byte("true")
bytesFalse = []byte("false")
)
// NewBool returns a concurrent-safe object for bool type,
// with given initial value <value>.
func NewBool(value ...bool) *Bool {
t := &Bool{}
if len(value) > 0 {
if value[0] {
t.value = 1
} else {
t.value = 0
}
}
return t
}
// Clone clones and returns a new concurrent-safe object for bool type.
func (v *Bool) Clone() *Bool {
return NewBool(v.Val())
}
// Set atomically stores <value> into t.value and returns the previous value of t.value.
func (v *Bool) Set(value bool) (old bool) {
if value {
old = atomic.SwapInt32(&v.value, 1) == 1
} else {
old = atomic.SwapInt32(&v.value, 0) == 1
}
return
}
// Val atomically loads t.valueue.
func (v *Bool) Val() bool {
return atomic.LoadInt32(&v.value) > 0
}
// Cas executes the compare-and-swap operation for value.
func (v *Bool) Cas(old, new bool) bool {
var oldInt32, newInt32 int32
if old {
oldInt32 = 1
}
if new {
newInt32 = 1
}
return atomic.CompareAndSwapInt32(&v.value, oldInt32, newInt32)
}
// String implements String interface for string printing.
func (v *Bool) String() string {
if v.Val() {
return "true"
}
return "false"
}
\ No newline at end of file
package grpool
import (
"container/list"
"encoding/json"
)
type (
List struct {
mu *RWMutex
list *list.List
}
Element = list.Element
)
// New creates and returns a new empty doubly linked list.
func NewList(safe ...bool) *List {
return &List{
mu: NewRWMutex(safe...),
list: list.New(),
}
}
// NewFrom creates and returns a list from a copy of given slice <array>.
// The parameter <safe> used to specify whether using list in concurrent-safety,
// which is false in default.
func NewFrom(array []interface{}, safe ...bool) *List {
l := list.New()
for _, v := range array {
l.PushBack(v)
}
return &List{
mu: NewRWMutex(safe...),
list: l,
}
}
// PushFront inserts a new element <e> with value <v> at the front of list <l> and returns <e>.
func (l *List) PushFront(v interface{}) (e *Element) {
l.mu.Lock()
e = l.list.PushFront(v)
l.mu.Unlock()
return
}
// PushBack inserts a new element <e> with value <v> at the back of list <l> and returns <e>.
func (l *List) PushBack(v interface{}) (e *Element) {
l.mu.Lock()
e = l.list.PushBack(v)
l.mu.Unlock()
return
}
// PushFronts inserts multiple new elements with values <values> at the front of list <l>.
func (l *List) PushFronts(values []interface{}) {
l.mu.Lock()
for _, v := range values {
l.list.PushFront(v)
}
l.mu.Unlock()
}
// PushBacks inserts multiple new elements with values <values> at the back of list <l>.
func (l *List) PushBacks(values []interface{}) {
l.mu.Lock()
for _, v := range values {
l.list.PushBack(v)
}
l.mu.Unlock()
}
// PopBack removes the element from back of <l> and returns the value of the element.
func (l *List) PopBack() (value interface{}) {
l.mu.Lock()
if e := l.list.Back(); e != nil {
value = l.list.Remove(e)
}
l.mu.Unlock()
return
}
// PopFront removes the element from front of <l> and returns the value of the element.
func (l *List) PopFront() (value interface{}) {
l.mu.Lock()
if e := l.list.Front(); e != nil {
value = l.list.Remove(e)
}
l.mu.Unlock()
return
}
// PopBacks removes <max> elements from back of <l>
// and returns values of the removed elements as slice.
func (l *List) PopBacks(max int) (values []interface{}) {
l.mu.Lock()
length := l.list.Len()
if length > 0 {
if max > 0 && max < length {
length = max
}
values = make([]interface{}, length)
for i := 0; i < length; i++ {
values[i] = l.list.Remove(l.list.Back())
}
}
l.mu.Unlock()
return
}
// PopFronts removes <max> elements from front of <l>
// and returns values of the removed elements as slice.
func (l *List) PopFronts(max int) (values []interface{}) {
l.mu.Lock()
length := l.list.Len()
if length > 0 {
if max > 0 && max < length {
length = max
}
values = make([]interface{}, length)
for i := 0; i < length; i++ {
values[i] = l.list.Remove(l.list.Front())
}
}
l.mu.Unlock()
return
}
// PopBackAll removes all elements from back of <l>
// and returns values of the removed elements as slice.
func (l *List) PopBackAll() []interface{} {
return l.PopBacks(-1)
}
// PopFrontAll removes all elements from front of <l>
// and returns values of the removed elements as slice.
func (l *List) PopFrontAll() []interface{} {
return l.PopFronts(-1)
}
// FrontAll copies and returns values of all elements from front of <l> as slice.
func (l *List) FrontAll() (values []interface{}) {
l.mu.RLock()
length := l.list.Len()
if length > 0 {
values = make([]interface{}, length)
for i, e := 0, l.list.Front(); i < length; i, e = i+1, e.Next() {
values[i] = e.Value
}
}
l.mu.RUnlock()
return
}
// BackAll copies and returns values of all elements from back of <l> as slice.
func (l *List) BackAll() (values []interface{}) {
l.mu.RLock()
length := l.list.Len()
if length > 0 {
values = make([]interface{}, length)
for i, e := 0, l.list.Back(); i < length; i, e = i+1, e.Prev() {
values[i] = e.Value
}
}
l.mu.RUnlock()
return
}
// FrontValue returns value of the first element of <l> or nil if the list is empty.
func (l *List) FrontValue() (value interface{}) {
l.mu.RLock()
if e := l.list.Front(); e != nil {
value = e.Value
}
l.mu.RUnlock()
return
}
// BackValue returns value of the last element of <l> or nil if the list is empty.
func (l *List) BackValue() (value interface{}) {
l.mu.RLock()
if e := l.list.Back(); e != nil {
value = e.Value
}
l.mu.RUnlock()
return
}
// Front returns the first element of list <l> or nil if the list is empty.
func (l *List) Front() (e *Element) {
l.mu.RLock()
e = l.list.Front()
l.mu.RUnlock()
return
}
// Back returns the last element of list <l> or nil if the list is empty.
func (l *List) Back() (e *Element) {
l.mu.RLock()
e = l.list.Back()
l.mu.RUnlock()
return
}
// Len returns the number of elements of list <l>.
// The complexity is O(1).
func (l *List) Len() (length int) {
l.mu.RLock()
length = l.list.Len()
l.mu.RUnlock()
return
}
// Size is alias of Len.
func (l *List) Size() int {
return l.Len()
}
// MoveBefore moves element <e> to its new position before <p>.
// If <e> or <p> is not an element of <l>, or <e> == <p>, the list is not modified.
// The element and <p> must not be nil.
func (l *List) MoveBefore(e, p *Element) {
l.mu.Lock()
l.list.MoveBefore(e, p)
l.mu.Unlock()
}
// MoveAfter moves element <e> to its new position after <p>.
// If <e> or <p> is not an element of <l>, or <e> == <p>, the list is not modified.
// The element and <p> must not be nil.
func (l *List) MoveAfter(e, p *Element) {
l.mu.Lock()
l.list.MoveAfter(e, p)
l.mu.Unlock()
}
// MoveToFront moves element <e> to the front of list <l>.
// If <e> is not an element of <l>, the list is not modified.
// The element must not be nil.
func (l *List) MoveToFront(e *Element) {
l.mu.Lock()
l.list.MoveToFront(e)
l.mu.Unlock()
}
// MoveToBack moves element <e> to the back of list <l>.
// If <e> is not an element of <l>, the list is not modified.
// The element must not be nil.
func (l *List) MoveToBack(e *Element) {
l.mu.Lock()
l.list.MoveToBack(e)
l.mu.Unlock()
}
// PushBackList inserts a copy of an other list at the back of list <l>.
// The lists <l> and <other> may be the same, but they must not be nil.
func (l *List) PushBackList(other *List) {
if l != other {
other.mu.RLock()
defer other.mu.RUnlock()
}
l.mu.Lock()
l.list.PushBackList(other.list)
l.mu.Unlock()
}
// PushFrontList inserts a copy of an other list at the front of list <l>.
// The lists <l> and <other> may be the same, but they must not be nil.
func (l *List) PushFrontList(other *List) {
if l != other {
other.mu.RLock()
defer other.mu.RUnlock()
}
l.mu.Lock()
l.list.PushFrontList(other.list)
l.mu.Unlock()
}
// InsertAfter inserts a new element <e> with value <v> immediately after <p> and returns <e>.
// If <p> is not an element of <l>, the list is not modified.
// The <p> must not be nil.
func (l *List) InsertAfter(p *Element, v interface{}) (e *Element) {
l.mu.Lock()
e = l.list.InsertAfter(v, p)
l.mu.Unlock()
return
}
// InsertBefore inserts a new element <e> with value <v> immediately before <p> and returns <e>.
// If <p> is not an element of <l>, the list is not modified.
// The <p> must not be nil.
func (l *List) InsertBefore(p *Element, v interface{}) (e *Element) {
l.mu.Lock()
e = l.list.InsertBefore(v, p)
l.mu.Unlock()
return
}
// Remove removes <e> from <l> if <e> is an element of list <l>.
// It returns the element value e.Value.
// The element must not be nil.
func (l *List) Remove(e *Element) (value interface{}) {
l.mu.Lock()
value = l.list.Remove(e)
l.mu.Unlock()
return
}
// Removes removes multiple elements <es> from <l> if <es> are elements of list <l>.
func (l *List) Removes(es []*Element) {
l.mu.Lock()
for _, e := range es {
l.list.Remove(e)
}
l.mu.Unlock()
return
}
// RemoveAll removes all elements from list <l>.
func (l *List) RemoveAll() {
l.mu.Lock()
l.list = list.New()
l.mu.Unlock()
}
// See RemoveAll().
func (l *List) Clear() {
l.RemoveAll()
}
// RLockFunc locks reading with given callback function <f> within RWMutex.RLock.
func (l *List) RLockFunc(f func(list *list.List)) {
l.mu.RLock()
defer l.mu.RUnlock()
f(l.list)
}
// LockFunc locks writing with given callback function <f> within RWMutex.Lock.
func (l *List) LockFunc(f func(list *list.List)) {
l.mu.Lock()
defer l.mu.Unlock()
f(l.list)
}
// Iterator is alias of IteratorAsc.
func (l *List) Iterator(f func(e *Element) bool) {
l.IteratorAsc(f)
}
// IteratorAsc iterates the list in ascending order with given callback function <f>.
// If <f> returns true, then it continues iterating; or false to stop.
func (l *List) IteratorAsc(f func(e *Element) bool) {
l.mu.RLock()
length := l.list.Len()
if length > 0 {
for i, e := 0, l.list.Front(); i < length; i, e = i+1, e.Next() {
if !f(e) {
break
}
}
}
l.mu.RUnlock()
}
// IteratorDesc iterates the list in descending order with given callback function <f>.
// If <f> returns true, then it continues iterating; or false to stop.
func (l *List) IteratorDesc(f func(e *Element) bool) {
l.mu.RLock()
length := l.list.Len()
if length > 0 {
for i, e := 0, l.list.Back(); i < length; i, e = i+1, e.Prev() {
if !f(e) {
break
}
}
}
l.mu.RUnlock()
}
// MarshalJSON implements the interface MarshalJSON for json.Marshal.
func (l *List) MarshalJSON() ([]byte, error) {
return json.Marshal(l.FrontAll())
}
// UnmarshalJSON implements the interface UnmarshalJSON for json.Unmarshal.
func (l *List) UnmarshalJSON(b []byte) error {
if l.mu == nil {
l.mu = NewRWMutex()
l.list = list.New()
}
l.mu.Lock()
defer l.mu.Unlock()
var array []interface{}
if err := json.Unmarshal(b, &array); err != nil {
return err
}
l.PushBacks(array)
return nil
}
package grpool
import (
"errors"
)
// Goroutine Pool
type Pool struct {
limit int // Max goroutine count limit.
count *Int // Current running goroutine count.
list *List // Job list for asynchronous job adding purpose.
closed *Bool // Is pool closed or not.
}
// Default goroutine pool.
var pool = NewPool()
// New creates and returns a new goroutine pool object.
// The parameter <limit> is used to limit the max goroutine count,
// which is not limited in default.
func NewPool(limit ...int) *Pool {
p := &Pool{
limit: -1,
count: NewInt(),
list: NewList(true),
closed: NewBool(),
}
if len(limit) > 0 && limit[0] > 0 {
p.limit = limit[0]
}
return p
}
// Add pushes a new job to the pool using default goroutine pool.
// The job will be executed asynchronously.
func Add(f func()) error {
return pool.Add(f)
}
// Size returns current goroutine count of default goroutine pool.
func Size() int {
return pool.Size()
}
// Jobs returns current job count of default goroutine pool.
func Jobs() int {
return pool.Jobs()
}
// Add pushes a new job to the pool.
// The job will be executed asynchronously.
func (p *Pool) Add(f func()) error {
for p.closed.Val() {
return errors.New("pool closed")
}
p.list.PushFront(f)
var n int
for {
n = p.count.Val()
if p.limit != -1 && n >= p.limit {
return nil
}
if p.count.Cas(n, n+1) {
break
}
}
p.fork()
return nil
}
// Cap returns the capacity of the pool.
// This capacity is defined when pool is created.
// If it returns -1 means no limit.
func (p *Pool) Cap() int {
return p.limit
}
// Size returns current goroutine count of the pool.
func (p *Pool) Size() int {
return p.count.Val()
}
// Jobs returns current job count of the pool.
func (p *Pool) Jobs() int {
return p.list.Size()
}
// fork creates a new goroutine pool.
func (p *Pool) fork() {
go func() {
defer p.count.Add(-1)
job := (interface{})(nil)
for !p.closed.Val() {
if job = p.list.PopBack(); job != nil {
job.(func())()
} else {
return
}
}
}()
}
// IsClosed returns if pool is closed.
func (p *Pool) IsClosed() bool {
return p.closed.Val()
}
// Close closes the goroutine pool, which makes all goroutines exit.
func (p *Pool) Close() {
p.closed.Set(true)
}
package grpool
import (
"strconv"
"sync/atomic"
)
type Int struct {
value int64
}
// NewInt returns a concurrent-safe object for int type,
// with given initial value <value>.
func NewInt(value ...int) *Int {
if len(value) > 0 {
return &Int{
value: int64(value[0]),
}
}
return &Int{}
}
// Clone clones and returns a new concurrent-safe object for int type.
func (v *Int) Clone() *Int {
return NewInt(v.Val())
}
// Set atomically stores <value> into t.value and returns the previous value of t.value.
func (v *Int) Set(value int) (old int) {
return int(atomic.SwapInt64(&v.value, int64(value)))
}
// Val atomically loads t.value.
func (v *Int) Val() int {
return int(atomic.LoadInt64(&v.value))
}
// Add atomically adds <delta> to t.value and returns the new value.
func (v *Int) Add(delta int) (new int) {
return int(atomic.AddInt64(&v.value, int64(delta)))
}
// Cas executes the compare-and-swap operation for value.
func (v *Int) Cas(old, new int) bool {
return atomic.CompareAndSwapInt64(&v.value, int64(old), int64(new))
}
// String implements String interface for string printing.
func (v *Int) String() string {
return strconv.Itoa(v.Val())
}
package grpool
import "sync"
// RWMutex is a sync.RWMutex with a switch of concurrent safe feature.
type RWMutex struct {
sync.RWMutex
safe bool
}
// New creates and returns a new *RWMutex.
// The parameter <safe> is used to specify whether using this mutex in concurrent-safety,
// which is false in default.
func NewRWMutex(safe ...bool) *RWMutex {
mu := new(RWMutex)
if len(safe) > 0 {
mu.safe = safe[0]
} else {
mu.safe = false
}
return mu
}
func (mu *RWMutex) IsSafe() bool {
return mu.safe
}
func (mu *RWMutex) Lock() {
if mu.safe {
mu.RWMutex.Lock()
}
}
func (mu *RWMutex) Unlock() {
if mu.safe {
mu.RWMutex.Unlock()
}
}
func (mu *RWMutex) RLock() {
if mu.safe {
mu.RWMutex.RLock()
}
}
func (mu *RWMutex) RUnlock() {
if mu.safe {
mu.RWMutex.RUnlock()
}
}
package queue
//环形队列
import (
"container/list"
"fmt"
"sync"
)
type CircularLinked struct {
data *list.List
}
var lock1 sync.Mutex
var _ *CircularLinked
func (q *CircularLinked) Push(v interface{}) {
defer lock1.Unlock()
lock1.Lock()
q.data.PushFront(v)
}
func (q *CircularLinked) Pop() interface{} {
defer lock1.Unlock()
lock1.Lock()
iter := q.data.Back()
v := iter.Value
q.data.Remove(iter)
return v
}
//读取第一个元素后,把它移到队列尾
func (q *CircularLinked) Front() *list.Element {
defer lock1.Unlock()
lock1.Lock()
iter := q.data.Front()
q.data.Remove(iter)
if iter != nil {
//q.data.MoveToBack(iter)
q.data.PushBack(iter.Value.(string))
}
return iter
}
//移除元素
func (q *CircularLinked) Remove(iter *list.Element) {
defer lock1.Unlock()
lock1.Lock()
if iter != nil {
q.data.Remove(iter)
}
}
func (q *CircularLinked) Len() int {
return q.data.Len()
}
func (q *CircularLinked) Dump() {
for iter := q.data.Back(); iter != nil; iter = iter.Prev() {
fmt.Println("item:", iter.Value)
}
}
// bill 2018.1.8
//优先级队列[同级别先进先出]权重值越大越优先
package queue
import (
"container/list"
"log"
"sync"
"time"
)
type Item struct {
Data interface{} //数据
Priority int32 //优先级
AddTime time.Time //插入队列的时间
Expiration int64 //过期时间值 以秒为单位
}
type PriorityQueue struct {
Data *list.List
PriorityMap map[int32]*pqmap
}
type pqmap struct {
element *list.Element
totle int
}
var lock sync.RWMutex
func NewPriorityQueue() *PriorityQueue {
pq:= &PriorityQueue{
Data: list.New(),
PriorityMap: make(map[int32]*pqmap),
}
return pq
}
func (pq *PriorityQueue) Len() int {
defer lock.RUnlock()
lock.RLock()
return pq.Data.Len()
}
func (pq *PriorityQueue) Push(v *Item) {
defer lock.Unlock()
lock.Lock()
newElement := pq.Data.PushFront(v)
if _, ok := pq.PriorityMap[v.Priority]; !ok {
pq.PriorityMap[v.Priority] = &pqmap{
element: newElement,
totle: 1,
}
} else {
pq.PriorityMap[v.Priority].totle = pq.PriorityMap[v.Priority].totle + 1
}
//找出小于自己的最大值权重值
var maxKey int32 = 1
for p, _ := range pq.PriorityMap {
if p < v.Priority && p >= maxKey {
maxKey = p
}
}
//pq.Dump()
if v.Priority != maxKey {
if _, ok := pq.PriorityMap[maxKey]; ok {
pq.Data.MoveAfter(newElement, pq.PriorityMap[maxKey].element)
}
}
//log.Println("挺入队列的消息:",v,"消息权重值:",v.Priority)
}
func (pq *PriorityQueue) Pop() *Item {
defer lock.Unlock()
lock.Lock()
iter := pq.Data.Back()
if iter==nil||iter.Value==nil{
return nil
}
v := iter.Value.(*Item)
pq.Data.Remove(iter)
if pq.PriorityMap[v.Priority].totle > 1 {
pq.PriorityMap[v.Priority].totle = pq.PriorityMap[v.Priority].totle - 1
} else {
delete(pq.PriorityMap, v.Priority)
}
//log.Println("取出队列的消息:",v,"消息权重值:",v.Priority)
return v
}
func (pq *PriorityQueue) Dump() {
for iter := pq.Data.Back(); iter != nil; iter = iter.Prev() {
log.Println("队列信息:", iter.Value.(*Item))
}
}
//清除队列
func (pq *PriorityQueue) Clear() {
defer lock.RUnlock()
lock.RLock()
for iter := pq.Data.Back(); iter != nil; iter = iter.Prev() {
//fmt.Println("item:", iter.Value.(*Item))
v := iter.Value.(*Item)
pq.Data.Remove(iter)
if pq.PriorityMap[v.Priority].totle > 1 {
pq.PriorityMap[v.Priority].totle = pq.PriorityMap[v.Priority].totle - 1
} else {
delete(pq.PriorityMap, v.Priority)
}
}
}
//检测超时任务
func (pq *PriorityQueue) Expirations(expriCallback func(item *Item)) {
defer lock.RUnlock()
lock.RLock()
for iter := pq.Data.Back(); iter != nil; iter = iter.Prev() {
//fmt.Println("item:", iter.Value.(*Item))
v := iter.Value.(*Item)
if v.Expiration==0 {
continue
}
isExpri:=v.AddTime.Add(time.Duration(v.Expiration)*time.Second).Before(time.Now())
if isExpri {
pq.Data.Remove(iter)
if pq.PriorityMap[v.Priority].totle > 1 {
pq.PriorityMap[v.Priority].totle = pq.PriorityMap[v.Priority].totle - 1
} else {
delete(pq.PriorityMap, v.Priority)
}
expriCallback(v)
}else{
//没过期说明越往前越新
//break
}
}
}
package queue
import (
"container/list"
"fmt"
"log"
"testing"
"time"
)
var TQueues *PriorityQueue
func TestNewPriorityQueue(t *testing.T) {
//TQueues = NewPriorityQueue(func(item *Item) {
// log.Println("已经过期的节点:",gconv.String(item))
//})
//pq := NewPriorityQueue(func(item *Item) {
// log.Println("已经过期的节点:",gconv.String(item))
//})
TQueues.Push(&Item{
Priority: 9,
Data: "优先级为9的消息,",
})
TQueues.Push(&Item{
Priority: 11,
Data: "优先级为11的消息,",
})
TQueues.Push(&Item{
Priority: 8,
Data: "优先级为8的消息,",
})
TQueues.Push(&Item{
Priority: 10,
Data: "优先级为10的消息,",
})
TQueues.Push(&Item{
Priority: 13,
Data: "优先级为13的消息,",
})
TQueues.Push(&Item{
Priority: 15,
Data: "优先级为15的消息,",
})
TQueues.Push(&Item{
Priority: 11,
Data: "优先级为11的消息,",
})
TQueues.Push(&Item{
Priority: 7,
Data: "优先级为7的消息,",
})
TQueues.Push(&Item{
Priority: 7,
Data: "优先级为7的消息",
})
TQueues.Push(&Item{
Priority: 7,
Data: "优先级为7的消息",
})
TQueues.Push(&Item{
Priority: 7,
Data: "优先级为7的消息,",
})
TQueues.Push(&Item{
Priority: 7,
Data: "优先级为7的消息,",
})
TQueues.Push(&Item{
Priority: 7,
Data: "优先级为7的消息,",
})
TQueues.Push(&Item{
Priority: 6,
Data: "优先级为6的消息",
})
TQueues.Push(&Item{
Priority: -1,
Data: "优先级为-1的消息,",
})
for TQueues.Len() > 0 {
v := TQueues.Pop()
fmt.Println(v.Data)
time.Sleep(5 * time.Second)
}
}
type JieGou struct {
Content string
Priority int
}
type Ks struct {
list *list.List
maps map[int]*Priority
}
type Priority struct {
element *list.Element
}
func (k *Ks)NewPushBack(v JieGou) {
if k.list.Len() == 0 {
insertEl := k.list.PushFront(v)
k.maps[v.Priority] = &Priority{
element: insertEl,
}
}else {
vvF, ok := k.list.Front().Value.(JieGou)
if ok {
if vvF.Priority < v.Priority {
insertEl := k.list.PushFront(v)
k.maps[v.Priority] = &Priority{
element: insertEl,
}
return
}
}
vvB, ok := k.list.Back().Value.(JieGou)
if ok {
if vvB.Priority >= v.Priority {
insertEl := k.list.PushBack(v)
k.maps[v.Priority] = &Priority{
element: insertEl,
}
return
}
}
maxKey := 0
for kk,_ := range k.maps{
if kk <= v.Priority && kk >= maxKey {
maxKey = kk
}
}
if _, ok := k.maps[maxKey]; ok {
insertEl := k.list.PushBack(v)
k.list.MoveAfter(insertEl, k.maps[maxKey].element)
k.maps[v.Priority] = &Priority{
element: insertEl,
}
}
}
}
func TestPriorityQueue_Pop(t *testing.T) {
var ks = &Ks{
list.New(),
make(map[int]*Priority),
}
//var kkone = new(list.List)
//PushBack 放到list的最后面
//PushFront 放到list的最前面
//Remove 仅仅是将Element对象移除的方法,没有一定是先移除front或者back,是业务自己定义移除的Element
//理论上要不就是从最后开始移除,或者从最前面开始移除
//MoveAfter 两个list调换位置
ks.NewPushBack(JieGou{
Content:"a",
Priority:20,
})
ks.NewPushBack(JieGou{
Content:"b",
Priority:30,
})
ks.NewPushBack(JieGou{
Content:"c",
Priority:40,
})
ks.NewPushBack(JieGou{
Content:"d",
Priority:50,
})
ks.NewPushBack(JieGou{
Content:"e",
Priority:40,
})
ks.NewPushBack(JieGou{
Content:"f",
Priority:30,
})
ks.NewPushBack(JieGou{
Content:"g",
Priority:20,
})
for ks.list.Len() > 0 {
useMsg := ks.list.Front()
log.Println(useMsg)
ks.list.Remove(ks.list.Front())
}
}
package main
import (
"flag"
"fmt"
"github.com/gorilla/websocket"
"log"
"net/http"
"net/url"
"os"
"os/signal"
"pool/pool"
"runtime"
"strings"
"time"
)
var addr2 = flag.String("addr", "localhost:8081", "http service address")
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
for i := 1; i < 100; i++ {
go wsClient2(fmt.Sprintf("%d_1_3", i))
}
select {}
}
func wsClient2(id string) {
flag.Parse()
log.SetFlags(0)
list := strings.Split(id, "_")
Id := list[0]
chann := list[1:]
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
u := url.URL{Scheme: "ws", Host: *addr2, Path: "/ws"}
log.Printf("connecting to %s", u.String())
head := http.Header{}
log.Printf("connecting info: %s", id)
head.Add("Sec-Websocket-Protocol", id)
c, _, err := websocket.DefaultDialer.Dial(u.String(), head)
if err != nil {
log.Fatal("dial:", err)
}
defer func() {
c.Close()
//重新连接
//t := grand.N(10, 20)
//time.Sleep(time.Duration(t) * time.Second)
go wsClient(id)
}()
ping := make(chan int)
c.SetPingHandler(func(appData string) error {
ping <- 1
return nil
})
done := make(chan struct{})
//t := grand.N(30, 90)
//go func() {
// ticker1 := time.NewTicker(time.Duration(t) * time.Second)
// defer func() {
// ticker1.Stop()
// close(done)
// }()
// for {
// _, message, err := c.ReadMessage()
// if err != nil {
// log.Printf("read:%s", err.Error())
// return
// }
// log.Printf("recv: %s", message)
// /*select {
// case <-ticker1.C:
//
// return
// }*/
// }
//
//}()
ticker := time.NewTicker(time.Second)
ticker1 := time.NewTicker(20 * time.Second)
defer ticker.Stop()
for {
select {
case <-done:
return
case <-ticker.C:
msg := &pool.SendMsg{
ToClientId: Id,
FromClientId: Id,
Msg: "test" + time.Now().String(),
Channel: chann,
}
m := pool.SerializeJson(msg)
err := c.WriteMessage(websocket.BinaryMessage, m)
if err != nil {
log.Printf("write:%s", err.Error())
return
}
case <-interrupt:
log.Printf("interrupt")
// Cleanly close the connection by sending a close message and then
// waiting (with timeout) for the server to close the connection.
err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
log.Printf("write close:%s", err.Error())
return
}
select {
case <-done:
case <-time.After(time.Second):
}
return
case <-ping:
err := c.WriteMessage(websocket.PongMessage, nil)
if err != nil {
log.Printf("write pong:%s", err.Error())
return
}
case <-ticker1.C:
err := c.WriteMessage(websocket.PingMessage, nil)
if err != nil {
log.Printf("write pong:%s", err.Error())
return
}
//return
}
}
}
package main
import (
"flag"
"fmt"
"net/http"
"net/http/pprof"
"pool/pool"
"runtime"
"strings"
)
//var addr = flag.String("12", ":8081", "http service address")
func serveHome(w http.ResponseWriter, r *http.Request) {
fmt.Println(r.URL)
if r.URL.Path != "/" {
http.Error(w, "Not found", http.StatusNotFound)
return
}
if r.Method != "GET" {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
http.ServeFile(w, r, "home.html")
}
var ch = make(chan int, 10)
func chfun(i int) {
fmt.Println("写入管道i的值%d", i)
//ch<-i
select {
case ch <- i:
return
default:
fmt.Println("管道己经锁定;i的值" + string(i))
}
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
/* for i:=0;i<10000 ;i++ {
go chfun(i)
}
close(ch)
*/
/*for {
select {
case i,ok:=<-ch:
if !ok{
log.Println("管道己经关闭%d",i)
}
log.Println("读取i的值%d",i)
}
}
*/
flag.Parse()
//初骀化连接池
pool.InitWsPool(func(err interface{}) {
//接收连接池中的运行时错误信息
fmt.Println("wsPool.InitWsPool error-------------", err)
})
mux := http.NewServeMux()
mux.HandleFunc("/", serveHome)
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
mux.HandleFunc("/ws", ws)
err := http.ListenAndServe(":11001", mux)
if err != nil {
fmt.Printf("ListenAndServe: %s", err.Error())
}
}
func ws(w http.ResponseWriter, r *http.Request) {
headData := r.Header.Get("Sec-Websocket-Protocol")
list := strings.Split(headData, "-")
head := http.Header{}
head.Add("Sec-Websocket-Protocol", headData)
//实例化连接对象
client := pool.NewClient(&pool.Config{
Id: list[0], //连接标识
Type: "ws", //连接类型
Channel: list[1:], //指定频道
Goroutine: 100,
})
fmt.Println(client.Id, "实例化连接对象完成")
//连接成功回调
client.OnOpen(func() {
fmt.Printf("连接己开启%s", client.Id)
})
//接收消息
client.OnMessage(func(msg *pool.SendMsg) {
if msg.Status == 3 {
fmt.Println("OnMessage:收到出错消息=》", client.Id, msg.Desc)
return
}
fmt.Println(msg.Msg)
if msg.ToClientId != "" {
//发送消息给指定的ToClientID连接
err := pool.Send(msg)
if err != nil {
fmt.Println("wsPool.Send(msg):", err.Error())
}
//发送消息给当前连接对象
//err = client.Send(msg)
//if err != nil {
// fmt.Println("client.Send(msg):", err.Error())
//}
}
/*if len(msg.Channel)>0{
//按频道广播,可指定多个频道[]string
err:=wsPool.Broadcast(msg) //或者 wsPool.Broadcast(msg)
if err!=nil {
log.Println("wsPool.Broadcast(msg)", err.Error())
}
}
//或都全局广播,所有连接都进行发送
err:=wsPool.BroadcastAll(msg)
if err!=nil {
log.Println("wsPool.BroadcastAll(msg)", err.Error())
}*/
})
//连接断开回调
client.OnClose(func() {
fmt.Printf("连接己经关闭%s", client.Id)
})
client.OnError(func(err error) {
fmt.Printf("连接%s错误信息:%s", client.Id, err.Error())
})
//开启连接
client.OpenClient(w, r, head)
fmt.Println(client.Id, "开启连接")
r.Close = true
return
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment