1
0
Fork 0
mirror of https://github.com/transmission/transmission synced 2025-03-12 07:03:44 +00:00

(trunk libT) #2430 "Peer atom pool grows too large" -- add an atom expiration system along the lines of the suggestions in this ticket's comments. jch and KyleK, let me know if you think anything in this commit needs improvement.

This commit is contained in:
Charles Kerr 2009-11-26 05:13:58 +00:00
parent 3e921edd2a
commit 1101f6710e
5 changed files with 181 additions and 22 deletions

View file

@ -41,6 +41,9 @@
enum
{
/* how frequently to cull old atoms */
ATOM_PERIOD_MSEC = ( 60 * 1000 ),
/* how frequently to change which peers are choked */
RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
@ -120,6 +123,10 @@ struct peer_atom
tr_address addr;
time_t time; /* when the peer's connection status last changed */
time_t piece_data_time;
/* similar to a TTL field, but less rigid --
* if the swarm is small, the atom will be kept past this date. */
time_t shelf_date;
};
static tr_bool
@ -185,6 +192,7 @@ struct tr_peerMgr
tr_timer * rechokeTimer;
tr_timer * reconnectTimer;
tr_timer * refillUpkeepTimer;
tr_timer * atomTimer;
};
#define tordbg( t, ... ) \
@ -267,7 +275,7 @@ comparePeerAtomToAddress( const void * va, const void * vb )
}
static int
comparePeerAtoms( const void * va, const void * vb )
compareAtomsByAddress( const void * va, const void * vb )
{
const struct peer_atom * b = vb;
@ -463,12 +471,6 @@ torrentConstructor( tr_peerMgr * manager,
return t;
}
static int bandwidthPulse ( void * vmgr );
static int rechokePulse ( void * vmgr );
static int reconnectPulse ( void * vmgr );
static int refillUpkeep ( void * vmgr );
tr_peerMgr*
tr_peerMgrNew( tr_session * session )
{
@ -481,6 +483,9 @@ tr_peerMgrNew( tr_session * session )
static void
deleteTimers( struct tr_peerMgr * m )
{
if( m->atomTimer )
tr_timerFree( &m->atomTimer );
if( m->bandwidthTimer )
tr_timerFree( &m->bandwidthTimer );
@ -1385,12 +1390,31 @@ peerCallbackFunc( void * vpeer, void * vevent, void * vt )
torrentUnlock( t );
}
static int
getDefaultShelfLife( uint8_t from )
{
/* in general, peers obtained from firsthand contact
* are better than those from secondhand, etc etc */
switch( from )
{
case TR_PEER_FROM_INCOMING : return 60 * 60 * 8;
case TR_PEER_FROM_LTEP : return 60 * 60 * 8;
case TR_PEER_FROM_TRACKER : return 60 * 60 * 4;
case TR_PEER_FROM_PEX : return 60 * 60 * 2;
case TR_PEER_FROM_RESUME : return 60 * 60;
case TR_PEER_FROM_DHT : return 60 * 40;
default : return 60 * 60;
}
}
static void
ensureAtomExists( Torrent * t,
const tr_address * addr,
tr_port port,
uint8_t flags,
uint8_t from )
ensureAtomExists( Torrent * t,
const time_t now,
const tr_address * addr,
const tr_port port,
const uint8_t flags,
const uint8_t from )
{
assert( tr_isAddress( addr ) );
assert( from < TR_PEER_FROM__MAX );
@ -1398,13 +1422,17 @@ ensureAtomExists( Torrent * t,
if( getExistingAtom( t, addr ) == NULL )
{
struct peer_atom * a;
const int jitter = tr_cryptoWeakRandInt( 120 );
a = tr_new0( struct peer_atom, 1 );
a->addr = *addr;
a->port = port;
a->flags = flags;
a->from = from;
a->shelf_date = now + getDefaultShelfLife( from ) + jitter;
tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms );
}
}
@ -1472,9 +1500,11 @@ myHandshakeDoneCB( tr_handshake * handshake,
else /* looking good */
{
struct peer_atom * atom;
ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
const time_t now = time( NULL );
ensureAtomExists( t, now, addr, port, 0, TR_PEER_FROM_INCOMING );
atom = getExistingAtom( t, addr );
atom->time = time( NULL );
atom->time = now;
atom->piece_data_time = 0;
if( atom->myflags & MYFLAG_BANNED )
@ -1584,7 +1614,7 @@ tr_peerMgrAddPex( tr_torrent * tor,
if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
ensureAtomExists( t, time( NULL ), &pex->addr, pex->port, pex->flags, from );
managerUnlock( t->manager );
}
@ -1824,11 +1854,19 @@ tr_peerMgrGetPeers( tr_torrent * tor,
return count;
}
static int atomPulse ( void * vmgr );
static int bandwidthPulse ( void * vmgr );
static int rechokePulse ( void * vmgr );
static int reconnectPulse ( void * vmgr );
static void
ensureMgrTimersExist( struct tr_peerMgr * m )
{
tr_session * s = m->session;
if( m->atomTimer == NULL )
m->atomTimer = tr_timerNew( s, atomPulse, m, ATOM_PERIOD_MSEC );
if( m->bandwidthTimer == NULL )
m->bandwidthTimer = tr_timerNew( s, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
@ -2907,3 +2945,124 @@ bandwidthPulse( void * vmgr )
managerUnlock( mgr );
return TRUE;
}
/***
****
***/
static int
compareAtomPtrsByAddress( const void * va, const void *vb )
{
const struct peer_atom * a = * (const struct peer_atom**) va;
const struct peer_atom * b = * (const struct peer_atom**) vb;
assert( tr_isAtom( a ) );
assert( tr_isAtom( b ) );
return tr_compareAddresses( &a->addr, &b->addr );
}
static time_t tr_now = 0;
/* best come first, worst go last */
static int
compareAtomPtrsByShelfDate( const void * va, const void *vb )
{
time_t atime;
time_t btime;
const struct peer_atom * a = * (const struct peer_atom**) va;
const struct peer_atom * b = * (const struct peer_atom**) vb;
const int data_time_cutoff_secs = 60 * 60;
assert( tr_isAtom( a ) );
assert( tr_isAtom( b ) );
/* primary key: the last piece data time *if* it was within the last hour */
atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0;
btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0;
if( atime != btime )
return atime > btime ? -1 : 1;
/* secondary key: shelf date. */
if( a->shelf_date != b->shelf_date )
return a->shelf_date > b->shelf_date ? -1 : 1;
return 0;
}
static int
getMaxAtomCount( const tr_torrent * tor )
{
/* FIXME: this curve should be smoother... */
const int n = tor->maxConnectedPeers;
if( n >= 200 ) return n * 1.5;
if( n >= 100 ) return n * 2;
if( n >= 50 ) return n * 3;
if( n >= 20 ) return n * 5;
return n * 10;
}
static int
atomPulse( void * vmgr )
{
tr_torrent * tor = NULL;
tr_peerMgr * mgr = vmgr;
managerLock( mgr );
while(( tor = tr_torrentNext( mgr->session, tor )))
{
int atomCount;
Torrent * t = tor->torrentPeers;
const int maxAtomCount = getMaxAtomCount( tor );
struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */
{
int i;
int keepCount = 0;
int testCount = 0;
struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount );
struct peer_atom ** test = tr_new( struct peer_atom*, atomCount );
tordbg( t, "max atom count is %d; have %d.. pruning\n", maxAtomCount, atomCount );
/* keep the ones that are in use */
for( i=0; i<atomCount; ++i ) {
struct peer_atom * atom = atoms[i];
if( peerIsInUse( t, &atom->addr ) )
keep[keepCount++] = atom;
else
test[testCount++] = atom;
}
/* if there's room, keep the best of what's left */
i = 0;
if( keepCount < maxAtomCount ) {
tr_now = time( NULL );
qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate );
while( i<testCount && keepCount<maxAtomCount )
keep[keepCount++] = test[i++];
}
/* free the culled atoms */
while( i<testCount )
tr_free( test[i++] );
/* rebuild Torrent.pool with what's left */
tr_ptrArrayDestruct( &t->pool, NULL );
t->pool = TR_PTR_ARRAY_INIT;
qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress );
for( i=0; i<keepCount; ++i ) {
if( i+1<keepCount )
assert( tr_compareAddresses( &keep[i]->addr, &keep[i+1]->addr ) < 0 );
tr_ptrArrayAppend( &t->pool, keep[i] );
}
/* cleanup */
tr_free( test );
tr_free( keep );
}
}
managerUnlock( mgr );
return TRUE;
}

View file

@ -984,13 +984,13 @@ parseLtepHandshake( tr_peermsgs * msgs,
if( tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len) && addr_len == 4 ) {
pex.addr.type = TR_AF_INET;
memcpy( &pex.addr.addr.addr4, addr, 4 );
tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_ALT, &pex );
tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex );
}
if( tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len) && addr_len == 16 ) {
pex.addr.type = TR_AF_INET6;
memcpy( &pex.addr.addr.addr6, addr, 16 );
tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_ALT, &pex );
tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex );
}
/* get peer's maximum request queue size */

