(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...(trunk libT) undo r9465 + r9466, which was the experimental new request manager. It still still needs some tinkering before it's ready for the nightlies...

This commit is contained in:
Charles Kerr 2009-11-02 00:17:30 +00:00
parent e47af45db1
commit 4d817b0701
4 changed files with 226 additions and 187 deletions

View File

@ -85,17 +85,10 @@ enum
MYFLAG_UNREACHABLE = 2,
/* the minimum we'll wait before attempting to reconnect to a peer */
MINIMUM_RECONNECT_INTERVAL_SECS = 5,
/* this is how many blocks we'll try to queue up
* for the iterator to walk through */
ITERATOR_BLOCK_BUFFER_SIZE = 4096,
/* if the number of blocks in the iterator queue drops
* below this number, fill it up with more */
ITERATOR_LOW_MARK = 256
MINIMUM_RECONNECT_INTERVAL_SECS = 5
};
/**
***
**/
@ -143,21 +136,12 @@ tr_atomAddrStr( const struct peer_atom * atom )
return tr_peerIoAddrStr( &atom->addr, atom->port );
}
struct tr_blockIteratorItem
{
tr_block_index_t block;
uint8_t requestCount;
};
struct tr_blockIterator
{
tr_bool didLoop;
int pos;
int size;
int begin;
struct tr_blockIteratorItem items[ITERATOR_BLOCK_BUFFER_SIZE];
time_t expirationDate;
struct tr_torrent_peers * t;
tr_priority_t priority;
tr_block_index_t blockIndex, blockCount, *blocks;
tr_piece_index_t pieceIndex, pieceCount, *pieces;
};
typedef struct tr_torrent_peers
@ -173,6 +157,7 @@ typedef struct tr_torrent_peers
tr_peer * optimistic; /* the optimistic peer, or NULL if none */
struct tr_blockIterator * refillQueue; /* used in refillPulse() */
struct tr_peerMgr * manager;
int * pendingRequestCount;
tr_bool isRunning;
}
@ -185,6 +170,7 @@ struct tr_peerMgr
tr_timer * bandwidthTimer;
tr_timer * rechokeTimer;
tr_timer * reconnectTimer;
tr_timer * refillUpkeepTimer;
};
#define tordbg( t, ... ) \
@ -430,6 +416,7 @@ torrentDestructor( void * vt )
tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
tr_ptrArrayDestruct( &t->peers, NULL );
tr_free( t->pendingRequestCount );
tr_free( t );
}
@ -471,6 +458,7 @@ torrentConstructor( tr_peerMgr * manager,
static int bandwidthPulse ( void * vmgr );
static int rechokePulse ( void * vmgr );
static int reconnectPulse ( void * vmgr );
static int refillUpkeep ( void * vmgr );
tr_peerMgr*
tr_peerMgrNew( tr_session * session )
@ -492,6 +480,9 @@ deleteTimers( struct tr_peerMgr * m )
if( m->reconnectTimer )
tr_timerFree( &m->reconnectTimer );
if( m->refillUpkeepTimer )
tr_timerFree( &m->refillUpkeepTimer );
}
void
@ -548,178 +539,213 @@ tr_peerMgrPeerIsSeed( const tr_torrent * tor,
*****
****/
static void
assertValidPiece( Torrent * t, tr_piece_index_t piece )
{
assert( t );
assert( t->tor );
assert( piece < t->tor->info.pieceCount );
}
static int
getPieceRequests( Torrent * t, tr_piece_index_t piece )
{
assertValidPiece( t, piece );
return t->pendingRequestCount ? t->pendingRequestCount[piece] : 0;
}
static void
incrementPieceRequests( Torrent * t, tr_piece_index_t piece )
{
assertValidPiece( t, piece );
if( t->pendingRequestCount == NULL )
t->pendingRequestCount = tr_new0( int, t->tor->info.pieceCount );
t->pendingRequestCount[piece]++;
}
static void
decrementPieceRequests( Torrent * t, tr_piece_index_t piece )
{
assertValidPiece( t, piece );
if( t->pendingRequestCount )
t->pendingRequestCount[piece]--;
}
struct tr_refill_piece
{
tr_priority_t priority;
uint32_t piece;
uint32_t peerCount;
int random;
int pendingRequestCount;
int missingBlockCount;
};
static int
compareRefillPiece( const void * aIn, const void * bIn )
{
const struct tr_refill_piece * a = aIn;
const struct tr_refill_piece * b = bIn;
/* if one piece has a higher priority, it goes first */
if( a->priority != b->priority )
return a->priority > b->priority ? -1 : 1;
/* have a per-priority endgame */
if( a->pendingRequestCount != b->pendingRequestCount )
return a->pendingRequestCount < b->pendingRequestCount ? -1 : 1;
/* fewer missing pieces goes first */
if( a->missingBlockCount != b->missingBlockCount )
return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
/* otherwise if one has fewer peers, it goes first */
if( a->peerCount != b->peerCount )
return a->peerCount < b->peerCount ? -1 : 1;
/* otherwise go with our random seed */
if( a->random != b->random )
return a->random < b->random ? -1 : 1;
return 0;
}
static tr_piece_index_t *
getPreferredPieces( Torrent * t, tr_piece_index_t * pieceCount )
{
const tr_torrent * tor = t->tor;
const tr_info * inf = &tor->info;
tr_piece_index_t i;
tr_piece_index_t poolSize = 0;
tr_piece_index_t * pool = tr_new( tr_piece_index_t , inf->pieceCount );
int peerCount;
const tr_peer ** peers;
assert( torrentIsLocked( t ) );
peers = (const tr_peer**) tr_ptrArrayBase( &t->peers );
peerCount = tr_ptrArraySize( &t->peers );
/* make a list of the pieces that we want but don't have */
for( i = 0; i < inf->pieceCount; ++i )
if( !tor->info.pieces[i].dnd
&& !tr_cpPieceIsComplete( &tor->completion, i ) )
pool[poolSize++] = i;
/* sort the pool by which to request next */
if( poolSize > 1 )
{
tr_piece_index_t j;
struct tr_refill_piece * p = tr_new( struct tr_refill_piece, poolSize );
for( j = 0; j < poolSize; ++j )
{
int k;
const tr_piece_index_t piece = pool[j];
struct tr_refill_piece * setme = p + j;
setme->piece = piece;
setme->priority = inf->pieces[piece].priority;
setme->peerCount = 0;
setme->random = tr_cryptoWeakRandInt( INT_MAX );
setme->pendingRequestCount = getPieceRequests( t, piece );
setme->missingBlockCount
= tr_cpMissingBlocksInPiece( &tor->completion, piece );
for( k = 0; k < peerCount; ++k )
{
const tr_peer * peer = peers[k];
if( peer->peerIsInterested
&& !peer->clientIsChoked
&& tr_bitfieldHas( peer->have, piece ) )
++setme->peerCount;
}
}
qsort( p, poolSize, sizeof( struct tr_refill_piece ),
compareRefillPiece );
for( j = 0; j < poolSize; ++j )
pool[j] = p[j].piece;
tr_free( p );
}
*pieceCount = poolSize;
return pool;
}
static struct tr_blockIterator*
blockIteratorNew( Torrent * t )
{
struct tr_blockIterator * i = tr_new0( struct tr_blockIterator, 1 );
i->pos = 0;
i->size = 0;
i->priority = TR_PRI_HIGH;
i->expirationDate = time( NULL ) + PIECE_LIST_SHELF_LIFE_SECS;
i->t = t;
i->pieces = getPreferredPieces( t, &i->pieceCount );
i->blocks = tr_new0( tr_block_index_t, t->tor->blockCountInPiece );
tordbg( t, "creating new refill queue.. it contains %"PRIu32" pieces", i->pieceCount );
return i;
}
static int
compareIteratorItems( const void * va, const void * vb )
{
const struct tr_blockIteratorItem * a = va;
const struct tr_blockIteratorItem * b = vb;
if( a->block < b->block ) return -1;
if( a->block > b->block ) return 1;
return 0;
}
static void
blockIteratorRefill( struct tr_blockIterator * it )
{
int pieceCount;
tr_piece_index_t * pieces;
const tr_torrent * tor = it->t->tor;
const tr_info * inf = tr_torrentInfo( tor );
/* build a pool of the pieces we might request blocks from */
pieces = tr_new( tr_piece_index_t, inf->pieceCount );
pieceCount = 0;
{
tr_piece_index_t i;
for( i=0; i<inf->pieceCount; ++i )
if( !inf->pieces[i].dnd )
if( inf->pieces[i].priority == it->priority )
if( !tr_cpPieceIsComplete( &tor->completion, i ) )
pieces[pieceCount++] = i;
}
/* while we're short on blocks and there are still pieces left... */
while( ( it->size < ITERATOR_BLOCK_BUFFER_SIZE ) && ( pieceCount > 0 ) )
{
tr_block_index_t i;
tr_block_index_t b;
tr_block_index_t e;
/* pull a random piece out of the pool */
const int poolIndex = tr_cryptoRandInt( pieceCount );
const tr_piece_index_t piece = pieces[poolIndex];
pieces[poolIndex] = pieces[--pieceCount];
/* add the piece's blocks that we don't have to our iterator */
b = tr_torPieceFirstBlock( tor, piece );
e = b + tr_torPieceCountBlocks( tor, piece );
for( i=b; (i!=e) && (it->size < ITERATOR_BLOCK_BUFFER_SIZE); ++i ) {
if( !tr_cpBlockIsCompleteFast( &tor->completion, i ) ) {
int pos;
struct tr_blockIteratorItem tmp;
tr_bool match;
tmp.block = i;
tmp.requestCount = 0;
pos = tr_lowerBound( &tmp, it->items, it->size, sizeof(struct tr_blockIteratorItem), compareIteratorItems, &match );
if( match )
continue;
memmove( it->items+pos+1, it->items+pos, sizeof(struct tr_blockIteratorItem)*(it->size-pos) );
it->items[pos] = tmp;
++it->size;
}
}
}
if( it->pos >= it->size )
it->pos = 0;
/* cleanup */
tr_free( pieces );
}
static void
blockIteratorRewind( struct tr_blockIterator * it )
{
/* if we don't have any blocks in the iterator,
* try to regenerate a list of blocks in the current priority.
* if that fails, go to the next lower priority and retry. */
for( ;; ) {
if( it->size <= ITERATOR_LOW_MARK )
blockIteratorRefill( it );
if( it->size > 0 )
break;
if( it->priority == TR_PRI_LOW )
return;
--it->priority; /* try a lower priority */
}
it->begin = it->pos;
it->didLoop = FALSE;
}
static tr_bool
blockIteratorNext( struct tr_blockIterator * it, tr_block_index_t * setme )
blockIteratorNext( struct tr_blockIterator * i, tr_block_index_t * setme )
{
static const int single_req_threshold = 100;
static const int double_req_threshold = 50;
tr_bool found;
Torrent * t = i->t;
tr_torrent * tor = t->tor;
while( !it->didLoop )
while( ( i->blockIndex == i->blockCount )
&& ( i->pieceIndex < i->pieceCount ) )
{
++it->pos;
it->pos %= it->size;
it->didLoop |= it->pos == it->begin;
const tr_piece_index_t index = i->pieces[i->pieceIndex++];
const tr_block_index_t b = tr_torPieceFirstBlock( tor, index );
const tr_block_index_t e = b + tr_torPieceCountBlocks( tor, index );
tr_block_index_t block;
if( ( it->items[it->pos].requestCount >= 1 ) && ( it->size >= single_req_threshold ) )
continue;
if( ( it->items[it->pos].requestCount >= 2 ) && ( it->size >= double_req_threshold ) )
continue;
assert( index < tor->info.pieceCount );
*setme = it->items[it->pos].block;
return TRUE;
i->blockCount = 0;
i->blockIndex = 0;
for( block=b; block!=e; ++block )
if( !tr_cpBlockIsCompleteFast( &tor->completion, block ) )
i->blocks[i->blockCount++] = block;
}
return FALSE;
}
static void
blockIteratorInvalidate( struct tr_blockIterator * it )
{
it->size = 0;
it->priority = TR_PRI_HIGH;
}
assert( i->blockCount <= tor->blockCountInPiece );
void
tr_peerMgrFilePrioritiesChanged( tr_torrent * tor )
{
if( ( tor != NULL ) && ( tor->torrentPeers != NULL ) && ( tor->torrentPeers->refillQueue != NULL ) )
blockIteratorInvalidate( tor->torrentPeers->refillQueue );
if(( found = ( i->blockIndex < i->blockCount )))
*setme = i->blocks[i->blockIndex++];
return found;
}
static void
blockIteratorRemoveBlock( struct tr_blockIterator * it, tr_block_index_t block )
blockIteratorSkipCurrentPiece( struct tr_blockIterator * i )
{
if( it != NULL )
{
struct tr_blockIteratorItem tmp, *pos;
tmp.block = block;
pos = bsearch( &tmp, it->items, it->size, sizeof(struct tr_blockIteratorItem), compareIteratorItems );
if( pos != NULL )
{
const int i = pos - it->items;
assert( pos->block == block );
assert( i >= 0 );
assert( i < it->size );
--it->size;
memmove( it->items+i, it->items+i+1, sizeof(struct tr_blockIteratorItem) * (it->size-i) );
if( it->pos > i )
--it->pos;
}
}
i->blockIndex = i->blockCount;
}
static void
blockIteratorFree( struct tr_blockIterator ** inout )
{
tr_free( *inout );
struct tr_blockIterator * it = *inout;
if( it != NULL )
{
tr_free( it->blocks );
tr_free( it->pieces );
tr_free( it );
}
*inout = NULL;
}
static tr_peer**
getPeersUploadingToClient( Torrent * t, int * setmeCount )
getPeersUploadingToClient( Torrent * t,
int * setmeCount )
{
int j;
int peerCount = 0;
@ -756,6 +782,27 @@ getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
return (uint32_t)( blockPos - piecePos );
}
static int
refillUpkeep( void * vmgr )
{
tr_torrent * tor = NULL;
tr_peerMgr * mgr = vmgr;
time_t now;
managerLock( mgr );
now = time( NULL );
while(( tor = tr_torrentNext( mgr->session, tor ))) {
Torrent * t = tor->torrentPeers;
if( t && t->refillQueue && ( t->refillQueue->expirationDate <= now ) ) {
tordbg( t, "refill queue is past its shelf date; discarding." );
blockIteratorFree( &t->refillQueue );
}
}
managerUnlock( mgr );
return TRUE;
}
static void
sortPeersByLiveliness( tr_peer ** peers, void ** clientData, int n, uint64_t now );
@ -773,10 +820,8 @@ refillPulse( int fd UNUSED, short type UNUSED, void * vtorrent )
if( !t->isRunning )
return;
if( tr_torrentIsSeed( t->tor ) ) {
blockIteratorFree( &t->refillQueue );
if( tr_torrentIsSeed( t->tor ) )
return;
}
torrentLock( t );
tordbg( t, "Refilling Request Buffers..." );
@ -790,8 +835,6 @@ refillPulse( int fd UNUSED, short type UNUSED, void * vtorrent )
webseeds = tr_memdup( tr_ptrArrayBase( &t->webseeds ),
webseedCount * sizeof( tr_webseed* ) );
blockIteratorRewind( t->refillQueue );
while( ( webseedCount || peerCount )
&& (( hasNext = blockIteratorNext( t->refillQueue, &block ))) )
{
@ -821,7 +864,7 @@ refillPulse( int fd UNUSED, short type UNUSED, void * vtorrent )
break;
case TR_ADDREQ_OK:
++t->refillQueue->items[t->refillQueue->pos].requestCount;
incrementPieceRequests( t, index );
handled = TRUE;
break;
@ -842,6 +885,7 @@ refillPulse( int fd UNUSED, short type UNUSED, void * vtorrent )
break;
case TR_ADDREQ_OK:
incrementPieceRequests( t, index );
handled = TRUE;
break;
@ -850,12 +894,20 @@ refillPulse( int fd UNUSED, short type UNUSED, void * vtorrent )
break;
}
}
if( !handled )
blockIteratorSkipCurrentPiece( t->refillQueue );
}
/* cleanup */
tr_free( webseeds );
tr_free( peers );
if( !hasNext ) {
tordbg( t, "refill queue has no more blocks to request... freeing (webseed count: %d, peer count: %d)", webseedCount, peerCount );
blockIteratorFree( &t->refillQueue );
}
torrentUnlock( t );
}
@ -987,7 +1039,7 @@ peerCallbackFunc( void * vpeer, void * vevent, void * vt )
break;
case TR_PEER_CANCEL:
blockIteratorRemoveBlock( t->refillQueue, _tr_block( t->tor, e->pieceIndex, e->offset ) );
decrementPieceRequests( t, e->pieceIndex );
break;
case TR_PEER_PEER_GOT_DATA:
@ -1081,7 +1133,7 @@ peerCallbackFunc( void * vpeer, void * vevent, void * vt )
tr_cpBlockAdd( &tor->completion, block );
tr_torrentSetDirty( tor );
blockIteratorRemoveBlock( t->refillQueue, block );
decrementPieceRequests( t, e->pieceIndex );
broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
@ -1568,6 +1620,9 @@ ensureMgrTimersExist( struct tr_peerMgr * m )
if( m->reconnectTimer == NULL )
m->reconnectTimer = tr_timerNew( s, reconnectPulse, m, RECONNECT_PERIOD_MSEC );
if( m->refillUpkeepTimer == NULL )
m->refillUpkeepTimer = tr_timerNew( s, refillUpkeep, m, REFILL_UPKEEP_PERIOD_MSEC );
}
void

