redisblockingnode.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package redis
  2. import (
  3. "fmt"
  4. red "github.com/go-redis/redis"
  5. "github.com/tal-tech/go-zero/core/logx"
  6. )
  7. type ClosableNode interface {
  8. RedisNode
  9. Close()
  10. }
  11. func CreateBlockingNode(r *Redis) (ClosableNode, error) {
  12. timeout := readWriteTimeout + blockingQueryTimeout
  13. switch r.Type {
  14. case NodeType:
  15. client := red.NewClient(&red.Options{
  16. Addr: r.Addr,
  17. Password: r.Pass,
  18. DB: defaultDatabase,
  19. MaxRetries: maxRetries,
  20. PoolSize: 1,
  21. MinIdleConns: 1,
  22. ReadTimeout: timeout,
  23. })
  24. return &clientBridge{client}, nil
  25. case ClusterType:
  26. client := red.NewClusterClient(&red.ClusterOptions{
  27. Addrs: []string{r.Addr},
  28. Password: r.Pass,
  29. MaxRetries: maxRetries,
  30. PoolSize: 1,
  31. MinIdleConns: 1,
  32. ReadTimeout: timeout,
  33. })
  34. return &clusterBridge{client}, nil
  35. default:
  36. return nil, fmt.Errorf("unknown redis type: %s", r.Type)
  37. }
  38. }
  39. type (
  40. clientBridge struct {
  41. *red.Client
  42. }
  43. clusterBridge struct {
  44. *red.ClusterClient
  45. }
  46. )
  47. func (bridge *clientBridge) Close() {
  48. if err := bridge.Client.Close(); err != nil {
  49. logx.Errorf("Error occurred on close redis client: %s", err)
  50. }
  51. }
  52. func (bridge *clusterBridge) Close() {
  53. if err := bridge.ClusterClient.Close(); err != nil {
  54. logx.Errorf("Error occurred on close redis cluster: %s", err)
  55. }
  56. }