Browse Source

storage: watchable_store.go events slice

Gyu-Ho Lee 10 years ago
parent
commit
0b01acf131
1 changed files with 99 additions and 54 deletions
  1. 99 54
      storage/watchable_store.go

+ 99 - 54
storage/watchable_store.go

@@ -33,7 +33,7 @@ const (
 )
 )
 
 
 type watchable interface {
 type watchable interface {
-	watch(key []byte, prefix bool, startRev, id int64, ch chan<- storagepb.Event) (*watching, CancelFunc)
+	watch(key []byte, prefix bool, startRev, id int64, ch chan<- []storagepb.Event) (*watching, CancelFunc)
 }
 }
 
 
 type watchableStore struct {
 type watchableStore struct {
@@ -75,10 +75,11 @@ func (s *watchableStore) Put(key, value []byte) (rev int64) {
 	if err != nil {
 	if err != nil {
 		log.Panicf("unexpected range error (%v)", err)
 		log.Panicf("unexpected range error (%v)", err)
 	}
 	}
-	s.handle(rev, storagepb.Event{
+	ev := storagepb.Event{
 		Type: storagepb.PUT,
 		Type: storagepb.PUT,
 		Kv:   &kvs[0],
 		Kv:   &kvs[0],
-	})
+	}
+	s.handle(rev, []storagepb.Event{ev})
 	return rev
 	return rev
 }
 }
 
 
@@ -92,14 +93,15 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) {
 		log.Panicf("unexpected range error (%v)", err)
 		log.Panicf("unexpected range error (%v)", err)
 	}
 	}
 	n, rev = s.store.DeleteRange(key, end)
 	n, rev = s.store.DeleteRange(key, end)
-	for _, kv := range kvs {
-		s.handle(rev, storagepb.Event{
+	evs := make([]storagepb.Event, len(kvs))
+	for i, kv := range kvs {
+		evs[i] = storagepb.Event{
 			Type: storagepb.DELETE,
 			Type: storagepb.DELETE,
 			Kv: &storagepb.KeyValue{
 			Kv: &storagepb.KeyValue{
 				Key: kv.Key,
 				Key: kv.Key,
-			},
-		})
+			}}
 	}
 	}
+	s.handle(rev, evs)
 	return n, rev
 	return n, rev
 }
 }
 
 
@@ -138,24 +140,33 @@ func (s *watchableStore) TxnEnd(txnID int64) error {
 	}
 	}
 
 
 	_, rev, _ := s.store.Range(nil, nil, 0, 0)
 	_, rev, _ := s.store.Range(nil, nil, 0, 0)
+
+	evs := []storagepb.Event{}
+
 	for k := range s.tx.putm {
 	for k := range s.tx.putm {
 		kvs, _, err := s.store.Range([]byte(k), nil, 0, 0)
 		kvs, _, err := s.store.Range([]byte(k), nil, 0, 0)
 		if err != nil {
 		if err != nil {
 			log.Panicf("unexpected range error (%v)", err)
 			log.Panicf("unexpected range error (%v)", err)
 		}
 		}
-		s.handle(rev, storagepb.Event{
+		ev := storagepb.Event{
 			Type: storagepb.PUT,
 			Type: storagepb.PUT,
 			Kv:   &kvs[0],
 			Kv:   &kvs[0],
-		})
+		}
+		evs = append(evs, ev)
 	}
 	}
+
 	for k := range s.tx.delm {
 	for k := range s.tx.delm {
-		s.handle(rev, storagepb.Event{
+		ev := storagepb.Event{
 			Type: storagepb.DELETE,
 			Type: storagepb.DELETE,
 			Kv: &storagepb.KeyValue{
 			Kv: &storagepb.KeyValue{
 				Key: []byte(k),
 				Key: []byte(k),
 			},
 			},
-		})
+		}
+		evs = append(evs, ev)
 	}
 	}
+
+	s.handle(rev, evs)
+
 	s.mu.Unlock()
 	s.mu.Unlock()
 	return nil
 	return nil
 }
 }
@@ -170,11 +181,11 @@ func (s *watchableStore) NewWatcher() Watcher {
 	watcherGauge.Inc()
 	watcherGauge.Inc()
 	return &watcher{
 	return &watcher{
 		watchable: s,
 		watchable: s,
-		ch:        make(chan storagepb.Event, chanBufLen),
+		ch:        make(chan []storagepb.Event, chanBufLen),
 	}
 	}
 }
 }
 
 
