try to rework the bandwidth code yet again s.t. it satisfies all three: (1) fairly distributes bandwidth across all peers, (2) scales well in high-bandwidth situations, (3) is good at hitting and staying at bandwidth limits/goals

This commit is contained in:
Charles Kerr 2008-12-20 22:19:34 +00:00
parent efa404ff25
commit 72ded3b272
6 changed files with 323 additions and 146 deletions

View File

@ -294,40 +294,59 @@ tr_bandwidthAllocate( tr_bandwidth * b,
tr_direction dir,
int period_msec )
{
int n;
int i, n, peerCount;
tr_ptrArray * tmp;
struct tr_peerIo ** peers;
/* allocateBandwidth() is a helper function with two purposes:
* 1. allocate bandwidth to b and its subtree
* 2. accumulate an array of all the peerIos from b and its subtree. */
tmp = tr_ptrArrayNew( );
allocateBandwidth( b, dir, period_msec, tmp );
peers = (struct tr_peerIo**) tr_ptrArrayPeek( tmp, &n );
peers = (struct tr_peerIo**) tr_ptrArrayPeek( tmp, &peerCount );
/* loop through all the peers, reading and writing in small chunks,
* until we run out of bandwidth or peers. we do it this way to
* prevent one peer from using up all the bandwidth */
#if 0
fprintf( stderr, "%s - %d peers\n", (dir==TR_UP)?"up":"down", n );
#endif
while( n > 0 )
/* Stop all peers from listening for the socket to be ready for IO.
* See "Second phase of IO" lower in this function for more info. */
for( i=0; i<peerCount; ++i )
tr_peerIoSetEnabled( peers[i], dir, FALSE );
/* First phase of IO. Tries to distribute bandwidth in a fair/even manner
* to avoid "greedy peers" from starving out the other peers: loop through
* peers in a round-robin fashion, giving each one of them small chunks
* of bandwidth to use. (It's small to conserve some of the bandwidth
* until the end of the loop). Keep looping until we run out of bandwidth
* or peers that are ready to use it. */
n = peerCount;
i = n ? tr_cryptoWeakRandInt( n ) : 0; /* pick a random starting point */
for( ; n>0; )
{
int i;
for( i=0; i<n; )
{
const int increment = n==1 ? 4096 : 1024;
const int byteCount = tr_peerIoFlush( peers[i], dir, increment);
const int increment = n==1 ? 4096 : 1024;
const int byteCount = tr_peerIoFlush( peers[i], dir, increment);
#if 0
if( byteCount )
fprintf( stderr, "peer %p: %d bytes\n", peers[i], byteCount );
#endif
if( byteCount == increment )
++i;
else
peers[i] = peers[--n];
if( byteCount == increment )
++i;
else {
/* peer is done writing for now; move it to the end of the list */
tr_peerIo * tmp = peers[i];
peers[i] = peers[n-1];
peers[n-1] = tmp;
--n;
}
assert( i <= n );
if( i == n )
i = 0;
}
/* Second phase of IO. To help us scale well in high bandwidth situations
* such as LANs, enable on-demand IO for peers with bandwidth left to burn.
* This on-demand IO for a peer is enabled until either (1) the peer runs
* out of bandwidth, or (2) the next tr_bandwidthAllocate() call, when we
* start all over again. */
for( i=0; i<peerCount; ++i )
if( tr_peerIoHasBandwidthLeft( peers[i], dir ) )
tr_peerIoSetEnabled( peers[i], dir, TRUE );
/* cleanup */
tr_ptrArrayFree( tmp, NULL );
}

View File

@ -1090,19 +1090,26 @@ fireDoneFunc( tr_handshake * handshake,
return success;
}
/** Dispose of a handshake, releasing the peer io it still owns (if any). */
void
tr_handshakeFree( tr_handshake * handshake )
{
    tr_peerIo * io = handshake->io;

    if( io != NULL )
        tr_peerIoFree( io );

    tr_free( handshake );
}
/* Finish the handshake: tell the peer-io to stop calling us, fire the
 * user's "done" callback, then free the handshake object itself.
 * Returns READ_LATER when the callback reports success, READ_ERR otherwise. */
static int
tr_handshakeDone( tr_handshake * handshake,
int isOK )
{
/* NOTE(review): this diff view shows both the old `int` and the new
 * `tr_bool` declaration of `success`; only the tr_bool line survives
 * in the post-change source -- verify against the repository. */
int success;
tr_bool success;
dbgmsg( handshake, "handshakeDone: %s", isOK ? "connected" : "aborting" );
/* detach our read/write/error callbacks from the io before handing off */
tr_peerIoSetIOFuncs( handshake->io, NULL, NULL, NULL, NULL );
success = fireDoneFunc( handshake, isOK );
tr_free( handshake );
return success ? READ_LATER : READ_ERR;
}
@ -1192,6 +1199,19 @@ tr_handshakeGetIO( tr_handshake * handshake )
return handshake->io;
}
/** Transfer ownership of the handshake's peer io to the caller.
 *  After this call the handshake no longer owns -- and will not free --
 *  the io; the caller is responsible for it. */
