host_source.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package gocql
  2. import (
  3. "fmt"
  4. "log"
  5. "net"
  6. "sync"
  7. "time"
  8. )
  9. type nodeState int32
  10. func (n nodeState) String() {
  11. if n == NodeUp {
  12. return "UP"
  13. } else if n == NodeDown {
  14. return "DOWN"
  15. }
  16. }
  17. const (
  18. NodeUp nodeState = iota
  19. NodeDown
  20. )
  21. type HostInfo struct {
  22. Peer string
  23. DataCenter string
  24. Rack string
  25. HostId string
  26. Version cassVersion
  27. State nodeState
  28. Tokens []string
  29. }
  30. type cassVersion struct {
  31. Major, Minor, Patch int
  32. }
  33. func (c cassVersion) String() string {
  34. return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
  35. }
  36. func (h HostInfo) String() string {
  37. return fmt.Sprintf("[hostinfo peer=%q data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]", h.Peer, h.DataCenter, h.Rack, h.HostId, h.Version, h.State, len(h.Tokens))
  38. }
  39. // Polls system.peers at a specific interval to find new hosts
  40. type ringDescriber struct {
  41. dcFilter string
  42. rackFilter string
  43. prevHosts []HostInfo
  44. prevPartitioner string
  45. session *Session
  46. closeChan chan bool
  47. // indicates that we can use system.local to get the connections remote address
  48. localHasRpcAddr bool
  49. mu sync.Mutex
  50. }
  51. func checkSystemLocal(control *controlConn) (bool, error) {
  52. iter := control.query("SELECT broadcast_address FROM system.local")
  53. if err := iter.err; err != nil {
  54. if errf, ok := err.(*errorFrame); ok {
  55. if errf.code == errSyntax {
  56. return false, nil
  57. }
  58. }
  59. return false, err
  60. }
  61. return true, nil
  62. }
  63. func (r *ringDescriber) GetHosts() (hosts []HostInfo, partitioner string, err error) {
  64. r.mu.Lock()
  65. defer r.mu.Unlock()
  66. // we need conn to be the same because we need to query system.peers and system.local
  67. // on the same node to get the whole cluster
  68. const (
  69. legacyLocalQuery = "SELECT data_center, rack, host_id, tokens, partitioner FROM system.local"
  70. // only supported in 2.2.0, 2.1.6, 2.0.16
  71. localQuery = "SELECT broadcast_address, data_center, rack, host_id, tokens, partitioner FROM system.local"
  72. )
  73. var localHost HostInfo
  74. if r.localHasRpcAddr {
  75. iter := r.session.control.query(localQuery)
  76. if iter == nil {
  77. return r.prevHosts, r.prevPartitioner, nil
  78. }
  79. iter.Scan(&localHost.Peer, &localHost.DataCenter, &localHost.Rack,
  80. &localHost.HostId, &localHost.Tokens, &partitioner)
  81. if err = iter.Close(); err != nil {
  82. return nil, "", err
  83. }
  84. } else {
  85. iter := r.session.control.query(legacyLocalQuery)
  86. if iter == nil {
  87. return r.prevHosts, r.prevPartitioner, nil
  88. }
  89. iter.Scan(&localHost.DataCenter, &localHost.Rack, &localHost.HostId, &localHost.Tokens, &partitioner)
  90. if err = iter.Close(); err != nil {
  91. return nil, "", err
  92. }
  93. addr, _, err := net.SplitHostPort(r.session.control.addr())
  94. if err != nil {
  95. // this should not happen, ever, as this is the address that was dialed by conn, here
  96. // a panic makes sense, please report a bug if it occurs.
  97. panic(err)
  98. }
  99. localHost.Peer = addr
  100. }
  101. hosts = []HostInfo{localHost}
  102. iter := r.session.control.query("SELECT rpc_address, data_center, rack, host_id, tokens FROM system.peers")
  103. if iter == nil {
  104. return r.prevHosts, r.prevPartitioner, nil
  105. }
  106. host := HostInfo{}
  107. for iter.Scan(&host.Peer, &host.DataCenter, &host.Rack, &host.HostId, &host.Tokens) {
  108. if r.matchFilter(&host) {
  109. hosts = append(hosts, host)
  110. }
  111. host = HostInfo{}
  112. }
  113. if err = iter.Close(); err != nil {
  114. return nil, "", err
  115. }
  116. r.prevHosts = hosts
  117. r.prevPartitioner = partitioner
  118. return hosts, partitioner, nil
  119. }
  120. func (r *ringDescriber) matchFilter(host *HostInfo) bool {
  121. if r.dcFilter != "" && r.dcFilter != host.DataCenter {
  122. return false
  123. }
  124. if r.rackFilter != "" && r.rackFilter != host.Rack {
  125. return false
  126. }
  127. return true
  128. }
  129. func (r *ringDescriber) refreshRing() {
  130. // if we have 0 hosts this will return the previous list of hosts to
  131. // attempt to reconnect to the cluster otherwise we would never find
  132. // downed hosts again, could possibly have an optimisation to only
  133. // try to add new hosts if GetHosts didnt error and the hosts didnt change.
  134. hosts, partitioner, err := r.GetHosts()
  135. if err != nil {
  136. log.Println("RingDescriber: unable to get ring topology:", err)
  137. return
  138. }
  139. r.session.pool.SetHosts(hosts)
  140. r.session.pool.SetPartitioner(partitioner)
  141. }
  142. func (r *ringDescriber) run(sleep time.Duration) {
  143. if sleep == 0 {
  144. sleep = 30 * time.Second
  145. }
  146. for {
  147. r.refreshRing()
  148. select {
  149. case <-time.After(sleep):
  150. case <-r.closeChan:
  151. return
  152. }
  153. }
  154. }