host_source.go

package gocql

import (
	"fmt"
	"log"
	"net"
	"sync"
	"time"
)
type nodeState int32

func (n nodeState) String() string {
	if n == NodeUp {
		return "UP"
	} else if n == NodeDown {
		return "DOWN"
	}
	return fmt.Sprintf("unknown_%d", n)
}

const (
	NodeUp nodeState = iota
	NodeDown
)
type cassVersion struct {
	Major, Minor, Patch int
}

func (c cassVersion) String() string {
	return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
}
type HostInfo struct {
	// TODO(zariel): reduce locking maybe, not all values will change, but to ensure
	// that we are thread safe use a mutex to access all fields.
	mu         sync.RWMutex
	peer       string
	dataCenter string
	rack       string
	hostId     string
	version    cassVersion
	state      nodeState
	tokens     []string
}
func (h *HostInfo) Peer() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.peer
}

func (h *HostInfo) SetPeer(peer string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.peer = peer
	return h
}

func (h *HostInfo) DataCenter() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.dataCenter
}

func (h *HostInfo) SetDataCenter(dataCenter string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.dataCenter = dataCenter
	return h
}
func (h *HostInfo) Rack() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.rack
}
func (h *HostInfo) SetRack(rack string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.rack = rack
	return h
}

func (h *HostInfo) HostID() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.hostId
}

func (h *HostInfo) SetHostID(hostID string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.hostId = hostID
	return h
}

func (h *HostInfo) Version() cassVersion {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.version
}

func (h *HostInfo) SetVersion(major, minor, patch int) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.version.Major = major
	h.version.Minor = minor
	h.version.Patch = patch
	return h
}

func (h *HostInfo) State() nodeState {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.state
}

func (h *HostInfo) SetState(state nodeState) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.state = state
	return h
}

func (h *HostInfo) Tokens() []string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.tokens
}

func (h *HostInfo) SetTokens(tokens []string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.tokens = tokens
	return h
}
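// The setters above return their receiver, so a HostInfo can be populated in a
// single chained expression. A minimal sketch (illustrative only; the values
// are made up):
//
//	host := (&HostInfo{}).
//		SetPeer("10.0.0.1").
//		SetDataCenter("dc1").
//		SetRack("rack1").
//		SetTokens([]string{"-9223372036854775808"})
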
func (h HostInfo) String() string {
	return fmt.Sprintf("[hostinfo peer=%q data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
		h.peer, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
}
// ringDescriber polls system.peers at a regular interval to find new hosts.
type ringDescriber struct {
	dcFilter        string
	rackFilter      string
	prevHosts       []HostInfo
	prevPartitioner string
	session         *Session
	closeChan       chan bool
	// localHasRpcAddr indicates that we can use system.local to get the
	// connection's remote address.
	localHasRpcAddr bool
	mu              sync.Mutex
}
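// A minimal lifecycle sketch (illustrative only, not part of this file): the
// describer can be polled from its own goroutine and stopped by closing
// closeChan, assuming s is an existing *Session:
//
//	r := &ringDescriber{session: s, closeChan: make(chan bool)}
//	go r.run(30 * time.Second)
//	// ... on shutdown:
//	close(r.closeChan)
//
// A receive on a closed channel returns immediately, so close unblocks the
// select in run and the polling goroutine exits.
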
// checkSystemLocal reports whether system.local can be queried for
// broadcast_address; a syntax error from the server is treated as the column
// not being available rather than as a failure.
func checkSystemLocal(control *controlConn) (bool, error) {
	iter := control.query("SELECT broadcast_address FROM system.local")
	if err := iter.err; err != nil {
		if errf, ok := err.(*errorFrame); ok {
			if errf.code == errSyntax {
				return false, nil
			}
		}
		return false, err
	}

	return true, nil
}
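// Sketch of how the probe above could feed the describer (illustrative only;
// assumes the caller already holds a connected control connection):
//
//	hasRpcAddr, err := checkSystemLocal(session.control)
//	if err != nil {
//		return err
//	}
//	rd := &ringDescriber{session: session, localHasRpcAddr: hasRpcAddr}
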
func (r *ringDescriber) GetHosts() (hosts []HostInfo, partitioner string, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// we need conn to be the same because we need to query system.peers and
	// system.local on the same node to get the whole cluster
	const (
		legacyLocalQuery = "SELECT data_center, rack, host_id, tokens, partitioner FROM system.local"
		// only supported in 2.2.0, 2.1.6, 2.0.16
		localQuery = "SELECT broadcast_address, data_center, rack, host_id, tokens, partitioner FROM system.local"
	)

	var localHost HostInfo
	if r.localHasRpcAddr {
		iter := r.session.control.query(localQuery)
		if iter == nil {
			return r.prevHosts, r.prevPartitioner, nil
		}

		iter.Scan(&localHost.peer, &localHost.dataCenter, &localHost.rack,
			&localHost.hostId, &localHost.tokens, &partitioner)

		if err = iter.Close(); err != nil {
			return nil, "", err
		}
	} else {
		iter := r.session.control.query(legacyLocalQuery)
		if iter == nil {
			return r.prevHosts, r.prevPartitioner, nil
		}

		iter.Scan(&localHost.dataCenter, &localHost.rack, &localHost.hostId, &localHost.tokens, &partitioner)

		if err = iter.Close(); err != nil {
			return nil, "", err
		}

		addr, _, err := net.SplitHostPort(r.session.control.addr())
		if err != nil {
			// this should not happen, ever, as this is the address that was dialed by conn, here
			// a panic makes sense, please report a bug if it occurs.
			panic(err)
		}

		localHost.peer = addr
	}

	hosts = []HostInfo{localHost}

	iter := r.session.control.query("SELECT rpc_address, data_center, rack, host_id, tokens FROM system.peers")
	if iter == nil {
		return r.prevHosts, r.prevPartitioner, nil
	}

	host := HostInfo{}
	for iter.Scan(&host.peer, &host.dataCenter, &host.rack, &host.hostId, &host.tokens) {
		if r.matchFilter(&host) {
			hosts = append(hosts, host)
		}
		host = HostInfo{}
	}

	if err = iter.Close(); err != nil {
		return nil, "", err
	}

	r.prevHosts = hosts
	r.prevPartitioner = partitioner

	return hosts, partitioner, nil
}
func (r *ringDescriber) matchFilter(host *HostInfo) bool {
	if r.dcFilter != "" && r.dcFilter != host.DataCenter() {
		return false
	}

	if r.rackFilter != "" && r.rackFilter != host.Rack() {
		return false
	}

	return true
}
func (r *ringDescriber) refreshRing() {
	// if we have 0 hosts this will return the previous list of hosts to
	// attempt to reconnect to the cluster, otherwise we would never find
	// downed hosts again; could possibly have an optimisation to only
	// try to add new hosts if GetHosts didn't error and the hosts didn't change.
	hosts, partitioner, err := r.GetHosts()
	if err != nil {
		log.Println("RingDescriber: unable to get ring topology:", err)
		return
	}

	r.session.pool.SetHosts(hosts)
	r.session.pool.SetPartitioner(partitioner)
}
func (r *ringDescriber) run(sleep time.Duration) {
	if sleep == 0 {
		sleep = 30 * time.Second
	}

	for {
		r.refreshRing()

		select {
		case <-time.After(sleep):
		case <-r.closeChan:
			return
		}
	}
}