events.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. package gocql
  2. import (
  3. "log"
  4. "net"
  5. )
  6. func (s *Session) handleEvent(framer *framer) {
  7. defer framerPool.Put(framer)
  8. frame, err := framer.parseFrame()
  9. if err != nil {
  10. // TODO: logger
  11. log.Printf("gocql: unable to parse event frame: %v\n", err)
  12. return
  13. }
  14. // TODO: handle medatadata events
  15. switch f := frame.(type) {
  16. case *schemaChangeKeyspace:
  17. case *schemaChangeFunction:
  18. case *schemaChangeTable:
  19. case *topologyChangeEventFrame:
  20. switch f.change {
  21. case "NEW_NODE":
  22. s.handleNewNode(f.host, f.port)
  23. case "REMOVED_NODE":
  24. s.handleRemovedNode(f.host, f.port)
  25. case "MOVED_NODE":
  26. // java-driver handles this, not mentioned in the spec
  27. // TODO(zariel): refresh token map
  28. }
  29. case *statusChangeEventFrame:
  30. // TODO(zariel): is it worth having 2 methods for these?
  31. switch f.change {
  32. case "UP":
  33. s.handleNodeUp(f.host, f.port)
  34. case "DOWN":
  35. }
  36. default:
  37. log.Printf("gocql: invalid event frame (%T): %v\n", f, f)
  38. }
  39. }
  40. func (s *Session) handleNewNode(host net.IP, port int) {
  41. if !s.cfg.DiscoverHosts || s.control == nil {
  42. return
  43. }
  44. if s.control.addr() == host.String() {
  45. go s.control.reconnect(false)
  46. }
  47. hostInfo, err := s.control.fetchHostInfo(host, port)
  48. if err != nil {
  49. log.Printf("gocql: unable to fetch host info for %v: %v\n", host, err)
  50. return
  51. }
  52. s.pool.addHost(hostInfo)
  53. }
  54. func (s *Session) handleRemovedNode(host net.IP, port int) {
  55. if !s.cfg.DiscoverHosts {
  56. return
  57. }
  58. s.pool.removeHost(host.String())
  59. }
  60. func (s *Session) handleNodeUp(host net.IP, port int) {
  61. // even if were not disconvering new nodes we should still handle nodes going
  62. // up.
  63. s.pool.hostUp(host.String())
  64. }
  65. func (s *Session) handleNodeDown(host net.IP, port int) {
  66. s.pool.hostDown(host.String())
  67. }