package sarama

import (
	"testing"
	"time"
)

var (
	produceRequestEmpty = []byte{
		0x00, 0x00,
		0x00, 0x00, 0x00, 0x00,
		0x00, 0x00, 0x00, 0x00}

	produceRequestHeader = []byte{
		0x01, 0x23,
		0x00, 0x00, 0x04, 0x44,
		0x00, 0x00, 0x00, 0x00}

	produceRequestOneMessage = []byte{
		0x01, 0x23,
		0x00, 0x00, 0x04, 0x44,
		0x00, 0x00, 0x00, 0x01,
		0x00, 0x05, 't', 'o', 'p', 'i', 'c',
		0x00, 0x00, 0x00, 0x01,
		0x00, 0x00, 0x00, 0xAD,
		0x00, 0x00, 0x00, 0x1C,
		// messageSet
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
		0x00, 0x00, 0x00, 0x10,
		// message
		0x23, 0x96, 0x4a, 0xf7, // CRC
		0x00,
		0x00,
		0xFF, 0xFF, 0xFF, 0xFF,
		0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}

	produceRequestOneRecord = []byte{
		0xFF, 0xFF, // Transaction ID
		0x01, 0x23, // Required Acks
		0x00, 0x00, 0x04, 0x44, // Timeout
		0x00, 0x00, 0x00, 0x01, // Number of Topics
		0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
		0x00, 0x00, 0x00, 0x01, // Number of Partitions
		0x00, 0x00, 0x00, 0xAD, // Partition
		0x00, 0x00, 0x00, 0x52, // Records length
		// recordBatch
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
		0x00, 0x00, 0x00, 0x46,
		0x00, 0x00, 0x00, 0x00,
		0x02,
		0xCA, 0x33, 0xBC, 0x05,
		0x00, 0x00,
		0x00, 0x00, 0x00, 0x01,
		0x00, 0x00, 0x01, 0x58, 0x8D, 0xCD, 0x59, 0x38,
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
		0x00, 0x00,
		0x00, 0x00, 0x00, 0x00,
		0x00, 0x00, 0x00, 0x01,
		// record
		0x28,
		0x00,
		0x0A,
		0x00,
		0x08, 0x01, 0x02, 0x03, 0x04,
		0x06, 0x05, 0x06, 0x07,
		0x02,
		0x06, 0x08, 0x09, 0x0A,
		0x04, 0x0B, 0x0C,
	}
)

func TestProduceRequest(t *testing.T) {
	request := new(ProduceRequest)
	testRequest(t, "empty", request, produceRequestEmpty)

	request.RequiredAcks = 0x123
	request.Timeout = 0x444
	testRequest(t, "header", request, produceRequestHeader)

	request.AddMessage("topic", 0xAD, &Message{Codec: CompressionNone, Key: nil, Value: []byte{0x00, 0xEE}})
	testRequest(t, "one message", request, produceRequestOneMessage)

	request.Version = 3
	batch := &RecordBatch{
		LastOffsetDelta: 1,
		Version:         2,
		FirstTimestamp:  time.Unix(1479847795, 0),
		MaxTimestamp:    time.Unix(0, 0),
		Records: []*Record{{
			TimestampDelta: 5 * time.Millisecond,
			Key:            []byte{0x01, 0x02, 0x03, 0x04},
			Value:          []byte{0x05, 0x06, 0x07},
			Headers: []*RecordHeader{{
				Key:   []byte{0x08, 0x09, 0x0A},
				Value: []byte{0x0B, 0x0C},
			}},
		}},
	}
	request.AddBatch("topic", 0xAD, batch)
	packet := testRequestEncode(t, "one record", request, produceRequestOneRecord)
	// compressRecords field is not populated on decoding because consumers
	// are only interested in decoded records.
	batch.compressedRecords = nil
	testRequestDecode(t, "one record", request, packet)
}