(trunk libT) #3159 "better decision-making when choosing which peers to initiate new connections with" -- experimental commit

This commit is contained in:
Charles Kerr 2010-04-20 21:54:03 +00:00
parent 10fb49059f
commit 546bca723b
12 changed files with 432 additions and 385 deletions

View File

@ -596,24 +596,23 @@ publishWarning( tr_tier * tier, const char * msg )
}
static int
publishNewPeers( tr_tier * tier, tr_bool allAreSeeds,
publishNewPeers( tr_tier * tier, int seeds, int leechers,
const void * compact, int compactLen )
{
tr_tracker_event e = emptyEvent;
e.messageType = TR_TRACKER_PEERS;
e.allAreSeeds = allAreSeeds;
e.seedProbability = seeds+leechers ? (int)((100.0*seeds)/(seeds+leechers)) : -1;
e.compact = compact;
e.compactLen = compactLen;
if( compactLen )
tr_publisherPublish( &tier->tor->tiers->publisher, tier, &e );
tr_publisherPublish( &tier->tor->tiers->publisher, tier, &e );
return compactLen / 6;
}
static int
publishNewPeersCompact( tr_tier * tier, tr_bool allAreSeeds,
publishNewPeersCompact( tr_tier * tier, int seeds, int leechers,
const void * compact, int compactLen )
{
int i;
@ -639,7 +638,7 @@ publishNewPeersCompact( tr_tier * tier, tr_bool allAreSeeds,
compactWalk += 6;
}
publishNewPeers( tier, allAreSeeds, array, arrayLen );
publishNewPeers( tier, seeds, leechers, array, arrayLen );
tr_free( array );
@ -647,7 +646,7 @@ publishNewPeersCompact( tr_tier * tier, tr_bool allAreSeeds,
}
static int
publishNewPeersCompact6( tr_tier * tier, tr_bool allAreSeeds,
publishNewPeersCompact6( tr_tier * tier, int seeds, int leechers,
const void * compact, int compactLen )
{
int i;
@ -671,7 +670,8 @@ publishNewPeersCompact6( tr_tier * tier, tr_bool allAreSeeds,
memcpy( walk + sizeof( addr ), &port, 2 );
walk += sizeof( tr_address ) + 2;
}
publishNewPeers( tier, allAreSeeds, array, arrayLen );
publishNewPeers( tier, seeds, leechers, array, arrayLen );
tr_free( array );
return peerCount;
@ -1201,7 +1201,6 @@ parseAnnounceResponse( tr_tier * tier,
if( bencLoaded && tr_bencIsDict( &benc ) )
{
int peerCount = 0;
int incomplete = -1;
size_t rawlen;
int64_t i;
tr_benc * tmp;
@ -1245,16 +1244,18 @@ parseAnnounceResponse( tr_tier * tier,
tier->currentTracker->tracker_id = tr_strdup( str );
}
if( tr_bencDictFindInt( &benc, "complete", &i ) )
{
if( !tr_bencDictFindInt( &benc, "complete", &i ) )
tier->currentTracker->seederCount = 0;
else {
++scrapeFields;
tier->currentTracker->seederCount = i;
}
if( tr_bencDictFindInt( &benc, "incomplete", &i ) )
{
if( !tr_bencDictFindInt( &benc, "incomplete", &i ) )
tier->currentTracker->leecherCount = 0;
else {
++scrapeFields;
tier->currentTracker->leecherCount = incomplete = i;
tier->currentTracker->leecherCount = i;
}
if( tr_bencDictFindInt( &benc, "downloaded", &i ) )
@ -1266,17 +1267,19 @@ parseAnnounceResponse( tr_tier * tier,
if( tr_bencDictFindRaw( &benc, "peers", &raw, &rawlen ) )
{
/* "compact" extension */
const int allAreSeeds = incomplete == 0;
peerCount += publishNewPeersCompact( tier, allAreSeeds, raw, rawlen );
const int seeders = tier->currentTracker->seederCount;
const int leechers = tier->currentTracker->leecherCount;
peerCount += publishNewPeersCompact( tier, seeders, leechers, raw, rawlen );
gotPeers = TRUE;
}
else if( tr_bencDictFindList( &benc, "peers", &tmp ) )
{
/* original version of peers */
const tr_bool allAreSeeds = incomplete == 0;
const int seeders = tier->currentTracker->seederCount;
const int leechers = tier->currentTracker->leecherCount;
size_t byteCount = 0;
uint8_t * array = parseOldPeers( tmp, &byteCount );
peerCount += publishNewPeers( tier, allAreSeeds, array, byteCount );
peerCount += publishNewPeers( tier, seeders, leechers, array, byteCount );
gotPeers = TRUE;
tr_free( array );
}
@ -1284,8 +1287,9 @@ parseAnnounceResponse( tr_tier * tier,
if( tr_bencDictFindRaw( &benc, "peers6", &raw, &rawlen ) )
{
/* "compact" extension */
const tr_bool allAreSeeds = incomplete == 0;
peerCount += publishNewPeersCompact6( tier, allAreSeeds, raw, rawlen );
const int seeders = tier->currentTracker->seederCount;
const int leechers = tier->currentTracker->leecherCount;
peerCount += publishNewPeersCompact6( tier, seeders, leechers, raw, rawlen );
gotPeers = TRUE;
}

View File

@ -48,7 +48,9 @@ typedef struct
/* for TR_TRACKER_PEERS */
const uint8_t * compact;
int compactLen;
int allAreSeeds;
/* [0...100] for probability a peer is a seed. calculated by the leecher/seeder ratio */
int8_t seedProbability;
}
tr_tracker_event;

View File

@ -101,6 +101,7 @@ enum
struct tr_handshake
{
tr_bool haveReadAnythingFromPeer;
tr_bool havePeerID;
tr_bool haveSentBitTorrentHandshake;
tr_peerIo * io;
@ -431,6 +432,8 @@ readYb( tr_handshake * handshake,
return READ_NOW;
}
handshake->haveReadAnythingFromPeer = TRUE;
/* compute the secret */
evbuffer_remove( inbuf, yb, KEY_LEN );
secret = tr_cryptoComputeSecret( handshake->crypto, yb );
@ -613,6 +616,8 @@ readHandshake( tr_handshake * handshake,
if( EVBUFFER_LENGTH( inbuf ) < INCOMING_HANDSHAKE_LEN )
return READ_LATER;
handshake->haveReadAnythingFromPeer = TRUE;
pstrlen = EVBUFFER_DATA( inbuf )[0]; /* peek, don't read. We may be
handing inbuf to AWAITING_YA */
@ -1100,6 +1105,7 @@ fireDoneFunc( tr_handshake * handshake,
: NULL;
const int success = ( *handshake->doneCB )( handshake,
handshake->io,
handshake->haveReadAnythingFromPeer,
isConnected,
peer_id,
handshake->doneUserData );

View File

@ -32,6 +32,7 @@ typedef struct tr_handshake tr_handshake;
/* returns true on success, false on error */
typedef tr_bool ( *handshakeDoneCB )( struct tr_handshake * handshake,
struct tr_peerIo * io,
tr_bool readAnythingFromPeer,
tr_bool isConnected,
const uint8_t * peerId,
void * userData );

View File

@ -61,8 +61,7 @@ typedef enum
TR_PEER_CLIENT_GOT_REJ,
TR_PEER_PEER_GOT_DATA,
TR_PEER_PEER_PROGRESS,
TR_PEER_ERROR,
TR_PEER_UPLOAD_ONLY
TR_PEER_ERROR
}
PeerEventType;
@ -75,7 +74,6 @@ typedef struct
float progress; /* for PEER_PROGRESS */
int err; /* errno for GOT_ERROR */
tr_bool wasPieceData; /* for GOT_DATA */
tr_bool uploadOnly; /* for UPLOAD_ONLY */
tr_port port; /* for GOT_PORT */
}
tr_peer_event;

View File

@ -64,6 +64,8 @@ enum
* this throttle is to avoid overloading the router */
MAX_CONNECTIONS_PER_SECOND = 12,
MAX_CONNECTIONS_PER_PULSE = (int)(MAX_CONNECTIONS_PER_SECOND * (RECONNECT_PERIOD_MSEC/1000.0)),
/* number of bad pieces a peer is allowed to send before we ban them */
MAX_BAD_PIECES_PER_PEER = 5,
@ -111,20 +113,25 @@ enum
*/
struct peer_atom
{
tr_peer * peer; /* will be NULL if not connected */
uint8_t from;
uint8_t flags; /* these match the added_f flags */
uint8_t myflags; /* flags that aren't defined in added_f */
uint8_t uploadOnly; /* UPLOAD_ONLY_ */
uint8_t flags; /* these match the added_f flags */
uint8_t myflags; /* flags that aren't defined in added_f */
uint8_t uploadOnly; /* UPLOAD_ONLY_ */
int8_t seedProbability; /* how likely is this to be a seed... [0..100] or -1 for unknown */
tr_port port;
uint16_t numFails;
tr_address addr;
time_t time; /* when the peer's connection status last changed */
time_t time; /* when the peer's connection status last changed */
time_t piece_data_time;
time_t lastConnectionAttemptAt;
time_t lastConnectionAt;
/* similar to a TTL field, but less rigid --
* if the swarm is small, the atom will be kept past this date. */
time_t shelf_date;
tr_peer * peer; /* will be NULL if not connected */
tr_address addr;
};
#ifdef NDEBUG
@ -196,7 +203,6 @@ struct tr_peerMgr
tr_ptrArray incomingHandshakes; /* tr_handshake */
struct event * bandwidthTimer;
struct event * rechokeTimer;
struct event * reconnectTimer;
struct event * refillUpkeepTimer;
struct event * atomTimer;
};
@ -500,7 +506,6 @@ deleteTimers( struct tr_peerMgr * m )
deleteTimer( &m->atomTimer );
deleteTimer( &m->bandwidthTimer );
deleteTimer( &m->rechokeTimer );
deleteTimer( &m->reconnectTimer );
deleteTimer( &m->refillUpkeepTimer );
}
@ -541,6 +546,32 @@ clientIsUploadingTo( const tr_peer * peer )
****
***/
static void
atomSetSeedProbability( struct peer_atom * atom, int seedProbability )
{
assert( atom != NULL );
assert( -1<=seedProbability && seedProbability<=100 );
atom->seedProbability = seedProbability;
if( seedProbability == 100 )
atom->flags |= ADDED_F_SEED_FLAG;
else if( seedProbability != -1 )
atom->flags &= ~ADDED_F_SEED_FLAG;
}
static void
atomSetSeed( struct peer_atom * atom )
{
atomSetSeedProbability( atom, 100 );
}
static tr_bool
atomIsSeed( const struct peer_atom * atom )
{
return atom->seedProbability == 100;
}
tr_bool
tr_peerMgrPeerIsSeed( const tr_torrent * tor,
const tr_address * addr )
@ -550,7 +581,7 @@ tr_peerMgrPeerIsSeed( const tr_torrent * tor,
const struct peer_atom * atom = getExistingAtom( t, addr );
if( atom )
isSeed = ( atom->flags & ADDED_F_SEED_FLAG ) != 0;
isSeed = atomIsSeed( atom );
return isSeed;
}
@ -1278,19 +1309,6 @@ peerCallbackFunc( void * vpeer, void * vevent, void * vt )
switch( e->eventType )
{
case TR_PEER_UPLOAD_ONLY:
/* update our atom */
if( peer ) {
if( e->uploadOnly ) {
peer->atom->uploadOnly = UPLOAD_ONLY_YES;
peer->atom->flags |= ADDED_F_SEED_FLAG;
} else {
peer->atom->uploadOnly = UPLOAD_ONLY_NO;
peer->atom->flags &= ~ADDED_F_SEED_FLAG;
}
}
break;
case TR_PEER_PEER_GOT_DATA:
{
const time_t now = tr_time( );
@ -1367,9 +1385,8 @@ peerCallbackFunc( void * vpeer, void * vevent, void * vt )
{
struct peer_atom * atom = peer->atom;
if( e->progress >= 1.0 ) {
tordbg( t, "marking peer %s as a seed",
tr_atomAddrStr( atom ) );
atom->flags |= ADDED_F_SEED_FLAG;
tordbg( t, "marking peer %s as a seed", tr_atomAddrStr( atom ) );
atomSetSeed( atom );
}
}
break;
@ -1495,32 +1512,39 @@ getDefaultShelfLife( uint8_t from )
}
}
static void
ensureAtomExists( Torrent * t,
const tr_address * addr,
const tr_port port,
const uint8_t flags,
const int8_t seedProbability,
const uint8_t from )
{
struct peer_atom * a;
assert( tr_isAddress( addr ) );
assert( from < TR_PEER_FROM__MAX );
if( getExistingAtom( t, addr ) == NULL )
{
struct peer_atom * a;
const int jitter = tr_cryptoWeakRandInt( 60*10 );
a = getExistingAtom( t, addr );
if( a == NULL )
{
const int jitter = tr_cryptoWeakRandInt( 60*10 );
a = tr_new0( struct peer_atom, 1 );
a->addr = *addr;
a->port = port;
a->flags = flags;
a->from = from;
a->shelf_date = tr_time( ) + getDefaultShelfLife( from ) + jitter;
atomSetSeedProbability( a, seedProbability );
tr_ptrArrayInsertSorted( &t->pool, a, compareAtomsByAddress );
tordbg( t, "got a new atom: %s", tr_atomAddrStr( a ) );
}
else if( a->seedProbability == -1 )
{
atomSetSeedProbability( a, seedProbability );
}
}
static int
@ -1539,6 +1563,7 @@ getPeerCount( const Torrent * t )
static tr_bool
myHandshakeDoneCB( tr_handshake * handshake,
tr_peerIo * io,
tr_bool readAnythingFromPeer,
tr_bool isConnected,
const uint8_t * peer_id,
void * vmanager )
@ -1581,17 +1606,27 @@ myHandshakeDoneCB( tr_handshake * handshake,
{
struct peer_atom * atom = getExistingAtom( t, addr );
if( atom )
{
++atom->numFails;
if( !readAnythingFromPeer )
{
tordbg( t, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr( atom ), (int)atom->numFails );
atom->myflags |= MYFLAG_UNREACHABLE;
}
}
}
}
else /* looking good */
{
struct peer_atom * atom;
ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
ensureAtomExists( t, addr, port, 0, -1, TR_PEER_FROM_INCOMING );
atom = getExistingAtom( t, addr );
atom->time = tr_time( );
atom->piece_data_time = 0;
atom->lastConnectionAt = tr_time( );
atom->myflags &= ~MYFLAG_UNREACHABLE;
if( atom->myflags & MYFLAG_BANNED )
{
@ -1689,9 +1724,8 @@ tr_isPex( const tr_pex * pex )
}
void
tr_peerMgrAddPex( tr_torrent * tor,
uint8_t from,
const tr_pex * pex )
tr_peerMgrAddPex( tr_torrent * tor, uint8_t from,
const tr_pex * pex, int8_t seedProbability )
{
if( tr_isPex( pex ) ) /* safeguard against corrupt data */
{
@ -1700,12 +1734,24 @@ tr_peerMgrAddPex( tr_torrent * tor,
if( !tr_sessionIsAddressBlocked( t->manager->session, &pex->addr ) )
if( tr_isValidPeerAddress( &pex->addr, pex->port ) )
ensureAtomExists( t, &pex->addr, pex->port, pex->flags, from );
ensureAtomExists( t, &pex->addr, pex->port, pex->flags, seedProbability, from );
managerUnlock( t->manager );
}
}
void
tr_peerMgrMarkAllAsSeeds( tr_torrent * tor )
{
Torrent * t = tor->torrentPeers;
const int n = tr_ptrArraySize( &t->pool );
struct peer_atom ** it = (struct peer_atom**) tr_ptrArrayBase( &t->pool );
struct peer_atom ** end = it + n;
while( it != end )
atomSetSeed( *it++ );
}
tr_pex *
tr_peerMgrCompactToPex( const void * compact,
size_t compactLen,
@ -1966,10 +2012,7 @@ ensureMgrTimersExist( struct tr_peerMgr * m )
if( m->rechokeTimer == NULL )
m->rechokeTimer = createTimer( RECHOKE_PERIOD_MSEC, rechokePulse, m );
if( m->reconnectTimer == NULL )
m->reconnectTimer = createTimer( RECONNECT_PERIOD_MSEC, reconnectPulse, m );
if( m->refillUpkeepTimer == NULL )
if( m->refillUpkeepTimer == NULL )
m->refillUpkeepTimer = createTimer( REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m );
}
@ -2153,7 +2196,7 @@ tr_peerMgrTorrentStats( tr_torrent * tor,
if( clientIsUploadingTo( peer ) )
++*setmePeersGettingFromUs;
if( atom->flags & ADDED_F_SEED_FLAG )
if( atomIsSeed( atom ) )
++*setmeSeedsConnected;
}
@ -2658,12 +2701,14 @@ shouldPeerBeClosed( const Torrent * t,
* and enough time has passed for a pex exchange, then disconnect */
if( tr_torrentIsSeed( tor ) )
{
int peerHasEverything;
if( atom->flags & ADDED_F_SEED_FLAG )
peerHasEverything = TRUE;
else if( peer->progress < tr_cpPercentDone( &tor->completion ) )
peerHasEverything = FALSE;
else {
tr_bool peerHasEverything;
if( atom->seedProbability != -1 )
{
peerHasEverything = atomIsSeed( atom );
}
else
{
tr_bitfield * tmp = tr_bitfieldDup( tr_cpPieceBitfield( &tor->completion ) );
tr_bitsetDifference( tmp, &peer->have );
peerHasEverything = tr_bitfieldCountTrueBits( tmp ) == 0;
@ -2723,36 +2768,6 @@ getPeersToClose( Torrent * t, tr_close_type_t closeType, const time_t now, int *
return ret;
}
static int
compareCandidates( const void * va, const void * vb )
{
const struct peer_atom * a = *(const struct peer_atom**) va;
const struct peer_atom * b = *(const struct peer_atom**) vb;
/* <Charles> Here we would probably want to try reconnecting to
* peers that had most recently given us data. Lots of users have
* trouble with resets due to their routers and/or ISPs. This way we
* can quickly recover from an unwanted reset. So we sort
* piece_data_time in descending order.
*/
if( a->piece_data_time != b->piece_data_time )
return a->piece_data_time < b->piece_data_time ? 1 : -1;
if( a->numFails != b->numFails )
return a->numFails < b->numFails ? -1 : 1;
if( a->time != b->time )
return a->time < b->time ? -1 : 1;
/* In order to avoid fragmenting the swarm, peers from trackers and
* from the DHT should be preferred to peers from PEX. */
if( a->from != b->from )
return a->from < b->from ? -1 : 1;
return 0;
}
static int
getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
{
@ -2780,68 +2795,14 @@ getReconnectIntervalSecs( const struct peer_atom * atom, const time_t now )
default: sec = 120 * 60; break;
}
/* penalize peers that were unreachable the last time we tried */
if( atom->myflags & MYFLAG_UNREACHABLE )
sec += sec;
dbgmsg( "reconnect interval for %s is %d seconds", tr_atomAddrStr( atom ), sec );
return sec;
}
static struct peer_atom **
getPeerCandidates( Torrent * t, const time_t now, int * setmeSize )
{
int i, atomCount, retCount;
struct peer_atom ** atoms;
struct peer_atom ** ret;
const int seed = tr_torrentIsSeed( t->tor );
assert( torrentIsLocked( t ) );
atoms = (struct peer_atom**) tr_ptrArrayPeek( &t->pool, &atomCount );
ret = tr_new( struct peer_atom*, atomCount );
for( i = retCount = 0; i < atomCount; ++i )
{
int interval;
struct peer_atom * atom = atoms[i];
/* peer fed us too much bad data ... we only keep it around
* now to weed it out in case someone sends it to us via pex */
if( atom->myflags & MYFLAG_BANNED )
continue;
/* peer was unconnectable before, so we're not going to keep trying.
* this is needs a separate flag from `banned', since if they try
* to connect to us later, we'll let them in */
if( atom->myflags & MYFLAG_UNREACHABLE )
continue;
/* no need to connect if we're both seeds... */
if( seed && ( ( atom->flags & ADDED_F_SEED_FLAG ) ||
( atom->uploadOnly == UPLOAD_ONLY_YES ) ) )
continue;
/* don't reconnect too often */
interval = getReconnectIntervalSecs( atom, now );
if( ( now - atom->time ) < interval )
{
tordbg( t, "RECONNECT peer %d (%s) is in its grace period of %d seconds..",
i, tr_atomAddrStr( atom ), interval );
continue;
}
/* Don't connect to peers in our blocklist */
if( tr_sessionIsAddressBlocked( t->manager->session, &atom->addr ) )
continue;
/* we don't need two connections to the same peer... */
if( peerIsInUse( t, atom ) )
continue;
ret[retCount++] = atom;
}
if( retCount != 0 )
qsort( ret, retCount, sizeof( struct peer_atom* ), compareCandidates );
*setmeSize = retCount;
return ret;
}
static void
closePeer( Torrent * t, tr_peer * peer )
{
@ -2855,45 +2816,23 @@ closePeer( Torrent * t, tr_peer * peer )
/* if we transferred piece data, then they might be good peers,
so reset their `numFails' weight to zero. otherwise we connected
to them fruitlessly, so mark it as another fail */
if( atom->piece_data_time )
if( atom->piece_data_time ) {
tordbg( t, "resetting atom %s numFails to 0", tr_atomAddrStr(atom) );
atom->numFails = 0;
else
} else {
++atom->numFails;
tordbg( t, "incremented atom %s numFails to %d", tr_atomAddrStr(atom), (int)atom->numFails );
}
tordbg( t, "removing bad peer %s", tr_peerIoGetAddrStr( peer->io ) );
removePeer( t, peer );
}
/* Make a lot of slots available to newly-running torrents...
* once they reach steady state, they shouldn't need as many */
static int
maxNewPeersPerPulse( const Torrent * t )
{
double runTime;
const tr_torrent * tor = t->tor;
assert( tr_isTorrent( tor ) );
runTime = difftime( tr_time( ), tor->startDate );
if( runTime > 480 ) return 1;
if( runTime > 240 ) return 2;
if( runTime > 120 ) return 3;
return 4;
}
static void
reconnectTorrent( Torrent * t )
closeBadPeers( Torrent * t )
{
static time_t prevTime = 0;
static int newConnectionsThisSecond = 0;
const time_t now = tr_time( );
if( prevTime != now )
{
prevTime = now;
newConnectionsThisSecond = 0;
}
if( !t->isRunning )
{
removeAllPeers( t );
@ -2902,116 +2841,13 @@ reconnectTorrent( Torrent * t )
{
int i;
int mustCloseCount;
int maxCandidates;
struct tr_peer ** mustClose;
const int maxPerPulse = maxNewPeersPerPulse( t );
/* disconnect the really bad peers */
mustClose = getPeersToClose( t, TR_MUST_CLOSE, now, &mustCloseCount );
for( i=0; i<mustCloseCount; ++i )
closePeer( t, mustClose[i] );
tr_free( mustClose );
/* decide how many peers can we try to add in this pass */
maxCandidates = maxPerPulse;
if( tr_announcerHasBacklog( t->manager->session->announcer ) )
maxCandidates /= 2;
maxCandidates = MIN( maxCandidates, getMaxPeerCount( t->tor ) - getPeerCount( t ) );
maxCandidates = MIN( maxCandidates, MAX_CONNECTIONS_PER_SECOND - newConnectionsThisSecond );
/* select the best candidates, if they are requested */
if( maxCandidates == 0 )
{
tordbg( t, "reconnect pulse for [%s]: %d must-close connections, "
"NO connection candidates needed, %d atoms, "
"max per pulse is %d",
t->tor->info.name, mustCloseCount,
tr_ptrArraySize( &t->pool ),
maxPerPulse );
tordbg( t, "maxCandidates is %d, maxPerPulse is %d, "
"getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
"newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
maxCandidates, maxPerPulse,
getPeerCount( t ), getMaxPeerCount( t->tor ),
newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
}
else
{
int canCloseCount = 0;
int candidateCount;
struct peer_atom ** candidates;
candidates = getPeerCandidates( t, now, &candidateCount );
maxCandidates = MIN( maxCandidates, candidateCount );
/* maybe disconnect some lesser peers, if we have candidates to replace them with */
if( maxCandidates != 0 )
{
struct tr_peer ** canClose = getPeersToClose( t, TR_CAN_CLOSE, now, &canCloseCount );
for( i=0; ( i<canCloseCount ) && ( i<maxCandidates ); ++i )
closePeer( t, canClose[i] );
tr_free( canClose );
}
tordbg( t, "reconnect pulse for [%s]: %d must-close connections, "
"%d can-close connections, %d connection candidates, "
"%d atoms, max per pulse is %d",
t->tor->info.name, mustCloseCount,
canCloseCount, candidateCount,
tr_ptrArraySize( &t->pool ), maxPerPulse );
tordbg( t, "candidateCount is %d, maxPerPulse is %d,"
" getPeerCount(t) is %d, getMaxPeerCount(t) is %d, "
"newConnectionsThisSecond is %d, MAX_CONNECTIONS_PER_SECOND is %d",
candidateCount, maxPerPulse,
getPeerCount( t ), getMaxPeerCount( t->tor ),
newConnectionsThisSecond, MAX_CONNECTIONS_PER_SECOND );
/* add some new ones */
for( i=0; i<maxCandidates; ++i )
{
tr_peerMgr * mgr = t->manager;
struct peer_atom * atom = candidates[i];
tr_peerIo * io;
tordbg( t, "Starting an OUTGOING connection with %s",
tr_atomAddrStr( atom ) );
io = tr_peerIoNewOutgoing( mgr->session,
mgr->session->bandwidth,
&atom->addr,
atom->port,
t->tor->info.hash,
t->tor->completeness == TR_SEED );
if( io == NULL )
{
tordbg( t, "peerIo not created; marking peer %s as unreachable",
tr_atomAddrStr( atom ) );
atom->myflags |= MYFLAG_UNREACHABLE;
}
else
{
tr_handshake * handshake = tr_handshakeNew( io,
mgr->session->encryptionMode,
myHandshakeDoneCB,
mgr );
assert( tr_peerIoGetTorrentHash( io ) );
tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
++newConnectionsThisSecond;
tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
handshakeCompare );
}
atom->time = now;
}
tr_free( candidates );
}
}
}
@ -3167,52 +3003,14 @@ enforceSessionPeerLimit( tr_session * session, uint64_t now )
}
}
struct reconnectTorrentStruct
{
tr_torrent * torrent;
int salt;
};
static int
compareReconnectTorrents( const void * va, const void * vb )
{
int ai, bi;
const struct reconnectTorrentStruct * a = va;
const struct reconnectTorrentStruct * b = vb;
/* primary key: higher priority goes first */
ai = tr_torrentGetPriority( a->torrent );
bi = tr_torrentGetPriority( b->torrent );
if( ai != bi )
return ai > bi ? -1 : 1;
/* secondary key: since users tend to stare at the screens
* watching their downloads' progress, give downloads a
* first shot at attempting outbound peer connections. */
ai = tr_torrentIsSeed( a->torrent );
bi = tr_torrentIsSeed( b->torrent );
if( ai != bi )
return bi ? -1 : 1;
/* tertiary key: random */
if( a->salt != b->salt )
return a->salt - b->salt;
return 0;
}
static void makeNewPeerConnections( tr_peerMgr * mgr, const int max );
static void
reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
{
tr_torrent * tor;
tr_peerMgr * mgr = vmgr;
struct reconnectTorrentStruct * torrents;
int torrentCount;
int i;
uint64_t now;
managerLock( mgr );
now = tr_date( );
const uint64_t now = tr_date( );
/**
*** enforce the per-session and per-torrent peer limits
@ -3227,31 +3025,13 @@ reconnectPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
/* if we're over the per-session peer limits, cull some peers */
enforceSessionPeerLimit( mgr->session, now );
/**
*** try to make new peer connections
**/
/* remove crappy peers */
tor = NULL;
while(( tor = tr_torrentNext( mgr->session, tor )))
closeBadPeers( tor->torrentPeers );
torrentCount = 0;
torrents = tr_new( struct reconnectTorrentStruct,
mgr->session->torrentCount );
while(( tor = tr_torrentNext( mgr->session, tor ))) {
if( tor->isRunning ) {
struct reconnectTorrentStruct * r = torrents + torrentCount++;
r->torrent = tor;
r->salt = tr_cryptoWeakRandInt( 1024 );
}
}
qsort( torrents,
torrentCount, sizeof( struct reconnectTorrentStruct ),
compareReconnectTorrents );
for( i=0; i<torrentCount; ++i )
reconnectTorrent( torrents[i].torrent->torrentPeers );
/* cleanup */
tr_free( torrents );
tr_timerAddMsec( mgr->reconnectTimer, RECONNECT_PERIOD_MSEC );
managerUnlock( mgr );
/* try to make new peer connections */
makeNewPeerConnections( mgr, MAX_CONNECTIONS_PER_PULSE );
}
/****
@ -3312,6 +3092,8 @@ bandwidthPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
if( tor->isRunning && ( tor->error == TR_STAT_LOCAL_ERROR ))
tr_torrentStop( tor );
reconnectPulse( 0, 0, mgr );
tr_timerAddMsec( mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC );
managerUnlock( mgr );
}
@ -3432,3 +3214,253 @@ atomPulse( int foo UNUSED, short bar UNUSED, void * vmgr )
tr_timerAddMsec( mgr->atomTimer, ATOM_PERIOD_MSEC );
managerUnlock( mgr );
}
/***
****
****
****
***/
static inline tr_bool
isBandwidthMaxedOut( const tr_bandwidth * b,
const uint64_t now_msec, tr_direction dir )
{
if( !tr_bandwidthIsLimited( b, dir ) )
return FALSE;
else {
const double got = tr_bandwidthGetPieceSpeed( b, now_msec, dir );
const double want = tr_bandwidthGetDesiredSpeed( b, dir );
return got >= want;
}
}
/* is this atom someone that we'd want to initiate a connection to? */
static tr_bool
isPeerCandidate( const tr_torrent * tor, const struct peer_atom * atom, const time_t now )
{
/* not if we've already got a connection to them... */
if( peerIsInUse( tor->torrentPeers, atom ) )
return FALSE;
/* not if they're banned... */
if( atom->myflags & MYFLAG_BANNED )
return FALSE;
/* not if we just tried them already */
if( ( now - atom->time ) < getReconnectIntervalSecs( atom, now ) )
return FALSE;
/* not if we're both seeds */
if( tr_torrentIsSeed( tor ) )
if( atomIsSeed( atom ) || ( atom->uploadOnly == UPLOAD_ONLY_YES ) )
return FALSE;
/* not if they're blocklisted */
/* FIXME: maybe we should remove this atom altogether? */
if( tr_sessionIsAddressBlocked( tor->session, &atom->addr ) )
return FALSE;
return TRUE;
}
struct peer_candidate
{
int salt;
tr_torrent * tor;
struct peer_atom * atom;
};
static int
compareSeedProbabilities( int a, int b )
{
/* 1. smaller numbers are better
2. prefer leechers to unknown
3. prefer unknown to seeds (FIXME: this is a simplistic test) */
if( a == 100 ) a = 101;
if( b == 100 ) b = 101;
if( a == -1 ) a = 100;
if( b == -1 ) b = 100;
return a - b;
}
static tr_bool
torrentWasRecentlyStarted( const tr_torrent * tor )
{
return difftime( tr_time( ), tor->startDate ) < 120;
}
/* sort an array of peer candidates */
static int
comparePeerCandidates( const void * va, const void * vb )
{
int i, ai, bi;
tr_bool af, bf;
const struct peer_candidate * a = va;
const struct peer_candidate * b = vb;
/* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
af = a->atom->lastConnectionAt < a->atom->lastConnectionAttemptAt;
bf = b->atom->lastConnectionAt < b->atom->lastConnectionAttemptAt;
if( af != bf )
return af ? 1 : -1;
/* prefer the one we attempted least recently (to cycle through all peers) */
if( a->atom->lastConnectionAttemptAt != b->atom->lastConnectionAttemptAt )
return a->atom->lastConnectionAttemptAt < b->atom->lastConnectionAttemptAt ? -1 : 1;
/* prefer peers belonging to a torrent of a higher priority */
ai = tr_torrentGetPriority( a->tor );
bi = tr_torrentGetPriority( b->tor );
if( ai != bi )
return ai > bi ? -1 : 1;
/* prefer recently-started torrents */
af = torrentWasRecentlyStarted( a->tor );
bf = torrentWasRecentlyStarted( a->tor );
if( af != bf )
return af ? -1 : 1;
/* prefer peers that we might have a chance of uploading to */
if(( i = compareSeedProbabilities( a->atom->seedProbability, b->atom->seedProbability )))
return i;
/* salt */
return a->salt - b->salt;
}
/** @return an array of all the atoms we might want to connect to */
static struct peer_candidate*
getPeerCandidates( tr_session * session, int * candidateCount )
{
int n;
tr_torrent * tor;
struct peer_candidate * candidates;
struct peer_candidate * walk;
const time_t now = tr_time( );
const uint64_t now_msec = tr_date( );
/* don't start any new handshakes if we're full up */
n = 0;
tor= NULL;
while(( tor = tr_torrentNext( session, tor )))
n += tr_ptrArraySize( &tor->torrentPeers->peers );
if( tr_sessionGetPeerLimit( session ) <= n ) {
*candidateCount = 0;
return NULL;
}
/* allocate an array of candidates */
n = 0;
tor= NULL;
while(( tor = tr_torrentNext( session, tor )))
n += tr_ptrArraySize( &tor->torrentPeers->pool );
walk = candidates = tr_new( struct peer_candidate, n );
/* populate the candidate array */
tor = NULL;
while(( tor = tr_torrentNext( session, tor )))
{
int i, nAtoms;
struct peer_atom ** atoms;
if( !tor->torrentPeers->isRunning )
continue;
/* if we've already got enough peers in this torrent... */
if( tr_torrentGetPeerLimit( tor ) <= tr_ptrArraySize( &tor->torrentPeers->peers ) )
continue;
/* if we've already got enough speed in this torrent... */
if( tr_torrentIsSeed( tor ) && isBandwidthMaxedOut( tor->bandwidth, now_msec, TR_UP ) )
continue;
atoms = (struct peer_atom**) tr_ptrArrayPeek( &tor->torrentPeers->pool, &nAtoms );
for( i=0; i<nAtoms; ++i )
{
struct peer_atom * atom = atoms[i];
if( isPeerCandidate( tor, atom, now ) )
{
walk->tor = tor;
walk->atom = atom;
walk->salt = tr_cryptoWeakRandInt( 4096 );
++walk;
}
}
}
*candidateCount = walk - candidates;
if( *candidateCount > 1 )
qsort( candidates, *candidateCount, sizeof( struct peer_candidate ), comparePeerCandidates );
return candidates;
}
static void
initiateConnection( tr_peerMgr * mgr, Torrent * t, struct peer_atom * atom )
{
tr_peerIo * io;
const time_t now = tr_time( );
tordbg( t, "Starting an OUTGOING connection with %s", tr_atomAddrStr( atom ) );
io = tr_peerIoNewOutgoing( mgr->session,
mgr->session->bandwidth,
&atom->addr,
atom->port,
t->tor->info.hash,
t->tor->completeness == TR_SEED );
if( io == NULL )
{
tordbg( t, "peerIo not created; marking peer %s as unreachable",
tr_atomAddrStr( atom ) );
atom->myflags |= MYFLAG_UNREACHABLE;
atom->numFails++;
}
else
{
tr_handshake * handshake = tr_handshakeNew( io,
mgr->session->encryptionMode,
myHandshakeDoneCB,
mgr );
assert( tr_peerIoGetTorrentHash( io ) );
tr_peerIoUnref( io ); /* balanced by the implicit ref in tr_peerIoNewOutgoing() */
tr_ptrArrayInsertSorted( &t->outgoingHandshakes, handshake,
handshakeCompare );
}
atom->lastConnectionAttemptAt = now;
atom->time = now;
}
static void
initiateCandidateConnection( tr_peerMgr * mgr, struct peer_candidate * c )
{
#if 0
fprintf( stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n",
tr_atomAddrStr( c->atom ),
tr_torrentName( c->tor ),
(int)c->atom->seedProbability,
tr_torrentIsPrivate( c->tor ) ? "private" : "public",
tr_torrentIsSeed( c->tor ) ? "seed" : "downloader" );
#endif
initiateConnection( mgr, c->tor->torrentPeers, c->atom );
}
static void
makeNewPeerConnections( struct tr_peerMgr * mgr, const int max )
{
int i, n;
struct peer_candidate * candidates;
candidates = getPeerCandidates( mgr->session, &n );
for( i=0; i<n && i<max; ++i )
initiateCandidateConnection( mgr, &candidates[i] );
tr_free( candidates );
}

View File

@ -172,9 +172,15 @@ tr_pex * tr_peerMgrArrayToPex( const void * array,
size_t arrayLen,
size_t * setme_pex_count );
/**
* @param seedProbability [0..100] for likelihood that the peer is a seed; -1 for unknown
*/
void tr_peerMgrAddPex( tr_torrent * tor,
uint8_t from,
const tr_pex * pex );
const tr_pex * pex,
int8_t seedProbability );
void tr_peerMgrMarkAllAsSeeds( tr_torrent * tor );
void tr_peerMgrSetBlame( tr_torrent * tor,
tr_piece_index_t pieceIndex,

View File

@ -464,7 +464,7 @@ protocolSendHaveNone( tr_peermsgs * msgs )
*** EVENTS
**/
static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0, 0 };
static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
static void
publish( tr_peermsgs * msgs, tr_peer_event * e )
@ -484,15 +484,6 @@ fireError( tr_peermsgs * msgs, int err )
publish( msgs, &e );
}
static void
fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
{
tr_peer_event e = blankEvent;
e.eventType = TR_PEER_UPLOAD_ONLY;
e.uploadOnly = uploadOnly;
publish( msgs, &e );
}
static void
firePeerProgress( tr_peermsgs * msgs )
{
@ -889,6 +880,7 @@ parseLtepHandshake( tr_peermsgs * msgs,
const uint8_t *addr;
size_t addr_len;
tr_pex pex;
int8_t seedProbability = -1;
memset( &pex, 0, sizeof( tr_pex ) );
@ -934,11 +926,8 @@ parseLtepHandshake( tr_peermsgs * msgs,
tr_torrentSetMetadataSizeHint( msgs->torrent, i );
/* look for upload_only (BEP 21) */
if( tr_bencDictFindInt( &val, "upload_only", &i ) ) {
fireUploadOnly( msgs, i!=0 );
if( i )
pex.flags |= ADDED_F_SEED_FLAG;
}
if( tr_bencDictFindInt( &val, "upload_only", &i ) )
seedProbability = i==0 ? 0 : 100;
/* get peer's listening port */
if( tr_bencDictFindInt( &val, "p", &i ) ) {
@ -953,7 +942,7 @@ parseLtepHandshake( tr_peermsgs * msgs,
{
pex.addr.type = TR_AF_INET;
memcpy( &pex.addr.addr.addr4, addr, 4 );
tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex );
tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
}
if( tr_peerIoIsIncoming( msgs->peer->io )
@ -962,7 +951,7 @@ parseLtepHandshake( tr_peermsgs * msgs,
{
pex.addr.type = TR_AF_INET6;
memcpy( &pex.addr.addr.addr6, addr, 16 );
tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex );
tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
}
/* get peer's maximum request queue size */
@ -1078,7 +1067,11 @@ parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
n = MIN( n, MAX_PEX_PEER_COUNT );
for( i=0; i<n; ++i )
tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
{
int seedProbability = -1;
if( added_f_len < n ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
}
tr_free( pex );
}
@ -1095,7 +1088,11 @@ parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
n = MIN( n, MAX_PEX_PEER_COUNT );
for( i=0; i<n; ++i )
tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex + i );
{
int seedProbability = -1;
if( added_f_len < n ) seedProbability = ( added_f[i] & ADDED_F_SEED_FLAG ) ? 100 : 0;
tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, pex+i, seedProbability );
}
tr_free( pex );
}

View File

@ -116,7 +116,7 @@ addPeers( tr_torrent * tor, const uint8_t * buf, int buflen )
memcpy( &pex, buf + ( i * sizeof( tr_pex ) ), sizeof( tr_pex ) );
if( tr_isPex( &pex ) )
{
tr_peerMgrAddPex( tor, TR_PEER_FROM_RESUME, &pex );
tr_peerMgrAddPex( tor, TR_PEER_FROM_RESUME, &pex, -1 );
++numAdded;
}
}

View File

@ -354,27 +354,28 @@ onTrackerResponse( void * tracker UNUSED,
void * vevent,
void * user_data )
{
tr_torrent * tor = user_data;
tr_torrent * tor = user_data;
tr_tracker_event * event = vevent;
switch( event->messageType )
{
case TR_TRACKER_PEERS:
{
size_t i, n;
size_t i, n;
const int seedProbability = event->seedProbability;
const tr_bool allAreSeeds = seedProbability == 100;
tr_pex * pex = tr_peerMgrArrayToPex( event->compact,
event->compactLen, &n );
if( event->allAreSeeds )
if( allAreSeeds )
tr_tordbg( tor, "Got %d seeds from tracker", (int)n );
else
tr_tordbg( tor, "Got %d peers from tracker", (int)n );
for( i = 0; i < n; ++i )
{
if( event->allAreSeeds )
pex[i].flags |= ADDED_F_SEED_FLAG;
tr_peerMgrAddPex( tor, TR_PEER_FROM_TRACKER, pex + i );
}
tr_peerMgrAddPex( tor, TR_PEER_FROM_TRACKER, pex+i, seedProbability );
if( allAreSeeds && tr_torrentIsPrivate( tor ) )
tr_peerMgrMarkAllAsSeeds( tor );
tr_free( pex );
break;

View File

@ -613,7 +613,7 @@ callback( void *ignore UNUSED, int event,
else
pex = tr_peerMgrCompact6ToPex(data, data_len, NULL, 0, &n);
for( i=0; i<n; ++i )
tr_peerMgrAddPex( tor, TR_PEER_FROM_DHT, pex+i );
tr_peerMgrAddPex( tor, TR_PEER_FROM_DHT, pex+i, -1 );
tr_free(pex);
tr_tordbg(tor, "Learned %d%s peers from DHT",
(int)n,

View File

@ -49,7 +49,7 @@ struct tr_webseed
****
***/
static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0, 0 };
static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
static void
publish( tr_webseed * w,