diff --git a/libtransmission/peer-io.c b/libtransmission/peer-io.c index c1f91a0e2..c0308b3fb 100644 --- a/libtransmission/peer-io.c +++ b/libtransmission/peer-io.c @@ -30,7 +30,6 @@ #include "list.h" #include "net.h" #include "peer-io.h" -#include "ratecontrol.h" #include "trevent.h" #include "utils.h" @@ -72,9 +71,8 @@ addPacketOverhead( size_t d ) struct tr_bandwidth { - unsigned int isUnlimited : 1; - size_t bytesUsed; - size_t bytesLeft; + unsigned int isUnlimited : 1; + size_t bytesLeft; }; struct tr_datatype @@ -220,7 +218,6 @@ didWriteWrapper( struct bufferevent * e, { struct tr_bandwidth * b = &io->bandwidth[TR_UP]; b->bytesLeft -= MIN( b->bytesLeft, n ); - b->bytesUsed += n; } if( io->didWrite ) @@ -256,8 +253,7 @@ canReadWrapper( struct bufferevent * e, const size_t n = addPacketOverhead( payload ); struct tr_bandwidth * b = io->bandwidth + TR_DOWN; b->bytesLeft -= MIN( b->bytesLeft, (size_t)n ); - b->bytesUsed += n; - dbgmsg( io, "%zu new input bytes. bytesUsed is %zu, bytesLeft is %zu", n, b->bytesUsed, b->bytesLeft ); + dbgmsg( io, "%zu new input bytes. bytesLeft is %zu", n, b->bytesLeft ); adjustInputBuffer( io ); } @@ -615,15 +611,6 @@ tr_peerIoSupportsFEXT( const tr_peerIo * io ) *** **/ -size_t -tr_peerIoGetBandwidthUsed( const tr_peerIo * io, - tr_direction direction ) -{ - assert( io ); - assert( direction == TR_UP || direction == TR_DOWN ); - return io->bandwidth[direction].bytesUsed; -} - size_t tr_peerIoGetWriteBufferSpace( const tr_peerIo * io ) { @@ -649,9 +636,9 @@ tr_peerIoGetWriteBufferSpace( const tr_peerIo * io ) } void -tr_peerIoSetBandwidth( tr_peerIo * io, - tr_direction direction, - size_t bytesLeft ) +tr_peerIoAllocateBandwidth( tr_peerIo * io, + tr_direction direction, + size_t bytesLeft ) { struct tr_bandwidth * b; @@ -660,7 +647,6 @@ tr_peerIoSetBandwidth( tr_peerIo * io, b = io->bandwidth + direction; b->isUnlimited = 0; - b->bytesUsed = 0; b->bytesLeft = bytesLeft; adjustOutputBuffer( io ); @@ -678,7 +664,6 @@ tr_peerIoSetBandwidthUnlimited( tr_peerIo * io, b = io->bandwidth + direction; b->isUnlimited = 1; - b->bytesUsed = 0; b->bytesLeft = 0; adjustInputBuffer( io ); @@ -882,4 +867,3 @@ tr_peerIoGetAge( const tr_peerIo * io ) { return time( NULL ) - io->timeCreated; } - diff --git a/libtransmission/peer-io.h b/libtransmission/peer-io.h index 2a0debfa5..3426e3823 100644 --- a/libtransmission/peer-io.h +++ b/libtransmission/peer-io.h @@ -198,14 +198,11 @@ void tr_peerIoDrain( tr_peerIo * io, *** **/ -size_t tr_peerIoGetBandwidthUsed( const tr_peerIo * io, - tr_direction direction ); - size_t tr_peerIoGetWriteBufferSpace( const tr_peerIo * io ); -void tr_peerIoSetBandwidth( tr_peerIo * io, - tr_direction direction, - size_t bytesLeft ); +void tr_peerIoAllocateBandwidth( tr_peerIo * io, + tr_direction direction, + size_t bytesLeft ); void tr_peerIoSetBandwidthUnlimited( tr_peerIo * io, tr_direction direction ); diff --git a/libtransmission/peer-mgr.c b/libtransmission/peer-mgr.c index 07a831320..2af1f909a 100644 --- a/libtransmission/peer-mgr.c +++ b/libtransmission/peer-mgr.c @@ -67,7 +67,7 @@ enum /* number of unchoked peers per torrent. * FIXME: this probably ought to be configurable */ - MAX_UNCHOKED_PEERS = 12, + MAX_UNCHOKED_PEERS = 14, /* number of bad pieces a peer is allowed to send before we ban them */ MAX_BAD_PIECES_PER_PEER = 3, @@ -130,7 +130,6 @@ struct tr_peerMgr tr_ptrArray * torrents; /* Torrent */ tr_ptrArray * incomingHandshakes; /* tr_handshake */ tr_timer * bandwidthTimer; - tr_ratecontrol * globalPoolRawSpeed[2]; }; #define tordbg( t, ... ) \ @@ -520,8 +519,6 @@ tr_peerMgrNew( tr_session * session ) m->session = session; m->torrents = tr_ptrArrayNew( ); m->incomingHandshakes = tr_ptrArrayNew( ); - m->globalPoolRawSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( ); - m->globalPoolRawSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( ); m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC ); return m; } @@ -532,8 +529,6 @@ tr_peerMgrFree( tr_peerMgr * manager ) managerLock( manager ); tr_timerFree( &manager->bandwidthTimer ); - tr_rcClose( manager->globalPoolRawSpeed[TR_CLIENT_TO_PEER] ); - tr_rcClose( manager->globalPoolRawSpeed[TR_PEER_TO_CLIENT] ); /* free the handshakes. Abort invokes handshakeDoneCB(), which removes * the item from manager->handshakes, so this is a little roundabout... */ @@ -2362,17 +2357,29 @@ reconnectPulse( void * vtorrent ) ***** ****/ +#if 0 +#define DEBUG_DIRECTION TR_UP +#endif + static double -allocateHowMuch( double desired_average_kb_per_sec, - const tr_ratecontrol * ratecontrol ) +bytesPerPulse( double KiB_per_second ) +{ + return KiB_per_second * ( 1024.0 * BANDWIDTH_PERIOD_MSEC / 1000.0 ); +} + +/* + * @param currentSpeed current speed in KiB/s + * @param desiredSpeed desired speed in KiB/s + */ +static double +allocateHowMuch( tr_direction dir UNUSED, double currentSpeed, double desiredSpeed ) { const int pulses_per_history = TR_RATECONTROL_HISTORY_MSEC / BANDWIDTH_PERIOD_MSEC; - const double seconds_per_pulse = BANDWIDTH_PERIOD_MSEC / 1000.0; - const double baseline_bytes_per_pulse = desired_average_kb_per_sec * 1024.0 * seconds_per_pulse; - const double min = baseline_bytes_per_pulse * 0.80; - const double max = baseline_bytes_per_pulse * 1.20; - const double current_bytes_per_pulse = tr_rcRate( ratecontrol ) * 1024.0 * seconds_per_pulse; - const double next_pulse_bytes = baseline_bytes_per_pulse * ( pulses_per_history + 1 ) + const double current_bytes_per_pulse = bytesPerPulse( currentSpeed ); + const double desired_bytes_per_pulse = bytesPerPulse( desiredSpeed ); + const double min = desired_bytes_per_pulse * 0.90; + const double max = desired_bytes_per_pulse * 1.25; + const double next_pulse_bytes = desired_bytes_per_pulse * ( pulses_per_history + 1 ) - ( current_bytes_per_pulse * pulses_per_history ); double clamped; @@ -2381,91 +2388,81 @@ allocateHowMuch( double desired_average_kb_per_sec, clamped = MAX( clamped, min ); clamped = MIN( clamped, max ); -#if 0 -fprintf( stderr, "desiredAvgKB is %5.2f, rate is %5.2f, allocating %5.2f (%5.2f)\n", - desired_average_kb_per_sec, - tr_rcRate( ratecontrol ), +#ifdef DEBUG_DIRECTION +if( dir == DEBUG_DIRECTION ) +fprintf( stderr, "currentSpeed(%5.2f) desiredSpeed(%5.2f), allocating %5.2f (%5.2f)\n", + currentSpeed, + desiredSpeed, clamped/1024.0, next_pulse_bytes/1024.0 ); #endif return clamped; } - /** * Distributes a fixed amount of bandwidth among a set of peers. * * @param peerArray peers whose client-to-peer bandwidth will be set * @param direction whether to allocate upload or download bandwidth - * @param history recent bandwidth history for these peers - * @param desiredAvgKB overall bandwidth goal for this set of peers + * @param currentSpeed current speed in KiB/s for this set of peers + * @param desiredSpeed desired speed in KiB/s for this set of peers */ static void setPeerBandwidth( tr_ptrArray * peerArray, const tr_direction direction, - const tr_ratecontrol * ratecontrol, - double desiredAvgKB ) + double currentSpeed, + double desiredSpeed ) { - const int peerCount = tr_ptrArraySize( peerArray ); - const double bytes = allocateHowMuch( desiredAvgKB, ratecontrol ); - const double welfareBytes = MIN( 2048, bytes * 0.2 ); - const double meritBytes = MAX( 0, bytes - welfareBytes ); - tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( peerArray ); + const int MINIMUM_WELFARE_BYTES = bytesPerPulse( 5 ); int i; double welfare; - size_t bytesUsed; + double meritBytes; + double meritMultiplier; + double welfareBytes; + const int peerCount = tr_ptrArraySize( peerArray ); + const double bytes = allocateHowMuch( direction, currentSpeed, desiredSpeed ); + tr_peer ** peers = (tr_peer**) tr_ptrArrayBase( peerArray ); - assert( meritBytes >= 0.0 ); - assert( welfareBytes >= 0.0 ); - assert( direction == TR_UP || direction == TR_DOWN ); + /* how many bytes we'll allocate based on merit. + * + * 1. When just getting started we want to give all the peers a lot + * of `welfare' allocation because we don't know which ones will + * turn out to be productive for us. + * + * 2. When we've reached steady state and are near our bandwidth limit, + * the bandwidth spent on `welfare' is going to come from peers that + * we already know are productive... which is probably a waste. + * + * 3. So we tie the merit/welfare allocations to the current speed. + * the closer the current speed gets to the maximum speed, the less + * welfare we allocate. + * + * 4. We always need to allocate /some/ welfare bytes, otherwise + * the other peers will starve. + */ + meritBytes = bytesPerPulse( MIN( currentSpeed * 1.2, desiredSpeed ) ); + welfareBytes = bytes > meritBytes ? bytes - meritBytes : 0; + if( welfareBytes < MINIMUM_WELFARE_BYTES ) + welfareBytes = MINIMUM_WELFARE_BYTES; + meritBytes = bytes - welfareBytes; + meritMultiplier = currentSpeed > 0.01 ? meritBytes / currentSpeed : 0.0; - for( i=bytesUsed=0; iio, direction ); +#ifdef DEBUG_DIRECTION +if( direction == DEBUG_DIRECTION ) +fprintf( stderr, "currentSpeed(%5.2f) desiredSpeed(%5.2f) - k[%.1f] merit [%.1f] welfare [%.1f]\n", currentSpeed, desiredSpeed, bytes/1024.0, meritBytes/1024.0, welfareBytes/1024.0 ); +#endif - welfare = welfareBytes / peerCount; + /* how much welfare each peer gets */ + welfare = welfareBytes / peerCount; for( i=0; iio, direction ) ) / bytesUsed - : ( meritBytes / peerCount ); - tr_peerIoSetBandwidth( peer->io, direction, merit + welfare ); + const size_t merit = tr_rcRate( peers[i]->pieceSpeed[direction] ) * meritMultiplier; + tr_peerIoAllocateBandwidth( peer->io, direction, merit + welfare ); } } -static size_t -countHandshakeBandwidth( tr_ptrArray * handshakes, - tr_direction direction ) -{ - const int n = tr_ptrArraySize( handshakes ); - int i; - size_t total; - - for( i = total = 0; i < n; ++i ) - { - tr_peerIo * io = tr_handshakeGetIO( tr_ptrArrayNth( handshakes, i ) ); - total += tr_peerIoGetBandwidthUsed( io, direction ); - } - return total; -} - -static size_t -countPeerBandwidth( tr_ptrArray * peers, - tr_direction direction ) -{ - const int n = tr_ptrArraySize( peers ); - int i; - size_t total; - - for( i = total = 0; i < n; ++i ) - { - tr_peer * peer = tr_ptrArrayNth( peers, i ); - total += tr_peerIoGetBandwidthUsed( peer->io, direction ); - } - return total; -} - static void givePeersUnlimitedBandwidth( tr_ptrArray * peers, tr_direction direction ) @@ -2499,21 +2496,34 @@ pumpAllPeers( tr_peerMgr * mgr ) static void getBandwidthPeers( Torrent * t, - tr_direction direction, - tr_ptrArray * appendme ) + tr_direction dir, + tr_ptrArray * appendme, + double * speed ) { int i, peerCount; tr_peer ** peers; + assert( t ); assert( torrentIsLocked( t ) ); + assert( dir == TR_UP || dir == TR_DOWN ); + assert( appendme ); + assert( speed ); peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &peerCount ); for( i=0; imsgs ) - if( ( ( direction == TR_PEER_TO_CLIENT ) && clientIsDownloadingFrom( peers[i] ) ) - || ( ( direction == TR_CLIENT_TO_PEER ) && clientIsUploadingTo( peers[i] ) ) ) - tr_ptrArrayAppend( appendme, peers[i] ); + { + tr_peer * p = peers[i]; + + if( p->msgs ) + { + if( ( ( dir == TR_DOWN ) && clientIsDownloadingFrom( p ) ) || ( ( dir == TR_UP ) && clientIsUploadingTo( p ) ) ) + { + tr_ptrArrayAppend( appendme, p ); + *speed += tr_rcRate( p->pieceSpeed[dir] ); + } + } + } } /** @@ -2523,7 +2533,7 @@ getBandwidthPeers( Torrent * t, * @param direction whether to allocate upload or download bandwidth * @return the amount of directional bandwidth used since the last pulse. */ -static double +static void allocateBandwidth( tr_peerMgr * mgr, tr_direction direction ) { @@ -2531,8 +2541,7 @@ allocateBandwidth( tr_peerMgr * mgr, const int torrentCount = tr_ptrArraySize( mgr->torrents ); Torrent ** torrents = (Torrent **) tr_ptrArrayBase( mgr->torrents ); tr_ptrArray * globalPool = tr_ptrArrayNew( ); - double allBytesUsed = 0; - size_t poolBytesUsed = 0; + double globalPoolSpeed = 0; int i; assert( mgr ); @@ -2544,22 +2553,12 @@ allocateBandwidth( tr_peerMgr * mgr, for( i=0; itor ) == TR_STATUS_STOPPED ) continue; - used = countPeerBandwidth( t->peers, direction ); - countHandshakeBandwidth( t->outgoingHandshakes, direction ); - - /* remember this torrent's bytes used */ - tr_rcTransferred( t->tor->rawSpeed[direction], used ); - - /* add this torrent's bandwidth use to allBytesUsed */ - allBytesUsed += used; - /* if piece data is disallowed, don't bother limiting bandwidth -- * we won't be asking for, or sending out, any pieces */ if( !tr_torrentIsPieceTransferAllowed( t->tor, direction ) ) @@ -2574,37 +2573,26 @@ allocateBandwidth( tr_peerMgr * mgr, givePeersUnlimitedBandwidth( t->peers, direction ); break; - case TR_SPEEDLIMIT_SINGLE: - { + case TR_SPEEDLIMIT_SINGLE: { tr_ptrArray * peers = tr_ptrArrayNew( ); - getBandwidthPeers( t, direction, peers ); - setPeerBandwidth( peers, direction, - t->tor->rawSpeed[direction], - tr_torrentGetSpeedLimit( t->tor, direction ) ); + double speed = 0; + getBandwidthPeers( t, direction, peers, &speed ); + setPeerBandwidth( peers, direction, speed, tr_torrentGetSpeedLimit( t->tor, direction ) ); tr_ptrArrayFree( peers, NULL ); break; } case TR_SPEEDLIMIT_GLOBAL: - getBandwidthPeers( t, direction, globalPool ); - poolBytesUsed += used; + getBandwidthPeers( t, direction, globalPool, &globalPoolSpeed ); break; } } - /* add incoming handshakes to the global pool */ - i = countHandshakeBandwidth( mgr->incomingHandshakes, direction ); - allBytesUsed += i; - poolBytesUsed += i; - - tr_rcTransferred( mgr->globalPoolRawSpeed[direction], poolBytesUsed ); - /* handle the global pool's connections */ if( !tr_sessionIsSpeedLimitEnabled( session, direction ) ) givePeersUnlimitedBandwidth( globalPool, direction ); else - setPeerBandwidth( globalPool, direction, - mgr->globalPoolRawSpeed[direction], + setPeerBandwidth( globalPool, direction, globalPoolSpeed, tr_sessionGetSpeedLimit( session, direction ) ); /* now that we've allocated bandwidth, pump all the connected peers */ @@ -2612,7 +2600,6 @@ allocateBandwidth( tr_peerMgr * mgr, /* cleanup */ tr_ptrArrayFree( globalPool, NULL ); - return allBytesUsed; } static int