fix broken rate control.

This commit is contained in:
Charles Kerr 2007-07-07 04:13:13 +00:00
parent a62302d709
commit 26660b0594
3 changed files with 121 additions and 170 deletions

View File

@ -377,16 +377,11 @@ int tr_peerRead( tr_peer_t * peer )
peer->size *= 2; peer->size *= 2;
peer->buf = realloc( peer->buf, peer->size ); peer->buf = realloc( peer->buf, peer->size );
} }
#if 0
/* Never read more than 1K each time, otherwise the rate /* Read in smallish chunks, otherwise we might read more
control is no use */ * than the download cap is supposed to allow us */
ret = tr_netRecv( peer->socket, &peer->buf[peer->pos], ret = tr_netRecv( peer->socket, &peer->buf[peer->pos],
MIN( 1024, peer->size - peer->pos ) ); MIN( 1024, peer->size - peer->pos ) );
#else
/* Hm, it doesn't *seem* to break rate control... */
ret = tr_netRecv( peer->socket, &peer->buf[peer->pos],
peer->size - peer->pos );
#endif
if( ret & TR_NET_CLOSE ) if( ret & TR_NET_CLOSE )
{ {
@ -568,26 +563,19 @@ writeBegin:
while( ( p = blockPending( tor, peer, &size ) ) ) while( ( p = blockPending( tor, peer, &size ) ) )
{ {
if( SWIFT_ENABLED && !isSeeding && (peer->credit<0) ) if( SWIFT_ENABLED && !isSeeding && (peer->credit<0) )
{
break; break;
}
if( tor->customUploadLimit if( tor->customUploadLimit
? !tr_rcCanTransfer( tor->upload ) ? !tr_rcCanTransfer( tor->upload )
: !tr_rcCanTransfer( tor->handle->upload ) ) : !tr_rcCanTransfer( tor->handle->upload ) )
{
break; break;
}
ret = tr_netSend( peer->socket, p, size ); ret = tr_netSend( peer->socket, p, size );
if( ret & TR_NET_CLOSE ) if( ret & TR_NET_CLOSE )
{
return TR_ERROR; return TR_ERROR;
}
else if( ret & TR_NET_BLOCK ) if( ret & TR_NET_BLOCK )
{
break; break;
}
blockSent( peer, ret ); blockSent( peer, ret );
@ -601,9 +589,7 @@ writeBegin:
peer->outDate = date; peer->outDate = date;
if( !tr_peerAmChoking( peer ) ) if( !tr_peerAmChoking( peer ) )
{
tor->activityDate = date; tor->activityDate = date;
}
/* In case this block is done, you may have messages /* In case this block is done, you may have messages
pending. Send them before we start the next block */ pending. Send them before we start the next block */

View File

@ -25,23 +25,12 @@
#include "transmission.h" #include "transmission.h"
#include "shared.h" #include "shared.h"
/* Maximum number of packets we keep track of. Since most packets are #define GRANULARITY_MSEC 200
* 1 KB, it means we remember the last 2 MB transferred */ #define SHORT_INTERVAL_MSEC 1000
#define HISTORY_SIZE 2048 #define LONG_INTERVAL_MSEC 10000
#define HISTORY_SIZE (LONG_INTERVAL_MSEC / GRANULARITY_MSEC)
/* How far back we go to calculate rates to be displayed in the typedef struct
* interface */
#define LONG_INTERVAL 30000 /* 30 secs */
/* How far back we go to calculate pseudo-instantaneous transfer rates,
* for the actual rate control */
#define SHORT_INTERVAL 1000 /* 1 sec */
/***********************************************************************
* Structures
**********************************************************************/
typedef struct tr_transfer_s
{ {
uint64_t date; uint64_t date;
int size; int size;
@ -50,154 +39,130 @@ tr_transfer_t;
struct tr_ratecontrol_s struct tr_ratecontrol_s
{ {
tr_lock_t lock; tr_rwlock_t lock;
int limit; int limit;
int newest;
/* Circular history: it's empty if transferStop == transferStart,
* full if ( transferStop + 1 ) % HISTORY_SIZE == transferStart */
tr_transfer_t transfers[HISTORY_SIZE]; tr_transfer_t transfers[HISTORY_SIZE];
int transferStart;
int transferStop;
}; };
/* return the xfer rate over the last `interval' seconds in KiB/sec */
/*********************************************************************** static float
* Local prototypes rateForInterval( const tr_ratecontrol_t * r, int interval_msec )
**********************************************************************/
static float rateForInterval( tr_ratecontrol_t * r, int interval );
/***********************************************************************
* Exported functions
**********************************************************************/
tr_ratecontrol_t * tr_rcInit()
{ {
tr_ratecontrol_t * r; uint64_t bytes = 0;
const uint64_t now = tr_date ();
int i = r->newest;
int real_interval_msec = 0;
for( ;; )
{
if( r->transfers[i].date + interval_msec < now )
break;
r = calloc( 1, sizeof( tr_ratecontrol_t ) ); bytes += r->transfers[i].size;
real_interval_msec = now - r->transfers[i].date;
if( --i == -1 ) i = HISTORY_SIZE - 1; /* circular history */
if( i == r->newest ) break; /* we've come all the way around */
}
return !bytes || !real_interval_msec
? 0.0
: (bytes/1024.0) * (1000.0/real_interval_msec);
}
/***
****
***/
tr_ratecontrol_t*
tr_rcInit( void )
{
tr_ratecontrol_t * r = tr_new0( tr_ratecontrol_t, 1 );
r->limit = -1; r->limit = -1;
tr_lockInit( &r->lock ); tr_rwInit( &r->lock );
return r; return r;
} }
void tr_rcSetLimit( tr_ratecontrol_t * r, int limit ) void
{ tr_rcClose( tr_ratecontrol_t * r )
tr_lockLock( &r->lock );
r->limit = limit;
tr_lockUnlock( &r->lock );
}
int tr_rcCanTransfer( tr_ratecontrol_t * r )
{
int ret;
tr_lockLock( &r->lock );
ret = ( r->limit <= 0 ) ? ( r->limit < 0 ) :
( rateForInterval( r, SHORT_INTERVAL ) < r->limit );
tr_lockUnlock( &r->lock );
return ret;
}
void tr_rcTransferred( tr_ratecontrol_t * r, int size )
{
tr_transfer_t * t;
if( size < 100 )
{
/* Don't count small messages */
return;
}
tr_lockLock( &r->lock );
r->transferStop = ( r->transferStop + 1 ) % HISTORY_SIZE;
if( r->transferStop == r->transferStart )
/* History is full, forget about the first (oldest) item */
r->transferStart = ( r->transferStart + 1 ) % HISTORY_SIZE;
t = &r->transfers[r->transferStop];
t->date = tr_date();
t->size = size;
tr_lockUnlock( &r->lock );
}
float tr_rcRate( tr_ratecontrol_t * r )
{
float ret;
tr_lockLock( &r->lock );
ret = rateForInterval( r, LONG_INTERVAL );
tr_lockUnlock( &r->lock );
return ret;
}
void tr_rcReset( tr_ratecontrol_t * r )
{
tr_lockLock( &r->lock );
r->transferStart = 0;
r->transferStop = 0;
tr_lockUnlock( &r->lock );
}
void tr_rcClose( tr_ratecontrol_t * r )
{ {
tr_rcReset( r ); tr_rcReset( r );
tr_lockClose( &r->lock ); tr_rwClose( &r->lock );
free( r ); tr_free( r );
} }
/***
****
***/
/*********************************************************************** int
* Local functions tr_rcCanTransfer( const tr_ratecontrol_t * r )
**********************************************************************/
/***********************************************************************
* rateForInterval
***********************************************************************
* Returns the transfer rate in KB/s on the last 'interval'
* milliseconds
**********************************************************************/
static float rateForInterval( tr_ratecontrol_t * r, int interval )
{ {
tr_transfer_t * t = NULL; int ret;
uint64_t now, then, start; tr_rwReaderLock( (tr_rwlock_t*)&r->lock );
float total = 0;
int i;
now = then = tr_date(); if( r->limit < 0 ) /* unbounded */
start = now - interval; ret = TRUE;
else if( !r->limit ) /* off */
ret = FALSE;
else
ret = rateForInterval( r, SHORT_INTERVAL_MSEC ) < r->limit;
/* Browse the history back in time */ tr_rwReaderUnlock( (tr_rwlock_t*)&r->lock );
for( i = r->transferStop; i != r->transferStart; i-- ) return ret;
{
t = &r->transfers[i];
then = t->date;
if( then < start )
break;
total += t->size;
if( !i )
i = HISTORY_SIZE; /* Loop */
}
#if 0
if( ( r->transferStop + 1 ) % HISTORY_SIZE == r->transferStart
&& i == r->transferStart )
{
/* High bandwidth -> the history isn't big enough to remember
* everything transferred since 'interval' ms ago. Correct the
* interval so that we return the correct rate */
interval = now - t->date;
}
#endif
if( now == then )
return 0.0;
return ( 1000.0f / 1024.0f ) * total / (now - then);
} }
float
tr_rcRate( const tr_ratecontrol_t * r )
{
float ret;
tr_rwReaderLock( (tr_rwlock_t*)&r->lock );
ret = rateForInterval( r, LONG_INTERVAL_MSEC );
tr_rwReaderUnlock( (tr_rwlock_t*)&r->lock );
return ret;
}
/***
****
***/
void
tr_rcTransferred( tr_ratecontrol_t * r, int size )
{
uint64_t now;
if( size < 100 ) /* don't count small messages */
return;
tr_rwWriterLock( &r->lock );
now = tr_date ();
if( r->transfers[r->newest].date + GRANULARITY_MSEC >= now )
r->transfers[r->newest].size += size;
else {
if( ++r->newest == HISTORY_SIZE ) r->newest = 0;
r->transfers[r->newest].date = now;
r->transfers[r->newest].size = size;
}
tr_rwWriterUnlock( &r->lock );
}
void
tr_rcReset( tr_ratecontrol_t * r )
{
tr_rwWriterLock( &r->lock );
r->newest = 0;
memset( r->transfers, 0, sizeof(tr_transfer_t) * HISTORY_SIZE );
tr_rwWriterUnlock( &r->lock );
}
void
tr_rcSetLimit( tr_ratecontrol_t * r, int limit )
{
tr_rwWriterLock( &r->lock );
r->limit = limit;
tr_rwWriterUnlock( &r->lock );
}

View File

@ -24,10 +24,10 @@
typedef struct tr_ratecontrol_s tr_ratecontrol_t; typedef struct tr_ratecontrol_s tr_ratecontrol_t;
tr_ratecontrol_t * tr_rcInit(); tr_ratecontrol_t * tr_rcInit( void );
void tr_rcSetLimit( tr_ratecontrol_t *, int ); void tr_rcSetLimit( tr_ratecontrol_t *, int );
int tr_rcCanTransfer( tr_ratecontrol_t * ); int tr_rcCanTransfer( const tr_ratecontrol_t * );
void tr_rcTransferred( tr_ratecontrol_t *, int ); void tr_rcTransferred( tr_ratecontrol_t *, int );
float tr_rcRate( tr_ratecontrol_t * ); float tr_rcRate( const tr_ratecontrol_t * );
void tr_rcReset( tr_ratecontrol_t * ); void tr_rcReset( tr_ratecontrol_t * );
void tr_rcClose( tr_ratecontrol_t * ); void tr_rcClose( tr_ratecontrol_t * );