|
|
@@ -1,11 +1,13 @@
|
|
|
package sarama
|
|
|
|
|
|
import (
|
|
|
+ "errors"
|
|
|
"fmt"
|
|
|
- "io"
|
|
|
"net"
|
|
|
"testing"
|
|
|
"time"
|
|
|
+
|
|
|
+ metrics "github.com/rcrowley/go-metrics"
|
|
|
)
|
|
|
|
|
|
func ExampleBroker() {
|
|
|
@@ -107,77 +109,96 @@ func TestSimpleBrokerCommunication(t *testing.T) {
|
|
|
|
|
|
}
|
|
|
|
|
|
-func TestReceiveSASLOAuthBearerServerResponse(t *testing.T) {
|
|
|
+var ErrTokenFailure = errors.New("Failure generating token")
|
|
|
+
|
|
|
+type TokenProvider struct {
|
|
|
+ accessToken string
|
|
|
+ err error
|
|
|
+}
|
|
|
+
|
|
|
+func (t *TokenProvider) Token() (string, error) {
|
|
|
+ return t.accessToken, t.err
|
|
|
+}
|
|
|
+
|
|
|
+func newTokenProvider(accessToken string, err error) *TokenProvider {
|
|
|
+ return &TokenProvider{
|
|
|
+ accessToken: accessToken,
|
|
|
+ err: err,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func TestReceiveSASLOAuthBearerClientResponse(t *testing.T) {
|
|
|
|
|
|
testTable := []struct {
|
|
|
- name string
|
|
|
- buf []byte
|
|
|
- err error
|
|
|
+ name string
|
|
|
+ err error
|
|
|
+ tokProvider *TokenProvider
|
|
|
}{
|
|
|
{"OK server response",
|
|
|
- []byte{
|
|
|
- 0, 0, 0, 14,
|
|
|
- 0, 0, 0, 0,
|
|
|
- 0, 0,
|
|
|
- 255, 255, // no error message
|
|
|
- 0, 0, 0, 2, 'o', 'k',
|
|
|
- },
|
|
|
- nil},
|
|
|
+ nil,
|
|
|
+ newTokenProvider("access-token-123", nil),
|
|
|
+ },
|
|
|
{"SASL authentication failure response",
|
|
|
- []byte{
|
|
|
- 0, 0, 0, 19,
|
|
|
- 0, 0, 0, 0,
|
|
|
- 0, 58,
|
|
|
- 0, 3, 'e', 'r', 'r',
|
|
|
- 0, 0, 0, 4, 'f', 'a', 'i', 'l',
|
|
|
- },
|
|
|
- ErrSASLAuthenticationFailed},
|
|
|
- {"Truncated header",
|
|
|
- []byte{
|
|
|
- 0, 0, 0, 12,
|
|
|
- },
|
|
|
- io.ErrUnexpectedEOF},
|
|
|
- {"Truncated response message",
|
|
|
- []byte{
|
|
|
- 0, 0, 0, 12,
|
|
|
- 0, 0, 0, 0,
|
|
|
- 0, 0,
|
|
|
- },
|
|
|
- io.ErrUnexpectedEOF},
|
|
|
+ ErrSASLAuthenticationFailed,
|
|
|
+ newTokenProvider("access-token-123", nil),
|
|
|
+ },
|
|
|
+ {"Token generation error",
|
|
|
+ ErrTokenFailure,
|
|
|
+ newTokenProvider("access-token-123", ErrTokenFailure),
|
|
|
+ },
|
|
|
}
|
|
|
|
|
|
for i, test := range testTable {
|
|
|
|
|
|
- in, out := net.Pipe()
|
|
|
+ // mockBroker mocks underlying network logic and broker responses
|
|
|
+ mockBroker := NewMockBroker(t, 0)
|
|
|
|
|
|
- defer func() {
|
|
|
- if err := out.Close(); err != nil {
|
|
|
- t.Error(err)
|
|
|
- }
|
|
|
- }()
|
|
|
+ mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t)
|
|
|
|
|
|
- b := &Broker{conn: out}
|
|
|
+ if e, ok := test.err.(KError); ok {
|
|
|
+ mockSASLAuthResponse = mockSASLAuthResponse.SetError(e)
|
|
|
+ }
|
|
|
|
|
|
- go func() {
|
|
|
- defer func() {
|
|
|
- if err := in.Close(); err != nil {
|
|
|
- t.Error(err)
|
|
|
- }
|
|
|
- }()
|
|
|
- if _, err := in.Write(test.buf); err != nil {
|
|
|
- t.Error(err)
|
|
|
- }
|
|
|
- }()
|
|
|
+ mockBroker.SetHandlerByMap(map[string]MockResponse{
|
|
|
+ "SaslAuthenticateRequest": mockSASLAuthResponse,
|
|
|
+ "SaslHandshakeRequest": NewMockSaslHandshakeResponse(t).
|
|
|
+ SetEnabledMechanisms([]string{SASLTypeOAuth}),
|
|
|
+ })
|
|
|
+
|
|
|
+ // broker executes SASL requests against mockBroker
|
|
|
+ broker := NewBroker(mockBroker.Addr())
|
|
|
+ broker.requestRate = metrics.NilMeter{}
|
|
|
+ broker.outgoingByteRate = metrics.NilMeter{}
|
|
|
+ broker.incomingByteRate = metrics.NilMeter{}
|
|
|
+ broker.requestSize = metrics.NilHistogram{}
|
|
|
+ broker.responseSize = metrics.NilHistogram{}
|
|
|
+ broker.responseRate = metrics.NilMeter{}
|
|
|
+ broker.requestLatency = metrics.NilHistogram{}
|
|
|
+
|
|
|
+ conf := NewConfig()
|
|
|
+ broker.conf = conf
|
|
|
+
|
|
|
+ dialer := net.Dialer{
|
|
|
+ Timeout: conf.Net.DialTimeout,
|
|
|
+ KeepAlive: conf.Net.KeepAlive,
|
|
|
+ LocalAddr: conf.Net.LocalAddr,
|
|
|
+ }
|
|
|
|
|
|
- bytesRead, err := b.receiveSASLOAuthBearerServerResponse(0)
|
|
|
+ conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
|
|
|
|
|
|
- if len(test.buf) != bytesRead {
|
|
|
- t.Errorf("[%d]:[%s] Expected %d bytes read, got %d\n", i, test.name, len(test.buf), bytesRead)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
}
|
|
|
|
|
|
+ broker.conn = conn
|
|
|
+
|
|
|
+ err = broker.sendAndReceiveSASLOAuth(test.tokProvider)
|
|
|
+
|
|
|
if test.err != err {
|
|
|
t.Errorf("[%d]:[%s] Expected %s error, got %s\n", i, test.name, test.err, err)
|
|
|
}
|
|
|
+
|
|
|
+ mockBroker.Close()
|
|
|
}
|
|
|
}
|
|
|
|