sentinel.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503
  1. package redis
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "errors"
  6. "net"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/go-redis/redis/internal"
  11. "github.com/go-redis/redis/internal/pool"
  12. )
  13. //------------------------------------------------------------------------------
  14. // FailoverOptions are used to configure a failover client and should
  15. // be passed to NewFailoverClient.
  16. type FailoverOptions struct {
  17. // The master name.
  18. MasterName string
  19. // A seed list of host:port addresses of sentinel nodes.
  20. SentinelAddrs []string
  21. SentinelPassword string
  22. // Following options are copied from Options struct.
  23. Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
  24. OnConnect func(*Conn) error
  25. Password string
  26. DB int
  27. MaxRetries int
  28. MinRetryBackoff time.Duration
  29. MaxRetryBackoff time.Duration
  30. DialTimeout time.Duration
  31. ReadTimeout time.Duration
  32. WriteTimeout time.Duration
  33. PoolSize int
  34. MinIdleConns int
  35. MaxConnAge time.Duration
  36. PoolTimeout time.Duration
  37. IdleTimeout time.Duration
  38. IdleCheckFrequency time.Duration
  39. TLSConfig *tls.Config
  40. }
  41. func (opt *FailoverOptions) options() *Options {
  42. return &Options{
  43. Addr: "FailoverClient",
  44. Dialer: opt.Dialer,
  45. OnConnect: opt.OnConnect,
  46. DB: opt.DB,
  47. Password: opt.Password,
  48. MaxRetries: opt.MaxRetries,
  49. MinRetryBackoff: opt.MinRetryBackoff,
  50. MaxRetryBackoff: opt.MaxRetryBackoff,
  51. DialTimeout: opt.DialTimeout,
  52. ReadTimeout: opt.ReadTimeout,
  53. WriteTimeout: opt.WriteTimeout,
  54. PoolSize: opt.PoolSize,
  55. PoolTimeout: opt.PoolTimeout,
  56. IdleTimeout: opt.IdleTimeout,
  57. IdleCheckFrequency: opt.IdleCheckFrequency,
  58. MinIdleConns: opt.MinIdleConns,
  59. MaxConnAge: opt.MaxConnAge,
  60. TLSConfig: opt.TLSConfig,
  61. }
  62. }
  63. // NewFailoverClient returns a Redis client that uses Redis Sentinel
  64. // for automatic failover. It's safe for concurrent use by multiple
  65. // goroutines.
  66. func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
  67. opt := failoverOpt.options()
  68. opt.init()
  69. failover := &sentinelFailover{
  70. masterName: failoverOpt.MasterName,
  71. sentinelAddrs: failoverOpt.SentinelAddrs,
  72. password: failoverOpt.SentinelPassword,
  73. opt: opt,
  74. }
  75. c := Client{
  76. baseClient: newBaseClient(opt, failover.Pool()),
  77. ctx: context.Background(),
  78. }
  79. c.cmdable = c.Process
  80. c.onClose = failover.Close
  81. return &c
  82. }
  83. //------------------------------------------------------------------------------
