host_source.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698
  1. package gocql
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. )
  12. type nodeState int32
  13. func (n nodeState) String() string {
  14. if n == NodeUp {
  15. return "UP"
  16. } else if n == NodeDown {
  17. return "DOWN"
  18. }
  19. return fmt.Sprintf("UNKNOWN_%d", n)
  20. }
  21. const (
  22. NodeUp nodeState = iota
  23. NodeDown
  24. )
// cassVersion is a release version split into its numeric components.
// The zero value (0.0.0) is what a host carries before any version string
// has been parsed into it.
type cassVersion struct {
	Major, Minor, Patch int
}
  28. func (c *cassVersion) Set(v string) error {
  29. if v == "" {
  30. return nil
  31. }
  32. return c.UnmarshalCQL(nil, []byte(v))
  33. }
  34. func (c *cassVersion) UnmarshalCQL(info TypeInfo, data []byte) error {
  35. return c.unmarshal(data)
  36. }
  37. func (c *cassVersion) unmarshal(data []byte) error {
  38. version := strings.TrimSuffix(string(data), "-SNAPSHOT")
  39. version = strings.TrimPrefix(version, "v")
  40. v := strings.Split(version, ".")
  41. if len(v) < 2 {
  42. return fmt.Errorf("invalid version string: %s", data)
  43. }
  44. var err error
  45. c.Major, err = strconv.Atoi(v[0])
  46. if err != nil {
  47. return fmt.Errorf("invalid major version %v: %v", v[0], err)
  48. }
  49. c.Minor, err = strconv.Atoi(v[1])
  50. if err != nil {
  51. return fmt.Errorf("invalid minor version %v: %v", v[1], err)
  52. }
  53. if len(v) > 2 {
  54. c.Patch, err = strconv.Atoi(v[2])
  55. if err != nil {
  56. return fmt.Errorf("invalid patch version %v: %v", v[2], err)
  57. }
  58. }
  59. return nil
  60. }
  61. func (c cassVersion) Before(major, minor, patch int) bool {
  62. // We're comparing us (cassVersion) with the provided version (major, minor, patch)
  63. // We return true if our version is lower (comes before) than the provided one.
  64. if c.Major < major {
  65. return true
  66. } else if c.Major == major {
  67. if c.Minor < minor {
  68. return true
  69. } else if c.Minor == minor && c.Patch < patch {
  70. return true
  71. }
  72. }
  73. return false
  74. }
  75. func (c cassVersion) String() string {
  76. return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
  77. }
  78. func (c cassVersion) nodeUpDelay() time.Duration {
  79. if c.Major >= 2 && c.Minor >= 2 {
  80. // CASSANDRA-8236
  81. return 0
  82. }
  83. return 10 * time.Second
  84. }
