|
@@ -320,30 +320,27 @@ func (c *cluster) getRevisionHash() (map[string]int64, map[string]int64, error)
|
|
|
return revs, hashes, nil
|
|
return revs, hashes, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (c *cluster) compactKV(rev int64) error {
|
|
|
|
|
- var (
|
|
|
|
|
- conn *grpc.ClientConn
|
|
|
|
|
- err error
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
|
|
+func (c *cluster) compactKV(rev int64) (err error) {
|
|
|
if rev <= 0 {
|
|
if rev <= 0 {
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
for i, u := range c.GRPCURLs {
|
|
for i, u := range c.GRPCURLs {
|
|
|
- conn, err = grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
|
|
|
|
|
- if err != nil {
|
|
|
|
|
|
|
+ conn, derr := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
|
|
|
|
|
+ if derr != nil {
|
|
|
|
|
+ err = derr
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
kvc := pb.NewKVClient(conn)
|
|
kvc := pb.NewKVClient(conn)
|
|
|
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
|
|
|
- _, err = kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev, Physical: true})
|
|
|
|
|
|
|
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
|
|
|
+ _, cerr := kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev, Physical: true})
|
|
|
cancel()
|
|
cancel()
|
|
|
conn.Close()
|
|
conn.Close()
|
|
|
- if err != nil {
|
|
|
|
|
|
|
+ if cerr != nil {
|
|
|
if strings.Contains(err.Error(), "required revision has been compacted") && i > 0 {
|
|
if strings.Contains(err.Error(), "required revision has been compacted") && i > 0 {
|
|
|
- plog.Printf("%s is already compacted with %d (%v)", u, rev, err)
|
|
|
|
|
- err = nil // in case compact was requested more than once
|
|
|
|
|
|
|
+ plog.Printf("%s is already compacted with %d (%v)", u, rev, cerr)
|
|
|
|
|
+ } else {
|
|
|
|
|
+ err = cerr
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|