Browse Source

contrib/recipes: add election and double barrier recipes

these recipes rely on leases so they weren't included in the last batch
Anthony Romano 10 years ago
parent
commit
6f0cc54541

+ 3 - 2
contrib/recipes/client.go

@@ -23,8 +23,9 @@ import (
 )
 
 var (
-	ErrKeyExists    = errors.New("key already exists")
-	ErrWaitMismatch = errors.New("unexpected wait result")
+	ErrKeyExists      = errors.New("key already exists")
+	ErrWaitMismatch   = errors.New("unexpected wait result")
+	ErrTooManyClients = errors.New("too many clients")
 )
 
 // deleteRevKey deletes a key by revision, returning false if key is missing

+ 126 - 0
contrib/recipes/double_barrier.go

@@ -0,0 +1,126 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package recipe
+
+import (
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/clientv3"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/storage/storagepb"
+)
+
+// DoubleBarrier blocks processes on Enter until an expected count enters, then
+// blocks again on Leave until all processes have left.
+type DoubleBarrier struct {
+	client *clientv3.Client
+	key    string // key for the collective barrier
+	count  int
+	myKey  *EphemeralKV // current key for this process on the barrier
+}
+
+func NewDoubleBarrier(client *clientv3.Client, key string, count int) *DoubleBarrier {
+	return &DoubleBarrier{client, key, count, nil}
+}
+
+// Enter waits for "count" processes to enter the barrier then returns
+func (b *DoubleBarrier) Enter() error {
+	ek, err := NewUniqueEphemeralKey(b.client, b.key+"/waiters")
+	if err != nil {
+		return err
+	}
+	b.myKey = ek
+
+	resp, err := NewRange(b.client, b.key+"/waiters").Prefix()
+	if err != nil {
+		return err
+	}
+
+	if len(resp.Kvs) > b.count {
+		return ErrTooManyClients
+	}
+
+	if len(resp.Kvs) == b.count {
+		// unblock waiters
+		_, err = putEmptyKey(b.client.KV, b.key+"/ready")
+		return err
+	}
+
+	_, err = WaitEvents(
+		b.client,
+		b.key+"/ready",
+		resp.Header.Revision,
+		[]storagepb.Event_EventType{storagepb.PUT})
+	return err
+}
+
+// Leave waits for "count" processes to leave the barrier then returns
+func (b *DoubleBarrier) Leave() error {
+	resp, err := NewRange(b.client, b.key+"/waiters").Prefix()
+	if len(resp.Kvs) == 0 {
+		return nil
+	}
+
+	lowest, highest := resp.Kvs[0], resp.Kvs[0]
+	for _, k := range resp.Kvs {
+		if k.ModRevision < lowest.ModRevision {
+			lowest = k
+		}
+		if k.ModRevision > highest.ModRevision {
+			highest = k
+		}
+	}
+	isLowest := string(lowest.Key) == b.myKey.Key()
+
+	if len(resp.Kvs) == 1 {
+		// this is the only node in the barrier; finish up
+		req := &pb.DeleteRangeRequest{Key: []byte(b.key + "/ready")}
+		if _, err = b.client.KV.DeleteRange(context.TODO(), req); err != nil {
+			return err
+		}
+		return b.myKey.Delete()
+	}
+
+	// this ensures that if a process fails, the ephemeral lease will be
+	// revoked, its barrier key is removed, and the barrier can resume
+
+	// lowest process in node => wait on highest process
+	if isLowest {
+		_, err = WaitEvents(
+			b.client,
+			string(highest.Key),
+			resp.Header.Revision,
+			[]storagepb.Event_EventType{storagepb.DELETE})
+		if err != nil {
+			return err
+		}
+		return b.Leave()
+	}
+
+	// delete self and wait on lowest process
+	if err := b.myKey.Delete(); err != nil {
+		return err
+	}
+
+	key := string(lowest.Key)
+	_, err = WaitEvents(
+		b.client,
+		key,
+		resp.Header.Revision,
+		[]storagepb.Event_EventType{storagepb.DELETE})
+	if err != nil {
+		return err
+	}
+	return b.Leave()
+}

