host_source.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697
  1. package gocql
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. )
  11. type nodeState int32
  12. func (n nodeState) String() string {
  13. if n == NodeUp {
  14. return "UP"
  15. } else if n == NodeDown {
  16. return "DOWN"
  17. }
  18. return fmt.Sprintf("UNKNOWN_%d", n)
  19. }
  20. const (
  21. NodeUp nodeState = iota
  22. NodeDown
  23. )
  24. type cassVersion struct {
  25. Major, Minor, Patch int
  26. }
  27. func (c *cassVersion) Set(v string) error {
  28. if v == "" {
  29. return nil
  30. }
  31. return c.UnmarshalCQL(nil, []byte(v))
  32. }
  33. func (c *cassVersion) UnmarshalCQL(info TypeInfo, data []byte) error {
  34. return c.unmarshal(data)
  35. }
  36. func (c *cassVersion) unmarshal(data []byte) error {
  37. version := strings.TrimSuffix(string(data), "-SNAPSHOT")
  38. version = strings.TrimPrefix(version, "v")
  39. v := strings.Split(version, ".")
  40. if len(v) < 2 {
  41. return fmt.Errorf("invalid version string: %s", data)
  42. }
  43. var err error
  44. c.Major, err = strconv.Atoi(v[0])
  45. if err != nil {
  46. return fmt.Errorf("invalid major version %v: %v", v[0], err)
  47. }
  48. c.Minor, err = strconv.Atoi(v[1])
  49. if err != nil {
  50. return fmt.Errorf("invalid minor version %v: %v", v[1], err)
  51. }
  52. if len(v) > 2 {
  53. c.Patch, err = strconv.Atoi(v[2])
  54. if err != nil {
  55. return fmt.Errorf("invalid patch version %v: %v", v[2], err)
  56. }
  57. }
  58. return nil
  59. }
  60. func (c cassVersion) Before(major, minor, patch int) bool {
  61. // We're comparing us (cassVersion) with the provided version (major, minor, patch)
  62. // We return true if our version is lower (comes before) than the provided one.
  63. if c.Major < major {
  64. return true
  65. } else if c.Major == major {
  66. if c.Minor < minor {
  67. return true
  68. } else if c.Minor == minor && c.Patch < patch {
  69. return true
  70. }
  71. }
  72. return false
  73. }
  74. func (c cassVersion) String() string {
  75. return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
  76. }
  77. func (c cassVersion) nodeUpDelay() time.Duration {
  78. if c.Major >= 2 && c.Minor >= 2 {
  79. // CASSANDRA-8236
  80. return 0
  81. }
  82. return 10 * time.Second
  83. }
  84. type HostInfo struct {
  85. // TODO(zariel): reduce locking maybe, not all values will change, but to ensure
  86. // that we are thread safe use a mutex to access all fields.
  87. mu sync.RWMutex
  88. peer net.IP
  89. broadcastAddress net.IP
  90. listenAddress net.IP
  91. rpcAddress net.IP
  92. preferredIP net.IP
  93. connectAddress net.IP
  94. port int
  95. dataCenter string
  96. rack string
  97. hostId string
  98. workload string
  99. graph bool
  100. dseVersion string
  101. partitioner string
  102. clusterName string
  103. version cassVersion
  104. state nodeState
  105. tokens []string
  106. }
  107. func (h *HostInfo) Equal(host *HostInfo) bool {
  108. if h == host {
  109. // prevent rlock reentry
  110. return true
  111. }
  112. return h.ConnectAddress().Equal(host.ConnectAddress())
  113. }
  114. func (h *HostInfo) Peer() net.IP {
  115. h.mu.RLock()
  116. defer h.mu.RUnlock()
  117. return h.peer
  118. }
  119. func (h *HostInfo) setPeer(peer net.IP) *HostInfo {
  120. h.mu.Lock()
  121. defer h.mu.Unlock()
  122. h.peer = peer
  123. return h
  124. }
  125. func (h *HostInfo) invalidConnectAddr() bool {
  126. h.mu.RLock()
  127. defer h.mu.RUnlock()
  128. addr, _ := h.connectAddressLocked()
  129. return !validIpAddr(addr)
  130. }
  131. func validIpAddr(addr net.IP) bool {
  132. return addr != nil && !addr.IsUnspecified()
  133. }
  134. func (h *HostInfo) connectAddressLocked() (net.IP, string) {
  135. if validIpAddr(h.connectAddress) {
  136. return h.connectAddress, "connect_address"
  137. } else if validIpAddr(h.rpcAddress) {
  138. return h.rpcAddress, "rpc_adress"
  139. } else if validIpAddr(h.preferredIP) {
  140. // where does perferred_ip get set?
  141. return h.preferredIP, "preferred_ip"
  142. } else if validIpAddr(h.broadcastAddress) {
  143. return h.broadcastAddress, "broadcast_address"
  144. } else if validIpAddr(h.peer) {
  145. return h.peer, "peer"
  146. }
  147. return net.IPv4zero, "invalid"
  148. }
  149. // Returns the address that should be used to connect to the host.
  150. // If you wish to override this, use an AddressTranslator or
  151. // use a HostFilter to SetConnectAddress()
  152. func (h *HostInfo) ConnectAddress() net.IP {
  153. h.mu.RLock()
  154. defer h.mu.RUnlock()
  155. if addr, _ := h.connectAddressLocked(); validIpAddr(addr) {
  156. return addr
  157. }
  158. panic(fmt.Sprintf("no valid connect address for host: %v. Is your cluster configured correctly?", h))
  159. }
  160. func (h *HostInfo) SetConnectAddress(address net.IP) *HostInfo {
  161. // TODO(zariel): should this not be exported?
  162. h.mu.Lock()
  163. defer h.mu.Unlock()
  164. h.connectAddress = address
  165. return h
  166. }
  167. func (h *HostInfo) BroadcastAddress() net.IP {
  168. h.mu.RLock()
  169. defer h.mu.RUnlock()
  170. return h.broadcastAddress
  171. }
  172. func (h *HostInfo) ListenAddress() net.IP {
  173. h.mu.RLock()
  174. defer h.mu.RUnlock()
  175. return h.listenAddress
  176. }
  177. func (h *HostInfo) RPCAddress() net.IP {
  178. h.mu.RLock()
  179. defer h.mu.RUnlock()
  180. return h.rpcAddress
  181. }
  182. func (h *HostInfo) PreferredIP() net.IP {
  183. h.mu.RLock()
  184. defer h.mu.RUnlock()
  185. return h.preferredIP
  186. }
  187. func (h *HostInfo) DataCenter() string {
  188. h.mu.RLock()
  189. defer h.mu.RUnlock()
  190. return h.dataCenter
  191. }
  192. func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo {
  193. h.mu.Lock()
  194. defer h.mu.Unlock()
  195. h.dataCenter = dataCenter
  196. return h
  197. }
  198. func (h *HostInfo) Rack() string {
  199. h.mu.RLock()
  200. defer h.mu.RUnlock()
  201. return h.rack
  202. }
  203. func (h *HostInfo) setRack(rack string) *HostInfo {
  204. h.mu.Lock()
  205. defer h.mu.Unlock()
  206. h.rack = rack
  207. return h
  208. }
  209. func (h *HostInfo) HostID() string {
  210. h.mu.RLock()
  211. defer h.mu.RUnlock()
  212. return h.hostId
  213. }
  214. func (h *HostInfo) setHostID(hostID string) *HostInfo {
  215. h.mu.Lock()
  216. defer h.mu.Unlock()
  217. h.hostId = hostID
  218. return h
  219. }
  220. func (h *HostInfo) WorkLoad() string {
  221. h.mu.RLock()
  222. defer h.mu.RUnlock()
  223. return h.workload
  224. }
  225. func (h *HostInfo) Graph() bool {
  226. h.mu.RLock()
  227. defer h.mu.RUnlock()
  228. return h.graph
  229. }
  230. func (h *HostInfo) DSEVersion() string {
  231. h.mu.RLock()
  232. defer h.mu.RUnlock()
  233. return h.dseVersion
  234. }
  235. func (h *HostInfo) Partitioner() string {
  236. h.mu.RLock()
  237. defer h.mu.RUnlock()
  238. return h.partitioner
  239. }
  240. func (h *HostInfo) ClusterName() string {
  241. h.mu.RLock()
  242. defer h.mu.RUnlock()
  243. return h.clusterName
  244. }
  245. func (h *HostInfo) Version() cassVersion {
  246. h.mu.RLock()
  247. defer h.mu.RUnlock()
  248. return h.version
  249. }
  250. func (h *HostInfo) setVersion(major, minor, patch int) *HostInfo {
  251. h.mu.Lock()
  252. defer h.mu.Unlock()
  253. h.version = cassVersion{major, minor, patch}
  254. return h
  255. }
  256. func (h *HostInfo) State() nodeState {
  257. h.mu.RLock()
  258. defer h.mu.RUnlock()
  259. return h.state
  260. }
  261. func (h *HostInfo) setState(state nodeState) *HostInfo {
  262. h.mu.Lock()
  263. defer h.mu.Unlock()
  264. h.state = state
  265. return h
  266. }
  267. func (h *HostInfo) Tokens() []string {
  268. h.mu.RLock()
  269. defer h.mu.RUnlock()
  270. return h.tokens
  271. }
  272. func (h *HostInfo) setTokens(tokens []string) *HostInfo {
  273. h.mu.Lock()
  274. defer h.mu.Unlock()
  275. h.tokens = tokens
  276. return h
  277. }
  278. func (h *HostInfo) Port() int {
  279. h.mu.RLock()
  280. defer h.mu.RUnlock()
  281. return h.port
  282. }
  283. func (h *HostInfo) setPort(port int) *HostInfo {
  284. h.mu.Lock()
  285. defer h.mu.Unlock()
  286. h.port = port
  287. return h
  288. }
  289. func (h *HostInfo) update(from *HostInfo) {
  290. if h == from {
  291. return
  292. }
  293. h.mu.Lock()
  294. defer h.mu.Unlock()
  295. from.mu.RLock()
  296. defer from.mu.RUnlock()
  297. // autogenerated do not update
  298. if h.peer == nil {
  299. h.peer = from.peer
  300. }
  301. if h.broadcastAddress == nil {
  302. h.broadcastAddress = from.broadcastAddress
  303. }
  304. if h.listenAddress == nil {
  305. h.listenAddress = from.listenAddress
  306. }
  307. if h.rpcAddress == nil {
  308. h.rpcAddress = from.rpcAddress
  309. }
  310. if h.preferredIP == nil {
  311. h.preferredIP = from.preferredIP
  312. }
  313. if h.connectAddress == nil {
  314. h.connectAddress = from.connectAddress
  315. }
  316. if h.port == 0 {
  317. h.port = from.port
  318. }
  319. if h.dataCenter == "" {
  320. h.dataCenter = from.dataCenter
  321. }
  322. if h.rack == "" {
  323. h.rack = from.rack
  324. }
  325. if h.hostId == "" {
  326. h.hostId = from.hostId
  327. }
  328. if h.workload == "" {
  329. h.workload = from.workload
  330. }
  331. if h.dseVersion == "" {
  332. h.dseVersion = from.dseVersion
  333. }
  334. if h.partitioner == "" {
  335. h.partitioner = from.partitioner
  336. }
  337. if h.clusterName == "" {
  338. h.clusterName = from.clusterName
  339. }
  340. if h.version == (cassVersion{}) {
  341. h.version = from.version
  342. }
  343. if h.tokens == nil {
  344. h.tokens = from.tokens
  345. }
  346. }
  347. func (h *HostInfo) IsUp() bool {
  348. return h != nil && h.State() == NodeUp
  349. }
  350. func (h *HostInfo) String() string {
  351. h.mu.RLock()
  352. defer h.mu.RUnlock()
  353. connectAddr, source := h.connectAddressLocked()
  354. return fmt.Sprintf("[HostInfo connectAddress=%q peer=%q rpc_address=%q broadcast_address=%q "+
  355. "preferred_ip=%q connect_addr=%q connect_addr_source=%q "+
  356. "port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
  357. h.connectAddress, h.peer, h.rpcAddress, h.broadcastAddress, h.preferredIP,
  358. connectAddr, source,
  359. h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
  360. }
  361. // Polls system.peers at a specific interval to find new hosts
  362. type ringDescriber struct {
  363. session *Session
  364. mu sync.Mutex
  365. prevHosts []*HostInfo
  366. prevPartitioner string
  367. }
  368. // Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
  369. func checkSystemSchema(control *controlConn) (bool, error) {
  370. iter := control.query("SELECT * FROM system_schema.keyspaces")
  371. if err := iter.err; err != nil {
  372. if errf, ok := err.(*errorFrame); ok {
  373. if errf.code == errSyntax {
  374. return false, nil
  375. }
  376. }
  377. return false, err
  378. }
  379. return true, nil
  380. }
  381. // Given a map that represents a row from either system.local or system.peers
  382. // return as much information as we can in *HostInfo
  383. func (s *Session) hostInfoFromMap(row map[string]interface{}, port int) (*HostInfo, error) {
  384. const assertErrorMsg = "Assertion failed for %s"
  385. var ok bool
  386. // Default to our connected port if the cluster doesn't have port information
  387. host := HostInfo{
  388. port: port,
  389. }
  390. for key, value := range row {
  391. switch key {
  392. case "data_center":
  393. host.dataCenter, ok = value.(string)
  394. if !ok {
  395. return nil, fmt.Errorf(assertErrorMsg, "data_center")
  396. }
  397. case "rack":
  398. host.rack, ok = value.(string)
  399. if !ok {
  400. return nil, fmt.Errorf(assertErrorMsg, "rack")
  401. }
  402. case "host_id":
  403. hostId, ok := value.(UUID)
  404. if !ok {
  405. return nil, fmt.Errorf(assertErrorMsg, "host_id")
  406. }
  407. host.hostId = hostId.String()
  408. case "release_version":
  409. version, ok := value.(string)
  410. if !ok {
  411. return nil, fmt.Errorf(assertErrorMsg, "release_version")
  412. }
  413. host.version.Set(version)
  414. case "peer":
  415. ip, ok := value.(string)
  416. if !ok {
  417. return nil, fmt.Errorf(assertErrorMsg, "peer")
  418. }
  419. host.peer = net.ParseIP(ip)
  420. case "cluster_name":
  421. host.clusterName, ok = value.(string)
  422. if !ok {
  423. return nil, fmt.Errorf(assertErrorMsg, "cluster_name")
  424. }
  425. case "partitioner":
  426. host.partitioner, ok = value.(string)
  427. if !ok {
  428. return nil, fmt.Errorf(assertErrorMsg, "partitioner")
  429. }
  430. case "broadcast_address":
  431. ip, ok := value.(string)
  432. if !ok {
  433. return nil, fmt.Errorf(assertErrorMsg, "broadcast_address")
  434. }
  435. host.broadcastAddress = net.ParseIP(ip)
  436. case "preferred_ip":
  437. ip, ok := value.(string)
  438. if !ok {
  439. return nil, fmt.Errorf(assertErrorMsg, "preferred_ip")
  440. }
  441. host.preferredIP = net.ParseIP(ip)
  442. case "rpc_address":
  443. ip, ok := value.(string)
  444. if !ok {
  445. return nil, fmt.Errorf(assertErrorMsg, "rpc_address")
  446. }
  447. host.rpcAddress = net.ParseIP(ip)
  448. case "listen_address":
  449. ip, ok := value.(string)
  450. if !ok {
  451. return nil, fmt.Errorf(assertErrorMsg, "listen_address")
  452. }
  453. host.listenAddress = net.ParseIP(ip)
  454. case "workload":
  455. host.workload, ok = value.(string)
  456. if !ok {
  457. return nil, fmt.Errorf(assertErrorMsg, "workload")
  458. }
  459. case "graph":
  460. host.graph, ok = value.(bool)
  461. if !ok {
  462. return nil, fmt.Errorf(assertErrorMsg, "graph")
  463. }
  464. case "tokens":
  465. host.tokens, ok = value.([]string)
  466. if !ok {
  467. return nil, fmt.Errorf(assertErrorMsg, "tokens")
  468. }
  469. case "dse_version":
  470. host.dseVersion, ok = value.(string)
  471. if !ok {
  472. return nil, fmt.Errorf(assertErrorMsg, "dse_version")
  473. }
  474. }
  475. // TODO(thrawn01): Add 'port'? once CASSANDRA-7544 is complete
  476. // Not sure what the port field will be called until the JIRA issue is complete
  477. }
  478. ip, port := s.cfg.translateAddressPort(host.ConnectAddress(), host.port)
  479. host.connectAddress = ip
  480. host.port = port
  481. return &host, nil
  482. }
  483. // Ask the control node for host info on all it's known peers
  484. func (r *ringDescriber) getClusterPeerInfo() ([]*HostInfo, error) {
  485. var hosts []*HostInfo
  486. iter := r.session.control.withConnHost(func(ch *connHost) *Iter {
  487. hosts = append(hosts, ch.host)
  488. return ch.conn.query("SELECT * FROM system.peers")
  489. })
  490. if iter == nil {
  491. return nil, errNoControl
  492. }
  493. rows, err := iter.SliceMap()
  494. if err != nil {
  495. // TODO(zariel): make typed error
  496. return nil, fmt.Errorf("unable to fetch peer host info: %s", err)
  497. }
  498. for _, row := range rows {
  499. // extract all available info about the peer
  500. host, err := r.session.hostInfoFromMap(row, r.session.cfg.Port)
  501. if err != nil {
  502. return nil, err
  503. } else if !isValidPeer(host) {
  504. // If it's not a valid peer
  505. Logger.Printf("Found invalid peer '%s' "+
  506. "Likely due to a gossip or snitch issue, this host will be ignored", host)
  507. continue
  508. }
  509. hosts = append(hosts, host)
  510. }
  511. return hosts, nil
  512. }
  513. // Return true if the host is a valid peer
  514. func isValidPeer(host *HostInfo) bool {
  515. return !(len(host.RPCAddress()) == 0 ||
  516. host.hostId == "" ||
  517. host.dataCenter == "" ||
  518. host.rack == "" ||
  519. len(host.tokens) == 0)
  520. }
  521. // Return a list of hosts the cluster knows about
  522. func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) {
  523. r.mu.Lock()
  524. defer r.mu.Unlock()
  525. hosts, err := r.getClusterPeerInfo()
  526. if err != nil {
  527. return r.prevHosts, r.prevPartitioner, err
  528. }
  529. var partitioner string
  530. if len(hosts) > 0 {
  531. partitioner = hosts[0].Partitioner()
  532. }
  533. return hosts, partitioner, nil
  534. }
  535. // Given an ip/port return HostInfo for the specified ip/port
  536. func (r *ringDescriber) getHostInfo(ip net.IP, port int) (*HostInfo, error) {
  537. var host *HostInfo
  538. iter := r.session.control.withConnHost(func(ch *connHost) *Iter {
  539. if ch.host.ConnectAddress().Equal(ip) {
  540. host = ch.host
  541. return nil
  542. }
  543. return ch.conn.query("SELECT * FROM system.peers")
  544. })
  545. if iter != nil {
  546. rows, err := iter.SliceMap()
  547. if err != nil {
  548. return nil, err
  549. }
  550. for _, row := range rows {
  551. h, err := r.session.hostInfoFromMap(row, port)
  552. if err != nil {
  553. return nil, err
  554. }
  555. if h.ConnectAddress().Equal(ip) {
  556. host = h
  557. break
  558. }
  559. }
  560. if host == nil {
  561. return nil, errors.New("host not found in peers table")
  562. }
  563. }
  564. if host == nil {
  565. return nil, errors.New("unable to fetch host info: invalid control connection")
  566. } else if host.invalidConnectAddr() {
  567. return nil, fmt.Errorf("host ConnectAddress invalid ip=%v: %v", ip, host)
  568. }
  569. return host, nil
  570. }
  571. func (r *ringDescriber) refreshRing() error {
  572. // if we have 0 hosts this will return the previous list of hosts to
  573. // attempt to reconnect to the cluster otherwise we would never find
  574. // downed hosts again, could possibly have an optimisation to only
  575. // try to add new hosts if GetHosts didnt error and the hosts didnt change.
  576. hosts, partitioner, err := r.GetHosts()
  577. if err != nil {
  578. return err
  579. }
  580. prevHosts := r.session.ring.currentHosts()
  581. // TODO: move this to session
  582. for _, h := range hosts {
  583. if filter := r.session.cfg.HostFilter; filter != nil && !filter.Accept(h) {
  584. continue
  585. }
  586. if host, ok := r.session.ring.addHostIfMissing(h); !ok {
  587. r.session.pool.addHost(h)
  588. r.session.policy.AddHost(h)
  589. } else {
  590. host.update(h)
  591. }
  592. delete(prevHosts, h.ConnectAddress().String())
  593. }
  594. // TODO(zariel): it may be worth having a mutex covering the overall ring state
  595. // in a session so that everything sees a consistent state. Becuase as is today
  596. // events can come in and due to ordering an UP host could be removed from the cluster
  597. for _, host := range prevHosts {
  598. r.session.removeHost(host)
  599. }
  600. r.session.metadata.setPartitioner(partitioner)
  601. r.session.policy.SetPartitioner(partitioner)
  602. return nil
  603. }