123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- // Copyright 2016 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- // +build cluster_proxy
- package integration
- import (
- "sync"
- "go.etcd.io/etcd/clientv3"
- "go.etcd.io/etcd/clientv3/namespace"
- "go.etcd.io/etcd/proxy/grpcproxy"
- "go.etcd.io/etcd/proxy/grpcproxy/adapter"
- )
- var (
- pmu sync.Mutex
- proxies map[*clientv3.Client]grpcClientProxy = make(map[*clientv3.Client]grpcClientProxy)
- )
- const proxyNamespace = "proxy-namespace"
- type grpcClientProxy struct {
- grpc grpcAPI
- wdonec <-chan struct{}
- kvdonec <-chan struct{}
- lpdonec <-chan struct{}
- }
- func toGRPC(c *clientv3.Client) grpcAPI {
- pmu.Lock()
- defer pmu.Unlock()
- if v, ok := proxies[c]; ok {
- return v.grpc
- }
- // test namespacing proxy
- c.KV = namespace.NewKV(c.KV, proxyNamespace)
- c.Watcher = namespace.NewWatcher(c.Watcher, proxyNamespace)
- c.Lease = namespace.NewLease(c.Lease, proxyNamespace)
- // test coalescing/caching proxy
- kvp, kvpch := grpcproxy.NewKvProxy(c)
- wp, wpch := grpcproxy.NewWatchProxy(c)
- lp, lpch := grpcproxy.NewLeaseProxy(c)
- mp := grpcproxy.NewMaintenanceProxy(c)
- clp, _ := grpcproxy.NewClusterProxy(c, "", "") // without registering proxy URLs
- authp := grpcproxy.NewAuthProxy(c)
- lockp := grpcproxy.NewLockProxy(c)
- electp := grpcproxy.NewElectionProxy(c)
- grpc := grpcAPI{
- adapter.ClusterServerToClusterClient(clp),
- adapter.KvServerToKvClient(kvp),
- adapter.LeaseServerToLeaseClient(lp),
- adapter.WatchServerToWatchClient(wp),
- adapter.MaintenanceServerToMaintenanceClient(mp),
- adapter.AuthServerToAuthClient(authp),
- adapter.LockServerToLockClient(lockp),
- adapter.ElectionServerToElectionClient(electp),
- }
- proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch, kvdonec: kvpch, lpdonec: lpch}
- return grpc
- }
- type proxyCloser struct {
- clientv3.Watcher
- wdonec <-chan struct{}
- kvdonec <-chan struct{}
- lclose func()
- lpdonec <-chan struct{}
- }
- func (pc *proxyCloser) Close() error {
- // client ctx is canceled before calling close, so kv and lp will close out
- <-pc.kvdonec
- err := pc.Watcher.Close()
- <-pc.wdonec
- pc.lclose()
- <-pc.lpdonec
- return err
- }
- func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {
- c, err := clientv3.New(cfg)
- if err != nil {
- return nil, err
- }
- rpc := toGRPC(c)
- c.KV = clientv3.NewKVFromKVClient(rpc.KV, c)
- pmu.Lock()
- lc := c.Lease
- c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, c, cfg.DialTimeout)
- c.Watcher = &proxyCloser{
- Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch, c),
- wdonec: proxies[c].wdonec,
- kvdonec: proxies[c].kvdonec,
- lclose: func() { lc.Close() },
- lpdonec: proxies[c].lpdonec,
- }
- pmu.Unlock()
- return c, nil
- }
|