libT: first cut at implementing the internal peers' request queues as pieces rather than blocks, as discussed with erdgeist and denis, to avoid a couple of nasty CPU bottlenecks.

This commit is contained in:
Charles Kerr 2008-06-09 22:53:45 +00:00
parent dd715ccaf6
commit dce24eb1f0
10 changed files with 218 additions and 290 deletions

View File

@ -48,7 +48,7 @@ PeerEventType;
typedef struct
{
PeerEventType eventType;
uint32_t pieceIndex; /* for GOT_BLOCK */
uint32_t pieceIndex; /* for GOT_BLOCK, CANCEL */
uint32_t offset; /* for GOT_BLOCK */
uint32_t length; /* for GOT_BLOCK + GOT_DATA */
float progress; /* for TR_PEER_PEER_PROGRESS */

View File

@ -112,7 +112,7 @@ typedef struct
tr_timer * refillTimer;
tr_torrent * tor;
tr_peer * optimistic; /* the optimistic peer, or NULL if none */
tr_bitfield * requested;
tr_bitfield * requestedPieces;
unsigned int isRunning : 1;
@ -366,7 +366,7 @@ torrentDestructor( Torrent * t )
tr_timerFree( &t->rechokeTimer );
tr_timerFree( &t->refillTimer );
tr_bitfieldFree( t->requested );
tr_bitfieldFree( t->requestedPieces );
tr_ptrArrayFree( t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
tr_ptrArrayFree( t->outgoingHandshakes, NULL );
@ -390,7 +390,7 @@ torrentConstructor( tr_peerMgr * manager, tr_torrent * tor )
t->peers = tr_ptrArrayNew( );
t->webseeds = tr_ptrArrayNew( );
t->outgoingHandshakes = tr_ptrArrayNew( );
t->requested = tr_bitfieldNew( tor->blockCount );
t->requestedPieces = tr_bitfieldNew( tor->info.pieceCount );
memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
for( i=0; i<tor->info.webseedCount; ++i ) {
@ -543,10 +543,6 @@ compareRefillPiece (const void * aIn, const void * bIn)
const struct tr_refill_piece * a = aIn;
const struct tr_refill_piece * b = bIn;
/* fewer missing pieces goes first */
if( a->missingBlockCount != b->missingBlockCount )
return a->missingBlockCount < b->missingBlockCount ? -1 : 1;
/* if one piece has a higher priority, it goes first */
if( a->priority != b->priority )
return a->priority > b->priority ? -1 : 1;
@ -555,13 +551,8 @@ compareRefillPiece (const void * aIn, const void * bIn)
if (a->peerCount != b->peerCount)
return a->peerCount < b->peerCount ? -1 : 1;
#if 0
/* otherwise download them in order */
return tr_compareUint16( a->piece, b->piece );
#else
/* otherwise go with our random seed */
return tr_compareUint16( a->random, b->random );
#endif
}
static int
@ -636,94 +627,6 @@ getPreferredPieces( Torrent * t,
return pool;
}
static uint64_t*
getPreferredBlocks( Torrent * t, tr_block_index_t * setmeCount )
{
int s;
uint32_t i;
uint32_t pieceCount;
uint32_t blockCount;
uint32_t unreqCount[3], reqCount[3];
uint32_t * pieces;
uint64_t * ret, * walk;
uint64_t * unreq[3], *req[3];
const tr_torrent * tor = t->tor;
assert( torrentIsLocked( t ) );
pieces = getPreferredPieces( t, &pieceCount );
/**
* Now we walk through those preferred pieces to find all the blocks
* are still missing from them. We put unrequested blocks first,
* of course, but by including requested blocks afterwards, endgame
* handling happens naturally.
*
* By doing this once per priority we also effectively get an endgame
* mode for each priority level. The helps keep high priority files
* from getting stuck at 99% due of unresponsive peers.
*/
/* make temporary bins for the four tiers of blocks */
for( i=0; i<3; ++i ) {
req[i] = tr_new( uint64_t, pieceCount * tor->blockCountInPiece );
reqCount[i] = 0;
unreq[i] = tr_new( uint64_t, pieceCount * tor->blockCountInPiece );
unreqCount[i] = 0;
}
/* sort the blocks into our temp bins */
for( i=blockCount=0; i<pieceCount; ++i )
{
const tr_piece_index_t index = pieces[i];
const int priorityIndex = tor->info.pieces[index].priority + 1;
const tr_block_index_t begin = tr_torPieceFirstBlock( tor, index );
const tr_block_index_t end = begin + tr_torPieceCountBlocks( tor, index );
tr_block_index_t block;
assert( tr_bitfieldTestFast( t->requested, end-1 ) );
for( block=begin; block<end; ++block )
{
if( tr_cpBlockIsComplete( tor->completion, block ) )
continue;
++blockCount;
if( tr_bitfieldHasFast( t->requested, block ) )
{
const uint32_t n = reqCount[priorityIndex]++;
req[priorityIndex][n] = block;
}
else
{
const uint32_t n = unreqCount[priorityIndex]++;
unreq[priorityIndex][n] = block;
}
}
}
/* join the bins together, going from highest priority to lowest so
* the the blocks we want to request first will be first in the list */
ret = walk = tr_new( uint64_t, blockCount );
for( s=2; s>=0; --s ) {
memcpy( walk, unreq[s], sizeof(uint64_t) * unreqCount[s] );
walk += unreqCount[s];
memcpy( walk, req[s], sizeof(uint64_t) * reqCount[s] );
walk += reqCount[s];
}
assert( ( walk - ret ) == ( int )blockCount );
*setmeCount = blockCount;
/* cleanup */
tr_free( pieces );
for( i=0; i<3; ++i ) {
tr_free( unreq[i] );
tr_free( req[i] );
}
return ret;
}
static tr_peer**
getPeersUploadingToClient( Torrent * t, int * setmeCount )
{
@ -758,13 +661,13 @@ refillPulse( void * vtorrent )
{
Torrent * t = vtorrent;
tr_torrent * tor = t->tor;
tr_block_index_t i;
int peerCount;
int webseedCount;
tr_peer ** peers;
tr_webseed ** webseeds;
tr_block_index_t blockCount;
uint64_t * blocks;
uint32_t pieceCount;
uint32_t * pieces;
tr_piece_index_t i;
if( !t->isRunning )
return TRUE;
@ -774,30 +677,23 @@ refillPulse( void * vtorrent )
torrentLock( t );
tordbg( t, "Refilling Request Buffers..." );
blocks = getPreferredBlocks( t, &blockCount );
pieces = getPreferredPieces( t, &pieceCount );
peers = getPeersUploadingToClient( t, &peerCount );
webseedCount = tr_ptrArraySize( t->webseeds );
webseeds = tr_memdup( tr_ptrArrayBase(t->webseeds), webseedCount*sizeof(tr_webseed*) );
for( i=0; (webseedCount || peerCount) && i<blockCount; ++i )
for( i=0; (webseedCount || peerCount) && i<pieceCount; ++i )
{
int j;
int handled = FALSE;
const tr_piece_index_t piece = pieces[i];
const tr_block_index_t block = blocks[i];
const tr_piece_index_t index = tr_torBlockPiece( tor, block );
const uint32_t begin = (block * tor->blockSize) - (index * tor->info.pieceSize);
const uint32_t length = tr_torBlockCountBytes( tor, block );
assert( piece < tor->info.pieceSize );
assert( tr_torrentReqIsValid( tor, index, begin, length ) );
assert( _tr_block( tor, index, begin ) == block );
assert( begin < tr_torPieceCountBytes( tor, index ) );
assert( (begin + length) <= tr_torPieceCountBytes( tor, index ) );
/* find a peer who can ask for this block */
/* find a peer who can ask for this piece */
for( j=0; !handled && j<peerCount; )
{
const int val = tr_peerMsgsAddRequest( peers[j]->msgs, index, begin, length );
const int val = tr_peerMsgsAddRequest( peers[j]->msgs, piece );
switch( val )
{
case TR_ADDREQ_FULL:
@ -809,7 +705,7 @@ refillPulse( void * vtorrent )
++j;
break;
case TR_ADDREQ_OK:
tr_bitfieldAdd( t->requested, block );
tr_bitfieldAdd( t->requestedPieces, piece );
handled = TRUE;
break;
default:
@ -821,14 +717,14 @@ refillPulse( void * vtorrent )
/* maybe one of the webseeds can do it */
for( j=0; !handled && j<webseedCount; )
{
const int val = tr_webseedAddRequest( webseeds[j], index, begin, length );
const int val = tr_webseedAddRequest( webseeds[j], piece );
switch( val )
{
case TR_ADDREQ_FULL:
memmove( webseeds+j, webseeds+j+1, sizeof(tr_webseed*)*(--webseedCount-j) );
break;
case TR_ADDREQ_OK:
tr_bitfieldAdd( t->requested, block );
tr_bitfieldAdd( t->requestedPieces, piece );
handled = TRUE;
break;
default:
@ -841,27 +737,13 @@ refillPulse( void * vtorrent )
/* cleanup */
tr_free( webseeds );
tr_free( peers );
tr_free( blocks );
tr_free( pieces );
t->refillTimer = NULL;
torrentUnlock( t );
return FALSE;
}
static void
broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length )
{
int i, size;
tr_peer ** peers;
assert( torrentIsLocked( t ) );
peers = getConnectedPeers( t, &size );
for( i=0; i<size; ++i )
tr_peerMsgsCancel( peers[i]->msgs, index, offset, length );
tr_free( peers );
}
static void
addStrike( Torrent * t, tr_peer * peer )
{
@ -904,7 +786,7 @@ peerCallbackFunc( void * vpeer, void * vevent, void * vt )
break;
case TR_PEER_CANCEL:
tr_bitfieldRem( t->requested, _tr_block( t->tor, e->pieceIndex, e->offset ) );
tr_bitfieldRem( t->requestedPieces, e->pieceIndex );
break;
case TR_PEER_PEER_GOT_DATA: {
@ -926,7 +808,14 @@ peerCallbackFunc( void * vpeer, void * vevent, void * vt )
const time_t now = time( NULL );
tr_torrent * tor = t->tor;
tor->activityDate = now;
tor->downloadedCur += e->length;
/* only add this to downloadedCur if we got it from a peer --
* webseeds shouldn't count against our ratio. As one tracker
* admin put it, "Those pieces are downloaded directly from the
* content distributor, not the peers, it is the tracker's job
* to manage the swarms, not the web server and does not fit
* into the jurisdiction of the tracker." */
if( peer )
tor->downloadedCur += e->length;
tr_rcTransferred( tor->download, e->length );
tr_rcTransferred( tor->handle->download, e->length );
tr_statsAddDownloaded( tor->handle, e->length );
@ -960,8 +849,6 @@ peerCallbackFunc( void * vpeer, void * vevent, void * vt )
tr_cpBlockAdd( tor->completion, block );
broadcastGotBlock( t, e->pieceIndex, e->offset, e->length );
if( tr_cpPieceIsComplete( tor->completion, e->pieceIndex ) )
{
const tr_piece_index_t p = e->pieceIndex;

View File

@ -178,6 +178,32 @@ reqListAppend( struct request_list * list, const struct peer_request * req )
list->requests[list->count-1] = *req;
}
static void
reqListAppendPiece( const tr_torrent * tor,
struct request_list * list,
tr_piece_index_t piece )
{
const time_t now = time( NULL );
const size_t n = tr_torPieceCountBlocks( tor, piece );
const tr_block_index_t begin = tr_torPieceFirstBlock( tor, piece );
const tr_block_index_t end = begin + n;
tr_block_index_t i;
if( list->count + n >= list->max )
reqListReserve( list, list->max + n );
for( i=begin; i<end; ++i ) {
if( !tr_cpBlockIsComplete( tor->completion, i ) ) {
struct peer_request * req = list->requests + list->count++;
req->index = piece;
req->offset = (i * tor->blockSize) - (piece * tor->info.pieceSize);
req->length = tr_torBlockCountBytes( tor, i );
req->time_requested = now;
assert( tr_torrentReqIsValid( tor, req->index, req->offset, req->length ) );
}
}
}
static tr_errno
reqListPop( struct request_list * list, struct peer_request * setme )
{
@ -194,13 +220,27 @@ reqListPop( struct request_list * list, struct peer_request * setme )
return err;
}
static int
reqListHasPiece( struct request_list * list, const tr_piece_index_t piece )
{
uint16_t i;
for( i=0; i<list->count; ++i )
if( list->requests[i].index == piece )
return 1;
return 0;
}
static int
reqListFind( struct request_list * list, const struct peer_request * key )
{
uint16_t i;
for( i=0; i<list->count; ++i )
if( !compareRequest( key, list->requests+i ) )
return i;
return -1;
}
@ -245,8 +285,8 @@ struct tr_peermsgs
uint8_t state;
uint8_t ut_pex_id;
uint16_t pexCount;
uint16_t maxActiveRequests;
uint16_t minActiveRequests;
uint16_t maxActiveRequests;
tr_peer * info;
@ -458,13 +498,11 @@ firePeerGotData( tr_peermsgs * msgs, uint32_t length )
}
static void
fireCancelledReq( tr_peermsgs * msgs, const struct peer_request * req )
fireCancelledReq( tr_peermsgs * msgs, const tr_piece_index_t pieceIndex )
{
tr_peer_event e = blankEvent;
e.eventType = TR_PEER_CANCEL;
e.pieceIndex = req->index;
e.offset = req->offset;
e.length = req->length;
e.pieceIndex = pieceIndex;
publish( msgs, &e );
}
@ -683,18 +721,51 @@ maybeSendFastAllowedSet( tr_peermsgs * msgs )
**/
static int
reqIsValid( const tr_peermsgs * msgs,
reqIsValid( const tr_peermsgs * peer,
uint32_t index,
uint32_t offset,
uint32_t length )
{
return tr_torrentReqIsValid( msgs->torrent, index, offset, length );
return tr_torrentReqIsValid( peer->torrent, index, offset, length );
}
static int
requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
requestIsValid( const tr_peermsgs * peer, const struct peer_request * req )
{
return reqIsValid( msgs, req->index, req->offset, req->length );
return reqIsValid( peer, req->index, req->offset, req->length );
}
static void
tr_peerMsgsCancel( tr_peermsgs * msgs,
uint32_t pieceIndex )
{
uint16_t i;
struct request_list tmp = REQUEST_LIST_INIT;
struct request_list * src;
src = &msgs->clientWillAskFor;
for( i=0; i<src->count; ++i )
if( src->requests[i].index != pieceIndex )
reqListAppend( &tmp, src->requests + i );
/* swap */
reqListClear( &msgs->clientWillAskFor );
msgs->clientWillAskFor = tmp;
tmp = REQUEST_LIST_INIT;
src = &msgs->clientAskedFor;
for( i=0; i<src->count; ++i )
if( src->requests[i].index == pieceIndex )
protocolSendCancel( msgs, src->requests + i );
else
reqListAppend( &tmp, src->requests + i );
/* swap */
reqListClear( &msgs->clientAskedFor );
msgs->clientAskedFor = tmp;
tmp = REQUEST_LIST_INIT;
fireCancelledReq( msgs, pieceIndex );
}
static void
@ -710,7 +781,7 @@ expireOldRequests( tr_peermsgs * msgs )
for( i=0; i<tmp.count; ++i ) {
const struct peer_request * req = &tmp.requests[i];
if( req->time_requested < oldestAllowed )
tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
tr_peerMsgsCancel( msgs, req->index );
}
reqListClear( &tmp );
@ -720,7 +791,7 @@ expireOldRequests( tr_peermsgs * msgs )
for( i=0; i<tmp.count; ++i ) {
const struct peer_request * req = &tmp.requests[i];
if( req->time_requested < oldestAllowed )
tr_peerMsgsCancel( msgs, req->index, req->offset, req->length );
tr_peerMsgsCancel( msgs, req->index );
}
reqListClear( &tmp );
}
@ -766,18 +837,22 @@ pumpRequestQueue( tr_peermsgs * msgs )
static int
pulse( void * vmsgs );
int
tr_peerMsgsAddRequest( tr_peermsgs * msgs,
uint32_t index,
uint32_t offset,
uint32_t length )
static int
requestQueueIsFull( const tr_peermsgs * msgs )
{
const int req_max = msgs->maxActiveRequests;
return msgs->clientWillAskFor.count >= req_max;
}
int
tr_peerMsgsAddRequest( tr_peermsgs * msgs,
tr_piece_index_t piece )
{
struct peer_request req;
assert( msgs != NULL );
assert( msgs->torrent != NULL );
assert( reqIsValid( msgs, index, offset, length ) );
assert( piece < msgs->torrent->info.pieceCount );
/**
*** Reasons to decline the request
@ -790,24 +865,18 @@ tr_peerMsgsAddRequest( tr_peermsgs * msgs,
}
/* peer doesn't have this piece */
if( !tr_bitfieldHas( msgs->info->have, index ) )
if( !tr_bitfieldHas( msgs->info->have, piece ) )
return TR_ADDREQ_MISSING;
/* peer's queue is full */
if( msgs->clientWillAskFor.count >= req_max ) {
if( requestQueueIsFull( msgs ) ) {
dbgmsg( msgs, "declining request because we're full" );
return TR_ADDREQ_FULL;
}
/* have we already asked for this piece? */
req.index = index;
req.offset = offset;
req.length = length;
if( reqListFind( &msgs->clientAskedFor, &req ) != -1 ) {
dbgmsg( msgs, "declining because it's a duplicate" );
return TR_ADDREQ_DUPLICATE;
}
if( reqListFind( &msgs->clientWillAskFor, &req ) != -1 ) {
if( reqListHasPiece( &msgs->clientAskedFor, piece ) ||
reqListHasPiece( &msgs->clientWillAskFor, piece ) ) {
dbgmsg( msgs, "declining because it's a duplicate" );
return TR_ADDREQ_DUPLICATE;
}
@ -816,9 +885,9 @@ tr_peerMsgsAddRequest( tr_peermsgs * msgs,
*** Accept this request
**/
dbgmsg( msgs, "added req for piece %d, offset %d", (int)index, (int)offset );
dbgmsg( msgs, "added req for piece %lu", (unsigned long)piece );
req.time_requested = time( NULL );
reqListAppend( &msgs->clientWillAskFor, &req );
reqListAppendPiece( msgs->torrent, &msgs->clientWillAskFor, piece );
return TR_ADDREQ_OK;
}
@ -833,10 +902,10 @@ cancelAllRequestsToPeer( tr_peermsgs * msgs )
msgs->clientWillAskFor = REQUEST_LIST_INIT;
for( i=0; i<a.count; ++i )
fireCancelledReq( msgs, &a.requests[i] );
fireCancelledReq( msgs, a.requests[i].index );
for( i=0; i<b.count; ++i ) {
fireCancelledReq( msgs, &b.requests[i] );
fireCancelledReq( msgs, b.requests[i].index );
protocolSendCancel( msgs, &b.requests[i] );
}
@ -844,33 +913,6 @@ cancelAllRequestsToPeer( tr_peermsgs * msgs )
reqListClear( &b );
}
void
tr_peerMsgsCancel( tr_peermsgs * msgs,
uint32_t pieceIndex,
uint32_t offset,
uint32_t length )
{
struct peer_request req;
assert( msgs != NULL );
assert( length > 0 );
/* have we asked the peer for this piece? */
req.index = pieceIndex;
req.offset = offset;
req.length = length;
/* if it's only in the queue and hasn't been sent yet, free it */
if( !reqListRemove( &msgs->clientWillAskFor, &req ) )
fireCancelledReq( msgs, &req );
/* if it's already been sent, send a cancel message too */
if( !reqListRemove( &msgs->clientAskedFor, &req ) ) {
protocolSendCancel( msgs, &req );
fireCancelledReq( msgs, &req );
}
}
/**
***
**/
@ -1571,13 +1613,24 @@ getUploadMax( const tr_peermsgs * msgs )
}
static int
ratePulse( void * vmsgs )
getMaxBlocksFromPeerSoon( const tr_peermsgs * peer )
{
tr_peermsgs * msgs = (tr_peermsgs *) vmsgs;
msgs->info->rateToClient = tr_rcRate( msgs->info->rcToClient );
msgs->info->rateToPeer = tr_rcRate( msgs->info->rcToPeer );
msgs->maxActiveRequests = MIN( 4 + (int)(msgs->info->rateToClient/4), MAX_QUEUE_SIZE );
msgs->minActiveRequests = msgs->maxActiveRequests / 3;
const double seconds = 30;
const double bytesPerSecond = peer->info->rateToClient * 1024;
const double totalBytes = bytesPerSecond * seconds;
const int blockCount = totalBytes / peer->torrent->blockSize;
/*fprintf( stderr, "rate %f -- blockCount %d\n", peer->info->rateToClient, blockCount );*/
return blockCount;
}
static int
ratePulse( void * vpeer )
{
tr_peermsgs * peer = vpeer;
peer->info->rateToClient = tr_rcRate( peer->info->rcToClient );
peer->info->rateToPeer = tr_rcRate( peer->info->rcToPeer );
peer->minActiveRequests = 4;
peer->maxActiveRequests = peer->minActiveRequests + getMaxBlocksFromPeerSoon( peer );
return TRUE;
}

View File

@ -35,59 +35,19 @@ void tr_peerMsgsSetChoke( tr_peermsgs *, int doChoke );
void tr_peerMsgsHave( tr_peermsgs * msgs,
uint32_t pieceIndex );
#if 0
void tr_peerMsgsCancel( tr_peermsgs * msgs,
uint32_t pieceIndex,
uint32_t offset,
uint32_t length );
#endif
void tr_peerMsgsFree( tr_peermsgs* );
int tr_peerMsgsAddRequest( tr_peermsgs * peer,
tr_piece_index_t piece );
#if 0
enum {
TR_ADDREQ_OK=0,
TR_ADDREQ_FULL,
TR_ADDREQ_DUPLICATE,
TR_ADDREQ_MISSING,
TR_ADDREQ_CLIENT_CHOKED
};
#endif
int tr_peerMsgsAddRequest( tr_peermsgs * peer,
uint32_t index,
uint32_t begin,
uint32_t length );
/**
*** PeerMsgs Publish / Subscribe
**/
#if 0
typedef enum
{
TR_PEERMSG_CLIENT_HAVE,
TR_PEERMSG_CLIENT_BLOCK,
TR_PEERMSG_PIECE_DATA,
TR_PEERMSG_PEER_PROGRESS,
TR_PEERMSG_ERROR,
TR_PEERMSG_CANCEL,
TR_PEERMSG_NEED_REQ
}
PeerMsgsEventType;
typedef struct
{
PeerMsgsEventType eventType;
uint32_t pieceIndex; /* for TR_PEERMSG_GOT_BLOCK, TR_PEERMSG_GOT_HAVE */
uint32_t offset; /* for TR_PEERMSG_GOT_BLOCK */
uint32_t length; /* for TR_PEERMSG_GOT_BLOCK */
float progress; /* for TR_PEERMSG_PEER_PROGRESS */
tr_errno err; /* for TR_PEERMSG_GOT_ERROR */
}
tr_peermsgs_event;
#endif
void tr_peerMsgsUnsubscribe ( tr_peermsgs * peer,
tr_publisher_tag tag );
void tr_peerMsgsUnsubscribe ( tr_peermsgs * peer,
tr_publisher_tag tag );
#endif

View File

@ -1427,6 +1427,8 @@ tr_torrentReqIsValid( const tr_torrent * tor,
else if( tr_pieceOffset( tor, index, offset, length ) > tor->info.totalSize )
err = 4;
if( err ) fprintf( stderr, "index %lu offset %lu length %lu err %d\n", (unsigned long)index, (unsigned long)offset, (unsigned long)length, err );
return !err;
}

View File

@ -33,7 +33,6 @@ test_bitfields( void )
{
int i;
int bitcount = 5000000;
size_t pos;
tr_bitfield * field = tr_bitfieldNew( bitcount );
/* make every seventh one true */
@ -45,6 +44,7 @@ test_bitfields( void )
for( i=0; i<bitcount; ++i )
check( tr_bitfieldHas( field, i ) == (!(i%7)) );
#if 0
/* testing the "find next" function */
check( tr_bitfieldFindTrue( field, 0, &pos ) );
check( pos == 0 );
@ -64,7 +64,7 @@ test_bitfields( void )
check( pos == 21 );
check( tr_bitfieldFindTrue( field, 16, &pos ) );
check( pos == 21 );
#endif
tr_bitfieldFree( field );
return 0;

View File

@ -732,6 +732,7 @@ tr_bitfieldHas( const tr_bitfield * bitfield, size_t nth )
&& ( tr_bitfieldHasFast( bitfield, nth ) );
}
#if 0
static int
find_top_bit( uint8_t val )
{
@ -776,6 +777,7 @@ tr_bitfieldFindTrue( const tr_bitfield * bitfield,
return 0;
}
#endif
int
tr_bitfieldAdd( tr_bitfield * bitfield, size_t nth )

View File

@ -263,12 +263,14 @@ size_t tr_bitfieldCountTrueBits( const tr_bitfield* );
tr_bitfield* tr_bitfieldOr( tr_bitfield*, const tr_bitfield* );
#if 0
/** @brief finds the first true bit in the bitfield, starting at `startPos'
@param setmePos the position of the true bit, if found, is set here.
@return nonzero if a true bit was found */
int tr_bitfieldFindTrue( const tr_bitfield * bitfield,
size_t startPos,
size_t * setmePos );
#endif
/** A stripped-down version of bitfieldHas to be used

View File

@ -23,9 +23,10 @@
#include "web.h"
#include "webseed.h"
#define MAX_QUEUE_SIZE 4
struct tr_webseed
{
unsigned int busy : 1;
unsigned int dead : 1;
tr_torrent * torrent;
@ -34,9 +35,10 @@ struct tr_webseed
tr_delivery_func * callback;
void * callback_userdata;
tr_piece_index_t pieceIndex;
uint32_t pieceOffset;
uint32_t byteCount;
uint64_t bytesSaved;
tr_piece_index_t queue[MAX_QUEUE_SIZE];
int queueSize;
struct evbuffer * content;
};
@ -145,29 +147,55 @@ webResponseFunc( tr_handle * session UNUSED,
tr_webseed * w = vw;
const int success = ( response_code == 206 );
fprintf( stderr, "server responded with code %ld and %lu bytes\n", response_code, (unsigned long)response_byte_count );
/*fprintf( stderr, "server responded with code %ld and %lu bytes\n", response_code, (unsigned long)response_byte_count );*/
if( !success )
{
/* FIXME */
}
else if( w->dead )
{
tr_webseedFree( w );
}
else
{
evbuffer_add( w->content, response, response_byte_count );
if( !w->dead )
fireClientGotData( w, response_byte_count );
const tr_piece_index_t piece = w->queue[0];
tr_block_index_t block;
size_t len;
if( EVBUFFER_LENGTH( w->content ) < w->byteCount )
evbuffer_add( w->content, response, response_byte_count );
fireClientGotData( w, response_byte_count );
block = _tr_block( w->torrent, piece, w->bytesSaved );
len = tr_torBlockCountBytes( w->torrent, block );
while( EVBUFFER_LENGTH( w->content ) >= len )
{
/*fprintf( stderr, "saving piece index %lu, offset %lu, len %lu\n", (unsigned long)piece, (unsigned long)w->bytesSaved, (unsigned long)len );*/
/* save one block */
tr_ioWrite( w->torrent, piece, w->bytesSaved, len,
EVBUFFER_DATA(w->content) );
evbuffer_drain( w->content, len );
fireClientGotBlock( w, piece, w->bytesSaved, len );
w->bytesSaved += len;
/* march to the next one */
++block;
len = tr_torBlockCountBytes( w->torrent, block );
}
if( w->bytesSaved < tr_torPieceCountBytes( w->torrent, piece ) )
requestNextChunk( w );
else {
tr_ioWrite( w->torrent, w->pieceIndex, w->pieceOffset, w->byteCount, EVBUFFER_DATA(w->content) );
w->bytesSaved = 0;
evbuffer_drain( w->content, EVBUFFER_LENGTH( w->content ) );
w->busy = 0;
if( w->dead )
tr_webseedFree( w );
else {
fireClientGotBlock( w, w->pieceIndex, w->pieceOffset, w->byteCount );
/*fprintf( stderr, "w->callback_userdata is %p\n", w->callback_userdata );*/
memmove( w->queue, w->queue+1, sizeof(tr_piece_index_t)*(MAX_QUEUE_SIZE-1) );
/*fprintf( stderr, "w->callback_userdata is %p\n", w->callback_userdata );*/
if( --w->queueSize )
requestNextChunk( w );
if( w->queueSize < ( MAX_QUEUE_SIZE / 2 ) )
fireNeedReq( w );
}
}
}
}
@ -176,49 +204,44 @@ static void
requestNextChunk( tr_webseed * w )
{
const tr_info * inf = tr_torrentInfo( w->torrent );
const uint32_t have = EVBUFFER_LENGTH( w->content );
const uint32_t left = w->byteCount - have;
const uint32_t pieceOffset = w->pieceOffset + have;
const uint32_t have = w->bytesSaved + EVBUFFER_LENGTH( w->content );
const tr_piece_index_t piece = w->queue[0];
const uint32_t left = tr_torPieceCountBytes( w->torrent, piece ) - have;
const uint32_t pieceOffset = have;
tr_file_index_t fileIndex;
uint64_t fileOffset;
uint32_t thisPass;
char * url;
char * range;
tr_ioFindFileLocation( w->torrent, w->pieceIndex, pieceOffset,
tr_ioFindFileLocation( w->torrent, piece, pieceOffset,
&fileIndex, &fileOffset );
thisPass = MIN( left, inf->files[fileIndex].length - fileOffset );
url = makeURL( w, &inf->files[fileIndex] );
//fprintf( stderr, "url is [%s]\n", url );
range = tr_strdup_printf( "%"PRIu64"-%"PRIu64, fileOffset, fileOffset + thisPass - 1 );
fprintf( stderr, "range is [%s] ... we want %lu total, we have %lu, so %lu are left, and we're asking for %lu this time\n", range, (unsigned long)w->byteCount, (unsigned long)have, (unsigned long)left, (unsigned long)thisPass );
/*fprintf( stderr, "range is [%s] ... we want %lu total, we have %lu, so %lu are left, and we're asking for %lu this time\n", range, (unsigned long)tr_torPieceCountBytes(w->torrent,piece), (unsigned long)have, (unsigned long)left, (unsigned long)thisPass );*/
tr_webRun( w->torrent->handle, url, range, webResponseFunc, w );
tr_free( range );
tr_free( url );
}
int
tr_webseedAddRequest( tr_webseed * w,
uint32_t pieceIndex,
uint32_t pieceOffset,
uint32_t byteCount )
tr_webseedAddRequest( tr_webseed * w,
tr_piece_index_t piece )
{
int ret;
if( w->busy || w->dead )
if( w->dead || w->queueSize >= MAX_QUEUE_SIZE )
{
ret = TR_ADDREQ_FULL;
}
else
{
fprintf( stderr, "webseed is starting a new piece here -- piece %lu, offset %lu!!!\n", (unsigned long)pieceIndex, (unsigned long)pieceOffset );
w->busy = 1;
w->pieceIndex = pieceIndex;
w->pieceOffset = pieceOffset;
w->byteCount = byteCount;
evbuffer_drain( w->content, EVBUFFER_LENGTH( w->content ) );
requestNextChunk( w );
int wasEmpty = w->queueSize == 0;
w->queue[ w->queueSize++ ] = piece;
if( wasEmpty )
requestNextChunk( w );
ret = TR_ADDREQ_OK;
}
@ -241,6 +264,7 @@ tr_webseedNew( struct tr_torrent * torrent,
w->url = tr_strdup( url );
w->callback = callback;
w->callback_userdata = callback_userdata;
/*fprintf( stderr, "w->callback_userdata is %p\n", w->callback_userdata );*/
return w;
}
@ -249,7 +273,7 @@ tr_webseedFree( tr_webseed * w )
{
if( w )
{
if( w->busy )
if( w->queueSize > 0 )
{
w->dead = 1;
}

View File

@ -24,9 +24,7 @@ tr_webseed* tr_webseedNew( struct tr_torrent * torrent,
void tr_webseedFree( tr_webseed * );
int tr_webseedAddRequest( tr_webseed * w,
uint32_t index,
uint32_t begin,
uint32_t length );
int tr_webseedAddRequest( tr_webseed * w,
tr_piece_index_t piece );
#endif