queue_test.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package queue
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "testing"
  6. "time"
  7. "github.com/stretchr/testify/assert"
  8. )
  9. const (
  10. consumers = 4
  11. rounds = 100
  12. )
  13. func TestQueue(t *testing.T) {
  14. producer := newMockedProducer(rounds)
  15. consumer := newMockedConsumer()
  16. consumer.wait.Add(consumers)
  17. q := NewQueue(func() (Producer, error) {
  18. return producer, nil
  19. }, func() (Consumer, error) {
  20. return consumer, nil
  21. })
  22. q.AddListener(new(mockedListener))
  23. q.SetName("mockqueue")
  24. q.SetNumConsumer(consumers)
  25. q.SetNumProducer(1)
  26. q.pause()
  27. q.resume()
  28. go func() {
  29. producer.wait.Wait()
  30. q.Stop()
  31. }()
  32. q.Start()
  33. assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
  34. }
  35. type mockedConsumer struct {
  36. count int32
  37. events int32
  38. wait sync.WaitGroup
  39. }
  40. func newMockedConsumer() *mockedConsumer {
  41. return new(mockedConsumer)
  42. }
  43. func (c *mockedConsumer) Consume(string) error {
  44. atomic.AddInt32(&c.count, 1)
  45. return nil
  46. }
  47. func (c *mockedConsumer) OnEvent(interface{}) {
  48. if atomic.AddInt32(&c.events, 1) <= consumers {
  49. c.wait.Done()
  50. }
  51. }
  52. type mockedProducer struct {
  53. total int32
  54. count int32
  55. wait sync.WaitGroup
  56. }
  57. func newMockedProducer(total int32) *mockedProducer {
  58. p := new(mockedProducer)
  59. p.total = total
  60. p.wait.Add(int(total))
  61. return p
  62. }
  63. func (p *mockedProducer) AddListener(listener ProduceListener) {
  64. }
  65. func (p *mockedProducer) Produce() (string, bool) {
  66. if atomic.AddInt32(&p.count, 1) <= p.total {
  67. p.wait.Done()
  68. return "item", true
  69. }
  70. time.Sleep(time.Second)
  71. return "", false
  72. }
  73. type mockedListener struct {
  74. }
  75. func (l *mockedListener) OnPause() {
  76. }
  77. func (l *mockedListener) OnResume() {
  78. }