Browse Source

XXXXXX Don't use string pointers XXXXXX

This makes everything 100x easier because it fixes hashing by topic ID. HOWEVER,
it means we cannot distinguish between -1 == nil and 0 == "" when decoding kafka
strings. At the moment this does not seem to be semantically relevant, but that
day may come...
Evan Huus 12 years ago
parent
commit
e0ba03b9b7

+ 2 - 2
kafka/client.go

@@ -3,11 +3,11 @@ package kafka
 import k "sarama/protocol"
 import k "sarama/protocol"
 
 
 type Client struct {
 type Client struct {
-	id    *string
+	id    string
 	cache *metadataCache
 	cache *metadataCache
 }
 }
 
 
-func NewClient(id *string, host string, port int32) (client *Client, err error) {
+func NewClient(id string, host string, port int32) (client *Client, err error) {
 	client = new(Client)
 	client = new(Client)
 	client.id = id
 	client.id = id
 	client.cache, err = newMetadataCache(client, host, port)
 	client.cache, err = newMetadataCache(client, host, port)

+ 6 - 6
kafka/metadata_cache.go

@@ -30,7 +30,7 @@ func newMetadataCache(client *Client, host string, port int32) (*metadataCache,
 	mc.brokers[starter.ID()] = starter
 	mc.brokers[starter.ID()] = starter
 
 
 	// do an initial fetch of all cluster metadata by specifing an empty list of topics
 	// do an initial fetch of all cluster metadata by specifing an empty list of topics
-	err = mc.refreshTopics(make([]*string, 0))
+	err = mc.refreshTopics(make([]string, 0))
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
@@ -120,19 +120,19 @@ func (mc *metadataCache) update(data *k.MetadataResponse) error {
 		if topic.Err != k.NO_ERROR {
 		if topic.Err != k.NO_ERROR {
 			return topic.Err
 			return topic.Err
 		}
 		}
-		mc.leaders[*topic.Name] = make(map[int32]int32, len(topic.Partitions))
+		mc.leaders[topic.Name] = make(map[int32]int32, len(topic.Partitions))
 		for _, partition := range topic.Partitions {
 		for _, partition := range topic.Partitions {
 			if partition.Err != k.NO_ERROR {
 			if partition.Err != k.NO_ERROR {
 				return partition.Err
 				return partition.Err
 			}
 			}
-			mc.leaders[*topic.Name][partition.Id] = partition.Leader
+			mc.leaders[topic.Name][partition.Id] = partition.Leader
 		}
 		}
 	}
 	}
 
 
 	return nil
 	return nil
 }
 }
 
 
