experimental better peer management.

This commit is contained in:
Charles Kerr 2007-09-30 23:55:49 +00:00
parent 66bf641f5d
commit 7cc6771318
9 changed files with 335 additions and 248 deletions

View File

@ -159,7 +159,7 @@ myDebug( const char * file, int line, const tr_handshake * handshake, const char
va_list args;
const char * addr = tr_peerIoGetAddrStr( handshake->io );
struct evbuffer * buf = evbuffer_new( );
evbuffer_add_printf( buf, "[%s:%d] %s (%p) ", file, line, addr, handshake );
evbuffer_add_printf( buf, "[%s:%d] %s (%p) ", file, line, addr, handshake->io );
va_start( args, fmt );
evbuffer_add_vprintf( buf, fmt, args );
va_end( args );

View File

@ -106,12 +106,12 @@ gotErrorWrapper( struct bufferevent * e, short what, void * userData )
**/
static tr_peerIo*
tr_peerIoNew( struct tr_handle * handle,
struct in_addr * in_addr,
uint16_t port,
const uint8_t * torrentHash,
int isIncoming,
int socket )
tr_peerIoNew( struct tr_handle * handle,
const struct in_addr * in_addr,
uint16_t port,
const uint8_t * torrentHash,
int isIncoming,
int socket )
{
tr_peerIo * c;
c = tr_new0( tr_peerIo, 1 );
@ -133,10 +133,10 @@ tr_peerIoNew( struct tr_handle * handle,
}
tr_peerIo*
tr_peerIoNewIncoming( struct tr_handle * handle,
struct in_addr * in_addr,
uint16_t port,
int socket )
tr_peerIoNewIncoming( struct tr_handle * handle,
const struct in_addr * in_addr,
uint16_t port,
int socket )
{
assert( handle != NULL );
assert( in_addr != NULL );
@ -148,10 +148,10 @@ tr_peerIoNewIncoming( struct tr_handle * handle,
}
tr_peerIo*
tr_peerIoNewOutgoing( struct tr_handle * handle,
struct in_addr * in_addr,
int port,
const uint8_t * torrentHash )
tr_peerIoNewOutgoing( struct tr_handle * handle,
const struct in_addr * in_addr,
int port,
const uint8_t * torrentHash )
{
assert( handle != NULL );
assert( in_addr != NULL );
@ -204,14 +204,19 @@ tr_peerIoGetAddress( const tr_peerIo * io, uint16_t * port )
}
const char*
tr_peerIoGetAddrStr( const tr_peerIo * io )
tr_peerIoAddrStr( const struct in_addr * addr, uint16_t port )
{
static char buf[512];
assert( io != NULL );
snprintf( buf, sizeof(buf), "%s:%u", inet_ntoa( io->in_addr ), (unsigned int)io->port );
snprintf( buf, sizeof(buf), "%s:%u", inet_ntoa( *addr ), (unsigned int)port );
return buf;
}
const char*
tr_peerIoGetAddrStr( const tr_peerIo * io )
{
return tr_peerIoAddrStr( &io->in_addr, io->port );
}
void
tr_peerIoSetIOFuncs( tr_peerIo * io,
tr_can_read_cb readcb,

View File

@ -30,16 +30,16 @@ typedef struct tr_extensions tr_extensions;
**/
tr_peerIo*
tr_peerIoNewOutgoing( struct tr_handle * handle,
struct in_addr * addr,
int port,
const uint8_t * torrentHash );
tr_peerIoNewOutgoing( struct tr_handle * handle,
const struct in_addr * addr,
int port,
const uint8_t * torrentHash );
tr_peerIo*
tr_peerIoNewIncoming( struct tr_handle * handle,
struct in_addr * addr,
uint16_t port,
int socket );
tr_peerIoNewIncoming( struct tr_handle * handle,
const struct in_addr * addr,
uint16_t port,
int socket );
void tr_peerIoFree ( tr_peerIo * io );
@ -61,6 +61,9 @@ int tr_peerIoSupportsFEXT( const tr_peerIo * io );
***
**/
const char*
tr_peerIoAddrStr( const struct in_addr * addr, uint16_t port );
const char*
tr_peerIoGetAddrStr( const tr_peerIo * io );

View File

@ -53,7 +53,6 @@ typedef struct tr_peer
time_t clientSentPieceDataAt;
time_t peerSentPieceDataAt;
time_t chokeChangedAt;
time_t connectionChangedAt;
struct tr_peermsgs * msgs;
tr_publisher_tag msgsTag;

View File

@ -28,6 +28,7 @@
#include "peer-msgs.h"
#include "platform.h"
#include "ptrarray.h"
#include "ratecontrol.h"
#include "trevent.h"
#include "utils.h"
@ -58,34 +59,32 @@ enum
* we're we don't get piece data from a peer in this long */
SNUBBED_SEC = 60,
/* if our connection count for a torrent is <= N% of what we wanted,
* start relaxing the rules that decide when to disconnect a peer */
RELAX_RULES_PERCENTAGE = 25,
/* if we're not relaxing the rules, disconnect a peer that hasn't
* given us anything (or taken, if we're seeding) in this long */
MIN_TRANSFER_IDLE = 90000,
/* even if we're relaxing the rules, disconnect a peer that hasn't
* given us anything (or taken, if we're seeding) in this long */
MAX_TRANSFER_IDLE = 240000,
/* this is arbitrary and, hopefully, temporary until we come up
* with a better idea for managing the connection limits */
MAX_CONNECTED_PEERS_PER_TORRENT = 100,
/* if we hang up on a peer for being worthless, don't try to
* reconnect to it for this long. */
MIN_HANGUP_PERIOD_SEC = 120
};
/**
***
**/
/* We keep one of these for every peer we know about, whether
* it's connected or not, so the struct must be small.
* When our current connections underperform, we dip back
* int this list for new ones. */
struct peer_atom
{
uint8_t from;
uint8_t flags; /* these match the added_f flags */
uint16_t port;
struct in_addr addr;
time_t time;
};
typedef struct
{
uint8_t hash[SHA_DIGEST_LENGTH];
tr_ptrArray * pool; /* struct peer_atom */
tr_ptrArray * peers; /* tr_peer */
tr_timer * reconnectTimer;
tr_timer * reconnectSoonTimer;
@ -140,7 +139,7 @@ torrentUnlock( Torrent * torrent )
managerUnlock( torrent->manager );
}
static int
torrentIsLocked( Torrent * t )
torrentIsLocked( const Torrent * t )
{
return t!=NULL && t->manager!=NULL && t->manager->lockThread!=0;
}
@ -149,12 +148,17 @@ torrentIsLocked( Torrent * t )
***
**/
static int
compareAddresses( const struct in_addr * a, const struct in_addr * b )
{
return tr_compareUint32( a->s_addr, b->s_addr );
}
static int
handshakeCompareToAddr( const void * va, const void * vb )
{
const tr_handshake * a = va;
const struct in_addr * b = vb;
return memcmp( tr_handshakeGetAddr( a, NULL ), b, sizeof( struct in_addr ) );
return compareAddresses( tr_handshakeGetAddr( a, NULL ), vb );
}
static int
@ -171,6 +175,20 @@ getExistingHandshake( tr_peerMgr * mgr, const struct in_addr * in_addr )
handshakeCompareToAddr );
}
static int
comparePeerAtomToAddress( const void * va, const void * vb )
{
const struct peer_atom * a = va;
return compareAddresses( &a->addr, vb );
}
static int
comparePeerAtoms( const void * va, const void * vb )
{
const struct peer_atom * b = vb;
return comparePeerAtomToAddress( va, &b->addr );
}
/**
***
**/
@ -204,15 +222,14 @@ peerCompare( const void * va, const void * vb )
{
const tr_peer * a = (const tr_peer *) va;
const tr_peer * b = (const tr_peer *) vb;
return memcmp( &a->in_addr, &b->in_addr, sizeof(struct in_addr) );
return compareAddresses( &a->in_addr, &b->in_addr );
}
static int
peerCompareToAddr( const void * va, const void * vb )
{
const tr_peer * a = (const tr_peer *) va;
const struct in_addr * b = (const struct in_addr *) vb;
return memcmp( &a->in_addr, b, sizeof(struct in_addr) );
return compareAddresses( &a->in_addr, vb );
}
static tr_peer*
@ -226,8 +243,30 @@ getExistingPeer( Torrent * torrent, const struct in_addr * in_addr )
peerCompareToAddr );
}
static struct peer_atom*
getExistingAtom( const Torrent * t, const struct in_addr * addr )
{
assert( torrentIsLocked( t ) );
return tr_ptrArrayFindSorted( t->pool, addr, comparePeerAtomToAddress );
}
static int
peerIsKnown( const Torrent * t, const struct in_addr * addr )
{
return getExistingAtom( t, addr ) != NULL;
}
static int
peerIsInUse( const Torrent * t, const struct in_addr * addr )
{
assert( torrentIsLocked ( t ) );
return ( getExistingPeer( (Torrent*)t, addr ) != NULL )
|| ( getExistingHandshake( ((Torrent*)t)->manager, addr ) != NULL );
}
static tr_peer*
getPeer( Torrent * torrent, const struct in_addr * in_addr, int * isNew )
getPeer( Torrent * torrent, const struct in_addr * in_addr )
{
tr_peer * peer;
@ -235,9 +274,6 @@ getPeer( Torrent * torrent, const struct in_addr * in_addr, int * isNew )
peer = getExistingPeer( torrent, in_addr );
if( isNew )
*isNew = peer == NULL;
if( peer == NULL )
{
peer = tr_new0( tr_peer, 1 );
@ -281,11 +317,26 @@ freePeer( tr_peer * peer )
tr_free( peer );
}
static void
removePeer( Torrent * t, tr_peer * peer )
{
tr_peer * removed;
struct peer_atom * atom;
assert( torrentIsLocked( t ) );
atom = getExistingAtom( t, &peer->in_addr );
assert( atom != NULL );
atom->time = time( NULL );
removed = tr_ptrArrayRemoveSorted ( t->peers, peer, peerCompare );
assert( removed == peer );
freePeer( removed );
}
static void
freeTorrent( tr_peerMgr * manager, Torrent * t )
{
int i, size;
tr_peer ** peers;
uint8_t hash[SHA_DIGEST_LENGTH];
assert( t != NULL );
@ -301,12 +352,9 @@ freeTorrent( tr_peerMgr * manager, Torrent * t )
tr_timerFree( &t->rechokeSoonTimer );
tr_timerFree( &t->refillTimer );
peers = (tr_peer **) tr_ptrArrayPeek( t->peers, &size );
for( i=0; i<size; ++i )
freePeer( peers[i] );
tr_bitfieldFree( t->requested );
tr_ptrArrayFree( t->peers );
tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
tr_ptrArrayFree( t->peers, (PtrArrayForeachFunc)freePeer );
tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
tr_free( t );
@ -393,13 +441,8 @@ tr_peerMgrFree( tr_peerMgr * manager )
{
managerLock( manager );
while( !tr_ptrArrayEmpty( manager->handshakes ) )
tr_handshakeAbort( (tr_handshake*)tr_ptrArrayNth( manager->handshakes, 0) );
tr_ptrArrayFree( manager->handshakes );
while( !tr_ptrArrayEmpty( manager->torrents ) )
freeTorrent( manager, (Torrent*)tr_ptrArrayNth( manager->torrents, 0) );
tr_ptrArrayFree( manager->torrents );
tr_ptrArrayFree( manager->handshakes, (PtrArrayForeachFunc)tr_handshakeAbort );
tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)freeTorrent );
managerUnlock( manager );
tr_lockFree( manager->lock );
@ -873,7 +916,7 @@ myHandshakeDoneCB( tr_handshake * handshake,
}
else /* looking good */
{
tr_peer * peer = getPeer( t, in_addr, NULL );
tr_peer * peer = getPeer( t, in_addr );
if( peer->msgs != NULL ) { /* we already have this peer */
tr_peerIoFree( io );
--manager->connectionCount;
@ -884,7 +927,6 @@ myHandshakeDoneCB( tr_handshake * handshake,
tr_free( peer->client );
peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
peer->msgsTag = tr_peerMsgsSubscribe( peer->msgs, msgsCallbackFunc, t );
peer->connectionChangedAt = time( NULL );
rechokeSoon( t );
}
}
@ -927,6 +969,22 @@ tr_peerMgrAddIncoming( tr_peerMgr * manager,
managerUnlock( manager );
}
static void
maybeAddNewAtom( Torrent * t, const struct in_addr * addr, uint16_t port, uint8_t flags, uint8_t from )
{
if( !peerIsKnown( t, addr ) )
{
struct peer_atom * a = tr_new( struct peer_atom, 1 );
a->addr = *addr;
a->port = port;
a->flags = flags;
a->from = from;
a->time = 0;
fprintf( stderr, "torrent [%s] getting a new atom: %s\n", t->tor->info.name, tr_peerIoAddrStr(&a->addr,a->port) );
tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
}
}
void
tr_peerMgrAddPex( tr_peerMgr * manager,
const uint8_t * torrentHash,
@ -940,17 +998,8 @@ tr_peerMgrAddPex( tr_peerMgr * manager,
managerLock( manager );
t = getExistingTorrent( manager, torrentHash );
end = pex + pexCount;
while( pex != end )
{
int isNew;
tr_peer * peer = getPeer( t, &pex->in_addr, &isNew );
if( isNew ) {
peer->port = pex->port;
peer->from = from;
}
++pex;
}
for( end=pex+pexCount; pex!=end; ++pex )
maybeAddNewAtom( t, &pex->in_addr, pex->port, pex->flags, from );
reconnectSoon( t );
managerUnlock( manager );
@ -972,17 +1021,11 @@ tr_peerMgrAddPeers( tr_peerMgr * manager,
t = getExistingTorrent( manager, torrentHash );
for( i=0; t!=NULL && i<peerCount; ++i )
{
int isNew;
tr_peer * peer;
struct in_addr addr;
uint16_t port;
memcpy( &addr, walk, 4 ); walk += 4;
memcpy( &port, walk, 2 ); walk += 2;
peer = getPeer( t, &addr, &isNew );
if( isNew ) {
peer->port = port;
peer->from = from;
}
maybeAddNewAtom( t, &addr, port, 0, from );
}
reconnectSoon( t );
@ -1131,6 +1174,7 @@ tr_peerMgrAddTorrent( tr_peerMgr * manager,
t = tr_new0( Torrent, 1 );
t->manager = manager;
t->tor = tor;
t->pool = tr_ptrArrayNew( );
t->peers = tr_ptrArrayNew( );
t->requested = tr_bitfieldNew( tor->blockCount );
restartChokeTimer( t );
@ -1437,150 +1481,159 @@ rechokePulse( void * vtorrent )
return TRUE;
}
/**
***
**/
/***
****
****
****
***/
struct tr_connection
{
tr_peer * peer;
double throughput;
};
#define LAISSEZ_FAIRE_PERIOD_SECS 60
static int
shouldPeerBeDisconnected( Torrent * t, tr_peer * peer, int peerCount, int isSeeding )
compareConnections( const void * va, const void * vb )
{
const struct tr_connection * a = va;
const struct tr_connection * b = vb;
if( a->throughput < b->throughput ) return -1;
if( a->throughput > b->throughput ) return 1;
return 0;
}
static struct tr_connection *
getWeakConnections( Torrent * t, int * setmeSize )
{
int i, insize, outsize;
const int seeding = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
tr_peer ** peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &insize );
struct tr_connection * ret = tr_new( struct tr_connection, insize );
const time_t now = time( NULL );
int relaxStrictnessIfFewerThanN;
double strictness;
assert( torrentIsLocked( t ) );
assert( peer != NULL );
if( peer->io == NULL ) /* not connected */
return FALSE;
if( !t->isRunning ) /* the torrent is stopped... nobody should be connected */
return TRUE;
/* not enough peers to go around... might as well keep this one;
* they might unchoke us or give us a pex or something */
if( peerCount < MAX_CONNECTED_PEERS_PER_TORRENT )
return FALSE;
/* when deciding whether or not to keep a peer, judge its responsiveness
on a sliding scale that's based on how many other peers are available */
relaxStrictnessIfFewerThanN =
(int)(((MAX_CONNECTED_PEERS_PER_TORRENT * RELAX_RULES_PERCENTAGE) / 100.0) + 0.5);
/* if we have >= relaxIfFewerThan, strictness is 100%.
if we have zero connections, strictness is 0% */
if( peerCount >= relaxStrictnessIfFewerThanN )
strictness = 1.0;
else
strictness = peerCount / (double)relaxStrictnessIfFewerThanN;
/* test: has it been too long since we exchanged piece data? */
if( ( now - peer->connectionChangedAt ) >= MAX_TRANSFER_IDLE ) {
const uint64_t lo = MIN_TRANSFER_IDLE;
const uint64_t hi = MAX_TRANSFER_IDLE;
const uint64_t limit = lo + ((hi-lo) * strictness);
const uint64_t interval = now - (isSeeding ? peer->clientSentPieceDataAt : peer->peerSentPieceDataAt);
if( interval > limit )
return TRUE;
}
/* FIXME: SWE had other tests too... */
return FALSE;
}
static int
comparePeerByConnectionDate( const void * va, const void * vb )
{
const tr_peer * a = *(const tr_peer**) va;
const tr_peer * b = *(const tr_peer**) vb;
return tr_compareUint64( a->connectionChangedAt, b->connectionChangedAt );
}
static int
reconnectPulse( void * vt UNUSED )
{
int i, size, liveCount;
Torrent * t = vt;
tr_peer ** peers;
int isSeeding;
torrentLock( t );
peers = (tr_peer**) tr_ptrArrayPeek( t->peers, &size );
isSeeding = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
/* how many connections do we have? */
for( i=liveCount=0; i<size; ++i )
if( peers[i]->msgs != NULL )
++liveCount;
/* destroy and/or disconnect from some peers */
for( i=0; i<size; )
for( i=outsize=0; i<insize; ++i )
{
tr_peer * peer = peers[i];
const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
const double throughput = seeding ? tr_peerIoGetRateToPeer( peer->io )
: tr_peerIoGetRateToClient( peer->io );
if( peer->doPurge ) {
tr_ptrArrayErase( t->peers, i, i+1 );
freePeer( peer );
--size;
--liveCount;
if( ( now - atom->time ) < LAISSEZ_FAIRE_PERIOD_SECS )
continue;
if( throughput >= 2 )
continue;
ret[outsize].peer = peer;
ret[outsize].throughput = throughput;
++outsize;
}
qsort( ret, outsize, sizeof(struct tr_connection), compareConnections );
*setmeSize = outsize;
return ret;
}
static int
compareAtomByTime( const void * va, const void * vb )
{
const struct peer_atom * a = * (const struct peer_atom**) va;
const struct peer_atom * b = * (const struct peer_atom**) vb;
if( a->time < b->time ) return -1;
if( a->time > b->time ) return 1;
return 0;
}
static struct peer_atom **
getPeerCandidates( Torrent * t, int * setmeSize )
{
int i, insize, outsize;
struct peer_atom ** atoms;
struct peer_atom ** ret;
const time_t now = time( NULL );
const int seed = tr_cpGetStatus( t->tor->completion ) != TR_CP_INCOMPLETE;
assert( torrentIsLocked( t ) );
atoms = (struct peer_atom**) tr_ptrArrayPeek( t->pool, &insize );
ret = tr_new( struct peer_atom*, insize );
for( i=outsize=0; i<insize; ++i )
{
struct peer_atom * atom = atoms[i];
/* we don't need two connections to the same peer... */
if( peerIsInUse( t, &atom->addr ) ) {
fprintf( stderr, "RECONNECT peer %d (%s) is in use...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
continue;
}
if( shouldPeerBeDisconnected( t, peer, liveCount, isSeeding ) ) {
disconnectPeer( peer );
--liveCount;
/* no need to connect if we're both seeds... */
if( seed && ( atom->flags & 2 ) ) {
fprintf( stderr, "RECONNECT peer %d (%s) is a seed and so are we...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
continue;
}
++i;
/* if we used this peer recently, give someone else a turn */
if( ( now - atom->time ) < LAISSEZ_FAIRE_PERIOD_SECS ) {
fprintf( stderr, "RECONNECT peer %d (%s) is in its grace period...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
continue;
}
ret[outsize++] = atom;
}
/* maybe connect to some new peers */
if( t->isRunning && (liveCount<MAX_CONNECTED_PEERS_PER_TORRENT) )
{
int poolSize;
int left = MAX_CONNECTED_PEERS_PER_TORRENT - liveCount;
tr_peer ** pool;
tr_peerMgr * manager = t->manager;
const time_t now = time( NULL );
qsort( ret, outsize, sizeof(struct peer_atom*), compareAtomByTime );
*setmeSize = outsize;
return ret;
}
/* make a list of peers we know about but aren't connected to */
poolSize = 0;
pool = tr_new0( tr_peer*, size );
for( i=0; i<size; ++i ) {
tr_peer * peer = peers[i];
if( peer->msgs == NULL )
pool[poolSize++] = peer;
}
static int
reconnectPulse( void * vtorrent )
{
Torrent * t = vtorrent;
struct peer_atom ** candidates;
struct tr_connection * connections;
int i, nCandidates, nConnections, nCull, nAdd;
/* sort them s.t. the ones we've already tried are at the last of the list */
qsort( pool, poolSize, sizeof(tr_peer*), comparePeerByConnectionDate );
torrentLock( t );
/* make some connections */
for( i=0; i<poolSize && left>0; ++i )
{
tr_peer * peer = pool[i];
tr_peerIo * io;
connections = getWeakConnections( t, &nConnections );
candidates = getPeerCandidates( t, &nCandidates );
if( ( now - peer->connectionChangedAt ) < MIN_HANGUP_PERIOD_SEC )
break;
/* figure out how many peers to disconnect */
nCull = nConnections-4;
/* already have a handshake pending */
if( getExistingHandshake( manager, &peer->in_addr ) != NULL )
continue;
fprintf( stderr, "RECONNECT pulse for [%s]: %d connections, %d candidates, %d atoms, %d cull\n", t->tor->info.name, nConnections, nCandidates, tr_ptrArraySize(t->pool), nCull );
/* initiate a connection to the peer */
io = tr_peerIoNewOutgoing( manager->handle, &peer->in_addr, peer->port, t->hash );
/*fprintf( stderr, "[%s] connecting to potential peer %s\n", t->tor->info.name, tr_peerIoGetAddrStr(io) );*/
peer->connectionChangedAt = time( NULL );
initiateHandshake( manager, io );
--left;
}
for( i=0; i<nConnections; ++i )
fprintf( stderr, "connection #%d: %s @ %.2f\n", i+1, tr_peerIoAddrStr( &connections[i].peer->in_addr, connections[i].peer->port ), connections[i].throughput );
tr_free( pool );
/* disconnect some peers */
for( i=0; i<nCull && i<nConnections; ++i ) {
const double throughput = connections[i].throughput;
tr_peer * peer = connections[i].peer;
fprintf( stderr, "RECONNECT culling peer %s, whose throughput was %f\n", tr_peerIoAddrStr(&peer->in_addr, peer->port), throughput );
removePeer( t, peer );
}
/* add some new ones */
nAdd = MAX_CONNECTED_PEERS_PER_TORRENT - nConnections;
for( i=0; i<nAdd && i<nCandidates; ++i ) {
struct peer_atom * atom = candidates[i];
tr_peerIo * io = tr_peerIoNewOutgoing( t->manager->handle, &atom->addr, atom->port, t->hash );
fprintf( stderr, "RECONNECT adding an outgoing connection...\n" );
initiateHandshake( t->manager, io );
atom->time = time( NULL );
}
/* cleanup */
tr_free( connections );
tr_free( candidates );
torrentUnlock( t );
return TRUE;
}

View File

@ -154,7 +154,7 @@ myDebug( const char * file, int line, const struct tr_peermsgs * msgs, const cha
va_list args;
const char * addr = tr_peerIoGetAddrStr( msgs->io );
struct evbuffer * buf = evbuffer_new( );
evbuffer_add_printf( buf, "[%s:%d] %s (%p) ", file, line, addr, msgs );
evbuffer_add_printf( buf, "[%s:%d] %s (%p) ", file, line, addr, msgs->io );
va_start( args, fmt );
evbuffer_add_vprintf( buf, fmt, args );
va_end( args );
@ -242,36 +242,30 @@ isPieceInteresting( const tr_peermsgs * peer,
return TRUE;
}
/* "interested" means we'll ask for piece data from the peer if they unchoke us */
static int
isPeerInteresting( const tr_peermsgs * msgs )
{
int i;
const tr_torrent * torrent;
const tr_bitfield * bitfield;
const int clientIsSeed = tr_cpGetStatus( msgs->torrent->completion ) != TR_CP_INCOMPLETE;
const int peerIsSeed = msgs->info->progress >= 1.0;
if( peerIsSeed )
{
return !clientIsSeed;
}
else if( clientIsSeed )
{
return !peerIsSeed;
}
else /* we're both leeches... */
{
int i;
const tr_torrent * torrent = msgs->torrent;
const tr_bitfield * bitfield = tr_cpPieceBitfield( torrent->completion );
if( clientIsSeed )
return FALSE;
if( !msgs->info->have ) /* We don't know what this peer has... what should this be? */
torrent = msgs->torrent;
bitfield = tr_cpPieceBitfield( torrent->completion );
if( !msgs->info->have ) /* We don't know what this peer has... what should this be? */
return TRUE;
assert( bitfield->len == msgs->info->have->len );
for( i=0; i<torrent->info.pieceCount; ++i )
if( isPieceInteresting( msgs, i ) )
return TRUE;
assert( bitfield->len == msgs->info->have->len );
for( i=0; i<torrent->info.pieceCount; ++i )
if( isPieceInteresting( msgs, i ) )
return TRUE;
return FALSE;
}
return FALSE;
}
static void
@ -689,6 +683,7 @@ parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
dbgmsg( msgs, "got ltep handshake" );
parseLtepHandshake( msgs, msglen, inbuf );
sendLtepHandshake( msgs );
sendPex( msgs );
}
else if( ltep_msgid == msgs->ut_pex_id )
{
@ -774,6 +769,7 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
dbgmsg( msgs, "w00t peer sent us a BT_INTERESTED" );
assert( msglen == 0 );
msgs->info->peerIsInterested = 1;
tr_peerMsgsSetChoke( msgs, 0 );
break;
case BT_NOT_INTERESTED:
@ -1236,6 +1232,7 @@ pulse( void * vmsgs )
if( !canWrite( msgs ) )
{
}
#if 0
else if(( len = EVBUFFER_LENGTH( msgs->outBlock ) ))
{
while ( len && canUpload( msgs ) )
@ -1250,6 +1247,7 @@ pulse( void * vmsgs )
dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen, (int)len );
}
}
#endif
else if(( len = EVBUFFER_LENGTH( msgs->outMessages ) ))
{
tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
@ -1257,19 +1255,30 @@ pulse( void * vmsgs )
}
else if(( msgs->peerAskedFor ))
{
struct peer_request * req = tr_list_pop_front( &msgs->peerAskedFor );
uint8_t * tmp = tr_new( uint8_t, req->length );
const uint32_t msglen = sizeof(uint8_t) + 2*sizeof(uint32_t) + req->length;
assert( requestIsValid( msgs, req ) );
tr_ioRead( msgs->torrent, req->index, req->offset, req->length, tmp );
tr_peerIoWriteUint32( msgs->io, msgs->outBlock, msglen );
tr_peerIoWriteUint8 ( msgs->io, msgs->outBlock, BT_PIECE );
tr_peerIoWriteUint32( msgs->io, msgs->outBlock, req->index );
tr_peerIoWriteUint32( msgs->io, msgs->outBlock, req->offset );
tr_peerIoWriteBytes ( msgs->io, msgs->outBlock, tmp, req->length );
tr_free( tmp );
dbgmsg( msgs, "putting req into out queue: index %d, offset %d, length %d ... %d blocks left in our queue", (int)req->index, (int)req->offset, (int)req->length, tr_list_size(msgs->peerAskedFor) );
tr_free( req );
if( canUpload( msgs ) )
{
struct peer_request * req = tr_list_pop_front( &msgs->peerAskedFor );
uint8_t * tmp = tr_new( uint8_t, req->length );
const uint32_t msglen = sizeof(uint8_t) + 2*sizeof(uint32_t) + req->length;
struct evbuffer * out = evbuffer_new( );
assert( requestIsValid( msgs, req ) );
tr_peerIoWriteUint32( msgs->io, out, msglen );
tr_peerIoWriteUint8 ( msgs->io, out, BT_PIECE );
tr_peerIoWriteUint32( msgs->io, out, req->index );
tr_peerIoWriteUint32( msgs->io, out, req->offset );
tr_peerIoWriteBuf( msgs->io, out );
tr_ioRead( msgs->torrent, req->index, req->offset, req->length, tmp );
tr_peerIoWrite( msgs->io, tmp, req->length );
peerGotBytes( msgs, req->length );
dbgmsg( msgs, "putting req into out queue: index %d, offset %d, length %d ... %d blocks left in our queue", (int)req->index, (int)req->offset, (int)req->length, tr_list_size(msgs->peerAskedFor) );
tr_free( req );
tr_free( tmp );
evbuffer_free( out );
}
}
else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
{
@ -1308,7 +1317,7 @@ sendBitfield( tr_peermsgs * msgs )
/* some peers give us error messages if we send
more than this many peers in a single pex message */
#define MAX_PEX_DIFFS 50
#define MAX_PEX_DIFFS 200
typedef struct
{

View File

@ -40,11 +40,27 @@ tr_ptrArrayNew( void )
}
void
tr_ptrArrayFree( tr_ptrArray * t )
tr_ptrArrayForeach( tr_ptrArray * t, PtrArrayForeachFunc func )
{
int i;
assert( t != NULL );
assert( t->items != NULL );
assert( func != NULL );
for( i=0; i<t->n_items; ++i )
func( t->items[i] );
}
void
tr_ptrArrayFree( tr_ptrArray * t, PtrArrayForeachFunc func )
{
assert( t != NULL );
assert( t->items != NULL );
if( func != NULL )
tr_ptrArrayForeach( t, func );
tr_free( t->items );
tr_free( t );
}

View File

@ -18,9 +18,11 @@
*/
typedef struct tr_ptrArray tr_ptrArray;
tr_ptrArray * tr_ptrArrayNew ( void );
typedef void (*PtrArrayForeachFunc)(void *);
void tr_ptrArrayFree ( tr_ptrArray* );
tr_ptrArray * tr_ptrArrayNew ( void );
void tr_ptrArrayForeach ( tr_ptrArray*, PtrArrayForeachFunc func );
void tr_ptrArrayFree ( tr_ptrArray*, PtrArrayForeachFunc func );
void* tr_ptrArrayNth ( tr_ptrArray*, int n );
void** tr_ptrArrayPeek ( tr_ptrArray*, int * size );
void** tr_ptrArrayBase ( tr_ptrArray* );

View File

@ -385,9 +385,9 @@ onTorrentFreeNow( void * vtor )
if( t->connection != NULL )
evhttp_connection_free( t->connection );
tr_ptrArrayFree( t->torrents );
tr_ptrArrayFree( t->scrapeQueue );
tr_ptrArrayFree( t->scraping );
tr_ptrArrayFree( t->torrents, NULL );
tr_ptrArrayFree( t->scrapeQueue, NULL );
tr_ptrArrayFree( t->scraping, NULL );
for( i=0; i<t->addressCount; ++i )
tr_trackerInfoClear( &t->addresses[i] );