host_source.go

package gocql

import (
	"fmt"
	"log"
	"net"
	"sync"
)
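
// nodeState tracks whether a node is currently considered up or down.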
type nodeState int32

func (n nodeState) String() string {
	if n == NodeUp {
		return "UP"
	} else if n == NodeDown {
		return "DOWN"
	}
	return fmt.Sprintf("UNKNOWN_%d", n)
}

const (
	NodeUp nodeState = iota
	NodeDown
)
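
// cassVersion holds the Cassandra release version reported by a node.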
type cassVersion struct {
	Major, Minor, Patch int
}

func (c cassVersion) String() string {
	return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
}
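
// HostInfo describes a single node in the cluster: its address, placement
// (data center and rack), identity, version, state and token ownership.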
type HostInfo struct {
	// TODO(zariel): reduce locking maybe, not all values will change, but to ensure
	// that we are thread safe use a mutex to access all fields.
	mu         sync.RWMutex
	peer       string
	dataCenter string
	rack       string
	hostId     string
	version    cassVersion
	state      nodeState
	tokens     []string
}
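
// Equal reports whether both HostInfo values refer to the same node,
// comparing the peer address and host ID.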
func (h *HostInfo) Equal(host *HostInfo) bool {
	h.mu.RLock()
	defer h.mu.RUnlock()
	host.mu.RLock()
	defer host.mu.RUnlock()
	return h.peer == host.peer && h.hostId == host.hostId
}
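
// The accessors and setters below guard every field with the HostInfo mutex;
// setters return the receiver so calls can be chained.
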
func (h *HostInfo) Peer() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.peer
}

func (h *HostInfo) setPeer(peer string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.peer = peer
	return h
}

func (h *HostInfo) DataCenter() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.dataCenter
}

func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.dataCenter = dataCenter
	return h
}

func (h *HostInfo) Rack() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.rack
}

func (h *HostInfo) setRack(rack string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.rack = rack
	return h
}

func (h *HostInfo) HostID() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.hostId
}

func (h *HostInfo) setHostID(hostID string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.hostId = hostID
	return h
}

func (h *HostInfo) Version() cassVersion {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.version
}

func (h *HostInfo) setVersion(major, minor, patch int) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.version.Major = major
	h.version.Minor = minor
	h.version.Patch = patch
	return h
}

func (h *HostInfo) State() nodeState {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.state
}

func (h *HostInfo) setState(state nodeState) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.state = state
	return h
}

func (h *HostInfo) Tokens() []string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.tokens
}

func (h *HostInfo) setTokens(tokens []string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.tokens = tokens
	return h
}
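
// IsUp reports whether the host is currently marked as up.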
func (h *HostInfo) IsUp() bool {
	return h.State() == NodeUp
}

func (h *HostInfo) String() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return fmt.Sprintf("[hostinfo peer=%q data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
		h.peer, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
}

// ringDescriber polls system.peers at a regular interval to find new hosts
// and refresh the ring topology.
type ringDescriber struct {
	dcFilter   string
	rackFilter string
	session    *Session
	closeChan  chan bool
	// indicates that we can use system.local to get the connection's remote address
	localHasRpcAddr bool

	mu              sync.Mutex
	prevHosts       []*HostInfo
	prevPartitioner string
}
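
// checkSystemLocal reports whether the control connection can select
// broadcast_address from system.local. A syntax error from the server is
// treated as the column not being supported and returns false without an error.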
func checkSystemLocal(control *controlConn) (bool, error) {
	iter := control.query("SELECT broadcast_address FROM system.local")
	if err := iter.err; err != nil {
		if errf, ok := err.(*errorFrame); ok {
			if errf.code == errSyntax {
				return false, nil
			}
		}
		return false, err
	}

	return true, nil
}
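
// GetHosts returns the hosts in the ring and the cluster partitioner by
// querying system.local and system.peers over the control connection. If the
// control connection cannot run the queries, the previously seen hosts and
// partitioner are returned so callers can keep trying to reconnect.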
func (r *ringDescriber) GetHosts() (hosts []*HostInfo, partitioner string, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// We need to query system.peers and system.local on the same connection
	// (the same node) to get a consistent view of the whole cluster.
	const (
		legacyLocalQuery = "SELECT data_center, rack, host_id, tokens, partitioner FROM system.local"
		// only supported in 2.2.0, 2.1.6, 2.0.16
		localQuery = "SELECT broadcast_address, data_center, rack, host_id, tokens, partitioner FROM system.local"
	)

	localHost := &HostInfo{}
	if r.localHasRpcAddr {
		iter := r.session.control.query(localQuery)
		if iter == nil {
			return r.prevHosts, r.prevPartitioner, nil
		}

		iter.Scan(&localHost.peer, &localHost.dataCenter, &localHost.rack,
			&localHost.hostId, &localHost.tokens, &partitioner)

		if err = iter.Close(); err != nil {
			return nil, "", err
		}
	} else {
		iter := r.session.control.query(legacyLocalQuery)
		if iter == nil {
			return r.prevHosts, r.prevPartitioner, nil
		}

		iter.Scan(&localHost.dataCenter, &localHost.rack, &localHost.hostId, &localHost.tokens, &partitioner)

		if err = iter.Close(); err != nil {
			return nil, "", err
		}

		addr, _, err := net.SplitHostPort(r.session.control.addr())
		if err != nil {
			// This should never happen: the address was successfully dialed by the
			// control connection, so a panic here indicates a bug. Please report it.
			panic(err)
		}
		localHost.peer = addr
	}

	hosts = []*HostInfo{localHost}

	iter := r.session.control.query("SELECT rpc_address, data_center, rack, host_id, tokens FROM system.peers")
	if iter == nil {
		return r.prevHosts, r.prevPartitioner, nil
	}

	host := &HostInfo{}
	for iter.Scan(&host.peer, &host.dataCenter, &host.rack, &host.hostId, &host.tokens) {
		if r.matchFilter(host) {
			hosts = append(hosts, host)
		}
		host = &HostInfo{}
	}

	if err = iter.Close(); err != nil {
		return nil, "", err
	}

	r.prevHosts = hosts
	r.prevPartitioner = partitioner

	return hosts, partitioner, nil
}
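
// matchFilter reports whether the host passes the configured data center and
// rack filters; an empty filter matches everything.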
func (r *ringDescriber) matchFilter(host *HostInfo) bool {
	if r.dcFilter != "" && r.dcFilter != host.DataCenter() {
		return false
	}

	if r.rackFilter != "" && r.rackFilter != host.Rack() {
		return false
	}

	return true
}
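
// refreshRing fetches the current ring topology, adds any hosts not yet known
// to the session's ring and connection pool, and updates the pool's partitioner.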
func (r *ringDescriber) refreshRing() {
	// If we have 0 hosts this will return the previous list of hosts to
	// attempt to reconnect to the cluster, otherwise we would never find
	// downed hosts again. A possible optimisation is to only try to add new
	// hosts if GetHosts didn't error and the hosts didn't change.
	hosts, partitioner, err := r.GetHosts()
	if err != nil {
		log.Println("RingDescriber: unable to get ring topology:", err)
		return
	}

	// TODO: move this to session
	for _, h := range hosts {
		if r.session.ring.addHostIfMissing(h) {
			r.session.pool.addHost(h)
			// TODO: trigger OnUp/OnAdd
		}
	}

	r.session.pool.SetPartitioner(partitioner)
}