(trunk libT) #2548: T's request queue can send out too many duplicate requests

This commit is contained in:
Charles Kerr 2009-11-08 23:20:00 +00:00
parent cf6e618753
commit b906c125ac
11 changed files with 833 additions and 1099 deletions

View File

@ -39,7 +39,6 @@ libtransmission_a_SOURCES = \
ptrarray.c \
publish.c \
ratecontrol.c \
request-list.c \
resume.c \
rpcimpl.c \
rpc-server.c \
@ -87,7 +86,6 @@ noinst_HEADERS = \
ptrarray.h \
publish.h \
ratecontrol.h \
request-list.h \
resume.h \
rpcimpl.h \
rpc-server.h \
@ -111,7 +109,6 @@ TESTS = \
clients-test \
json-test \
peer-msgs-test \
request-list-test \
rpc-test \
test-peer-id \
utils-test
@ -157,10 +154,6 @@ test_peer_id_SOURCES = test-peer-id.c
test_peer_id_LDADD = ${apps_ldadd}
test_peer_id_LDFLAGS = ${apps_ldflags}
request_list_test_SOURCES = request-list-test.c
request_list_test_LDADD = ${apps_ldadd}
request_list_test_LDFLAGS = ${apps_ldflags}
peer_msgs_test_SOURCES = peer-msgs-test.c
peer_msgs_test_LDADD = ${apps_ldadd}
peer_msgs_test_LDFLAGS = ${apps_ldflags}

View File

