diff --git a/libtransmission/peer-msgs.c b/libtransmission/peer-msgs.c index 539e5a888..62734b8c5 100644 --- a/libtransmission/peer-msgs.c +++ b/libtransmission/peer-msgs.c @@ -70,7 +70,7 @@ enum KEEPALIVE_INTERVAL_SECS = 90, /* idle seconds before we send a keepalive */ PEX_INTERVAL = (60 * 1000), /* msec between calls to sendPex() */ - PEER_PULSE_INTERVAL = (133), /* msec between calls to pulse() */ + PEER_PULSE_INTERVAL = (100), /* msec between calls to pulse() */ RATE_PULSE_INTERVAL = (333), /* msec between calls to ratePulse() */ }; @@ -110,6 +110,7 @@ struct tr_peermsgs tr_publisher_t * publisher; + struct evbuffer * outBlock; /* buffer of all the current piece message */ struct evbuffer * outMessages; /* buffer of all the non-piece messages */ struct evbuffer * inBlock; /* the block we're currently receiving */ tr_list * peerAskedFor; @@ -234,25 +235,6 @@ protocolSendChoke( tr_peermsgs * msgs, int choke ) tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE ); } -static void -protocolSendPiece( tr_peermsgs * msgs, - const struct peer_request * r, - const uint8_t * pieceData ) -{ - tr_peerIo * io = msgs->io; - struct evbuffer * out = evbuffer_new( ); - - dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length ); - tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length ); - tr_peerIoWriteUint8 ( io, out, BT_PIECE ); - tr_peerIoWriteUint32( io, out, r->index ); - tr_peerIoWriteUint32( io, out, r->offset ); - tr_peerIoWriteBytes ( io, out, pieceData, r->length ); - tr_peerIoWriteBuf ( io, out ); - - evbuffer_free( out ); -} - /** *** EVENTS **/ @@ -1363,27 +1345,28 @@ static int canWrite( const tr_peermsgs * msgs ) { /* don't let our outbuffer get too large */ - if( tr_peerIoWriteBytesWaiting( msgs->io ) > 8192 ) + if( tr_peerIoWriteBytesWaiting( msgs->io ) > 4096 ) return FALSE; return TRUE; } -static int -canUpload( const tr_peermsgs * msgs ) +static size_t +getUploadMax( const tr_peermsgs * msgs ) { + static const size_t maxval = ~0; const tr_torrent * tor = msgs->torrent; if( !canWrite( msgs ) ) - return FALSE; + return 0; if( tor->uploadLimitMode == TR_SPEEDLIMIT_GLOBAL ) - return !tor->handle->useUploadLimit || tr_rcCanTransfer( tor->handle->upload ); + return tor->handle->useUploadLimit ? tr_rcBytesLeft( tor->handle->upload ) : maxval; if( tor->uploadLimitMode == TR_SPEEDLIMIT_SINGLE ) - return tr_rcCanTransfer( tor->upload ); + return tr_rcBytesLeft( tor->upload ); - return TRUE; + return maxval; } static int @@ -1397,11 +1380,22 @@ ratePulse( void * vmsgs ) return TRUE; } +static struct peer_request* +popNextRequest( tr_peermsgs * msgs ) +{ + struct peer_request * ret; + ret = tr_list_pop_front( &msgs->peerAskedForFast ); + if( !ret ) + ret = tr_list_pop_front( &msgs->peerAskedFor); + return ret; +} + static int pulse( void * vmsgs ) { const time_t now = time( NULL ); - tr_peermsgs * msgs = (tr_peermsgs *) vmsgs; + tr_peermsgs * msgs = vmsgs; + struct peer_request * r; size_t len; /* if we froze out a downloaded block because of speed limits, @@ -1417,42 +1411,54 @@ pulse( void * vmsgs ) if( !canWrite( msgs ) ) { } + else if(( len = EVBUFFER_LENGTH( msgs->outBlock ) )) + { + const size_t uploadMax = getUploadMax( msgs ); + const size_t outlen = MIN( len, uploadMax ); + tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen ); + evbuffer_drain( msgs->outBlock, outlen ); + msgs->clientSentAnythingAt = now; + peerGotBytes( msgs, outlen ); + len -= outlen; + dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len ); + fflush( stdout ); + } else if(( len = EVBUFFER_LENGTH( msgs->outMessages ) )) { tr_peerIoWriteBuf( msgs->io, msgs->outMessages ); msgs->clientSentAnythingAt = now; } - else if( msgs->peerAskedForFast || msgs->peerAskedFor ) - { - if( canUpload( msgs ) ) - { - struct peer_request * r; - uint8_t * buf; - - r = tr_list_pop_front( &msgs->peerAskedForFast ); - if( r == NULL ) - r = tr_list_pop_front( &msgs->peerAskedFor); - - buf = tr_new( uint8_t, r->length ); - - if( requestIsValid( msgs, r ) - && tr_cpPieceIsComplete( msgs->torrent->completion, r->index ) - && !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) ) - { - protocolSendPiece( msgs, r, buf ); - peerGotBytes( msgs, r->length ); - msgs->clientSentAnythingAt = now; - } - - tr_free( buf ); - tr_free( r ); - } - } else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) { sendKeepalive( msgs ); } + if( !EVBUFFER_LENGTH( msgs->outBlock ) + && (( r = popNextRequest( msgs ))) + && requestIsValid( msgs, r ) + && tr_cpPieceIsComplete( msgs->torrent->completion, r->index ) ) + { + uint8_t * buf = tr_new( uint8_t, r->length ); + + if( !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) ) + { + tr_peerIo * io = msgs->io; + struct evbuffer * out = msgs->outBlock; + + dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length ); + tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length ); + tr_peerIoWriteUint8 ( io, out, BT_PIECE ); + tr_peerIoWriteUint32( io, out, r->index ); + tr_peerIoWriteUint32( io, out, r->offset ); + tr_peerIoWriteBytes ( io, out, buf, r->length ); + } + + tr_free( buf ); + tr_free( r ); + + pulse( msgs ); /* start sending it right away */ + } + return TRUE; /* loop forever */ } @@ -1660,6 +1666,7 @@ tr_peerMsgsNew( struct tr_torrent * torrent, m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL ); m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL ); m->outMessages = evbuffer_new( ); + m->outBlock = evbuffer_new( ); m->inBlock = evbuffer_new( ); m->peerAllowedPieces = NULL; m->clientAllowedPieces = NULL; @@ -1724,6 +1731,7 @@ tr_peerMsgsFree( tr_peermsgs* msgs ) tr_list_free( &msgs->peerAskedForFast, tr_free ); tr_list_free( &msgs->peerAskedFor, tr_free ); evbuffer_free( msgs->outMessages ); + evbuffer_free( msgs->outBlock ); evbuffer_free( msgs->inBlock ); tr_free( msgs->pex ); msgs->pexCount = 0; diff --git a/libtransmission/ratecontrol.c b/libtransmission/ratecontrol.c index 6660b208b..5680b9694 100644 --- a/libtransmission/ratecontrol.c +++ b/libtransmission/ratecontrol.c @@ -112,6 +112,29 @@ tr_rcCanTransfer( const tr_ratecontrol * r ) return ret; } +size_t +tr_rcBytesLeft( const tr_ratecontrol * r ) +{ + size_t bytes = 0; + + if( r != NULL ) + { + float cur, max; + size_t kb; + + tr_lockLock( (tr_lock*)r->lock ); + + cur = rateForInterval( r, SHORT_INTERVAL_MSEC ); + max = r->limit; + kb = max>cur ? max-cur : 0; + bytes = kb * 1024u; + + tr_lockUnlock( (tr_lock*)r->lock ); + } + + return bytes; +} + float tr_rcRate( const tr_ratecontrol * r ) { diff --git a/libtransmission/ratecontrol.h b/libtransmission/ratecontrol.h index fe9e05f64..16f1c7a01 100644 --- a/libtransmission/ratecontrol.h +++ b/libtransmission/ratecontrol.h @@ -31,6 +31,7 @@ tr_ratecontrol * tr_rcInit( void ); void tr_rcSetLimit( tr_ratecontrol *, int ); int tr_rcGetLimit( const tr_ratecontrol * ); int tr_rcCanTransfer( const tr_ratecontrol * ); +size_t tr_rcBytesLeft( const tr_ratecontrol * ); void tr_rcTransferred( tr_ratecontrol *, size_t byteCount ); float tr_rcRate( const tr_ratecontrol * ); void tr_rcReset( tr_ratecontrol * );