ring.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package gocql
  2. import (
  3. "fmt"
  4. "net"
  5. "sync"
  6. "sync/atomic"
  7. )
  8. type ring struct {
  9. // endpoints are the set of endpoints which the driver will attempt to connect
  10. // to in the case it can not reach any of its hosts. They are also used to boot
  11. // strap the initial connection.
  12. endpoints []*HostInfo
  13. // hosts are the set of all hosts in the cassandra ring that we know of
  14. mu sync.RWMutex
  15. hosts map[string]*HostInfo
  16. hostList []*HostInfo
  17. pos uint32
  18. // TODO: we should store the ring metadata here also.
  19. }
  20. func (r *ring) rrHost() *HostInfo {
  21. // TODO: should we filter hosts that get used here? These hosts will be used
  22. // for the control connection, should we also provide an iterator?
  23. r.mu.RLock()
  24. defer r.mu.RUnlock()
  25. if len(r.hostList) == 0 {
  26. return nil
  27. }
  28. pos := int(atomic.AddUint32(&r.pos, 1) - 1)
  29. return r.hostList[pos%len(r.hostList)]
  30. }
  31. func (r *ring) getHost(ip net.IP) *HostInfo {
  32. r.mu.RLock()
  33. host := r.hosts[ip.String()]
  34. r.mu.RUnlock()
  35. return host
  36. }
  37. func (r *ring) allHosts() []*HostInfo {
  38. r.mu.RLock()
  39. hosts := make([]*HostInfo, 0, len(r.hosts))
  40. for _, host := range r.hosts {
  41. hosts = append(hosts, host)
  42. }
  43. r.mu.RUnlock()
  44. return hosts
  45. }
  46. func (r *ring) currentHosts() map[string]*HostInfo {
  47. r.mu.RLock()
  48. hosts := make(map[string]*HostInfo, len(r.hosts))
  49. for k, v := range r.hosts {
  50. hosts[k] = v
  51. }
  52. r.mu.RUnlock()
  53. return hosts
  54. }
  55. func (r *ring) addHost(host *HostInfo) bool {
  56. if host.invalidConnectAddr() {
  57. panic(fmt.Sprintf("invalid host: %v", host))
  58. }
  59. ip := host.ConnectAddress().String()
  60. r.mu.Lock()
  61. if r.hosts == nil {
  62. r.hosts = make(map[string]*HostInfo)
  63. }
  64. _, ok := r.hosts[ip]
  65. if !ok {
  66. r.hostList = append(r.hostList, host)
  67. }
  68. r.hosts[ip] = host
  69. r.mu.Unlock()
  70. return ok
  71. }
  72. func (r *ring) addOrUpdate(host *HostInfo) *HostInfo {
  73. if existingHost, ok := r.addHostIfMissing(host); ok {
  74. existingHost.update(host)
  75. host = existingHost
  76. }
  77. return host
  78. }
  79. func (r *ring) addHostIfMissing(host *HostInfo) (*HostInfo, bool) {
  80. if host.invalidConnectAddr() {
  81. panic(fmt.Sprintf("invalid host: %v", host))
  82. }
  83. ip := host.ConnectAddress().String()
  84. r.mu.Lock()
  85. if r.hosts == nil {
  86. r.hosts = make(map[string]*HostInfo)
  87. }
  88. existing, ok := r.hosts[ip]
  89. if !ok {
  90. r.hosts[ip] = host
  91. existing = host
  92. r.hostList = append(r.hostList, host)
  93. }
  94. r.mu.Unlock()
  95. return existing, ok
  96. }
  97. func (r *ring) removeHost(ip net.IP) bool {
  98. r.mu.Lock()
  99. if r.hosts == nil {
  100. r.hosts = make(map[string]*HostInfo)
  101. }
  102. k := ip.String()
  103. _, ok := r.hosts[k]
  104. if ok {
  105. for i, host := range r.hostList {
  106. if host.ConnectAddress().Equal(ip) {
  107. r.hostList = append(r.hostList[:i], r.hostList[i+1:]...)
  108. break
  109. }
  110. }
  111. }
  112. delete(r.hosts, k)
  113. r.mu.Unlock()
  114. return ok
  115. }
  116. type clusterMetadata struct {
  117. mu sync.RWMutex
  118. partitioner string
  119. }
  120. func (c *clusterMetadata) setPartitioner(partitioner string) {
  121. c.mu.RLock()
  122. defer c.mu.RUnlock()
  123. if c.partitioner != partitioner {
  124. // TODO: update other things now
  125. c.partitioner = partitioner
  126. }
  127. }