View File

@ -118,8 +118,6 @@ void tr_peerMgrFree( tr_peerMgr * manager );
tr_bool tr_peerMgrPeerIsSeed( const tr_torrent * tor,
const tr_address * addr );
void tr_peerMgrFilePrioritiesChanged( tr_torrent * tor );
void tr_peerMgrAddIncoming( tr_peerMgr * manager,
tr_address * addr,
tr_port port,

View File

@ -40,8 +40,6 @@
#include "utils.h"
#include "version.h"
// #ifdef TRACK_DUPES
/**
***
**/
@ -460,17 +458,10 @@ firePeerProgress( tr_peermsgs * msgs )
publish( msgs, &e );
}
#ifdef TRACK_DUPES
static double blocksGotten = 0.0;
#endif
static void
fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
{
tr_peer_event e = blankEvent;
#ifdef TRACK_DUPES
++blocksGotten;
#endif
e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
e.pieceIndex = req->index;
e.offset = req->offset;
@ -1567,10 +1558,6 @@ decrementDownloadedCount( tr_peermsgs * msgs, uint32_t byteCount )
static TR_INLINE void
clientGotUnwantedBlock( tr_peermsgs * msgs, const struct peer_request * req )
{
#ifdef TRACK_DUPES
static double unwantedGotten = 0.0;
fprintf( stderr, "dupe ratio: %f\n", ++unwantedGotten / blocksGotten );
#endif
decrementDownloadedCount( msgs, req->length );
}

View File

@ -1685,7 +1685,6 @@ tr_torrentSetFilePriorities( tr_torrent * tor,
for( i = 0; i < fileCount; ++i )
tr_torrentInitFilePriority( tor, files[i], priority );
tr_peerMgrFilePrioritiesChanged( tor );
tr_torrentSetDirty( tor );
tr_torrentUnlock( tor );
}