ring.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package gocql
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. )
  6. type ring struct {
  7. // endpoints are the set of endpoints which the driver will attempt to connect
  8. // to in the case it can not reach any of its hosts. They are also used to boot
  9. // strap the initial connection.
  10. endpoints []string
  11. // hosts are the set of all hosts in the cassandra ring that we know of
  12. mu sync.RWMutex
  13. hosts map[string]*HostInfo
  14. hostList []*HostInfo
  15. pos uint32
  16. // TODO: we should store the ring metadata here also.
  17. }
  18. func (r *ring) rrHost() *HostInfo {
  19. // TODO: should we filter hosts that get used here? These hosts will be used
  20. // for the control connection, should we also provide an iterator?
  21. r.mu.RLock()
  22. defer r.mu.RUnlock()
  23. pos := int(atomic.AddUint32(&r.pos, 1) - 1)
  24. return r.hostList[pos%len(r.hostList)]
  25. }
  26. func (r *ring) getHost(addr string) *HostInfo {
  27. r.mu.RLock()
  28. host := r.hosts[addr]
  29. r.mu.RUnlock()
  30. return host
  31. }
  32. func (r *ring) allHosts() []*HostInfo {
  33. r.mu.RLock()
  34. hosts := make([]*HostInfo, 0, len(r.hosts))
  35. for _, host := range r.hosts {
  36. hosts = append(hosts, host)
  37. }
  38. r.mu.RUnlock()
  39. return hosts
  40. }
  41. func (r *ring) addHost(host *HostInfo) bool {
  42. r.mu.Lock()
  43. if r.hosts == nil {
  44. r.hosts = make(map[string]*HostInfo)
  45. }
  46. addr := host.Peer()
  47. _, ok := r.hosts[addr]
  48. r.hosts[addr] = host
  49. r.mu.Unlock()
  50. return ok
  51. }
  52. func (r *ring) addHostIfMissing(host *HostInfo) (*HostInfo, bool) {
  53. r.mu.Lock()
  54. if r.hosts == nil {
  55. r.hosts = make(map[string]*HostInfo)
  56. }
  57. addr := host.Peer()
  58. existing, ok := r.hosts[addr]
  59. if !ok {
  60. r.hosts[addr] = host
  61. existing = host
  62. }
  63. r.mu.Unlock()
  64. return existing, ok
  65. }
  66. func (r *ring) removeHost(addr string) bool {
  67. r.mu.Lock()
  68. if r.hosts == nil {
  69. r.hosts = make(map[string]*HostInfo)
  70. }
  71. _, ok := r.hosts[addr]
  72. delete(r.hosts, addr)
  73. r.mu.Unlock()
  74. return ok
  75. }
  76. type clusterMetadata struct {
  77. mu sync.RWMutex
  78. partitioner string
  79. }
  80. func (c *clusterMetadata) setPartitioner(partitioner string) {
  81. c.mu.RLock()
  82. defer c.mu.RUnlock()
  83. if c.partitioner != partitioner {
  84. // TODO: update other things now
  85. c.partitioner = partitioner
  86. }
  87. }