host_source.go

package gocql

import (
	"fmt"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"
)
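
// nodeState records the last known availability of a node as observed
// through cluster events and connection attempts.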
type nodeState int32

func (n nodeState) String() string {
	if n == NodeUp {
		return "UP"
	} else if n == NodeDown {
		return "DOWN"
	}
	return fmt.Sprintf("UNKNOWN_%d", n)
}

const (
	NodeUp nodeState = iota
	NodeDown
)
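
// cassVersion represents a parsed Cassandra release_version, such as
// "2.1.6", split into its numeric components.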
type cassVersion struct {
	Major, Minor, Patch int
}

func (c *cassVersion) Set(v string) error {
	if v == "" {
		return nil
	}
	return c.UnmarshalCQL(nil, []byte(v))
}
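
// UnmarshalCQL parses a release_version string such as "2.1.8", "v3.0.0"
// or "2.2.0-SNAPSHOT" into its numeric components.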
func (c *cassVersion) UnmarshalCQL(info TypeInfo, data []byte) error {
	version := strings.TrimSuffix(string(data), "-SNAPSHOT")
	version = strings.TrimPrefix(version, "v")
	v := strings.Split(version, ".")
	// guard against inputs like "2", which would otherwise panic on v[1]
	if len(v) < 2 {
		return fmt.Errorf("invalid version string: %s", data)
	}

	var err error
	c.Major, err = strconv.Atoi(v[0])
	if err != nil {
		return fmt.Errorf("invalid major version %v: %v", v[0], err)
	}

	c.Minor, err = strconv.Atoi(v[1])
	if err != nil {
		return fmt.Errorf("invalid minor version %v: %v", v[1], err)
	}

	if len(v) > 2 {
		c.Patch, err = strconv.Atoi(v[2])
		if err != nil {
			return fmt.Errorf("invalid patch version %v: %v", v[2], err)
		}
	}

	return nil
}
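
// Before reports whether version c sorts strictly before major.minor.patch,
// comparing components most significant first; for example,
// cassVersion{2, 0, 16}.Before(2, 1, 0) is true.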
func (c cassVersion) Before(major, minor, patch int) bool {
	if c.Major < major {
		return true
	} else if c.Major == major {
		if c.Minor < minor {
			return true
		} else if c.Minor == minor && c.Patch < patch {
			return true
		}
	}
	return false
}

func (c cassVersion) String() string {
	return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
}

func (c cassVersion) nodeUpDelay() time.Duration {
	if c.Major > 2 || (c.Major == 2 && c.Minor >= 2) {
		// CASSANDRA-8236: since 2.2, UP notifications are delayed until the
		// node can actually accept connections, so no extra wait is needed.
		return 0
	}
	return 10 * time.Second
}
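
// HostInfo describes a single node in the cluster: its address, placement
// (data center and rack), version, token ownership and up/down state.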
type HostInfo struct {
	// TODO(zariel): reduce locking; not all fields change, but for now a
	// single mutex guards every field to keep the type thread safe.
	mu         sync.RWMutex
	peer       string
	port       int
	dataCenter string
	rack       string
	hostId     string
	version    cassVersion
	state      nodeState
	tokens     []string
}

func (h *HostInfo) Equal(host *HostInfo) bool {
	h.mu.RLock()
	defer h.mu.RUnlock()
	host.mu.RLock()
	defer host.mu.RUnlock()

	return h.peer == host.peer && h.hostId == host.hostId
}

func (h *HostInfo) Peer() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.peer
}

func (h *HostInfo) setPeer(peer string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.peer = peer
	return h
}

func (h *HostInfo) DataCenter() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.dataCenter
}

func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.dataCenter = dataCenter
	return h
}

func (h *HostInfo) Rack() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.rack
}

func (h *HostInfo) setRack(rack string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.rack = rack
	return h
}

func (h *HostInfo) HostID() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.hostId
}

func (h *HostInfo) setHostID(hostID string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.hostId = hostID
	return h
}

func (h *HostInfo) Version() cassVersion {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.version
}

func (h *HostInfo) setVersion(major, minor, patch int) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.version = cassVersion{major, minor, patch}
	return h
}

func (h *HostInfo) State() nodeState {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.state
}

func (h *HostInfo) setState(state nodeState) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.state = state
	return h
}

func (h *HostInfo) Tokens() []string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.tokens
}

func (h *HostInfo) setTokens(tokens []string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.tokens = tokens
	return h
}

func (h *HostInfo) Port() int {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.port
}

func (h *HostInfo) setPort(port int) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.port = port
	return h
}

func (h *HostInfo) update(from *HostInfo) {
	h.mu.Lock()
	defer h.mu.Unlock()
	// from must be a different HostInfo; taking its read lock avoids a
	// data race on its fields, but would deadlock if from == h.
	from.mu.RLock()
	defer from.mu.RUnlock()

	h.tokens = from.tokens
	h.version = from.version
	h.hostId = from.hostId
	h.dataCenter = from.dataCenter
}

func (h *HostInfo) IsUp() bool {
	return h.State() == NodeUp
}

func (h *HostInfo) String() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return fmt.Sprintf("[hostinfo peer=%q port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
		h.peer, h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
}

// ringDescriber polls system.peers at a regular interval to find new hosts.
type ringDescriber struct {
	dcFilter   string
	rackFilter string
	session    *Session
	closeChan  chan bool
	// localHasRpcAddr indicates that system.local exposes the control
	// connection's remote address.
	localHasRpcAddr bool

	mu              sync.Mutex
	prevHosts       []*HostInfo
	prevPartitioner string
}
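
// checkSystemLocal probes whether system.local exposes broadcast_address.
// Older Cassandra releases reject the column with a syntax error, in which
// case the caller must fall back to the legacy query and derive the local
// address from the control connection instead.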
func checkSystemLocal(control *controlConn) (bool, error) {
	iter := control.query("SELECT broadcast_address FROM system.local")
	if err := iter.err; err != nil {
		if errf, ok := err.(*errorFrame); ok {
			if errf.code == errSyntax {
				return false, nil
			}
		}
		return false, err
	}
	return true, nil
}
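
// GetHosts reads system.local and system.peers over the control connection
// and returns the current set of hosts along with the cluster partitioner.
// If the control connection is unavailable, it returns the previously known
// hosts so that callers can keep trying to reconnect.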
func (r *ringDescriber) GetHosts() (hosts []*HostInfo, partitioner string, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Both queries must run on the same connection: system.local describes
	// the node itself and system.peers describes everyone else, so only
	// together, from a single node's point of view, do they cover the
	// whole cluster.
	const (
		legacyLocalQuery = "SELECT data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
		// broadcast_address is only available from 2.0.16, 2.1.6 and 2.2.0 onwards
		localQuery = "SELECT broadcast_address, data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
	)

	localHost := &HostInfo{}
	if r.localHasRpcAddr {
		iter := r.session.control.query(localQuery)
		if iter == nil {
			return r.prevHosts, r.prevPartitioner, nil
		}

		iter.Scan(&localHost.peer, &localHost.dataCenter, &localHost.rack,
			&localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)

		if err = iter.Close(); err != nil {
			return nil, "", err
		}
	} else {
		iter := r.session.control.query(legacyLocalQuery)
		if iter == nil {
			return r.prevHosts, r.prevPartitioner, nil
		}

		iter.Scan(&localHost.dataCenter, &localHost.rack, &localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)

		if err = iter.Close(); err != nil {
			return nil, "", err
		}

		addr, _, err := net.SplitHostPort(r.session.control.addr())
		if err != nil {
			// This should not happen, ever, as this is the address that was
			// dialed by conn; here a panic makes sense, please report a bug
			// if it occurs.
			panic(err)
		}

		localHost.peer = addr
	}

	localHost.port = r.session.cfg.Port

	hosts = []*HostInfo{localHost}

	iter := r.session.control.query("SELECT rpc_address, data_center, rack, host_id, tokens, release_version FROM system.peers")
	if iter == nil {
		return r.prevHosts, r.prevPartitioner, nil
	}

	host := &HostInfo{port: r.session.cfg.Port}
	for iter.Scan(&host.peer, &host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version) {
		if r.matchFilter(host) {
			hosts = append(hosts, host)
		}
		host = &HostInfo{port: r.session.cfg.Port}
	}

	if err = iter.Close(); err != nil {
		return nil, "", err
	}

	r.prevHosts = hosts
	r.prevPartitioner = partitioner

	return hosts, partitioner, nil
}
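
// matchFilter reports whether a host passes the configured data center and
// rack filters; an empty filter matches everything.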
func (r *ringDescriber) matchFilter(host *HostInfo) bool {
	if r.dcFilter != "" && r.dcFilter != host.DataCenter() {
		return false
	}

	if r.rackFilter != "" && r.rackFilter != host.Rack() {
		return false
	}

	return true
}

func (r *ringDescriber) refreshRing() error {
	// If we have 0 hosts, this will return the previous list of hosts so we
	// can attempt to reconnect to the cluster; otherwise we would never find
	// downed hosts again. A possible optimisation is to only try adding new
	// hosts if GetHosts didn't error and the hosts didn't change.
	hosts, partitioner, err := r.GetHosts()
	if err != nil {
		return err
	}

	// TODO: move this to session
	// TODO: handle removing hosts here
	for _, h := range hosts {
		if r.session.cfg.HostFilter == nil || r.session.cfg.HostFilter.Accept(h) {
			if host, ok := r.session.ring.addHostIfMissing(h); !ok {
				r.session.pool.addHost(h)
			} else {
				host.update(h)
			}
		}
	}

	r.session.pool.SetPartitioner(partitioner)

	return nil
}
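
// The closeChan field above suggests a polling goroutine along these lines.
// This is a sketch only: the run name, the sleep parameter and the wiring
// are assumptions, not part of this file.
//
//	func (r *ringDescriber) run(sleep time.Duration) {
//		for {
//			select {
//			case <-time.After(sleep):
//				// Errors are tolerable here: GetHosts falls back to the
//				// previously known hosts, so the next tick simply tries again.
//				r.refreshRing()
//			case <-r.closeChan:
//				return
//			}
//		}
//	}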