+ 108 - 0
contrib/recipes/election.go

@@ -0,0 +1,108 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package recipe
+
+import (
+	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/storage/storagepb"
+)
+
+type Election struct {
+	client    *clientv3.Client
+	keyPrefix string
+	leaderKey *EphemeralKV
+}
+
+// NewElection returns a new election on a given key prefix.
+func NewElection(client *clientv3.Client, keyPrefix string) *Election {
+	return &Election{client, keyPrefix, nil}
+}
+
+// Volunteer puts a value as elegible for the election. It blocks until
+// it is elected or an error occurs (cannot withdraw candidacy)
+func (e *Election) Volunteer(val string) error {
+	if e.leaderKey != nil {
+		return e.leaderKey.Put(val)
+	}
+	myKey, err := NewUniqueEphemeralKV(e.client, e.keyPrefix, val)
+	if err != nil {
+		return err
+	}
+	if err = e.waitLeadership(myKey); err != nil {
+		return err
+	}
+	e.leaderKey = myKey
+	return nil
+}
+
+// Resign lets a leader start a new election.
+func (e *Election) Resign() (err error) {
+	if e.leaderKey != nil {
+		err = e.leaderKey.Delete()
+		e.leaderKey = nil
+	}
+	return err
+}
+
+// Leader returns the leader value for the current election.
+func (e *Election) Leader() (string, error) {
+	resp, err := NewRange(e.client, e.keyPrefix).FirstCreate()
+	if err != nil {
+		return "", err
+	} else if len(resp.Kvs) == 0 {
+		// no leader currently elected
+		return "", etcdserver.ErrNoLeader
+	}
+	return string(resp.Kvs[0].Value), nil
+}
+
+// Wait waits for a leader to be elected, returning the leader value.
+func (e *Election) Wait() (string, error) {
+	resp, err := NewRange(e.client, e.keyPrefix).FirstCreate()
+	if err != nil {
+		return "", err
+	} else if len(resp.Kvs) != 0 {
+		// leader already exists
+		return string(resp.Kvs[0].Value), nil
+	}
+	_, err = WaitPrefixEvents(
+		e.client,
+		e.keyPrefix,
+		resp.Header.Revision,
+		[]storagepb.Event_EventType{storagepb.PUT})
+	if err != nil {
+		return "", err
+	}
+	return e.Wait()
+}
+
+func (e *Election) waitLeadership(tryKey *EphemeralKV) error {
+	resp, err := NewRangeRev(
+		e.client,
+		e.keyPrefix,
+		tryKey.Revision()-1).LastCreate()
+	if err != nil {
+		return err
+	} else if len(resp.Kvs) == 0 {
+		// nothing before tryKey => have leadership
+		return nil
+	}
+	_, err = WaitEvents(
+		e.client,
+		string(resp.Kvs[0].Key),
+		tryKey.Revision(),
+		[]storagepb.Event_EventType{storagepb.DELETE})
+	return err
+}

+ 33 - 0
contrib/recipes/key.go

@@ -197,3 +197,36 @@ func (rk *RemoteKV) Put(val string) error {
 	_, err := rk.client.KV.Put(context.TODO(), req)
 	return err
 }
