1
0
Fork 0
mirror of https://github.com/transmission/transmission synced 2025-03-15 16:29:34 +00:00

(libT) better possible fix for #1468: Speed display is very jumpy

This commit is contained in:
Charles Kerr 2008-11-17 04:00:57 +00:00
parent 9ee9a96b7d
commit a0fd42c5c3
8 changed files with 193 additions and 123 deletions

View file

@ -333,7 +333,7 @@ sendYa( tr_handshake * handshake )
/* send it */
setReadState( handshake, AWAITING_YB );
tr_peerIoWriteBuf( handshake->io, outbuf );
tr_peerIoWriteBuf( handshake->io, outbuf, FALSE );
/* cleanup */
evbuffer_free( outbuf );
@ -485,7 +485,7 @@ readYb( tr_handshake * handshake,
/* send it */
tr_cryptoDecryptInit( handshake->crypto );
setReadState( handshake, AWAITING_VC );
tr_peerIoWriteBuf( handshake->io, outbuf );
tr_peerIoWriteBuf( handshake->io, outbuf, FALSE );
/* cleanup */
evbuffer_free( outbuf );
@ -711,7 +711,7 @@ readHandshake( tr_handshake * handshake,
{
int msgSize;
uint8_t * msg = buildHandshakeMessage( handshake, &msgSize );
tr_peerIoWrite( handshake->io, msg, msgSize );
tr_peerIoWrite( handshake->io, msg, msgSize, FALSE );
tr_free( msg );
handshake->haveSentBitTorrentHandshake = 1;
}
@ -779,7 +779,7 @@ readYa( tr_handshake * handshake,
walk += len;
setReadState( handshake, AWAITING_PAD_A );
tr_peerIoWrite( handshake->io, outbuf, walk - outbuf );
tr_peerIoWrite( handshake->io, outbuf, walk - outbuf, FALSE );
return READ_NOW;
}
@ -991,7 +991,7 @@ readIA( tr_handshake * handshake,
}
/* send it out */
tr_peerIoWriteBuf( handshake->io, outbuf );
tr_peerIoWriteBuf( handshake->io, outbuf, FALSE );
evbuffer_free( outbuf );
/* now await the handshake */
@ -1156,7 +1156,7 @@ gotError( struct bufferevent * evbuf UNUSED,
msg = buildHandshakeMessage( handshake, &msgSize );
handshake->haveSentBitTorrentHandshake = 1;
setReadState( handshake, AWAITING_HANDSHAKE );
tr_peerIoWrite( handshake->io, msg, msgSize );
tr_peerIoWrite( handshake->io, msg, msgSize, FALSE );
tr_free( msg );
}
else
@ -1203,7 +1203,7 @@ tr_handshakeNew( tr_peerIo * io,
uint8_t * msg = buildHandshakeMessage( handshake, &msgSize );
handshake->haveSentBitTorrentHandshake = 1;
setReadState( handshake, AWAITING_HANDSHAKE );
tr_peerIoWrite( handshake->io, msg, msgSize );
tr_peerIoWrite( handshake->io, msg, msgSize, FALSE );
tr_free( msg );
}

View file

@ -49,11 +49,12 @@ PeerEventType;
typedef struct
{
PeerEventType eventType;
uint32_t pieceIndex; /* for GOT_BLOCK, CANCEL */
uint32_t offset; /* for GOT_BLOCK */
uint32_t length; /* for GOT_BLOCK + GOT_DATA */
float progress; /* for TR_PEER_PEER_PROGRESS */
int err; /* errno for TR_PEER_GOT_ERROR */
uint32_t pieceIndex; /* for GOT_BLOCK, CANCEL */
uint32_t offset; /* for GOT_BLOCK */
uint32_t length; /* for GOT_BLOCK + GOT_DATA */
float progress; /* for PEER_PROGRESS */
int err; /* errno for GOT_ERROR */
int wasPieceData; /* for GOT_DATA */
}
tr_peer_event;

View file

@ -27,6 +27,7 @@
#include "transmission.h"
#include "crypto.h"
#include "list.h"
#include "net.h"
#include "peer-io.h"
#include "ratecontrol.h"
@ -76,6 +77,12 @@ struct tr_bandwidth
size_t bytesLeft;
};
struct tr_datatype
{
unsigned int isPieceData : 1;
size_t length;
};
struct tr_peerIo
{
unsigned int isEncrypted : 1;
@ -92,11 +99,12 @@ struct tr_peerIo
uint8_t peerId[20];
time_t timeCreated;
tr_session * session;
tr_session * session;
struct in_addr in_addr;
struct bufferevent * bufev;
struct evbuffer * output;
struct bufferevent * bufev;
struct evbuffer * output;
tr_list * output_datatypes; /* struct tr_datatype */
tr_can_read_cb canRead;
tr_did_write_cb didWrite;
@ -199,22 +207,33 @@ didWriteWrapper( struct bufferevent * e,
if( len < io->bufferSize[TR_UP] )
{
const size_t payload = io->bufferSize[TR_UP] - len;
const size_t n = addPacketOverhead( payload );
struct tr_bandwidth * b = &io->bandwidth[TR_UP];
b->bytesLeft -= MIN( b->bytesLeft, (size_t)n );
b->bytesUsed += n;
tr_rcTransferred( io->session->rawSpeed[TR_UP], n );
dbgmsg( io,
"wrote %zu bytes to peer... upload bytesLeft is now %zu",
n,
b->bytesLeft );
size_t payload = io->bufferSize[TR_UP] - len;
while( payload )
{
struct tr_datatype * next = io->output_datatypes->data;
const size_t chunk_length = MIN( next->length, payload );
const size_t n = addPacketOverhead( chunk_length );
if( next->isPieceData )
{
struct tr_bandwidth * b = &io->bandwidth[TR_UP];
b->bytesLeft -= MIN( b->bytesLeft, n );
b->bytesUsed += n;
}
if( io->didWrite )
io->didWrite( io, n, next->isPieceData, io->userData );
payload -= chunk_length;
next->length -= chunk_length;
if( !next->length )
tr_free( tr_list_pop_front( &io->output_datatypes ) );
}
}
adjustOutputBuffer( io );
if( io->didWrite )
io->didWrite( e, io->userData );
}
static void
@ -237,11 +256,7 @@ canReadWrapper( struct bufferevent * e,
struct tr_bandwidth * b = io->bandwidth + TR_DOWN;
b->bytesLeft -= MIN( b->bytesLeft, (size_t)n );
b->bytesUsed += n;
tr_rcTransferred( io->session->rawSpeed[TR_DOWN], n );
dbgmsg( io,
"%zu new input bytes. bytesUsed is %zu, bytesLeft is %zu",
n, b->bytesUsed,
b->bytesLeft );
dbgmsg( io, "%zu new input bytes. bytesUsed is %zu, bytesLeft is %zu", n, b->bytesUsed, b->bytesLeft );
adjustInputBuffer( io );
}
@ -386,6 +401,7 @@ io_dtor( void * vio )
bufferevent_free( io->bufev );
tr_netClose( io->socket );
tr_cryptoFree( io->crypto );
tr_list_free( &io->output_datatypes, tr_free );
tr_free( io );
}
@ -717,10 +733,12 @@ tr_peerIoWantsBandwidth( const tr_peerIo * io,
}
void
tr_peerIoWrite( tr_peerIo * io,
const void * writeme,
size_t writemeLen )
tr_peerIoWrite( tr_peerIo * io,
const void * writeme,
size_t writemeLen,
int isPieceData )
{
struct tr_datatype * datatype;
assert( tr_amInEventThread( io->session ) );
dbgmsg( io, "adding %zu bytes into io->output", writemeLen );
@ -729,16 +747,22 @@ tr_peerIoWrite( tr_peerIo * io,
else
evbuffer_add( io->output, writeme, writemeLen );
datatype = tr_new( struct tr_datatype, 1 );
datatype->isPieceData = isPieceData != 0;
datatype->length = writemeLen;
tr_list_append( &io->output_datatypes, datatype );
adjustOutputBuffer( io );
}
void
tr_peerIoWriteBuf( tr_peerIo * io,
struct evbuffer * buf )
tr_peerIoWriteBuf( tr_peerIo * io,
struct evbuffer * buf,
int isPieceData )
{
const size_t n = EVBUFFER_LENGTH( buf );
tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n );
tr_peerIoWrite( io, EVBUFFER_DATA( buf ), n, isPieceData );
evbuffer_drain( buf, n );
}

View file

@ -28,14 +28,12 @@ typedef struct tr_peerIo tr_peerIo;
***
**/
tr_peerIo* tr_peerIoNewOutgoing(
struct tr_handle * session,
const struct in_addr * addr,
int port,
const uint8_t *
torrentHash );
tr_peerIo* tr_peerIoNewOutgoing( struct tr_handle * session,
const struct in_addr * addr,
int port,
const uint8_t * torrentHash );
tr_peerIo* tr_peerIoNewIncoming( struct tr_handle * session,
tr_peerIo* tr_peerIoNewIncoming( struct tr_handle * session,
const struct in_addr * addr,
uint16_t port,
int socket );
@ -108,31 +106,39 @@ typedef enum
}
ReadState;
typedef ReadState ( *tr_can_read_cb )( struct bufferevent*, void* user_data );
typedef void ( *tr_did_write_cb )( struct bufferevent *, void * );
typedef void ( *tr_net_error_cb )( struct bufferevent *, short what, void * );
typedef ReadState ( *tr_can_read_cb )( struct bufferevent * ev,
void * user_data );
void tr_peerIoSetIOFuncs( tr_peerIo * io,
tr_can_read_cb readcb,
tr_did_write_cb writecb,
tr_net_error_cb errcb,
void * user_data );
typedef void ( *tr_did_write_cb )( tr_peerIo * io,
size_t bytesWritten,
int wasPieceData,
void * userData );
int tr_peerIoWantsBandwidth( const tr_peerIo * io,
tr_direction );
typedef void ( *tr_net_error_cb )( struct bufferevent * ev,
short what,
void * userData );
#if 0
void tr_peerIoTryRead( tr_peerIo * io );
void tr_peerIoSetIOFuncs ( tr_peerIo * io,
tr_can_read_cb readcb,
tr_did_write_cb writecb,
tr_net_error_cb errcb,
void * user_data );
#endif
/**
***
**/
void tr_peerIoWrite( tr_peerIo * io,
const void * writeme,
size_t writemeLen );
int tr_peerIoWantsBandwidth ( const tr_peerIo * io,
tr_direction direction );
void tr_peerIoWriteBuf( tr_peerIo * io,
struct evbuffer * buf );
void tr_peerIoWrite ( tr_peerIo * io,
const void * writeme,
size_t writemeLen,
int isPieceData );
void tr_peerIoWriteBuf ( tr_peerIo * io,
struct evbuffer * buf,
int isPieceData );
/**
***

View file

@ -70,6 +70,9 @@ typedef struct tr_peer
/* the rate at which pieces are being transferred between client and peer.
* protocol overhead is NOT included; this is only the piece data */
struct tr_ratecontrol * pieceSpeed[2];
/* the rate at which all data is being transferred between client and peer. */
struct tr_ratecontrol * rawSpeed[2];
}
tr_peer;

