progress on the "speed limits kill my transfer rate" bug.

This commit is contained in:
Charles Kerr 2007-11-16 20:40:03 +00:00
parent d2a5dd50d3
commit 0ac302b302
4 changed files with 136 additions and 65 deletions

View File

@ -1030,7 +1030,6 @@ tr_handshakeNew( tr_peerIo * io,
handshake->doneUserData = doneUserData;
handshake->handle = tr_peerIoGetHandle( io );
tr_peerIoSetIOMode( handshake->io, EV_READ|EV_WRITE, 0 );
tr_peerIoSetIOFuncs( handshake->io, canRead, NULL, gotError, handshake );
if( tr_peerIoIsIncoming( handshake->io ) )

View File

@ -98,7 +98,6 @@ canReadWrapper( struct bufferevent * e, void * userData )
if( EVBUFFER_LENGTH( e->input ) )
continue;
case READ_MORE:
tr_peerIoSetIOMode( c, EV_READ, 0 );
case READ_DONE:
done = 1;
}
@ -119,6 +118,8 @@ gotErrorWrapper( struct bufferevent * e, short what, void * userData )
***
**/
void bufferevent_setwatermark(struct bufferevent *, short, size_t, size_t);
static tr_peerIo*
tr_peerIoNew( struct tr_handle * handle,
const struct in_addr * in_addr,
@ -142,6 +143,8 @@ tr_peerIoNew( struct tr_handle * handle,
c );
bufferevent_settimeout( c->bufev, IO_TIMEOUT_SECS, IO_TIMEOUT_SECS );
bufferevent_enable( c->bufev, EV_READ|EV_WRITE );
bufferevent_setwatermark( c->bufev, EV_READ, 0, 1024 );
return c;
}
@ -233,6 +236,13 @@ tr_peerIoGetAddrStr( const tr_peerIo * io )
return tr_peerIoAddrStr( &io->in_addr, io->port );
}
void
tr_peerIoTryRead( tr_peerIo * io )
{
if( EVBUFFER_LENGTH( io->bufev->input ) )
canReadWrapper( io->bufev, io );
}
void
tr_peerIoSetIOFuncs( tr_peerIo * io,
tr_can_read_cb readcb,
@ -245,16 +255,7 @@ tr_peerIoSetIOFuncs( tr_peerIo * io,
io->gotError = errcb;
io->userData = userData;
if( EVBUFFER_LENGTH( io->bufev->input ) )
canReadWrapper( io->bufev, io );
}
void
tr_peerIoSetIOMode( tr_peerIo * io, short enable, short disable )
{
assert( tr_amInEventThread( io->handle ) );
bufferevent_enable( io->bufev, enable );
bufferevent_disable( io->bufev, disable );
tr_peerIoTryRead( io );
}
int
@ -284,6 +285,7 @@ tr_peerIoReconnect( tr_peerIo * io )
io );
bufferevent_settimeout( io->bufev, IO_TIMEOUT_SECS, IO_TIMEOUT_SECS );
bufferevent_enable( io->bufev, EV_READ|EV_WRITE );
bufferevent_setwatermark( io->bufev, EV_READ, 0, 1024 );
return 0;
}

View File

@ -108,12 +108,10 @@ void tr_peerIoSetIOFuncs( tr_peerIo * io,
tr_net_error_cb errcb,
void * user_data );
void tr_peerIoSetIOMode ( tr_peerIo * io,
short enable_mode,
short disable_mode );
size_t tr_peerIoWriteBytesWaiting( const tr_peerIo * io );
void tr_peerIoTryRead( tr_peerIo * io );
void tr_peerIoWrite( tr_peerIo * io,
const void * writeme,
int writeme_len );

View File

