|
|
@@ -97,30 +97,40 @@ func (pc *PartitionConsumer) AsyncClose() {
|
|
|
func (pc *PartitionConsumer) Close() error {
|
|
|
if !pc.consumed {
|
|
|
pc.t.Errorf("Expectations set on %s/%d, but no partition consumer was started.", pc.topic, pc.partition)
|
|
|
+ return errPartitionConsumerNotStarted
|
|
|
}
|
|
|
|
|
|
pc.AsyncClose()
|
|
|
|
|
|
- var errs = make(sarama.ConsumerErrors, 0)
|
|
|
+ var (
|
|
|
+ closeErr error
|
|
|
+ wg sync.WaitGroup
|
|
|
+ )
|
|
|
+
|
|
|
+ wg.Add(1)
|
|
|
go func() {
|
|
|
+ defer wg.Done()
|
|
|
+
|
|
|
+ var errs = make(sarama.ConsumerErrors, 0)
|
|
|
for err := range pc.errors {
|
|
|
errs = append(errs, err)
|
|
|
}
|
|
|
+
|
|
|
+ if len(errs) > 0 {
|
|
|
+ closeErr = errs
|
|
|
+ }
|
|
|
}()
|
|
|
|
|
|
+ wg.Add(1)
|
|
|
go func() {
|
|
|
+ defer wg.Done()
|
|
|
for _ = range pc.messages {
|
|
|
// drain
|
|
|
}
|
|
|
}()
|
|
|
|
|
|
- pc.l.Lock()
|
|
|
- pc.l.Unlock()
|
|
|
-
|
|
|
- if len(errs) > 0 {
|
|
|
- return errs
|
|
|
- }
|
|
|
- return nil
|
|
|
+ wg.Wait()
|
|
|
+ return closeErr
|
|
|
}
|
|
|
|
|
|
// Errors implements the Errors method from the sarama.PartitionConsumer interface.
|