diff --git a/libtransmission/Makefile.am b/libtransmission/Makefile.am index 756b8539d..3e93ebb08 100644 --- a/libtransmission/Makefile.am +++ b/libtransmission/Makefile.am @@ -24,6 +24,7 @@ libtransmission_a_SOURCES = \ ggets.c \ handshake.c \ inout.c \ + iobuf.c \ json.c \ JSON_parser.c \ list.c \ @@ -69,6 +70,7 @@ noinst_HEADERS = \ ggets.h \ handshake.h \ inout.h \ + iobuf.h \ json.h \ JSON_parser.h \ list.h \ diff --git a/libtransmission/bandwidth.c b/libtransmission/bandwidth.c index 3b508afcf..0391cd9cb 100644 --- a/libtransmission/bandwidth.c +++ b/libtransmission/bandwidth.c @@ -10,9 +10,15 @@ * $Id:$ */ +#include #include + +#include "event.h" + #include "transmission.h" #include "bandwidth.h" +#include "iobuf.h" +#include "ptrarray.h" #include "utils.h" /*** @@ -24,7 +30,8 @@ enum HISTORY_MSEC = 2000, INTERVAL_MSEC = HISTORY_MSEC, GRANULARITY_MSEC = 250, - HISTORY_SIZE = ( INTERVAL_MSEC / GRANULARITY_MSEC ) + HISTORY_SIZE = ( INTERVAL_MSEC / GRANULARITY_MSEC ), + MAGIC_NUMBER = 43143 }; struct bratecontrol @@ -75,33 +82,75 @@ bytesUsed( struct bratecontrol * r, size_t size ) ******* ******/ -struct tr_bandwidth +struct tr_band { unsigned int isLimited : 1; - + unsigned int honorParentLimits : 1; size_t bytesLeft; - + double desiredSpeed; struct bratecontrol raw; struct bratecontrol piece; +}; +struct tr_bandwidth +{ + struct tr_band band[2]; + struct tr_bandwidth * parent; + int magicNumber; tr_session * session; + tr_ptrArray * children; /* struct tr_bandwidth */ + tr_ptrArray * iobufs; /* struct tr_iobuf */ }; /*** **** ***/ +static int +comparePointers( const void * a, const void * b ) +{ + return a - b; +} + +static int +isBandwidth( const tr_bandwidth * b ) +{ + return ( b != NULL ) && ( b->magicNumber == MAGIC_NUMBER ); +} + +static int +isDirection( const tr_direction dir ) +{ + return ( dir == TR_UP ) || ( dir == TR_DOWN ); +} + +/*** +**** +***/ + tr_bandwidth* -tr_bandwidthNew( tr_session * session ) +tr_bandwidthNew( tr_session * session, tr_bandwidth * parent ) { tr_bandwidth * b = tr_new0( tr_bandwidth, 1 ); b->session = session; + b->children = tr_ptrArrayNew( ); + b->iobufs = tr_ptrArrayNew( ); + b->magicNumber = MAGIC_NUMBER; + b->band[TR_UP].honorParentLimits = 1; + b->band[TR_DOWN].honorParentLimits = 1; + tr_bandwidthSetParent( b, parent ); return b; } void tr_bandwidthFree( tr_bandwidth * b ) { + assert( isBandwidth( b ) ); + + tr_bandwidthSetParent( b, NULL ); + tr_ptrArrayFree( b->iobufs, NULL ); + tr_ptrArrayFree( b->children, NULL ); + b->magicNumber = 0xDEAD; tr_free( b ); } @@ -110,61 +159,235 @@ tr_bandwidthFree( tr_bandwidth * b ) ***/ void -tr_bandwidthSetLimited( tr_bandwidth * b, - size_t bytesLeft ) +tr_bandwidthSetParent( tr_bandwidth * b, + tr_bandwidth * parent ) { - b->isLimited = 1; - b->bytesLeft = bytesLeft; + assert( isBandwidth( b ) ); + assert( b != parent ); + + if( b->parent ) + { + assert( isBandwidth( b->parent ) ); + + tr_ptrArrayRemoveSorted( b->parent->children, b, comparePointers ); + b->parent= NULL; + } + + if( parent ) + { + assert( isBandwidth( parent ) ); + assert( parent->parent != b ); + + tr_ptrArrayInsertSorted( parent->children, b, comparePointers ); + b->parent = parent; + } } void -tr_bandwidthSetUnlimited( tr_bandwidth * b ) +tr_bandwidthHonorParentLimits( tr_bandwidth * b, + tr_direction dir, + int honorParentLimits ) { - b->isLimited = 0; -} + assert( isBandwidth( b ) ); + assert( isDirection( dir ) ); -size_t -tr_bandwidthClamp( const tr_bandwidth * b, - size_t byteCount ) -{ -/* const size_t n = byteCount; */ - - if( b && b->isLimited ) - byteCount = MIN( byteCount, b->bytesLeft ); - -/* if( n != byteCount ) fprintf( stderr, "%p: %zu clamped to %zu\n", b, n, byteCount ); */ - return byteCount; + b->band[dir].honorParentLimits = honorParentLimits != 0; } /*** **** ***/ -double -tr_bandwidthGetRawSpeed( const tr_bandwidth * b ) +void +tr_bandwidthSetDesiredSpeed( tr_bandwidth * b, + tr_direction dir, + double desiredSpeed ) { - return getSpeed( &b->raw ); + assert( isBandwidth( b ) ); + assert( isDirection( dir ) ); + + b->band[dir].desiredSpeed = desiredSpeed; } double -tr_bandwidthGetPieceSpeed( const tr_bandwidth * b UNUSED ) +tr_bandwidthGetDesiredSpeed( const tr_bandwidth * b, + tr_direction dir ) { - return getSpeed( &b->piece ); + assert( isBandwidth( b ) ); + assert( isDirection( dir ) ); + + return b->band[dir].desiredSpeed; +} + +void +tr_bandwidthSetLimited( tr_bandwidth * b, + tr_direction dir, + int isLimited ) +{ + assert( isBandwidth( b ) ); + assert( isDirection( dir ) ); + + b->band[dir].isLimited = isLimited != 0; +} + +int +tr_bandwidthIsLimited( const tr_bandwidth * b, + tr_direction dir ) +{ + assert( isBandwidth( b ) ); + assert( isDirection( dir ) ); + + return b->band[dir].isLimited != 0; +} + +#if 0 +#define DEBUG_DIRECTION TR_DOWN +#endif + +void +tr_bandwidthAllocate( tr_bandwidth * b, + tr_direction dir, + int period_msec ) +{ + const double currentSpeed = tr_bandwidthGetPieceSpeed( b, dir ); /* KiB/s */ + const double desiredSpeed = b->band[dir].desiredSpeed; /* KiB/s */ + const double seconds_per_pulse = period_msec / 1000.0; + const double current_bytes_per_pulse = currentSpeed * 1024.0 * seconds_per_pulse; + const double desired_bytes_per_pulse = desiredSpeed * 1024.0 * seconds_per_pulse; + const double pulses_per_history = (double)HISTORY_MSEC / period_msec; + const double min = desired_bytes_per_pulse * 0.90; + const double max = desired_bytes_per_pulse * 1.50; + const double next_pulse_bytes = desired_bytes_per_pulse * ( pulses_per_history + 1 ) + - ( current_bytes_per_pulse * pulses_per_history ); + double clamped; + + /* clamp the return value to lessen oscillation */ + clamped = next_pulse_bytes; + clamped = MAX( clamped, min ); + clamped = MIN( clamped, max ); + b->band[dir].bytesLeft = clamped; + +#ifdef DEBUG_DIRECTION +if( dir == DEBUG_DIRECTION ) +fprintf( stderr, "bandwidth %p currentSpeed(%5.2f) desiredSpeed(%5.2f), allocating %5.2f (unclamped: %5.2f)\n", + b, currentSpeed, desiredSpeed, + clamped/1024.0, next_pulse_bytes/1024.0 ); +#endif + + /* notify the io buffers that there's more bandwidth available */ + if( !b->band[dir].isLimited || ( clamped > 0 ) ) { + int i, n=0; + short what = dir==TR_UP ? EV_WRITE : EV_READ; + struct tr_iobuf ** iobufs = (struct tr_iobuf**) tr_ptrArrayPeek( b->iobufs, &n ); +#ifdef DEBUG_DIRECTION +if( dir == DEBUG_DIRECTION ) +fprintf( stderr, "bandwidth %p has %d iobufs\n", b, n ); +#endif + for( i=0; ichildren, &n ); + for( i=0; iiobufs, iobuf, comparePointers ); +} + +void +tr_bandwidthRemoveBuffer( tr_bandwidth * b, + struct tr_iobuf * iobuf ) +{ + assert( isBandwidth( b ) ); + assert( iobuf ); + + tr_ptrArrayRemoveSorted( b->iobufs, iobuf, comparePointers ); +} + +/*** +**** +***/ + +size_t +tr_bandwidthClamp( const tr_bandwidth * b, + tr_direction dir, + size_t byteCount ) +{ + assert( isBandwidth( b ) ); + assert( isDirection( dir ) ); + + if( b ) + { + if( b->band[dir].isLimited ) + byteCount = MIN( byteCount, b->band[dir].bytesLeft ); + + if( b->parent && b->band[dir].honorParentLimits ) + byteCount = tr_bandwidthClamp( b->parent, dir, byteCount ); + } + + return byteCount; +} + +double +tr_bandwidthGetRawSpeed( const tr_bandwidth * b, tr_direction dir ) +{ + assert( isBandwidth( b ) ); + assert( isDirection( dir ) ); + + return getSpeed( &b->band[dir].raw ); +} + +double +tr_bandwidthGetPieceSpeed( const tr_bandwidth * b, tr_direction dir ) +{ + assert( isBandwidth( b ) ); + assert( isDirection( dir ) ); + + return getSpeed( &b->band[dir].piece ); } void tr_bandwidthUsed( tr_bandwidth * b, + tr_direction dir, size_t byteCount, int isPieceData ) { - if( b->isLimited && isPieceData ) - { - b->bytesLeft -= MIN( b->bytesLeft, byteCount ); - /* fprintf( stderr, "%p used %zu bytes ... %zu left\n", b, byteCount, b->bytesLeft ); */ - } + struct tr_band * band; - bytesUsed( &b->raw, byteCount ); + assert( isBandwidth( b ) ); + assert( isDirection( dir ) ); + + band = &b->band[dir]; + + if( band->isLimited && isPieceData ) + band->bytesLeft -= MIN( band->bytesLeft, byteCount ); + +#ifdef DEBUG_DIRECTION +if( ( dir == DEBUG_DIRECTION ) && band->isLimited && isPieceData ) +fprintf( stderr, "%p consumed %zu bytes of piece data... %zu left\n", b, byteCount, band->bytesLeft ); +#endif + + bytesUsed( &band->raw, byteCount ); if( isPieceData ) - bytesUsed( &b->piece, byteCount ); + bytesUsed( &band->piece, byteCount ); + + if( b->parent != NULL ) + tr_bandwidthUsed( b->parent, dir, byteCount, isPieceData ); } diff --git a/libtransmission/bandwidth.h b/libtransmission/bandwidth.h index 2f7f56aa5..89083840f 100644 --- a/libtransmission/bandwidth.h +++ b/libtransmission/bandwidth.h @@ -17,38 +17,165 @@ #ifndef TR_BANDWIDTH_H #define TR_BANDWIDTH_H +struct tr_iobuf; + +/** + * Bandwidth is an object for measuring and constraining bandwidth speeds. + * + * Bandwidth objects can be "stacked" so that a peer can be made to obey + * multiple constraints (for example, obeying the global speed limit and a + * per-torrent speed limit). + * + * HIERARCHY + * + * Transmission's bandwidth hierarchy is a tree. + * At the top is the global bandwidth object owned by tr_session. + * Its children are per-torrent bandwidth objects owned by tr_torrent. + * Underneath those are per-peer bandwidth objects owned by tr_peer. + * + * tr_session also owns a tr_handshake's bandwidths, so that the handshake + * I/O can be counted in the global raw totals. When the handshake is done, + * the bandwidth's ownership passes to a tr_peer. + * + * MEASURING + * + * When you ask a bandwidth object for its speed, it gives the speed of the + * subtree underneath it as well. So you can get Transmission's overall + * speed by quering tr_session's bandwidth, per-torrent speeds by asking + * tr_torrent's bandwidth, and per-peer speeds by asking tr_peer's bandwidth. + * + * CONSTRAINING + * + * Call tr_bandwidthAllocate() periodically. tr_bandwidth knows its current + * speed and will decide how many bytes to make available over the + * user-specified period to reach the user-specified desired speed. + * If appropriate, it notifies its iobufs that new bandwidth is available. + * + * tr_bandwidthAllocate() operates on the tr_bandwidth subtree, so usually + * you'll only need to invoke it for the top-level tr_session bandwidth. + * + * The iobufs all have a pointer to their associated tr_bandwidth object, + * and call tr_bandwidthClamp() before performing I/O to see how much + * bandwidth they can safely use. + */ typedef struct tr_bandwidth tr_bandwidth; /** *** **/ -tr_bandwidth* tr_bandwidthNew ( tr_session * session ); +/** @brief create a new tr_bandwidth object */ +tr_bandwidth* + tr_bandwidthNew ( tr_session * session, + tr_bandwidth * parent ); -void tr_bandwidthFree ( tr_bandwidth * bandwidth ); +/** @brief destroy a tr_bandwidth object */ +void tr_bandwidthFree ( tr_bandwidth * bandwidth ); + +/****** +******* +******/ /** -*** -**/ - -void tr_bandwidthSetLimited ( tr_bandwidth * bandwidth, - size_t byteCount ); - -void tr_bandwidthSetUnlimited ( tr_bandwidth * bandwidth ); - -size_t tr_bandwidthClamp ( const tr_bandwidth * bandwidth, - size_t byteCount ); + * @brief Set the desired speed (in KiB/s) for this bandwidth subtree. + * @see tr_bandwidthAllocate + * @see tr_bandwidthGetDesiredSpeed + */ +void tr_bandwidthSetDesiredSpeed ( tr_bandwidth * bandwidth, + tr_direction direction, + double desiredSpeed ); /** -*** -**/ + * @brief Get the desired speed (in KiB/s) for ths bandwidth subtree. + * @see tr_bandwidthSetDesiredSpeed + */ +double tr_bandwidthGetDesiredSpeed ( const tr_bandwidth * bandwidth, + tr_direction direction ); -double tr_bandwidthGetRawSpeed ( const tr_bandwidth * bandwidth ); +/** + * @brief Set whether or not this bandwidth should throttle its iobufs' speeds + */ +void tr_bandwidthSetLimited ( tr_bandwidth * bandwidth, + tr_direction direction, + int isLimited ); -double tr_bandwidthGetPieceSpeed ( const tr_bandwidth * bandwidth ); +/** + * @return nonzero if this bandwidth throttles its iobufs' speeds + */ +int tr_bandwidthIsLimited ( const tr_bandwidth * bandwidth, + tr_direction direction ); -void tr_bandwidthUsed ( tr_bandwidth * bandwidth, - size_t byteCount, - int isPieceData ); +/** + * @brief allocate the next period_msec's worth of bandwidth for the iobufs to consume + */ +void tr_bandwidthAllocate ( tr_bandwidth * bandwidth, + tr_direction direction, + int period_msec ); + +/** + * @brief clamps byteCount down to a number that this bandwidth will allow to be consumed + */ +size_t tr_bandwidthClamp ( const tr_bandwidth * bandwidth, + tr_direction direction, + size_t byteCount ); + +/****** +******* +******/ + +/** + * @brief Get the raw total of bytes read or sent by this bandwidth subtree. + */ +double tr_bandwidthGetRawSpeed ( const tr_bandwidth * bandwidth, + tr_direction direction ); + +/** + * @brief Get the number of piece data bytes read or sent by this bandwidth subtree. + */ +double tr_bandwidthGetPieceSpeed ( const tr_bandwidth * bandwidth, + tr_direction direction ); + +/** + * @brief Notify the bandwidth object that some of its allocated bandwidth has been consumed. + * This is is usually invoked by the iobuf after a read or write. + */ +void tr_bandwidthUsed ( tr_bandwidth * bandwidth, + tr_direction direction, + size_t byteCount, + int isPieceData ); + +/****** +******* +******/ + +void tr_bandwidthSetParent ( tr_bandwidth * bandwidth, + tr_bandwidth * parent ); + +/** + * Almost all the time we do want to honor a parents' bandwidth cap, so that + * (for example) a peer is constrained by a per-torrent cap and the global cap. + * But when we set a torrent's speed mode to TR_SPEEDLIMIT_UNLIMITED, then + * in that particular case we want to ignore the global speed limit... + */ +void tr_bandwidthHonorParentLimits ( tr_bandwidth * bandwidth, + tr_direction direction, + int isEnabled ); + +/****** +******* +******/ + +/** + * @brief add an iobuf to this bandwidth's list of iobufs. + * They will be notified when more bandwidth is made available for them to consume. + */ +void tr_bandwidthAddBuffer ( tr_bandwidth * bandwidth, + struct tr_iobuf * iobuf ); + +/** + * @brief remove an iobuf from this bandwidth's list of iobufs. + */ +void tr_bandwidthRemoveBuffer ( tr_bandwidth * bandwidth, + struct tr_iobuf * iobuf ); #endif diff --git a/libtransmission/handshake.c b/libtransmission/handshake.c index 68036337a..ac2c0df46 100644 --- a/libtransmission/handshake.c +++ b/libtransmission/handshake.c @@ -24,6 +24,7 @@ #include "clients.h" #include "crypto.h" #include "handshake.h" +#include "iobuf.h" #include "peer-io.h" #include "peer-mgr.h" #include "torrent.h" @@ -1028,14 +1029,16 @@ readPayloadStream( tr_handshake * handshake, ***/ static ReadState -canRead( struct bufferevent * evin, - void * arg ) +canRead( struct tr_iobuf * iobuf, void * arg, size_t * piece ) { - tr_handshake * handshake = (tr_handshake *) arg; - struct evbuffer * inbuf = EVBUFFER_INPUT ( evin ); + tr_handshake * handshake = arg; + struct evbuffer * inbuf = tr_iobuf_input( iobuf ); ReadState ret; int readyForMore = TRUE; + /* no piece data in handshake */ + *piece = 0; + dbgmsg( handshake, "handling canRead; state is [%s]", getStateName( handshake->state ) ); @@ -1136,9 +1139,9 @@ tr_handshakeAbort( tr_handshake * handshake ) } static void -gotError( struct bufferevent * evbuf UNUSED, - short what, - void * arg ) +gotError( struct tr_iobuf * iobuf UNUSED, + short what, + void * arg ) { tr_handshake * handshake = (tr_handshake *) arg; @@ -1179,9 +1182,6 @@ tr_handshakeNew( tr_peerIo * io, { tr_handshake * handshake; - tr_peerIoSetBandwidth( io, TR_UP, NULL ); - tr_peerIoSetBandwidth( io, TR_DOWN, NULL ); - handshake = tr_new0( tr_handshake, 1 ); handshake->io = io; handshake->crypto = tr_peerIoGetCrypto( io ); diff --git a/libtransmission/iobuf.c b/libtransmission/iobuf.c new file mode 100644 index 000000000..b20d2db46 --- /dev/null +++ b/libtransmission/iobuf.c @@ -0,0 +1,378 @@ +/* + * Copyright (c) 2002-2004 Niels Provos + * All rights reserved. + * + * Transmission modifications and new bugs by Charles Kerr. + * Source: libevent's "patches-1.4" branch, svn revision 949 + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include +#include +#include +#include +#include /* write */ + +#include "evutil.h" +#include "event.h" + +#include "transmission.h" +#include "bandwidth.h" +#include "iobuf.h" +#include "session.h" +#include "utils.h" + +#define MAGIC_NUMBER 235705 + +struct tr_iobuf +{ + struct event_base * ev_base; + + struct event ev_read; + struct event ev_write; + + struct evbuffer * input; + struct evbuffer * output; + + tr_iobuf_cb readcb; + tr_iobuf_cb writecb; + tr_iobuf_error_cb errorcb; + void * cbarg; + + int magicNumber; + + int timeout_read; /* in seconds */ + int timeout_write; /* in seconds */ + short enabled; /* events that are currently enabled */ + + struct tr_handle * session; + struct tr_bandwidth * bandwidth; +}; + +/*** +**** +***/ + +static int +isBuf( const struct tr_iobuf * iobuf ) +{ + return ( iobuf != NULL ) && ( iobuf->magicNumber == MAGIC_NUMBER ); +} + +static int +tr_evbuffer_write( struct evbuffer *buffer, int fd, size_t maxlen ) +{ + int n = MIN( EVBUFFER_LENGTH( buffer ), maxlen ); + +#ifdef WIN32 + n = send(fd, buffer->buffer, n, 0 ); +#else + n = write(fd, buffer->buffer, n ); +#endif + if( n == -1 ) + return -1; + if (n == 0) + return 0; + evbuffer_drain( buffer, n ); + + return n; +} + +static int +tr_iobuf_add(struct event *ev, int timeout) +{ + struct timeval tv, *ptv = NULL; + + if (timeout) { + evutil_timerclear(&tv); + tv.tv_sec = timeout; + ptv = &tv; + } + + return event_add( ev, ptv ); +} + +static void +tr_iobuf_readcb( int fd, short event, void * arg ) +{ + int res; + short what = EVBUFFER_READ; + struct tr_iobuf * b = arg; + const size_t howmuch = tr_bandwidthClamp( b->bandwidth, TR_DOWN, 2048 ); + + assert( isBuf( b ) ); + + if( event == EV_TIMEOUT ) { + what |= EVBUFFER_TIMEOUT; + goto error; + } + + /* if we don't have any bandwidth left, stop reading */ + if( howmuch < 1 ) { + event_del( &b->ev_read ); + return; + } + + res = evbuffer_read( b->input, fd, howmuch ); + if( res == -1 ) { + if( errno == EAGAIN || errno == EINTR ) + goto reschedule; + /* error case */ + what |= EVBUFFER_ERROR; + } else if( res == 0 ) { + /* eof case */ + what |= EVBUFFER_EOF; + } + + if( res <= 0 ) + goto error; + + tr_iobuf_add( &b->ev_read, b->timeout_read ); + + /* Invoke the user callback - must always be called last */ + if( b->readcb != NULL ) + ( *b->readcb )( b, res, b->cbarg ); + return; + + reschedule: + tr_iobuf_add( &b->ev_read, b->timeout_read ); + return; + + error: + (*b->errorcb)( b, what, b->cbarg ); +} + +static void +tr_iobuf_writecb( int fd, short event, void * arg ) +{ + int res = 0; + short what = EVBUFFER_WRITE; + struct tr_iobuf * b = arg; + size_t howmuch; + + assert( isBuf( b ) ); + + if( event == EV_TIMEOUT ) { + what |= EVBUFFER_TIMEOUT; + goto error; + } + + howmuch = MIN( (size_t)b->session->so_sndbuf, EVBUFFER_LENGTH( b->output ) ); + howmuch = tr_bandwidthClamp( b->bandwidth, TR_UP, howmuch ); + + /* if we don't have any bandwidth left, stop writing */ + if( howmuch < 1 ) { + event_del( &b->ev_write ); + return; + } + + res = tr_evbuffer_write( b->output, fd, howmuch ); + if (res == -1) { +#ifndef WIN32 +/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not + *set errno. thus this error checking is not portable*/ + if (errno == EAGAIN || errno == EINTR || errno == EINPROGRESS) + goto reschedule; + /* error case */ + what |= EVBUFFER_ERROR; + +#else + goto reschedule; +#endif + + } else if (res == 0) { + /* eof case */ + what |= EVBUFFER_EOF; + } + if (res <= 0) + goto error; + + if( EVBUFFER_LENGTH( b->output ) ) + tr_iobuf_add( &b->ev_write, b->timeout_write ); + + if( b->writecb != NULL ) + (*b->writecb)( b, res, b->cbarg ); + + return; + + reschedule: + if( EVBUFFER_LENGTH( b->output ) ) + tr_iobuf_add( &b->ev_write, b->timeout_write ); + return; + + error: + (*b->errorcb)( b, what, b->cbarg ); +} + + +/* + * Create a new buffered event object. + * + * The read callback is invoked whenever we read new data. + * The write callback is invoked whenever the output buffer is drained. + * The error callback is invoked on a write/read error or on EOF. + * + * Both read and write callbacks maybe NULL. The error callback is not + * allowed to be NULL and have to be provided always. + */ + +struct tr_iobuf * +tr_iobuf_new( struct tr_handle * session, + tr_bandwidth * bandwidth, + int fd, + short event, + tr_iobuf_cb readcb, + tr_iobuf_cb writecb, + tr_iobuf_error_cb errorcb, + void * cbarg ) +{ + struct tr_iobuf * b; + + b = tr_new0( struct tr_iobuf, 1 ); + b->magicNumber = MAGIC_NUMBER; + b->session = session; + b->bandwidth = bandwidth; + b->input = evbuffer_new( ); + b->output = evbuffer_new( ); + + event_set( &b->ev_read, fd, EV_READ, tr_iobuf_readcb, b ); + event_set( &b->ev_write, fd, EV_WRITE, tr_iobuf_writecb, b ); + + tr_iobuf_setcb( b, readcb, writecb, errorcb, cbarg ); + tr_iobuf_enable( b, event ); + + return b; +} + +void +tr_iobuf_setcb( struct tr_iobuf * b, + tr_iobuf_cb readcb, + tr_iobuf_cb writecb, + tr_iobuf_error_cb errorcb, + void * cbarg ) +{ + assert( isBuf( b ) ); + + b->readcb = readcb; + b->writecb = writecb; + b->errorcb = errorcb; + b->cbarg = cbarg; +} + +/* Closing the file descriptor is the responsibility of the caller */ + +void +tr_iobuf_free( struct tr_iobuf * b ) +{ + assert( isBuf( b ) ); + + b->magicNumber = 0xDEAD; + event_del( &b->ev_read ); + event_del( &b->ev_write ); + evbuffer_free( b->input ); + evbuffer_free( b->output ); + tr_free( b ); +} + +int +tr_iobuf_enable(struct tr_iobuf * b, short event ) +{ + assert( isBuf( b ) ); + + if( event & EV_READ ) + if( tr_iobuf_add( &b->ev_read, b->timeout_read ) == -1 ) + return -1; + + if( event & EV_WRITE ) + if ( tr_iobuf_add( &b->ev_write, b->timeout_write ) == -1 ) + return -1; + + b->enabled |= event; + return 0; +} + +int +tr_iobuf_disable( struct tr_iobuf * b, short event ) +{ + assert( isBuf( b ) ); + + if( event & EV_READ ) + if( event_del( &b->ev_read ) == -1 ) + return -1; + + if( event & EV_WRITE ) + if( event_del( &b->ev_write ) == -1 ) + return -1; + + b->enabled &= ~event; + return 0; +} + +void +tr_iobuf_settimeout( struct tr_iobuf * b, + int timeout_read, + int timeout_write ) +{ + assert( isBuf( b ) ); + + b->timeout_read = timeout_read; + if( event_pending( &b->ev_read, EV_READ, NULL ) ) + tr_iobuf_add( &b->ev_read, timeout_read ); + + b->timeout_write = timeout_write; + if( event_pending( &b->ev_write, EV_WRITE, NULL ) ) + tr_iobuf_add( &b->ev_write, timeout_write ); +} + +void +tr_iobuf_set_bandwidth( struct tr_iobuf * b, + struct tr_bandwidth * bandwidth ) +{ + assert( isBuf( b ) ); + + b->bandwidth = bandwidth; +} + +struct evbuffer* +tr_iobuf_input( struct tr_iobuf * b ) +{ + assert( isBuf( b ) ); + + return b->input; +} + +struct evbuffer* +tr_iobuf_output( struct tr_iobuf * b ) +{ + assert( isBuf( b ) ); + + return b->output; +} diff --git a/libtransmission/iobuf.h b/libtransmission/iobuf.h new file mode 100644 index 000000000..fbcf8922a --- /dev/null +++ b/libtransmission/iobuf.h @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2002-2004 Niels Provos + * All rights reserved. + * + * Transmission modifications and new bugs by Charles Kerr. + * Source: libevent's "patches-1.4" branch, svn revision 949 + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef __TRANSMISSION__ +#error only libtransmission should #include this header. +#endif + +#ifndef TR_EVBUFFER_H +#define TR_EVBUFFER_H + +/** + * tr_iobuf is an i/o convenience wrapper similar to libevent's "bufferevent". + * it provides two evbuffers, one for writing and one for reading, and handles + * pumping them to/from the given fd. + * + * this class differs from bufferevent in two major ways: + * 1. the action callbacks include the number of bytes transferred + * 2. the up/down speeds are directly constrained by our `bandwidth' object + * 3. the implementation is hidden in the .c file + */ +struct tr_iobuf; + +struct evbuffer; +struct tr_bandwidth; +struct tr_session; + +/** @brief returns the input evbuffer to that we can read from */ +struct evbuffer* tr_iobuf_input( struct tr_iobuf * iobuf ); + +/** @brief returns the output evbuffer that we can write to */ +struct evbuffer* tr_iobuf_output( struct tr_iobuf * iobuf ); + +/** @brief prototype for the callbacks invoked when bytes have been read or written + @see tr_iobuf_new + @see tr_iobuf_setcb */ +typedef void (*tr_iobuf_cb)( struct tr_iobuf*, size_t bytes_transferred, void* ); + +/** @brief prototype for the callback invoked on error + @see tr_iobuf_new + @see tr_iobuf_setcb */ +typedef void (*tr_iobuf_error_cb)( struct tr_iobuf*, short what, void* ); + +/** @brief create a new tr_iobuf object. */ +struct tr_iobuf* tr_iobuf_new( struct tr_handle * session, + struct tr_bandwidth * bandwidth, + int fd, + short event, + tr_iobuf_cb readcb, + tr_iobuf_cb writecb, + tr_iobuf_error_cb errorcb, + void * cbarg ); + +/** @brief destroy a tr_iobuf object. */ +void tr_iobuf_free( struct tr_iobuf * iobuf ); + +/** @brief change the number of seconds it takes for a read/write to timeout */ +void tr_iobuf_settimeout( struct tr_iobuf * iobuf, + int timeout_read, + int timeout_write ); + +/** @brief set the bandwidth object that limits this iobuf's read/write speeds */ +void tr_iobuf_set_bandwidth( struct tr_iobuf * iobuf, + struct tr_bandwidth * bandwidth ); + +/** @brief change the callbacks invoked by this iobuf on error and bytes transferred */ +void tr_iobuf_setcb( struct tr_iobuf * iobuf, + tr_iobuf_cb readcb, + tr_iobuf_cb writecb, + tr_iobuf_error_cb errorcb, + void * cbarg ); + +/** @brief tell the iobuf to poll for certain states. + @brief event may be EV_READ, EV_WRITE, or EV_READ|EV_WRITE */ +int tr_iobuf_enable( struct tr_iobuf * iobuf, short event ); + +/** @brief tell the iobuf to stop polling for certain states. + @brief event may be EV_READ, EV_WRITE, or EV_READ|EV_WRITE */ +int tr_iobuf_disable( struct tr_iobuf * iobuf, short event ); + +#endif diff --git a/libtransmission/net.c b/libtransmission/net.c index ba40a250a..4eb551557 100644 --- a/libtransmission/net.c +++ b/libtransmission/net.c @@ -125,7 +125,6 @@ createSocket( int type ) static void setSndBuf( tr_session * session, int fd ) { -#if 0 if( fd >= 0 ) { const int sndbuf = session->so_sndbuf; @@ -133,7 +132,6 @@ setSndBuf( tr_session * session, int fd ) setsockopt( fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof( sndbuf ) ); setsockopt( fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof( rcvbuf ) ); } -#endif } int diff --git a/libtransmission/peer-io.c b/libtransmission/peer-io.c index 4ef53fed2..71ae2fb35 100644 --- a/libtransmission/peer-io.c +++ b/libtransmission/peer-io.c @@ -28,12 +28,14 @@ #include "transmission.h" #include "bandwidth.h" #include "crypto.h" +#include "iobuf.h" #include "list.h" #include "net.h" #include "peer-io.h" #include "trevent.h" #include "utils.h" +#define MAGIC_NUMBER 206745 #define IO_TIMEOUT_SECS 8 static size_t @@ -78,131 +80,75 @@ struct tr_datatype struct tr_peerIo { - unsigned int isEncrypted : 1; - unsigned int isIncoming : 1; - unsigned int peerIdIsSet : 1; - unsigned int extendedProtocolSupported : 1; - unsigned int fastPeersSupported : 1; + unsigned int isEncrypted : 1; + unsigned int isIncoming : 1; + unsigned int peerIdIsSet : 1; + unsigned int extendedProtocolSupported : 1; + unsigned int fastPeersSupported : 1; - uint8_t encryptionMode; - uint8_t timeout; - uint16_t port; - int socket; + int magicNumber; - uint8_t peerId[20]; - time_t timeCreated; + uint8_t encryptionMode; + uint8_t timeout; + uint16_t port; + int socket; - tr_session * session; + uint8_t peerId[20]; + time_t timeCreated; - struct in_addr in_addr; - struct bufferevent * bufev; - struct evbuffer * output; - tr_list * output_datatypes; /* struct tr_datatype */ + tr_session * session; - tr_can_read_cb canRead; - tr_did_write_cb didWrite; - tr_net_error_cb gotError; - void * userData; + struct in_addr in_addr; + struct tr_iobuf * iobuf; + tr_list * output_datatypes; /* struct tr_datatype */ - size_t bufferSize[2]; + tr_can_read_cb canRead; + tr_did_write_cb didWrite; + tr_net_error_cb gotError; + void * userData; - tr_bandwidth * bandwidth[2]; - tr_crypto * crypto; + size_t bufferSize[2]; + + tr_bandwidth * bandwidth; + tr_crypto * crypto; }; -/** -*** -**/ - -static void -adjustOutputBuffer( tr_peerIo * io ) -{ - struct evbuffer * live = EVBUFFER_OUTPUT( io->bufev ); - size_t curLive = EVBUFFER_LENGTH( live ); - size_t maxLive = tr_bandwidthClamp( io->bandwidth[TR_UP], io->session->so_sndbuf ); - - if( ( curLive < maxLive ) && EVBUFFER_LENGTH( io->output ) ) - { - size_t freeSpace = maxLive - curLive; - size_t n = MIN( freeSpace, EVBUFFER_LENGTH( io->output ) ); - bufferevent_write( io->bufev, EVBUFFER_DATA( io->output ), n ); - evbuffer_drain( io->output, n ); - curLive += n; - } - - io->bufferSize[TR_UP] = curLive; - - if( curLive ) - bufferevent_enable( io->bufev, EV_WRITE ); - - dbgmsg( io, "after adjusting the output buffer, its size is now %zu", curLive ); -} - -static void -adjustInputBuffer( tr_peerIo * io ) -{ - /* FIXME: the max read size probably needs to vary depending on the - * number of peers that we have connected... 1024 is going to force - * us way over the limit when there are lots of peers */ - static const int maxBufSize = 1024; - const size_t n = tr_bandwidthClamp( io->bandwidth[TR_DOWN], maxBufSize ); - - if( !n ) - { - dbgmsg( io, "disabling reads because we've hit our limit" ); - bufferevent_disable( io->bufev, EV_READ ); - } - else - { - dbgmsg( io, "enabling reading of %zu more bytes", n ); - bufferevent_setwatermark( io->bufev, EV_READ, 0, n ); - bufferevent_enable( io->bufev, EV_READ ); - } - - io->bufferSize[TR_DOWN] = EVBUFFER_LENGTH( EVBUFFER_INPUT( io->bufev ) ); -} - /*** **** ***/ static void -didWriteWrapper( struct bufferevent * e, - void * vio ) +didWriteWrapper( struct tr_iobuf * iobuf, + size_t bytes_transferred, + void * vio ) { tr_peerIo * io = vio; - const size_t len = EVBUFFER_LENGTH( EVBUFFER_OUTPUT( e ) ); - dbgmsg( io, "didWrite... io->outputBufferSize was %zu, is now %zu", - io->bufferSize[TR_UP], len ); - - if( len < io->bufferSize[TR_UP] ) + while( bytes_transferred ) { - size_t payload = io->bufferSize[TR_UP] - len; + struct tr_datatype * next = io->output_datatypes->data; + const size_t chunk_length = MIN( next->length, bytes_transferred ); + const size_t n = addPacketOverhead( chunk_length ); - while( payload ) - { - struct tr_datatype * next = io->output_datatypes->data; - const size_t chunk_length = MIN( next->length, payload ); - const size_t n = addPacketOverhead( chunk_length ); + tr_bandwidthUsed( io->bandwidth, TR_UP, n, next->isPieceData ); - if( io->didWrite ) - io->didWrite( io, n, next->isPieceData, io->userData ); + if( io->didWrite ) + io->didWrite( io, n, next->isPieceData, io->userData ); - payload -= chunk_length; - next->length -= chunk_length; - if( !next->length ) - tr_free( tr_list_pop_front( &io->output_datatypes ) ); - } + bytes_transferred -= chunk_length; + next->length -= chunk_length; + if( !next->length ) + tr_free( tr_list_pop_front( &io->output_datatypes ) ); } - adjustOutputBuffer( io ); - + if( EVBUFFER_LENGTH( tr_iobuf_output( iobuf ) ) ) + tr_iobuf_enable( io->iobuf, EV_WRITE ); } static void -canReadWrapper( struct bufferevent * e, - void * vio ) +canReadWrapper( struct tr_iobuf * iobuf, + size_t bytes_transferred UNUSED, + void * vio ) { int done = 0; int err = 0; @@ -218,12 +164,23 @@ canReadWrapper( struct bufferevent * e, while( !done && !err ) { - const int ret = io->canRead( e, io->userData ); + size_t piece = 0; + const size_t oldLen = EVBUFFER_LENGTH( tr_iobuf_input( iobuf ) ); + const int ret = io->canRead( iobuf, io->userData, &piece ); + + if( ret != err ) + { + const size_t used = oldLen - EVBUFFER_LENGTH( tr_iobuf_input( iobuf ) ); + if( piece ) + tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE ); + if( used != piece ) + tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE ); + } switch( ret ) { case READ_NOW: - if( EVBUFFER_LENGTH( e->input ) ) + if( EVBUFFER_LENGTH( tr_iobuf_input( iobuf ))) continue; done = 1; break; @@ -240,20 +197,17 @@ canReadWrapper( struct bufferevent * e, tr_globalUnlock( session ); } - - if( !err ) - adjustInputBuffer( io ); } static void -gotErrorWrapper( struct bufferevent * e, - short what, - void * userData ) +gotErrorWrapper( struct tr_iobuf * iobuf, + short what, + void * userData ) { tr_peerIo * c = userData; if( c->gotError ) - c->gotError( e, what, c->userData ); + c->gotError( iobuf, what, c->userData ); } /** @@ -263,19 +217,22 @@ gotErrorWrapper( struct bufferevent * e, static void bufevNew( tr_peerIo * io ) { - io->bufev = bufferevent_new( io->socket, - canReadWrapper, - didWriteWrapper, - gotErrorWrapper, - io ); + io->iobuf = tr_iobuf_new( io->session, + io->bandwidth, + io->socket, + EV_READ | EV_WRITE, + canReadWrapper, + didWriteWrapper, + gotErrorWrapper, + io ); - /* tell libevent to call didWriteWrapper after every write, - * not just when the write buffer is empty */ - bufferevent_setwatermark( io->bufev, EV_WRITE, INT_MAX, 0 ); + tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout ); +} - bufferevent_settimeout( io->bufev, io->timeout, io->timeout ); - - bufferevent_enable( io->bufev, EV_READ | EV_WRITE ); +static int +isPeerIo( const tr_peerIo * io ) +{ + return ( io != NULL ) && ( io->magicNumber == MAGIC_NUMBER ); } static tr_peerIo* @@ -292,6 +249,7 @@ tr_peerIoNew( tr_session * session, tr_netSetTOS( socket, session->peerSocketTOS ); io = tr_new0( tr_peerIo, 1 ); + io->magicNumber = MAGIC_NUMBER; io->crypto = tr_cryptoNew( torrentHash, isIncoming ); io->session = session; io->in_addr = *in_addr; @@ -300,8 +258,8 @@ tr_peerIoNew( tr_session * session, io->isIncoming = isIncoming != 0; io->timeout = IO_TIMEOUT_SECS; io->timeCreated = time( NULL ); - io->output = evbuffer_new( ); bufevNew( io ); + tr_peerIoSetBandwidth( io, session->bandwidth ); return io; } @@ -345,11 +303,13 @@ io_dtor( void * vio ) { tr_peerIo * io = vio; - evbuffer_free( io->output ); - bufferevent_free( io->bufev ); + tr_peerIoSetBandwidth( io, NULL ); + tr_iobuf_free( io->iobuf ); tr_netClose( io->socket ); tr_cryptoFree( io->crypto ); tr_list_free( &io->output_datatypes, tr_free ); + + io->magicNumber = 0xDEAD; tr_free( io ); } @@ -368,7 +328,7 @@ tr_peerIoFree( tr_peerIo * io ) tr_session* tr_peerIoGetSession( tr_peerIo * io ) { - assert( io ); + assert( isPeerIo( io ) ); assert( io->session ); return io->session; @@ -378,7 +338,7 @@ const struct in_addr* tr_peerIoGetAddress( const tr_peerIo * io, uint16_t * port ) { - assert( io ); + assert( isPeerIo( io ) ); if( port ) *port = io->port; @@ -406,8 +366,8 @@ tr_peerIoGetAddrStr( const tr_peerIo * io ) static void tr_peerIoTryRead( tr_peerIo * io ) { - if( EVBUFFER_LENGTH( io->bufev->input ) ) - canReadWrapper( io->bufev, io ); + if( EVBUFFER_LENGTH( tr_iobuf_input( io->iobuf ))) + (*canReadWrapper)( io->iobuf, ~0, io ); } void @@ -443,10 +403,14 @@ tr_peerIoReconnect( tr_peerIo * io ) if( io->socket >= 0 ) { - tr_netSetTOS( io->socket, io->session->peerSocketTOS ); + tr_bandwidth * bandwidth = io->bandwidth; + tr_peerIoSetBandwidth( io, NULL ); - bufferevent_free( io->bufev ); + tr_netSetTOS( io->socket, io->session->peerSocketTOS ); + tr_iobuf_free( io->iobuf ); bufevNew( io ); + + tr_peerIoSetBandwidth( io, bandwidth ); return 0; } @@ -458,8 +422,8 @@ tr_peerIoSetTimeoutSecs( tr_peerIo * io, int secs ) { io->timeout = secs; - bufferevent_settimeout( io->bufev, io->timeout, io->timeout ); - bufferevent_enable( io->bufev, EV_READ | EV_WRITE ); + tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout ); + tr_iobuf_enable( io->iobuf, EV_READ | EV_WRITE ); } /** @@ -470,7 +434,7 @@ void tr_peerIoSetTorrentHash( tr_peerIo * io, const uint8_t * hash ) { - assert( io ); + assert( isPeerIo( io ) ); tr_cryptoSetTorrentHash( io->crypto, hash ); } @@ -478,7 +442,7 @@ tr_peerIoSetTorrentHash( tr_peerIo * io, const uint8_t* tr_peerIoGetTorrentHash( tr_peerIo * io ) { - assert( io ); + assert( isPeerIo( io ) ); assert( io->crypto ); return tr_cryptoGetTorrentHash( io->crypto ); @@ -487,7 +451,7 @@ tr_peerIoGetTorrentHash( tr_peerIo * io ) int tr_peerIoHasTorrentHash( const tr_peerIo * io ) { - assert( io ); + assert( isPeerIo( io ) ); assert( io->crypto ); return tr_cryptoHasTorrentHash( io->crypto ); @@ -501,7 +465,7 @@ void tr_peerIoSetPeersId( tr_peerIo * io, const uint8_t * peer_id ) { - assert( io ); + assert( isPeerIo( io ) ); if( ( io->peerIdIsSet = peer_id != NULL ) ) memcpy( io->peerId, peer_id, 20 ); @@ -512,7 +476,7 @@ tr_peerIoSetPeersId( tr_peerIo * io, const uint8_t* tr_peerIoGetPeersId( const tr_peerIo * io ) { - assert( io ); + assert( isPeerIo( io ) ); assert( io->peerIdIsSet ); return io->peerId; @@ -526,7 +490,7 @@ void tr_peerIoEnableLTEP( tr_peerIo * io, int flag ) { - assert( io ); + assert( isPeerIo( io ) ); assert( flag == 0 || flag == 1 ); io->extendedProtocolSupported = flag; @@ -536,7 +500,7 @@ void tr_peerIoEnableFEXT( tr_peerIo * io, int flag ) { - assert( io ); + assert( isPeerIo( io ) ); assert( flag == 0 || flag == 1 ); io->fastPeersSupported = flag; @@ -545,7 +509,7 @@ tr_peerIoEnableFEXT( tr_peerIo * io, int tr_peerIoSupportsLTEP( const tr_peerIo * io ) { - assert( io ); + assert( isPeerIo( io ) ); return io->extendedProtocolSupported; } @@ -553,7 +517,7 @@ tr_peerIoSupportsLTEP( const tr_peerIo * io ) int tr_peerIoSupportsFEXT( const tr_peerIo * io ) { - assert( io ); + assert( isPeerIo( io ) ); return io->fastPeersSupported; } @@ -565,12 +529,8 @@ tr_peerIoSupportsFEXT( const tr_peerIo * io ) size_t tr_peerIoGetWriteBufferSpace( const tr_peerIo * io ) { - const size_t desiredLiveLen = tr_bandwidthClamp( io->bandwidth[TR_UP], io->session->so_rcvbuf ); - const size_t currentLiveLen = EVBUFFER_LENGTH( EVBUFFER_OUTPUT( io->bufev ) ); - const size_t desiredQueueLen = io->session->so_sndbuf; - const size_t currentQueueLen = EVBUFFER_LENGTH( io->output ); - const size_t desiredLen = desiredLiveLen + desiredQueueLen; - const size_t currentLen = currentLiveLen + currentQueueLen; + const size_t desiredLen = io->session->so_sndbuf * 2; /* FIXME: bigger? */ + const size_t currentLen = EVBUFFER_LENGTH( tr_iobuf_output( io->iobuf ) ); size_t freeSpace = 0; if( desiredLen > currentLen ) @@ -581,18 +541,20 @@ tr_peerIoGetWriteBufferSpace( const tr_peerIo * io ) void tr_peerIoSetBandwidth( tr_peerIo * io, - tr_direction direction, tr_bandwidth * bandwidth ) { - assert( io ); - assert( direction == TR_UP || direction == TR_DOWN ); + assert( isPeerIo( io ) ); - io->bandwidth[direction] = bandwidth; + if( io->bandwidth ) + tr_bandwidthRemoveBuffer( io->bandwidth, io->iobuf ); - if( direction == TR_UP ) - adjustOutputBuffer( io ); - else - adjustInputBuffer( io ); + io->bandwidth = bandwidth; + tr_iobuf_set_bandwidth( io->iobuf, bandwidth ); + + if( io->bandwidth ) + tr_bandwidthAddBuffer( io->bandwidth, io->iobuf ); + + tr_iobuf_enable( io->iobuf, EV_READ | EV_WRITE ); } /** @@ -609,7 +571,7 @@ void tr_peerIoSetEncryption( tr_peerIo * io, int encryptionMode ) { - assert( io ); + assert( isPeerIo( io ) ); assert( encryptionMode == PEER_ENCRYPTION_NONE || encryptionMode == PEER_ENCRYPTION_RC4 ); @@ -636,14 +598,13 @@ tr_peerIoWrite( tr_peerIo * io, assert( tr_amInEventThread( io->session ) ); dbgmsg( io, "adding %zu bytes into io->output", writemeLen ); - evbuffer_add( io->output, writeme, writemeLen ); - datatype = tr_new( struct tr_datatype, 1 ); datatype->isPieceData = isPieceData != 0; datatype->length = writemeLen; tr_list_append( &io->output_datatypes, datatype ); - adjustOutputBuffer( io ); + evbuffer_add( tr_iobuf_output( io->iobuf ), writeme, writemeLen ); + tr_iobuf_enable( io->iobuf, EV_WRITE ); } void diff --git a/libtransmission/peer-io.h b/libtransmission/peer-io.h index 598b4f06f..8a9e12702 100644 --- a/libtransmission/peer-io.h +++ b/libtransmission/peer-io.h @@ -23,9 +23,9 @@ struct in_addr; struct evbuffer; -struct bufferevent; struct tr_bandwidth; struct tr_crypto; +struct tr_iobuf; typedef struct tr_peerIo tr_peerIo; /** @@ -110,17 +110,18 @@ typedef enum } ReadState; -typedef ReadState ( *tr_can_read_cb )( struct bufferevent * ev, - void * user_data ); +typedef ReadState ( *tr_can_read_cb )( struct tr_iobuf * iobuf, + void * user_data, + size_t * setme_piece_byte_count ); -typedef void ( *tr_did_write_cb )( tr_peerIo * io, - size_t bytesWritten, - int wasPieceData, - void * userData ); +typedef void ( *tr_did_write_cb )( tr_peerIo * io, + size_t bytesWritten, + int wasPieceData, + void * userData ); -typedef void ( *tr_net_error_cb )( struct bufferevent * ev, - short what, - void * userData ); +typedef void ( *tr_net_error_cb )( struct tr_iobuf * ev, + short what, + void * userData ); void tr_peerIoSetIOFuncs ( tr_peerIo * io, tr_can_read_cb readcb, @@ -205,8 +206,13 @@ void tr_peerIoDrain( tr_peerIo * io, size_t tr_peerIoGetWriteBufferSpace( const tr_peerIo * io ); void tr_peerIoSetBandwidth( tr_peerIo * io, - tr_direction direction, struct tr_bandwidth * bandwidth ); +void tr_peerIoBandwidthUsed( tr_peerIo * io, + tr_direction direction, + size_t byteCount, + int isPieceData ); + + #endif diff --git a/libtransmission/peer-mgr-private.h b/libtransmission/peer-mgr-private.h index ad652c941..2402c6424 100644 --- a/libtransmission/peer-mgr-private.h +++ b/libtransmission/peer-mgr-private.h @@ -27,10 +27,10 @@ #include "publish.h" /* tr_publisher_tag */ +struct tr_bandwidth; struct tr_bitfield; struct tr_peerIo; struct tr_peermsgs; -struct tr_ratecontrol; enum { @@ -71,12 +71,7 @@ typedef struct tr_peer struct tr_peermsgs * msgs; tr_publisher_tag msgsTag; - /* the rate at which pieces are being transferred between client and peer. - * protocol overhead is NOT included; this is only the piece data */ - struct tr_ratecontrol * pieceSpeed[2]; - - /* the rate at which all data is being transferred between client and peer. */ - struct tr_ratecontrol * rawSpeed[2]; + struct tr_bandwidth * bandwidth; } tr_peer; diff --git a/libtransmission/peer-mgr.c b/libtransmission/peer-mgr.c index 179872fb9..7314e58d2 100644 --- a/libtransmission/peer-mgr.c +++ b/libtransmission/peer-mgr.c @@ -33,7 +33,6 @@ #include "peer-mgr-private.h" #include "peer-msgs.h" #include "ptrarray.h" -#include "ratecontrol.h" #include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */ #include "torrent.h" #include "trevent.h" @@ -58,7 +57,7 @@ enum RECONNECT_PERIOD_MSEC = ( 2 * 1000 ), /* how frequently to reallocate bandwidth */ - BANDWIDTH_PERIOD_MSEC = 333, + BANDWIDTH_PERIOD_MSEC = 200, /* max # of peers to ask fer per torrent per reconnect pulse */ MAX_RECONNECTIONS_PER_PULSE = 4, @@ -323,16 +322,13 @@ peerIsInUse( const Torrent * ct, } static tr_peer* -peerConstructor( const struct in_addr * in_addr ) +peerConstructor( tr_torrent * tor, const struct in_addr * in_addr ) { tr_peer * p; p = tr_new0( tr_peer, 1 ); memcpy( &p->in_addr, in_addr, sizeof( struct in_addr ) ); - p->pieceSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( ); - p->pieceSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( ); - p->rawSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( ); - p->rawSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( ); + p->bandwidth = tr_bandwidthNew( tor->session, tor->bandwidth ); return p; } @@ -348,7 +344,7 @@ getPeer( Torrent * torrent, if( peer == NULL ) { - peer = peerConstructor( in_addr ); + peer = peerConstructor( torrent->tor, in_addr ); tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare ); } @@ -370,10 +366,8 @@ peerDestructor( tr_peer * peer ) tr_bitfieldFree( peer->blame ); tr_free( peer->client ); - tr_rcClose( peer->rawSpeed[TR_CLIENT_TO_PEER] ); - tr_rcClose( peer->rawSpeed[TR_PEER_TO_CLIENT] ); - tr_rcClose( peer->pieceSpeed[TR_CLIENT_TO_PEER] ); - tr_rcClose( peer->pieceSpeed[TR_PEER_TO_CLIENT] ); + tr_bandwidthFree( peer->bandwidth ); + tr_free( peer ); } @@ -1017,26 +1011,12 @@ peerCallbackFunc( void * vpeer, { const time_t now = time( NULL ); tr_torrent * tor = t->tor; - const tr_direction dir = TR_CLIENT_TO_PEER; tor->activityDate = now; if( e->wasPieceData ) tor->uploadedCur += e->length; - /* add it to the raw upload speed */ - if( peer ) - tr_rcTransferred ( peer->rawSpeed[dir], e->length ); - - /* maybe add it to the piece upload speed */ - if( e->wasPieceData ) { - if( peer ) - tr_rcTransferred ( peer->pieceSpeed[dir], e->length ); - } - - tr_bandwidthUsed( tor->bandwidth[dir], e->length, e->wasPieceData ); - tr_bandwidthUsed( tor->session->bandwidth[dir], e->length, e->wasPieceData ); - /* update the stats */ if( e->wasPieceData ) tr_statsAddUploaded( tor->session, e->length ); @@ -1054,7 +1034,6 @@ peerCallbackFunc( void * vpeer, { const time_t now = time( NULL ); tr_torrent * tor = t->tor; - const tr_direction dir = TR_PEER_TO_CLIENT; tor->activityDate = now; @@ -1067,19 +1046,6 @@ peerCallbackFunc( void * vpeer, if( peer && e->wasPieceData ) tor->downloadedCur += e->length; - /* add it to our raw download speed */ - if( peer ) - tr_rcTransferred ( peer->rawSpeed[dir], e->length ); - - /* maybe add it to the piece upload speed */ - if( e->wasPieceData ) { - if( peer ) - tr_rcTransferred ( peer->pieceSpeed[dir], e->length ); - } - - tr_bandwidthUsed( tor->bandwidth[dir], e->length, e->wasPieceData ); - tr_bandwidthUsed( tor->session->bandwidth[dir], e->length, e->wasPieceData ); - /* update the stats */ if( e->wasPieceData ) tr_statsAddDownloaded( tor->session, e->length ); @@ -1312,17 +1278,16 @@ myHandshakeDoneCB( tr_handshake * handshake, if( !peer_id ) peer->client = NULL; - else - { + else { char client[128]; tr_clientForId( client, sizeof( client ), peer_id ); peer->client = tr_strdup( client ); } + peer->port = port; peer->io = io; - peer->msgs = - tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, - &peer->msgsTag ); + peer->msgs = tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag ); + tr_peerIoSetBandwidth( io, peer->bandwidth ); success = TRUE; } @@ -1804,7 +1769,7 @@ tr_peerGetPieceSpeed( const tr_peer * peer, assert( peer ); assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT ); - return tr_rcRate( peer->pieceSpeed[direction] ); + return tr_bandwidthGetPieceSpeed( peer->bandwidth, direction ); } @@ -2362,10 +2327,10 @@ pumpAllPeers( tr_peerMgr * mgr ) const int torrentCount = tr_ptrArraySize( mgr->torrents ); int i, j; - for( i = 0; i < torrentCount; ++i ) + for( i=0; itorrents, i ); - for( j = 0; j < tr_ptrArraySize( t->peers ); ++j ) + for( j=0; jpeers ); ++j ) { tr_peer * peer = tr_ptrArrayNth( t->peers, j ); tr_peerMsgsPulse( peer->msgs ); @@ -2373,165 +2338,16 @@ pumpAllPeers( tr_peerMgr * mgr ) } } -static void -setTorrentBandwidth( Torrent * t, - tr_direction dir, - tr_bandwidth * bandwidth ) -{ - int i; - int peerCount = 0; - tr_peer ** peers = getConnectedPeers( t, &peerCount ); - - for( i=0; iio, dir, bandwidth ); - - tr_free( peers ); -} - -#if 0 -#define DEBUG_DIRECTION TR_DOWN -#endif - -static double -bytesPerPulse( double KiB_per_second ) -{ - return KiB_per_second * ( 1024.0 * BANDWIDTH_PERIOD_MSEC / 1000.0 ); -} - -/* - * @param currentSpeed current speed in KiB/s - * @param desiredSpeed desired speed in KiB/s - */ -static double -allocateHowMuch( tr_direction dir UNUSED, double currentSpeed, double desiredSpeed ) -{ - const int pulses_per_history = TR_RATECONTROL_HISTORY_MSEC / BANDWIDTH_PERIOD_MSEC; - const double current_bytes_per_pulse = bytesPerPulse( currentSpeed ); - const double desired_bytes_per_pulse = bytesPerPulse( desiredSpeed ); - const double min = desired_bytes_per_pulse * 0.90; - const double max = desired_bytes_per_pulse * 1.50; - const double next_pulse_bytes = desired_bytes_per_pulse * ( pulses_per_history + 1 ) - - ( current_bytes_per_pulse * pulses_per_history ); - double clamped; - - /* clamp the return value to lessen oscillation */ - clamped = next_pulse_bytes; - clamped = MAX( clamped, min ); - clamped = MIN( clamped, max ); - -#ifdef DEBUG_DIRECTION -if( dir == DEBUG_DIRECTION ) -fprintf( stderr, "currentSpeed(%5.2f) desiredSpeed(%5.2f), allocating %5.2f (unclamped: %5.2f)\n", - currentSpeed, - desiredSpeed, - clamped/1024.0, - next_pulse_bytes/1024.0 ); -#endif - - return clamped; -} - -/** - * Allocate bandwidth for each peer connection. - * - * @param mgr the peer manager - * @param direction whether to allocate upload or download bandwidth - */ -static void -allocateBandwidth( tr_peerMgr * mgr, - tr_direction direction ) -{ - int i; - tr_session * session = mgr->session; - const int torrentCount = tr_ptrArraySize( mgr->torrents ); - Torrent ** torrents = (Torrent **) tr_ptrArrayBase( mgr->torrents ); - tr_bandwidth * global_pool = session->bandwidth[direction]; - - assert( mgr ); - assert( direction == TR_UP || direction == TR_DOWN ); - - /* before allocating bandwidth, pump the connected peers */ - pumpAllPeers( mgr ); - - for( i=0; itor ) == TR_STATUS_STOPPED ) - continue; - - /* if piece data is disallowed, don't bother limiting bandwidth -- - * we won't be asking for, or sending out, any pieces */ - if( !tr_torrentIsPieceTransferAllowed( t->tor, direction ) ) - speedMode = TR_SPEEDLIMIT_UNLIMITED; - else - speedMode = tr_torrentGetSpeedMode( t->tor, direction ); - - /* process the torrent's peers based on its speed mode */ - switch( speedMode ) - { - case TR_SPEEDLIMIT_UNLIMITED: - { - tr_bandwidth * b = t->tor->bandwidth[direction]; - tr_bandwidthSetUnlimited( b ); - setTorrentBandwidth( t, direction, b ); - break; - } - - case TR_SPEEDLIMIT_SINGLE: - { - tr_bandwidth * b = t->tor->bandwidth[direction]; - const double currentSpeed = tr_bandwidthGetPieceSpeed( b ); - const double desiredSpeed = tr_torrentGetSpeedLimit( t->tor, direction ); - const double bytesPerPulse = allocateHowMuch( direction, currentSpeed, desiredSpeed ); -#ifdef DEBUG_DIRECTION -if( direction == DEBUG_DIRECTION ) -fprintf( stderr, "single: currentSpeed %.0f ... desiredSpeed %.0f ... bytesPerPulse %.0f\n", currentSpeed, desiredSpeed, bytesPerPulse ); -#endif - tr_bandwidthSetLimited( b, bytesPerPulse ); - setTorrentBandwidth( t, direction, b ); - break; - } - - case TR_SPEEDLIMIT_GLOBAL: - { - setTorrentBandwidth( t, direction, global_pool ); - break; - } - } - } - - /* handle the global pool's connections */ - if( !tr_sessionIsSpeedLimitEnabled( session, direction ) ) - tr_bandwidthSetUnlimited( global_pool ); - else { - const double currentSpeed = tr_bandwidthGetPieceSpeed( global_pool ); - const double desiredSpeed = tr_sessionGetSpeedLimit( session, direction ); - const double bytesPerPulse = allocateHowMuch( direction, currentSpeed, desiredSpeed ); -#ifdef DEBUG_DIRECTION -if( direction == DEBUG_DIRECTION ) -fprintf( stderr, "global(%p): currentSpeed %.0f ... desiredSpeed %.0f ... bytesPerPulse %.0f\n", global_pool, currentSpeed, desiredSpeed, bytesPerPulse ); -#endif - tr_bandwidthSetLimited( session->bandwidth[direction], bytesPerPulse ); - } - - /* now that we've allocated bandwidth, pump all the connected peers */ - pumpAllPeers( mgr ); -} - static int bandwidthPulse( void * vmgr ) { tr_peerMgr * mgr = vmgr; - int i; - managerLock( mgr ); - /* allocate the upload and download bandwidth */ - for( i = 0; i < 2; ++i ) - allocateBandwidth( mgr, i ); + pumpAllPeers( mgr ); + tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC ); + tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC ); + pumpAllPeers( mgr ); managerUnlock( mgr ); return TRUE; diff --git a/libtransmission/peer-msgs.c b/libtransmission/peer-msgs.c index dc0543194..ff076ba12 100644 --- a/libtransmission/peer-msgs.c +++ b/libtransmission/peer-msgs.c @@ -24,6 +24,7 @@ #include "completion.h" #include "crypto.h" #include "inout.h" +#include "iobuf.h" #ifdef WIN32 #include "net.h" /* for ECONN */ #endif @@ -1314,9 +1315,10 @@ static int clientGotBlock( tr_peermsgs * msgs, const struct peer_request * req ); static int -readBtPiece( tr_peermsgs * msgs, - struct evbuffer * inbuf, - size_t inlen ) +readBtPiece( tr_peermsgs * msgs, + struct evbuffer * inbuf, + size_t inlen, + size_t * setme_piece_bytes_read ) { struct peer_request * req = &msgs->incoming.blockReq; @@ -1349,6 +1351,7 @@ readBtPiece( tr_peermsgs * msgs, tr_peerIoReadBytes( msgs->io, inbuf, buf, n ); evbuffer_add( msgs->incoming.block, buf, n ); fireClientGotData( msgs, n, TRUE ); + *setme_piece_bytes_read += n; tr_free( buf ); dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain", (int)n, req->index, req->offset, req->length, @@ -1642,12 +1645,11 @@ didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * v } static ReadState -canRead( struct bufferevent * evin, - void * vmsgs ) +canRead( struct tr_iobuf * iobuf, void * vmsgs, size_t * piece ) { ReadState ret; tr_peermsgs * msgs = vmsgs; - struct evbuffer * in = EVBUFFER_INPUT ( evin ); + struct evbuffer * in = tr_iobuf_input( iobuf ); const size_t inlen = EVBUFFER_LENGTH( in ); if( !inlen ) @@ -1656,7 +1658,7 @@ canRead( struct bufferevent * evin, } else if( msgs->state == AWAITING_BT_PIECE ) { - ret = inlen ? readBtPiece( msgs, in, inlen ) : READ_LATER; + ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER; } else switch( msgs->state ) { @@ -1820,13 +1822,12 @@ tr_peerMsgsPulse( tr_peermsgs * msgs ) } static void -gotError( struct bufferevent * evbuf UNUSED, - short what, - void * vmsgs ) +gotError( struct tr_iobuf * iobuf UNUSED, + short what, + void * vmsgs ) { if( what & EVBUFFER_TIMEOUT ) - dbgmsg( vmsgs, "libevent got a timeout, what=%hd, secs=%d", what, - evbuf->timeout_read ); + dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what ); if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) ) dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)", what, errno, tr_strerror( errno ) ); diff --git a/libtransmission/session.c b/libtransmission/session.c index 7bcd9915c..ca5f78223 100644 --- a/libtransmission/session.c +++ b/libtransmission/session.c @@ -274,17 +274,15 @@ tr_sessionInitFull( const char * configDir, /* Initialize rate and file descripts controls */ - h->uploadLimit = uploadLimit; - h->useUploadLimit = useUploadLimit; - h->downloadLimit = downloadLimit; - h->useDownloadLimit = useDownloadLimit; - tr_fdInit( globalPeerLimit ); h->shared = tr_sharedInit( h, isPortForwardingEnabled, publicPort ); h->isPortSet = publicPort >= 0; - h->bandwidth[TR_UP] = tr_bandwidthNew( h ); - h->bandwidth[TR_DOWN] = tr_bandwidthNew( h ); + h->bandwidth = tr_bandwidthNew( h, NULL ); + tr_bandwidthSetDesiredSpeed( h->bandwidth, TR_UP, uploadLimit ); + tr_bandwidthSetDesiredSpeed( h->bandwidth, TR_DOWN, downloadLimit ); + tr_bandwidthSetLimited( h->bandwidth, TR_UP, useUploadLimit ); + tr_bandwidthSetLimited( h->bandwidth, TR_DOWN, useDownloadLimit ); /* first %s is the application name second %s is the version number */ @@ -443,42 +441,33 @@ tr_sessionGetPortForwarding( const tr_handle * h ) ***/ void -tr_sessionSetSpeedLimitEnabled( tr_handle * h, - tr_direction direction, - int use_flag ) +tr_sessionSetSpeedLimitEnabled( tr_session * session, + tr_direction dir, + int isLimited ) { - assert( h ); - assert( direction == TR_UP || direction == TR_DOWN ); - - if( direction == TR_UP ) - h->useUploadLimit = use_flag ? 1 : 0; - else - h->useDownloadLimit = use_flag ? 1 : 0; + tr_bandwidthSetLimited( session->bandwidth, dir, isLimited ); } int -tr_sessionIsSpeedLimitEnabled( const tr_handle * h, - tr_direction direction ) +tr_sessionIsSpeedLimitEnabled( const tr_session * session, + tr_direction dir ) { - return direction == TR_UP ? h->useUploadLimit : h->useDownloadLimit; + return !tr_bandwidthIsLimited( session->bandwidth, dir ); } void -tr_sessionSetSpeedLimit( tr_handle * h, - tr_direction direction, - int KiB_sec ) +tr_sessionSetSpeedLimit( tr_session * session, + tr_direction dir, + int desiredSpeed ) { - if( direction == TR_DOWN ) - h->downloadLimit = KiB_sec; - else - h->uploadLimit = KiB_sec; + tr_bandwidthSetDesiredSpeed( session->bandwidth, dir, desiredSpeed ); } int -tr_sessionGetSpeedLimit( const tr_handle * h, - tr_direction direction ) +tr_sessionGetSpeedLimit( const tr_session * session, + tr_direction dir ) { - return direction == TR_UP ? h->uploadLimit : h->downloadLimit; + return tr_bandwidthGetDesiredSpeed( session->bandwidth, dir ); } /*** @@ -505,17 +494,13 @@ tr_sessionGetPeerLimit( const tr_handle * handle UNUSED ) double tr_sessionGetPieceSpeed( const tr_session * session, tr_direction dir ) { - assert( dir==TR_UP || dir==TR_DOWN ); - - return session ? tr_bandwidthGetPieceSpeed( session->bandwidth[dir] ) : 0.0; + return session ? tr_bandwidthGetPieceSpeed( session->bandwidth, dir ) : 0.0; } double tr_sessionGetRawSpeed( const tr_session * session, tr_direction dir ) { - assert( dir==TR_UP || dir==TR_DOWN ); - - return session ? tr_bandwidthGetPieceSpeed( session->bandwidth[dir] ) : 0.0; + return session ? tr_bandwidthGetPieceSpeed( session->bandwidth, dir ) : 0.0; } int @@ -629,8 +614,7 @@ tr_sessionClose( tr_handle * session ) } /* free the session memory */ - tr_bandwidthFree( session->bandwidth[TR_UP] ); - tr_bandwidthFree( session->bandwidth[TR_DOWN] ); + tr_bandwidthFree( session->bandwidth ); tr_lockFree( session->lock ); for( i = 0; i < session->metainfoLookupCount; ++i ) tr_free( session->metainfoLookup[i].filename ); diff --git a/libtransmission/session.h b/libtransmission/session.h index f7fde96ba..47ecff06b 100644 --- a/libtransmission/session.h +++ b/libtransmission/session.h @@ -62,8 +62,6 @@ struct tr_handle unsigned int isProxyEnabled : 1; unsigned int isProxyAuthEnabled : 1; unsigned int isClosed : 1; - unsigned int useUploadLimit : 1; - unsigned int useDownloadLimit : 1; unsigned int useLazyBitfield : 1; tr_encryption_mode encryptionMode; @@ -88,9 +86,6 @@ struct tr_handle char * proxyUsername; char * proxyPassword; - int uploadLimit; - int downloadLimit; - struct tr_list * blocklists; struct tr_peerMgr * peerMgr; struct tr_shared * shared; @@ -116,7 +111,7 @@ struct tr_handle int so_rcvbuf; /* monitors the "global pool" speeds */ - struct tr_bandwidth * bandwidth[2]; + struct tr_bandwidth * bandwidth; }; const char * tr_sessionFindTorrentFile( const tr_session * session, diff --git a/libtransmission/torrent.c b/libtransmission/torrent.c index fd40dbf70..97b6041c2 100644 --- a/libtransmission/torrent.c +++ b/libtransmission/torrent.c @@ -143,56 +143,42 @@ tr_torrentUnlock( const tr_torrent * tor ) void tr_torrentSetSpeedMode( tr_torrent * tor, - tr_direction direction, + tr_direction dir, tr_speedlimit mode ) { - tr_speedlimit * limit = direction == TR_UP ? &tor->uploadLimitMode - : &tor->downloadLimitMode; + assert( tor != NULL ); + assert( dir==TR_UP || dir==TR_DOWN ); + assert( mode==TR_SPEEDLIMIT_GLOBAL || mode==TR_SPEEDLIMIT_SINGLE || mode==TR_SPEEDLIMIT_UNLIMITED ); - *limit = mode; + tor->speedLimitMode[dir] = mode; + + tr_bandwidthSetLimited( tor->bandwidth, dir, mode==TR_SPEEDLIMIT_SINGLE ); + tr_bandwidthHonorParentLimits( tor->bandwidth, dir, mode!=TR_SPEEDLIMIT_UNLIMITED ); } tr_speedlimit tr_torrentGetSpeedMode( const tr_torrent * tor, - tr_direction direction ) + tr_direction dir ) { - return direction == TR_UP ? tor->uploadLimitMode - : tor->downloadLimitMode; + assert( tor != NULL ); + assert( dir==TR_UP || dir==TR_DOWN ); + + return tor->speedLimitMode[dir]; } void tr_torrentSetSpeedLimit( tr_torrent * tor, - tr_direction direction, - int single_KiB_sec ) + tr_direction dir, + int desiredSpeed ) { - switch( direction ) - { - case TR_UP: - tor->uploadLimit = single_KiB_sec; break; - - case TR_DOWN: - tor->downloadLimit = single_KiB_sec; break; - - default: - assert( 0 ); - } + tr_bandwidthSetDesiredSpeed( tor->bandwidth, dir, desiredSpeed ); } int tr_torrentGetSpeedLimit( const tr_torrent * tor, - tr_direction direction ) + tr_direction dir ) { - switch( direction ) - { - case TR_UP: - return tor->uploadLimit; - - case TR_DOWN: - return tor->downloadLimit; - - default: - assert( 0 ); - } + return tr_bandwidthGetDesiredSpeed( tor->bandwidth, dir ); } int @@ -497,8 +483,9 @@ torrentRealInit( tr_handle * h, randomizeTiers( info ); - tor->bandwidth[TR_UP] = tr_bandwidthNew( h ); - tor->bandwidth[TR_DOWN] = tr_bandwidthNew( h ); + tor->bandwidth = tr_bandwidthNew( h, h->bandwidth ); + +fprintf( stderr, "torrent [%s] bandwidth is %p\n", info->name, tor->bandwidth ); tor->blockSize = getBlockSize( info->pieceSize ); @@ -540,8 +527,6 @@ torrentRealInit( tr_handle * h, tr_torrentInitFilePieces( tor ); - tor->uploadLimit = 0; - tor->downloadLimit = 0; tor->swarmSpeed = tr_rcInit( ); tr_sha1( tor->obfuscatedHash, "req2", 4, @@ -813,10 +798,10 @@ tr_torrentStat( tr_torrent * tor ) &s->peersGettingFromUs, s->peersFrom ); - s->rawUploadSpeed = tr_bandwidthGetRawSpeed ( tor->bandwidth[TR_UP] ); - s->rawDownloadSpeed = tr_bandwidthGetRawSpeed ( tor->bandwidth[TR_DOWN] ); - s->pieceUploadSpeed = tr_bandwidthGetPieceSpeed( tor->bandwidth[TR_UP] ); - s->pieceDownloadSpeed = tr_bandwidthGetPieceSpeed( tor->bandwidth[TR_DOWN] ); + s->rawUploadSpeed = tr_bandwidthGetRawSpeed ( tor->bandwidth, TR_UP ); + s->rawDownloadSpeed = tr_bandwidthGetRawSpeed ( tor->bandwidth, TR_DOWN ); + s->pieceUploadSpeed = tr_bandwidthGetPieceSpeed( tor->bandwidth, TR_UP ); + s->pieceDownloadSpeed = tr_bandwidthGetPieceSpeed( tor->bandwidth, TR_DOWN ); usableSeeds += tor->info.webseedCount; @@ -1099,8 +1084,7 @@ freeTorrent( tr_torrent * tor ) assert( h->torrentCount >= 1 ); h->torrentCount--; - tr_bandwidthFree( tor->bandwidth[TR_DOWN] ); - tr_bandwidthFree( tor->bandwidth[TR_UP] ); + tr_bandwidthFree( tor->bandwidth ); tr_metainfoFree( inf ); tr_free( tor ); diff --git a/libtransmission/torrent.h b/libtransmission/torrent.h index da50ae8e5..1a48360f1 100644 --- a/libtransmission/torrent.h +++ b/libtransmission/torrent.h @@ -172,10 +172,7 @@ struct tr_torrent tr_session * session; tr_info info; - int uploadLimit; - tr_speedlimit uploadLimitMode; - int downloadLimit; - tr_speedlimit downloadLimitMode; + tr_speedlimit speedLimitMode[2]; struct tr_ratecontrol * swarmSpeed; @@ -234,7 +231,7 @@ struct tr_torrent int uniqueId; - struct tr_bandwidth * bandwidth[2]; + struct tr_bandwidth * bandwidth; }; #endif