balancedpusher.go 786 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. package queue
  2. import (
  3. "errors"
  4. "sync/atomic"
  5. "github.com/tal-tech/go-zero/core/logx"
  6. )
  7. var ErrNoAvailablePusher = errors.New("no available pusher")
  8. type BalancedPusher struct {
  9. name string
  10. pushers []Pusher
  11. index uint64
  12. }
  13. func NewBalancedPusher(pushers []Pusher) Pusher {
  14. return &BalancedPusher{
  15. name: generateName(pushers),
  16. pushers: pushers,
  17. }
  18. }
  19. func (pusher *BalancedPusher) Name() string {
  20. return pusher.name
  21. }
  22. func (pusher *BalancedPusher) Push(message string) error {
  23. size := len(pusher.pushers)
  24. for i := 0; i < size; i++ {
  25. index := atomic.AddUint64(&pusher.index, 1) % uint64(size)
  26. target := pusher.pushers[index]
  27. if err := target.Push(message); err != nil {
  28. logx.Error(err)
  29. } else {
  30. return nil
  31. }
  32. }
  33. return ErrNoAvailablePusher
  34. }