+
+// EphemeralKV is a new key associated with a session lease
+type EphemeralKV struct{ RemoteKV }
+
+// NewEphemeralKV creates a new key/value pair associated with a session lease
+func NewEphemeralKV(client *clientv3.Client, key, val string) (*EphemeralKV, error) {
+	leaseID, err := SessionLease(client)
+	if err != nil {
+		return nil, err
+	}
+	k, err := NewKV(client, key, val, leaseID)
+	if err != nil {
+		return nil, err
+	}
+	return &EphemeralKV{*k}, nil
+}
+
+// NewEphemeralKey creates a new unique valueless key associated with a session lease
+func NewUniqueEphemeralKey(client *clientv3.Client, prefix string) (*EphemeralKV, error) {
+	return NewUniqueEphemeralKV(client, prefix, "")
+}
+
+// NewEphemeralKV creates a new unique key/value pair associated with a session lease
+func NewUniqueEphemeralKV(client *clientv3.Client, prefix, val string) (ek *EphemeralKV, err error) {
+	for {
+		newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
+		ek, err = NewEphemeralKV(client, newKey, val)
+		if err == nil || err != ErrKeyExists {
+			break
+		}
+	}
+	return ek, err
+}

+ 129 - 0
contrib/recipes/lease.go

@@ -0,0 +1,129 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package recipe
+
+import (
+	"sync"
+	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/clientv3"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/lease"
+)
+
+// only keep one ephemeral lease per clientection
+var clientLeases clientLeaseMgr = clientLeaseMgr{leases: make(map[*clientv3.Client]*leaseKeepAlive)}
+
+type clientLeaseMgr struct {
+	leases map[*clientv3.Client]*leaseKeepAlive
+	mu     sync.Mutex
+}
+
+type leaseKeepAlive struct {
+	id    lease.LeaseID
+	donec chan struct{}
+}
+
+func SessionLease(client *clientv3.Client) (lease.LeaseID, error) {
+	return clientLeases.sessionLease(client, 120)
+}
+
+func SessionLeaseTTL(client *clientv3.Client, ttl int64) (lease.LeaseID, error) {
+	return clientLeases.sessionLease(client, ttl)
+}
+
+// StopSessionLease ends the refresh for the session lease. This is useful
+// in case the state of the client clientection is indeterminate (revoke
+// would fail) or if transferring lease ownership.
+func StopSessionLease(client *clientv3.Client) {
+	clientLeases.mu.Lock()
+	lka, ok := clientLeases.leases[client]
+	if ok {
+		delete(clientLeases.leases, client)
+	}
+	clientLeases.mu.Unlock()
+	if lka != nil {
+		lka.donec <- struct{}{}
+		<-lka.donec
+	}
+}
+
+// RevokeSessionLease revokes the session lease.
+func RevokeSessionLease(client *clientv3.Client) (err error) {
+	clientLeases.mu.Lock()
+	lka := clientLeases.leases[client]
+	clientLeases.mu.Unlock()
+	StopSessionLease(client)
+	if lka != nil {
+		req := &pb.LeaseRevokeRequest{ID: int64(lka.id)}
+		_, err = client.Lease.LeaseRevoke(context.TODO(), req)
+	}
+	return err
+}
+
+func (clm *clientLeaseMgr) sessionLease(client *clientv3.Client, ttl int64) (lease.LeaseID, error) {
+	clm.mu.Lock()
+	defer clm.mu.Unlock()
+	if lka, ok := clm.leases[client]; ok {
+		return lka.id, nil
+	}
+
+	resp, err := client.Lease.LeaseCreate(context.TODO(), &pb.LeaseCreateRequest{TTL: ttl})
+	if err != nil {
+		return lease.NoLease, err
+	}
+	id := lease.LeaseID(resp.ID)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	keepAlive, err := client.Lease.LeaseKeepAlive(ctx)
+	if err != nil || keepAlive == nil {
+		return lease.NoLease, err
+	}
+
+	lka := &leaseKeepAlive{id: id, donec: make(chan struct{})}
+	clm.leases[client] = lka
+
+	// keep the lease alive until clientection error
+	go func() {
+		defer func() {
+			keepAlive.CloseSend()
+			clm.mu.Lock()
+			delete(clm.leases, client)
+			clm.mu.Unlock()
+			cancel()
+			close(lka.donec)
+		}()
+
+		ttl := resp.TTL
+		for {
+			lreq := &pb.LeaseKeepAliveRequest{ID: int64(id)}
+			select {
+			case <-lka.donec:
+				return
+			case <-time.After(time.Duration(ttl/2) * time.Second):
+			}
+			if err := keepAlive.Send(lreq); err != nil {
+				break
+			}
+			resp, err := keepAlive.Recv()
+			if err != nil {
+				break
+			}
+			ttl = resp.TTL
+		}
+	}()
+
+	return id, nil
+}

