host_source.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. package gocql
  2. import (
  3. "fmt"
  4. "net"
  5. "strconv"
  6. "strings"
  7. "sync"
  8. "time"
  9. )
// nodeState represents the observed availability of a cluster node
// (see the NodeUp / NodeDown constants below).
type nodeState int32
  11. func (n nodeState) String() string {
  12. if n == NodeUp {
  13. return "UP"
  14. } else if n == NodeDown {
  15. return "DOWN"
  16. }
  17. return fmt.Sprintf("UNKNOWN_%d", n)
  18. }
// Node availability states.
const (
	NodeUp nodeState = iota // node is considered reachable
	NodeDown                // node is considered unavailable
)
// cassVersion holds a parsed Cassandra release version,
// e.g. "2.1.6" -> {Major: 2, Minor: 1, Patch: 6}.
type cassVersion struct {
	Major, Minor, Patch int
}
  26. func (c *cassVersion) Set(v string) error {
  27. if v == "" {
  28. return nil
  29. }
  30. return c.UnmarshalCQL(nil, []byte(v))
  31. }
// UnmarshalCQL decodes a release_version column into the receiver.
// The TypeInfo argument is ignored; the payload is always parsed as text.
func (c *cassVersion) UnmarshalCQL(info TypeInfo, data []byte) error {
	return c.unmarshal(data)
}
  35. func (c *cassVersion) unmarshal(data []byte) error {
  36. version := strings.TrimSuffix(string(data), "-SNAPSHOT")
  37. version = strings.TrimPrefix(version, "v")
  38. v := strings.Split(version, ".")
  39. if len(v) < 2 {
  40. return fmt.Errorf("invalid version string: %s", data)
  41. }
  42. var err error
  43. c.Major, err = strconv.Atoi(v[0])
  44. if err != nil {
  45. return fmt.Errorf("invalid major version %v: %v", v[0], err)
  46. }
  47. c.Minor, err = strconv.Atoi(v[1])
  48. if err != nil {
  49. return fmt.Errorf("invalid minor version %v: %v", v[1], err)
  50. }
  51. if len(v) > 2 {
  52. c.Patch, err = strconv.Atoi(v[2])
  53. if err != nil {
  54. return fmt.Errorf("invalid patch version %v: %v", v[2], err)
  55. }
  56. }
  57. return nil
  58. }
  59. func (c cassVersion) Before(major, minor, patch int) bool {
  60. if c.Major > major {
  61. return true
  62. } else if c.Minor > minor {
  63. return true
  64. } else if c.Patch > patch {
  65. return true
  66. }
  67. return false
  68. }
// String formats the version as "v<major>.<minor>.<patch>".
func (c cassVersion) String() string {
	return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
}
  72. func (c cassVersion) nodeUpDelay() time.Duration {
  73. if c.Major >= 2 && c.Minor >= 2 {
  74. // CASSANDRA-8236
  75. return 0
  76. }
  77. return 10 * time.Second
  78. }