-func (mc *metadataCache) refreshTopics(topics []*string) error {
+func (mc *metadataCache) refreshTopics(topics []string) error {
 	for broker := mc.any(); broker != nil; broker = mc.any() {
 	for broker := mc.any(); broker != nil; broker = mc.any() {
 		response, err := broker.GetMetadata(mc.client.id, &k.MetadataRequest{Topics: topics})
 		response, err := broker.GetMetadata(mc.client.id, &k.MetadataRequest{Topics: topics})
 
 
@@ -154,7 +154,7 @@ func (mc *metadataCache) refreshTopics(topics []*string) error {
 }
 }
 
 
 func (mc *metadataCache) refreshTopic(topic string) error {
 func (mc *metadataCache) refreshTopic(topic string) error {
-	tmp := make([]*string, 1)
-	tmp[0] = &topic
+	tmp := make([]string, 1)
+	tmp[0] = topic
 	return mc.refreshTopics(tmp)
 	return mc.refreshTopics(tmp)
 }
 }

+ 6 - 2
kafka/producer.go

@@ -60,14 +60,18 @@ func (p *Producer) safeSendMessage(key, value Encoder, retries int) error {
 	}
 	}
 
 
 	request := &k.ProduceRequest{ResponseCondition: p.responseCondition, Timeout: p.responseTimeout}
 	request := &k.ProduceRequest{ResponseCondition: p.responseCondition, Timeout: p.responseTimeout}
-	request.AddMessage(&p.topic, partition, &k.Message{Key: keyBytes, Value: valBytes})
+	request.AddMessage(p.topic, partition, &k.Message{Key: keyBytes, Value: valBytes})
 
 
 	response, err := broker.Produce(p.client.id, request)
 	response, err := broker.Produce(p.client.id, request)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
 
 
-	block := response.GetBlock(&p.topic, partition)
+	if response == nil {
+		return nil
+	}
+
+	block := response.GetBlock(p.topic, partition)
 	if block == nil {
 	if block == nil {
 		return IncompleteResponse
 		return IncompleteResponse
 	}
 	}

+ 11 - 11
protocol/broker.go

@@ -9,7 +9,7 @@ import (
 // A single Kafka broker. All operations on this object are entirely concurrency-safe.
 // A single Kafka broker. All operations on this object are entirely concurrency-safe.
 type Broker struct {
 type Broker struct {
 	id   int32
 	id   int32
-	host *string
+	host string
 	port int32
 	port int32
 
 
 	correlation_id int32
 	correlation_id int32
@@ -31,7 +31,7 @@ type responsePromise struct {
 func NewBroker(host string, port int32) *Broker {
 func NewBroker(host string, port int32) *Broker {
 	b := new(Broker)
 	b := new(Broker)
 	b.id = -1 // don't know it yet
 	b.id = -1 // don't know it yet
-	b.host = &host
+	b.host = host
 	b.port = port
 	b.port = port
 	return b
 	return b
 }
 }
@@ -45,7 +45,7 @@ func (b *Broker) Connect() error {
 		return AlreadyConnected
 		return AlreadyConnected
 	}
 	}
 
 
-	addr, err := net.ResolveIPAddr("ip", *b.host)
+	addr, err := net.ResolveIPAddr("ip", b.host)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
@@ -91,7 +91,7 @@ func (b *Broker) ID() int32 {
 	return b.id
 	return b.id
 }
 }
 
 
-func (b *Broker) GetMetadata(clientID *string, request *MetadataRequest) (*MetadataResponse, error) {
+func (b *Broker) GetMetadata(clientID string, request *MetadataRequest) (*MetadataResponse, error) {
 	response := new(MetadataResponse)
 	response := new(MetadataResponse)
 
 
 	err := b.sendAndReceive(clientID, request, response)
 	err := b.sendAndReceive(clientID, request, response)
@@ -103,7 +103,7 @@ func (b *Broker) GetMetadata(clientID *string, request *MetadataRequest) (*Metad
 	return response, nil
 	return response, nil
 }
 }
 
 
-func (b *Broker) GetOffset(clientID *string, request *OffsetRequest) (*OffsetResponse, error) {
+func (b *Broker) GetOffset(clientID string, request *OffsetRequest) (*OffsetResponse, error) {
 	response := new(OffsetResponse)
 	response := new(OffsetResponse)
 
 
 	err := b.sendAndReceive(clientID, request, response)
 	err := b.sendAndReceive(clientID, request, response)
@@ -115,7 +115,7 @@ func (b *Broker) GetOffset(clientID *string, request *OffsetRequest) (*OffsetRes
 	return response, nil
 	return response, nil
 }
 }
 
 
-func (b *Broker) Produce(clientID *string, request *ProduceRequest) (*ProduceResponse, error) {
+func (b *Broker) Produce(clientID string, request *ProduceRequest) (*ProduceResponse, error) {
 	var response *ProduceResponse
 	var response *ProduceResponse
 	if request.ResponseCondition != NO_RESPONSE {
 	if request.ResponseCondition != NO_RESPONSE {
 		response = new(ProduceResponse)
 		response = new(ProduceResponse)
@@ -130,7 +130,7 @@ func (b *Broker) Produce(clientID *string, request *ProduceRequest) (*ProduceRes
 	return response, nil
 	return response, nil
 }
 }
 
 
-func (b *Broker) Fetch(clientID *string, request *FetchRequest) (*FetchResponse, error) {
+func (b *Broker) Fetch(clientID string, request *FetchRequest) (*FetchResponse, error) {
 	response := new(FetchResponse)
 	response := new(FetchResponse)
 
 
 	err := b.sendAndReceive(clientID, request, response)
 	err := b.sendAndReceive(clientID, request, response)
@@ -142,7 +142,7 @@ func (b *Broker) Fetch(clientID *string, request *FetchRequest) (*FetchResponse,
 	return response, nil
 	return response, nil
 }
 }
 
 
-func (b *Broker) CommitOffset(clientID *string, request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
+func (b *Broker) CommitOffset(clientID string, request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
 	response := new(OffsetCommitResponse)
 	response := new(OffsetCommitResponse)
 
 
 	err := b.sendAndReceive(clientID, request, response)
 	err := b.sendAndReceive(clientID, request, response)
@@ -154,7 +154,7 @@ func (b *Broker) CommitOffset(clientID *string, request *OffsetCommitRequest) (*
 	return response, nil
 	return response, nil
 }
 }
 
 
-func (b *Broker) FetchOffset(clientID *string, request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
+func (b *Broker) FetchOffset(clientID string, request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
 	response := new(OffsetFetchResponse)
 	response := new(OffsetFetchResponse)
 
 
 	err := b.sendAndReceive(clientID, request, response)
 	err := b.sendAndReceive(clientID, request, response)
@@ -166,7 +166,7 @@ func (b *Broker) FetchOffset(clientID *string, request *OffsetFetchRequest) (*Of
 	return response, nil
 	return response, nil
 }
 }
 
 
-func (b *Broker) send(clientID *string, req requestEncoder, promiseResponse bool) (*responsePromise, error) {
+func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool) (*responsePromise, error) {
 	b.lock.Lock()
 	b.lock.Lock()
 	defer b.lock.Unlock()
 	defer b.lock.Unlock()
 
 
@@ -196,7 +196,7 @@ func (b *Broker) send(clientID *string, req requestEncoder, promiseResponse bool
 	return &promise, nil
 	return &promise, nil
 }
 }
 
 
-func (b *Broker) sendAndReceive(clientID *string, req requestEncoder, res decoder) error {
+func (b *Broker) sendAndReceive(clientID string, req requestEncoder, res decoder) error {
 	promise, err := b.send(clientID, req, res != nil)
 	promise, err := b.send(clientID, req, res != nil)
 
 
 	if err != nil {
 	if err != nil {

+ 3 - 3
protocol/fetch_request.go

@@ -13,7 +13,7 @@ func (f *fetchRequestBlock) encode(pe packetEncoder) {
 type FetchRequest struct {
 type FetchRequest struct {
 	MaxWaitTime int32
 	MaxWaitTime int32
 	MinBytes    int32
 	MinBytes    int32
-	blocks      map[*string]map[int32]*fetchRequestBlock
+	blocks      map[string]map[int32]*fetchRequestBlock
 }
 }
 
 
 func (f *FetchRequest) encode(pe packetEncoder) {
 func (f *FetchRequest) encode(pe packetEncoder) {
@@ -39,9 +39,9 @@ func (f *FetchRequest) version() int16 {
 	return 0
 	return 0
 }
 }
 
 
-func (f *FetchRequest) AddBlock(topic *string, partition_id int32, fetchOffset int64, maxBytes int32) {
+func (f *FetchRequest) AddBlock(topic string, partition_id int32, fetchOffset int64, maxBytes int32) {
 	if f.blocks == nil {
 	if f.blocks == nil {
-		f.blocks = make(map[*string]map[int32]*fetchRequestBlock)
+		f.blocks = make(map[string]map[int32]*fetchRequestBlock)
 	}
 	}
 
 
 	if f.blocks[topic] == nil {
 	if f.blocks[topic] == nil {

+ 2 - 2
protocol/fetch_response.go

@@ -32,7 +32,7 @@ func (pr *FetchResponseBlock) decode(pd packetDecoder) (err error) {
 }
 }
 
 
 type FetchResponse struct {
 type FetchResponse struct {
-	Blocks map[*string]map[int32]*FetchResponseBlock
+	Blocks map[string]map[int32]*FetchResponseBlock
 }
 }
 
 
 func (fr *FetchResponse) decode(pd packetDecoder) (err error) {
 func (fr *FetchResponse) decode(pd packetDecoder) (err error) {
@@ -41,7 +41,7 @@ func (fr *FetchResponse) decode(pd packetDecoder) (err error) {
 		return err
 		return err
 	}
 	}
 
 
-	fr.Blocks = make(map[*string]map[int32]*FetchResponseBlock, numTopics)
+	fr.Blocks = make(map[string]map[int32]*FetchResponseBlock, numTopics)
 	for i := 0; i < numTopics; i++ {
 	for i := 0; i < numTopics; i++ {
 		name, err := pd.getString()
 		name, err := pd.getString()
 		if err != nil {
 		if err != nil {

+ 1 - 1
protocol/metadata_request.go

@@ -1,7 +1,7 @@
 package protocol
 package protocol
 
 
 type MetadataRequest struct {
 type MetadataRequest struct {
-	Topics []*string
+	Topics []string
 }
 }
 
 
 func (mr *MetadataRequest) encode(pe packetEncoder) {
 func (mr *MetadataRequest) encode(pe packetEncoder) {

+ 1 - 1
protocol/metadata_response.go

@@ -39,7 +39,7 @@ func (pm *PartitionMetadata) decode(pd packetDecoder) (err error) {
 
 
 type TopicMetadata struct {
 type TopicMetadata struct {
 	Err        KError
 	Err        KError
-	Name       *string
+	Name       string
 	Partitions []*PartitionMetadata
 	Partitions []*PartitionMetadata
 }
 }
 
 

+ 5 - 5
protocol/offset_commit_request.go

@@ -2,7 +2,7 @@ package protocol
 
 
 type offsetCommitRequestBlock struct {
 type offsetCommitRequestBlock struct {
 	offset   int64
 	offset   int64
-	metadata *string
+	metadata string
 }
 }
 
 
 func (r *offsetCommitRequestBlock) encode(pe packetEncoder) {
 func (r *offsetCommitRequestBlock) encode(pe packetEncoder) {
@@ -11,8 +11,8 @@ func (r *offsetCommitRequestBlock) encode(pe packetEncoder) {
 }
 }
 
 
 type OffsetCommitRequest struct {
 type OffsetCommitRequest struct {
-	ConsumerGroup *string
-	blocks        map[*string]map[int32]*offsetCommitRequestBlock
+	ConsumerGroup string
+	blocks        map[string]map[int32]*offsetCommitRequestBlock
 }
 }
 
 
 func (r *OffsetCommitRequest) encode(pe packetEncoder) {
 func (r *OffsetCommitRequest) encode(pe packetEncoder) {
@@ -36,9 +36,9 @@ func (r *OffsetCommitRequest) version() int16 {
 	return 0
 	return 0
 }
 }
 
 
-func (r *OffsetCommitRequest) AddBlock(topic *string, partition_id int32, offset int64, metadata *string) {
+func (r *OffsetCommitRequest) AddBlock(topic string, partition_id int32, offset int64, metadata string) {
 	if r.blocks == nil {
 	if r.blocks == nil {
-		r.blocks = make(map[*string]map[int32]*offsetCommitRequestBlock)
+		r.blocks = make(map[string]map[int32]*offsetCommitRequestBlock)
 	}
 	}
 
 
 	if r.blocks[topic] == nil {
 	if r.blocks[topic] == nil {

+ 2 - 2
protocol/offset_commit_response.go

@@ -1,7 +1,7 @@
 package protocol
 package protocol
 
 
 type OffsetCommitResponse struct {
 type OffsetCommitResponse struct {
-	Errors map[*string]map[int32]KError
+	Errors map[string]map[int32]KError
 }
 }
 
 
 func (r *OffsetCommitResponse) decode(pd packetDecoder) (err error) {
 func (r *OffsetCommitResponse) decode(pd packetDecoder) (err error) {
@@ -10,7 +10,7 @@ func (r *OffsetCommitResponse) decode(pd packetDecoder) (err error) {
 		return err
 		return err
 	}
 	}
 
 
-	r.Errors = make(map[*string]map[int32]KError, numTopics)
+	r.Errors = make(map[string]map[int32]KError, numTopics)
 	for i := 0; i < numTopics; i++ {
 	for i := 0; i < numTopics; i++ {
 		name, err := pd.getString()
 		name, err := pd.getString()
 		if err != nil {
 		if err != nil {

+ 4 - 4
protocol/offset_fetch_request.go

@@ -1,8 +1,8 @@
 package protocol
 package protocol
 
 
 type OffsetFetchRequest struct {
 type OffsetFetchRequest struct {
-	ConsumerGroup *string
-	partitions    map[*string][]int32
+	ConsumerGroup string
+	partitions    map[string][]int32
 }
 }
 
 
 func (r *OffsetFetchRequest) encode(pe packetEncoder) {
 func (r *OffsetFetchRequest) encode(pe packetEncoder) {
@@ -22,9 +22,9 @@ func (r *OffsetFetchRequest) version() int16 {
 	return 0
 	return 0
 }
 }
 
 
-func (r *OffsetFetchRequest) AddPartition(topic *string, partition_id int32) {
+func (r *OffsetFetchRequest) AddPartition(topic string, partition_id int32) {
 	if r.partitions == nil {
 	if r.partitions == nil {
-		r.partitions = make(map[*string][]int32)
+		r.partitions = make(map[string][]int32)
 	}
 	}
 
 
 	r.partitions[topic] = append(r.partitions[topic], partition_id)
 	r.partitions[topic] = append(r.partitions[topic], partition_id)

+ 3 - 3
protocol/offset_fetch_response.go

@@ -2,7 +2,7 @@ package protocol
 
 
 type OffsetFetchResponseBlock struct {
 type OffsetFetchResponseBlock struct {
 	Offset   int64
 	Offset   int64
-	Metadata *string
+	Metadata string
 	Err      KError
 	Err      KError
 }
 }
 
 
@@ -23,7 +23,7 @@ func (r *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) {
 }
 }
 
 
 type OffsetFetchResponse struct {
 type OffsetFetchResponse struct {
-	Blocks map[*string]map[int32]*OffsetFetchResponseBlock
+	Blocks map[string]map[int32]*OffsetFetchResponseBlock
 }
 }
 
 
 func (r *OffsetFetchResponse) decode(pd packetDecoder) (err error) {
 func (r *OffsetFetchResponse) decode(pd packetDecoder) (err error) {
@@ -32,7 +32,7 @@ func (r *OffsetFetchResponse) decode(pd packetDecoder) (err error) {
 		return err
 		return err
 	}
 	}
 
 
-	r.Blocks = make(map[*string]map[int32]*OffsetFetchResponseBlock, numTopics)
+	r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics)
 	for i := 0; i < numTopics; i++ {
 	for i := 0; i < numTopics; i++ {
 		name, err := pd.getString()
 		name, err := pd.getString()
 		if err != nil {
 		if err != nil {

+ 3 - 3
protocol/offset_request.go

@@ -20,7 +20,7 @@ func (r *offsetRequestBlock) encode(pe packetEncoder) {
 }
 }
 
 
 type OffsetRequest struct {
 type OffsetRequest struct {
-	blocks map[*string]map[int32]*offsetRequestBlock
+	blocks map[string]map[int32]*offsetRequestBlock
 }
 }
 
 
 func (r *OffsetRequest) encode(pe packetEncoder) {
 func (r *OffsetRequest) encode(pe packetEncoder) {
@@ -44,9 +44,9 @@ func (r *OffsetRequest) version() int16 {
 	return 0
 	return 0
 }
 }
 
 
-func (r *OffsetRequest) AddBlock(topic *string, partition_id int32, time int64, maxOffsets int32) {
+func (r *OffsetRequest) AddBlock(topic string, partition_id int32, time int64, maxOffsets int32) {
 	if r.blocks == nil {
 	if r.blocks == nil {
-		r.blocks = make(map[*string]map[int32]*offsetRequestBlock)
+		r.blocks = make(map[string]map[int32]*offsetRequestBlock)
 	}
 	}
 
 
 	if r.blocks[topic] == nil {
 	if r.blocks[topic] == nil {

+ 2 - 2
protocol/offset_response.go

@@ -17,7 +17,7 @@ func (r *OffsetResponseBlock) decode(pd packetDecoder) (err error) {
 }
 }
 
 
 type OffsetResponse struct {
 type OffsetResponse struct {
-	Blocks map[*string]map[int32]*OffsetResponseBlock
+	Blocks map[string]map[int32]*OffsetResponseBlock
 }
 }
 
 
 func (r *OffsetResponse) decode(pd packetDecoder) (err error) {
 func (r *OffsetResponse) decode(pd packetDecoder) (err error) {
@@ -26,7 +26,7 @@ func (r *OffsetResponse) decode(pd packetDecoder) (err error) {
 		return err
 		return err
 	}
 	}
 
 
-	r.Blocks = make(map[*string]map[int32]*OffsetResponseBlock, numTopics)
+	r.Blocks = make(map[string]map[int32]*OffsetResponseBlock, numTopics)
 	for i := 0; i < numTopics; i++ {
 	for i := 0; i < numTopics; i++ {
 		name, err := pd.getString()
 		name, err := pd.getString()
 		if err != nil {
 		if err != nil {

+ 1 - 1
protocol/packet_decoder.go

@@ -16,7 +16,7 @@ type packetDecoder interface {
 
 
 	// misc
 	// misc
 	getError() (KError, error)
 	getError() (KError, error)
-	getString() (*string, error)
+	getString() (string, error)
 	getBytes() ([]byte, error)
 	getBytes() ([]byte, error)
 	getSubset(length int) (packetDecoder, error)
 	getSubset(length int) (packetDecoder, error)
 
 

+ 1 - 1
protocol/packet_encoder.go

@@ -13,7 +13,7 @@ type packetEncoder interface {
 
 
 	// misc
 	// misc
 	putError(in KError)
 	putError(in KError)
-	putString(in *string)
+	putString(in string)
 	putBytes(in []byte)
 	putBytes(in []byte)
 	putRaw(in []byte)
 	putRaw(in []byte)
 
 

+ 3 - 6
protocol/prep_encoder.go

@@ -42,15 +42,12 @@ func (pe *prepEncoder) putError(in KError) {
 	pe.length += 2
 	pe.length += 2
 }
 }
 
 
-func (pe *prepEncoder) putString(in *string) {
+func (pe *prepEncoder) putString(in string) {
 	pe.length += 2
 	pe.length += 2
-	if in == nil {
-		return
-	}
-	if len(*in) > math.MaxInt16 {
+	if len(in) > math.MaxInt16 {
 		pe.err = EncodingError("String too long")
 		pe.err = EncodingError("String too long")
 	} else {
 	} else {
-		pe.length += len(*in)
+		pe.length += len(in)
 	}
 	}
 }
 }
 
 

+ 3 - 3
protocol/produce_request.go

@@ -10,7 +10,7 @@ const (
 type ProduceRequest struct {
 type ProduceRequest struct {
 	ResponseCondition int16
 	ResponseCondition int16
 	Timeout           int32
 	Timeout           int32
-	msgSets           map[*string]map[int32]*MessageSet
+	msgSets           map[string]map[int32]*MessageSet
 }
 }
 
 
 func (p *ProduceRequest) encode(pe packetEncoder) {
 func (p *ProduceRequest) encode(pe packetEncoder) {
@@ -35,9 +35,9 @@ func (p *ProduceRequest) version() int16 {
 	return 0
 	return 0
 }
 }
 
 
-func (p *ProduceRequest) AddMessage(topic *string, partition int32, msg *Message) {
+func (p *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
 	if p.msgSets == nil {
 	if p.msgSets == nil {
-		p.msgSets = make(map[*string]map[int32]*MessageSet)
+		p.msgSets = make(map[string]map[int32]*MessageSet)
 	}
 	}
 
 
 	if p.msgSets[topic] == nil {
 	if p.msgSets[topic] == nil {

+ 3 - 3
protocol/produce_response.go

@@ -20,7 +20,7 @@ func (pr *ProduceResponseBlock) decode(pd packetDecoder) (err error) {
 }
 }
 
 
 type ProduceResponse struct {
 type ProduceResponse struct {
-	Blocks map[*string]map[int32]*ProduceResponseBlock
+	Blocks map[string]map[int32]*ProduceResponseBlock
 }
 }
 
 
 func (pr *ProduceResponse) decode(pd packetDecoder) (err error) {
 func (pr *ProduceResponse) decode(pd packetDecoder) (err error) {
@@ -29,7 +29,7 @@ func (pr *ProduceResponse) decode(pd packetDecoder) (err error) {
 		return err
 		return err
 	}
 	}
 
 
-	pr.Blocks = make(map[*string]map[int32]*ProduceResponseBlock, numTopics)
+	pr.Blocks = make(map[string]map[int32]*ProduceResponseBlock, numTopics)
 	for i := 0; i < numTopics; i++ {
 	for i := 0; i < numTopics; i++ {
 		name, err := pd.getString()
 		name, err := pd.getString()
 		if err != nil {
 		if err != nil {
@@ -61,7 +61,7 @@ func (pr *ProduceResponse) decode(pd packetDecoder) (err error) {
 	return nil
 	return nil
 }
 }
 
 
-func (pr *ProduceResponse) GetBlock(topic *string, partition int32) *ProduceResponseBlock {
+func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {
 	if pr.Blocks == nil {
 	if pr.Blocks == nil {
 		return nil
 		return nil
 	}
 	}

+ 7 - 8
protocol/real_decoder.go

@@ -114,27 +114,26 @@ func (rd *realDecoder) getError() (KError, error) {
 	return KError(val), err
 	return KError(val), err
 }
 }
 
 
-func (rd *realDecoder) getString() (*string, error) {
+func (rd *realDecoder) getString() (string, error) {
 	tmp, err := rd.getInt16()
 	tmp, err := rd.getInt16()
 
 
 	if err != nil {
 	if err != nil {
-		return nil, err
+		return "", err
 	}
 	}
 
 
 	n := int(tmp)
 	n := int(tmp)
 
 
 	switch {
 	switch {
 	case n < -1:
 	case n < -1:
-		return nil, DecodingError("Negative string length in getString.")
+		return "", DecodingError("Negative string length in getString.")
 	case n == -1:
 	case n == -1:
-		return nil, nil
+		return "", nil
 	case n == 0:
 	case n == 0:
-		return new(string), nil
+		return "", nil
 	case n > rd.remaining():
 	case n > rd.remaining():
-		return nil, DecodingError("String too long in getString.")
+		return "", DecodingError("String too long in getString.")
 	default:
 	default:
-		tmp := new(string)
-		*tmp = string(rd.raw[rd.off : rd.off+n])
+		tmp := string(rd.raw[rd.off : rd.off+n])
 		rd.off += n
 		rd.off += n
 		return tmp, nil
 		return tmp, nil
 	}
 	}

+ 4 - 8
protocol/real_encoder.go

@@ -49,14 +49,10 @@ func (re *realEncoder) putError(in KError) {
 	re.putInt16(int16(in))
 	re.putInt16(int16(in))
 }
 }
 
 
-func (re *realEncoder) putString(in *string) {
-	if in == nil {
-		re.putInt16(-1)
-		return
-	}
-	re.putInt16(int16(len(*in)))
-	copy(re.raw[re.off:], *in)
-	re.off += len(*in)
+func (re *realEncoder) putString(in string) {
+	re.putInt16(int16(len(in)))
+	copy(re.raw[re.off:], in)
+	re.off += len(in)
 }
 }
 
 
 func (re *realEncoder) putBytes(in []byte) {
 func (re *realEncoder) putBytes(in []byte) {

+ 1 - 1
protocol/request.go

@@ -8,7 +8,7 @@ type requestEncoder interface {
 
 
 type request struct {
 type request struct {
 	correlation_id int32
 	correlation_id int32
-	id             *string
+	id             string
 	body           requestEncoder
 	body           requestEncoder
 }
 }