package sarama

import (
	"reflect"
	"runtime"
	"strconv"
	"strings"
	"testing"
	"time"

	"github.com/davecgh/go-spew/spew"
)

var recordBatchTestCases = []struct {
	name         string
	batch        RecordBatch
	encoded      []byte
	oldGoEncoded []byte // used in case of gzipped content for go versions prior to 1.8
}{
	{
		name: "empty record",
		batch: RecordBatch{
			Version:        2,
			FirstTimestamp: time.Unix(0, 0),
			MaxTimestamp:   time.Unix(0, 0),
			Records:        []*Record{},
		},
		encoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 49, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2,                // Version
			89, 95, 183, 221, // CRC
			0, 0, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 0, 0, 0, 0, 0, 0, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 0, // Number of Records
		},
	},
	{
		name: "control batch",
		batch: RecordBatch{
			Version:        2,
			Control:        true,
			FirstTimestamp: time.Unix(0, 0),
			MaxTimestamp:   time.Unix(0, 0),
			Records:        []*Record{},
		},
		encoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 49, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2,               // Version
			81, 46, 67, 217, // CRC
			0, 32, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 0, 0, 0, 0, 0, 0, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 0, // Number of Records
		},
	},
	{
		name: "uncompressed record",
		batch: RecordBatch{
			Version:         2,
			FirstTimestamp:  time.Unix(1479847795, 0),
			MaxTimestamp:    time.Unix(0, 0),
			LastOffsetDelta: 0,
			Records: []*Record{{
				TimestampDelta: 5 * time.Millisecond,
				Key:            []byte{1, 2, 3, 4},
				Value:          []byte{5, 6, 7},
				Headers: []*RecordHeader{{
					Key:   []byte{8, 9, 10},
					Value: []byte{11, 12},
				}},
			}},
			recordsLen: 21,
		},
		encoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 70, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2,                // Version
			84, 121, 97, 253, // CRC
			0, 0, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 1, // Number of Records
			40, // Record Length
			0,  // Attributes
			10, // Timestamp Delta
			0,  // Offset Delta
			8,  // Key Length
			1, 2, 3, 4,
			6, // Value Length
			5, 6, 7,
			2,        // Number of Headers
			6,        // Header Key Length
			8, 9, 10, // Header Key
			4,      // Header Value Length
			11, 12, // Header Value
		},
	},
	{
		name: "gzipped record",
		batch: RecordBatch{
			Version:          2,
			Codec:            CompressionGZIP,
			CompressionLevel: CompressionLevelDefault,
			FirstTimestamp:   time.Unix(1479847795, 0),
			MaxTimestamp:     time.Unix(0, 0),
			LastOffsetDelta:  0,
			Records: []*Record{{
				TimestampDelta: 5 * time.Millisecond,
				Key:            []byte{1, 2, 3, 4},
				Value:          []byte{5, 6, 7},
				Headers: []*RecordHeader{{
					Key:   []byte{8, 9, 10},
					Value: []byte{11, 12},
				}},
			}},
			recordsLen: 21,
		},
		encoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 94, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2,                  // Version
			159, 236, 182, 189, // CRC
			0, 1, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 1, // Number of Records
			31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 210, 96, 224, 98, 224, 96, 100, 98, 102, 97, 99, 101,
			99, 103, 98, 227, 224, 228, 98, 225, 230, 1, 4, 0, 0, 255, 255, 173, 201, 88, 103, 21, 0, 0, 0,
		},
		oldGoEncoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 94, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2,               // Version
			0, 216, 14, 210, // CRC
			0, 1, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 1, // Number of Records
			31, 139, 8, 0, 0, 9, 110, 136, 0, 255, 210, 96, 224, 98, 224, 96, 100, 98, 102, 97, 99, 101,
			99, 103, 98, 227, 224, 228, 98, 225, 230, 1, 4, 0, 0, 255, 255, 173, 201, 88, 103, 21, 0, 0, 0,
		},
	},
	{
		name: "snappy compressed record",
		batch: RecordBatch{
			Version:         2,
			Codec:           CompressionSnappy,
			FirstTimestamp:  time.Unix(1479847795, 0),
			MaxTimestamp:    time.Unix(0, 0),
			LastOffsetDelta: 0,
			Records: []*Record{{
				TimestampDelta: 5 * time.Millisecond,
				Key:            []byte{1, 2, 3, 4},
				Value:          []byte{5, 6, 7},
				Headers: []*RecordHeader{{
					Key:   []byte{8, 9, 10},
					Value: []byte{11, 12},
				}},
			}},
			recordsLen: 21,
		},
		encoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 72, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2,              // Version
			21, 0, 159, 97, // CRC
			0, 2, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 1, // Number of Records
			21, 80, 40, 0, 10, 0, 8, 1, 2, 3, 4, 6, 5, 6, 7, 2, 6, 8, 9, 10, 4, 11, 12,
		},
	},
	{
		name: "lz4 compressed record",
		batch: RecordBatch{
			Version:         2,
			Codec:           CompressionLZ4,
			FirstTimestamp:  time.Unix(1479847795, 0),
			MaxTimestamp:    time.Unix(0, 0),
			LastOffsetDelta: 0,
			Records: []*Record{{
				TimestampDelta: 5 * time.Millisecond,
				Key:            []byte{1, 2, 3, 4},
				Value:          []byte{5, 6, 7},
				Headers: []*RecordHeader{{
					Key:   []byte{8, 9, 10},
					Value: []byte{11, 12},
				}},
			}},
			recordsLen: 21,
		},
		encoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 89, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2,                 // Version
			169, 74, 119, 197, // CRC
			0, 3, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 1, // Number of Records
			4, 34, 77, 24, 100, 112, 185, 21, 0, 0, 128, 40, 0, 10, 0, 8, 1, 2, 3, 4, 6, 5, 6, 7, 2,
			6, 8, 9, 10, 4, 11, 12, 0, 0, 0, 0, 12, 59, 239, 146,
		},
	},
}

func isOldGo(t *testing.T) bool {
	v := strings.Split(runtime.Version()[2:], ".")
	if len(v) < 2 {
		t.Logf("Can't parse version: %s", runtime.Version())
		return false
	}
	maj, err := strconv.Atoi(v[0])
	if err != nil {
		t.Logf("Can't parse version: %s", runtime.Version())
		return false
	}
	min, err := strconv.Atoi(v[1])
	if err != nil {
		t.Logf("Can't parse version: %s", runtime.Version())
		return false
	}
	return maj < 1 || (maj == 1 && min < 8)
}

func TestRecordBatchEncoding(t *testing.T) {
	for _, tc := range recordBatchTestCases {
		if tc.oldGoEncoded != nil && isOldGo(t) {
			testEncodable(t, tc.name, &tc.batch, tc.oldGoEncoded)
		} else {
			testEncodable(t, tc.name, &tc.batch, tc.encoded)
		}
	}
}

func TestRecordBatchDecoding(t *testing.T) {
	for _, tc := range recordBatchTestCases {
		batch := RecordBatch{}
		testDecodable(t, tc.name, &batch, tc.encoded)
		for _, r := range batch.Records {
			r.length = varintLengthField{}
		}
		for _, r := range tc.batch.Records {
			r.length = varintLengthField{}
		}
		// The compression level is not restored on decoding. It is not needed
		// anyway. We only set it here to ensure that comparision succeeds.
		batch.CompressionLevel = tc.batch.CompressionLevel
		if !reflect.DeepEqual(batch, tc.batch) {
			t.Errorf(spew.Sprintf("invalid decode of %s\ngot %+v\nwanted %+v", tc.name, batch, tc.batch))
		}
	}
}