(libT) yet another stab at getting bandwidth management under control. this version may suck less than previous attempts. It also breaks the mac build until someone adds iobuf.[ch] to xcode...

This commit is contained in:
Charles Kerr 2008-11-25 21:35:17 +00:00
parent a12715fc9a
commit f44248a7d7
16 changed files with 1119 additions and 545 deletions

View File

@ -24,6 +24,7 @@ libtransmission_a_SOURCES = \
ggets.c \
handshake.c \
inout.c \
iobuf.c \
json.c \
JSON_parser.c \
list.c \
@ -69,6 +70,7 @@ noinst_HEADERS = \
ggets.h \
handshake.h \
inout.h \
iobuf.h \
json.h \
JSON_parser.h \
list.h \

View File

@ -10,9 +10,15 @@
* $Id:$
*/
#include <assert.h>
#include <limits.h>
#include "event.h"
#include "transmission.h"
#include "bandwidth.h"
#include "iobuf.h"
#include "ptrarray.h"
#include "utils.h"
/***
@ -24,7 +30,8 @@ enum
HISTORY_MSEC = 2000,
INTERVAL_MSEC = HISTORY_MSEC,
GRANULARITY_MSEC = 250,
HISTORY_SIZE = ( INTERVAL_MSEC / GRANULARITY_MSEC )
HISTORY_SIZE = ( INTERVAL_MSEC / GRANULARITY_MSEC ),
MAGIC_NUMBER = 43143
};
struct bratecontrol
@ -75,33 +82,75 @@ bytesUsed( struct bratecontrol * r, size_t size )
*******
******/
struct tr_bandwidth
struct tr_band
{
unsigned int isLimited : 1;
unsigned int honorParentLimits : 1;
size_t bytesLeft;
double desiredSpeed;
struct bratecontrol raw;
struct bratecontrol piece;
};
struct tr_bandwidth
{
struct tr_band band[2];
struct tr_bandwidth * parent;
int magicNumber;
tr_session * session;
tr_ptrArray * children; /* struct tr_bandwidth */
tr_ptrArray * iobufs; /* struct tr_iobuf */
};
/***
****
***/
static int
comparePointers( const void * a, const void * b )
{
return a - b;
}
static int
isBandwidth( const tr_bandwidth * b )
{
return ( b != NULL ) && ( b->magicNumber == MAGIC_NUMBER );
}
static int
isDirection( const tr_direction dir )
{
return ( dir == TR_UP ) || ( dir == TR_DOWN );
}
/***
****
***/
tr_bandwidth*
tr_bandwidthNew( tr_session * session )
tr_bandwidthNew( tr_session * session, tr_bandwidth * parent )
{
tr_bandwidth * b = tr_new0( tr_bandwidth, 1 );
b->session = session;
b->children = tr_ptrArrayNew( );
b->iobufs = tr_ptrArrayNew( );
b->magicNumber = MAGIC_NUMBER;
b->band[TR_UP].honorParentLimits = 1;
b->band[TR_DOWN].honorParentLimits = 1;
tr_bandwidthSetParent( b, parent );
return b;
}
void
tr_bandwidthFree( tr_bandwidth * b )
{
assert( isBandwidth( b ) );
tr_bandwidthSetParent( b, NULL );
tr_ptrArrayFree( b->iobufs, NULL );
tr_ptrArrayFree( b->children, NULL );
b->magicNumber = 0xDEAD;
tr_free( b );
}
@ -110,61 +159,235 @@ tr_bandwidthFree( tr_bandwidth * b )
***/
void
tr_bandwidthSetLimited( tr_bandwidth * b,
size_t bytesLeft )
tr_bandwidthSetParent( tr_bandwidth * b,
tr_bandwidth * parent )
{
b->isLimited = 1;
b->bytesLeft = bytesLeft;
assert( isBandwidth( b ) );
assert( b != parent );
if( b->parent )
{
assert( isBandwidth( b->parent ) );
tr_ptrArrayRemoveSorted( b->parent->children, b, comparePointers );
b->parent= NULL;
}
if( parent )
{
assert( isBandwidth( parent ) );
assert( parent->parent != b );
tr_ptrArrayInsertSorted( parent->children, b, comparePointers );
b->parent = parent;
}
}
void
tr_bandwidthSetUnlimited( tr_bandwidth * b )
tr_bandwidthHonorParentLimits( tr_bandwidth * b,
tr_direction dir,
int honorParentLimits )
{
b->isLimited = 0;
}
assert( isBandwidth( b ) );
assert( isDirection( dir ) );
size_t
tr_bandwidthClamp( const tr_bandwidth * b,
size_t byteCount )
{
/* const size_t n = byteCount; */
if( b && b->isLimited )
byteCount = MIN( byteCount, b->bytesLeft );
/* if( n != byteCount ) fprintf( stderr, "%p: %zu clamped to %zu\n", b, n, byteCount ); */
return byteCount;
b->band[dir].honorParentLimits = honorParentLimits != 0;
}
/***
****
***/
double
tr_bandwidthGetRawSpeed( const tr_bandwidth * b )
void
tr_bandwidthSetDesiredSpeed( tr_bandwidth * b,
tr_direction dir,
double desiredSpeed )
{
return getSpeed( &b->raw );
assert( isBandwidth( b ) );
assert( isDirection( dir ) );
b->band[dir].desiredSpeed = desiredSpeed;
}
double
tr_bandwidthGetPieceSpeed( const tr_bandwidth * b UNUSED )
tr_bandwidthGetDesiredSpeed( const tr_bandwidth * b,
tr_direction dir )
{
return getSpeed( &b->piece );
assert( isBandwidth( b ) );
assert( isDirection( dir ) );
return b->band[dir].desiredSpeed;
}
void
tr_bandwidthSetLimited( tr_bandwidth * b,
tr_direction dir,
int isLimited )
{
assert( isBandwidth( b ) );
assert( isDirection( dir ) );
b->band[dir].isLimited = isLimited != 0;
}
int
tr_bandwidthIsLimited( const tr_bandwidth * b,
tr_direction dir )
{
assert( isBandwidth( b ) );
assert( isDirection( dir ) );
return b->band[dir].isLimited != 0;
}
#if 0
#define DEBUG_DIRECTION TR_DOWN
#endif
void
tr_bandwidthAllocate( tr_bandwidth * b,
tr_direction dir,
int period_msec )
{
const double currentSpeed = tr_bandwidthGetPieceSpeed( b, dir ); /* KiB/s */
const double desiredSpeed = b->band[dir].desiredSpeed; /* KiB/s */
const double seconds_per_pulse = period_msec / 1000.0;
const double current_bytes_per_pulse = currentSpeed * 1024.0 * seconds_per_pulse;
const double desired_bytes_per_pulse = desiredSpeed * 1024.0 * seconds_per_pulse;
const double pulses_per_history = (double)HISTORY_MSEC / period_msec;
const double min = desired_bytes_per_pulse * 0.90;
const double max = desired_bytes_per_pulse * 1.50;
const double next_pulse_bytes = desired_bytes_per_pulse * ( pulses_per_history + 1 )
- ( current_bytes_per_pulse * pulses_per_history );
double clamped;
/* clamp the return value to lessen oscillation */
clamped = next_pulse_bytes;
clamped = MAX( clamped, min );
clamped = MIN( clamped, max );
b->band[dir].bytesLeft = clamped;
#ifdef DEBUG_DIRECTION
if( dir == DEBUG_DIRECTION )
fprintf( stderr, "bandwidth %p currentSpeed(%5.2f) desiredSpeed(%5.2f), allocating %5.2f (unclamped: %5.2f)\n",
b, currentSpeed, desiredSpeed,
clamped/1024.0, next_pulse_bytes/1024.0 );
#endif
/* notify the io buffers that there's more bandwidth available */
if( !b->band[dir].isLimited || ( clamped > 0 ) ) {
int i, n=0;
short what = dir==TR_UP ? EV_WRITE : EV_READ;
struct tr_iobuf ** iobufs = (struct tr_iobuf**) tr_ptrArrayPeek( b->iobufs, &n );
#ifdef DEBUG_DIRECTION
if( dir == DEBUG_DIRECTION )
fprintf( stderr, "bandwidth %p has %d iobufs\n", b, n );
#endif
for( i=0; i<n; ++i )
tr_iobuf_enable( iobufs[i], what );
}
/* all children should reallocate too */
if( 1 ) {
int i, n=0;
struct tr_bandwidth ** children = (struct tr_bandwidth**) tr_ptrArrayPeek( b->children, &n );
for( i=0; i<n; ++i )
tr_bandwidthAllocate( children[i], dir, period_msec );
}
}
/***
****
***/
void
tr_bandwidthAddBuffer( tr_bandwidth * b,
struct tr_iobuf * iobuf )
{
assert( isBandwidth( b ) );
assert( iobuf );
tr_ptrArrayInsertSorted( b->iobufs, iobuf, comparePointers );
}
void
tr_bandwidthRemoveBuffer( tr_bandwidth * b,
struct tr_iobuf * iobuf )
{
assert( isBandwidth( b ) );
assert( iobuf );
tr_ptrArrayRemoveSorted( b->iobufs, iobuf, comparePointers );
}
/***
****
***/
size_t
tr_bandwidthClamp( const tr_bandwidth * b,
tr_direction dir,
size_t byteCount )
{
assert( isBandwidth( b ) );
assert( isDirection( dir ) );
if( b )
{
if( b->band[dir].isLimited )
byteCount = MIN( byteCount, b->band[dir].bytesLeft );
if( b->parent && b->band[dir].honorParentLimits )
byteCount = tr_bandwidthClamp( b->parent, dir, byteCount );
}
return byteCount;
}
double
tr_bandwidthGetRawSpeed( const tr_bandwidth * b, tr_direction dir )
{
assert( isBandwidth( b ) );
assert( isDirection( dir ) );
return getSpeed( &b->band[dir].raw );
}
double
tr_bandwidthGetPieceSpeed( const tr_bandwidth * b, tr_direction dir )
{
assert( isBandwidth( b ) );
assert( isDirection( dir ) );
return getSpeed( &b->band[dir].piece );
}
void
tr_bandwidthUsed( tr_bandwidth * b,
tr_direction dir,
size_t byteCount,
int isPieceData )
{
if( b->isLimited && isPieceData )
{
b->bytesLeft -= MIN( b->bytesLeft, byteCount );
/* fprintf( stderr, "%p used %zu bytes ... %zu left\n", b, byteCount, b->bytesLeft ); */
}
struct tr_band * band;
bytesUsed( &b->raw, byteCount );
assert( isBandwidth( b ) );
assert( isDirection( dir ) );
band = &b->band[dir];
if( band->isLimited && isPieceData )
band->bytesLeft -= MIN( band->bytesLeft, byteCount );
#ifdef DEBUG_DIRECTION
if( ( dir == DEBUG_DIRECTION ) && band->isLimited && isPieceData )
fprintf( stderr, "%p consumed %zu bytes of piece data... %zu left\n", b, byteCount, band->bytesLeft );
#endif
bytesUsed( &band->raw, byteCount );
if( isPieceData )
bytesUsed( &b->piece, byteCount );
bytesUsed( &band->piece, byteCount );
if( b->parent != NULL )
tr_bandwidthUsed( b->parent, dir, byteCount, isPieceData );
}

