Browse Source

Synchronize access to the cron Entries by providing snapshots. Also, rewrite tests to use WaitGroup instead of blocking on cron.run()

Rob Figueiredo 13 years ago
parent
commit
95e41d4d24
2 changed files with 163 additions and 69 deletions
  1. 48 22
      cron.go
  2. 115 47
      cron_test.go

+ 48 - 22
cron.go

@@ -11,9 +11,11 @@ import (
 // specified by the spec.  See http://en.wikipedia.org/wiki/Cron
 // It may be started and stopped.
 type Cron struct {
-	Entries []*Entry
-	stop    chan struct{}
-	add     chan *Entry
+	entries  []*Entry
+	stop     chan struct{}
+	add      chan *Entry
+	snapshot chan []*Entry
+	running  bool
 }
 
 // Simple interface for submitted cron jobs.
@@ -47,9 +49,11 @@ func (s byTime) Less(i, j int) bool {
 
 func New() *Cron {
 	return &Cron{
-		Entries: nil,
-		add:     make(chan *Entry, 1),
-		stop:    make(chan struct{}, 1),
+		entries:  nil,
+		add:      make(chan *Entry),
+		stop:     make(chan struct{}),
+		snapshot: make(chan []*Entry),
+		running:  false,
 	}
 }
 
@@ -64,45 +68,55 @@ func (c *Cron) AddFunc(spec string, cmd func()) {
 
 func (c *Cron) AddJob(spec string, cmd Job) {
 	entry := &Entry{Parse(spec), time.Time{}, cmd}
-	select {
-	case c.add <- entry:
-		// The run loop accepted the entry, nothing more to do.
+	if !c.running {
+		c.entries = append(c.entries, entry)
 		return
-	default:
-		// No one listening to that channel, so just add to the array.
-		c.Entries = append(c.Entries, entry)
-		entry.Next = entry.Schedule.Next(time.Now().Local()) // Just in case..
 	}
+
+	c.add <- entry
+}
+
+// Return a snapshot of the cron entries.
+func (c *Cron) Entries() []*Entry {
+	if c.running {
+		c.snapshot <- nil
+		x := <-c.snapshot
+		return x
+	}
+	return c.entrySnapshot()
 }
 
 func (c *Cron) Start() {
-	go c.Run()
+	c.running = true
+	go c.run()
 }
 
-func (c *Cron) Run() {
+// Run the scheduler.. this is private just due to the need to synchronize
+// access to the 'running' state variable.
+func (c *Cron) run() {
 	// Figure out the next activation times for each entry.
 	now := time.Now().Local()
-	for _, entry := range c.Entries {
+	for _, entry := range c.entries {
 		entry.Next = entry.Schedule.Next(now)
 	}
 
 	for {
 		// Determine the next entry to run.
-		sort.Sort(byTime(c.Entries))
+		sort.Sort(byTime(c.entries))
 
 		var effective time.Time
-		if len(c.Entries) == 0 || c.Entries[0].Next.IsZero() {
+		if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
 			// If there are no entries yet, just sleep - it still handles new entries
 			// and stop requests.
 			effective = now.AddDate(10, 0, 0)
 		} else {
-			effective = c.Entries[0].Next
+			effective = c.entries[0].Next
 		}
 
 		select {
 		case now = <-time.After(effective.Sub(now)):
 			// Run every entry whose next time was this effective time.
-			for _, e := range c.Entries {
+			for _, e := range c.entries {
 				if e.Next != effective {
 					break
 				}
@@ -111,15 +125,27 @@ func (c *Cron) Run() {
 			}
 
 		case newEntry := <-c.add:
-			c.Entries = append(c.Entries, newEntry)
+			c.entries = append(c.entries, newEntry)
 			newEntry.Next = newEntry.Schedule.Next(now)
 
+		case <-c.snapshot:
+			c.snapshot <- c.entrySnapshot()
+
 		case <-c.stop:
 			return
 		}
 	}
 }
 
-func (c Cron) Stop() {
+func (c *Cron) Stop() {
 	c.stop <- struct{}{}
+	c.running = false
+}
+
+func (c *Cron) entrySnapshot() []*Entry {
+	entries := []*Entry{}
+	for _, e := range c.entries {
+		entries = append(entries, &Entry{e.Schedule, e.Next, e.Job})
+	}
+	return entries
 }

+ 115 - 47
cron_test.go

@@ -2,131 +2,199 @@ package cron
 
 import (
 	"fmt"
+	"sync"
 	"testing"
 	"time"
 )
 
+// Many tests schedule a job for every second, and then wait at most a second
+// for it to run.  This amount is just slightly larger than 1 second to
+// compensate for a few milliseconds of runtime.
+const ONE_SECOND = 1*time.Second + 10*time.Millisecond
+
 // Start and stop cron with no entries.
 func TestNoEntries(t *testing.T) {
 	cron := New()
-	done := startAndSignal(cron)
-	go cron.Stop()
+	cron.Start()
+
+	select {
+	case <-time.After(ONE_SECOND):
+		t.FailNow()
+	case <-stop(cron):
+	}
+}
+
+// Start, stop, then add an entry. Verify entry doesn't run.
+func TestStopCausesJobsToNotRun(t *testing.T) {
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+
+	cron := New()
+	cron.Start()
+	cron.Stop()
+	cron.AddFunc("* * * * * ?", func() { wg.Done() })
 
 	select {
-	case <-time.After(1 * time.Second):
+	case <-time.After(ONE_SECOND):
+		// No job ran!
+	case <-wait(wg):
 		t.FailNow()
-	case <-done:
 	}
 }
 
 // Add a job, start cron, expect it runs.
 func TestAddBeforeRunning(t *testing.T) {
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+
 	cron := New()
-	cron.AddFunc("* * * * * ?", func() {
-		cron.Stop()
-	})
-	done := startAndSignal(cron)
+	cron.AddFunc("* * * * * ?", func() { wg.Done() })
+	cron.Start()
+	defer cron.Stop()
 
 	// Give cron 2 seconds to run our job (which is always activated).
 	select {
-	case <-time.After(2 * time.Second):
+	case <-time.After(ONE_SECOND):
 		t.FailNow()
-	case <-done:
+	case <-wait(wg):
 	}
 }
 
 // Start cron, add a job, expect it runs.
 func TestAddWhileRunning(t *testing.T) {
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+
 	cron := New()
-	done := startAndSignal(cron)
-	go func() {
-		cron.AddFunc("* * * * * ?", func() {
-			cron.Stop()
-		})
-	}()
+	cron.Start()
+	defer cron.Stop()
+	cron.AddFunc("* * * * * ?", func() { wg.Done() })
 
 	select {
-	case <-time.After(2 * time.Second):
+	case <-time.After(ONE_SECOND):
 		t.FailNow()
-	case <-done:
+	case <-wait(wg):
 	}
 }
 
 // Test that the entries are correctly sorted.
 // Add a bunch of long-in-the-future entries, and an immediate entry, and ensure
 // that the immediate entry runs immediately.
+// Also: Test that multiple jobs run in the same instant.
 func TestMultipleEntries(t *testing.T) {
+	wg := &sync.WaitGroup{}
+	wg.Add(2)
+
 	cron := New()
 	cron.AddFunc("0 0 0 1 1 ?", func() {})
-	cron.AddFunc("* * * * * ?", func() {
-		cron.Stop()
-	})
+	cron.AddFunc("* * * * * ?", func() { wg.Done() })
+	cron.AddFunc("0 0 0 31 12 ?", func() {})
+	cron.AddFunc("* * * * * ?", func() { wg.Done() })
+
+	cron.Start()
+	defer cron.Stop()
+
+	select {
+	case <-time.After(ONE_SECOND):
+		t.FailNow()
+	case <-wait(wg):
+	}
+}
+
+// Test running the same job twice.
+func TestRunningJobTwice(t *testing.T) {
+	wg := &sync.WaitGroup{}
+	wg.Add(2)
+
+	cron := New()
+	cron.AddFunc("0 0 0 1 1 ?", func() {})
+	cron.AddFunc("* * * * * ?", func() { wg.Done() })
 	cron.AddFunc("0 0 0 31 12 ?", func() {})
-	done := startAndSignal(cron)
+
+	cron.Start()
+	defer cron.Stop()
 
 	select {
-	case <-time.After(2 * time.Second):
+	case <-time.After(2 * ONE_SECOND):
 		t.FailNow()
-	case <-done:
+	case <-wait(wg):
 	}
 }
 
 // Test that the cron is run in the local time zone (as opposed to UTC).
 func TestLocalTimezone(t *testing.T) {
-	cron := New()
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+
 	now := time.Now().Local()
 	spec := fmt.Sprintf("%d %d %d %d %d ?",
 		now.Second()+1, now.Minute(), now.Hour(), now.Day(), now.Month())
-	cron.AddFunc(spec, func() { cron.Stop() })
-	done := startAndSignal(cron)
+
+	cron := New()
+	cron.AddFunc(spec, func() { wg.Done() })
+	cron.Start()
+	defer cron.Stop()
 
 	select {
-	case <-time.After(2 * time.Second):
+	case <-time.After(ONE_SECOND):
 		t.FailNow()
-	case <-done:
+	case <-wait(wg):
 	}
 }
 
-type testRunnable struct {
-	cron *Cron
+type testJob struct {
+	wg   *sync.WaitGroup
 	name string
 }
 
-func (t testRunnable) Run() {
-	t.cron.Stop()
+func (t testJob) Run() {
+	t.wg.Done()
 }
 
 // Simple test using Runnables.
-func TestRunnable(t *testing.T) {
+func TestJob(t *testing.T) {
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+
 	cron := New()
-	cron.AddJob("0 0 0 30 Feb ?", testRunnable{cron, "job0"})
-	cron.AddJob("0 0 0 1 1 ?", testRunnable{cron, "job1"})
-	cron.AddJob("* * * * * ?", testRunnable{cron, "job2"})
-	cron.AddJob("1 0 0 1 1 ?", testRunnable{cron, "job3"})
+	cron.AddJob("0 0 0 30 Feb ?", testJob{wg, "job0"})
+	cron.AddJob("0 0 0 1 1 ?", testJob{wg, "job1"})
+	cron.AddJob("* * * * * ?", testJob{wg, "job2"})
+	cron.AddJob("1 0 0 1 1 ?", testJob{wg, "job3"})
+
+	cron.Start()
+	defer cron.Stop()
 
-	done := startAndSignal(cron)
 	select {
-	case <-time.After(2 * time.Second):
+	case <-time.After(ONE_SECOND):
 		t.FailNow()
-	case <-done:
+	case <-wait(wg):
 	}
 
 	// Ensure the entries are in the right order.
 	answers := []string{"job2", "job1", "job3", "job0"}
 	for i, answer := range answers {
-		actual := cron.Entries[i].Job.(testRunnable).name
+		actual := cron.Entries()[i].Job.(testJob).name
 		if actual != answer {
 			t.Errorf("Jobs not in the right order.  (expected) %s != %s (actual)", answer, actual)
 		}
 	}
 }
 
-// Return a channel that signals when the cron's Start() method returns.
-func startAndSignal(cron *Cron) <-chan struct{} {
-	ch := make(chan struct{})
+func wait(wg *sync.WaitGroup) chan bool {
+	ch := make(chan bool)
+	go func() {
+		wg.Wait()
+		ch <- true
+	}()
+	return ch
+}
+
+func stop(cron *Cron) chan bool {
+	ch := make(chan bool)
 	go func() {
-		cron.Run()
-		ch <- struct{}{}
+		cron.Stop()
+		ch <- true
 	}()
 	return ch
 }