Pārlūkot izejas kodu

cron.Stop: return a context that waits for outstanding jobs to complete.

Fixes #98
Rob Figueiredo 6 gadi atpakaļ
vecāks
revīzija
15ec73dd77
2 mainītis faili ar 151 papildinājumiem un 17 dzēšanām
  1. 35 17
      cron.go
  2. 116 0
      cron_test.go

+ 35 - 17
cron.go

@@ -1,7 +1,9 @@
 package cron
 
 import (
+	"context"
 	"sort"
+	"sync"
 	"time"
 )
 
@@ -9,17 +11,18 @@ import (
 // specified by the schedule. It may be started, stopped, and the entries may
 // be inspected while running.
 type Cron struct {
-	entries  []*Entry
-	chain    Chain
-	stop     chan struct{}
-	add      chan *Entry
-	remove   chan EntryID
-	snapshot chan chan []Entry
-	running  bool
-	logger   Logger
-	location *time.Location
-	parser   Parser
-	nextID   EntryID
+	entries   []*Entry
+	chain     Chain
+	stop      chan struct{}
+	add       chan *Entry
+	remove    chan EntryID
+	snapshot  chan chan []Entry
+	running   bool
+	logger    Logger
+	location  *time.Location
+	parser    Parser
+	nextID    EntryID
+	jobWaiter sync.WaitGroup
 }
 
 // Job is an interface for submitted cron jobs.
@@ -249,7 +252,7 @@ func (c *Cron) run() {
 					if e.Next.After(now) || e.Next.IsZero() {
 						break
 					}
-					go e.WrappedJob.Run()
+					c.startJob(e.WrappedJob)
 					e.Prev = e.Next
 					e.Next = e.Schedule.Next(now)
 					c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
@@ -283,18 +286,33 @@ func (c *Cron) run() {
 	}
 }
 
+// startJob runs the given job in a new goroutine.
+func (c *Cron) startJob(j Job) {
+	c.jobWaiter.Add(1)
+	go func() {
+		defer c.jobWaiter.Done()
+		j.Run()
+	}()
+}
+
 // now returns current time in c location
 func (c *Cron) now() time.Time {
 	return time.Now().In(c.location)
 }
 
 // Stop stops the cron scheduler if it is running; otherwise it does nothing.
-func (c *Cron) Stop() {
-	if !c.running {
-		return
+// A context is returned so the caller can wait for running jobs to complete.
+func (c *Cron) Stop() context.Context {
+	if c.running {
+		c.stop <- struct{}{}
+		c.running = false
 	}
-	c.stop <- struct{}{}
-	c.running = false
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		c.jobWaiter.Wait()
+		cancel()
+	}()
+	return ctx
 }
 
 // entrySnapshot returns a copy of the current cron entry list.

+ 116 - 0
cron_test.go

@@ -555,6 +555,122 @@ func TestJobWithZeroTimeDoesNotRun(t *testing.T) {
 	}
 }
 
+func TestStopAndWait(t *testing.T) {
+	t.Run("nothing running, returns immediately", func(t *testing.T) {
+		cron := newWithSeconds()
+		cron.Start()
+		ctx := cron.Stop()
+		select {
+		case <-ctx.Done():
+		case <-time.After(time.Millisecond):
+			t.Error("context was not done immediately")
+		}
+	})
+
+	t.Run("repeated calls to Stop", func(t *testing.T) {
+		cron := newWithSeconds()
+		cron.Start()
+		_ = cron.Stop()
+		time.Sleep(time.Millisecond)
+		ctx := cron.Stop()
+		select {
+		case <-ctx.Done():
+		case <-time.After(time.Millisecond):
+			t.Error("context was not done immediately")
+		}
+	})
+
+	t.Run("a couple fast jobs added, still returns immediately", func(t *testing.T) {
+		cron := newWithSeconds()
+		cron.AddFunc("* * * * * *", func() {})
+		cron.Start()
+		cron.AddFunc("* * * * * *", func() {})
+		cron.AddFunc("* * * * * *", func() {})
+		cron.AddFunc("* * * * * *", func() {})
+		time.Sleep(time.Second)
+		ctx := cron.Stop()
+		select {
+		case <-ctx.Done():
+		case <-time.After(time.Millisecond):
+			t.Error("context was not done immediately")
+		}
+	})
+
+	t.Run("a couple fast jobs and a slow job added, waits for slow job", func(t *testing.T) {
+		cron := newWithSeconds()
+		cron.AddFunc("* * * * * *", func() {})
+		cron.Start()
+		cron.AddFunc("* * * * * *", func() { time.Sleep(2 * time.Second) })
+		cron.AddFunc("* * * * * *", func() {})
+		time.Sleep(time.Second)
+
+		ctx := cron.Stop()
+
+		// Verify that it is not done for at least 750ms
+		select {
+		case <-ctx.Done():
+			t.Error("context was done too quickly immediately")
+		case <-time.After(750 * time.Millisecond):
+			// expected, because the job sleeping for 1 second is still running
+		}
+
+		// Verify that it IS done in the next 500ms (giving 250ms buffer)
+		select {
+		case <-ctx.Done():
+			// expected
+		case <-time.After(1500 * time.Millisecond):
+			t.Error("context not done after job should have completed")
+		}
+	})
+
+	t.Run("repeated calls to stop, waiting for completion and after", func(t *testing.T) {
+		cron := newWithSeconds()
+		cron.AddFunc("* * * * * *", func() {})
+		cron.AddFunc("* * * * * *", func() { time.Sleep(2 * time.Second) })
+		cron.Start()
+		cron.AddFunc("* * * * * *", func() {})
+		time.Sleep(time.Second)
+		ctx := cron.Stop()
+		ctx2 := cron.Stop()
+
+		// Verify that it is not done for at least 1500ms
+		select {
+		case <-ctx.Done():
+			t.Error("context was done too quickly immediately")
+		case <-ctx2.Done():
+			t.Error("context2 was done too quickly immediately")
+		case <-time.After(1500 * time.Millisecond):
+			// expected, because the job sleeping for 2 seconds is still running
+		}
+
+		// Verify that it IS done in the next 1s (giving 500ms buffer)
+		select {
+		case <-ctx.Done():
+			// expected
+		case <-time.After(time.Second):
+			t.Error("context not done after job should have completed")
+		}
+
+		// Verify that ctx2 is also done.
+		select {
+		case <-ctx2.Done():
+			// expected
+		case <-time.After(time.Millisecond):
+			t.Error("context2 not done even though context1 is")
+		}
+
+		// Verify that a new context retrieved from stop is immediately done.
+		ctx3 := cron.Stop()
+		select {
+		case <-ctx3.Done():
+			// expected
+		case <-time.After(time.Millisecond):
+			t.Error("context not done even when cron Stop is completed")
+		}
+
+	})
+}
+
 func wait(wg *sync.WaitGroup) chan bool {
 	ch := make(chan bool)
 	go func() {