host_source.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package gocql
  2. import (
  3. "log"
  4. "net"
  5. "time"
  6. )
  7. type HostInfo struct {
  8. Peer string
  9. DataCenter string
  10. Rack string
  11. HostId string
  12. Tokens []string
  13. }
  14. // Polls system.peers at a specific interval to find new hosts
  15. type ringDescriber struct {
  16. dcFilter string
  17. rackFilter string
  18. prevHosts []HostInfo
  19. prevPartitioner string
  20. session *Session
  21. }
  22. func (r *ringDescriber) GetHosts() (
  23. hosts []HostInfo,
  24. partitioner string,
  25. err error,
  26. ) {
  27. // we need conn to be the same because we need to query system.peers and system.local
  28. // on the same node to get the whole cluster
  29. conn := r.session.Pool.Pick(nil)
  30. if conn == nil {
  31. return r.prevHosts, r.prevPartitioner, nil
  32. }
  33. query := r.session.Query("SELECT data_center, rack, host_id, tokens, partitioner FROM system.local")
  34. iter := conn.executeQuery(query)
  35. host := HostInfo{}
  36. iter.Scan(&host.DataCenter, &host.Rack, &host.HostId, &host.Tokens, &partitioner)
  37. if err = iter.Close(); err != nil {
  38. return nil, "", err
  39. }
  40. addr, _, err := net.SplitHostPort(conn.Address())
  41. if err != nil {
  42. // this should not happen, ever, as this is the address that was dialed by conn, here
  43. // a panic makes sense, please report a bug if it occurs.
  44. panic(err)
  45. }
  46. host.Peer = addr
  47. hosts = []HostInfo{host}
  48. query = r.session.Query("SELECT peer, data_center, rack, host_id, tokens FROM system.peers")
  49. iter = conn.executeQuery(query)
  50. host = HostInfo{}
  51. for iter.Scan(&host.Peer, &host.DataCenter, &host.Rack, &host.HostId, &host.Tokens) {
  52. if r.matchFilter(&host) {
  53. hosts = append(hosts, host)
  54. }
  55. host = HostInfo{}
  56. }
  57. if err = iter.Close(); err != nil {
  58. return nil, "", err
  59. }
  60. r.prevHosts = hosts
  61. r.prevPartitioner = partitioner
  62. return hosts, partitioner, nil
  63. }
  64. func (r *ringDescriber) matchFilter(host *HostInfo) bool {
  65. if r.dcFilter != "" && r.dcFilter != host.DataCenter {
  66. return false
  67. }
  68. if r.rackFilter != "" && r.rackFilter != host.Rack {
  69. return false
  70. }
  71. return true
  72. }
  73. func (h *ringDescriber) run(sleep time.Duration) {
  74. if sleep == 0 {
  75. sleep = 30 * time.Second
  76. }
  77. for {
  78. // if we have 0 hosts this will return the previous list of hosts to
  79. // attempt to reconnect to the cluster otherwise we would never find
  80. // downed hosts again, could possibly have an optimisation to only
  81. // try to add new hosts if GetHosts didnt error and the hosts didnt change.
  82. hosts, partitioner, err := h.GetHosts()
  83. if err != nil {
  84. log.Println("RingDescriber: unable to get ring topology:", err)
  85. } else {
  86. h.session.Pool.SetHosts(hosts)
  87. if v, ok := h.session.Pool.(SetPartitioner); ok {
  88. v.SetPartitioner(partitioner)
  89. }
  90. }
  91. time.Sleep(sleep)
  92. }
  93. }