diff --git a/libtransmission/peer-mgr.c b/libtransmission/peer-mgr.c index 9973b9fb6..5db13d979 100644 --- a/libtransmission/peer-mgr.c +++ b/libtransmission/peer-mgr.c @@ -1304,7 +1304,8 @@ tr_peerMgrGetNextRequests( tr_torrent * tor, tr_peer * peer, int numwant, tr_block_index_t * setme, - int * numgot ) + int * numgot, + bool get_intervals ) { int i; int got; @@ -1348,7 +1349,7 @@ tr_peerMgrGetNextRequests( tr_torrent * tor, tr_torGetPieceBlockRange( tor, p->index, &first, &last ); - for( b=first; b<=last && gotdesiredRequestCount - msgs->peer->pendingReqsToPeer; tr_block_index_t * blocks = alloca( sizeof( tr_block_index_t ) * numwant ); - tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n ); + tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &n, false ); for( i=0; icurl_easy = curl_easy_init( ); task->timeout_secs = getTimeoutFromURL( task ); @@ -219,7 +220,7 @@ task_finish_func( void * vtask ) ***** ****/ -void +struct tr_web_task * tr_webRun( tr_session * session, const char * url, const char * range, @@ -227,12 +228,12 @@ tr_webRun( tr_session * session, tr_web_done_func done_func, void * done_func_user_data ) { - tr_webRunWithBuffer( session, url, range, cookies, - done_func, done_func_user_data, - NULL ); + return tr_webRunWithBuffer( session, url, range, cookies, + done_func, done_func_user_data, + NULL ); } -void +struct tr_web_task * tr_webRunWithBuffer( tr_session * session, const char * url, const char * range, @@ -260,7 +261,9 @@ tr_webRunWithBuffer( tr_session * session, task->next = web->tasks; web->tasks = task; tr_lockUnlock( web->taskLock ); + return task; } + return NULL; } /** @@ -442,6 +445,12 @@ tr_webClose( tr_session * session, tr_web_close_mode close_mode ) } } +void +tr_webGetTaskInfo( struct tr_web_task * task, tr_web_task_info info, void * dst ) +{ + curl_easy_getinfo( task->curl_easy, (CURLINFO) info, dst ); +} + /***** ****** ****** diff --git a/libtransmission/web.h b/libtransmission/web.h index a38e98b7d..bd8e3e19e 100644 --- a/libtransmission/web.h +++ b/libtransmission/web.h @@ -13,11 +13,22 @@ #ifndef TR_HTTP_H #define TR_HTTP_H +#include + #ifdef __cplusplus extern "C" { #endif struct tr_address; +struct tr_web_task; + +typedef enum +{ + TR_WEB_GET_CODE = CURLINFO_RESPONSE_CODE, + TR_WEB_GET_REDIRECTS = CURLINFO_REDIRECT_COUNT, + TR_WEB_GET_REAL_URL = CURLINFO_EFFECTIVE_URL +} +tr_web_task_info; void tr_webInit( tr_session * session ); @@ -40,22 +51,24 @@ typedef void ( tr_web_done_func )( tr_session * session, const char * tr_webGetResponseStr( long response_code ); -void tr_webRun( tr_session * session, - const char * url, - const char * range, - const char * cookies, - tr_web_done_func done_func, - void * done_func_user_data ); +struct tr_web_task * tr_webRun( tr_session * session, + const char * url, + const char * range, + const char * cookies, + tr_web_done_func done_func, + void * done_func_user_data ); struct evbuffer; -void tr_webRunWithBuffer( tr_session * session, - const char * url, - const char * range, - const char * cookies, - tr_web_done_func done_func, - void * done_func_user_data, - struct evbuffer * buffer ); +struct tr_web_task * tr_webRunWithBuffer( tr_session * session, + const char * url, + const char * range, + const char * cookies, + tr_web_done_func done_func, + void * done_func_user_data, + struct evbuffer * buffer ); + +void tr_webGetTaskInfo( struct tr_web_task * task, tr_web_task_info info, void * dst ); void tr_http_escape( struct evbuffer *out, const char *str, int len, bool escape_slashes ); diff --git a/libtransmission/webseed.c b/libtransmission/webseed.c index ba0e01b7b..4e4c74e71 100644 --- a/libtransmission/webseed.c +++ b/libtransmission/webseed.c @@ -22,6 +22,7 @@ #include "list.h" #include "peer-mgr.h" #include "torrent.h" +#include "trevent.h" /* tr_runInEventThread() */ #include "utils.h" #include "web.h" #include "webseed.h" @@ -35,6 +36,10 @@ struct tr_webseed_task tr_piece_index_t piece_index; uint32_t piece_offset; uint32_t length; + tr_block_index_t blocks_done; + uint32_t block_size; + struct tr_web_task * web_task; + long response_code; int torrent_id; }; @@ -52,19 +57,54 @@ struct tr_webseed int torrent_id; bool is_stopping; int consecutive_failures; + int retry_tickcount; + int retry_challenge; + int idle_connections; + int active_transfers; + char ** file_urls; +}; + +struct tr_blockwrite_info +{ + struct tr_webseed * webseed; + struct evbuffer * data; + tr_piece_index_t piece_index; + tr_block_index_t block_index; + tr_block_index_t count; + uint32_t block_offset; +}; + +struct tr_http_info +{ + struct tr_webseed * webseed; + char * redirect_url; + tr_piece_index_t piece_index; + uint32_t piece_offset; }; enum { TR_IDLE_TIMER_MSEC = 2000, - MAX_CONSECUIVE_FAILURES = 5 + FAILURE_RETRY_INTERVAL = 150, + + MAX_CONSECUTIVE_FAILURES = 5, + + MAX_WEBSEED_CONNECTIONS = 4 }; static void webseed_free( struct tr_webseed * w ) { tr_torrent * tor = tr_torrentFindFromId( w->session, w->torrent_id ); + const tr_info * inf = tr_torrentInfo( tor ); + tr_file_index_t i; + + for( i = 0; i < inf->fileCount; i++ ) { + if( w->file_urls[i] ) + tr_free( w->file_urls[i] ); + } + tr_free( w->file_urls ); /* webseed destruct */ event_free( w->timer ); @@ -89,21 +129,35 @@ publish( tr_webseed * w, tr_peer_event * e ) } static void -fire_client_got_rej( tr_torrent * tor, tr_webseed * w, tr_block_index_t block ) +fire_client_got_rejs( tr_torrent * tor, tr_webseed * w, + tr_block_index_t block, tr_block_index_t count ) { + tr_block_index_t i; tr_peer_event e = TR_PEER_EVENT_INIT; e.eventType = TR_PEER_CLIENT_GOT_REJ; tr_torrentGetBlockLocation( tor, block, &e.pieceIndex, &e.offset, &e.length ); - publish( w, &e ); + for( i = 1; i <= count; i++ ) { + if( i == count ) + e.length = tr_torBlockCountBytes( tor, block + count - 1 ); + publish( w, &e ); + e.offset += e.length; + } } static void -fire_client_got_block( tr_torrent * tor, tr_webseed * w, tr_block_index_t block ) +fire_client_got_blocks( tr_torrent * tor, tr_webseed * w, + tr_block_index_t block, tr_block_index_t count ) { + tr_block_index_t i; tr_peer_event e = TR_PEER_EVENT_INIT; e.eventType = TR_PEER_CLIENT_GOT_BLOCK; tr_torrentGetBlockLocation( tor, block, &e.pieceIndex, &e.offset, &e.length ); - publish( w, &e ); + for( i = 1; i <= count; i++ ) { + if( i == count ) + e.length = tr_torBlockCountBytes( tor, block + count - 1 ); + publish( w, &e ); + e.offset += e.length; + } } static void @@ -121,17 +175,132 @@ fire_client_got_data( tr_webseed * w, uint32_t length ) ***/ static void -on_content_changed( struct evbuffer * buf UNUSED, - const struct evbuffer_cb_info * info, - void * vw ) +write_block_func( void * vblock ) { - tr_webseed * w = vw; + struct tr_blockwrite_info * block = vblock; + struct tr_webseed * w = block->webseed; + struct evbuffer * buf = block->data; + struct tr_torrent * tor; - if( ( info->n_added > 0 ) && !w->is_stopping ) + tor = tr_torrentFindFromId( w->session, w->torrent_id ); + if( tor ) + { + const uint32_t block_size = tor->blockSize; + uint32_t len = evbuffer_get_length( buf ); + uint32_t offset_end = block->block_offset + len; + tr_cache * cache = w->session->cache; + tr_piece_index_t piece = block->piece_index; + + while( true ) + { + if( len > block_size) { + tr_cacheWriteBlock( cache, tor, piece, offset_end - len, + block_size, buf ); + len -= block_size; + } + else { + tr_cacheWriteBlock( cache, tor, piece, offset_end - len, + len, buf ); + break; + } + } + fire_client_got_blocks( tor, w, block->block_index, block->count ); + } + + evbuffer_free( buf ); + tr_free( block ); +} + +static void +connection_succeeded( void * vinf ) +{ + struct tr_http_info * inf = vinf; + struct tr_webseed * w = inf->webseed; + struct tr_torrent * tor; + + if( ++w->active_transfers >= w->retry_challenge && w->retry_challenge ) + /* the server seems to be accepting more connections now */ + w->consecutive_failures = w->retry_tickcount = w->retry_challenge = 0; + + if( inf->redirect_url && + (tor = tr_torrentFindFromId( w->session, w->torrent_id ))) + { + uint64_t file_offset; + tr_file_index_t file_index; + + tr_ioFindFileLocation( tor, inf->piece_index, inf->piece_offset, + &file_index, &file_offset ); + tr_free( w->file_urls[file_index] ); + w->file_urls[file_index] = inf->redirect_url; + } +} + +static void +on_content_changed( struct evbuffer * buf, + const struct evbuffer_cb_info * info, + void * vtask ) +{ + struct tr_webseed_task * task = vtask; + struct tr_webseed * w = task->webseed; + uint32_t len; + + if( info->n_added <= 0 ) + return; + + if( !w->is_stopping ) { tr_bandwidthUsed( &w->bandwidth, TR_DOWN, info->n_added, true, tr_time_msec( ) ); fire_client_got_data( w, info->n_added ); } + + if( !task->response_code ) { + tr_webGetTaskInfo( task->web_task, TR_WEB_GET_CODE, &task->response_code ); + + if( task->response_code == 206 ) { + struct tr_http_info * inf = tr_new( struct tr_http_info, 1 ); + long redirects; + + inf->webseed = w; + inf->piece_index = task->piece_index; + inf->piece_offset = task->piece_offset; + tr_webGetTaskInfo( task->web_task, TR_WEB_GET_REDIRECTS, &redirects ); + if( redirects ) { + char * redirect_url; + tr_webGetTaskInfo( task->web_task, TR_WEB_GET_REAL_URL, &redirect_url ); + inf->redirect_url = tr_strdup( redirect_url ); + } + else + inf->redirect_url = NULL; + /* run this in the webseed thread to avoid tampering with mutexes and to + not cost the web thrad too much time */ + tr_runInEventThread( task->session, connection_succeeded, inf ); + } + } + + len = evbuffer_get_length( buf ); + + if( task->response_code == 206 && len >= task->block_size ) + { + /* one (ore more) block(s) received. write to hd */ + const uint32_t block_size = task->block_size; + const tr_block_index_t completed = len / block_size; + struct tr_blockwrite_info * b = tr_new( struct tr_blockwrite_info, 1 ); + + b->webseed = task->webseed; + b->piece_index = task->piece_index; + b->block_index = task->block + task->blocks_done; + b->count = completed; + b->block_offset = task->piece_offset + task->blocks_done * block_size; + b->data = evbuffer_new( ); + + /* we don't use locking on this evbuffer so we must copy out the data + that will be needed when writing the block in a different thread */ + evbuffer_remove_buffer( task->content, b->data, + block_size * completed ); + + tr_runInEventThread( task->session, write_block_func, b ); + task->blocks_done += completed; + } } static void task_request_next_chunk( struct tr_webseed_task * task ); @@ -147,6 +316,22 @@ static void on_idle( tr_webseed * w ) { tr_torrent * tor = tr_torrentFindFromId( w->session, w->torrent_id ); + int want, running_tasks = tr_list_size( w->tasks ); + + if( w->consecutive_failures >= MAX_CONSECUTIVE_FAILURES ) { + want = w->idle_connections; + + if( w->retry_tickcount >= FAILURE_RETRY_INTERVAL ) { + /* some time has passed since our connection attempts failed. try again */ + ++want; + /* if this challenge is fulfilled we will reset consecutive_failures */ + w->retry_challenge = running_tasks + want; + } + } + else { + want = MAX_WEBSEED_CONNECTIONS - running_tasks; + w->retry_challenge = running_tasks + w->idle_connections + 1; + } if( w->is_stopping && !webseed_has_tasks( w ) ) { @@ -155,23 +340,23 @@ on_idle( tr_webseed * w ) else if( !w->is_stopping && tor && tor->isRunning && !tr_torrentIsSeed( tor ) - && ( w->consecutive_failures < MAX_CONSECUIVE_FAILURES ) ) + && want ) { int i; int got = 0; - const int max = tor->blockCountInPiece; - const int want = max - tr_list_size( w->tasks ); tr_block_index_t * blocks = NULL; - if( want > 0 ) - { - blocks = tr_new( tr_block_index_t, want ); - tr_peerMgrGetNextRequests( tor, &w->parent, want, blocks, &got ); - } + blocks = tr_new( tr_block_index_t, want*2 ); + tr_peerMgrGetNextRequests( tor, &w->parent, want, blocks, &got, true ); + + w->idle_connections -= MIN( w->idle_connections, got ); + if( w->retry_tickcount >= FAILURE_RETRY_INTERVAL && got == want ) + w->retry_tickcount = 0; for( i=0; iwebseed = w; task->session = w->session; @@ -180,9 +365,12 @@ on_idle( tr_webseed * w ) task->piece_index = tr_torBlockPiece( tor, b ); task->piece_offset = ( tor->blockSize * b ) - ( tor->info.pieceSize * task->piece_index ); - task->length = tr_torBlockCountBytes( tor, b ); + task->length = (be - b) * tor->blockSize + tr_torBlockCountBytes( tor, be ); + task->blocks_done = 0; + task->response_code = 0; + task->block_size = tor->blockSize; task->content = evbuffer_new( ); - evbuffer_add_cb( task->content, on_content_changed, w ); + evbuffer_add_cb( task->content, on_content_changed, task ); tr_list_append( &w->tasks, task ); task_request_next_chunk( task ); } @@ -206,16 +394,25 @@ web_response_func( tr_session * session, tr_torrent * tor = tr_torrentFindFromId( session, t->torrent_id ); const int success = ( response_code == 206 ); - if( success ) - w->consecutive_failures = 0; - else - ++w->consecutive_failures; - if( tor ) { + /* active_transfers was only increased if the connection was successful */ + if( t->response_code == 206 ) + --w->active_transfers; + if( !success ) { - fire_client_got_rej( tor, w, t->block ); + const tr_block_index_t blocks_remain = (t->length + tor->blockSize - 1) + / tor->blockSize - t->blocks_done; + + if( blocks_remain ) + fire_client_got_rejs( tor, w, t->block + t->blocks_done, blocks_remain ); + + if( t->blocks_done ) + ++w->idle_connections; + else if( ++w->consecutive_failures >= MAX_CONSECUTIVE_FAILURES && !w->retry_tickcount ) + /* now wait a while until retrying to establish a connection */ + ++w->retry_tickcount; tr_list_remove_data( &w->tasks, t ); evbuffer_free( t->content ); @@ -223,16 +420,30 @@ web_response_func( tr_session * session, } else { - if( evbuffer_get_length( t->content ) < t->length ) + const uint32_t bytes_done = t->blocks_done * tor->blockSize; + const uint32_t buf_len = evbuffer_get_length( t->content ); + + if( bytes_done + buf_len < t->length ) { + /* request finished successfully but there's still data missing. that + means we've reached the end of a file and need to request the next one */ + t->response_code = 0; task_request_next_chunk( t ); } else { - tr_cacheWriteBlock( session->cache, tor, - t->piece_index, t->piece_offset, t->length, - t->content ); - fire_client_got_block( tor, w, t->block ); + if( buf_len ) { + /* on_content_changed() will not write a block if it is smaller than + the torrent's block size, i.e. the torrent's very last block */ + tr_cacheWriteBlock( session->cache, tor, + t->piece_index, t->piece_offset + bytes_done, + buf_len, t->content ); + + fire_client_got_blocks( tor, t->webseed, + t->block + t->blocks_done, 1 ); + } + + ++w->idle_connections; tr_list_remove_data( &w->tasks, t ); evbuffer_free( t->content ); @@ -250,7 +461,6 @@ make_url( tr_webseed * w, const tr_file * file ) struct evbuffer * buf = evbuffer_new( ); evbuffer_add( buf, w->base_url, w->base_url_len ); - evbuffer_add( buf, "", 1 ); /* if url ends with a '/', add the torrent name */ if( w->base_url[w->base_url_len - 1] == '/' && file->name ) @@ -266,14 +476,14 @@ task_request_next_chunk( struct tr_webseed_task * t ) if( tor != NULL ) { char range[64]; - struct evbuffer * url; + char ** urls = t->webseed->file_urls; const tr_info * inf = tr_torrentInfo( tor ); - const uint32_t remain = t->length - evbuffer_get_length( t->content ); + const uint32_t remain = t->length - t->blocks_done * tor->blockSize + - evbuffer_get_length( t->content ); const uint64_t total_offset = inf->pieceSize * t->piece_index - + t->piece_offset - + evbuffer_get_length( t->content ); + + t->piece_offset + t->length - remain; const tr_piece_index_t step_piece = total_offset / inf->pieceSize; const uint32_t step_piece_offset = total_offset - ( inf->pieceSize * step_piece ); @@ -288,12 +498,13 @@ task_request_next_chunk( struct tr_webseed_task * t ) file = &inf->files[file_index]; this_pass = MIN( remain, file->length - file_offset ); - url = make_url( t->webseed, file ); + if( !urls[file_index] ) + urls[file_index] = evbuffer_free_to_str( make_url( t->webseed, file ) ); + tr_snprintf( range, sizeof range, "%"PRIu64"-%"PRIu64, file_offset, file_offset + this_pass - 1 ); - tr_webRunWithBuffer( t->session, (char *) evbuffer_pullup( url, -1 ), - range, NULL, web_response_func, t, t->content ); - evbuffer_free( url ); + t->web_task = tr_webRunWithBuffer( t->session, urls[file_index], + range, NULL, web_response_func, t, t->content ); } } @@ -320,6 +531,8 @@ static void webseed_timer_func( evutil_socket_t foo UNUSED, short bar UNUSED, void * vw ) { tr_webseed * w = vw; + if( w->retry_tickcount ) + ++w->retry_tickcount; on_idle( w ); tr_timerAddMsec( w->timer, TR_IDLE_TIMER_MSEC ); } @@ -332,6 +545,7 @@ tr_webseedNew( struct tr_torrent * tor, { tr_webseed * w = tr_new0( tr_webseed, 1 ); tr_peer * peer = &w->parent; + const tr_info * inf = tr_torrentInfo( tor ); /* construct parent class */ tr_peerConstruct( peer ); @@ -347,6 +561,7 @@ tr_webseedNew( struct tr_torrent * tor, w->base_url = tr_strndup( url, w->base_url_len ); w->callback = callback; w->callback_data = callback_data; + w->file_urls = tr_new0( char *, inf->fileCount ); //tr_rcConstruct( &w->download_rate ); tr_bandwidthConstruct( &w->bandwidth, tor->session, &tor->bandwidth ); w->timer = evtimer_new( w->session->event_base, webseed_timer_func, w );