host_source.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
  1. package gocql
  2. import (
  3. "fmt"
  4. "log"
  5. "net"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. )
  11. type nodeState int32
  12. func (n nodeState) String() string {
  13. if n == NodeUp {
  14. return "UP"
  15. } else if n == NodeDown {
  16. return "DOWN"
  17. }
  18. return fmt.Sprintf("UNKNOWN_%d", n)
  19. }
  20. const (
  21. NodeUp nodeState = iota
  22. NodeDown
  23. )
// cassVersion holds a parsed Cassandra release version (e.g. "2.1.8"
// becomes {2, 1, 8}). Parsed from system tables' release_version column.
type cassVersion struct {
	Major, Minor, Patch int
}
  27. func (c *cassVersion) Set(v string) error {
  28. if v == "" {
  29. return nil
  30. }
  31. return c.UnmarshalCQL(nil, []byte(v))
  32. }
  33. func (c *cassVersion) UnmarshalCQL(info TypeInfo, data []byte) error {
  34. return c.unmarshal(data)
  35. }
  36. func (c *cassVersion) unmarshal(data []byte) error {
  37. version := strings.TrimSuffix(string(data), "-SNAPSHOT")
  38. version = strings.TrimPrefix(version, "v")
  39. v := strings.Split(version, ".")
  40. if len(v) < 2 {
  41. return fmt.Errorf("invalid version string: %s", data)
  42. }
  43. var err error
  44. c.Major, err = strconv.Atoi(v[0])
  45. if err != nil {
  46. return fmt.Errorf("invalid major version %v: %v", v[0], err)
  47. }
  48. c.Minor, err = strconv.Atoi(v[1])
  49. if err != nil {
  50. return fmt.Errorf("invalid minor version %v: %v", v[1], err)
  51. }
  52. if len(v) > 2 {
  53. c.Patch, err = strconv.Atoi(v[2])
  54. if err != nil {
  55. return fmt.Errorf("invalid patch version %v: %v", v[2], err)
  56. }
  57. }
  58. return nil
  59. }
  60. func (c cassVersion) Before(major, minor, patch int) bool {
  61. if c.Major > major {
  62. return true
  63. } else if c.Minor > minor {
  64. return true
  65. } else if c.Patch > patch {
  66. return true
  67. }
  68. return false
  69. }
  70. func (c cassVersion) String() string {
  71. return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
  72. }
  73. func (c cassVersion) nodeUpDelay() time.Duration {
  74. if c.Major >= 2 && c.Minor >= 2 {
  75. // CASSANDRA-8236
  76. return 0
  77. }
  78. return 10 * time.Second
  79. }
