Browse Source

cron/chain: add Chain & JobWrapper types to decorate job executions
with cross-cutting concerns

These JobWrappers are provided:
- SkipIfStillRunning skips a job if the previous invocation is still running.
- DelayIfStillRunning blocks a job invocation until the previous one is done.
- Recover, to recover from panics in the job.

BREAKING: This removes the default behavior of recovering from job panics.
That must now be opted-into using WithChain(Recover(logger)).
Having it be the default behavior can be surprising (see issue #192)
and is at odds with most libraries.

Fixes #191
Fixes #192

Rob Figueiredo 6 năm trước cách đây
mục cha
commit
d279950244
9 tập tin đã thay đổi với 391 bổ sung70 xóa
  1. 30 21
      README.md
  2. 92 0
      chain.go
  3. 221 0
      chain_test.go
  4. 20 30
      cron.go
  5. 7 5
      cron_test.go
  6. 13 0
      doc.go
  7. 4 0
      logger.go
  8. 4 4
      option.go
  9. 0 10
      option_test.go

+ 30 - 21
README.md

@@ -3,25 +3,33 @@
 
 # cron
 
-## DRAFT - Upgrading to v3
+## Upgrading to v3 (June 2019)
 
 cron v3 is a major upgrade to the library that addresses all outstanding bugs,
-feature requests, and clarifications around usage. It is based on a merge of
-master which contains various fixes to issues found over the years and the v2
-branch which contains some backwards-incompatible features like the ability to
-remove cron jobs. In addition, v3 adds support for Go Modules and cleans up
-rough edges like the timezone support.
-
-It is currently IN DEVELOPMENT and will be considered released once a 3.0
-version is tagged. It is backwards INCOMPATIBLE with both the v1 and v2
-branches.
+feature requests, and rough edges. It is based on a merge of master which
+contains various fixes to issues found over the years and the v2 branch which
+contains some backwards-incompatible features like the ability to remove cron
+jobs. In addition, v3 adds support for Go Modules and cleans up rough edges like
+the timezone support.
 
 New features:
 
 - Extensible, key/value logging via an interface that complies with
   the github.com/go-logr/logr project.
 
-Updates required:
+- The new Chain & JobWrapper types allow you to install "interceptors" to add
+  cross-cutting behavior like the following:
+  - Recover any panics from jobs (activated by default)
+  - Delay a job's execution if the previous run hasn't completed yet
+  - Skip a job's execution if the previous run hasn't completed yet
+  - Log each job's invocations
+  - Notification when jobs are completed
+
+  To avoid breaking backward compatibility, Entry.Job continues to be the value
+  that was submitted, and Entry has a new WrappedJob property which is the one
+  that is actually run.
+
+It is backwards incompatible with both v1 and v2. These updates are required:
 
 - The v1 branch accepted an optional seconds field at the beginning of the cron
   spec. This is non-standard and has led to a lot of confusion. The new default
@@ -50,16 +58,17 @@ Updates required:
 
   UPDATING: No update is required.
 
-Planned updates before calling v3 done:
+- By default, cron will no longer recover panics in jobs that it runs.
+  Recovering can be surprising (see issue #192) and seems to be at odds with
+  typical behavior of libraries. Relatedly, the `cron.WithPanicLogger` option
+  has been removed to accommodate the more general JobWrapper type.
 
-- Job "Interceptors" (name tbd), which make it easy for callers to mix desired
-  behavior like the following:
-  - Recover any panics from jobs
-  - Block this job if the previous run hasn't completed yet
-  - Logging job invocations
-  - Notification when jobs are completed
+  UPDATING: To opt into panic recovery and configure the panic logger:
+
+      cron.New(cron.WithChain(
+          cron.Recover(logger),  // or use cron.DefaultLogger
+      ))
 
-- Fix all open bugs
 
 ### Background - Cron spec format
 
@@ -75,5 +84,5 @@ There are two cron spec formats in common usage:
 [the Quartz Scheduler]: http://www.quartz-scheduler.org/documentation/quartz-2.x/tutorials/crontrigger.html
 
 The original version of this package included an optional "seconds" field, which
-made it incompatible with both of these formats. Instead, the schedule parser
-has been extended to support both types.
+made it incompatible with both of these formats. Now, the "standard" format is
+the default format accepted, and the Quartz format is opt-in.

+ 92 - 0
chain.go

@@ -0,0 +1,92 @@
+package cron
+
+import (
+	"fmt"
+	"runtime"
+	"sync"
+	"time"
+)
+
+// JobWrapper decorates the given Job with some behavior.
+type JobWrapper func(Job) Job
+
+// Chain is a sequence of JobWrappers that decorates submitted jobs with
+// cross-cutting behaviors like logging or synchronization.
+type Chain struct {
+	wrappers []JobWrapper
+}
+
+// NewChain returns a Chain consisting of the given JobWrappers.
+func NewChain(c ...JobWrapper) Chain {
+	return Chain{c}
+}
+
+// Then decorates the given job with all JobWrappers in the chain.
+//
+// This:
+//     NewChain(m1, m2, m3).Then(job)
+// is equivalent to:
+//     m1(m2(m3(job)))
+func (c Chain) Then(j Job) Job {
+	for i := range c.wrappers {
+		j = c.wrappers[len(c.wrappers)-i-1](j)
+	}
+	return j
+}
+
+// Recover panics in wrapped jobs and log them with the provided logger.
+func Recover(logger Logger) JobWrapper {
+	return func(j Job) Job {
+		return FuncJob(func() {
+			defer func() {
+				if r := recover(); r != nil {
+					const size = 64 << 10
+					buf := make([]byte, size)
+					buf = buf[:runtime.Stack(buf, false)]
+					err, ok := r.(error)
+					if !ok {
+						err = fmt.Errorf("%v", r)
+					}
+					logger.Error(err, "panic", "stack", "...\n"+string(buf))
+				}
+			}()
+			j.Run()
+		})
+	}
+}
+
+// DelayIfStillRunning serializes jobs, delaying subsequent runs until the
+// previous one is complete. Jobs running after a delay of more than a minute
+// have the delay logged at Info.
+func DelayIfStillRunning(logger Logger) JobWrapper {
+	return func(j Job) Job {
+		var mu sync.Mutex
+		return FuncJob(func() {
+			start := time.Now()
+			mu.Lock()
+			defer mu.Unlock()
+			if dur := time.Since(start); dur > time.Minute {
+				logger.Info("delay", "duration", dur)
+			}
+			j.Run()
+		})
+	}
+}
+
+// SkipIfStillRunning skips an invocation of the Job if a previous invocation is
+// still running. It logs skips to the given logger at Info level.
+func SkipIfStillRunning(logger Logger) JobWrapper {
+	var ch = make(chan struct{}, 1)
+	ch <- struct{}{}
+	return func(j Job) Job {
+		return FuncJob(func() {
+			select {
+			case v := <-ch:
+				j.Run()
+				ch <- v
+			default:
+				logger.Info("skip")
+			}
+		})
+	}
+}

+ 221 - 0
chain_test.go

@@ -0,0 +1,221 @@
+package cron
+
+import (
+	"io/ioutil"
+	"log"
+	"reflect"
+	"sync"
+	"testing"
+	"time"
+)
+
+func appendingJob(slice *[]int, value int) Job {
+	var m sync.Mutex
+	return FuncJob(func() {
+		m.Lock()
+		*slice = append(*slice, value)
+		m.Unlock()
+	})
+}
+
+func appendingWrapper(slice *[]int, value int) JobWrapper {
+	return func(j Job) Job {
+		return FuncJob(func() {
+			appendingJob(slice, value).Run()
+			j.Run()
+		})
+	}
+}
+
+func TestChain(t *testing.T) {
+	var nums []int
+	var (
+		append1 = appendingWrapper(&nums, 1)
+		append2 = appendingWrapper(&nums, 2)
+		append3 = appendingWrapper(&nums, 3)
+		append4 = appendingJob(&nums, 4)
+	)
+	NewChain(append1, append2, append3).Then(append4).Run()
+	if !reflect.DeepEqual(nums, []int{1, 2, 3, 4}) {
+		t.Error("unexpected order of calls:", nums)
+	}
+}
+
+func TestChainRecover(t *testing.T) {
+	panickingJob := FuncJob(func() {
+		panic("panickingJob panics")
+	})
+
+	t.Run("panic exits job by default", func(t *testing.T) {
+		defer func() {
+			if err := recover(); err == nil {
+				t.Errorf("panic expected, but none received")
+			}
+		}()
+		NewChain().Then(panickingJob).
+			Run()
+	})
+
+	t.Run("Recovering JobWrapper recovers", func(t *testing.T) {
+		NewChain(Recover(PrintfLogger(log.New(ioutil.Discard, "", 0)))).
+			Then(panickingJob).
+			Run()
+	})
+
+	t.Run("composed with the *IfStillRunning wrappers", func(t *testing.T) {
+		NewChain(Recover(PrintfLogger(log.New(ioutil.Discard, "", 0)))).
+			Then(panickingJob).
+			Run()
+	})
+}
+
+type countJob struct {
+	m       sync.Mutex
+	started int
+	done    int
+	delay   time.Duration
+}
+
+func (j *countJob) Run() {
+	j.m.Lock()
+	j.started++
+	j.m.Unlock()
+	time.Sleep(j.delay)
+	j.m.Lock()
+	j.done++
+	j.m.Unlock()
+}
+
+func (j *countJob) Started() int {
+	defer j.m.Unlock()
+	j.m.Lock()
+	return j.started
+}
+
+func (j *countJob) Done() int {
+	defer j.m.Unlock()
+	j.m.Lock()
+	return j.done
+}
+
+func TestChainDelayIfStillRunning(t *testing.T) {
+
+	t.Run("runs immediately", func(t *testing.T) {
+		var j countJob
+		wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
+		go wrappedJob.Run()
+		time.Sleep(2 * time.Millisecond) // Give the job 2ms to complete.
+		if c := j.Done(); c != 1 {
+			t.Errorf("expected job run once, immediately, got %d", c)
+		}
+	})
+
+	t.Run("second run immediate if first done", func(t *testing.T) {
+		var j countJob
+		wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
+		go func() {
+			go wrappedJob.Run()
+			time.Sleep(time.Millisecond)
+			go wrappedJob.Run()
+		}()
+		time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete.
+		if c := j.Done(); c != 2 {
+			t.Errorf("expected job run twice, immediately, got %d", c)
+		}
+	})
+
+	t.Run("second run delayed if first not done", func(t *testing.T) {
+		var j countJob
+		j.delay = 10 * time.Millisecond
+		wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
+		go func() {
+			go wrappedJob.Run()
+			time.Sleep(time.Millisecond)
+			go wrappedJob.Run()
+		}()
+
+		// After 5ms, the first job is still in progress, and the second job was
+		// run but should be waiting for it to finish.
+		time.Sleep(5 * time.Millisecond)
+		started, done := j.Started(), j.Done()
+		if started != 1 || done != 0 {
+			t.Error("expected first job started, but not finished, got", started, done)
+		}
+
+		// Verify that the second job completes.
+		time.Sleep(25 * time.Millisecond)
+		started, done = j.Started(), j.Done()
+		if started != 2 || done != 2 {
+			t.Error("expected both jobs done, got", started, done)
+		}
+	})
+
+}
+
+func TestChainSkipIfStillRunning(t *testing.T) {
+
+	t.Run("runs immediately", func(t *testing.T) {
+		var j countJob
+		wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
+		go wrappedJob.Run()
+		time.Sleep(2 * time.Millisecond) // Give the job 2ms to complete.
+		if c := j.Done(); c != 1 {
+			t.Errorf("expected job run once, immediately, got %d", c)
+		}
+	})
+
+	t.Run("second run immediate if first done", func(t *testing.T) {
+		var j countJob
+		wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
+		go func() {
+			go wrappedJob.Run()
+			time.Sleep(time.Millisecond)
+			go wrappedJob.Run()
+		}()
+		time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete.
+		if c := j.Done(); c != 2 {
+			t.Errorf("expected job run twice, immediately, got %d", c)
+		}
+	})
+
+	t.Run("second run skipped if first not done", func(t *testing.T) {
+		var j countJob
+		j.delay = 10 * time.Millisecond
+		wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
+		go func() {
+			go wrappedJob.Run()
+			time.Sleep(time.Millisecond)
+			go wrappedJob.Run()
+		}()
+
+		// After 5ms, the first job is still in progress, and the second job was
+		// aleady skipped.
+		time.Sleep(5 * time.Millisecond)
+		started, done := j.Started(), j.Done()
+		if started != 1 || done != 0 {
+			t.Error("expected first job started, but not finished, got", started, done)
+		}
+
+		// Verify that the first job completes and second does not run.
+		time.Sleep(25 * time.Millisecond)
+		started, done = j.Started(), j.Done()
+		if started != 1 || done != 1 {
+			t.Error("expected second job skipped, got", started, done)
+		}
+	})
+
+	t.Run("skip 10 jobs on rapid fire", func(t *testing.T) {
+		var j countJob
+		j.delay = 10 * time.Millisecond
+		wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
+		for i := 0; i < 11; i++ {
+			go wrappedJob.Run()
+		}
+		time.Sleep(200 * time.Millisecond)
+		done := j.Done()
+		if done != 1 {
+			t.Error("expected 1 jobs executed, 10 jobs dropped, got", done)
+		}
+	})
+
+}

+ 20 - 30
cron.go

@@ -1,8 +1,6 @@
 package cron
 
 import (
-	"fmt"
-	"runtime"
 	"sort"
 	"time"
 )
@@ -12,6 +10,7 @@ import (
 // be inspected while running.
 type Cron struct {
 	entries  []*Entry
+	chain    Chain
 	stop     chan struct{}
 	add      chan *Entry
 	remove   chan EntryID
@@ -54,7 +53,12 @@ type Entry struct {
 	// Prev is the last time this job was run, or the zero time if never.
 	Prev time.Time
 
-	// Job is the thing to run when the Schedule is activated.
+	// WrappedJob is the thing to run when the Schedule is activated.
+	WrappedJob Job
+
+	// Job is the thing that was submitted to cron.
+	// It is kept around so that user code that needs to get at the job later,
+	// e.g. via Entries() can do so.
 	Job Job
 }
 
@@ -88,18 +92,19 @@ func (s byTime) Less(i, j int) bool {
 //     Description: The time zone in which schedules are interpreted
 //     Default:     time.Local
 //
-//   PanicLogger
-//     Description: How to log Jobs that panic
-//     Default:     Log the panic to os.Stderr
-//
 //   Parser
-//     Description:
-//     Default:     Parser that accepts the spec described here: https://en.wikipedia.org/wiki/Cron
+//     Description: Parser converts cron spec strings into cron.Schedules.
+//     Default:     Accepts this spec: https://en.wikipedia.org/wiki/Cron
+//
+//   Chain
+//     Description: Wrap submitted jobs to customize behavior.
+//     Default:     A chain that recovers panics.
 //
 // See "cron.With*" to modify the default behavior.
 func New(opts ...Option) *Cron {
 	c := &Cron{
 		entries:  nil,
+		chain:    NewChain(),
 		add:      make(chan *Entry),
 		stop:     make(chan struct{}),
 		snapshot: make(chan chan []Entry),
@@ -139,12 +144,14 @@ func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
 }
 
 // Schedule adds a Job to the Cron to be run on the given schedule.
+// The job is wrapped with the configured Chain.
 func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
 	c.nextID++
 	entry := &Entry{
-		ID:       c.nextID,
-		Schedule: schedule,
-		Job:      cmd,
+		ID:         c.nextID,
+		Schedule:   schedule,
+		WrappedJob: c.chain.Then(cmd),
+		Job:        cmd,
 	}
 	if !c.running {
 		c.entries = append(c.entries, entry)
@@ -206,23 +213,6 @@ func (c *Cron) Run() {
 	c.run()
 }
 
-func (c *Cron) runWithRecovery(j Job) {
-	defer func() {
-		if r := recover(); r != nil {
-			const size = 64 << 10
-			buf := make([]byte, size)
-			buf = buf[:runtime.Stack(buf, false)]
-			var err error
-			err, ok := r.(error)
-			if !ok {
-				err = fmt.Errorf("%v", r)
-			}
-			c.logger.Error(err, "panic running job", "stack", "...\n"+string(buf))
-		}
-	}()
-	j.Run()
-}
-
 // run the scheduler.. this is private just due to the need to synchronize
 // access to the 'running' state variable.
 func (c *Cron) run() {
@@ -259,7 +249,7 @@ func (c *Cron) run() {
 					if e.Next.After(now) || e.Next.IsZero() {
 						break
 					}
-					go c.runWithRecovery(e.Job)
+					go e.WrappedJob.Run()
 					e.Prev = e.Next
 					e.Next = e.Schedule.Next(now)
 					c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)

+ 7 - 5
cron_test.go

@@ -34,13 +34,14 @@ func (sw *syncWriter) String() string {
 	return sw.wr.String()
 }
 
-func newBufLogger(sw *syncWriter) *log.Logger {
-	return log.New(sw, "", log.LstdFlags)
+func newBufLogger(sw *syncWriter) Logger {
+	return PrintfLogger(log.New(sw, "", log.LstdFlags))
 }
 
 func TestFuncPanicRecovery(t *testing.T) {
 	var buf syncWriter
-	cron := New(WithParser(secondParser), WithPanicLogger(newBufLogger(&buf)))
+	cron := New(WithParser(secondParser),
+		WithChain(Recover(newBufLogger(&buf))))
 	cron.Start()
 	defer cron.Stop()
 	cron.AddFunc("* * * * * ?", func() {
@@ -66,7 +67,8 @@ func TestJobPanicRecovery(t *testing.T) {
 	var job DummyJob
 
 	var buf syncWriter
-	cron := New(WithParser(secondParser), WithPanicLogger(newBufLogger(&buf)))
+	cron := New(WithParser(secondParser),
+		WithChain(Recover(newBufLogger(&buf))))
 	cron.Start()
 	defer cron.Stop()
 	cron.AddJob("* * * * * ?", job)
@@ -573,5 +575,5 @@ func stop(cron *Cron) chan bool {
 
 // newWithSeconds returns a Cron with the seconds field enabled.
 func newWithSeconds() *Cron {
-	return New(WithParser(secondParser))
+	return New(WithParser(secondParser), WithChain())
 }

+ 13 - 0
doc.go

@@ -150,6 +150,19 @@ The prefix "TZ=(TIME ZONE)" is also supported for legacy compatibility.
 Be aware that jobs scheduled during daylight-savings leap-ahead transitions will
 not be run!
 
+Job Wrappers / Chain
+
+A Cron runner may be configured with a chain of job wrappers to add
+cross-cutting functionality to all submitted jobs. For example, they may be used
+to achieve the following effects:
+
+  - Recover any panics from jobs (activated by default)
+  - Delay a job's execution if the previous run hasn't completed yet
+  - Skip a job's execution if the previous run hasn't completed yet
+  - Log each job's invocations
+
+Install wrappers using the `cron.WithChain` option.
+
 Thread safety
 
 Since the Cron service runs concurrently with the calling code, some amount of

+ 4 - 0
logger.go

@@ -1,6 +1,7 @@
 package cron
 
 import (
+	"io/ioutil"
 	"log"
 	"os"
 	"strings"
@@ -10,6 +11,9 @@ import (
 // DefaultLogger is used by Cron if none is specified.
 var DefaultLogger Logger = PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))
 
+// DiscardLogger can be used by callers to discard all log messages.
+var DiscardLogger Logger = PrintfLogger(log.New(ioutil.Discard, "", 0))
+
 // Logger is the interface used in this package for logging, so that any backend
 // can be plugged in. It is a subset of the github.com/go-logr/logr interface.
 type Logger interface {

+ 4 - 4
option.go

@@ -1,7 +1,6 @@
 package cron
 
 import (
-	"log"
 	"time"
 )
 
@@ -30,10 +29,11 @@ func WithParser(p Parser) Option {
 	}
 }
 
-// WithPanicLogger overrides the logger used for logging job panics.
-func WithPanicLogger(l *log.Logger) Option {
+// WithChain specifies Job wrappers to apply to all jobs added to this cron.
+// Refer to the Chain* functions in this package for provided wrappers.
+func WithChain(wrappers ...JobWrapper) Option {
 	return func(c *Cron) {
-		c.logger = PrintfLogger(l)
+		c.chain = NewChain(wrappers...)
 	}
 }
 

+ 0 - 10
option_test.go

@@ -1,7 +1,6 @@
 package cron
 
 import (
-	"bytes"
 	"log"
 	"strings"
 	"testing"
@@ -23,15 +22,6 @@ func TestWithParser(t *testing.T) {
 	}
 }
 
-func TestWithPanicLogger(t *testing.T) {
-	var b bytes.Buffer
-	var logger = log.New(&b, "", log.LstdFlags)
-	c := New(WithPanicLogger(logger))
-	if c.logger.(printfLogger).logger != logger {
-		t.Error("expected provided logger")
-	}
-}
-
 func TestWithVerboseLogger(t *testing.T) {
 	var buf syncWriter
 	var logger = log.New(&buf, "", log.LstdFlags)