// This file Copyright © Mnemosyne LLC. // It may be used under GPLv2 (SPDX: GPL-2.0-only), GPLv3 (SPDX: GPL-3.0-only), // or any future license endorsed by Mnemosyne LLC. // License text can be found in the licenses/ folder. #include #include #include #include #include #include #include #include // for std::swap() #include #include #include "libtransmission/transmission.h" #include "libtransmission/bandwidth.h" #include "libtransmission/crypto-utils.h" #include "libtransmission/log.h" #include "libtransmission/peer-io.h" #include "libtransmission/tr-assert.h" #include "libtransmission/utils.h" // tr_time_msec() #include "libtransmission/values.h" using namespace libtransmission::Values; Speed tr_bandwidth::get_speed(RateControl& r, unsigned int interval_msec, uint64_t now) { if (now == 0U) { now = tr_time_msec(); } if (now != r.cache_time_) { uint64_t bytes = 0U; uint64_t const cutoff = now - interval_msec; for (int i = r.newest_; r.date_[i] > cutoff;) { bytes += r.size_[i]; if (--i == -1) { i = HistorySize - 1; /* circular history */ } if (i == r.newest_) { break; /* we've come all the way around */ } } r.cache_val_ = Speed{ bytes * 1000U / interval_msec, Speed::Units::Byps }; r.cache_time_ = now; } return r.cache_val_; } void tr_bandwidth::notify_bandwidth_consumed_bytes(uint64_t const now, RateControl& r, size_t size) { if (r.date_[r.newest_] + GranularityMSec >= now) { r.size_[r.newest_] += size; } else { if (++r.newest_ == HistorySize) { r.newest_ = 0; } r.date_[r.newest_] = now; r.size_[r.newest_] = size; } /* invalidate cache_val*/ r.cache_time_ = 0U; } // --- tr_bandwidth::tr_bandwidth(tr_bandwidth* parent, bool is_group) : priority_(is_group ? std::numeric_limits::max() : TR_PRI_NORMAL) { set_parent(parent); } // --- namespace { namespace deparent_helpers { void remove_child(std::vector& v, tr_bandwidth* remove_me) noexcept { // the list isn't sorted -- so instead of erase()ing `it`, // do the cheaper option of overwriting it with the final item if (auto it = std::find(std::begin(v), std::end(v), remove_me); it != std::end(v)) { std::swap(*it, v.back()); v.pop_back(); } } } // namespace deparent_helpers } // namespace void tr_bandwidth::deparent() noexcept { using namespace deparent_helpers; if (parent_ == nullptr) { return; } remove_child(parent_->children_, this); parent_ = nullptr; } void tr_bandwidth::set_parent(tr_bandwidth* new_parent) { TR_ASSERT(this != new_parent); deparent(); if (new_parent != nullptr) { #ifdef TR_ENABLE_ASSERTS TR_ASSERT(new_parent->parent_ != this); auto& children = new_parent->children_; TR_ASSERT(std::find(std::begin(children), std::end(children), this) == std::end(children)); // not already there #endif new_parent->children_.push_back(this); parent_ = new_parent; } } // --- void tr_bandwidth::allocate_bandwidth( tr_priority_t parent_priority, uint64_t period_msec, std::vector>& peer_pool) { auto const priority = std::min(parent_priority, priority_); // set the available bandwidth for (auto const dir : { TR_UP, TR_DOWN }) { if (auto& bandwidth = band_[dir]; bandwidth.is_limited_) { auto const next_pulse_speed = bandwidth.desired_speed_; bandwidth.bytes_left_ = next_pulse_speed.base_quantity() * period_msec / 1000U; } } // add this bandwidth's peer, if any, to the peer pool if (auto shared = peer_.lock(); shared) { TR_ASSERT(tr_isPriority(priority)); shared->set_priority(priority); peer_pool.push_back(std::move(shared)); } // traverse & repeat for the subtree for (auto* child : children_) { child->allocate_bandwidth(priority, period_msec, peer_pool); } } void tr_bandwidth::phase_one(std::vector& peers, tr_direction dir) { // First phase of IO. Tries to distribute bandwidth fairly to keep faster // peers from starving the others. tr_logAddTrace(fmt::format("{} peers to go round-robin for {}", peers.size(), dir == TR_UP ? "upload" : "download")); // Shuffle the peers so they all have equal chance to be first in line. static thread_local auto urbg = tr_urbg{}; std::shuffle(std::begin(peers), std::end(peers), urbg); // Give each peer `Increment` bandwidth bytes to use. Repeat this // process until we run out of bandwidth and/or peers that can use it. for (size_t n_unfinished = std::size(peers); n_unfinished > 0U;) { for (size_t i = 0U; i < n_unfinished;) { // Value of 3000 bytes chosen so that when using µTP we'll send a full-size // frame right away and leave enough buffered data for the next frame to go // out in a timely manner. static auto constexpr Increment = 3000U; auto const bytes_used = peers[i]->flush(dir, Increment); tr_logAddTrace(fmt::format("peer #{} of {} used {} bytes in this pass", i, n_unfinished, bytes_used)); if (bytes_used != Increment) { // peer is done writing for now; move it to the end of the list std::swap(peers[i], peers[n_unfinished - 1]); --n_unfinished; } else { ++i; } } } } void tr_bandwidth::allocate(uint64_t period_msec) { // keep these peers alive for the scope of this function auto refs = std::vector>{}; auto peer_arrays = std::array, 3>{}; auto& high = peer_arrays[0]; auto& normal = peer_arrays[1]; auto& low = peer_arrays[2]; // allocateBandwidth () is a helper function with two purposes: // 1. allocate bandwidth to b and its subtree // 2. accumulate an array of all the peerIos from b and its subtree. allocate_bandwidth(std::numeric_limits::max(), period_msec, refs); for (auto const& io : refs) { io->flush_outgoing_protocol_msgs(); switch (io->priority()) { case TR_PRI_HIGH: high.push_back(io.get()); [[fallthrough]]; case TR_PRI_NORMAL: normal.push_back(io.get()); [[fallthrough]]; case TR_PRI_LOW: low.push_back(io.get()); break; default: TR_ASSERT_MSG(false, "invalid priority"); break; } } // First phase of IO. Tries to distribute bandwidth fairly to keep faster // peers from starving the others. Loop through the peers, giving each a // small chunk of bandwidth. Keep looping until we run out of bandwidth // and/or peers that can use it for (auto& peers : peer_arrays) { phase_one(peers, TR_UP); phase_one(peers, TR_DOWN); } // Second phase of IO. To help us scale in high bandwidth situations, // enable on-demand IO for peers with bandwidth left to burn. // This on-demand IO is enabled until (1) the peer runs out of bandwidth, // or (2) the next tr_bandwidth::allocate () call, when we start over again. for (auto const& io : refs) { io->set_enabled(TR_UP, io->has_bandwidth_left(TR_UP)); io->set_enabled(TR_DOWN, io->has_bandwidth_left(TR_DOWN)); } } // --- size_t tr_bandwidth::clamp(tr_direction const dir, size_t byte_count) const noexcept { TR_ASSERT(tr_isDirection(dir)); if (band_[dir].is_limited_) { byte_count = std::min(byte_count, band_[dir].bytes_left_); } if (parent_ != nullptr && band_[dir].honor_parent_limits_ && byte_count > 0U) { byte_count = parent_->clamp(dir, byte_count); } return byte_count; } void tr_bandwidth::notify_bandwidth_consumed(tr_direction dir, size_t byte_count, bool is_piece_data, uint64_t now) { TR_ASSERT(tr_isDirection(dir)); auto& band = band_[dir]; if (band.is_limited_ && is_piece_data) { band.bytes_left_ -= std::min(band.bytes_left_, byte_count); } notify_bandwidth_consumed_bytes(now, band.raw_, byte_count); if (is_piece_data) { notify_bandwidth_consumed_bytes(now, band.piece_, byte_count); } if (parent_ != nullptr) { parent_->notify_bandwidth_consumed(dir, byte_count, is_piece_data, now); } } // --- tr_bandwidth_limits tr_bandwidth::get_limits() const { auto limits = tr_bandwidth_limits{}; limits.up_limit = get_desired_speed(TR_UP); limits.down_limit = get_desired_speed(TR_DOWN); limits.up_limited = is_limited(TR_UP); limits.down_limited = is_limited(TR_DOWN); return limits; } void tr_bandwidth::set_limits(tr_bandwidth_limits const& limits) { set_desired_speed(TR_UP, limits.up_limit); set_desired_speed(TR_DOWN, limits.down_limit); set_limited(TR_UP, limits.up_limited); set_limited(TR_DOWN, limits.down_limited); }