ring.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package gocql
  2. import (
  3. "fmt"
  4. "net"
  5. "sync"
  6. "sync/atomic"
  7. )
  8. type ring struct {
  9. // endpoints are the set of endpoints which the driver will attempt to connect
  10. // to in the case it can not reach any of its hosts. They are also used to boot
  11. // strap the initial connection.
  12. endpoints []*HostInfo
  13. // hosts are the set of all hosts in the cassandra ring that we know of
  14. mu sync.RWMutex
  15. hosts map[string]*HostInfo
  16. hostList []*HostInfo
  17. pos uint32
  18. // TODO: we should store the ring metadata here also.
  19. }
  20. func (r *ring) rrHost() *HostInfo {
  21. // TODO: should we filter hosts that get used here? These hosts will be used
  22. // for the control connection, should we also provide an iterator?
  23. r.mu.RLock()
  24. defer r.mu.RUnlock()
  25. if len(r.hostList) == 0 {
  26. return nil
  27. }
  28. pos := int(atomic.AddUint32(&r.pos, 1) - 1)
  29. return r.hostList[pos%len(r.hostList)]
  30. }
  31. func (r *ring) getHost(ip net.IP) *HostInfo {
  32. r.mu.RLock()
  33. host := r.hosts[ip.String()]
  34. r.mu.RUnlock()
  35. return host
  36. }
  37. func (r *ring) allHosts() []*HostInfo {
  38. r.mu.RLock()
  39. hosts := make([]*HostInfo, 0, len(r.hosts))
  40. for _, host := range r.hosts {
  41. hosts = append(hosts, host)
  42. }
  43. r.mu.RUnlock()
  44. return hosts
  45. }
  46. func (r *ring) addHost(host *HostInfo) bool {
  47. if host.invalidConnectAddr() {
  48. panic(fmt.Sprintf("invalid host: %v", host))
  49. }
  50. ip := host.ConnectAddress().String()
  51. r.mu.Lock()
  52. if r.hosts == nil {
  53. r.hosts = make(map[string]*HostInfo)
  54. }
  55. _, ok := r.hosts[ip]
  56. if !ok {
  57. r.hostList = append(r.hostList, host)
  58. }
  59. r.hosts[ip] = host
  60. r.mu.Unlock()
  61. return ok
  62. }
  63. func (r *ring) addOrUpdate(host *HostInfo) *HostInfo {
  64. if existingHost, ok := r.addHostIfMissing(host); ok {
  65. existingHost.update(host)
  66. host = existingHost
  67. }
  68. return host
  69. }
  70. func (r *ring) addHostIfMissing(host *HostInfo) (*HostInfo, bool) {
  71. if host.invalidConnectAddr() {
  72. panic(fmt.Sprintf("invalid host: %v", host))
  73. }
  74. ip := host.ConnectAddress().String()
  75. r.mu.Lock()
  76. if r.hosts == nil {
  77. r.hosts = make(map[string]*HostInfo)
  78. }
  79. existing, ok := r.hosts[ip]
  80. if !ok {
  81. r.hosts[ip] = host
  82. existing = host
  83. r.hostList = append(r.hostList, host)
  84. }
  85. r.mu.Unlock()
  86. return existing, ok
  87. }
  88. func (r *ring) removeHost(ip net.IP) bool {
  89. r.mu.Lock()
  90. if r.hosts == nil {
  91. r.hosts = make(map[string]*HostInfo)
  92. }
  93. k := ip.String()
  94. _, ok := r.hosts[k]
  95. if ok {
  96. for i, host := range r.hostList {
  97. if host.ConnectAddress().Equal(ip) {
  98. r.hostList = append(r.hostList[:i], r.hostList[i+1:]...)
  99. break
  100. }
  101. }
  102. }
  103. delete(r.hosts, k)
  104. r.mu.Unlock()
  105. return ok
  106. }
  107. type clusterMetadata struct {
  108. mu sync.RWMutex
  109. partitioner string
  110. }
  111. func (c *clusterMetadata) setPartitioner(partitioner string) {
  112. c.mu.RLock()
  113. defer c.mu.RUnlock()
  114. if c.partitioner != partitioner {
  115. // TODO: update other things now
  116. c.partitioner = partitioner
  117. }
  118. }