|
@@ -46,7 +46,13 @@ type mockBroker struct {
|
|
|
t *testing.T
|
|
|
latency time.Duration
|
|
|
handler requestHandlerFunc
|
|
|
- handlerMux sync.Mutex
|
|
|
+ history []RequestResponse
|
|
|
+ lock sync.Mutex
|
|
|
+}
|
|
|
+
|
|
|
+type RequestResponse struct {
|
|
|
+ Request requestBody
|
|
|
+ Response encoder
|
|
|
}
|
|
|
|
|
|
func (b *mockBroker) SetLatency(latency time.Duration) {
|
|
@@ -57,9 +63,9 @@ func (b *mockBroker) SetLatency(latency time.Duration) {
|
|
|
|
|
|
|
|
|
func (b *mockBroker) SetHandler(handler requestHandlerFunc) {
|
|
|
- b.handlerMux.Lock()
|
|
|
+ b.lock.Lock()
|
|
|
b.handler = handler
|
|
|
- b.handlerMux.Unlock()
|
|
|
+ b.lock.Unlock()
|
|
|
}
|
|
|
|
|
|
func (b *mockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
|
|
@@ -77,6 +83,14 @@ func (b *mockBroker) BrokerID() int32 {
|
|
|
return b.brokerID
|
|
|
}
|
|
|
|
|
|
+func (b *mockBroker) History() []RequestResponse {
|
|
|
+ b.lock.Lock()
|
|
|
+ history := make([]RequestResponse, len(b.history))
|
|
|
+ copy(history, b.history)
|
|
|
+ b.lock.Unlock()
|
|
|
+ return history
|
|
|
+}
|
|
|
+
|
|
|
func (b *mockBroker) Port() int32 {
|
|
|
return b.port
|
|
|
}
|
|
@@ -150,9 +164,13 @@ func (b *mockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup)
|
|
|
time.Sleep(b.latency)
|
|
|
}
|
|
|
|
|
|
- res := b.requestHandler()(req)
|
|
|
+ b.lock.Lock()
|
|
|
+ res := b.handler(req)
|
|
|
+ b.history = append(b.history, RequestResponse{req.body, res})
|
|
|
+ b.lock.Unlock()
|
|
|
+
|
|
|
if res == nil {
|
|
|
- Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, req)
|
|
|
+ Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(req))
|
|
|
continue
|
|
|
}
|
|
|
Logger.Printf("*** mockbroker/%d/%d: served %v -> %v", b.brokerID, idx, req, res)
|
|
@@ -180,12 +198,6 @@ func (b *mockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup)
|
|
|
Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err)
|
|
|
}
|
|
|
|
|
|
-func (b *mockBroker) requestHandler() requestHandlerFunc {
|
|
|
- b.handlerMux.Lock()
|
|
|
- defer b.handlerMux.Unlock()
|
|
|
- return b.handler
|
|
|
-}
|
|
|
-
|
|
|
func (b *mockBroker) defaultRequestHandler(req *request) (res encoder) {
|
|
|
select {
|
|
|
case res, ok := <-b.expectations:
|