package sarama

import (
	"fmt"
	"sync"
	"testing"
	"time"
)

func TestDefaultConsumerConfigValidates(t *testing.T) {
	config := NewConsumerConfig()
	if err := config.Validate(); err != nil {
		t.Error(err)
	}
}

func TestDefaultPartitionConsumerConfigValidates(t *testing.T) {
	config := NewPartitionConsumerConfig()
	if err := config.Validate(); err != nil {
		t.Error(err)
	}
}

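// TestConsumerOffsetManual checks that a partition consumer configured with
// OffsetMethodManual begins fetching at exactly the offset it was given.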
func TestConsumerOffsetManual(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	leader := NewMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
	seedBroker.Returns(metadataResponse)

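	// queue eleven single-message fetch responses: one more than we consume
	// below, so that the fetch the consumer may still have in flight at
	// shutdown is also answered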
	for i := 0; i <= 10; i++ {
		fetchResponse := new(FetchResponse)
		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234))
		leader.Returns(fetchResponse)
	}

	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	master, err := NewConsumer(client, nil)
	if err != nil {
		t.Fatal(err)
	}

	config := NewPartitionConsumerConfig()
	config.OffsetMethod = OffsetMethodManual
	config.OffsetValue = 1234
	consumer, err := master.ConsumePartition("my_topic", 0, config)
	if err != nil {
		t.Fatal(err)
	}
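	// the metadata has been fetched, so the seed broker is no longer needed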
	seedBroker.Close()

	for i := 0; i < 10; i++ {
		event := <-consumer.Events()
		if event.Err != nil {
			t.Error(event.Err)
		}
		if event.Offset != int64(i+1234) {
			t.Error("Incorrect message offset!")
		}
	}

	safeClose(t, consumer)
	safeClose(t, client)
	leader.Close()
}

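// TestConsumerLatestOffset checks that OffsetMethodNewest first queries the
// leader for its newest offset and then begins fetching from there.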
func TestConsumerLatestOffset(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	leader := NewMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
	seedBroker.Returns(metadataResponse)

	offsetResponse := new(OffsetResponse)
	offsetResponse.AddTopicPartition("my_topic", 0, 0x010101)
	leader.Returns(offsetResponse)

	fetchResponse := new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101)
	leader.Returns(fetchResponse)

	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}
	seedBroker.Close()

	master, err := NewConsumer(client, nil)
	if err != nil {
		t.Fatal(err)
	}

	config := NewPartitionConsumerConfig()
	config.OffsetMethod = OffsetMethodNewest
	consumer, err := master.ConsumePartition("my_topic", 0, config)
	if err != nil {
		t.Fatal(err)
	}

	leader.Close()
	safeClose(t, consumer)
	safeClose(t, client)

	// we deliver one message, so it should be one higher than we return in the OffsetResponse
	if consumer.offset != 0x010102 {
		t.Error("Latest offset not fetched correctly:", consumer.offset)
	}
}

func TestConsumerFunnyOffsets(t *testing.T) {
	// for topics that are compressed and/or compacted (different things!) we have to be
	// able to handle receiving offsets that are non-sequential (though still strictly increasing) and
	// possibly starting prior to the actual value we requested
	seedBroker := NewMockBroker(t, 1)
	leader := NewMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
	seedBroker.Returns(metadataResponse)

	fetchResponse := new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(3))
	leader.Returns(fetchResponse)

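	// queue a second response for the follow-up fetch, which may already be
	// in flight by the time the first message is delivered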
	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(5))
	leader.Returns(fetchResponse)

	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	master, err := NewConsumer(client, nil)
	if err != nil {
		t.Fatal(err)
	}

	config := NewPartitionConsumerConfig()
	config.OffsetMethod = OffsetMethodManual
	config.OffsetValue = 2
	consumer, err := master.ConsumePartition("my_topic", 0, config)
	if err != nil {
		t.Fatal(err)
	}

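	// we asked for offset 2, so the message at offset 1 should be skipped
	// and the one at offset 3 delivered first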
	event := <-consumer.Events()
	if event.Err != nil {
		t.Error(event.Err)
	}
	if event.Offset != 3 {
		t.Error("Incorrect message offset!")
	}

	leader.Close()
	seedBroker.Close()
	safeClose(t, consumer)
	safeClose(t, client)
}

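// TestConsumerRebalancingMultiplePartitions scripts partition leadership
// moving back and forth between two brokers and checks that the consumers
// on both partitions follow the moves, receiving ten messages each in order.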
func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
	// initial setup
	seedBroker := NewMockBroker(t, 1)
	leader0 := NewMockBroker(t, 2)
	leader1 := NewMockBroker(t, 3)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
	metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, NoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, NoError)
	seedBroker.Returns(metadataResponse)

	// launch test goroutines
	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	master, err := NewConsumer(client, nil)
	if err != nil {
		t.Fatal(err)
	}

	config := NewPartitionConsumerConfig()
	config.OffsetMethod = OffsetMethodManual
	config.OffsetValue = 0

	// we expect to end up (eventually) consuming exactly ten messages on each partition
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		consumer, err := master.ConsumePartition("my_topic", int32(i), config)
		if err != nil {
			t.Fatal(err)
		}
		wg.Add(1)
		go func(partition int32, c *PartitionConsumer) {
			for i := 0; i < 10; i++ {
				event := <-c.Events()
				if event.Err != nil {
					t.Error(event.Err, i, partition)
				}
				if event.Offset != int64(i) {
					t.Error("Incorrect message offset!", i, partition, event.Offset)
				}
				if event.Partition != partition {
					t.Error("Incorrect message partition!")
				}
			}
			safeClose(t, c)
			wg.Done()
		}(int32(i), consumer)
	}

	// leader0 provides first four messages on partition 0
	fetchResponse := new(FetchResponse)
	for i := 0; i < 4; i++ {
		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
	}
	leader0.Returns(fetchResponse)

	// leader0 says no longer leader of partition 0
	fetchResponse = new(FetchResponse)
	fetchResponse.AddError("my_topic", 0, NotLeaderForPartition)
	leader0.Returns(fetchResponse)

	// metadata assigns both partitions to leader1
	metadataResponse = new(MetadataResponse)
	metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, NoError)
	seedBroker.Returns(metadataResponse)
	time.Sleep(5 * time.Millisecond) // dumbest way to force a particular response ordering

	// leader1 provides five messages on partition 1
	fetchResponse = new(FetchResponse)
	for i := 0; i < 5; i++ {
		fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
	}
	leader1.Returns(fetchResponse)

	// leader1 provides three more messages on both partitions
	fetchResponse = new(FetchResponse)
	for i := 0; i < 3; i++ {
		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+4))
		fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+5))
	}
	leader1.Returns(fetchResponse)

	// leader1 provides three more messages on partition 0, and says it is no longer the leader of partition 1
	fetchResponse = new(FetchResponse)
	for i := 0; i < 3; i++ {
		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+7))
	}
	fetchResponse.AddError("my_topic", 1, NotLeaderForPartition)
	leader1.Returns(fetchResponse)

	// metadata assigns partition 0 to leader1 and partition 1 to leader0
	metadataResponse = new(MetadataResponse)
	metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader0.BrokerID(), nil, nil, NoError)
	seedBroker.Returns(metadataResponse)
	time.Sleep(5 * time.Millisecond) // dumbest way to force a particular response ordering

	// leader0 provides two messages on partition 1
	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(8))
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(9))
	leader0.Returns(fetchResponse)

	// leader0 provides last message on partition 1
	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
	leader0.Returns(fetchResponse)

	// leader1 provides last message on partition 0
	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
	leader1.Returns(fetchResponse)

	wg.Wait()
	leader1.Close()
	leader0.Close()
	seedBroker.Close()
	safeClose(t, client)
}

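// ExampleConsumer shows a minimal consumer loop: read events from a single
// partition until five seconds pass without one, then report the count.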
func ExampleConsumer() {
	client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println("> connected")
	defer client.Close()

	master, err := NewConsumer(client, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println("> master consumer ready")

	consumer, err := master.ConsumePartition("my_topic", 0, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println("> consumer ready")
	defer consumer.Close()

	msgCount := 0

consumerLoop:
	for {
		select {
		case event := <-consumer.Events():
			if event.Err != nil {
				panic(event.Err)
			}
			msgCount++
		case <-time.After(5 * time.Second):
			fmt.Println("> timed out")
			break consumerLoop
		}
	}
	fmt.Println("Got", msgCount, "messages.")
}