// SentinelClient is a client used to send commands to a Sentinel node.
// It embeds baseClient and carries the context used by Process.
type SentinelClient struct {
	*baseClient
	// ctx is the context that Process passes to ProcessContext.
	ctx context.Context
}
  88. func NewSentinelClient(opt *Options) *SentinelClient {
  89. opt.init()
  90. c := &SentinelClient{
  91. baseClient: &baseClient{
  92. opt: opt,
  93. connPool: newConnPool(opt),
  94. },
  95. ctx: context.Background(),
  96. }
  97. return c
  98. }
  99. func (c *SentinelClient) Context() context.Context {
  100. return c.ctx
  101. }
  102. func (c *SentinelClient) WithContext(ctx context.Context) *SentinelClient {
  103. if ctx == nil {
  104. panic("nil context")
  105. }
  106. clone := *c
  107. clone.ctx = ctx
  108. return &clone
  109. }
  110. func (c *SentinelClient) Process(cmd Cmder) error {
  111. return c.ProcessContext(c.ctx, cmd)
  112. }
  113. func (c *SentinelClient) ProcessContext(ctx context.Context, cmd Cmder) error {
  114. return c.baseClient.process(ctx, cmd)
  115. }
  116. func (c *SentinelClient) pubSub() *PubSub {
  117. pubsub := &PubSub{
  118. opt: c.opt,
  119. newConn: func(channels []string) (*pool.Conn, error) {
  120. return c.newConn(context.TODO())
  121. },
  122. closeConn: c.connPool.CloseConn,
  123. }
  124. pubsub.init()
  125. return pubsub
  126. }
  127. // Ping is used to test if a connection is still alive, or to
  128. // measure latency.
  129. func (c *SentinelClient) Ping() *StringCmd {
  130. cmd := NewStringCmd("ping")
  131. _ = c.Process(cmd)
  132. return cmd
  133. }
  134. // Subscribe subscribes the client to the specified channels.
  135. // Channels can be omitted to create empty subscription.
  136. func (c *SentinelClient) Subscribe(channels ...string) *PubSub {
  137. pubsub := c.pubSub()
  138. if len(channels) > 0 {
  139. _ = pubsub.Subscribe(channels...)
  140. }
  141. return pubsub
  142. }
  143. // PSubscribe subscribes the client to the given patterns.
  144. // Patterns can be omitted to create empty subscription.
  145. func (c *SentinelClient) PSubscribe(channels ...string) *PubSub {
  146. pubsub := c.pubSub()
  147. if len(channels) > 0 {
  148. _ = pubsub.PSubscribe(channels...)
  149. }
  150. return pubsub
  151. }
  152. func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {
  153. cmd := NewStringSliceCmd("sentinel", "get-master-addr-by-name", name)
  154. _ = c.Process(cmd)
  155. return cmd
  156. }
  157. func (c *SentinelClient) Sentinels(name string) *SliceCmd {
  158. cmd := NewSliceCmd("sentinel", "sentinels", name)
  159. _ = c.Process(cmd)
  160. return cmd
  161. }
  162. // Failover forces a failover as if the master was not reachable, and without
  163. // asking for agreement to other Sentinels.
  164. func (c *SentinelClient) Failover(name string) *StatusCmd {
  165. cmd := NewStatusCmd("sentinel", "failover", name)
  166. _ = c.Process(cmd)
  167. return cmd
  168. }
  169. // Reset resets all the masters with matching name. The pattern argument is a
  170. // glob-style pattern. The reset process clears any previous state in a master
  171. // (including a failover in progress), and removes every slave and sentinel
  172. // already discovered and associated with the master.
  173. func (c *SentinelClient) Reset(pattern string) *IntCmd {
  174. cmd := NewIntCmd("sentinel", "reset", pattern)
  175. _ = c.Process(cmd)
  176. return cmd
  177. }
  178. // FlushConfig forces Sentinel to rewrite its configuration on disk, including
  179. // the current Sentinel state.
  180. func (c *SentinelClient) FlushConfig() *StatusCmd {
  181. cmd := NewStatusCmd("sentinel", "flushconfig")
  182. _ = c.Process(cmd)
  183. return cmd
  184. }
  185. // Master shows the state and info of the specified master.
  186. func (c *SentinelClient) Master(name string) *StringStringMapCmd {
  187. cmd := NewStringStringMapCmd("sentinel", "master", name)
  188. _ = c.Process(cmd)
  189. return cmd
  190. }
  191. // Masters shows a list of monitored masters and their state.
  192. func (c *SentinelClient) Masters() *SliceCmd {
  193. cmd := NewSliceCmd("sentinel", "masters")
  194. _ = c.Process(cmd)
  195. return cmd
  196. }
  197. // Slaves shows a list of slaves for the specified master and their state.
  198. func (c *SentinelClient) Slaves(name string) *SliceCmd {
  199. cmd := NewSliceCmd("sentinel", "slaves", name)
  200. _ = c.Process(cmd)
  201. return cmd
  202. }
  203. // CkQuorum checks if the current Sentinel configuration is able to reach the
  204. // quorum needed to failover a master, and the majority needed to authorize the
  205. // failover. This command should be used in monitoring systems to check if a
  206. // Sentinel deployment is ok.
  207. func (c *SentinelClient) CkQuorum(name string) *StringCmd {
  208. cmd := NewStringCmd("sentinel", "ckquorum", name)
  209. _ = c.Process(cmd)
  210. return cmd
  211. }
  212. // Monitor tells the Sentinel to start monitoring a new master with the specified
  213. // name, ip, port, and quorum.
  214. func (c *SentinelClient) Monitor(name, ip, port, quorum string) *StringCmd {
  215. cmd := NewStringCmd("sentinel", "monitor", name, ip, port, quorum)
  216. _ = c.Process(cmd)
  217. return cmd
  218. }
  219. // Set is used in order to change configuration parameters of a specific master.
  220. func (c *SentinelClient) Set(name, option, value string) *StringCmd {
  221. cmd := NewStringCmd("sentinel", "set", name, option, value)
  222. _ = c.Process(cmd)
  223. return cmd
  224. }
  225. // Remove is used in order to remove the specified master: the master will no
  226. // longer be monitored, and will totally be removed from the internal state of
  227. // the Sentinel.
  228. func (c *SentinelClient) Remove(name string) *StringCmd {
  229. cmd := NewStringCmd("sentinel", "remove", name)
  230. _ = c.Process(cmd)
  231. return cmd
  232. }
