Browse Source

etcdmain: add --experimental-serializable-ordering to grpc proxy

Connect to another endpoint on stale reads.
Anthony Romano 8 years ago
parent
commit
f6acd0316c
1 changed files with 21 additions and 1 deletions
  1. 21 1
      etcdmain/grpc_proxy.go

+ 21 - 1
etcdmain/grpc_proxy.go

@@ -15,6 +15,7 @@
 package etcdmain
 package etcdmain
 
 
 import (
 import (
+	"context"
 	"fmt"
 	"fmt"
 	"math"
 	"math"
 	"net"
 	"net"
@@ -26,6 +27,7 @@ import (
 
 
 	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/clientv3/namespace"
 	"github.com/coreos/etcd/clientv3/namespace"
+	"github.com/coreos/etcd/clientv3/ordering"
 	"github.com/coreos/etcd/etcdserver/api/etcdhttp"
 	"github.com/coreos/etcd/etcdserver/api/etcdhttp"
 	"github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
 	"github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
 	"github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
 	"github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
@@ -69,7 +71,8 @@ var (
 
 
 	grpcProxyNamespace string
 	grpcProxyNamespace string
 
 
-	grpcProxyEnablePprof bool
+	grpcProxyEnablePprof    bool
+	grpcProxyEnableOrdering bool
 )
 )
 
 
 func init() {
 func init() {
@@ -119,6 +122,9 @@ func newGRPCProxyStartCommand() *cobra.Command {
 	cmd.Flags().BoolVar(&grpcProxyListenAutoTLS, "auto-tls", false, "proxy TLS using generated certificates")
 	cmd.Flags().BoolVar(&grpcProxyListenAutoTLS, "auto-tls", false, "proxy TLS using generated certificates")
 	cmd.Flags().StringVar(&grpcProxyListenCRL, "client-crl-file", "", "proxy client certificate revocation list file.")
 	cmd.Flags().StringVar(&grpcProxyListenCRL, "client-crl-file", "", "proxy client certificate revocation list file.")
 
 
+	// experimental flags
+	cmd.Flags().BoolVar(&grpcProxyEnableOrdering, "experimental-serializable-ordering", false, "Ensure serializable reads have monotonically increasing store revisions across endpoints.")
+
 	return &cmd
 	return &cmd
 }
 }
 
 
@@ -255,6 +261,20 @@ func mustListenCMux(tlsinfo *transport.TLSInfo) cmux.CMux {
 }
 }
 
 
 func newGRPCProxyServer(client *clientv3.Client) *grpc.Server {
 func newGRPCProxyServer(client *clientv3.Client) *grpc.Server {
+	if grpcProxyEnableOrdering {
+		vf := ordering.NewOrderViolationSwitchEndpointClosure(*client)
+		client.KV = ordering.NewKV(client.KV, vf)
+		plog.Infof("waiting for linearized read from cluster to recover ordering")
+		for {
+			_, err := client.KV.Get(context.TODO(), "_", clientv3.WithKeysOnly())
+			if err == nil {
+				break
+			}
+			plog.Warningf("ordering recovery failed, retrying in 1s (%v)", err)
+			time.Sleep(time.Second)
+		}
+	}
+
 	if len(grpcProxyNamespace) > 0 {
 	if len(grpcProxyNamespace) > 0 {
 		client.KV = namespace.NewKV(client.KV, grpcProxyNamespace)
 		client.KV = namespace.NewKV(client.KV, grpcProxyNamespace)
 		client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace)
 		client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace)