index.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. package es
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "zero/core/fx"
  7. "zero/core/logx"
  8. "zero/core/syncx"
  9. "github.com/olivere/elastic"
  10. )
  11. const sharedCallsKey = "ensureIndex"
  12. type (
  13. IndexFormat func(time.Time) string
  14. IndexFunc func() string
  15. Index struct {
  16. client *elastic.Client
  17. indexFormat IndexFormat
  18. index string
  19. lock sync.RWMutex
  20. sharedCalls syncx.SharedCalls
  21. }
  22. )
  23. func NewIndex(client *elastic.Client, indexFormat IndexFormat) *Index {
  24. return &Index{
  25. client: client,
  26. indexFormat: indexFormat,
  27. sharedCalls: syncx.NewSharedCalls(),
  28. }
  29. }
  30. func (idx *Index) GetIndex(t time.Time) string {
  31. index := idx.indexFormat(t)
  32. if err := idx.ensureIndex(index); err != nil {
  33. logx.Error(err)
  34. }
  35. return index
  36. }
  37. func (idx *Index) ensureIndex(index string) error {
  38. idx.lock.RLock()
  39. if index == idx.index {
  40. idx.lock.RUnlock()
  41. return nil
  42. }
  43. idx.lock.RUnlock()
  44. _, err := idx.sharedCalls.Do(sharedCallsKey, func() (i interface{}, err error) {
  45. idx.lock.Lock()
  46. defer idx.lock.Unlock()
  47. existsService := elastic.NewIndicesExistsService(idx.client)
  48. existsService.Index([]string{index})
  49. exist, err := existsService.Do(context.Background())
  50. if err != nil {
  51. return nil, err
  52. }
  53. if exist {
  54. idx.index = index
  55. return nil, nil
  56. }
  57. createService := idx.client.CreateIndex(index)
  58. if err := fx.DoWithRetries(func() error {
  59. // is it necessary to check the result?
  60. _, err := createService.Do(context.Background())
  61. return err
  62. }); err != nil {
  63. return nil, err
  64. }
  65. idx.index = index
  66. return nil, nil
  67. })
  68. return err
  69. }