Browse Source

Merge pull request #6864 from heyitsanthony/watch-doc

Documentation: add grpc gateway watch example
Anthony Romano 9 years ago
parent
commit
428cb21a3f

+ 15 - 0
Documentation/dev-guide/api_grpc_gateway.md

@@ -8,6 +8,8 @@ etcd v3 uses [gRPC][grpc] for its messaging protocol. The etcd project includes
 
 The gateway accepts a [JSON mapping][json-mapping] for etcd's [protocol buffer][api-ref] message definitions. Note that `key` and `value` fields are defined as byte arrays and therefore must be base64 encoded in JSON.
 
+Use `curl` to put and get a key:
+
 ```bash
 <<COMMENT
 https://www.base64encode.org/
@@ -17,11 +19,24 @@ COMMENT
 
 curl -L http://localhost:2379/v3alpha/kv/put \
 	-X POST -d '{"key": "Zm9v", "value": "YmFy"}'
+# {"header":{"cluster_id":"12585971608760269493","member_id":"13847567121247652255","revision":"2","raft_term":"3"}}
 
 curl -L http://localhost:2379/v3alpha/kv/range \
 	-X POST -d '{"key": "Zm9v"}'
+# {"header":{"cluster_id":"12585971608760269493","member_id":"13847567121247652255","revision":"2","raft_term":"3"},"kvs":[{"key":"Zm9v","create_revision":"2","mod_revision":"2","version":"1","value":"YmFy"}],"count":"1"}
 ```
 
+Use `curl` to watch a key:
+
+```bash
+curl http://localhost:2379/v3alpha/watch \
+        -X POST -d '{"create_request": {"key":"Zm9v"} }' &
+# {"result":{"header":{"cluster_id":"12585971608760269493","member_id":"13847567121247652255","revision":"1","raft_term":"2"},"created":true}}
+
+curl -L http://localhost:2379/v3alpha/kv/put \
+	-X POST -d '{"key": "Zm9v", "value": "YmFy"}' >/dev/null 2>&1
+# {"result":{"header":{"cluster_id":"12585971608760269493","member_id":"13847567121247652255","revision":"2","raft_term":"2"},"events":[{"kv":{"key":"Zm9v","create_revision":"2","mod_revision":"2","version":"1","value":"YmFy"}}]}}
+```
 
 ## Swagger
 

+ 5 - 1
e2e/v2_curl_test.go

@@ -120,7 +120,8 @@ type cURLReq struct {
 	username string
 	password string
 
-	isTLS bool
+	isTLS   bool
+	timeout int
 
 	endpoint string
 
@@ -151,6 +152,9 @@ func cURLPrefixArgs(clus *etcdProcessCluster, method string, req cURLReq) []stri
 	} else {
 		cmdArgs = append(cmdArgs, "-L", ep)
 	}
+	if req.timeout != 0 {
+		cmdArgs = append(cmdArgs, "-m", fmt.Sprintf("%d", req.timeout))
+	}
 
 	switch method {
 	case "POST", "PUT":

+ 40 - 3
e2e/v3_curl_test.go

@@ -18,7 +18,7 @@ import (
 	"encoding/json"
 	"testing"
 
-	"github.com/coreos/etcd/etcdserver/etcdserverpb"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/testutil"
 )
 
@@ -47,14 +47,14 @@ func testCurlPutGetGRPCGateway(t *testing.T, cfg *etcdProcessClusterConfig) {
 		expectPut = `"revision":"`
 		expectGet = `"value":"`
 	)
-	putData, err := json.Marshal(&etcdserverpb.PutRequest{
+	putData, err := json.Marshal(&pb.PutRequest{
 		Key:   key,
 		Value: value,
 	})
 	if err != nil {
 		t.Fatal(err)
 	}
-	rangeData, err := json.Marshal(&etcdserverpb.RangeRequest{
+	rangeData, err := json.Marshal(&pb.RangeRequest{
 		Key: key,
 	})
 	if err != nil {
@@ -74,3 +74,40 @@ func testCurlPutGetGRPCGateway(t *testing.T, cfg *etcdProcessClusterConfig) {
 		}
 	}
 }
+
+func TestV3CurlWatch(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	epc, err := newEtcdProcessCluster(&configNoTLS)
+	if err != nil {
+		t.Fatalf("could not start etcd process cluster (%v)", err)
+	}
+	defer func() {
+		if cerr := epc.Close(); err != nil {
+			t.Fatalf("error closing etcd processes (%v)", cerr)
+		}
+	}()
+
+	// store "bar" into "foo"
+	putreq, err := json.Marshal(&pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err = cURLPost(epc, cURLReq{endpoint: "/v3alpha/kv/put", value: string(putreq), expected: "revision"}); err != nil {
+		t.Fatalf("failed put with curl (%v)", err)
+	}
+	// watch for first update to "foo"
+	wcr := &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 1}
+	wreq, err := json.Marshal(wcr)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// marshaling the grpc to json gives:
+	// "{"RequestUnion":{"CreateRequest":{"key":"Zm9v","start_revision":1}}}"
+	// but the gprc-gateway expects a different format..
+	wstr := `{"create_request" : ` + string(wreq) + "}"
+	// expects "bar", timeout after 2 seconds since stream waits forever
+	if err = cURLPost(epc, cURLReq{endpoint: "/v3alpha/watch", value: wstr, expected: `"YmFy"`, timeout: 2}); err != nil {
+		t.Fatal(err)
+	}
+}

+ 6 - 3
etcdserver/api/v3rpc/watch.go

@@ -131,10 +131,14 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
 	// but when stream.Context().Done() is closed, the stream's recv
 	// may continue to block since it uses a different context, leading to
 	// deadlock when calling sws.close().
-	go func() { errc <- sws.recvLoop() }()
-
+	go func() {
+		if rerr := sws.recvLoop(); rerr != nil {
+			errc <- rerr
+		}
+	}()
 	select {
 	case err = <-errc:
+		close(sws.ctrlStream)
 	case <-stream.Context().Done():
 		err = stream.Context().Err()
 		// the only server-side cancellation is noleader for now.
@@ -147,7 +151,6 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
 }
 
 func (sws *serverWatchStream) recvLoop() error {
-	defer close(sws.ctrlStream)
 	for {
 		req, err := sws.gRPCStream.Recv()
 		if err == io.EOF {

+ 8 - 6
integration/v3_watch_test.go

@@ -948,21 +948,23 @@ func testV3WatchMultipleStreams(t *testing.T, startRev int64) {
 // returned closing the WatchClient stream. Or the response will
 // be returned.
 func waitResponse(wc pb.Watch_WatchClient, timeout time.Duration) (bool, *pb.WatchResponse) {
-	rCh := make(chan *pb.WatchResponse)
+	rCh := make(chan *pb.WatchResponse, 1)
+	donec := make(chan struct{})
+	defer close(donec)
 	go func() {
 		resp, _ := wc.Recv()
-		rCh <- resp
+		select {
+		case rCh <- resp:
+		case <-donec:
+		}
 	}()
 	select {
 	case nr := <-rCh:
 		return false, nr
 	case <-time.After(timeout):
 	}
+	// didn't get response
 	wc.CloseSend()
-	rv, ok := <-rCh
-	if rv != nil || !ok {
-		return false, rv
-	}
 	return true, nil
 }