session.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package gocql
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "time"
  6. )
  7. type NodePicker interface {
  8. AddNode(node *Node)
  9. RemoveNode(node *Node)
  10. Pick(query *Query) *Node
  11. }
  12. type RoundRobinPicker struct {
  13. pool []*Node
  14. pos uint32
  15. mu sync.RWMutex
  16. }
  17. func NewRoundRobinPicker() *RoundRobinPicker {
  18. return &RoundRobinPicker{}
  19. }
  20. func (r *RoundRobinPicker) AddNode(node *Node) {
  21. r.mu.Lock()
  22. r.pool = append(r.pool, node)
  23. r.mu.Unlock()
  24. }
  25. func (r *RoundRobinPicker) RemoveNode(node *Node) {
  26. r.mu.Lock()
  27. n := len(r.pool)
  28. for i := 0; i < n; i++ {
  29. if r.pool[i] == node {
  30. r.pool[i], r.pool[n-1] = r.pool[n-1], r.pool[i]
  31. r.pool = r.pool[:n-1]
  32. break
  33. }
  34. }
  35. r.mu.Unlock()
  36. }
  37. func (r *RoundRobinPicker) Pick(query *Query) *Node {
  38. pos := atomic.AddUint32(&r.pos, 1)
  39. var node *Node
  40. r.mu.RLock()
  41. if len(r.pool) > 0 {
  42. node = r.pool[pos%uint32(len(r.pool))]
  43. }
  44. r.mu.RUnlock()
  45. return node
  46. }
  47. type Reconnector interface {
  48. Reconnect(session *Session, address string)
  49. }
  50. type ExponentialReconnector struct {
  51. baseDelay time.Duration
  52. maxDelay time.Duration
  53. }
  54. func NewExponentialReconnector(baseDelay, maxDelay time.Duration) *ExponentialReconnector {
  55. return &ExponentialReconnector{baseDelay, maxDelay}
  56. }
  57. func (e *ExponentialReconnector) Reconnect(session *Session, address string) {
  58. delay := e.baseDelay
  59. for {
  60. conn, err := Connect(address, session.cfg)
  61. if err != nil {
  62. <-time.After(delay)
  63. if delay *= 2; delay > e.maxDelay {
  64. delay = e.maxDelay
  65. }
  66. continue
  67. }
  68. node := &Node{conn}
  69. go func() {
  70. conn.Serve()
  71. session.pool.RemoveNode(node)
  72. e.Reconnect(session, address)
  73. }()
  74. session.pool.AddNode(node)
  75. return
  76. }
  77. }