* add a per-peer request queue to hold the next 10-15 seconds' worth of requests so that we always have more requests at hand when the current requests start to run low.

* increase the tracker `numwant' variable to grow our peer pool
* bugfixes in cancelling requests.
* make the debug log sexy and readable like uTorrent's ;)
This commit is contained in:
Charles Kerr 2007-10-06 18:20:52 +00:00
parent 985b34a32f
commit c75f49bc26
9 changed files with 328 additions and 151 deletions

View File

@ -157,13 +157,17 @@ myDebug( const char * file, int line, const tr_handshake * handshake, const char
if( fp != NULL )
{
va_list args;
const char * addr = tr_peerIoGetAddrStr( handshake->io );
struct evbuffer * buf = evbuffer_new( );
evbuffer_add_printf( buf, "[%s:%d] %s (%p) ", file, line, addr, handshake->io );
char timestr[64];
evbuffer_add_printf( buf, "[%s] %s: ",
tr_getLogTimeStr( timestr, sizeof(timestr) ),
tr_peerIoGetAddrStr( handshake->io ) );
va_start( args, fmt );
evbuffer_add_vprintf( buf, fmt, args );
va_end( args );
fprintf( stderr, "%s\n", EVBUFFER_DATA(buf) );
evbuffer_add_printf( buf, " (%s:%d)\n", file, line );
fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
evbuffer_free( buf );
}
}
@ -580,7 +584,7 @@ readHandshake( tr_handshake * handshake, struct evbuffer * inbuf )
/* FIXME: use readHandshake here */
dbgmsg( handshake, "payload: need %d, got %d\n", (int)HANDSHAKE_SIZE, (int)EVBUFFER_LENGTH(inbuf) );
dbgmsg( handshake, "payload: need %d, got %d", (int)HANDSHAKE_SIZE, (int)EVBUFFER_LENGTH(inbuf) );
if( EVBUFFER_LENGTH(inbuf) < HANDSHAKE_SIZE )
return READ_MORE;
@ -974,7 +978,7 @@ canRead( struct bufferevent * evin, void * arg )
tr_handshake * handshake = (tr_handshake *) arg;
struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
ReadState ret;
dbgmsg( handshake, "handling canRead; state is [%s]\n", getStateName(handshake->state) );
dbgmsg( handshake, "handling canRead; state is [%s]", getStateName(handshake->state) );
switch( handshake->state )
{

View File

@ -56,8 +56,11 @@ typedef struct tr_peer
struct tr_peermsgs * msgs;
tr_publisher_tag msgsTag;
struct tr_ratecontrol * rateToClient;
struct tr_ratecontrol * rateToPeer;
struct tr_ratecontrol * rcToClient;
struct tr_ratecontrol * rcToPeer;
double rateToClient;
double rateToPeer;
}
tr_peer;

View File

@ -39,13 +39,13 @@
enum
{
/* how frequently to change which peers are choked */
RECHOKE_PERIOD_MSEC = (15 * 1000),
RECHOKE_PERIOD_MSEC = (1000),
/* how frequently to decide which peers live and die */
RECONNECT_PERIOD_MSEC = (5 * 1000),
/* how frequently to refill peers' request lists */
REFILL_PERIOD_MSEC = 1500,
REFILL_PERIOD_MSEC = 666,
/* don't change a peer's choke status more often than this */
MIN_CHOKE_PERIOD_SEC = 10,
@ -60,13 +60,11 @@ enum
/* how many peers to unchoke per-torrent. */
/* FIXME: make this user-configurable? */
NUM_UNCHOKED_PEERS_PER_TORRENT = 16, /* arbitrary */
NUM_UNCHOKED_PEERS_PER_TORRENT = 12, /* arbitrary */
/* another arbitrary number */
MAX_RECONNECTIONS_PER_MINUTE = 60,
MAX_RECONNECTIONS_PER_PULSE =
((MAX_RECONNECTIONS_PER_MINUTE * RECONNECT_PERIOD_MSEC) / (60*1000)),
/* set this too high and there will be a lot of churn.
* set it too low and you'll get peers too slowly */
MAX_RECONNECTIONS_PER_PULSE = 8,
/* corresponds to ut_pex's added.f flags */
ADDED_F_ENCRYPTION_FLAG = 1,
@ -131,11 +129,16 @@ myDebug( const char * file, int line, const Torrent * t, const char * fmt, ... )
{
va_list args;
struct evbuffer * buf = evbuffer_new( );
evbuffer_add_printf( buf, "[%s:%d] %s ", file, line, t->tor->info.name );
char timestr[64];
evbuffer_add_printf( buf, "[%s] %s: ",
tr_getLogTimeStr( timestr, sizeof(timestr) ),
t->tor->info.name );
va_start( args, fmt );
evbuffer_add_vprintf( buf, fmt, args );
va_end( args );
fprintf( fp, "%s\n", EVBUFFER_DATA(buf) );
evbuffer_add_printf( buf, " (%s:%d)\n", file, line );
fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
evbuffer_free( buf );
}
}
@ -301,8 +304,8 @@ peerConstructor( const struct in_addr * in_addr )
{
tr_peer * p;
p = tr_new0( tr_peer, 1 );
p->rateToClient = tr_rcInit( );
p->rateToPeer = tr_rcInit( );
p->rcToClient = tr_rcInit( );
p->rcToPeer = tr_rcInit( );
memcpy( &p->in_addr, in_addr, sizeof(struct in_addr) );
return p;
}
@ -338,8 +341,8 @@ peerDestructor( tr_peer * peer )
tr_bitfieldFree( peer->have );
tr_bitfieldFree( peer->blame );
tr_bitfieldFree( peer->banned );
tr_rcClose( peer->rateToClient );
tr_rcClose( peer->rateToPeer );
tr_rcClose( peer->rcToClient );
tr_rcClose( peer->rcToPeer );
tr_free( peer->client );
tr_free( peer );
}
@ -697,6 +700,7 @@ refillPulse( void * vtorrent )
return TRUE;
torrentLock( t );
tordbg( t, "Refilling Request Buffers..." );
blocks = getPreferredBlocks( t, &blockCount );
peers = getConnectedPeers( t, &peerCount );
@ -746,7 +750,6 @@ refillPulse( void * vtorrent )
tr_free( blocks );
t->refillTimer = NULL;
torrentUnlock( t );
return FALSE;
}
@ -797,6 +800,10 @@ msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
REFILL_PERIOD_MSEC );
break;
case TR_PEERMSG_CANCEL:
tr_bitfieldRem( t->requested, _tr_block( t->tor, e->pieceIndex, e->offset ) );
break;
case TR_PEERMSG_CLIENT_HAVE:
broadcastClientHave( t, e->pieceIndex );
tr_torrentRecheckCompleteness( t->tor );
@ -905,6 +912,8 @@ myHandshakeDoneCB( tr_handshake * handshake,
uint16_t port;
const struct in_addr * addr = tr_peerIoGetAddress( io, &port );
ensureAtomExists( t, addr, port, 0, TR_PEER_FROM_INCOMING );
tr_free( peer->client );
peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
if( peer->msgs != NULL ) { /* we already have this peer */
tr_peerIoFree( io );
--manager->connectionCount;
@ -912,8 +921,6 @@ myHandshakeDoneCB( tr_handshake * handshake,
peer->port = port;
peer->io = io;
peer->msgs = tr_peerMsgsNew( t->tor, peer, msgsCallbackFunc, t, &peer->msgsTag );
tr_free( peer->client );
peer->client = peer_id ? tr_clientForId( peer_id ) : NULL;
}
}
@ -1257,10 +1264,10 @@ tr_peerMgrTorrentStats( const tr_peerMgr * manager,
++setmePeersFrom[atom->from];
if( tr_rcRate( peer->rateToPeer ) > 0.01 )
if( peer->rateToPeer > 0.01 )
++*setmePeersGettingFromUs;
if( tr_rcRate( peer->rateToClient ) > 0.01 )
if( peer->rateToClient > 0.01 )
++*setmePeersSendingToUs;
}
@ -1297,8 +1304,8 @@ tr_peerMgrPeerStats( const tr_peerMgr * manager,
stat->client = tr_strdup( peer->client ? peer->client : "" );
stat->progress = peer->progress;
stat->isEncrypted = tr_peerIoIsEncrypted( peer->io ) ? 1 : 0;
stat->uploadToRate = tr_rcRate( peer->rateToPeer );
stat->downloadFromRate = tr_rcRate( peer->rateToClient );
stat->uploadToRate = peer->rateToPeer;
stat->downloadFromRate = peer->rateToClient;
stat->isDownloading = stat->uploadToRate > 0.01;
stat->isUploading = stat->downloadFromRate > 0.01;
}
@ -1355,6 +1362,13 @@ clientIsSnubbedBy( const tr_peer * peer )
***
**/
/* Blend of this peer's transfer rates used to rank peers against each
 * other; the rate we upload to the peer counts three times as much as
 * the rate it sends to us. */
static double
getWeightedThroughput( const tr_peer * peer )
{
    const double up   = peer->rateToPeer;
    const double down = peer->rateToClient;
    return 3 * up + down;
}
static void
rechoke( Torrent * t )
{
@ -1377,8 +1391,7 @@ rechoke( Torrent * t )
node->peer = peer;
node->preferred = peer->peerIsInterested && !clientIsSnubbedBy(peer);
node->randomKey = tr_rand( INT_MAX );
node->rate = (3*tr_rcRate(peer->rateToPeer))
+ (1*tr_rcRate(peer->rateToClient));
node->rate = getWeightedThroughput( peer );
}
qsort( choke, size, sizeof(struct ChokeData), compareChoke );
@ -1438,8 +1451,7 @@ getWeakConnections( Torrent * t, int * setmeSize )
int isWeak;
const struct peer_atom * atom = getExistingAtom( t, &peer->in_addr );
const int peerIsSeed = atom->flags & ADDED_F_SEED_FLAG;
const double throughput = (3*tr_rcRate(peer->rateToPeer))
+ (1*tr_rcRate(peer->rateToClient));
const double throughput = getWeightedThroughput( peer );
assert( atom != NULL );
@ -1545,7 +1557,8 @@ reconnectPulse( void * vtorrent )
/* add some new ones */
nAdd = MAX_CONNECTED_PEERS_PER_TORRENT - peerCount;
for( i=0; i<nAdd && i<nCandidates; ++i )
for( i=0; i<nAdd && i<nCandidates && i<MAX_RECONNECTIONS_PER_PULSE; ++i )
//for( i=0; i<nCandidates; ++i )
{
tr_peerMgr * mgr = t->manager;

View File

@ -68,7 +68,8 @@ enum
KEEPALIVE_INTERVAL_SECS = 90, /* idle seconds before we send a keepalive */
PEX_INTERVAL = (60 * 1000), /* msec between calls to sendPex() */
PEER_PULSE_INTERVAL = (33), /* msec between calls to pulse() */
PEER_PULSE_INTERVAL = (66), /* msec between calls to pulse() */
RATE_PULSE_INTERVAL = (333), /* msec between calls to ratePulse() */
};
enum
@ -87,7 +88,7 @@ struct peer_request
};
static int
peer_request_compare( const void * va, const void * vb )
compareRequest( const void * va, const void * vb )
{
struct peer_request * a = (struct peer_request*) va;
struct peer_request * b = (struct peer_request*) vb;
@ -111,7 +112,9 @@ struct tr_peermsgs
struct evbuffer * inBlock; /* the block we're currently receiving */
tr_list * peerAskedFor;
tr_list * clientAskedFor;
tr_list * clientWillAskFor;
tr_timer * rateTimer;
tr_timer * pulseTimer;
tr_timer * pexTimer;
@ -130,12 +133,11 @@ struct tr_peermsgs
tr_bitfield * peerAllowedPieces;
uint8_t state;
uint8_t ut_pex_id;
uint16_t pexCount;
uint32_t incomingMessageLength;
uint32_t maxActiveRequests;
uint32_t minActiveRequests;
tr_pex * pex;
};
@ -153,18 +155,78 @@ myDebug( const char * file, int line,
if( fp != NULL )
{
va_list args;
const char * addr = tr_peerIoGetAddrStr( msgs->io );
struct evbuffer * buf = evbuffer_new( );
evbuffer_add_printf( buf, "[%s:%d] %s (%p) ", file, line, addr, msgs->io );
char timestr[64];
evbuffer_add_printf( buf, "[%s] %s [%s]: ",
tr_getLogTimeStr( timestr, sizeof(timestr) ),
tr_peerIoGetAddrStr( msgs->io ),
msgs->info->client );
va_start( args, fmt );
evbuffer_add_vprintf( buf, fmt, args );
va_end( args );
fprintf( fp, "%s\n", EVBUFFER_DATA(buf) );
evbuffer_add_printf( buf, " (%s:%d)\n", file, line );
fwrite( EVBUFFER_DATA(buf), 1, EVBUFFER_LENGTH(buf), fp );
evbuffer_free( buf );
}
}
#define dbgmsg(handshake, fmt...) myDebug(__FILE__, __LINE__, handshake, ##fmt )
#define dbgmsg(msgs, fmt...) myDebug(__FILE__, __LINE__, msgs, ##fmt )
/**
***
**/
/* Enqueue a BT "request" message for the given block on the outgoing
 * message buffer.  Wire format: <len=13><id=BT_REQUEST><index><offset><length> */
static void
protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
{
    dbgmsg( msgs, "requesting %u:%u->%u", req->index, req->offset, req->length );

    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + 3*sizeof(uint32_t) );
    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_REQUEST );
    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, req->index );
    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, req->offset );
    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, req->length );
}
/* Enqueue a BT "cancel" message telling the peer to forget an earlier
 * request.  Wire format: <len=13><id=BT_CANCEL><index><offset><length> */
static void
protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
{
    dbgmsg( msgs, "cancelling %u:%u->%u", req->index, req->offset, req->length );

    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + 3*sizeof(uint32_t) );
    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_CANCEL );
    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, req->index );
    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, req->offset );
    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, req->length );
}
/* Enqueue a BT "have" message announcing that we now have `index'.
 * Wire format: <len=5><id=BT_HAVE><piece index> */
