Explorar el Código

cron/chain: add Chain & JobWrapper types to decorate job executions
with cross-cutting concerns

Fixes #191

Rob Figueiredo hace 6 años
padre
commit
1973b3eee3
Se han modificado 8 ficheros con 419 adiciones y 63 borrados
  1. 25 21
      README.md
  2. 117 0
      chain.go
  3. 235 0
      chain_test.go
  4. 20 25
      cron.go
  5. 5 3
      cron_test.go
  6. 13 0
      doc.go
  7. 4 4
      option.go
  8. 0 10
      option_test.go

+ 25 - 21
README.md

@@ -3,20 +3,16 @@
 
 # cron
 
-## DRAFT - Upgrading to v3
+## Upgrading to v3 (June 2019)
 
 cron v3 is a major upgrade to the library that addresses all outstanding bugs,
-feature requests, and clarifications around usage. It is based on a merge of
-master which contains various fixes to issues found over the years and the v2
-branch which contains some backwards-incompatible features like the ability to
-remove cron jobs. In addition, v3 adds support for Go Modules and cleans up
-rough edges like the timezone support.
+feature requests, and rough edges. It is based on a merge of master which
+contains various fixes to issues found over the years and the v2 branch which
+contains some backwards-incompatible features like the ability to remove cron
+jobs. In addition, v3 adds support for Go Modules and cleans up rough edges like
+the timezone support.
 
-It is currently IN DEVELOPMENT and will be considered released once a 3.0
-version is tagged. It is backwards INCOMPATIBLE with both the v1 and v2
-branches.
-
-Updates required:
+It is backwards incompatible with both v1 and v2. These updates are required:
 
 - The v1 branch accepted an optional seconds field at the beginning of the cron
   spec. This is non-standard and has led to a lot of confusion. The new default
@@ -45,16 +41,24 @@ Updates required:
 
   UPDATING: No update is required.
 
-Planned updates before calling v3 done:
-
-- Job "Interceptors" (name tbd), which make it easy for callers to mix desired
-  behavior like the following:
-  - Recover any panics from jobs
-  - Block this job if the previous run hasn't completed yet
-  - Logging job invocations
+- The new Chain & JobWrapper types allow you to install "interceptors" to add
+  cross-cutting behavior like the following:
+  - Recover any panics from jobs (activated by default)
+  - Delay a job's execution if the previous run hasn't completed yet
+  - Skip a job's execution if the previous run hasn't completed yet
+  - Log each job's invocations
   - Notification when jobs are completed
 
-- Fix all open bugs
+  To avoid breaking backward compatibility, Entry.Job continues to be the value
+  that was submitted, and Entry has a new WrappedJob property which is the one
+  that is actually run.
+
+  UPDATING: `cron.WithPanicLogger` has been removed. Please update your code to
+  use this JobWrapper:
+
+      cron.New(cron.WithChain(
+          cron.RecoverWithLogger(logger),
+      ))
 
 ### Background - Cron spec format
 
@@ -70,5 +74,5 @@ There are two cron spec formats in common usage:
 [the Quartz Scheduler]: http://www.quartz-scheduler.org/documentation/quartz-2.x/tutorials/crontrigger.html
 
 The original version of this package included an optional "seconds" field, which
-made it incompatible with both of these formats. Instead, the schedule parser
-has been extended to support both types.
+made it incompatible with both of these formats. Now, the "standard" format is
+the default format accepted, and the Quartz format is opt-in.

+ 117 - 0
chain.go