// sentinelFailover resolves the address of the current master through
// Sentinel and owns the connection pool used to reach that master.
type sentinelFailover struct {
	// sentinelAddrs lists the known sentinel addresses. masterAddr tries them
	// in order and swaps the one that answered to index 0; discoverSentinels
	// appends newly learned addresses.
	sentinelAddrs []string

	// opt holds the options for master connections; Pool copies it and
	// replaces the Dialer with c.dial.
	opt *Options
	// password is sent as Password on connections to sentinels.
	password string

	// pool is created on first use by Pool, guarded by poolOnce.
	pool     *pool.ConnPool
	poolOnce sync.Once

	// mu is taken by the methods that read or write sentinel, pubsub and
	// _masterAddr.
	mu         sync.RWMutex
	masterName string
	// _masterAddr is the last address recorded by switchMaster.
	_masterAddr string
	// sentinel is the sentinel client currently in use, or nil.
	sentinel *SentinelClient
	// pubsub is the "+switch-master" subscription opened on sentinel.
	pubsub *PubSub
}
  245. func (c *sentinelFailover) Close() error {
  246. c.mu.Lock()
  247. defer c.mu.Unlock()
  248. if c.sentinel != nil {
  249. return c.closeSentinel()
  250. }
  251. return nil
  252. }
  253. func (c *sentinelFailover) closeSentinel() error {
  254. firstErr := c.pubsub.Close()
  255. c.pubsub = nil
  256. err := c.sentinel.Close()
  257. if err != nil && firstErr == nil {
  258. firstErr = err
  259. }
  260. c.sentinel = nil
  261. return firstErr
  262. }
  263. func (c *sentinelFailover) Pool() *pool.ConnPool {
  264. c.poolOnce.Do(func() {
  265. opt := *c.opt
  266. opt.Dialer = c.dial
  267. c.pool = newConnPool(&opt)
  268. })
  269. return c.pool
  270. }
  271. func (c *sentinelFailover) dial(ctx context.Context, network, _ string) (net.Conn, error) {
  272. addr, err := c.MasterAddr()
  273. if err != nil {
  274. return nil, err
  275. }
  276. if c.opt.Dialer != nil {
  277. return c.opt.Dialer(ctx, network, addr)
  278. }
  279. return net.DialTimeout("tcp", addr, c.opt.DialTimeout)
  280. }
  281. func (c *sentinelFailover) MasterAddr() (string, error) {
  282. addr, err := c.masterAddr()
  283. if err != nil {
  284. return "", err
  285. }
  286. c.switchMaster(addr)
  287. return addr, nil
  288. }
  289. func (c *sentinelFailover) masterAddr() (string, error) {
  290. c.mu.RLock()
  291. sentinel := c.sentinel
  292. c.mu.RUnlock()
  293. if sentinel != nil {
  294. addr := c.getMasterAddr(sentinel)
  295. if addr != "" {
  296. return addr, nil
  297. }
  298. }
  299. c.mu.Lock()
  300. defer c.mu.Unlock()
  301. if c.sentinel != nil {
  302. addr := c.getMasterAddr(c.sentinel)
  303. if addr != "" {
  304. return addr, nil
  305. }
  306. _ = c.closeSentinel()
  307. }
  308. for i, sentinelAddr := range c.sentinelAddrs {
  309. sentinel := NewSentinelClient(&Options{
  310. Addr: sentinelAddr,
  311. Dialer: c.opt.Dialer,
  312. Password: c.password,
  313. MaxRetries: c.opt.MaxRetries,
  314. DialTimeout: c.opt.DialTimeout,
  315. ReadTimeout: c.opt.ReadTimeout,
  316. WriteTimeout: c.opt.WriteTimeout,
  317. PoolSize: c.opt.PoolSize,
  318. PoolTimeout: c.opt.PoolTimeout,
  319. IdleTimeout: c.opt.IdleTimeout,
  320. IdleCheckFrequency: c.opt.IdleCheckFrequency,
  321. TLSConfig: c.opt.TLSConfig,
  322. })
  323. masterAddr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
  324. if err != nil {
  325. internal.Logger.Printf("sentinel: GetMasterAddrByName master=%q failed: %s",
  326. c.masterName, err)
  327. _ = sentinel.Close()
  328. continue
  329. }
  330. // Push working sentinel to the top.
  331. c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
  332. c.setSentinel(sentinel)
  333. addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
  334. return addr, nil
  335. }
  336. return "", errors.New("redis: all sentinels are unreachable")
  337. }
  338. func (c *sentinelFailover) getMasterAddr(sentinel *SentinelClient) string {
  339. addr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
  340. if err != nil {
  341. internal.Logger.Printf("sentinel: GetMasterAddrByName name=%q failed: %s",
  342. c.masterName, err)
  343. return ""
  344. }
  345. return net.JoinHostPort(addr[0], addr[1])
  346. }
  347. func (c *sentinelFailover) switchMaster(addr string) {
  348. c.mu.RLock()
  349. masterAddr := c._masterAddr
  350. c.mu.RUnlock()
  351. if masterAddr == addr {
  352. return
  353. }
  354. c.mu.Lock()
  355. defer c.mu.Unlock()
  356. if c._masterAddr == addr {
  357. return
  358. }
  359. internal.Logger.Printf("sentinel: new master=%q addr=%q",
  360. c.masterName, addr)
  361. _ = c.Pool().Filter(func(cn *pool.Conn) bool {
  362. return cn.RemoteAddr().String() != addr
  363. })
  364. c._masterAddr = addr
  365. }
  366. func (c *sentinelFailover) setSentinel(sentinel *SentinelClient) {
  367. if c.sentinel != nil {
  368. panic("not reached")
  369. }
  370. c.sentinel = sentinel
  371. c.discoverSentinels()
  372. c.pubsub = sentinel.Subscribe("+switch-master")
  373. go c.listen(c.pubsub)
  374. }
  375. func (c *sentinelFailover) discoverSentinels() {
  376. sentinels, err := c.sentinel.Sentinels(c.masterName).Result()
  377. if err != nil {
  378. internal.Logger.Printf("sentinel: Sentinels master=%q failed: %s", c.masterName, err)
  379. return
  380. }
  381. for _, sentinel := range sentinels {
  382. vals := sentinel.([]interface{})
  383. for i := 0; i < len(vals); i += 2 {
  384. key := vals[i].(string)
  385. if key == "name" {
  386. sentinelAddr := vals[i+1].(string)
  387. if !contains(c.sentinelAddrs, sentinelAddr) {
  388. internal.Logger.Printf("sentinel: discovered new sentinel=%q for master=%q",
  389. sentinelAddr, c.masterName)
  390. c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
  391. }
  392. }
  393. }
  394. }
  395. }
  396. func (c *sentinelFailover) listen(pubsub *PubSub) {
  397. ch := pubsub.Channel()
  398. for {
  399. msg, ok := <-ch
  400. if !ok {
  401. break
  402. }
  403. if msg.Channel == "+switch-master" {
  404. parts := strings.Split(msg.Payload, " ")
  405. if parts[0] != c.masterName {
  406. internal.Logger.Printf("sentinel: ignore addr for master=%q", parts[0])
  407. continue
  408. }
  409. addr := net.JoinHostPort(parts[3], parts[4])
  410. c.switchMaster(addr)
  411. }
  412. }
  413. }
  414. func contains(slice []string, str string) bool {
  415. for _, s := range slice {
  416. if s == str {
  417. return true
  418. }
  419. }
  420. return false
  421. }