ring.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package gocql
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. )
  6. type ring struct {
  7. // endpoints are the set of endpoints which the driver will attempt to connect
  8. // to in the case it can not reach any of its hosts. They are also used to boot
  9. // strap the initial connection.
  10. endpoints []string
  11. // hosts are the set of all hosts in the cassandra ring that we know of
  12. mu sync.RWMutex
  13. hosts map[string]*HostInfo
  14. hostList []*HostInfo
  15. pos uint32
  16. // TODO: we should store the ring metadata here also.
  17. }
  18. func (r *ring) rrHost() *HostInfo {
  19. // TODO: should we filter hosts that get used here? These hosts will be used
  20. // for the control connection, should we also provide an iterator?
  21. r.mu.RLock()
  22. defer r.mu.RUnlock()
  23. if len(r.hostList) == 0 {
  24. return nil
  25. }
  26. pos := int(atomic.AddUint32(&r.pos, 1) - 1)
  27. return r.hostList[pos%len(r.hostList)]
  28. }
  29. func (r *ring) getHost(addr string) *HostInfo {
  30. r.mu.RLock()
  31. host := r.hosts[addr]
  32. r.mu.RUnlock()
  33. return host
  34. }
  35. func (r *ring) allHosts() []*HostInfo {
  36. r.mu.RLock()
  37. hosts := make([]*HostInfo, 0, len(r.hosts))
  38. for _, host := range r.hosts {
  39. hosts = append(hosts, host)
  40. }
  41. r.mu.RUnlock()
  42. return hosts
  43. }
  44. func (r *ring) addHost(host *HostInfo) bool {
  45. r.mu.Lock()
  46. if r.hosts == nil {
  47. r.hosts = make(map[string]*HostInfo)
  48. }
  49. addr := host.Peer()
  50. _, ok := r.hosts[addr]
  51. r.hosts[addr] = host
  52. r.mu.Unlock()
  53. return ok
  54. }
  55. func (r *ring) addHostIfMissing(host *HostInfo) (*HostInfo, bool) {
  56. r.mu.Lock()
  57. if r.hosts == nil {
  58. r.hosts = make(map[string]*HostInfo)
  59. }
  60. addr := host.Peer()
  61. existing, ok := r.hosts[addr]
  62. if !ok {
  63. r.hosts[addr] = host
  64. existing = host
  65. }
  66. r.mu.Unlock()
  67. return existing, ok
  68. }
  69. func (r *ring) removeHost(addr string) bool {
  70. r.mu.Lock()
  71. if r.hosts == nil {
  72. r.hosts = make(map[string]*HostInfo)
  73. }
  74. _, ok := r.hosts[addr]
  75. delete(r.hosts, addr)
  76. r.mu.Unlock()
  77. return ok
  78. }
  79. type clusterMetadata struct {
  80. mu sync.RWMutex
  81. partitioner string
  82. }
  83. func (c *clusterMetadata) setPartitioner(partitioner string) {
  84. c.mu.RLock()
  85. defer c.mu.RUnlock()
  86. if c.partitioner != partitioner {
  87. // TODO: update other things now
  88. c.partitioner = partitioner
  89. }
  90. }