producer.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. package dq
  2. import (
  3. "bytes"
  4. "log"
  5. "math/rand"
  6. "strconv"
  7. "strings"
  8. "time"
  9. "zero/core/errorx"
  10. "zero/core/fx"
  11. "zero/core/logx"
  12. )
const (
	// replicaNodes is the maximum number of nodes a single job is written to;
	// getWriteNodes picks this many at random when the cluster is larger.
	replicaNodes = 3
	// minWrittenNodes is both the smallest cluster size NewProducer accepts
	// and the number of successful node writes insert requires before it
	// reports success.
	minWrittenNodes = 2
)
  17. type (
  18. Producer interface {
  19. At(body []byte, at time.Time) (string, error)
  20. Close() error
  21. Delay(body []byte, delay time.Duration) (string, error)
  22. Revoke(ids string) error
  23. }
  24. producerCluster struct {
  25. nodes []Producer
  26. }
  27. )
  28. func init() {
  29. rand.Seed(time.Now().UnixNano())
  30. }
  31. func NewProducer(beanstalks []Beanstalk) Producer {
  32. if len(beanstalks) < minWrittenNodes {
  33. log.Fatalf("nodes must be equal or greater than %d", minWrittenNodes)
  34. }
  35. var nodes []Producer
  36. for _, node := range beanstalks {
  37. nodes = append(nodes, NewProducerNode(node.Endpoint, node.Tube))
  38. }
  39. return &producerCluster{nodes: nodes}
  40. }
  41. func (p *producerCluster) At(body []byte, at time.Time) (string, error) {
  42. return p.insert(func(node Producer) (string, error) {
  43. return node.At(p.wrap(body, at), at)
  44. })
  45. }
  46. func (p *producerCluster) Close() error {
  47. var be errorx.BatchError
  48. for _, node := range p.nodes {
  49. if err := node.Close(); err != nil {
  50. be.Add(err)
  51. }
  52. }
  53. return be.Err()
  54. }
  55. func (p *producerCluster) Delay(body []byte, delay time.Duration) (string, error) {
  56. return p.insert(func(node Producer) (string, error) {
  57. return node.Delay(p.wrap(body, time.Now().Add(delay)), delay)
  58. })
  59. }
  60. func (p *producerCluster) Revoke(ids string) error {
  61. var be errorx.BatchError
  62. fx.From(func(source chan<- interface{}) {
  63. for _, node := range p.nodes {
  64. source <- node
  65. }
  66. }).Map(func(item interface{}) interface{} {
  67. node := item.(Producer)
  68. return node.Revoke(ids)
  69. }).ForEach(func(item interface{}) {
  70. if item != nil {
  71. be.Add(item.(error))
  72. }
  73. })
  74. return be.Err()
  75. }
  76. func (p *producerCluster) cloneNodes() []Producer {
  77. return append([]Producer(nil), p.nodes...)
  78. }
  79. func (p *producerCluster) getWriteNodes() []Producer {
  80. if len(p.nodes) <= replicaNodes {
  81. return p.nodes
  82. }
  83. nodes := p.cloneNodes()
  84. rand.Shuffle(len(nodes), func(i, j int) {
  85. nodes[i], nodes[j] = nodes[j], nodes[i]
  86. })
  87. return nodes[:replicaNodes]
  88. }
  89. func (p *producerCluster) insert(fn func(node Producer) (string, error)) (string, error) {
  90. type idErr struct {
  91. id string
  92. err error
  93. }
  94. var ret []idErr
  95. fx.From(func(source chan<- interface{}) {
  96. for _, node := range p.getWriteNodes() {
  97. source <- node
  98. }
  99. }).Map(func(item interface{}) interface{} {
  100. node := item.(Producer)
  101. id, err := fn(node)
  102. return idErr{
  103. id: id,
  104. err: err,
  105. }
  106. }).ForEach(func(item interface{}) {
  107. ret = append(ret, item.(idErr))
  108. })
  109. var ids []string
  110. var be errorx.BatchError
  111. for _, val := range ret {
  112. if val.err != nil {
  113. be.Add(val.err)
  114. } else {
  115. ids = append(ids, val.id)
  116. }
  117. }
  118. jointId := strings.Join(ids, idSep)
  119. if len(ids) >= minWrittenNodes {
  120. return jointId, nil
  121. }
  122. if err := p.Revoke(jointId); err != nil {
  123. logx.Error(err)
  124. }
  125. return "", be.Err()
  126. }
  127. func (p *producerCluster) wrap(body []byte, at time.Time) []byte {
  128. var builder bytes.Buffer
  129. builder.WriteString(strconv.FormatInt(at.UnixNano(), 10))
  130. builder.WriteByte(timeSep)
  131. builder.Write(body)
  132. return builder.Bytes()
  133. }