stash.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. package main
  2. import (
  3. "flag"
  4. "time"
  5. "zero/core/conf"
  6. "zero/core/lang"
  7. "zero/core/proc"
  8. "zero/kq"
  9. "zero/stash/config"
  10. "zero/stash/es"
  11. "zero/stash/filter"
  12. "zero/stash/handler"
  13. "github.com/olivere/elastic"
  14. )
  15. const dateFormat = "2006.01.02"
  16. var configFile = flag.String("f", "etc/config.json", "Specify the config file")
  17. func main() {
  18. flag.Parse()
  19. var c config.Config
  20. conf.MustLoad(*configFile, &c)
  21. proc.SetTimeoutToForceQuit(c.GracePeriod)
  22. client, err := elastic.NewClient(
  23. elastic.SetSniff(false),
  24. elastic.SetURL(c.Output.ElasticSearch.Hosts...),
  25. )
  26. lang.Must(err)
  27. indexFormat := c.Output.ElasticSearch.DailyIndexPrefix + dateFormat
  28. var loc *time.Location
  29. if len(c.Output.ElasticSearch.TimeZone) > 0 {
  30. loc, err = time.LoadLocation(c.Output.ElasticSearch.TimeZone)
  31. lang.Must(err)
  32. } else {
  33. loc = time.Local
  34. }
  35. indexer := es.NewIndex(client, func(t time.Time) string {
  36. return t.In(loc).Format(indexFormat)
  37. })
  38. filters := filter.CreateFilters(c)
  39. writer, err := es.NewWriter(c.Output.ElasticSearch, indexer)
  40. lang.Must(err)
  41. handle := handler.NewHandler(writer)
  42. handle.AddFilters(filters...)
  43. handle.AddFilters(filter.AddUriFieldFilter("url", "uri"))
  44. q := kq.MustNewQueue(c.Input.Kafka, handle)
  45. q.Start()
  46. }