topology.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. package gocql
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. )
  6. type Node interface {
  7. ExecuteQuery(qry *Query) (*Iter, error)
  8. ExecuteBatch(batch *Batch) error
  9. Close()
  10. }
  11. type RoundRobin struct {
  12. pool []Node
  13. pos uint32
  14. mu sync.RWMutex
  15. }
  16. func NewRoundRobin() *RoundRobin {
  17. return &RoundRobin{}
  18. }
  19. func (r *RoundRobin) AddNode(node Node) {
  20. r.mu.Lock()
  21. r.pool = append(r.pool, node)
  22. r.mu.Unlock()
  23. }
  24. func (r *RoundRobin) RemoveNode(node Node) {
  25. r.mu.Lock()
  26. n := len(r.pool)
  27. for i := 0; i < n; i++ {
  28. if r.pool[i] == node {
  29. r.pool[i], r.pool[n-1] = r.pool[n-1], r.pool[i]
  30. r.pool = r.pool[:n-1]
  31. break
  32. }
  33. }
  34. r.mu.Unlock()
  35. }
  36. func (r *RoundRobin) ExecuteQuery(qry *Query) (*Iter, error) {
  37. node := r.pick()
  38. if node == nil {
  39. return nil, ErrNoHostAvailable
  40. }
  41. return node.ExecuteQuery(qry)
  42. }
  43. func (r *RoundRobin) ExecuteBatch(batch *Batch) error {
  44. node := r.pick()
  45. if node == nil {
  46. return ErrNoHostAvailable
  47. }
  48. return node.ExecuteBatch(batch)
  49. }
  50. func (r *RoundRobin) pick() Node {
  51. pos := atomic.AddUint32(&r.pos, 1)
  52. var node Node
  53. r.mu.RLock()
  54. if len(r.pool) > 0 {
  55. node = r.pool[pos%uint32(len(r.pool))]
  56. }
  57. r.mu.RUnlock()
  58. return node
  59. }
  60. func (r *RoundRobin) Close() {
  61. r.mu.Lock()
  62. for i := 0; i < len(r.pool); i++ {
  63. r.pool[i].Close()
  64. }
  65. r.pool = nil
  66. r.mu.Unlock()
  67. }