package sarama

import (
	"fmt"
	"testing"
	"time"
)

// Wire-format fixtures shared by the ProduceResponse encode and decode tests.
var (
	// produceResponseNoBlocksV0 is a version 0 response whose only content is
	// a 4-byte topic count of zero.
	produceResponseNoBlocksV0 = []byte{
		0x00, 0x00, 0x00, 0x00}

	// produceResponseManyBlocksVersions maps a response version to the bytes
	// of a response holding one topic ("foo") with a single partition block
	// (partition 1). Each fixture is laid out as: topic count, length-prefixed
	// topic name, partition count, then the partition block. Later versions
	// append extra fields, as noted on each entry.
	produceResponseManyBlocksVersions = map[int][]byte{
		// version 0: partition, error code and offset only
		0: {
			0x00, 0x00, 0x00, 0x01,

			0x00, 0x03, 'f', 'o', 'o',
			0x00, 0x00, 0x00, 0x01,

			0x00, 0x00, 0x00, 0x01, // Partition 1
			0x00, 0x02, // ErrInvalidMessage
			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
		},

		// version 1 adds a trailing throttle time
		1: {
			0x00, 0x00, 0x00, 0x01,

			0x00, 0x03, 'f', 'o', 'o',
			0x00, 0x00, 0x00, 0x01,

			0x00, 0x00, 0x00, 0x01, // Partition 1
			0x00, 0x02, // ErrInvalidMessage
			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255

			0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
		},
		// version 2 adds a per-partition timestamp after the offset
		2: {
			0x00, 0x00, 0x00, 0x01,

			0x00, 0x03, 'f', 'o', 'o',
			0x00, 0x00, 0x00, 0x01,

			0x00, 0x00, 0x00, 0x01, // Partition 1
			0x00, 0x02, // ErrInvalidMessage
			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xE8, // Timestamp 1000 ms after the Unix epoch: January 1st 1970 at 00:00:01.000 UTC (LogAppendTime was used)

			0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
		},
		7: { // version 7 adds StartOffset
			0x00, 0x00, 0x00, 0x01,

			0x00, 0x03, 'f', 'o', 'o',
			0x00, 0x00, 0x00, 0x01,

			0x00, 0x00, 0x00, 0x01, // Partition 1
			0x00, 0x02, // ErrInvalidMessage
			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xE8, // Timestamp 1000 ms after the Unix epoch: January 1st 1970 at 00:00:01.000 UTC (LogAppendTime was used)
			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x32, // StartOffset 50

			0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
		},
	}
)

// TestProduceResponseDecode decodes the empty fixture and every versioned
// fixture in produceResponseManyBlocksVersions, then checks that the fields
// present in each version carry the expected values.
func TestProduceResponseDecode(t *testing.T) {
	empty := ProduceResponse{}
	testVersionDecodable(t, "no blocks", &empty, produceResponseNoBlocksV0, 0)
	if len(empty.Blocks) != 0 {
		t.Error("Decoding produced", len(empty.Blocks), "topics where there were none")
	}

	for v, produceResponseManyBlocks := range produceResponseManyBlocksVersions {
		t.Logf("Decoding produceResponseManyBlocks version %d", v)

		// Decode into a fresh value on every iteration. Map iteration order
		// is random, so reusing one response would let fields left over from
		// a previously decoded version (e.g. ThrottleTime) mask a decoder
		// that fails to populate them.
		response := ProduceResponse{}
		testVersionDecodable(t, "many blocks", &response, produceResponseManyBlocks, int16(v))

		if len(response.Blocks) != 1 {
			t.Error("Decoding produced", len(response.Blocks), "topics where there was 1")
		}
		if len(response.Blocks["foo"]) != 1 {
			t.Error("Decoding produced", len(response.Blocks["foo"]), "partitions for 'foo' where there was one")
		}

		block := response.GetBlock("foo", 1)
		if block == nil {
			t.Error("Decoding did not produce a block for foo/1")
		} else {
			if block.Err != ErrInvalidMessage {
				t.Error("Decoding failed for foo/1/Err, got:", int16(block.Err))
			}
			if block.Offset != 255 {
				t.Error("Decoding failed for foo/1/Offset, got:", block.Offset)
			}
			// The timestamp is only present in the fixtures from version 2 on.
			// Compare instants with Equal rather than != so the check does not
			// depend on the Location or monotonic-clock fields of time.Time.
			if v >= 2 && !block.Timestamp.Equal(time.Unix(1, 0)) {
				t.Error("Decoding failed for foo/1/Timestamp, got:", block.Timestamp)
			}
			// StartOffset is only present in the fixture for version 7.
			if v >= 7 && block.StartOffset != 50 {
				t.Error("Decoding failed for foo/1/StartOffset, got:", block.StartOffset)
			}
		}

		// The throttle time is only present in the fixtures from version 1 on.
		if v >= 1 {
			if expected := 100 * time.Millisecond; response.ThrottleTime != expected {
				t.Error("Failed decoding produced throttle time, expected:", expected, ", got:", response.ThrottleTime)
			}
		}
	}
}

// TestProduceResponseEncode encodes an empty response, then a response with a
// single foo/1 block at every version in produceResponseManyBlocksVersions,
// and compares each result with the matching fixture bytes.
func TestProduceResponseEncode(t *testing.T) {
	resp := ProduceResponse{
		Blocks: map[string]map[int32]*ProduceResponseBlock{},
	}
	testEncodable(t, "empty", &resp, produceResponseNoBlocksV0)

	// Populate the one block shared by all versioned fixtures.
	resp.Blocks["foo"] = map[int32]*ProduceResponseBlock{
		1: {
			Err:         ErrInvalidMessage,
			Offset:      255,
			Timestamp:   time.Unix(1, 0),
			StartOffset: 50,
		},
	}
	resp.ThrottleTime = 100 * time.Millisecond

	for version, want := range produceResponseManyBlocksVersions {
		resp.Version = int16(version)
		name := fmt.Sprintf("many blocks version %d", version)
		testEncodable(t, name, &resp, want)
	}
}

// TestProduceResponseEncodeInvalidTimestamp checks that encoding a version 2
// response whose block timestamp lies before the Unix epoch fails with a
// PacketEncodingError.
func TestProduceResponseEncodeInvalidTimestamp(t *testing.T) {
	response := ProduceResponse{
		Version: 2,
		Blocks: map[string]map[int32]*ProduceResponseBlock{
			"t": {
				0: {
					Err:    ErrNoError,
					Offset: 0,
					// Use a timestamp before Unix time
					Timestamp: time.Unix(0, 0).Add(-1 * time.Millisecond),
				},
			},
		},
		ThrottleTime: 100 * time.Millisecond,
	}

	_, err := encode(&response, nil)
	if err == nil {
		// Stop here: the type check below would only add a second, redundant
		// failure message for the same nil error.
		t.Fatal("Expecting error, got nil")
	}
	if _, ok := err.(PacketEncodingError); !ok {
		t.Error("Expecting PacketEncodingError, got:", err)
	}
}