@@ -0,0 +1,117 @@
+package cron
+
+import (
+	"log"
+	"os"
+	"runtime"
+	"sync"
+	"time"
+)
+
+// JobWrapper decorates the given Job with some behavior.
+type JobWrapper func(Job) Job
+
+// Chain is a sequence of JobWrappers that decorates submitted jobs with
+// cross-cutting behaviors like logging or synchronization.
+type Chain struct {
+	wrappers []JobWrapper
+}
+
+// NewChain returns a Chain consisting of the given JobWrappers.
+func NewChain(c ...JobWrapper) Chain {
+	return Chain{c}
+}
+
+// Then decorates the given job with all JobWrappers in the chain.
+//
+// This:
+//     NewChain(m1, m2, m3).Then(job)
+// is equivalent to:
+//     m1(m2(m3(job)))
+func (c Chain) Then(j Job) Job {
+	for i := range c.wrappers {
+		j = c.wrappers[len(c.wrappers)-i-1](j)
+	}
+	return j
+}
+
+// RecoverWithLogger recovers panics in wrapped jobs and logs them.
+func RecoverWithLogger(logger *log.Logger) JobWrapper {
+	return func(j Job) Job {
+		return FuncJob(func() {
+			defer func() {
+				if r := recover(); r != nil {
+					const size = 64 << 10
+					buf := make([]byte, size)
+					buf = buf[:runtime.Stack(buf, false)]
+					logger.Printf("cron: panic running job: %v\n%s", r, buf)
+				}
+			}()
+			j.Run()
+		})
+	}
+}
+
+// Recover panics in wrapped jobs and logs them to os.Stderr using
+// the standard logger / flags.
+func Recover() JobWrapper {
+	return RecoverWithLogger(
+		log.New(os.Stderr, "", log.LstdFlags),
+	)
+}
+
+// DelayIfStillRunning serializes jobs, delaying subsequent runs until the
+// previous one is complete. If more than 10 runs of a job are queued up, it
+// begins skipping jobs instead, to avoid unbounded queue growth.
+func DelayIfStillRunning() JobWrapper {
+	// This is implemented by assigning each invocation a unique id and
+	// inserting that into a queue. On each completion, a condition variable is
+	// signalled to cause all waiting invocations to wake up and see if they are
+	// next in line.
+	// TODO: Could do this much more simply if we didn't care about keeping them in order..
+	const queueSize = 10
+	return func(j Job) Job {
+		var jobQueue []int64
+		var cond = sync.NewCond(&sync.Mutex{})
+		return FuncJob(func() {
+			id := time.Now().UnixNano()
+			cond.L.Lock()
+			if len(jobQueue) >= queueSize {
+				// log skip
+				cond.L.Unlock()
+				return
+			}
+			jobQueue = append(jobQueue, id)
+			for jobQueue[0] != id {
+				cond.Wait()
+			}
+			cond.L.Unlock()
+
+			defer func() {
+				cond.L.Lock()
+				jobQueue = jobQueue[1:]
+				cond.L.Unlock()
+				cond.Broadcast()
+			}()
+			j.Run()
+		})
+	}
+}
+
+// SkipIfStillRunning skips an invocation of the Job if a previous invocation is
+// still running.
+func SkipIfStillRunning() JobWrapper {
+	var ch = make(chan struct{}, 1)
+	ch <- struct{}{}
+	return func(j Job) Job {
+		return FuncJob(func() {
+			select {
+			case v := <-ch:
+				j.Run()
+				ch <- v
+			default:
+				// skip
+			}
+		})
+	}
+}

+ 235 - 0
chain_test.go