-func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- storagepb.Event) (*watching, CancelFunc) {
+func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- []storagepb.Event) (*watching, CancelFunc) {
 	s.mu.Lock()
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	defer s.mu.Unlock()
 
 
@@ -301,6 +312,9 @@ func (s *watchableStore) syncWatchings() {
 	ks, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
 	ks, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
 	tx.Unlock()
 	tx.Unlock()
 
 
+	evs := []storagepb.Event{}
+
+	// get the list of all events from all key-value pairs
 	for i, v := range vs {
 	for i, v := range vs {
 		var kv storagepb.KeyValue
 		var kv storagepb.KeyValue
 		if err := kv.Unmarshal(v); err != nil {
 		if err := kv.Unmarshal(v); err != nil {
@@ -308,8 +322,7 @@ func (s *watchableStore) syncWatchings() {
 		}
 		}
 
 
 		k := string(kv.Key)
 		k := string(kv.Key)
-		wm, ok := keyToUnsynced[k]
-		if !ok {
+		if _, ok := keyToUnsynced[k]; !ok {
 			continue
 			continue
 		}
 		}
 
 
@@ -322,56 +335,53 @@ func (s *watchableStore) syncWatchings() {
 		}
 		}
 		ev.Kv = &kv
 		ev.Kv = &kv
 
 
-		for w := range wm {
-			ev.WatchID = w.id
+		evs = append(evs, ev)
+	}
 
 
-			select {
-			case w.ch <- ev:
-				pendingEventsGauge.Inc()
-			default:
-				// TODO: handle the full unsynced watchings.
-				// continue to process other watchings for now, the full ones
-				// will be processed next time and hopefully it will not be full.
-				continue
-			}
-			if err := unsafeAddWatching(&s.synced, k, w); err != nil {
-				log.Panicf("error unsafeAddWatching (%v) for key %s", err, k)
-			}
-			delete(s.unsynced, w)
+	for w, es := range newWatchingToEventMap(keyToUnsynced, evs) {
+		select {
+		case w.ch <- es:
+			pendingEventsGauge.Add(float64(len(es)))
+		default:
+			// TODO: handle the full unsynced watchings.
+			// continue to process other watchings for now, the full ones
+			// will be processed next time and hopefully it will not be full.
+			continue
 		}
 		}
+		k := string(w.key)
+		if err := unsafeAddWatching(&s.synced, k, w); err != nil {
+			log.Panicf("error unsafeAddWatching (%v) for key %s", err, k)
+		}
+		delete(s.unsynced, w)
 	}
 	}
 
 
 	slowWatchingGauge.Set(float64(len(s.unsynced)))
 	slowWatchingGauge.Set(float64(len(s.unsynced)))
 }
 }
 
 
 // handle handles the change of the happening event on all watchings.
 // handle handles the change of the happening event on all watchings.
-func (s *watchableStore) handle(rev int64, ev storagepb.Event) {
-	s.notify(rev, ev)
+func (s *watchableStore) handle(rev int64, evs []storagepb.Event) {
+	s.notify(rev, evs)
 }
 }
 
 
 // notify notifies the fact that given event at the given rev just happened to
 // notify notifies the fact that given event at the given rev just happened to
 // watchings that watch on the key of the event.
 // watchings that watch on the key of the event.
-func (s *watchableStore) notify(rev int64, ev storagepb.Event) {
-	// check all prefixes of the key to notify all corresponded watchings
-	for i := 0; i <= len(ev.Kv.Key); i++ {
-		k := string(ev.Kv.Key[:i])
-		if wm, ok := s.synced[k]; ok {
-			for w := range wm {
-				// the watching needs to be notified when either it watches prefix or
-				// the key is exactly matched.
-				if !w.prefix && i != len(ev.Kv.Key) {
-					continue
-				}
-				ev.WatchID = w.id
-				select {
-				case w.ch <- ev:
-					pendingEventsGauge.Inc()
-				default:
-					w.cur = rev
-					s.unsynced[w] = struct{}{}
-					delete(wm, w)
-					slowWatchingGauge.Inc()
-				}
+func (s *watchableStore) notify(rev int64, evs []storagepb.Event) {
+	we := newWatchingToEventMap(s.synced, evs)
+	for _, wm := range s.synced {
+		for w := range wm {
+			if _, ok := we[w]; !ok {
+				continue
+			}
+			es := we[w]
+			select {
+			case w.ch <- es:
+				pendingEventsGauge.Add(float64(len(es)))
+			default:
+				// move slow watching to unsynced
+				w.cur = rev
+				s.unsynced[w] = struct{}{}
+				delete(wm, w)
+				slowWatchingGauge.Inc()
 			}
 			}
 		}
 		}
 	}
 	}
@@ -418,7 +428,7 @@ type watching struct {
 
 
 	// a chan to send out the watched events.
 	// a chan to send out the watched events.
 	// The chan might be shared with other watchings.
 	// The chan might be shared with other watchings.
-	ch chan<- storagepb.Event
+	ch chan<- []storagepb.Event
 }
 }
 
 
 // unsafeAddWatching puts watching with key k into watchableStore's synced.
 // unsafeAddWatching puts watching with key k into watchableStore's synced.
@@ -441,3 +451,38 @@ func unsafeAddWatching(synced *map[string]map[*watching]struct{}, k string, wa *
 	mp[k][wa] = struct{}{}
 	mp[k][wa] = struct{}{}
 	return nil
 	return nil
 }
 }
+
+// newWatchingToEventMap creates a map that has watching as key and events as
+// value. It enables quick events look up by watching.
+func newWatchingToEventMap(sm map[string]map[*watching]struct{}, evs []storagepb.Event) map[*watching][]storagepb.Event {
+	watchingToEvents := make(map[*watching][]storagepb.Event)
+	for _, ev := range evs {
+		key := string(ev.Kv.Key)
+
+		// check all prefixes of the key to notify all corresponded watchings
+		for i := 0; i <= len(key); i++ {
+			k := string(key[:i])
+
+			wm, ok := sm[k]
+			if !ok {
+				continue
+			}
+
+			for w := range wm {
+				// the watching needs to be notified when either it watches prefix or
+				// the key is exactly matched.
+				if !w.prefix && i != len(ev.Kv.Key) {
+					continue
+				}
+				ev.WatchID = w.id
+
+				if _, ok := watchingToEvents[w]; !ok {
+					watchingToEvents[w] = []storagepb.Event{}
+				}
+				watchingToEvents[w] = append(watchingToEvents[w], ev)
+			}
+		}
+	}
+
+	return watchingToEvents
+}