topology.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package gocql
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. )
// Node is the unit a RoundRobin pool stores and dispatches to. RoundRobin
// itself has the same three methods, so a *RoundRobin can be used wherever a
// Node is expected.
type Node interface {
	// ExecuteQuery takes a query and returns an *Iter or an error.
	ExecuteQuery(qry *Query) (*Iter, error)
	// ExecuteBatch takes a batch and returns an error, if any.
	ExecuteBatch(batch *Batch) error
	// Close is invoked on every pooled node by RoundRobin.Close.
	Close()
}
// RoundRobin keeps a pool of nodes and selects one per request by advancing
// a shared counter and indexing the pool modulo its length.
type RoundRobin struct {
	// pool is the current set of nodes; every access in this file happens
	// with mu held.
	pool []Node
	// pos is the pick counter; it is advanced with atomic.AddUint32 and is
	// not guarded by mu.
	pos uint32
	// mu guards pool: read-locked for lookups and copies, write-locked for
	// add, remove and close.
	mu sync.RWMutex
}
  16. func NewRoundRobin() *RoundRobin {
  17. return &RoundRobin{}
  18. }
  19. func (r *RoundRobin) AddNode(node Node) {
  20. r.mu.Lock()
  21. r.pool = append(r.pool, node)
  22. r.mu.Unlock()
  23. }
  24. func (r *RoundRobin) RemoveNode(node Node) {
  25. r.mu.Lock()
  26. n := len(r.pool)
  27. for i := 0; i < n; i++ {
  28. if r.pool[i] == node {
  29. r.pool[i], r.pool[n-1] = r.pool[n-1], r.pool[i]
  30. r.pool = r.pool[:n-1]
  31. break
  32. }
  33. }
  34. r.mu.Unlock()
  35. }
  36. func (r *RoundRobin) Size() int {
  37. r.mu.RLock()
  38. n := len(r.pool)
  39. r.mu.RUnlock()
  40. return n
  41. }
  42. func (r *RoundRobin) GetPool() []Node {
  43. r.mu.RLock()
  44. pool := make([]Node, len(r.pool))
  45. copy(pool, r.pool)
  46. r.mu.RUnlock()
  47. return pool
  48. }
  49. func (r *RoundRobin) ExecuteQuery(qry *Query) (*Iter, error) {
  50. node := r.pick()
  51. if node == nil {
  52. return nil, ErrNoHostAvailable
  53. }
  54. return node.ExecuteQuery(qry)
  55. }
  56. func (r *RoundRobin) ExecuteBatch(batch *Batch) error {
  57. node := r.pick()
  58. if node == nil {
  59. return ErrNoHostAvailable
  60. }
  61. return node.ExecuteBatch(batch)
  62. }
  63. func (r *RoundRobin) pick() Node {
  64. pos := atomic.AddUint32(&r.pos, 1)
  65. var node Node
  66. r.mu.RLock()
  67. if len(r.pool) > 0 {
  68. node = r.pool[pos%uint32(len(r.pool))]
  69. }
  70. r.mu.RUnlock()
  71. return node
  72. }
  73. func (r *RoundRobin) Close() {
  74. r.mu.Lock()
  75. for i := 0; i < len(r.pool); i++ {
  76. r.pool[i].Close()
  77. }
  78. r.pool = nil
  79. r.mu.Unlock()
  80. }