@@ -0,0 +1,235 @@
+package cron
+
+import (
+	"io/ioutil"
+	"log"
+	"reflect"
+	"sync"
+	"testing"
+	"time"
+)
+
+func appendingJob(slice *[]int, value int) Job {
+	var m sync.Mutex
+	return FuncJob(func() {
+		m.Lock()
+		*slice = append(*slice, value)
+		m.Unlock()
+	})
+}
+
+func appendingWrapper(slice *[]int, value int) JobWrapper {
+	return func(j Job) Job {
+		return FuncJob(func() {
+			appendingJob(slice, value).Run()
+			j.Run()
+		})
+	}
+}
+
+func TestChain(t *testing.T) {
+	var nums []int
+	var (
+		append1 = appendingWrapper(&nums, 1)
+		append2 = appendingWrapper(&nums, 2)
+		append3 = appendingWrapper(&nums, 3)
+		append4 = appendingJob(&nums, 4)
+	)
+	NewChain(append1, append2, append3).Then(append4).Run()
+	if !reflect.DeepEqual(nums, []int{1, 2, 3, 4}) {
+		t.Error("unexpected order of calls:", nums)
+	}
+}
+
+func TestChainRecover(t *testing.T) {
+	panickingJob := FuncJob(func() {
+		panic("panickingJob panics")
+	})
+
+	t.Run("panic exits job by default", func(t *testing.T) {
+		defer func() {
+			if err := recover(); err == nil {
+				t.Errorf("panic expected, but none received")
+			}
+		}()
+		NewChain().Then(panickingJob).
+			Run()
+	})
+
+	t.Run("Recovering JobWrapper recovers", func(t *testing.T) {
+		NewChain(RecoverWithLogger(log.New(ioutil.Discard, "", 0))).
+			Then(panickingJob).
+			Run()
+	})
+
+	t.Run("composed with the *IfStillRunning wrappers", func(t *testing.T) {
+		NewChain(RecoverWithLogger(log.New(ioutil.Discard, "", 0))).
+			Then(panickingJob).
+			Run()
+	})
+}
+
+type countJob struct {
+	m       sync.Mutex
+	started int
+	done    int
+	delay   time.Duration
+}
+
+func (j *countJob) Run() {
+	j.m.Lock()
+	j.started++
+	j.m.Unlock()
+	time.Sleep(j.delay)
+	j.m.Lock()
+	j.done++
+	j.m.Unlock()
+}
+
+func (j *countJob) Started() int {
+	defer j.m.Unlock()
+	j.m.Lock()
+	return j.started
+}
+
+func (j *countJob) Done() int {
+	defer j.m.Unlock()
+	j.m.Lock()
+	return j.done
+}
+
+func TestChainDelayIfStillRunning(t *testing.T) {
+
+	t.Run("runs immediately", func(t *testing.T) {
+		var j countJob
+		wrappedJob := NewChain(DelayIfStillRunning()).Then(&j)
+		go wrappedJob.Run()
+		time.Sleep(2 * time.Millisecond) // Give the job 2ms to complete.
+		if c := j.Done(); c != 1 {
+			t.Errorf("expected job run once, immediately, got %d", c)
+		}
+	})
+
+	t.Run("second run immediate if first done", func(t *testing.T) {
+		var j countJob
+		wrappedJob := NewChain(DelayIfStillRunning()).Then(&j)
+		go func() {
+			go wrappedJob.Run()
+			time.Sleep(time.Millisecond)
+			go wrappedJob.Run()
+		}()
+		time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete.
+		if c := j.Done(); c != 2 {
+			t.Errorf("expected job run twice, immediately, got %d", c)
+		}
+	})
+
+	t.Run("second run delayed if first not done", func(t *testing.T) {
+		var j countJob
+		j.delay = 10 * time.Millisecond
+		wrappedJob := NewChain(DelayIfStillRunning()).Then(&j)
+		go func() {
+			go wrappedJob.Run()
+			time.Sleep(time.Millisecond)
+			go wrappedJob.Run()
+		}()
+
+		// After 5ms, the first job is still in progress, and the second job was
+		// run but should be waiting for it to finish.
+		time.Sleep(5 * time.Millisecond)
+		started, done := j.Started(), j.Done()
+		if started != 1 || done != 0 {
+			t.Error("expected first job started, but not finished, got", started, done)
+		}
+
+		// Verify that the second job completes.
+		time.Sleep(25 * time.Millisecond)
+		started, done = j.Started(), j.Done()
+		if started != 2 || done != 2 {
+			t.Error("expected both jobs done, got", started, done)
+		}
+	})
+
+	t.Run("11th run skipped on long queue", func(t *testing.T) {
+		var j countJob
+		j.delay = 10 * time.Millisecond
+		wrappedJob := NewChain(DelayIfStillRunning()).Then(&j)
+		for i := 0; i < 11; i++ {
+			go wrappedJob.Run()
+		}
+		time.Sleep(200 * time.Millisecond)
+		done := j.Done()
+		if done != 10 {
+			t.Error("expected 10 jobs executed, 1 job dropped, got", done)
+		}
+	})
+
+}
+
+func TestChainSkipIfStillRunning(t *testing.T) {
+
+	t.Run("runs immediately", func(t *testing.T) {
+		var j countJob
+		wrappedJob := NewChain(SkipIfStillRunning()).Then(&j)
+		go wrappedJob.Run()
+		time.Sleep(2 * time.Millisecond) // Give the job 2ms to complete.
+		if c := j.Done(); c != 1 {
+			t.Errorf("expected job run once, immediately, got %d", c)
+		}
+	})
+
+	t.Run("second run immediate if first done", func(t *testing.T) {
+		var j countJob
+		wrappedJob := NewChain(SkipIfStillRunning()).Then(&j)
+		go func() {
+			go wrappedJob.Run()
+			time.Sleep(time.Millisecond)
+			go wrappedJob.Run()
+		}()
+		time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete.
+		if c := j.Done(); c != 2 {
+			t.Errorf("expected job run twice, immediately, got %d", c)
+		}
+	})
+
+	t.Run("second run skipped if first not done", func(t *testing.T) {
+		var j countJob
+		j.delay = 10 * time.Millisecond
+		wrappedJob := NewChain(SkipIfStillRunning()).Then(&j)
+		go func() {
+			go wrappedJob.Run()
+			time.Sleep(time.Millisecond)
+			go wrappedJob.Run()
+		}()
+
+		// After 5ms, the first job is still in progress, and the second job was
+		// aleady skipped.
+		time.Sleep(5 * time.Millisecond)
+		started, done := j.Started(), j.Done()
+		if started != 1 || done != 0 {
+			t.Error("expected first job started, but not finished, got", started, done)
+		}
+
+		// Verify that the first job completes and second does not run.
+		time.Sleep(25 * time.Millisecond)
+		started, done = j.Started(), j.Done()
+		if started != 1 || done != 1 {
+			t.Error("expected second job skipped, got", started, done)
+		}
+	})
+
+	t.Run("skip 10 jobs on rapid fire", func(t *testing.T) {
+		var j countJob
+		j.delay = 10 * time.Millisecond
+		wrappedJob := NewChain(SkipIfStillRunning()).Then(&j)
+		for i := 0; i < 11; i++ {
+			go wrappedJob.Run()
+		}
+		time.Sleep(200 * time.Millisecond)
+		done := j.Done()
+		if done != 1 {
+			t.Error("expected 1 jobs executed, 10 jobs dropped, got", done)
+		}
+	})
+
+}

