topology.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "sync"
  7. "sync/atomic"
  8. )
  9. type Node interface {
  10. ExecuteQuery(qry *Query) (*Iter, error)
  11. ExecuteBatch(batch *Batch) error
  12. Close()
  13. }
  14. type RoundRobin struct {
  15. pool []Node
  16. pos uint32
  17. mu sync.RWMutex
  18. }
  19. func NewRoundRobin() *RoundRobin {
  20. return &RoundRobin{}
  21. }
  22. func (r *RoundRobin) AddNode(node Node) {
  23. r.mu.Lock()
  24. r.pool = append(r.pool, node)
  25. r.mu.Unlock()
  26. }
  27. func (r *RoundRobin) RemoveNode(node Node) {
  28. r.mu.Lock()
  29. n := len(r.pool)
  30. for i := 0; i < n; i++ {
  31. if r.pool[i] == node {
  32. r.pool[i], r.pool[n-1] = r.pool[n-1], r.pool[i]
  33. r.pool = r.pool[:n-1]
  34. break
  35. }
  36. }
  37. r.mu.Unlock()
  38. }
  39. func (r *RoundRobin) Size() int {
  40. r.mu.RLock()
  41. n := len(r.pool)
  42. r.mu.RUnlock()
  43. return n
  44. }
  45. func (r *RoundRobin) GetPool() []Node {
  46. r.mu.RLock()
  47. pool := make([]Node, len(r.pool))
  48. copy(pool, r.pool)
  49. r.mu.RUnlock()
  50. return pool
  51. }
  52. func (r *RoundRobin) ExecuteQuery(qry *Query) (*Iter, error) {
  53. node := r.pick()
  54. if node == nil {
  55. return nil, ErrNoHostAvailable
  56. }
  57. return node.ExecuteQuery(qry)
  58. }
  59. func (r *RoundRobin) ExecuteBatch(batch *Batch) error {
  60. node := r.pick()
  61. if node == nil {
  62. return ErrNoHostAvailable
  63. }
  64. return node.ExecuteBatch(batch)
  65. }
  66. func (r *RoundRobin) pick() Node {
  67. pos := atomic.AddUint32(&r.pos, 1)
  68. var node Node
  69. r.mu.RLock()
  70. if len(r.pool) > 0 {
  71. node = r.pool[pos%uint32(len(r.pool))]
  72. }
  73. r.mu.RUnlock()
  74. return node
  75. }
  76. func (r *RoundRobin) Close() {
  77. r.mu.Lock()
  78. for i := 0; i < len(r.pool); i++ {
  79. r.pool[i].Close()
  80. }
  81. r.pool = nil
  82. r.mu.Unlock()
  83. }