balancedpusher.go 1.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. package queue
  2. import (
  3. "errors"
  4. "sync/atomic"
  5. "github.com/tal-tech/go-zero/core/logx"
  6. )
  7. // ErrNoAvailablePusher indicates no pusher available.
  8. var ErrNoAvailablePusher = errors.New("no available pusher")
  9. // A BalancedPusher is used to push messages to multiple pusher with round robin algorithm.
  10. type BalancedPusher struct {
  11. name string
  12. pushers []Pusher
  13. index uint64
  14. }
  15. // NewBalancedPusher returns a BalancedPusher.
  16. func NewBalancedPusher(pushers []Pusher) Pusher {
  17. return &BalancedPusher{
  18. name: generateName(pushers),
  19. pushers: pushers,
  20. }
  21. }
  22. // Name returns the name of pusher.
  23. func (pusher *BalancedPusher) Name() string {
  24. return pusher.name
  25. }
  26. // Push pushes message to one of the underlying pushers.
  27. func (pusher *BalancedPusher) Push(message string) error {
  28. size := len(pusher.pushers)
  29. for i := 0; i < size; i++ {
  30. index := atomic.AddUint64(&pusher.index, 1) % uint64(size)
  31. target := pusher.pushers[index]
  32. if err := target.Push(message); err != nil {
  33. logx.Error(err)
  34. } else {
  35. return nil
  36. }
  37. }
  38. return ErrNoAvailablePusher
  39. }