// HostInfo describes a single node of the cluster ring as discovered
// from system.local / system.peers.
type HostInfo struct {
	// TODO(zariel): reduce locking maybe, not all values will change, but to ensure
	// that we are thread safe use a mutex to access all fields.
	mu         sync.RWMutex // guards every field below
	peer       net.IP       // address the host is reached at
	port       int          // CQL native-protocol port
	dataCenter string
	rack       string
	hostId     string
	version    cassVersion // release_version reported by the node
	state      nodeState   // last observed UP/DOWN state
	tokens     []string    // token-ring entries owned by this host
}
// Equal reports whether both HostInfos refer to the same peer address.
// Only the peer IP participates in the comparison; port, rack, tokens,
// etc. are ignored.
//
// NOTE(review): this read-locks both receivers in argument order. Two
// concurrent calls Equal(a, b) and Equal(b, a) acquire the locks in
// opposite order, which can deadlock if a writer queues between the two
// acquisitions — confirm callers never race Equal against setters on
// the same pair of hosts.
func (h *HostInfo) Equal(host *HostInfo) bool {
	h.mu.RLock()
	defer h.mu.RUnlock()
	host.mu.RLock()
	defer host.mu.RUnlock()
	return h.peer.Equal(host.peer)
}
  99. func (h *HostInfo) Peer() net.IP {
  100. h.mu.RLock()
  101. defer h.mu.RUnlock()
  102. return h.peer
  103. }
  104. func (h *HostInfo) setPeer(peer net.IP) *HostInfo {
  105. h.mu.Lock()
  106. defer h.mu.Unlock()
  107. h.peer = peer
  108. return h
  109. }
  110. func (h *HostInfo) DataCenter() string {
  111. h.mu.RLock()
  112. defer h.mu.RUnlock()
  113. return h.dataCenter
  114. }
  115. func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo {
  116. h.mu.Lock()
  117. defer h.mu.Unlock()
  118. h.dataCenter = dataCenter
  119. return h
  120. }
  121. func (h *HostInfo) Rack() string {
  122. h.mu.RLock()
  123. defer h.mu.RUnlock()
  124. return h.rack
  125. }
  126. func (h *HostInfo) setRack(rack string) *HostInfo {
  127. h.mu.Lock()
  128. defer h.mu.Unlock()
  129. h.rack = rack
  130. return h
  131. }
  132. func (h *HostInfo) HostID() string {
  133. h.mu.RLock()
  134. defer h.mu.RUnlock()
  135. return h.hostId
  136. }
  137. func (h *HostInfo) setHostID(hostID string) *HostInfo {
  138. h.mu.Lock()
  139. defer h.mu.Unlock()
  140. h.hostId = hostID
  141. return h
  142. }
  143. func (h *HostInfo) Version() cassVersion {
  144. h.mu.RLock()
  145. defer h.mu.RUnlock()
  146. return h.version
  147. }
  148. func (h *HostInfo) setVersion(major, minor, patch int) *HostInfo {
  149. h.mu.Lock()
  150. defer h.mu.Unlock()
  151. h.version = cassVersion{major, minor, patch}
  152. return h
  153. }
  154. func (h *HostInfo) State() nodeState {
  155. h.mu.RLock()
  156. defer h.mu.RUnlock()
  157. return h.state
  158. }
  159. func (h *HostInfo) setState(state nodeState) *HostInfo {
  160. h.mu.Lock()
  161. defer h.mu.Unlock()
  162. h.state = state
  163. return h
  164. }
// Tokens returns the token-ring entries owned by this host.
//
// NOTE(review): the internal slice is returned without copying, so the
// caller shares backing storage with this HostInfo; mutating it would
// race with setTokens. Confirm callers treat the result as read-only.
func (h *HostInfo) Tokens() []string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.tokens
}
  170. func (h *HostInfo) setTokens(tokens []string) *HostInfo {
  171. h.mu.Lock()
  172. defer h.mu.Unlock()
  173. h.tokens = tokens
  174. return h
  175. }
  176. func (h *HostInfo) Port() int {
  177. h.mu.RLock()
  178. defer h.mu.RUnlock()
  179. return h.port
  180. }
  181. func (h *HostInfo) setPort(port int) *HostInfo {
  182. h.mu.Lock()
  183. defer h.mu.Unlock()
  184. h.port = port
  185. return h
  186. }
  187. func (h *HostInfo) update(from *HostInfo) {
  188. h.mu.Lock()
  189. defer h.mu.Unlock()
  190. h.tokens = from.tokens
  191. h.version = from.version
  192. h.hostId = from.hostId
  193. h.dataCenter = from.dataCenter
  194. }
// IsUp reports whether the host is known and currently marked UP.
// Safe to call on a nil receiver (returns false).
func (h *HostInfo) IsUp() bool {
	return h != nil && h.State() == NodeUp
}
// String renders a single-line diagnostic summary of the host under the
// read lock. The format string is runtime output and is kept verbatim
// (including the "data_centre" spelling).
func (h *HostInfo) String() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return fmt.Sprintf("[hostinfo peer=%q port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]", h.peer, h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
}
// Polls system.peers at a specific interval to find new hosts
type ringDescriber struct {
	dcFilter   string // if non-empty, only hosts in this data center are reported
	rackFilter string // if non-empty, only hosts in this rack are reported
	session    *Session
	closeChan  chan bool
	// indicates that we can use system.local to get the connections remote address
	localHasRpcAddr bool
	mu              sync.Mutex  // guards prevHosts and prevPartitioner
	prevHosts       []*HostInfo // last successfully discovered host list
	prevPartitioner string      // last successfully discovered partitioner
}
  215. func checkSystemLocal(control *controlConn) (bool, error) {
  216. iter := control.query("SELECT broadcast_address FROM system.local")
  217. if err := iter.err; err != nil {
  218. if errf, ok := err.(*errorFrame); ok {
  219. if errf.code == errSyntax {
  220. return false, nil
  221. }
  222. }
  223. return false, err
  224. }
  225. return true, nil
  226. }
  227. // Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
  228. func checkSystemSchema(control *controlConn) (bool, error) {
  229. iter := control.query("SELECT * FROM system_schema.keyspaces")
  230. if err := iter.err; err != nil {
  231. if errf, ok := err.(*errorFrame); ok {
  232. if errf.code == errReadFailure {
  233. return false, nil
  234. }
  235. }
  236. return false, err
  237. }
  238. return true, nil
  239. }
