|
|
@@ -99,31 +99,45 @@ func (b *Broker) Produce(clientID *string, request *ProduceRequest) (*ProduceRes
|
|
|
return response, nil
|
|
|
}
|
|
|
|
|
|
-func (b *Broker) sendAndReceive(clientID *string, req requestEncoder, res decoder) error {
|
|
|
+func (b *Broker) send(clientID *string, req requestEncoder, promiseResponse bool) (*responsePromise, error) {
|
|
|
b.lock.Lock()
|
|
|
defer b.lock.Unlock()
|
|
|
|
|
|
fullRequest := request{b.correlation_id, clientID, req}
|
|
|
buf, err := encode(&fullRequest)
|
|
|
if err != nil {
|
|
|
- return err
|
|
|
+ return nil, err
|
|
|
}
|
|
|
|
|
|
_, err = b.conn.Write(buf)
|
|
|
if err != nil {
|
|
|
- return err
|
|
|
+ return nil, err
|
|
|
}
|
|
|
b.correlation_id++
|
|
|
|
|
|
- if res == nil {
|
|
|
- return nil
|
|
|
+ if !promiseResponse {
|
|
|
+ return nil, nil
|
|
|
}
|
|
|
|
|
|
promise := responsePromise{fullRequest.correlation_id, make(chan []byte), make(chan error)}
|
|
|
b.responses <- promise
|
|
|
|
|
|
+ return &promise, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (b *Broker) sendAndReceive(clientID *string, req requestEncoder, res decoder) error {
|
|
|
+ promise, err := b.send(clientID, req, res != nil)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ if promise == nil {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
select {
|
|
|
- case buf = <-promise.packets:
|
|
|
+ case buf := <-promise.packets:
|
|
|
return decode(buf, res)
|
|
|
case err = <-promise.errors:
|
|
|
return err
|