ring.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package gocql
  2. import (
  3. "net"
  4. "sync"
  5. "sync/atomic"
  6. )
// ring tracks the hosts the driver knows about: the configured bootstrap
// endpoints plus the hosts registered through addHost/addHostIfMissing.
type ring struct {
	// endpoints are the set of endpoints which the driver will attempt to
	// connect to in the case it can not reach any of its hosts. They are also
	// used to bootstrap the initial connection.
	endpoints []*HostInfo

	// mu is held by the methods in this file whenever they read or modify
	// hosts and hostList.
	mu sync.RWMutex

	// hosts are the set of all hosts in the cassandra ring that we know of,
	// keyed by the string form of the host's connect address.
	hosts map[string]*HostInfo

	// hostList holds the known hosts as a slice so that rrHost can index into
	// it; new hosts are appended in the order they are added.
	hostList []*HostInfo

	// pos is the round-robin cursor; rrHost advances it with an atomic add.
	pos uint32

	// TODO: we should store the ring metadata here also.
}
  19. func (r *ring) rrHost() *HostInfo {
  20. // TODO: should we filter hosts that get used here? These hosts will be used
  21. // for the control connection, should we also provide an iterator?
  22. r.mu.RLock()
  23. defer r.mu.RUnlock()
  24. if len(r.hostList) == 0 {
  25. return nil
  26. }
  27. pos := int(atomic.AddUint32(&r.pos, 1) - 1)
  28. return r.hostList[pos%len(r.hostList)]
  29. }
  30. func (r *ring) getHost(ip net.IP) *HostInfo {
  31. r.mu.RLock()
  32. host := r.hosts[ip.String()]
  33. r.mu.RUnlock()
  34. return host
  35. }
  36. func (r *ring) allHosts() []*HostInfo {
  37. r.mu.RLock()
  38. hosts := make([]*HostInfo, 0, len(r.hosts))
  39. for _, host := range r.hosts {
  40. hosts = append(hosts, host)
  41. }
  42. r.mu.RUnlock()
  43. return hosts
  44. }
  45. func (r *ring) addHost(host *HostInfo) bool {
  46. ip := host.ConnectAddress().String()
  47. r.mu.Lock()
  48. if r.hosts == nil {
  49. r.hosts = make(map[string]*HostInfo)
  50. }
  51. _, ok := r.hosts[ip]
  52. if !ok {
  53. r.hostList = append(r.hostList, host)
  54. }
  55. r.hosts[ip] = host
  56. r.mu.Unlock()
  57. return ok
  58. }
  59. func (r *ring) addOrUpdate(host *HostInfo) *HostInfo {
  60. if existingHost, ok := r.addHostIfMissing(host); ok {
  61. existingHost.update(host)
  62. host = existingHost
  63. }
  64. return host
  65. }
  66. func (r *ring) addHostIfMissing(host *HostInfo) (*HostInfo, bool) {
  67. ip := host.ConnectAddress().String()
  68. r.mu.Lock()
  69. if r.hosts == nil {
  70. r.hosts = make(map[string]*HostInfo)
  71. }
  72. existing, ok := r.hosts[ip]
  73. if !ok {
  74. r.hosts[ip] = host
  75. existing = host
  76. r.hostList = append(r.hostList, host)
  77. }
  78. r.mu.Unlock()
  79. return existing, ok
  80. }
  81. func (r *ring) removeHost(ip net.IP) bool {
  82. r.mu.Lock()
  83. if r.hosts == nil {
  84. r.hosts = make(map[string]*HostInfo)
  85. }
  86. k := ip.String()
  87. _, ok := r.hosts[k]
  88. if ok {
  89. for i, host := range r.hostList {
  90. if host.ConnectAddress().Equal(ip) {
  91. r.hostList = append(r.hostList[:i], r.hostList[i+1:]...)
  92. break
  93. }
  94. }
  95. }
  96. delete(r.hosts, k)
  97. r.mu.Unlock()
  98. return ok
  99. }
  100. type clusterMetadata struct {
  101. mu sync.RWMutex
  102. partitioner string
  103. }
  104. func (c *clusterMetadata) setPartitioner(partitioner string) {
  105. c.mu.RLock()
  106. defer c.mu.RUnlock()
  107. if c.partitioner != partitioner {
  108. // TODO: update other things now
  109. c.partitioner = partitioner
  110. }
  111. }