struct tr_peerIo*
tr_handshakeStealIO( tr_handshake * handshake )
{
    struct tr_peerIo * stolen;

    assert( handshake != NULL );
    assert( handshake->io != NULL );

    stolen = handshake->io;
    handshake->io = NULL;
    return stolen;
}
const tr_address *
tr_handshakeGetAddr( const struct tr_handshake * handshake,
tr_port * port )

View File

@ -39,8 +39,13 @@ const struct tr_address *
tr_handshakeGetAddr( const struct tr_handshake * handshake,
tr_port * port );
void tr_handshakeFree( tr_handshake * handshake );
void tr_handshakeAbort( tr_handshake * handshake );
struct tr_peerIo* tr_handshakeGetIO( tr_handshake * handshake );
struct tr_peerIo* tr_handshakeStealIO( tr_handshake * handshake );
#endif

View File

@ -34,7 +34,6 @@
#include "utils.h"
#define MAGIC_NUMBER 206745
#define IO_TIMEOUT_SECS 8
static size_t
getPacketOverhead( size_t d )
@ -87,7 +86,6 @@ struct tr_peerIo
int magicNumber;
uint8_t encryptionMode;
uint8_t timeout;
tr_port port;
int socket;
@ -111,6 +109,9 @@ struct tr_peerIo
struct evbuffer * inbuf;
struct evbuffer * outbuf;
struct event event_read;
struct event event_write;
};
/***
@ -118,12 +119,8 @@ struct tr_peerIo
***/
static void
didWriteWrapper( void * unused UNUSED,
size_t bytes_transferred,
void * vio )
didWriteWrapper( tr_peerIo * io, size_t bytes_transferred )
{
tr_peerIo * io = vio;
while( bytes_transferred )
{
struct tr_datatype * next = io->output_datatypes->data;
@ -146,13 +143,10 @@ didWriteWrapper( void * unused UNUSED,
}
static void
canReadWrapper( void * unused UNUSED,
size_t bytes_transferred UNUSED,
void * vio )
canReadWrapper( tr_peerIo * io )
{
int done = 0;
int err = 0;
tr_peerIo * io = vio;
tr_bool done = 0;
tr_bool err = 0;
tr_session * session = io->session;
dbgmsg( io, "canRead" );
@ -168,14 +162,13 @@ canReadWrapper( void * unused UNUSED,
const size_t oldLen = EVBUFFER_LENGTH( io->inbuf );
const int ret = io->canRead( io, io->userData, &piece );
if( ret != READ_ERR )
{
const size_t used = oldLen - EVBUFFER_LENGTH( io->inbuf );
if( piece )
tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
if( used != piece )
tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
}
const size_t used = oldLen - EVBUFFER_LENGTH( io->inbuf );
if( piece )
tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
if( used != piece )
tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
switch( ret )
{
@ -199,45 +192,154 @@ canReadWrapper( void * unused UNUSED,
}
}
#if 0
static void
gotErrorWrapper( struct tr_iobuf * iobuf,
short what,
void * userData )
{
tr_peerIo * c = userData;
#define _isBool(b) (((b)==0 || (b)==1))
if( c->gotError )
c->gotError( iobuf, what, c->userData );
/* Sanity-check a tr_peerIo: non-NULL, correct magic number, a valid
 * address, and all of its boolean-ish flag fields holding 0 or 1.
 * Used in assert()s to catch stale or corrupted pointers early. */
static int
isPeerIo( const tr_peerIo * io )
{
    if( io == NULL )
        return 0;
    if( io->magicNumber != MAGIC_NUMBER )
        return 0;
    if( !tr_isAddress( &io->addr ) )
        return 0;
    if( !_isBool( io->isEncrypted ) || !_isBool( io->isIncoming ) )
        return 0;
    if( !_isBool( io->peerIdIsSet ) )
        return 0;
    return _isBool( io->extendedProtocolSupported )
        && _isBool( io->fastExtensionSupported );
}
/* libevent callback: the peer's socket has data ready to read.
 * Reads as much as the download-bandwidth allocation allows into io->inbuf,
 * then hands the buffered data to the user's canRead callback. */
static void
event_read_cb( int fd, short event UNUSED, void * vio )
{
int res;
short what = EVBUFFER_READ;
tr_peerIo * io = vio;
/* clamp the socket's receive-buffer size against the download bandwidth cap */
const size_t howmuch = tr_bandwidthClamp( io->bandwidth, TR_DOWN, io->session->so_rcvbuf );
const tr_direction dir = TR_DOWN;
assert( isPeerIo( io ) );
dbgmsg( io, "libevent says this peer is ready to read" );
/* if we don't have any bandwidth left, stop reading */
if( howmuch < 1 ) {
tr_peerIoSetEnabled( io, dir, FALSE );
return;
}
res = evbuffer_read( io->inbuf, fd, howmuch );
if( res == -1 ) {
/* transient failure: keep the read event armed and try again later */
if( errno == EAGAIN || errno == EINTR )
goto reschedule;
/* error case */
what |= EVBUFFER_ERROR;
} else if( res == 0 ) {
/* eof case */
what |= EVBUFFER_EOF;
}
if( res <= 0 )
goto error;
/* re-arm the read event: it was registered without EV_PERSIST, so it
 * fires only once per event_add() */
tr_peerIoSetEnabled( io, dir, TRUE );
/* Invoke the user callback - must always be called last */
canReadWrapper( io );
return;
reschedule:
tr_peerIoSetEnabled( io, dir, TRUE );
return;
error:
if( io->gotError != NULL )
io->gotError( io, what, io->userData );
}
/* Write up to `howmuch` bytes of io->outbuf to socket `fd`.
 * Returns the number of bytes written, 0 on a zero-byte write,
 * or -1 on error (errno is set on non-WIN32 platforms).
 * Successfully written bytes are drained from the buffer. */
static int
tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch )
{
struct evbuffer * buffer = io->outbuf;
/* never try to write more than the buffer actually holds */
int n = MIN( EVBUFFER_LENGTH( buffer ), howmuch );
#ifdef WIN32
n = send(fd, buffer->buffer, n, 0 );
#else
n = write(fd, buffer->buffer, n );
#endif
dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?strerror(errno):"") );
if( n == -1 )
return -1;
if (n == 0)
return 0;
/* remove the written bytes from the outgoing buffer */
evbuffer_drain( buffer, n );
return n;
}
/* libevent callback: the peer's socket is ready for writing.
 * Flushes as much of io->outbuf as the upload-bandwidth allocation allows,
 * then re-arms the write event if data (and bandwidth) remain. */
static void
event_write_cb( int fd, short event UNUSED, void * vio )
{
    int res = 0;
    short what = EVBUFFER_WRITE;
    tr_peerIo * io = vio;
    size_t howmuch;
    const tr_direction dir = TR_UP;

    assert( isPeerIo( io ) );

    dbgmsg( io, "libevent says this peer is ready to write" );

    /* write no more than the send buffer holds, further clamped by bandwidth */
    howmuch = MIN( (size_t)io->session->so_sndbuf, EVBUFFER_LENGTH( io->outbuf ) );
    howmuch = tr_bandwidthClamp( io->bandwidth, dir, howmuch );

    /* if we don't have any bandwidth left, stop writing */
    if( howmuch < 1 ) {
        tr_peerIoSetEnabled( io, dir, FALSE );
        return;
    }

    res = tr_evbuffer_write( io, fd, howmuch );
    if( res == -1 ) {
#ifndef WIN32
        /* TODO: evbuffer uses WriteFile when WIN32 is set.  WIN32 system
         * calls do not set errno, so this error checking is not portable */
        if( errno == EAGAIN || errno == EINTR || errno == EINPROGRESS )
            goto reschedule;
        /* error case */
        what |= EVBUFFER_ERROR;
#else
        goto reschedule;
#endif
    } else if( res == 0 ) {
        /* eof case */
        what |= EVBUFFER_EOF;
    }
    if( res <= 0 )
        goto error;

    /* more data still queued: re-arm the one-shot write event */
    if( EVBUFFER_LENGTH( io->outbuf ) )
        tr_peerIoSetEnabled( io, dir, TRUE );

    didWriteWrapper( io, res );
    return;

 reschedule:
    if( EVBUFFER_LENGTH( io->outbuf ) )
        tr_peerIoSetEnabled( io, dir, TRUE );
    return;

 error:
    /* guard against a NULL error handler, matching event_read_cb */
    if( io->gotError != NULL )
        io->gotError( io, what, io->userData );
}
/**
***
**/
#if 0
/* Legacy tr_iobuf-based event setup; this code is compiled out ("#if 0")
 * and kept only for reference.  It has been superseded by the
 * event_read_cb / event_write_cb machinery above. */
static void
bufevNew( tr_peerIo * io )
{
io->iobuf = tr_iobuf_new( io->session,
io->bandwidth,
io->socket,
EV_READ | EV_WRITE,
canReadWrapper,
didWriteWrapper,
gotErrorWrapper,
io );
tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout );
}
#endif
/* Quick sanity check: pointer is non-NULL and carries the expected magic. */
static int
isPeerIo( const tr_peerIo * io )
{
    if( io == NULL )
        return 0;
    return io->magicNumber == MAGIC_NUMBER;
}
static int
isFlag( int flag )
@ -266,10 +368,11 @@ tr_peerIoNew( tr_session * session,
io->port = port;
io->socket = socket;
io->isIncoming = isIncoming != 0;
io->timeout = IO_TIMEOUT_SECS;
io->timeCreated = time( NULL );
io->inbuf = evbuffer_new( );
io->outbuf = evbuffer_new( );
event_set( &io->event_read, io->socket, EV_READ, event_read_cb, io );
event_set( &io->event_write, io->socket, EV_WRITE, event_write_cb, io );
#if 0
bufevNew( io );
#endif
@ -314,6 +417,8 @@ io_dtor( void * vio )
{
tr_peerIo * io = vio;
event_del( &io->event_read );
event_del( &io->event_write );
tr_peerIoSetBandwidth( io, NULL );
evbuffer_free( io->outbuf );
evbuffer_free( io->inbuf );
@ -377,30 +482,17 @@ tr_peerIoGetAddrStr( const tr_peerIo * io )
return tr_peerIoAddrStr( &io->addr, io->port );
}
#if 0
static void
tr_peerIoTryRead( tr_peerIo * io )
{
if( EVBUFFER_LENGTH( tr_iobuf_input( io->iobuf )))
(*canReadWrapper)( io->iobuf, ~0, io );
}
#endif
/* Install the user's read / write / error callbacks on this peer io.
 * NOTE(review): this diff view shows the signature twice (pre- and
 * post-reformat); only one copy exists in the post-change source. */
void
tr_peerIoSetIOFuncs( tr_peerIo * io,
tr_can_read_cb readcb,
tr_did_write_cb writecb,
tr_net_error_cb errcb,
void * userData )
tr_peerIoSetIOFuncs( tr_peerIo * io,
tr_can_read_cb readcb,
tr_did_write_cb writecb,
tr_net_error_cb errcb,
void * userData )
{
io->canRead = readcb;
io->didWrite = writecb;
io->gotError = errcb;
/* opaque pointer handed back to every callback invocation */
io->userData = userData;
#if 0
tr_peerIoTryRead( io );
#endif
}
tr_bool
@ -436,17 +528,6 @@ tr_peerIoReconnect( tr_peerIo * io )
return -1;
}
#if 0
void
tr_peerIoSetTimeoutSecs( tr_peerIo * io,
int secs )
{
io->timeout = secs;
tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout );
tr_iobuf_enable( io->iobuf, EV_READ | EV_WRITE );
}
#endif
/**
***
**/
@ -563,7 +644,8 @@ getDesiredOutputBufferSize( const tr_peerIo * io )
const double maxBlockSize = 16 * 1024; /* 16 KiB is from BT spec */
const double currentSpeed = tr_bandwidthGetPieceSpeed( io->bandwidth, TR_UP );
const double period = 20; /* arbitrary */
return MAX( maxBlockSize*20.5, currentSpeed*1024*period );
const double numBlocks = 5.5; /* the 5 is arbitrary; the .5 is to leave room for messages */
return MAX( maxBlockSize*numBlocks, currentSpeed*1024*period );
}
size_t
@ -804,8 +886,8 @@ tr_peerIoTryRead( tr_peerIo * io, size_t howmuch )
dbgmsg( io, "read %d from peer (%s)", res, (res==-1?strerror(errno):"") );
if( res > 0 )
canReadWrapper( io, res, io );
if( EVBUFFER_LENGTH( io->inbuf ) )
canReadWrapper( io );
if( ( res <= 0 ) && ( io->gotError ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) )
{
@ -826,25 +908,13 @@ tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch )
assert( isPeerIo( io ) );
howmuch = tr_bandwidthClamp( io->bandwidth, TR_UP, howmuch );
howmuch = MIN( howmuch, EVBUFFER_LENGTH( io->outbuf ) );
n = (int) howmuch;
#ifdef WIN32
n = send( io->socket, EVBUFFER_DATA( io->outbuf ), n, 0 );
#else
n = write( io->socket, EVBUFFER_DATA( io->outbuf ), n );
#endif
dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?strerror(errno):"") );
n = tr_evbuffer_write( io, io->socket, (int)howmuch );
if( n > 0 )
{
evbuffer_drain( io->outbuf, n );
didWriteWrapper( io, n );
didWriteWrapper( NULL, n, io );
}
if( ( n < 0 ) && ( io->gotError ) && ( errno != EPIPE ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) )
{
if( ( n < 0 ) && ( io->gotError ) && ( errno != EPIPE ) && ( errno != EAGAIN ) && ( errno != EINTR ) && ( errno != EINPROGRESS ) ) {
short what = EVBUFFER_WRITE | EVBUFFER_ERROR;
io->gotError( io, what, io->userData );
}
@ -875,3 +945,54 @@ tr_peerIoGetReadBuffer( tr_peerIo * io )
return io->inbuf;
}
/** @return nonzero if this peer io still has bandwidth to spend in `dir` */
tr_bool
tr_peerIoHasBandwidthLeft( const tr_peerIo * io, tr_direction dir )
{
    size_t byteCount;

    assert( isPeerIo( io ) );
    assert( dir == TR_UP || dir == TR_DOWN );

    /* probe the allocator with a nominal 1 KiB request */
    byteCount = tr_bandwidthClamp( io->bandwidth, dir, 1024 );
    return byteCount > 0;
}
/***
****
****/
/* Begin listening for the requested libevent event(s) on this io's socket. */
static void
event_enable( tr_peerIo * io, short event )
{
    assert( isPeerIo( io ) );

    if( ( event & EV_READ ) != 0 )
        event_add( &io->event_read, NULL );

    if( ( event & EV_WRITE ) != 0 )
        event_add( &io->event_write, NULL );
}
/* Stop listening for the requested libevent event(s) on this io's socket. */
static void
event_disable( struct tr_peerIo * io, short event )
{
    assert( isPeerIo( io ) );

    if( ( event & EV_READ ) != 0 )
        event_del( &io->event_read );

    if( ( event & EV_WRITE ) != 0 )
        event_del( &io->event_write );
}
/** Enable or disable on-demand IO for `io` in the given direction:
 *  TR_UP maps to the write event, TR_DOWN to the read event. */
