host_source.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664
  1. package gocql
  2. import (
  3. "fmt"
  4. "net"
  5. "strconv"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/pkg/errors"
  10. )
// assertErrorMsg is the format string of the error returned when a column
// value taken from a system.local / system.peers row does not have the Go type
// the driver expects; %s receives the column name.
const assertErrorMsg = "Assertion failed for %s"
  12. type nodeState int32
  13. func (n nodeState) String() string {
  14. if n == NodeUp {
  15. return "UP"
  16. } else if n == NodeDown {
  17. return "DOWN"
  18. }
  19. return fmt.Sprintf("UNKNOWN_%d", n)
  20. }
  21. const (
  22. NodeUp nodeState = iota
  23. NodeDown
  24. )
  25. type cassVersion struct {
  26. Major, Minor, Patch int
  27. }
  28. func (c *cassVersion) Set(v string) error {
  29. if v == "" {
  30. return nil
  31. }
  32. return c.UnmarshalCQL(nil, []byte(v))
  33. }
  34. func (c *cassVersion) UnmarshalCQL(info TypeInfo, data []byte) error {
  35. return c.unmarshal(data)
  36. }
  37. func (c *cassVersion) unmarshal(data []byte) error {
  38. version := strings.TrimSuffix(string(data), "-SNAPSHOT")
  39. version = strings.TrimPrefix(version, "v")
  40. v := strings.Split(version, ".")
  41. if len(v) < 2 {
  42. return fmt.Errorf("invalid version string: %s", data)
  43. }
  44. var err error
  45. c.Major, err = strconv.Atoi(v[0])
  46. if err != nil {
  47. return fmt.Errorf("invalid major version %v: %v", v[0], err)
  48. }
  49. c.Minor, err = strconv.Atoi(v[1])
  50. if err != nil {
  51. return fmt.Errorf("invalid minor version %v: %v", v[1], err)
  52. }
  53. if len(v) > 2 {
  54. c.Patch, err = strconv.Atoi(v[2])
  55. if err != nil {
  56. return fmt.Errorf("invalid patch version %v: %v", v[2], err)
  57. }
  58. }
  59. return nil
  60. }
  61. func (c cassVersion) Before(major, minor, patch int) bool {
  62. if c.Major > major {
  63. return true
  64. } else if c.Minor > minor {
  65. return true
  66. } else if c.Patch > patch {
  67. return true
  68. }
  69. return false
  70. }
  71. func (c cassVersion) String() string {
  72. return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
  73. }
  74. func (c cassVersion) nodeUpDelay() time.Duration {
  75. if c.Major >= 2 && c.Minor >= 2 {
  76. // CASSANDRA-8236
  77. return 0
  78. }
  79. return 10 * time.Second
  80. }
  81. type HostInfo struct {
  82. // TODO(zariel): reduce locking maybe, not all values will change, but to ensure
  83. // that we are thread safe use a mutex to access all fields.
  84. mu sync.RWMutex
  85. peer net.IP
  86. broadcastAddress net.IP
  87. listenAddress net.IP
  88. rpcAddress net.IP
  89. preferredIP net.IP
  90. connectAddress net.IP
  91. port int
  92. dataCenter string
  93. rack string
  94. hostId string
  95. workload string
  96. graph bool
  97. dseVersion string
  98. partitioner string
  99. clusterName string
  100. version cassVersion
  101. state nodeState
  102. tokens []string
  103. }
  104. func (h *HostInfo) Equal(host *HostInfo) bool {
  105. h.mu.RLock()
  106. defer h.mu.RUnlock()
  107. host.mu.RLock()
  108. defer host.mu.RUnlock()
  109. return h.ConnectAddress().Equal(host.ConnectAddress())
  110. }
  111. func (h *HostInfo) Peer() net.IP {
  112. h.mu.RLock()
  113. defer h.mu.RUnlock()
  114. return h.peer
  115. }
  116. func (h *HostInfo) setPeer(peer net.IP) *HostInfo {
  117. h.mu.Lock()
  118. defer h.mu.Unlock()
  119. h.peer = peer
  120. return h
  121. }
  122. // Returns the address that should be used to connect to the host
  123. // This defaults to 'broadcast_address', then falls back to 'peer'
  124. // This is to maintain existing functionality. If you wish to
  125. // override this, use an AddressTranslator or use a HostFilter
  126. // to SetConnectAddress()
  127. func (h *HostInfo) ConnectAddress() net.IP {
  128. h.mu.RLock()
  129. defer h.mu.RUnlock()
  130. if h.connectAddress == nil {
  131. if h.broadcastAddress != nil {
  132. return h.broadcastAddress
  133. }
  134. if h.peer != nil {
  135. return h.peer
  136. }
  137. }
  138. return h.connectAddress
  139. }
  140. func (h *HostInfo) SetConnectAddress(address net.IP) *HostInfo {
  141. h.mu.Lock()
  142. defer h.mu.Unlock()
  143. h.connectAddress = address
  144. return h
  145. }
  146. func (h *HostInfo) BroadcastAddress() net.IP {
  147. h.mu.RLock()
  148. defer h.mu.RUnlock()
  149. return h.broadcastAddress
  150. }
  151. func (h *HostInfo) ListenAddress() net.IP {
  152. h.mu.RLock()
  153. defer h.mu.RUnlock()
  154. return h.listenAddress
  155. }
  156. func (h *HostInfo) RPCAddress() net.IP {
  157. h.mu.RLock()
  158. defer h.mu.RUnlock()
  159. return h.rpcAddress
  160. }
  161. func (h *HostInfo) PreferredIP() net.IP {
  162. h.mu.RLock()
  163. defer h.mu.RUnlock()
  164. return h.preferredIP
  165. }
  166. func (h *HostInfo) DataCenter() string {
  167. h.mu.RLock()
  168. defer h.mu.RUnlock()
  169. return h.dataCenter
  170. }
  171. func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo {
  172. h.mu.Lock()
  173. defer h.mu.Unlock()
  174. h.dataCenter = dataCenter
  175. return h
  176. }
  177. func (h *HostInfo) Rack() string {
  178. h.mu.RLock()
  179. defer h.mu.RUnlock()
  180. return h.rack
  181. }
  182. func (h *HostInfo) setRack(rack string) *HostInfo {
  183. h.mu.Lock()
  184. defer h.mu.Unlock()
  185. h.rack = rack
  186. return h
  187. }
  188. func (h *HostInfo) HostID() string {
  189. h.mu.RLock()
  190. defer h.mu.RUnlock()
  191. return h.hostId
  192. }
  193. func (h *HostInfo) setHostID(hostID string) *HostInfo {
  194. h.mu.Lock()
  195. defer h.mu.Unlock()
  196. h.hostId = hostID
  197. return h
  198. }
  199. func (h *HostInfo) WorkLoad() string {
  200. h.mu.RLock()
  201. defer h.mu.RUnlock()
  202. return h.workload
  203. }
  204. func (h *HostInfo) Graph() bool {
  205. h.mu.RLock()
  206. defer h.mu.RUnlock()
  207. return h.graph
  208. }
  209. func (h *HostInfo) DSEVersion() string {
  210. h.mu.RLock()
  211. defer h.mu.RUnlock()
  212. return h.dseVersion
  213. }
  214. func (h *HostInfo) Partitioner() string {
  215. h.mu.RLock()
  216. defer h.mu.RUnlock()
  217. return h.partitioner
  218. }
  219. func (h *HostInfo) ClusterName() string {
  220. h.mu.RLock()
  221. defer h.mu.RUnlock()
  222. return h.clusterName
  223. }
  224. func (h *HostInfo) Version() cassVersion {
  225. h.mu.RLock()
  226. defer h.mu.RUnlock()
  227. return h.version
  228. }
  229. func (h *HostInfo) setVersion(major, minor, patch int) *HostInfo {
  230. h.mu.Lock()
  231. defer h.mu.Unlock()
  232. h.version = cassVersion{major, minor, patch}
  233. return h
  234. }
  235. func (h *HostInfo) State() nodeState {
  236. h.mu.RLock()
  237. defer h.mu.RUnlock()
  238. return h.state
  239. }
  240. func (h *HostInfo) setState(state nodeState) *HostInfo {
  241. h.mu.Lock()
  242. defer h.mu.Unlock()
  243. h.state = state
  244. return h
  245. }
  246. func (h *HostInfo) Tokens() []string {
  247. h.mu.RLock()
  248. defer h.mu.RUnlock()
  249. return h.tokens
  250. }
  251. func (h *HostInfo) setTokens(tokens []string) *HostInfo {
  252. h.mu.Lock()
  253. defer h.mu.Unlock()
  254. h.tokens = tokens
  255. return h
  256. }
  257. func (h *HostInfo) Port() int {
  258. h.mu.RLock()
  259. defer h.mu.RUnlock()
  260. return h.port
  261. }
  262. func (h *HostInfo) setPort(port int) *HostInfo {
  263. h.mu.Lock()
  264. defer h.mu.Unlock()
  265. h.port = port
  266. return h
  267. }
  268. func (h *HostInfo) update(from *HostInfo) {
  269. h.mu.Lock()
  270. defer h.mu.Unlock()
  271. h.tokens = from.tokens
  272. h.version = from.version
  273. h.hostId = from.hostId
  274. h.dataCenter = from.dataCenter
  275. }
  276. func (h *HostInfo) IsUp() bool {
  277. return h != nil && h.State() == NodeUp
  278. }
  279. func (h *HostInfo) String() string {
  280. h.mu.RLock()
  281. defer h.mu.RUnlock()
  282. return fmt.Sprintf("[HostInfo connectAddress=%q peer=%q rpc_address=%q broadcast_address=%q "+
  283. "port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
  284. h.connectAddress, h.peer, h.rpcAddress, h.broadcastAddress,
  285. h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
  286. }
// ringDescriber discovers the hosts of the cluster by querying system.local
// and system.peers through the session's control connection, and remembers
// the result of the last successful lookup.
type ringDescriber struct {
	session *Session
	mu      sync.Mutex // held by GetHosts for the whole lookup
	// prevHosts and prevPartitioner are the values returned by the last
	// successful GetHosts call; GetHosts hands them back when a lookup fails.
	prevHosts []*HostInfo
	// localHost is the system.local host recorded by the last successful
	// GetHosts call.
	localHost       *HostInfo
	prevPartitioner string
}
  295. // Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
  296. func checkSystemSchema(control *controlConn) (bool, error) {
  297. iter := control.query("SELECT * FROM system_schema.keyspaces")
  298. if err := iter.err; err != nil {
  299. if errf, ok := err.(*errorFrame); ok {
  300. if errf.code == errReadFailure {
  301. return false, nil
  302. }
  303. }
  304. return false, err
  305. }
  306. return true, nil
  307. }
  308. // Given a map that represents a row from either system.local or system.peers
  309. // return as much information as we can in *HostInfo
  310. func (r *ringDescriber) hostInfoFromMap(row map[string]interface{}) (error, *HostInfo) {
  311. host := HostInfo{}
  312. var ok bool
  313. for key, value := range row {
  314. switch key {
  315. case "data_center":
  316. host.dataCenter, ok = value.(string)
  317. if !ok {
  318. return fmt.Errorf(assertErrorMsg, "data_center"), nil
  319. }
  320. case "rack":
  321. host.rack, ok = value.(string)
  322. if !ok {
  323. return fmt.Errorf(assertErrorMsg, "rack"), nil
  324. }
  325. case "host_id":
  326. hostId, ok := value.(UUID)
  327. if !ok {
  328. return fmt.Errorf(assertErrorMsg, "host_id"), nil
  329. }
  330. host.hostId = hostId.String()
  331. case "release_version":
  332. version, ok := value.(string)
  333. if !ok {
  334. return fmt.Errorf(assertErrorMsg, "release_version"), nil
  335. }
  336. host.version.Set(version)
  337. case "peer":
  338. ip, ok := value.(string)
  339. if !ok {
  340. return fmt.Errorf(assertErrorMsg, "peer"), nil
  341. }
  342. host.peer = net.ParseIP(ip)
  343. case "cluster_name":
  344. host.clusterName, ok = value.(string)
  345. if !ok {
  346. return fmt.Errorf(assertErrorMsg, "cluster_name"), nil
  347. }
  348. case "partitioner":
  349. host.partitioner, ok = value.(string)
  350. if !ok {
  351. return fmt.Errorf(assertErrorMsg, "partitioner"), nil
  352. }
  353. case "broadcast_address":
  354. ip, ok := value.(string)
  355. if !ok {
  356. return fmt.Errorf(assertErrorMsg, "broadcast_address"), nil
  357. }
  358. host.broadcastAddress = net.ParseIP(ip)
  359. case "preferred_ip":
  360. ip, ok := value.(string)
  361. if !ok {
  362. return fmt.Errorf(assertErrorMsg, "preferred_ip"), nil
  363. }
  364. host.preferredIP = net.ParseIP(ip)
  365. case "rpc_address":
  366. ip, ok := value.(string)
  367. if !ok {
  368. return fmt.Errorf(assertErrorMsg, "rpc_address"), nil
  369. }
  370. host.rpcAddress = net.ParseIP(ip)
  371. case "listen_address":
  372. ip, ok := value.(string)
  373. if !ok {
  374. return fmt.Errorf(assertErrorMsg, "listen_address"), nil
  375. }
  376. host.listenAddress = net.ParseIP(ip)
  377. case "workload":
  378. host.workload, ok = value.(string)
  379. if !ok {
  380. return fmt.Errorf(assertErrorMsg, "workload"), nil
  381. }
  382. case "graph":
  383. host.graph, ok = value.(bool)
  384. if !ok {
  385. return fmt.Errorf(assertErrorMsg, "graph"), nil
  386. }
  387. case "tokens":
  388. host.tokens, ok = value.([]string)
  389. if !ok {
  390. return fmt.Errorf(assertErrorMsg, "tokens"), nil
  391. }
  392. case "dse_version":
  393. host.dseVersion, ok = value.(string)
  394. if !ok {
  395. return fmt.Errorf(assertErrorMsg, "dse_version"), nil
  396. }
  397. }
  398. // TODO(thrawn01): Add 'port'? once CASSANDRA-7544 is complete
  399. // Not sure what the port field will be called until the JIRA issue is complete
  400. }
  401. // Default to our connected port if the cluster doesn't have port information
  402. if host.port == 0 {
  403. host.port = r.session.cfg.Port
  404. }
  405. return nil, &host
  406. }
  407. // Ask the control node for it's local host information
  408. func (r *ringDescriber) GetLocalHostInfo() (*HostInfo, error) {
  409. row := make(map[string]interface{})
  410. // ask the connected node for local host info
  411. it := r.session.control.query("SELECT * FROM system.local WHERE key='local'")
  412. if it == nil {
  413. return nil, errors.New("Attempted to query 'system.local' on a closed control connection")
  414. }
  415. // expect only 1 row
  416. it.MapScan(row)
  417. if err := it.Close(); err != nil {
  418. return nil, err
  419. }
  420. // extract all available info about the host
  421. err, host := r.hostInfoFromMap(row)
  422. if err != nil {
  423. return nil, err
  424. }
  425. return host, err
  426. }
  427. // Given an ip address and port, return a peer that matched the ip address
  428. func (r *ringDescriber) GetPeerHostInfo(ip net.IP, port int) (*HostInfo, error) {
  429. row := make(map[string]interface{})
  430. it := r.session.control.query("SELECT * FROM system.peers WHERE peer=?", ip)
  431. if it == nil {
  432. return nil, errors.New("Attempted to query 'system.peers' on a closed control connection")
  433. }
  434. // expect only 1 row
  435. it.MapScan(row)
  436. if err := it.Close(); err != nil {
  437. return nil, err
  438. }
  439. // extract all available info about the host
  440. err, host := r.hostInfoFromMap(row)
  441. if err != nil {
  442. return nil, err
  443. }
  444. return host, err
  445. }
  446. // Ask the control node for host info on all it's known peers
  447. func (r *ringDescriber) GetClusterPeerInfo() ([]*HostInfo, error) {
  448. var hosts []*HostInfo
  449. // Ask the node for a list of it's peers
  450. it := r.session.control.query("SELECT * FROM system.peers")
  451. if it == nil {
  452. return nil, errors.New("Attempted to query 'system.peers' on a closed connection")
  453. }
  454. for {
  455. row := make(map[string]interface{})
  456. if !it.MapScan(row) {
  457. break
  458. }
  459. // extract all available info about the peer
  460. err, host := r.hostInfoFromMap(row)
  461. if err != nil {
  462. return nil, err
  463. }
  464. // If it's not a valid peer
  465. if !r.IsValidPeer(host) {
  466. Logger.Printf("Found invalid peer '%+v' "+
  467. "Likely due to a gossip or snitch issue, this host will be ignored", host)
  468. continue
  469. }
  470. hosts = append(hosts, host)
  471. }
  472. if it.err != nil {
  473. return nil, errors.Wrap(it.err, "GetClusterPeerInfo()")
  474. }
  475. return hosts, nil
  476. }
  477. // Return true if the host is a valid peer
  478. func (r *ringDescriber) IsValidPeer(host *HostInfo) bool {
  479. return !(len(host.RPCAddress()) == 0 ||
  480. host.hostId == "" ||
  481. host.dataCenter == "" ||
  482. host.rack == "" ||
  483. len(host.tokens) == 0)
  484. }
  485. // Return a list of hosts the cluster knows about
  486. func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) {
  487. r.mu.Lock()
  488. defer r.mu.Unlock()
  489. // Update the localHost info with data from the connected host
  490. localHost, err := r.GetLocalHostInfo()
  491. if err != nil {
  492. return r.prevHosts, r.prevPartitioner, err
  493. }
  494. // Update our list of hosts by querying the cluster
  495. hosts, err := r.GetClusterPeerInfo()
  496. if err != nil {
  497. return r.prevHosts, r.prevPartitioner, err
  498. }
  499. hosts = append(hosts, localHost)
  500. // Filter the hosts if filter is provided
  501. var filteredHosts []*HostInfo
  502. if r.session.cfg.HostFilter != nil {
  503. for _, host := range hosts {
  504. if r.session.cfg.HostFilter.Accept(host) {
  505. filteredHosts = append(filteredHosts, host)
  506. }
  507. }
  508. } else {
  509. filteredHosts = hosts
  510. }
  511. r.prevHosts = filteredHosts
  512. r.prevPartitioner = localHost.partitioner
  513. r.localHost = localHost
  514. return filteredHosts, localHost.partitioner, nil
  515. }
  516. // Given an ip/port return HostInfo for the specified ip/port
  517. func (r *ringDescriber) GetHostInfo(ip net.IP, port int) (*HostInfo, error) {
  518. var host *HostInfo
  519. var err error
  520. // TODO(thrawn01): Is IgnorePeerAddr still useful now that we have DisableInitialHostLookup?
  521. // TODO(thrawn01): should we also check for DisableInitialHostLookup and return if true?
  522. // Ignore the port and connect address and use the address/port we already have
  523. if r.session.control == nil || r.session.cfg.IgnorePeerAddr {
  524. return &HostInfo{connectAddress: ip, port: port}, nil
  525. }
  526. // Attempt to get the host info for our control connection
  527. controlHost := r.session.control.GetHostInfo()
  528. if controlHost == nil {
  529. return nil, errors.New("invalid control connection")
  530. }
  531. // If we are asking about the same node our control connection has a connection too
  532. if controlHost.ConnectAddress().Equal(ip) {
  533. host, err = r.GetLocalHostInfo()
  534. } else {
  535. host, err = r.GetPeerHostInfo(ip, port)
  536. }
  537. // No host was found matching this ip/port
  538. if err != nil {
  539. return nil, err
  540. }
  541. // Apply host filter to the result
  542. if r.session.cfg.HostFilter != nil && r.session.cfg.HostFilter.Accept(host) != true {
  543. return nil, err
  544. }
  545. return host, err
  546. }
  547. func (r *ringDescriber) refreshRing() error {
  548. // if we have 0 hosts this will return the previous list of hosts to
  549. // attempt to reconnect to the cluster otherwise we would never find
  550. // downed hosts again, could possibly have an optimisation to only
  551. // try to add new hosts if GetHosts didnt error and the hosts didnt change.
  552. hosts, partitioner, err := r.GetHosts()
  553. if err != nil {
  554. return err
  555. }
  556. // TODO: move this to session
  557. // TODO: handle removing hosts here
  558. for _, h := range hosts {
  559. if host, ok := r.session.ring.addHostIfMissing(h); !ok {
  560. r.session.pool.addHost(h)
  561. } else {
  562. host.update(h)
  563. }
  564. }
  565. r.session.metadata.setPartitioner(partitioner)
  566. r.session.policy.SetPartitioner(partitioner)
  567. return nil
  568. }