// host_source.go

package gocql

import (
	"fmt"
	"net"
	"strconv"
	"strings"
	"sync"
	"time" // used only by the illustrative runExample sketch below
)

type nodeState int32

func (n nodeState) String() string {
	if n == NodeUp {
		return "UP"
	} else if n == NodeDown {
		return "DOWN"
	}
	return fmt.Sprintf("UNKNOWN_%d", n)
}

const (
	NodeUp nodeState = iota
	NodeDown
)
type cassVersion struct {
	Major, Minor, Patch int
}

func (c *cassVersion) UnmarshalCQL(info TypeInfo, data []byte) error {
	v := strings.Split(string(data), ".")
	if len(v) != 3 {
		return fmt.Errorf("invalid schema_version: %v", string(data))
	}

	var err error
	c.Major, err = strconv.Atoi(v[0])
	if err != nil {
		return fmt.Errorf("invalid major version %v: %v", v[0], err)
	}

	c.Minor, err = strconv.Atoi(v[1])
	if err != nil {
		return fmt.Errorf("invalid minor version %v: %v", v[1], err)
	}

	c.Patch, err = strconv.Atoi(v[2])
	if err != nil {
		return fmt.Errorf("invalid patch version %v: %v", v[2], err)
	}

	return nil
}

func (c cassVersion) String() string {
	return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
}
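
// A minimal sketch (not part of the original file; the function name is
// hypothetical) showing how a schema_version string round-trips through
// UnmarshalCQL and String. Passing a nil TypeInfo is fine here because
// UnmarshalCQL only looks at the raw bytes.
func exampleCassVersion() {
	var v cassVersion
	if err := v.UnmarshalCQL(nil, []byte("2.1.6")); err != nil {
		fmt.Println("parse failed:", err)
		return
	}
	fmt.Println(v) // v2.1.6
}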
type HostInfo struct {
	// TODO(zariel): maybe reduce locking; not all values change, but to stay
	// thread safe we guard all fields with a single mutex.
	mu         sync.RWMutex
	peer       string
	port       int
	dataCenter string
	rack       string
	hostId     string
	version    cassVersion
	state      nodeState
	tokens     []string
}
func (h *HostInfo) Equal(host *HostInfo) bool {
	h.mu.RLock()
	defer h.mu.RUnlock()
	host.mu.RLock()
	defer host.mu.RUnlock()

	return h.peer == host.peer && h.hostId == host.hostId
}

func (h *HostInfo) Peer() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.peer
}

func (h *HostInfo) setPeer(peer string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.peer = peer
	return h
}

func (h *HostInfo) DataCenter() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.dataCenter
}

func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.dataCenter = dataCenter
	return h
}

func (h *HostInfo) Rack() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.rack
}

func (h *HostInfo) setRack(rack string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.rack = rack
	return h
}

func (h *HostInfo) HostID() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.hostId
}

func (h *HostInfo) setHostID(hostID string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.hostId = hostID
	return h
}

func (h *HostInfo) Version() cassVersion {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.version
}

func (h *HostInfo) setVersion(major, minor, patch int) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.version.Major = major
	h.version.Minor = minor
	h.version.Patch = patch
	return h
}

func (h *HostInfo) State() nodeState {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.state
}

func (h *HostInfo) setState(state nodeState) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.state = state
	return h
}

func (h *HostInfo) Tokens() []string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.tokens
}

func (h *HostInfo) setTokens(tokens []string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.tokens = tokens
	return h
}

func (h *HostInfo) Port() int {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.port
}

func (h *HostInfo) setPort(port int) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.port = port
	return h
}
// update copies the mutable ring metadata (tokens, version, host ID, data
// center) from another HostInfo. Note that from is read without taking its
// lock.
func (h *HostInfo) update(from *HostInfo) {
	h.mu.Lock()
	defer h.mu.Unlock()

	h.tokens = from.tokens
	h.version = from.version
	h.hostId = from.hostId
	h.dataCenter = from.dataCenter
}

func (h *HostInfo) IsUp() bool {
	return h.State() == NodeUp
}

func (h *HostInfo) String() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return fmt.Sprintf("[hostinfo peer=%q port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
		h.peer, h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
}
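
// Hypothetical usage sketch (not in the original source): the setters
// return the receiver, so a HostInfo can be populated fluently. Each call
// takes the write lock independently, so the chain is safe but not atomic
// as a whole.
func exampleHostInfo() {
	h := (&HostInfo{}).
		setPeer("10.0.0.1").
		setPort(9042).
		setDataCenter("dc1").
		setRack("rack1").
		setState(NodeUp)
	fmt.Println(h.IsUp(), h) // true [hostinfo peer="10.0.0.1" port=9042 ...]
}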
// ringDescriber polls system.peers on an interval to discover new hosts.
type ringDescriber struct {
	dcFilter   string
	rackFilter string
	session    *Session
	closeChan  chan bool
	// localHasRpcAddr indicates that system.local exposes the connection's
	// remote (broadcast) address, so we can read it there instead of falling
	// back to the dialed address.
	localHasRpcAddr bool

	mu              sync.Mutex
	prevHosts       []*HostInfo
	prevPartitioner string
}
// checkSystemLocal probes whether system.local has a broadcast_address
// column; clusters that predate it answer with a syntax error, which is
// reported as false rather than as a failure.
func checkSystemLocal(control *controlConn) (bool, error) {
	iter := control.query("SELECT broadcast_address FROM system.local")
	if err := iter.err; err != nil {
		if errf, ok := err.(*errorFrame); ok {
			if errf.code == errSyntax {
				return false, nil
			}
		}
		return false, err
	}

	return true, nil
}
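
// A sketch of how the probe above could feed ringDescriber.localHasRpcAddr
// (assumed wiring, not from the original file; the constructor name is
// hypothetical). When the column is missing, GetHosts falls back to the
// legacy query plus the dialed address.
func newRingDescriberExample(session *Session, control *controlConn) (*ringDescriber, error) {
	hasRpcAddr, err := checkSystemLocal(control)
	if err != nil {
		return nil, err
	}
	return &ringDescriber{
		session:         session,
		localHasRpcAddr: hasRpcAddr,
		closeChan:       make(chan bool),
	}, nil
}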
func (r *ringDescriber) GetHosts() (hosts []*HostInfo, partitioner string, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// We need to query system.local and system.peers on the same connection:
	// only together, as seen from a single node, do they describe the whole
	// cluster.
	const (
		legacyLocalQuery = "SELECT data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
		// only supported in 2.2.0, 2.1.6, 2.0.16
		localQuery = "SELECT broadcast_address, data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
	)

	localHost := &HostInfo{}
	if r.localHasRpcAddr {
		iter := r.session.control.query(localQuery)
		if iter == nil {
			return r.prevHosts, r.prevPartitioner, nil
		}

		iter.Scan(&localHost.peer, &localHost.dataCenter, &localHost.rack,
			&localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)

		if err = iter.Close(); err != nil {
			return nil, "", err
		}
	} else {
		iter := r.session.control.query(legacyLocalQuery)
		if iter == nil {
			return r.prevHosts, r.prevPartitioner, nil
		}

		iter.Scan(&localHost.dataCenter, &localHost.rack, &localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)

		if err = iter.Close(); err != nil {
			return nil, "", err
		}

		addr, _, err := net.SplitHostPort(r.session.control.addr())
		if err != nil {
			// This should never happen: the address is the one that was
			// dialed by conn, so a panic makes sense here. Please report a
			// bug if it occurs.
			panic(err)
		}

		localHost.peer = addr
	}

	localHost.port = r.session.cfg.Port

	hosts = []*HostInfo{localHost}

	iter := r.session.control.query("SELECT rpc_address, data_center, rack, host_id, tokens, release_version FROM system.peers")
	if iter == nil {
		return r.prevHosts, r.prevPartitioner, nil
	}

	host := &HostInfo{port: r.session.cfg.Port}
	for iter.Scan(&host.peer, &host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version) {
		if r.matchFilter(host) {
			hosts = append(hosts, host)
		}
		host = &HostInfo{port: r.session.cfg.Port}
	}

	if err = iter.Close(); err != nil {
		return nil, "", err
	}

	r.prevHosts = hosts
	r.prevPartitioner = partitioner

	return hosts, partitioner, nil
}
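
// Hypothetical caller sketch (not part of the original file): GetHosts
// returns the full ring as seen from the control connection's node, handing
// back the previously seen hosts when the control connection yields no
// iterator.
func exampleGetHosts(r *ringDescriber) {
	hosts, partitioner, err := r.GetHosts()
	if err != nil {
		fmt.Println("ring refresh failed:", err)
		return
	}
	fmt.Printf("partitioner=%s hosts=%d\n", partitioner, len(hosts))
	for _, h := range hosts {
		fmt.Println(h)
	}
}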
// matchFilter reports whether the host passes the configured data center
// and rack filters; an empty filter matches everything.
func (r *ringDescriber) matchFilter(host *HostInfo) bool {
	if r.dcFilter != "" && r.dcFilter != host.DataCenter() {
		return false
	}

	if r.rackFilter != "" && r.rackFilter != host.Rack() {
		return false
	}

	return true
}
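
// Illustrative sketch (not in the original): with dcFilter set, only hosts
// from that data center survive; unset filters match everything.
func exampleMatchFilter() {
	r := &ringDescriber{dcFilter: "dc1"}
	h1 := (&HostInfo{}).setDataCenter("dc1")
	h2 := (&HostInfo{}).setDataCenter("dc2")
	fmt.Println(r.matchFilter(h1), r.matchFilter(h2)) // true false
}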
func (r *ringDescriber) refreshRing() error {
	// If we have 0 hosts this will return the previous list of hosts to
	// attempt to reconnect to the cluster, otherwise we would never find
	// downed hosts again. A possible optimisation is to only try to add new
	// hosts if GetHosts didn't error and the hosts didn't change.
	hosts, partitioner, err := r.GetHosts()
	if err != nil {
		return err
	}

	// TODO: move this to session
	// TODO: handle removing hosts here
	for _, h := range hosts {
		if host, ok := r.session.ring.addHostIfMissing(h); !ok {
			r.session.pool.addHost(h)
		} else {
			host.update(h)
		}
	}

	r.session.pool.SetPartitioner(partitioner)

	return nil
}
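
// A possible polling loop, sketched under the assumption suggested by the
// struct comment (system.peers is polled on an interval). Not part of the
// original file: the method name runExample and the refreshInterval
// parameter are hypothetical; it relies on the "time" import above.
// closeChan stops the loop.
func (r *ringDescriber) runExample(refreshInterval time.Duration) {
	ticker := time.NewTicker(refreshInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if err := r.refreshRing(); err != nil {
				// errors here are usually transient; the next tick retries
				fmt.Println("refreshRing:", err)
			}
		case <-r.closeChan:
			return
		}
	}
}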