123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 |
- package queue
- import (
- "runtime"
- "sync"
- "sync/atomic"
- "time"
- "github.com/tal-tech/go-zero/core/logx"
- "github.com/tal-tech/go-zero/core/rescue"
- "github.com/tal-tech/go-zero/core/stat"
- "github.com/tal-tech/go-zero/core/threading"
- "github.com/tal-tech/go-zero/core/timex"
- )
// queueName is the default name given to a Queue and to its metrics
// by NewQueue; it can be overridden afterwards with SetName.
const queueName = "queue"
- type (
- // A Queue is a message queue.
- Queue struct {
- name string
- metrics *stat.Metrics
- producerFactory ProducerFactory
- producerRoutineGroup *threading.RoutineGroup
- consumerFactory ConsumerFactory
- consumerRoutineGroup *threading.RoutineGroup
- producerCount int
- consumerCount int
- active int32
- channel chan string
- quit chan struct{}
- listeners []Listener
- eventLock sync.Mutex
- eventChannels []chan interface{}
- }
- // A Listener interface represents a listener that can be notified with queue events.
- Listener interface {
- OnPause()
- OnResume()
- }
- // A Poller interface wraps the method Poll.
- Poller interface {
- Name() string
- Poll() string
- }
- // A Pusher interface wraps the method Push.
- Pusher interface {
- Name() string
- Push(string) error
- }
- )
- // NewQueue returns a Queue.
- func NewQueue(producerFactory ProducerFactory, consumerFactory ConsumerFactory) *Queue {
- q := &Queue{
- metrics: stat.NewMetrics(queueName),
- producerFactory: producerFactory,
- producerRoutineGroup: threading.NewRoutineGroup(),
- consumerFactory: consumerFactory,
- consumerRoutineGroup: threading.NewRoutineGroup(),
- producerCount: runtime.NumCPU(),
- consumerCount: runtime.NumCPU() << 1,
- channel: make(chan string),
- quit: make(chan struct{}),
- }
- q.SetName(queueName)
- return q
- }
- // AddListener adds a litener to q.
- func (q *Queue) AddListener(listener Listener) {
- q.listeners = append(q.listeners, listener)
- }
- // Broadcast broadcasts message to all event channels.
- func (q *Queue) Broadcast(message interface{}) {
- go func() {
- q.eventLock.Lock()
- defer q.eventLock.Unlock()
- for _, channel := range q.eventChannels {
- channel <- message
- }
- }()
- }
- // SetName sets the name of q.
- func (q *Queue) SetName(name string) {
- q.name = name
- q.metrics.SetName(name)
- }
- // SetNumConsumer sets the number of consumers.
- func (q *Queue) SetNumConsumer(count int) {
- q.consumerCount = count
- }
- // SetNumProducer sets the number of producers.
- func (q *Queue) SetNumProducer(count int) {
- q.producerCount = count
- }
- // Start starts q.
- func (q *Queue) Start() {
- q.startProducers(q.producerCount)
- q.startConsumers(q.consumerCount)
- q.producerRoutineGroup.Wait()
- close(q.channel)
- q.consumerRoutineGroup.Wait()
- }
- // Stop stops q.
- func (q *Queue) Stop() {
- close(q.quit)
- }
- func (q *Queue) consume(eventChan chan interface{}) {
- var consumer Consumer
- for {
- var err error
- if consumer, err = q.consumerFactory(); err != nil {
- logx.Errorf("Error on creating consumer: %v", err)
- time.Sleep(time.Second)
- } else {
- break
- }
- }
- for {
- select {
- case message, ok := <-q.channel:
- if ok {
- q.consumeOne(consumer, message)
- } else {
- logx.Info("Task channel was closed, quitting consumer...")
- return
- }
- case event := <-eventChan:
- consumer.OnEvent(event)
- }
- }
- }
- func (q *Queue) consumeOne(consumer Consumer, message string) {
- threading.RunSafe(func() {
- startTime := timex.Now()
- defer func() {
- duration := timex.Since(startTime)
- q.metrics.Add(stat.Task{
- Duration: duration,
- })
- logx.WithDuration(duration).Infof("%s", message)
- }()
- if err := consumer.Consume(message); err != nil {
- logx.Errorf("Error occurred while consuming %v: %v", message, err)
- }
- })
- }
- func (q *Queue) pause() {
- for _, listener := range q.listeners {
- listener.OnPause()
- }
- }
- func (q *Queue) produce() {
- var producer Producer
- for {
- var err error
- if producer, err = q.producerFactory(); err != nil {
- logx.Errorf("Error on creating producer: %v", err)
- time.Sleep(time.Second)
- } else {
- break
- }
- }
- atomic.AddInt32(&q.active, 1)
- producer.AddListener(routineListener{
- queue: q,
- })
- for {
- select {
- case <-q.quit:
- logx.Info("Quitting producer")
- return
- default:
- if v, ok := q.produceOne(producer); ok {
- q.channel <- v
- }
- }
- }
- }
- func (q *Queue) produceOne(producer Producer) (string, bool) {
- // avoid panic quit the producer, just log it and continue
- defer rescue.Recover()
- return producer.Produce()
- }
- func (q *Queue) resume() {
- for _, listener := range q.listeners {
- listener.OnResume()
- }
- }
- func (q *Queue) startConsumers(number int) {
- for i := 0; i < number; i++ {
- eventChan := make(chan interface{})
- q.eventLock.Lock()
- q.eventChannels = append(q.eventChannels, eventChan)
- q.eventLock.Unlock()
- q.consumerRoutineGroup.Run(func() {
- q.consume(eventChan)
- })
- }
- }
- func (q *Queue) startProducers(number int) {
- for i := 0; i < number; i++ {
- q.producerRoutineGroup.Run(func() {
- q.produce()
- })
- }
- }
- type routineListener struct {
- queue *Queue
- }
- func (rl routineListener) OnProducerPause() {
- if atomic.AddInt32(&rl.queue.active, -1) <= 0 {
- rl.queue.pause()
- }
- }
- func (rl routineListener) OnProducerResume() {
- if atomic.AddInt32(&rl.queue.active, 1) == 1 {
- rl.queue.resume()
- }
- }
|