cachedsql_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649
  1. package sqlc
  2. import (
  3. "database/sql"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io/ioutil"
  8. "log"
  9. "os"
  10. "runtime"
  11. "sync"
  12. "sync/atomic"
  13. "testing"
  14. "time"
  15. "github.com/alicebob/miniredis"
  16. "github.com/stretchr/testify/assert"
  17. "github.com/tal-tech/go-zero/core/logx"
  18. "github.com/tal-tech/go-zero/core/stat"
  19. "github.com/tal-tech/go-zero/core/stores/cache"
  20. "github.com/tal-tech/go-zero/core/stores/redis"
  21. "github.com/tal-tech/go-zero/core/stores/sqlx"
  22. )
  23. func init() {
  24. logx.Disable()
  25. stat.SetReporter(nil)
  26. }
  27. func TestCachedConn_GetCache(t *testing.T) {
  28. resetStats()
  29. s, err := miniredis.Run()
  30. if err != nil {
  31. t.Error(err)
  32. }
  33. r := redis.NewRedis(s.Addr(), redis.NodeType)
  34. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  35. var value string
  36. err = c.GetCache("any", &value)
  37. assert.Equal(t, ErrNotFound, err)
  38. s.Set("any", `"value"`)
  39. err = c.GetCache("any", &value)
  40. assert.Nil(t, err)
  41. assert.Equal(t, "value", value)
  42. }
  43. func TestStat(t *testing.T) {
  44. resetStats()
  45. s, err := miniredis.Run()
  46. if err != nil {
  47. t.Error(err)
  48. }
  49. r := redis.NewRedis(s.Addr(), redis.NodeType)
  50. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  51. for i := 0; i < 10; i++ {
  52. var str string
  53. err = c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  54. *v.(*string) = "zero"
  55. return nil
  56. })
  57. if err != nil {
  58. t.Error(err)
  59. }
  60. }
  61. assert.Equal(t, uint64(10), atomic.LoadUint64(&stats.Total))
  62. assert.Equal(t, uint64(9), atomic.LoadUint64(&stats.Hit))
  63. }
  64. func TestCachedConn_QueryRowIndex_NoCache(t *testing.T) {
  65. resetStats()
  66. s, err := miniredis.Run()
  67. if err != nil {
  68. t.Error(err)
  69. }
  70. r := redis.NewRedis(s.Addr(), redis.NodeType)
  71. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  72. var str string
  73. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  74. return fmt.Sprintf("%s/1234", s)
  75. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  76. *v.(*string) = "zero"
  77. return "primary", nil
  78. }, func(conn sqlx.SqlConn, v, pri interface{}) error {
  79. assert.Equal(t, "primary", pri)
  80. *v.(*string) = "xin"
  81. return nil
  82. })
  83. assert.Nil(t, err)
  84. assert.Equal(t, "zero", str)
  85. val, err := r.Get("index")
  86. assert.Nil(t, err)
  87. assert.Equal(t, `"primary"`, val)
  88. val, err = r.Get("primary/1234")
  89. assert.Nil(t, err)
  90. assert.Equal(t, `"zero"`, val)
  91. }
  92. func TestCachedConn_QueryRowIndex_HasCache(t *testing.T) {
  93. resetStats()
  94. s, err := miniredis.Run()
  95. if err != nil {
  96. t.Error(err)
  97. }
  98. r := redis.NewRedis(s.Addr(), redis.NodeType)
  99. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10),
  100. cache.WithNotFoundExpiry(time.Second))
  101. var str string
  102. r.Set("index", `"primary"`)
  103. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  104. return fmt.Sprintf("%s/1234", s)
  105. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  106. assert.Fail(t, "should not go here")
  107. return "primary", nil
  108. }, func(conn sqlx.SqlConn, v, primary interface{}) error {
  109. *v.(*string) = "xin"
  110. assert.Equal(t, "primary", primary)
  111. return nil
  112. })
  113. assert.Nil(t, err)
  114. assert.Equal(t, "xin", str)
  115. val, err := r.Get("index")
  116. assert.Nil(t, err)
  117. assert.Equal(t, `"primary"`, val)
  118. val, err = r.Get("primary/1234")
  119. assert.Nil(t, err)
  120. assert.Equal(t, `"xin"`, val)
  121. }
  122. func TestCachedConn_QueryRowIndex_HasCache_IntPrimary(t *testing.T) {
  123. const (
  124. primaryInt8 int8 = 100
  125. primaryInt16 int16 = 10000
  126. primaryInt32 int32 = 10000000
  127. primaryInt64 int64 = 10000000
  128. primaryUint8 uint8 = 100
  129. primaryUint16 uint16 = 10000
  130. primaryUint32 uint32 = 10000000
  131. primaryUint64 uint64 = 10000000
  132. )
  133. tests := []struct {
  134. name string
  135. primary interface{}
  136. primaryCache string
  137. }{
  138. {
  139. name: "int8 primary",
  140. primary: primaryInt8,
  141. primaryCache: fmt.Sprint(primaryInt8),
  142. },
  143. {
  144. name: "int16 primary",
  145. primary: primaryInt16,
  146. primaryCache: fmt.Sprint(primaryInt16),
  147. },
  148. {
  149. name: "int32 primary",
  150. primary: primaryInt32,
  151. primaryCache: fmt.Sprint(primaryInt32),
  152. },
  153. {
  154. name: "int64 primary",
  155. primary: primaryInt64,
  156. primaryCache: fmt.Sprint(primaryInt64),
  157. },
  158. {
  159. name: "uint8 primary",
  160. primary: primaryUint8,
  161. primaryCache: fmt.Sprint(primaryUint8),
  162. },
  163. {
  164. name: "uint16 primary",
  165. primary: primaryUint16,
  166. primaryCache: fmt.Sprint(primaryUint16),
  167. },
  168. {
  169. name: "uint32 primary",
  170. primary: primaryUint32,
  171. primaryCache: fmt.Sprint(primaryUint32),
  172. },
  173. {
  174. name: "uint64 primary",
  175. primary: primaryUint64,
  176. primaryCache: fmt.Sprint(primaryUint64),
  177. },
  178. }
  179. s, err := miniredis.Run()
  180. if err != nil {
  181. t.Error(err)
  182. }
  183. defer s.Close()
  184. for _, test := range tests {
  185. t.Run(test.name, func(t *testing.T) {
  186. resetStats()
  187. s.FlushAll()
  188. r := redis.NewRedis(s.Addr(), redis.NodeType)
  189. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10),
  190. cache.WithNotFoundExpiry(time.Second))
  191. var str string
  192. r.Set("index", test.primaryCache)
  193. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  194. return fmt.Sprintf("%v/1234", s)
  195. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  196. assert.Fail(t, "should not go here")
  197. return test.primary, nil
  198. }, func(conn sqlx.SqlConn, v, primary interface{}) error {
  199. *v.(*string) = "xin"
  200. assert.Equal(t, primary, primary)
  201. return nil
  202. })
  203. assert.Nil(t, err)
  204. assert.Equal(t, "xin", str)
  205. val, err := r.Get("index")
  206. assert.Nil(t, err)
  207. assert.Equal(t, test.primaryCache, val)
  208. val, err = r.Get(test.primaryCache + "/1234")
  209. assert.Nil(t, err)
  210. assert.Equal(t, `"xin"`, val)
  211. })
  212. }
  213. }
  214. func TestCachedConn_QueryRowIndex_HasWrongCache(t *testing.T) {
  215. caches := map[string]string{
  216. "index": "primary",
  217. "primary/1234": "xin",
  218. }
  219. for k, v := range caches {
  220. t.Run(k+"/"+v, func(t *testing.T) {
  221. resetStats()
  222. s, err := miniredis.Run()
  223. if err != nil {
  224. t.Error(err)
  225. }
  226. s.FlushAll()
  227. defer s.Close()
  228. r := redis.NewRedis(s.Addr(), redis.NodeType)
  229. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10),
  230. cache.WithNotFoundExpiry(time.Second))
  231. var str string
  232. r.Set(k, v)
  233. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  234. return fmt.Sprintf("%s/1234", s)
  235. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  236. *v.(*string) = "xin"
  237. return "primary", nil
  238. }, func(conn sqlx.SqlConn, v, primary interface{}) error {
  239. *v.(*string) = "xin"
  240. assert.Equal(t, "primary", primary)
  241. return nil
  242. })
  243. assert.Nil(t, err)
  244. assert.Equal(t, "xin", str)
  245. val, err := r.Get("index")
  246. assert.Nil(t, err)
  247. assert.Equal(t, `"primary"`, val)
  248. val, err = r.Get("primary/1234")
  249. assert.Nil(t, err)
  250. assert.Equal(t, `"xin"`, val)
  251. })
  252. }
  253. }
  254. func TestStatCacheFails(t *testing.T) {
  255. resetStats()
  256. log.SetOutput(ioutil.Discard)
  257. defer log.SetOutput(os.Stdout)
  258. r := redis.NewRedis("localhost:59999", redis.NodeType)
  259. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  260. for i := 0; i < 20; i++ {
  261. var str string
  262. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  263. return errors.New("db failed")
  264. })
  265. assert.NotNil(t, err)
  266. }
  267. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Total))
  268. assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.Hit))
  269. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Miss))
  270. assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.DbFails))
  271. }
  272. func TestStatDbFails(t *testing.T) {
  273. resetStats()
  274. s, err := miniredis.Run()
  275. if err != nil {
  276. t.Error(err)
  277. }
  278. r := redis.NewRedis(s.Addr(), redis.NodeType)
  279. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  280. for i := 0; i < 20; i++ {
  281. var str string
  282. err = c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  283. return errors.New("db failed")
  284. })
  285. assert.NotNil(t, err)
  286. }
  287. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Total))
  288. assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.Hit))
  289. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.DbFails))
  290. }
  291. func TestStatFromMemory(t *testing.T) {
  292. resetStats()
  293. s, err := miniredis.Run()
  294. if err != nil {
  295. t.Error(err)
  296. }
  297. r := redis.NewRedis(s.Addr(), redis.NodeType)
  298. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  299. var all sync.WaitGroup
  300. var wait sync.WaitGroup
  301. all.Add(10)
  302. wait.Add(4)
  303. go func() {
  304. var str string
  305. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  306. *v.(*string) = "zero"
  307. return nil
  308. })
  309. if err != nil {
  310. t.Error(err)
  311. }
  312. wait.Wait()
  313. runtime.Gosched()
  314. all.Done()
  315. }()
  316. for i := 0; i < 4; i++ {
  317. go func() {
  318. var str string
  319. wait.Done()
  320. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  321. *v.(*string) = "zero"
  322. return nil
  323. })
  324. if err != nil {
  325. t.Error(err)
  326. }
  327. all.Done()
  328. }()
  329. }
  330. for i := 0; i < 5; i++ {
  331. go func() {
  332. var str string
  333. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  334. *v.(*string) = "zero"
  335. return nil
  336. })
  337. if err != nil {
  338. t.Error(err)
  339. }
  340. all.Done()
  341. }()
  342. }
  343. all.Wait()
  344. assert.Equal(t, uint64(10), atomic.LoadUint64(&stats.Total))
  345. assert.Equal(t, uint64(9), atomic.LoadUint64(&stats.Hit))
  346. }
  347. func TestCachedConnQueryRow(t *testing.T) {
  348. s, err := miniredis.Run()
  349. if err != nil {
  350. t.Error(err)
  351. }
  352. const (
  353. key = "user"
  354. value = "any"
  355. )
  356. var conn trackedConn
  357. var user string
  358. var ran bool
  359. r := redis.NewRedis(s.Addr(), redis.NodeType)
  360. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  361. err = c.QueryRow(&user, key, func(conn sqlx.SqlConn, v interface{}) error {
  362. ran = true
  363. user = value
  364. return nil
  365. })
  366. assert.Nil(t, err)
  367. actualValue, err := s.Get(key)
  368. assert.Nil(t, err)
  369. var actual string
  370. assert.Nil(t, json.Unmarshal([]byte(actualValue), &actual))
  371. assert.Equal(t, value, actual)
  372. assert.Equal(t, value, user)
  373. assert.True(t, ran)
  374. }
  375. func TestCachedConnQueryRowFromCache(t *testing.T) {
  376. s, err := miniredis.Run()
  377. if err != nil {
  378. t.Error(err)
  379. }
  380. const (
  381. key = "user"
  382. value = "any"
  383. )
  384. var conn trackedConn
  385. var user string
  386. var ran bool
  387. r := redis.NewRedis(s.Addr(), redis.NodeType)
  388. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  389. assert.Nil(t, c.SetCache(key, value))
  390. err = c.QueryRow(&user, key, func(conn sqlx.SqlConn, v interface{}) error {
  391. ran = true
  392. user = value
  393. return nil
  394. })
  395. assert.Nil(t, err)
  396. actualValue, err := s.Get(key)
  397. assert.Nil(t, err)
  398. var actual string
  399. assert.Nil(t, json.Unmarshal([]byte(actualValue), &actual))
  400. assert.Equal(t, value, actual)
  401. assert.Equal(t, value, user)
  402. assert.False(t, ran)
  403. }
  404. func TestQueryRowNotFound(t *testing.T) {
  405. s, err := miniredis.Run()
  406. if err != nil {
  407. t.Error(err)
  408. }
  409. const key = "user"
  410. var conn trackedConn
  411. var user string
  412. var ran int
  413. r := redis.NewRedis(s.Addr(), redis.NodeType)
  414. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  415. for i := 0; i < 20; i++ {
  416. err = c.QueryRow(&user, key, func(conn sqlx.SqlConn, v interface{}) error {
  417. ran++
  418. return sql.ErrNoRows
  419. })
  420. assert.Exactly(t, sqlx.ErrNotFound, err)
  421. }
  422. assert.Equal(t, 1, ran)
  423. }
  424. func TestCachedConnExec(t *testing.T) {
  425. s, err := miniredis.Run()
  426. if err != nil {
  427. t.Error(err)
  428. }
  429. var conn trackedConn
  430. r := redis.NewRedis(s.Addr(), redis.NodeType)
  431. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  432. _, err = c.ExecNoCache("delete from user_table where id='kevin'")
  433. assert.Nil(t, err)
  434. assert.True(t, conn.execValue)
  435. }
  436. func TestCachedConnExecDropCache(t *testing.T) {
  437. s, err := miniredis.Run()
  438. if err != nil {
  439. t.Error(err)
  440. }
  441. const (
  442. key = "user"
  443. value = "any"
  444. )
  445. var conn trackedConn
  446. r := redis.NewRedis(s.Addr(), redis.NodeType)
  447. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  448. assert.Nil(t, c.SetCache(key, value))
  449. _, err = c.Exec(func(conn sqlx.SqlConn) (result sql.Result, e error) {
  450. return conn.Exec("delete from user_table where id='kevin'")
  451. }, key)
  452. assert.Nil(t, err)
  453. assert.True(t, conn.execValue)
  454. _, err = s.Get(key)
  455. assert.Exactly(t, miniredis.ErrKeyNotFound, err)
  456. }
  457. func TestCachedConnExecDropCacheFailed(t *testing.T) {
  458. const key = "user"
  459. var conn trackedConn
  460. r := redis.NewRedis("anyredis:8888", redis.NodeType)
  461. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  462. _, err := c.Exec(func(conn sqlx.SqlConn) (result sql.Result, e error) {
  463. return conn.Exec("delete from user_table where id='kevin'")
  464. }, key)
  465. // async background clean, retry logic
  466. assert.Nil(t, err)
  467. }
  468. func TestCachedConnQueryRows(t *testing.T) {
  469. s, err := miniredis.Run()
  470. if err != nil {
  471. t.Error(err)
  472. }
  473. var conn trackedConn
  474. r := redis.NewRedis(s.Addr(), redis.NodeType)
  475. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  476. var users []string
  477. err = c.QueryRowsNoCache(&users, "select user from user_table where id='kevin'")
  478. assert.Nil(t, err)
  479. assert.True(t, conn.queryRowsValue)
  480. }
  481. func TestCachedConnTransact(t *testing.T) {
  482. s, err := miniredis.Run()
  483. if err != nil {
  484. t.Error(err)
  485. }
  486. var conn trackedConn
  487. r := redis.NewRedis(s.Addr(), redis.NodeType)
  488. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  489. err = c.Transact(func(session sqlx.Session) error {
  490. return nil
  491. })
  492. assert.Nil(t, err)
  493. assert.True(t, conn.transactValue)
  494. }
  495. func TestQueryRowNoCache(t *testing.T) {
  496. s, err := miniredis.Run()
  497. if err != nil {
  498. t.Error(err)
  499. }
  500. const (
  501. key = "user"
  502. value = "any"
  503. )
  504. var user string
  505. var ran bool
  506. r := redis.NewRedis(s.Addr(), redis.NodeType)
  507. conn := dummySqlConn{queryRow: func(v interface{}, q string, args ...interface{}) error {
  508. user = value
  509. ran = true
  510. return nil
  511. }}
  512. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  513. err = c.QueryRowNoCache(&user, key)
  514. assert.Nil(t, err)
  515. assert.Equal(t, value, user)
  516. assert.True(t, ran)
  517. }
  518. func TestFloatKeyer(t *testing.T) {
  519. primaries := []interface{}{
  520. float32(1),
  521. float64(1),
  522. }
  523. for _, primary := range primaries {
  524. val := floatKeyer(func(i interface{}) string {
  525. return fmt.Sprint(i)
  526. })(primary)
  527. assert.Equal(t, "1", val)
  528. }
  529. }
  530. func resetStats() {
  531. atomic.StoreUint64(&stats.Total, 0)
  532. atomic.StoreUint64(&stats.Hit, 0)
  533. atomic.StoreUint64(&stats.Miss, 0)
  534. atomic.StoreUint64(&stats.DbFails, 0)
  535. }
  536. type dummySqlConn struct {
  537. queryRow func(interface{}, string, ...interface{}) error
  538. }
  539. func (d dummySqlConn) Exec(query string, args ...interface{}) (sql.Result, error) {
  540. return nil, nil
  541. }
  542. func (d dummySqlConn) Prepare(query string) (sqlx.StmtSession, error) {
  543. return nil, nil
  544. }
  545. func (d dummySqlConn) QueryRow(v interface{}, query string, args ...interface{}) error {
  546. if d.queryRow != nil {
  547. return d.queryRow(v, query, args...)
  548. }
  549. return nil
  550. }
  551. func (d dummySqlConn) QueryRowPartial(v interface{}, query string, args ...interface{}) error {
  552. return nil
  553. }
  554. func (d dummySqlConn) QueryRows(v interface{}, query string, args ...interface{}) error {
  555. return nil
  556. }
  557. func (d dummySqlConn) QueryRowsPartial(v interface{}, query string, args ...interface{}) error {
  558. return nil
  559. }
  560. func (d dummySqlConn) Transact(func(session sqlx.Session) error) error {
  561. return nil
  562. }
  563. type trackedConn struct {
  564. dummySqlConn
  565. execValue bool
  566. queryRowsValue bool
  567. transactValue bool
  568. }
  569. func (c *trackedConn) Exec(query string, args ...interface{}) (sql.Result, error) {
  570. c.execValue = true
  571. return c.dummySqlConn.Exec(query, args...)
  572. }
  573. func (c *trackedConn) QueryRows(v interface{}, query string, args ...interface{}) error {
  574. c.queryRowsValue = true
  575. return c.dummySqlConn.QueryRows(v, query, args...)
  576. }
  577. func (c *trackedConn) Transact(fn func(session sqlx.Session) error) error {
  578. c.transactValue = true
  579. return c.dummySqlConn.Transact(fn)
  580. }