void
tr_peerIoSetEnabled( tr_peerIo * io,
tr_direction dir,
tr_bool isEnabled )
{
    short what;

    what = ( dir == TR_UP ) ? EV_WRITE : EV_READ;

    if( isEnabled )
        event_enable( io, what );
    else
        event_disable( io, what );
}

View File

@ -25,7 +25,6 @@ struct evbuffer;
struct tr_address;
struct tr_bandwidth;
struct tr_crypto;
struct tr_iobuf;
typedef struct tr_peerIo tr_peerIo;
/**
@ -213,6 +212,13 @@ void tr_peerIoBandwidthUsed( tr_peerIo * io,
***
**/
tr_bool tr_peerIoHasBandwidthLeft( const tr_peerIo * io,
tr_direction direction );
void tr_peerIoSetEnabled( tr_peerIo * io,
tr_direction dir,
tr_bool isEnabled );
int tr_peerIoFlush( tr_peerIo * io,
tr_direction dir,
size_t byteLimit );

View File

@ -57,7 +57,7 @@ enum
RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
/* how frequently to reallocate bandwidth */
BANDWIDTH_PERIOD_MSEC = 100,
BANDWIDTH_PERIOD_MSEC = 500,
/* max # of peers to ask fer per torrent per reconnect pulse */
MAX_RECONNECTIONS_PER_PULSE = 4,
@ -143,6 +143,7 @@ struct tr_peerMgr
tr_session * session;
tr_ptrArray * torrents; /* Torrent */
tr_ptrArray * incomingHandshakes; /* tr_handshake */
tr_ptrArray * finishedHandshakes; /* tr_handshake */
tr_timer * bandwidthTimer;
};
@ -463,6 +464,7 @@ tr_peerMgrNew( tr_session * session )
m->session = session;
m->torrents = tr_ptrArrayNew( );
m->incomingHandshakes = tr_ptrArrayNew( );
m->finishedHandshakes = tr_ptrArrayNew( );
m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
return m;
}
@ -470,6 +472,8 @@ tr_peerMgrNew( tr_session * session )
void
tr_peerMgrFree( tr_peerMgr * manager )
{
tr_handshake * handshake;
managerLock( manager );
tr_timerFree( &manager->bandwidthTimer );
@ -481,6 +485,11 @@ tr_peerMgrFree( tr_peerMgr * manager )
tr_ptrArrayFree( manager->incomingHandshakes, NULL );
while(( handshake = tr_ptrArrayPop( manager->finishedHandshakes )))
tr_handshakeFree( handshake );
tr_ptrArrayFree( manager->finishedHandshakes, NULL );
/* free the torrents. */
tr_ptrArrayFree( manager->torrents, torrentDestructor );
@ -1194,11 +1203,11 @@ getPeerCount( const Torrent * t )
/* FIXME: this is kind of a mess. */
static tr_bool
myHandshakeDoneCB( tr_handshake * handshake,
tr_peerIo * io,
myHandshakeDoneCB( tr_handshake * handshake,
tr_peerIo * io,
int isConnected,
const uint8_t * peer_id,
void * vmanager )
void * vmanager )
{
tr_bool ok = isConnected;
tr_bool success = FALSE;
@ -1240,8 +1249,6 @@ myHandshakeDoneCB( tr_handshake * handshake,
if( atom )
++atom->numFails;
}
tr_peerIoFree( io );
}
else /* looking good */
{
@ -1255,13 +1262,11 @@ myHandshakeDoneCB( tr_handshake * handshake,
{
tordbg( t, "banned peer %s tried to reconnect",
tr_peerIoAddrStr( &atom->addr, atom->port ) );
tr_peerIoFree( io );
}
else if( tr_peerIoIsIncoming( io )
&& ( getPeerCount( t ) >= getMaxPeerCount( t->tor ) ) )
{
tr_peerIoFree( io );
}
else
{
@ -1269,7 +1274,6 @@ myHandshakeDoneCB( tr_handshake * handshake,
if( peer ) /* we already have this peer */
{
tr_peerIoFree( io );
}
else
{
@ -1285,7 +1289,7 @@ myHandshakeDoneCB( tr_handshake * handshake,
}
peer->port = port;
peer->io = io;
peer->io = tr_handshakeStealIO( handshake );
tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
tr_peerIoSetBandwidth( io, peer->bandwidth );
@ -1294,6 +1298,9 @@ myHandshakeDoneCB( tr_handshake * handshake,
}
}
if( !success )
tr_ptrArrayAppend( manager->finishedHandshakes, handshake );
if( t )
torrentUnlock( t );
@ -1544,17 +1551,9 @@ tr_peerMgrGetPeers( tr_peerMgr * manager,
}
}
#warning this for loop can be removed when we are sure the bug is fixed
for( i=0; i<peersReturning; ++i )
assert( tr_isAddress( &pex[i].addr ) );
assert( ( walk - pex ) == peersReturning );
qsort( pex, peersReturning, sizeof( tr_pex ), tr_pexCompare );
#warning this for loop can be removed when we are sure the bug is fixed
for( i=0; i<peersReturning; ++i )
assert( tr_isAddress( &pex[i].addr ) );
*setme_pex = pex;
}
@ -2394,13 +2393,20 @@ pumpAllPeers( tr_peerMgr * mgr )
static int
bandwidthPulse( void * vmgr )
{
tr_handshake * handshake;
tr_peerMgr * mgr = vmgr;
managerLock( mgr );
/* FIXME: this next line probably isn't necessary... */
pumpAllPeers( mgr );
/* allocate bandwidth to the peers */
tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
pumpAllPeers( mgr );
/* free all the finished handshakes */
while(( handshake = tr_ptrArrayPop( mgr->finishedHandshakes )))
tr_handshakeFree( handshake );
managerUnlock( mgr );
return TRUE;