|
|
@@ -3,8 +3,60 @@ package gocql
|
|
|
import (
|
|
|
"log"
|
|
|
"net"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
)
|
|
|
|
|
|
+type eventDeouncer struct {
|
|
|
+ mu sync.Mutex
|
|
|
+ events []frame // TODO: possibly use a chan here
|
|
|
+
|
|
|
+ callback func([]frame)
|
|
|
+ quit chan struct{}
|
|
|
+}
|
|
|
+
|
|
|
+func (e *eventDeouncer) flusher() {
|
|
|
+ ticker := time.NewTicker(1 * time.Second)
|
|
|
+ defer ticker.Stop()
|
|
|
+
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-ticker.C:
|
|
|
+ e.mu.Lock()
|
|
|
+ e.flush()
|
|
|
+ e.mu.Unlock()
|
|
|
+ case <-e.quit:
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// flush must be called with mu locked
|
|
|
+func (e *eventDeouncer) flush() {
|
|
|
+ if len(e.events) == 0 {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ // TODO: can this be done in a nicer way?
|
|
|
+ events := make([]frame, len(e.events))
|
|
|
+ copy(events, e.events)
|
|
|
+ e.events = e.events[:0]
|
|
|
+
|
|
|
+ go e.callback(events)
|
|
|
+}
|
|
|
+
|
|
|
+func (e *eventDeouncer) handleEvent(frame frame) {
|
|
|
+ e.mu.Lock()
|
|
|
+
|
|
|
+ const maxEvents = 100
|
|
|
+ e.events = append(e.events, frame)
|
|
|
+ // TODO: probably need a warning to track if this threshold is too low
|
|
|
+ if len(e.events) > maxEvents {
|
|
|
+ e.flush()
|
|
|
+ }
|
|
|
+ e.mu.Unlock()
|
|
|
+}
|
|
|
+
|
|
|
func (s *Session) handleEvent(framer *framer) {
|
|
|
// TODO(zariel): need to debounce events frames, and possible also events
|
|
|
defer framerPool.Put(framer)
|