+ 20 - 25
cron.go

@@ -1,7 +1,6 @@
 package cron
 
 import (
-	"runtime"
 	"sort"
 	"time"
 )
@@ -11,6 +10,7 @@ import (
 // be inspected while running.
 type Cron struct {
 	entries  []*Entry
+	chain    Chain
 	stop     chan struct{}
 	add      chan *Entry
 	remove   chan EntryID
@@ -54,7 +54,12 @@ type Entry struct {
 	// Prev is the last time this job was run, or the zero time if never.
 	Prev time.Time
 
-	// Job is the thing to run when the Schedule is activated.
+	// WrappedJob is the thing to run when the Schedule is activated.
+	WrappedJob Job
+
+	// Job is the thing that was submitted to cron.
+	// It is kept around so that user code that needs to get at the job later,
+	// e.g. via Entries() can do so.
 	Job Job
 }
 
@@ -88,24 +93,24 @@ func (s byTime) Less(i, j int) bool {
 //     Description: The time zone in which schedules are interpreted
 //     Default:     time.Local
 //
-//   PanicLogger
-//     Description: How to log Jobs that panic
-//     Default:     Log the panic to os.Stderr
-//
 //   Parser
-//     Description:
-//     Default:     Parser that accepts the spec described here: https://en.wikipedia.org/wiki/Cron
+//     Description: Parser converts cron spec strings into cron.Schedules.
+//     Default:     Accepts this spec: https://en.wikipedia.org/wiki/Cron
+//
+//   Chain
+//     Description: Wrap submitted jobs to customize behavior.
+//     Default:     A chain that recovers panics.
 //
 // See "cron.With*" to modify the default behavior.
 func New(opts ...Option) *Cron {
 	c := &Cron{
 		entries:  nil,
+		chain:    NewChain(Recover()),
 		add:      make(chan *Entry),
 		stop:     make(chan struct{}),
 		snapshot: make(chan chan []Entry),
 		remove:   make(chan EntryID),
 		running:  false,
-		logger:   DefaultLogger,
 		vlogger:  nil,
 		location: time.Local,
 		parser:   standardParser,
@@ -140,12 +145,14 @@ func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
 }
 
 // Schedule adds a Job to the Cron to be run on the given schedule.
+// The job is wrapped with the configured Chain.
 func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
 	c.nextID++
 	entry := &Entry{
-		ID:       c.nextID,
-		Schedule: schedule,
-		Job:      cmd,
+		ID:         c.nextID,
+		Schedule:   schedule,
+		WrappedJob: c.chain.Then(cmd),
+		Job:        cmd,
 	}
 	if !c.running {
 		c.entries = append(c.entries, entry)
@@ -207,18 +214,6 @@ func (c *Cron) Run() {
 	c.run()
 }
 
-func (c *Cron) runWithRecovery(j Job) {
-	defer func() {
-		if r := recover(); r != nil {
-			const size = 64 << 10
-			buf := make([]byte, size)
-			buf = buf[:runtime.Stack(buf, false)]
-			c.logger.Printf("panic running job: %v\n%s", r, buf)
-		}
-	}()
-	j.Run()
-}
-
 // run the scheduler.. this is private just due to the need to synchronize
 // access to the 'running' state variable.
 func (c *Cron) run() {
@@ -255,7 +250,7 @@ func (c *Cron) run() {
 					if e.Next.After(now) || e.Next.IsZero() {
 						break
 					}
-					go c.runWithRecovery(e.Job)
+					go e.WrappedJob.Run()
 					e.Prev = e.Next
 					e.Next = e.Schedule.Next(now)
 					c.logVerbosef("(%s) started entry %d, next scheduled for %s", now, e.ID, e.Next)

+ 5 - 3
cron_test.go

@@ -40,7 +40,8 @@ func newBufLogger(sw *syncWriter) *log.Logger {
 
 func TestFuncPanicRecovery(t *testing.T) {
 	var buf syncWriter
-	cron := New(WithParser(secondParser), WithPanicLogger(newBufLogger(&buf)))
+	cron := New(WithParser(secondParser),
+		WithChain(RecoverWithLogger(newBufLogger(&buf))))
 	cron.Start()
 	defer cron.Stop()
 	cron.AddFunc("* * * * * ?", func() {
@@ -66,7 +67,8 @@ func TestJobPanicRecovery(t *testing.T) {
 	var job DummyJob
 
 	var buf syncWriter
-	cron := New(WithParser(secondParser), WithPanicLogger(newBufLogger(&buf)))
+	cron := New(WithParser(secondParser),
+		WithChain(RecoverWithLogger(newBufLogger(&buf))))
 	cron.Start()
 	defer cron.Stop()
 	cron.AddJob("* * * * * ?", job)
@@ -509,5 +511,5 @@ func stop(cron *Cron) chan bool {
 
 // newWithSeconds returns a Cron with the seconds field enabled.
 func newWithSeconds() *Cron {
-	return New(WithParser(secondParser))
+	return New(WithParser(secondParser), WithChain())
 }

+ 13 - 0
doc.go

@@ -150,6 +150,19 @@ The prefix "TZ=(TIME ZONE)" is also supported for legacy compatibility.
 Be aware that jobs scheduled during daylight-savings leap-ahead transitions will
 not be run!
 
+Job Wrappers / Chain
+
+A Cron runner may be configured with a chain of job wrappers to add
+cross-cutting functionality to all submitted jobs. For example, they may be used
+to achieve the following effects:
+
+  - Recover any panics from jobs (activated by default)
+  - Delay a job's execution if the previous run hasn't completed yet
+  - Skip a job's execution if the previous run hasn't completed yet
+  - Log each job's invocations
+
+Install wrappers using the `cron.WithChain` option.
+
 Thread safety
 
 Since the Cron service runs concurrently with the calling code, some amount of

+ 4 - 4
option.go

@@ -1,7 +1,6 @@
 package cron
 
 import (
-	"log"
 	"time"
 )
 
@@ -30,10 +29,11 @@ func WithParser(p Parser) Option {
 	}
 }
 
-// WithPanicLogger overrides the logger used for logging job panics.
-func WithPanicLogger(l *log.Logger) Option {
+// WithChain specifies Job wrappers to apply to all jobs added to this cron.
+// Refer to the Chain* functions in this package for provided wrappers.
+func WithChain(wrappers ...JobWrapper) Option {
 	return func(c *Cron) {
-		c.logger = l
+		c.chain = NewChain(wrappers...)
 	}
 }
 

+ 0 - 10
option_test.go

@@ -1,7 +1,6 @@
 package cron
 
 import (
-	"bytes"
 	"log"
 	"strings"
 	"testing"
@@ -23,15 +22,6 @@ func TestWithParser(t *testing.T) {
 	}
 }
 
-func TestWithPanicLogger(t *testing.T) {
-	var b bytes.Buffer
-	var logger = log.New(&b, "", log.LstdFlags)
-	c := New(WithPanicLogger(logger))
-	if c.logger != logger {
-		t.Error("expected provided logger")
-	}
-}
-
 func TestWithVerboseLogger(t *testing.T) {
 	var buf syncWriter
 	var logger = log.New(&buf, "", log.LstdFlags)