experimental message batcher as discussed in http://forum.transmissionbt.com/viewtopic.php?p=24251#24251
This commit is contained in:
parent
e391ef6e1a
commit
5141a65d45
|
@ -64,7 +64,7 @@ enum
|
|||
MIN_CHOKE_PERIOD_SEC = (10),
|
||||
|
||||
/* idle seconds before we send a keepalive */
|
||||
KEEPALIVE_INTERVAL_SECS = 90,
|
||||
KEEPALIVE_INTERVAL_SECS = 100,
|
||||
|
||||
PEX_INTERVAL = (90 * 1000), /* msec between sendPex() calls */
|
||||
PEER_PULSE_INTERVAL = (50), /* msec between pulse() calls */
|
||||
|
@ -84,7 +84,12 @@ enum
|
|||
|
||||
/* how long a sent request can stay queued before it's returned
|
||||
back to the peer-mgr's pool of requests */
|
||||
SENT_REQUEST_TTL_SECS = 120
|
||||
SENT_REQUEST_TTL_SECS = 120,
|
||||
|
||||
/* used in lowering the outMessages queue period */
|
||||
IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
|
||||
HIGH_PRIORITY_INTERVAL_SECS = 5,
|
||||
LOW_PRIORITY_INTERVAL_SECS = 30
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -267,6 +272,14 @@ struct tr_peermsgs
|
|||
|
||||
time_t clientSentPexAt;
|
||||
time_t clientSentAnythingAt;
|
||||
|
||||
/* when we started batching the outMessages */
|
||||
time_t outMessagesBatchedAt;
|
||||
|
||||
/* how long the outMessages batch should be allowed to grow before
|
||||
* it's flushed -- some messages (like requests >:) should be sent
|
||||
* very quickly; others aren't as urgent. */
|
||||
int outMessagesBatchPeriod;
|
||||
|
||||
tr_bitfield * peerAllowedPieces;
|
||||
|
||||
|
@ -314,6 +327,16 @@ myDebug( const char * file, int line,
|
|||
***
|
||||
**/
|
||||
|
||||
static void
|
||||
pokeBatchPeriod( tr_peermsgs * msgs, int interval )
|
||||
{
|
||||
if( msgs->outMessagesBatchPeriod > interval )
|
||||
{
|
||||
msgs->outMessagesBatchPeriod = interval;
|
||||
dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
|
||||
{
|
||||
|
@ -326,6 +349,7 @@ protocolSendRequest( tr_peermsgs * msgs, const struct peer_request * req )
|
|||
tr_peerIoWriteUint32( io, out, req->index );
|
||||
tr_peerIoWriteUint32( io, out, req->offset );
|
||||
tr_peerIoWriteUint32( io, out, req->length );
|
||||
pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -340,6 +364,7 @@ protocolSendCancel( tr_peermsgs * msgs, const struct peer_request * req )
|
|||
tr_peerIoWriteUint32( io, out, req->index );
|
||||
tr_peerIoWriteUint32( io, out, req->offset );
|
||||
tr_peerIoWriteUint32( io, out, req->length );
|
||||
pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -352,6 +377,7 @@ protocolSendHave( tr_peermsgs * msgs, uint32_t index )
|
|||
tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
|
||||
tr_peerIoWriteUint8 ( io, out, BT_HAVE );
|
||||
tr_peerIoWriteUint32( io, out, index );
|
||||
pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -363,6 +389,7 @@ protocolSendChoke( tr_peermsgs * msgs, int choke )
|
|||
dbgmsg( msgs, "sending %s", (choke ? "Choke" : "Unchoke") );
|
||||
tr_peerIoWriteUint32( io, out, sizeof(uint8_t) );
|
||||
tr_peerIoWriteUint8 ( io, out, choke ? BT_CHOKE : BT_UNCHOKE );
|
||||
pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -496,6 +523,7 @@ sendInterest( tr_peermsgs * msgs, int weAreInterested )
|
|||
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) );
|
||||
tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages,
|
||||
weAreInterested ? BT_INTERESTED : BT_NOT_INTERESTED );
|
||||
pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -595,6 +623,7 @@ sendFastReject( tr_peermsgs * msgs,
|
|||
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
|
||||
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, offset );
|
||||
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, length );
|
||||
pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -624,6 +653,7 @@ sendFastAllowed( tr_peermsgs * msgs,
|
|||
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, sizeof(uint8_t) + sizeof(uint32_t) );
|
||||
tr_peerIoWriteUint8( msgs->io, msgs->outMessages, BT_ALLOWED_FAST );
|
||||
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, pieceIndex );
|
||||
pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1570,6 +1600,7 @@ sendKeepalive( tr_peermsgs * msgs )
|
|||
{
|
||||
dbgmsg( msgs, "sending a keepalive message" );
|
||||
tr_peerIoWriteUint32( msgs->io, msgs->outMessages, 0 );
|
||||
pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1656,12 +1687,21 @@ pulse( void * vmsgs )
|
|||
if( !msgs->sendingBlock )
|
||||
{
|
||||
struct peer_request req;
|
||||
int haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
|
||||
|
||||
if(( EVBUFFER_LENGTH( msgs->outMessages ) ))
|
||||
if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
|
||||
{
|
||||
dbgmsg( msgs, "flushing outMessages..." );
|
||||
dbgmsg( msgs, "started an outMessages batch (length is %d)", (int)EVBUFFER_LENGTH( msgs->outMessages ) );
|
||||
msgs->outMessagesBatchedAt = now;
|
||||
}
|
||||
else if( haveMessages
|
||||
&& ( ( now - msgs->outMessagesBatchedAt ) > msgs->outMessagesBatchPeriod ) )
|
||||
{
|
||||
dbgmsg( msgs, "flushing outMessages... (length is %d)", (int)EVBUFFER_LENGTH( msgs->outMessages ) );
|
||||
tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
|
||||
msgs->clientSentAnythingAt = now;
|
||||
msgs->outMessagesBatchedAt = 0;
|
||||
msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
|
||||
}
|
||||
else if( !EVBUFFER_LENGTH( msgs->outBlock )
|
||||
&& !popNextRequest( msgs, &req )
|
||||
|
@ -1686,7 +1726,8 @@ pulse( void * vmsgs )
|
|||
|
||||
tr_free( buf );
|
||||
}
|
||||
else if( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
|
||||
else if( ( !haveMessages )
|
||||
&& ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS )
|
||||
{
|
||||
sendKeepalive( msgs );
|
||||
}
|
||||
|
@ -1716,6 +1757,7 @@ sendBitfield( tr_peermsgs * msgs )
|
|||
tr_peerIoWriteUint32( msgs->io, out, sizeof(uint8_t) + bitfield->len );
|
||||
tr_peerIoWriteUint8 ( msgs->io, out, BT_BITFIELD );
|
||||
tr_peerIoWriteBytes ( msgs->io, out, bitfield->bits, bitfield->len );
|
||||
pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1838,6 +1880,7 @@ sendPex( tr_peermsgs * msgs )
|
|||
tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, BT_LTEP );
|
||||
tr_peerIoWriteUint8 ( msgs->io, msgs->outMessages, msgs->ut_pex_id );
|
||||
tr_peerIoWriteBytes ( msgs->io, msgs->outMessages, benc, bencLen );
|
||||
pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
|
||||
|
||||
/* cleanup */
|
||||
tr_free( benc );
|
||||
|
@ -1889,6 +1932,8 @@ tr_peerMsgsNew( struct tr_torrent * torrent,
|
|||
m->rateTimer = tr_timerNew( m->handle, ratePulse, m, RATE_PULSE_INTERVAL );
|
||||
m->pexTimer = tr_timerNew( m->handle, pexPulse, m, PEX_INTERVAL );
|
||||
m->outMessages = evbuffer_new( );
|
||||
m->outMessagesBatchedAt = 0;
|
||||
m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
|
||||
m->incoming.block = evbuffer_new( );
|
||||
m->outBlock = evbuffer_new( );
|
||||
m->peerAllowedPieces = NULL;
|
||||
|
|
Loading…
Reference in New Issue