View file

@ -115,7 +115,7 @@ addPeers( tr_torrent * tor, const uint8_t * buf, int buflen )
memcpy( &pex, buf + ( i * sizeof( tr_pex ) ), sizeof( tr_pex ) );
if( tr_isPex( &pex ) )
{
tr_peerMgrAddPex( tor, TR_PEER_FROM_CACHE, &pex );
tr_peerMgrAddPex( tor, TR_PEER_FROM_RESUME, &pex );
++numAdded;
}
}

View file

@ -514,7 +514,7 @@ addField( const tr_torrent * tor, tr_benc * d, const char * key )
{
tr_benc * tmp = tr_bencDictAddDict( d, key, 4 );
const int * f = st->peersFrom;
tr_bencDictAddInt( tmp, "fromCache", f[TR_PEER_FROM_CACHE] );
tr_bencDictAddInt( tmp, "fromCache", f[TR_PEER_FROM_RESUME] );
tr_bencDictAddInt( tmp, "fromIncoming", f[TR_PEER_FROM_INCOMING] );
tr_bencDictAddInt( tmp, "fromPex", f[TR_PEER_FROM_PEX] );
tr_bencDictAddInt( tmp, "fromTracker", f[TR_PEER_FROM_TRACKER] );

View file

@ -1580,9 +1580,9 @@ enum
TR_PEER_FROM_INCOMING = 0, /* connections made to the listening port */
TR_PEER_FROM_TRACKER = 1, /* peers received from a tracker */
TR_PEER_FROM_DHT = 2, /* peers learnt from the DHT */
TR_PEER_FROM_CACHE = 3, /* peers read from the peer cache */
TR_PEER_FROM_RESUME = 3, /* peers read from the .resume file */
TR_PEER_FROM_PEX = 4, /* peers discovered via PEX */
TR_PEER_FROM_ALT = 5, /* alternate peer address */
TR_PEER_FROM_LTEP = 5, /* peer address provided in an LTEP handshake */
TR_PEER_FROM__MAX
};