|
@@ -164,38 +164,28 @@ func (b *broker) rcvResponseLoop() {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (b *broker) sendRequest(clientID *string, body requestEncoder) (*responsePromise, error) {
|
|
|
|
|
- req := request{b.correlation_id, clientID, body}
|
|
|
|
|
- packet, err := buildBytes(&req)
|
|
|
|
|
|
|
+func (b *broker) sendAndReceive(clientID *string, req requestEncoder, res decoder) error {
|
|
|
|
|
+ fullRequest := request{b.correlation_id, clientID, req}
|
|
|
|
|
+ packet, err := buildBytes(&fullRequest)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
|
|
|
|
+ return err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- sendRequest := requestToSend{responsePromise{b.correlation_id, make(chan []byte), make(chan error)}, body.expectResponse()}
|
|
|
|
|
|
|
+ sendRequest := requestToSend{responsePromise{b.correlation_id, make(chan []byte), make(chan error)}, req.expectResponse()}
|
|
|
|
|
|
|
|
b.requests <- sendRequest
|
|
b.requests <- sendRequest
|
|
|
sendRequest.response.packets <- *packet // we cheat to avoid poofing up more channels than necessary
|
|
sendRequest.response.packets <- *packet // we cheat to avoid poofing up more channels than necessary
|
|
|
b.correlation_id++
|
|
b.correlation_id++
|
|
|
- return &sendRequest.response, nil
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-// returns true if there was a response, even if there was an error decoding it (in
|
|
|
|
|
-// which case it will also return an error of some sort)
|
|
|
|
|
-func (b *broker) sendAndReceive(clientID *string, req requestEncoder, res decoder) error {
|
|
|
|
|
- responseChan, err := b.sendRequest(clientID, req)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return err
|
|
|
|
|
- }
|
|
|
|
|
|
|
|
|
|
select {
|
|
select {
|
|
|
- case buf := <-responseChan.packets:
|
|
|
|
|
|
|
+ case buf := <-sendRequest.response.packets:
|
|
|
// Only try to decode if we got a response.
|
|
// Only try to decode if we got a response.
|
|
|
if buf != nil {
|
|
if buf != nil {
|
|
|
decoder := realDecoder{raw: buf}
|
|
decoder := realDecoder{raw: buf}
|
|
|
err = res.decode(&decoder)
|
|
err = res.decode(&decoder)
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
- case err = <-responseChan.errors:
|
|
|
|
|
|
|
+ case err = <-sendRequest.response.errors:
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
return err
|
|
return err
|