
*: add a scheduler and use it to schedule compaction

Xiang Li, 10 years ago
commit d314345e6d

+ 172 - 0
pkg/schedule/schedule.go

@@ -0,0 +1,172 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schedule
+
+import (
+	"sync"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+)
+
+type Job func(context.Context)
+
+// Scheduler can schedule jobs.
+type Scheduler interface {
+	// Schedule asks the scheduler to schedule a job defined by the given func.
+	// Calling Schedule on a stopped scheduler may panic.
+	Schedule(j Job)
+
+	// Pending returns the number of pending jobs.
+	Pending() int
+
+	// Scheduled returns the number of scheduled jobs (excluding pending jobs).
+	Scheduled() int
+
+	// Finished returns the number of finished jobs.
+	Finished() int
+
+	// WaitFinish waits for all pending jobs to finish.
+	WaitFinish()
+
+	// Stop stops the scheduler.
+	Stop()
+}
+
+type fifo struct {
+	mu sync.Mutex
+
+	resume    chan struct{}
+	scheduled int
+	finished  int
+	pendings  []Job
+
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	finishCond *sync.Cond
+	donec      chan struct{}
+}
+
+// NewFIFOScheduler returns a Scheduler that schedules jobs in FIFO
+// order sequentially.
+func NewFIFOScheduler() Scheduler {
+	f := &fifo{
+		resume: make(chan struct{}, 1),
+		donec:  make(chan struct{}, 1),
+	}
+	f.finishCond = sync.NewCond(&f.mu)
+	f.ctx, f.cancel = context.WithCancel(context.Background())
+	go f.run()
+	return f
+}
+
+// Schedule schedules a job that will be run in FIFO order sequentially.
+func (f *fifo) Schedule(j Job) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	if f.cancel == nil {
+		panic("schedule: schedule to stopped scheduler")
+	}
+
+	if len(f.pendings) == 0 {
+		select {
+		case f.resume <- struct{}{}:
+		default:
+		}
+	}
+	f.pendings = append(f.pendings, j)
+
+	return
+}
+
+func (f *fifo) Pending() int {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	return len(f.pendings)
+}
+
+func (f *fifo) Scheduled() int {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	return f.scheduled
+}
+
+func (f *fifo) Finished() int {
+	f.finishCond.L.Lock()
+	defer f.finishCond.L.Unlock()
+	return f.finished
+}
+
+func (f *fifo) WaitFinish() {
+	f.finishCond.L.Lock()
+	finish := f.finished
+	f.finishCond.L.Unlock()
+
+	f.finishCond.L.Lock()
+	for f.finished == finish || len(f.pendings) != 0 {
+		f.finishCond.Wait()
+	}
+	f.finishCond.L.Unlock()
+}
+
+// Stop stops the scheduler and cancels all pending jobs.
+func (f *fifo) Stop() {
+	f.mu.Lock()
+	f.cancel()
+	f.cancel = nil
+	f.mu.Unlock()
+	<-f.donec
+}
+
+func (f *fifo) run() {
+	// TODO: recover from job panic?
+	defer func() {
+		close(f.donec)
+		close(f.resume)
+	}()
+
+	for {
+		var todo Job
+		f.mu.Lock()
+		if len(f.pendings) != 0 {
+			f.scheduled++
+			todo = f.pendings[0]
+		}
+		f.mu.Unlock()
+		if todo == nil {
+			select {
+			case <-f.resume:
+			case <-f.ctx.Done():
+				f.mu.Lock()
+				pendings := f.pendings
+				f.pendings = nil
+				f.mu.Unlock()
+				// clean up pending jobs
+				for _, todo := range pendings {
+					todo(f.ctx)
+				}
+				return
+			}
+		} else {
+			todo(f.ctx)
+			f.finishCond.L.Lock()
+			f.finished++
+			f.pendings = f.pendings[1:]
+			f.finishCond.Broadcast()
+			f.finishCond.L.Unlock()
+		}
+	}
+}

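A minimal usage sketch of the new package (mine, not part of this commit; it imports plain golang.org/x/net/context rather than the vendored Godeps path used above). Jobs run one at a time in FIFO order, and a well-behaved job checks its context first, since a stopped scheduler still invokes queued jobs, only with a canceled context. The sketch waits on a sentinel job rather than WaitFinish; see the note after the test file below.

package main

import (
	"fmt"

	"github.com/coreos/etcd/pkg/schedule"
	"golang.org/x/net/context"
)

func main() {
	s := schedule.NewFIFOScheduler()

	for i := 0; i < 3; i++ {
		i := i // capture the loop variable for the closure
		s.Schedule(func(ctx context.Context) {
			// Bail out if the scheduler was stopped before this job ran.
			select {
			case <-ctx.Done():
				return
			default:
			}
			fmt.Println("job", i)
		})
	}

	// A sentinel job signals that everything scheduled before it has run.
	donec := make(chan struct{})
	s.Schedule(func(ctx context.Context) { close(donec) })
	<-donec

	s.Stop()
}
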
+ 50 - 0
pkg/schedule/schedule_test.go

@@ -0,0 +1,50 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schedule
+
+import (
+	"testing"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+)
+
+func TestFIFOSchedule(t *testing.T) {
+	s := NewFIFOScheduler()
+	defer s.Stop()
+
+	next := 0
+	jobCreator := func(i int) Job {
+		return func(ctx context.Context) {
+			if next != i {
+				t.Fatalf("job#%d: got %d, want %d", i, next, i)
+			}
+			next = i + 1
+		}
+	}
+
+	var jobs []Job
+	for i := 0; i < 100; i++ {
+		jobs = append(jobs, jobCreator(i))
+	}
+
+	for _, j := range jobs {
+		s.Schedule(j)
+	}
+
+	s.WaitFinish()
+	if s.Scheduled() != 100 {
+		t.Errorf("scheduled = %d, want %d", s.Scheduled(), 100)
+	}
+}

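One subtlety worth flagging: WaitFinish returns only once at least one job has finished after the call and the queue is empty, so the test above relies on the hundred jobs not all completing before WaitFinish runs; on an already-idle scheduler it would block forever. A caller that cannot guarantee outstanding work can wait on a sentinel job instead; a hypothetical helper in the same package (not part of this commit):

// waitIdle blocks until every job scheduled before the call has run.
// Unlike WaitFinish, it cannot hang on an already-idle scheduler,
// because the sentinel job itself supplies the "one more finish"
// that WaitFinish waits for.
func waitIdle(s Scheduler) {
	donec := make(chan struct{})
	s.Schedule(func(ctx context.Context) { close(donec) })
	<-donec
}
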
+ 21 - 10
storage/kvstore.go

@@ -22,7 +22,9 @@ import (
 	"sync"
 	"time"
 
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/lease"
+	"github.com/coreos/etcd/pkg/schedule"
 	"github.com/coreos/etcd/storage/backend"
 	"github.com/coreos/etcd/storage/storagepb"
 )
@@ -62,9 +64,9 @@ type store struct {
 	tx    backend.BatchTx
 	txnID int64 // tracks the current txnID to verify txn operations
 
-	changes []storagepb.KeyValue
+	changes   []storagepb.KeyValue
+	fifoSched schedule.Scheduler
 
-	wg    sync.WaitGroup
 	stopc chan struct{}
 }
@@ -79,7 +81,10 @@ func NewStore(b backend.Backend, le lease.Lessor) *store {
 
 		currentRev:     revision{main: 1},
 		compactMainRev: -1,
-		stopc:          make(chan struct{}),
+
+		fifoSched: schedule.NewFIFOScheduler(),
+
+		stopc: make(chan struct{}),
 	}
 
 	if s.le != nil {
@@ -239,8 +244,16 @@ func (s *store) Compact(rev int64) error {
 
 	keep := s.kvindex.Compact(rev)
 
-	s.wg.Add(1)
-	go s.scheduleCompaction(rev, keep)
+	var j = func(ctx context.Context) {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+		s.scheduleCompaction(rev, keep)
+	}
+
+	s.fifoSched.Schedule(j)
 
 	indexCompactionPauseDurations.Observe(float64(time.Now().Sub(start) / time.Millisecond))
 	return nil
@@ -258,10 +271,7 @@ func (s *store) Restore(b backend.Backend) error {
 	defer s.mu.Unlock()
 
 	close(s.stopc)
-	// TODO: restore without waiting for compaction routine to finish.
-	// We need a way to notify that the store is finished using the old
-	// backend though.
-	s.wg.Wait()
+	s.fifoSched.Stop()
 
 	s.b = b
 	s.kvindex = newTreeIndex()
@@ -269,6 +279,7 @@ func (s *store) Restore(b backend.Backend) error {
 	s.compactMainRev = -1
 	s.tx = b.BatchTx()
 	s.txnID = -1
+	s.fifoSched = schedule.NewFIFOScheduler()
 	s.stopc = make(chan struct{})
 
 	return s.restore()
@@ -340,7 +351,7 @@ func (s *store) restore() error {
 
 func (s *store) Close() error {
 	close(s.stopc)
-	s.wg.Wait()
+	s.fifoSched.Stop()
 	return nil
 }
 

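The select-on-ctx.Done guard in the new Compact job is what makes dropping the WaitGroup safe: Stop cancels the scheduler's shared context but still invokes every queued job, so a queued compaction sees the canceled context and becomes a no-op instead of touching a backend that Restore is about to swap out, which is what resolves the removed TODO. A small sketch of those semantics (mine, not the commit's; imports as in the first sketch):

s := schedule.NewFIFOScheduler()

ran := make(chan bool, 1)
s.Schedule(func(ctx context.Context) {
	select {
	case <-ctx.Done():
		ran <- false // scheduler stopped first; decline the work
	default:
		ran <- true // normal execution
	}
})

s.Stop()           // cancels the context, then waits for the run loop to drain
fmt.Println(<-ran) // false if Stop's cancel beat the job, true otherwise
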
+ 0 - 2
storage/kvstore_compaction.go

@@ -20,8 +20,6 @@ import (
 )
 
 func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) {
-	defer s.wg.Done()
-
 	totalStart := time.Now()
 	defer dbCompactionTotalDurations.Observe(float64(time.Now().Sub(totalStart) / time.Millisecond))
 

+ 1 - 3
storage/kvstore_compaction_test.go

@@ -72,9 +72,7 @@ func TestScheduleCompaction(t *testing.T) {
 			tx.UnsafePut(keyBucketName, ibytes, []byte("bar"))
 		}
 		tx.Unlock()
-		// call `s.wg.Add(1)` to match the `s.wg.Done()` call in scheduleCompaction
-		// to avoid panic from wait group
-		s.wg.Add(1)
+
 		s.scheduleCompaction(tt.rev, tt.keep)
 
 		tx.Lock()

+ 10 - 1
storage/kvstore_test.go

@@ -24,6 +24,7 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/lease"
+	"github.com/coreos/etcd/pkg/schedule"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/storage/backend"
 	"github.com/coreos/etcd/storage/storagepb"
@@ -32,6 +33,7 @@ import (
 func TestStoreRev(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	s := NewStore(b, &lease.FakeLessor{})
+	defer s.Close()
 	defer os.Remove(tmpPath)
 
 	for i := 1; i <= 3; i++ {
@@ -129,6 +131,8 @@ func TestStorePut(t *testing.T) {
 		if s.currentRev != tt.wrev {
 			t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev)
 		}
+
+		s.Close()
 	}
 }
 
@@ -198,6 +202,8 @@ func TestStoreRange(t *testing.T) {
 		if s.currentRev != currev {
 			t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, currev)
 		}
+
+		s.Close()
 	}
 }
 
@@ -269,6 +275,7 @@ func TestStoreDeleteRange(t *testing.T) {
 
 func TestStoreCompact(t *testing.T) {
 	s := newFakeStore()
+	defer s.Close()
 	b := s.b.(*fakeBackend)
 	fi := s.kvindex.(*fakeIndex)
 
@@ -279,7 +286,7 @@ func TestStoreCompact(t *testing.T) {
 	b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, nil}
 
 	s.Compact(3)
-	s.wg.Wait()
+	s.fifoSched.WaitFinish()
 
 	if s.compactMainRev != 3 {
 		t.Errorf("compact main rev = %d, want 3", s.compactMainRev)
@@ -494,6 +501,8 @@ func newFakeStore() *store {
 		kvindex:        fi,
 		currentRev:     revision{},
 		compactMainRev: -1,
+		fifoSched:      schedule.NewFIFOScheduler(),
+		stopc:          make(chan struct{}),
 	}
 }