use a custom struct for the peer's four request queues since John_Clay's shark report shows it taking up about 40% of the time in malloc/free. also, maybe this will help the "idle memory" numbers some.

This commit is contained in:
Charles Kerr 2008-02-29 03:41:50 +00:00
parent eb3ca08961
commit a7d087b60a
2 changed files with 230 additions and 136 deletions

View File

@ -308,13 +308,13 @@ ipc_mkempty( const struct ipc_info * info, size_t * len, enum ipc_msg id,
int64_t tag )
{
tr_benc pk;
uint8_t * ret;
uint8_t * ret = NULL;
if( NULL == ipc_initval( info, id, tag, &pk, TYPE_STR ) )
return NULL;
ret = ipc_mkval( &pk, len );
SAFEBENCFREE( &pk );
if( ipc_initval( info, id, tag, &pk, TYPE_STR ) )
{
ret = ipc_mkval( &pk, len );
SAFEBENCFREE( &pk );
}
return ret;
}
@ -324,15 +324,14 @@ ipc_mkint( const struct ipc_info * info, size_t * len, enum ipc_msg id,
int64_t tag, int64_t num )
{
tr_benc pk, * val;
uint8_t * ret;
uint8_t * ret = NULL;
val = ipc_initval( info, id, tag, &pk, TYPE_INT );
if( !val )
return NULL;
val->val.i = num;
ret = ipc_mkval( &pk, len );
SAFEBENCFREE( &pk );
if(( val = ipc_initval( info, id, tag, &pk, TYPE_INT )))
{
val->val.i = num;
ret = ipc_mkval( &pk, len );
SAFEBENCFREE( &pk );
}
return ret;
}
@ -342,15 +341,14 @@ ipc_mkstr( const struct ipc_info * info, size_t * len, enum ipc_msg id,
int64_t tag, const char * str )
{
tr_benc pk, * val;
uint8_t * ret;
uint8_t * ret = NULL;
val = ipc_initval( info, id, tag, &pk, TYPE_STR );
if( !val )
return NULL;
tr_bencInitStr( val, str, -1, 1 );
ret = ipc_mkval( &pk, len );
SAFEBENCFREE( &pk );
if(( val = ipc_initval( info, id, tag, &pk, TYPE_STR )))
{
tr_bencInitStr( val, str, -1, 1 );
ret = ipc_mkval( &pk, len );
SAFEBENCFREE( &pk );
}
return ret;
}

View File

