queue_test.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package queue
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "testing"
  6. "time"
  7. "github.com/stretchr/testify/assert"
  8. )
  // Sizing knobs shared by TestQueue and the mocks in this file.
  const (
  	// consumers is the value TestQueue passes to SetNumConsumer; it also
  	// caps how many OnEvent calls release mockedConsumer.wait.
  	consumers = 4

  	// rounds is the number of items the mocked producer hands out and the
  	// number of Consume calls TestQueue expects to observe.
  	rounds = 100
  )
  13. func TestQueue(t *testing.T) {
  14. producer := newMockedProducer(rounds)
  15. consumer := newMockedConsumer()
  16. consumer.wait.Add(consumers)
  17. q := NewQueue(func() (Producer, error) {
  18. return producer, nil
  19. }, func() (Consumer, error) {
  20. return consumer, nil
  21. })
  22. q.AddListener(new(mockedListener))
  23. q.SetName("mockqueue")
  24. q.SetNumConsumer(consumers)
  25. q.SetNumProducer(1)
  26. q.pause()
  27. q.resume()
  28. go func() {
  29. producer.wait.Wait()
  30. q.Stop()
  31. }()
  32. q.Start()
  33. assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
  34. }
  35. type mockedConsumer struct {
  36. count int32
  37. events int32
  38. wait sync.WaitGroup
  39. }
  40. func newMockedConsumer() *mockedConsumer {
  41. return new(mockedConsumer)
  42. }
  43. func (c *mockedConsumer) Consume(string) error {
  44. atomic.AddInt32(&c.count, 1)
  45. return nil
  46. }
  47. func (c *mockedConsumer) OnEvent(interface{}) {
  48. if atomic.AddInt32(&c.events, 1) <= consumers {
  49. c.wait.Done()
  50. }
  51. }
  52. type mockedProducer struct {
  53. total int32
  54. count int32
  55. wait sync.WaitGroup
  56. }
  57. func newMockedProducer(total int32) *mockedProducer {
  58. p := new(mockedProducer)
  59. p.total = total
  60. p.wait.Add(int(total))
  61. return p
  62. }
  63. func (p *mockedProducer) AddListener(listener ProduceListener) {
  64. }
  65. func (p *mockedProducer) Produce() (string, bool) {
  66. if atomic.AddInt32(&p.count, 1) <= p.total {
  67. p.wait.Done()
  68. return "item", true
  69. }
  70. time.Sleep(time.Second)
  71. return "", false
  72. }
  // mockedListener is the listener TestQueue registers through AddListener.
  // It carries no state and both of its callbacks do nothing.
  type mockedListener struct{}

  // OnPause is a deliberate no-op.
  func (l *mockedListener) OnPause() {}

  // OnResume is a deliberate no-op.
  func (l *mockedListener) OnResume() {}