* simplify the the choking intervals

* fix peer-mgr destructor bug
* send debugging messagese to TR_DEBUG_FD instead of the console
This commit is contained in:
Charles Kerr 2007-10-02 14:35:02 +00:00
parent 77d7cf76a1
commit f237bf7047
5 changed files with 73 additions and 64 deletions

View File

@ -450,12 +450,10 @@ tr_peerIoWriteBytes( tr_peerIo * io,
switch( io->encryptionMode )
{
case PEER_ENCRYPTION_NONE:
/*fprintf( stderr, "writing %d plaintext bytes to outbuf...\n", byteCount );*/
evbuffer_add( outbuf, bytes, byteCount );
break;
case PEER_ENCRYPTION_RC4:
/*fprintf( stderr, "encrypting and writing %d bytes to outbuf...\n", byteCount );*/
tmp = tr_new( uint8_t, byteCount );
tr_cryptoEncrypt( io->crypto, byteCount, bytes, tmp );
evbuffer_add( outbuf, tmp, byteCount );
@ -504,12 +502,10 @@ tr_peerIoReadBytes( tr_peerIo * io,
switch( io->encryptionMode )
{
case PEER_ENCRYPTION_NONE:
/*fprintf( stderr, "reading %d plaintext bytes from inbuf...\n", byteCount );*/
evbuffer_remove( inbuf, bytes, byteCount );
break;
case PEER_ENCRYPTION_RC4:
/*fprintf( stderr, "reading AND DECRYPTING %d bytes from inbuf...\n", byteCount );*/
evbuffer_remove( inbuf, bytes, byteCount );
tr_cryptoDecrypt( io->crypto, byteCount, bytes, bytes );
break;

View File

@ -16,6 +16,8 @@
#include <stdio.h> /* printf */
#include <limits.h> /* INT_MAX */
#include <event.h>
#include "transmission.h"
#include "clients.h"
#include "completion.h"
@ -51,9 +53,6 @@ enum
/* don't change a peer's choke status more often than this */
MIN_CHOKE_PERIOD_SEC = 10,
/* how soon is `soon' in the rechokeSoon, reconnecSoon funcs */
SOON_MSEC = 1000,
/* following the BT spec, we consider ourselves `snubbed' if
* we're we don't get piece data from a peer in this long */
SNUBBED_SEC = 60,
@ -100,7 +99,6 @@ typedef struct
tr_ptrArray * peers; /* tr_peer */
tr_timer * reconnectTimer;
tr_timer * rechokeTimer;
tr_timer * rechokeSoonTimer;
tr_timer * refillTimer;
tr_torrent * tor;
tr_bitfield * requested;
@ -123,6 +121,29 @@ struct tr_peerMgr
***
**/
static void
myDebug( const char * file, int line, const Torrent * t, const char * fmt, ... )
{
FILE * fp = tr_getLog( );
if( fp != NULL )
{
va_list args;
struct evbuffer * buf = evbuffer_new( );
evbuffer_add_printf( buf, "[%s:%d] %s ", file, line, t->tor->info.name );
va_start( args, fmt );
evbuffer_add_vprintf( buf, fmt, args );
va_end( args );
fprintf( fp, "%s\n", EVBUFFER_DATA(buf) );
evbuffer_free( buf );
}
}
#define tordbg(t, fmt...) myDebug(__FILE__, __LINE__, t, ##fmt )
/**
***
**/
static void
managerLock( struct tr_peerMgr * manager )
{
@ -350,30 +371,26 @@ removeAllPeers( Torrent * t )
removePeer( t, tr_ptrArrayNth( t->peers, 0 ) );
}
/* torrent must have already been removed from manager->torrents */
static void
freeTorrent( tr_peerMgr * manager, Torrent * t )
freeTorrent( Torrent * t )
{
uint8_t hash[SHA_DIGEST_LENGTH];
assert( t != NULL );
assert( t->peers != NULL );
assert( torrentIsLocked( t ) );
assert( getExistingTorrent( manager, t->hash ) != NULL );
memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
tr_timerFree( &t->reconnectTimer );
tr_timerFree( &t->rechokeTimer );
tr_timerFree( &t->rechokeSoonTimer );
tr_timerFree( &t->refillTimer );
tr_bitfieldFree( t->requested );
tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
tr_ptrArrayFree( t->peers, (PtrArrayForeachFunc)freePeer );
tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
tr_free( t );
assert( getExistingTorrent( manager, hash ) == NULL );
}
/**
@ -455,9 +472,15 @@ tr_peerMgrFree( tr_peerMgr * manager )
{
managerLock( manager );
tr_ptrArrayFree( manager->handshakes, (PtrArrayForeachFunc)tr_handshakeAbort );
/* free the torrents. */
tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)freeTorrent );
/* free the handshakes. Abort invokes handshakeDoneCB(), which removes
* the item from manager->handshakes, so this is a little roundabout... */
while( !tr_ptrArrayEmpty( manager->handshakes ) )
tr_handshakeAbort( tr_ptrArrayNth( manager->handshakes, 0 ) );
tr_ptrArrayFree( manager->handshakes, NULL );
managerUnlock( manager );
tr_free( manager );
}
@ -489,10 +512,10 @@ getConnectedPeers( Torrent * t, int * setmeCount )
struct tr_refill_piece
{
tr_priority_t priority;
uint16_t random;
uint32_t piece;
uint32_t peerCount;
uint32_t fastAllowed;
uint8_t random;
};
static int
@ -514,7 +537,7 @@ compareRefillPiece (const void * aIn, const void * bIn)
return a->fastAllowed < b->fastAllowed ? -1 : 1;
/* otherwise go with our random seed */
return tr_compareUint8( a->random, b->random );
return tr_compareUint16( a->random, b->random );
}
static int
@ -566,7 +589,7 @@ getPreferredPieces( Torrent * t,
setme->priority = inf->pieces[piece].priority;
setme->peerCount = 0;
setme->fastAllowed = 0;
setme->random = tr_rand( UINT8_MAX );
setme->random = tr_rand( UINT16_MAX );
/* FIXME */
// setme->fastAllowed = tr_bitfieldHas( t->tor->allowedList, i);
@ -585,11 +608,6 @@ getPreferredPieces( Torrent * t,
tr_free( p );
}
#if 0
fprintf (stderr, "new pool: ");
for (i=0; i<15 && i<(int)poolSize; ++i ) fprintf (stderr, "%d, ", (int)pool[i] );
fprintf (stderr, "\n");
#endif
tr_free( peers );
*pieceCount = poolSize;
@ -609,7 +627,6 @@ getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
assert( torrentIsLocked( t ) );
pieces = getPreferredPieces( t, &pieceCount );
/*fprintf( stderr, "REFILL refillPulse for {%s} got %d of %d pieces\n", tor->info.name, (int)pieceCount, t->tor->info.pieceCount );*/
req = tr_new( uint64_t, pieceCount * tor->blockCountInPiece );
reqCount = 0;
@ -630,7 +647,6 @@ getPreferredBlocks( Torrent * t, uint64_t * setmeCount )
unreq[unreqCount++] = block;
}
/*fprintf( stderr, "REFILL refillPulse for {%s} reqCount is %d, unreqCount is %d\n", tor->info.name, (int)reqCount, (int)unreqCount );*/
ret = walk = tr_new( uint64_t, unreqCount + reqCount );
memcpy( walk, unreq, sizeof(uint64_t) * unreqCount );
walk += unreqCount;
@ -667,8 +683,6 @@ refillPulse( void * vtorrent )
blocks = getPreferredBlocks( t, &blockCount );
peers = getConnectedPeers( t, &peerCount );
/*fprintf( stderr, "REFILL refillPulse for {%s} got %d blocks\n", tor->info.name, (int)blockCount );*/
for( i=0; peerCount && i<blockCount; ++i )
{
const int block = blocks[i];
@ -697,7 +711,6 @@ refillPulse( void * vtorrent )
break;
case TR_ADDREQ_OK:
/*fprintf( stderr, "REFILL peer %p took the request for block %d\n", peers[j]->msgs, block );*/
tr_bitfieldAdd( t->requested, block );
j = peerCount;
break;
@ -775,30 +788,6 @@ restartChokeTimer( Torrent * t )
t->rechokeTimer = tr_timerNew( t->manager->handle, rechokePulse, t, RECHOKE_PERIOD_MSEC );
}
static void
rechokeNow( Torrent * t )
{
rechokePulse( t );
restartChokeTimer( t );
}
static int
rechokeSoonCB( void * vt )
{
Torrent * t = vt;
rechokeNow( t );
t->rechokeSoonTimer = NULL;
return FALSE;
}
static void
rechokeSoon( Torrent * t )
{
if( t->rechokeSoonTimer == NULL )
t->rechokeSoonTimer = tr_timerNew( t->manager->handle,
rechokeSoonCB, t, SOON_MSEC );
}
static void
msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
{
@ -858,7 +847,7 @@ ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8
a->flags = flags;
a->from = from;
a->time = 0;
fprintf( stderr, "torrent [%s] getting a new atom: %s\n", t->tor->info.name, tr_peerIoAddrStr(&a->addr,a->port) );
tordbg( t, "got a new atom: %s", tr_peerIoAddrStr(&a->addr,a->port) );
tr_ptrArrayInsertSorted( t->pool, a, comparePeerAtoms );
}
}
@ -933,7 +922,6 @@ myHandshakeDoneCB( tr_handshake * handshake,
tr_free( peer->client );
peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
peer->msgsTag = tr_peerMsgsSubscribe( peer->msgs, msgsCallbackFunc, t );
rechokeSoon( t );
}
}
@ -1178,7 +1166,8 @@ tr_peerMgrRemoveTorrent( tr_peerMgr * manager,
t = getExistingTorrent( manager, torrentHash );
assert( t != NULL );
stopTorrent( t );
freeTorrent( manager, t );
tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
freeTorrent( t );
managerUnlock( manager );
}
@ -1562,19 +1551,19 @@ getPeerCandidates( Torrent * t, int * setmeSize )
/* we don't need two connections to the same peer... */
if( peerIsInUse( t, &atom->addr ) ) {
fprintf( stderr, "RECONNECT peer %d (%s) is in use...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
tordbg( t, "RECONNECT peer %d (%s) is in use...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
continue;
}
/* no need to connect if we're both seeds... */
if( seed && (atom->flags & ADDED_F_SEED_FLAG) ) {
fprintf( stderr, "RECONNECT peer %d (%s) is a seed and so are we...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
tordbg( t, "RECONNECT peer %d (%s) is a seed and so are we...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
continue;
}
/* if we used this peer recently, give someone else a turn */
if( ( now - atom->time ) < LAISSEZ_FAIRE_PERIOD_SECS ) {
fprintf( stderr, "RECONNECT peer %d (%s) is in its grace period...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
tordbg( t, "RECONNECT peer %d (%s) is in its grace period...\n", i, tr_peerIoAddrStr(&atom->addr,atom->port) );
continue;
}
@ -1604,19 +1593,19 @@ reconnectPulse( void * vtorrent )
struct tr_connection * connections = getWeakConnections( t, &nConnections );
const int peerCount = tr_ptrArraySize( t->peers );
fprintf( stderr, "RECONNECT pulse for [%s]: %d weak connections, %d connection candidates, %d atoms, max per pulse is %d\n",
tordbg( t, "RECONNECT pulse for [%s]: %d weak connections, %d connection candidates, %d atoms, max per pulse is %d\n",
t->tor->info.name, nConnections, nCandidates, tr_ptrArraySize(t->pool), (int)MAX_RECONNECTIONS_PER_PULSE );
for( i=0; i<nConnections; ++i )
fprintf( stderr, "connection #%d: %s @ %.2f\n", i+1,
tordbg( t, "connection #%d: %s @ %.2f\n", i+1,
tr_peerIoAddrStr( &connections[i].peer->in_addr, connections[i].peer->port ), connections[i].throughput );
/* disconnect some peers */
for( i=0; i<nConnections && i<(peerCount-5); ++i ) {
const double throughput = connections[i].throughput;
tr_peer * peer = connections[i].peer;
fprintf( stderr, "RECONNECT culling peer %s, whose throughput was %f\n",
tr_peerIoAddrStr(&peer->in_addr, peer->port), throughput );
tordbg( t, "RECONNECT culling peer %s, whose throughput was %f\n",
tr_peerIoAddrStr(&peer->in_addr, peer->port), throughput );
removePeer( t, peer );
}
@ -1625,7 +1614,7 @@ reconnectPulse( void * vtorrent )
for( i=0; i<nAdd && i<nCandidates && i<MAX_RECONNECTIONS_PER_PULSE; ++i ) {
struct peer_atom * atom = candidates[i];
tr_peerIo * io = tr_peerIoNewOutgoing( t->manager->handle, &atom->addr, atom->port, t->hash );
fprintf( stderr, "RECONNECT adding an outgoing connection...\n" );
tordbg( t, "RECONNECT adding an outgoing connection...\n" );
initiateHandshake( t->manager, io );
atom->time = time( NULL );
}

View File

@ -319,6 +319,7 @@ tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
dbgmsg( msgs, "sending a %s message", (choke ? "CHOKE" : "UNCHOKE") );
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, choke ? BT_CHOKE : BT_UNCHOKE );
msgs->info->chokeChangedAt = time( NULL );
}
}
@ -563,8 +564,10 @@ sendLtepHandshake( tr_peermsgs * msgs )
tr_peerIoWriteBuf( msgs->io, outbuf );
#if 0
dbgmsg( msgs, "here is the ltep handshake we sent:" );
tr_bencPrint( &val );
#endif
/* cleanup */
tr_bencFree( &val );
@ -587,8 +590,10 @@ parseLtepHandshake( tr_peermsgs * msgs, int len, struct evbuffer * inbuf )
return;
}
#if 0
dbgmsg( msgs, "here is the ltep handshake we read:" );
tr_bencPrint( &val );
#endif
/* does the peer prefer encrypted connections? */
sub = tr_bencDictFind( &val, "e" );
@ -787,16 +792,20 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
firePeerProgress( msgs );
break;
case BT_BITFIELD:
case BT_BITFIELD: {
const int clientIsSeed = tr_cpGetStatus( msgs->torrent->completion ) != TR_CP_INCOMPLETE;
dbgmsg( msgs, "w00t peer sent us a BT_BITFIELD" );
assert( msglen == msgs->info->have->len );
tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
msgs->info->progress = tr_bitfieldCountTrueBits( msgs->info->have ) / (float)msgs->torrent->info.pieceCount;
if( clientIsSeed && ( msgs->info->progress < 1.0 ) )
tr_peerMsgsSetChoke( msgs, FALSE );
dbgmsg( msgs, "after the HAVE message, peer progress is %f", msgs->info->progress );
updateInterest( msgs );
fireNeedReq( msgs );
firePeerProgress( msgs );
break;
}
case BT_REQUEST: {
struct peer_request * req;

View File

@ -39,6 +39,20 @@ tr_ptrArrayNew( void )
return p;
}
tr_ptrArray*
tr_ptrArrayDup( tr_ptrArray* in )
{
tr_ptrArray * out;
out = tr_new( tr_ptrArray, 1 );
out->n_items = in->n_items;
out->n_alloc = in->n_items;
out->items = tr_new( void*, out->n_alloc );
memcpy( out->items, in->items, out->n_items * sizeof(void*) );
return out;
}
void
tr_ptrArrayForeach( tr_ptrArray * t, PtrArrayForeachFunc func )
{

View File

@ -21,6 +21,7 @@ typedef struct tr_ptrArray tr_ptrArray;
typedef void (*PtrArrayForeachFunc)(void *);
tr_ptrArray * tr_ptrArrayNew ( void );
tr_ptrArray * tr_ptrArrayDup ( tr_ptrArray* );
void tr_ptrArrayForeach ( tr_ptrArray*, PtrArrayForeachFunc func );
void tr_ptrArrayFree ( tr_ptrArray*, PtrArrayForeachFunc func );
void* tr_ptrArrayNth ( tr_ptrArray*, int n );