(trunk libT) probable fix for the "greedy peer" bug

This commit is contained in:
Charles Kerr 2008-12-15 21:22:08 +00:00
parent 4e33fd5467
commit e813ea69ab
5 changed files with 85 additions and 17 deletions

View File

@ -249,7 +249,7 @@ static void
allocateBandwidth( tr_bandwidth * b,
tr_direction dir,
int period_msec,
tr_ptrArray * addme_buffers )
tr_ptrArray * iobuf_pool )
{
assert( isBandwidth( b ) );
assert( isDirection( dir ) );
@ -268,12 +268,11 @@ allocateBandwidth( tr_bandwidth * b,
#endif
}
/* notify the io buffers that there's more bandwidth available */
{
int i;
const int n = tr_ptrArraySize( b->iobufs );
for( i=0; i<n; ++i )
tr_ptrArrayAppend( addme_buffers, tr_ptrArrayNth( b->iobufs, i ) );
tr_ptrArrayAppend( iobuf_pool, tr_ptrArrayNth( b->iobufs, i ) );
}
#ifdef DEBUG_DIRECTION
@ -286,7 +285,7 @@ fprintf( stderr, "bandwidth %p has %d iobufs\n", b, n );
int i, n=0;
struct tr_bandwidth ** children = (struct tr_bandwidth**) tr_ptrArrayPeek( b->children, &n );
for( i=0; i<n; ++i )
tr_bandwidthAllocate( children[i], dir, period_msec );
allocateBandwidth( children[i], dir, period_msec, iobuf_pool );
}
}
@ -298,21 +297,33 @@ tr_bandwidthAllocate( tr_bandwidth * b,
int n;
tr_ptrArray * tmp;
struct tr_iobuf ** buffers;
const short what = dir==TR_UP ? EV_WRITE : EV_READ;
const size_t chunkSize = 1024; /* arbitrary */
tmp = tr_ptrArrayNew( );
allocateBandwidth( b, dir, period_msec, tmp );
buffers = (struct tr_iobuf**) tr_ptrArrayPeek( tmp, &n );
/* notify the io buffers in a random order s.t. no
particular peer gets to hog all the bandwidth */
while( n > 0 ) {
const int i = tr_cryptoRandInt( n );
tr_iobuf_enable( buffers[i], what );
buffers[i] = buffers[n-1];
--n;
/* loop through all the peers, reading and writing in small chunks,
* until we run out of bandwidth or peers. we do it this way to
* prevent one peer from using up all the bandwidth */
while( n > 0 )
{
int i;
for( i=0; i<n; )
{
int byteCount;
if( dir == TR_UP )
byteCount = tr_iobuf_flush_output_buffer( buffers[i], chunkSize );
else
byteCount = tr_iobuf_tryread( buffers[i], chunkSize );
if( byteCount == (int)chunkSize )
++i;
else
buffers[i] = buffers[--n];
}
}
/* cleanup */
tr_ptrArrayFree( tmp, NULL );
}

View File

@ -69,6 +69,8 @@ struct tr_iobuf
int magicNumber;
int fd;
int timeout_read; /* in seconds */
int timeout_write; /* in seconds */
short enabled; /* events that are currently enabled */
@ -88,9 +90,9 @@ isBuf( const struct tr_iobuf * iobuf )
}
static int
tr_evbuffer_write( struct evbuffer *buffer, int fd, size_t maxlen )
tr_evbuffer_write( struct evbuffer *buffer, int fd, size_t howmuch )
{
int n = MIN( EVBUFFER_LENGTH( buffer ), maxlen );
int n = MIN( EVBUFFER_LENGTH( buffer ), howmuch );
#ifdef WIN32
n = send(fd, buffer->buffer, n, 0 );
@ -106,6 +108,48 @@ tr_evbuffer_write( struct evbuffer *buffer, int fd, size_t maxlen )
return n;
}
int
tr_iobuf_flush_output_buffer( struct tr_iobuf * b, size_t howmuch )
{
int res;
assert( isBuf( b ) );
howmuch = tr_bandwidthClamp( b->bandwidth, TR_UP, howmuch );
howmuch = MIN( howmuch, EVBUFFER_LENGTH( b->output ) );
res = howmuch ? tr_evbuffer_write( b->output, b->fd, howmuch ) : 0;
if( ( res > 0 ) && ( b->writecb != NULL ) )
(*b->writecb)( b, (size_t)res, b->cbarg );
if( ( res < 0 ) && ( b->errorcb != NULL ) && ( errno != EAGAIN && errno != EINTR && errno != EINPROGRESS ) )
(*b->errorcb)( b, (short)res, b->cbarg );
return res;
}
int
tr_iobuf_tryread( struct tr_iobuf * b, size_t howmuch )
{
int res;
assert( isBuf( b ) );
howmuch = tr_bandwidthClamp( b->bandwidth, TR_DOWN, howmuch );
res = howmuch ? evbuffer_read( b->input, b->fd, howmuch ) : 0;
if( ( res > 0 ) && ( b->readcb != NULL ) )
(*b->readcb)( b, (size_t)res, b->cbarg );
if( ( res < 0 ) && ( b->errorcb != NULL ) && ( errno != EAGAIN && errno != EINTR ) )
(*b->errorcb)( b, (short)res, b->cbarg );
return res;
}
static int
tr_iobuf_add(struct event *ev, int timeout)
{
@ -258,6 +302,7 @@ tr_iobuf_new( tr_session * session,
b = tr_new0( struct tr_iobuf, 1 );
b->magicNumber = MAGIC_NUMBER;
b->fd = fd;
b->session = session;
b->bandwidth = bandwidth;
b->input = evbuffer_new( );

View File

@ -44,6 +44,10 @@
* 1. the action callbacks include the number of bytes transferred
* 2. the up/down speeds are directly constrained by our `bandwidth' object
* 3. the implementation is hidden in the .c file
*
* 4. a late addition nasty hack to read/write on demand, called from
* bandwidth. this actually seems to make a lot of this class redundant
* and probably should be refactored.
*/
struct tr_iobuf;
@ -104,4 +108,8 @@ int tr_iobuf_enable( struct tr_iobuf * iobuf, short event );
@brief event may be EV_READ, EV_WRITE, or EV_READ|EV_WRITE */
int tr_iobuf_disable( struct tr_iobuf * iobuf, short event );
int tr_iobuf_flush_output_buffer( struct tr_iobuf * iobuf, size_t max );
int tr_iobuf_tryread( struct tr_iobuf * iobuf, size_t max );
#endif

View File

@ -543,13 +543,13 @@ static size_t
getDesiredOutputBufferSize( const tr_peerIo * io )
{
/* this is all kind of arbitrary, but what seems to work well is
* being large enough to hold the next 15 seconds' worth of input,
* or two and a half blocks, whichever is bigger.
* being large enough to hold the next 20 seconds' worth of input,
* or a few blocks, whichever is bigger.
* It's okay to tweak this as needed */
const double maxBlockSize = 16 * 1024; /* 16 KiB is from BT spec */
const double currentSpeed = tr_bandwidthGetPieceSpeed( io->bandwidth, TR_UP );
const double period = 20; /* arbitrary */
return MAX( maxBlockSize*2.5, currentSpeed*1024*period );
return MAX( maxBlockSize*5.5, currentSpeed*1024*period );
}
size_t

View File

@ -1132,6 +1132,7 @@ peerCallbackFunc( void * vpeer,
{
addStrike( t, peer );
peer->doPurge = 1;
tordbg( t, "setting doPurge because we got an EINVAL error" );
}
else if( ( e->err == ERANGE )
|| ( e->err == EMSGSIZE )
@ -1139,6 +1140,7 @@ peerCallbackFunc( void * vpeer,
{
/* some protocol error from the peer */
peer->doPurge = 1;
tordbg( t, "setting doPurge because we got an ERANGE, EMSGSIZE, or ENOTCONN error" );
}
else /* a local error, such as an IO error */
{
@ -2289,6 +2291,8 @@ reconnectPulse( void * vtorrent )
atom->numFails = 0;
else
++atom->numFails;
fprintf( stderr, "removing bad peer %s\n", tr_peerIoGetAddrStr( peer->io ) );
tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
removePeer( t, peer );
}