|
@@ -123,9 +123,9 @@ func (ce ConsumerError) Error() string {
|
|
|
// ConsumeErrors is a type that wraps a batch of errors and implements the Error interface.
|
|
// ConsumeErrors is a type that wraps a batch of errors and implements the Error interface.
|
|
|
// It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors
|
|
// It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors
|
|
|
// when stopping.
|
|
// when stopping.
|
|
|
-type ConsumeErrors []error
|
|
|
|
|
|
|
+type ConsumerErrors []*ConsumerError
|
|
|
|
|
|
|
|
-func (ce ConsumeErrors) Error() string {
|
|
|
|
|
|
|
+func (ce ConsumerErrors) Error() string {
|
|
|
return fmt.Sprintf("kafka: %d errors while consuming", len(ce))
|
|
return fmt.Sprintf("kafka: %d errors while consuming", len(ce))
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -397,7 +397,7 @@ func (child *PartitionConsumer) Close() error {
|
|
|
// drain
|
|
// drain
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- var errors ConsumeErrors
|
|
|
|
|
|
|
+ var errors ConsumerErrors
|
|
|
for err := range child.errors {
|
|
for err := range child.errors {
|
|
|
errors = append(errors, err)
|
|
errors = append(errors, err)
|
|
|
}
|
|
}
|