@ -23,7 +23,6 @@
#include "bencode.h"
#include "completion.h"
#include "inout.h"
#include "list.h"
#include "peer-io.h"
#include "peer-mgr.h"
#include "peer-mgr-private.h"
@ -70,6 +69,8 @@ enum
PEER_PULSE_INTERVAL = (100), /* msec between calls to pulse() */
RATE_PULSE_INTERVAL = (250), /* msec between calls to ratePulse() */
MAX_QUEUE_SIZE = (100),
MAX_OUTBUF_SIZE = (1024),
/* Fast Peers Extension constants */
@ -79,7 +80,6 @@ enum
QUEUED_REQUEST_TTL_SECS = 20,
SENT_REQUEST_TTL_SECS = 90
};
enum
@ -110,6 +110,118 @@ compareRequest( const void * va, const void * vb )
return 0;
}
struct request_list
{
uint16_t count;
uint16_t max;
struct peer_request * requests;
};
static const struct request_list REQUEST_LIST_INIT = { 0, 0, NULL };
static void
reqListReserve( struct request_list * list, uint16_t max )
{
if( list->max < max )
{
list->max = max;
list->requests = tr_renew( struct peer_request, list->requests, list->max );
}
}
static void
reqListClear( struct request_list * list )
{
tr_free( list->requests );
*list = REQUEST_LIST_INIT;
}
static void
reqListRemoveOne( struct request_list * list, int i )
{
assert( 0<=i && i<list->count );
memmove( &list->requests[i],
&list->requests[i+1],
sizeof( struct peer_request ) * ( --list->count - i ) );
}
static void
reqListAppend( struct request_list * list, const struct peer_request * req )
{
if( ++list->count >= list->max )
reqListReserve( list, list->max + 8 );
list->requests[list->count-1] = *req;
}
static tr_errno
reqListPop( struct request_list * list, struct peer_request * setme )
{
tr_errno err;
if( !list->count )
err = TR_ERROR;
else {
*setme = list->requests[0];
reqListRemoveOne( list, 0 );
err = TR_OK;
}
return err;
}
static int
reqListFind( struct request_list * list, const struct peer_request * key )
{
uint16_t i;
for( i=0; i<list->count; ++i )
if( !compareRequest( key, list->requests+i ) )
return i;
return -1;
}
static tr_errno
reqListRemove( struct request_list * list, const struct peer_request * key )
{
tr_errno err;
const int i = reqListFind( list, key );
if( i < 0 )
err = TR_ERROR;
else {
err = TR_OK;
reqListRemoveOne( list, i );
}
return err;
}
static void
reqListPrune( struct request_list * list,
struct request_list * pruned,
time_t cutoff )
{
int i, k=0, p=0;
struct peer_request keep[MAX_QUEUE_SIZE];
struct peer_request prune[MAX_QUEUE_SIZE];
for( i=0; i<list->count; ++i ) {
const struct peer_request * req = list->requests + i;
if( req->time_requested > cutoff )
keep[k++] = *req;
else
prune[p++] = *req;
}
memcpy( list->requests, keep, sizeof(struct peer_request) * k );
list->count = k;
reqListReserve( pruned, pruned->count + p );
memcpy( pruned->requests + pruned->count, prune, sizeof(struct peer_request) * p );
pruned->count += p;
}
/* this is raw, unchanged data from the peer regarding
* the current message that it's sending us. */
struct tr_incoming
@ -132,10 +244,10 @@ struct tr_peermsgs
struct evbuffer * outBlock; /* buffer of all the current piece message */
struct evbuffer * outMessages; /* buffer of all the non-piece messages */
tr_list * peerAskedFor;
tr_list * peerAskedForFast;
tr_list * clientAskedFor;
tr_list * clientWillAskFor;
struct request_list peerAskedFor;
struct request_list peerAskedForFast;
struct request_list clientAskedFor;
struct request_list clientWillAskFor;
tr_timer * rateTimer;
tr_timer * pulseTimer;
@ -413,7 +525,7 @@ updateInterest( tr_peermsgs * msgs )
static void
cancelAllRequestsToClientExceptFast( tr_peermsgs * msgs )
{
tr_list_free( &msgs->peerAskedFor, tr_free );
reqListClear( &msgs->peerAskedFor );
}
void
@ -583,42 +695,33 @@ requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
static void
expireOldRequests( tr_peermsgs * msgs )
{
tr_list * l;
tr_list * prune = NULL;
int i;
const time_t now = time( NULL );
const time_t queued_cutoff = now - QUEUED_REQUEST_TTL_SECS;
const time_t sent_cutoff = now - SENT_REQUEST_TTL_SECS;
struct request_list pruned = REQUEST_LIST_INIT;
/* find queued requests that are too old
"time_requested" here is when the request was queued */
for( l=msgs->clientWillAskFor; l!=NULL; l=l->next ) {
struct peer_request * req = l->data;
if( req->time_requested + QUEUED_REQUEST_TTL_SECS < now )
tr_list_prepend( &prune, req );
}
/* find sent requests that are too old
"time_requested" here is when the request was sent */
for( l=msgs->clientAskedFor; l!=NULL; l=l->next ) {
struct peer_request * req = l->data;
if( req->time_requested + SENT_REQUEST_TTL_SECS < now )
tr_list_prepend( &prune, req );
}
reqListPrune( &msgs->clientWillAskFor, &pruned, queued_cutoff );
reqListPrune( &msgs->clientAskedFor, &pruned, sent_cutoff );
/* expire the old requests */
for( l=prune; l!=NULL; l=l->next ) {
struct peer_request * req = l->data;
for( i=0; i<pruned.count; ++i ) {
const struct peer_request * req = &pruned.requests[i];
tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
}
/* cleanup */
tr_list_free( &prune, NULL );
reqListClear( &pruned );
}
static void
pumpRequestQueue( tr_peermsgs * msgs )
{
struct peer_request req;
const int max = msgs->maxActiveRequests;
const int min = msgs->minActiveRequests;
int count = tr_list_size( msgs->clientAskedFor );
const time_t now = time( NULL );
int count = msgs->clientAskedFor.count;
int sent = 0;
if( count > min )
@ -626,14 +729,15 @@ pumpRequestQueue( tr_peermsgs * msgs )
if( msgs->info->clientIsChoked )
return;
while( ( count < max ) && ( msgs->clientWillAskFor != NULL ) )
while( ( count < max ) && !reqListPop( &msgs->clientWillAskFor, &req ) )
{
struct peer_request * r = tr_list_pop_front( &msgs->clientWillAskFor );
assert( requestIsValid( msgs, r ) );
assert( tr_bitfieldHas( msgs->info->have, r->index ) );
protocolSendRequest( msgs, r );
r->time_requested = msgs->lastReqAddedAt = time( NULL );
tr_list_append( &msgs->clientAskedFor, r );
assert( requestIsValid( msgs, &req ) );
assert( tr_bitfieldHas( msgs->info->have, req.index ) );
protocolSendRequest( msgs, &req );
req.time_requested = msgs->lastReqAddedAt = now;
reqListAppend( &msgs->clientAskedFor, &req );
++count;
++sent;
}
@ -641,8 +745,8 @@ pumpRequestQueue( tr_peermsgs * msgs )
if( sent )
dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
sent,
tr_list_size(msgs->clientAskedFor),
tr_list_size(msgs->clientWillAskFor) );
msgs->clientAskedFor.count,
msgs->clientWillAskFor.count );
if( count < max )
fireNeedReq( msgs );
@ -658,7 +762,7 @@ tr_peerMsgsAddRequest( tr_peermsgs * msgs,
uint32_t length )
{
const int req_max = msgs->maxActiveRequests;
struct peer_request tmp, *req;
struct peer_request req;
assert( msgs != NULL );
assert( msgs->torrent != NULL );
@ -679,20 +783,20 @@ tr_peerMsgsAddRequest( tr_peermsgs * msgs,
return TR_ADDREQ_MISSING;
/* peer's queue is full */
if( tr_list_size( msgs->clientWillAskFor ) >= req_max ) {
if( msgs->clientWillAskFor.count >= req_max ) {
dbgmsg( msgs, "declining request because we're full" );
return TR_ADDREQ_FULL;
}
/* have we already asked for this piece? */
tmp.index = index;
tmp.offset = offset;
tmp.length = length;
if( tr_list_find( msgs->clientAskedFor, &tmp, compareRequest ) ) {
req.index = index;
req.offset = offset;
req.length = length;
if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
dbgmsg( msgs, "declining because it's a duplicate" );
return TR_ADDREQ_DUPLICATE;
}
if( tr_list_find( msgs->clientWillAskFor, &tmp, compareRequest ) ) {
if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
dbgmsg( msgs, "declining because it's a duplicate" );
return TR_ADDREQ_DUPLICATE;
}
@ -702,30 +806,31 @@ tr_peerMsgsAddRequest( tr_peermsgs * msgs,
**/
dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
req = tr_new0( struct peer_request, 1 );
*req = tmp;
req->time_requested = time( NULL );
tr_list_append( &msgs->clientWillAskFor, req );
req.time_requested = time( NULL );
reqListAppend( &msgs->clientWillAskFor, &req );
return TR_ADDREQ_OK;
}
static void
cancelAllRequestsToPeer( tr_peermsgs * msgs )
{
struct peer_request * req;
int i;
struct request_list a = msgs->clientWillAskFor;
struct request_list b = msgs->clientAskedFor;
while(( req = tr_list_pop_front( &msgs->clientWillAskFor ) ))
{
fireCancelledReq( msgs, req );
tr_free( req );
msgs->clientAskedFor = REQUEST_LIST_INIT;
msgs->clientWillAskFor = REQUEST_LIST_INIT;
for( i=0; i<a.count; ++i )
fireCancelledReq( msgs, &a.requests[i] );
for( i=0; i<b.count; ++i ) {
fireCancelledReq( msgs, &b.requests[i] );
protocolSendCancel( msgs, &b.requests[i] );
}
while(( req = tr_list_pop_front( &msgs->clientAskedFor ) ))
{
fireCancelledReq( msgs, req );
protocolSendCancel( msgs, req );
tr_free( req );
}
reqListClear( &a );
reqListClear( &b );
}
void
@ -734,29 +839,24 @@ tr_peerMsgsCancel( tr_peermsgs * msgs,
uint32_t offset,
uint32_t length )
{
struct peer_request *req, tmp;
struct peer_request req;
assert( msgs != NULL );
assert( length > 0 );
/* have we asked the peer for this piece? */
tmp.index = pieceIndex;
tmp.offset = offset;
tmp.length = length;
req.index = pieceIndex;
req.offset = offset;
req.length = length;
/* if it's only in the queue and hasn't been sent yet, free it */
if(( req = tr_list_remove( &msgs->clientWillAskFor, &tmp, compareRequest ) ))
{
fireCancelledReq( msgs, req );
tr_free( req );
}
if( !reqListRemove( &msgs->clientWillAskFor, &req ) )
fireCancelledReq( msgs, &req );
/* if it's already been sent, send a cancel message too */
if(( req = tr_list_remove( &msgs->clientAskedFor, &tmp, compareRequest ) ))
{
protocolSendCancel( msgs, req );
fireCancelledReq( msgs, req );
tr_free( req );
if( !reqListRemove( &msgs->clientAskedFor, &req ) ) {
protocolSendCancel( msgs, &req );
fireCancelledReq( msgs, &req );
}
}
@ -1031,12 +1131,10 @@ peerMadeRequest( tr_peermsgs * msgs, const struct peer_request * req )
}
else /* YAY */
{
struct peer_request * tmp = tr_new( struct peer_request, 1 );
*tmp = *req;
if( peerIsFast && pieceIsFast )
tr_list_append( &msgs->peerAskedForFast, tmp );
reqListAppend( &msgs->peerAskedForFast, req );
else
tr_list_append( &msgs->peerAskedFor, tmp );
reqListAppend( &msgs->peerAskedFor, req );
}
}
@ -1239,8 +1337,8 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
dbgmsg( msgs, "got a Cancel %u:%u->%u", req.index, req.offset, req.length );
tr_free( tr_list_remove( &msgs->peerAskedForFast, &req, compareRequest ) );
tr_free( tr_list_remove( &msgs->peerAskedFor, &req, compareRequest ) );
reqListRemove( &msgs->peerAskedForFast, &req );
reqListRemove( &msgs->peerAskedFor, &req );
break;
}
@ -1283,7 +1381,7 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
tr_peerIoReadUint32( msgs->io, inbuf, &req.index );
tr_peerIoReadUint32( msgs->io, inbuf, &req.offset );
tr_peerIoReadUint32( msgs->io, inbuf, &req.length );
tr_free( tr_list_remove( &msgs->clientAskedFor, &req, compareRequest ) );
reqListRemove( &msgs->clientAskedFor, &req );
break;
}
@ -1390,7 +1488,6 @@ clientGotBlock( tr_peermsgs * msgs,
int err;
tr_torrent * tor = msgs->torrent;
const int block = _tr_block( tor, req->index, req->offset );
struct peer_request *myreq;
assert( msgs != NULL );
assert( req != NULL );
@ -1409,21 +1506,15 @@ clientGotBlock( tr_peermsgs * msgs,
*** Remove the block from our `we asked for this' list
**/
myreq = tr_list_remove( &msgs->clientAskedFor, req, compareRequest );
if( myreq == NULL ) {
if( reqListRemove( &msgs->clientAskedFor, req ) )
{
clientGotUnwantedBlock( msgs, req );
dbgmsg( msgs, "we didn't ask for this message..." );
return 0;
}
dbgmsg( msgs, "got block %u:%u->%u (turnaround time %d secs)",
myreq->index, myreq->offset, myreq->length,
(int)(time(NULL) - myreq->time_requested) );
dbgmsg( msgs, "peer has %d more blocks we've asked for",
tr_list_size(msgs->clientAskedFor));
tr_free( myreq );
myreq = NULL;
msgs->clientAskedFor.count );
/**
*** Error checks
@ -1555,19 +1646,20 @@ ratePulse( void * vmsgs )
tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
msgs->maxActiveRequests = MIN( 4 + (int)(msgs->info->rateToClient/4), 100 );
msgs->maxActiveRequests = MIN( 4 + (int)(msgs->info->rateToClient/4), MAX_QUEUE_SIZE );
msgs->minActiveRequests = msgs->maxActiveRequests / 3;
return TRUE;
}
static struct peer_request*
popNextRequest( tr_peermsgs * msgs )
static tr_errno
popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
{
struct peer_request * ret;
ret = tr_list_pop_front( &msgs->peerAskedForFast );
if( !ret )
ret = tr_list_pop_front( &msgs->peerAskedFor);
return ret;
if( !reqListPop( &msgs->peerAskedForFast, setme ) )
return 0;
if( !reqListPop( &msgs->peerAskedFor, setme ) )
return 0;
return TR_ERROR;
}
static int
@ -1575,7 +1667,6 @@ pulse( void * vmsgs )
{
const time_t now = time( NULL );
tr_peermsgs * msgs = vmsgs;
struct peer_request * r;
tr_peerIoTryRead( msgs->io );
pumpRequestQueue( msgs );
@ -1606,6 +1697,8 @@ pulse( void * vmsgs )
if( !msgs->sendingBlock )
{
struct peer_request req;
if(( EVBUFFER_LENGTH( msgs->outMessages ) ))
{
dbgmsg( msgs, "flushing outMessages..." );
@ -1613,28 +1706,27 @@ pulse( void * vmsgs )
msgs->clientSentAnythingAt = now;
}
else if( !EVBUFFER_LENGTH( msgs->outBlock )
&& (( r = popNextRequest( msgs )))
&& requestIsValid( msgs, r )
&& tr_cpPieceIsComplete( msgs->torrent->completion, r->index ) )
&& !popNextRequest( msgs, &req )
&& requestIsValid( msgs, &req )
&& tr_cpPieceIsComplete( msgs->torrent->completion, req.index ) )
{
uint8_t * buf = tr_new( uint8_t, r->length );
uint8_t * buf = tr_new( uint8_t, req.length );
if( !tr_ioRead( msgs->torrent, r->index, r->offset, r->length, buf ) )
if( !tr_ioRead( msgs->torrent, req.index, req.offset, req.length, buf ) )
{
tr_peerIo * io = msgs->io;
struct evbuffer * out = msgs->outBlock;
dbgmsg( msgs, "sending block %u:%u->%u", r->index, r->offset, r->length );
tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + r->length );
dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + 2*sizeof(uint32_t) + req.length );
tr_peerIoWriteUint8 ( io, out, BT_PIECE );
tr_peerIoWriteUint32( io, out, r->index );
tr_peerIoWriteUint32( io, out, r->offset );
tr_peerIoWriteBytes ( io, out, buf, r->length );
tr_peerIoWriteUint32( io, out, req.index );
tr_peerIoWriteUint32( io, out, req.offset );
tr_peerIoWriteBytes ( io, out, buf, req.length );
msgs->sendingBlock = 1;
}
tr_free( buf );
tr_free( r );
}
else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
{
@ -1850,6 +1942,10 @@ tr_peerMsgsNew( struct tr_torrent * torrent,
m->incoming.block = evbuffer_new( );
m->outBlock = evbuffer_new( );
m->peerAllowedPieces = NULL;
m->peerAskedFor = REQUEST_LIST_INIT;
m->peerAskedForFast = REQUEST_LIST_INIT;
m->clientAskedFor = REQUEST_LIST_INIT;
m->clientWillAskFor = REQUEST_LIST_INIT;
m->clientAllowedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
m->clientSuggestedPieces = tr_bitfieldNew( m->torrent->info.pieceCount );
*setme = tr_publisherSubscribe( m->publisher, func, userData );
@ -1888,10 +1984,10 @@ tr_peerMsgsFree( tr_peermsgs* msgs )
tr_timerFree( &msgs->rateTimer );
tr_timerFree( &msgs->pexTimer );
tr_publisherFree( &msgs->publisher );
tr_list_free( &msgs->clientWillAskFor, tr_free );
tr_list_free( &msgs->clientAskedFor, tr_free );
tr_list_free( &msgs->peerAskedForFast, tr_free );
tr_list_free( &msgs->peerAskedFor, tr_free );
reqListClear( &msgs->clientWillAskFor );
reqListClear( &msgs->clientAskedFor );
reqListClear( &msgs->peerAskedForFast );
reqListClear( &msgs->peerAskedFor );
tr_bitfieldFree( msgs->peerAllowedPieces );
tr_bitfieldFree( msgs->clientAllowedPieces );
tr_bitfieldFree( msgs->clientSuggestedPieces );