bulkinserter.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package mongo
  2. import (
  3. "time"
  4. "github.com/globalsign/mgo"
  5. "github.com/tal-tech/go-zero/core/executors"
  6. "github.com/tal-tech/go-zero/core/logx"
  7. )
  8. const (
  9. flushInterval = time.Second
  10. maxBulkRows = 1000
  11. )
  12. type (
  13. // ResultHandler is a handler that used to handle results.
  14. ResultHandler func(*mgo.BulkResult, error)
  15. // A BulkInserter is used to insert bulk of mongo records.
  16. BulkInserter struct {
  17. executor *executors.PeriodicalExecutor
  18. inserter *dbInserter
  19. }
  20. )
  21. // NewBulkInserter returns a BulkInserter.
  22. func NewBulkInserter(session *mgo.Session, dbName string, collectionNamer func() string) *BulkInserter {
  23. inserter := &dbInserter{
  24. session: session,
  25. dbName: dbName,
  26. collectionNamer: collectionNamer,
  27. }
  28. return &BulkInserter{
  29. executor: executors.NewPeriodicalExecutor(flushInterval, inserter),
  30. inserter: inserter,
  31. }
  32. }
  33. // Flush flushes the inserter, writes all pending records.
  34. func (bi *BulkInserter) Flush() {
  35. bi.executor.Flush()
  36. }
  37. // Insert inserts doc.
  38. func (bi *BulkInserter) Insert(doc interface{}) {
  39. bi.executor.Add(doc)
  40. }
  41. // SetResultHandler sets the result handler.
  42. func (bi *BulkInserter) SetResultHandler(handler ResultHandler) {
  43. bi.executor.Sync(func() {
  44. bi.inserter.resultHandler = handler
  45. })
  46. }
  47. type dbInserter struct {
  48. session *mgo.Session
  49. dbName string
  50. collectionNamer func() string
  51. documents []interface{}
  52. resultHandler ResultHandler
  53. }
  54. func (in *dbInserter) AddTask(doc interface{}) bool {
  55. in.documents = append(in.documents, doc)
  56. return len(in.documents) >= maxBulkRows
  57. }
  58. func (in *dbInserter) Execute(objs interface{}) {
  59. docs := objs.([]interface{})
  60. if len(docs) == 0 {
  61. return
  62. }
  63. bulk := in.session.DB(in.dbName).C(in.collectionNamer()).Bulk()
  64. bulk.Insert(docs...)
  65. bulk.Unordered()
  66. result, err := bulk.Run()
  67. if in.resultHandler != nil {
  68. in.resultHandler(result, err)
  69. } else if err != nil {
  70. logx.Error(err)
  71. }
  72. }
  73. func (in *dbInserter) RemoveAll() interface{} {
  74. documents := in.documents
  75. in.documents = nil
  76. return documents
  77. }