Explorar el Código

Merge pull request #5883 from westhood/master

clientv3: fix sync base
Xiang Li hace 9 años
padre
commit
b6a497214e
Se han modificado 3 ficheros con 61 adiciones y 1 borrados
  1. 54 0
      clientv3/integration/mirror_test.go
  2. 1 1
      clientv3/mirror/syncer.go
  3. 6 0
      clientv3/op.go

+ 54 - 0
clientv3/integration/mirror_test.go

@@ -15,7 +15,9 @@
 package integration
 package integration
 
 
 import (
 import (
+	"fmt"
 	"reflect"
 	"reflect"
+	"sync"
 	"testing"
 	"testing"
 	"time"
 	"time"
 
 
@@ -69,3 +71,55 @@ func TestMirrorSync(t *testing.T) {
 		t.Fatal("failed to receive update in one second")
 		t.Fatal("failed to receive update in one second")
 	}
 	}
 }
 }
+
+func TestMirrorSyncBase(t *testing.T) {
+	cluster := integration.NewClusterV3(nil, &integration.ClusterConfig{Size: 1})
+	defer cluster.Terminate(nil)
+
+	cli := cluster.Client(0)
+	ctx := context.TODO()
+
+	keyCh := make(chan string)
+	var wg sync.WaitGroup
+
+	for i := 0; i < 50; i++ {
+		wg.Add(1)
+
+		go func() {
+			defer wg.Done()
+
+			for key := range keyCh {
+				if _, err := cli.Put(ctx, key, "test"); err != nil {
+					t.Fatal(err)
+				}
+			}
+		}()
+	}
+
+	for i := 0; i < 2000; i++ {
+		keyCh <- fmt.Sprintf("test%d", i)
+	}
+
+	close(keyCh)
+	wg.Wait()
+
+	syncer := mirror.NewSyncer(cli, "test", 0)
+	respCh, errCh := syncer.SyncBase(ctx)
+
+	count := 0
+
+	for resp := range respCh {
+		count = count + len(resp.Kvs)
+		if !resp.More {
+			break
+		}
+	}
+
+	for err := range errCh {
+		t.Fatalf("unexpected error %v", err)
+	}
+
+	if count != 2000 {
+		t.Errorf("unexpected kv count: %d", count)
+	}
+}

+ 1 - 1
clientv3/mirror/syncer.go

@@ -78,7 +78,7 @@ func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, cha
 			// If len(s.prefix) != 0, we will sync key-value space with given prefix.
 			// If len(s.prefix) != 0, we will sync key-value space with given prefix.
 			// We then range from the prefix to the next prefix if exists. Or we will
 			// We then range from the prefix to the next prefix if exists. Or we will
 			// range from the prefix to the end if the next prefix does not exists.
 			// range from the prefix to the end if the next prefix does not exists.
-			opts = append(opts, clientv3.WithPrefix())
+			opts = append(opts, clientv3.WithRange(clientv3.GetPrefixRangeEnd(s.prefix)))
 			key = s.prefix
 			key = s.prefix
 		}
 		}
 
 

+ 6 - 0
clientv3/op.go

@@ -184,6 +184,12 @@ func WithSort(target SortTarget, order SortOrder) OpOption {
 	}
 	}
 }
 }
 
 
+// GetPrefixRangeEnd gets the range end of the prefix.
+// 'Get(foo, WithPrefix())' is equal to 'Get(foo, WithRange(GetPrefixRangeEnd(foo))'.
+func GetPrefixRangeEnd(prefix string) string {
+	return string(getPrefix([]byte(prefix)))
+}
+
 func getPrefix(key []byte) []byte {
 func getPrefix(key []byte) []byte {
 	end := make([]byte, len(key))
 	end := make([]byte, len(key))
 	copy(end, key)
 	copy(end, key)