host_source.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653
  1. package gocql
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. )
  // nodeState describes whether the driver currently considers a host up or down.
  type nodeState int32

  // Known node states. NodeUp is the zero value of nodeState.
  const (
  	NodeUp nodeState = iota
  	NodeDown
  )

  // String returns "UP" or "DOWN" for the known states and "UNKNOWN_<n>" for
  // any other numeric value.
  func (n nodeState) String() string {
  	switch n {
  	case NodeUp:
  		return "UP"
  	case NodeDown:
  		return "DOWN"
  	default:
  		return fmt.Sprintf("UNKNOWN_%d", n)
  	}
  }
  24. type cassVersion struct {
  25. Major, Minor, Patch int
  26. }
  27. func (c *cassVersion) Set(v string) error {
  28. if v == "" {
  29. return nil
  30. }
  31. return c.UnmarshalCQL(nil, []byte(v))
  32. }
  33. func (c *cassVersion) UnmarshalCQL(info TypeInfo, data []byte) error {
  34. return c.unmarshal(data)
  35. }
  36. func (c *cassVersion) unmarshal(data []byte) error {
  37. version := strings.TrimSuffix(string(data), "-SNAPSHOT")
  38. version = strings.TrimPrefix(version, "v")
  39. v := strings.Split(version, ".")
  40. if len(v) < 2 {
  41. return fmt.Errorf("invalid version string: %s", data)
  42. }
  43. var err error
  44. c.Major, err = strconv.Atoi(v[0])
  45. if err != nil {
  46. return fmt.Errorf("invalid major version %v: %v", v[0], err)
  47. }
  48. c.Minor, err = strconv.Atoi(v[1])
  49. if err != nil {
  50. return fmt.Errorf("invalid minor version %v: %v", v[1], err)
  51. }
  52. if len(v) > 2 {
  53. c.Patch, err = strconv.Atoi(v[2])
  54. if err != nil {
  55. return fmt.Errorf("invalid patch version %v: %v", v[2], err)
  56. }
  57. }
  58. return nil
  59. }
  60. func (c cassVersion) Before(major, minor, patch int) bool {
  61. if c.Major > major {
  62. return true
  63. } else if c.Minor > minor {
  64. return true
  65. } else if c.Patch > patch {
  66. return true
  67. }
  68. return false
  69. }
  70. func (c cassVersion) String() string {
  71. return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
  72. }
  73. func (c cassVersion) nodeUpDelay() time.Duration {
  74. if c.Major >= 2 && c.Minor >= 2 {
  75. // CASSANDRA-8236
  76. return 0
  77. }
  78. return 10 * time.Second
  79. }
  80. type HostInfo struct {
  81. // TODO(zariel): reduce locking maybe, not all values will change, but to ensure
  82. // that we are thread safe use a mutex to access all fields.
  83. mu sync.RWMutex
  84. peer net.IP
  85. broadcastAddress net.IP
  86. listenAddress net.IP
  87. rpcAddress net.IP
  88. preferredIP net.IP
  89. connectAddress net.IP
  90. port int
  91. dataCenter string
  92. rack string
  93. hostId string
  94. workload string
  95. graph bool
  96. dseVersion string
  97. partitioner string
  98. clusterName string
  99. version cassVersion
  100. state nodeState
  101. tokens []string
  102. }
  103. func (h *HostInfo) Equal(host *HostInfo) bool {
  104. h.mu.RLock()
  105. defer h.mu.RUnlock()
  106. host.mu.RLock()
  107. defer host.mu.RUnlock()
  108. return h.ConnectAddress().Equal(host.ConnectAddress())
  109. }
  110. func (h *HostInfo) Peer() net.IP {
  111. h.mu.RLock()
  112. defer h.mu.RUnlock()
  113. return h.peer
  114. }
  115. func (h *HostInfo) setPeer(peer net.IP) *HostInfo {
  116. h.mu.Lock()
  117. defer h.mu.Unlock()
  118. h.peer = peer
  119. return h
  120. }
  121. // Returns the address that should be used to connect to the host.
  122. // If you wish to override this, use an AddressTranslator or
  123. // use a HostFilter to SetConnectAddress()
  124. func (h *HostInfo) ConnectAddress() net.IP {
  125. h.mu.RLock()
  126. defer h.mu.RUnlock()
  127. if h.connectAddress == nil {
  128. // Use 'rpc_address' if provided and it's not 0.0.0.0
  129. if h.rpcAddress != nil && !h.rpcAddress.IsUnspecified() {
  130. return h.rpcAddress
  131. } else if h.broadcastAddress != nil && !h.broadcastAddress.IsUnspecified() {
  132. return h.broadcastAddress
  133. } else if h.peer != nil {
  134. // Peer should always be set if this from 'system.peer'
  135. return h.peer
  136. }
  137. }
  138. return h.connectAddress
  139. }
  140. func (h *HostInfo) SetConnectAddress(address net.IP) *HostInfo {
  141. h.mu.Lock()
  142. defer h.mu.Unlock()
  143. h.connectAddress = address
  144. return h
  145. }
  146. func (h *HostInfo) BroadcastAddress() net.IP {
  147. h.mu.RLock()
  148. defer h.mu.RUnlock()
  149. return h.broadcastAddress
  150. }
  151. func (h *HostInfo) ListenAddress() net.IP {
  152. h.mu.RLock()
  153. defer h.mu.RUnlock()
  154. return h.listenAddress
  155. }
  156. func (h *HostInfo) RPCAddress() net.IP {
  157. h.mu.RLock()
  158. defer h.mu.RUnlock()
  159. return h.rpcAddress
  160. }
  161. func (h *HostInfo) PreferredIP() net.IP {
  162. h.mu.RLock()
  163. defer h.mu.RUnlock()
  164. return h.preferredIP
  165. }
  166. func (h *HostInfo) DataCenter() string {
  167. h.mu.RLock()
  168. defer h.mu.RUnlock()
  169. return h.dataCenter
  170. }
  171. func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo {
  172. h.mu.Lock()
  173. defer h.mu.Unlock()
  174. h.dataCenter = dataCenter
  175. return h
  176. }
  177. func (h *HostInfo) Rack() string {
  178. h.mu.RLock()
  179. defer h.mu.RUnlock()
  180. return h.rack
  181. }
  182. func (h *HostInfo) setRack(rack string) *HostInfo {
  183. h.mu.Lock()
  184. defer h.mu.Unlock()
  185. h.rack = rack
  186. return h
  187. }
  188. func (h *HostInfo) HostID() string {
  189. h.mu.RLock()
  190. defer h.mu.RUnlock()
  191. return h.hostId
  192. }
  193. func (h *HostInfo) setHostID(hostID string) *HostInfo {
  194. h.mu.Lock()
  195. defer h.mu.Unlock()
  196. h.hostId = hostID
  197. return h
  198. }
  199. func (h *HostInfo) WorkLoad() string {
  200. h.mu.RLock()
  201. defer h.mu.RUnlock()
  202. return h.workload
  203. }
  204. func (h *HostInfo) Graph() bool {
  205. h.mu.RLock()
  206. defer h.mu.RUnlock()
  207. return h.graph
  208. }
  209. func (h *HostInfo) DSEVersion() string {
  210. h.mu.RLock()
  211. defer h.mu.RUnlock()
  212. return h.dseVersion
  213. }
  214. func (h *HostInfo) Partitioner() string {
  215. h.mu.RLock()
  216. defer h.mu.RUnlock()
  217. return h.partitioner
  218. }
  219. func (h *HostInfo) ClusterName() string {
  220. h.mu.RLock()
  221. defer h.mu.RUnlock()
  222. return h.clusterName
  223. }
  224. func (h *HostInfo) Version() cassVersion {
  225. h.mu.RLock()
  226. defer h.mu.RUnlock()
  227. return h.version
  228. }
  229. func (h *HostInfo) setVersion(major, minor, patch int) *HostInfo {
  230. h.mu.Lock()
  231. defer h.mu.Unlock()
  232. h.version = cassVersion{major, minor, patch}
  233. return h
  234. }
  235. func (h *HostInfo) State() nodeState {
  236. h.mu.RLock()
  237. defer h.mu.RUnlock()
  238. return h.state
  239. }
  240. func (h *HostInfo) setState(state nodeState) *HostInfo {
  241. h.mu.Lock()
  242. defer h.mu.Unlock()
  243. h.state = state
  244. return h
  245. }
  246. func (h *HostInfo) Tokens() []string {
  247. h.mu.RLock()
  248. defer h.mu.RUnlock()
  249. return h.tokens
  250. }
  251. func (h *HostInfo) setTokens(tokens []string) *HostInfo {
  252. h.mu.Lock()
  253. defer h.mu.Unlock()
  254. h.tokens = tokens
  255. return h
  256. }
  257. func (h *HostInfo) Port() int {
  258. h.mu.RLock()
  259. defer h.mu.RUnlock()
  260. return h.port
  261. }
  262. func (h *HostInfo) setPort(port int) *HostInfo {
  263. h.mu.Lock()
  264. defer h.mu.Unlock()
  265. h.port = port
  266. return h
  267. }
  268. func (h *HostInfo) update(from *HostInfo) {
  269. h.mu.Lock()
  270. defer h.mu.Unlock()
  271. h.tokens = from.tokens
  272. h.version = from.version
  273. h.hostId = from.hostId
  274. h.dataCenter = from.dataCenter
  275. }
  276. func (h *HostInfo) IsUp() bool {
  277. return h != nil && h.State() == NodeUp
  278. }
  279. func (h *HostInfo) String() string {
  280. h.mu.RLock()
  281. defer h.mu.RUnlock()
  282. return fmt.Sprintf("[HostInfo connectAddress=%q peer=%q rpc_address=%q broadcast_address=%q "+
  283. "port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
  284. h.connectAddress, h.peer, h.rpcAddress, h.broadcastAddress,
  285. h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
  286. }
  287. // Polls system.peers at a specific interval to find new hosts
  288. type ringDescriber struct {
  289. session *Session
  290. mu sync.Mutex
  291. prevHosts []*HostInfo
  292. localHost *HostInfo
  293. prevPartitioner string
  294. }
  295. // Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
  296. func checkSystemSchema(control *controlConn) (bool, error) {
  297. iter := control.query("SELECT * FROM system_schema.keyspaces")
  298. if err := iter.err; err != nil {
  299. if errf, ok := err.(*errorFrame); ok {
  300. if errf.code == errSyntax {
  301. return false, nil
  302. }
  303. }
  304. return false, err
  305. }
  306. return true, nil
  307. }
  308. // Given a map that represents a row from either system.local or system.peers
  309. // return as much information as we can in *HostInfo
  310. func (r *ringDescriber) hostInfoFromMap(row map[string]interface{}) (*HostInfo, error) {
  311. const assertErrorMsg = "Assertion failed for %s"
  312. var ok bool
  313. // Default to our connected port if the cluster doesn't have port information
  314. host := HostInfo{
  315. port: r.session.cfg.Port,
  316. }
  317. for key, value := range row {
  318. switch key {
  319. case "data_center":
  320. host.dataCenter, ok = value.(string)
  321. if !ok {
  322. return nil, fmt.Errorf(assertErrorMsg, "data_center")
  323. }
  324. case "rack":
  325. host.rack, ok = value.(string)
  326. if !ok {
  327. return nil, fmt.Errorf(assertErrorMsg, "rack")
  328. }
  329. case "host_id":
  330. hostId, ok := value.(UUID)
  331. if !ok {
  332. return nil, fmt.Errorf(assertErrorMsg, "host_id")
  333. }
  334. host.hostId = hostId.String()
  335. case "release_version":
  336. version, ok := value.(string)
  337. if !ok {
  338. return nil, fmt.Errorf(assertErrorMsg, "release_version")
  339. }
  340. host.version.Set(version)
  341. case "peer":
  342. ip, ok := value.(string)
  343. if !ok {
  344. return nil, fmt.Errorf(assertErrorMsg, "peer")
  345. }
  346. host.peer = net.ParseIP(ip)
  347. case "cluster_name":
  348. host.clusterName, ok = value.(string)
  349. if !ok {
  350. return nil, fmt.Errorf(assertErrorMsg, "cluster_name")
  351. }
  352. case "partitioner":
  353. host.partitioner, ok = value.(string)
  354. if !ok {
  355. return nil, fmt.Errorf(assertErrorMsg, "partitioner")
  356. }
  357. case "broadcast_address":
  358. ip, ok := value.(string)
  359. if !ok {
  360. return nil, fmt.Errorf(assertErrorMsg, "broadcast_address")
  361. }
  362. host.broadcastAddress = net.ParseIP(ip)
  363. case "preferred_ip":
  364. ip, ok := value.(string)
  365. if !ok {
  366. return nil, fmt.Errorf(assertErrorMsg, "preferred_ip")
  367. }
  368. host.preferredIP = net.ParseIP(ip)
  369. case "rpc_address":
  370. ip, ok := value.(string)
  371. if !ok {
  372. return nil, fmt.Errorf(assertErrorMsg, "rpc_address")
  373. }
  374. host.rpcAddress = net.ParseIP(ip)
  375. case "listen_address":
  376. ip, ok := value.(string)
  377. if !ok {
  378. return nil, fmt.Errorf(assertErrorMsg, "listen_address")
  379. }
  380. host.listenAddress = net.ParseIP(ip)
  381. case "workload":
  382. host.workload, ok = value.(string)
  383. if !ok {
  384. return nil, fmt.Errorf(assertErrorMsg, "workload")
  385. }
  386. case "graph":
  387. host.graph, ok = value.(bool)
  388. if !ok {
  389. return nil, fmt.Errorf(assertErrorMsg, "graph")
  390. }
  391. case "tokens":
  392. host.tokens, ok = value.([]string)
  393. if !ok {
  394. return nil, fmt.Errorf(assertErrorMsg, "tokens")
  395. }
  396. case "dse_version":
  397. host.dseVersion, ok = value.(string)
  398. if !ok {
  399. return nil, fmt.Errorf(assertErrorMsg, "dse_version")
  400. }
  401. }
  402. // TODO(thrawn01): Add 'port'? once CASSANDRA-7544 is complete
  403. // Not sure what the port field will be called until the JIRA issue is complete
  404. }
  405. return &host, nil
  406. }
  407. // Ask the control node for it's local host information
  408. func (r *ringDescriber) GetLocalHostInfo() (*HostInfo, error) {
  409. it := r.session.control.query("SELECT * FROM system.local WHERE key='local'")
  410. if it == nil {
  411. return nil, errors.New("Attempted to query 'system.local' on a closed control connection")
  412. }
  413. return r.extractHostInfo(it)
  414. }
  415. // Given an ip address and port, return a peer that matched the ip address
  416. func (r *ringDescriber) GetPeerHostInfo(ip net.IP, port int) (*HostInfo, error) {
  417. it := r.session.control.query("SELECT * FROM system.peers WHERE peer=?", ip)
  418. if it == nil {
  419. return nil, errors.New("Attempted to query 'system.peers' on a closed control connection")
  420. }
  421. return r.extractHostInfo(it)
  422. }
  423. func (r *ringDescriber) extractHostInfo(it *Iter) (*HostInfo, error) {
  424. row := make(map[string]interface{})
  425. // expect only 1 row
  426. it.MapScan(row)
  427. if err := it.Close(); err != nil {
  428. return nil, err
  429. }
  430. // extract all available info about the host
  431. return r.hostInfoFromMap(row)
  432. }
  433. // Ask the control node for host info on all it's known peers
  434. func (r *ringDescriber) GetClusterPeerInfo() ([]*HostInfo, error) {
  435. var hosts []*HostInfo
  436. // Ask the node for a list of it's peers
  437. it := r.session.control.query("SELECT * FROM system.peers")
  438. if it == nil {
  439. return nil, errors.New("Attempted to query 'system.peers' on a closed connection")
  440. }
  441. for {
  442. row := make(map[string]interface{})
  443. if !it.MapScan(row) {
  444. break
  445. }
  446. // extract all available info about the peer
  447. host, err := r.hostInfoFromMap(row)
  448. if err != nil {
  449. return nil, err
  450. }
  451. // If it's not a valid peer
  452. if !r.IsValidPeer(host) {
  453. Logger.Printf("Found invalid peer '%+v' "+
  454. "Likely due to a gossip or snitch issue, this host will be ignored", host)
  455. continue
  456. }
  457. hosts = append(hosts, host)
  458. }
  459. if it.err != nil {
  460. return nil, fmt.Errorf("while scanning 'system.peers' table: %s", it.err)
  461. }
  462. return hosts, nil
  463. }
  464. // Return true if the host is a valid peer
  465. func (r *ringDescriber) IsValidPeer(host *HostInfo) bool {
  466. return !(len(host.RPCAddress()) == 0 ||
  467. host.hostId == "" ||
  468. host.dataCenter == "" ||
  469. host.rack == "" ||
  470. len(host.tokens) == 0)
  471. }
  472. // Return a list of hosts the cluster knows about
  473. func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) {
  474. r.mu.Lock()
  475. defer r.mu.Unlock()
  476. // Update the localHost info with data from the connected host
  477. localHost, err := r.GetLocalHostInfo()
  478. if err != nil {
  479. return r.prevHosts, r.prevPartitioner, err
  480. }
  481. // Update our list of hosts by querying the cluster
  482. hosts, err := r.GetClusterPeerInfo()
  483. if err != nil {
  484. return r.prevHosts, r.prevPartitioner, err
  485. }
  486. hosts = append(hosts, localHost)
  487. // Filter the hosts if filter is provided
  488. filteredHosts := hosts
  489. if r.session.cfg.HostFilter != nil {
  490. filteredHosts = filteredHosts[:0]
  491. for _, host := range hosts {
  492. if r.session.cfg.HostFilter.Accept(host) {
  493. filteredHosts = append(filteredHosts, host)
  494. }
  495. }
  496. }
  497. r.prevHosts = filteredHosts
  498. r.prevPartitioner = localHost.partitioner
  499. r.localHost = localHost
  500. return filteredHosts, localHost.partitioner, nil
  501. }
  502. // Given an ip/port return HostInfo for the specified ip/port
  503. func (r *ringDescriber) GetHostInfo(ip net.IP, port int) (*HostInfo, error) {
  504. // TODO(thrawn01): Is IgnorePeerAddr still useful now that we have DisableInitialHostLookup?
  505. // TODO(thrawn01): should we also check for DisableInitialHostLookup and return if true?
  506. // Ignore the port and connect address and use the address/port we already have
  507. if r.session.control == nil || r.session.cfg.IgnorePeerAddr {
  508. return &HostInfo{connectAddress: ip, port: port}, nil
  509. }
  510. // Attempt to get the host info for our control connection
  511. controlHost := r.session.control.GetHostInfo()
  512. if controlHost == nil {
  513. return nil, errors.New("invalid control connection")
  514. }
  515. var (
  516. host *HostInfo
  517. err error
  518. )
  519. // If we are asking about the same node our control connection has a connection too
  520. if controlHost.ConnectAddress().Equal(ip) {
  521. host, err = r.GetLocalHostInfo()
  522. // Always respect the provided control node address and disregard the ip address
  523. // the cassandra node provides. We do this as we are already connected and have a
  524. // known valid ip address. This insulates gocql from client connection issues stemming
  525. // from node misconfiguration. For instance when a node is run from a container, by
  526. // default the node will report its ip address as 127.0.0.1 which is typically invalid.
  527. host.SetConnectAddress(ip)
  528. } else {
  529. host, err = r.GetPeerHostInfo(ip, port)
  530. }
  531. // No host was found matching this ip/port
  532. if err != nil {
  533. return nil, err
  534. }
  535. // Apply host filter to the result
  536. if r.session.cfg.HostFilter != nil && r.session.cfg.HostFilter.Accept(host) != true {
  537. return nil, err
  538. }
  539. return host, err
  540. }
  541. func (r *ringDescriber) refreshRing() error {
  542. // if we have 0 hosts this will return the previous list of hosts to
  543. // attempt to reconnect to the cluster otherwise we would never find
  544. // downed hosts again, could possibly have an optimisation to only
  545. // try to add new hosts if GetHosts didnt error and the hosts didnt change.
  546. hosts, partitioner, err := r.GetHosts()
  547. if err != nil {
  548. return err
  549. }
  550. // TODO: move this to session
  551. // TODO: handle removing hosts here
  552. for _, h := range hosts {
  553. if host, ok := r.session.ring.addHostIfMissing(h); !ok {
  554. r.session.pool.addHost(h)
  555. r.session.policy.AddHost(h)
  556. } else {
  557. host.update(h)
  558. }
  559. }
  560. r.session.metadata.setPartitioner(partitioner)
  561. r.session.policy.SetPartitioner(partitioner)
  562. return nil
  563. }