|
|
@@ -310,6 +310,7 @@ type partitionConsumer struct {
|
|
|
|
|
|
trigger, dying chan none
|
|
|
responseResult error
|
|
|
+ closeOnce sync.Once
|
|
|
|
|
|
fetchSize int32
|
|
|
offset int64
|
|
|
@@ -412,7 +413,9 @@ func (child *partitionConsumer) AsyncClose() {
|
|
|
// the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
|
|
|
// 'errors' channel (alternatively, if the child is already at the dispatcher for some reason, that will
|
|
|
// also just close itself)
|
|
|
- close(child.dying)
|
|
|
+ child.closeOnce.Do(func() {
|
|
|
+ close(child.dying)
|
|
|
+ })
|
|
|
}
|
|
|
|
|
|
func (child *partitionConsumer) Close() error {
|