static void
protocolSendHave( tr_peermsgs * msgs, uint32_t index )
{
    dbgmsg( msgs, "sending Have %u", index );

    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_HAVE );
    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, index );
}
/* Enqueue a BT choke (choke != 0) or unchoke (choke == 0) message.
 * Wire format: <len=1><id=BT_CHOKE|BT_UNCHOKE> */
static void
protocolSendChoke( tr_peermsgs * msgs, int choke )
{
    dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );

    tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
    tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, choke ? BT_CHOKE : BT_UNCHOKE );
}
/**
*** EVENTS
@ -191,6 +253,7 @@ fireNeedReq( tr_peermsgs * msgs )
{
tr_peermsgs_event e = blankEvent;
e.eventType = TR_PEERMSG_NEED_REQ;
dbgmsg( msgs, "firing NEED_REQ" );
publish( msgs, &e );
}
@ -223,6 +286,17 @@ fireGotBlock( tr_peermsgs * msgs, uint32_t pieceIndex, uint32_t offset, uint32_t
publish( msgs, &e );
}
/* Publish a TR_PEERMSG_CANCEL event so listeners (the peer-mgr) learn
 * that the request described by `req' is no longer outstanding. */
static void
fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
{
    tr_peermsgs_event evt = blankEvent;

    evt.eventType  = TR_PEERMSG_CANCEL;
    evt.pieceIndex = req->index;
    evt.offset     = req->offset;
    evt.length     = req->length;

    publish( msgs, &evt );
}
/**
*** INTEREST
**/
@ -277,8 +351,8 @@ sendInterest( tr_peermsgs * msgs, int weAreInterested )
assert( weAreInterested==0 || weAreInterested==1 );
msgs->info->clientIsInterested = weAreInterested;
dbgmsg( msgs, ": sending an %s message",
weAreInterested ? "INTERESTED" : "NOT_INTERESTED");
dbgmsg( msgs, "Sending %s",
weAreInterested ? "Interested" : "Not Interested");
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
@ -321,10 +395,7 @@ tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
walk = next;
}
dbgmsg( msgs, "sending a %s message", (choke ? "CHOKE" : "UNCHOKE") );
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
choke ? BT_CHOKE : BT_UNCHOKE );
protocolSendChoke( msgs, choke );
msgs->info->chokeChangedAt = time( NULL );
}
}
@ -333,50 +404,13 @@ tr_peerMsgsSetChoke( tr_peermsgs * msgs, int choke )
***
**/
void
tr_peerMsgsCancel( tr_peermsgs * msgs,
uint32_t pieceIndex,
uint32_t offset,
uint32_t length )
{
tr_list * node;
struct peer_request tmp;
assert( msgs != NULL );
assert( length > 0 );
/* have we asked the peer for this piece? */
tmp.index = pieceIndex;
tmp.offset = offset;
tmp.length = length;
node = tr_list_remove( &msgs->clientAskedFor, &tmp, peer_request_compare );
/* if so, send a cancel message */
if( node != NULL ) {
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + 3*sizeof(uint32_t) );
tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_CANCEL );
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
tr_free( node );
}
}
/**
***
**/
void
tr_peerMsgsHave( tr_peermsgs * msgs,
uint32_t pieceIndex )
uint32_t index )
{
dbgmsg( msgs, "w00t telling them we HAVE piece #%d", pieceIndex );
tr_peerIoWriteUint32( msgs->io, msgs->outMessages,
sizeof(uint8_t) + sizeof(uint32_t) );
tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_HAVE );
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
protocolSendHave( msgs, index );
/* since we have more pieces now, we might not be interested in this peer */
updateInterest( msgs );
}
#if 0
@ -450,9 +484,6 @@ sendFastAllowedSet( tr_peermsgs * msgs )
***
**/
static int
pulse( void * vmsgs );
static int
reqIsValid( const tr_peermsgs * msgs, uint32_t index, uint32_t offset, uint32_t length )
{
@ -476,55 +507,147 @@ requestIsValid( const tr_peermsgs * msgs, struct peer_request * req )
return reqIsValid( msgs, req->index, req->offset, req->length );
}
/* Move queued requests from clientWillAskFor onto the wire.
 * Once the active-request list (clientAskedFor) has drained to the
 * low-water mark `min', this tops it back up toward `max' so we never
 * run out of outstanding requests; if we still can't reach `max',
 * a NEED_REQ event asks the peer-mgr for more blocks to queue. */
