host_source.go 16 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715
  1. package gocql
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. )
  // nodeState is the up/down status recorded for a host.
  type nodeState int32

  // Known node states. NodeUp is the zero value of nodeState.
  const (
  	NodeUp nodeState = iota
  	NodeDown
  )

  // String returns "UP" or "DOWN" for the known states and "UNKNOWN_<n>" for
  // any other numeric value.
  func (n nodeState) String() string {
  	switch n {
  	case NodeUp:
  		return "UP"
  	case NodeDown:
  		return "DOWN"
  	default:
  		return fmt.Sprintf("UNKNOWN_%d", n)
  	}
  }
  25. type cassVersion struct {
  26. Major, Minor, Patch int
  27. }
  28. func (c *cassVersion) Set(v string) error {
  29. if v == "" {
  30. return nil
  31. }
  32. return c.UnmarshalCQL(nil, []byte(v))
  33. }
  34. func (c *cassVersion) UnmarshalCQL(info TypeInfo, data []byte) error {
  35. return c.unmarshal(data)
  36. }
  37. func (c *cassVersion) unmarshal(data []byte) error {
  38. version := strings.TrimSuffix(string(data), "-SNAPSHOT")
  39. version = strings.TrimPrefix(version, "v")
  40. v := strings.Split(version, ".")
  41. if len(v) < 2 {
  42. return fmt.Errorf("invalid version string: %s", data)
  43. }
  44. var err error
  45. c.Major, err = strconv.Atoi(v[0])
  46. if err != nil {
  47. return fmt.Errorf("invalid major version %v: %v", v[0], err)
  48. }
  49. c.Minor, err = strconv.Atoi(v[1])
  50. if err != nil {
  51. return fmt.Errorf("invalid minor version %v: %v", v[1], err)
  52. }
  53. if len(v) > 2 {
  54. c.Patch, err = strconv.Atoi(v[2])
  55. if err != nil {
  56. return fmt.Errorf("invalid patch version %v: %v", v[2], err)
  57. }
  58. }
  59. return nil
  60. }
  61. func (c cassVersion) Before(major, minor, patch int) bool {
  62. // We're comparing us (cassVersion) with the provided version (major, minor, patch)
  63. // We return true if our version is lower (comes before) than the provided one.
  64. if c.Major < major {
  65. return true
  66. } else if c.Major == major {
  67. if c.Minor < minor {
  68. return true
  69. } else if c.Minor == minor && c.Patch < patch {
  70. return true
  71. }
  72. }
  73. return false
  74. }
  75. func (c cassVersion) AtLeast(major, minor, patch int) bool {
  76. return !c.Before(major, minor, patch)
  77. }
  78. func (c cassVersion) String() string {
  79. return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
  80. }
  81. func (c cassVersion) nodeUpDelay() time.Duration {
  82. if c.Major >= 2 && c.Minor >= 2 {
  83. // CASSANDRA-8236
  84. return 0
  85. }
  86. return 10 * time.Second
  87. }
  88. type HostInfo struct {
  89. // TODO(zariel): reduce locking maybe, not all values will change, but to ensure
  90. // that we are thread safe use a mutex to access all fields.
  91. mu sync.RWMutex
  92. hostname string
  93. peer net.IP
  94. broadcastAddress net.IP
  95. listenAddress net.IP
  96. rpcAddress net.IP
  97. preferredIP net.IP
  98. connectAddress net.IP
  99. port int
  100. dataCenter string
  101. rack string
  102. hostId string
  103. workload string
  104. graph bool
  105. dseVersion string
  106. partitioner string
  107. clusterName string
  108. version cassVersion
  109. state nodeState
  110. schemaVersion string
  111. tokens []string
  112. }
  113. func (h *HostInfo) Equal(host *HostInfo) bool {
  114. if h == host {
  115. // prevent rlock reentry
  116. return true
  117. }
  118. return h.ConnectAddress().Equal(host.ConnectAddress())
  119. }
  120. func (h *HostInfo) Peer() net.IP {
  121. h.mu.RLock()
  122. defer h.mu.RUnlock()
  123. return h.peer
  124. }
  125. func (h *HostInfo) setPeer(peer net.IP) *HostInfo {
  126. h.mu.Lock()
  127. defer h.mu.Unlock()
  128. h.peer = peer
  129. return h
  130. }
  131. func (h *HostInfo) invalidConnectAddr() bool {
  132. h.mu.RLock()
  133. defer h.mu.RUnlock()
  134. addr, _ := h.connectAddressLocked()
  135. return !validIpAddr(addr)
  136. }
  // validIpAddr reports whether addr is non-nil and is not the unspecified
  // address (as defined by net.IP.IsUnspecified).
  func validIpAddr(addr net.IP) bool {
  	if addr == nil {
  		return false
  	}
  	return !addr.IsUnspecified()
  }
  140. func (h *HostInfo) connectAddressLocked() (net.IP, string) {
  141. if validIpAddr(h.connectAddress) {
  142. return h.connectAddress, "connect_address"
  143. } else if validIpAddr(h.rpcAddress) {
  144. return h.rpcAddress, "rpc_adress"
  145. } else if validIpAddr(h.preferredIP) {
  146. // where does perferred_ip get set?
  147. return h.preferredIP, "preferred_ip"
  148. } else if validIpAddr(h.broadcastAddress) {
  149. return h.broadcastAddress, "broadcast_address"
  150. } else if validIpAddr(h.peer) {
  151. return h.peer, "peer"
  152. }
  153. return net.IPv4zero, "invalid"
  154. }
  155. // Returns the address that should be used to connect to the host.
  156. // If you wish to override this, use an AddressTranslator or
  157. // use a HostFilter to SetConnectAddress()
  158. func (h *HostInfo) ConnectAddress() net.IP {
  159. h.mu.RLock()
  160. defer h.mu.RUnlock()
  161. if addr, _ := h.connectAddressLocked(); validIpAddr(addr) {
  162. return addr
  163. }
  164. panic(fmt.Sprintf("no valid connect address for host: %v. Is your cluster configured correctly?", h))
  165. }
  166. func (h *HostInfo) SetConnectAddress(address net.IP) *HostInfo {
  167. // TODO(zariel): should this not be exported?
  168. h.mu.Lock()
  169. defer h.mu.Unlock()
  170. h.connectAddress = address
  171. return h
  172. }
  173. func (h *HostInfo) BroadcastAddress() net.IP {
  174. h.mu.RLock()
  175. defer h.mu.RUnlock()
  176. return h.broadcastAddress
  177. }
  178. func (h *HostInfo) ListenAddress() net.IP {
  179. h.mu.RLock()
  180. defer h.mu.RUnlock()
  181. return h.listenAddress
  182. }
  183. func (h *HostInfo) RPCAddress() net.IP {
  184. h.mu.RLock()
  185. defer h.mu.RUnlock()
  186. return h.rpcAddress
  187. }
  188. func (h *HostInfo) PreferredIP() net.IP {
  189. h.mu.RLock()
  190. defer h.mu.RUnlock()
  191. return h.preferredIP
  192. }
  193. func (h *HostInfo) DataCenter() string {
  194. h.mu.RLock()
  195. dc := h.dataCenter
  196. h.mu.RUnlock()
  197. return dc
  198. }
  199. func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo {
  200. h.mu.Lock()
  201. defer h.mu.Unlock()
  202. h.dataCenter = dataCenter
  203. return h
  204. }
  205. func (h *HostInfo) Rack() string {
  206. h.mu.RLock()
  207. rack := h.rack
  208. h.mu.RUnlock()
  209. return rack
  210. }
  211. func (h *HostInfo) setRack(rack string) *HostInfo {
  212. h.mu.Lock()
  213. defer h.mu.Unlock()
  214. h.rack = rack
  215. return h
  216. }
  217. func (h *HostInfo) HostID() string {
  218. h.mu.RLock()
  219. defer h.mu.RUnlock()
  220. return h.hostId
  221. }
  222. func (h *HostInfo) setHostID(hostID string) *HostInfo {
  223. h.mu.Lock()
  224. defer h.mu.Unlock()
  225. h.hostId = hostID
  226. return h
  227. }
  228. func (h *HostInfo) WorkLoad() string {
  229. h.mu.RLock()
  230. defer h.mu.RUnlock()
  231. return h.workload
  232. }
  233. func (h *HostInfo) Graph() bool {
  234. h.mu.RLock()
  235. defer h.mu.RUnlock()
  236. return h.graph
  237. }
  238. func (h *HostInfo) DSEVersion() string {
  239. h.mu.RLock()
  240. defer h.mu.RUnlock()
  241. return h.dseVersion
  242. }
  243. func (h *HostInfo) Partitioner() string {
  244. h.mu.RLock()
  245. defer h.mu.RUnlock()
  246. return h.partitioner
  247. }
  248. func (h *HostInfo) ClusterName() string {
  249. h.mu.RLock()
  250. defer h.mu.RUnlock()
  251. return h.clusterName
  252. }
  253. func (h *HostInfo) Version() cassVersion {
  254. h.mu.RLock()
  255. defer h.mu.RUnlock()
  256. return h.version
  257. }
  258. func (h *HostInfo) setVersion(major, minor, patch int) *HostInfo {
  259. h.mu.Lock()
  260. defer h.mu.Unlock()
  261. h.version = cassVersion{major, minor, patch}
  262. return h
  263. }
  264. func (h *HostInfo) State() nodeState {
  265. h.mu.RLock()
  266. defer h.mu.RUnlock()
  267. return h.state
  268. }
  269. func (h *HostInfo) setState(state nodeState) *HostInfo {
  270. h.mu.Lock()
  271. defer h.mu.Unlock()
  272. h.state = state
  273. return h
  274. }
  275. func (h *HostInfo) Tokens() []string {
  276. h.mu.RLock()
  277. defer h.mu.RUnlock()
  278. return h.tokens
  279. }
  280. func (h *HostInfo) setTokens(tokens []string) *HostInfo {
  281. h.mu.Lock()
  282. defer h.mu.Unlock()
  283. h.tokens = tokens
  284. return h
  285. }
  286. func (h *HostInfo) Port() int {
  287. h.mu.RLock()
  288. defer h.mu.RUnlock()
  289. return h.port
  290. }
  291. func (h *HostInfo) setPort(port int) *HostInfo {
  292. h.mu.Lock()
  293. defer h.mu.Unlock()
  294. h.port = port
  295. return h
  296. }
  297. func (h *HostInfo) update(from *HostInfo) {
  298. if h == from {
  299. return
  300. }
  301. h.mu.Lock()
  302. defer h.mu.Unlock()
  303. from.mu.RLock()
  304. defer from.mu.RUnlock()
  305. // autogenerated do not update
  306. if h.peer == nil {
  307. h.peer = from.peer
  308. }
  309. if h.broadcastAddress == nil {
  310. h.broadcastAddress = from.broadcastAddress
  311. }
  312. if h.listenAddress == nil {
  313. h.listenAddress = from.listenAddress
  314. }
  315. if h.rpcAddress == nil {
  316. h.rpcAddress = from.rpcAddress
  317. }
  318. if h.preferredIP == nil {
  319. h.preferredIP = from.preferredIP
  320. }
  321. if h.connectAddress == nil {
  322. h.connectAddress = from.connectAddress
  323. }
  324. if h.port == 0 {
  325. h.port = from.port
  326. }
  327. if h.dataCenter == "" {
  328. h.dataCenter = from.dataCenter
  329. }
  330. if h.rack == "" {
  331. h.rack = from.rack
  332. }
  333. if h.hostId == "" {
  334. h.hostId = from.hostId
  335. }
  336. if h.workload == "" {
  337. h.workload = from.workload
  338. }
  339. if h.dseVersion == "" {
  340. h.dseVersion = from.dseVersion
  341. }
  342. if h.partitioner == "" {
  343. h.partitioner = from.partitioner
  344. }
  345. if h.clusterName == "" {
  346. h.clusterName = from.clusterName
  347. }
  348. if h.version == (cassVersion{}) {
  349. h.version = from.version
  350. }
  351. if h.tokens == nil {
  352. h.tokens = from.tokens
  353. }
  354. }
  355. func (h *HostInfo) IsUp() bool {
  356. return h != nil && h.State() == NodeUp
  357. }
  358. func (h *HostInfo) HostnameAndPort() string {
  359. if h.hostname == "" {
  360. h.hostname = h.ConnectAddress().String()
  361. }
  362. return net.JoinHostPort(h.hostname, strconv.Itoa(h.port))
  363. }
  364. func (h *HostInfo) String() string {
  365. h.mu.RLock()
  366. defer h.mu.RUnlock()
  367. connectAddr, source := h.connectAddressLocked()
  368. return fmt.Sprintf("[HostInfo hostname=%q connectAddress=%q peer=%q rpc_address=%q broadcast_address=%q "+
  369. "preferred_ip=%q connect_addr=%q connect_addr_source=%q "+
  370. "port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
  371. h.hostname, h.connectAddress, h.peer, h.rpcAddress, h.broadcastAddress, h.preferredIP,
  372. connectAddr, source,
  373. h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
  374. }
  375. // Polls system.peers at a specific interval to find new hosts
  376. type ringDescriber struct {
  377. session *Session
  378. mu sync.Mutex
  379. prevHosts []*HostInfo
  380. prevPartitioner string
  381. }
  382. // Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
  383. func checkSystemSchema(control *controlConn) (bool, error) {
  384. iter := control.query("SELECT * FROM system_schema.keyspaces")
  385. if err := iter.err; err != nil {
  386. if errf, ok := err.(*errorFrame); ok {
  387. if errf.code == errSyntax {
  388. return false, nil
  389. }
  390. }
  391. return false, err
  392. }
  393. return true, nil
  394. }
  395. // Given a map that represents a row from either system.local or system.peers
  396. // return as much information as we can in *HostInfo
  397. func (s *Session) hostInfoFromMap(row map[string]interface{}, host *HostInfo) (*HostInfo, error) {
  398. const assertErrorMsg = "Assertion failed for %s"
  399. var ok bool
  400. // Default to our connected port if the cluster doesn't have port information
  401. for key, value := range row {
  402. switch key {
  403. case "data_center":
  404. host.dataCenter, ok = value.(string)
  405. if !ok {
  406. return nil, fmt.Errorf(assertErrorMsg, "data_center")
  407. }
  408. case "rack":
  409. host.rack, ok = value.(string)
  410. if !ok {
  411. return nil, fmt.Errorf(assertErrorMsg, "rack")
  412. }
  413. case "host_id":
  414. hostId, ok := value.(UUID)
  415. if !ok {
  416. return nil, fmt.Errorf(assertErrorMsg, "host_id")
  417. }
  418. host.hostId = hostId.String()
  419. case "release_version":
  420. version, ok := value.(string)
  421. if !ok {
  422. return nil, fmt.Errorf(assertErrorMsg, "release_version")
  423. }
  424. host.version.Set(version)
  425. case "peer":
  426. ip, ok := value.(string)
  427. if !ok {
  428. return nil, fmt.Errorf(assertErrorMsg, "peer")
  429. }
  430. host.peer = net.ParseIP(ip)
  431. case "cluster_name":
  432. host.clusterName, ok = value.(string)
  433. if !ok {
  434. return nil, fmt.Errorf(assertErrorMsg, "cluster_name")
  435. }
  436. case "partitioner":
  437. host.partitioner, ok = value.(string)
  438. if !ok {
  439. return nil, fmt.Errorf(assertErrorMsg, "partitioner")
  440. }
  441. case "broadcast_address":
  442. ip, ok := value.(string)
  443. if !ok {
  444. return nil, fmt.Errorf(assertErrorMsg, "broadcast_address")
  445. }
  446. host.broadcastAddress = net.ParseIP(ip)
  447. case "preferred_ip":
  448. ip, ok := value.(string)
  449. if !ok {
  450. return nil, fmt.Errorf(assertErrorMsg, "preferred_ip")
  451. }
  452. host.preferredIP = net.ParseIP(ip)
  453. case "rpc_address":
  454. ip, ok := value.(string)
  455. if !ok {
  456. return nil, fmt.Errorf(assertErrorMsg, "rpc_address")
  457. }
  458. host.rpcAddress = net.ParseIP(ip)
  459. case "listen_address":
  460. ip, ok := value.(string)
  461. if !ok {
  462. return nil, fmt.Errorf(assertErrorMsg, "listen_address")
  463. }
  464. host.listenAddress = net.ParseIP(ip)
  465. case "workload":
  466. host.workload, ok = value.(string)
  467. if !ok {
  468. return nil, fmt.Errorf(assertErrorMsg, "workload")
  469. }
  470. case "graph":
  471. host.graph, ok = value.(bool)
  472. if !ok {
  473. return nil, fmt.Errorf(assertErrorMsg, "graph")
  474. }
  475. case "tokens":
  476. host.tokens, ok = value.([]string)
  477. if !ok {
  478. return nil, fmt.Errorf(assertErrorMsg, "tokens")
  479. }
  480. case "dse_version":
  481. host.dseVersion, ok = value.(string)
  482. if !ok {
  483. return nil, fmt.Errorf(assertErrorMsg, "dse_version")
  484. }
  485. case "schema_version":
  486. schemaVersion, ok := value.(UUID)
  487. if !ok {
  488. return nil, fmt.Errorf(assertErrorMsg, "schema_version")
  489. }
  490. host.schemaVersion = schemaVersion.String()
  491. }
  492. // TODO(thrawn01): Add 'port'? once CASSANDRA-7544 is complete
  493. // Not sure what the port field will be called until the JIRA issue is complete
  494. }
  495. ip, port := s.cfg.translateAddressPort(host.ConnectAddress(), host.port)
  496. host.connectAddress = ip
  497. host.port = port
  498. return host, nil
  499. }
  500. // Ask the control node for host info on all it's known peers
  501. func (r *ringDescriber) getClusterPeerInfo() ([]*HostInfo, error) {
  502. var hosts []*HostInfo
  503. iter := r.session.control.withConnHost(func(ch *connHost) *Iter {
  504. hosts = append(hosts, ch.host)
  505. return ch.conn.query(context.TODO(), "SELECT * FROM system.peers")
  506. })
  507. if iter == nil {
  508. return nil, errNoControl
  509. }
  510. rows, err := iter.SliceMap()
  511. if err != nil {
  512. // TODO(zariel): make typed error
  513. return nil, fmt.Errorf("unable to fetch peer host info: %s", err)
  514. }
  515. for _, row := range rows {
  516. // extract all available info about the peer
  517. host, err := r.session.hostInfoFromMap(row, &HostInfo{port: r.session.cfg.Port})
  518. if err != nil {
  519. return nil, err
  520. } else if !isValidPeer(host) {
  521. // If it's not a valid peer
  522. Logger.Printf("Found invalid peer '%s' "+
  523. "Likely due to a gossip or snitch issue, this host will be ignored", host)
  524. continue
  525. }
  526. hosts = append(hosts, host)
  527. }
  528. return hosts, nil
  529. }
  530. // Return true if the host is a valid peer
  531. func isValidPeer(host *HostInfo) bool {
  532. return !(len(host.RPCAddress()) == 0 ||
  533. host.hostId == "" ||
  534. host.dataCenter == "" ||
  535. host.rack == "" ||
  536. len(host.tokens) == 0)
  537. }
  538. // Return a list of hosts the cluster knows about
  539. func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) {
  540. r.mu.Lock()
  541. defer r.mu.Unlock()
  542. hosts, err := r.getClusterPeerInfo()
  543. if err != nil {
  544. return r.prevHosts, r.prevPartitioner, err
  545. }
  546. var partitioner string
  547. if len(hosts) > 0 {
  548. partitioner = hosts[0].Partitioner()
  549. }
  550. return hosts, partitioner, nil
  551. }
  552. // Given an ip/port return HostInfo for the specified ip/port
  553. func (r *ringDescriber) getHostInfo(ip net.IP, port int) (*HostInfo, error) {
  554. var host *HostInfo
  555. iter := r.session.control.withConnHost(func(ch *connHost) *Iter {
  556. if ch.host.ConnectAddress().Equal(ip) {
  557. host = ch.host
  558. return nil
  559. }
  560. return ch.conn.query(context.TODO(), "SELECT * FROM system.peers")
  561. })
  562. if iter != nil {
  563. rows, err := iter.SliceMap()
  564. if err != nil {
  565. return nil, err
  566. }
  567. for _, row := range rows {
  568. h, err := r.session.hostInfoFromMap(row, &HostInfo{port: port})
  569. if err != nil {
  570. return nil, err
  571. }
  572. if h.ConnectAddress().Equal(ip) {
  573. host = h
  574. break
  575. }
  576. }
  577. if host == nil {
  578. return nil, errors.New("host not found in peers table")
  579. }
  580. }
  581. if host == nil {
  582. return nil, errors.New("unable to fetch host info: invalid control connection")
  583. } else if host.invalidConnectAddr() {
  584. return nil, fmt.Errorf("host ConnectAddress invalid ip=%v: %v", ip, host)
  585. }
  586. return host, nil
  587. }
  588. func (r *ringDescriber) refreshRing() error {
  589. // if we have 0 hosts this will return the previous list of hosts to
  590. // attempt to reconnect to the cluster otherwise we would never find
  591. // downed hosts again, could possibly have an optimisation to only
  592. // try to add new hosts if GetHosts didnt error and the hosts didnt change.
  593. hosts, partitioner, err := r.GetHosts()
  594. if err != nil {
  595. return err
  596. }
  597. prevHosts := r.session.ring.currentHosts()
  598. // TODO: move this to session
  599. for _, h := range hosts {
  600. if filter := r.session.cfg.HostFilter; filter != nil && !filter.Accept(h) {
  601. continue
  602. }
  603. if host, ok := r.session.ring.addHostIfMissing(h); !ok {
  604. r.session.pool.addHost(h)
  605. r.session.policy.AddHost(h)
  606. } else {
  607. host.update(h)
  608. }
  609. delete(prevHosts, h.ConnectAddress().String())
  610. }
  611. // TODO(zariel): it may be worth having a mutex covering the overall ring state
  612. // in a session so that everything sees a consistent state. Becuase as is today
  613. // events can come in and due to ordering an UP host could be removed from the cluster
  614. for _, host := range prevHosts {
  615. r.session.removeHost(host)
  616. }
  617. r.session.metadata.setPartitioner(partitioner)
  618. r.session.policy.SetPartitioner(partitioner)
  619. return nil
  620. }