ring.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package gocql
  2. import (
  3. "fmt"
  4. "net"
  5. "sync"
  6. "sync/atomic"
  7. )
  8. type ring struct {
  9. // endpoints are the set of endpoints which the driver will attempt to connect
  10. // to in the case it can not reach any of its hosts. They are also used to boot
  11. // strap the initial connection.
  12. endpoints []*HostInfo
  13. // hosts are the set of all hosts in the cassandra ring that we know of
  14. mu sync.RWMutex
  15. hosts map[string]*HostInfo
  16. hostList []*HostInfo
  17. pos uint32
  18. // TODO: we should store the ring metadata here also.
  19. }
  20. func (r *ring) rrHost() *HostInfo {
  21. // TODO: should we filter hosts that get used here? These hosts will be used
  22. // for the control connection, should we also provide an iterator?
  23. r.mu.RLock()
  24. defer r.mu.RUnlock()
  25. if len(r.hostList) == 0 {
  26. return nil
  27. }
  28. pos := int(atomic.AddUint32(&r.pos, 1) - 1)
  29. return r.hostList[pos%len(r.hostList)]
  30. }
  31. func (r *ring) getHost(ip net.IP) *HostInfo {
  32. r.mu.RLock()
  33. host := r.hosts[ip.String()]
  34. r.mu.RUnlock()
  35. return host
  36. }
  37. func (r *ring) allHosts() []*HostInfo {
  38. r.mu.RLock()
  39. hosts := make([]*HostInfo, 0, len(r.hosts))
  40. for _, host := range r.hosts {
  41. hosts = append(hosts, host)
  42. }
  43. r.mu.RUnlock()
  44. return hosts
  45. }
  46. func (r *ring) currentHosts() map[string]*HostInfo {
  47. r.mu.RLock()
  48. hosts := make(map[string]*HostInfo, len(r.hosts))
  49. for k, v := range r.hosts {
  50. hosts[k] = v
  51. }
  52. r.mu.RUnlock()
  53. return hosts
  54. }
  55. func (r *ring) addHost(host *HostInfo) bool {
  56. // TODO(zariel): key all host info by HostID instead of
  57. // ip addresses
  58. if host.invalidConnectAddr() {
  59. panic(fmt.Sprintf("invalid host: %v", host))
  60. }
  61. ip := host.ConnectAddress().String()
  62. r.mu.Lock()
  63. if r.hosts == nil {
  64. r.hosts = make(map[string]*HostInfo)
  65. }
  66. _, ok := r.hosts[ip]
  67. if !ok {
  68. r.hostList = append(r.hostList, host)
  69. }
  70. r.hosts[ip] = host
  71. r.mu.Unlock()
  72. return ok
  73. }
  74. func (r *ring) addOrUpdate(host *HostInfo) *HostInfo {
  75. if existingHost, ok := r.addHostIfMissing(host); ok {
  76. existingHost.update(host)
  77. host = existingHost
  78. }
  79. return host
  80. }
  81. func (r *ring) addHostIfMissing(host *HostInfo) (*HostInfo, bool) {
  82. if host.invalidConnectAddr() {
  83. panic(fmt.Sprintf("invalid host: %v", host))
  84. }
  85. ip := host.ConnectAddress().String()
  86. r.mu.Lock()
  87. if r.hosts == nil {
  88. r.hosts = make(map[string]*HostInfo)
  89. }
  90. existing, ok := r.hosts[ip]
  91. if !ok {
  92. r.hosts[ip] = host
  93. existing = host
  94. r.hostList = append(r.hostList, host)
  95. }
  96. r.mu.Unlock()
  97. return existing, ok
  98. }
  99. func (r *ring) removeHost(ip net.IP) bool {
  100. r.mu.Lock()
  101. if r.hosts == nil {
  102. r.hosts = make(map[string]*HostInfo)
  103. }
  104. k := ip.String()
  105. _, ok := r.hosts[k]
  106. if ok {
  107. for i, host := range r.hostList {
  108. if host.ConnectAddress().Equal(ip) {
  109. r.hostList = append(r.hostList[:i], r.hostList[i+1:]...)
  110. break
  111. }
  112. }
  113. }
  114. delete(r.hosts, k)
  115. r.mu.Unlock()
  116. return ok
  117. }
  118. type clusterMetadata struct {
  119. mu sync.RWMutex
  120. partitioner string
  121. }
  122. func (c *clusterMetadata) setPartitioner(partitioner string) {
  123. c.mu.Lock()
  124. defer c.mu.Unlock()
  125. if c.partitioner != partitioner {
  126. // TODO: update other things now
  127. c.partitioner = partitioner
  128. }
  129. }