host_source.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713
  1. package gocql
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. )
  12. type nodeState int32
  13. func (n nodeState) String() string {
  14. if n == NodeUp {
  15. return "UP"
  16. } else if n == NodeDown {
  17. return "DOWN"
  18. }
  19. return fmt.Sprintf("UNKNOWN_%d", n)
  20. }
  21. const (
  22. NodeUp nodeState = iota
  23. NodeDown
  24. )
  25. type cassVersion struct {
  26. Major, Minor, Patch int
  27. }
  28. func (c *cassVersion) Set(v string) error {
  29. if v == "" {
  30. return nil
  31. }
  32. return c.UnmarshalCQL(nil, []byte(v))
  33. }
  34. func (c *cassVersion) UnmarshalCQL(info TypeInfo, data []byte) error {
  35. return c.unmarshal(data)
  36. }
  37. func (c *cassVersion) unmarshal(data []byte) error {
  38. version := strings.TrimSuffix(string(data), "-SNAPSHOT")
  39. version = strings.TrimPrefix(version, "v")
  40. v := strings.Split(version, ".")
  41. if len(v) < 2 {
  42. return fmt.Errorf("invalid version string: %s", data)
  43. }
  44. var err error
  45. c.Major, err = strconv.Atoi(v[0])
  46. if err != nil {
  47. return fmt.Errorf("invalid major version %v: %v", v[0], err)
  48. }
  49. c.Minor, err = strconv.Atoi(v[1])
  50. if err != nil {
  51. return fmt.Errorf("invalid minor version %v: %v", v[1], err)
  52. }
  53. if len(v) > 2 {
  54. c.Patch, err = strconv.Atoi(v[2])
  55. if err != nil {
  56. return fmt.Errorf("invalid patch version %v: %v", v[2], err)
  57. }
  58. }
  59. return nil
  60. }
  61. func (c cassVersion) Before(major, minor, patch int) bool {
  62. // We're comparing us (cassVersion) with the provided version (major, minor, patch)
  63. // We return true if our version is lower (comes before) than the provided one.
  64. if c.Major < major {
  65. return true
  66. } else if c.Major == major {
  67. if c.Minor < minor {
  68. return true
  69. } else if c.Minor == minor && c.Patch < patch {
  70. return true
  71. }
  72. }
  73. return false
  74. }
  75. func (c cassVersion) AtLeast(major, minor, patch int) bool {
  76. return !c.Before(major, minor, patch)
  77. }
  78. func (c cassVersion) String() string {
  79. return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
  80. }
  81. func (c cassVersion) nodeUpDelay() time.Duration {
  82. if c.Major >= 2 && c.Minor >= 2 {
  83. // CASSANDRA-8236
  84. return 0
  85. }
  86. return 10 * time.Second
  87. }
  88. type HostInfo struct {
  89. // TODO(zariel): reduce locking maybe, not all values will change, but to ensure
  90. // that we are thread safe use a mutex to access all fields.
  91. mu sync.RWMutex
  92. hostname string
  93. peer net.IP
  94. broadcastAddress net.IP
  95. listenAddress net.IP
  96. rpcAddress net.IP
  97. preferredIP net.IP
  98. connectAddress net.IP
  99. port int
  100. dataCenter string
  101. rack string
  102. hostId string
  103. workload string
  104. graph bool
  105. dseVersion string
  106. partitioner string
  107. clusterName string
  108. version cassVersion
  109. state nodeState
  110. schemaVersion string
  111. tokens []string
  112. }
  113. func (h *HostInfo) Equal(host *HostInfo) bool {
  114. if h == host {
  115. // prevent rlock reentry
  116. return true
  117. }
  118. return h.ConnectAddress().Equal(host.ConnectAddress())
  119. }
  120. func (h *HostInfo) Peer() net.IP {
  121. h.mu.RLock()
  122. defer h.mu.RUnlock()
  123. return h.peer
  124. }
  125. func (h *HostInfo) setPeer(peer net.IP) *HostInfo {
  126. h.mu.Lock()
  127. defer h.mu.Unlock()
  128. h.peer = peer
  129. return h
  130. }
  131. func (h *HostInfo) invalidConnectAddr() bool {
  132. h.mu.RLock()
  133. defer h.mu.RUnlock()
  134. addr, _ := h.connectAddressLocked()
  135. return !validIpAddr(addr)
  136. }
  137. func validIpAddr(addr net.IP) bool {
  138. return addr != nil && !addr.IsUnspecified()
  139. }
  140. func (h *HostInfo) connectAddressLocked() (net.IP, string) {
  141. if validIpAddr(h.connectAddress) {
  142. return h.connectAddress, "connect_address"
  143. } else if validIpAddr(h.rpcAddress) {
  144. return h.rpcAddress, "rpc_adress"
  145. } else if validIpAddr(h.preferredIP) {
  146. // where does perferred_ip get set?
  147. return h.preferredIP, "preferred_ip"
  148. } else if validIpAddr(h.broadcastAddress) {
  149. return h.broadcastAddress, "broadcast_address"
  150. } else if validIpAddr(h.peer) {
  151. return h.peer, "peer"
  152. }
  153. return net.IPv4zero, "invalid"
  154. }
  155. // Returns the address that should be used to connect to the host.
  156. // If you wish to override this, use an AddressTranslator or
  157. // use a HostFilter to SetConnectAddress()
  158. func (h *HostInfo) ConnectAddress() net.IP {
  159. h.mu.RLock()
  160. defer h.mu.RUnlock()
  161. if addr, _ := h.connectAddressLocked(); validIpAddr(addr) {
  162. return addr
  163. }
  164. panic(fmt.Sprintf("no valid connect address for host: %v. Is your cluster configured correctly?", h))
  165. }
  166. func (h *HostInfo) SetConnectAddress(address net.IP) *HostInfo {
  167. // TODO(zariel): should this not be exported?
  168. h.mu.Lock()
  169. defer h.mu.Unlock()
  170. h.connectAddress = address
  171. return h
  172. }
  173. func (h *HostInfo) BroadcastAddress() net.IP {
  174. h.mu.RLock()
  175. defer h.mu.RUnlock()
  176. return h.broadcastAddress
  177. }
  178. func (h *HostInfo) ListenAddress() net.IP {
  179. h.mu.RLock()
  180. defer h.mu.RUnlock()
  181. return h.listenAddress
  182. }
  183. func (h *HostInfo) RPCAddress() net.IP {
  184. h.mu.RLock()
  185. defer h.mu.RUnlock()
  186. return h.rpcAddress
  187. }
  188. func (h *HostInfo) PreferredIP() net.IP {
  189. h.mu.RLock()
  190. defer h.mu.RUnlock()
  191. return h.preferredIP
  192. }
  193. func (h *HostInfo) DataCenter() string {
  194. h.mu.RLock()
  195. defer h.mu.RUnlock()
  196. return h.dataCenter
  197. }
  198. func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo {
  199. h.mu.Lock()
  200. defer h.mu.Unlock()
  201. h.dataCenter = dataCenter
  202. return h
  203. }
  204. func (h *HostInfo) Rack() string {
  205. h.mu.RLock()
  206. defer h.mu.RUnlock()
  207. return h.rack
  208. }
  209. func (h *HostInfo) setRack(rack string) *HostInfo {
  210. h.mu.Lock()
  211. defer h.mu.Unlock()
  212. h.rack = rack
  213. return h
  214. }
  215. func (h *HostInfo) HostID() string {
  216. h.mu.RLock()
  217. defer h.mu.RUnlock()
  218. return h.hostId
  219. }
  220. func (h *HostInfo) setHostID(hostID string) *HostInfo {
  221. h.mu.Lock()
  222. defer h.mu.Unlock()
  223. h.hostId = hostID
  224. return h
  225. }
  226. func (h *HostInfo) WorkLoad() string {
  227. h.mu.RLock()
  228. defer h.mu.RUnlock()
  229. return h.workload
  230. }
  231. func (h *HostInfo) Graph() bool {
  232. h.mu.RLock()
  233. defer h.mu.RUnlock()
  234. return h.graph
  235. }
  236. func (h *HostInfo) DSEVersion() string {
  237. h.mu.RLock()
  238. defer h.mu.RUnlock()
  239. return h.dseVersion
  240. }
  241. func (h *HostInfo) Partitioner() string {
  242. h.mu.RLock()
  243. defer h.mu.RUnlock()
  244. return h.partitioner
  245. }
  246. func (h *HostInfo) ClusterName() string {
  247. h.mu.RLock()
  248. defer h.mu.RUnlock()
  249. return h.clusterName
  250. }
  251. func (h *HostInfo) Version() cassVersion {
  252. h.mu.RLock()
  253. defer h.mu.RUnlock()
  254. return h.version
  255. }
  256. func (h *HostInfo) setVersion(major, minor, patch int) *HostInfo {
  257. h.mu.Lock()
  258. defer h.mu.Unlock()
  259. h.version = cassVersion{major, minor, patch}
  260. return h
  261. }
  262. func (h *HostInfo) State() nodeState {
  263. h.mu.RLock()
  264. defer h.mu.RUnlock()
  265. return h.state
  266. }
  267. func (h *HostInfo) setState(state nodeState) *HostInfo {
  268. h.mu.Lock()
  269. defer h.mu.Unlock()
  270. h.state = state
  271. return h
  272. }
  273. func (h *HostInfo) Tokens() []string {
  274. h.mu.RLock()
  275. defer h.mu.RUnlock()
  276. return h.tokens
  277. }
  278. func (h *HostInfo) setTokens(tokens []string) *HostInfo {
  279. h.mu.Lock()
  280. defer h.mu.Unlock()
  281. h.tokens = tokens
  282. return h
  283. }
  284. func (h *HostInfo) Port() int {
  285. h.mu.RLock()
  286. defer h.mu.RUnlock()
  287. return h.port
  288. }
  289. func (h *HostInfo) setPort(port int) *HostInfo {
  290. h.mu.Lock()
  291. defer h.mu.Unlock()
  292. h.port = port
  293. return h
  294. }
  295. func (h *HostInfo) update(from *HostInfo) {
  296. if h == from {
  297. return
  298. }
  299. h.mu.Lock()
  300. defer h.mu.Unlock()
  301. from.mu.RLock()
  302. defer from.mu.RUnlock()
  303. // autogenerated do not update
  304. if h.peer == nil {
  305. h.peer = from.peer
  306. }
  307. if h.broadcastAddress == nil {
  308. h.broadcastAddress = from.broadcastAddress
  309. }
  310. if h.listenAddress == nil {
  311. h.listenAddress = from.listenAddress
  312. }
  313. if h.rpcAddress == nil {
  314. h.rpcAddress = from.rpcAddress
  315. }
  316. if h.preferredIP == nil {
  317. h.preferredIP = from.preferredIP
  318. }
  319. if h.connectAddress == nil {
  320. h.connectAddress = from.connectAddress
  321. }
  322. if h.port == 0 {
  323. h.port = from.port
  324. }
  325. if h.dataCenter == "" {
  326. h.dataCenter = from.dataCenter
  327. }
  328. if h.rack == "" {
  329. h.rack = from.rack
  330. }
  331. if h.hostId == "" {
  332. h.hostId = from.hostId
  333. }
  334. if h.workload == "" {
  335. h.workload = from.workload
  336. }
  337. if h.dseVersion == "" {
  338. h.dseVersion = from.dseVersion
  339. }
  340. if h.partitioner == "" {
  341. h.partitioner = from.partitioner
  342. }
  343. if h.clusterName == "" {
  344. h.clusterName = from.clusterName
  345. }
  346. if h.version == (cassVersion{}) {
  347. h.version = from.version
  348. }
  349. if h.tokens == nil {
  350. h.tokens = from.tokens
  351. }
  352. }
  353. func (h *HostInfo) IsUp() bool {
  354. return h != nil && h.State() == NodeUp
  355. }
  356. func (h *HostInfo) HostnameAndPort() string {
  357. if h.hostname == "" {
  358. h.hostname = h.ConnectAddress().String()
  359. }
  360. return net.JoinHostPort(h.hostname, strconv.Itoa(h.port))
  361. }
  362. func (h *HostInfo) String() string {
  363. h.mu.RLock()
  364. defer h.mu.RUnlock()
  365. connectAddr, source := h.connectAddressLocked()
  366. return fmt.Sprintf("[HostInfo hostname=%q connectAddress=%q peer=%q rpc_address=%q broadcast_address=%q "+
  367. "preferred_ip=%q connect_addr=%q connect_addr_source=%q "+
  368. "port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
  369. h.hostname, h.connectAddress, h.peer, h.rpcAddress, h.broadcastAddress, h.preferredIP,
  370. connectAddr, source,
  371. h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
  372. }
  373. // Polls system.peers at a specific interval to find new hosts
  374. type ringDescriber struct {
  375. session *Session
  376. mu sync.Mutex
  377. prevHosts []*HostInfo
  378. prevPartitioner string
  379. }
  380. // Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
  381. func checkSystemSchema(control *controlConn) (bool, error) {
  382. iter := control.query("SELECT * FROM system_schema.keyspaces")
  383. if err := iter.err; err != nil {
  384. if errf, ok := err.(*errorFrame); ok {
  385. if errf.code == errSyntax {
  386. return false, nil
  387. }
  388. }
  389. return false, err
  390. }
  391. return true, nil
  392. }
  393. // Given a map that represents a row from either system.local or system.peers
  394. // return as much information as we can in *HostInfo
  395. func (s *Session) hostInfoFromMap(row map[string]interface{}, host *HostInfo) (*HostInfo, error) {
  396. const assertErrorMsg = "Assertion failed for %s"
  397. var ok bool
  398. // Default to our connected port if the cluster doesn't have port information
  399. for key, value := range row {
  400. switch key {
  401. case "data_center":
  402. host.dataCenter, ok = value.(string)
  403. if !ok {
  404. return nil, fmt.Errorf(assertErrorMsg, "data_center")
  405. }
  406. case "rack":
  407. host.rack, ok = value.(string)
  408. if !ok {
  409. return nil, fmt.Errorf(assertErrorMsg, "rack")
  410. }
  411. case "host_id":
  412. hostId, ok := value.(UUID)
  413. if !ok {
  414. return nil, fmt.Errorf(assertErrorMsg, "host_id")
  415. }
  416. host.hostId = hostId.String()
  417. case "release_version":
  418. version, ok := value.(string)
  419. if !ok {
  420. return nil, fmt.Errorf(assertErrorMsg, "release_version")
  421. }
  422. host.version.Set(version)
  423. case "peer":
  424. ip, ok := value.(string)
  425. if !ok {
  426. return nil, fmt.Errorf(assertErrorMsg, "peer")
  427. }
  428. host.peer = net.ParseIP(ip)
  429. case "cluster_name":
  430. host.clusterName, ok = value.(string)
  431. if !ok {
  432. return nil, fmt.Errorf(assertErrorMsg, "cluster_name")
  433. }
  434. case "partitioner":
  435. host.partitioner, ok = value.(string)
  436. if !ok {
  437. return nil, fmt.Errorf(assertErrorMsg, "partitioner")
  438. }
  439. case "broadcast_address":
  440. ip, ok := value.(string)
  441. if !ok {
  442. return nil, fmt.Errorf(assertErrorMsg, "broadcast_address")
  443. }
  444. host.broadcastAddress = net.ParseIP(ip)
  445. case "preferred_ip":
  446. ip, ok := value.(string)
  447. if !ok {
  448. return nil, fmt.Errorf(assertErrorMsg, "preferred_ip")
  449. }
  450. host.preferredIP = net.ParseIP(ip)
  451. case "rpc_address":
  452. ip, ok := value.(string)
  453. if !ok {
  454. return nil, fmt.Errorf(assertErrorMsg, "rpc_address")
  455. }
  456. host.rpcAddress = net.ParseIP(ip)
  457. case "listen_address":
  458. ip, ok := value.(string)
  459. if !ok {
  460. return nil, fmt.Errorf(assertErrorMsg, "listen_address")
  461. }
  462. host.listenAddress = net.ParseIP(ip)
  463. case "workload":
  464. host.workload, ok = value.(string)
  465. if !ok {
  466. return nil, fmt.Errorf(assertErrorMsg, "workload")
  467. }
  468. case "graph":
  469. host.graph, ok = value.(bool)
  470. if !ok {
  471. return nil, fmt.Errorf(assertErrorMsg, "graph")
  472. }
  473. case "tokens":
  474. host.tokens, ok = value.([]string)
  475. if !ok {
  476. return nil, fmt.Errorf(assertErrorMsg, "tokens")
  477. }
  478. case "dse_version":
  479. host.dseVersion, ok = value.(string)
  480. if !ok {
  481. return nil, fmt.Errorf(assertErrorMsg, "dse_version")
  482. }
  483. case "schema_version":
  484. schemaVersion, ok := value.(UUID)
  485. if !ok {
  486. return nil, fmt.Errorf(assertErrorMsg, "schema_version")
  487. }
  488. host.schemaVersion = schemaVersion.String()
  489. }
  490. // TODO(thrawn01): Add 'port'? once CASSANDRA-7544 is complete
  491. // Not sure what the port field will be called until the JIRA issue is complete
  492. }
  493. ip, port := s.cfg.translateAddressPort(host.ConnectAddress(), host.port)
  494. host.connectAddress = ip
  495. host.port = port
  496. return host, nil
  497. }
  498. // Ask the control node for host info on all it's known peers
  499. func (r *ringDescriber) getClusterPeerInfo() ([]*HostInfo, error) {
  500. var hosts []*HostInfo
  501. iter := r.session.control.withConnHost(func(ch *connHost) *Iter {
  502. hosts = append(hosts, ch.host)
  503. return ch.conn.query(context.TODO(), "SELECT * FROM system.peers")
  504. })
  505. if iter == nil {
  506. return nil, errNoControl
  507. }
  508. rows, err := iter.SliceMap()
  509. if err != nil {
  510. // TODO(zariel): make typed error
  511. return nil, fmt.Errorf("unable to fetch peer host info: %s", err)
  512. }
  513. for _, row := range rows {
  514. // extract all available info about the peer
  515. host, err := r.session.hostInfoFromMap(row, &HostInfo{port: r.session.cfg.Port})
  516. if err != nil {
  517. return nil, err
  518. } else if !isValidPeer(host) {
  519. // If it's not a valid peer
  520. Logger.Printf("Found invalid peer '%s' "+
  521. "Likely due to a gossip or snitch issue, this host will be ignored", host)
  522. continue
  523. }
  524. hosts = append(hosts, host)
  525. }
  526. return hosts, nil
  527. }
  528. // Return true if the host is a valid peer
  529. func isValidPeer(host *HostInfo) bool {
  530. return !(len(host.RPCAddress()) == 0 ||
  531. host.hostId == "" ||
  532. host.dataCenter == "" ||
  533. host.rack == "" ||
  534. len(host.tokens) == 0)
  535. }
  536. // Return a list of hosts the cluster knows about
  537. func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) {
  538. r.mu.Lock()
  539. defer r.mu.Unlock()
  540. hosts, err := r.getClusterPeerInfo()
  541. if err != nil {
  542. return r.prevHosts, r.prevPartitioner, err
  543. }
  544. var partitioner string
  545. if len(hosts) > 0 {
  546. partitioner = hosts[0].Partitioner()
  547. }
  548. return hosts, partitioner, nil
  549. }
  550. // Given an ip/port return HostInfo for the specified ip/port
  551. func (r *ringDescriber) getHostInfo(ip net.IP, port int) (*HostInfo, error) {
  552. var host *HostInfo
  553. iter := r.session.control.withConnHost(func(ch *connHost) *Iter {
  554. if ch.host.ConnectAddress().Equal(ip) {
  555. host = ch.host
  556. return nil
  557. }
  558. return ch.conn.query(context.TODO(), "SELECT * FROM system.peers")
  559. })
  560. if iter != nil {
  561. rows, err := iter.SliceMap()
  562. if err != nil {
  563. return nil, err
  564. }
  565. for _, row := range rows {
  566. h, err := r.session.hostInfoFromMap(row, &HostInfo{connectAddress: ip, port: port})
  567. if err != nil {
  568. return nil, err
  569. }
  570. if h.ConnectAddress().Equal(ip) {
  571. host = h
  572. break
  573. }
  574. }
  575. if host == nil {
  576. return nil, errors.New("host not found in peers table")
  577. }
  578. }
  579. if host == nil {
  580. return nil, errors.New("unable to fetch host info: invalid control connection")
  581. } else if host.invalidConnectAddr() {
  582. return nil, fmt.Errorf("host ConnectAddress invalid ip=%v: %v", ip, host)
  583. }
  584. return host, nil
  585. }
  586. func (r *ringDescriber) refreshRing() error {
  587. // if we have 0 hosts this will return the previous list of hosts to
  588. // attempt to reconnect to the cluster otherwise we would never find
  589. // downed hosts again, could possibly have an optimisation to only
  590. // try to add new hosts if GetHosts didnt error and the hosts didnt change.
  591. hosts, partitioner, err := r.GetHosts()
  592. if err != nil {
  593. return err
  594. }
  595. prevHosts := r.session.ring.currentHosts()
  596. // TODO: move this to session
  597. for _, h := range hosts {
  598. if filter := r.session.cfg.HostFilter; filter != nil && !filter.Accept(h) {
  599. continue
  600. }
  601. if host, ok := r.session.ring.addHostIfMissing(h); !ok {
  602. r.session.pool.addHost(h)
  603. r.session.policy.AddHost(h)
  604. } else {
  605. host.update(h)
  606. }
  607. delete(prevHosts, h.ConnectAddress().String())
  608. }
  609. // TODO(zariel): it may be worth having a mutex covering the overall ring state
  610. // in a session so that everything sees a consistent state. Becuase as is today
  611. // events can come in and due to ordering an UP host could be removed from the cluster
  612. for _, host := range prevHosts {
  613. r.session.removeHost(host)
  614. }
  615. r.session.metadata.setPartitioner(partitioner)
  616. r.session.policy.SetPartitioner(partitioner)
  617. return nil
  618. }