writer.go 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package es
  2. import (
  3. "context"
  4. "time"
  5. "zero/core/executors"
  6. "zero/core/logx"
  7. "zero/stash/config"
  8. "github.com/olivere/elastic"
  9. )
  // docType is the document type set on every bulk index request built by
  // Writer.execute.
  const (
  	docType = "doc"
  )
  11. type (
  12. Writer struct {
  13. client *elastic.Client
  14. indexer *Index
  15. inserter *executors.ChunkExecutor
  16. }
  17. valueWithTime struct {
  18. t time.Time
  19. val string
  20. }
  21. )
  22. func NewWriter(c config.ElasticSearchConf, indexer *Index) (*Writer, error) {
  23. client, err := elastic.NewClient(
  24. elastic.SetSniff(false),
  25. elastic.SetURL(c.Hosts...),
  26. elastic.SetGzip(c.Compress),
  27. )
  28. if err != nil {
  29. return nil, err
  30. }
  31. writer := Writer{
  32. client: client,
  33. indexer: indexer,
  34. }
  35. writer.inserter = executors.NewChunkExecutor(writer.execute, executors.WithChunkBytes(c.MaxChunkBytes))
  36. return &writer, nil
  37. }
  38. func (w *Writer) Write(t time.Time, val string) error {
  39. return w.inserter.Add(valueWithTime{
  40. t: t,
  41. val: val,
  42. }, len(val))
  43. }
  44. func (w *Writer) execute(vals []interface{}) {
  45. var bulk = w.client.Bulk()
  46. for _, val := range vals {
  47. pair := val.(valueWithTime)
  48. req := elastic.NewBulkIndexRequest().Index(w.indexer.GetIndex(pair.t)).Type(docType).Doc(pair.val)
  49. bulk.Add(req)
  50. }
  51. _, err := bulk.Do(context.Background())
  52. if err != nil {
  53. logx.Error(err)
  54. }
  55. }