// GetHosts queries system.local and system.peers through the control
// connection and returns the full host list plus the cluster's
// partitioner name. If the control connection is unavailable (nil
// iterators), the previously discovered hosts/partitioner are returned
// so callers can retry against known nodes. On success the result is
// cached in prevHosts/prevPartitioner.
func (r *ringDescriber) GetHosts() (hosts []*HostInfo, partitioner string, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	// we need conn to be the same because we need to query system.peers and system.local
	// on the same node to get the whole cluster
	const (
		legacyLocalQuery = "SELECT data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
		// only supported in 2.2.0, 2.1.6, 2.0.16
		localQuery = "SELECT broadcast_address, data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
	)
	localHost := &HostInfo{}
	if r.localHasRpcAddr {
		// Newer servers expose broadcast_address directly in system.local.
		iter := r.session.control.query(localQuery)
		if iter == nil {
			// control conn unavailable: fall back to the cached ring
			return r.prevHosts, r.prevPartitioner, nil
		}
		iter.Scan(&localHost.peer, &localHost.dataCenter, &localHost.rack,
			&localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)
		if err = iter.Close(); err != nil {
			return nil, "", err
		}
	} else {
		// Older servers: derive the local address from the control
		// connection's own host instead of a column.
		iter := r.session.control.withConn(func(c *Conn) *Iter {
			localHost = c.host
			return c.query(legacyLocalQuery)
		})
		if iter == nil {
			return r.prevHosts, r.prevPartitioner, nil
		}
		iter.Scan(&localHost.dataCenter, &localHost.rack, &localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)
		if err = iter.Close(); err != nil {
			return nil, "", err
		}
	}
	localHost.port = r.session.cfg.Port
	hosts = []*HostInfo{localHost}
	// NOTE(review): Scanner() is invoked on the query result before the
	// nil check below; unlike the system.local paths above, a nil iterator
	// here would be dereferenced before "rows == nil" is evaluated —
	// confirm control.query can never return nil on this path.
	rows := r.session.control.query("SELECT rpc_address, data_center, rack, host_id, tokens, release_version FROM system.peers").Scanner()
	if rows == nil {
		return r.prevHosts, r.prevPartitioner, nil
	}
	for rows.Next() {
		host := &HostInfo{port: r.session.cfg.Port}
		err := rows.Scan(&host.peer, &host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version)
		if err != nil {
			// skip malformed peer rows rather than failing the whole refresh
			Logger.Println(err)
			continue
		}
		if r.matchFilter(host) {
			hosts = append(hosts, host)
		}
	}
	if err = rows.Err(); err != nil {
		return nil, "", err
	}
	// cache the successful result for the nil-iterator fallbacks above
	r.prevHosts = hosts
	r.prevPartitioner = partitioner
	return hosts, partitioner, nil
}
  298. func (r *ringDescriber) matchFilter(host *HostInfo) bool {
  299. if r.dcFilter != "" && r.dcFilter != host.DataCenter() {
  300. return false
  301. }
  302. if r.rackFilter != "" && r.rackFilter != host.Rack() {
  303. return false
  304. }
  305. return true
  306. }
// refreshRing re-reads the cluster topology via GetHosts, adds any
// hosts accepted by the configured HostFilter to the session's ring and
// connection pool, updates hosts already present, and pushes the
// partitioner to the session's metadata and load-balancing policy.
func (r *ringDescriber) refreshRing() error {
	// if we have 0 hosts this will return the previous list of hosts to
	// attempt to reconnect to the cluster otherwise we would never find
	// downed hosts again, could possibly have an optimisation to only
	// try to add new hosts if GetHosts didnt error and the hosts didnt change.
	hosts, partitioner, err := r.GetHosts()
	if err != nil {
		return err
	}
	// TODO: move this to session
	// TODO: handle removing hosts here
	for _, h := range hosts {
		if r.session.cfg.HostFilter == nil || r.session.cfg.HostFilter.Accept(h) {
			// addHostIfMissing returns ok=false when the host was newly
			// added; only new hosts get a fresh pool entry, existing ones
			// are refreshed in place.
			if host, ok := r.session.ring.addHostIfMissing(h); !ok {
				r.session.pool.addHost(h)
			} else {
				host.update(h)
			}
		}
	}
	r.session.metadata.setPartitioner(partitioner)
	r.session.policy.SetPartitioner(partitioner)
	return nil
}