Sfoglia il codice sorgente

concurrency: add examples

Anthony Romano 8 anni fa
parent
commit
b7b31e5770

+ 90 - 0
clientv3/concurrency/example_election_test.go

@@ -0,0 +1,90 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package concurrency_test
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"sync"
+	"time"
+
+	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/clientv3/concurrency"
+)
+
+func ExampleElection_Campaign() {
+	cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer cli.Close()
+
+	// create two separate sessions for election competition
+	s1, err := concurrency.NewSession(cli)
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer s1.Close()
+	e1 := concurrency.NewElection(s1, "/my-election/")
+
+	s2, err := concurrency.NewSession(cli)
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer s2.Close()
+	e2 := concurrency.NewElection(s2, "/my-election/")
+
+	// create competing candidates, with e1 initially losing to e2
+	var wg sync.WaitGroup
+	wg.Add(2)
+	electc := make(chan *concurrency.Election, 2)
+	go func() {
+		defer wg.Done()
+		// delay candidacy so e2 wins first
+		time.Sleep(3 * time.Second)
+		if err := e1.Campaign(context.Background(), "e1"); err != nil {
+			log.Fatal(err)
+		}
+		electc <- e1
+	}()
+	go func() {
+		defer wg.Done()
+		if err := e2.Campaign(context.Background(), "e2"); err != nil {
+			log.Fatal(err)
+		}
+		electc <- e2
+	}()
+
+	cctx, cancel := context.WithCancel(context.TODO())
+	defer cancel()
+
+	e := <-electc
+	fmt.Println("completed first election with", string((<-e.Observe(cctx)).Kvs[0].Value))
+
+	// resign so next candidate can be elected
+	if err := e.Resign(context.TODO()); err != nil {
+		log.Fatal(err)
+	}
+
+	e = <-electc
+	fmt.Println("completed second election with", string((<-e.Observe(cctx)).Kvs[0].Value))
+
+	wg.Wait()
+
+	// Output:
+	// completed first election with e2
+	// completed second election with e1
+}

+ 75 - 0
clientv3/concurrency/example_mutex_test.go

@@ -0,0 +1,75 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package concurrency_test
+
+import (
+	"context"
+	"fmt"
+	"log"
+
+	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/clientv3/concurrency"
+)
+
+func ExampleMutex_Lock() {
+	cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer cli.Close()
+
+	// create two separate sessions for lock competition
+	s1, err := concurrency.NewSession(cli)
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer s1.Close()
+	m1 := concurrency.NewMutex(s1, "/my-lock/")
+
+	s2, err := concurrency.NewSession(cli)
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer s2.Close()
+	m2 := concurrency.NewMutex(s2, "/my-lock/")
+
+	// acquire lock for s1
+	if err := m1.Lock(context.TODO()); err != nil {
+		log.Fatal(err)
+	}
+	fmt.Println("acquired lock for s1")
+
+	m2Locked := make(chan struct{})
+	go func() {
+		defer close(m2Locked)
+		// wait until s1 is locks /my-lock/
+		if err := m2.Lock(context.TODO()); err != nil {
+			log.Fatal(err)
+		}
+	}()
+
+	if err := m1.Unlock(context.TODO()); err != nil {
+		log.Fatal(err)
+	}
+	fmt.Println("released lock for s1")
+
+	<-m2Locked
+	fmt.Println("acquired lock for s2")
+
+	// Output:
+	// acquired lock for s1
+	// released lock for s1
+	// acquired lock for s2
+}

+ 97 - 0
clientv3/concurrency/example_stm_test.go

@@ -0,0 +1,97 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package concurrency_test
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"math/rand"
+	"sync"
+
+	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/clientv3/concurrency"
+)
+
+// ExampleSTM_apply shows how to use STM with a transactional
+// transfer between balances.
+func ExampleSTM_apply() {
+	cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer cli.Close()
+
+	// set up "accounts"
+	totalAccounts := 5
+	for i := 0; i < totalAccounts; i++ {
+		k := fmt.Sprintf("accts/%d", i)
+		if _, err = cli.Put(context.TODO(), k, "100"); err != nil {
+			log.Fatal(err)
+		}
+	}
+
+	exchange := func(stm concurrency.STM) error {
+		from, to := rand.Intn(totalAccounts), rand.Intn(totalAccounts)
+		if from == to {
+			// nothing to do
+			return nil
+		}
+		// read values
+		fromK, toK := fmt.Sprintf("accts/%d", from), fmt.Sprintf("accts/%d", to)
+		fromV, toV := stm.Get(fromK), stm.Get(toK)
+		fromInt, toInt := 0, 0
+		fmt.Sscanf(fromV, "%d", &fromInt)
+		fmt.Sscanf(toV, "%d", &toInt)
+
+		// transfer amount
+		xfer := fromInt / 2
+		fromInt, toInt = fromInt-xfer, toInt-xfer
+
+		// writeback
+		stm.Put(fromK, fmt.Sprintf("%d", fromInt))
+		stm.Put(toK, fmt.Sprintf("%d", toInt))
+		return nil
+	}
+
+	// concurrently exchange values between accounts
+	var wg sync.WaitGroup
+	wg.Add(10)
+	for i := 0; i < 10; i++ {
+		go func() {
+			defer wg.Done()
+			if _, serr := concurrency.NewSTM(cli, exchange); serr != nil {
+				log.Fatal(serr)
+			}
+		}()
+	}
+	wg.Wait()
+
+	// confirm account sum matches sum from beginning.
+	sum := 0
+	accts, err := cli.Get(context.TODO(), "accts/", clientv3.WithPrefix())
+	if err != nil {
+		log.Fatal(err)
+	}
+	for _, kv := range accts.Kvs {
+		v := 0
+		fmt.Sscanf(string(kv.Value), "%d", &v)
+		sum += v
+	}
+
+	fmt.Println("account sum is", sum)
+	// Output:
+	// account sum is 500
+}

+ 44 - 0
clientv3/concurrency/main_test.go

@@ -0,0 +1,44 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package concurrency_test
+
+import (
+	"fmt"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/integration"
+	"github.com/coreos/etcd/pkg/testutil"
+)
+
+var endpoints []string
+
+// TestMain sets up an etcd cluster for running the examples.
+func TestMain(m *testing.M) {
+	cfg := integration.ClusterConfig{Size: 1}
+	clus := integration.NewClusterV3(nil, &cfg)
+	endpoints = []string{clus.Client(0).Endpoints()[0]}
+	v := m.Run()
+	clus.Terminate(nil)
+	if err := testutil.CheckAfterTest(time.Second); err != nil {
+		fmt.Fprintf(os.Stderr, "%v", err)
+		os.Exit(1)
+	}
+	if v == 0 && testutil.CheckLeakedGoroutine() {
+		os.Exit(1)
+	}
+	os.Exit(v)
+}