static void
pumpRequestQueue( tr_peermsgs * msgs )
{
    const int max = msgs->maxActiveRequests;
    const int min = msgs->minActiveRequests;
    int count = tr_list_size( msgs->clientAskedFor );
    int sent = 0;

    /* still above the low-water mark: nothing to do yet */
    if( count > min )
        return;
    /* can't request anything while the peer is choking us */
    if( msgs->info->clientIsChoked )
        return;

    /* pop queued requests and send them until we're full or empty */
    while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
    {
        struct peer_request * req = tr_list_pop_front( &msgs->clientWillAskFor );
        protocolSendRequest( msgs, req );
        /* stamp the send time -- used later for turnaround-time stats */
        req->time_requested = msgs->lastReqAddedAt = time( NULL );
        tr_list_append( &msgs->clientAskedFor, req );
        ++count;
        ++sent;
    }

    if( sent )
        dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
                sent,
                tr_list_size(msgs->clientAskedFor),
                tr_list_size(msgs->clientWillAskFor) );

    /* the queue ran dry before we filled up: ask for more requests */
    if( count < max )
        fireNeedReq( msgs );
}
int
tr_peerMsgsAddRequest( tr_peermsgs * msgs,
uint32_t index,
uint32_t offset,
uint32_t length )
{
const int req_max = msgs->maxActiveRequests;
struct peer_request tmp, *req;
int maxSize;
assert( msgs != NULL );
assert( msgs->torrent != NULL );
assert( reqIsValid( msgs, index, offset, length ) );
if( msgs->info->clientIsChoked )
return TR_ADDREQ_CLIENT_CHOKED;
/**
*** Reasons to decline the request
**/
/* don't send requests to choked clients */
if( msgs->info->clientIsChoked ) {
dbgmsg( msgs, "declining request because they're choking us" );
return TR_ADDREQ_CLIENT_CHOKED;
}
/* peer doesn't have this piece */
if( !tr_bitfieldHas( msgs->info->have, index ) )
return TR_ADDREQ_MISSING;
maxSize = MIN( 3 + (int)(tr_rcRate(msgs->info->rateToClient)/5), 100 );
if( tr_list_size( msgs->clientAskedFor) >= maxSize )
/* peer's queue is full */
if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) {
dbgmsg( msgs, "declining request because we're full" );
return TR_ADDREQ_FULL;
}
/* have we already asked for this piece? */
tmp.index = index;
tmp.offset = offset;
tmp.length = length;
if( tr_list_remove( &msgs->clientAskedFor, &tmp, peer_request_compare ) != NULL )
if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) {
dbgmsg( msgs, "declining because it's a duplicate" );
return TR_ADDREQ_DUPLICATE;
}
if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) {
dbgmsg( msgs, "declining because it's a duplicate" );
return TR_ADDREQ_DUPLICATE;
}
dbgmsg( msgs, "w00t peer has a max request queue size of %d... adding request for piece %d, offset %d", maxSize, (int)index, (int)offset );
/**
*** Accept this request
**/
/* queue the request */
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + 3*sizeof(uint32_t) );
tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_REQUEST );
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, index );
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
/* add it to our `requests sent' list */
req = tr_new( struct peer_request, 1 );
dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
req = tr_new0( struct peer_request, 1 );
*req = tmp;
req->time_requested = msgs->lastReqAddedAt = time( NULL );
tr_list_append( &msgs->clientAskedFor, req );
pulse( msgs );
tr_list_append( &msgs->clientWillAskFor, req );
return TR_ADDREQ_OK;
}
/* Cancel every outstanding download request for this peer.
 * Requests still sitting in the local queue (clientWillAskFor) are just
 * dropped; requests already sent (clientAskedFor) additionally get a BT
 * "cancel" message on the wire.  Either way a CANCEL event is fired so
 * the peer-mgr can put the block back into the request pool. */