View File

@ -17,38 +17,165 @@
#ifndef TR_BANDWIDTH_H
#define TR_BANDWIDTH_H
struct tr_iobuf;
/**
* Bandwidth is an object for measuring and constraining bandwidth speeds.
*
* Bandwidth objects can be "stacked" so that a peer can be made to obey
* multiple constraints (for example, obeying the global speed limit and a
* per-torrent speed limit).
*
* HIERARCHY
*
* Transmission's bandwidth hierarchy is a tree.
* At the top is the global bandwidth object owned by tr_session.
* Its children are per-torrent bandwidth objects owned by tr_torrent.
* Underneath those are per-peer bandwidth objects owned by tr_peer.
*
* tr_session also owns a tr_handshake's bandwidths, so that the handshake
* I/O can be counted in the global raw totals. When the handshake is done,
* the bandwidth's ownership passes to a tr_peer.
*
* MEASURING
*
* When you ask a bandwidth object for its speed, it gives the speed of the
* subtree underneath it as well. So you can get Transmission's overall
* speed by quering tr_session's bandwidth, per-torrent speeds by asking
* tr_torrent's bandwidth, and per-peer speeds by asking tr_peer's bandwidth.
*
* CONSTRAINING
*
* Call tr_bandwidthAllocate() periodically. tr_bandwidth knows its current
* speed and will decide how many bytes to make available over the
* user-specified period to reach the user-specified desired speed.
* If appropriate, it notifies its iobufs that new bandwidth is available.
*
* tr_bandwidthAllocate() operates on the tr_bandwidth subtree, so usually
* you'll only need to invoke it for the top-level tr_session bandwidth.
*
* The iobufs all have a pointer to their associated tr_bandwidth object,
* and call tr_bandwidthClamp() before performing I/O to see how much
* bandwidth they can safely use.
*/
typedef struct tr_bandwidth tr_bandwidth;
/**
***
**/
tr_bandwidth* tr_bandwidthNew ( tr_session * session );
/** @brief create a new tr_bandwidth object */
tr_bandwidth*
tr_bandwidthNew ( tr_session * session,
tr_bandwidth * parent );
void tr_bandwidthFree ( tr_bandwidth * bandwidth );
/** @brief destroy a tr_bandwidth object */
void tr_bandwidthFree ( tr_bandwidth * bandwidth );
/******
*******
******/
/**
***
**/
void tr_bandwidthSetLimited ( tr_bandwidth * bandwidth,
size_t byteCount );
void tr_bandwidthSetUnlimited ( tr_bandwidth * bandwidth );
size_t tr_bandwidthClamp ( const tr_bandwidth * bandwidth,
size_t byteCount );
* @brief Set the desired speed (in KiB/s) for this bandwidth subtree.
* @see tr_bandwidthAllocate
* @see tr_bandwidthGetDesiredSpeed
*/
void tr_bandwidthSetDesiredSpeed ( tr_bandwidth * bandwidth,
tr_direction direction,
double desiredSpeed );
/**
***
**/
* @brief Get the desired speed (in KiB/s) for ths bandwidth subtree.
* @see tr_bandwidthSetDesiredSpeed
*/
double tr_bandwidthGetDesiredSpeed ( const tr_bandwidth * bandwidth,
tr_direction direction );
double tr_bandwidthGetRawSpeed ( const tr_bandwidth * bandwidth );
/**
* @brief Set whether or not this bandwidth should throttle its iobufs' speeds
*/
void tr_bandwidthSetLimited ( tr_bandwidth * bandwidth,
tr_direction direction,
int isLimited );
double tr_bandwidthGetPieceSpeed ( const tr_bandwidth * bandwidth );
/**
* @return nonzero if this bandwidth throttles its iobufs' speeds
*/
int tr_bandwidthIsLimited ( const tr_bandwidth * bandwidth,
tr_direction direction );
void tr_bandwidthUsed ( tr_bandwidth * bandwidth,
size_t byteCount,
int isPieceData );
/**
* @brief allocate the next period_msec's worth of bandwidth for the iobufs to consume
*/
void tr_bandwidthAllocate ( tr_bandwidth * bandwidth,
tr_direction direction,
int period_msec );
/**
* @brief clamps byteCount down to a number that this bandwidth will allow to be consumed
*/
size_t tr_bandwidthClamp ( const tr_bandwidth * bandwidth,
tr_direction direction,
size_t byteCount );
/******
*******
******/
/**
* @brief Get the raw total of bytes read or sent by this bandwidth subtree.
*/
double tr_bandwidthGetRawSpeed ( const tr_bandwidth * bandwidth,
tr_direction direction );
/**
* @brief Get the number of piece data bytes read or sent by this bandwidth subtree.
*/
double tr_bandwidthGetPieceSpeed ( const tr_bandwidth * bandwidth,
tr_direction direction );
/**
* @brief Notify the bandwidth object that some of its allocated bandwidth has been consumed.
* This is is usually invoked by the iobuf after a read or write.
*/
void tr_bandwidthUsed ( tr_bandwidth * bandwidth,
tr_direction direction,
size_t byteCount,
int isPieceData );
/******
*******
******/
void tr_bandwidthSetParent ( tr_bandwidth * bandwidth,
tr_bandwidth * parent );
/**
* Almost all the time we do want to honor a parents' bandwidth cap, so that
* (for example) a peer is constrained by a per-torrent cap and the global cap.
* But when we set a torrent's speed mode to TR_SPEEDLIMIT_UNLIMITED, then
* in that particular case we want to ignore the global speed limit...
*/
void tr_bandwidthHonorParentLimits ( tr_bandwidth * bandwidth,
tr_direction direction,
int isEnabled );
/******
*******
******/
/**
* @brief add an iobuf to this bandwidth's list of iobufs.
* They will be notified when more bandwidth is made available for them to consume.
*/
void tr_bandwidthAddBuffer ( tr_bandwidth * bandwidth,
struct tr_iobuf * iobuf );
/**
* @brief remove an iobuf from this bandwidth's list of iobufs.
*/
void tr_bandwidthRemoveBuffer ( tr_bandwidth * bandwidth,
struct tr_iobuf * iobuf );
#endif

View File

