host_source.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. package gocql
  2. import (
  3. "fmt"
  4. "log"
  5. "net"
  6. "time"
  7. )
  // HostInfo describes one node of the cluster as read from the system.local
  // and system.peers tables.
  type HostInfo struct {
  	// Peer is the address of the node.
  	Peer string
  	// DataCenter is the node's data_center column.
  	DataCenter string
  	// Rack is the node's rack column.
  	Rack string
  	// HostId is the node's host_id column.
  	HostId string
  	// Tokens holds the node's tokens column.
  	Tokens []string
  }
  15. func (h HostInfo) String() string {
  16. return fmt.Sprintf("[hostinfo peer=%q data_centre=%q rack=%q host_id=%q num_tokens=%d]", h.Peer, h.DataCenter, h.Rack, h.HostId, len(h.Tokens))
  17. }
  18. // Polls system.peers at a specific interval to find new hosts
  19. type ringDescriber struct {
  20. dcFilter string
  21. rackFilter string
  22. prevHosts []HostInfo
  23. prevPartitioner string
  24. session *Session
  25. closeChan chan bool
  26. // indicates that we can use system.local to get the connections remote address
  27. localHasRpcAddr bool
  28. }
  29. func checkSystemLocal(control *controlConn) (bool, error) {
  30. iter := control.query("SELECT rpc_address FROM system.local")
  31. if err := iter.err; err != nil {
  32. if errf, ok := err.(*errorFrame); ok {
  33. if errf.code == errSyntax {
  34. return false, nil
  35. }
  36. }
  37. return false, err
  38. }
  39. return true, nil
  40. }
  41. func (r *ringDescriber) GetHosts() (hosts []HostInfo, partitioner string, err error) {
  42. // we need conn to be the same because we need to query system.peers and system.local
  43. // on the same node to get the whole cluster
  44. const (
  45. legacyLocalQuery = "SELECT data_center, rack, host_id, tokens, partitioner FROM system.local"
  46. // only supported in 2.2.0, 2.1.6, 2.0.16
  47. localQuery = "SELECT rpc_address, data_center, rack, host_id, tokens, partitioner FROM system.local"
  48. )
  49. var localHost HostInfo
  50. if r.localHasRpcAddr {
  51. iter := r.session.control.query(localQuery)
  52. if iter == nil {
  53. return r.prevHosts, r.prevPartitioner, nil
  54. }
  55. iter.Scan(&localHost.Peer, &localHost.DataCenter, &localHost.Rack,
  56. &localHost.HostId, &localHost.Tokens, &partitioner)
  57. if err = iter.Close(); err != nil {
  58. return nil, "", err
  59. }
  60. } else {
  61. iter := r.session.control.query(legacyLocalQuery)
  62. if iter == nil {
  63. return r.prevHosts, r.prevPartitioner, nil
  64. }
  65. iter.Scan(&localHost.DataCenter, &localHost.Rack, &localHost.HostId, &localHost.Tokens, &partitioner)
  66. if err = iter.Close(); err != nil {
  67. return nil, "", err
  68. }
  69. addr, _, err := net.SplitHostPort(r.session.control.addr())
  70. if err != nil {
  71. // this should not happen, ever, as this is the address that was dialed by conn, here
  72. // a panic makes sense, please report a bug if it occurs.
  73. panic(err)
  74. }
  75. localHost.Peer = addr
  76. }
  77. hosts = []HostInfo{localHost}
  78. iter := r.session.control.query("SELECT peer, data_center, rack, host_id, tokens FROM system.peers")
  79. if iter == nil {
  80. return r.prevHosts, r.prevPartitioner, nil
  81. }
  82. host := HostInfo{}
  83. for iter.Scan(&host.Peer, &host.DataCenter, &host.Rack, &host.HostId, &host.Tokens) {
  84. if r.matchFilter(&host) {
  85. hosts = append(hosts, host)
  86. }
  87. host = HostInfo{}
  88. }
  89. if err = iter.Close(); err != nil {
  90. return nil, "", err
  91. }
  92. r.prevHosts = hosts
  93. r.prevPartitioner = partitioner
  94. return hosts, partitioner, nil
  95. }
  96. func (r *ringDescriber) matchFilter(host *HostInfo) bool {
  97. if r.dcFilter != "" && r.dcFilter != host.DataCenter {
  98. return false
  99. }
  100. if r.rackFilter != "" && r.rackFilter != host.Rack {
  101. return false
  102. }
  103. return true
  104. }
  105. func (r *ringDescriber) refreshRing() {
  106. // if we have 0 hosts this will return the previous list of hosts to
  107. // attempt to reconnect to the cluster otherwise we would never find
  108. // downed hosts again, could possibly have an optimisation to only
  109. // try to add new hosts if GetHosts didnt error and the hosts didnt change.
  110. hosts, partitioner, err := r.GetHosts()
  111. if err != nil {
  112. log.Println("RingDescriber: unable to get ring topology:", err)
  113. return
  114. }
  115. r.session.pool.SetHosts(hosts)
  116. r.session.pool.SetPartitioner(partitioner)
  117. }
  118. func (r *ringDescriber) run(sleep time.Duration) {
  119. if sleep == 0 {
  120. sleep = 30 * time.Second
  121. }
  122. for {
  123. r.refreshRing()
  124. select {
  125. case <-time.After(sleep):
  126. case <-r.closeChan:
  127. return
  128. }
  129. }
  130. }