浏览代码

add event debouncing

Chris Bannister 10 年之前
父节点
当前提交
29b95e8481
共有 3 个文件被更改,包括 108 次插入41 次删除
  1. 94 34
      events.go
  2. 9 7
      events_ccm_test.go
  3. 5 0
      session.go

+ 94 - 34
events.go

@@ -8,20 +8,37 @@ import (
 )
 )
 
 
 type eventDeouncer struct {
 type eventDeouncer struct {
+	name   string
+	timer  *time.Timer
 	mu     sync.Mutex
 	mu     sync.Mutex
-	events []frame // TODO: possibly use a chan here
+	events []frame
 
 
 	callback func([]frame)
 	callback func([]frame)
 	quit     chan struct{}
 	quit     chan struct{}
 }
 }
 
 
-func (e *eventDeouncer) flusher() {
-	ticker := time.NewTicker(1 * time.Second)
-	defer ticker.Stop()
+func newEventDeouncer(name string, eventHandler func([]frame)) *eventDeouncer {
+	e := &eventDeouncer{
+		name:     name,
+		quit:     make(chan struct{}),
+		timer:    time.NewTimer(eventDebounceTime),
+		callback: eventHandler,
+	}
+	e.timer.Stop()
+	go e.flusher()
+
+	return e
+}
+
+func (e *eventDeouncer) stop() {
+	e.quit <- struct{}{} // sync with flusher
+	close(e.quit)
+}
 
 
+func (e *eventDeouncer) flusher() {
 	for {
 	for {
 		select {
 		select {
-		case <-ticker.C:
+		case <-e.timer.C:
 			e.mu.Lock()
 			e.mu.Lock()
 			e.flush()
 			e.flush()
 			e.mu.Unlock()
 			e.mu.Unlock()
@@ -31,32 +48,89 @@ func (e *eventDeouncer) flusher() {
 	}
 	}
 }
 }
 
 
+const (
+	eventBufferSize   = 1000
+	eventDebounceTime = 1 * time.Second
+)
+
 // flush must be called with mu locked
 // flush must be called with mu locked
 func (e *eventDeouncer) flush() {
 func (e *eventDeouncer) flush() {
+	log.Printf("%s: flushing %d events\n", e.name, len(e.events))
 	if len(e.events) == 0 {
 	if len(e.events) == 0 {
 		return
 		return
 	}
 	}
 
 
-	// TODO: can this be done in a nicer way?
-	events := make([]frame, len(e.events))
-	copy(events, e.events)
-	e.events = e.events[:0]
-
-	go e.callback(events)
+	// if the flush interval is faster than the callback then we will end up calling
+	// the callback multiple times, probably a bad idea. In this case we could drop
+	// frames?
+	go e.callback(e.events)
+	e.events = make([]frame, 0, eventBufferSize)
 }
 }
 
 
-func (e *eventDeouncer) handleEvent(frame frame) {
+func (e *eventDeouncer) debounce(frame frame) {
 	e.mu.Lock()
 	e.mu.Lock()
+	e.timer.Reset(eventDebounceTime)
 
 
-	const maxEvents = 100
-	e.events = append(e.events, frame)
 	// TODO: probably need a warning to track if this threshold is too low
 	// TODO: probably need a warning to track if this threshold is too low
-	if len(e.events) > maxEvents {
-		e.flush()
+	if len(e.events) < eventBufferSize {
+		log.Printf("%s: buffering event: %v", e.name, frame)
+		e.events = append(e.events, frame)
+	} else {
+		log.Printf("%s: buffer full, dropping event frame: %s", e.name, frame)
 	}
 	}
+
 	e.mu.Unlock()
 	e.mu.Unlock()
 }
 }
 
 
+func (s *Session) handleNodeEvent(frames []frame) {
+	type nodeEvent struct {
+		change string
+		host   net.IP
+		port   int
+	}
+
+	events := make(map[string]*nodeEvent)
+
+	for _, frame := range frames {
+		// TODO: can we be sure the order of events in the buffer is correct?
+		switch f := frame.(type) {
+		case *topologyChangeEventFrame:
+			event, ok := events[f.host.String()]
+			if !ok {
+				event = &nodeEvent{change: f.change, host: f.host}
+				events[f.host.String()] = event
+			}
+			event.change = f.change
+
+		case *statusChangeEventFrame:
+			event, ok := events[f.host.String()]
+			if !ok {
+				event = &nodeEvent{change: f.change, host: f.host}
+				events[f.host.String()] = event
+			}
+			event.change = f.change
+		}
+	}
+
+	for addr, f := range events {
+		log.Printf("NodeEvent: handling debounced event: %q => %s", addr, f.change)
+
+		switch f.change {
+		case "NEW_NODE":
+			s.handleNewNode(f.host, f.port)
+		case "REMOVED_NODE":
+			s.handleRemovedNode(f.host, f.port)
+		case "MOVED_NODE":
+		// java-driver handles this, not mentioned in the spec
+		// TODO(zariel): refresh token map
+		case "UP":
+			s.handleNodeUp(f.host, f.port)
+		case "DOWN":
+			s.handleNodeDown(f.host, f.port)
+		}
+	}
+}
+
 func (s *Session) handleEvent(framer *framer) {
 func (s *Session) handleEvent(framer *framer) {
 	// TODO(zariel): need to debounce events frames, and possible also events
 	// TODO(zariel): need to debounce events frames, and possible also events
 	defer framerPool.Put(framer)
 	defer framerPool.Put(framer)
@@ -67,33 +141,19 @@ func (s *Session) handleEvent(framer *framer) {
 		log.Printf("gocql: unable to parse event frame: %v\n", err)
 		log.Printf("gocql: unable to parse event frame: %v\n", err)
 		return
 		return
 	}
 	}
+	log.Println(frame)
 
 
 	// TODO: handle medatadata events
 	// TODO: handle medatadata events
 	switch f := frame.(type) {
 	switch f := frame.(type) {
 	case *schemaChangeKeyspace:
 	case *schemaChangeKeyspace:
 	case *schemaChangeFunction:
 	case *schemaChangeFunction:
 	case *schemaChangeTable:
 	case *schemaChangeTable:
-	case *topologyChangeEventFrame:
-		switch f.change {
-		case "NEW_NODE":
-			s.handleNewNode(f.host, f.port)
-		case "REMOVED_NODE":
-			s.handleRemovedNode(f.host, f.port)
-		case "MOVED_NODE":
-			// java-driver handles this, not mentioned in the spec
-			// TODO(zariel): refresh token map
-		}
-	case *statusChangeEventFrame:
-		// TODO(zariel): is it worth having 2 methods for these?
-		switch f.change {
-		case "UP":
-			s.handleNodeUp(f.host, f.port)
-		case "DOWN":
-			s.handleNodeDown(f.host, f.port)
-		}
+	case *topologyChangeEventFrame, *statusChangeEventFrame:
+		s.nodeEvents.debounce(frame)
 	default:
 	default:
 		log.Printf("gocql: invalid event frame (%T): %v\n", f, f)
 		log.Printf("gocql: invalid event frame (%T): %v\n", f, f)
 	}
 	}
+
 }
 }
 
 
 func (s *Session) handleNewNode(host net.IP, port int) {
 func (s *Session) handleNewNode(host net.IP, port int) {

+ 9 - 7
events_test.go → events_ccm_test.go

@@ -86,7 +86,9 @@ func TestEventNodeUp(t *testing.T) {
 	session := createSession(t)
 	session := createSession(t)
 	defer session.Close()
 	defer session.Close()
 
 
-	if err := ccm.NodeDown("node1"); err != nil {
+	const targetNode = "node2"
+
+	if err := ccm.NodeDown(targetNode); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 
 
@@ -96,15 +98,15 @@ func TestEventNodeUp(t *testing.T) {
 
 
 	poolHosts := session.pool.hostConnPools
 	poolHosts := session.pool.hostConnPools
 	log.Printf("poolhosts=%+v\n", poolHosts)
 	log.Printf("poolhosts=%+v\n", poolHosts)
-	node1 := status["node1"]
+	node := status[targetNode]
 
 
-	if _, ok := poolHosts[node1.Addr]; ok {
+	if _, ok := poolHosts[node.Addr]; ok {
 		session.pool.mu.RUnlock()
 		session.pool.mu.RUnlock()
-		t.Fatal("node1 not removed after remove event")
+		t.Fatal("node not removed after remove event")
 	}
 	}
 	session.pool.mu.RUnlock()
 	session.pool.mu.RUnlock()
 
 
-	if err := ccm.NodeUp("node1"); err != nil {
+	if err := ccm.NodeUp(targetNode); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 
 
@@ -112,9 +114,9 @@ func TestEventNodeUp(t *testing.T) {
 
 
 	session.pool.mu.RLock()
 	session.pool.mu.RLock()
 	log.Printf("poolhosts=%+v\n", poolHosts)
 	log.Printf("poolhosts=%+v\n", poolHosts)
-	if _, ok := poolHosts[node1.Addr]; !ok {
+	if _, ok := poolHosts[node.Addr]; !ok {
 		session.pool.mu.RUnlock()
 		session.pool.mu.RUnlock()
-		t.Fatal("node1 not added after node added event")
+		t.Fatal("node not added after node added event")
 	}
 	}
 	session.pool.mu.RUnlock()
 	session.pool.mu.RUnlock()
 }
 }

+ 5 - 0
session.go

@@ -43,6 +43,9 @@ type Session struct {
 
 
 	control *controlConn
 	control *controlConn
 
 
+	// event handlers
+	nodeEvents *eventDeouncer
+
 	// ring metadata
 	// ring metadata
 	hosts []HostInfo
 	hosts []HostInfo
 
 
@@ -71,6 +74,8 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
 		pageSize: cfg.PageSize,
 		pageSize: cfg.PageSize,
 	}
 	}
 
 
+	s.nodeEvents = newEventDeouncer("NodeEvents", s.handleNodeEvent)
+
 	s.routingKeyInfoCache.lru = lru.New(cfg.MaxRoutingKeyInfo)
 	s.routingKeyInfoCache.lru = lru.New(cfg.MaxRoutingKeyInfo)
 
 
 	// I think it might be a good idea to simplify this and make it always discover
 	// I think it might be a good idea to simplify this and make it always discover