@ -80,7 +80,9 @@ enum
enum
{
AWAITING_BT_LENGTH,
AWAITING_BT_MESSAGE
AWAITING_BT_ID,
AWAITING_BT_MESSAGE,
AWAITING_BT_PIECE
};
struct peer_request
@ -103,6 +105,16 @@ compareRequest( const void * va, const void * vb )
return 0;
}
/* this is raw, unchanged data from the peer regarding
* the current message that it's sending us. */
struct tr_incoming
{
uint32_t length; /* includes the +1 for id length */
uint8_t id;
struct peer_request blockReq; /* metadata for incoming blocks */
struct evbuffer * block; /* piece data for incoming blocks */
};
struct tr_peermsgs
{
tr_peer * info;
@ -141,10 +153,11 @@ struct tr_peermsgs
uint8_t state;
uint8_t ut_pex_id;
uint16_t pexCount;
uint32_t incomingMessageLength;
uint32_t maxActiveRequests;
uint32_t minActiveRequests;
struct tr_incoming incoming;
tr_pex * pex;
};
@ -864,9 +877,8 @@ static int
readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf )
{
uint32_t len;
const size_t needlen = sizeof(uint32_t);
if( EVBUFFER_LENGTH(inbuf) < needlen )
if( EVBUFFER_LENGTH(inbuf) < sizeof(len) )
return READ_MORE;
tr_peerIoReadUint32( msgs->io, inbuf, &len );
@ -874,13 +886,41 @@ readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf )
if( len == 0 ) /* peer sent us a keepalive message */
dbgmsg( msgs, "got KeepAlive" );
else {
msgs->incomingMessageLength = len;
msgs->state = AWAITING_BT_MESSAGE;
msgs->incoming.length = len;
msgs->state = AWAITING_BT_ID;
}
dbgmsg( msgs, "readBtLength: got a length of %d, msgs->state is now %d", (int)len, (int)msgs->state );
return READ_AGAIN;
}
static int
readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf );
static int
readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf )
{
uint8_t id;
if( EVBUFFER_LENGTH(inbuf) < sizeof(uint8_t) )
return READ_MORE;
tr_peerIoReadUint8( msgs->io, inbuf, &id );
msgs->incoming.id = id;
if( id==BT_PIECE )
{
msgs->state = AWAITING_BT_PIECE;
return READ_AGAIN;
}
else if( msgs->incoming.length != 1 )
{
msgs->state = AWAITING_BT_MESSAGE;
return READ_AGAIN;
}
else return readBtMessage( msgs, inbuf );
}
static void
updatePeerProgress( tr_peermsgs * msgs )
{
@ -988,30 +1028,82 @@ messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
static int
clientGotBlock( tr_peermsgs * msgs, const uint8_t * block, const struct peer_request * req );
static int
readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf )
{
struct peer_request * req = &msgs->incoming.blockReq;
dbgmsg( msgs, "In readBtPiece" );
if( !req->length )
{
if( EVBUFFER_LENGTH( inbuf ) < 8 )
return READ_MORE;
tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
req->length = msgs->incoming.length - 9;
dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
return READ_AGAIN;
}
else
{
int err;
/* read in another chunk of data */
const size_t nLeft = req->length - EVBUFFER_LENGTH(msgs->incoming.block);
size_t n = MIN( nLeft, EVBUFFER_LENGTH(inbuf) );
uint8_t * buf = tr_new( uint8_t, n );
tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
evbuffer_add( msgs->incoming.block, buf, n );
tr_free( buf );
dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
(int)n, req->index, req->offset, req->length,
(int)( req->length - EVBUFFER_LENGTH(msgs->incoming.block) ) );
if( EVBUFFER_LENGTH(msgs->incoming.block) < req->length )
return READ_MORE;
/* we've got the whole block ... process it */
err = clientGotBlock( msgs, EVBUFFER_DATA(msgs->incoming.block), req );
/* cleanup */
evbuffer_drain( msgs->incoming.block, EVBUFFER_LENGTH(msgs->incoming.block) );
req->length = 0;
msgs->state = AWAITING_BT_LENGTH;
if( !err )
return READ_AGAIN;
else {
fireGotAssertError( msgs );
return READ_DONE;
}
}
}
static int
readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
{
int ret;
uint8_t id;
uint32_t ui32;
uint32_t msglen = msgs->incomingMessageLength;
uint32_t msglen = msgs->incoming.length;
const uint8_t id = msgs->incoming.id;
const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
if( EVBUFFER_LENGTH(inbuf) < msglen )
return READ_MORE;
dbgmsg( msgs, "in readBtMessage" );
--msglen; // id length
if( EVBUFFER_LENGTH(inbuf) < msglen ) {
dbgmsg( msgs, " too short!!! " );
return READ_MORE;
}
tr_peerIoReadUint8( msgs->io, inbuf, &id );
dbgmsg( msgs, "got BT id %d, len %d, buffer size is %d", (int)id, (int)msglen, (int)EVBUFFER_LENGTH(inbuf) );
if( !messageLengthIsCorrect( msgs, id, msglen ) )
if( !messageLengthIsCorrect( msgs, id, msglen+1 ) )
{
dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
fireGotError( msgs );
return READ_DONE;
}
--msglen;
ret = 0;
switch( id )
{
case BT_CHOKE:
@ -1080,19 +1172,9 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
break;
}
case BT_PIECE: {
uint8_t * block;
struct peer_request req;
tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
req.length = msglen - 8;
block = tr_new( uint8_t, req.length );
tr_peerIoReadBytes( msgs->io, inbuf, block, req.length );
dbgmsg( msgs, "got a Block %u:%u->%u", req.index, req.offset, req.length );
ret = clientGotBlock( msgs, block, &req );
tr_free( block );
case BT_PIECE:
assert( 0 ); /* should be handled elsewhere! */
break;
}
case BT_PORT:
dbgmsg( msgs, "Got a BT_PORT" );
@ -1146,31 +1228,16 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
default:
dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
tr_peerIoDrain( msgs->io, inbuf, msglen );
assert( 0 );
break;
}
dbgmsg( msgs, "startBufLen was %d, msglen was %d, current inbuf len is %d", (int)startBufLen, (int)(msglen+1), (int)EVBUFFER_LENGTH(inbuf) );
if( ret == (int)TR_ERROR_ASSERT )
{
fireGotAssertError( msgs );
return READ_DONE;
}
else if( ret == TR_OK )
{
assert( msglen + 1 == msgs->incomingMessageLength );
assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msgs->incomingMessageLength );
assert( msglen + 1 == msgs->incoming.length );
assert( EVBUFFER_LENGTH(inbuf) == startBufLen - msglen );
msgs->incomingMessageLength = -1;
msgs->state = AWAITING_BT_LENGTH;
return READ_AGAIN;
}
else
{
fireGotError( msgs );
return READ_DONE;
}
msgs->state = AWAITING_BT_LENGTH;
return READ_AGAIN;
}
static void
@ -1340,16 +1407,19 @@ canRead( struct bufferevent * evin, void * vmsgs )
tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
dbgmsg( msgs, "canRead, state is %d", msgs->state );
if( !canDownload( msgs ) )
{
dbgmsg( msgs, "oh, but canDownload failed" );
msgs->notListening = 1;
tr_peerIoSetIOMode ( msgs->io, 0, EV_READ );
ret = READ_DONE;
}
else switch( msgs->state )
{
case AWAITING_BT_LENGTH: ret = readBtLength ( msgs, inbuf ); break;
case AWAITING_BT_ID: ret = readBtId ( msgs, inbuf ); break;
case AWAITING_BT_MESSAGE: ret = readBtMessage ( msgs, inbuf ); break;
case AWAITING_BT_PIECE: ret = readBtPiece ( msgs, inbuf ); break;
default: assert( 0 );
}
@ -1430,7 +1500,7 @@ pulse( void * vmsgs )
if( msgs->notListening && canDownload( msgs ) )
{
msgs->notListening = 0;
tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
tr_peerIoTryRead( msgs->io );
}
pumpRequestQueue( msgs );
@ -1689,10 +1759,12 @@ tr_peerMsgsNew( struct tr_torrent * torrent,
m->info->clientIsInterested = 0;
m->info->peerIsInterested = 0;
m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
m->state = AWAITING_BT_LENGTH;
m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
m->outMessages = evbuffer_new( );
m->incoming.block = evbuffer_new( );
m->outBlock = evbuffer_new( );
m->peerAllowedPieces = NULL;
m->clientAllowedPieces = NULL;
@ -1718,7 +1790,6 @@ tr_peerMsgsNew( struct tr_torrent * torrent,
tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of inactivity */
tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
tr_peerIoSetIOMode( m->io, EV_READ|EV_WRITE, 0 );
ratePulse( m );
if ( tr_peerIoSupportsLTEP( m->io ) )
@ -1762,6 +1833,7 @@ tr_peerMsgsFree( tr_peermsgs* msgs )
tr_bitfieldFree( msgs->peerAllowedPieces );
tr_bitfieldFree( msgs->clientAllowedPieces );
tr_bitfieldFree( msgs->clientSuggestedPieces );
evbuffer_free( msgs->incoming.block );
evbuffer_free( msgs->outMessages );
evbuffer_free( msgs->outBlock );
tr_free( msgs->pex );