election.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package v3election
  15. import (
  16. "errors"
  17. "golang.org/x/net/context"
  18. "github.com/coreos/etcd/clientv3"
  19. "github.com/coreos/etcd/clientv3/concurrency"
  20. epb "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
  21. )
  22. // ErrMissingLeaderKey is returned when election API request
  23. // is missing the "leader" field.
  24. var ErrMissingLeaderKey = errors.New(`"leader" field must be provided`)
  25. type electionServer struct {
  26. c *clientv3.Client
  27. }
  28. func NewElectionServer(c *clientv3.Client) epb.ElectionServer {
  29. return &electionServer{c}
  30. }
  31. func (es *electionServer) Campaign(ctx context.Context, req *epb.CampaignRequest) (*epb.CampaignResponse, error) {
  32. s, err := es.session(ctx, req.Lease)
  33. if err != nil {
  34. return nil, err
  35. }
  36. e := concurrency.NewElection(s, string(req.Name))
  37. if err = e.Campaign(ctx, string(req.Value)); err != nil {
  38. return nil, err
  39. }
  40. return &epb.CampaignResponse{
  41. Header: e.Header(),
  42. Leader: &epb.LeaderKey{
  43. Name: req.Name,
  44. Key: []byte(e.Key()),
  45. Rev: e.Rev(),
  46. Lease: int64(s.Lease()),
  47. },
  48. }, nil
  49. }
  50. func (es *electionServer) Proclaim(ctx context.Context, req *epb.ProclaimRequest) (*epb.ProclaimResponse, error) {
  51. if req.Leader == nil {
  52. return nil, ErrMissingLeaderKey
  53. }
  54. s, err := es.session(ctx, req.Leader.Lease)
  55. if err != nil {
  56. return nil, err
  57. }
  58. e := concurrency.ResumeElection(s, string(req.Leader.Name), string(req.Leader.Key), req.Leader.Rev)
  59. if err := e.Proclaim(ctx, string(req.Value)); err != nil {
  60. return nil, err
  61. }
  62. return &epb.ProclaimResponse{Header: e.Header()}, nil
  63. }
  64. func (es *electionServer) Observe(req *epb.LeaderRequest, stream epb.Election_ObserveServer) error {
  65. s, err := es.session(stream.Context(), -1)
  66. if err != nil {
  67. return err
  68. }
  69. e := concurrency.NewElection(s, string(req.Name))
  70. ch := e.Observe(stream.Context())
  71. for stream.Context().Err() == nil {
  72. select {
  73. case <-stream.Context().Done():
  74. case resp, ok := <-ch:
  75. if !ok {
  76. return nil
  77. }
  78. lresp := &epb.LeaderResponse{Header: resp.Header, Kv: resp.Kvs[0]}
  79. if err := stream.Send(lresp); err != nil {
  80. return err
  81. }
  82. }
  83. }
  84. return stream.Context().Err()
  85. }
  86. func (es *electionServer) Leader(ctx context.Context, req *epb.LeaderRequest) (*epb.LeaderResponse, error) {
  87. s, err := es.session(ctx, -1)
  88. if err != nil {
  89. return nil, err
  90. }
  91. l, lerr := concurrency.NewElection(s, string(req.Name)).Leader(ctx)
  92. if lerr != nil {
  93. return nil, lerr
  94. }
  95. return &epb.LeaderResponse{Header: l.Header, Kv: l.Kvs[0]}, nil
  96. }
  97. func (es *electionServer) Resign(ctx context.Context, req *epb.ResignRequest) (*epb.ResignResponse, error) {
  98. if req.Leader == nil {
  99. return nil, ErrMissingLeaderKey
  100. }
  101. s, err := es.session(ctx, req.Leader.Lease)
  102. if err != nil {
  103. return nil, err
  104. }
  105. e := concurrency.ResumeElection(s, string(req.Leader.Name), string(req.Leader.Key), req.Leader.Rev)
  106. if err := e.Resign(ctx); err != nil {
  107. return nil, err
  108. }
  109. return &epb.ResignResponse{Header: e.Header()}, nil
  110. }
  111. func (es *electionServer) session(ctx context.Context, lease int64) (*concurrency.Session, error) {
  112. s, err := concurrency.NewSession(
  113. es.c,
  114. concurrency.WithLease(clientv3.LeaseID(lease)),
  115. concurrency.WithContext(ctx),
  116. )
  117. if err != nil {
  118. return nil, err
  119. }
  120. s.Orphan()
  121. return s, nil
  122. }