+ 129 - 0
integration/v3_double_barrier_test.go

@@ -0,0 +1,129 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package integration
+
+import (
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/contrib/recipes"
+)
+
+func TestDoubleBarrier(t *testing.T) {
+	clus := newClusterV3(t, &clusterConfig{size: 3})
+	defer clus.Terminate(t)
+	defer closeSessionLease(clus)
+
+	waiters := 10
+
+	b := recipe.NewDoubleBarrier(clus.RandClient(), "test-barrier", waiters)
+	donec := make(chan struct{})
+	for i := 0; i < waiters-1; i++ {
+		go func() {
+			b := recipe.NewDoubleBarrier(clus.RandClient(), "test-barrier", waiters)
+			if err := b.Enter(); err != nil {
+				t.Fatalf("could not enter on barrier (%v)", err)
+			}
+			donec <- struct{}{}
+			if err := b.Leave(); err != nil {
+				t.Fatalf("could not leave on barrier (%v)", err)
+			}
+			donec <- struct{}{}
+		}()
+	}
+
+	time.Sleep(10 * time.Millisecond)
+	select {
+	case <-donec:
+		t.Fatalf("barrier did not enter-wait")
+	default:
+	}
+
+	if err := b.Enter(); err != nil {
+		t.Fatalf("could not enter last barrier (%v)", err)
+	}
+
+	timerC := time.After(time.Duration(waiters*100) * time.Millisecond)
+	for i := 0; i < waiters-1; i++ {
+		select {
+		case <-timerC:
+			t.Fatalf("barrier enter timed out")
+		case <-donec:
+		}
+	}
+
+	time.Sleep(10 * time.Millisecond)
+	select {
+	case <-donec:
+		t.Fatalf("barrier did not leave-wait")
+	default:
+	}
+
+	b.Leave()
+	timerC = time.After(time.Duration(waiters*100) * time.Millisecond)
+	for i := 0; i < waiters-1; i++ {
+		select {
+		case <-timerC:
+			t.Fatalf("barrier leave timed out")
+		case <-donec:
+		}
+	}
+}
+
+func TestDoubleBarrierFailover(t *testing.T) {
+	clus := newClusterV3(t, &clusterConfig{size: 3})
+	defer clus.Terminate(t)
+	defer closeSessionLease(clus)
+
+	waiters := 10
+	donec := make(chan struct{})
+
+	// sacrificial barrier holder; lease will be revoked
+	go func() {
+		b := recipe.NewDoubleBarrier(clus.clients[0], "test-barrier", waiters)
+		if err := b.Enter(); err != nil {
+			t.Fatalf("could not enter on barrier (%v)", err)
+		}
+		donec <- struct{}{}
+	}()
+
+	for i := 0; i < waiters-1; i++ {
+		go func() {
+			b := recipe.NewDoubleBarrier(clus.clients[1], "test-barrier", waiters)
+			if err := b.Enter(); err != nil {
+				t.Fatalf("could not enter on barrier (%v)", err)
+			}
+			donec <- struct{}{}
+			b.Leave()
+			donec <- struct{}{}
+		}()
+	}
+
+	// wait for barrier enter to unblock
+	for i := 0; i < waiters; i++ {
+		<-donec
+	}
+	// kill lease, expect Leave unblock
+	recipe.RevokeSessionLease(clus.clients[0])
+	// join on rest of waiters
+	for i := 0; i < waiters-1; i++ {
+		<-donec
+	}
+}
+
+func closeSessionLease(clus *clusterV3) {
+	for _, client := range clus.clients {
+		recipe.StopSessionLease(client)
+	}
+}