@ -28,6 +28,18 @@
#include "transmission.h"
enum
{
/** when we're making requests from another peer,
batch them together to send enough requests to
meet our bandwidth goals for the next N seconds */
REQUEST_BUF_SECS = 10,
/** how long we'll let requests we've made stay pending
before we cancel them */
REQUEST_TTL_SECS = 30
};
typedef enum
{
TR_ADDREQ_OK = 0,
@ -45,16 +57,16 @@ tr_addreq_t;
typedef enum
{
TR_PEER_CLIENT_GOT_BLOCK,
TR_PEER_CLIENT_GOT_CHOKE,
TR_PEER_CLIENT_GOT_DATA,
TR_PEER_CLIENT_GOT_ALLOWED_FAST,
TR_PEER_CLIENT_GOT_SUGGEST,
TR_PEER_CLIENT_GOT_PORT,
TR_PEER_CLIENT_GOT_REJ,
TR_PEER_PEER_GOT_DATA,
TR_PEER_PEER_PROGRESS,
TR_PEER_ERROR,
TR_PEER_CANCEL,
TR_PEER_UPLOAD_ONLY,
TR_PEER_NEED_REQ
TR_PEER_UPLOAD_ONLY
}
PeerEventType;

File diff suppressed because it is too large Load Diff

View File

@ -25,6 +25,7 @@
#include "bitfield.h"
#include "net.h"
#include "peer-common.h" /* struct peer_request */
#include "publish.h" /* tr_publisher_tag */
#include "utils.h"
@ -118,6 +119,18 @@ void tr_peerMgrFree( tr_peerMgr * manager );
tr_bool tr_peerMgrPeerIsSeed( const tr_torrent * tor,
const tr_address * addr );
void tr_peerMgrGetNextRequests( tr_torrent * torrent,
tr_peer * peer,
int numwant,
tr_block_index_t * setme,
int * numgot );
tr_bool tr_peerMgrDidPeerRequest( const tr_torrent * torrent,
const tr_peer * peer,
tr_block_index_t block );
void tr_peerMgrRebuildRequests( tr_torrent * torrent );
void tr_peerMgrAddIncoming( tr_peerMgr * manager,
tr_address * addr,
tr_port port,

View File

@ -32,7 +32,6 @@
#include "peer-msgs.h"
#include "platform.h" /* MAX_STACK_ARRAY_SIZE */
#include "ratecontrol.h"
#include "request-list.h"
#include "session.h"
#include "stats.h"
#include "torrent.h"
@ -78,22 +77,14 @@ enum
PEX_INTERVAL_SECS = 90, /* sec between sendPex() calls */
REQQ = 512,
MAX_BLOCK_SIZE = ( 1024 * 16 ),
/* how long an unsent request can stay queued before it's returned
back to the peer-mgr's pool of requests */
QUEUED_REQUEST_TTL_SECS = 20,
/* how long a sent request can stay queued before it's returned
back to the peer-mgr's pool of requests */
SENT_REQUEST_TTL_SECS = 240,
/* used in lowering the outMessages queue period */
IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
HIGH_PRIORITY_INTERVAL_SECS = 2,
LOW_PRIORITY_INTERVAL_SECS = 20,
LOW_PRIORITY_INTERVAL_SECS = 10,
/* number of pieces to remove from the bitfield when
* lazy bitfields are turned on */
@ -115,6 +106,38 @@ enum
***
**/
struct peer_request
{
uint32_t index;
uint32_t offset;
uint32_t length;
};
static uint32_t
getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
{
const uint64_t piecePos = tor->info.pieceSize * tr_torBlockPiece( tor, b );
const uint64_t blockPos = tor->blockSize * b;
assert( blockPos >= piecePos );
return (uint32_t)( blockPos - piecePos );
}
static void
blockToReq( const tr_torrent * tor,
tr_block_index_t block,
struct peer_request * setme )
{
assert( setme != NULL );
setme->index = tr_torBlockPiece( tor, block );
setme->offset = getBlockOffsetInPiece( tor, block );
setme->length = tr_torBlockCountBytes( tor, block );
}
/**
***
**/
/* this is raw, unchanged data from the peer regarding
* the current message that it's sending us. */
struct tr_incoming
@ -146,6 +169,9 @@ struct tr_peermsgs
tr_bool peerSentLtepHandshake;
/*tr_bool haveFastSet;*/
int activeRequestCount;
int desiredRequestCount;
/* how long the outMessages batch should be allowed to grow before
* it's flushed -- some messages (like requests >:) should be sent
* very quickly; others aren't as urgent. */
@ -155,7 +181,6 @@ struct tr_peermsgs
uint8_t ut_pex_id;
uint16_t pexCount;
uint16_t pexCount6;
uint16_t maxActiveRequests;
#if 0
size_t fastsetSize;
@ -170,10 +195,9 @@ struct tr_peermsgs
struct evbuffer * outMessages; /* all the non-piece messages */
struct request_list peerAskedFor;
struct request_list clientAskedFor;
struct request_list clientWillAskFor;
struct peer_request peerAskedFor[REQQ];
int peerAskedForCount;
tr_pex * pex;
tr_pex * pex6;
@ -441,14 +465,6 @@ fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
publish( msgs, &e );
}
static void
fireNeedReq( tr_peermsgs * msgs )
{
tr_peer_event e = blankEvent;
e.eventType = TR_PEER_NEED_REQ;
publish( msgs, &e );
}
static void
firePeerProgress( tr_peermsgs * msgs )
{
@ -469,6 +485,25 @@ fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
publish( msgs, &e );
}
static void
fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
{
tr_peer_event e = blankEvent;
e.eventType = TR_PEER_CLIENT_GOT_REJ;
e.pieceIndex = req->index;
e.offset = req->offset;
e.length = req->length;
publish( msgs, &e );
}
static void
fireGotChoke( tr_peermsgs * msgs )
{
tr_peer_event e = blankEvent;
e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
publish( msgs, &e );
}
static void
fireClientGotData( tr_peermsgs * msgs,
uint32_t length,
@ -523,17 +558,6 @@ firePeerGotData( tr_peermsgs * msgs,
publish( msgs, &e );
}
static void
fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
{
tr_peer_event e = blankEvent;
e.eventType = TR_PEER_CANCEL;
e.pieceIndex = req->index;
e.offset = req->offset;
e.length = req->length;
publish( msgs, &e );
}
/**
*** ALLOWED FAST SET
*** For explanation, see http://www.bittorrent.org/beps/bep_0006.html
@ -688,15 +712,17 @@ updateInterest( tr_peermsgs * msgs )
if( i != msgs->peer->clientIsInterested )
sendInterest( msgs, i );
if( i )
fireNeedReq( msgs );
}
static tr_bool
popNextRequest( tr_peermsgs * msgs,
struct peer_request * setme )
popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
{
return reqListPop( &msgs->peerAskedFor, setme );
if( msgs->peerAskedForCount == 0 )
return FALSE;
*setme = msgs->peerAskedFor[0];
memmove( msgs->peerAskedFor, msgs->peerAskedFor + 1, --msgs->peerAskedForCount );
return TRUE;
}
static void
@ -705,7 +731,7 @@ cancelAllRequestsToClient( tr_peermsgs * msgs )
struct peer_request req;
const int mustSendCancel = tr_peerIoSupportsFEXT( msgs->peer->io );
while( popNextRequest( msgs, &req ) )
while( popNextRequest( msgs, &req ))
if( mustSendCancel )
protocolSendReject( msgs, &req );
}
@ -768,215 +794,15 @@ requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
return reqIsValid( msgs, req->index, req->offset, req->length );
}
static void
expireFromList( tr_peermsgs * msgs,
struct request_list * list,
const time_t oldestAllowed )
{
size_t i;
struct request_list tmp = REQUEST_LIST_INIT;
/* since the fifo list is sorted by time, the oldest will be first */
if( !list->len || ( list->fifo[0].time_requested >= oldestAllowed ) )
return;
/* if we found one too old, start pruning them */
reqListCopy( &tmp, list );
for( i=0; i<tmp.len; ++i ) {
const struct peer_request * req = &tmp.fifo[i];
if( req->time_requested >= oldestAllowed )
break;
tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
}
reqListClear( &tmp );
}
static void
expireOldRequests( tr_peermsgs * msgs, const time_t now )
{
time_t oldestAllowed;
const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
dbgmsg( msgs, "entering `expire old requests' block" );
/* cancel requests that have been queued for too long */
oldestAllowed = now - QUEUED_REQUEST_TTL_SECS;
expireFromList( msgs, &msgs->clientWillAskFor, oldestAllowed );
/* if the peer doesn't support "Reject Request",
* cancel requests that were sent too long ago. */
if( !fext ) {
oldestAllowed = now - SENT_REQUEST_TTL_SECS;
expireFromList( msgs, &msgs->clientAskedFor, oldestAllowed );
}
dbgmsg( msgs, "leaving `expire old requests' block" );
}
static void
pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
{
const int max = msgs->maxActiveRequests;
int sent = 0;
int len = msgs->clientAskedFor.len;
struct peer_request req;
dbgmsg( msgs, "clientIsChoked %d, download allowed %d, len %d, max %d, msgs->clientWillAskFor.len %d",
(int)msgs->peer->clientIsChoked,
(int)tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ),
len, max, msgs->clientWillAskFor.len );
if( msgs->peer->clientIsChoked )
return;
if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
return;
while( ( len < max ) && reqListPop( &msgs->clientWillAskFor, &req ) )
{
const tr_block_index_t block = _tr_block( msgs->torrent, req.index, req.offset );
assert( requestIsValid( msgs, &req ) );
assert( tr_bitfieldHas( msgs->peer->have, req.index ) );
/* don't ask for it if we've already got it... this block may have
* come in from a different peer after we cancelled a request for it */
if( !tr_cpBlockIsComplete( &msgs->torrent->completion, block ) )
{
protocolSendRequest( msgs, &req );
req.time_requested = now;
reqListAppend( &msgs->clientAskedFor, &req );
++len;
++sent;
}
else dbgmsg( msgs, "not asking for it because we've already got it..." );
}
if( sent )
dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
sent, msgs->clientAskedFor.len, msgs->clientWillAskFor.len );
if( len < max )
fireNeedReq( msgs );
}
static TR_INLINE tr_bool
requestQueueIsFull( const tr_peermsgs * msgs )
{
const int req_max = msgs->maxActiveRequests;
return msgs->clientWillAskFor.len >= (size_t)req_max;
}
tr_addreq_t
tr_peerMsgsAddRequest( tr_peermsgs * msgs,
uint32_t index,
uint32_t offset,
uint32_t length )
{
struct peer_request req;
assert( msgs );
assert( msgs->torrent );
/**
*** Reasons to decline the request
**/
/* don't send requests to choked clients */
if( msgs->peer->clientIsChoked ) {
dbgmsg( msgs, "declining request because they're choking us" );
return TR_ADDREQ_CLIENT_CHOKED;
}
/* peer's queue is full */
if( requestQueueIsFull( msgs ) ) {
dbgmsg( msgs, "declining request because we're full" );
return TR_ADDREQ_FULL;
}
/* peer doesn't have this piece */
if( !tr_bitfieldHas( msgs->peer->have, index ) )
return TR_ADDREQ_MISSING;
/* have we already asked for this piece? */
req.index = index;
req.offset = offset;
req.length = length;
if( reqListHas( &msgs->clientAskedFor, &req ) ) {
dbgmsg( msgs, "declining because it's a duplicate" );
return TR_ADDREQ_DUPLICATE;
}
if( reqListHas( &msgs->clientWillAskFor, &req ) ) {
dbgmsg( msgs, "declining because it's a duplicate" );
return TR_ADDREQ_DUPLICATE;
}
/**
*** Accept this request
**/
dbgmsg( msgs, "adding req for %"PRIu32":%"PRIu32"->%"PRIu32" to our `will request' list",
index, offset, length );
req.time_requested = time( NULL );
reqListAppend( &msgs->clientWillAskFor, &req );
return TR_ADDREQ_OK;
}
static void
cancelAllRequestsToPeer( tr_peermsgs * msgs, tr_bool sendCancel )
{
size_t i;
struct request_list a = msgs->clientWillAskFor;
struct request_list b = msgs->clientAskedFor;
dbgmsg( msgs, "cancelling all requests to peer" );
msgs->clientAskedFor = REQUEST_LIST_INIT;
msgs->clientWillAskFor = REQUEST_LIST_INIT;
for( i=0; i<a.len; ++i )
fireCancelledReq( msgs, &a.fifo[i] );
for( i = 0; i < b.len; ++i ) {
fireCancelledReq( msgs, &b.fifo[i] );
if( sendCancel )
protocolSendCancel( msgs, &b.fifo[i] );
}
reqListClear( &a );
reqListClear( &b );
}
void
tr_peerMsgsCancel( tr_peermsgs * msgs,
uint32_t pieceIndex,
uint32_t offset,
uint32_t length )
tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
{
struct peer_request req;
assert( msgs != NULL );
assert( length > 0 );
/* have we asked the peer for this piece? */
req.index = pieceIndex;
req.offset = offset;
req.length = length;
/* if it's only in the queue and hasn't been sent yet, free it */
if( reqListRemove( &msgs->clientWillAskFor, &req ) ) {
dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
fireCancelledReq( msgs, &req );
}
/* if it's already been sent, send a cancel message too */
if( reqListRemove( &msgs->clientAskedFor, &req ) ) {
dbgmsg( msgs, "cancelling %"PRIu32":%"PRIu32"->%"PRIu32, pieceIndex, offset, length );
protocolSendCancel( msgs, &req );
fireCancelledReq( msgs, &req );
}
blockToReq( msgs->torrent, block, &req );
protocolSendCancel( msgs, &req );
}
/**
***
**/
@ -1007,6 +833,7 @@ sendLtepHandshake( tr_peermsgs * msgs )
tr_bencInitDict( &val, 5 );
tr_bencDictAddInt( &val, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
tr_bencDictAddInt( &val, "p", tr_sessionGetPeerPort( getSession(msgs) ) );
tr_bencDictAddInt( &val, "reqq", REQQ );
tr_bencDictAddInt( &val, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
tr_bencDictAddStr( &val, "v", TR_NAME " " USERAGENT_PREFIX );
m = tr_bencDictAddDict( &val, "m", 1 );
@ -1251,11 +1078,13 @@ peerMadeRequest( tr_peermsgs * msgs,
dbgmsg( msgs, "rejecting request for a piece we don't have." );
else if( peerIsChoked )
dbgmsg( msgs, "rejecting request from choked peer" );
else if( msgs->peerAskedForCount + 1 >= REQQ )
dbgmsg( msgs, "rejecting request ... reqq is full" );
else
allow = TRUE;
if( allow )
reqListAppend( &msgs->peerAskedFor, req );
msgs->peerAskedFor[msgs->peerAskedForCount++] = *req;
else if( fext )
protocolSendReject( msgs, req );
}
@ -1364,6 +1193,15 @@ readBtPiece( tr_peermsgs * msgs,
}
}
static void updateDesiredRequestCount( tr_peermsgs * msgs, uint64_t now );
static void
decrementActiveRequestCount( tr_peermsgs * msgs )
{
if( msgs->activeRequestCount > 0 )
msgs->activeRequestCount--;
}
static int
readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
{
@ -1393,13 +1231,13 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
dbgmsg( msgs, "got Choke" );
msgs->peer->clientIsChoked = 1;
if( !fext )
cancelAllRequestsToPeer( msgs, FALSE );
fireGotChoke( msgs );
break;
case BT_UNCHOKE:
dbgmsg( msgs, "got Unchoke" );
msgs->peer->clientIsChoked = 0;
fireNeedReq( msgs );
updateDesiredRequestCount( msgs, tr_date( ) );
break;
case BT_INTERESTED:
@ -1425,13 +1263,10 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
break;
case BT_BITFIELD:
{
dbgmsg( msgs, "got a bitfield" );
tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen );
updatePeerProgress( msgs );
fireNeedReq( msgs );
break;
}
case BT_REQUEST:
{
@ -1446,13 +1281,19 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
case BT_CANCEL:
{
int i;
struct peer_request r;
tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
if( reqListRemove( &msgs->peerAskedFor, &r ) && fext )
protocolSendReject( msgs, &r );
for( i=0; i<msgs->peerAskedForCount; ++i ) {
const struct peer_request * req = msgs->peerAskedFor + i;
if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
break;
}
if( i < msgs->peerAskedForCount )
memmove( msgs->peerAskedFor+i, msgs->peerAskedFor+i+1, --msgs->peerAskedForCount-i );
break;
}
@ -1520,9 +1361,10 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
if( fext )
reqListRemove( &msgs->clientAskedFor, &r );
else {
if( fext ) {
decrementActiveRequestCount( msgs );
fireGotRej( msgs, &r );
} else {
fireError( msgs, EMSGSIZE );
return READ_ERR;
}
@ -1591,29 +1433,11 @@ clientGotBlock( tr_peermsgs * msgs,
/* save the block */
dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );
/**
*** Remove the block from our `we asked for this' list
**/
if( !reqListRemove( &msgs->clientAskedFor, req ) ) {
clientGotUnwantedBlock( msgs, req );
if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
dbgmsg( msgs, "we didn't ask for this message..." );
return 0;
}
dbgmsg( msgs, "peer has %d more blocks we've asked for",
msgs->clientAskedFor.len );
/**
*** Error checks
**/
if( tr_cpBlockIsComplete( &tor->completion, block ) ) {
dbgmsg( msgs, "we have this block already..." );
clientGotUnwantedBlock( msgs, req );
return 0;
}
/**
*** Save the block
**/
@ -1622,6 +1446,7 @@ clientGotBlock( tr_peermsgs * msgs,
return err;
addPeerToBlamefield( msgs, req->index );
decrementActiveRequestCount( msgs );
fireGotBlock( msgs, req );
return 0;
}
@ -1685,32 +1510,76 @@ canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
***
**/
static int
ratePulse( tr_peermsgs * msgs, uint64_t now )
static void
updateDesiredRequestCount( tr_peermsgs * msgs, uint64_t now )
{
int irate;
const int floor = 8;
const int seconds = 10;
double rate;
int estimatedBlocksInPeriod;
const tr_torrent * const torrent = msgs->torrent;
/* Get the rate limit we should use.
* FIXME: this needs to consider all the other peers as well... */
rate = tr_peerGetPieceSpeed( msgs->peer, now, TR_PEER_TO_CLIENT );
if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
rate = MIN( rate, tr_torrentGetSpeedLimit( torrent, TR_PEER_TO_CLIENT ) );
if( tr_torrentUsesSessionLimits( torrent ) )
if( tr_sessionGetActiveSpeedLimit( torrent->session, TR_PEER_TO_CLIENT, &irate ) )
rate = MIN( rate, irate );
if( tr_torrentIsSeed( msgs->torrent ) )
{
msgs->desiredRequestCount = 0;
}
else if( msgs->peer->clientIsChoked )
{
msgs->desiredRequestCount = 0;
}
else
{
int irate;
int estimatedBlocksInPeriod;
double rate;
const int floor = 16;
const int seconds = REQUEST_BUF_SECS;
estimatedBlocksInPeriod = ( rate * seconds * 1024 ) / torrent->blockSize;
msgs->maxActiveRequests = MAX( floor, estimatedBlocksInPeriod );
/* Get the rate limit we should use.
* FIXME: this needs to consider all the other peers as well... */
rate = tr_peerGetPieceSpeed( msgs->peer, now, TR_PEER_TO_CLIENT );
if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
rate = MIN( rate, tr_torrentGetSpeedLimit( torrent, TR_PEER_TO_CLIENT ) );
if( msgs->reqq > 0 )
msgs->maxActiveRequests = MIN( msgs->maxActiveRequests, msgs->reqq );
/* honor the session limits, if enabled */
if( tr_torrentUsesSessionLimits( torrent ) )
if( tr_sessionGetActiveSpeedLimit( torrent->session, TR_PEER_TO_CLIENT, &irate ) )
rate = MIN( rate, irate );
return TRUE;
/* use this desired rate to figure out how
* many requests we should send to this peer */
estimatedBlocksInPeriod = ( rate * seconds * 1024 ) / torrent->blockSize;
msgs->desiredRequestCount = MAX( floor, estimatedBlocksInPeriod );
/* honor the peer's maximum request count, if specified */
if( msgs->reqq > 0 )
if( msgs->desiredRequestCount > msgs->reqq )
msgs->desiredRequestCount = msgs->reqq;
}
}
static void
updateRequests( tr_peermsgs * msgs )
{
const int MIN_BATCH_SIZE = 4;
const int numwant = msgs->desiredRequestCount - msgs->activeRequestCount;
/* make sure we have enough block requests queued up */
if( numwant >= MIN_BATCH_SIZE )
{
int i;
int n;
tr_block_index_t * blocks = tr_new( tr_block_index_t, numwant );
tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n );
for( i=0; i<n; ++i )
{
struct peer_request req;
blockToReq( msgs->torrent, blocks[i], &req );
protocolSendRequest( msgs, &req );
}
msgs->activeRequestCount += n;
tr_free( blocks );
}
}
static size_t
@ -1818,10 +1687,10 @@ peerPulse( void * vmsgs )
tr_peermsgs * msgs = vmsgs;
const time_t now = time( NULL );
ratePulse( msgs, now );
pumpRequestQueue( msgs, now );
expireOldRequests( msgs, now );
if ( tr_isPeerIo( msgs->peer->io ) ) {
updateDesiredRequestCount( msgs, now );
updateRequests( msgs );
}
for( ;; )
if( fillOutputBuffer( msgs, now ) < 1 )
@ -2179,9 +2048,7 @@ tr_peerMsgsNew( struct tr_torrent * torrent,
m->outMessagesBatchedAt = 0;
m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
m->incoming.block = evbuffer_new( );
m->peerAskedFor = REQUEST_LIST_INIT;
m->clientAskedFor = REQUEST_LIST_INIT;
m->clientWillAskFor = REQUEST_LIST_INIT;
m->peerAskedForCount = 0;
evtimer_set( &m->pexTimer, pexPulse, m );
tr_timerAdd( &m->pexTimer, PEX_INTERVAL_SECS, 0 );
peer->msgs = m;
@ -2197,7 +2064,7 @@ tr_peerMsgsNew( struct tr_torrent * torrent,
tellPeerWhatWeHave( m );
tr_peerIoSetIOFuncs( m->peer->io, canRead, didWrite, gotError, m );
ratePulse( m, tr_date() );
updateDesiredRequestCount( m, tr_date( ) );
return m;
}
@ -2209,9 +2076,6 @@ tr_peerMsgsFree( tr_peermsgs* msgs )
{
evtimer_del( &msgs->pexTimer );
tr_publisherDestruct( &msgs->publisher );
reqListClear( &msgs->clientWillAskFor );
reqListClear( &msgs->clientAskedFor );
reqListClear( &msgs->peerAskedFor );
evbuffer_free( msgs->incoming.block );
evbuffer_free( msgs->outMessages );

View File

@ -48,18 +48,10 @@ void tr_peerMsgsHave( tr_peermsgs * msgs,
void tr_peerMsgsPulse( tr_peermsgs * msgs );
void tr_peerMsgsCancel( tr_peermsgs * msgs,
uint32_t pieceIndex,
uint32_t offset,
uint32_t length );
tr_block_index_t block );
void tr_peerMsgsFree( tr_peermsgs* );
tr_addreq_t tr_peerMsgsAddRequest( tr_peermsgs * peer,
uint32_t pieceIndex,
uint32_t offset,
uint32_t length );
void tr_peerMsgsUnsubscribe( tr_peermsgs * peer,
tr_publisher_tag tag );

View File

@ -1,130 +0,0 @@
#include <stdio.h>
#include "transmission.h"
#include "request-list.h"
#undef VERBOSE
static int test = 0;
#ifdef VERBOSE
#define check( A ) \
{ \
++test; \
if( A ){ \
fprintf( stderr, "PASS test #%d (%s, %d)\n", test, __FILE__, __LINE__ ); \
} else { \
fprintf( stderr, "FAIL test #%d (%s, %d)\n", test, __FILE__, __LINE__ ); \
return test; \
} \
}
#else
#define check( A ) \
{ \
++test; \
if( !( A ) ){ \
fprintf( stderr, "FAIL test #%d (%s, %d)\n", test, __FILE__, __LINE__ ); \
return test; \
} \
}
#endif
static int
testFoo( void )
{
tr_bool success;
struct request_list list = REQUEST_LIST_INIT;
struct peer_request a, b, c, tmp;
a.index = a.offset = a.length = 10;
b.index = b.offset = b.length = 20;
c.index = c.offset = c.length = 30;
check( list.len == 0 );
reqListAppend( &list, &a );
reqListAppend( &list, &b );
reqListAppend( &list, &c );
check( list.len == 3 );
check( list.fifo[0].index == 10 );
check( list.fifo[1].index == 20 );
check( list.fifo[2].index == 30 );
check( reqListHas( &list, &a ) );
check( reqListHas( &list, &b ) );
check( reqListHas( &list, &c ) );
success = reqListRemove( &list, &b );
check( success );
check( list.len == 2 );
check( list.fifo[0].index == 10 );
check( list.fifo[1].index == 30 );
check( reqListHas( &list, &a ) );
check( !reqListHas( &list, &b ) );
check( reqListHas( &list, &c ) );
success = reqListPop( &list, &tmp );
check( success );
check( list.len == 1 );
check( tmp.index == 10 );
check( list.fifo[0].index == 30 );
check( !reqListHas( &list, &a ) );
check( !reqListHas( &list, &b ) );
check( reqListHas( &list, &c ) );
success = reqListPop( &list, &tmp );
check( success );
check( list.len == 0 );
check( tmp.index == 30 );
check( !reqListHas( &list, &a ) );
check( !reqListHas( &list, &b ) );
check( !reqListHas( &list, &c ) );
success = reqListPop( &list, &tmp );
check( !success );
reqListAppend( &list, &a );
reqListAppend( &list, &b );
reqListAppend( &list, &c );
/* remove from middle, front, end */
success = reqListRemove( &list, &b );
check( success );
check( list.len == 2 );
check( reqListHas( &list, &a ) );
check( !reqListHas( &list, &b ) );
check( reqListHas( &list, &c ) );
success = reqListRemove( &list, &c );
check( success );
check( list.len == 1 );
check( reqListHas( &list, &a ) );
check( !reqListHas( &list, &b ) );
check( !reqListHas( &list, &c ) );
success = reqListRemove( &list, &c );
check( !success );
check( list.len == 1 );
success = reqListRemove( &list, &a );
check( success );
check( list.len == 0 );
check( !reqListHas( &list, &a ) );
check( !reqListHas( &list, &b ) );
check( !reqListHas( &list, &c ) );
reqListClear( &list );
return 0;
}
int
main( void )
{
int i;
if(( i = testFoo( )))
return i;
return 0;
}

View File

@ -1,161 +0,0 @@
/*
* This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
*
* This file is licensed by the GPL version 2. Works owned by the
* Transmission project are granted a special exemption to clause 2(b)
* so that the bulk of its code can remain under the MIT license.
* This exemption does not extend to derived works not owned by
* the Transmission project.
*
* $Id:$
*/
#include <assert.h>
#include "transmission.h"
#include "request-list.h"
#include "utils.h"
static int
compareRequests( const struct peer_request * a,
const struct peer_request * b )
{
if( a->index != b->index )
return a->index < b->index ? -1 : 1;
if( a->offset != b->offset )
return a->offset < b->offset ? -1 : 1;
return 0;
}
const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL, NULL };
static const int GROW = 8;
static void
reqListReserve( struct request_list * list, size_t max )
{
if( list->max < max )
{
list->max = max + GROW;
list->fifo = tr_renew( struct peer_request, list->fifo, list->max );
list->sort = tr_renew( struct peer_request, list->sort, list->max );
}
}
void
reqListClear( struct request_list * list )
{
tr_free( list->fifo );
tr_free( list->sort );
*list = REQUEST_LIST_INIT;
}
void
reqListCopy( struct request_list * dest, const struct request_list * src )
{
dest->len = dest->max = src->len;
dest->fifo = tr_memdup( src->fifo, dest->len * sizeof( struct peer_request ) );
dest->sort = tr_memdup( src->sort, dest->len * sizeof( struct peer_request ) );
}
typedef int (*compareFunc)(const void * a, const void * b );
static int
reqListSortPos( const struct request_list * list,
const struct peer_request * req,
tr_bool * exactMatch )
{
return tr_lowerBound( req,
list->sort,
list->len,
sizeof( struct peer_request ),
(compareFunc)compareRequests,
exactMatch );
}
void
reqListAppend( struct request_list * list, const struct peer_request * req )
{
int low;
tr_bool exact;
reqListReserve( list, list->len + 8 );
/* append into list->fifo */
list->fifo[list->len] = *req;
/* insert into list->sort */
low = reqListSortPos( list, req, &exact );
memmove( &list->sort[low+1], &list->sort[low], (list->len-low)*sizeof(struct peer_request) );
list->sort[low] = *req;
++list->len;
}
static tr_bool
reqListRemoveFromSorted( struct request_list * list, const struct peer_request * key )
{
tr_bool found;
const int low = reqListSortPos( list, key, &found );
if( found )
memmove( &list->sort[low], &list->sort[low+1], (list->len-low-1)*sizeof(struct peer_request));
return found;
}
static void
reqListRemoveNthFromFifo( struct request_list * list, int n )
{
memmove( &list->fifo[n], &list->fifo[n+1], (list->len-n-1)*sizeof(struct peer_request));
}
tr_bool
reqListPop( struct request_list * list,
struct peer_request * setme )
{
tr_bool success;
if( !list->len )
{
success = FALSE;
}
else
{
*setme = list->fifo[0];
reqListRemoveNthFromFifo( list, 0 );
reqListRemoveFromSorted( list, setme );
--list->len;
success = TRUE;
}
return success;
}
tr_bool
reqListHas( const struct request_list * list,
const struct peer_request * key )
{
tr_bool exactMatch;
reqListSortPos( list, key, &exactMatch );
return exactMatch;
}
tr_bool
reqListRemove( struct request_list * list,
const struct peer_request * key )
{
tr_bool found = reqListRemoveFromSorted( list, key );
if( found )
{
size_t i;
for( i=0; i<list->len; ++i )
if( !compareRequests( &list->fifo[i], key ) )
break;
assert( i < list->len );
reqListRemoveNthFromFifo( list, i );
--list->len;
}
return found;
}

View File

@ -1,57 +0,0 @@
/*
* This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
*
* This file is licensed by the GPL version 2. Works owned by the
* Transmission project are granted a special exemption to clause 2(b)
* so that the bulk of its code can remain under the MIT license.
* This exemption does not extend to derived works not owned by
* the Transmission project.
*
* $Id:$
*/
#ifndef __TRANSMISSION__
#error only libtransmission should #include this header.
#endif
#ifndef TR_PEER_REQ_H
#define TR_PEER_REQ_H
#include <inttypes.h>
struct peer_request
{
uint32_t index;
uint32_t offset;
uint32_t length;
time_t time_requested;
};
struct request_list
{
size_t len;
size_t max;
struct peer_request * fifo;
struct peer_request * sort;
};
extern const struct request_list REQUEST_LIST_INIT;
void reqListClear( struct request_list * list );
void reqListCopy( struct request_list * dest, const struct request_list * src );
/* O(log(N)) */
tr_bool reqListHas( const struct request_list * list, const struct peer_request * key );
/* O(log(N) + 1) */
void reqListAppend( struct request_list * list, const struct peer_request * req );
/* O(log(N) + 1) */
tr_bool reqListPop( struct request_list * list, struct peer_request * setme );
/* O(N + log(N)) */
tr_bool reqListRemove( struct request_list * list, const struct peer_request * key );
#endif

View File

@ -1677,15 +1677,14 @@ tr_torrentSetFilePriorities( tr_torrent * tor,
tr_priority_t priority )
{
tr_file_index_t i;
assert( tr_isTorrent( tor ) );
tr_torrentLock( tor );
for( i = 0; i < fileCount; ++i )
tr_torrentInitFilePriority( tor, files[i], priority );
tr_torrentSetDirty( tor );
tr_peerMgrRebuildRequests( tor );
tr_torrentUnlock( tor );
}
@ -1826,10 +1825,12 @@ tr_torrentSetFileDLs( tr_torrent * tor,
tr_bool doDownload )
{
assert( tr_isTorrent( tor ) );
tr_torrentLock( tor );
tr_torrentInitFileDLs( tor, files, fileCount, doDownload );
tr_torrentSetDirty( tor );
tr_peerMgrRebuildRequests( tor );
tr_torrentUnlock( tor );
}

View File

@ -64,9 +64,11 @@ publish( tr_webseed * w,
static void
fireNeedReq( tr_webseed * w )
{
#if 0
tr_peer_event e = blankEvent;
e.eventType = TR_PEER_NEED_REQ;
publish( w, &e );
#endif
}
static void