// HostInfo holds what is known about a single node of the cluster. All
// fields are guarded by mu and are read and written through the accessor
// methods.
type HostInfo struct {
	// TODO(zariel): reduce locking maybe, not all values will change, but to ensure
	// that we are thread safe use a mutex to access all fields.
	mu sync.RWMutex
	// Addresses reported for the host; each is filled from the row column
	// of the same name (peer, broadcast_address, listen_address,
	// rpc_address, preferred_ip).
	peer             net.IP
	broadcastAddress net.IP
	listenAddress    net.IP
	rpcAddress       net.IP
	preferredIP      net.IP
	// connectAddress, when valid, takes precedence over every other
	// address when choosing where to connect.
	connectAddress net.IP
	port           int
	dataCenter     string
	rack           string
	// hostId is the string form of the host_id UUID.
	hostId      string
	workload    string
	graph       bool
	dseVersion  string
	partitioner string
	clusterName string
	// version is parsed from the release_version column.
	version cassVersion
	state   nodeState
	tokens  []string
}
  108. func (h *HostInfo) Equal(host *HostInfo) bool {
  109. if h == host {
  110. // prevent rlock reentry
  111. return true
  112. }
  113. return h.ConnectAddress().Equal(host.ConnectAddress())
  114. }
  115. func (h *HostInfo) Peer() net.IP {
  116. h.mu.RLock()
  117. defer h.mu.RUnlock()
  118. return h.peer
  119. }
  120. func (h *HostInfo) setPeer(peer net.IP) *HostInfo {
  121. h.mu.Lock()
  122. defer h.mu.Unlock()
  123. h.peer = peer
  124. return h
  125. }
  126. func (h *HostInfo) invalidConnectAddr() bool {
  127. h.mu.RLock()
  128. defer h.mu.RUnlock()
  129. addr, _ := h.connectAddressLocked()
  130. return !validIpAddr(addr)
  131. }
  132. func validIpAddr(addr net.IP) bool {
  133. return addr != nil && !addr.IsUnspecified()
  134. }
  135. func (h *HostInfo) connectAddressLocked() (net.IP, string) {
  136. if validIpAddr(h.connectAddress) {
  137. return h.connectAddress, "connect_address"
  138. } else if validIpAddr(h.rpcAddress) {
  139. return h.rpcAddress, "rpc_adress"
  140. } else if validIpAddr(h.preferredIP) {
  141. // where does perferred_ip get set?
  142. return h.preferredIP, "preferred_ip"
  143. } else if validIpAddr(h.broadcastAddress) {
  144. return h.broadcastAddress, "broadcast_address"
  145. } else if validIpAddr(h.peer) {
  146. return h.peer, "peer"
  147. }
  148. return net.IPv4zero, "invalid"
  149. }
  150. // Returns the address that should be used to connect to the host.
  151. // If you wish to override this, use an AddressTranslator or
  152. // use a HostFilter to SetConnectAddress()
  153. func (h *HostInfo) ConnectAddress() net.IP {
  154. h.mu.RLock()
  155. defer h.mu.RUnlock()
  156. if addr, _ := h.connectAddressLocked(); validIpAddr(addr) {
  157. return addr
  158. }
  159. panic(fmt.Sprintf("no valid connect address for host: %v. Is your cluster configured correctly?", h))
  160. }
  161. func (h *HostInfo) SetConnectAddress(address net.IP) *HostInfo {
  162. // TODO(zariel): should this not be exported?
  163. h.mu.Lock()
  164. defer h.mu.Unlock()
  165. h.connectAddress = address
  166. return h
  167. }
  168. func (h *HostInfo) BroadcastAddress() net.IP {
  169. h.mu.RLock()
  170. defer h.mu.RUnlock()
  171. return h.broadcastAddress
  172. }
  173. func (h *HostInfo) ListenAddress() net.IP {
  174. h.mu.RLock()
  175. defer h.mu.RUnlock()
  176. return h.listenAddress
  177. }
  178. func (h *HostInfo) RPCAddress() net.IP {
  179. h.mu.RLock()
  180. defer h.mu.RUnlock()
  181. return h.rpcAddress
  182. }
  183. func (h *HostInfo) PreferredIP() net.IP {
  184. h.mu.RLock()
  185. defer h.mu.RUnlock()
  186. return h.preferredIP
  187. }
  188. func (h *HostInfo) DataCenter() string {
  189. h.mu.RLock()
  190. defer h.mu.RUnlock()
  191. return h.dataCenter
  192. }
  193. func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo {
  194. h.mu.Lock()
  195. defer h.mu.Unlock()
  196. h.dataCenter = dataCenter
  197. return h
  198. }
  199. func (h *HostInfo) Rack() string {
  200. h.mu.RLock()
  201. defer h.mu.RUnlock()
  202. return h.rack
  203. }
  204. func (h *HostInfo) setRack(rack string) *HostInfo {
  205. h.mu.Lock()
  206. defer h.mu.Unlock()
  207. h.rack = rack
  208. return h
  209. }
  210. func (h *HostInfo) HostID() string {
  211. h.mu.RLock()
  212. defer h.mu.RUnlock()
  213. return h.hostId
  214. }
  215. func (h *HostInfo) setHostID(hostID string) *HostInfo {
  216. h.mu.Lock()
  217. defer h.mu.Unlock()
  218. h.hostId = hostID
  219. return h
  220. }
  221. func (h *HostInfo) WorkLoad() string {
  222. h.mu.RLock()
  223. defer h.mu.RUnlock()
  224. return h.workload
  225. }
  226. func (h *HostInfo) Graph() bool {
  227. h.mu.RLock()
  228. defer h.mu.RUnlock()
  229. return h.graph
  230. }
  231. func (h *HostInfo) DSEVersion() string {
  232. h.mu.RLock()
  233. defer h.mu.RUnlock()
  234. return h.dseVersion
  235. }
  236. func (h *HostInfo) Partitioner() string {
  237. h.mu.RLock()
  238. defer h.mu.RUnlock()
  239. return h.partitioner
  240. }
  241. func (h *HostInfo) ClusterName() string {
  242. h.mu.RLock()
  243. defer h.mu.RUnlock()
  244. return h.clusterName
  245. }
  246. func (h *HostInfo) Version() cassVersion {
  247. h.mu.RLock()
  248. defer h.mu.RUnlock()
  249. return h.version
  250. }
  251. func (h *HostInfo) setVersion(major, minor, patch int) *HostInfo {
  252. h.mu.Lock()
  253. defer h.mu.Unlock()
  254. h.version = cassVersion{major, minor, patch}
  255. return h
  256. }
  257. func (h *HostInfo) State() nodeState {
  258. h.mu.RLock()
  259. defer h.mu.RUnlock()
  260. return h.state
  261. }
  262. func (h *HostInfo) setState(state nodeState) *HostInfo {
  263. h.mu.Lock()
  264. defer h.mu.Unlock()
  265. h.state = state
  266. return h
  267. }
  268. func (h *HostInfo) Tokens() []string {
  269. h.mu.RLock()
  270. defer h.mu.RUnlock()
  271. return h.tokens
  272. }
  273. func (h *HostInfo) setTokens(tokens []string) *HostInfo {
  274. h.mu.Lock()
  275. defer h.mu.Unlock()
  276. h.tokens = tokens
  277. return h
  278. }
  279. func (h *HostInfo) Port() int {
  280. h.mu.RLock()
  281. defer h.mu.RUnlock()
  282. return h.port
  283. }
  284. func (h *HostInfo) setPort(port int) *HostInfo {
  285. h.mu.Lock()
  286. defer h.mu.Unlock()
  287. h.port = port
  288. return h
  289. }
