queue.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. package queue
  2. import (
  3. "runtime"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. "github.com/tal-tech/go-zero/core/logx"
  8. "github.com/tal-tech/go-zero/core/rescue"
  9. "github.com/tal-tech/go-zero/core/stat"
  10. "github.com/tal-tech/go-zero/core/threading"
  11. "github.com/tal-tech/go-zero/core/timex"
  12. )
  13. const queueName = "queue"
  14. type (
  15. Queue struct {
  16. name string
  17. metrics *stat.Metrics
  18. producerFactory ProducerFactory
  19. producerRoutineGroup *threading.RoutineGroup
  20. consumerFactory ConsumerFactory
  21. consumerRoutineGroup *threading.RoutineGroup
  22. producerCount int
  23. consumerCount int
  24. active int32
  25. channel chan string
  26. quit chan struct{}
  27. listeners []Listener
  28. eventLock sync.Mutex
  29. eventChannels []chan interface{}
  30. }
  31. Listener interface {
  32. OnPause()
  33. OnResume()
  34. }
  35. Poller interface {
  36. Name() string
  37. Poll() string
  38. }
  39. Pusher interface {
  40. Name() string
  41. Push(string) error
  42. }
  43. )
  44. func NewQueue(producerFactory ProducerFactory, consumerFactory ConsumerFactory) *Queue {
  45. queue := &Queue{
  46. metrics: stat.NewMetrics(queueName),
  47. producerFactory: producerFactory,
  48. producerRoutineGroup: threading.NewRoutineGroup(),
  49. consumerFactory: consumerFactory,
  50. consumerRoutineGroup: threading.NewRoutineGroup(),
  51. producerCount: runtime.NumCPU(),
  52. consumerCount: runtime.NumCPU() << 1,
  53. channel: make(chan string),
  54. quit: make(chan struct{}),
  55. }
  56. queue.SetName(queueName)
  57. return queue
  58. }
  59. func (queue *Queue) AddListener(listener Listener) {
  60. queue.listeners = append(queue.listeners, listener)
  61. }
  62. func (queue *Queue) Broadcast(message interface{}) {
  63. go func() {
  64. queue.eventLock.Lock()
  65. defer queue.eventLock.Unlock()
  66. for _, channel := range queue.eventChannels {
  67. channel <- message
  68. }
  69. }()
  70. }
  71. func (queue *Queue) SetName(name string) {
  72. queue.name = name
  73. queue.metrics.SetName(name)
  74. }
  75. func (queue *Queue) SetNumConsumer(count int) {
  76. queue.consumerCount = count
  77. }
  78. func (queue *Queue) SetNumProducer(count int) {
  79. queue.producerCount = count
  80. }
  81. func (queue *Queue) Start() {
  82. queue.startProducers(queue.producerCount)
  83. queue.startConsumers(queue.consumerCount)
  84. queue.producerRoutineGroup.Wait()
  85. close(queue.channel)
  86. queue.consumerRoutineGroup.Wait()
  87. }
  88. func (queue *Queue) Stop() {
  89. close(queue.quit)
  90. }
  91. func (queue *Queue) consume(eventChan chan interface{}) {
  92. var consumer Consumer
  93. for {
  94. var err error
  95. if consumer, err = queue.consumerFactory(); err != nil {
  96. logx.Errorf("Error on creating consumer: %v", err)
  97. time.Sleep(time.Second)
  98. } else {
  99. break
  100. }
  101. }
  102. for {
  103. select {
  104. case message, ok := <-queue.channel:
  105. if ok {
  106. queue.consumeOne(consumer, message)
  107. } else {
  108. logx.Info("Task channel was closed, quitting consumer...")
  109. return
  110. }
  111. case event := <-eventChan:
  112. consumer.OnEvent(event)
  113. }
  114. }
  115. }
  116. func (queue *Queue) consumeOne(consumer Consumer, message string) {
  117. threading.RunSafe(func() {
  118. startTime := timex.Now()
  119. defer func() {
  120. duration := timex.Since(startTime)
  121. queue.metrics.Add(stat.Task{
  122. Duration: duration,
  123. })
  124. logx.WithDuration(duration).Infof("%s", message)
  125. }()
  126. if err := consumer.Consume(message); err != nil {
  127. logx.Errorf("Error occurred while consuming %v: %v", message, err)
  128. }
  129. })
  130. }
  131. func (queue *Queue) pause() {
  132. for _, listener := range queue.listeners {
  133. listener.OnPause()
  134. }
  135. }
  136. func (queue *Queue) produce() {
  137. var producer Producer
  138. for {
  139. var err error
  140. if producer, err = queue.producerFactory(); err != nil {
  141. logx.Errorf("Error on creating producer: %v", err)
  142. time.Sleep(time.Second)
  143. } else {
  144. break
  145. }
  146. }
  147. atomic.AddInt32(&queue.active, 1)
  148. producer.AddListener(routineListener{
  149. queue: queue,
  150. })
  151. for {
  152. select {
  153. case <-queue.quit:
  154. logx.Info("Quitting producer")
  155. return
  156. default:
  157. if v, ok := queue.produceOne(producer); ok {
  158. queue.channel <- v
  159. }
  160. }
  161. }
  162. }
  163. func (queue *Queue) produceOne(producer Producer) (string, bool) {
  164. // avoid panic quit the producer, just log it and continue
  165. defer rescue.Recover()
  166. return producer.Produce()
  167. }
  168. func (queue *Queue) resume() {
  169. for _, listener := range queue.listeners {
  170. listener.OnResume()
  171. }
  172. }
  173. func (queue *Queue) startConsumers(number int) {
  174. for i := 0; i < number; i++ {
  175. eventChan := make(chan interface{})
  176. queue.eventLock.Lock()
  177. queue.eventChannels = append(queue.eventChannels, eventChan)
  178. queue.eventLock.Unlock()
  179. queue.consumerRoutineGroup.Run(func() {
  180. queue.consume(eventChan)
  181. })
  182. }
  183. }
  184. func (queue *Queue) startProducers(number int) {
  185. for i := 0; i < number; i++ {
  186. queue.producerRoutineGroup.Run(func() {
  187. queue.produce()
  188. })
  189. }
  190. }
  191. type routineListener struct {
  192. queue *Queue
  193. }
  194. func (rl routineListener) OnProducerPause() {
  195. if atomic.AddInt32(&rl.queue.active, -1) <= 0 {
  196. rl.queue.pause()
  197. }
  198. }
  199. func (rl routineListener) OnProducerResume() {
  200. if atomic.AddInt32(&rl.queue.active, 1) == 1 {
  201. rl.queue.resume()
  202. }
  203. }