/* * This file Copyright (C) 2008-2014 Mnemosyne LLC * * It may be used under the GNU GPL versions 2 or 3 * or any future license endorsed by Mnemosyne LLC. * */ #include #include /* strlen() */ #include #include #include "transmission.h" #include "bandwidth.h" #include "cache.h" #include "inout.h" /* tr_ioFindFileLocation() */ #include "list.h" #include "peer-mgr.h" #include "torrent.h" #include "trevent.h" /* tr_runInEventThread() */ #include "utils.h" #include "web.h" #include "webseed.h" struct tr_webseed_task { bool dead; struct evbuffer* content; struct tr_webseed* webseed; tr_session* session; tr_block_index_t block; tr_piece_index_t piece_index; uint32_t piece_offset; uint32_t length; tr_block_index_t blocks_done; uint32_t block_size; struct tr_web_task* web_task; long response_code; }; struct tr_webseed { tr_peer parent; tr_bandwidth bandwidth; tr_session* session; tr_peer_callback callback; void* callback_data; tr_list* tasks; struct event* timer; char* base_url; size_t base_url_len; int torrent_id; int consecutive_failures; int retry_tickcount; int retry_challenge; int idle_connections; int active_transfers; char** file_urls; }; enum { TR_IDLE_TIMER_MSEC = 2000, /* */ FAILURE_RETRY_INTERVAL = 150, /* */ MAX_CONSECUTIVE_FAILURES = 5, /* */ MAX_WEBSEED_CONNECTIONS = 4 }; /*** **** ***/ static void publish(tr_webseed* w, tr_peer_event* e) { if (w->callback != nullptr) { (*w->callback)(&w->parent, e, w->callback_data); } } static void fire_client_got_rejs(tr_torrent* tor, tr_webseed* w, tr_block_index_t block, tr_block_index_t count) { auto e = tr_peer_event{}; e.eventType = TR_PEER_CLIENT_GOT_REJ; tr_torrentGetBlockLocation(tor, block, &e.pieceIndex, &e.offset, &e.length); for (tr_block_index_t i = 1; i <= count; i++) { if (i == count) { e.length = tr_torBlockCountBytes(tor, block + count - 1); } publish(w, &e); e.offset += e.length; } } static void fire_client_got_blocks(tr_torrent* tor, tr_webseed* w, tr_block_index_t block, tr_block_index_t count) { auto e = tr_peer_event{}; e.eventType = TR_PEER_CLIENT_GOT_BLOCK; tr_torrentGetBlockLocation(tor, block, &e.pieceIndex, &e.offset, &e.length); for (tr_block_index_t i = 1; i <= count; i++) { if (i == count) { e.length = tr_torBlockCountBytes(tor, block + count - 1); } publish(w, &e); e.offset += e.length; } } static void fire_client_got_piece_data(tr_webseed* w, uint32_t length) { auto e = tr_peer_event{}; e.eventType = TR_PEER_CLIENT_GOT_PIECE_DATA; e.length = length; publish(w, &e); } /*** **** ***/ struct write_block_data { tr_session* session; int torrent_id; struct tr_webseed* webseed; struct evbuffer* content; tr_piece_index_t piece_index; tr_block_index_t block_index; tr_block_index_t count; uint32_t block_offset; }; static void write_block_func(void* vdata) { auto* data = static_cast(vdata); struct tr_webseed* w = data->webseed; struct evbuffer* buf = data->content; struct tr_torrent* tor; tor = tr_torrentFindFromId(data->session, data->torrent_id); if (tor != nullptr) { uint32_t const block_size = tor->blockSize; uint32_t len = evbuffer_get_length(buf); uint32_t const offset_end = data->block_offset + len; tr_cache* cache = data->session->cache; tr_piece_index_t const piece = data->piece_index; if (!tr_torrentPieceIsComplete(tor, piece)) { while (len > 0) { uint32_t const bytes_this_pass = std::min(len, block_size); tr_cacheWriteBlock(cache, tor, piece, offset_end - len, bytes_this_pass, buf); len -= bytes_this_pass; } fire_client_got_blocks(tor, w, data->block_index, data->count); } } evbuffer_free(buf); tr_free(data); } /*** **** ***/ struct connection_succeeded_data { struct tr_webseed* webseed; char* real_url; tr_piece_index_t piece_index; uint32_t piece_offset; }; static void connection_succeeded(void* vdata) { auto* data = static_cast(vdata); struct tr_webseed* w = data->webseed; if (++w->active_transfers >= w->retry_challenge && w->retry_challenge != 0) { /* the server seems to be accepting more connections now */ w->consecutive_failures = w->retry_tickcount = w->retry_challenge = 0; } if (data->real_url != nullptr) { tr_torrent const* const tor = tr_torrentFindFromId(w->session, w->torrent_id); if (tor != nullptr) { uint64_t file_offset; tr_file_index_t file_index; tr_ioFindFileLocation(tor, data->piece_index, data->piece_offset, &file_index, &file_offset); tr_free(w->file_urls[file_index]); w->file_urls[file_index] = data->real_url; data->real_url = nullptr; } } tr_free(data->real_url); tr_free(data); } /*** **** ***/ static void on_content_changed(struct evbuffer* buf, struct evbuffer_cb_info const* info, void* vtask) { size_t const n_added = info->n_added; auto* task = static_cast(vtask); tr_session* session = task->session; tr_sessionLock(session); if (!task->dead && n_added > 0) { uint32_t len; struct tr_webseed* w = task->webseed; tr_bandwidthUsed(&w->bandwidth, TR_DOWN, n_added, true, tr_time_msec()); fire_client_got_piece_data(w, n_added); len = evbuffer_get_length(buf); if (task->response_code == 0) { task->response_code = tr_webGetTaskResponseCode(task->web_task); if (task->response_code == 206) { struct connection_succeeded_data* data; data = tr_new(struct connection_succeeded_data, 1); data->webseed = w; data->real_url = tr_strdup(tr_webGetTaskRealUrl(task->web_task)); data->piece_index = task->piece_index; data->piece_offset = task->piece_offset + task->blocks_done * task->block_size + len - 1; /* processing this uses a tr_torrent pointer, so push the work to the libevent thread... */ tr_runInEventThread(w->session, connection_succeeded, data); } } if (task->response_code == 206 && len >= task->block_size) { /* once we've got at least one full block, save it */ struct write_block_data* data; uint32_t const block_size = task->block_size; tr_block_index_t const completed = len / block_size; data = tr_new(struct write_block_data, 1); data->webseed = task->webseed; data->piece_index = task->piece_index; data->block_index = task->block + task->blocks_done; data->count = completed; data->block_offset = task->piece_offset + task->blocks_done * block_size; data->content = evbuffer_new(); data->torrent_id = w->torrent_id; data->session = w->session; /* we don't use locking on this evbuffer so we must copy out the data that will be needed when writing the block in a different thread */ evbuffer_remove_buffer(task->content, data->content, (size_t)block_size * (size_t)completed); tr_runInEventThread(w->session, write_block_func, data); task->blocks_done += completed; } } tr_sessionUnlock(session); } static void task_request_next_chunk(struct tr_webseed_task* task); static void on_idle(tr_webseed* w) { int want; int running_tasks = tr_list_size(w->tasks); tr_torrent* tor = tr_torrentFindFromId(w->session, w->torrent_id); if (w->consecutive_failures >= MAX_CONSECUTIVE_FAILURES) { want = w->idle_connections; if (w->retry_tickcount >= FAILURE_RETRY_INTERVAL) { /* some time has passed since our connection attempts failed. try again */ ++want; /* if this challenge is fulfilled we will reset consecutive_failures */ w->retry_challenge = running_tasks + want; } } else { want = MAX_WEBSEED_CONNECTIONS - running_tasks; w->retry_challenge = running_tasks + w->idle_connections + 1; } if (tor != nullptr && tor->isRunning && !tr_torrentIsSeed(tor) && want > 0) { int got = 0; tr_block_index_t* blocks = nullptr; blocks = tr_new(tr_block_index_t, want * 2); tr_peerMgrGetNextRequests(tor, &w->parent, want, blocks, &got, true); w->idle_connections -= std::min(w->idle_connections, got); if (w->retry_tickcount >= FAILURE_RETRY_INTERVAL && got == want) { w->retry_tickcount = 0; } for (int i = 0; i < got; ++i) { tr_block_index_t const b = blocks[i * 2]; tr_block_index_t const be = blocks[i * 2 + 1]; struct tr_webseed_task* task; task = tr_new0(struct tr_webseed_task, 1); task->session = tor->session; task->webseed = w; task->block = b; task->piece_index = tr_torBlockPiece(tor, b); task->piece_offset = tor->blockSize * b - tor->info.pieceSize * task->piece_index; task->length = (be - b) * tor->blockSize + tr_torBlockCountBytes(tor, be); task->blocks_done = 0; task->response_code = 0; task->block_size = tor->blockSize; task->content = evbuffer_new(); evbuffer_add_cb(task->content, on_content_changed, task); tr_list_append(&w->tasks, task); task_request_next_chunk(task); } tr_free(blocks); } } static void web_response_func( tr_session* session, bool did_connect, bool did_timeout, long response_code, void const* response, size_t response_byte_count, void* vtask) { TR_UNUSED(did_connect); TR_UNUSED(did_timeout); TR_UNUSED(response); TR_UNUSED(response_byte_count); auto* t = static_cast(vtask); bool const success = response_code == 206; if (t->dead) { evbuffer_free(t->content); tr_free(t); return; } tr_webseed* w = t->webseed; tr_torrent* tor = tr_torrentFindFromId(session, w->torrent_id); if (tor != nullptr) { /* active_transfers was only increased if the connection was successful */ if (t->response_code == 206) { --w->active_transfers; } if (!success) { tr_block_index_t const blocks_remain = (t->length + tor->blockSize - 1) / tor->blockSize - t->blocks_done; if (blocks_remain != 0) { fire_client_got_rejs(tor, w, t->block + t->blocks_done, blocks_remain); } if (t->blocks_done != 0) { ++w->idle_connections; } else if (++w->consecutive_failures >= MAX_CONSECUTIVE_FAILURES && w->retry_tickcount == 0) { /* now wait a while until retrying to establish a connection */ ++w->retry_tickcount; } tr_list_remove_data(&w->tasks, t); evbuffer_free(t->content); tr_free(t); } else { uint32_t const bytes_done = t->blocks_done * tor->blockSize; uint32_t const buf_len = evbuffer_get_length(t->content); if (bytes_done + buf_len < t->length) { /* request finished successfully but there's still data missing. that means we've reached the end of a file and need to request the next one */ t->response_code = 0; task_request_next_chunk(t); } else { if (buf_len != 0 && !tr_torrentPieceIsComplete(tor, t->piece_index)) { /* on_content_changed() will not write a block if it is smaller than the torrent's block size, i.e. the torrent's very last block */ tr_cacheWriteBlock(session->cache, tor, t->piece_index, t->piece_offset + bytes_done, buf_len, t->content); fire_client_got_blocks(tor, t->webseed, t->block + t->blocks_done, 1); } ++w->idle_connections; tr_list_remove_data(&w->tasks, t); evbuffer_free(t->content); tr_free(t); on_idle(w); } } } } static struct evbuffer* make_url(tr_webseed* w, tr_file const* file) { struct evbuffer* buf = evbuffer_new(); evbuffer_add(buf, w->base_url, w->base_url_len); /* if url ends with a '/', add the torrent name */ if (w->base_url[w->base_url_len - 1] == '/' && file->name != nullptr) { tr_http_escape(buf, file->name, strlen(file->name), false); } return buf; } static void task_request_next_chunk(struct tr_webseed_task* t) { tr_webseed* w = t->webseed; tr_torrent* tor = tr_torrentFindFromId(w->session, w->torrent_id); if (tor != nullptr) { char range[64]; char** urls = t->webseed->file_urls; tr_info const* inf = tr_torrentInfo(tor); uint64_t const remain = t->length - t->blocks_done * tor->blockSize - evbuffer_get_length(t->content); uint64_t const total_offset = tr_pieceOffset(tor, t->piece_index, t->piece_offset, t->length - remain); tr_piece_index_t const step_piece = total_offset / inf->pieceSize; uint64_t const step_piece_offset = total_offset - (uint64_t)inf->pieceSize * step_piece; tr_file_index_t file_index; tr_file const* file; uint64_t file_offset; uint64_t this_pass; tr_ioFindFileLocation(tor, step_piece, step_piece_offset, &file_index, &file_offset); file = &inf->files[file_index]; this_pass = std::min(remain, file->length - file_offset); if (urls[file_index] == nullptr) { urls[file_index] = evbuffer_free_to_str(make_url(t->webseed, file), nullptr); } tr_snprintf(range, sizeof(range), "%" PRIu64 "-%" PRIu64, file_offset, file_offset + this_pass - 1); t->web_task = tr_webRunWebseed(tor, urls[file_index], range, web_response_func, t, t->content); } } /*** **** ***/ static void webseed_timer_func(evutil_socket_t fd, short what, void* vw) { TR_UNUSED(fd); TR_UNUSED(what); auto* w = static_cast(vw); if (w->retry_tickcount != 0) { ++w->retry_tickcount; } on_idle(w); tr_timerAddMsec(w->timer, TR_IDLE_TIMER_MSEC); } /*** **** tr_peer virtual functions ***/ static bool webseed_is_transferring_pieces(tr_peer const* peer, uint64_t now, tr_direction direction, unsigned int* setme_Bps) { unsigned int Bps = 0; bool is_active = false; if (direction == TR_DOWN) { tr_webseed const* w = (tr_webseed const*)peer; is_active = w->tasks != nullptr; Bps = tr_bandwidthGetPieceSpeed_Bps(&w->bandwidth, now, direction); } if (setme_Bps != nullptr) { *setme_Bps = Bps; } return is_active; } static void webseed_destruct(tr_peer* peer) { tr_webseed* w = (tr_webseed*)peer; /* flag all the pending tasks as dead */ for (tr_list* l = w->tasks; l != nullptr; l = l->next) { auto* task = static_cast(l->data); task->dead = true; } tr_list_free(&w->tasks, nullptr); /* if we have an array of file URLs, free it */ if (w->file_urls != nullptr) { tr_torrent const* const tor = tr_torrentFindFromId(w->session, w->torrent_id); tr_info const* const inf = tr_torrentInfo(tor); for (tr_file_index_t i = 0; i < inf->fileCount; ++i) { tr_free(w->file_urls[i]); } tr_free(w->file_urls); } /* webseed destruct */ event_free(w->timer); tr_bandwidthDestruct(&w->bandwidth); tr_free(w->base_url); /* parent class destruct */ tr_peerDestruct(&w->parent); } static struct tr_peer_virtual_funcs const my_funcs = { webseed_destruct, webseed_is_transferring_pieces, }; /*** **** ***/ tr_webseed* tr_webseedNew(struct tr_torrent* tor, char const* url, tr_peer_callback callback, void* callback_data) { tr_webseed* w = tr_new0(tr_webseed, 1); tr_peer* peer = &w->parent; tr_info const* inf = tr_torrentInfo(tor); /* construct parent class */ tr_peerConstruct(peer, tor); peer->client = TR_KEY_webseeds; peer->funcs = &my_funcs; tr_bitfieldSetHasAll(&peer->have); tr_peerUpdateProgress(tor, peer); w->torrent_id = tr_torrentId(tor); w->session = tor->session; w->base_url_len = strlen(url); w->base_url = tr_strndup(url, w->base_url_len); w->callback = callback; w->callback_data = callback_data; w->file_urls = tr_new0(char*, inf->fileCount); tr_bandwidthConstruct(&w->bandwidth, tor->session, &tor->bandwidth); w->timer = evtimer_new(w->session->event_base, webseed_timer_func, w); tr_timerAddMsec(w->timer, TR_IDLE_TIMER_MSEC); return w; }