host_source.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706
  1. package gocql
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. )
  12. type nodeState int32
  13. func (n nodeState) String() string {
  14. if n == NodeUp {
  15. return "UP"
  16. } else if n == NodeDown {
  17. return "DOWN"
  18. }
  19. return fmt.Sprintf("UNKNOWN_%d", n)
  20. }
  21. const (
  22. NodeUp nodeState = iota
  23. NodeDown
  24. )
  25. type cassVersion struct {
  26. Major, Minor, Patch int
  27. }
  28. func (c *cassVersion) Set(v string) error {
  29. if v == "" {
  30. return nil
  31. }
  32. return c.UnmarshalCQL(nil, []byte(v))
  33. }
  34. func (c *cassVersion) UnmarshalCQL(info TypeInfo, data []byte) error {
  35. return c.unmarshal(data)
  36. }
  37. func (c *cassVersion) unmarshal(data []byte) error {
  38. version := strings.TrimSuffix(string(data), "-SNAPSHOT")
  39. version = strings.TrimPrefix(version, "v")
  40. v := strings.Split(version, ".")
  41. if len(v) < 2 {
  42. return fmt.Errorf("invalid version string: %s", data)
  43. }
  44. var err error
  45. c.Major, err = strconv.Atoi(v[0])
  46. if err != nil {
  47. return fmt.Errorf("invalid major version %v: %v", v[0], err)
  48. }
  49. c.Minor, err = strconv.Atoi(v[1])
  50. if err != nil {
  51. return fmt.Errorf("invalid minor version %v: %v", v[1], err)
  52. }
  53. if len(v) > 2 {
  54. c.Patch, err = strconv.Atoi(v[2])
  55. if err != nil {
  56. return fmt.Errorf("invalid patch version %v: %v", v[2], err)
  57. }
  58. }
  59. return nil
  60. }
  61. func (c cassVersion) Before(major, minor, patch int) bool {
  62. // We're comparing us (cassVersion) with the provided version (major, minor, patch)
  63. // We return true if our version is lower (comes before) than the provided one.
  64. if c.Major < major {
  65. return true
  66. } else if c.Major == major {
  67. if c.Minor < minor {
  68. return true
  69. } else if c.Minor == minor && c.Patch < patch {
  70. return true
  71. }
  72. }
  73. return false
  74. }
  75. func (c cassVersion) AtLeast(major, minor, patch int) bool {
  76. return !c.Before(major, minor, patch)
  77. }
  78. func (c cassVersion) String() string {
  79. return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
  80. }
  81. func (c cassVersion) nodeUpDelay() time.Duration {
  82. if c.Major >= 2 && c.Minor >= 2 {
  83. // CASSANDRA-8236
  84. return 0
  85. }
  86. return 10 * time.Second
  87. }
  88. type HostInfo struct {
  89. // TODO(zariel): reduce locking maybe, not all values will change, but to ensure
  90. // that we are thread safe use a mutex to access all fields.
  91. mu sync.RWMutex
  92. hostname string
  93. peer net.IP
  94. broadcastAddress net.IP
  95. listenAddress net.IP
  96. rpcAddress net.IP
  97. preferredIP net.IP
  98. connectAddress net.IP
  99. port int
  100. dataCenter string
  101. rack string
  102. hostId string
  103. workload string
  104. graph bool
  105. dseVersion string
  106. partitioner string
  107. clusterName string
  108. version cassVersion
  109. state nodeState
  110. tokens []string
  111. }
  112. func (h *HostInfo) Equal(host *HostInfo) bool {
  113. if h == host {
  114. // prevent rlock reentry
  115. return true
  116. }
  117. return h.ConnectAddress().Equal(host.ConnectAddress())
  118. }
  119. func (h *HostInfo) Peer() net.IP {
  120. h.mu.RLock()
  121. defer h.mu.RUnlock()
  122. return h.peer
  123. }
  124. func (h *HostInfo) setPeer(peer net.IP) *HostInfo {
  125. h.mu.Lock()
  126. defer h.mu.Unlock()
  127. h.peer = peer
  128. return h
  129. }
  130. func (h *HostInfo) invalidConnectAddr() bool {
  131. h.mu.RLock()
  132. defer h.mu.RUnlock()
  133. addr, _ := h.connectAddressLocked()
  134. return !validIpAddr(addr)
  135. }
  136. func validIpAddr(addr net.IP) bool {
  137. return addr != nil && !addr.IsUnspecified()
  138. }
  139. func (h *HostInfo) connectAddressLocked() (net.IP, string) {
  140. if validIpAddr(h.connectAddress) {
  141. return h.connectAddress, "connect_address"
  142. } else if validIpAddr(h.rpcAddress) {
  143. return h.rpcAddress, "rpc_adress"
  144. } else if validIpAddr(h.preferredIP) {
  145. // where does perferred_ip get set?
  146. return h.preferredIP, "preferred_ip"
  147. } else if validIpAddr(h.broadcastAddress) {
  148. return h.broadcastAddress, "broadcast_address"
  149. } else if validIpAddr(h.peer) {
  150. return h.peer, "peer"
  151. }
  152. return net.IPv4zero, "invalid"
  153. }
  154. // Returns the address that should be used to connect to the host.
  155. // If you wish to override this, use an AddressTranslator or
  156. // use a HostFilter to SetConnectAddress()
  157. func (h *HostInfo) ConnectAddress() net.IP {
  158. h.mu.RLock()
  159. defer h.mu.RUnlock()
  160. if addr, _ := h.connectAddressLocked(); validIpAddr(addr) {
  161. return addr
  162. }
  163. panic(fmt.Sprintf("no valid connect address for host: %v. Is your cluster configured correctly?", h))
  164. }
  165. func (h *HostInfo) SetConnectAddress(address net.IP) *HostInfo {
  166. // TODO(zariel): should this not be exported?
  167. h.mu.Lock()
  168. defer h.mu.Unlock()
  169. h.connectAddress = address
  170. return h
  171. }
  172. func (h *HostInfo) BroadcastAddress() net.IP {
  173. h.mu.RLock()
  174. defer h.mu.RUnlock()
  175. return h.broadcastAddress
  176. }
  177. func (h *HostInfo) ListenAddress() net.IP {
  178. h.mu.RLock()
  179. defer h.mu.RUnlock()
  180. return h.listenAddress
  181. }
  182. func (h *HostInfo) RPCAddress() net.IP {
  183. h.mu.RLock()
  184. defer h.mu.RUnlock()
  185. return h.rpcAddress
  186. }
  187. func (h *HostInfo) PreferredIP() net.IP {
  188. h.mu.RLock()
  189. defer h.mu.RUnlock()
  190. return h.preferredIP
  191. }
  192. func (h *HostInfo) DataCenter() string {
  193. h.mu.RLock()
  194. defer h.mu.RUnlock()
  195. return h.dataCenter
  196. }
  197. func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo {
  198. h.mu.Lock()
  199. defer h.mu.Unlock()
  200. h.dataCenter = dataCenter
  201. return h
  202. }
  203. func (h *HostInfo) Rack() string {
  204. h.mu.RLock()
  205. defer h.mu.RUnlock()
  206. return h.rack
  207. }
  208. func (h *HostInfo) setRack(rack string) *HostInfo {
  209. h.mu.Lock()
  210. defer h.mu.Unlock()
  211. h.rack = rack
  212. return h
  213. }
  214. func (h *HostInfo) HostID() string {
  215. h.mu.RLock()
  216. defer h.mu.RUnlock()
  217. return h.hostId
  218. }
  219. func (h *HostInfo) setHostID(hostID string) *HostInfo {
  220. h.mu.Lock()
  221. defer h.mu.Unlock()
  222. h.hostId = hostID
  223. return h
  224. }
  225. func (h *HostInfo) WorkLoad() string {
  226. h.mu.RLock()
  227. defer h.mu.RUnlock()
  228. return h.workload
  229. }
  230. func (h *HostInfo) Graph() bool {
  231. h.mu.RLock()
  232. defer h.mu.RUnlock()
  233. return h.graph
  234. }
  235. func (h *HostInfo) DSEVersion() string {
  236. h.mu.RLock()
  237. defer h.mu.RUnlock()
  238. return h.dseVersion
  239. }
  240. func (h *HostInfo) Partitioner() string {
  241. h.mu.RLock()
  242. defer h.mu.RUnlock()
  243. return h.partitioner
  244. }
  245. func (h *HostInfo) ClusterName() string {
  246. h.mu.RLock()
  247. defer h.mu.RUnlock()
  248. return h.clusterName
  249. }
  250. func (h *HostInfo) Version() cassVersion {
  251. h.mu.RLock()
  252. defer h.mu.RUnlock()
  253. return h.version
  254. }
  255. func (h *HostInfo) setVersion(major, minor, patch int) *HostInfo {
  256. h.mu.Lock()
  257. defer h.mu.Unlock()
  258. h.version = cassVersion{major, minor, patch}
  259. return h
  260. }
  261. func (h *HostInfo) State() nodeState {
  262. h.mu.RLock()
  263. defer h.mu.RUnlock()
  264. return h.state
  265. }
  266. func (h *HostInfo) setState(state nodeState) *HostInfo {
  267. h.mu.Lock()
  268. defer h.mu.Unlock()
  269. h.state = state
  270. return h
  271. }
  272. func (h *HostInfo) Tokens() []string {
  273. h.mu.RLock()
  274. defer h.mu.RUnlock()
  275. return h.tokens
  276. }
  277. func (h *HostInfo) setTokens(tokens []string) *HostInfo {
  278. h.mu.Lock()
  279. defer h.mu.Unlock()
  280. h.tokens = tokens
  281. return h
  282. }
  283. func (h *HostInfo) Port() int {
  284. h.mu.RLock()
  285. defer h.mu.RUnlock()
  286. return h.port
  287. }
  288. func (h *HostInfo) setPort(port int) *HostInfo {
  289. h.mu.Lock()
  290. defer h.mu.Unlock()
  291. h.port = port
  292. return h
  293. }
  294. func (h *HostInfo) update(from *HostInfo) {
  295. if h == from {
  296. return
  297. }
  298. h.mu.Lock()
  299. defer h.mu.Unlock()
  300. from.mu.RLock()
  301. defer from.mu.RUnlock()
  302. // autogenerated do not update
  303. if h.peer == nil {
  304. h.peer = from.peer
  305. }
  306. if h.broadcastAddress == nil {
  307. h.broadcastAddress = from.broadcastAddress
  308. }
  309. if h.listenAddress == nil {
  310. h.listenAddress = from.listenAddress
  311. }
  312. if h.rpcAddress == nil {
  313. h.rpcAddress = from.rpcAddress
  314. }
  315. if h.preferredIP == nil {
  316. h.preferredIP = from.preferredIP
  317. }
  318. if h.connectAddress == nil {
  319. h.connectAddress = from.connectAddress
  320. }
  321. if h.port == 0 {
  322. h.port = from.port
  323. }
  324. if h.dataCenter == "" {
  325. h.dataCenter = from.dataCenter
  326. }
  327. if h.rack == "" {
  328. h.rack = from.rack
  329. }
  330. if h.hostId == "" {
  331. h.hostId = from.hostId
  332. }
  333. if h.workload == "" {
  334. h.workload = from.workload
  335. }
  336. if h.dseVersion == "" {
  337. h.dseVersion = from.dseVersion
  338. }
  339. if h.partitioner == "" {
  340. h.partitioner = from.partitioner
  341. }
  342. if h.clusterName == "" {
  343. h.clusterName = from.clusterName
  344. }
  345. if h.version == (cassVersion{}) {
  346. h.version = from.version
  347. }
  348. if h.tokens == nil {
  349. h.tokens = from.tokens
  350. }
  351. }
  352. func (h *HostInfo) IsUp() bool {
  353. return h != nil && h.State() == NodeUp
  354. }
  355. func (h *HostInfo) HostnameAndPort() string {
  356. if h.hostname == "" {
  357. h.hostname = h.ConnectAddress().String()
  358. }
  359. return net.JoinHostPort(h.hostname, strconv.Itoa(h.port))
  360. }
  361. func (h *HostInfo) String() string {
  362. h.mu.RLock()
  363. defer h.mu.RUnlock()
  364. connectAddr, source := h.connectAddressLocked()
  365. return fmt.Sprintf("[HostInfo hostname=%q connectAddress=%q peer=%q rpc_address=%q broadcast_address=%q "+
  366. "preferred_ip=%q connect_addr=%q connect_addr_source=%q "+
  367. "port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
  368. h.hostname, h.connectAddress, h.peer, h.rpcAddress, h.broadcastAddress, h.preferredIP,
  369. connectAddr, source,
  370. h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
  371. }
  372. // Polls system.peers at a specific interval to find new hosts
  373. type ringDescriber struct {
  374. session *Session
  375. mu sync.Mutex
  376. prevHosts []*HostInfo
  377. prevPartitioner string
  378. }
  379. // Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
  380. func checkSystemSchema(control *controlConn) (bool, error) {
  381. iter := control.query("SELECT * FROM system_schema.keyspaces")
  382. if err := iter.err; err != nil {
  383. if errf, ok := err.(*errorFrame); ok {
  384. if errf.code == errSyntax {
  385. return false, nil
  386. }
  387. }
  388. return false, err
  389. }
  390. return true, nil
  391. }
  392. // Given a map that represents a row from either system.local or system.peers
  393. // return as much information as we can in *HostInfo
  394. func (s *Session) hostInfoFromMap(row map[string]interface{}, host *HostInfo) (*HostInfo, error) {
  395. const assertErrorMsg = "Assertion failed for %s"
  396. var ok bool
  397. // Default to our connected port if the cluster doesn't have port information
  398. for key, value := range row {
  399. switch key {
  400. case "data_center":
  401. host.dataCenter, ok = value.(string)
  402. if !ok {
  403. return nil, fmt.Errorf(assertErrorMsg, "data_center")
  404. }
  405. case "rack":
  406. host.rack, ok = value.(string)
  407. if !ok {
  408. return nil, fmt.Errorf(assertErrorMsg, "rack")
  409. }
  410. case "host_id":
  411. hostId, ok := value.(UUID)
  412. if !ok {
  413. return nil, fmt.Errorf(assertErrorMsg, "host_id")
  414. }
  415. host.hostId = hostId.String()
  416. case "release_version":
  417. version, ok := value.(string)
  418. if !ok {
  419. return nil, fmt.Errorf(assertErrorMsg, "release_version")
  420. }
  421. host.version.Set(version)
  422. case "peer":
  423. ip, ok := value.(string)
  424. if !ok {
  425. return nil, fmt.Errorf(assertErrorMsg, "peer")
  426. }
  427. host.peer = net.ParseIP(ip)
  428. case "cluster_name":
  429. host.clusterName, ok = value.(string)
  430. if !ok {
  431. return nil, fmt.Errorf(assertErrorMsg, "cluster_name")
  432. }
  433. case "partitioner":
  434. host.partitioner, ok = value.(string)
  435. if !ok {
  436. return nil, fmt.Errorf(assertErrorMsg, "partitioner")
  437. }
  438. case "broadcast_address":
  439. ip, ok := value.(string)
  440. if !ok {
  441. return nil, fmt.Errorf(assertErrorMsg, "broadcast_address")
  442. }
  443. host.broadcastAddress = net.ParseIP(ip)
  444. case "preferred_ip":
  445. ip, ok := value.(string)
  446. if !ok {
  447. return nil, fmt.Errorf(assertErrorMsg, "preferred_ip")
  448. }
  449. host.preferredIP = net.ParseIP(ip)
  450. case "rpc_address":
  451. ip, ok := value.(string)
  452. if !ok {
  453. return nil, fmt.Errorf(assertErrorMsg, "rpc_address")
  454. }
  455. host.rpcAddress = net.ParseIP(ip)
  456. case "listen_address":
  457. ip, ok := value.(string)
  458. if !ok {
  459. return nil, fmt.Errorf(assertErrorMsg, "listen_address")
  460. }
  461. host.listenAddress = net.ParseIP(ip)
  462. case "workload":
  463. host.workload, ok = value.(string)
  464. if !ok {
  465. return nil, fmt.Errorf(assertErrorMsg, "workload")
  466. }
  467. case "graph":
  468. host.graph, ok = value.(bool)
  469. if !ok {
  470. return nil, fmt.Errorf(assertErrorMsg, "graph")
  471. }
  472. case "tokens":
  473. host.tokens, ok = value.([]string)
  474. if !ok {
  475. return nil, fmt.Errorf(assertErrorMsg, "tokens")
  476. }
  477. case "dse_version":
  478. host.dseVersion, ok = value.(string)
  479. if !ok {
  480. return nil, fmt.Errorf(assertErrorMsg, "dse_version")
  481. }
  482. }
  483. // TODO(thrawn01): Add 'port'? once CASSANDRA-7544 is complete
  484. // Not sure what the port field will be called until the JIRA issue is complete
  485. }
  486. ip, port := s.cfg.translateAddressPort(host.ConnectAddress(), host.port)
  487. host.connectAddress = ip
  488. host.port = port
  489. return host, nil
  490. }
  491. // Ask the control node for host info on all it's known peers
  492. func (r *ringDescriber) getClusterPeerInfo() ([]*HostInfo, error) {
  493. var hosts []*HostInfo
  494. iter := r.session.control.withConnHost(func(ch *connHost) *Iter {
  495. hosts = append(hosts, ch.host)
  496. return ch.conn.query(context.TODO(), "SELECT * FROM system.peers")
  497. })
  498. if iter == nil {
  499. return nil, errNoControl
  500. }
  501. rows, err := iter.SliceMap()
  502. if err != nil {
  503. // TODO(zariel): make typed error
  504. return nil, fmt.Errorf("unable to fetch peer host info: %s", err)
  505. }
  506. for _, row := range rows {
  507. // extract all available info about the peer
  508. host, err := r.session.hostInfoFromMap(row, &HostInfo{port: r.session.cfg.Port})
  509. if err != nil {
  510. return nil, err
  511. } else if !isValidPeer(host) {
  512. // If it's not a valid peer
  513. Logger.Printf("Found invalid peer '%s' "+
  514. "Likely due to a gossip or snitch issue, this host will be ignored", host)
  515. continue
  516. }
  517. hosts = append(hosts, host)
  518. }
  519. return hosts, nil
  520. }
  521. // Return true if the host is a valid peer
  522. func isValidPeer(host *HostInfo) bool {
  523. return !(len(host.RPCAddress()) == 0 ||
  524. host.hostId == "" ||
  525. host.dataCenter == "" ||
  526. host.rack == "" ||
  527. len(host.tokens) == 0)
  528. }
  529. // Return a list of hosts the cluster knows about
  530. func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) {
  531. r.mu.Lock()
  532. defer r.mu.Unlock()
  533. hosts, err := r.getClusterPeerInfo()
  534. if err != nil {
  535. return r.prevHosts, r.prevPartitioner, err
  536. }
  537. var partitioner string
  538. if len(hosts) > 0 {
  539. partitioner = hosts[0].Partitioner()
  540. }
  541. return hosts, partitioner, nil
  542. }
  543. // Given an ip/port return HostInfo for the specified ip/port
  544. func (r *ringDescriber) getHostInfo(ip net.IP, port int) (*HostInfo, error) {
  545. var host *HostInfo
  546. iter := r.session.control.withConnHost(func(ch *connHost) *Iter {
  547. if ch.host.ConnectAddress().Equal(ip) {
  548. host = ch.host
  549. return nil
  550. }
  551. return ch.conn.query(context.TODO(), "SELECT * FROM system.peers")
  552. })
  553. if iter != nil {
  554. rows, err := iter.SliceMap()
  555. if err != nil {
  556. return nil, err
  557. }
  558. for _, row := range rows {
  559. h, err := r.session.hostInfoFromMap(row, &HostInfo{connectAddress: ip, port: port})
  560. if err != nil {
  561. return nil, err
  562. }
  563. if h.ConnectAddress().Equal(ip) {
  564. host = h
  565. break
  566. }
  567. }
  568. if host == nil {
  569. return nil, errors.New("host not found in peers table")
  570. }
  571. }
  572. if host == nil {
  573. return nil, errors.New("unable to fetch host info: invalid control connection")
  574. } else if host.invalidConnectAddr() {
  575. return nil, fmt.Errorf("host ConnectAddress invalid ip=%v: %v", ip, host)
  576. }
  577. return host, nil
  578. }
  579. func (r *ringDescriber) refreshRing() error {
  580. // if we have 0 hosts this will return the previous list of hosts to
  581. // attempt to reconnect to the cluster otherwise we would never find
  582. // downed hosts again, could possibly have an optimisation to only
  583. // try to add new hosts if GetHosts didnt error and the hosts didnt change.
  584. hosts, partitioner, err := r.GetHosts()
  585. if err != nil {
  586. return err
  587. }
  588. prevHosts := r.session.ring.currentHosts()
  589. // TODO: move this to session
  590. for _, h := range hosts {
  591. if filter := r.session.cfg.HostFilter; filter != nil && !filter.Accept(h) {
  592. continue
  593. }
  594. if host, ok := r.session.ring.addHostIfMissing(h); !ok {
  595. r.session.pool.addHost(h)
  596. r.session.policy.AddHost(h)
  597. } else {
  598. host.update(h)
  599. }
  600. delete(prevHosts, h.ConnectAddress().String())
  601. }
  602. // TODO(zariel): it may be worth having a mutex covering the overall ring state
  603. // in a session so that everything sees a consistent state. Becuase as is today
  604. // events can come in and due to ordering an UP host could be removed from the cluster
  605. for _, host := range prevHosts {
  606. r.session.removeHost(host)
  607. }
  608. r.session.metadata.setPartitioner(partitioner)
  609. r.session.policy.SetPartitioner(partitioner)
  610. return nil
  611. }