1
0
Fork 0
mirror of https://github.com/transmission/transmission synced 2025-01-04 05:56:02 +00:00
transmission/libtransmission/webseed.cc
Charles Kerr a822a46e47
refactor: add fetch options object to tr_webRun() (#2620)
* refactor: add fetch options object to tr_webRun()

Fold `tr_webRun()`, `tr_webRunWithCookies()`, and `tr_webRunWebseed()`
into a single API that takes an options argument that callers can
customize to their needs.

Also does a surface cleanup pass to the tr_webseed and tr_web internal
structures, e.g. making fields const where possible, not using raw
pointers, and making some fields private.

This revision is still full of code smells. Refactoring the entire
system is overwhelming, so instead I'm doing it in incremental steps.
2022-02-13 20:09:56 -06:00

551 lines
15 KiB
C++

// This file Copyright © 2008-2022 Mnemosyne LLC.
// It may be used under GPLv2 (SPDX: GPL-2.0-only), GPLv3 (SPDX: GPL-3.0-only),
// or any future license endorsed by Mnemosyne LLC.
// License text can be found in the licenses/ folder.
#include <algorithm>
#include <set>
#include <string>
#include <string_view>
#include <event2/buffer.h>
#include <event2/event.h>
#include "transmission.h"
#include "bandwidth.h"
#include "cache.h"
#include "peer-mgr.h"
#include "torrent.h"
#include "trevent.h" /* tr_runInEventThread() */
#include "utils.h"
#include "web-utils.h"
#include "web.h"
#include "webseed.h"
using namespace std::literals;
namespace
{
struct tr_webseed;
void on_idle(tr_webseed* w);
class tr_webseed_task
{
private:
std::shared_ptr<evbuffer> const content_{ evbuffer_new(), evbuffer_free };
public:
tr_webseed_task(tr_torrent* tor, tr_webseed* webseed_in, tr_block_span_t span)
: webseed{ webseed_in }
, session{ tor->session }
, block{ span.begin }
, piece_index{ tor->pieceForBlock(this->block) }
, piece_offset{ static_cast<uint32_t>(
int64_t{ tor->blockSize() } * this->block - tor->pieceSize() * this->piece_index) }
, block_size{ tor->blockSize() }
, length{ (span.end - 1 - span.begin) * tor->blockSize() + tor->blockSize(span.end - 1) }
{
}
tr_webseed* const webseed;
[[nodiscard]] auto* content() const
{
return content_.get();
}
tr_session* const session;
tr_block_index_t const block;
tr_piece_index_t const piece_index;
uint32_t const piece_offset;
uint32_t const block_size;
uint32_t const length;
bool dead = false;
tr_block_index_t blocks_done = 0;
tr_web_task* web_task = nullptr;
long response_code = 0;
};
/**
* Manages how many web tasks should be running at a time.
*
* - When all is well, allow multiple tasks running in parallel.
* - If we get an error, throttle down to only one at a time
* until we get piece data.
* - If we have too many errors in a row, put the peer in timeout
* and don't allow _any_ connections for awhile.
*/
class ConnectionLimiter
{
public:
void taskStarted()
{
++n_tasks;
}
void taskFinished(bool success)
{
if (!success)
{
taskFailed();
}
TR_ASSERT(n_tasks > 0);
--n_tasks;
}
void gotData()
{
TR_ASSERT(n_tasks > 0);
n_consecutive_failures = 0;
paused_until = 0;
}
[[nodiscard]] size_t slotsAvailable() const
{
if (isPaused())
{
return 0;
}
auto const max = maxConnections();
if (n_tasks >= max)
{
return 0;
}
return max - n_tasks;
}
private:
[[nodiscard]] bool isPaused() const
{
return paused_until > tr_time();
}
[[nodiscard]] size_t maxConnections() const
{
return n_consecutive_failures > 0 ? 1 : MaxConnections;
}
void taskFailed()
{
TR_ASSERT(n_tasks > 0);
if (++n_consecutive_failures >= MaxConsecutiveFailures)
{
paused_until = tr_time() + TimeoutIntervalSecs;
}
}
static time_t constexpr TimeoutIntervalSecs = 120;
static size_t constexpr MaxConnections = 4;
static size_t constexpr MaxConsecutiveFailures = MaxConnections;
size_t n_tasks = 0;
size_t n_consecutive_failures = 0;
time_t paused_until = 0;
};
struct tr_webseed : public tr_peer
{
public:
tr_webseed(struct tr_torrent* tor, std::string_view url, tr_peer_callback callback_in, void* callback_data_in)
: tr_peer{ tor }
, torrent_id{ tr_torrentId(tor) }
, base_url{ url }
, callback{ callback_in }
, callback_data{ callback_data_in }
, bandwidth(tor->bandwidth)
, pulse_timer(evtimer_new(session->event_base, &tr_webseed::onTimer, this), event_free)
{
// init parent bits
have.setHasAll();
tr_peerUpdateProgress(tor, this);
startTimer();
}
~tr_webseed() override
{
// flag all the pending tasks as dead
std::for_each(std::begin(tasks), std::end(tasks), [](auto* task) { task->dead = true; });
tasks.clear();
}
bool is_transferring_pieces(uint64_t now, tr_direction direction, unsigned int* setme_Bps) const override
{
unsigned int Bps = 0;
bool is_active = false;
if (direction == TR_DOWN)
{
is_active = !std::empty(tasks);
Bps = bandwidth.getPieceSpeedBytesPerSecond(now, direction);
}
if (setme_Bps != nullptr)
{
*setme_Bps = Bps;
}
return is_active;
}
int const torrent_id;
std::string const base_url;
tr_peer_callback const callback;
void* const callback_data;
Bandwidth bandwidth;
ConnectionLimiter connection_limiter;
std::set<tr_webseed_task*> tasks;
private:
void startTimer()
{
tr_timerAddMsec(pulse_timer.get(), IdleTimerMsec);
}
static void onTimer(evutil_socket_t /*fd*/, short /*what*/, void* vwebseed)
{
auto* const webseed = static_cast<tr_webseed*>(vwebseed);
on_idle(webseed);
webseed->startTimer();
}
std::shared_ptr<event> const pulse_timer;
static int constexpr IdleTimerMsec = 2000;
};
/***
****
***/
void publish(tr_webseed* w, tr_peer_event* e)
{
if (w->callback != nullptr)
{
(*w->callback)(w, e, w->callback_data);
}
}
void fire_client_got_rejs(tr_torrent* tor, tr_webseed* w, tr_block_index_t block, tr_block_index_t count)
{
auto e = tr_peer_event{};
e.eventType = TR_PEER_CLIENT_GOT_REJ;
tr_torrentGetBlockLocation(tor, block, &e.pieceIndex, &e.offset, &e.length);
for (tr_block_index_t i = 1; i <= count; i++)
{
if (i == count)
{
e.length = tor->blockSize(block + count - 1);
}
publish(w, &e);
e.offset += e.length;
}
}
void fire_client_got_blocks(tr_torrent* tor, tr_webseed* w, tr_block_index_t block, tr_block_index_t count)
{
auto e = tr_peer_event{};
e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
tr_torrentGetBlockLocation(tor, block, &e.pieceIndex, &e.offset, &e.length);
for (tr_block_index_t i = 1; i <= count; i++)
{
if (i == count)
{
e.length = tor->blockSize(block + count - 1);
}
publish(w, &e);
e.offset += e.length;
}
}
void fire_client_got_piece_data(tr_webseed* w, uint32_t length)
{
auto e = tr_peer_event{};
e.eventType = TR_PEER_CLIENT_GOT_PIECE_DATA;
e.length = length;
publish(w, &e);
}
/***
****
***/
struct write_block_data
{
private:
std::shared_ptr<evbuffer> const content_{ evbuffer_new(), evbuffer_free };
public:
write_block_data(tr_session* session_in, int torrent_id_in, tr_webseed* webseed_in)
: session{ session_in }
, torrent_id{ torrent_id_in }
, webseed{ webseed_in }
{
}
[[nodiscard]] auto* content() const
{
return content_.get();
}
tr_session* const session;
int const torrent_id;
tr_webseed* const webseed;
tr_piece_index_t piece_index;
tr_block_index_t block_index;
tr_block_index_t count;
uint32_t block_offset;
};
void write_block_func(void* vdata)
{
auto* const data = static_cast<write_block_data*>(vdata);
struct tr_webseed* const w = data->webseed;
auto* const buf = data->content();
auto* const tor = tr_torrentFindFromId(data->session, data->torrent_id);
if (tor != nullptr)
{
uint32_t const block_size = tor->blockSize();
uint32_t len = evbuffer_get_length(buf);
uint32_t const offset_end = data->block_offset + len;
tr_cache* cache = data->session->cache;
tr_piece_index_t const piece = data->piece_index;
if (!tor->hasPiece(piece))
{
while (len > 0)
{
uint32_t const bytes_this_pass = std::min(len, block_size);
tr_cacheWriteBlock(cache, tor, piece, offset_end - len, bytes_this_pass, buf);
len -= bytes_this_pass;
}
fire_client_got_blocks(tor, w, data->block_index, data->count);
}
}
delete data;
}
/***
****
***/
void on_content_changed(evbuffer* buf, evbuffer_cb_info const* info, void* vtask)
{
size_t const n_added = info->n_added;
auto* const task = static_cast<tr_webseed_task*>(vtask);
auto* const session = task->session;
auto const lock = session->unique_lock();
if (!task->dead && n_added > 0)
{
auto* const w = task->webseed;
w->bandwidth.notifyBandwidthConsumed(TR_DOWN, n_added, true, tr_time_msec());
fire_client_got_piece_data(w, n_added);
uint32_t const len = evbuffer_get_length(buf);
if (task->response_code == 0)
{
task->response_code = tr_webGetTaskResponseCode(task->web_task);
task->webseed->connection_limiter.gotData();
}
if (task->response_code == 206 && len >= task->block_size)
{
/* once we've got at least one full block, save it */
uint32_t const block_size = task->block_size;
tr_block_index_t const completed = len / block_size;
auto* const data = new write_block_data{ session, w->torrent_id, task->webseed };
data->piece_index = task->piece_index;
data->block_index = task->block + task->blocks_done;
data->count = completed;
data->block_offset = task->piece_offset + task->blocks_done * block_size;
/* we don't use locking on this evbuffer so we must copy out the data
that will be needed when writing the block in a different thread */
evbuffer_remove_buffer(task->content(), data->content(), (size_t)block_size * (size_t)completed);
tr_runInEventThread(w->session, write_block_func, data);
task->blocks_done += completed;
}
}
}
void task_request_next_chunk(tr_webseed_task* task);
void on_idle(tr_webseed* w)
{
auto* const tor = tr_torrentFindFromId(w->session, w->torrent_id);
if (tor == nullptr || !tor->isRunning || tor->isDone())
{
return;
}
for (auto const span : tr_peerMgrGetNextRequests(tor, w, w->connection_limiter.slotsAvailable()))
{
w->connection_limiter.taskStarted();
auto* const task = new tr_webseed_task{ tor, w, span };
evbuffer_add_cb(task->content(), on_content_changed, task);
w->tasks.insert(task);
task_request_next_chunk(task);
tr_peerMgrClientSentRequests(tor, w, span);
}
}
void web_response_func(
tr_session* session,
bool /*did_connect*/,
bool /*did_timeout*/,
long response_code,
std::string_view /*response*/,
void* vtask)
{
auto* const t = static_cast<tr_webseed_task*>(vtask);
bool const success = response_code == 206;
tr_webseed* w = t->webseed;
w->connection_limiter.taskFinished(success);
if (t->dead)
{
delete t;
return;
}
tr_torrent* tor = tr_torrentFindFromId(session, w->torrent_id);
if (tor != nullptr)
{
if (!success)
{
tr_block_index_t const blocks_remain = (t->length + tor->blockSize() - 1) / tor->blockSize() - t->blocks_done;
if (blocks_remain != 0)
{
fire_client_got_rejs(tor, w, t->block + t->blocks_done, blocks_remain);
}
w->tasks.erase(t);
delete t;
}
else
{
uint32_t const bytes_done = t->blocks_done * tor->blockSize();
uint32_t const buf_len = evbuffer_get_length(t->content());
if (bytes_done + buf_len < t->length)
{
/* request finished successfully but there's still data missing. that
means we've reached the end of a file and need to request the next one */
t->response_code = 0;
task_request_next_chunk(t);
}
else
{
if (buf_len != 0 && !tor->hasPiece(t->piece_index))
{
/* on_content_changed() will not write a block if it is smaller than
the torrent's block size, i.e. the torrent's very last block */
tr_cacheWriteBlock(
session->cache,
tor,
t->piece_index,
t->piece_offset + bytes_done,
buf_len,
t->content());
fire_client_got_blocks(tor, t->webseed, t->block + t->blocks_done, 1);
}
w->tasks.erase(t);
delete t;
on_idle(w);
}
}
}
}
std::string make_url(tr_webseed* w, std::string_view name)
{
auto url = w->base_url;
if (tr_strvEndsWith(url, "/"sv) && !std::empty(name))
{
tr_http_escape(url, name, false);
}
return url;
}
void task_request_next_chunk(tr_webseed_task* t)
{
tr_webseed* w = t->webseed;
tr_torrent* const tor = tr_torrentFindFromId(w->session, w->torrent_id);
if (tor == nullptr)
{
return;
}
auto const piece_size = tor->pieceSize();
uint64_t const remain = t->length - t->blocks_done * tor->blockSize() - evbuffer_get_length(t->content());
auto const total_offset = tor->offset(t->piece_index, t->piece_offset, t->length - remain);
tr_piece_index_t const step_piece = total_offset / piece_size;
uint64_t const step_piece_offset = total_offset - uint64_t(piece_size) * step_piece;
auto const [file_index, file_offset] = tor->fileOffset(step_piece, step_piece_offset);
uint64_t this_pass = std::min(remain, tor->fileSize(file_index) - file_offset);
auto const url = make_url(t->webseed, tor->fileSubpath(file_index));
auto options = tr_web_options{ url, web_response_func, t };
options.range = tr_strvJoin(std::to_string(file_offset), "-"sv, std::to_string(file_offset + this_pass - 1));
options.torrent_id = tor->uniqueId;
options.buffer = t->content();
t->web_task = tr_webRun(tor->session, std::move(options));
}
} // namespace
/***
****
***/
tr_peer* tr_webseedNew(tr_torrent* torrent, std::string_view url, tr_peer_callback callback, void* callback_data)
{
return new tr_webseed(torrent, url, callback, callback_data);
}
tr_webseed_view tr_webseedView(tr_peer const* peer)
{
auto const* w = dynamic_cast<tr_webseed const*>(peer);
if (w == nullptr)
{
return {};
}
auto bytes_per_second = unsigned{ 0 };
auto const is_downloading = peer->is_transferring_pieces(tr_time_msec(), TR_DOWN, &bytes_per_second);
return { w->base_url.c_str(), is_downloading, bytes_per_second };
}