* try to get peers up to speed faster.

* remove unnecessary steps when receiving block data from peers.
Charles Kerr 2007-11-09 01:22:15 +00:00
parent 6ee138e0fd
commit 99ea26c1c9
2 changed files with 91 additions and 133 deletions
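
The speed-up in the first bullet comes from two tuning changes in the diff below: ratePulse() now sizes the outstanding-request window as MIN( 8 + rateToClient/5, 100 ) instead of MIN( 8 + rateToClient/10, 100 ), with the refill threshold set to a third rather than half of that window, and the libevent pulse interval in the second file drops from 50 msec to 20 msec so queued work gets serviced sooner. A minimal standalone sketch of the window sizing, assuming rateToClient is a download rate in KiB/s (the unit isn't shown in this diff):

/* sketch only -- compares the old and new request-window sizing from ratePulse() */
#include <stdio.h>

#define MIN( a, b ) ( (a) < (b) ? (a) : (b) )

int
main( void )
{
    const int rates[] = { 0, 50, 100, 250, 500 }; /* hypothetical download rates */
    int i;
    for( i = 0; i < 5; ++i )
    {
        const int rate = rates[i];
        const int oldMax = MIN( 8 + rate / 10, 100 ); /* before this commit */
        const int newMax = MIN( 8 + rate / 5,  100 ); /* after this commit */
        printf( "rate %3d: window %3d -> %3d, refill at %3d -> %3d\n",
                rate, oldMax, newMax, oldMax / 2, newMax / 3 );
    }
    return 0;
}

At 250 KiB/s, for example, the ceiling grows from 33 to 58 outstanding requests, so a fast peer can be kept busier while its rate is still climbing.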

View File

@@ -39,9 +39,6 @@
***
**/
#define MAX_ALLOWED_SET_COUNT 10 /* number of pieces generated for allow-fast,
threshold for fast-allowing others */
enum
{
BT_CHOKE = 0,
@@ -67,17 +64,22 @@ enum
MAX_REQUEST_BYTE_COUNT = (16 * 1024), /* drop requests who want too much */
KEEPALIVE_INTERVAL_SECS = 90, /* idle seconds before we send a keepalive */
/* idle seconds before we send a keepalive */
KEEPALIVE_INTERVAL_SECS = 90,
PEX_INTERVAL = (60 * 1000), /* msec between calls to sendPex() */
PEER_PULSE_INTERVAL = (100), /* msec between calls to pulse() */
RATE_PULSE_INTERVAL = (333), /* msec between calls to ratePulse() */
/* number of pieces generated for allow-fast,
threshold for fast-allowing others */
MAX_ALLOWED_SET_COUNT = 10
};
enum
{
AWAITING_BT_LENGTH,
AWAITING_BT_MESSAGE,
READING_BT_PIECE
AWAITING_BT_MESSAGE
};
struct peer_request
@@ -91,11 +93,12 @@ struct peer_request
static int
compareRequest( const void * va, const void * vb )
{
struct peer_request * a = (struct peer_request*) va;
struct peer_request * b = (struct peer_request*) vb;
if( a->index != b->index ) return a->index - b->index;
if( a->offset != b->offset ) return a->offset - b->offset;
if( a->length != b->length ) return a->length - b->length;
int i;
const struct peer_request * a = va;
const struct peer_request * b = vb;
if(( i = tr_compareUint32( a->index, b->index ))) return i;
if(( i = tr_compareUint32( a->offset, b->offset ))) return i;
if(( i = tr_compareUint32( a->length, b->length ))) return i;
return 0;
}
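
A side note on the rewritten compareRequest() above: the old version subtracted uint32_t fields and returned the difference as an int, which wraps and can report the wrong sign whenever the two values differ by more than INT_MAX. The new version delegates to tr_compareUint32(); the helper below is only an assumed stand-in for it, but it shows the overflow-safe three-way compare the change is after:

/* sketch only -- why `a - b` is a poor three-way comparator for uint32_t */
#include <stdio.h>
#include <stdint.h>

static int
compareUint32( uint32_t a, uint32_t b ) /* assumed to mirror tr_compareUint32() */
{
    if( a < b ) return -1;
    if( a > b ) return 1;
    return 0;
}

int
main( void )
{
    const uint32_t a = 0;
    const uint32_t b = 0x80000001u; /* b is far larger than a */
    printf( "subtraction says %d (wrong sign)\n", (int)( a - b ) );
    printf( "compareUint32 says %d\n", compareUint32( a, b ) );
    return 0;
}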
@@ -111,7 +114,6 @@ struct tr_peermsgs
struct evbuffer * outBlock; /* buffer of all the current piece message */
struct evbuffer * outMessages; /* buffer of all the non-piece messages */
struct evbuffer * inBlock; /* the block we're currently receiving */
tr_list * peerAskedFor;
tr_list * peerAskedForFast;
tr_list * clientAskedFor;
@@ -121,8 +123,6 @@ struct tr_peermsgs
tr_timer * pulseTimer;
tr_timer * pexTimer;
struct peer_request blockToUs; /* the block currntly being sent to us */
time_t lastReqAddedAt;
time_t clientSentPexAt;
time_t clientSentAnythingAt;
@@ -282,13 +282,13 @@ fireClientHave( tr_peermsgs * msgs, uint32_t pieceIndex )
}
static void
fireGotBlock( tr_peermsgs * msgs, uint32_t pieceIndex, uint32_t offset, uint32_t length )
fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
{
tr_peermsgs_event e = blankEvent;
e.eventType = TR_PEERMSG_CLIENT_BLOCK;
e.pieceIndex = pieceIndex;
e.offset = offset;
e.length = length;
e.pieceIndex = req->index;
e.offset = req->offset;
e.length = req->length;
publish( msgs, &e );
}
@@ -432,13 +432,12 @@ sendFastSuggest( tr_peermsgs * msgs,
}
#endif
static void
sendFastHave( tr_peermsgs * msgs,
int all)
sendFastHave( tr_peermsgs * msgs, int all )
{
dbgmsg( msgs, "w00t telling them we %s pieces", (all ? "HAVE_ALL" : "HAVE_NONE" ) );
dbgmsg( msgs, "w00t telling them we have %s pieces", (all ? "ALL" : "NONE" ) );
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL : BT_HAVE_NONE ) );
tr_peerIoWriteUint8( msgs->io, msgs->outMessages, ( all ? BT_HAVE_ALL
: BT_HAVE_NONE ) );
updateInterest( msgs );
}
@@ -491,7 +490,10 @@ sendFastAllowedSet( tr_peermsgs * msgs )
**/
static int
reqIsValid( const tr_peermsgs * msgs, uint32_t index, uint32_t offset, uint32_t length )
reqIsValid( const tr_peermsgs * msgs,
uint32_t index,
uint32_t offset,
uint32_t length )
{
const tr_torrent * tor = msgs->torrent;
@@ -528,10 +530,10 @@ pumpRequestQueue( tr_peermsgs * msgs )
while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
{
struct peer_request * req = tr_list_pop_front( &msgs->clientWillAskFor );
protocolSendRequest( msgs, req );
req->time_requested = msgs->lastReqAddedAt = time( NULL );
tr_list_append( &msgs->clientAskedFor, req );
struct peer_request * r = tr_list_pop_front( &msgs->clientWillAskFor );
protocolSendRequest( msgs, r );
r->time_requested = msgs->lastReqAddedAt = time( NULL );
tr_list_append( &msgs->clientAskedFor, r );
++count;
++sent;
}
@@ -889,7 +891,7 @@ peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
dbgmsg( msgs, "rejecting request for a piece we don't have." );
sendFastReject( msgs, req->index, req->offset, req->length );
}
else if( peerIsChoked && !peerIsFast ) /* maybe he doesn't know he's choked? */
else if( peerIsChoked && !peerIsFast ) /* doesn't he know he's choked? */
{
tr_peerMsgsSetChoke( msgs, 1 );
sendFastReject( msgs, req->index, req->offset, req->length );
@@ -949,6 +951,8 @@ messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
}
}
static void
clientGotBlock( tr_peermsgs * msgs, struct evbuffer * inbuf, const struct peer_request * req );
static int
readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
@@ -1037,16 +1041,15 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
break;
}
case BT_PIECE:
dbgmsg( msgs, "got a Piece!" );
assert( msgs->blockToUs.length == 0 );
tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
msgs->blockToUs.length = msglen - 8;
assert( EVBUFFER_LENGTH(msgs->inBlock) == 0 );
msgs->state = msgs->blockToUs.length ? READING_BT_PIECE : AWAITING_BT_LENGTH;
return READ_AGAIN;
case BT_PIECE: {
struct peer_request req;
tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
req.length = msglen - 8;
dbgmsg( msgs, "got a Block %u:%u->%u", req.index, req.offset, req.length );
clientGotBlock( msgs, inbuf, &req );
break;
}
case BT_PORT:
dbgmsg( msgs, "Got a BT_PORT" );
@@ -1169,16 +1172,13 @@ gotBadPiece( tr_peermsgs * msgs, uint32_t pieceIndex )
}
static void
gotUnwantedBlock( tr_peermsgs * msgs,
uint32_t index UNUSED,
uint32_t offset UNUSED,
uint32_t length )
clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
{
reassignBytesToCorrupt( msgs, length );
reassignBytesToCorrupt( msgs, req->length );
}
static void
addUsToBlamefield( tr_peermsgs * msgs, uint32_t index )
addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
{
if( !msgs->info->blame )
msgs->info->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
@@ -1186,34 +1186,38 @@ addUsToBlamefield( tr_peermsgs * msgs, uint32_t index )
}
static void
gotBlock( tr_peermsgs * msgs,
struct evbuffer * inbuf,
uint32_t index,
uint32_t offset,
uint32_t length )
clientGotBlock( tr_peermsgs * msgs, struct evbuffer * inbuf, const struct peer_request * req )
{
int i;
uint8_t * data;
tr_torrent * tor = msgs->torrent;
const int block = _tr_block( tor, index, offset );
struct peer_request key, *req;
const int block = _tr_block( tor, req->index, req->offset );
struct peer_request *myreq;
assert( msgs != NULL );
assert( inbuf != NULL );
assert( req != NULL );
assert( req->length > 0 );
assert( EVBUFFER_LENGTH( inbuf ) >= req->length );
assert( req->length == (uint32_t)tr_torBlockCountBytes( msgs->torrent, block ) );
/* save the block */
dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
/**
*** Remove the block from our `we asked for this' list
**/
key.index = index;
key.offset = offset;
key.length = length;
req = (struct peer_request*) tr_list_remove( &msgs->clientAskedFor, &key,
compareRequest );
if( req == NULL ) {
gotUnwantedBlock( msgs, index, offset, length );
myreq = tr_list_remove( &msgs->clientAskedFor, req, compareRequest );
if( myreq == NULL ) {
clientGotUnwantedBlock( msgs, req );
dbgmsg( msgs, "we didn't ask for this message..." );
return;
}
dbgmsg( msgs, "got block %u:%u->%u (turnaround time %d secs)",
req->index, req->offset, req->length,
(int)(time(NULL) - req->time_requested) );
tr_free( req );
myreq->index, myreq->offset, myreq->length,
(int)(time(NULL) - myreq->time_requested) );
dbgmsg( msgs, "peer has %d more blocks we've asked for",
tr_list_size(msgs->clientAskedFor));
@@ -1222,99 +1226,55 @@ gotBlock( tr_peermsgs * msgs,
**/
if( tr_cpBlockIsComplete( tor->completion, block ) ) {
dbgmsg( msgs, "have this block already..." );
tr_dbg( "have this block already..." );
gotUnwantedBlock( msgs, index, offset, length );
return;
}
if( (int)length != tr_torBlockCountBytes( tor, block ) ) {
dbgmsg( msgs, "block is the wrong length..." );
tr_dbg( "block is the wrong length..." );
gotUnwantedBlock( msgs, index, offset, length );
dbgmsg( msgs, "we have this block already..." );
clientGotUnwantedBlock( msgs, req );
return;
}
/**
*** Write the block
*** Save the block
**/
if( tr_ioWrite( tor, index, offset, length, EVBUFFER_DATA( inbuf )))
data = tr_new( uint8_t, req->length );
tr_peerIoReadBytes( msgs->io, inbuf, data, req->length );
msgs->info->peerSentPieceDataAt = time( NULL );
clientGotBytes( msgs, req->length );
i = tr_ioWrite( tor, req->index, req->offset, req->length, data );
tr_free( data );
if( i )
return;
#warning this sanity check is here to help track down the excess corrupt data bug, but is expensive and should be removed before the next release
{
uint8_t * tmp = tr_new( uint8_t, length );
const int val = tr_ioRead( tor, index, offset, length, tmp );
uint8_t * check = tr_new( uint8_t, req->length );
const int val = tr_ioRead( tor, req->index, req->offset, req->length, check );
assert( !val );
assert( !memcmp( tmp, EVBUFFER_DATA(inbuf), length ) );
tr_free( tmp );
assert( !memcmp( check, data, req->length ) );
tr_free( check );
}
tr_cpBlockAdd( tor->completion, block );
addUsToBlamefield( msgs, index );
addPeerToBlamefield( msgs, req->index );
fireGotBlock( msgs, index, offset, length );
fireGotBlock( msgs, req );
/**
*** Handle if this was the last block in the piece
**/
if( tr_cpPieceIsComplete( tor->completion, index ) )
if( tr_cpPieceIsComplete( tor->completion, req->index ) )
{
if( tr_ioHash( tor, index ) )
if( tr_ioHash( tor, req->index ) )
{
gotBadPiece( msgs, index );
gotBadPiece( msgs, req->index );
return;
}
fireClientHave( msgs, index );
fireClientHave( msgs, req->index );
}
}
static ReadState
readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf )
{
uint32_t inlen;
uint8_t * tmp;
assert( msgs != NULL );
assert( msgs->blockToUs.length > 0 );
assert( inbuf != NULL );
assert( EVBUFFER_LENGTH( inbuf ) > 0 );
/* read from the inbuf into our block buffer */
inlen = MIN( EVBUFFER_LENGTH(inbuf), msgs->blockToUs.length );
tmp = tr_new( uint8_t, inlen );
tr_peerIoReadBytes( msgs->io, inbuf, tmp, inlen );
evbuffer_add( msgs->inBlock, tmp, inlen );
/* update our tables accordingly */
assert( inlen >= msgs->blockToUs.length );
msgs->blockToUs.length -= inlen;
msgs->info->peerSentPieceDataAt = time( NULL );
clientGotBytes( msgs, inlen );
/* if this was the entire block, save it */
if( !msgs->blockToUs.length )
{
dbgmsg( msgs, "got block %u:%u", msgs->blockToUs.index, msgs->blockToUs.offset );
assert( (int)EVBUFFER_LENGTH( msgs->inBlock ) == tr_torBlockCountBytes( msgs->torrent, _tr_block(msgs->torrent,msgs->blockToUs.index, msgs->blockToUs.offset) ) );
gotBlock( msgs, msgs->inBlock,
msgs->blockToUs.index,
msgs->blockToUs.offset,
EVBUFFER_LENGTH( msgs->inBlock ) );
evbuffer_drain( msgs->inBlock, EVBUFFER_LENGTH( msgs->inBlock ) );
msgs->state = AWAITING_BT_LENGTH;
}
/* cleanup */
tr_free( tmp );
return READ_AGAIN;
}
static ReadState
canRead( struct bufferevent * evin, void * vmsgs )
{
@@ -1332,7 +1292,6 @@ canRead( struct bufferevent * evin, void * vmsgs )
{
case AWAITING_BT_LENGTH: ret = readBtLength ( msgs, inbuf ); break;
case AWAITING_BT_MESSAGE: ret = readBtMessage ( msgs, inbuf ); break;
case READING_BT_PIECE: ret = readBtPiece ( msgs, inbuf ); break;
default: assert( 0 );
}
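
This is the other half of the second bullet: with readBtPiece() and the READING_BT_PIECE state gone, canRead() above has one fewer case to dispatch, and piece payloads are no longer accumulated across read callbacks into the (also removed) msgs->inBlock staging buffer before being written out; clientGotBlock() now consumes the bytes straight from the input buffer inside readBtMessage(). The toy functions below only mimic that shape with plain memory buffers; the names are stand-ins, not libtransmission APIs:

/* sketch only -- dropping the intermediate staging buffer */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static void
writeBlockToDisk( const unsigned char * data, size_t len ) /* stand-in for tr_ioWrite() */
{
    (void) data;
    printf( "wrote %zu bytes\n", len );
}

/* old shape: input buffer -> staging buffer -> disk */
static void
oldPath( const unsigned char * input, size_t len )
{
    unsigned char * staging = malloc( len ); /* think msgs->inBlock */
    memcpy( staging, input, len );           /* accumulate the payload */
    writeBlockToDisk( staging, len );        /* flush once the block is whole */
    free( staging );
}

/* new shape: input buffer -> disk, handled in one callback */
static void
newPath( const unsigned char * input, size_t len )
{
    writeBlockToDisk( input, len );
}

int
main( void )
{
    unsigned char block[16 * 1024] = { 0 };  /* a typical 16 KiB request */
    oldPath( block, sizeof block );
    newPath( block, sizeof block );
    return 0;
}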
@@ -1370,7 +1329,8 @@ getUploadMax( const tr_peermsgs * msgs )
return 0;
if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL )
return tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval;
return tor->handle->useUploadLimit
? tr_rcBytesLeft( tor->handle->upload ) : maxval;
if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE )
return tr_rcBytesLeft( tor->upload );
@@ -1384,8 +1344,8 @@ ratePulse( void * vmsgs )
tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/10), 100 );
msgs->minActiveRequests = msgs->maxActiveRequests / 2;
msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/5), 100 );
msgs->minActiveRequests = msgs->maxActiveRequests / 3;
return TRUE;
}
@@ -1676,7 +1636,6 @@ tr_peerMsgsNew( struct tr_torrent * torrent,
m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
m->outMessages = evbuffer_new( );
m->outBlock = evbuffer_new( );
m->inBlock = evbuffer_new( );
m->peerAllowedPieces = NULL;
m->clientAllowedPieces = NULL;
*setme = tr_publisherSubscribe( m->publisher, func, userData );
@@ -1696,7 +1655,7 @@ tr_peerMsgsNew( struct tr_torrent * torrent,
m->clientAllowedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
}
tr_peerIoSetTimeoutSecs( m->io, 150 ); /* error if we don't read or write for 2.5 minutes */
tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
tr_peerIoSetIOMode( m->io, EV_READ|EV_WRITE, 0 );
ratePulse( m );
@@ -1741,7 +1700,6 @@ tr_peerMsgsFree( tr_peermsgs* msgs )
tr_list_free( &msgs->peerAskedFor, tr_free );
evbuffer_free( msgs->outMessages );
evbuffer_free( msgs->outBlock );
evbuffer_free( msgs->inBlock );
tr_free( msgs->pex );
msgs->pexCount = 0;
tr_free( msgs );

View File

@@ -228,7 +228,7 @@ tr_eventInit( tr_handle * handle )
eh = tr_new0( tr_event_handle, 1 );
eh->lock = tr_lockNew( );
eh->h = handle;
eh->pulseInterval = timevalMsec( 50 );
eh->pulseInterval = timevalMsec( 20 );
eh->thread = tr_threadNew( libeventThreadFunc, eh, "libeventThreadFunc" );
}