Ticket #398 (peer-msgs.c:905: failed assertion `msglen == 0' in 0.90)

This commit is contained in:
Charles Kerr 2007-10-27 15:45:03 +00:00
parent 83df860896
commit b00ee3e568
1 changed files with 248 additions and 230 deletions

View File

@ -113,6 +113,7 @@ struct tr_peermsgs
struct evbuffer * outMessages; /* buffer of all the non-piece messages */
struct evbuffer * inBlock; /* the block we're currently receiving */
tr_list * peerAskedFor;
tr_list * peerAskedForFast;
tr_list * clientAskedFor;
tr_list * clientWillAskFor;
@ -393,6 +394,12 @@ updateInterest( tr_peermsgs * msgs )
#define MIN_CHOKE_PERIOD_SEC 10
static void
cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
{
tr_list_free( &msgs->peerAskedFor, tr_free );
}
void
tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
{
@ -409,24 +416,8 @@ tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
else if( msgs->info->peerIsChoked != choke )
{
msgs->info->peerIsChoked = choke;
if( choke )
{
tr_list * walk;
for( walk = msgs->peerAskedFor; walk != NULL; )
{
tr_list * next = walk->next;
/* don't reject a peer's fast allowed requests at choke */
struct peer_request *req = walk->data;
if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index ) )
{
tr_list_remove_data( &msgs->peerAskedFor, req );
tr_free( req );
}
walk = next;
}
}
cancelAllRequestsToClientExceptFast( msgs );
protocolSendChoke( msgs, choke );
msgs->info->chokeChangedAt = time( NULL );
}
@ -477,14 +468,16 @@ sendFastReject( tr_peermsgs * msgs,
{
assert( msgs != NULL );
assert( length > 0 );
/* reject the request */
const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
if( tr_peerIoSupportsFEXT( msgs->io ) )
{
const uint32_t len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, len );
tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_REJECT );
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
}
}
static void
@ -534,7 +527,7 @@ reqIsValid( const tr_peermsgs * msgs, uint32_t index, uint32_t offset, uint32_t
}
static int
requestIsValid( const tr_peermsgs * msgs, struct peer_request * req )
requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
{
return reqIsValid( msgs, req->index, req->offset, req->length );
}
@ -630,7 +623,7 @@ tr_peerMsgsAddRequest( tr_peermsgs * msgs,
}
static void
tr_peerMsgsCancelAllRequests( tr_peermsgs * msgs )
cancelAllRequestsToPeer( tr_peermsgs * msgs )
{
struct peer_request * req;
@ -884,6 +877,98 @@ updatePeerProgress( tr_peermsgs * msgs )
firePeerProgress( msgs );
}
static int
clientCanSendFastBlock( const tr_peermsgs * msgs UNUSED )
{
/* FIXME(tiennou): base this on how many blocks we've already sent this
* peer, or maybe how many fast blocks per minute we've sent overall,
* or maybe how much bandwidth we're already using up w/o fast peers.
* I don't know what the Right Thing here is, but
* the previous measurement of how many pieces we have is not enough. */
return FALSE;
}
static void
peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
{
const int reqIsValid = requestIsValid( msgs, req );
const int clientHasPiece = reqIsValid && tr_cpPieceIsComplete( msgs->torrent->completion, req->index );
const int peerIsChoked = msgs->info->peerIsChoked;
const int peerIsFast = tr_peerIoSupportsFEXT( msgs->io );
const int pieceIsFast = reqIsValid && tr_bitfieldHas( msgs->peerAllowedPieces, req->index );
const int canSendFast = clientCanSendFastBlock( msgs );
if( !reqIsValid ) /* bad request */
{
dbgmsg( msgs, "rejecting an invalid request." );
sendFastReject( msgs, req->index, req->offset, req->length );
}
else if( !clientHasPiece ) /* we don't have it */
{
dbgmsg( msgs, "rejecting request for a piece we don't have." );
sendFastReject( msgs, req->index, req->offset, req->length );
}
else if( peerIsChoked && !peerIsFast ) /* maybe he doesn't know he's choked? */
{
tr_peerMsgsSetChoke( msgs, 1 );
sendFastReject( msgs, req->index, req->offset, req->length );
}
else if( peerIsChoked && peerIsFast && ( !pieceIsFast || !canSendFast ) )
{
sendFastReject( msgs, req->index, req->offset, req->length );
}
else /* YAY */
{
struct peer_request * tmp = tr_new( struct peer_request, 1 );
*tmp = *req;
if( peerIsFast && pieceIsFast )
tr_list_append( &msgs->peerAskedForFast, tmp );
else
tr_list_append( &msgs->peerAskedFor, tmp );
}
}
static int
messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
{
switch( id )
{
case BT_CHOKE:
case BT_UNCHOKE:
case BT_INTERESTED:
case BT_NOT_INTERESTED:
case BT_HAVE_ALL:
case BT_HAVE_NONE:
return len==1;
case BT_HAVE:
case BT_SUGGEST:
case BT_ALLOWED_FAST:
return len==5;
case BT_BITFIELD:
return len == (msg->torrent->info.pieceCount+7u)/8u + 1u;
case BT_REQUEST:
case BT_CANCEL:
case BT_REJECT:
return len==13;
case BT_PIECE:
return len > 9;
case BT_PORT:
return len==3;
case BT_LTEP:
return len >= 2;
default:
return FALSE;
}
}
static int
readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
{
@ -895,218 +980,144 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
return READ_MORE;
tr_peerIoReadUint8( msgs->io, inbuf, &id );
msglen--;
dbgmsg( msgs, "got BT id %d, len %d", (int)id, (int)msglen );
if( !messageLengthIsCorrect( msgs, id, msglen ) )
{
fireGotError( msgs );
return READ_DONE;
}
--msglen;
switch( id )
{
case BT_CHOKE:
dbgmsg( msgs, "got Choke" );
assert( msglen == 0 );
msgs->info->clientIsChoked = 1;
#if 0
tr_list * walk;
for( walk = msgs->peerAskedFor; walk != NULL; )
{
tr_list * next = walk->next;
/* We shouldn't reject a peer's fast allowed requests at choke */
struct peer_request *req = walk->data;
if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index ) )
{
tr_list_remove_data( &msgs->peerAskedFor, req );
tr_free( req );
}
walk = next;
}
#endif
tr_peerMsgsCancelAllRequests( msgs );
break;
case BT_CHOKE:
dbgmsg( msgs, "got Choke" );
msgs->info->clientIsChoked = 1;
cancelAllRequestsToPeer( msgs );
cancelAllRequestsToClientExceptFast( msgs );
break;
case BT_UNCHOKE:
dbgmsg( msgs, "got Unchoke" );
assert( msglen == 0 );
msgs->info->clientIsChoked = 0;
fireNeedReq( msgs );
break;
case BT_UNCHOKE:
dbgmsg( msgs, "got Unchoke" );
msgs->info->clientIsChoked = 0;
fireNeedReq( msgs );
break;
case BT_INTERESTED:
dbgmsg( msgs, "got Interested" );
assert( msglen == 0 );
msgs->info->peerIsInterested = 1;
tr_peerMsgsSetChoke( msgs, 0 );
break;
case BT_INTERESTED:
dbgmsg( msgs, "got Interested" );
msgs->info->peerIsInterested = 1;
tr_peerMsgsSetChoke( msgs, 0 );
break;
case BT_NOT_INTERESTED:
dbgmsg( msgs, "got Not Interested" );
assert( msglen == 0 );
msgs->info->peerIsInterested = 0;
break;
case BT_NOT_INTERESTED:
dbgmsg( msgs, "got Not Interested" );
msgs->info->peerIsInterested = 0;
break;
case BT_HAVE:
assert( msglen == 4 );
tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
tr_bitfieldAdd( msgs->info->have, ui32 );
updatePeerProgress( msgs );
tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
dbgmsg( msgs, "got Have: %u", ui32 );
break;
case BT_HAVE:
tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
dbgmsg( msgs, "got Have: %u", ui32 );
tr_bitfieldAdd( msgs->info->have, ui32 );
updatePeerProgress( msgs );
tr_rcTransferred( msgs->torrent->swarmspeed, msgs->torrent->info.pieceSize );
break;
case BT_BITFIELD: {
const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
dbgmsg( msgs, "got a bitfield" );
assert( msglen == msgs->info->have->len );
tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
updatePeerProgress( msgs );
tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
fireNeedReq( msgs );
break;
}
case BT_BITFIELD: {
const int clientIsSeed = tr_torrentIsSeed( msgs->torrent );
dbgmsg( msgs, "got a bitfield" );
tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
updatePeerProgress( msgs );
tr_peerMsgsSetChoke( msgs, !clientIsSeed || (msgs->info->progress<1.0) );
fireNeedReq( msgs );
break;
}
case BT_REQUEST: {
struct peer_request * req;
assert( msglen == 12 );
req = tr_new( struct peer_request, 1 );
tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
tr_peerIoReadUint32( msgs->io, inbuf, &req->length );
dbgmsg( msgs, "got Request: %u:%u->%u", req->index, req->offset, req->length );
if ( !requestIsValid( msgs, req ) )
{
dbgmsg( msgs, "BT_REQUEST: invalid request, ignoring" );
tr_free( req );
break;
}
/*
If we're not choking him -> continue
If we're choking him
it doesn't support FPE -> He's deaf, reCHOKE and bail...
it support FPE
If the asked piece is not allowed
OR he's above our threshold
OR we don't have the requested piece -> Reject
Else
Asked piece allowed AND he's below our threshold -> continue...
*/
case BT_REQUEST: {
struct peer_request req;
tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
dbgmsg( msgs, "got Request: %u:%u->%u", req.index, req.offset, req.length );
peerMadeRequest( msgs, &req );
break;
}
case BT_CANCEL: {
struct peer_request req;
tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
tr_free( tr_list_remove( &msgs->peerAskedForFast, &req, compareRequest ) );
tr_free( tr_list_remove( &msgs->peerAskedFor, &req, compareRequest ) );
break;
}
case BT_PIECE: {
dbgmsg( msgs, "got a Piece!" );
assert( msgs->blockToUs.length == 0 );
tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
msgs->blockToUs.length = msglen - 8;
assert( EVBUFFER_LENGTH(msgs->inBlock) == 0 );
msgs->state = msgs->blockToUs.length ? READING_BT_PIECE : AWAITING_BT_LENGTH;
return READ_AGAIN;
break;
}
case BT_PORT: {
dbgmsg( msgs, "Got a BT_PORT" );
tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
break;
}
case BT_SUGGEST: {
/* FIXME(tiennou) */
uint32_t index;
tr_peerIoReadUint32( msgs->io, inbuf, &index );
break;
}
case BT_HAVE_ALL:
dbgmsg( msgs, "Got a BT_HAVE_ALL" );
tr_bitfieldAddRange( msgs->info->have, 0, msgs->torrent->info.pieceCount );
updatePeerProgress( msgs );
break;
case BT_HAVE_NONE:
dbgmsg( msgs, "Got a BT_HAVE_NONE" );
tr_bitfieldClear( msgs->info->have );
updatePeerProgress( msgs );
break;
case BT_REJECT: {
struct peer_request req;
dbgmsg( msgs, "Got a BT_REJECT" );
tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
tr_free( tr_list_remove( &msgs->clientAskedFor, &req, compareRequest ) );
break;
}
case BT_ALLOWED_FAST: {
dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
break;
}
case BT_LTEP:
dbgmsg( msgs, "Got a BT_LTEP" );
parseLtep( msgs, msglen, inbuf );
break;
if ( msgs->info->peerIsChoked )
{
if ( !tr_peerIoSupportsFEXT( msgs->io ) )
{
dbgmsg( msgs, "BT_REQUEST: peer is choked, ignoring" );
/* Didn't he get it? */
tr_peerMsgsSetChoke( msgs, 1 );
tr_free( req );
break;
}
else
{
if ( !tr_bitfieldHas( msgs->peerAllowedPieces, req->index )
|| ( msgs->info->progress * (float)msgs->torrent->info.pieceCount) >= MAX_ALLOWED_SET_COUNT
|| !tr_cpPieceIsComplete( msgs->torrent->completion, req->index ) )
{
dbgmsg( msgs, "BT_REQUEST: peer requests an un-fastallowed piece" );
sendFastReject( msgs, req->index, req->offset, req->length );
tr_free( req );
break;
}
dbgmsg( msgs, "BT_REQUEST: fast allowed piece, accepting request" );
}
}
tr_list_append( &msgs->peerAskedFor, req );
break;
}
case BT_CANCEL: {
struct peer_request req;
void * data;
assert( msglen == 12 );
tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
data = tr_list_remove( &msgs->peerAskedFor, &req, compareRequest );
tr_free( data );
break;
}
case BT_PIECE: {
dbgmsg( msgs, "got a Piece!" );
assert( msgs->blockToUs.length == 0 );
tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
msgs->blockToUs.length = msglen - 8;
assert( EVBUFFER_LENGTH(msgs->inBlock) == 0 );
msgs->state = msgs->blockToUs.length ? READING_BT_PIECE : AWAITING_BT_LENGTH;
return READ_AGAIN;
break;
}
case BT_PORT: {
dbgmsg( msgs, "Got a BT_PORT" );
assert( msglen == 2 );
tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
break;
}
case BT_SUGGEST: {
/* tiennou TODO */
break;
}
case BT_HAVE_ALL: {
assert( msglen == 0 );
dbgmsg( msgs, "Got a BT_HAVE_ALL" );
memset( msgs->info->have->bits, 1, msgs->info->have->len );
updatePeerProgress( msgs );
break;
}
case BT_HAVE_NONE: {
assert( msglen == 0 );
dbgmsg( msgs, "Got a BT_HAVE_NONE" );
tr_bitfieldClear( msgs->info->have );
updatePeerProgress( msgs );
break;
}
case BT_REJECT: {
struct peer_request req;
tr_list * node;
assert( msglen == 12 );
dbgmsg( msgs, "Got a BT_REJECT" );
tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
node = tr_list_find( msgs->peerAskedFor, &req, compareRequest );
if( node != NULL ) {
void * data = node->data;
tr_list_remove_data( &msgs->peerAskedFor, data );
tr_free( data );
dbgmsg( msgs, "found the req that peer has rejected... cancelled." );
}
break;
}
case BT_ALLOWED_FAST: {
assert( msglen == 4 );
dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
break;
}
case BT_LTEP:
dbgmsg( msgs, "Got a BT_LTEP" );
parseLtep( msgs, msglen, inbuf );
break;
default:
dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
tr_peerIoDrain( msgs->io, inbuf, msglen );
assert( 0 );
default:
dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
tr_peerIoDrain( msgs->io, inbuf, msglen );
break;
}
msgs->incomingMessageLength = -1;
@ -1411,12 +1422,18 @@ pulse( void * vmsgs )
tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
msgs->clientSentAnythingAt = now;
}
else if(( msgs->peerAskedFor ))
else if( msgs->peerAskedForFast || msgs->peerAskedFor )
{
if( canUpload( msgs ) )
{
struct peer_request * r = tr_list_pop_front( &msgs->peerAskedFor );
uint8_t * buf = tr_new( uint8_t, r->length );
struct peer_request * r;
uint8_t * buf;
r = tr_list_pop_front( &msgs->peerAskedForFast );
if( r == NULL )
r = tr_list_pop_front( &msgs->peerAskedFor);
buf = tr_new( uint8_t, r->length );
if( requestIsValid( msgs, r )
&& tr_cpPieceIsComplete( msgs->torrent->completion, r->index )
@ -1703,6 +1720,7 @@ tr_peerMsgsFree( tr_peermsgs* msgs )
tr_publisherFree( &msgs->publisher );
tr_list_free( &msgs->clientWillAskFor, tr_free );
tr_list_free( &msgs->clientAskedFor, tr_free );
tr_list_free( &msgs->peerAskedForFast, tr_free );
tr_list_free( &msgs->peerAskedFor, tr_free );
evbuffer_free( msgs->outMessages );
evbuffer_free( msgs->inBlock );