|
|
@@ -322,6 +322,11 @@ func (c *cluster) compactKV(rev int64) error {
|
|
|
conn *grpc.ClientConn
|
|
|
err error
|
|
|
)
|
|
|
+
|
|
|
+ if rev <= 0 {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
for _, u := range c.GRPCURLs {
|
|
|
conn, err = grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
|
|
|
if err != nil {
|
|
|
@@ -329,7 +334,7 @@ func (c *cluster) compactKV(rev int64) error {
|
|
|
}
|
|
|
kvc := pb.NewKVClient(conn)
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
|
- _, err = kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev})
|
|
|
+ _, err = kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev, Physical: true})
|
|
|
cancel()
|
|
|
conn.Close()
|
|
|
if err == nil {
|
|
|
@@ -338,3 +343,33 @@ func (c *cluster) compactKV(rev int64) error {
|
|
|
}
|
|
|
return err
|
|
|
}
|
|
|
+
|
|
|
+func (c *cluster) checkCompact(rev int64) error {
|
|
|
+ if rev == 0 {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ for _, u := range c.GRPCURLs {
|
|
|
+ cli, err := clientv3.New(clientv3.Config{
|
|
|
+ Endpoints: []string{u},
|
|
|
+ DialTimeout: 5 * time.Second,
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
|
+ wch := cli.Watch(ctx, "\x00", clientv3.WithFromKey(), clientv3.WithRev(rev-1))
|
|
|
+ wr, ok := <-wch
|
|
|
+ cancel()
|
|
|
+
|
|
|
+ cli.Close()
|
|
|
+
|
|
|
+ if !ok {
|
|
|
+ return fmt.Errorf("watch channel terminated")
|
|
|
+ }
|
|
|
+ if wr.CompactRevision != rev {
|
|
|
+ return fmt.Errorf("got compact revision %v, wanted %v", wr.CompactRevision, rev)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|