|
@@ -142,6 +142,30 @@ func (b *Broker) Fetch(clientID *string, request *FetchRequest) (*FetchResponse,
|
|
|
return response, nil
|
|
return response, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func (b *Broker) CommitOffset(clientID *string, request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
|
|
|
|
|
+ response := new(OffsetCommitResponse)
|
|
|
|
|
+
|
|
|
|
|
+ err := b.sendAndReceive(clientID, request, response)
|
|
|
|
|
+
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return response, nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (b *Broker) FetchOffset(clientID *string, request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
|
|
|
|
|
+ response := new(OffsetFetchResponse)
|
|
|
|
|
+
|
|
|
|
|
+ err := b.sendAndReceive(clientID, request, response)
|
|
|
|
|
+
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return response, nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
func (b *Broker) send(clientID *string, req requestEncoder, promiseResponse bool) (*responsePromise, error) {
|
|
func (b *Broker) send(clientID *string, req requestEncoder, promiseResponse bool) (*responsePromise, error) {
|
|
|
b.lock.Lock()
|
|
b.lock.Lock()
|
|
|
defer b.lock.Unlock()
|
|
defer b.lock.Unlock()
|