| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 |
- package es
- import (
- "context"
- "sync"
- "time"
- "zero/core/fx"
- "zero/core/logx"
- "zero/core/syncx"
- "github.com/olivere/elastic"
- )
- const sharedCallsKey = "ensureIndex"
- type (
- IndexFormat func(time.Time) string
- IndexFunc func() string
- Index struct {
- client *elastic.Client
- indexFormat IndexFormat
- index string
- lock sync.RWMutex
- sharedCalls syncx.SharedCalls
- }
- )
- func NewIndex(client *elastic.Client, indexFormat IndexFormat) *Index {
- return &Index{
- client: client,
- indexFormat: indexFormat,
- sharedCalls: syncx.NewSharedCalls(),
- }
- }
- func (idx *Index) GetIndex(t time.Time) string {
- index := idx.indexFormat(t)
- if err := idx.ensureIndex(index); err != nil {
- logx.Error(err)
- }
- return index
- }
- func (idx *Index) ensureIndex(index string) error {
- idx.lock.RLock()
- if index == idx.index {
- idx.lock.RUnlock()
- return nil
- }
- idx.lock.RUnlock()
- _, err := idx.sharedCalls.Do(sharedCallsKey, func() (i interface{}, err error) {
- idx.lock.Lock()
- defer idx.lock.Unlock()
- existsService := elastic.NewIndicesExistsService(idx.client)
- existsService.Index([]string{index})
- exist, err := existsService.Do(context.Background())
- if err != nil {
- return nil, err
- }
- if exist {
- idx.index = index
- return nil, nil
- }
- createService := idx.client.CreateIndex(index)
- if err := fx.DoWithRetries(func() error {
- // is it necessary to check the result?
- _, err := createService.Do(context.Background())
- return err
- }); err != nil {
- return nil, err
- }
- idx.index = index
- return nil, nil
- })
- return err
- }
|