events.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package gocql
  2. import (
  3. "log"
  4. "net"
  5. "sync"
  6. "time"
  7. )
  8. type eventDeouncer struct {
  9. mu sync.Mutex
  10. events []frame // TODO: possibly use a chan here
  11. callback func([]frame)
  12. quit chan struct{}
  13. }
  14. func (e *eventDeouncer) flusher() {
  15. ticker := time.NewTicker(1 * time.Second)
  16. defer ticker.Stop()
  17. for {
  18. select {
  19. case <-ticker.C:
  20. e.mu.Lock()
  21. e.flush()
  22. e.mu.Unlock()
  23. case <-e.quit:
  24. return
  25. }
  26. }
  27. }
  28. // flush must be called with mu locked
  29. func (e *eventDeouncer) flush() {
  30. if len(e.events) == 0 {
  31. return
  32. }
  33. // TODO: can this be done in a nicer way?
  34. events := make([]frame, len(e.events))
  35. copy(events, e.events)
  36. e.events = e.events[:0]
  37. go e.callback(events)
  38. }
  39. func (e *eventDeouncer) handleEvent(frame frame) {
  40. e.mu.Lock()
  41. const maxEvents = 100
  42. e.events = append(e.events, frame)
  43. // TODO: probably need a warning to track if this threshold is too low
  44. if len(e.events) > maxEvents {
  45. e.flush()
  46. }
  47. e.mu.Unlock()
  48. }
  49. func (s *Session) handleEvent(framer *framer) {
  50. // TODO(zariel): need to debounce events frames, and possible also events
  51. defer framerPool.Put(framer)
  52. frame, err := framer.parseFrame()
  53. if err != nil {
  54. // TODO: logger
  55. log.Printf("gocql: unable to parse event frame: %v\n", err)
  56. return
  57. }
  58. // TODO: handle medatadata events
  59. switch f := frame.(type) {
  60. case *schemaChangeKeyspace:
  61. case *schemaChangeFunction:
  62. case *schemaChangeTable:
  63. case *topologyChangeEventFrame:
  64. switch f.change {
  65. case "NEW_NODE":
  66. s.handleNewNode(f.host, f.port)
  67. case "REMOVED_NODE":
  68. s.handleRemovedNode(f.host, f.port)
  69. case "MOVED_NODE":
  70. // java-driver handles this, not mentioned in the spec
  71. // TODO(zariel): refresh token map
  72. }
  73. case *statusChangeEventFrame:
  74. // TODO(zariel): is it worth having 2 methods for these?
  75. switch f.change {
  76. case "UP":
  77. s.handleNodeUp(f.host, f.port)
  78. case "DOWN":
  79. s.handleNodeDown(f.host, f.port)
  80. }
  81. default:
  82. log.Printf("gocql: invalid event frame (%T): %v\n", f, f)
  83. }
  84. }
  85. func (s *Session) handleNewNode(host net.IP, port int) {
  86. // TODO(zariel): need to be able to filter discovered nodes
  87. if s.control == nil {
  88. return
  89. }
  90. hostInfo, err := s.control.fetchHostInfo(host, port)
  91. if err != nil {
  92. log.Printf("gocql: unable to fetch host info for %v: %v\n", host, err)
  93. return
  94. }
  95. // should this handle token moving?
  96. if !s.ring.addHostIfMissing(hostInfo) {
  97. s.handleNodeUp(host, port)
  98. return
  99. }
  100. s.pool.addHost(hostInfo)
  101. }
  102. func (s *Session) handleRemovedNode(ip net.IP, port int) {
  103. // we remove all nodes but only add ones which pass the filter
  104. addr := ip.String()
  105. s.pool.removeHost(addr)
  106. s.ring.removeHost(addr)
  107. }
  108. func (s *Session) handleNodeUp(ip net.IP, port int) {
  109. addr := ip.String()
  110. host := s.ring.getHost(addr)
  111. if host != nil {
  112. host.setState(NodeUp)
  113. s.pool.hostUp(host)
  114. return
  115. }
  116. // TODO: this could infinite loop
  117. s.handleNewNode(ip, port)
  118. }
  119. func (s *Session) handleNodeDown(ip net.IP, port int) {
  120. addr := ip.String()
  121. host := s.ring.getHost(addr)
  122. if host != nil {
  123. host.setState(NodeDown)
  124. }
  125. s.pool.hostDown(addr)
  126. }