// HostInfo describes one node in the cluster as discovered from
// system.local / system.peers. All field access goes through mu.
type HostInfo struct {
	// TODO(zariel): reduce locking maybe, not all values will change, but to ensure
	// that we are thread safe use a mutex to access all fields.
	mu         sync.RWMutex
	peer       net.IP // node address as reported by the system tables
	port       int    // CQL port, taken from the cluster configuration
	dataCenter string
	rack       string
	hostId     string
	version    cassVersion // parsed release_version
	state      nodeState   // last known UP/DOWN state
	tokens     []string    // token ring ownership, raw string form
}
  93. func (h *HostInfo) Equal(host *HostInfo) bool {
  94. h.mu.RLock()
  95. defer h.mu.RUnlock()
  96. host.mu.RLock()
  97. defer host.mu.RUnlock()
  98. return h.peer.Equal(host.peer)
  99. }
  100. func (h *HostInfo) Peer() net.IP {
  101. h.mu.RLock()
  102. defer h.mu.RUnlock()
  103. return h.peer
  104. }
  105. func (h *HostInfo) setPeer(peer net.IP) *HostInfo {
  106. h.mu.Lock()
  107. defer h.mu.Unlock()
  108. h.peer = peer
  109. return h
  110. }
  111. func (h *HostInfo) DataCenter() string {
  112. h.mu.RLock()
  113. defer h.mu.RUnlock()
  114. return h.dataCenter
  115. }
  116. func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo {
  117. h.mu.Lock()
  118. defer h.mu.Unlock()
  119. h.dataCenter = dataCenter
  120. return h
  121. }
  122. func (h *HostInfo) Rack() string {
  123. h.mu.RLock()
  124. defer h.mu.RUnlock()
  125. return h.rack
  126. }
  127. func (h *HostInfo) setRack(rack string) *HostInfo {
  128. h.mu.Lock()
  129. defer h.mu.Unlock()
  130. h.rack = rack
  131. return h
  132. }
  133. func (h *HostInfo) HostID() string {
  134. h.mu.RLock()
  135. defer h.mu.RUnlock()
  136. return h.hostId
  137. }
  138. func (h *HostInfo) setHostID(hostID string) *HostInfo {
  139. h.mu.Lock()
  140. defer h.mu.Unlock()
  141. h.hostId = hostID
  142. return h
  143. }
  144. func (h *HostInfo) Version() cassVersion {
  145. h.mu.RLock()
  146. defer h.mu.RUnlock()
  147. return h.version
  148. }
  149. func (h *HostInfo) setVersion(major, minor, patch int) *HostInfo {
  150. h.mu.Lock()
  151. defer h.mu.Unlock()
  152. h.version = cassVersion{major, minor, patch}
  153. return h
  154. }
  155. func (h *HostInfo) State() nodeState {
  156. h.mu.RLock()
  157. defer h.mu.RUnlock()
  158. return h.state
  159. }
  160. func (h *HostInfo) setState(state nodeState) *HostInfo {
  161. h.mu.Lock()
  162. defer h.mu.Unlock()
  163. h.state = state
  164. return h
  165. }
  166. func (h *HostInfo) Tokens() []string {
  167. h.mu.RLock()
  168. defer h.mu.RUnlock()
  169. return h.tokens
  170. }
  171. func (h *HostInfo) setTokens(tokens []string) *HostInfo {
  172. h.mu.Lock()
  173. defer h.mu.Unlock()
  174. h.tokens = tokens
  175. return h
  176. }
  177. func (h *HostInfo) Port() int {
  178. h.mu.RLock()
  179. defer h.mu.RUnlock()
  180. return h.port
  181. }
  182. func (h *HostInfo) setPort(port int) *HostInfo {
  183. h.mu.Lock()
  184. defer h.mu.Unlock()
  185. h.port = port
  186. return h
  187. }
  188. func (h *HostInfo) update(from *HostInfo) {
  189. h.mu.Lock()
  190. defer h.mu.Unlock()
  191. h.tokens = from.tokens
  192. h.version = from.version
  193. h.hostId = from.hostId
  194. h.dataCenter = from.dataCenter
  195. }
  196. func (h *HostInfo) IsUp() bool {
  197. return h != nil && h.State() == NodeUp
  198. }
  199. func (h *HostInfo) String() string {
  200. h.mu.RLock()
  201. defer h.mu.RUnlock()
  202. return fmt.Sprintf("[hostinfo peer=%q port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]", h.peer, h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
  203. }
// Polls system.peers at a specific interval to find new hosts
type ringDescriber struct {
	dcFilter   string // when non-empty, only hosts in this data center are reported
	rackFilter string // when non-empty, only hosts in this rack are reported
	session    *Session
	closeChan  chan bool
	// indicates that we can use system.local to get the connections remote address
	localHasRpcAddr bool

	mu sync.Mutex
	// prevHosts/prevPartitioner hold the last successful result; they are
	// returned as a fallback when a refresh cannot obtain a connection.
	prevHosts       []*HostInfo
	prevPartitioner string
}
  216. func checkSystemLocal(control *controlConn) (bool, error) {
  217. iter := control.query("SELECT broadcast_address FROM system.local")
  218. if err := iter.err; err != nil {
  219. if errf, ok := err.(*errorFrame); ok {
  220. if errf.code == errSyntax {
  221. return false, nil
  222. }
  223. }
  224. return false, err
  225. }
  226. return true, nil
  227. }
  228. // Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
  229. func checkSystemSchema(control *controlConn) (bool, error) {
  230. iter := control.query("SELECT * FROM system_schema.keyspaces")
  231. if err := iter.err; err != nil {
  232. if errf, ok := err.(*errorFrame); ok {
  233. if errf.code == errReadFailure {
  234. return false, nil
  235. }
  236. }
  237. return false, err
  238. }
  239. return true, nil
  240. }
// GetHosts returns every ring member visible from the control connection,
// along with the cluster's partitioner name. The local node is read from
// system.local and the rest from system.peers on the SAME connection. If
// no connection is available, the previously discovered hosts and
// partitioner are returned so callers can keep retrying the old set.
func (r *ringDescriber) GetHosts() (hosts []*HostInfo, partitioner string, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// we need conn to be the same because we need to query system.peers and system.local
	// on the same node to get the whole cluster
	const (
		legacyLocalQuery = "SELECT data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
		// only supported in 2.2.0, 2.1.6, 2.0.16
		localQuery = "SELECT broadcast_address, data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
	)

	// Direct field writes below bypass the HostInfo mutex; that is safe
	// here because the struct is not yet shared with other goroutines.
	localHost := &HostInfo{}
	if r.localHasRpcAddr {
		// server exposes broadcast_address, so system.local tells us our peer IP
		iter := r.session.control.query(localQuery)
		if iter == nil {
			// no control connection right now: fall back to the last result
			return r.prevHosts, r.prevPartitioner, nil
		}
		iter.Scan(&localHost.peer, &localHost.dataCenter, &localHost.rack,
			&localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)
		if err = iter.Close(); err != nil {
			return nil, "", err
		}
	} else {
		// legacy servers: take the peer address from the connection itself
		iter := r.session.control.withConn(func(c *Conn) *Iter {
			localHost = c.host
			return c.query(legacyLocalQuery)
		})
		if iter == nil {
			return r.prevHosts, r.prevPartitioner, nil
		}
		iter.Scan(&localHost.dataCenter, &localHost.rack, &localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)
		if err = iter.Close(); err != nil {
			return nil, "", err
		}
	}

	localHost.port = r.session.cfg.Port

	hosts = []*HostInfo{localHost}

	// NOTE(review): query() can apparently return nil (see the checks
	// above); calling .Scanner() on a nil iterator here may panic before
	// the nil check below runs — confirm Scanner's nil-receiver behavior.
	rows := r.session.control.query("SELECT rpc_address, data_center, rack, host_id, tokens, release_version FROM system.peers").Scanner()
	if rows == nil {
		return r.prevHosts, r.prevPartitioner, nil
	}

	for rows.Next() {
		host := &HostInfo{port: r.session.cfg.Port}
		err := rows.Scan(&host.peer, &host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version)
		if err != nil {
			// a malformed peer row is logged and skipped rather than
			// failing the whole refresh
			log.Println(err)
			continue
		}
		// apply the optional DC/rack filters; the local host is never filtered
		if r.matchFilter(host) {
			hosts = append(hosts, host)
		}
	}

	if err = rows.Err(); err != nil {
		return nil, "", err
	}

	// cache the successful result for the fallback paths above
	r.prevHosts = hosts
	r.prevPartitioner = partitioner

	return hosts, partitioner, nil
}
  299. func (r *ringDescriber) matchFilter(host *HostInfo) bool {
  300. if r.dcFilter != "" && r.dcFilter != host.DataCenter() {
  301. return false
  302. }
  303. if r.rackFilter != "" && r.rackFilter != host.Rack() {
  304. return false
  305. }
  306. return true
  307. }
// refreshRing re-reads the ring topology via GetHosts and reconciles it
// with the session's host ring, connection pool, and token-aware policy.
func (r *ringDescriber) refreshRing() error {
	// if we have 0 hosts this will return the previous list of hosts to
	// attempt to reconnect to the cluster otherwise we would never find
	// downed hosts again, could possibly have an optimisation to only
	// try to add new hosts if GetHosts didnt error and the hosts didnt change.
	hosts, partitioner, err := r.GetHosts()
	if err != nil {
		return err
	}

	// TODO: move this to session

	// TODO: handle removing hosts here
	for _, h := range hosts {
		if r.session.cfg.HostFilter == nil || r.session.cfg.HostFilter.Accept(h) {
			// brand-new hosts go into the pool; already-known hosts only
			// get their metadata refreshed via update()
			if host, ok := r.session.ring.addHostIfMissing(h); !ok {
				r.session.pool.addHost(h)
			} else {
				host.update(h)
			}
		}
	}

	// propagate the (possibly unchanged) partitioner to token-aware components
	r.session.metadata.setPartitioner(partitioner)
	r.session.policy.SetPartitioner(partitioner)
	return nil
}