host_source.go

package gocql

import (
	"fmt"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"
)

type nodeState int32

func (n nodeState) String() string {
	if n == NodeUp {
		return "UP"
	} else if n == NodeDown {
		return "DOWN"
	}
	return fmt.Sprintf("UNKNOWN_%d", n)
}

const (
	NodeUp nodeState = iota
	NodeDown
)

type cassVersion struct {
	Major, Minor, Patch int
}

func (c *cassVersion) UnmarshalCQL(info TypeInfo, data []byte) error {
	v := strings.Split(string(data), ".")
	if len(v) != 3 {
		return fmt.Errorf("invalid schema_version: %v", string(data))
	}

	var err error
	c.Major, err = strconv.Atoi(v[0])
	if err != nil {
		return fmt.Errorf("invalid major version %v: %v", v[0], err)
	}

	c.Minor, err = strconv.Atoi(v[1])
	if err != nil {
		return fmt.Errorf("invalid minor version %v: %v", v[1], err)
	}

	c.Patch, err = strconv.Atoi(v[2])
	if err != nil {
		return fmt.Errorf("invalid patch version %v: %v", v[2], err)
	}

	return nil
}
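
// Illustrative sketch (not part of the driver API): UnmarshalCQL parses the
// dotted version string reported by Cassandra, so "2.1.6" becomes
// cassVersion{Major: 2, Minor: 1, Patch: 6}. The TypeInfo argument is unused
// by the parse itself, so a hypothetical caller could exercise it directly:
//
//	var v cassVersion
//	if err := v.UnmarshalCQL(nil, []byte("2.1.6")); err != nil {
//		// handle a malformed version string such as "2.1"
//	}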

func (c cassVersion) String() string {
	return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
}

func (c cassVersion) nodeUpDelay() time.Duration {
	if c.Major > 2 || (c.Major == 2 && c.Minor >= 2) {
		// CASSANDRA-8236: from 2.2 onwards (including every later major)
		// the server delays UP notifications until the node can serve
		// requests, so no client-side settle time is needed.
		return 0
	}
	return 10 * time.Second
}
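
// For example (values chosen for illustration): cassVersion{2, 0, 16} and
// cassVersion{2, 1, 6} still get the 10 second settle delay, while
// cassVersion{2, 2, 0} and cassVersion{3, 0, 0} return 0.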

type HostInfo struct {
	// TODO(zariel): maybe reduce locking; not all values change, but to
	// keep HostInfo thread safe, guard access to all fields with the mutex.
	mu         sync.RWMutex
	peer       string
	port       int
	dataCenter string
	rack       string
	hostId     string
	version    cassVersion
	state      nodeState
	tokens     []string
}

func (h *HostInfo) Equal(host *HostInfo) bool {
	h.mu.RLock()
	defer h.mu.RUnlock()
	host.mu.RLock()
	defer host.mu.RUnlock()

	return h.peer == host.peer && h.hostId == host.hostId
}

func (h *HostInfo) Peer() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.peer
}

func (h *HostInfo) setPeer(peer string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.peer = peer
	return h
}

func (h *HostInfo) DataCenter() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.dataCenter
}

func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.dataCenter = dataCenter
	return h
}

func (h *HostInfo) Rack() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.rack
}

func (h *HostInfo) setRack(rack string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.rack = rack
	return h
}

func (h *HostInfo) HostID() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.hostId
}

func (h *HostInfo) setHostID(hostID string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.hostId = hostID
	return h
}

func (h *HostInfo) Version() cassVersion {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.version
}

func (h *HostInfo) setVersion(major, minor, patch int) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.version = cassVersion{major, minor, patch}
	return h
}

func (h *HostInfo) State() nodeState {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.state
}

func (h *HostInfo) setState(state nodeState) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.state = state
	return h
}

func (h *HostInfo) Tokens() []string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.tokens
}

func (h *HostInfo) setTokens(tokens []string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.tokens = tokens
	return h
}

func (h *HostInfo) Port() int {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.port
}

func (h *HostInfo) setPort(port int) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.port = port
	return h
}
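
// The unexported setters return the receiver, so (a hypothetical, sketch-only
// construction, usable only within this package) a HostInfo can be built
// fluently:
//
//	host := (&HostInfo{}).setPeer("10.0.0.1").setPort(9042).setDataCenter("dc1")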

func (h *HostInfo) update(from *HostInfo) {
	h.mu.Lock()
	defer h.mu.Unlock()
	// read-lock the source as well so the copy cannot race with a
	// concurrent setter on from
	from.mu.RLock()
	defer from.mu.RUnlock()

	h.tokens = from.tokens
	h.version = from.version
	h.hostId = from.hostId
	h.dataCenter = from.dataCenter
}

func (h *HostInfo) IsUp() bool {
	return h.State() == NodeUp
}

func (h *HostInfo) String() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return fmt.Sprintf("[hostinfo peer=%q port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
		h.peer, h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
}

// ringDescriber polls system.peers at a regular interval to find new hosts.
type ringDescriber struct {
	dcFilter   string
	rackFilter string
	session    *Session
	closeChan  chan bool
	// indicates that we can use system.local to get the connection's
	// remote address
	localHasRpcAddr bool

	mu              sync.Mutex
	prevHosts       []*HostInfo
	prevPartitioner string
}

// checkSystemLocal probes whether system.local exposes broadcast_address on
// this server; a syntax error from the query is taken to mean the column is
// not available (an older Cassandra release).
func checkSystemLocal(control *controlConn) (bool, error) {
	iter := control.query("SELECT broadcast_address FROM system.local")
	if err := iter.err; err != nil {
		if errf, ok := err.(*errorFrame); ok {
			if errf.code == errSyntax {
				return false, nil
			}
		}
		return false, err
	}

	return true, nil
}
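
// Sketch of the intended wiring (assumed from context, not verbatim driver
// code): the probe result seeds ringDescriber.localHasRpcAddr, which tells
// GetHosts which system.local query shape to use.
//
//	hasRpcAddr, err := checkSystemLocal(session.control)
//	if err != nil {
//		return err
//	}
//	r := &ringDescriber{session: session, localHasRpcAddr: hasRpcAddr}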

func (r *ringDescriber) GetHosts() (hosts []*HostInfo, partitioner string, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// system.peers and system.local must be queried over the same control
	// connection: only by asking a single node for both can we assemble a
	// view of the whole cluster.
	const (
		legacyLocalQuery = "SELECT data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
		// broadcast_address is only supported in 2.2.0, 2.1.6, 2.0.16
		localQuery = "SELECT broadcast_address, data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
	)

	localHost := &HostInfo{}
	if r.localHasRpcAddr {
		iter := r.session.control.query(localQuery)
		if iter == nil {
			return r.prevHosts, r.prevPartitioner, nil
		}

		iter.Scan(&localHost.peer, &localHost.dataCenter, &localHost.rack,
			&localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)

		if err = iter.Close(); err != nil {
			return nil, "", err
		}
	} else {
		iter := r.session.control.query(legacyLocalQuery)
		if iter == nil {
			return r.prevHosts, r.prevPartitioner, nil
		}

		iter.Scan(&localHost.dataCenter, &localHost.rack, &localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)

		if err = iter.Close(); err != nil {
			return nil, "", err
		}

		addr, _, err := net.SplitHostPort(r.session.control.addr())
		if err != nil {
			// This should never happen: this is the address that was
			// dialed by the control connection, so it must parse. A panic
			// makes sense here; please report a bug if it occurs.
			panic(err)
		}

		localHost.peer = addr
	}

	localHost.port = r.session.cfg.Port

	hosts = []*HostInfo{localHost}

	iter := r.session.control.query("SELECT rpc_address, data_center, rack, host_id, tokens, release_version FROM system.peers")
	if iter == nil {
		return r.prevHosts, r.prevPartitioner, nil
	}

	host := &HostInfo{port: r.session.cfg.Port}
	for iter.Scan(&host.peer, &host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version) {
		if r.matchFilter(host) {
			hosts = append(hosts, host)
		}
		// allocate a fresh HostInfo so the entry just appended is not
		// overwritten by the next Scan
		host = &HostInfo{
			port: r.session.cfg.Port,
		}
	}

	if err = iter.Close(); err != nil {
		return nil, "", err
	}

	r.prevHosts = hosts
	r.prevPartitioner = partitioner

	return hosts, partitioner, nil
}

func (r *ringDescriber) matchFilter(host *HostInfo) bool {
	if r.dcFilter != "" && r.dcFilter != host.DataCenter() {
		return false
	}

	if r.rackFilter != "" && r.rackFilter != host.Rack() {
		return false
	}

	return true
}
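
// Hypothetical example of the filter semantics: an empty filter matches
// everything, a non-empty one must match exactly.
//
//	r := &ringDescriber{dcFilter: "dc1"}
//	r.matchFilter((&HostInfo{}).setDataCenter("dc1")) // true
//	r.matchFilter((&HostInfo{}).setDataCenter("dc2")) // false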

func (r *ringDescriber) refreshRing() error {
	// If we have 0 hosts, GetHosts returns the previous list so we keep
	// trying to reconnect to the cluster; otherwise we would never find
	// downed hosts again. A possible optimisation would be to only try to
	// add new hosts if GetHosts didn't error and the hosts didn't change.
	hosts, partitioner, err := r.GetHosts()
	if err != nil {
		return err
	}

	// TODO: move this to session
	// TODO: handle removing hosts here
	for _, h := range hosts {
		if host, ok := r.session.ring.addHostIfMissing(h); !ok {
			r.session.pool.addHost(h)
		} else {
			host.update(h)
		}
	}

	r.session.pool.SetPartitioner(partitioner)

	return nil
}
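
// Hypothetical polling loop (a sketch only; the interval and error handling
// are assumptions, not the driver's actual wiring): the comment on
// ringDescriber says it polls at an interval, and closeChan suggests shutdown
// looks roughly like this.
//
//	go func() {
//		ticker := time.NewTicker(30 * time.Second)
//		defer ticker.Stop()
//		for {
//			select {
//			case <-r.closeChan:
//				return
//			case <-ticker.C:
//				if err := r.refreshRing(); err != nil {
//					// log and retry on the next tick
//				}
//			}
//		}
//	}()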