host_source.go

package gocql

import (
	"fmt"
	"log"
	"net"
	"sync"
	"time"
)

type HostInfo struct {
	Peer       string
	DataCenter string
	Rack       string
	HostId     string
	Tokens     []string
}

func (h HostInfo) String() string {
	return fmt.Sprintf("[hostinfo peer=%q data_centre=%q rack=%q host_id=%q num_tokens=%d]",
		h.Peer, h.DataCenter, h.Rack, h.HostId, len(h.Tokens))
}
// ringDescriber polls system.peers at a specific interval to find new hosts.
type ringDescriber struct {
	dcFilter        string
	rackFilter      string
	prevHosts       []HostInfo
	prevPartitioner string
	session         *Session
	closeChan       chan bool

	// indicates that we can use system.local to get the connection's remote address
	localHasRpcAddr bool

	mu sync.Mutex
}
// checkSystemLocal reports whether rpc_address can be read from system.local;
// a syntax error from the server is treated as the column not being available
// rather than as a failure.
func checkSystemLocal(control *controlConn) (bool, error) {
	iter := control.query("SELECT rpc_address FROM system.local")
	if err := iter.err; err != nil {
		if errf, ok := err.(*errorFrame); ok {
			if errf.code == errSyntax {
				return false, nil
			}
		}
		return false, err
	}

	return true, nil
}
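
// newRingDescriber below is a usage sketch, not part of the original file: it
// shows how checkSystemLocal's result could be fed into localHasRpcAddr when
// wiring up a ringDescriber. The constructor name and the exact wiring are
// assumptions for illustration only.
func newRingDescriber(session *Session, dcFilter, rackFilter string) (*ringDescriber, error) {
	// Probe system.local once over the control connection so GetHosts knows
	// which local query it can issue.
	hasRpcAddr, err := checkSystemLocal(session.control)
	if err != nil {
		return nil, err
	}

	return &ringDescriber{
		session:         session,
		dcFilter:        dcFilter,
		rackFilter:      rackFilter,
		closeChan:       make(chan bool),
		localHasRpcAddr: hasRpcAddr,
	}, nil
}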
func (r *ringDescriber) GetHosts() (hosts []HostInfo, partitioner string, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// We need to query system.peers and system.local on the same node (over the
	// same control connection) to get a view of the whole cluster.
	const (
		legacyLocalQuery = "SELECT data_center, rack, host_id, tokens, partitioner FROM system.local"
		// only supported in 2.2.0, 2.1.6, 2.0.16
		localQuery = "SELECT rpc_address, data_center, rack, host_id, tokens, partitioner FROM system.local"
	)

	var localHost HostInfo
	if r.localHasRpcAddr {
		iter := r.session.control.query(localQuery)
		if iter == nil {
			return r.prevHosts, r.prevPartitioner, nil
		}

		iter.Scan(&localHost.Peer, &localHost.DataCenter, &localHost.Rack,
			&localHost.HostId, &localHost.Tokens, &partitioner)

		if err = iter.Close(); err != nil {
			return nil, "", err
		}
	} else {
		iter := r.session.control.query(legacyLocalQuery)
		if iter == nil {
			return r.prevHosts, r.prevPartitioner, nil
		}

		iter.Scan(&localHost.DataCenter, &localHost.Rack, &localHost.HostId, &localHost.Tokens, &partitioner)

		if err = iter.Close(); err != nil {
			return nil, "", err
		}

		addr, _, err := net.SplitHostPort(r.session.control.addr())
		if err != nil {
			// This should never happen, as this is the address that was dialed by
			// the control connection; a panic makes sense here. Please report a bug
			// if it occurs.
			panic(err)
		}

		localHost.Peer = addr
	}

	hosts = []HostInfo{localHost}

	iter := r.session.control.query("SELECT rpc_address, data_center, rack, host_id, tokens FROM system.peers")
	if iter == nil {
		return r.prevHosts, r.prevPartitioner, nil
	}

	host := HostInfo{}
	for iter.Scan(&host.Peer, &host.DataCenter, &host.Rack, &host.HostId, &host.Tokens) {
		if r.matchFilter(&host) {
			hosts = append(hosts, host)
		}
		host = HostInfo{}
	}

	if err = iter.Close(); err != nil {
		return nil, "", err
	}

	r.prevHosts = hosts
	r.prevPartitioner = partitioner

	return hosts, partitioner, nil
}
func (r *ringDescriber) matchFilter(host *HostInfo) bool {
	if r.dcFilter != "" && r.dcFilter != host.DataCenter {
		return false
	}

	if r.rackFilter != "" && r.rackFilter != host.Rack {
		return false
	}

	return true
}
func (r *ringDescriber) refreshRing() {
	// If we have 0 hosts this will return the previous list of hosts, so that we
	// keep attempting to reconnect to the cluster; otherwise we would never find
	// downed hosts again. A possible optimisation is to only add new hosts when
	// GetHosts didn't error and the host list didn't change.
	hosts, partitioner, err := r.GetHosts()
	if err != nil {
		log.Println("RingDescriber: unable to get ring topology:", err)
		return
	}

	r.session.pool.SetHosts(hosts)
	r.session.pool.SetPartitioner(partitioner)
}
func (r *ringDescriber) run(sleep time.Duration) {
	if sleep == 0 {
		sleep = 30 * time.Second
	}

	for {
		r.refreshRing()

		select {
		case <-time.After(sleep):
		case <-r.closeChan:
			return
		}
	}
}
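
// startRingPolling below is a usage sketch, not part of the original file: it
// shows how a caller could start run in a goroutine and later stop the polling
// loop through closeChan. The helper name is an assumption for illustration.
func startRingPolling(r *ringDescriber, interval time.Duration) (stop func()) {
	go r.run(interval)

	return func() {
		// run selects on closeChan, so closing it terminates the loop after the
		// current refresh completes.
		close(r.closeChan)
	}
}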