+ 135 - 0
integration/v3_election_test.go

@@ -0,0 +1,135 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package integration
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/contrib/recipes"
+)
+
+// TestElectionWait tests if followers can correcty wait for elections.
+func TestElectionWait(t *testing.T) {
+	clus := newClusterV3(t, &clusterConfig{size: 3})
+	defer clus.Terminate(t)
+	defer closeSessionLease(clus)
+
+	leaders := 3
+	followers := 3
+
+	electedc := make(chan string)
+	nextc := []chan struct{}{}
+
+	// wait for all elections
+	donec := make(chan struct{})
+	for i := 0; i < followers; i++ {
+		nextc = append(nextc, make(chan struct{}))
+		go func(ch chan struct{}) {
+			for j := 0; j < leaders; j++ {
+				b := recipe.NewElection(clus.RandClient(), "test-election")
+				s, err := b.Wait()
+				if err != nil {
+					t.Fatalf("could not wait for election (%v)", err)
+				}
+				electedc <- s
+				// wait for next election round
+				<-ch
+			}
+			donec <- struct{}{}
+		}(nextc[i])
+	}
+
+	// elect some leaders
+	for i := 0; i < leaders; i++ {
+		go func() {
+			e := recipe.NewElection(clus.RandClient(), "test-election")
+			ev := fmt.Sprintf("electval-%v", time.Now().UnixNano())
+			if err := e.Volunteer(ev); err != nil {
+				t.Fatalf("failed volunteer (%v)", err)
+			}
+			// wait for followers to accept leadership
+			for j := 0; j < followers; j++ {
+				s := <-electedc
+				if s != ev {
+					t.Errorf("wrong election value got %s, wanted %s", s, ev)
+				}
+			}
+			// let next leader take over
+			if err := e.Resign(); err != nil {
+				t.Fatalf("failed resign (%v)", err)
+			}
+			// tell followers to start listening for next leader
+			for j := 0; j < followers; j++ {
+				nextc[j] <- struct{}{}
+			}
+		}()
+	}
+
+	// wait on followers
+	for i := 0; i < followers; i++ {
+		<-donec
+	}
+}
+
+// TestElectionFailover tests that an election will
+func TestElectionFailover(t *testing.T) {
+	clus := newClusterV3(t, &clusterConfig{size: 3})
+	defer clus.Terminate(t)
+	defer closeSessionLease(clus)
+
+	// first leader (elected)
+	e := recipe.NewElection(clus.clients[0], "test-election")
+	if err := e.Volunteer("foo"); err != nil {
+		t.Fatalf("failed volunteer (%v)", err)
+	}
+
+	// check first leader
+	s, err := e.Wait()
+	if err != nil {
+		t.Fatalf("could not wait for first election (%v)", err)
+	}
+	if s != "foo" {
+		t.Fatalf("wrong election result. got %s, wanted foo", s)
+	}
+
+	// next leader
+	electedc := make(chan struct{})
+	go func() {
+		e := recipe.NewElection(clus.clients[1], "test-election")
+		if err := e.Volunteer("bar"); err != nil {
+			t.Fatal(err)
+		}
+		electedc <- struct{}{}
+	}()
+
+	// invoke leader failover
+	if err := recipe.RevokeSessionLease(clus.clients[0]); err != nil {
+		t.Fatal(err)
+	}
+
+	// check new leader
+	e = recipe.NewElection(clus.clients[2], "test-election")
+	s, err = e.Wait()
+	if err != nil {
+		t.Fatalf("could not wait for second election (%v)", err)
+	}
+	if s != "bar" {
+		t.Fatalf("wrong election result. got %s, wanted bar", s)
+	}
+
+	// leader must ack election (otherwise, Volunteer may see closed conn)
+	<-electedc
+}