Browse Source

cron: fix data races accessing the `running` state variable

Rob Figueiredo 6 years ago
parent
commit
9aa5b7e823
2 changed files with 33 additions and 11 deletions
  1. 26 11
      cron.go
  2. 7 0
      cron_test.go

+ 26 - 11
cron.go

@@ -19,6 +19,7 @@ type Cron struct {
 	snapshot  chan chan []Entry
 	running   bool
 	logger    Logger
+	runningMu sync.Mutex
 	location  *time.Location
 	parser    Parser
 	nextID    EntryID
@@ -101,21 +102,22 @@ func (s byTime) Less(i, j int) bool {
 //
 //   Chain
 //     Description: Wrap submitted jobs to customize behavior.
-//     Default:     A chain that recovers panics.
+//     Default:     A chain that recovers panics and logs them to stderr.
 //
 // See "cron.With*" to modify the default behavior.
 func New(opts ...Option) *Cron {
 	c := &Cron{
-		entries:  nil,
-		chain:    NewChain(),
-		add:      make(chan *Entry),
-		stop:     make(chan struct{}),
-		snapshot: make(chan chan []Entry),
-		remove:   make(chan EntryID),
-		running:  false,
-		logger:   DefaultLogger,
-		location: time.Local,
-		parser:   standardParser,
+		entries:   nil,
+		chain:     NewChain(),
+		add:       make(chan *Entry),
+		stop:      make(chan struct{}),
+		snapshot:  make(chan chan []Entry),
+		remove:    make(chan EntryID),
+		running:   false,
+		runningMu: sync.Mutex{},
+		logger:    DefaultLogger,
+		location:  time.Local,
+		parser:    standardParser,
 	}
 	for _, opt := range opts {
 		opt(c)
@@ -149,6 +151,8 @@ func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
 // Schedule adds a Job to the Cron to be run on the given schedule.
 // The job is wrapped with the configured Chain.
 func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
+	c.runningMu.Lock()
+	defer c.runningMu.Unlock()
 	c.nextID++
 	entry := &Entry{
 		ID:         c.nextID,
@@ -166,6 +170,8 @@ func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
 
 // Entries returns a snapshot of the cron entries.
 func (c *Cron) Entries() []Entry {
+	c.runningMu.Lock()
+	defer c.runningMu.Unlock()
 	if c.running {
 		replyChan := make(chan []Entry, 1)
 		c.snapshot <- replyChan
@@ -191,6 +197,8 @@ func (c *Cron) Entry(id EntryID) Entry {
 
 // Remove an entry from being run in the future.
 func (c *Cron) Remove(id EntryID) {
+	c.runningMu.Lock()
+	defer c.runningMu.Unlock()
 	if c.running {
 		c.remove <- id
 	} else {
@@ -200,6 +208,8 @@ func (c *Cron) Remove(id EntryID) {
 
 // Start the cron scheduler in its own goroutine, or no-op if already started.
 func (c *Cron) Start() {
+	c.runningMu.Lock()
+	defer c.runningMu.Unlock()
 	if c.running {
 		return
 	}
@@ -209,10 +219,13 @@ func (c *Cron) Start() {
 
 // Run the cron scheduler, or no-op if already running.
 func (c *Cron) Run() {
+	c.runningMu.Lock()
 	if c.running {
+		c.runningMu.Unlock()
 		return
 	}
 	c.running = true
+	c.runningMu.Unlock()
 	c.run()
 }
 
@@ -303,6 +316,8 @@ func (c *Cron) now() time.Time {
 // Stop stops the cron scheduler if it is running; otherwise it does nothing.
 // A context is returned so the caller can wait for running jobs to complete.
 func (c *Cron) Stop() context.Context {
+	c.runningMu.Lock()
+	defer c.runningMu.Unlock()
 	if c.running {
 		c.stop <- struct{}{}
 		c.running = false

+ 7 - 0
cron_test.go

@@ -671,6 +671,13 @@ func TestStopAndWait(t *testing.T) {
 	})
 }
 
+func TestMultiThreadedStartAndStop(t *testing.T) {
+	cron := New()
+	go cron.Run()
+	time.Sleep(2 * time.Millisecond)
+	cron.Stop()
+}
+
 func wait(wg *sync.WaitGroup) chan bool {
 	ch := make(chan bool)
 	go func() {