bulkinserter.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. package mongo
  2. import (
  3. "time"
  4. "github.com/globalsign/mgo"
  5. "github.com/tal-tech/go-zero/core/executors"
  6. "github.com/tal-tech/go-zero/core/logx"
  7. )
  8. const (
  9. flushInterval = time.Second
  10. maxBulkRows = 1000
  11. )
  12. type (
  13. ResultHandler func(*mgo.BulkResult, error)
  14. BulkInserter struct {
  15. executor *executors.PeriodicalExecutor
  16. inserter *dbInserter
  17. }
  18. )
  19. func NewBulkInserter(session *mgo.Session, dbName string, collectionNamer func() string) *BulkInserter {
  20. inserter := &dbInserter{
  21. session: session,
  22. dbName: dbName,
  23. collectionNamer: collectionNamer,
  24. }
  25. return &BulkInserter{
  26. executor: executors.NewPeriodicalExecutor(flushInterval, inserter),
  27. inserter: inserter,
  28. }
  29. }
  30. func (bi *BulkInserter) Flush() {
  31. bi.executor.Flush()
  32. }
  33. func (bi *BulkInserter) Insert(doc interface{}) {
  34. bi.executor.Add(doc)
  35. }
  36. func (bi *BulkInserter) SetResultHandler(handler ResultHandler) {
  37. bi.executor.Sync(func() {
  38. bi.inserter.resultHandler = handler
  39. })
  40. }
  41. type dbInserter struct {
  42. session *mgo.Session
  43. dbName string
  44. collectionNamer func() string
  45. documents []interface{}
  46. resultHandler ResultHandler
  47. }
  48. func (in *dbInserter) AddTask(doc interface{}) bool {
  49. in.documents = append(in.documents, doc)
  50. return len(in.documents) >= maxBulkRows
  51. }
  52. func (in *dbInserter) Execute(objs interface{}) {
  53. docs := objs.([]interface{})
  54. if len(docs) == 0 {
  55. return
  56. }
  57. bulk := in.session.DB(in.dbName).C(in.collectionNamer()).Bulk()
  58. bulk.Insert(docs...)
  59. bulk.Unordered()
  60. result, err := bulk.Run()
  61. if in.resultHandler != nil {
  62. in.resultHandler(result, err)
  63. } else if err != nil {
  64. logx.Error(err)
  65. }
  66. }
  67. func (in *dbInserter) RemoveAll() interface{} {
  68. documents := in.documents
  69. in.documents = nil
  70. return documents
  71. }