static void
tr_peerMsgsCancelAllRequests( tr_peermsgs * msgs )
{
    struct peer_request * req;

    /* queued but never sent: no wire traffic needed */
    while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) ))
    {
        fireCancelledReq( msgs, req );
        tr_free( req );
    }

    /* already on the wire: tell the peer to forget them too */
    while(( req = tr_list_pop_front( &msgs->clientAskedFor ) ))
    {
        fireCancelledReq( msgs, req );
        protocolSendCancel( msgs, req );
        tr_free( req );
    }
}
/* Cancel one specific block request, identified by piece/offset/length.
 * The request may be in the not-yet-sent queue (clientWillAskFor) or
 * already on the wire (clientAskedFor); both lists are checked, and a
 * BT "cancel" message is sent only in the second case. */
void
tr_peerMsgsCancel( tr_peermsgs * msgs,
                   uint32_t pieceIndex,
                   uint32_t offset,
                   uint32_t length )
{
    struct peer_request *req, tmp;

    assert( msgs != NULL );
    assert( length > 0 );

    /* have we asked the peer for this piece? */
    tmp.index = pieceIndex;
    tmp.offset = offset;
    tmp.length = length;

    /* if it's only in the queue and hasn't been sent yet, free it */
    if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) ))
    {
        fireCancelledReq( msgs, req );
        tr_free( req );
    }

    /* if it's already been sent, send a cancel message too */
    if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) ))
    {
        protocolSendCancel( msgs, req );
        fireCancelledReq( msgs, req );
        tr_free( req );
    }
}
/**
***
**/
@ -711,9 +834,8 @@ readBtLength( tr_peermsgs * msgs, struct evbuffer * inbuf )
tr_peerIoReadUint32( msgs->io, inbuf, &len );
if( len == 0 ) /* peer sent us a keepalive message */
dbgmsg( msgs, "peer sent us a keepalive message..." );
dbgmsg( msgs, "got KeepAlive" );
else {
dbgmsg( msgs, "peer is sending us a message with %"PRIu64" bytes...", (uint64_t)len );
msgs->incomingMessageLength = len;
msgs->state = AWAITING_BT_MESSAGE;
}
@ -742,16 +864,15 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
tr_peerIoReadUint8( msgs->io, inbuf, &id );
msglen--;
dbgmsg( msgs, "peer sent us a message... "
"bt id number is %d, and remaining len is %d", (int)id, (int)msglen );
dbgmsg( msgs, "got BT id %d, len %d", (int)id, (int)msglen );
switch( id )
{
case BT_CHOKE:
dbgmsg( msgs, "w00t peer sent us a BT_CHOKE" );
dbgmsg( msgs, "got Choke" );
assert( msglen == 0 );
msgs->info->clientIsChoked = 1;
#if 0
tr_list * walk;
for( walk = msgs->peerAskedFor; walk != NULL; )
{
@ -765,40 +886,40 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
}
walk = next;
}
tr_list_free( &msgs->clientAskedFor, tr_free );
#endif
tr_peerMsgsCancelAllRequests( msgs );
break;
case BT_UNCHOKE:
dbgmsg( msgs, "w00t peer sent us a BT_UNCHOKE" );
dbgmsg( msgs, "got Unchoke" );
assert( msglen == 0 );
msgs->info->clientIsChoked = 0;
fireNeedReq( msgs );
break;
case BT_INTERESTED:
dbgmsg( msgs, "w00t peer sent us a BT_INTERESTED" );
dbgmsg( msgs, "got Interested" );
assert( msglen == 0 );
msgs->info->peerIsInterested = 1;
tr_peerMsgsSetChoke( msgs, 0 );
break;
case BT_NOT_INTERESTED:
dbgmsg( msgs, "w00t peer sent us a BT_NOT_INTERESTED" );
dbgmsg( msgs, "got Not Interested" );
assert( msglen == 0 );
msgs->info->peerIsInterested = 0;
break;
case BT_HAVE:
dbgmsg( msgs, "w00t peer sent us a BT_HAVE" );
assert( msglen == 4 );
tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
tr_bitfieldAdd( msgs->info->have, ui32 );
updatePeerProgress( msgs );
dbgmsg( msgs, "got Have: %u", ui32 );
break;
case BT_BITFIELD: {
const int clientIsSeed = tr_cpGetStatus( msgs->torrent->completion ) != TR_CP_INCOMPLETE;
dbgmsg( msgs, "w00t peer sent us a BT_BITFIELD" );
assert( msglen == msgs->info->have->len );
tr_peerIoReadBytes( msgs->io, inbuf, msgs->info->have->bits, msglen );
updatePeerProgress( msgs );
@ -809,12 +930,12 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
case BT_REQUEST: {
struct peer_request * req;
dbgmsg( msgs, "peer sent us a BT_REQUEST" );
assert( msglen == 12 );
req = tr_new( struct peer_request, 1 );
tr_peerIoReadUint32( msgs->io, inbuf, &req->index );
tr_peerIoReadUint32( msgs->io, inbuf, &req->offset );
tr_peerIoReadUint32( msgs->io, inbuf, &req->length );
dbgmsg( msgs, "got Request: %u:%u->%u", req->index, req->offset, req->length );
if ( !requestIsValid( msgs, req ) )
{
@ -867,18 +988,18 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
case BT_CANCEL: {
struct peer_request req;
void * data;
dbgmsg( msgs, "peer sent us a BT_CANCEL" );
assert( msglen == 12 );
tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
data = tr_list_remove( &msgs->peerAskedFor, &req, peer_request_compare );
dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
data = tr_list_remove( &msgs->peerAskedFor, &req, compareRequest );
tr_free( data );
break;
}
case BT_PIECE: {
dbgmsg( msgs, "peer sent us a BT_PIECE" );
dbgmsg( msgs, "got a Piece!" );
assert( msgs->blockToUs.length == 0 );
tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.index );
tr_peerIoReadUint32( msgs->io, inbuf, &msgs->blockToUs.offset );
@ -891,7 +1012,7 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
}
case BT_PORT: {
dbgmsg( msgs, "peer sent us a BT_PORT" );
dbgmsg( msgs, "Got a BT_PORT" );
assert( msglen == 2 );
tr_peerIoReadUint16( msgs->io, inbuf, &msgs->info->port );
break;
@ -904,7 +1025,7 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
case BT_HAVE_ALL: {
assert( msglen == 0 );
dbgmsg( msgs, "peer sent us a BT_HAVE_ALL" );
dbgmsg( msgs, "Got a BT_HAVE_ALL" );
memset( msgs->info->have->bits, 1, msgs->info->have->len );
updatePeerProgress( msgs );
break;
@ -912,7 +1033,7 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
case BT_HAVE_NONE: {
assert( msglen == 0 );
dbgmsg( msgs, "peer sent us a BT_HAVE_NONE" );
dbgmsg( msgs, "Got a BT_HAVE_NONE" );
memset( msgs->info->have->bits, 1, msgs->info->have->len );
updatePeerProgress( msgs );
break;
@ -922,11 +1043,11 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
struct peer_request req;
tr_list * node;
assert( msglen == 12 );
dbgmsg( msgs, "peer sent us a BT_REJECT" );
dbgmsg( msgs, "Got a BT_REJECT" );
tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
node = tr_list_find( msgs->peerAskedFor, &req, peer_request_compare );
node = tr_list_find( msgs->peerAskedFor, &req, compareRequest );
if( node != NULL ) {
void * data = node->data;
tr_list_remove_data( &msgs->peerAskedFor, data );
@ -938,14 +1059,14 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf )
case BT_ALLOWED_FAST: {
assert( msglen == 4 );
dbgmsg( msgs, "peer sent us a BT_ALLOWED_FAST" );
dbgmsg( msgs, "Got a BT_ALLOWED_FAST" );
tr_peerIoReadUint32( msgs->io, inbuf, &ui32 );
tr_bitfieldAdd( msgs->clientAllowedPieces, ui32 );
break;
}
case BT_LTEP:
dbgmsg( msgs, "peer sent us a BT_LTEP" );
dbgmsg( msgs, "Got a BT_LTEP" );
parseLtep( msgs, msglen, inbuf );
break;
@ -967,7 +1088,7 @@ clientGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
tor->activityDate = tr_date( );
tor->downloadedCur += byteCount;
msgs->info->pieceDataActivityDate = time( NULL );
tr_rcTransferred( msgs->info->rateToClient, byteCount );
tr_rcTransferred( msgs->info->rcToClient, byteCount );
tr_rcTransferred( tor->download, byteCount );
tr_rcTransferred( tor->handle->download, byteCount );
}
@ -979,7 +1100,7 @@ peerGotBytes( tr_peermsgs * msgs, uint32_t byteCount )
tor->activityDate = tr_date( );
tor->uploadedCur += byteCount;
msgs->info->pieceDataActivityDate = time( NULL );
tr_rcTransferred( msgs->info->rateToPeer, byteCount );
tr_rcTransferred( msgs->info->rcToPeer, byteCount );
tr_rcTransferred( tor->upload, byteCount );
tr_rcTransferred( tor->handle->upload, byteCount );
}
@ -1054,13 +1175,14 @@ gotBlock( tr_peermsgs * msgs,
key.offset = offset;
key.length = length;
req = (struct peer_request*) tr_list_remove( &msgs->clientAskedFor, &key,
peer_request_compare );
compareRequest );
if( req == NULL ) {
gotUnwantedBlock( msgs, index, offset, length );
dbgmsg( msgs, "we didn't ask for this message..." );
return;
}
dbgmsg( msgs, "w00t peer sent us a block. turnaround time was %d seconds",
dbgmsg( msgs, "Got block %u:%u->%u (turnaround time %d secs)",
req->index, req->offset, req->length,
(int)(time(NULL) - req->time_requested) );
tr_free( req );
dbgmsg( msgs, "peer has %d more blocks we've asked for",
@ -1096,7 +1218,6 @@ gotBlock( tr_peermsgs * msgs,
addUsToBlamefield( msgs, index );
fireGotBlock( msgs, index, offset, length );
fireNeedReq( msgs );
/**
*** Handle if this was the last block in the piece
@ -1141,7 +1262,7 @@ readBtPiece( tr_peermsgs * msgs, struct evbuffer * inbuf )
/* if this was the entire block, save it */
if( !msgs->blockToUs.length )
{
dbgmsg( msgs, "w00t -- got block index %u, offset %u", msgs->blockToUs.index, msgs->blockToUs.offset );
dbgmsg( msgs, "got block %u:%u", msgs->blockToUs.index, msgs->blockToUs.offset );
assert( (int)EVBUFFER_LENGTH( msgs->inBlock ) == tr_torBlockCountBytes( msgs->torrent, _tr_block(msgs->torrent,msgs->blockToUs.index, msgs->blockToUs.offset) ) );
gotBlock( msgs, msgs->inBlock,
msgs->blockToUs.index,
@ -1218,6 +1339,17 @@ canUpload( const tr_peermsgs * msgs )
return TRUE;
}
/* Periodic timer callback (every RATE_PULSE_INTERVAL msec): refresh the
 * cached transfer rates from the rate-control objects and resize this
 * peer's request window to match its download speed. */
static int
ratePulse( void * vmsgs )
{
    tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
    msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
    msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
    /* faster peers get a deeper request queue: baseline 8 plus one per
     * 10 rate units (presumably KiB/s from tr_rcRate -- TODO confirm),
     * capped at 100; low-water mark is half of that */
    msgs->maxActiveRequests = MIN( 8 + (int)(msgs->info->rateToClient/10), 100 );
    msgs->minActiveRequests = msgs->maxActiveRequests / 2;
    return TRUE;  /* NOTE(review): nonzero appears to keep the periodic
                   * timer alive (cf. refillPulse returning FALSE when
                   * its timer should stop) -- confirm tr_timer contract */
}
static int
pulse( void * vmsgs )
{
@ -1233,6 +1365,8 @@ pulse( void * vmsgs )
tr_peerIoSetIOMode ( msgs->io, EV_READ, 0 );
}
pumpRequestQueue( msgs );
if( !canWrite( msgs ) )
{
}
@ -1261,7 +1395,7 @@ pulse( void * vmsgs )
tr_peerIoWrite( msgs->io, tmp, r->length );
peerGotBytes( msgs, r->length );
dbgmsg( msgs, "putting req into out queue: index %d, offset %d, length %d ... %d blocks left in our queue", (int)r->index, (int)r->offset, (int)r->length, tr_list_size(msgs->peerAskedFor) );
dbgmsg( msgs, "Sending block %u:%u->%u (%d blocks left to send)", r->index, r->offset, r->length, tr_list_size(msgs->peerAskedFor) );
tr_free( r );
tr_free( tmp );
@ -1407,10 +1541,8 @@ sendPex( tr_peermsgs * msgs )
/* "added.f" */
flags = tr_bencDictAdd( &val, "added.f" );
tmp = walk = tr_new( uint8_t, diffs.addedCount );
for( i=0; i<diffs.addedCount; ++i ) {
dbgmsg( msgs, "PEX -->> -->> flag is %d", (int)diffs.added[i].flags );
for( i=0; i<diffs.addedCount; ++i )
*walk++ = diffs.added[i].flags;
}
assert( ( walk - tmp ) == diffs.addedCount );
tr_bencInitStr( flags, tmp, walk-tmp, FALSE );
@ -1477,6 +1609,7 @@ tr_peerMsgsNew( struct tr_torrent * torrent,
m->info->peerIsInterested = 0;
m->info->have = tr_bitfieldNew( torrent->info.pieceCount );
m->pulseTimer = tr_timerNew( m->handle, pulse, m, PEER_PULSE_INTERVAL );
m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
m->outMessages = evbuffer_new( );
m->inBlock = evbuffer_new( );
@ -1501,6 +1634,7 @@ tr_peerMsgsNew( struct tr_torrent * torrent,
tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
tr_peerIoSetIOMode( m->io, EV_READ|EV_WRITE, 0 );
ratePulse( m );
/**
*** If we initiated this connection,
@ -1547,8 +1681,10 @@ tr_peerMsgsFree( tr_peermsgs* msgs )
if( msgs != NULL )
{
tr_timerFree( &msgs->pulseTimer );
tr_timerFree( &msgs->rateTimer );
tr_timerFree( &msgs->pexTimer );
tr_publisherFree( &msgs->publisher );
tr_list_free( &msgs->clientWillAskFor, tr_free );
tr_list_free( &msgs->clientAskedFor, tr_free );
tr_list_free( &msgs->peerAskedFor, tr_free );
evbuffer_free( msgs->outMessages );

View File

@ -65,6 +65,7 @@ typedef enum
TR_PEERMSG_CLIENT_BLOCK,
TR_PEERMSG_PEER_PROGRESS,
TR_PEERMSG_GOT_ERROR,
TR_PEERMSG_CANCEL,
TR_PEERMSG_NEED_REQ
}
PeerMsgsEventType;

View File

@ -51,7 +51,7 @@
#define REQ_TIMEOUT_INTERVAL_SEC 60
/* the value of the 'numwant' argument passed in tracker requests */
#define NUMWANT 75
#define NUMWANT 128
/* the length of the 'key' argument passed in tracker requests */
#define TR_KEY_LEN 10

View File

@ -316,18 +316,16 @@ tr_close( tr_handle * h )
tr_runInEventThread( h, tr_closeImpl, h );
while( !h->isClosed )
tr_wait( 200 );
tr_wait( 100 );
tr_eventClose( h );
while( h->events != NULL ) {
fprintf( stderr, "waiting for libevent thread to close...\n" );
tr_wait( 200 );
}
while( h->events != NULL )
tr_wait( 100 );
tr_lockFree( h->lock );
free( h->tag );
free( h );
fprintf( stderr, "tr_close() completed.\n" );
fprintf( stderr, "libtransmission closed cleanly.\n" );
}
tr_torrent **

View File

@ -84,6 +84,26 @@ tr_getLog( void )
return file;
}
/* Format the current local wall-clock time as "HH:MM:SS.mmm" into buf.
 * buf:    destination buffer
 * buflen: size of buf in bytes; output is truncated (NUL-terminated) if
 *         smaller than the 13 bytes needed
 * Returns buf, so the call can be used inline in printf-style args. */
char*
tr_getLogTimeStr( char * buf, int buflen )
{
    char tmp[64];
    time_t seconds;
    struct tm now_tm;
    struct timeval tv;
    int milliseconds;

    /* take a single clock sample and derive both the seconds and the
     * milliseconds from it.  (Calling time(NULL) and gettimeofday()
     * separately could straddle a second boundary, producing a
     * timestamp that's off by nearly a full second.) */
    gettimeofday( &tv, NULL );
    seconds = tv.tv_sec;
    localtime_r( &seconds, &now_tm );
    strftime( tmp, sizeof(tmp), "%H:%M:%S", &now_tm );
    milliseconds = (int)( tv.tv_usec / 1000 );
    snprintf( buf, buflen, "%s.%03d", tmp, milliseconds );
    return buf;
}
void
tr_setMessageLevel( int level )
{

View File

@ -38,6 +38,8 @@ void tr_msgInit( void );
void tr_msg ( int level, char * msg, ... );
FILE* tr_getLog( void );
char* tr_getLogTimeStr( char * buf, int buflen );
int tr_rand ( int );
void * tr_memmem( const void *, size_t, const void *, size_t );