Browse code

Merge pull request #2 from eapache/batcher

Implement batcher pattern
Evan Huus 11 years ago
parent
commit
bd9ff09d5a
4 changed files with 249 additions and 0 deletions
  1. 1 0
      README.md
  2. 30 0
      batcher/README.md
  3. 102 0
      batcher/batcher.go
  4. 116 0
      batcher/batcher_test.go

+ 1 - 0
README.md

@@ -5,3 +5,4 @@ Resiliency patterns for golang. Currently implemented are:
 - circuit-breaker pattern (in the `breaker` directory)
 - semaphore pattern (in the `semaphore` directory)
 - deadline/timeout pattern (in the `deadline` directory)
+- batching pattern (in the `batcher` directory)

+ 30 - 0
batcher/README.md

@@ -0,0 +1,30 @@
+batcher
+=======
+
+[![Build Status](https://travis-ci.org/eapache/go-resiliency.svg?branch=master)](https://travis-ci.org/eapache/go-resiliency)
+[![GoDoc](https://godoc.org/github.com/eapache/go-resiliency/batcher?status.svg)](https://godoc.org/github.com/eapache/go-resiliency/batcher)
+
+The batching resiliency pattern for golang.
+
+Creating a batcher takes two parameters:
+- the timeout to wait while collecting a batch
+- the function to run once a batch has been collected
+
+You can also optionally set a prefilter to fail queries before they enter the
+batch.
+
+```go
+b := batcher.New(10*time.Millisecond, func(params []interface{}) error {
+	// do something with the batch of parameters
+	return nil
+})
+
+b.Prefilter(func(param interface{}) error {
+	// do some sort of sanity check on the parameter, and return an error if it fails
+	return nil
+})
+
+for i := 0; i < 10; i++ {
+	go b.Run(i)
+}
+```

+ 102 - 0
batcher/batcher.go

@@ -0,0 +1,102 @@
+// Package batcher implements the batching resiliency pattern for Go.
+package batcher
+
+import (
+	"sync"
+	"time"
+)
+
// work pairs one parameter submitted via Run with the channel on which
// that caller will receive the batch's result.
type work struct {
	param  interface{} // the value passed to Run
	future chan error  // buffered (size 1); receives the batch's result exactly once
}

// Batcher implements the batching resiliency pattern
type Batcher struct {
	timeout   time.Duration           // how long a batch collects before doWork runs
	prefilter func(interface{}) error // optional per-param check; must be set before any Run (see Prefilter)

	lock   sync.Mutex                // guards submit: creation, send, close, and nil reset
	submit chan *work                // intake channel for the in-flight batch; nil when no batch is open
	doWork func([]interface{}) error // user function invoked once per collected batch
}
+
+// New constructs a new batcher that will batch all calls to Run that occur within
+// `timeout` time before calling doWork just once for the entire batch.
+func New(timeout time.Duration, doWork func([]interface{}) error) *Batcher {
+	return &Batcher{
+		timeout: timeout,
+		doWork:  doWork,
+	}
+}
+
+// Run runs the work function with the given parameter, possibly
+// including it in a batch with other calls to Run that occur within the
+// specified timeout. It is safe to call Run concurrently on the same batcher.
+func (b *Batcher) Run(param interface{}) error {
+	if b.prefilter != nil {
+		if err := b.prefilter(param); err != nil {
+			return err
+		}
+	}
+
+	w := &work{
+		param:  param,
+		future: make(chan error, 1),
+	}
+
+	b.submitWork(w)
+
+	return <-w.future
+}
+
// Prefilter specifies an optional function that can be used to run initial checks on parameters
// passed to Run before being added to the batch. If the prefilter returns a non-nil error,
// that error is returned immediately from Run and the batcher is not invoked. A prefilter
// cannot safely be specified for a batcher if Run has already been invoked. The filter function
// specified must be concurrency-safe.
func (b *Batcher) Prefilter(filter func(interface{}) error) {
	// Unsynchronized write: safe only because of the documented contract that
	// the prefilter is set before the first call to Run.
	b.prefilter = filter
}
+
+func (b *Batcher) submitWork(w *work) {
+	b.lock.Lock()
+	defer b.lock.Unlock()
+
+	if b.submit == nil {
+		b.submit = make(chan *work, 4)
+		go b.batch()
+	}
+
+	b.submit <- w
+}
+
// batch is the collector goroutine for one batch: it drains the submit
// channel until timer() closes it, runs doWork once over everything
// collected, then fans the single result out to every waiting caller.
func (b *Batcher) batch() {
	var params []interface{}
	var futures []chan error
	// Capture the channel locally: timer() resets b.submit to nil when the
	// window closes, but this goroutine must keep draining this channel.
	input := b.submit

	go b.timer()

	// Terminates when timer() closes the channel after b.timeout.
	for work := range input {
		params = append(params, work.param)
		futures = append(futures, work.future)
	}

	// One doWork call for the entire batch.
	ret := b.doWork(params)

	// Deliver the shared result to every blocked Run call; each future is
	// buffered (size 1), so these sends never block.
	for _, future := range futures {
		future <- ret
		close(future)
	}
}
+
// timer ends the current batching window: after b.timeout it closes the
// intake channel (which unblocks batch's range loop) and clears b.submit so
// the next Run opens a fresh batch.
func (b *Batcher) timer() {
	time.Sleep(b.timeout)

	// The lock prevents submitWork from sending on, or recreating, the
	// channel while it is being closed and cleared here.
	b.lock.Lock()
	defer b.lock.Unlock()

	close(b.submit)
	b.submit = nil
}

+ 116 - 0
batcher/batcher_test.go

@@ -0,0 +1,116 @@
+package batcher
+
+import (
+	"errors"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
// errSomeError is the sentinel error returned by the failing work function below.
var errSomeError = errors.New("errSomeError")

// returnsError is a batch work function that always fails.
func returnsError(params []interface{}) error {
	return errSomeError
}

// returnsSuccess is a batch work function that always succeeds.
func returnsSuccess(params []interface{}) error {
	return nil
}
+
+func TestBatcherSuccess(t *testing.T) {
+	b := New(10*time.Millisecond, returnsSuccess)
+
+	wg := &sync.WaitGroup{}
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func() {
+			if err := b.Run(nil); err != nil {
+				t.Error(err)
+			}
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+}
+
+func TestBatcherError(t *testing.T) {
+	b := New(10*time.Millisecond, returnsError)
+
+	wg := &sync.WaitGroup{}
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func() {
+			if err := b.Run(nil); err != errSomeError {
+				t.Error(err)
+			}
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+}
+
+func TestBatcherPrefilter(t *testing.T) {
+	b := New(1*time.Millisecond, returnsSuccess)
+
+	b.Prefilter(func(param interface{}) error {
+		if param == nil {
+			return errSomeError
+		}
+		return nil
+	})
+
+	if err := b.Run(nil); err != errSomeError {
+		t.Error(err)
+	}
+
+	if err := b.Run(1); err != nil {
+		t.Error(err)
+	}
+}
+
+func TestBatcherMultipleBatches(t *testing.T) {
+	var iters uint32
+
+	b := New(10*time.Millisecond, func(params []interface{}) error {
+		atomic.AddUint32(&iters, 1)
+		return nil
+	})
+
+	wg := &sync.WaitGroup{}
+
+	for group := 0; group < 5; group++ {
+		for i := 0; i < 10; i++ {
+			wg.Add(1)
+			go func() {
+				if err := b.Run(nil); err != nil {
+					t.Error(err)
+				}
+				wg.Done()
+			}()
+		}
+		time.Sleep(15 * time.Millisecond)
+	}
+
+	wg.Wait()
+
+	if iters != 5 {
+		t.Error("Wrong number of iters:", iters)
+	}
+}
+
// ExampleBatcher demonstrates basic usage: a work function that handles a
// whole batch, an optional prefilter, and concurrent Run calls that are
// collected into shared batches.
func ExampleBatcher() {
	b := New(10*time.Millisecond, func(params []interface{}) error {
		// do something with the batch of parameters
		return nil
	})

	b.Prefilter(func(param interface{}) error {
		// do some sort of sanity check on the parameter, and return an error if it fails
		return nil
	})

	for i := 0; i < 10; i++ {
		// NOTE: errors from Run are discarded here for brevity; real callers
		// should check the return value (and wait for the goroutines).
		go b.Run(i)
	}
}