@ -24,6 +24,7 @@
#include "clients.h"
#include "crypto.h"
#include "handshake.h"
#include "iobuf.h"
#include "peer-io.h"
#include "peer-mgr.h"
#include "torrent.h"
@ -1028,14 +1029,16 @@ readPayloadStream( tr_handshake * handshake,
***/
static ReadState
canRead( struct bufferevent * evin,
void * arg )
canRead( struct tr_iobuf * iobuf, void * arg, size_t * piece )
{
tr_handshake * handshake = (tr_handshake *) arg;
struct evbuffer * inbuf = EVBUFFER_INPUT ( evin );
tr_handshake * handshake = arg;
struct evbuffer * inbuf = tr_iobuf_input( iobuf );
ReadState ret;
int readyForMore = TRUE;
/* no piece data in handshake */
*piece = 0;
dbgmsg( handshake, "handling canRead; state is [%s]",
getStateName( handshake->state ) );
@ -1136,9 +1139,9 @@ tr_handshakeAbort( tr_handshake * handshake )
}
static void
gotError( struct bufferevent * evbuf UNUSED,
short what,
void * arg )
gotError( struct tr_iobuf * iobuf UNUSED,
short what,
void * arg )
{
tr_handshake * handshake = (tr_handshake *) arg;
@ -1179,9 +1182,6 @@ tr_handshakeNew( tr_peerIo * io,
{
tr_handshake * handshake;
tr_peerIoSetBandwidth( io, TR_UP, NULL );
tr_peerIoSetBandwidth( io, TR_DOWN, NULL );
handshake = tr_new0( tr_handshake, 1 );
handshake->io = io;
handshake->crypto = tr_peerIoGetCrypto( io );

378
libtransmission/iobuf.c Normal file
View File

@ -0,0 +1,378 @@
/*
* Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
* All rights reserved.
*
* Transmission modifications and new bugs by Charles Kerr.
* Source: libevent's "patches-1.4" branch, svn revision 949
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <sys/types.h>
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h> /* write */
#include "evutil.h"
#include "event.h"
#include "transmission.h"
#include "bandwidth.h"
#include "iobuf.h"
#include "session.h"
#include "utils.h"
#define MAGIC_NUMBER 235705
struct tr_iobuf
{
struct event_base * ev_base;
struct event ev_read;
struct event ev_write;
struct evbuffer * input;
struct evbuffer * output;
tr_iobuf_cb readcb;
tr_iobuf_cb writecb;
tr_iobuf_error_cb errorcb;
void * cbarg;
int magicNumber;
int timeout_read; /* in seconds */
int timeout_write; /* in seconds */
short enabled; /* events that are currently enabled */
struct tr_handle * session;
struct tr_bandwidth * bandwidth;
};
/***
****
***/
static int
isBuf( const struct tr_iobuf * iobuf )
{
return ( iobuf != NULL ) && ( iobuf->magicNumber == MAGIC_NUMBER );
}
static int
tr_evbuffer_write( struct evbuffer *buffer, int fd, size_t maxlen )
{
int n = MIN( EVBUFFER_LENGTH( buffer ), maxlen );
#ifdef WIN32
n = send(fd, buffer->buffer, n, 0 );
#else
n = write(fd, buffer->buffer, n );
#endif
if( n == -1 )
return -1;
if (n == 0)
return 0;
evbuffer_drain( buffer, n );
return n;
}
static int
tr_iobuf_add(struct event *ev, int timeout)
{
struct timeval tv, *ptv = NULL;
if (timeout) {
evutil_timerclear(&tv);
tv.tv_sec = timeout;
ptv = &tv;
}
return event_add( ev, ptv );
}
static void
tr_iobuf_readcb( int fd, short event, void * arg )
{
int res;
short what = EVBUFFER_READ;
struct tr_iobuf * b = arg;
const size_t howmuch = tr_bandwidthClamp( b->bandwidth, TR_DOWN, 2048 );
assert( isBuf( b ) );
if( event == EV_TIMEOUT ) {
what |= EVBUFFER_TIMEOUT;
goto error;
}
/* if we don't have any bandwidth left, stop reading */
if( howmuch < 1 ) {
event_del( &b->ev_read );
return;
}
res = evbuffer_read( b->input, fd, howmuch );
if( res == -1 ) {
if( errno == EAGAIN || errno == EINTR )
goto reschedule;
/* error case */
what |= EVBUFFER_ERROR;
} else if( res == 0 ) {
/* eof case */
what |= EVBUFFER_EOF;
}
if( res <= 0 )
goto error;
tr_iobuf_add( &b->ev_read, b->timeout_read );
/* Invoke the user callback - must always be called last */
if( b->readcb != NULL )
( *b->readcb )( b, res, b->cbarg );
return;
reschedule:
tr_iobuf_add( &b->ev_read, b->timeout_read );
return;
error:
(*b->errorcb)( b, what, b->cbarg );
}
static void
tr_iobuf_writecb( int fd, short event, void * arg )
{
int res = 0;
short what = EVBUFFER_WRITE;
struct tr_iobuf * b = arg;
size_t howmuch;
assert( isBuf( b ) );
if( event == EV_TIMEOUT ) {
what |= EVBUFFER_TIMEOUT;
goto error;
}
howmuch = MIN( (size_t)b->session->so_sndbuf, EVBUFFER_LENGTH( b->output ) );
howmuch = tr_bandwidthClamp( b->bandwidth, TR_UP, howmuch );
/* if we don't have any bandwidth left, stop writing */
if( howmuch < 1 ) {
event_del( &b->ev_write );
return;
}
res = tr_evbuffer_write( b->output, fd, howmuch );
if (res == -1) {
#ifndef WIN32
/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
*set errno. thus this error checking is not portable*/
if (errno == EAGAIN || errno == EINTR || errno == EINPROGRESS)
goto reschedule;
/* error case */
what |= EVBUFFER_ERROR;
#else
goto reschedule;
#endif
} else if (res == 0) {
/* eof case */
what |= EVBUFFER_EOF;
}
if (res <= 0)
goto error;
if( EVBUFFER_LENGTH( b->output ) )
tr_iobuf_add( &b->ev_write, b->timeout_write );
if( b->writecb != NULL )
(*b->writecb)( b, res, b->cbarg );
return;
reschedule:
if( EVBUFFER_LENGTH( b->output ) )
tr_iobuf_add( &b->ev_write, b->timeout_write );
return;
error:
(*b->errorcb)( b, what, b->cbarg );
}
/*
* Create a new buffered event object.
*
* The read callback is invoked whenever we read new data.
* The write callback is invoked whenever the output buffer is drained.
* The error callback is invoked on a write/read error or on EOF.
*
* Both read and write callbacks maybe NULL. The error callback is not
* allowed to be NULL and have to be provided always.
*/
struct tr_iobuf *
tr_iobuf_new( struct tr_handle * session,
tr_bandwidth * bandwidth,
int fd,
short event,
tr_iobuf_cb readcb,
tr_iobuf_cb writecb,
tr_iobuf_error_cb errorcb,
void * cbarg )
{
struct tr_iobuf * b;
b = tr_new0( struct tr_iobuf, 1 );
b->magicNumber = MAGIC_NUMBER;
b->session = session;
b->bandwidth = bandwidth;
b->input = evbuffer_new( );
b->output = evbuffer_new( );
event_set( &b->ev_read, fd, EV_READ, tr_iobuf_readcb, b );
event_set( &b->ev_write, fd, EV_WRITE, tr_iobuf_writecb, b );
tr_iobuf_setcb( b, readcb, writecb, errorcb, cbarg );
tr_iobuf_enable( b, event );
return b;
}
void
tr_iobuf_setcb( struct tr_iobuf * b,
tr_iobuf_cb readcb,
tr_iobuf_cb writecb,
tr_iobuf_error_cb errorcb,
void * cbarg )
{
assert( isBuf( b ) );
b->readcb = readcb;
b->writecb = writecb;
b->errorcb = errorcb;
b->cbarg = cbarg;
}
/* Closing the file descriptor is the responsibility of the caller */
void
tr_iobuf_free( struct tr_iobuf * b )
{
assert( isBuf( b ) );
b->magicNumber = 0xDEAD;
event_del( &b->ev_read );
event_del( &b->ev_write );
evbuffer_free( b->input );
evbuffer_free( b->output );
tr_free( b );
}
int
tr_iobuf_enable(struct tr_iobuf * b, short event )
{
assert( isBuf( b ) );
if( event & EV_READ )
if( tr_iobuf_add( &b->ev_read, b->timeout_read ) == -1 )
return -1;
if( event & EV_WRITE )
if ( tr_iobuf_add( &b->ev_write, b->timeout_write ) == -1 )
return -1;
b->enabled |= event;
return 0;
}
int
tr_iobuf_disable( struct tr_iobuf * b, short event )
{
assert( isBuf( b ) );
if( event & EV_READ )
if( event_del( &b->ev_read ) == -1 )
return -1;
if( event & EV_WRITE )
if( event_del( &b->ev_write ) == -1 )
return -1;
b->enabled &= ~event;
return 0;
}
void
tr_iobuf_settimeout( struct tr_iobuf * b,
int timeout_read,
int timeout_write )
{
assert( isBuf( b ) );
b->timeout_read = timeout_read;
if( event_pending( &b->ev_read, EV_READ, NULL ) )
tr_iobuf_add( &b->ev_read, timeout_read );
b->timeout_write = timeout_write;
if( event_pending( &b->ev_write, EV_WRITE, NULL ) )
tr_iobuf_add( &b->ev_write, timeout_write );
}
void
tr_iobuf_set_bandwidth( struct tr_iobuf * b,
struct tr_bandwidth * bandwidth )
{
assert( isBuf( b ) );
b->bandwidth = bandwidth;
}
struct evbuffer*
tr_iobuf_input( struct tr_iobuf * b )
{
assert( isBuf( b ) );
return b->input;
}
struct evbuffer*
tr_iobuf_output( struct tr_iobuf * b )
{
assert( isBuf( b ) );
return b->output;
}

107
libtransmission/iobuf.h Normal file
View File

@ -0,0 +1,107 @@
/*
* Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
* All rights reserved.
*
* Transmission modifications and new bugs by Charles Kerr.
* Source: libevent's "patches-1.4" branch, svn revision 949
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef __TRANSMISSION__
#error only libtransmission should #include this header.
#endif
#ifndef TR_EVBUFFER_H
#define TR_EVBUFFER_H
/**
* tr_iobuf is an i/o convenience wrapper similar to libevent's "bufferevent".
* it provides two evbuffers, one for writing and one for reading, and handles
* pumping them to/from the given fd.
*
* this class differs from bufferevent in two major ways:
* 1. the action callbacks include the number of bytes transferred
* 2. the up/down speeds are directly constrained by our `bandwidth' object
* 3. the implementation is hidden in the .c file
*/
struct tr_iobuf;
struct evbuffer;
struct tr_bandwidth;
struct tr_session;
/** @brief returns the input evbuffer to that we can read from */
struct evbuffer* tr_iobuf_input( struct tr_iobuf * iobuf );
/** @brief returns the output evbuffer that we can write to */
struct evbuffer* tr_iobuf_output( struct tr_iobuf * iobuf );
/** @brief prototype for the callbacks invoked when bytes have been read or written
@see tr_iobuf_new
@see tr_iobuf_setcb */
typedef void (*tr_iobuf_cb)( struct tr_iobuf*, size_t bytes_transferred, void* );
/** @brief prototype for the callback invoked on error
@see tr_iobuf_new
@see tr_iobuf_setcb */
typedef void (*tr_iobuf_error_cb)( struct tr_iobuf*, short what, void* );
/** @brief create a new tr_iobuf object. */
struct tr_iobuf* tr_iobuf_new( struct tr_handle * session,
struct tr_bandwidth * bandwidth,
int fd,
short event,
tr_iobuf_cb readcb,
tr_iobuf_cb writecb,
tr_iobuf_error_cb errorcb,
void * cbarg );
/** @brief destroy a tr_iobuf object. */
void tr_iobuf_free( struct tr_iobuf * iobuf );
/** @brief change the number of seconds it takes for a read/write to timeout */
void tr_iobuf_settimeout( struct tr_iobuf * iobuf,
int timeout_read,
int timeout_write );
/** @brief set the bandwidth object that limits this iobuf's read/write speeds */
void tr_iobuf_set_bandwidth( struct tr_iobuf * iobuf,
struct tr_bandwidth * bandwidth );
/** @brief change the callbacks invoked by this iobuf on error and bytes transferred */
void tr_iobuf_setcb( struct tr_iobuf * iobuf,
tr_iobuf_cb readcb,
tr_iobuf_cb writecb,
tr_iobuf_error_cb errorcb,
void * cbarg );
/** @brief tell the iobuf to poll for certain states.
@brief event may be EV_READ, EV_WRITE, or EV_READ|EV_WRITE */
int tr_iobuf_enable( struct tr_iobuf * iobuf, short event );
/** @brief tell the iobuf to stop polling for certain states.
@brief event may be EV_READ, EV_WRITE, or EV_READ|EV_WRITE */
int tr_iobuf_disable( struct tr_iobuf * iobuf, short event );
#endif

View File

@ -125,7 +125,6 @@ createSocket( int type )
static void
setSndBuf( tr_session * session, int fd )
{
#if 0
if( fd >= 0 )
{
const int sndbuf = session->so_sndbuf;
@ -133,7 +132,6 @@ setSndBuf( tr_session * session, int fd )
setsockopt( fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof( sndbuf ) );
setsockopt( fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof( rcvbuf ) );
}
#endif
}
int

View File

@ -28,12 +28,14 @@
#include "transmission.h"
#include "bandwidth.h"
#include "crypto.h"
#include "iobuf.h"
#include "list.h"
#include "net.h"
#include "peer-io.h"
#include "trevent.h"
#include "utils.h"
#define MAGIC_NUMBER 206745
#define IO_TIMEOUT_SECS 8
static size_t
@ -78,131 +80,75 @@ struct tr_datatype
struct tr_peerIo
{
unsigned int isEncrypted : 1;
unsigned int isIncoming : 1;
unsigned int peerIdIsSet : 1;
unsigned int extendedProtocolSupported : 1;
unsigned int fastPeersSupported : 1;
unsigned int isEncrypted : 1;
unsigned int isIncoming : 1;
unsigned int peerIdIsSet : 1;
unsigned int extendedProtocolSupported : 1;
unsigned int fastPeersSupported : 1;
uint8_t encryptionMode;
uint8_t timeout;
uint16_t port;
int socket;
int magicNumber;
uint8_t peerId[20];
time_t timeCreated;
uint8_t encryptionMode;
uint8_t timeout;
uint16_t port;
int socket;
tr_session * session;
uint8_t peerId[20];
time_t timeCreated;
struct in_addr in_addr;
struct bufferevent * bufev;
struct evbuffer * output;
tr_list * output_datatypes; /* struct tr_datatype */
tr_session * session;
tr_can_read_cb canRead;
tr_did_write_cb didWrite;
tr_net_error_cb gotError;
void * userData;
struct in_addr in_addr;
struct tr_iobuf * iobuf;
tr_list * output_datatypes; /* struct tr_datatype */
size_t bufferSize[2];
tr_can_read_cb canRead;
tr_did_write_cb didWrite;
tr_net_error_cb gotError;
void * userData;
tr_bandwidth * bandwidth[2];
tr_crypto * crypto;
size_t bufferSize[2];
tr_bandwidth * bandwidth;
tr_crypto * crypto;
};
/**
***
**/
static void
adjustOutputBuffer( tr_peerIo * io )
{
struct evbuffer * live = EVBUFFER_OUTPUT( io->bufev );
size_t curLive = EVBUFFER_LENGTH( live );
size_t maxLive = tr_bandwidthClamp( io->bandwidth[TR_UP], io->session->so_sndbuf );
if( ( curLive < maxLive ) && EVBUFFER_LENGTH( io->output ) )
{
size_t freeSpace = maxLive - curLive;
size_t n = MIN( freeSpace, EVBUFFER_LENGTH( io->output ) );
bufferevent_write( io->bufev, EVBUFFER_DATA( io->output ), n );
evbuffer_drain( io->output, n );
curLive += n;
}
io->bufferSize[TR_UP] = curLive;
if( curLive )
bufferevent_enable( io->bufev, EV_WRITE );
dbgmsg( io, "after adjusting the output buffer, its size is now %zu", curLive );
}
static void
adjustInputBuffer( tr_peerIo * io )
{
/* FIXME: the max read size probably needs to vary depending on the
* number of peers that we have connected... 1024 is going to force
* us way over the limit when there are lots of peers */
static const int maxBufSize = 1024;
const size_t n = tr_bandwidthClamp( io->bandwidth[TR_DOWN], maxBufSize );
if( !n )
{
dbgmsg( io, "disabling reads because we've hit our limit" );
bufferevent_disable( io->bufev, EV_READ );
}
else
{
dbgmsg( io, "enabling reading of %zu more bytes", n );
bufferevent_setwatermark( io->bufev, EV_READ, 0, n );
bufferevent_enable( io->bufev, EV_READ );
}
io->bufferSize[TR_DOWN] = EVBUFFER_LENGTH( EVBUFFER_INPUT( io->bufev ) );
}
/***
****
***/
static void
didWriteWrapper( struct bufferevent * e,
void * vio )
didWriteWrapper( struct tr_iobuf * iobuf,
size_t bytes_transferred,
void * vio )
{
tr_peerIo * io = vio;
const size_t len = EVBUFFER_LENGTH( EVBUFFER_OUTPUT( e ) );
dbgmsg( io, "didWrite... io->outputBufferSize was %zu, is now %zu",
io->bufferSize[TR_UP], len );
if( len < io->bufferSize[TR_UP] )
while( bytes_transferred )
{
size_t payload = io->bufferSize[TR_UP] - len;
struct tr_datatype * next = io->output_datatypes->data;
const size_t chunk_length = MIN( next->length, bytes_transferred );
const size_t n = addPacketOverhead( chunk_length );
while( payload )
{
struct tr_datatype * next = io->output_datatypes->data;
const size_t chunk_length = MIN( next->length, payload );
const size_t n = addPacketOverhead( chunk_length );
tr_bandwidthUsed( io->bandwidth, TR_UP, n, next->isPieceData );
if( io->didWrite )
io->didWrite( io, n, next->isPieceData, io->userData );
if( io->didWrite )
io->didWrite( io, n, next->isPieceData, io->userData );
payload -= chunk_length;
next->length -= chunk_length;
if( !next->length )
tr_free( tr_list_pop_front( &io->output_datatypes ) );
}
bytes_transferred -= chunk_length;
next->length -= chunk_length;
if( !next->length )
tr_free( tr_list_pop_front( &io->output_datatypes ) );
}
adjustOutputBuffer( io );
if( EVBUFFER_LENGTH( tr_iobuf_output( iobuf ) ) )
tr_iobuf_enable( io->iobuf, EV_WRITE );
}
static void
canReadWrapper( struct bufferevent * e,
void * vio )
canReadWrapper( struct tr_iobuf * iobuf,
size_t bytes_transferred UNUSED,
void * vio )
{
int done = 0;
int err = 0;
@ -218,12 +164,23 @@ canReadWrapper( struct bufferevent * e,
while( !done && !err )
{
const int ret = io->canRead( e, io->userData );
size_t piece = 0;
const size_t oldLen = EVBUFFER_LENGTH( tr_iobuf_input( iobuf ) );
const int ret = io->canRead( iobuf, io->userData, &piece );
if( ret != err )
{
const size_t used = oldLen - EVBUFFER_LENGTH( tr_iobuf_input( iobuf ) );
if( piece )
tr_bandwidthUsed( io->bandwidth, TR_DOWN, piece, TRUE );
if( used != piece )
tr_bandwidthUsed( io->bandwidth, TR_DOWN, used - piece, FALSE );
}
switch( ret )
{
case READ_NOW:
if( EVBUFFER_LENGTH( e->input ) )
if( EVBUFFER_LENGTH( tr_iobuf_input( iobuf )))
continue;
done = 1;
break;
@ -240,20 +197,17 @@ canReadWrapper( struct bufferevent * e,
tr_globalUnlock( session );
}
if( !err )
adjustInputBuffer( io );
}
static void
gotErrorWrapper( struct bufferevent * e,
short what,
void * userData )
gotErrorWrapper( struct tr_iobuf * iobuf,
short what,
void * userData )
{
tr_peerIo * c = userData;
if( c->gotError )
c->gotError( e, what, c->userData );
c->gotError( iobuf, what, c->userData );
}
/**
@ -263,19 +217,22 @@ gotErrorWrapper( struct bufferevent * e,
static void
bufevNew( tr_peerIo * io )
{
io->bufev = bufferevent_new( io->socket,
canReadWrapper,
didWriteWrapper,
gotErrorWrapper,
io );
io->iobuf = tr_iobuf_new( io->session,
io->bandwidth,
io->socket,
EV_READ | EV_WRITE,
canReadWrapper,
didWriteWrapper,
gotErrorWrapper,
io );
/* tell libevent to call didWriteWrapper after every write,
* not just when the write buffer is empty */
bufferevent_setwatermark( io->bufev, EV_WRITE, INT_MAX, 0 );
tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout );
}
bufferevent_settimeout( io->bufev, io->timeout, io->timeout );
bufferevent_enable( io->bufev, EV_READ | EV_WRITE );
static int
isPeerIo( const tr_peerIo * io )
{
return ( io != NULL ) && ( io->magicNumber == MAGIC_NUMBER );
}
static tr_peerIo*
@ -292,6 +249,7 @@ tr_peerIoNew( tr_session * session,
tr_netSetTOS( socket, session->peerSocketTOS );
io = tr_new0( tr_peerIo, 1 );
io->magicNumber = MAGIC_NUMBER;
io->crypto = tr_cryptoNew( torrentHash, isIncoming );
io->session = session;
io->in_addr = *in_addr;
@ -300,8 +258,8 @@ tr_peerIoNew( tr_session * session,
io->isIncoming = isIncoming != 0;
io->timeout = IO_TIMEOUT_SECS;
io->timeCreated = time( NULL );
io->output = evbuffer_new( );
bufevNew( io );
tr_peerIoSetBandwidth( io, session->bandwidth );
return io;
}
@ -345,11 +303,13 @@ io_dtor( void * vio )
{
tr_peerIo * io = vio;
evbuffer_free( io->output );
bufferevent_free( io->bufev );
tr_peerIoSetBandwidth( io, NULL );
tr_iobuf_free( io->iobuf );
tr_netClose( io->socket );
tr_cryptoFree( io->crypto );
tr_list_free( &io->output_datatypes, tr_free );
io->magicNumber = 0xDEAD;
tr_free( io );
}
@ -368,7 +328,7 @@ tr_peerIoFree( tr_peerIo * io )
tr_session*
tr_peerIoGetSession( tr_peerIo * io )
{
assert( io );
assert( isPeerIo( io ) );
assert( io->session );
return io->session;
@ -378,7 +338,7 @@ const struct in_addr*
tr_peerIoGetAddress( const tr_peerIo * io,
uint16_t * port )
{
assert( io );
assert( isPeerIo( io ) );
if( port )
*port = io->port;
@ -406,8 +366,8 @@ tr_peerIoGetAddrStr( const tr_peerIo * io )
static void
tr_peerIoTryRead( tr_peerIo * io )
{
if( EVBUFFER_LENGTH( io->bufev->input ) )
canReadWrapper( io->bufev, io );
if( EVBUFFER_LENGTH( tr_iobuf_input( io->iobuf )))
(*canReadWrapper)( io->iobuf, ~0, io );
}
void
@ -443,10 +403,14 @@ tr_peerIoReconnect( tr_peerIo * io )
if( io->socket >= 0 )
{
tr_netSetTOS( io->socket, io->session->peerSocketTOS );
tr_bandwidth * bandwidth = io->bandwidth;
tr_peerIoSetBandwidth( io, NULL );
bufferevent_free( io->bufev );
tr_netSetTOS( io->socket, io->session->peerSocketTOS );
tr_iobuf_free( io->iobuf );
bufevNew( io );
tr_peerIoSetBandwidth( io, bandwidth );
return 0;
}
@ -458,8 +422,8 @@ tr_peerIoSetTimeoutSecs( tr_peerIo * io,
int secs )
{
io->timeout = secs;
bufferevent_settimeout( io->bufev, io->timeout, io->timeout );
bufferevent_enable( io->bufev, EV_READ | EV_WRITE );
tr_iobuf_settimeout( io->iobuf, io->timeout, io->timeout );
tr_iobuf_enable( io->iobuf, EV_READ | EV_WRITE );
}
/**
@ -470,7 +434,7 @@ void
tr_peerIoSetTorrentHash( tr_peerIo * io,
const uint8_t * hash )
{
assert( io );
assert( isPeerIo( io ) );
tr_cryptoSetTorrentHash( io->crypto, hash );
}
@ -478,7 +442,7 @@ tr_peerIoSetTorrentHash( tr_peerIo * io,
const uint8_t*
tr_peerIoGetTorrentHash( tr_peerIo * io )
{
assert( io );
assert( isPeerIo( io ) );
assert( io->crypto );
return tr_cryptoGetTorrentHash( io->crypto );
@ -487,7 +451,7 @@ tr_peerIoGetTorrentHash( tr_peerIo * io )
int
tr_peerIoHasTorrentHash( const tr_peerIo * io )
{
assert( io );
assert( isPeerIo( io ) );
assert( io->crypto );
return tr_cryptoHasTorrentHash( io->crypto );
@ -501,7 +465,7 @@ void
tr_peerIoSetPeersId( tr_peerIo * io,
const uint8_t * peer_id )
{
assert( io );
assert( isPeerIo( io ) );
if( ( io->peerIdIsSet = peer_id != NULL ) )
memcpy( io->peerId, peer_id, 20 );
@ -512,7 +476,7 @@ tr_peerIoSetPeersId( tr_peerIo * io,
const uint8_t*
tr_peerIoGetPeersId( const tr_peerIo * io )
{
assert( io );
assert( isPeerIo( io ) );
assert( io->peerIdIsSet );
return io->peerId;
@ -526,7 +490,7 @@ void
tr_peerIoEnableLTEP( tr_peerIo * io,
int flag )
{
assert( io );
assert( isPeerIo( io ) );
assert( flag == 0 || flag == 1 );
io->extendedProtocolSupported = flag;
@ -536,7 +500,7 @@ void
tr_peerIoEnableFEXT( tr_peerIo * io,
int flag )
{
assert( io );
assert( isPeerIo( io ) );
assert( flag == 0 || flag == 1 );
io->fastPeersSupported = flag;
@ -545,7 +509,7 @@ tr_peerIoEnableFEXT( tr_peerIo * io,
int
tr_peerIoSupportsLTEP( const tr_peerIo * io )
{
assert( io );
assert( isPeerIo( io ) );
return io->extendedProtocolSupported;
}
@ -553,7 +517,7 @@ tr_peerIoSupportsLTEP( const tr_peerIo * io )
int
tr_peerIoSupportsFEXT( const tr_peerIo * io )
{
assert( io );
assert( isPeerIo( io ) );
return io->fastPeersSupported;
}
@ -565,12 +529,8 @@ tr_peerIoSupportsFEXT( const tr_peerIo * io )
size_t
tr_peerIoGetWriteBufferSpace( const tr_peerIo * io )
{
const size_t desiredLiveLen = tr_bandwidthClamp( io->bandwidth[TR_UP], io->session->so_rcvbuf );
const size_t currentLiveLen = EVBUFFER_LENGTH( EVBUFFER_OUTPUT( io->bufev ) );
const size_t desiredQueueLen = io->session->so_sndbuf;
const size_t currentQueueLen = EVBUFFER_LENGTH( io->output );
const size_t desiredLen = desiredLiveLen + desiredQueueLen;
const size_t currentLen = currentLiveLen + currentQueueLen;
const size_t desiredLen = io->session->so_sndbuf * 2; /* FIXME: bigger? */
const size_t currentLen = EVBUFFER_LENGTH( tr_iobuf_output( io->iobuf ) );
size_t freeSpace = 0;
if( desiredLen > currentLen )
@ -581,18 +541,20 @@ tr_peerIoGetWriteBufferSpace( const tr_peerIo * io )
void
tr_peerIoSetBandwidth( tr_peerIo * io,
tr_direction direction,
tr_bandwidth * bandwidth )
{
assert( io );
assert( direction == TR_UP || direction == TR_DOWN );
assert( isPeerIo( io ) );
io->bandwidth[direction] = bandwidth;
if( io->bandwidth )
tr_bandwidthRemoveBuffer( io->bandwidth, io->iobuf );
if( direction == TR_UP )
adjustOutputBuffer( io );
else
adjustInputBuffer( io );
io->bandwidth = bandwidth;
tr_iobuf_set_bandwidth( io->iobuf, bandwidth );
if( io->bandwidth )
tr_bandwidthAddBuffer( io->bandwidth, io->iobuf );
tr_iobuf_enable( io->iobuf, EV_READ | EV_WRITE );
}
/**
@ -609,7 +571,7 @@ void
tr_peerIoSetEncryption( tr_peerIo * io,
int encryptionMode )
{
assert( io );
assert( isPeerIo( io ) );
assert( encryptionMode == PEER_ENCRYPTION_NONE
|| encryptionMode == PEER_ENCRYPTION_RC4 );
@ -636,14 +598,13 @@ tr_peerIoWrite( tr_peerIo * io,
assert( tr_amInEventThread( io->session ) );
dbgmsg( io, "adding %zu bytes into io->output", writemeLen );
evbuffer_add( io->output, writeme, writemeLen );
datatype = tr_new( struct tr_datatype, 1 );
datatype->isPieceData = isPieceData != 0;
datatype->length = writemeLen;
tr_list_append( &io->output_datatypes, datatype );
adjustOutputBuffer( io );
evbuffer_add( tr_iobuf_output( io->iobuf ), writeme, writemeLen );
tr_iobuf_enable( io->iobuf, EV_WRITE );
}
void

View File

@ -23,9 +23,9 @@
struct in_addr;
struct evbuffer;
struct bufferevent;
struct tr_bandwidth;
struct tr_crypto;
struct tr_iobuf;
typedef struct tr_peerIo tr_peerIo;
/**
@ -110,17 +110,18 @@ typedef enum
}
ReadState;
typedef ReadState ( *tr_can_read_cb )( struct bufferevent * ev,
void * user_data );
typedef ReadState ( *tr_can_read_cb )( struct tr_iobuf * iobuf,
void * user_data,
size_t * setme_piece_byte_count );
typedef void ( *tr_did_write_cb )( tr_peerIo * io,
size_t bytesWritten,
int wasPieceData,
void * userData );
typedef void ( *tr_did_write_cb )( tr_peerIo * io,
size_t bytesWritten,
int wasPieceData,
void * userData );
typedef void ( *tr_net_error_cb )( struct bufferevent * ev,
short what,
void * userData );
typedef void ( *tr_net_error_cb )( struct tr_iobuf * ev,
short what,
void * userData );
void tr_peerIoSetIOFuncs ( tr_peerIo * io,
tr_can_read_cb readcb,
@ -205,8 +206,13 @@ void tr_peerIoDrain( tr_peerIo * io,
size_t tr_peerIoGetWriteBufferSpace( const tr_peerIo * io );
void tr_peerIoSetBandwidth( tr_peerIo * io,
tr_direction direction,
struct tr_bandwidth * bandwidth );
void tr_peerIoBandwidthUsed( tr_peerIo * io,
tr_direction direction,
size_t byteCount,
int isPieceData );
#endif

View File

@ -27,10 +27,10 @@
#include "publish.h" /* tr_publisher_tag */
struct tr_bandwidth;
struct tr_bitfield;
struct tr_peerIo;
struct tr_peermsgs;
struct tr_ratecontrol;
enum
{
@ -71,12 +71,7 @@ typedef struct tr_peer
struct tr_peermsgs * msgs;
tr_publisher_tag msgsTag;
/* the rate at which pieces are being transferred between client and peer.
* protocol overhead is NOT included; this is only the piece data */
struct tr_ratecontrol * pieceSpeed[2];
/* the rate at which all data is being transferred between client and peer. */
struct tr_ratecontrol * rawSpeed[2];
struct tr_bandwidth * bandwidth;
}
tr_peer;

View File

@ -33,7 +33,6 @@
#include "peer-mgr-private.h"
#include "peer-msgs.h"
#include "ptrarray.h"
#include "ratecontrol.h"
#include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
#include "torrent.h"
#include "trevent.h"
@ -58,7 +57,7 @@ enum
RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
/* how frequently to reallocate bandwidth */
BANDWIDTH_PERIOD_MSEC = 333,
BANDWIDTH_PERIOD_MSEC = 200,
/* max # of peers to ask fer per torrent per reconnect pulse */
MAX_RECONNECTIONS_PER_PULSE = 4,
@ -323,16 +322,13 @@ peerIsInUse( const Torrent * ct,
}
static tr_peer*
peerConstructor( const struct in_addr * in_addr )
peerConstructor( tr_torrent * tor, const struct in_addr * in_addr )
{
tr_peer * p;
p = tr_new0( tr_peer, 1 );
memcpy( &p->in_addr, in_addr, sizeof( struct in_addr ) );
p->pieceSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
p->pieceSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
p->rawSpeed[TR_CLIENT_TO_PEER] = tr_rcInit( );
p->rawSpeed[TR_PEER_TO_CLIENT] = tr_rcInit( );
p->bandwidth = tr_bandwidthNew( tor->session, tor->bandwidth );
return p;
}
@ -348,7 +344,7 @@ getPeer( Torrent * torrent,
if( peer == NULL )
{
peer = peerConstructor( in_addr );
peer = peerConstructor( torrent->tor, in_addr );
tr_ptrArrayInsertSorted( torrent->peers, peer, peerCompare );
}
@ -370,10 +366,8 @@ peerDestructor( tr_peer * peer )
tr_bitfieldFree( peer->blame );
tr_free( peer->client );
tr_rcClose( peer->rawSpeed[TR_CLIENT_TO_PEER] );
tr_rcClose( peer->rawSpeed[TR_PEER_TO_CLIENT] );
tr_rcClose( peer->pieceSpeed[TR_CLIENT_TO_PEER] );
tr_rcClose( peer->pieceSpeed[TR_PEER_TO_CLIENT] );
tr_bandwidthFree( peer->bandwidth );
tr_free( peer );
}
@ -1017,26 +1011,12 @@ peerCallbackFunc( void * vpeer,
{
const time_t now = time( NULL );
tr_torrent * tor = t->tor;
const tr_direction dir = TR_CLIENT_TO_PEER;
tor->activityDate = now;
if( e->wasPieceData )
tor->uploadedCur += e->length;
/* add it to the raw upload speed */
if( peer )
tr_rcTransferred ( peer->rawSpeed[dir], e->length );
/* maybe add it to the piece upload speed */
if( e->wasPieceData ) {
if( peer )
tr_rcTransferred ( peer->pieceSpeed[dir], e->length );
}
tr_bandwidthUsed( tor->bandwidth[dir], e->length, e->wasPieceData );
tr_bandwidthUsed( tor->session->bandwidth[dir], e->length, e->wasPieceData );
/* update the stats */
if( e->wasPieceData )
tr_statsAddUploaded( tor->session, e->length );
@ -1054,7 +1034,6 @@ peerCallbackFunc( void * vpeer,
{
const time_t now = time( NULL );
tr_torrent * tor = t->tor;
const tr_direction dir = TR_PEER_TO_CLIENT;
tor->activityDate = now;
@ -1067,19 +1046,6 @@ peerCallbackFunc( void * vpeer,
if( peer && e->wasPieceData )
tor->downloadedCur += e->length;
/* add it to our raw download speed */
if( peer )
tr_rcTransferred ( peer->rawSpeed[dir], e->length );
/* maybe add it to the piece upload speed */
if( e->wasPieceData ) {
if( peer )
tr_rcTransferred ( peer->pieceSpeed[dir], e->length );
}
tr_bandwidthUsed( tor->bandwidth[dir], e->length, e->wasPieceData );
tr_bandwidthUsed( tor->session->bandwidth[dir], e->length, e->wasPieceData );
/* update the stats */
if( e->wasPieceData )
tr_statsAddDownloaded( tor->session, e->length );
@ -1312,17 +1278,16 @@ myHandshakeDoneCB( tr_handshake * handshake,
if( !peer_id )
peer->client = NULL;
else
{
else {
char client[128];
tr_clientForId( client, sizeof( client ), peer_id );
peer->client = tr_strdup( client );
}
peer->port = port;
peer->io = io;
peer->msgs =
tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t,
&peer->msgsTag );
peer->msgs = tr_peerMsgsNew( t->tor, peer, peerCallbackFunc, t, &peer->msgsTag );
tr_peerIoSetBandwidth( io, peer->bandwidth );
success = TRUE;
}
@ -1804,7 +1769,7 @@ tr_peerGetPieceSpeed( const tr_peer * peer,
assert( peer );
assert( direction==TR_CLIENT_TO_PEER || direction==TR_PEER_TO_CLIENT );
return tr_rcRate( peer->pieceSpeed[direction] );
return tr_bandwidthGetPieceSpeed( peer->bandwidth, direction );
}
@ -2362,10 +2327,10 @@ pumpAllPeers( tr_peerMgr * mgr )
const int torrentCount = tr_ptrArraySize( mgr->torrents );
int i, j;
for( i = 0; i < torrentCount; ++i )
for( i=0; i<torrentCount; ++i )
{
Torrent * t = tr_ptrArrayNth( mgr->torrents, i );
for( j = 0; j < tr_ptrArraySize( t->peers ); ++j )
for( j=0; j<tr_ptrArraySize( t->peers ); ++j )
{
tr_peer * peer = tr_ptrArrayNth( t->peers, j );
tr_peerMsgsPulse( peer->msgs );
@ -2373,165 +2338,16 @@ pumpAllPeers( tr_peerMgr * mgr )
}
}
static void
setTorrentBandwidth( Torrent * t,
tr_direction dir,
tr_bandwidth * bandwidth )
{
int i;
int peerCount = 0;
tr_peer ** peers = getConnectedPeers( t, &peerCount );
for( i=0; i<peerCount; ++i )
tr_peerIoSetBandwidth( peers[i]->io, dir, bandwidth );
tr_free( peers );
}
#if 0
#define DEBUG_DIRECTION TR_DOWN
#endif
static double
bytesPerPulse( double KiB_per_second )
{
return KiB_per_second * ( 1024.0 * BANDWIDTH_PERIOD_MSEC / 1000.0 );
}
/*
* @param currentSpeed current speed in KiB/s
* @param desiredSpeed desired speed in KiB/s
*/
static double
allocateHowMuch( tr_direction dir UNUSED, double currentSpeed, double desiredSpeed )
{
const int pulses_per_history = TR_RATECONTROL_HISTORY_MSEC / BANDWIDTH_PERIOD_MSEC;
const double current_bytes_per_pulse = bytesPerPulse( currentSpeed );
const double desired_bytes_per_pulse = bytesPerPulse( desiredSpeed );
const double min = desired_bytes_per_pulse * 0.90;
const double max = desired_bytes_per_pulse * 1.50;
const double next_pulse_bytes = desired_bytes_per_pulse * ( pulses_per_history + 1 )
- ( current_bytes_per_pulse * pulses_per_history );
double clamped;
/* clamp the return value to lessen oscillation */
clamped = next_pulse_bytes;
clamped = MAX( clamped, min );
clamped = MIN( clamped, max );
#ifdef DEBUG_DIRECTION
if( dir == DEBUG_DIRECTION )
fprintf( stderr, "currentSpeed(%5.2f) desiredSpeed(%5.2f), allocating %5.2f (unclamped: %5.2f)\n",
currentSpeed,
desiredSpeed,
clamped/1024.0,
next_pulse_bytes/1024.0 );
#endif
return clamped;
}
/**
* Allocate bandwidth for each peer connection.
*
* @param mgr the peer manager
* @param direction whether to allocate upload or download bandwidth
*/
static void
allocateBandwidth( tr_peerMgr * mgr,
tr_direction direction )
{
int i;
tr_session * session = mgr->session;
const int torrentCount = tr_ptrArraySize( mgr->torrents );
Torrent ** torrents = (Torrent **) tr_ptrArrayBase( mgr->torrents );
tr_bandwidth * global_pool = session->bandwidth[direction];
assert( mgr );
assert( direction == TR_UP || direction == TR_DOWN );
/* before allocating bandwidth, pump the connected peers */
pumpAllPeers( mgr );
for( i=0; i<torrentCount; ++i )
{
Torrent * t = torrents[i];
tr_speedlimit speedMode;
/* no point in allocating bandwidth for stopped torrents */
if( tr_torrentGetActivity( t->tor ) == TR_STATUS_STOPPED )
continue;
/* if piece data is disallowed, don't bother limiting bandwidth --
* we won't be asking for, or sending out, any pieces */
if( !tr_torrentIsPieceTransferAllowed( t->tor, direction ) )
speedMode = TR_SPEEDLIMIT_UNLIMITED;
else
speedMode = tr_torrentGetSpeedMode( t->tor, direction );
/* process the torrent's peers based on its speed mode */
switch( speedMode )
{
case TR_SPEEDLIMIT_UNLIMITED:
{
tr_bandwidth * b = t->tor->bandwidth[direction];
tr_bandwidthSetUnlimited( b );
setTorrentBandwidth( t, direction, b );
break;
}
case TR_SPEEDLIMIT_SINGLE:
{
tr_bandwidth * b = t->tor->bandwidth[direction];
const double currentSpeed = tr_bandwidthGetPieceSpeed( b );
const double desiredSpeed = tr_torrentGetSpeedLimit( t->tor, direction );
const double bytesPerPulse = allocateHowMuch( direction, currentSpeed, desiredSpeed );
#ifdef DEBUG_DIRECTION
if( direction == DEBUG_DIRECTION )
fprintf( stderr, "single: currentSpeed %.0f ... desiredSpeed %.0f ... bytesPerPulse %.0f\n", currentSpeed, desiredSpeed, bytesPerPulse );
#endif
tr_bandwidthSetLimited( b, bytesPerPulse );
setTorrentBandwidth( t, direction, b );
break;
}
case TR_SPEEDLIMIT_GLOBAL:
{
setTorrentBandwidth( t, direction, global_pool );
break;
}
}
}
/* handle the global pool's connections */
if( !tr_sessionIsSpeedLimitEnabled( session, direction ) )
tr_bandwidthSetUnlimited( global_pool );
else {
const double currentSpeed = tr_bandwidthGetPieceSpeed( global_pool );
const double desiredSpeed = tr_sessionGetSpeedLimit( session, direction );
const double bytesPerPulse = allocateHowMuch( direction, currentSpeed, desiredSpeed );
#ifdef DEBUG_DIRECTION
if( direction == DEBUG_DIRECTION )
fprintf( stderr, "global(%p): currentSpeed %.0f ... desiredSpeed %.0f ... bytesPerPulse %.0f\n", global_pool, currentSpeed, desiredSpeed, bytesPerPulse );
#endif
tr_bandwidthSetLimited( session->bandwidth[direction], bytesPerPulse );
}
/* now that we've allocated bandwidth, pump all the connected peers */
pumpAllPeers( mgr );
}
static int
bandwidthPulse( void * vmgr )
{
tr_peerMgr * mgr = vmgr;
int i;
managerLock( mgr );
/* allocate the upload and download bandwidth */
for( i = 0; i < 2; ++i )
allocateBandwidth( mgr, i );
pumpAllPeers( mgr );
tr_bandwidthAllocate( mgr->session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC );
tr_bandwidthAllocate( mgr->session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC );
pumpAllPeers( mgr );
managerUnlock( mgr );
return TRUE;

View File

@ -24,6 +24,7 @@
#include "completion.h"
#include "crypto.h"
#include "inout.h"
#include "iobuf.h"
#ifdef WIN32
#include "net.h" /* for ECONN */
#endif
@ -1314,9 +1315,10 @@ static int clientGotBlock( tr_peermsgs * msgs,
const struct peer_request * req );
static int
readBtPiece( tr_peermsgs * msgs,
struct evbuffer * inbuf,
size_t inlen )
readBtPiece( tr_peermsgs * msgs,
struct evbuffer * inbuf,
size_t inlen,
size_t * setme_piece_bytes_read )
{
struct peer_request * req = &msgs->incoming.blockReq;
@ -1349,6 +1351,7 @@ readBtPiece( tr_peermsgs * msgs,
tr_peerIoReadBytes( msgs->io, inbuf, buf, n );
evbuffer_add( msgs->incoming.block, buf, n );
fireClientGotData( msgs, n, TRUE );
*setme_piece_bytes_read += n;
tr_free( buf );
dbgmsg( msgs, "got %d bytes for block %u:%u->%u ... %d remain",
(int)n, req->index, req->offset, req->length,
@ -1642,12 +1645,11 @@ didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * v
}
static ReadState
canRead( struct bufferevent * evin,
void * vmsgs )
canRead( struct tr_iobuf * iobuf, void * vmsgs, size_t * piece )
{
ReadState ret;
tr_peermsgs * msgs = vmsgs;
struct evbuffer * in = EVBUFFER_INPUT ( evin );
struct evbuffer * in = tr_iobuf_input( iobuf );
const size_t inlen = EVBUFFER_LENGTH( in );
if( !inlen )
@ -1656,7 +1658,7 @@ canRead( struct bufferevent * evin,
}
else if( msgs->state == AWAITING_BT_PIECE )
{
ret = inlen ? readBtPiece( msgs, in, inlen ) : READ_LATER;
ret = inlen ? readBtPiece( msgs, in, inlen, piece ) : READ_LATER;
}
else switch( msgs->state )
{
@ -1820,13 +1822,12 @@ tr_peerMsgsPulse( tr_peermsgs * msgs )
}
static void
gotError( struct bufferevent * evbuf UNUSED,
short what,
void * vmsgs )
gotError( struct tr_iobuf * iobuf UNUSED,
short what,
void * vmsgs )
{
if( what & EVBUFFER_TIMEOUT )
dbgmsg( vmsgs, "libevent got a timeout, what=%hd, secs=%d", what,
evbuf->timeout_read );
dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
if( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) )
dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
what, errno, tr_strerror( errno ) );

View File

@ -274,17 +274,15 @@ tr_sessionInitFull( const char * configDir,
/* Initialize rate and file descripts controls */
h->uploadLimit = uploadLimit;
h->useUploadLimit = useUploadLimit;
h->downloadLimit = downloadLimit;
h->useDownloadLimit = useDownloadLimit;
tr_fdInit( globalPeerLimit );
h->shared = tr_sharedInit( h, isPortForwardingEnabled, publicPort );
h->isPortSet = publicPort >= 0;
h->bandwidth[TR_UP] = tr_bandwidthNew( h );
h->bandwidth[TR_DOWN] = tr_bandwidthNew( h );
h->bandwidth = tr_bandwidthNew( h, NULL );
tr_bandwidthSetDesiredSpeed( h->bandwidth, TR_UP, uploadLimit );
tr_bandwidthSetDesiredSpeed( h->bandwidth, TR_DOWN, downloadLimit );
tr_bandwidthSetLimited( h->bandwidth, TR_UP, useUploadLimit );
tr_bandwidthSetLimited( h->bandwidth, TR_DOWN, useDownloadLimit );
/* first %s is the application name
second %s is the version number */
@ -443,42 +441,33 @@ tr_sessionGetPortForwarding( const tr_handle * h )
***/
void
tr_sessionSetSpeedLimitEnabled( tr_handle * h,
tr_direction direction,
int use_flag )
tr_sessionSetSpeedLimitEnabled( tr_session * session,
tr_direction dir,
int isLimited )
{
assert( h );
assert( direction == TR_UP || direction == TR_DOWN );
if( direction == TR_UP )
h->useUploadLimit = use_flag ? 1 : 0;
else
h->useDownloadLimit = use_flag ? 1 : 0;
tr_bandwidthSetLimited( session->bandwidth, dir, isLimited );
}
int
tr_sessionIsSpeedLimitEnabled( const tr_handle * h,
tr_direction direction )
tr_sessionIsSpeedLimitEnabled( const tr_session * session,
tr_direction dir )
{
return direction == TR_UP ? h->useUploadLimit : h->useDownloadLimit;
return !tr_bandwidthIsLimited( session->bandwidth, dir );
}
void
tr_sessionSetSpeedLimit( tr_handle * h,
tr_direction direction,
int KiB_sec )
tr_sessionSetSpeedLimit( tr_session * session,
tr_direction dir,
int desiredSpeed )
{
if( direction == TR_DOWN )
h->downloadLimit = KiB_sec;
else
h->uploadLimit = KiB_sec;
tr_bandwidthSetDesiredSpeed( session->bandwidth, dir, desiredSpeed );
}
int
tr_sessionGetSpeedLimit( const tr_handle * h,
tr_direction direction )
tr_sessionGetSpeedLimit( const tr_session * session,
tr_direction dir )
{
return direction == TR_UP ? h->uploadLimit : h->downloadLimit;
return tr_bandwidthGetDesiredSpeed( session->bandwidth, dir );
}
/***
@ -505,17 +494,13 @@ tr_sessionGetPeerLimit( const tr_handle * handle UNUSED )
double
tr_sessionGetPieceSpeed( const tr_session * session, tr_direction dir )
{
assert( dir==TR_UP || dir==TR_DOWN );
return session ? tr_bandwidthGetPieceSpeed( session->bandwidth[dir] ) : 0.0;
return session ? tr_bandwidthGetPieceSpeed( session->bandwidth, dir ) : 0.0;
}
double
tr_sessionGetRawSpeed( const tr_session * session, tr_direction dir )
{
assert( dir==TR_UP || dir==TR_DOWN );
return session ? tr_bandwidthGetPieceSpeed( session->bandwidth[dir] ) : 0.0;
return session ? tr_bandwidthGetPieceSpeed( session->bandwidth, dir ) : 0.0;
}
int
@ -629,8 +614,7 @@ tr_sessionClose( tr_handle * session )
}
/* free the session memory */
tr_bandwidthFree( session->bandwidth[TR_UP] );
tr_bandwidthFree( session->bandwidth[TR_DOWN] );
tr_bandwidthFree( session->bandwidth );
tr_lockFree( session->lock );
for( i = 0; i < session->metainfoLookupCount; ++i )
tr_free( session->metainfoLookup[i].filename );

View File

@ -62,8 +62,6 @@ struct tr_handle
unsigned int isProxyEnabled : 1;
unsigned int isProxyAuthEnabled : 1;
unsigned int isClosed : 1;
unsigned int useUploadLimit : 1;
unsigned int useDownloadLimit : 1;
unsigned int useLazyBitfield : 1;
tr_encryption_mode encryptionMode;
@ -88,9 +86,6 @@ struct tr_handle
char * proxyUsername;
char * proxyPassword;
int uploadLimit;
int downloadLimit;
struct tr_list * blocklists;
struct tr_peerMgr * peerMgr;
struct tr_shared * shared;
@ -116,7 +111,7 @@ struct tr_handle
int so_rcvbuf;
/* monitors the "global pool" speeds */
struct tr_bandwidth * bandwidth[2];
struct tr_bandwidth * bandwidth;
};
const char * tr_sessionFindTorrentFile( const tr_session * session,

View File

@ -143,56 +143,42 @@ tr_torrentUnlock( const tr_torrent * tor )
void
tr_torrentSetSpeedMode( tr_torrent * tor,
tr_direction direction,
tr_direction dir,
tr_speedlimit mode )
{
tr_speedlimit * limit = direction == TR_UP ? &tor->uploadLimitMode
: &tor->downloadLimitMode;
assert( tor != NULL );
assert( dir==TR_UP || dir==TR_DOWN );
assert( mode==TR_SPEEDLIMIT_GLOBAL || mode==TR_SPEEDLIMIT_SINGLE || mode==TR_SPEEDLIMIT_UNLIMITED );
*limit = mode;
tor->speedLimitMode[dir] = mode;
tr_bandwidthSetLimited( tor->bandwidth, dir, mode==TR_SPEEDLIMIT_SINGLE );
tr_bandwidthHonorParentLimits( tor->bandwidth, dir, mode!=TR_SPEEDLIMIT_UNLIMITED );
}
tr_speedlimit
tr_torrentGetSpeedMode( const tr_torrent * tor,
tr_direction direction )
tr_direction dir )
{
return direction == TR_UP ? tor->uploadLimitMode
: tor->downloadLimitMode;
assert( tor != NULL );
assert( dir==TR_UP || dir==TR_DOWN );
return tor->speedLimitMode[dir];
}
void
tr_torrentSetSpeedLimit( tr_torrent * tor,
tr_direction direction,
int single_KiB_sec )
tr_direction dir,
int desiredSpeed )
{
switch( direction )
{
case TR_UP:
tor->uploadLimit = single_KiB_sec; break;
case TR_DOWN:
tor->downloadLimit = single_KiB_sec; break;
default:
assert( 0 );
}
tr_bandwidthSetDesiredSpeed( tor->bandwidth, dir, desiredSpeed );
}
int
tr_torrentGetSpeedLimit( const tr_torrent * tor,
tr_direction direction )
tr_direction dir )
{
switch( direction )
{
case TR_UP:
return tor->uploadLimit;
case TR_DOWN:
return tor->downloadLimit;
default:
assert( 0 );
}
return tr_bandwidthGetDesiredSpeed( tor->bandwidth, dir );
}
int
@ -497,8 +483,9 @@ torrentRealInit( tr_handle * h,
randomizeTiers( info );
tor->bandwidth[TR_UP] = tr_bandwidthNew( h );
tor->bandwidth[TR_DOWN] = tr_bandwidthNew( h );
tor->bandwidth = tr_bandwidthNew( h, h->bandwidth );
fprintf( stderr, "torrent [%s] bandwidth is %p\n", info->name, tor->bandwidth );
tor->blockSize = getBlockSize( info->pieceSize );
@ -540,8 +527,6 @@ torrentRealInit( tr_handle * h,
tr_torrentInitFilePieces( tor );
tor->uploadLimit = 0;
tor->downloadLimit = 0;
tor->swarmSpeed = tr_rcInit( );
tr_sha1( tor->obfuscatedHash, "req2", 4,
@ -813,10 +798,10 @@ tr_torrentStat( tr_torrent * tor )
&s->peersGettingFromUs,
s->peersFrom );
s->rawUploadSpeed = tr_bandwidthGetRawSpeed ( tor->bandwidth[TR_UP] );
s->rawDownloadSpeed = tr_bandwidthGetRawSpeed ( tor->bandwidth[TR_DOWN] );
s->pieceUploadSpeed = tr_bandwidthGetPieceSpeed( tor->bandwidth[TR_UP] );
s->pieceDownloadSpeed = tr_bandwidthGetPieceSpeed( tor->bandwidth[TR_DOWN] );
s->rawUploadSpeed = tr_bandwidthGetRawSpeed ( tor->bandwidth, TR_UP );
s->rawDownloadSpeed = tr_bandwidthGetRawSpeed ( tor->bandwidth, TR_DOWN );
s->pieceUploadSpeed = tr_bandwidthGetPieceSpeed( tor->bandwidth, TR_UP );
s->pieceDownloadSpeed = tr_bandwidthGetPieceSpeed( tor->bandwidth, TR_DOWN );
usableSeeds += tor->info.webseedCount;
@ -1099,8 +1084,7 @@ freeTorrent( tr_torrent * tor )
assert( h->torrentCount >= 1 );
h->torrentCount--;
tr_bandwidthFree( tor->bandwidth[TR_DOWN] );
tr_bandwidthFree( tor->bandwidth[TR_UP] );
tr_bandwidthFree( tor->bandwidth );
tr_metainfoFree( inf );
tr_free( tor );

View File

@ -172,10 +172,7 @@ struct tr_torrent
tr_session * session;
tr_info info;
int uploadLimit;
tr_speedlimit uploadLimitMode;
int downloadLimit;
tr_speedlimit downloadLimitMode;
tr_speedlimit speedLimitMode[2];
struct tr_ratecontrol * swarmSpeed;
@ -234,7 +231,7 @@ struct tr_torrent
int uniqueId;
struct tr_bandwidth * bandwidth[2];
struct tr_bandwidth * bandwidth;
};
#endif