host_source.go

package gocql

import (
	"fmt"
	"log"
	"net"
	"sync"
	"time"
)
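// nodeState records whether a node is currently believed to be reachable.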
type nodeState int32

const (
	NodeUp nodeState = iota
	NodeDown
)

func (n nodeState) String() string {
	if n == NodeUp {
		return "UP"
	} else if n == NodeDown {
		return "DOWN"
	}
	return fmt.Sprintf("UNKNOWN_%d", n)
}
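// cassVersion represents a Cassandra version as major.minor.patch.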
type cassVersion struct {
	Major, Minor, Patch int
}

func (c cassVersion) String() string {
	return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
}
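// HostInfo describes a single node in the cluster ring. All fields are
// guarded by mu; use the accessor methods rather than reading fields directly.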
type HostInfo struct {
	// TODO(zariel): maybe reduce locking; not all values will change, but to
	// ensure that we are thread safe, use a mutex to access all fields.
	mu         sync.RWMutex
	peer       string
	dataCenter string
	rack       string
	hostId     string
	version    cassVersion
	state      nodeState
	tokens     []string
}
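// Equal reports whether both HostInfos refer to the same node, comparing the
// peer address and host ID.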
func (h *HostInfo) Equal(host *HostInfo) bool {
	h.mu.RLock()
	defer h.mu.RUnlock()
	host.mu.RLock()
	defer host.mu.RUnlock()

	return h.peer == host.peer && h.hostId == host.hostId
}

func (h *HostInfo) Peer() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.peer
}

func (h *HostInfo) setPeer(peer string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.peer = peer
	return h
}

func (h *HostInfo) DataCenter() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.dataCenter
}

func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.dataCenter = dataCenter
	return h
}

func (h *HostInfo) Rack() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.rack
}

func (h *HostInfo) setRack(rack string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.rack = rack
	return h
}

func (h *HostInfo) HostID() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.hostId
}

func (h *HostInfo) setHostID(hostID string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.hostId = hostID
	return h
}

func (h *HostInfo) Version() cassVersion {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.version
}

func (h *HostInfo) setVersion(major, minor, patch int) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.version.Major = major
	h.version.Minor = minor
	h.version.Patch = patch
	return h
}

func (h *HostInfo) State() nodeState {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.state
}

func (h *HostInfo) setState(state nodeState) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.state = state
	return h
}
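// Tokens returns the tokens owned by this host. The returned slice is not
// copied, so callers must not modify it.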
func (h *HostInfo) Tokens() []string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.tokens
}

func (h *HostInfo) setTokens(tokens []string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.tokens = tokens
	return h
}
// String uses a pointer receiver so the embedded RWMutex is not copied.
func (h *HostInfo) String() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return fmt.Sprintf("[hostinfo peer=%q data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
		h.peer, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
}
// ringDescriber polls system.peers at a regular interval to find new hosts.
type ringDescriber struct {
	dcFilter   string
	rackFilter string
	session    *Session
	closeChan  chan bool
	// localHasRpcAddr indicates that system.local can be used to get the
	// control connection's remote address.
	localHasRpcAddr bool

	mu              sync.Mutex
	prevHosts       []*HostInfo
	prevPartitioner string
}
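// checkSystemLocal reports whether system.local exposes broadcast_address.
// Older Cassandra releases lack the column and answer with a syntax error, in
// which case the caller must fall back to the legacy local query.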
func checkSystemLocal(control *controlConn) (bool, error) {
	iter := control.query("SELECT broadcast_address FROM system.local")
	if err := iter.err; err != nil {
		if errf, ok := err.(*errorFrame); ok {
			if errf.code == errSyntax {
				return false, nil
			}
		}
		return false, err
	}
	return true, nil
}
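// GetHosts fetches the current ring topology and the cluster partitioner from
// the control connection. If the control connection is unavailable, the
// previously discovered hosts and partitioner are returned so that the caller
// can keep trying to reconnect.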
func (r *ringDescriber) GetHosts() (hosts []*HostInfo, partitioner string, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// We need to query system.peers and system.local over the same connection
	// so that both views describe the same cluster.
	const (
		legacyLocalQuery = "SELECT data_center, rack, host_id, tokens, partitioner FROM system.local"
		// broadcast_address is only supported in 2.2.0, 2.1.6, 2.0.16
		localQuery = "SELECT broadcast_address, data_center, rack, host_id, tokens, partitioner FROM system.local"
	)

	localHost := &HostInfo{}
	if r.localHasRpcAddr {
		iter := r.session.control.query(localQuery)
		if iter == nil {
			return r.prevHosts, r.prevPartitioner, nil
		}

		iter.Scan(&localHost.peer, &localHost.dataCenter, &localHost.rack,
			&localHost.hostId, &localHost.tokens, &partitioner)

		if err = iter.Close(); err != nil {
			return nil, "", err
		}
	} else {
		iter := r.session.control.query(legacyLocalQuery)
		if iter == nil {
			return r.prevHosts, r.prevPartitioner, nil
		}

		iter.Scan(&localHost.dataCenter, &localHost.rack, &localHost.hostId, &localHost.tokens, &partitioner)

		if err = iter.Close(); err != nil {
			return nil, "", err
		}

		addr, _, err := net.SplitHostPort(r.session.control.addr())
		if err != nil {
			// This should never happen: the address was successfully dialed by
			// the control connection, so a panic is appropriate here. Please
			// report a bug if it occurs.
			panic(err)
		}
		localHost.peer = addr
	}

	hosts = []*HostInfo{localHost}

	iter := r.session.control.query("SELECT rpc_address, data_center, rack, host_id, tokens FROM system.peers")
	if iter == nil {
		return r.prevHosts, r.prevPartitioner, nil
	}

	host := &HostInfo{}
	for iter.Scan(&host.peer, &host.dataCenter, &host.rack, &host.hostId, &host.tokens) {
		if r.matchFilter(host) {
			hosts = append(hosts, host)
		}
		host = &HostInfo{}
	}

	if err = iter.Close(); err != nil {
		return nil, "", err
	}

	r.prevHosts = hosts
	r.prevPartitioner = partitioner

	return hosts, partitioner, nil
}
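// matchFilter reports whether host passes the configured data center and rack
// filters; an empty filter matches everything.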
func (r *ringDescriber) matchFilter(host *HostInfo) bool {
	if r.dcFilter != "" && r.dcFilter != host.DataCenter() {
		return false
	}

	if r.rackFilter != "" && r.rackFilter != host.Rack() {
		return false
	}

	return true
}
func (r *ringDescriber) refreshRing() {
	// If we have 0 hosts, GetHosts returns the previous list of hosts so that
	// we keep attempting to reconnect to the cluster; otherwise we would never
	// find downed hosts again. A possible optimisation is to only try to add
	// new hosts if GetHosts didn't error and the hosts didn't change.
	hosts, partitioner, err := r.GetHosts()
	if err != nil {
		log.Println("RingDescriber: unable to get ring topology:", err)
		return
	}

	r.session.pool.SetHosts(hosts)
	r.session.pool.SetPartitioner(partitioner)
}
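// run refreshes the ring once per sleep interval (defaulting to 30 seconds)
// until a value is received on closeChan.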
func (r *ringDescriber) run(sleep time.Duration) {
	if sleep == 0 {
		sleep = 30 * time.Second
	}

	for {
		r.refreshRing()

		select {
		case <-time.After(sleep):
		case <-r.closeChan:
			return
		}
	}
}
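// A minimal usage sketch (not part of this file; the session value and filter
// strings below are assumptions for illustration):
//
//	localHasRpcAddr, _ := checkSystemLocal(session.control)
//	r := &ringDescriber{
//		session:         session, // an established *Session
//		dcFilter:        "",      // optional: restrict to one data center
//		rackFilter:      "",      // optional: restrict to one rack
//		localHasRpcAddr: localHasRpcAddr,
//		closeChan:       make(chan bool),
//	}
//	go r.run(30 * time.Second) // poll the ring every 30 seconds
//	// ... later, stop the poller:
//	r.closeChan <- true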