123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247 |
- // Copyright 2018 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- // Package endpoint resolves etcd endpoints using grpc targets of the form 'endpoint://<id>/<endpoint>'.
- package endpoint
- import (
- "context"
- "fmt"
- "net"
- "net/url"
- "strings"
- "sync"
- "google.golang.org/grpc/resolver"
- )
- const scheme = "endpoint"
- var (
- targetPrefix = fmt.Sprintf("%s://", scheme)
- bldr *builder
- )
- func init() {
- bldr = &builder{
- resolverGroups: make(map[string]*ResolverGroup),
- }
- resolver.Register(bldr)
- }
- type builder struct {
- mu sync.RWMutex
- resolverGroups map[string]*ResolverGroup
- }
- // NewResolverGroup creates a new ResolverGroup with the given id.
- func NewResolverGroup(id string) (*ResolverGroup, error) {
- return bldr.newResolverGroup(id)
- }
- // ResolverGroup keeps all endpoints of resolvers using a common endpoint://<id>/ target
- // up-to-date.
- type ResolverGroup struct {
- mu sync.RWMutex
- id string
- endpoints []string
- resolvers []*Resolver
- }
- func (e *ResolverGroup) addResolver(r *Resolver) {
- e.mu.Lock()
- addrs := epsToAddrs(e.endpoints...)
- e.resolvers = append(e.resolvers, r)
- e.mu.Unlock()
- r.cc.NewAddress(addrs)
- }
- func (e *ResolverGroup) removeResolver(r *Resolver) {
- e.mu.Lock()
- for i, er := range e.resolvers {
- if er == r {
- e.resolvers = append(e.resolvers[:i], e.resolvers[i+1:]...)
- break
- }
- }
- e.mu.Unlock()
- }
- // SetEndpoints updates the endpoints for ResolverGroup. All registered resolver are updated
- // immediately with the new endpoints.
- func (e *ResolverGroup) SetEndpoints(endpoints []string) {
- addrs := epsToAddrs(endpoints...)
- e.mu.Lock()
- e.endpoints = endpoints
- for _, r := range e.resolvers {
- r.cc.NewAddress(addrs)
- }
- e.mu.Unlock()
- }
- // Target constructs a endpoint target using the endpoint id of the ResolverGroup.
- func (e *ResolverGroup) Target(endpoint string) string {
- return Target(e.id, endpoint)
- }
- // Target constructs a endpoint resolver target.
- func Target(id, endpoint string) string {
- return fmt.Sprintf("%s://%s/%s", scheme, id, endpoint)
- }
// IsTarget reports whether the given target string is an endpoint resolver
// target, i.e. whether it starts with "endpoint://".
func IsTarget(target string) bool {
	const prefix = "endpoint://"
	return strings.HasPrefix(target, prefix)
}
- func (e *ResolverGroup) Close() {
- bldr.close(e.id)
- }
- // Build creates or reuses an etcd resolver for the etcd cluster name identified by the authority part of the target.
- func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
- if len(target.Authority) < 1 {
- return nil, fmt.Errorf("'etcd' target scheme requires non-empty authority identifying etcd cluster being routed to")
- }
- id := target.Authority
- es, err := b.getResolverGroup(id)
- if err != nil {
- return nil, fmt.Errorf("failed to build resolver: %v", err)
- }
- r := &Resolver{
- endpointID: id,
- cc: cc,
- }
- es.addResolver(r)
- return r, nil
- }
- func (b *builder) newResolverGroup(id string) (*ResolverGroup, error) {
- b.mu.RLock()
- _, ok := b.resolverGroups[id]
- b.mu.RUnlock()
- if ok {
- return nil, fmt.Errorf("Endpoint already exists for id: %s", id)
- }
- es := &ResolverGroup{id: id}
- b.mu.Lock()
- b.resolverGroups[id] = es
- b.mu.Unlock()
- return es, nil
- }
- func (b *builder) getResolverGroup(id string) (*ResolverGroup, error) {
- b.mu.RLock()
- es, ok := b.resolverGroups[id]
- b.mu.RUnlock()
- if !ok {
- return nil, fmt.Errorf("ResolverGroup not found for id: %s", id)
- }
- return es, nil
- }
- func (b *builder) close(id string) {
- b.mu.Lock()
- delete(b.resolverGroups, id)
- b.mu.Unlock()
- }
- func (b *builder) Scheme() string {
- return scheme
- }
- // Resolver provides a resolver for a single etcd cluster, identified by name.
- type Resolver struct {
- endpointID string
- cc resolver.ClientConn
- sync.RWMutex
- }
- // TODO: use balancer.epsToAddrs
- func epsToAddrs(eps ...string) (addrs []resolver.Address) {
- addrs = make([]resolver.Address, 0, len(eps))
- for _, ep := range eps {
- addrs = append(addrs, resolver.Address{Addr: ep})
- }
- return addrs
- }
- func (*Resolver) ResolveNow(o resolver.ResolveNowOption) {}
- func (r *Resolver) Close() {
- es, err := bldr.getResolverGroup(r.endpointID)
- if err != nil {
- return
- }
- es.removeResolver(r)
- }
// ParseEndpoint parses an endpoint of the form
// ((http|https)://<host>*|(unix|unixs)://<path>)
// and returns a protocol ('tcp' or 'unix'),
// host (or filepath if a unix socket),
// scheme (http, https, unix, unixs).
//
// An endpoint without "://", or one that fails URL parsing, is returned
// unchanged as the host with protocol 'tcp' and an empty scheme. A URL with
// any other scheme yields an empty protocol and host together with that
// scheme.
func ParseEndpoint(endpoint string) (proto string, host string, scheme string) {
	if !strings.Contains(endpoint, "://") {
		return "tcp", endpoint, ""
	}
	u, err := url.Parse(endpoint)
	if err != nil {
		return "tcp", endpoint, ""
	}

	switch u.Scheme {
	case "http", "https":
		// The scheme:// prefix is dropped since grpc dials by host.
		return "tcp", u.Host, u.Scheme
	case "unix", "unixs":
		// The socket path may be split between the URL host and path.
		return "unix", u.Host + u.Path, u.Scheme
	}
	return "", "", u.Scheme
}
- // ParseTarget parses a endpoint://<id>/<endpoint> string and returns the parsed id and endpoint.
- // If the target is malformed, an error is returned.
- func ParseTarget(target string) (string, string, error) {
- noPrefix := strings.TrimPrefix(target, targetPrefix)
- if noPrefix == target {
- return "", "", fmt.Errorf("malformed target, %s prefix is required: %s", targetPrefix, target)
- }
- parts := strings.SplitN(noPrefix, "/", 2)
- if len(parts) != 2 {
- return "", "", fmt.Errorf("malformed target, expected %s://<id>/<endpoint>, but got %s", scheme, target)
- }
- return parts[0], parts[1], nil
- }
- // Dialer dials a endpoint using net.Dialer.
- // Context cancelation and timeout are supported.
- func Dialer(ctx context.Context, dialEp string) (net.Conn, error) {
- proto, host, _ := ParseEndpoint(dialEp)
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- default:
- }
- dialer := &net.Dialer{}
- if deadline, ok := ctx.Deadline(); ok {
- dialer.Deadline = deadline
- }
- return dialer.DialContext(ctx, proto, host)
- }
|