producernode.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package dq
  2. import (
  3. "errors"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/beanstalkd/go-beanstalk"
  9. )
  // ErrTimeBeforeNow is the error At returns when the requested time lies
  // before the current time.
  var ErrTimeBeforeNow = errors.New("can't schedule task to past time")
  11. type producerNode struct {
  12. endpoint string
  13. tube string
  14. conn *connection
  15. }
  16. func NewProducerNode(endpoint, tube string) Producer {
  17. return &producerNode{
  18. endpoint: endpoint,
  19. tube: tube,
  20. conn: newConnection(endpoint, tube),
  21. }
  22. }
  23. func (p *producerNode) At(body []byte, at time.Time) (string, error) {
  24. now := time.Now()
  25. if at.Before(now) {
  26. return "", ErrTimeBeforeNow
  27. }
  28. duration := at.Sub(now)
  29. return p.Delay(body, duration)
  30. }
  31. func (p *producerNode) Close() error {
  32. return p.conn.Close()
  33. }
  34. func (p *producerNode) Delay(body []byte, delay time.Duration) (string, error) {
  35. conn, err := p.conn.get()
  36. if err != nil {
  37. return "", err
  38. }
  39. id, err := conn.Put(body, PriNormal, delay, defaultTimeToRun)
  40. if err == nil {
  41. return fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id), nil
  42. }
  43. // the error can only be beanstalk.NameError or beanstalk.ConnError
  44. // just return when the error is beanstalk.NameError, don't reset
  45. switch cerr := err.(type) {
  46. case beanstalk.ConnError:
  47. switch cerr.Err {
  48. case beanstalk.ErrBadChar, beanstalk.ErrBadFormat, beanstalk.ErrBuried, beanstalk.ErrDeadline,
  49. beanstalk.ErrDraining, beanstalk.ErrEmpty, beanstalk.ErrInternal, beanstalk.ErrJobTooBig,
  50. beanstalk.ErrNoCRLF, beanstalk.ErrNotFound, beanstalk.ErrNotIgnored, beanstalk.ErrTooLong:
  51. // won't reset
  52. default:
  53. // beanstalk.ErrOOM, beanstalk.ErrTimeout, beanstalk.ErrUnknown and other errors
  54. p.conn.reset()
  55. }
  56. }
  57. return "", err
  58. }
  59. func (p *producerNode) Revoke(jointId string) error {
  60. ids := strings.Split(jointId, idSep)
  61. for _, id := range ids {
  62. fields := strings.Split(id, "/")
  63. if len(fields) < 3 {
  64. continue
  65. }
  66. if fields[0] != p.endpoint || fields[1] != p.tube {
  67. continue
  68. }
  69. conn, err := p.conn.get()
  70. if err != nil {
  71. return err
  72. }
  73. n, err := strconv.ParseUint(fields[2], 10, 64)
  74. if err != nil {
  75. return err
  76. }
  77. return conn.Delete(n)
  78. }
  79. // if not in this beanstalk, ignore
  80. return nil
  81. }