publisher.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package discov
  2. import (
  3. "github.com/tal-tech/go-zero/core/discov/internal"
  4. "github.com/tal-tech/go-zero/core/lang"
  5. "github.com/tal-tech/go-zero/core/logx"
  6. "github.com/tal-tech/go-zero/core/proc"
  7. "github.com/tal-tech/go-zero/core/syncx"
  8. "github.com/tal-tech/go-zero/core/threading"
  9. "go.etcd.io/etcd/clientv3"
  10. )
  11. type (
  12. // PublisherOption defines the method to customize a Publisher.
  13. PublisherOption func(client *Publisher)
  14. // A Publisher can be used to publish the value to an etcd cluster on the given key.
  15. Publisher struct {
  16. endpoints []string
  17. key string
  18. fullKey string
  19. id int64
  20. value string
  21. lease clientv3.LeaseID
  22. quit *syncx.DoneChan
  23. pauseChan chan lang.PlaceholderType
  24. resumeChan chan lang.PlaceholderType
  25. }
  26. )
  27. // NewPublisher returns a Publisher.
  28. // endpoints is the hosts of the etcd cluster.
  29. // key:value are a pair to be published.
  30. // opts are used to customize the Publisher.
  31. func NewPublisher(endpoints []string, key, value string, opts ...PublisherOption) *Publisher {
  32. publisher := &Publisher{
  33. endpoints: endpoints,
  34. key: key,
  35. value: value,
  36. quit: syncx.NewDoneChan(),
  37. pauseChan: make(chan lang.PlaceholderType),
  38. resumeChan: make(chan lang.PlaceholderType),
  39. }
  40. for _, opt := range opts {
  41. opt(publisher)
  42. }
  43. return publisher
  44. }
  45. // KeepAlive keeps key:value alive.
  46. func (p *Publisher) KeepAlive() error {
  47. cli, err := internal.GetRegistry().GetConn(p.endpoints)
  48. if err != nil {
  49. return err
  50. }
  51. p.lease, err = p.register(cli)
  52. if err != nil {
  53. return err
  54. }
  55. proc.AddWrapUpListener(func() {
  56. p.Stop()
  57. })
  58. return p.keepAliveAsync(cli)
  59. }
  60. // Pause pauses the renewing of key:value.
  61. func (p *Publisher) Pause() {
  62. p.pauseChan <- lang.Placeholder
  63. }
  64. // Resume resumes the renewing of key:value.
  65. func (p *Publisher) Resume() {
  66. p.resumeChan <- lang.Placeholder
  67. }
  68. // Stop stops the renewing and revokes the registration.
  69. func (p *Publisher) Stop() {
  70. p.quit.Close()
  71. }
  72. func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error {
  73. ch, err := cli.KeepAlive(cli.Ctx(), p.lease)
  74. if err != nil {
  75. return err
  76. }
  77. threading.GoSafe(func() {
  78. for {
  79. select {
  80. case _, ok := <-ch:
  81. if !ok {
  82. p.revoke(cli)
  83. if err := p.KeepAlive(); err != nil {
  84. logx.Errorf("KeepAlive: %s", err.Error())
  85. }
  86. return
  87. }
  88. case <-p.pauseChan:
  89. logx.Infof("paused etcd renew, key: %s, value: %s", p.key, p.value)
  90. p.revoke(cli)
  91. select {
  92. case <-p.resumeChan:
  93. if err := p.KeepAlive(); err != nil {
  94. logx.Errorf("KeepAlive: %s", err.Error())
  95. }
  96. return
  97. case <-p.quit.Done():
  98. return
  99. }
  100. case <-p.quit.Done():
  101. p.revoke(cli)
  102. return
  103. }
  104. }
  105. })
  106. return nil
  107. }
  108. func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, error) {
  109. resp, err := client.Grant(client.Ctx(), TimeToLive)
  110. if err != nil {
  111. return clientv3.NoLease, err
  112. }
  113. lease := resp.ID
  114. if p.id > 0 {
  115. p.fullKey = makeEtcdKey(p.key, p.id)
  116. } else {
  117. p.fullKey = makeEtcdKey(p.key, int64(lease))
  118. }
  119. _, err = client.Put(client.Ctx(), p.fullKey, p.value, clientv3.WithLease(lease))
  120. return lease, err
  121. }
  122. func (p *Publisher) revoke(cli internal.EtcdClient) {
  123. if _, err := cli.Revoke(cli.Ctx(), p.lease); err != nil {
  124. logx.Error(err)
  125. }
  126. }
  127. // WithId customizes a Publisher with the id.
  128. func WithId(id int64) PublisherOption {
  129. return func(publisher *Publisher) {
  130. publisher.id = id
  131. }
  132. }