package gocql

import (
	"fmt"
	"log"
	"net"
	"strconv"
	"strings"
	"sync"
)
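
// nodeState records whether the driver currently believes a node to be up or down.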
type nodeState int32

func (n nodeState) String() string {
	if n == NodeUp {
		return "UP"
	} else if n == NodeDown {
		return "DOWN"
	}
	return fmt.Sprintf("UNKNOWN_%d", n)
}
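
// Node states. NodeUp is the zero value, so a freshly created HostInfo is
// considered up until it is explicitly marked down.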
const (
	NodeUp nodeState = iota
	NodeDown
)
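
// cassVersion holds a Cassandra release version split into its numeric parts.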
type cassVersion struct {
	Major, Minor, Patch int
}
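
// UnmarshalCQL parses a dotted version string of the form "major.minor.patch"
// (e.g. "2.1.8") into its numeric components.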
func (c *cassVersion) UnmarshalCQL(info TypeInfo, data []byte) error {
	v := strings.Split(string(data), ".")
	if len(v) != 3 {
		return fmt.Errorf("invalid schema_version: %v", string(data))
	}

	var err error
	c.Major, err = strconv.Atoi(v[0])
	if err != nil {
		return fmt.Errorf("invalid major version %v: %v", v[0], err)
	}
	c.Minor, err = strconv.Atoi(v[1])
	if err != nil {
		return fmt.Errorf("invalid minor version %v: %v", v[1], err)
	}
	c.Patch, err = strconv.Atoi(v[2])
	if err != nil {
		return fmt.Errorf("invalid patch version %v: %v", v[2], err)
	}

	return nil
}

func (c cassVersion) String() string {
	return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
}
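
// HostInfo describes a single node in the cluster, as reported by
// system.local and system.peers.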
type HostInfo struct {
	// TODO(zariel): reduce locking maybe, not all values will change, but to ensure
	// that we are thread safe use a mutex to access all fields.
	mu         sync.RWMutex
	peer       string
	dataCenter string
	rack       string
	hostId     string
	version    cassVersion
	state      nodeState
	tokens     []string
}
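
// Equal reports whether two HostInfos refer to the same node, comparing
// peer address and host ID.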
func (h *HostInfo) Equal(host *HostInfo) bool {
	h.mu.RLock()
	defer h.mu.RUnlock()
	host.mu.RLock()
	defer host.mu.RUnlock()

	return h.peer == host.peer && h.hostId == host.hostId
}

func (h *HostInfo) Peer() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.peer
}

func (h *HostInfo) setPeer(peer string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.peer = peer
	return h
}

func (h *HostInfo) DataCenter() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.dataCenter
}

func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.dataCenter = dataCenter
	return h
}

func (h *HostInfo) Rack() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.rack
}

func (h *HostInfo) setRack(rack string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.rack = rack
	return h
}

func (h *HostInfo) HostID() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.hostId
}

func (h *HostInfo) setHostID(hostID string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.hostId = hostID
	return h
}

func (h *HostInfo) Version() cassVersion {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.version
}

func (h *HostInfo) setVersion(major, minor, patch int) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.version.Major = major
	h.version.Minor = minor
	h.version.Patch = patch
	return h
}

func (h *HostInfo) State() nodeState {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.state
}

func (h *HostInfo) setState(state nodeState) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.state = state
	return h
}

func (h *HostInfo) Tokens() []string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.tokens
}

func (h *HostInfo) setTokens(tokens []string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.tokens = tokens
	return h
}
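
// update copies the fields rediscovered from the ring (tokens, version,
// host ID and data center) from another HostInfo.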
func (h *HostInfo) update(from *HostInfo) {
	h.mu.Lock()
	defer h.mu.Unlock()

	h.tokens = from.tokens
	h.version = from.version
	h.hostId = from.hostId
	h.dataCenter = from.dataCenter
}

func (h *HostInfo) IsUp() bool {
	return h.State() == NodeUp
}

// String uses a pointer receiver so the embedded sync.RWMutex is not copied.
func (h *HostInfo) String() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return fmt.Sprintf("[hostinfo peer=%q data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
		h.peer, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
}

// ringDescriber polls system.peers at a specific interval to find new hosts.
type ringDescriber struct {
	dcFilter   string
	rackFilter string
	session    *Session
	closeChan  chan bool
	// indicates that we can use system.local to get the connection's remote address
	localHasRpcAddr bool

	mu              sync.Mutex
	prevHosts       []*HostInfo
	prevPartitioner string
}
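
// checkSystemLocal reports whether the connected node's system.local table
// exposes a broadcast_address column; releases that predate it reject the
// probe query with a syntax error.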
func checkSystemLocal(control *controlConn) (bool, error) {
	iter := control.query("SELECT broadcast_address FROM system.local")
	if err := iter.err; err != nil {
		if errf, ok := err.(*errorFrame); ok {
			if errf.code == errSyntax {
				return false, nil
			}
		}
		return false, err
	}

	return true, nil
}
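
// GetHosts queries system.local and system.peers over the control connection
// to build the current view of the ring. If the control connection is
// unavailable it returns the previously discovered hosts and partitioner.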
func (r *ringDescriber) GetHosts() (hosts []*HostInfo, partitioner string, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// we need conn to be the same because we need to query system.peers and system.local
	// on the same node to get the whole cluster
	const (
		legacyLocalQuery = "SELECT data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
		// only supported in 2.2.0, 2.1.6, 2.0.16
		localQuery = "SELECT broadcast_address, data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
	)

	localHost := &HostInfo{}
	if r.localHasRpcAddr {
		iter := r.session.control.query(localQuery)
		if iter == nil {
			return r.prevHosts, r.prevPartitioner, nil
		}

		iter.Scan(&localHost.peer, &localHost.dataCenter, &localHost.rack,
			&localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)

		if err = iter.Close(); err != nil {
			return nil, "", err
		}
	} else {
		iter := r.session.control.query(legacyLocalQuery)
		if iter == nil {
			return r.prevHosts, r.prevPartitioner, nil
		}

		iter.Scan(&localHost.dataCenter, &localHost.rack, &localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)

		if err = iter.Close(); err != nil {
			return nil, "", err
		}

		addr, _, err := net.SplitHostPort(r.session.control.addr())
		if err != nil {
			// this should not happen, ever, as this is the address that was dialed by conn, here
			// a panic makes sense, please report a bug if it occurs.
			panic(err)
		}

		localHost.peer = addr
	}

	hosts = []*HostInfo{localHost}

	iter := r.session.control.query("SELECT rpc_address, data_center, rack, host_id, tokens, release_version FROM system.peers")
	if iter == nil {
		return r.prevHosts, r.prevPartitioner, nil
	}

	host := &HostInfo{}
	for iter.Scan(&host.peer, &host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version) {
		if r.matchFilter(host) {
			hosts = append(hosts, host)
		}
		host = &HostInfo{}
	}

	if err = iter.Close(); err != nil {
		return nil, "", err
	}

	r.prevHosts = hosts
	r.prevPartitioner = partitioner

	return hosts, partitioner, nil
}
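
// matchFilter reports whether a host passes the configured data center and
// rack filters; an empty filter matches everything.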
func (r *ringDescriber) matchFilter(host *HostInfo) bool {
	if r.dcFilter != "" && r.dcFilter != host.DataCenter() {
		return false
	}

	if r.rackFilter != "" && r.rackFilter != host.Rack() {
		return false
	}

	return true
}
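
// refreshRing fetches the latest ring topology and adds any hosts that are
// not already tracked by the session.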
func (r *ringDescriber) refreshRing() error {
	// if we have 0 hosts this will return the previous list of hosts to
	// attempt to reconnect to the cluster otherwise we would never find
	// downed hosts again, could possibly have an optimisation to only
	// try to add new hosts if GetHosts didn't error and the hosts didn't change.
	hosts, partitioner, err := r.GetHosts()
	if err != nil {
		return err
	}

	// TODO: move this to session
	// TODO: handle removing hosts here
	for _, h := range hosts {
		log.Println(h)
		if host, ok := r.session.ring.addHostIfMissing(h); !ok {
			r.session.pool.addHost(h)
		} else {
			host.update(h)
		}
	}

	r.session.pool.SetPartitioner(partitioner)

	return nil
}