host_source.go

package gocql

import (
	"fmt"
	"log"
	"net"
	"strconv"
	"strings"
	"sync"
)
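// nodeState tracks whether a node is believed to be up or down.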
type nodeState int32

func (n nodeState) String() string {
	if n == NodeUp {
		return "UP"
	} else if n == NodeDown {
		return "DOWN"
	}
	return fmt.Sprintf("UNKNOWN_%d", n)
}

const (
	NodeUp nodeState = iota
	NodeDown
)
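// cassVersion holds a Cassandra version split into its numeric components,
// e.g. a release_version of "2.1.6" is stored as {Major: 2, Minor: 1, Patch: 6}.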
type cassVersion struct {
	Major, Minor, Patch int
}
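// UnmarshalCQL parses a dotted "major.minor.patch" version string as read
// from the system tables.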
func (c *cassVersion) UnmarshalCQL(info TypeInfo, data []byte) error {
	v := strings.Split(string(data), ".")
	if len(v) != 3 {
		return fmt.Errorf("invalid schema_version: %v", string(data))
	}

	var err error
	c.Major, err = strconv.Atoi(v[0])
	if err != nil {
		return fmt.Errorf("invalid major version %v: %v", v[0], err)
	}
	c.Minor, err = strconv.Atoi(v[1])
	if err != nil {
		return fmt.Errorf("invalid minor version %v: %v", v[1], err)
	}
	c.Patch, err = strconv.Atoi(v[2])
	if err != nil {
		return fmt.Errorf("invalid patch version %v: %v", v[2], err)
	}

	return nil
}

func (c cassVersion) String() string {
	return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
}
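// HostInfo describes a single node in the cluster. All fields are guarded
// by mu; use the accessor methods below rather than reading fields directly.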
type HostInfo struct {
	// TODO(zariel): reduce locking maybe, not all values will change, but to ensure
	// that we are thread safe use a mutex to access all fields.
	mu         sync.RWMutex
	peer       string
	dataCenter string
	rack       string
	hostId     string
	version    cassVersion
	state      nodeState
	tokens     []string
}
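// Equal reports whether both HostInfo values refer to the same node,
// comparing the peer address and the host ID.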
func (h *HostInfo) Equal(host *HostInfo) bool {
	h.mu.RLock()
	defer h.mu.RUnlock()
	host.mu.RLock()
	defer host.mu.RUnlock()

	return h.peer == host.peer && h.hostId == host.hostId
}
func (h *HostInfo) Peer() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.peer
}
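// The unexported setters below return the receiver so that several fields
// can be updated in one chained expression, e.g.
//
//	host.setDataCenter(dc).setRack(rack)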
func (h *HostInfo) setPeer(peer string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.peer = peer
	return h
}

func (h *HostInfo) DataCenter() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.dataCenter
}

func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.dataCenter = dataCenter
	return h
}

func (h *HostInfo) Rack() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.rack
}

func (h *HostInfo) setRack(rack string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.rack = rack
	return h
}

func (h *HostInfo) HostID() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.hostId
}

func (h *HostInfo) setHostID(hostID string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.hostId = hostID
	return h
}

func (h *HostInfo) Version() cassVersion {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.version
}

func (h *HostInfo) setVersion(major, minor, patch int) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.version.Major = major
	h.version.Minor = minor
	h.version.Patch = patch
	return h
}

func (h *HostInfo) State() nodeState {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.state
}

func (h *HostInfo) setState(state nodeState) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.state = state
	return h
}

func (h *HostInfo) Tokens() []string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.tokens
}

func (h *HostInfo) setTokens(tokens []string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.tokens = tokens
	return h
}
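// IsUp reports whether the node's last observed state is NodeUp.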
func (h *HostInfo) IsUp() bool {
	return h.State() == NodeUp
}
// String uses a pointer receiver: a value receiver would copy the struct and
// with it the sync.RWMutex, so the lock taken here would guard only the copy.
func (h *HostInfo) String() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return fmt.Sprintf("[hostinfo peer=%q data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
		h.peer, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
}
// ringDescriber polls system.peers at a regular interval to find new hosts.
type ringDescriber struct {
	dcFilter   string
	rackFilter string
	session    *Session
	closeChan  chan bool
	// indicates that we can use system.local to get the connection's remote address
	localHasRpcAddr bool

	mu              sync.Mutex
	prevHosts       []*HostInfo
	prevPartitioner string
}
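// checkSystemLocal reports whether the connected node's system.local table
// exposes the broadcast_address column. Older servers reject the column with
// a syntax error, which is treated as "not supported" rather than a failure.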
func checkSystemLocal(control *controlConn) (bool, error) {
	iter := control.query("SELECT broadcast_address FROM system.local")
	if err := iter.err; err != nil {
		if errf, ok := err.(*errorFrame); ok {
			if errf.code == errSyntax {
				return false, nil
			}
		}

		return false, err
	}

	return true, nil
}
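// GetHosts reads the ring topology from system.local and system.peers on the
// control connection's node, applies the data center and rack filters to the
// peers, and returns the matching hosts along with the cluster partitioner.
// If a query cannot be issued, the previously discovered hosts and
// partitioner are returned instead.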
func (r *ringDescriber) GetHosts() (hosts []*HostInfo, partitioner string, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Both queries must run over the same connection: system.local and
	// system.peers have to be read from the same node to see the whole
	// cluster.
	const (
		legacyLocalQuery = "SELECT data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
		// broadcast_address is only supported in 2.2.0, 2.1.6, 2.0.16
		localQuery = "SELECT broadcast_address, data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
	)

	localHost := &HostInfo{}
	if r.localHasRpcAddr {
		iter := r.session.control.query(localQuery)
		if iter == nil {
			return r.prevHosts, r.prevPartitioner, nil
		}

		iter.Scan(&localHost.peer, &localHost.dataCenter, &localHost.rack,
			&localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)

		if err = iter.Close(); err != nil {
			return nil, "", err
		}
	} else {
		iter := r.session.control.query(legacyLocalQuery)
		if iter == nil {
			return r.prevHosts, r.prevPartitioner, nil
		}

		iter.Scan(&localHost.dataCenter, &localHost.rack, &localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)

		if err = iter.Close(); err != nil {
			return nil, "", err
		}

		addr, _, err := net.SplitHostPort(r.session.control.addr())
		if err != nil {
			// This should never happen: the address was successfully dialed
			// by the control connection, so a failure here indicates a driver
			// bug. Please report it if it occurs.
			panic(err)
		}

		localHost.peer = addr
	}

	hosts = []*HostInfo{localHost}

	iter := r.session.control.query("SELECT rpc_address, data_center, rack, host_id, tokens, release_version FROM system.peers")
	if iter == nil {
		return r.prevHosts, r.prevPartitioner, nil
	}

	host := &HostInfo{}
	for iter.Scan(&host.peer, &host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version) {
		if r.matchFilter(host) {
			hosts = append(hosts, host)
		}
		host = &HostInfo{}
	}

	if err = iter.Close(); err != nil {
		return nil, "", err
	}

	r.prevHosts = hosts
	r.prevPartitioner = partitioner

	return hosts, partitioner, nil
}
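// matchFilter reports whether a host passes the configured data center and
// rack filters; an empty filter matches every host.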
func (r *ringDescriber) matchFilter(host *HostInfo) bool {
	if r.dcFilter != "" && r.dcFilter != host.DataCenter() {
		return false
	}

	if r.rackFilter != "" && r.rackFilter != host.Rack() {
		return false
	}

	return true
}
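// refreshRing re-reads the ring topology and registers any hosts that are
// not already known to the session's host ring and connection pool.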
func (r *ringDescriber) refreshRing() {
	// If we have 0 hosts this will return the previous list of hosts so that
	// we keep attempting to reconnect to the cluster; otherwise we would
	// never find downed hosts again. A possible optimisation is to only try
	// to add new hosts if GetHosts didn't error and the hosts didn't change.
	hosts, partitioner, err := r.GetHosts()
	if err != nil {
		log.Println("RingDescriber: unable to get ring topology:", err)
		return
	}

	// TODO: move this to session
	for _, h := range hosts {
		if r.session.ring.addHostIfMissing(h) {
			r.session.pool.addHost(h)
			// TODO: trigger OnUp/OnAdd
		}
	}

	r.session.pool.SetPartitioner(partitioner)
}