host_source.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
  1. package gocql
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. )
  // nodeState is the up/down state the driver records for a host.
  type nodeState int32

  // String returns "UP" for NodeUp, "DOWN" for NodeDown and "UNKNOWN_<n>" for
  // any other value.
  func (n nodeState) String() string {
  	switch n {
  	case NodeUp:
  		return "UP"
  	case NodeDown:
  		return "DOWN"
  	default:
  		return fmt.Sprintf("UNKNOWN_%d", n)
  	}
  }

  const (
  	// NodeUp marks a host as up; it is the zero value of nodeState.
  	NodeUp nodeState = iota
  	// NodeDown marks a host as down.
  	NodeDown
  )
  25. type cassVersion struct {
  26. Major, Minor, Patch int
  27. }
  28. func (c *cassVersion) Set(v string) error {
  29. if v == "" {
  30. return nil
  31. }
  32. return c.UnmarshalCQL(nil, []byte(v))
  33. }
  34. func (c *cassVersion) UnmarshalCQL(info TypeInfo, data []byte) error {
  35. return c.unmarshal(data)
  36. }
  37. func (c *cassVersion) unmarshal(data []byte) error {
  38. version := strings.TrimSuffix(string(data), "-SNAPSHOT")
  39. version = strings.TrimPrefix(version, "v")
  40. v := strings.Split(version, ".")
  41. if len(v) < 2 {
  42. return fmt.Errorf("invalid version string: %s", data)
  43. }
  44. var err error
  45. c.Major, err = strconv.Atoi(v[0])
  46. if err != nil {
  47. return fmt.Errorf("invalid major version %v: %v", v[0], err)
  48. }
  49. c.Minor, err = strconv.Atoi(v[1])
  50. if err != nil {
  51. return fmt.Errorf("invalid minor version %v: %v", v[1], err)
  52. }
  53. if len(v) > 2 {
  54. c.Patch, err = strconv.Atoi(v[2])
  55. if err != nil {
  56. return fmt.Errorf("invalid patch version %v: %v", v[2], err)
  57. }
  58. }
  59. return nil
  60. }
  61. func (c cassVersion) Before(major, minor, patch int) bool {
  62. // We're comparing us (cassVersion) with the provided version (major, minor, patch)
  63. // We return true if our version is lower (comes before) than the provided one.
  64. if c.Major < major {
  65. return true
  66. } else if c.Major == major {
  67. if c.Minor < minor {
  68. return true
  69. } else if c.Minor == minor && c.Patch < patch {
  70. return true
  71. }
  72. }
  73. return false
  74. }
  75. func (c cassVersion) AtLeast(major, minor, patch int) bool {
  76. return !c.Before(major, minor, patch)
  77. }
  78. func (c cassVersion) String() string {
  79. return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
  80. }
  81. func (c cassVersion) nodeUpDelay() time.Duration {
  82. if c.Major >= 2 && c.Minor >= 2 {
  83. // CASSANDRA-8236
  84. return 0
  85. }
  86. return 10 * time.Second
  87. }
  88. type HostInfo struct {
  89. // TODO(zariel): reduce locking maybe, not all values will change, but to ensure
  90. // that we are thread safe use a mutex to access all fields.
  91. mu sync.RWMutex
  92. peer net.IP
  93. broadcastAddress net.IP
  94. listenAddress net.IP
  95. rpcAddress net.IP
  96. preferredIP net.IP
  97. connectAddress net.IP
  98. port int
  99. dataCenter string
  100. rack string
  101. hostId string
  102. workload string
  103. graph bool
  104. dseVersion string
  105. partitioner string
  106. clusterName string
  107. version cassVersion
  108. state nodeState
  109. tokens []string
  110. }
  111. func (h *HostInfo) Equal(host *HostInfo) bool {
  112. if h == host {
  113. // prevent rlock reentry
  114. return true
  115. }
  116. return h.ConnectAddress().Equal(host.ConnectAddress())
  117. }
  118. func (h *HostInfo) Peer() net.IP {
  119. h.mu.RLock()
  120. defer h.mu.RUnlock()
  121. return h.peer
  122. }
  123. func (h *HostInfo) setPeer(peer net.IP) *HostInfo {
  124. h.mu.Lock()
  125. defer h.mu.Unlock()
  126. h.peer = peer
  127. return h
  128. }
  129. func (h *HostInfo) invalidConnectAddr() bool {
  130. h.mu.RLock()
  131. defer h.mu.RUnlock()
  132. addr, _ := h.connectAddressLocked()
  133. return !validIpAddr(addr)
  134. }
  135. func validIpAddr(addr net.IP) bool {
  136. return addr != nil && !addr.IsUnspecified()
  137. }
  138. func (h *HostInfo) connectAddressLocked() (net.IP, string) {
  139. if validIpAddr(h.connectAddress) {
  140. return h.connectAddress, "connect_address"
  141. } else if validIpAddr(h.rpcAddress) {
  142. return h.rpcAddress, "rpc_adress"
  143. } else if validIpAddr(h.preferredIP) {
  144. // where does perferred_ip get set?
  145. return h.preferredIP, "preferred_ip"
  146. } else if validIpAddr(h.broadcastAddress) {
  147. return h.broadcastAddress, "broadcast_address"
  148. } else if validIpAddr(h.peer) {
  149. return h.peer, "peer"
  150. }
  151. return net.IPv4zero, "invalid"
  152. }
  153. // Returns the address that should be used to connect to the host.
  154. // If you wish to override this, use an AddressTranslator or
  155. // use a HostFilter to SetConnectAddress()
  156. func (h *HostInfo) ConnectAddress() net.IP {
  157. h.mu.RLock()
  158. defer h.mu.RUnlock()
  159. if addr, _ := h.connectAddressLocked(); validIpAddr(addr) {
  160. return addr
  161. }
  162. panic(fmt.Sprintf("no valid connect address for host: %v. Is your cluster configured correctly?", h))
  163. }
  164. func (h *HostInfo) SetConnectAddress(address net.IP) *HostInfo {
  165. // TODO(zariel): should this not be exported?
  166. h.mu.Lock()
  167. defer h.mu.Unlock()
  168. h.connectAddress = address
  169. return h
  170. }
  171. func (h *HostInfo) BroadcastAddress() net.IP {
  172. h.mu.RLock()
  173. defer h.mu.RUnlock()
  174. return h.broadcastAddress
  175. }
  176. func (h *HostInfo) ListenAddress() net.IP {
  177. h.mu.RLock()
  178. defer h.mu.RUnlock()
  179. return h.listenAddress
  180. }
  181. func (h *HostInfo) RPCAddress() net.IP {
  182. h.mu.RLock()
  183. defer h.mu.RUnlock()
  184. return h.rpcAddress
  185. }
  186. func (h *HostInfo) PreferredIP() net.IP {
  187. h.mu.RLock()
  188. defer h.mu.RUnlock()
  189. return h.preferredIP
  190. }
  191. func (h *HostInfo) DataCenter() string {
  192. h.mu.RLock()
  193. defer h.mu.RUnlock()
  194. return h.dataCenter
  195. }
  196. func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo {
  197. h.mu.Lock()
  198. defer h.mu.Unlock()
  199. h.dataCenter = dataCenter
  200. return h
  201. }
  202. func (h *HostInfo) Rack() string {
  203. h.mu.RLock()
  204. defer h.mu.RUnlock()
  205. return h.rack
  206. }
  207. func (h *HostInfo) setRack(rack string) *HostInfo {
  208. h.mu.Lock()
  209. defer h.mu.Unlock()
  210. h.rack = rack
  211. return h
  212. }
  213. func (h *HostInfo) HostID() string {
  214. h.mu.RLock()
  215. defer h.mu.RUnlock()
  216. return h.hostId
  217. }
  218. func (h *HostInfo) setHostID(hostID string) *HostInfo {
  219. h.mu.Lock()
  220. defer h.mu.Unlock()
  221. h.hostId = hostID
  222. return h
  223. }
  224. func (h *HostInfo) WorkLoad() string {
  225. h.mu.RLock()
  226. defer h.mu.RUnlock()
  227. return h.workload
  228. }
  229. func (h *HostInfo) Graph() bool {
  230. h.mu.RLock()
  231. defer h.mu.RUnlock()
  232. return h.graph
  233. }
  234. func (h *HostInfo) DSEVersion() string {
  235. h.mu.RLock()
  236. defer h.mu.RUnlock()
  237. return h.dseVersion
  238. }
  239. func (h *HostInfo) Partitioner() string {
  240. h.mu.RLock()
  241. defer h.mu.RUnlock()
  242. return h.partitioner
  243. }
  244. func (h *HostInfo) ClusterName() string {
  245. h.mu.RLock()
  246. defer h.mu.RUnlock()
  247. return h.clusterName
  248. }
  249. func (h *HostInfo) Version() cassVersion {
  250. h.mu.RLock()
  251. defer h.mu.RUnlock()
  252. return h.version
  253. }
  254. func (h *HostInfo) setVersion(major, minor, patch int) *HostInfo {
  255. h.mu.Lock()
  256. defer h.mu.Unlock()
  257. h.version = cassVersion{major, minor, patch}
  258. return h
  259. }
  260. func (h *HostInfo) State() nodeState {
  261. h.mu.RLock()
  262. defer h.mu.RUnlock()
  263. return h.state
  264. }
  265. func (h *HostInfo) setState(state nodeState) *HostInfo {
  266. h.mu.Lock()
  267. defer h.mu.Unlock()
  268. h.state = state
  269. return h
  270. }
  271. func (h *HostInfo) Tokens() []string {
  272. h.mu.RLock()
  273. defer h.mu.RUnlock()
  274. return h.tokens
  275. }
  276. func (h *HostInfo) setTokens(tokens []string) *HostInfo {
  277. h.mu.Lock()
  278. defer h.mu.Unlock()
  279. h.tokens = tokens
  280. return h
  281. }
  282. func (h *HostInfo) Port() int {
  283. h.mu.RLock()
  284. defer h.mu.RUnlock()
  285. return h.port
  286. }
  287. func (h *HostInfo) setPort(port int) *HostInfo {
  288. h.mu.Lock()
  289. defer h.mu.Unlock()
  290. h.port = port
  291. return h
  292. }
  293. func (h *HostInfo) update(from *HostInfo) {
  294. if h == from {
  295. return
  296. }
  297. h.mu.Lock()
  298. defer h.mu.Unlock()
  299. from.mu.RLock()
  300. defer from.mu.RUnlock()
  301. // autogenerated do not update
  302. if h.peer == nil {
  303. h.peer = from.peer
  304. }
  305. if h.broadcastAddress == nil {
  306. h.broadcastAddress = from.broadcastAddress
  307. }
  308. if h.listenAddress == nil {
  309. h.listenAddress = from.listenAddress
  310. }
  311. if h.rpcAddress == nil {
  312. h.rpcAddress = from.rpcAddress
  313. }
  314. if h.preferredIP == nil {
  315. h.preferredIP = from.preferredIP
  316. }
  317. if h.connectAddress == nil {
  318. h.connectAddress = from.connectAddress
  319. }
  320. if h.port == 0 {
  321. h.port = from.port
  322. }
  323. if h.dataCenter == "" {
  324. h.dataCenter = from.dataCenter
  325. }
  326. if h.rack == "" {
  327. h.rack = from.rack
  328. }
  329. if h.hostId == "" {
  330. h.hostId = from.hostId
  331. }
  332. if h.workload == "" {
  333. h.workload = from.workload
  334. }
  335. if h.dseVersion == "" {
  336. h.dseVersion = from.dseVersion
  337. }
  338. if h.partitioner == "" {
  339. h.partitioner = from.partitioner
  340. }
  341. if h.clusterName == "" {
  342. h.clusterName = from.clusterName
  343. }
  344. if h.version == (cassVersion{}) {
  345. h.version = from.version
  346. }
  347. if h.tokens == nil {
  348. h.tokens = from.tokens
  349. }
  350. }
  351. func (h *HostInfo) IsUp() bool {
  352. return h != nil && h.State() == NodeUp
  353. }
  354. func (h *HostInfo) String() string {
  355. h.mu.RLock()
  356. defer h.mu.RUnlock()
  357. connectAddr, source := h.connectAddressLocked()
  358. return fmt.Sprintf("[HostInfo connectAddress=%q peer=%q rpc_address=%q broadcast_address=%q "+
  359. "preferred_ip=%q connect_addr=%q connect_addr_source=%q "+
  360. "port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
  361. h.connectAddress, h.peer, h.rpcAddress, h.broadcastAddress, h.preferredIP,
  362. connectAddr, source,
  363. h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
  364. }
  365. // Polls system.peers at a specific interval to find new hosts
  366. type ringDescriber struct {
  367. session *Session
  368. mu sync.Mutex
  369. prevHosts []*HostInfo
  370. prevPartitioner string
  371. }
  372. // Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
  373. func checkSystemSchema(control *controlConn) (bool, error) {
  374. iter := control.query("SELECT * FROM system_schema.keyspaces")
  375. if err := iter.err; err != nil {
  376. if errf, ok := err.(*errorFrame); ok {
  377. if errf.code == errSyntax {
  378. return false, nil
  379. }
  380. }
  381. return false, err
  382. }
  383. return true, nil
  384. }
  385. // Given a map that represents a row from either system.local or system.peers
  386. // return as much information as we can in *HostInfo
  387. func (s *Session) hostInfoFromMap(row map[string]interface{}, port int) (*HostInfo, error) {
  388. const assertErrorMsg = "Assertion failed for %s"
  389. var ok bool
  390. // Default to our connected port if the cluster doesn't have port information
  391. host := HostInfo{
  392. port: port,
  393. }
  394. for key, value := range row {
  395. switch key {
  396. case "data_center":
  397. host.dataCenter, ok = value.(string)
  398. if !ok {
  399. return nil, fmt.Errorf(assertErrorMsg, "data_center")
  400. }
  401. case "rack":
  402. host.rack, ok = value.(string)
  403. if !ok {
  404. return nil, fmt.Errorf(assertErrorMsg, "rack")
  405. }
  406. case "host_id":
  407. hostId, ok := value.(UUID)
  408. if !ok {
  409. return nil, fmt.Errorf(assertErrorMsg, "host_id")
  410. }
  411. host.hostId = hostId.String()
  412. case "release_version":
  413. version, ok := value.(string)
  414. if !ok {
  415. return nil, fmt.Errorf(assertErrorMsg, "release_version")
  416. }
  417. host.version.Set(version)
  418. case "peer":
  419. ip, ok := value.(string)
  420. if !ok {
  421. return nil, fmt.Errorf(assertErrorMsg, "peer")
  422. }
  423. host.peer = net.ParseIP(ip)
  424. case "cluster_name":
  425. host.clusterName, ok = value.(string)
  426. if !ok {
  427. return nil, fmt.Errorf(assertErrorMsg, "cluster_name")
  428. }
  429. case "partitioner":
  430. host.partitioner, ok = value.(string)
  431. if !ok {
  432. return nil, fmt.Errorf(assertErrorMsg, "partitioner")
  433. }
  434. case "broadcast_address":
  435. ip, ok := value.(string)
  436. if !ok {
  437. return nil, fmt.Errorf(assertErrorMsg, "broadcast_address")
  438. }
  439. host.broadcastAddress = net.ParseIP(ip)
  440. case "preferred_ip":
  441. ip, ok := value.(string)
  442. if !ok {
  443. return nil, fmt.Errorf(assertErrorMsg, "preferred_ip")
  444. }
  445. host.preferredIP = net.ParseIP(ip)
  446. case "rpc_address":
  447. ip, ok := value.(string)
  448. if !ok {
  449. return nil, fmt.Errorf(assertErrorMsg, "rpc_address")
  450. }
  451. host.rpcAddress = net.ParseIP(ip)
  452. case "listen_address":
  453. ip, ok := value.(string)
  454. if !ok {
  455. return nil, fmt.Errorf(assertErrorMsg, "listen_address")
  456. }
  457. host.listenAddress = net.ParseIP(ip)
  458. case "workload":
  459. host.workload, ok = value.(string)
  460. if !ok {
  461. return nil, fmt.Errorf(assertErrorMsg, "workload")
  462. }
  463. case "graph":
  464. host.graph, ok = value.(bool)
  465. if !ok {
  466. return nil, fmt.Errorf(assertErrorMsg, "graph")
  467. }
  468. case "tokens":
  469. host.tokens, ok = value.([]string)
  470. if !ok {
  471. return nil, fmt.Errorf(assertErrorMsg, "tokens")
  472. }
  473. case "dse_version":
  474. host.dseVersion, ok = value.(string)
  475. if !ok {
  476. return nil, fmt.Errorf(assertErrorMsg, "dse_version")
  477. }
  478. }
  479. // TODO(thrawn01): Add 'port'? once CASSANDRA-7544 is complete
  480. // Not sure what the port field will be called until the JIRA issue is complete
  481. }
  482. ip, port := s.cfg.translateAddressPort(host.ConnectAddress(), host.port)
  483. host.connectAddress = ip
  484. host.port = port
  485. return &host, nil
  486. }
  487. // Ask the control node for host info on all it's known peers
  488. func (r *ringDescriber) getClusterPeerInfo() ([]*HostInfo, error) {
  489. var hosts []*HostInfo
  490. iter := r.session.control.withConnHost(func(ch *connHost) *Iter {
  491. hosts = append(hosts, ch.host)
  492. return ch.conn.query(context.TODO(), "SELECT * FROM system.peers")
  493. })
  494. if iter == nil {
  495. return nil, errNoControl
  496. }
  497. rows, err := iter.SliceMap()
  498. if err != nil {
  499. // TODO(zariel): make typed error
  500. return nil, fmt.Errorf("unable to fetch peer host info: %s", err)
  501. }
  502. for _, row := range rows {
  503. // extract all available info about the peer
  504. host, err := r.session.hostInfoFromMap(row, r.session.cfg.Port)
  505. if err != nil {
  506. return nil, err
  507. } else if !isValidPeer(host) {
  508. // If it's not a valid peer
  509. Logger.Printf("Found invalid peer '%s' "+
  510. "Likely due to a gossip or snitch issue, this host will be ignored", host)
  511. continue
  512. }
  513. hosts = append(hosts, host)
  514. }
  515. return hosts, nil
  516. }
  517. // Return true if the host is a valid peer
  518. func isValidPeer(host *HostInfo) bool {
  519. return !(len(host.RPCAddress()) == 0 ||
  520. host.hostId == "" ||
  521. host.dataCenter == "" ||
  522. host.rack == "" ||
  523. len(host.tokens) == 0)
  524. }
  525. // Return a list of hosts the cluster knows about
  526. func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) {
  527. r.mu.Lock()
  528. defer r.mu.Unlock()
  529. hosts, err := r.getClusterPeerInfo()
  530. if err != nil {
  531. return r.prevHosts, r.prevPartitioner, err
  532. }
  533. var partitioner string
  534. if len(hosts) > 0 {
  535. partitioner = hosts[0].Partitioner()
  536. }
  537. return hosts, partitioner, nil
  538. }
  539. // Given an ip/port return HostInfo for the specified ip/port
  540. func (r *ringDescriber) getHostInfo(ip net.IP, port int) (*HostInfo, error) {
  541. var host *HostInfo
  542. iter := r.session.control.withConnHost(func(ch *connHost) *Iter {
  543. if ch.host.ConnectAddress().Equal(ip) {
  544. host = ch.host
  545. return nil
  546. }
  547. return ch.conn.query(context.TODO(), "SELECT * FROM system.peers")
  548. })
  549. if iter != nil {
  550. rows, err := iter.SliceMap()
  551. if err != nil {
  552. return nil, err
  553. }
  554. for _, row := range rows {
  555. h, err := r.session.hostInfoFromMap(row, port)
  556. if err != nil {
  557. return nil, err
  558. }
  559. if h.ConnectAddress().Equal(ip) {
  560. host = h
  561. break
  562. }
  563. }
  564. if host == nil {
  565. return nil, errors.New("host not found in peers table")
  566. }
  567. }
  568. if host == nil {
  569. return nil, errors.New("unable to fetch host info: invalid control connection")
  570. } else if host.invalidConnectAddr() {
  571. return nil, fmt.Errorf("host ConnectAddress invalid ip=%v: %v", ip, host)
  572. }
  573. return host, nil
  574. }
  575. func (r *ringDescriber) refreshRing() error {
  576. // if we have 0 hosts this will return the previous list of hosts to
  577. // attempt to reconnect to the cluster otherwise we would never find
  578. // downed hosts again, could possibly have an optimisation to only
  579. // try to add new hosts if GetHosts didnt error and the hosts didnt change.
  580. hosts, partitioner, err := r.GetHosts()
  581. if err != nil {
  582. return err
  583. }
  584. prevHosts := r.session.ring.currentHosts()
  585. // TODO: move this to session
  586. for _, h := range hosts {
  587. if filter := r.session.cfg.HostFilter; filter != nil && !filter.Accept(h) {
  588. continue
  589. }
  590. if host, ok := r.session.ring.addHostIfMissing(h); !ok {
  591. r.session.pool.addHost(h)
  592. r.session.policy.AddHost(h)
  593. } else {
  594. host.update(h)
  595. }
  596. delete(prevHosts, h.ConnectAddress().String())
  597. }
  598. // TODO(zariel): it may be worth having a mutex covering the overall ring state
  599. // in a session so that everything sees a consistent state. Becuase as is today
  600. // events can come in and due to ordering an UP host could be removed from the cluster
  601. for _, host := range prevHosts {
  602. r.session.removeHost(host)
  603. }
  604. r.session.metadata.setPartitioner(partitioner)
  605. r.session.policy.SetPartitioner(partitioner)
  606. return nil
  607. }