View file

@ -330,6 +330,8 @@ peerConstructor( const struct in_addr * in_addr )
memcpy( &p->in_addr, in_addr, sizeof( struct in_addr ) );
p->pieceSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
p->pieceSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
p->rawSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
p->rawSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
return p;
}
@ -367,6 +369,8 @@ peerDestructor( tr_peer * peer )
tr_bitfieldFree( peer->blame );
tr_free( peer->client );
tr_rcClose( peer->rawSpeed[TR_CLIENT_TO_PEER] );
tr_rcClose( peer->rawSpeed[TR_PEER_TO_CLIENT] );
tr_rcClose( peer->pieceSpeed[TR_CLIENT_TO_PEER] );
tr_rcClose( peer->pieceSpeed[TR_PEER_TO_CLIENT] );
tr_free( peer );
@ -1016,18 +1020,37 @@ peerCallbackFunc( void * vpeer,
{
const time_t now = time( NULL );
tr_torrent * tor = t->tor;
const tr_direction dir = TR_CLIENT_TO_PEER;
tor->activityDate = now;
tor->uploadedCur += e->length;
if( e->wasPieceData )
tor->uploadedCur += e->length;
/* add it to the raw upload speed */
if( peer )
tr_rcTransferred ( peer->pieceSpeed[TR_CLIENT_TO_PEER], e->length );
tr_rcTransferred ( tor->pieceSpeed[TR_CLIENT_TO_PEER], e->length );
tr_rcTransferred ( tor->session->pieceSpeed[TR_CLIENT_TO_PEER], e->length );
tr_statsAddUploaded( tor->session, e->length );
if( peer )
{
tr_rcTransferred ( peer->rawSpeed[dir], e->length );
tr_rcTransferred ( tor->rawSpeed[dir], e->length );
tr_rcTransferred ( tor->session->rawSpeed[dir], e->length );
/* maybe add it to the piece upload speed */
if( e->wasPieceData ) {
if( peer )
tr_rcTransferred ( peer->pieceSpeed[dir], e->length );
tr_rcTransferred ( tor->pieceSpeed[dir], e->length );
tr_rcTransferred ( tor->session->pieceSpeed[dir], e->length );
}
/* update the stats */
if( e->wasPieceData )
tr_statsAddUploaded( tor->session, e->length );
/* update our atom */
if( peer ) {
struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
a->piece_data_time = now;
}
break;
}
@ -1035,24 +1058,43 @@ peerCallbackFunc( void * vpeer,
{
const time_t now = time( NULL );
tr_torrent * tor = t->tor;
const tr_direction dir = TR_PEER_TO_CLIENT;
tor->activityDate = now;
tr_statsAddDownloaded( tor->session, e->length );
if( peer )
tr_rcTransferred ( peer->pieceSpeed[TR_PEER_TO_CLIENT], e->length );
tr_rcTransferred ( tor->pieceSpeed[TR_PEER_TO_CLIENT], e->length );
tr_rcTransferred ( tor->session->pieceSpeed[TR_PEER_TO_CLIENT], e->length );
/* only add this to downloadedCur if we got it from a peer --
* webseeds shouldn't count against our ratio. As one tracker
* admin put it, "Those pieces are downloaded directly from the
* content distributor, not the peers, it is the tracker's job
* to manage the swarms, not the web server and does not fit
* into the jurisdiction of the tracker." */
if( peer )
if( peer && e->wasPieceData )
tor->downloadedCur += e->length;
/* add it to our raw download speed */
if( peer )
tr_rcTransferred ( peer->rawSpeed[dir], e->length );
tr_rcTransferred ( tor->rawSpeed[dir], e->length );
tr_rcTransferred ( tor->session->rawSpeed[dir], e->length );
/* maybe add it to the piece upload speed */
if( e->wasPieceData ) {
if( peer )
tr_rcTransferred ( peer->pieceSpeed[dir], e->length );
tr_rcTransferred ( tor->pieceSpeed[dir], e->length );
tr_rcTransferred ( tor->session->pieceSpeed[dir], e->length );
}
/* update the stats */
if( e->wasPieceData )
tr_statsAddDownloaded( tor->session, e->length );
/* update our atom */
if( peer ) {
struct peer_atom * a = getExistingAtom( t, &peer->in_addr );
a->piece_data_time = now;
}
break;
}

View file

@ -429,7 +429,7 @@ protocolSendChoke( tr_peermsgs * msgs,
*** EVENTS
**/
static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };
static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0 };
static void
publish( tr_peermsgs * msgs,
@ -483,23 +483,28 @@ fireGotBlock( tr_peermsgs * msgs,
static void
fireClientGotData( tr_peermsgs * msgs,
uint32_t length )
uint32_t length,
int wasPieceData )
{
tr_peer_event e = blankEvent;
e.length = length;
e.eventType = TR_PEER_CLIENT_GOT_DATA;
e.wasPieceData = wasPieceData;
publish( msgs, &e );
}
static void
firePeerGotData( tr_peermsgs * msgs,
uint32_t length )
firePeerGotData( tr_peermsgs * msgs,
uint32_t length,
int wasPieceData )
{
tr_peer_event e = blankEvent;
e.length = length;
e.eventType = TR_PEER_PEER_GOT_DATA;
e.wasPieceData = wasPieceData;
publish( msgs, &e );
}
@ -1299,14 +1304,6 @@ static int clientGotBlock( tr_peermsgs * msgs,
const uint8_t * block,
const struct peer_request * req );
static void
clientGotBytes( tr_peermsgs * msgs,
uint32_t byteCount )
{
msgs->info->pieceDataActivityDate = time( NULL );
fireClientGotData( msgs, byteCount );
}
static int
readBtPiece( tr_peermsgs * msgs,
struct evbuffer * inbuf,
@ -1342,7 +1339,7 @@ readBtPiece( tr_peermsgs * msgs,
assert( EVBUFFER_LENGTH( inbuf ) >= n );
tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
evbuffer_add( msgs->incoming.block, buf, n );
clientGotBytes( msgs, n );
fireClientGotData( msgs, n, TRUE );
tr_free( buf );
dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
(int)n, req->index, req->offset, req->length,
@ -1538,15 +1535,6 @@ readBtMessage( tr_peermsgs * msgs,
return READ_NOW;
}
static void
peerGotBytes( tr_peermsgs * msgs,
uint32_t byteCount,
const time_t now )
{
msgs->info->pieceDataActivityDate = now;
firePeerGotData( msgs, byteCount );
}
static void
decrementDownloadedCount( tr_peermsgs * msgs,
uint32_t byteCount )
@ -1634,6 +1622,13 @@ clientGotBlock( tr_peermsgs * msgs,
return 0;
}
static void
didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
{
tr_peermsgs * msgs = vmsgs;
firePeerGotData( msgs, bytesWritten, wasPieceData );
}
static ReadState
canRead( struct bufferevent * evin,
void * vmsgs )
@ -1652,19 +1647,23 @@ canRead( struct bufferevent * evin,
ret = inlen ? readBtPiece( msgs, in, inlen ) : READ_LATER;
}
else switch( msgs->state )
{
case AWAITING_BT_LENGTH:
ret = readBtLength ( msgs, in, inlen ); break;
{
case AWAITING_BT_LENGTH:
ret = readBtLength ( msgs, in, inlen ); break;
case AWAITING_BT_ID:
ret = readBtId ( msgs, in, inlen ); break;
case AWAITING_BT_ID:
ret = readBtId ( msgs, in, inlen ); break;
case AWAITING_BT_MESSAGE:
ret = readBtMessage( msgs, in, inlen ); break;
case AWAITING_BT_MESSAGE:
ret = readBtMessage( msgs, in, inlen ); break;
default:
assert( 0 );
}
default:
assert( 0 );
}
/* log the raw data that was read */
if( EVBUFFER_LENGTH( in ) != inlen )
fireClientGotData( msgs, inlen - EVBUFFER_LENGTH( in ), FALSE );
return ret;
}
@ -1726,20 +1725,16 @@ peerPulse( void * vmsgs )
if( outlen )
{
tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen );
tr_peerIoWrite( msgs->io, EVBUFFER_DATA( msgs->outBlock ), outlen, TRUE );
evbuffer_drain( msgs->outBlock, outlen );
peerGotBytes( msgs, outlen, now );
len -= outlen;
msgs->clientSentAnythingAt = now;
msgs->sendingBlock = len != 0;
dbgmsg( msgs, "wrote %d bytes; %d left in block", (int)outlen,
(int)len );
dbgmsg( msgs, "wrote %zu bytes; %zu left in block", outlen, len );
}
else dbgmsg( msgs,
"stalled writing block... uploadMax %lu, outlen %lu",
uploadMax, outlen );
else dbgmsg( msgs, "stalled writing block... uploadMax %lu, outlen %lu", uploadMax, outlen );
}
if( !msgs->sendingBlock )
@ -1749,18 +1744,15 @@ peerPulse( void * vmsgs )
if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
{
dbgmsg( msgs, "started an outMessages batch (length is %d)",
(int)EVBUFFER_LENGTH( msgs->outMessages ) );
dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
msgs->outMessagesBatchedAt = now;
}
else if( haveMessages
&& ( ( now - msgs->outMessagesBatchedAt ) >
msgs->outMessagesBatchPeriod ) )
{
dbgmsg( msgs, "flushing outMessages... (length is %d)",
(int)EVBUFFER_LENGTH(
msgs->outMessages ) );
tr_peerIoWriteBuf( msgs->io, msgs->outMessages );
dbgmsg( msgs, "flushing outMessages... (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
tr_peerIoWriteBuf( msgs->io, msgs->outMessages, FALSE );
msgs->clientSentAnythingAt = now;
msgs->outMessagesBatchedAt = 0;
msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
@ -2089,7 +2081,7 @@ tr_peerMsgsNew( struct tr_torrent * torrent,
tr_peerIoSetTimeoutSecs( m->io, 150 ); /* timeout after N seconds of
inactivity */
tr_peerIoSetIOFuncs( m->io, canRead, NULL, gotError, m );
tr_peerIoSetIOFuncs( m->io, canRead, didWrite, gotError, m );
ratePulse( m );
return m;

View file

@ -51,7 +51,7 @@ struct tr_webseed
****
***/
static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0 };
static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0 };
static void
publish( tr_webseed * w,
@ -93,6 +93,8 @@ fireClientGotData( tr_webseed * w,
e.eventType = TR_PEER_CLIENT_GOT_DATA;
e.length = length;
e.wasPieceData = TRUE;
publish( w, &e );
}