浏览代码

etcdserver, storage: wait for physical compaction if already compacted

Anthony Romano 9 年之前
父节点
当前提交
7b37bd332c
共有 2 个文件被更改,包括 6 次插入4 次删除
  1. 3 3
      etcdserver/v3demo_server.go
  2. 3 1
      storage/kvstore.go

+ 3 - 3
etcdserver/v3demo_server.go

@@ -94,12 +94,12 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
 
 
 func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
 func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
 	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Compaction: r})
 	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Compaction: r})
-	if err != nil {
-		return nil, err
-	}
 	if r.Physical && result.physc != nil {
 	if r.Physical && result.physc != nil {
 		<-result.physc
 		<-result.physc
 	}
 	}
+	if err != nil {
+		return nil, err
+	}
 	resp := result.resp.(*pb.CompactionResponse)
 	resp := result.resp.(*pb.CompactionResponse)
 	if resp == nil {
 	if resp == nil {
 		resp = &pb.CompactionResponse{}
 		resp = &pb.CompactionResponse{}

+ 3 - 1
storage/kvstore.go

@@ -222,7 +222,9 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
 	s.mu.Lock()
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	defer s.mu.Unlock()
 	if rev <= s.compactMainRev {
 	if rev <= s.compactMainRev {
-		return nil, ErrCompacted
+		ch := make(chan struct{})
+		s.fifoSched.Schedule(func(context.Context) { close(ch) })
+		return ch, ErrCompacted
 	}
 	}
 	if rev > s.currentRev.main {
 	if rev > s.currentRev.main {
 		return nil, ErrFutureRev
 		return nil, ErrFutureRev