host_source.go

package gocql

import (
	"fmt"
	"log"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"
)
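
// nodeState tracks whether a host is currently considered reachable.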
type nodeState int32

func (n nodeState) String() string {
	if n == NodeUp {
		return "UP"
	} else if n == NodeDown {
		return "DOWN"
	}
	return fmt.Sprintf("UNKNOWN_%d", n)
}

const (
	NodeUp nodeState = iota
	NodeDown
)

type cassVersion struct {
	Major, Minor, Patch int
}
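
// Set parses a version from its string form; an empty string leaves the
// version unchanged.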
func (c *cassVersion) Set(v string) error {
	if v == "" {
		return nil
	}

	return c.UnmarshalCQL(nil, []byte(v))
}

func (c *cassVersion) UnmarshalCQL(info TypeInfo, data []byte) error {
	return c.unmarshal(data)
}
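
// unmarshal parses a release_version value such as "2.1.6", "v3.0.8" or
// "3.0.8-SNAPSHOT" into its numeric components. The patch component is
// optional.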
func (c *cassVersion) unmarshal(data []byte) error {
	version := strings.TrimSuffix(string(data), "-SNAPSHOT")
	version = strings.TrimPrefix(version, "v")
	v := strings.Split(version, ".")

	if len(v) < 2 {
		return fmt.Errorf("invalid version string: %s", data)
	}

	var err error
	c.Major, err = strconv.Atoi(v[0])
	if err != nil {
		return fmt.Errorf("invalid major version %v: %v", v[0], err)
	}

	c.Minor, err = strconv.Atoi(v[1])
	if err != nil {
		return fmt.Errorf("invalid minor version %v: %v", v[1], err)
	}

	if len(v) > 2 {
		c.Patch, err = strconv.Atoi(v[2])
		if err != nil {
			return fmt.Errorf("invalid patch version %v: %v", v[2], err)
		}
	}

	return nil
}
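
// Before reports whether c is a strictly earlier release than
// major.minor.patch, comparing components in order: for example, v2.0.16
// is before 2.1.6, while v3.0.0 is not before 2.2.0.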
func (c cassVersion) Before(major, minor, patch int) bool {
	if c.Major != major {
		return c.Major < major
	}
	if c.Minor != minor {
		return c.Minor < minor
	}
	return c.Patch < patch
}

func (c cassVersion) String() string {
	return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
}
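
// nodeUpDelay returns how long to wait after a node-up event before using
// the node: releases without the CASSANDRA-8236 fix can announce UP before
// the node is ready to serve requests.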
func (c cassVersion) nodeUpDelay() time.Duration {
	// CASSANDRA-8236 (fixed in 2.2): no delay needed on 2.2+ or 3.x+
	if c.Major > 2 || (c.Major == 2 && c.Minor >= 2) {
		return 0
	}

	return 10 * time.Second
}
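
// HostInfo describes a single node in the ring, as reported by
// system.local and system.peers.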
type HostInfo struct {
	// TODO(zariel): reduce locking maybe; not all values will change, but to
	// ensure that we are thread safe, use a mutex to access all fields.
	mu         sync.RWMutex
	peer       string
	port       int
	dataCenter string
	rack       string
	hostId     string
	version    cassVersion
	state      nodeState
	tokens     []string
}
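
// Equal reports whether both HostInfo values refer to the same node,
// matching on peer address and host ID.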
func (h *HostInfo) Equal(host *HostInfo) bool {
	h.mu.RLock()
	defer h.mu.RUnlock()
	host.mu.RLock()
	defer host.mu.RUnlock()

	return h.peer == host.peer && h.hostId == host.hostId
}

func (h *HostInfo) Peer() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.peer
}

func (h *HostInfo) setPeer(peer string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.peer = peer
	return h
}

func (h *HostInfo) DataCenter() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.dataCenter
}

func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.dataCenter = dataCenter
	return h
}

func (h *HostInfo) Rack() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.rack
}

func (h *HostInfo) setRack(rack string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.rack = rack
	return h
}

func (h *HostInfo) HostID() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.hostId
}

func (h *HostInfo) setHostID(hostID string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.hostId = hostID
	return h
}

func (h *HostInfo) Version() cassVersion {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.version
}

func (h *HostInfo) setVersion(major, minor, patch int) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.version = cassVersion{major, minor, patch}
	return h
}

func (h *HostInfo) State() nodeState {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.state
}

func (h *HostInfo) setState(state nodeState) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.state = state
	return h
}

func (h *HostInfo) Tokens() []string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.tokens
}

func (h *HostInfo) setTokens(tokens []string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.tokens = tokens
	return h
}

func (h *HostInfo) Port() int {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.port
}

func (h *HostInfo) setPort(port int) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.port = port
	return h
}
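
// update copies the mutable ring metadata from another HostInfo that
// describes the same node.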
func (h *HostInfo) update(from *HostInfo) {
	if h == from {
		return
	}

	h.mu.Lock()
	defer h.mu.Unlock()

	// also lock the source host so we do not read its fields mid-update
	from.mu.RLock()
	defer from.mu.RUnlock()

	h.tokens = from.tokens
	h.version = from.version
	h.hostId = from.hostId
	h.dataCenter = from.dataCenter
}

func (h *HostInfo) IsUp() bool {
	return h != nil && h.State() == NodeUp
}

func (h *HostInfo) String() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return fmt.Sprintf("[hostinfo peer=%q port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
		h.peer, h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
}

// ringDescriber polls system.peers at a regular interval to find new hosts.
type ringDescriber struct {
	dcFilter   string
	rackFilter string
	session    *Session
	closeChan  chan bool
	// indicates that we can use system.local to get the connection's remote
	// address
	localHasRpcAddr bool

	mu              sync.Mutex
	prevHosts       []*HostInfo
	prevPartitioner string
}
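
// checkSystemLocal probes system.local for the broadcast_address column;
// servers that predate the column reject the query, which surfaces here as
// a syntax error frame.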
func checkSystemLocal(control *controlConn) (bool, error) {
	iter := control.query("SELECT broadcast_address FROM system.local")
	if err := iter.err; err != nil {
		if errf, ok := err.(*errorFrame); ok {
			if errf.code == errSyntax {
				return false, nil
			}
		}

		return false, err
	}

	return true, nil
}

// checkSystemSchema returns true if the cluster uses system_schema.keyspaces
// instead of the legacy system.schema_keyspaces table.
func checkSystemSchema(control *controlConn) (bool, error) {
	iter := control.query("SELECT * FROM system_schema.keyspaces")
	if err := iter.err; err != nil {
		if errf, ok := err.(*errorFrame); ok {
			if errf.code == errReadFailure {
				return false, nil
			}
		}

		return false, err
	}

	return true, nil
}
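
// GetHosts returns the hosts in the ring and the cluster's partitioner,
// querying system.local for the connected node and system.peers for the
// rest. If the control connection cannot be queried, the previously seen
// hosts and partitioner are returned instead.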
func (r *ringDescriber) GetHosts() (hosts []*HostInfo, partitioner string, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// We need to query system.peers and system.local on the same connection,
	// otherwise the two views may describe different nodes and we would not
	// see the whole cluster.
	const (
		legacyLocalQuery = "SELECT data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
		// broadcast_address is only available in 2.0.16, 2.1.6, 2.2.0 and later
		localQuery = "SELECT broadcast_address, data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
	)

	localHost := &HostInfo{}
	if r.localHasRpcAddr {
		iter := r.session.control.query(localQuery)
		if iter == nil {
			return r.prevHosts, r.prevPartitioner, nil
		}

		iter.Scan(&localHost.peer, &localHost.dataCenter, &localHost.rack,
			&localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)

		if err = iter.Close(); err != nil {
			return nil, "", err
		}
	} else {
		iter := r.session.control.query(legacyLocalQuery)
		if iter == nil {
			return r.prevHosts, r.prevPartitioner, nil
		}

		iter.Scan(&localHost.dataCenter, &localHost.rack, &localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)

		if err = iter.Close(); err != nil {
			return nil, "", err
		}

		addr, _, err := net.SplitHostPort(r.session.control.addr())
		if err != nil {
			// This should never happen: this is the address that was dialed
			// by conn, so a panic here indicates a bug. Please report it if
			// it occurs.
			panic(err)
		}

		localHost.peer = addr
	}

	localHost.port = r.session.cfg.Port

	hosts = []*HostInfo{localHost}

	// check the iterator before creating a scanner from it, since the
	// control query can fail and return nil
	iter := r.session.control.query("SELECT rpc_address, data_center, rack, host_id, tokens, release_version FROM system.peers")
	if iter == nil {
		return r.prevHosts, r.prevPartitioner, nil
	}

	rows := iter.Scanner()
	for rows.Next() {
		host := &HostInfo{port: r.session.cfg.Port}
		err := rows.Scan(&host.peer, &host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version)
		if err != nil {
			log.Println(err)
			continue
		}

		if r.matchFilter(host) {
			hosts = append(hosts, host)
		}
	}

	if err = rows.Err(); err != nil {
		return nil, "", err
	}

	r.prevHosts = hosts
	r.prevPartitioner = partitioner

	return hosts, partitioner, nil
}
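
// matchFilter reports whether the host passes the configured data center
// and rack filters; an empty filter matches everything.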
func (r *ringDescriber) matchFilter(host *HostInfo) bool {
	if r.dcFilter != "" && r.dcFilter != host.DataCenter() {
		return false
	}

	if r.rackFilter != "" && r.rackFilter != host.Rack() {
		return false
	}

	return true
}
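
// refreshRing reconciles the session's view of the ring with the hosts
// returned by GetHosts, adding newly discovered hosts to the pool and
// updating metadata on known ones.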
func (r *ringDescriber) refreshRing() error {
	// If we have 0 hosts this will return the previous list of hosts to
	// attempt to reconnect to the cluster, otherwise we would never find
	// downed hosts again. A possible optimisation is to only try to add new
	// hosts if GetHosts didn't error and the hosts didn't change.
	hosts, partitioner, err := r.GetHosts()
	if err != nil {
		return err
	}

	// TODO: move this to session
	// TODO: handle removing hosts here
	for _, h := range hosts {
		if r.session.cfg.HostFilter == nil || r.session.cfg.HostFilter.Accept(h) {
			if host, ok := r.session.ring.addHostIfMissing(h); !ok {
				r.session.pool.addHost(h)
			} else {
				host.update(h)
			}
		}
	}

	r.session.metadata.setPartitioner(partitioner)
	r.session.policy.SetPartitioner(partitioner)
	return nil
}