// update fills in every field of h that still holds its zero value with the
// corresponding value from from. Fields that are already set on h are left
// alone, and the graph and state fields are never copied.
//
// h is write-locked and from is read-locked for the duration of the call;
// updating a host from itself is a no-op.
func (h *HostInfo) update(from *HostInfo) {
	// Same object: nothing to copy, and locking it twice must be avoided.
	if h == from {
		return
	}
	h.mu.Lock()
	defer h.mu.Unlock()
	from.mu.RLock()
	defer from.mu.RUnlock()
	// autogenerated do not update
	if h.peer == nil {
		h.peer = from.peer
	}
	if h.broadcastAddress == nil {
		h.broadcastAddress = from.broadcastAddress
	}
	if h.listenAddress == nil {
		h.listenAddress = from.listenAddress
	}
	if h.rpcAddress == nil {
		h.rpcAddress = from.rpcAddress
	}
	if h.preferredIP == nil {
		h.preferredIP = from.preferredIP
	}
	if h.connectAddress == nil {
		h.connectAddress = from.connectAddress
	}
	if h.port == 0 {
		h.port = from.port
	}
	if h.dataCenter == "" {
		h.dataCenter = from.dataCenter
	}
	if h.rack == "" {
		h.rack = from.rack
	}
	if h.hostId == "" {
		h.hostId = from.hostId
	}
	if h.workload == "" {
		h.workload = from.workload
	}
	if h.dseVersion == "" {
		h.dseVersion = from.dseVersion
	}
	if h.partitioner == "" {
		h.partitioner = from.partitioner
	}
	if h.clusterName == "" {
		h.clusterName = from.clusterName
	}
	// An all-zero version means no version has been recorded yet.
	if h.version == (cassVersion{}) {
		h.version = from.version
	}
	if h.tokens == nil {
		h.tokens = from.tokens
	}
}
  348. func (h *HostInfo) IsUp() bool {
  349. return h != nil && h.State() == NodeUp
  350. }
  351. func (h *HostInfo) String() string {
  352. h.mu.RLock()
  353. defer h.mu.RUnlock()
  354. connectAddr, source := h.connectAddressLocked()
  355. return fmt.Sprintf("[HostInfo connectAddress=%q peer=%q rpc_address=%q broadcast_address=%q "+
  356. "preferred_ip=%q connect_addr=%q connect_addr_source=%q "+
  357. "port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
  358. h.connectAddress, h.peer, h.rpcAddress, h.broadcastAddress, h.preferredIP,
  359. connectAddr, source,
  360. h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
  361. }
// ringDescriber queries system.peers through the session's control
// connection to discover the hosts of the cluster.
// NOTE(review): the original comment says this polls at a specific
// interval; the scheduling is not visible in this file.
type ringDescriber struct {
	session *Session
	// mu is held by GetHosts for the duration of a lookup.
	mu sync.Mutex
	// prevHosts and prevPartitioner are what GetHosts returns alongside
	// the error when querying the cluster fails.
	prevHosts       []*HostInfo
	prevPartitioner string
}
  369. // Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
  370. func checkSystemSchema(control *controlConn) (bool, error) {
  371. iter := control.query("SELECT * FROM system_schema.keyspaces")
  372. if err := iter.err; err != nil {
  373. if errf, ok := err.(*errorFrame); ok {
  374. if errf.code == errSyntax {
  375. return false, nil
  376. }
  377. }
  378. return false, err
  379. }
  380. return true, nil
  381. }
  382. // Given a map that represents a row from either system.local or system.peers
  383. // return as much information as we can in *HostInfo
  384. func (s *Session) hostInfoFromMap(row map[string]interface{}, port int) (*HostInfo, error) {
  385. const assertErrorMsg = "Assertion failed for %s"
  386. var ok bool
  387. // Default to our connected port if the cluster doesn't have port information
  388. host := HostInfo{
  389. port: port,
  390. }
  391. for key, value := range row {
  392. switch key {
  393. case "data_center":
  394. host.dataCenter, ok = value.(string)
  395. if !ok {
  396. return nil, fmt.Errorf(assertErrorMsg, "data_center")
  397. }
  398. case "rack":
  399. host.rack, ok = value.(string)
  400. if !ok {
  401. return nil, fmt.Errorf(assertErrorMsg, "rack")
  402. }
  403. case "host_id":
  404. hostId, ok := value.(UUID)
  405. if !ok {
  406. return nil, fmt.Errorf(assertErrorMsg, "host_id")
  407. }
  408. host.hostId = hostId.String()
  409. case "release_version":
  410. version, ok := value.(string)
  411. if !ok {
  412. return nil, fmt.Errorf(assertErrorMsg, "release_version")
  413. }
  414. host.version.Set(version)
  415. case "peer":
  416. ip, ok := value.(string)
  417. if !ok {
  418. return nil, fmt.Errorf(assertErrorMsg, "peer")
  419. }
  420. host.peer = net.ParseIP(ip)
  421. case "cluster_name":
  422. host.clusterName, ok = value.(string)
  423. if !ok {
  424. return nil, fmt.Errorf(assertErrorMsg, "cluster_name")
  425. }
  426. case "partitioner":
  427. host.partitioner, ok = value.(string)
  428. if !ok {
  429. return nil, fmt.Errorf(assertErrorMsg, "partitioner")
  430. }
  431. case "broadcast_address":
  432. ip, ok := value.(string)
  433. if !ok {
  434. return nil, fmt.Errorf(assertErrorMsg, "broadcast_address")
  435. }
  436. host.broadcastAddress = net.ParseIP(ip)
  437. case "preferred_ip":
  438. ip, ok := value.(string)
  439. if !ok {
  440. return nil, fmt.Errorf(assertErrorMsg, "preferred_ip")
  441. }
  442. host.preferredIP = net.ParseIP(ip)
  443. case "rpc_address":
  444. ip, ok := value.(string)
  445. if !ok {
  446. return nil, fmt.Errorf(assertErrorMsg, "rpc_address")
  447. }
  448. host.rpcAddress = net.ParseIP(ip)
  449. case "listen_address":
  450. ip, ok := value.(string)
  451. if !ok {
  452. return nil, fmt.Errorf(assertErrorMsg, "listen_address")
  453. }
  454. host.listenAddress = net.ParseIP(ip)
  455. case "workload":
  456. host.workload, ok = value.(string)
  457. if !ok {
  458. return nil, fmt.Errorf(assertErrorMsg, "workload")
  459. }
  460. case "graph":
  461. host.graph, ok = value.(bool)
  462. if !ok {
  463. return nil, fmt.Errorf(assertErrorMsg, "graph")
  464. }
  465. case "tokens":
  466. host.tokens, ok = value.([]string)
  467. if !ok {
  468. return nil, fmt.Errorf(assertErrorMsg, "tokens")
  469. }
  470. case "dse_version":
  471. host.dseVersion, ok = value.(string)
  472. if !ok {
  473. return nil, fmt.Errorf(assertErrorMsg, "dse_version")
  474. }
  475. }
  476. // TODO(thrawn01): Add 'port'? once CASSANDRA-7544 is complete
  477. // Not sure what the port field will be called until the JIRA issue is complete
  478. }
  479. ip, port := s.cfg.translateAddressPort(host.ConnectAddress(), host.port)
  480. host.connectAddress = ip
  481. host.port = port
  482. return &host, nil
  483. }
  484. // Ask the control node for host info on all it's known peers
  485. func (r *ringDescriber) getClusterPeerInfo() ([]*HostInfo, error) {
  486. var hosts []*HostInfo
  487. iter := r.session.control.withConnHost(func(ch *connHost) *Iter {
  488. hosts = append(hosts, ch.host)
  489. return ch.conn.query(context.TODO(), "SELECT * FROM system.peers")
  490. })
  491. if iter == nil {
  492. return nil, errNoControl
  493. }
  494. rows, err := iter.SliceMap()
  495. if err != nil {
  496. // TODO(zariel): make typed error
  497. return nil, fmt.Errorf("unable to fetch peer host info: %s", err)
  498. }
  499. for _, row := range rows {
  500. // extract all available info about the peer
  501. host, err := r.session.hostInfoFromMap(row, r.session.cfg.Port)
  502. if err != nil {
  503. return nil, err
  504. } else if !isValidPeer(host) {
  505. // If it's not a valid peer
  506. Logger.Printf("Found invalid peer '%s' "+
  507. "Likely due to a gossip or snitch issue, this host will be ignored", host)
  508. continue
  509. }
  510. hosts = append(hosts, host)
  511. }
  512. return hosts, nil
  513. }
  514. // Return true if the host is a valid peer
  515. func isValidPeer(host *HostInfo) bool {
  516. return !(len(host.RPCAddress()) == 0 ||
  517. host.hostId == "" ||
  518. host.dataCenter == "" ||
  519. host.rack == "" ||
  520. len(host.tokens) == 0)
  521. }
  522. // Return a list of hosts the cluster knows about
  523. func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) {
  524. r.mu.Lock()
  525. defer r.mu.Unlock()
  526. hosts, err := r.getClusterPeerInfo()
  527. if err != nil {
  528. return r.prevHosts, r.prevPartitioner, err
  529. }
  530. var partitioner string
  531. if len(hosts) > 0 {
  532. partitioner = hosts[0].Partitioner()
  533. }
  534. return hosts, partitioner, nil
  535. }
  536. // Given an ip/port return HostInfo for the specified ip/port
  537. func (r *ringDescriber) getHostInfo(ip net.IP, port int) (*HostInfo, error) {
  538. var host *HostInfo
  539. iter := r.session.control.withConnHost(func(ch *connHost) *Iter {
  540. if ch.host.ConnectAddress().Equal(ip) {
  541. host = ch.host
  542. return nil
  543. }
  544. return ch.conn.query(context.TODO(), "SELECT * FROM system.peers")
  545. })
  546. if iter != nil {
  547. rows, err := iter.SliceMap()
  548. if err != nil {
  549. return nil, err
  550. }
  551. for _, row := range rows {
  552. h, err := r.session.hostInfoFromMap(row, port)
  553. if err != nil {
  554. return nil, err
  555. }
  556. if h.ConnectAddress().Equal(ip) {
  557. host = h
  558. break
  559. }
  560. }
  561. if host == nil {
  562. return nil, errors.New("host not found in peers table")
  563. }
  564. }
  565. if host == nil {
  566. return nil, errors.New("unable to fetch host info: invalid control connection")
  567. } else if host.invalidConnectAddr() {
  568. return nil, fmt.Errorf("host ConnectAddress invalid ip=%v: %v", ip, host)
  569. }
  570. return host, nil
  571. }
  572. func (r *ringDescriber) refreshRing() error {
  573. // if we have 0 hosts this will return the previous list of hosts to
  574. // attempt to reconnect to the cluster otherwise we would never find
  575. // downed hosts again, could possibly have an optimisation to only
  576. // try to add new hosts if GetHosts didnt error and the hosts didnt change.
  577. hosts, partitioner, err := r.GetHosts()
  578. if err != nil {
  579. return err
  580. }
  581. prevHosts := r.session.ring.currentHosts()
  582. // TODO: move this to session
  583. for _, h := range hosts {
  584. if filter := r.session.cfg.HostFilter; filter != nil && !filter.Accept(h) {
  585. continue
  586. }
  587. if host, ok := r.session.ring.addHostIfMissing(h); !ok {
  588. r.session.pool.addHost(h)
  589. r.session.policy.AddHost(h)
  590. } else {
  591. host.update(h)
  592. }
  593. delete(prevHosts, h.ConnectAddress().String())
  594. }
  595. // TODO(zariel): it may be worth having a mutex covering the overall ring state
  596. // in a session so that everything sees a consistent state. Becuase as is today
  597. // events can come in and due to ordering an UP host could be removed from the cluster
  598. for _, host := range prevHosts {
  599. r.session.removeHost(host)
  600. }
  601. r.session.metadata.setPartitioner(partitioner)
  602. r.session.policy.SetPartitioner(partitioner)
  603. return nil
  604. }