topology.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package gocql
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "time"
  6. )
  7. type Node interface {
  8. ExecuteQuery(qry *Query) (*Iter, error)
  9. ExecuteBatch(batch *Batch) error
  10. Close()
  11. }
  12. type NodePicker interface {
  13. AddNode(node Node)
  14. RemoveNode(node Node)
  15. Pick(qry *Query) Node
  16. }
  17. type RoundRobinPicker struct {
  18. pool []Node
  19. pos uint32
  20. mu sync.RWMutex
  21. }
  22. func NewRoundRobinPicker() *RoundRobinPicker {
  23. return &RoundRobinPicker{}
  24. }
  25. func (r *RoundRobinPicker) AddNode(node Node) {
  26. r.mu.Lock()
  27. r.pool = append(r.pool, node)
  28. r.mu.Unlock()
  29. }
  30. func (r *RoundRobinPicker) RemoveNode(node Node) {
  31. r.mu.Lock()
  32. n := len(r.pool)
  33. for i := 0; i < n; i++ {
  34. if r.pool[i] == node {
  35. r.pool[i], r.pool[n-1] = r.pool[n-1], r.pool[i]
  36. r.pool = r.pool[:n-1]
  37. break
  38. }
  39. }
  40. r.mu.Unlock()
  41. }
  42. func (r *RoundRobinPicker) Pick(query *Query) Node {
  43. pos := atomic.AddUint32(&r.pos, 1)
  44. var node Node
  45. r.mu.RLock()
  46. if len(r.pool) > 0 {
  47. node = r.pool[pos%uint32(len(r.pool))]
  48. }
  49. r.mu.RUnlock()
  50. return node
  51. }
  52. type Reconnector interface {
  53. Reconnect(session *Session, address string)
  54. }
  55. type ExponentialReconnector struct {
  56. baseDelay time.Duration
  57. maxDelay time.Duration
  58. }
  59. func NewExponentialReconnector(baseDelay, maxDelay time.Duration) *ExponentialReconnector {
  60. return &ExponentialReconnector{baseDelay, maxDelay}
  61. }
  62. func (e *ExponentialReconnector) Reconnect(session *Session, address string) {
  63. delay := e.baseDelay
  64. for {
  65. conn, err := Connect(address, session.cfg)
  66. if err != nil {
  67. <-time.After(delay)
  68. if delay *= 2; delay > e.maxDelay {
  69. delay = e.maxDelay
  70. }
  71. continue
  72. }
  73. node := &Host{conn}
  74. go func() {
  75. conn.Serve()
  76. session.pool.RemoveNode(node)
  77. e.Reconnect(session, address)
  78. }()
  79. session.pool.AddNode(node)
  80. return
  81. }
  82. }
  83. type Host struct {
  84. conn *Conn
  85. }
  86. func (h *Host) ExecuteQuery(qry *Query) (*Iter, error) {
  87. return h.conn.ExecuteQuery(qry)
  88. }
  89. func (h *Host) ExecuteBatch(batch *Batch) error {
  90. return nil
  91. }
  92. func (h *Host) Close() {
  93. h.conn.conn.Close()
  94. }