(libT) #1468: another stab at getting the peer transfer speeds both fast and a little more consistent.

Charles Kerr 2008-11-24 04:21:23 +00:00
parent 1d6710d150
commit be5e6774ad
13 changed files with 385 additions and 334 deletions

libtransmission/Makefile.am

@ -12,6 +12,7 @@ AM_CFLAGS = \
noinst_LIBRARIES = libtransmission.a
libtransmission_a_SOURCES = \
bandwidth.c \
bencode.c \
blocklist.c \
clients.c \
@ -56,6 +57,7 @@ libtransmission_a_SOURCES = \
wildmat.c
noinst_HEADERS = \
bandwidth.h \
bencode.h \
blocklist.h \
clients.h \

libtransmission/bandwidth.c (new file, 170 lines)

@ -0,0 +1,170 @@
/*
* This file Copyright (C) 2008 Charles Kerr <charles@rebelbase.com>
*
* This file is licensed by the GPL version 2. Works owned by the
* Transmission project are granted a special exemption to clause 2(b)
* so that the bulk of its code can remain under the MIT license.
* This exemption does not extend to derived works not owned by
* the Transmission project.
*
* $Id:$
*/
#include <limits.h>
#include "transmission.h"
#include "bandwidth.h"
#include "utils.h"
/***
****
***/
enum
{
HISTORY_MSEC = 2000,
INTERVAL_MSEC = HISTORY_MSEC,
GRANULARITY_MSEC = 250,
HISTORY_SIZE = ( INTERVAL_MSEC / GRANULARITY_MSEC )
};
struct bratecontrol
{
int newest;
struct { uint64_t date, size; } transfers[HISTORY_SIZE];
};
static float
getSpeed( const struct bratecontrol * r )
{
const uint64_t interval_msec = HISTORY_MSEC;
uint64_t bytes = 0;
const uint64_t cutoff = tr_date ( ) - interval_msec;
int i = r->newest;
for( ;; )
{
if( r->transfers[i].date <= cutoff )
break;
bytes += r->transfers[i].size;
if( --i == -1 ) i = HISTORY_SIZE - 1; /* circular history */
if( i == r->newest ) break; /* we've come all the way around */
}
return ( bytes / 1024.0 ) * ( 1000.0 / interval_msec );
}
static void
bytesUsed( struct bratecontrol * r, size_t size )
{
const uint64_t now = tr_date ( );
if( r->transfers[r->newest].date + GRANULARITY_MSEC >= now )
r->transfers[r->newest].size += size;
else
{
if( ++r->newest == HISTORY_SIZE ) r->newest = 0;
r->transfers[r->newest].date = now;
r->transfers[r->newest].size = size;
}
}
/******
*******
*******
******/
struct tr_bandwidth
{
unsigned int isLimited : 1;
size_t bytesLeft;
struct bratecontrol raw;
struct bratecontrol piece;
tr_session * session;
};
/***
****
***/
tr_bandwidth*
tr_bandwidthNew( tr_session * session )
{
tr_bandwidth * b = tr_new0( tr_bandwidth, 1 );
b->session = session;
return b;
}
void
tr_bandwidthFree( tr_bandwidth * b )
{
tr_free( b );
}
/***
****
***/
void
tr_bandwidthSetLimited( tr_bandwidth * b,
size_t bytesLeft )
{
b->isLimited = 1;
b->bytesLeft = bytesLeft;
}
void
tr_bandwidthSetUnlimited( tr_bandwidth * b )
{
b->isLimited = 0;
}
size_t
tr_bandwidthClamp( const tr_bandwidth * b,
size_t byteCount )
{
const size_t n = byteCount;
if( b && b->isLimited )
byteCount = MIN( byteCount, b->bytesLeft );
/* if( n != byteCount ) fprintf( stderr, "%p: %zu clamped to %zu\n", b, n, byteCount ); */
return byteCount;
}
/***
****
***/
double
tr_bandwidthGetRawSpeed( const tr_bandwidth * b )
{
return getSpeed( &b->raw );
}
double
tr_bandwidthGetPieceSpeed( const tr_bandwidth * b UNUSED )
{
return getSpeed( &b->piece );
}
void
tr_bandwidthUsed( tr_bandwidth * b,
size_t byteCount,
int isPieceData )
{
if( b->isLimited && isPieceData )
{
b->bytesLeft -= MIN( b->bytesLeft, byteCount );
/* fprintf( stderr, "%p used %zu bytes ... %zu left\n", b, byteCount, b->bytesLeft ); */
}
bytesUsed( &b->raw, byteCount );
if( isPieceData )
bytesUsed( &b->piece, byteCount );
}
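
bandwidth.c replaces the old tr_ratecontrol counters with a small circular history: bytesUsed() drops byte counts into 250 ms buckets, and getSpeed() sums the buckets from the last 2 seconds and converts the total to KiB/s. Below is a minimal standalone sketch of the same estimator, with hypothetical lower-case names and a caller-supplied clock standing in for tr_date(), so the bucketing arithmetic can be run in isolation:

/* standalone sketch of the rolling-window estimator above; the clock is
 * passed in explicitly so the example can run without tr_date() */
#include <stdint.h>
#include <stdio.h>

enum { HISTORY_MSEC = 2000, GRANULARITY_MSEC = 250,
       HISTORY_SIZE = HISTORY_MSEC / GRANULARITY_MSEC };

struct ratecontrol
{
    int newest;
    struct { uint64_t date, size; } transfers[HISTORY_SIZE];
};

/* add `size' bytes to the current 250 ms bucket, or start a new bucket */
static void
bytes_used( struct ratecontrol * r, uint64_t now, uint64_t size )
{
    if( r->transfers[r->newest].date + GRANULARITY_MSEC >= now )
        r->transfers[r->newest].size += size;
    else {
        if( ++r->newest == HISTORY_SIZE ) r->newest = 0;
        r->transfers[r->newest].date = now;
        r->transfers[r->newest].size = size;
    }
}

/* sum the buckets newer than the 2-second cutoff and convert to KiB/s */
static double
get_speed( const struct ratecontrol * r, uint64_t now )
{
    uint64_t bytes = 0;
    const uint64_t cutoff = now - HISTORY_MSEC;
    int i = r->newest;
    for( ;; ) {
        if( r->transfers[i].date <= cutoff ) break;
        bytes += r->transfers[i].size;
        if( --i == -1 ) i = HISTORY_SIZE - 1;  /* circular history */
        if( i == r->newest ) break;            /* came all the way around */
    }
    return ( bytes / 1024.0 ) * ( 1000.0 / HISTORY_MSEC );
}

int
main( void )
{
    int p;
    struct ratecontrol r = { 0, { { 0, 0 } } };
    for( p = 0; p < 10; ++p )                         /* 10 x 10 KiB over ~2 s */
        bytes_used( &r, 10000 + p * 200, 10 * 1024 );
    printf( "%.1f KiB/s\n", get_speed( &r, 11800 ) ); /* prints 50.0 */
    return 0;
}

Feeding 10 KiB every 200 ms for two seconds and sampling at the time of the last transfer gives the expected 50 KiB/s; sampling later lets the oldest buckets age past the 2-second cutoff and the reported speed decays toward zero.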

libtransmission/bandwidth.h (new file, 50 lines)

@ -0,0 +1,50 @@
/*
* This file Copyright (C) 2008 Charles Kerr <charles@rebelbase.com>
*
* This file is licensed by the GPL version 2. Works owned by the
* Transmission project are granted a special exemption to clause 2(b)
* so that the bulk of its code can remain under the MIT license.
* This exemption does not extend to derived works not owned by
* the Transmission project.
*
* $Id:$
*/
#ifndef TR_BANDWIDTH_H
#define TR_BANDWIDTH_H
typedef struct tr_bandwidth tr_bandwidth;
/**
***
**/
tr_bandwidth* tr_bandwidthNew ( tr_session * session );
void tr_bandwidthFree ( tr_bandwidth * bandwidth );
/**
***
**/
void tr_bandwidthSetLimited ( tr_bandwidth * bandwidth,
size_t byteCount );
void tr_bandwidthSetUnlimited ( tr_bandwidth * bandwidth );
size_t tr_bandwidthClamp ( const tr_bandwidth * bandwidth,
size_t byteCount );
/**
***
**/
double tr_bandwidthGetRawSpeed ( const tr_bandwidth * bandwidth );
double tr_bandwidthGetPieceSpeed ( const tr_bandwidth * bandwidth );
void tr_bandwidthUsed ( tr_bandwidth * bandwidth,
size_t byteCount,
int isPieceData );
#endif
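
This header is the whole public surface: a tr_bandwidth either is unlimited or holds a byte budget, writers call tr_bandwidthClamp() to size their I/O against that budget, and tr_bandwidthUsed() charges whatever actually moved. Only piece data counts against the limit; protocol overhead feeds just the raw-speed history. Here is a minimal sketch (not from this commit) of how a caller might drive the API once per pulse, where sendUpTo() is a hypothetical stand-in for the peer-io write path and the 333 ms period mirrors BANDWIDTH_PERIOD_MSEC in peer-mgr.c:

/* hypothetical per-pulse driver for the API declared above */
#include "transmission.h"
#include "bandwidth.h"

static size_t sendUpTo( size_t n ) { return n; } /* pretend it all went out */

static void
uploadPulse( tr_bandwidth * b, double desiredKiBps )
{
    /* turn the desired speed into a byte budget for one 333 ms pulse */
    const size_t budget = (size_t)( desiredKiBps * 1024.0 * 333.0 / 1000.0 );
    size_t len;

    tr_bandwidthSetLimited( b, budget );

    /* a writer asks how many bytes it may push right now... */
    len = tr_bandwidthClamp( b, 16384 );

    /* ...sends them, then charges what actually moved as piece data */
    len = sendUpTo( len );
    tr_bandwidthUsed( b, len, 1 );
}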

libtransmission/handshake.c

@ -1179,9 +1179,6 @@ tr_handshakeNew( tr_peerIo * io,
{
tr_handshake * handshake;
tr_peerIoSetBandwidthUnlimited( io, TR_UP );
tr_peerIoSetBandwidthUnlimited( io, TR_DOWN );
handshake = tr_new0( tr_handshake, 1 );
handshake->io = io;
handshake->crypto = tr_peerIoGetCrypto( io );

libtransmission/net.c

@ -128,7 +128,9 @@ setSndBuf( tr_session * session, int fd )
if( fd >= 0 )
{
const int sndbuf = session->so_sndbuf;
const int rcvbuf = session->so_rcvbuf;
setsockopt( fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof( sndbuf ) );
setsockopt( fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof( rcvbuf ) );
}
}
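
setSndBuf() now applies both session-configured sizes: so_sndbuf stays at three Ethernet MTUs and the new so_rcvbuf is 8 KiB (see the session.c hunk below), presumably to keep data from piling up in kernel buffers where the per-pulse throttle can't see it. The kernel may round or cap the requested values, so the quickest way to check what was actually granted is a pair of getsockopt() calls; this is ordinary sockets API, not part of the commit:

/* print the buffer sizes the kernel actually granted for a socket */
#include <stdio.h>
#include <sys/socket.h>

static void
printSockBufs( int fd )
{
    int sndbuf = 0, rcvbuf = 0;
    socklen_t len = sizeof( sndbuf );
    getsockopt( fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, &len );
    len = sizeof( rcvbuf );
    getsockopt( fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, &len );
    fprintf( stderr, "fd %d: SO_SNDBUF %d, SO_RCVBUF %d\n", fd, sndbuf, rcvbuf );
}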

libtransmission/peer-io.c

@ -26,6 +26,7 @@
#include <event.h>
#include "transmission.h"
#include "bandwidth.h"
#include "crypto.h"
#include "list.h"
#include "net.h"
@ -69,12 +70,6 @@ addPacketOverhead( size_t d )
tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \
} while( 0 )
struct tr_bandwidth
{
unsigned int isUnlimited : 1;
size_t bytesLeft;
};
struct tr_datatype
{
unsigned int isPieceData : 1;
@ -111,9 +106,8 @@ struct tr_peerIo
size_t bufferSize[2];
struct tr_bandwidth bandwidth[2];
tr_crypto * crypto;
tr_bandwidth * bandwidth[2];
tr_crypto * crypto;
};
/**
@ -124,70 +118,48 @@ static void
adjustOutputBuffer( tr_peerIo * io )
{
struct evbuffer * live = EVBUFFER_OUTPUT( io->bufev );
size_t curLive = EVBUFFER_LENGTH( live );
size_t maxLive = tr_bandwidthClamp( io->bandwidth[TR_UP], io->session->so_sndbuf );
if( io->bandwidth[TR_UP].isUnlimited )
if( ( curLive < maxLive ) && EVBUFFER_LENGTH( io->output ) )
{
bufferevent_write_buffer( io->bufev, io->output );
}
else if( io->bandwidth[TR_UP].bytesLeft > EVBUFFER_LENGTH( live ) )
{
/* there's free space in bufev's output buffer;
try to fill it up */
const size_t desiredLength = io->bandwidth[TR_UP].bytesLeft;
const size_t under = desiredLength - EVBUFFER_LENGTH( live );
const size_t n = MIN( under, EVBUFFER_LENGTH( io->output ) );
size_t freeSpace = maxLive - curLive;
size_t n = MIN( freeSpace, EVBUFFER_LENGTH( io->output ) );
bufferevent_write( io->bufev, EVBUFFER_DATA( io->output ), n );
evbuffer_drain( io->output, n );
}
else if( io->bandwidth[TR_UP].bytesLeft < EVBUFFER_LENGTH( live ) )
{
/* bufev's output buffer exceeds our bandwidth allocation;
move the excess out of bufev so it can't be sent yet */
const size_t desiredLength = io->bandwidth[TR_UP].bytesLeft;
const size_t over = EVBUFFER_LENGTH( live ) - desiredLength;
struct evbuffer * buf = evbuffer_new( );
evbuffer_add( buf, EVBUFFER_DATA( live ) + desiredLength, over );
evbuffer_add_buffer( buf, io->output );
evbuffer_free( io->output );
io->output = buf;
EVBUFFER_LENGTH( live ) = desiredLength;
curLive += n;
}
if( EVBUFFER_LENGTH( live ) )
{
io->bufferSize[TR_UP] = curLive;
if( curLive )
bufferevent_enable( io->bufev, EV_WRITE );
}
io->bufferSize[TR_UP] = EVBUFFER_LENGTH( live );
dbgmsg( io, "after adjusting the output buffer, its size is now %zu",
io->bufferSize[TR_UP] );
dbgmsg( io, "after adjusting the output buffer, its size is now %zu", curLive );
}
static void
adjustInputBuffer( tr_peerIo * io )
{
if( io->bandwidth[TR_DOWN].isUnlimited )
/* FIXME: the max read size probably needs to vary depending on the
* number of peers that we have connected... 1024 is going to force
* us way over the limit when there are lots of peers */
static const int maxBufSize = 1024;
const size_t n = tr_bandwidthClamp( io->bandwidth[TR_DOWN], maxBufSize );
if( !n )
{
dbgmsg( io, "unlimited reading..." );
bufferevent_setwatermark( io->bufev, EV_READ, 0, 0 );
bufferevent_enable( io->bufev, EV_READ );
dbgmsg( io, "disabling reads because we've hit our limit" );
bufferevent_disable( io->bufev, EV_READ );
}
else
{
const size_t n = io->bandwidth[TR_DOWN].bytesLeft;
if( n == 0 )
{
dbgmsg( io, "disabling reads because we've hit our limit" );
bufferevent_disable( io->bufev, EV_READ );
}
else
{
dbgmsg( io, "enabling reading of %zu more bytes", n );
bufferevent_setwatermark( io->bufev, EV_READ, 0, n );
bufferevent_enable( io->bufev, EV_READ );
}
dbgmsg( io, "enabling reading of %zu more bytes", n );
bufferevent_setwatermark( io->bufev, EV_READ, 0, n );
bufferevent_enable( io->bufev, EV_READ );
}
io->bufferSize[TR_DOWN] = EVBUFFER_LENGTH( EVBUFFER_INPUT( io->bufev ) );
}
/***
@ -214,12 +186,6 @@ didWriteWrapper( struct bufferevent * e,
const size_t chunk_length = MIN( next->length, payload );
const size_t n = addPacketOverhead( chunk_length );
if( next->isPieceData )
{
struct tr_bandwidth * b = &io->bandwidth[TR_UP];
b->bytesLeft -= MIN( b->bytesLeft, n );
}
if( io->didWrite )
io->didWrite( io, n, next->isPieceData, io->userData );
@ -242,22 +208,9 @@ canReadWrapper( struct bufferevent * e,
int err = 0;
tr_peerIo * io = vio;
tr_session * session = io->session;
const size_t len = EVBUFFER_LENGTH( EVBUFFER_INPUT( e ) );
dbgmsg( io, "canRead" );
/* if the input buffer has grown, record the bytes that were read */
if( len > io->bufferSize[TR_DOWN] )
{
const size_t payload = len - io->bufferSize[TR_DOWN];
const size_t n = addPacketOverhead( payload );
struct tr_bandwidth * b = io->bandwidth + TR_DOWN;
b->bytesLeft -= MIN( b->bytesLeft, (size_t)n );
dbgmsg( io, "%zu new input bytes. bytesLeft is %zu", n, b->bytesLeft );
adjustInputBuffer( io );
}
/* try to consume the input buffer */
if( io->canRead )
{
@ -289,7 +242,7 @@ canReadWrapper( struct bufferevent * e,
}
if( !err )
io->bufferSize[TR_DOWN] = EVBUFFER_LENGTH( EVBUFFER_INPUT( e ) );
adjustInputBuffer( io );
}
static void
@ -348,8 +301,6 @@ tr_peerIoNew( tr_session * session,
io->timeout = IO_TIMEOUT_SECS;
io->timeCreated = time( NULL );
io->output = evbuffer_new( );
io->bandwidth[TR_UP].isUnlimited = 1;
io->bandwidth[TR_DOWN].isUnlimited = 1;
bufevNew( io );
return io;
}
@ -614,60 +565,34 @@ tr_peerIoSupportsFEXT( const tr_peerIo * io )
size_t
tr_peerIoGetWriteBufferSpace( const tr_peerIo * io )
{
const size_t desiredBufferLen = 4096;
const size_t desiredLiveLen = tr_bandwidthClamp( io->bandwidth[TR_UP], io->session->so_rcvbuf );
const size_t currentLiveLen = EVBUFFER_LENGTH( EVBUFFER_OUTPUT( io->bufev ) );
const size_t currentLbufLen = EVBUFFER_LENGTH( io->output );
const size_t desiredLiveLen = io->bandwidth[TR_UP].isUnlimited
? INT_MAX
: io->bandwidth[TR_UP].bytesLeft;
const size_t currentLen = currentLiveLen + currentLbufLen;
const size_t desiredLen = desiredBufferLen + desiredLiveLen;
size_t freeSpace = 0;
const size_t desiredQueueLen = io->session->so_sndbuf;
const size_t currentQueueLen = EVBUFFER_LENGTH( io->output );
const size_t desiredLen = desiredLiveLen + desiredQueueLen;
const size_t currentLen = currentLiveLen + currentQueueLen;
size_t freeSpace = 0;
if( desiredLen > currentLen )
freeSpace = desiredLen - currentLen;
else
freeSpace = 0;
return freeSpace;
}
void
tr_peerIoAllocateBandwidth( tr_peerIo * io,
tr_direction direction,
size_t bytesLeft )
tr_peerIoSetBandwidth( tr_peerIo * io,
tr_direction direction,
tr_bandwidth * bandwidth )
{
struct tr_bandwidth * b;
assert( io );
assert( direction == TR_UP || direction == TR_DOWN );
b = io->bandwidth + direction;
b->isUnlimited = 0;
b->bytesLeft = bytesLeft;
io->bandwidth[direction] = bandwidth;
adjustOutputBuffer( io );
adjustInputBuffer( io );
}
void
tr_peerIoSetBandwidthUnlimited( tr_peerIo * io,
tr_direction direction )
{
struct tr_bandwidth * b;
assert( io );
assert( direction == TR_UP || direction == TR_DOWN );
b = io->bandwidth + direction;
b->isUnlimited = 1;
b->bytesLeft = 0;
adjustInputBuffer( io );
adjustOutputBuffer( io );
if( direction == TR_UP )
adjustOutputBuffer( io );
else
adjustInputBuffer( io );
}
/**
@ -711,10 +636,7 @@ tr_peerIoWrite( tr_peerIo * io,
assert( tr_amInEventThread( io->session ) );
dbgmsg( io, "adding %zu bytes into io->output", writemeLen );
if( io->bandwidth[TR_UP].isUnlimited )
bufferevent_write( io->bufev, writeme, writemeLen );
else
evbuffer_add( io->output, writeme, writemeLen );
evbuffer_add( io->output, writeme, writemeLen );
datatype = tr_new( struct tr_datatype, 1 );
datatype->isPieceData = isPieceData != 0;
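
Because this hunk interleaves removed and added lines, the resulting write path is easier to see assembled. Every tr_peerIoWrite() now lands in the local io->output queue (the old fast path that wrote straight to bufev when bandwidth was unlimited is gone), and adjustOutputBuffer() only tops up libevent's live buffer to the smaller of the bandwidth budget and so_sndbuf. Pieced together from the added lines above, as a reading aid rather than a verbatim copy of the file:

static void
adjustOutputBuffer( tr_peerIo * io )
{
    struct evbuffer * live = EVBUFFER_OUTPUT( io->bufev );
    size_t curLive = EVBUFFER_LENGTH( live );
    size_t maxLive = tr_bandwidthClamp( io->bandwidth[TR_UP], io->session->so_sndbuf );

    if( ( curLive < maxLive ) && EVBUFFER_LENGTH( io->output ) )
    {
        /* the live buffer is under its clamp: top it up from io->output */
        size_t freeSpace = maxLive - curLive;
        size_t n = MIN( freeSpace, EVBUFFER_LENGTH( io->output ) );
        bufferevent_write( io->bufev, EVBUFFER_DATA( io->output ), n );
        evbuffer_drain( io->output, n );
        curLive += n;
    }

    io->bufferSize[TR_UP] = curLive;
    if( curLive )
        bufferevent_enable( io->bufev, EV_WRITE );
}

The input side is symmetric: tr_bandwidthClamp( io->bandwidth[TR_DOWN], 1024 ) either disables EV_READ outright when the budget is spent, or sets the read watermark to the remaining allowance and re-enables reading.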

libtransmission/peer-io.h

@ -20,7 +20,7 @@
struct in_addr;
struct evbuffer;
struct bufferevent;
struct tr_handle;
struct tr_bandwidth;
struct tr_crypto;
typedef struct tr_peerIo tr_peerIo;
@ -200,12 +200,9 @@ void tr_peerIoDrain( tr_peerIo * io,
size_t tr_peerIoGetWriteBufferSpace( const tr_peerIo * io );
void tr_peerIoAllocateBandwidth( tr_peerIo * io,
tr_direction direction,
size_t bytesLeft );
void tr_peerIoSetBandwidthUnlimited( tr_peerIo * io,
tr_direction direction );
void tr_peerIoSetBandwidth( tr_peerIo * io,
tr_direction direction,
struct tr_bandwidth * bandwidth );
#endif

libtransmission/peer-mgr.c

@ -19,6 +19,8 @@
#include <event.h>
#include "transmission.h"
#include "bandwidth.h"
#include "bencode.h"
#include "blocklist.h"
#include "clients.h"
#include "completion.h"
@ -56,21 +58,21 @@ enum
RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
/* how frequently to reallocate bandwidth */
BANDWIDTH_PERIOD_MSEC = 250,
BANDWIDTH_PERIOD_MSEC = 333,
/* max # of peers to ask for per torrent per reconnect pulse */
MAX_RECONNECTIONS_PER_PULSE = 2,
MAX_RECONNECTIONS_PER_PULSE = 4,
/* max number of peers to ask for per second overall.
* this throttle is to avoid overloading the router */
MAX_CONNECTIONS_PER_SECOND = 4,
MAX_CONNECTIONS_PER_SECOND = 8,
/* number of unchoked peers per torrent.
* FIXME: this probably ought to be configurable */
MAX_UNCHOKED_PEERS = 14,
/* number of bad pieces a peer is allowed to send before we ban them */
MAX_BAD_PIECES_PER_PEER = 3,
MAX_BAD_PIECES_PER_PEER = 5,
/* use for bitwise operations w/peer_atom.myflags */
MYFLAG_BANNED = 1,
@ -1025,17 +1027,16 @@ peerCallbackFunc( void * vpeer,
/* add it to the raw upload speed */
if( peer )
tr_rcTransferred ( peer->rawSpeed[dir], e->length );
tr_rcTransferred ( tor->rawSpeed[dir], e->length );
tr_rcTransferred ( tor->session->rawSpeed[dir], e->length );
/* maybe add it to the piece upload speed */
if( e->wasPieceData ) {
if( peer )
tr_rcTransferred ( peer->pieceSpeed[dir], e->length );
tr_rcTransferred ( tor->pieceSpeed[dir], e->length );
tr_rcTransferred ( tor->session->pieceSpeed[dir], e->length );
}
tr_bandwidthUsed( tor->bandwidth[dir], e->length, e->wasPieceData );
tr_bandwidthUsed( tor->session->bandwidth[dir], e->length, e->wasPieceData );
/* update the stats */
if( e->wasPieceData )
tr_statsAddUploaded( tor->session, e->length );
@ -1069,16 +1070,15 @@ peerCallbackFunc( void * vpeer,
/* add it to our raw download speed */
if( peer )
tr_rcTransferred ( peer->rawSpeed[dir], e->length );
tr_rcTransferred ( tor->rawSpeed[dir], e->length );
tr_rcTransferred ( tor->session->rawSpeed[dir], e->length );
/* maybe add it to the piece upload speed */
if( e->wasPieceData ) {
if( peer )
tr_rcTransferred ( peer->pieceSpeed[dir], e->length );
tr_rcTransferred ( tor->pieceSpeed[dir], e->length );
tr_rcTransferred ( tor->session->pieceSpeed[dir], e->length );
}
tr_bandwidthUsed( tor->bandwidth[dir], e->length, e->wasPieceData );
tr_bandwidthUsed( tor->session->bandwidth[dir], e->length, e->wasPieceData );
/* update the stats */
if( e->wasPieceData )
@ -1939,8 +1939,7 @@ rechoke( Torrent * t )
n->peer = peer;
n->isInterested = peer->peerIsInterested;
n->isChoked = peer->peerIsChoked;
n->rate = (int)(tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER )
+ tr_peerGetPieceSpeed( peer, TR_PEER_TO_CLIENT ) );
n->rate = tr_peerGetPieceSpeed( peer, TR_CLIENT_TO_PEER ) * 1024;
}
}
@ -2357,8 +2356,40 @@ reconnectPulse( void * vtorrent )
*****
****/
#if 0
#define DEBUG_DIRECTION TR_UP
static void
pumpAllPeers( tr_peerMgr * mgr )
{
const int torrentCount = tr_ptrArraySize( mgr->torrents );
int i, j;
for( i = 0; i < torrentCount; ++i )
{
Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
for( j = 0; j < tr_ptrArraySize( t->peers ); ++j )
{
tr_peer * peer = tr_ptrArrayNth( t->peers, j );
tr_peerMsgsPulse( peer->msgs );
}
}
}
static void
setTorrentBandwidth( Torrent * t,
tr_direction dir,
tr_bandwidth * bandwidth )
{
int i;
int peerCount = 0;
tr_peer ** peers = getConnectedPeers( t, &peerCount );
for( i=0; i<peerCount; ++i )
tr_peerIoSetBandwidth( peers[i]->io, dir, bandwidth );
tr_free( peers );
}
#if 1
#define DEBUG_DIRECTION TR_DOWN
#endif
static double
@ -2378,7 +2409,7 @@ allocateHowMuch( tr_direction dir UNUSED, double currentSpeed, double desiredSpe
const double current_bytes_per_pulse = bytesPerPulse( currentSpeed );
const double desired_bytes_per_pulse = bytesPerPulse( desiredSpeed );
const double min = desired_bytes_per_pulse * 0.90;
const double max = desired_bytes_per_pulse * 1.25;
const double max = desired_bytes_per_pulse * 1.50;
const double next_pulse_bytes = desired_bytes_per_pulse * ( pulses_per_history + 1 )
- ( current_bytes_per_pulse * pulses_per_history );
double clamped;
@ -2390,7 +2421,7 @@ allocateHowMuch( tr_direction dir UNUSED, double currentSpeed, double desiredSpe
#ifdef DEBUG_DIRECTION
if( dir == DEBUG_DIRECTION )
fprintf( stderr, "currentSpeed(%5.2f) desiredSpeed(%5.2f), allocating %5.2f (%5.2f)\n",
fprintf( stderr, "currentSpeed(%5.2f) desiredSpeed(%5.2f), allocating %5.2f (unclamped: %5.2f)\n",
currentSpeed,
desiredSpeed,
clamped/1024.0,
@ -2399,150 +2430,22 @@ fprintf( stderr, "currentSpeed(%5.2f) desiredSpeed(%5.2f), allocating %5.2f (%5.
return clamped;
}
/**
* Distributes a fixed amount of bandwidth among a set of peers.
*
* @param peerArray peers whose client-to-peer bandwidth will be set
* @param direction whether to allocate upload or download bandwidth
* @param currentSpeed current speed in KiB/s for this set of peers
* @param desiredSpeed desired speed in KiB/s for this set of peers
*/
static void
setPeerBandwidth( tr_ptrArray * peerArray,
const tr_direction direction,
double currentSpeed,
double desiredSpeed )
{
const int MINIMUM_WELFARE_BYTES = bytesPerPulse( 5 );
int i;
double welfare;
double meritBytes;
double meritMultiplier;
double welfareBytes;
const int peerCount = tr_ptrArraySize( peerArray );
const double bytes = allocateHowMuch( direction, currentSpeed, desiredSpeed );
tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( peerArray );
/* how many bytes we'll allocate based on merit.
*
* 1. When just getting started we want to give all the peers a lot
* of `welfare' allocation because we don't know which ones will
* turn out to be productive for us.
*
* 2. When we've reached steady state and are near our bandwidth limit,
* the bandwidth spent on `welfare' is going to come from peers that
* we already know are productive... which is probably a waste.
*
* 3. So we tie the merit/welfare allocations to the current speed.
* the closer the current speed gets to the maximum speed, the less
* welfare we allocate.
*
* 4. We always need to allocate /some/ welfare bytes, otherwise
* the other peers will starve.
*/
meritBytes = bytesPerPulse( MIN( currentSpeed * 1.2, desiredSpeed ) );
welfareBytes = bytes > meritBytes ? bytes - meritBytes : 0;
if( welfareBytes < MINIMUM_WELFARE_BYTES )
welfareBytes = MINIMUM_WELFARE_BYTES;
meritBytes = bytes - welfareBytes;
meritMultiplier = currentSpeed > 0.01 ? meritBytes / currentSpeed : 0.0;
#ifdef DEBUG_DIRECTION
if( direction == DEBUG_DIRECTION )
fprintf( stderr, "currentSpeed(%5.2f) desiredSpeed(%5.2f) - k[%.1f] merit [%.1f] welfare [%.1f]\n", currentSpeed, desiredSpeed, bytes/1024.0, meritBytes/1024.0, welfareBytes/1024.0 );
#endif
/* how much welfare each peer gets */
welfare = welfareBytes / peerCount;
for( i=0; i<peerCount; ++i )
{
tr_peer * peer = peers[i];
const size_t merit = tr_rcRate( peers[i]->pieceSpeed[direction] ) * meritMultiplier;
tr_peerIoAllocateBandwidth( peer->io, direction, merit + welfare );
}
}
static void
givePeersUnlimitedBandwidth( tr_ptrArray * peers,
tr_direction direction )
{
const int n = tr_ptrArraySize( peers );
int i;
for( i = 0; i < n; ++i )
{
tr_peer * peer = tr_ptrArrayNth( peers, i );
tr_peerIoSetBandwidthUnlimited( peer->io, direction );
}
}
static void
pumpAllPeers( tr_peerMgr * mgr )
{
const int torrentCount = tr_ptrArraySize( mgr->torrents );
int i, j;
for( i = 0; i < torrentCount; ++i )
{
Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
for( j = 0; j < tr_ptrArraySize( t->peers ); ++j )
{
tr_peer * peer = tr_ptrArrayNth( t->peers, j );
tr_peerMsgsPulse( peer->msgs );
}
}
}
static void
getBandwidthPeers( Torrent * t,
tr_direction dir,
tr_ptrArray * appendme,
double * speed )
{
int i, peerCount;
tr_peer ** peers;
assert( t );
assert( torrentIsLocked( t ) );
assert( dir == TR_UP || dir == TR_DOWN );
assert( appendme );
assert( speed );
peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount );
for( i=0; i<peerCount; ++i )
{
tr_peer * p = peers[i];
if( p->msgs )
{
if( ( ( dir == TR_DOWN ) && clientIsDownloadingFrom( p ) ) || ( ( dir == TR_UP ) && clientIsUploadingTo( p ) ) )
{
tr_ptrArrayAppend( appendme, p );
*speed += tr_rcRate( p->pieceSpeed[dir] );
}
}
}
}
/**
* Allocate bandwidth for each peer connection.
*
* @param mgr the peer manager
* @param direction whether to allocate upload or download bandwidth
* @return the amount of directional bandwidth used since the last pulse.
*/
static void
allocateBandwidth( tr_peerMgr * mgr,
tr_direction direction )
{
tr_session * session = mgr->session;
const int torrentCount = tr_ptrArraySize( mgr->torrents );
Torrent ** torrents = (Torrent **) tr_ptrArrayBase( mgr->torrents );
tr_ptrArray * globalPool = tr_ptrArrayNew( );
double globalPoolSpeed = 0;
int i;
int i;
tr_session * session = mgr->session;
const int torrentCount = tr_ptrArraySize( mgr->torrents );
Torrent ** torrents = (Torrent **) tr_ptrArrayBase( mgr->torrents );
tr_bandwidth * global_pool = session->bandwidth[direction];
assert( mgr );
assert( direction == TR_UP || direction == TR_DOWN );
@ -2570,36 +2473,52 @@ allocateBandwidth( tr_peerMgr * mgr,
switch( speedMode )
{
case TR_SPEEDLIMIT_UNLIMITED:
givePeersUnlimitedBandwidth( t->peers, direction );
{
tr_bandwidth * b = t->tor->bandwidth[direction];
tr_bandwidthSetUnlimited( b );
setTorrentBandwidth( t, direction, b );
break;
}
case TR_SPEEDLIMIT_SINGLE: {
tr_ptrArray * peers = tr_ptrArrayNew( );
double speed = 0;
getBandwidthPeers( t, direction, peers, &speed );
setPeerBandwidth( peers, direction, speed, tr_torrentGetSpeedLimit( t->tor, direction ) );
tr_ptrArrayFree( peers, NULL );
case TR_SPEEDLIMIT_SINGLE:
{
tr_bandwidth * b = t->tor->bandwidth[direction];
const double currentSpeed = tr_bandwidthGetPieceSpeed( b );
const double desiredSpeed = tr_torrentGetSpeedLimit( t->tor, direction );
const double bytesPerPulse = allocateHowMuch( direction, currentSpeed, desiredSpeed );
#ifdef DEBUG_DIRECTION
if( direction == DEBUG_DIRECTION )
fprintf( stderr, "single: currentSpeed %.0f ... desiredSpeed %.0f ... bytesPerPulse %.0f\n", currentSpeed, desiredSpeed, bytesPerPulse );
#endif
tr_bandwidthSetLimited( b, bytesPerPulse );
setTorrentBandwidth( t, direction, b );
break;
}
case TR_SPEEDLIMIT_GLOBAL:
getBandwidthPeers( t, direction, globalPool, &globalPoolSpeed );
{
setTorrentBandwidth( t, direction, global_pool );
break;
}
}
}
/* handle the global pool's connections */
if( !tr_sessionIsSpeedLimitEnabled( session, direction ) )
givePeersUnlimitedBandwidth( globalPool, direction );
else
setPeerBandwidth( globalPool, direction, globalPoolSpeed,
tr_sessionGetSpeedLimit( session, direction ) );
tr_bandwidthSetUnlimited( global_pool );
else {
const double currentSpeed = tr_bandwidthGetPieceSpeed( global_pool );
const double desiredSpeed = tr_sessionGetSpeedLimit( session, direction );
const double bytesPerPulse = allocateHowMuch( direction, currentSpeed, desiredSpeed );
#ifdef DEBUG_DIRECTION
if( direction == DEBUG_DIRECTION )
fprintf( stderr, "global(%p): currentSpeed %.0f ... desiredSpeed %.0f ... bytesPerPulse %.0f\n", global_pool, currentSpeed, desiredSpeed, bytesPerPulse );
#endif
tr_bandwidthSetLimited( session->bandwidth[direction], bytesPerPulse );
}
/* now that we've allocated bandwidth, pump all the connected peers */
pumpAllPeers( mgr );
/* cleanup */
tr_ptrArrayFree( globalPool, NULL );
}
static int
@ -2617,4 +2536,3 @@ bandwidthPulse( void * vmgr )
managerUnlock( mgr );
return TRUE;
}
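
The per-peer merit/welfare allocator is gone: each torrent and the session now own tr_bandwidth objects, setTorrentBandwidth() points every connected peer's io at the appropriate one, and allocateHowMuch() picks the per-pulse budget. The pulse also slows from 250 ms to 333 ms and the overshoot cap widens from 1.25x to 1.50x of the desired rate. The budget is chosen so that the speed history's average is pulled back onto the target, then clamped; here is a standalone sketch of that arithmetic, assuming bytesPerPulse() converts KiB/s into bytes per 333 ms pulse and pulses_per_history is the number of pulses in the 2-second speed history (neither definition appears in this diff):

/* standalone sketch of the allocateHowMuch() arithmetic */
#include <stdio.h>

#define PULSE_MSEC    333.0
#define HISTORY_MSEC  2000.0

static double
bytes_per_pulse( double KiB_per_sec )
{
    return KiB_per_sec * 1024.0 * PULSE_MSEC / 1000.0;
}

static double
allocate_how_much( double currentSpeed, double desiredSpeed )
{
    const double pulses_per_history = HISTORY_MSEC / PULSE_MSEC;
    const double current = bytes_per_pulse( currentSpeed );
    const double desired = bytes_per_pulse( desiredSpeed );
    const double lo = desired * 0.90;
    const double hi = desired * 1.50;
    /* the allocation that would bring the window's average back to the target */
    double next = desired * ( pulses_per_history + 1 ) - current * pulses_per_history;
    if( next < lo ) next = lo;
    if( next > hi ) next = hi;
    return next;
}

int
main( void )
{
    printf( "%.0f bytes next pulse\n", allocate_how_much( 80.0, 100.0 ) );
    return 0;
}

So a torrent limited to 100 KiB/s but currently averaging 80 KiB/s gets the 1.5x cap, about 51 kB, for the next pulse: enough to climb back quickly without sailing far past the limit.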

libtransmission/peer-msgs.c

@ -1631,11 +1631,14 @@ clientGotBlock( tr_peermsgs * msgs,
return 0;
}
static int peerPulse( void * vmsgs );
static void
didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
{
tr_peermsgs * msgs = vmsgs;
firePeerGotData( msgs, bytesWritten, wasPieceData );
peerPulse( msgs );
}
static ReadState
@ -1698,7 +1701,7 @@ ratePulse( void * vpeer )
const int estimatedBlocksInNext30Seconds =
( rateToClient * 30 * 1024 ) / peer->torrent->blockSize;
peer->minActiveRequests = 4;
peer->minActiveRequests = 8;
peer->maxActiveRequests = peer->minActiveRequests + estimatedBlocksInNext30Seconds;
return TRUE;
}
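
ratePulse() now keeps at least 8 requests in flight (up from 4) and, as before, lets the ceiling grow with the observed download rate so that roughly 30 seconds' worth of blocks stay requested ahead of time; didWrite() also calls peerPulse() directly, so a completed write can trigger more work immediately instead of waiting for the next timer tick. The sizing rule as a standalone sketch, with 16 KiB as the typical block size:

/* standalone sketch of the request-pipeline sizing in ratePulse() */
#include <stdio.h>

static int
maxActiveRequests( double rateToClientKiBps, int blockSize )
{
    const int minActiveRequests = 8;
    const int estimatedBlocksInNext30Seconds =
        (int)( rateToClientKiBps * 30 * 1024 ) / blockSize;
    return minActiveRequests + estimatedBlocksInNext30Seconds;
}

int
main( void )
{
    /* at 100 KiB/s with 16 KiB blocks, ~187 blocks arrive in 30 s,
     * so the pipeline may grow to 8 + 187 = 195 outstanding requests */
    printf( "%d\n", maxActiveRequests( 100.0, 16384 ) );
    return 0;
}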

libtransmission/session.c

@ -21,6 +21,7 @@
#include <dirent.h> /* opendir */
#include "transmission.h"
#include "bandwidth.h"
#include "blocklist.h"
#include "fdlimit.h"
#include "list.h"
@ -29,7 +30,6 @@
#include "peer-mgr.h"
#include "platform.h" /* tr_lock */
#include "port-forwarding.h"
#include "ratecontrol.h"
#include "rpc-server.h"
#include "stats.h"
#include "torrent.h"
@ -254,11 +254,8 @@ tr_sessionInitFull( const char * configDir,
h->isProxyAuthEnabled = proxyAuthIsEnabled != 0;
h->proxyUsername = tr_strdup( proxyUsername );
h->proxyPassword = tr_strdup( proxyPassword );
h->pieceSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
h->pieceSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
h->rawSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
h->rawSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
h->so_sndbuf = 1500 * 3; /* 3x MTU for most ethernet/wireless */
h->so_rcvbuf = 8192;
if( configDir == NULL )
configDir = tr_getDefaultConfigDir( );
@ -286,6 +283,9 @@ tr_sessionInitFull( const char * configDir,
h->shared = tr_sharedInit( h, isPortForwardingEnabled, publicPort );
h->isPortSet = publicPort >= 0;
h->bandwidth[TR_UP] = tr_bandwidthNew( h );
h->bandwidth[TR_DOWN] = tr_bandwidthNew( h );
/* first %s is the application name
second %s is the version number */
tr_inf( _( "%s %s started" ), TR_NAME, LONG_VERSION_STRING );
@ -507,7 +507,7 @@ tr_sessionGetPieceSpeed( const tr_session * session, tr_direction dir )
{
assert( dir==TR_UP || dir==TR_DOWN );
return session ? tr_rcRate( session->pieceSpeed[dir] ) : 0.0;
return session ? tr_bandwidthGetPieceSpeed( session->bandwidth[dir] ) : 0.0;
}
double
@ -515,7 +515,7 @@ tr_sessionGetRawSpeed( const tr_session * session, tr_direction dir )
{
assert( dir==TR_UP || dir==TR_DOWN );
return session ? tr_rcRate( session->rawSpeed[dir] ) : 0.0;
return session ? tr_bandwidthGetRawSpeed( session->bandwidth[dir] ) : 0.0;
}
int
@ -629,10 +629,8 @@ tr_sessionClose( tr_handle * session )
}
/* free the session memory */
tr_rcClose( session->pieceSpeed[TR_PEER_TO_CLIENT] );
tr_rcClose( session->pieceSpeed[TR_CLIENT_TO_PEER] );
tr_rcClose( session->rawSpeed[TR_PEER_TO_CLIENT] );
tr_rcClose( session->rawSpeed[TR_CLIENT_TO_PEER] );
tr_bandwidthFree( session->bandwidth[TR_UP] );
tr_bandwidthFree( session->bandwidth[TR_DOWN] );
tr_lockFree( session->lock );
for( i = 0; i < session->metainfoLookupCount; ++i )
tr_free( session->metainfoLookup[i].filename );

libtransmission/session.h

@ -48,7 +48,7 @@ struct tr_metainfo_lookup
char * filename;
};
struct tr_ratecontrol;
struct tr_bandwidth;
struct tr_handle
{
@ -108,12 +108,11 @@ struct tr_handle
/* the size of the output buffer for peer connections */
int so_sndbuf;
/* the rate at which pieces are being transferred between client and peer.
* protocol overhead is NOT included; this is only the piece data */
struct tr_ratecontrol * pieceSpeed[2];
/* the size of the input buffer for peer connections */
int so_rcvbuf;
/* the rate at which bytes are being transferred between client and peer. */
struct tr_ratecontrol * rawSpeed[2];
/* monitors the "global pool" speeds */
struct tr_bandwidth * bandwidth[2];
};
const char * tr_sessionFindTorrentFile( const tr_session * session,

libtransmission/torrent.c

@ -32,6 +32,7 @@
#include <stdlib.h> /* qsort */
#include "transmission.h"
#include "bandwidth.h"
#include "bencode.h"
#include "completion.h"
#include "crypto.h" /* for tr_sha1 */
@ -496,10 +497,8 @@ torrentRealInit( tr_handle * h,
randomizeTiers( info );
tor->rawSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
tor->rawSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
tor->pieceSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
tor->pieceSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
tor->bandwidth[TR_UP] = tr_bandwidthNew( h );
tor->bandwidth[TR_DOWN] = tr_bandwidthNew( h );
tor->blockSize = getBlockSize( info->pieceSize );
@ -814,10 +813,10 @@ tr_torrentStat( tr_torrent * tor )
&s->peersGettingFromUs,
s->peersFrom );
s->rawUploadSpeed = tr_rcRate( tor->rawSpeed[TR_UP] );
s->rawDownloadSpeed = tr_rcRate( tor->rawSpeed[TR_DOWN] );
s->pieceUploadSpeed = tr_rcRate( tor->pieceSpeed[TR_UP] );
s->pieceDownloadSpeed = tr_rcRate( tor->pieceSpeed[TR_DOWN] );
s->rawUploadSpeed = tr_bandwidthGetRawSpeed ( tor->bandwidth[TR_UP] );
s->rawDownloadSpeed = tr_bandwidthGetRawSpeed ( tor->bandwidth[TR_DOWN] );
s->pieceUploadSpeed = tr_bandwidthGetPieceSpeed( tor->bandwidth[TR_UP] );
s->pieceDownloadSpeed = tr_bandwidthGetPieceSpeed( tor->bandwidth[TR_DOWN] );
usableSeeds += tor->info.webseedCount;
@ -1100,10 +1099,8 @@ freeTorrent( tr_torrent * tor )
assert( h->torrentCount >= 1 );
h->torrentCount--;
tr_rcClose( tor->pieceSpeed[TR_PEER_TO_CLIENT] );
tr_rcClose( tor->pieceSpeed[TR_CLIENT_TO_PEER] );
tr_rcClose( tor->rawSpeed[TR_PEER_TO_CLIENT] );
tr_rcClose( tor->rawSpeed[TR_CLIENT_TO_PEER] );
tr_bandwidthFree( tor->bandwidth[TR_DOWN] );
tr_bandwidthFree( tor->bandwidth[TR_UP] );
tr_metainfoFree( inf );
tr_free( tor );
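
With the per-torrent bandwidth objects in place, tr_torrentStat() reports raw speeds (protocol overhead included) and piece-only speeds straight from them rather than from four separate tr_ratecontrol counters. A minimal sketch of a caller reading those fields; tr_torrentStat() is assumed to be the usual accessor from transmission.h, and only the four speed fields are taken from this diff:

/* read a torrent's current speeds from tr_torrentStat() */
#include <stdio.h>
#include "transmission.h"

static void
printSpeeds( tr_torrent * tor )
{
    const tr_stat * st = tr_torrentStat( tor );
    fprintf( stderr, "up:   %.1f KiB/s raw, %.1f KiB/s piece data\n",
             st->rawUploadSpeed, st->pieceUploadSpeed );
    fprintf( stderr, "down: %.1f KiB/s raw, %.1f KiB/s piece data\n",
             st->rawDownloadSpeed, st->pieceDownloadSpeed );
}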

libtransmission/torrent.h

@ -25,6 +25,7 @@
#ifndef TR_TORRENT_H
#define TR_TORRENT_H 1
struct tr_bandwidth;
struct tr_ratecontrol;
/**
@ -229,12 +230,7 @@ struct tr_torrent
int uniqueId;
/* the rate at which pieces are being transferred between client and
* its peers. protocol overhead is NOT included; only the piece data */
struct tr_ratecontrol * pieceSpeed[2];
/* the rate at which bytes are being sent between client and peers */
struct tr_ratecontrol * rawSpeed[2];
struct tr_bandwidth * bandwidth[2];
};
#endif