1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// bill 2018.1.8
//优先级队列[同级别先进先出]权重值越大越优先
package queue
import (
"container/list"
"log"
"sync"
"time"
)
type Item struct {
Data interface{} //数据
Priority int32 //优先级
AddTime time.Time //插入队列的时间
Expiration int64 //过期时间值 以秒为单位
}
type PriorityQueue struct {
Data *list.List
PriorityMap map[int32]*pqmap
}
type pqmap struct {
element *list.Element
totle int
}
var lock sync.RWMutex
func NewPriorityQueue() *PriorityQueue {
pq:= &PriorityQueue{
Data: list.New(),
PriorityMap: make(map[int32]*pqmap),
}
return pq
}
func (pq *PriorityQueue) Len() int {
defer lock.RUnlock()
lock.RLock()
return pq.Data.Len()
}
func (pq *PriorityQueue) Push(v *Item) {
defer lock.Unlock()
lock.Lock()
newElement := pq.Data.PushFront(v)
if _, ok := pq.PriorityMap[v.Priority]; !ok {
pq.PriorityMap[v.Priority] = &pqmap{
element: newElement,
totle: 1,
}
} else {
pq.PriorityMap[v.Priority].totle = pq.PriorityMap[v.Priority].totle + 1
}
//找出小于自己的最大值权重值
var maxKey int32 = 1
for p, _ := range pq.PriorityMap {
if p < v.Priority && p >= maxKey {
maxKey = p
}
}
//pq.Dump()
if v.Priority != maxKey {
if _, ok := pq.PriorityMap[maxKey]; ok {
pq.Data.MoveAfter(newElement, pq.PriorityMap[maxKey].element)
}
}
//log.Println("挺入队列的消息:",v,"消息权重值:",v.Priority)
}
func (pq *PriorityQueue) Pop() *Item {
defer lock.Unlock()
lock.Lock()
iter := pq.Data.Back()
if iter==nil||iter.Value==nil{
return nil
}
v := iter.Value.(*Item)
pq.Data.Remove(iter)
if pq.PriorityMap[v.Priority].totle > 1 {
pq.PriorityMap[v.Priority].totle = pq.PriorityMap[v.Priority].totle - 1
} else {
delete(pq.PriorityMap, v.Priority)
}
//log.Println("取出队列的消息:",v,"消息权重值:",v.Priority)
return v
}
func (pq *PriorityQueue) Dump() {
for iter := pq.Data.Back(); iter != nil; iter = iter.Prev() {
log.Println("队列信息:", iter.Value.(*Item))
}
}
//清除队列
func (pq *PriorityQueue) Clear() {
defer lock.RUnlock()
lock.RLock()
for iter := pq.Data.Back(); iter != nil; iter = iter.Prev() {
//fmt.Println("item:", iter.Value.(*Item))
v := iter.Value.(*Item)
pq.Data.Remove(iter)
if pq.PriorityMap[v.Priority].totle > 1 {
pq.PriorityMap[v.Priority].totle = pq.PriorityMap[v.Priority].totle - 1
} else {
delete(pq.PriorityMap, v.Priority)
}
}
}
//检测超时任务
func (pq *PriorityQueue) Expirations(expriCallback func(item *Item)) {
defer lock.RUnlock()
lock.RLock()
for iter := pq.Data.Back(); iter != nil; iter = iter.Prev() {
//fmt.Println("item:", iter.Value.(*Item))
v := iter.Value.(*Item)
if v.Expiration==0 {
continue
}
isExpri:=v.AddTime.Add(time.Duration(v.Expiration)*time.Second).Before(time.Now())
if isExpri {
pq.Data.Remove(iter)
if pq.PriorityMap[v.Priority].totle > 1 {
pq.PriorityMap[v.Priority].totle = pq.PriorityMap[v.Priority].totle - 1
} else {
delete(pq.PriorityMap, v.Priority)
}
expriCallback(v)
}else{
//没过期说明越往前越新
//break
}
}
}