From 1101f6710e8bd52c2c647126f328b518f5f9a295 Mon Sep 17 00:00:00 2001 From: Charles Kerr Date: Thu, 26 Nov 2009 05:13:58 +0000 Subject: [PATCH] (trunk libT) #2430 "Peer atom pool grows too large" -- add an atom expiration system along the lines of the suggestions in this ticket's comments. jch and KyleK, let me know if you think anything in this commit needs improvement. --- libtransmission/peer-mgr.c | 191 ++++++++++++++++++++++++++++++--- libtransmission/peer-msgs.c | 4 +- libtransmission/resume.c | 2 +- libtransmission/rpcimpl.c | 2 +- libtransmission/transmission.h | 4 +- 5 files changed, 181 insertions(+), 22 deletions(-) diff --git a/libtransmission/peer-mgr.c b/libtransmission/peer-mgr.c index 3330ea537..299cad091 100644 --- a/libtransmission/peer-mgr.c +++ b/libtransmission/peer-mgr.c @@ -41,6 +41,9 @@ enum { + /* how frequently to cull old atoms */ + ATOM_PERIOD_MSEC = ( 60 * 1000 ), + /* how frequently to change which peers are choked */ RECHOKE_PERIOD_MSEC = ( 10 * 1000 ), @@ -120,6 +123,10 @@ struct peer_atom tr_address addr; time_t time; /* when the peer's connection status last changed */ time_t piece_data_time; + + /* similar to a TTL field, but less rigid -- + * if the swarm is small, the atom will be kept past this date. */ + time_t shelf_date; }; static tr_bool @@ -185,6 +192,7 @@ struct tr_peerMgr tr_timer * rechokeTimer; tr_timer * reconnectTimer; tr_timer * refillUpkeepTimer; + tr_timer * atomTimer; }; #define tordbg( t, ... ) \ @@ -267,7 +275,7 @@ comparePeerAtomToAddress( const void * va, const void * vb ) } static int -comparePeerAtoms( const void * va, const void * vb ) +compareAtomsByAddress( const void * va, const void * vb ) { const struct peer_atom * b = vb; @@ -463,12 +471,6 @@ torrentConstructor( tr_peerMgr * manager, return t; } - -static int bandwidthPulse ( void * vmgr ); -static int rechokePulse ( void * vmgr ); -static int reconnectPulse ( void * vmgr ); -static int refillUpkeep ( void * vmgr ); - tr_peerMgr* tr_peerMgrNew( tr_session * session ) { @@ -481,6 +483,9 @@ tr_peerMgrNew( tr_session * session ) static void deleteTimers( struct tr_peerMgr * m ) { + if( m->atomTimer ) + tr_timerFree( &m->atomTimer ); + if( m->bandwidthTimer ) tr_timerFree( &m->bandwidthTimer ); @@ -1385,12 +1390,31 @@ peerCallbackFunc( void * vpeer, void * vevent, void * vt ) torrentUnlock( t ); } +static int +getDefaultShelfLife( uint8_t from ) +{ + /* in general, peers obtained from firsthand contact + * are better than those from secondhand, etc etc */ + switch( from ) + { + case TR_PEER_FROM_INCOMING : return 60 * 60 * 8; + case TR_PEER_FROM_LTEP : return 60 * 60 * 8; + case TR_PEER_FROM_TRACKER : return 60 * 60 * 4; + case TR_PEER_FROM_PEX : return 60 * 60 * 2; + case TR_PEER_FROM_RESUME : return 60 * 60; + case TR_PEER_FROM_DHT : return 60 * 40; + default : return 60 * 60; + } +} + + static void -ensureAtomExists( Torrent * t, - const tr_address * addr, - tr_port port, - uint8_t flags, - uint8_t from ) +ensureAtomExists( Torrent * t, + const time_t now, + const tr_address * addr, + const tr_port port, + const uint8_t flags, + const uint8_t from ) { assert( tr_isAddress( addr ) ); assert( from < TR_PEER_FROM__MAX ); @@ -1398,13 +1422,17 @@ ensureAtomExists( Torrent * t, if( getExistingAtom( t, addr ) == NULL ) { struct peer_atom * a; + const int jitter = tr_cryptoWeakRandInt( 120 ); + a = tr_new0( struct peer_atom, 1 ); a->addr = *addr; a->port = port; a->flags = flags; a->from = from; + a->shelf_date = now + getDefaultShelfLife( from ) + jitter; + tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress ); + tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) ); - tr_ptrArrayInsertSorted( &t->pool, a, comparePeerAtoms ); } } @@ -1472,9 +1500,11 @@ myHandshakeDoneCB( tr_handshake * handshake, else /* looking good */ { struct peer_atom * atom; - ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING ); + const time_t now = time( NULL ); + + ensureAtomExists( t, now, addr, port, 0, TR_PEER_FROM_INCOMING ); atom = getExistingAtom( t, addr ); - atom->time = time( NULL ); + atom->time = now; atom->piece_data_time = 0; if( atom->myflags & MYFLAG_BANNED ) @@ -1584,7 +1614,7 @@ tr_peerMgrAddPex( tr_torrent * tor, if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) ) if( tr_isValidPeerAddress( &pex->addr, pex->port ) ) - ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from ); + ensureAtomExists( t, time( NULL ), &pex->addr, pex->port, pex->flags, from ); managerUnlock( t->manager ); } @@ -1824,11 +1854,19 @@ tr_peerMgrGetPeers( tr_torrent * tor, return count; } +static int atomPulse ( void * vmgr ); +static int bandwidthPulse ( void * vmgr ); +static int rechokePulse ( void * vmgr ); +static int reconnectPulse ( void * vmgr ); + static void ensureMgrTimersExist( struct tr_peerMgr * m ) { tr_session * s = m->session; + if( m->atomTimer == NULL ) + m->atomTimer = tr_timerNew( s, atomPulse, m, ATOM_PERIOD_MSEC ); + if( m->bandwidthTimer == NULL ) m->bandwidthTimer = tr_timerNew( s, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC ); @@ -2907,3 +2945,124 @@ bandwidthPulse( void * vmgr ) managerUnlock( mgr ); return TRUE; } + +/*** +**** +***/ + +static int +compareAtomPtrsByAddress( const void * va, const void *vb ) +{ + const struct peer_atom * a = * (const struct peer_atom**) va; + const struct peer_atom * b = * (const struct peer_atom**) vb; + + assert( tr_isAtom( a ) ); + assert( tr_isAtom( b ) ); + + return tr_compareAddresses( &a->addr, &b->addr ); +} + +static time_t tr_now = 0; + +/* best come first, worst go last */ +static int +compareAtomPtrsByShelfDate( const void * va, const void *vb ) +{ + time_t atime; + time_t btime; + const struct peer_atom * a = * (const struct peer_atom**) va; + const struct peer_atom * b = * (const struct peer_atom**) vb; + const int data_time_cutoff_secs = 60 * 60; + + assert( tr_isAtom( a ) ); + assert( tr_isAtom( b ) ); + + /* primary key: the last piece data time *if* it was within the last hour */ + atime = a->piece_data_time; if( atime + data_time_cutoff_secs < tr_now ) atime = 0; + btime = b->piece_data_time; if( btime + data_time_cutoff_secs < tr_now ) btime = 0; + if( atime != btime ) + return atime > btime ? -1 : 1; + + /* secondary key: shelf date. */ + if( a->shelf_date != b->shelf_date ) + return a->shelf_date > b->shelf_date ? -1 : 1; + + return 0; +} + +static int +getMaxAtomCount( const tr_torrent * tor ) +{ + /* FIXME: this curve should be smoother... */ + const int n = tor->maxConnectedPeers; + if( n >= 200 ) return n * 1.5; + if( n >= 100 ) return n * 2; + if( n >= 50 ) return n * 3; + if( n >= 20 ) return n * 5; + return n * 10; +} + +static int +atomPulse( void * vmgr ) +{ + tr_torrent * tor = NULL; + tr_peerMgr * mgr = vmgr; + managerLock( mgr ); + + while(( tor = tr_torrentNext( mgr->session, tor ))) + { + int atomCount; + Torrent * t = tor->torrentPeers; + const int maxAtomCount = getMaxAtomCount( tor ); + struct peer_atom ** atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount ); + + if( atomCount > maxAtomCount ) /* we've got too many atoms... time to prune */ + { + int i; + int keepCount = 0; + int testCount = 0; + struct peer_atom ** keep = tr_new( struct peer_atom*, atomCount ); + struct peer_atom ** test = tr_new( struct peer_atom*, atomCount ); + tordbg( t, "max atom count is %d; have %d.. pruning\n", maxAtomCount, atomCount ); + + /* keep the ones that are in use */ + for( i=0; iaddr ) ) + keep[keepCount++] = atom; + else + test[testCount++] = atom; + } + + /* if there's room, keep the best of what's left */ + i = 0; + if( keepCount < maxAtomCount ) { + tr_now = time( NULL ); + qsort( test, testCount, sizeof( struct peer_atom * ), compareAtomPtrsByShelfDate ); + while( ipool, NULL ); + t->pool = TR_PTR_ARRAY_INIT; + qsort( keep, keepCount, sizeof( struct peer_atom * ), compareAtomPtrsByAddress ); + for( i=0; iaddr, &keep[i+1]->addr ) < 0 ); + tr_ptrArrayAppend( &t->pool, keep[i] ); + } + + /* cleanup */ + tr_free( test ); + tr_free( keep ); + } + } + + managerUnlock( mgr ); + return TRUE; +} diff --git a/libtransmission/peer-msgs.c b/libtransmission/peer-msgs.c index 337988965..575a268e4 100644 --- a/libtransmission/peer-msgs.c +++ b/libtransmission/peer-msgs.c @@ -984,13 +984,13 @@ parseLtepHandshake( tr_peermsgs * msgs, if( tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len) && addr_len == 4 ) { pex.addr.type = TR_AF_INET; memcpy( &pex.addr.addr.addr4, addr, 4 ); - tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_ALT, &pex ); + tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex ); } if( tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len) && addr_len == 16 ) { pex.addr.type = TR_AF_INET6; memcpy( &pex.addr.addr.addr6, addr, 16 ); - tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_ALT, &pex ); + tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex ); } /* get peer's maximum request queue size */ diff --git a/libtransmission/resume.c b/libtransmission/resume.c index e5fdc66af..471a6ed68 100644 --- a/libtransmission/resume.c +++ b/libtransmission/resume.c @@ -115,7 +115,7 @@ addPeers( tr_torrent * tor, const uint8_t * buf, int buflen ) memcpy( &pex, buf + ( i * sizeof( tr_pex ) ), sizeof( tr_pex ) ); if( tr_isPex( &pex ) ) { - tr_peerMgrAddPex( tor, TR_PEER_FROM_CACHE, &pex ); + tr_peerMgrAddPex( tor, TR_PEER_FROM_RESUME, &pex ); ++numAdded; } } diff --git a/libtransmission/rpcimpl.c b/libtransmission/rpcimpl.c index 7e111d86d..11b096e11 100644 --- a/libtransmission/rpcimpl.c +++ b/libtransmission/rpcimpl.c @@ -514,7 +514,7 @@ addField( const tr_torrent * tor, tr_benc * d, const char * key ) { tr_benc * tmp = tr_bencDictAddDict( d, key, 4 ); const int * f = st->peersFrom; - tr_bencDictAddInt( tmp, "fromCache", f[TR_PEER_FROM_CACHE] ); + tr_bencDictAddInt( tmp, "fromCache", f[TR_PEER_FROM_RESUME] ); tr_bencDictAddInt( tmp, "fromIncoming", f[TR_PEER_FROM_INCOMING] ); tr_bencDictAddInt( tmp, "fromPex", f[TR_PEER_FROM_PEX] ); tr_bencDictAddInt( tmp, "fromTracker", f[TR_PEER_FROM_TRACKER] ); diff --git a/libtransmission/transmission.h b/libtransmission/transmission.h index 6f107ac82..b606f98db 100644 --- a/libtransmission/transmission.h +++ b/libtransmission/transmission.h @@ -1580,9 +1580,9 @@ enum TR_PEER_FROM_INCOMING = 0, /* connections made to the listening port */ TR_PEER_FROM_TRACKER = 1, /* peers received from a tracker */ TR_PEER_FROM_DHT = 2, /* peers learnt from the DHT */ - TR_PEER_FROM_CACHE = 3, /* peers read from the peer cache */ + TR_PEER_FROM_RESUME = 3, /* peers read from the .resume file */ TR_PEER_FROM_PEX = 4, /* peers discovered via PEX */ - TR_PEER_FROM_ALT = 5, /* alternate peer address */ + TR_PEER_FROM_LTEP = 5, /* peer address provided in an LTEP handshake */ TR_PEER_FROM__MAX };