store.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653
  1. package kv
  2. import (
  3. "errors"
  4. "log"
  5. "github.com/tal-tech/go-zero/core/errorx"
  6. "github.com/tal-tech/go-zero/core/hash"
  7. "github.com/tal-tech/go-zero/core/stores/cache"
  8. "github.com/tal-tech/go-zero/core/stores/redis"
  9. )
  // ErrNoRedisNode is returned when the consistent hash dispatcher
  // cannot resolve a redis node for the requested key.
  var ErrNoRedisNode = errors.New("no redis node")
  11. type (
  12. Store interface {
  13. Del(keys ...string) (int, error)
  14. Eval(script string, key string, args ...interface{}) (interface{}, error)
  15. Exists(key string) (bool, error)
  16. Expire(key string, seconds int) error
  17. Expireat(key string, expireTime int64) error
  18. Get(key string) (string, error)
  19. Hdel(key, field string) (bool, error)
  20. Hexists(key, field string) (bool, error)
  21. Hget(key, field string) (string, error)
  22. Hgetall(key string) (map[string]string, error)
  23. Hincrby(key, field string, increment int) (int, error)
  24. Hkeys(key string) ([]string, error)
  25. Hlen(key string) (int, error)
  26. Hmget(key string, fields ...string) ([]string, error)
  27. Hset(key, field, value string) error
  28. Hsetnx(key, field, value string) (bool, error)
  29. Hmset(key string, fieldsAndValues map[string]string) error
  30. Hvals(key string) ([]string, error)
  31. Incr(key string) (int64, error)
  32. Incrby(key string, increment int64) (int64, error)
  33. Llen(key string) (int, error)
  34. Lpop(key string) (string, error)
  35. Lpush(key string, values ...interface{}) (int, error)
  36. Lrange(key string, start int, stop int) ([]string, error)
  37. Lrem(key string, count int, value string) (int, error)
  38. Persist(key string) (bool, error)
  39. Pfadd(key string, values ...interface{}) (bool, error)
  40. Pfcount(key string) (int64, error)
  41. Rpush(key string, values ...interface{}) (int, error)
  42. Sadd(key string, values ...interface{}) (int, error)
  43. Scard(key string) (int64, error)
  44. Set(key string, value string) error
  45. Setex(key, value string, seconds int) error
  46. Setnx(key, value string) (bool, error)
  47. SetnxEx(key, value string, seconds int) (bool, error)
  48. Sismember(key string, value interface{}) (bool, error)
  49. Smembers(key string) ([]string, error)
  50. Spop(key string) (string, error)
  51. Srandmember(key string, count int) ([]string, error)
  52. Srem(key string, values ...interface{}) (int, error)
  53. Sscan(key string, cursor uint64, match string, count int64) (keys []string, cur uint64, err error)
  54. Ttl(key string) (int, error)
  55. Zadd(key string, score int64, value string) (bool, error)
  56. Zadds(key string, ps ...redis.Pair) (int64, error)
  57. Zcard(key string) (int, error)
  58. Zcount(key string, start, stop int64) (int, error)
  59. Zincrby(key string, increment int64, field string) (int64, error)
  60. Zrange(key string, start, stop int64) ([]string, error)
  61. ZrangeWithScores(key string, start, stop int64) ([]redis.Pair, error)
  62. ZrangebyscoreWithScores(key string, start, stop int64) ([]redis.Pair, error)
  63. ZrangebyscoreWithScoresAndLimit(key string, start, stop int64, page, size int) ([]redis.Pair, error)
  64. Zrank(key, field string) (int64, error)
  65. Zrem(key string, values ...interface{}) (int, error)
  66. Zremrangebyrank(key string, start, stop int64) (int, error)
  67. Zremrangebyscore(key string, start, stop int64) (int, error)
  68. Zrevrange(key string, start, stop int64) ([]string, error)
  69. ZrevrangebyscoreWithScores(key string, start, stop int64) ([]redis.Pair, error)
  70. ZrevrangebyscoreWithScoresAndLimit(key string, start, stop int64, page, size int) ([]redis.Pair, error)
  71. Zscore(key string, value string) (int64, error)
  72. }
  73. clusterStore struct {
  74. dispatcher *hash.ConsistentHash
  75. }
  76. )
  77. func NewStore(c KvConf) Store {
  78. if len(c) == 0 || cache.TotalWeights(c) <= 0 {
  79. log.Fatal("no cache nodes")
  80. }
  81. // even if only one node, we chose to use consistent hash,
  82. // because Store and redis.Redis has different methods.
  83. dispatcher := hash.NewConsistentHash()
  84. for _, node := range c {
  85. cn := node.NewRedis()
  86. dispatcher.AddWithWeight(cn, node.Weight)
  87. }
  88. return clusterStore{
  89. dispatcher: dispatcher,
  90. }
  91. }
  92. func (cs clusterStore) Del(keys ...string) (int, error) {
  93. var val int
  94. var be errorx.BatchError
  95. for _, key := range keys {
  96. node, e := cs.getRedis(key)
  97. if e != nil {
  98. be.Add(e)
  99. continue
  100. }
  101. if v, e := node.Del(key); e != nil {
  102. be.Add(e)
  103. } else {
  104. val += v
  105. }
  106. }
  107. return val, be.Err()
  108. }
  109. func (cs clusterStore) Eval(script string, key string, args ...interface{}) (interface{}, error) {
  110. node, err := cs.getRedis(key)
  111. if err != nil {
  112. return nil, err
  113. }
  114. return node.Eval(script, []string{key}, args...)
  115. }
  116. func (cs clusterStore) Exists(key string) (bool, error) {
  117. node, err := cs.getRedis(key)
  118. if err != nil {
  119. return false, err
  120. }
  121. return node.Exists(key)
  122. }
  123. func (cs clusterStore) Expire(key string, seconds int) error {
  124. node, err := cs.getRedis(key)
  125. if err != nil {
  126. return err
  127. }
  128. return node.Expire(key, seconds)
  129. }
  130. func (cs clusterStore) Expireat(key string, expireTime int64) error {
  131. node, err := cs.getRedis(key)
  132. if err != nil {
  133. return err
  134. }
  135. return node.Expireat(key, expireTime)
  136. }
  137. func (cs clusterStore) Get(key string) (string, error) {
  138. node, err := cs.getRedis(key)
  139. if err != nil {
  140. return "", err
  141. }
  142. return node.Get(key)
  143. }
  144. func (cs clusterStore) Hdel(key, field string) (bool, error) {
  145. node, err := cs.getRedis(key)
  146. if err != nil {
  147. return false, err
  148. }
  149. return node.Hdel(key, field)
  150. }
  151. func (cs clusterStore) Hexists(key, field string) (bool, error) {
  152. node, err := cs.getRedis(key)
  153. if err != nil {
  154. return false, err
  155. }
  156. return node.Hexists(key, field)
  157. }
  158. func (cs clusterStore) Hget(key, field string) (string, error) {
  159. node, err := cs.getRedis(key)
  160. if err != nil {
  161. return "", err
  162. }
  163. return node.Hget(key, field)
  164. }
  165. func (cs clusterStore) Hgetall(key string) (map[string]string, error) {
  166. node, err := cs.getRedis(key)
  167. if err != nil {
  168. return nil, err
  169. }
  170. return node.Hgetall(key)
  171. }
  172. func (cs clusterStore) Hincrby(key, field string, increment int) (int, error) {
  173. node, err := cs.getRedis(key)
  174. if err != nil {
  175. return 0, err
  176. }
  177. return node.Hincrby(key, field, increment)
  178. }
  179. func (cs clusterStore) Hkeys(key string) ([]string, error) {
  180. node, err := cs.getRedis(key)
  181. if err != nil {
  182. return nil, err
  183. }
  184. return node.Hkeys(key)
  185. }
  186. func (cs clusterStore) Hlen(key string) (int, error) {
  187. node, err := cs.getRedis(key)
  188. if err != nil {
  189. return 0, err
  190. }
  191. return node.Hlen(key)
  192. }
  193. func (cs clusterStore) Hmget(key string, fields ...string) ([]string, error) {
  194. node, err := cs.getRedis(key)
  195. if err != nil {
  196. return nil, err
  197. }
  198. return node.Hmget(key, fields...)
  199. }
  200. func (cs clusterStore) Hset(key, field, value string) error {
  201. node, err := cs.getRedis(key)
  202. if err != nil {
  203. return err
  204. }
  205. return node.Hset(key, field, value)
  206. }
  207. func (cs clusterStore) Hsetnx(key, field, value string) (bool, error) {
  208. node, err := cs.getRedis(key)
  209. if err != nil {
  210. return false, err
  211. }
  212. return node.Hsetnx(key, field, value)
  213. }
  214. func (cs clusterStore) Hmset(key string, fieldsAndValues map[string]string) error {
  215. node, err := cs.getRedis(key)
  216. if err != nil {
  217. return err
  218. }
  219. return node.Hmset(key, fieldsAndValues)
  220. }
  221. func (cs clusterStore) Hvals(key string) ([]string, error) {
  222. node, err := cs.getRedis(key)
  223. if err != nil {
  224. return nil, err
  225. }
  226. return node.Hvals(key)
  227. }
  228. func (cs clusterStore) Incr(key string) (int64, error) {
  229. node, err := cs.getRedis(key)
  230. if err != nil {
  231. return 0, err
  232. }
  233. return node.Incr(key)
  234. }
  235. func (cs clusterStore) Incrby(key string, increment int64) (int64, error) {
  236. node, err := cs.getRedis(key)
  237. if err != nil {
  238. return 0, err
  239. }
  240. return node.Incrby(key, increment)
  241. }
  242. func (cs clusterStore) Llen(key string) (int, error) {
  243. node, err := cs.getRedis(key)
  244. if err != nil {
  245. return 0, err
  246. }
  247. return node.Llen(key)
  248. }
  249. func (cs clusterStore) Lpop(key string) (string, error) {
  250. node, err := cs.getRedis(key)
  251. if err != nil {
  252. return "", err
  253. }
  254. return node.Lpop(key)
  255. }
  256. func (cs clusterStore) Lpush(key string, values ...interface{}) (int, error) {
  257. node, err := cs.getRedis(key)
  258. if err != nil {
  259. return 0, err
  260. }
  261. return node.Lpush(key, values...)
  262. }
  263. func (cs clusterStore) Lrange(key string, start int, stop int) ([]string, error) {
  264. node, err := cs.getRedis(key)
  265. if err != nil {
  266. return nil, err
  267. }
  268. return node.Lrange(key, start, stop)
  269. }
  270. func (cs clusterStore) Lrem(key string, count int, value string) (int, error) {
  271. node, err := cs.getRedis(key)
  272. if err != nil {
  273. return 0, err
  274. }
  275. return node.Lrem(key, count, value)
  276. }
  277. func (cs clusterStore) Persist(key string) (bool, error) {
  278. node, err := cs.getRedis(key)
  279. if err != nil {
  280. return false, err
  281. }
  282. return node.Persist(key)
  283. }
  284. func (cs clusterStore) Pfadd(key string, values ...interface{}) (bool, error) {
  285. node, err := cs.getRedis(key)
  286. if err != nil {
  287. return false, err
  288. }
  289. return node.Pfadd(key, values...)
  290. }
  291. func (cs clusterStore) Pfcount(key string) (int64, error) {
  292. node, err := cs.getRedis(key)
  293. if err != nil {
  294. return 0, err
  295. }
  296. return node.Pfcount(key)
  297. }
  298. func (cs clusterStore) Rpush(key string, values ...interface{}) (int, error) {
  299. node, err := cs.getRedis(key)
  300. if err != nil {
  301. return 0, err
  302. }
  303. return node.Rpush(key, values...)
  304. }
  305. func (cs clusterStore) Sadd(key string, values ...interface{}) (int, error) {
  306. node, err := cs.getRedis(key)
  307. if err != nil {
  308. return 0, err
  309. }
  310. return node.Sadd(key, values...)
  311. }
  312. func (cs clusterStore) Scard(key string) (int64, error) {
  313. node, err := cs.getRedis(key)
  314. if err != nil {
  315. return 0, err
  316. }
  317. return node.Scard(key)
  318. }
  319. func (cs clusterStore) Set(key string, value string) error {
  320. node, err := cs.getRedis(key)
  321. if err != nil {
  322. return err
  323. }
  324. return node.Set(key, value)
  325. }
  326. func (cs clusterStore) Setex(key, value string, seconds int) error {
  327. node, err := cs.getRedis(key)
  328. if err != nil {
  329. return err
  330. }
  331. return node.Setex(key, value, seconds)
  332. }
  333. func (cs clusterStore) Setnx(key, value string) (bool, error) {
  334. node, err := cs.getRedis(key)
  335. if err != nil {
  336. return false, err
  337. }
  338. return node.Setnx(key, value)
  339. }
  340. func (cs clusterStore) SetnxEx(key, value string, seconds int) (bool, error) {
  341. node, err := cs.getRedis(key)
  342. if err != nil {
  343. return false, err
  344. }
  345. return node.SetnxEx(key, value, seconds)
  346. }
  347. func (cs clusterStore) Sismember(key string, value interface{}) (bool, error) {
  348. node, err := cs.getRedis(key)
  349. if err != nil {
  350. return false, err
  351. }
  352. return node.Sismember(key, value)
  353. }
  354. func (cs clusterStore) Smembers(key string) ([]string, error) {
  355. node, err := cs.getRedis(key)
  356. if err != nil {
  357. return nil, err
  358. }
  359. return node.Smembers(key)
  360. }
  361. func (cs clusterStore) Spop(key string) (string, error) {
  362. node, err := cs.getRedis(key)
  363. if err != nil {
  364. return "", err
  365. }
  366. return node.Spop(key)
  367. }
  368. func (cs clusterStore) Srandmember(key string, count int) ([]string, error) {
  369. node, err := cs.getRedis(key)
  370. if err != nil {
  371. return nil, err
  372. }
  373. return node.Srandmember(key, count)
  374. }
  375. func (cs clusterStore) Srem(key string, values ...interface{}) (int, error) {
  376. node, err := cs.getRedis(key)
  377. if err != nil {
  378. return 0, err
  379. }
  380. return node.Srem(key, values...)
  381. }
  382. func (cs clusterStore) Sscan(key string, cursor uint64, match string, count int64) (
  383. keys []string, cur uint64, err error) {
  384. node, err := cs.getRedis(key)
  385. if err != nil {
  386. return nil, 0, err
  387. }
  388. return node.Sscan(key, cursor, match, count)
  389. }
  390. func (cs clusterStore) Ttl(key string) (int, error) {
  391. node, err := cs.getRedis(key)
  392. if err != nil {
  393. return 0, err
  394. }
  395. return node.Ttl(key)
  396. }
  397. func (cs clusterStore) Zadd(key string, score int64, value string) (bool, error) {
  398. node, err := cs.getRedis(key)
  399. if err != nil {
  400. return false, err
  401. }
  402. return node.Zadd(key, score, value)
  403. }
  404. func (cs clusterStore) Zadds(key string, ps ...redis.Pair) (int64, error) {
  405. node, err := cs.getRedis(key)
  406. if err != nil {
  407. return 0, err
  408. }
  409. return node.Zadds(key, ps...)
  410. }
  411. func (cs clusterStore) Zcard(key string) (int, error) {
  412. node, err := cs.getRedis(key)
  413. if err != nil {
  414. return 0, err
  415. }
  416. return node.Zcard(key)
  417. }
  418. func (cs clusterStore) Zcount(key string, start, stop int64) (int, error) {
  419. node, err := cs.getRedis(key)
  420. if err != nil {
  421. return 0, err
  422. }
  423. return node.Zcount(key, start, stop)
  424. }
  425. func (cs clusterStore) Zincrby(key string, increment int64, field string) (int64, error) {
  426. node, err := cs.getRedis(key)
  427. if err != nil {
  428. return 0, err
  429. }
  430. return node.Zincrby(key, increment, field)
  431. }
  432. func (cs clusterStore) Zrank(key, field string) (int64, error) {
  433. node, err := cs.getRedis(key)
  434. if err != nil {
  435. return 0, err
  436. }
  437. return node.Zrank(key, field)
  438. }
  439. func (cs clusterStore) Zrange(key string, start, stop int64) ([]string, error) {
  440. node, err := cs.getRedis(key)
  441. if err != nil {
  442. return nil, err
  443. }
  444. return node.Zrange(key, start, stop)
  445. }
  446. func (cs clusterStore) ZrangeWithScores(key string, start, stop int64) ([]redis.Pair, error) {
  447. node, err := cs.getRedis(key)
  448. if err != nil {
  449. return nil, err
  450. }
  451. return node.ZrangeWithScores(key, start, stop)
  452. }
  453. func (cs clusterStore) ZrangebyscoreWithScores(key string, start, stop int64) ([]redis.Pair, error) {
  454. node, err := cs.getRedis(key)
  455. if err != nil {
  456. return nil, err
  457. }
  458. return node.ZrangebyscoreWithScores(key, start, stop)
  459. }
  460. func (cs clusterStore) ZrangebyscoreWithScoresAndLimit(key string, start, stop int64, page, size int) (
  461. []redis.Pair, error) {
  462. node, err := cs.getRedis(key)
  463. if err != nil {
  464. return nil, err
  465. }
  466. return node.ZrangebyscoreWithScoresAndLimit(key, start, stop, page, size)
  467. }
  468. func (cs clusterStore) Zrem(key string, values ...interface{}) (int, error) {
  469. node, err := cs.getRedis(key)
  470. if err != nil {
  471. return 0, err
  472. }
  473. return node.Zrem(key, values...)
  474. }
  475. func (cs clusterStore) Zremrangebyrank(key string, start, stop int64) (int, error) {
  476. node, err := cs.getRedis(key)
  477. if err != nil {
  478. return 0, err
  479. }
  480. return node.Zremrangebyrank(key, start, stop)
  481. }
  482. func (cs clusterStore) Zremrangebyscore(key string, start, stop int64) (int, error) {
  483. node, err := cs.getRedis(key)
  484. if err != nil {
  485. return 0, err
  486. }
  487. return node.Zremrangebyscore(key, start, stop)
  488. }
  489. func (cs clusterStore) Zrevrange(key string, start, stop int64) ([]string, error) {
  490. node, err := cs.getRedis(key)
  491. if err != nil {
  492. return nil, err
  493. }
  494. return node.Zrevrange(key, start, stop)
  495. }
  496. func (cs clusterStore) ZrevrangebyscoreWithScores(key string, start, stop int64) ([]redis.Pair, error) {
  497. node, err := cs.getRedis(key)
  498. if err != nil {
  499. return nil, err
  500. }
  501. return node.ZrevrangebyscoreWithScores(key, start, stop)
  502. }
  503. func (cs clusterStore) ZrevrangebyscoreWithScoresAndLimit(key string, start, stop int64, page, size int) (
  504. []redis.Pair, error) {
  505. node, err := cs.getRedis(key)
  506. if err != nil {
  507. return nil, err
  508. }
  509. return node.ZrevrangebyscoreWithScoresAndLimit(key, start, stop, page, size)
  510. }
  511. func (cs clusterStore) Zscore(key string, value string) (int64, error) {
  512. node, err := cs.getRedis(key)
  513. if err != nil {
  514. return 0, err
  515. }
  516. return node.Zscore(key, value)
  517. }
  518. func (cs clusterStore) getRedis(key string) (*redis.Redis, error) {
  519. if val, ok := cs.dispatcher.Get(key); !ok {
  520. return nil, ErrNoRedisNode
  521. } else {
  522. return val.(*redis.Redis), nil
  523. }
  524. }