2017-08-05 18:17:15 +00:00
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import (
"fmt"
"math"
"strings"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// maxPayload is the maximum number of bytes to devote to actual ids in
// acknowledgement or modifyAckDeadline requests. A serialized
// AcknowledgeRequest proto has a small constant overhead, plus the size of the
// subscription name, plus 3 bytes per ID (a tag byte and two size bytes). A
// ModifyAckDeadlineRequest has an additional few bytes for the deadline. We
// don't know the subscription name here, so we just assume the size exclusive
// of ids is 100 bytes.
//
// With gRPC there is no way for the client to know the server's max message size (it is
// configurable on the server). We know from experience that it
// is 512K.
const (
	// maxPayload is the byte budget for the IDs carried by a single
	// acknowledgement or modifyAckDeadline request: 512 KiB.
	maxPayload = 512 << 10

	// reqFixedOverhead is the assumed size, in bytes, of a request exclusive
	// of its IDs.
	reqFixedOverhead = 100

	// overheadPerID is the number of bytes added for each ID in a serialized
	// request: a tag byte and two size bytes.
	overheadPerID = 3

	// maxSendRecvBytes is 20M, expressed in bytes.
	maxSendRecvBytes = 20 << 20
)
// convertMessages converts every received protobuf message in rms into a
// *Message by calling toMessage, preserving the input order.
//
// On success it returns a slice with one *Message per element of rms. If
// toMessage fails for any element, conversion stops and a nil slice is
// returned together with an error that names the failing index, dumps the
// offending message, and includes the underlying toMessage error so the cause
// of the failure is not lost.
func convertMessages(rms []*pb.ReceivedMessage) ([]*Message, error) {
	msgs := make([]*Message, 0, len(rms))
	for i, m := range rms {
		msg, err := toMessage(m)
		if err != nil {
			return nil, fmt.Errorf("pubsub: cannot decode the retrieved message at index: %d, message: %+v: %v", i, m, err)
		}
		msgs = append(msgs, msg)
	}
	return msgs, nil
}
// trunc32 converts i to an int32, saturating at the int32 bounds instead of
// wrapping around: values above math.MaxInt32 yield math.MaxInt32 and values
// below math.MinInt32 yield math.MinInt32. Values already in range are
// returned unchanged.
func trunc32(i int64) int32 {
	switch {
	case i > math.MaxInt32:
		return math.MaxInt32
	case i < math.MinInt32:
		// A plain int32 conversion would silently wrap here.
		return math.MinInt32
	}
	return int32(i)
}
2018-01-23 18:40:42 +00:00
// func newStreamingPuller(ctx context.Context, subc *vkit.SubscriberClient, subName string, ackDeadlineSecs int32) *streamingPuller {
// p := &streamingPuller{
// ctx: ctx,
// subName: subName,
// ackDeadlineSecs: ackDeadlineSecs,
// subc: subc,
// }
// p.c = sync.NewCond(&p.mu)
// return p
// }
// type streamingPuller struct {
// ctx context.Context
// subName string
// ackDeadlineSecs int32
// subc *vkit.SubscriberClient
// mu sync.Mutex
// c *sync.Cond
// inFlight bool
// closed bool // set after CloseSend called
// spc pb.Subscriber_StreamingPullClient
// err error
// }
// // open establishes (or re-establishes) a stream for pulling messages.
// // It takes care that only one RPC is in flight at a time.
// func (p *streamingPuller) open() error {
// p.c.L.Lock()
// defer p.c.L.Unlock()
// p.openLocked()
// return p.err
// }
// func (p *streamingPuller) openLocked() {
// if p.inFlight {
// // Another goroutine is opening; wait for it.
// for p.inFlight {
// p.c.Wait()
// }
// return
// }
// // No opens in flight; start one.
// // Keep the lock held, to avoid a race where we
// // close the old stream while opening a new one.
// p.inFlight = true
// spc, err := p.subc.StreamingPull(p.ctx, gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(maxSendRecvBytes)))
// if err == nil {
// err = spc.Send(&pb.StreamingPullRequest{
// Subscription: p.subName,
// StreamAckDeadlineSeconds: p.ackDeadlineSecs,
// })
// }
// p.spc = spc
// p.err = err
// p.inFlight = false
// p.c.Broadcast()
// }
// func (p *streamingPuller) call(f func(pb.Subscriber_StreamingPullClient) error) error {
// p.c.L.Lock()
// defer p.c.L.Unlock()
// // Wait for an open in flight.
// for p.inFlight {
// p.c.Wait()
// }
// var err error
// var bo gax.Backoff
// for {
// select {
// case <-p.ctx.Done():
// p.err = p.ctx.Err()
// default:
// }
// if p.err != nil {
// return p.err
// }
// spc := p.spc
// // Do not call f with the lock held. Only one goroutine calls Send
// // (streamingMessageIterator.sender) and only one calls Recv
// // (streamingMessageIterator.receiver). If we locked, then a
// // blocked Recv would prevent a Send from happening.
// p.c.L.Unlock()
// err = f(spc)
// p.c.L.Lock()
// if !p.closed && err != nil && isRetryable(err) {
// // Sleep with exponential backoff. Normally we wouldn't hold the lock while sleeping,
// // but here it can't do any harm, since the stream is broken anyway.
// gax.Sleep(p.ctx, bo.Pause())
// p.openLocked()
// continue
// }
// // Not an error, or not a retryable error; stop retrying.
// p.err = err
// return err
// }
// }
2017-08-05 18:17:15 +00:00
// isRetryable reports whether err is an error after which the operation
// should be retried.
//
// Logic from https://github.com/GoogleCloudPlatform/google-cloud-java/blob/master/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StatusUtil.java.
//
// Errors from which no gRPC status can be extracted are retryable. Among
// status errors, DeadlineExceeded, Internal, Canceled and ResourceExhausted
// are retryable; Unavailable is retryable unless its message mentions
// "Server shutdownNow invoked"; every other code is not.
func isRetryable(err error) bool {
	st, hasStatus := status.FromError(err)
	if !hasStatus {
		// Includes io.EOF, the normal stream close, which causes us to reopen.
		return true
	}

	code := st.Code()
	if code == codes.Unavailable {
		return !strings.Contains(st.Message(), "Server shutdownNow invoked")
	}
	return code == codes.DeadlineExceeded ||
		code == codes.Internal ||
		code == codes.Canceled ||
		code == codes.ResourceExhausted
}
2018-01-23 18:40:42 +00:00
// func (p *streamingPuller) fetchMessages() ([]*Message, error) {
// var res *pb.StreamingPullResponse
// err := p.call(func(spc pb.Subscriber_StreamingPullClient) error {
// var err error
// res, err = spc.Recv()
// return err
// })
// if err != nil {
// return nil, err
// }
// return convertMessages(res.ReceivedMessages)
// }
// func (p *streamingPuller) send(req *pb.StreamingPullRequest) error {
// // Note: len(modAckIDs) == len(modSecs)
// var rest *pb.StreamingPullRequest
// for len(req.AckIds) > 0 || len(req.ModifyDeadlineAckIds) > 0 {
// req, rest = splitRequest(req, maxPayload)
// err := p.call(func(spc pb.Subscriber_StreamingPullClient) error {
// x := spc.Send(req)
// return x
// })
// if err != nil {
// return err
// }
// req = rest
// }
// return nil
// }
// func (p *streamingPuller) closeSend() {
// p.mu.Lock()
// p.closed = true
// p.spc.CloseSend()
// p.mu.Unlock()
// }
2017-08-05 18:17:15 +00:00
// splitRequest splits req into a prefix that fits within maxSize and a
// remainder that holds whatever did not fit.
//
// The size of the prefix is an estimate: it starts at reqFixedOverhead and,
// for each index i, adds overheadPerID plus len(AckIds[i]) when that entry
// exists, and overheadPerID plus len(ModifyDeadlineAckIds[i]) plus four bytes
// for the deadline when that entry exists. Both ID lists are cut at the same
// index (capped by each list's own length), chosen so that the estimate is at
// most maxSize whenever maxSize is at least reqFixedOverhead.
//
// req is modified in place and returned as prefix: its AckIds,
// ModifyDeadlineAckIds and ModifyDeadlineSeconds are truncated to the part
// that fits. remainder is a shallow copy of the original req whose three
// slices are the corresponding tails; they share backing arrays with the
// prefix's slices.
//
// ModifyDeadlineSeconds is sliced at the same index as ModifyDeadlineAckIds,
// so it must be at least as long as ModifyDeadlineAckIds.
//
// If the budget leaves no room for the entries at index 0 (including when
// maxSize is smaller than reqFixedOverhead), prefix carries no IDs and
// remainder carries all of them.
func splitRequest(req *pb.StreamingPullRequest, maxSize int) (prefix, remainder *pb.StreamingPullRequest) {
	const int32Bytes = 4

	// Copy all fields before splitting the variable-sized ones.
	remainder = &pb.StreamingPullRequest{}
	*remainder = *req

	// Count how many indexes can be taken before the estimate reaches
	// maxSize. The last index counted may have pushed the estimate over.
	size := reqFixedOverhead
	i := 0
	for size < maxSize && (i < len(req.AckIds) || i < len(req.ModifyDeadlineAckIds)) {
		if i < len(req.AckIds) {
			size += overheadPerID + len(req.AckIds[i])
		}
		if i < len(req.ModifyDeadlineAckIds) {
			size += overheadPerID + len(req.ModifyDeadlineAckIds[i]) + int32Bytes
		}
		i++
	}

	min := func(a, b int) int {
		if a < b {
			return a
		}
		return b
	}

	// Drop the last index if it overshot the budget. The j > 0 guard covers
	// the case where the loop never ran because the fixed overhead alone
	// exceeds maxSize; without it j would become -1 and the slice expressions
	// below would panic.
	j := i
	if size > maxSize && j > 0 {
		j--
	}

	k := min(j, len(req.AckIds))
	remainder.AckIds = req.AckIds[k:]
	req.AckIds = req.AckIds[:k]

	k = min(j, len(req.ModifyDeadlineAckIds))
	remainder.ModifyDeadlineAckIds = req.ModifyDeadlineAckIds[k:]
	remainder.ModifyDeadlineSeconds = req.ModifyDeadlineSeconds[k:]
	req.ModifyDeadlineAckIds = req.ModifyDeadlineAckIds[:k]
	req.ModifyDeadlineSeconds = req.ModifyDeadlineSeconds[:k]
	return req, remainder
}