From 56b27057fd29d1f8d6355043dbd40fb8e5215423 Mon Sep 17 00:00:00 2001 From: Charles Kerr Date: Tue, 4 Jul 2023 15:47:18 -0500 Subject: [PATCH] refactor: add lightweight observable / observer for decoupling (#5716) --- libtransmission/CMakeLists.txt | 1 + libtransmission/observable.h | 110 ++++++++++++++ libtransmission/peer-mgr.cc | 264 ++++++++++++++++----------------- libtransmission/peer-mgr.h | 18 --- libtransmission/session.cc | 5 +- libtransmission/session.h | 7 +- libtransmission/torrent.cc | 25 ++-- libtransmission/torrent.h | 10 ++ 8 files changed, 270 insertions(+), 170 deletions(-) create mode 100644 libtransmission/observable.h diff --git a/libtransmission/CMakeLists.txt b/libtransmission/CMakeLists.txt index 72e51c798..3e344bd0e 100644 --- a/libtransmission/CMakeLists.txt +++ b/libtransmission/CMakeLists.txt @@ -75,6 +75,7 @@ target_sources(${TR_NAME} mime-types.h net.cc net.h + observable.h open-files.cc open-files.h peer-common.h diff --git a/libtransmission/observable.h b/libtransmission/observable.h new file mode 100644 index 000000000..c172604ac --- /dev/null +++ b/libtransmission/observable.h @@ -0,0 +1,110 @@ +// This file Copyright © 2023 Mnemosyne LLC. +// It may be used under GPLv2 (SPDX: GPL-2.0-only), GPLv3 (SPDX: GPL-3.0-only), +// or any future license endorsed by Mnemosyne LLC. +// License text can be found in the licenses/ folder. + +#pragma once + +#ifndef __TRANSMISSION__ +#error only libtransmission should #include this header. +#endif + +#include // for std::move +#include // for size_t +#include + +#include + +#include "tr-assert.h" + +namespace libtransmission +{ + +// An RAII-based subscription to an Observable. +// Returned by SimpleObservable::observe(). +// Let it go out-of-scope to cancel the subscription. +class ObserverTag +{ +public: + using Callback = std::function; + + ObserverTag() = default; + + ObserverTag(ObserverTag&& that) + { + on_destroy_ = std::move(that.on_destroy_); + that.on_destroy_ = nullptr; + } + + ObserverTag& operator=(ObserverTag&& that) + { + on_destroy_ = std::move(that.on_destroy_); + that.on_destroy_ = nullptr; + return *this; + } + + ObserverTag(ObserverTag const&) = delete; + ObserverTag& operator=(ObserverTag const&) = delete; + + ObserverTag(Callback on_destroy) + : on_destroy_{ std::move(on_destroy) } + { + } + + ~ObserverTag() + { + if (on_destroy_) + on_destroy_(); + } + +private: + Callback on_destroy_; +}; + +// A simple observer/observable implementation. +// Intentionally avoids edge cases like thread safety and +// remove-during-emit; this is meant to be as lightweight +// as possible for very basic use cases. +template +class SimpleObservable +{ + using Key = size_t; + +public: + using Observer = std::function; + + ~SimpleObservable() + { + TR_ASSERT(std::empty(observers_)); + } + + auto observe(Observer observer) + { + auto const key = next_key_++; + observers_.emplace(key, std::move(observer)); + return ObserverTag{ [this, key]() + { + remove(key); + } }; + } + + void emit(Args... args) const + { + for (auto& [tag, observer] : observers_) + { + observer((args)...); + } + } + +private: + void remove(Key key) + { + [[maybe_unused]] auto const n_removed = observers_.erase(key); + TR_ASSERT(n_removed == 1U); + } + + static auto inline next_key_ = Key{ 1U }; + small::map observers_; +}; + +} // namespace libtransmission diff --git a/libtransmission/peer-mgr.cc b/libtransmission/peer-mgr.cc index 460eba443..80c3396db 100644 --- a/libtransmission/peer-mgr.cc +++ b/libtransmission/peer-mgr.cc @@ -317,7 +317,18 @@ public: tr_swarm(tr_peerMgr* manager_in, tr_torrent* tor_in) noexcept : manager{ manager_in } , tor{ tor_in } + , tags_{ { + tor_in->done_.observe([this](tr_torrent*, bool) { on_torrent_done(); }), + tor_in->doomed_.observe([this](tr_torrent*) { on_torrent_doomed(); }), + tor_in->got_bad_piece_.observe([this](tr_torrent*, tr_piece_index_t p) { on_got_bad_piece(p); }), + tor_in->got_metainfo_.observe([this](tr_torrent*) { on_got_metainfo(); }), + tor_in->piece_completed_.observe([this](tr_torrent*, tr_piece_index_t p) { on_piece_completed(p); }), + tor_in->started_.observe([this](tr_torrent*) { on_torrent_started(); }), + tor_in->stopped_.observe([this](tr_torrent*) { on_torrent_stopped(); }), + tor_in->swarm_is_all_seeds_.observe([this](tr_torrent* /*tor*/) { on_swarm_is_all_seeds(); }), + } } { + rebuildWebseeds(); } @@ -471,11 +482,6 @@ public: return *pool_is_all_seeds_; } - void markAllSeedsFlagDirty() noexcept - { - pool_is_all_seeds_.reset(); - } - [[nodiscard]] peer_atom* get_existing_atom(std::pair const& socket_address) noexcept { auto&& it = pool.find(socket_address); @@ -509,7 +515,7 @@ public: atom->flags |= flags; } - markAllSeedsFlagDirty(); + mark_all_seeds_flag_dirty(); return atom; } @@ -518,7 +524,7 @@ public: { tr_logAddTraceSwarm(this, fmt::format("marking peer {} as a seed", atom.display_name())); atom.flags |= ADDED_F_SEED_FLAG; - markAllSeedsFlagDirty(); + mark_all_seeds_flag_dirty(); } static void peerCallbackFunc(tr_peer* peer, tr_peer_event const& event, void* vs) @@ -664,12 +670,108 @@ private: } } + void mark_all_seeds_flag_dirty() noexcept + { + pool_is_all_seeds_.reset(); + } + + void on_torrent_doomed() + { + auto const lock = tor->unique_lock(); + stop(); + tor->swarm = nullptr; + delete this; + } + + void on_torrent_done() + { + std::for_each(std::begin(peers), std::end(peers), [](auto* const peer) { peer->set_interested(false); }); + } + + void on_swarm_is_all_seeds() + { + auto const lock = tor->unique_lock(); + + for (auto& [socket_address, atom] : pool) + { + mark_atom_as_seed(atom); + } + + mark_all_seeds_flag_dirty(); + } + + void on_piece_completed(tr_piece_index_t piece) + { + bool piece_came_from_peers = false; + + for (auto* const peer : peers) + { + // notify the peer that we now have this piece + peer->on_piece_completed(piece); + + if (!piece_came_from_peers) + { + piece_came_from_peers = peer->blame.test(piece); + } + } + + if (piece_came_from_peers) /* webseed downloads don't belong in announce totals */ + { + tr_announcerAddBytes(tor, TR_ANN_DOWN, tor->piece_size(piece)); + } + } + + void on_got_bad_piece(tr_piece_index_t piece) + { + auto const byte_count = tor->piece_size(piece); + + for (auto* const peer : peers) + { + if (peer->blame.test(piece)) + { + tr_logAddTraceSwarm( + this, + fmt::format( + "peer {} contributed to corrupt piece ({}); now has {} strikes", + peer->display_name(), + piece, + peer->strikes + 1)); + addStrike(peer); + } + } + + tr_announcerAddBytes(tor, TR_ANN_CORRUPT, byte_count); + } + + void on_got_metainfo() + { + // the webseed list may have changed... + rebuildWebseeds(); + + // some peer_msgs' progress fields may not be accurate if we + // didn't have the metadata before now... so refresh them all... + for (auto* peer : peers) + { + peer->onTorrentGotMetainfo(); + + if (peer->isSeed()) + { + mark_atom_as_seed(*peer->atom); + } + } + } + + void on_torrent_started(); + void on_torrent_stopped(); + // number of bad pieces a peer is allowed to send before we ban them static auto constexpr MaxBadPiecesPerPeer = int{ 5 }; // how long we'll let requests we've made linger before we cancel them static auto constexpr RequestTtlSecs = int{ 90 }; + std::array const tags_; + mutable std::optional pool_is_all_seeds_; bool is_endgame_ = false; @@ -683,6 +785,7 @@ struct tr_peerMgr , bandwidth_timer_{ session->timerMaker().create([this]() { bandwidthPulse(); }) } , rechoke_timer_{ session->timerMaker().create([this]() { rechokePulseMarshall(); }) } , refill_upkeep_timer_{ session->timerMaker().create([this]() { refillUpkeep(); }) } + , blocklist_tag_{ session->blocklist_changed_.observe([this]() { on_blocklist_changed(); }) } { bandwidth_timer_->start_repeating(BandwidthPeriod); rechoke_timer_->start_repeating(RechokePeriod); @@ -734,10 +837,25 @@ private: rechoke_timer_->set_interval(RechokePeriod); } + void on_blocklist_changed() const + { + /* we cache whether or not a peer is blocklisted... + since the blocklist has changed, erase that cached value */ + for (auto* const tor : session->torrents()) + { + for (auto& [socket_address, atom] : tor->swarm->pool) + { + atom.setBlocklistedDirty(); + } + } + } + std::unique_ptr const bandwidth_timer_; std::unique_ptr const rechoke_timer_; std::unique_ptr const refill_upkeep_timer_; + libtransmission::ObserverTag const blocklist_tag_; + static auto constexpr BandwidthPeriod = 500ms; static auto constexpr RechokePeriod = 10s; static auto constexpr RefillUpkeepPeriod = 10s; @@ -787,21 +905,6 @@ void tr_peerMgrFree(tr_peerMgr* manager) // --- -void tr_peerMgrOnBlocklistChanged(tr_peerMgr* mgr) -{ - /* we cache whether or not a peer is blocklisted... - since the blocklist has changed, erase that cached value */ - for (auto* const tor : mgr->session->torrents()) - { - for (auto& [socket_address, atom] : tor->swarm->pool) - { - atom.setBlocklistedDirty(); - } - } -} - -// --- - void tr_peerMgrSetUtpSupported(tr_torrent* tor, std::pair const& socket_address) { if (auto* const atom = tor->swarm->get_existing_atom(socket_address); atom != nullptr) @@ -943,30 +1046,6 @@ void tr_peerMgr::refillUpkeep() const } } -void tr_peerMgrPieceCompleted(tr_torrent* tor, tr_piece_index_t p) -{ - bool piece_came_from_peers = false; - - for (auto* const peer : tor->swarm->peers) - { - // notify the peer that we now have this piece - peer->on_piece_completed(p); - - if (!piece_came_from_peers) - { - piece_came_from_peers = peer->blame.test(p); - } - } - - if (piece_came_from_peers) /* webseed downloads don't belong in announce totals */ - { - tr_announcerAddBytes(tor, TR_ANN_DOWN, tor->piece_size(p)); - } - - // bookkeeping - tor->set_needs_completeness_check(); -} - namespace { namespace handshake_helpers @@ -1122,20 +1201,6 @@ void tr_peerMgrAddIncoming(tr_peerMgr* manager, tr_peer_socket&& socket) } } -void tr_peerMgrSetSwarmIsAllSeeds(tr_torrent* tor) -{ - auto const lock = tor->unique_lock(); - - auto* const swarm = tor->swarm; - - for (auto& [socket_address, atom] : swarm->pool) - { - swarm->mark_atom_as_seed(atom); - } - - swarm->markAllSeedsFlagDirty(); -} - size_t tr_peerMgrAddPex(tr_torrent* tor, uint8_t from, tr_pex const* pex, size_t n_pex) { size_t n_used = 0; @@ -1205,29 +1270,6 @@ std::vector tr_pex::from_compact_ipv6( // --- -void tr_peerMgrGotBadPiece(tr_torrent* tor, tr_piece_index_t piece_index) -{ - auto* const swarm = tor->swarm; - auto const byte_count = tor->piece_size(piece_index); - - for (auto* const peer : swarm->peers) - { - if (peer->blame.test(piece_index)) - { - tr_logAddTraceSwarm( - swarm, - fmt::format( - "peer {} contributed to corrupt piece ({}); now has {} strikes", - peer->display_name(), - piece_index, - peer->strikes + 1)); - swarm->addStrike(peer); - } - } - - tr_announcerAddBytes(tor, TR_ANN_CORRUPT, byte_count); -} - namespace { namespace get_peers_helpers @@ -1354,23 +1396,16 @@ std::vector tr_peerMgrGetPeers(tr_torrent const* tor, uint8_t address_ty return pex; } -void tr_peerMgrStartTorrent(tr_torrent* tor) +void tr_swarm::on_torrent_started() { - TR_ASSERT(tr_isTorrent(tor)); auto const lock = tor->unique_lock(); - - tr_swarm* const swarm = tor->swarm; - - swarm->is_running = true; - - swarm->manager->rechokeSoon(); + is_running = true; + manager->rechokeSoon(); } -void tr_peerMgrStopTorrent(tr_torrent* tor) +void tr_swarm::on_torrent_stopped() { - TR_ASSERT(tr_isTorrent(tor)); - - tor->swarm->stop(); + stop(); } void tr_peerMgrAddTorrent(tr_peerMgr* manager, tr_torrent* tor) @@ -1382,36 +1417,6 @@ void tr_peerMgrAddTorrent(tr_peerMgr* manager, tr_torrent* tor) tor->swarm = new tr_swarm{ manager, tor }; } -void tr_peerMgrRemoveTorrent(tr_torrent* tor) -{ - TR_ASSERT(tr_isTorrent(tor)); - auto const lock = tor->unique_lock(); - - tor->swarm->stop(); - delete tor->swarm; - tor->swarm = nullptr; -} - -void tr_peerMgrOnTorrentGotMetainfo(tr_torrent* tor) -{ - auto* const swarm = tor->swarm; - - /* the webseed list may have changed... */ - swarm->rebuildWebseeds(); - - /* some peer_msgs' progress fields may not be accurate if we - didn't have the metadata before now... so refresh them all... */ - for (auto* peer : swarm->peers) - { - peer->onTorrentGotMetainfo(); - - if (peer->isSeed()) - { - swarm->mark_atom_as_seed(*peer->atom); - } - } -} - int8_t tr_peerMgrPieceAvailability(tr_torrent const* tor, tr_piece_index_t piece) { if (!tor->has_metainfo()) @@ -1644,15 +1649,6 @@ tr_peer_stat* tr_peerMgrPeerStats(tr_torrent const* tor, size_t* setme_count) return ret; } -void tr_peerMgrClearInterest(tr_torrent* tor) -{ - TR_ASSERT(tr_isTorrent(tor)); - auto const lock = tor->unique_lock(); - - auto& peers = tor->swarm->peers; - std::for_each(std::begin(peers), std::end(peers), [](auto* const peer) { peer->set_interested(false); }); -} - namespace { namespace update_interest_helpers diff --git a/libtransmission/peer-mgr.h b/libtransmission/peer-mgr.h index b808caf9b..2db016504 100644 --- a/libtransmission/peer-mgr.h +++ b/libtransmission/peer-mgr.h @@ -179,8 +179,6 @@ void tr_peerMgrAddIncoming(tr_peerMgr* manager, tr_peer_socket&& socket); size_t tr_peerMgrAddPex(tr_torrent* tor, uint8_t from, tr_pex const* pex, size_t n_pex); -void tr_peerMgrSetSwarmIsAllSeeds(tr_torrent* tor); - enum { TR_PEERS_CONNECTED, @@ -193,14 +191,8 @@ enum uint8_t peer_list_mode, size_t max_peer_count); -void tr_peerMgrStartTorrent(tr_torrent* tor); - -void tr_peerMgrStopTorrent(tr_torrent* tor); - void tr_peerMgrAddTorrent(tr_peerMgr* manager, struct tr_torrent* tor); -void tr_peerMgrRemoveTorrent(tr_torrent* tor); - // return the number of connected peers that have `piece`, or -1 if we already have it [[nodiscard]] int8_t tr_peerMgrPieceAvailability(tr_torrent const* tor, tr_piece_index_t piece); @@ -208,18 +200,8 @@ void tr_peerMgrTorrentAvailability(tr_torrent const* tor, int8_t* tab, unsigned [[nodiscard]] uint64_t tr_peerMgrGetDesiredAvailable(tr_torrent const* tor); -void tr_peerMgrOnTorrentGotMetainfo(tr_torrent* tor); - -void tr_peerMgrOnBlocklistChanged(tr_peerMgr* mgr); - [[nodiscard]] struct tr_peer_stat* tr_peerMgrPeerStats(tr_torrent const* tor, size_t* setme_count); [[nodiscard]] tr_webseed_view tr_peerMgrWebseed(tr_torrent const* tor, size_t i); -void tr_peerMgrClearInterest(tr_torrent* tor); - -void tr_peerMgrGotBadPiece(tr_torrent* tor, tr_piece_index_t piece_index); - -void tr_peerMgrPieceCompleted(tr_torrent* tor, tr_piece_index_t pieceIndex); - /* @} */ diff --git a/libtransmission/session.cc b/libtransmission/session.cc index 4de1b383a..51a2f1b32 100644 --- a/libtransmission/session.cc +++ b/libtransmission/session.cc @@ -1617,10 +1617,7 @@ void tr_sessionReloadBlocklists(tr_session* session) { session->blocklists_ = libtransmission::Blocklist::loadBlocklists(session->blocklist_dir_, session->useBlocklist()); - if (session->peer_mgr_) - { - tr_peerMgrOnBlocklistChanged(session->peer_mgr_.get()); - } + session->blocklist_changed_.emit(); } size_t tr_blocklistGetRuleCount(tr_session const* session) diff --git a/libtransmission/session.h b/libtransmission/session.h index 17893864f..fca71043f 100644 --- a/libtransmission/session.h +++ b/libtransmission/session.h @@ -38,6 +38,7 @@ #include "global-ip-cache.h" #include "interned-string.h" #include "net.h" // tr_socket_t +#include "observable.h" #include "open-files.h" #include "port-forwarding.h" #include "quark.h" @@ -1113,6 +1114,10 @@ private: std::vector blocklists_; +public: + libtransmission::SimpleObservable<> blocklist_changed_; + +private: /// other fields // depends-on: session_thread_, settings_.bind_address_ipv4, local_peer_port_, global_ip_cache (via tr_session::bind_address()) @@ -1162,7 +1167,7 @@ public: std::unique_ptr cache = std::make_unique(torrents_, 1024 * 1024 * 2); private: - // depends-on: timer_maker_, top_bandwidth_, utp_context, torrents_, web_ + // depends-on: timer_maker_, top_bandwidth_, utp_context, torrents_, web_, blocklist_changed_ std::unique_ptr peer_mgr_; // depends-on: peer_mgr_, advertised_peer_port_, torrents_ diff --git a/libtransmission/torrent.cc b/libtransmission/torrent.cc index 00e5b3f66..58aaaaa42 100644 --- a/libtransmission/torrent.cc +++ b/libtransmission/torrent.cc @@ -720,7 +720,7 @@ void torrentStartImpl(tr_torrent* const tor) torrentResetTransferStats(tor); tor->session->announcer_->startTorrent(tor); tor->lpdAnnounceAt = now; - tr_peerMgrStartTorrent(tor); + tor->started_.emit(tor); } bool removeTorrentFile(char const* filename, void* /*user_data*/, tr_error** error) @@ -763,7 +763,7 @@ void freeTorrent(tr_torrent* tor) tr_session* session = tor->session; - tr_peerMgrRemoveTorrent(tor); + tor->doomed_.emit(tor); session->announcer_->removeTorrent(tor); @@ -869,7 +869,7 @@ void torrentStop(tr_torrent* const tor) tor->session->verifyRemove(tor); - tr_peerMgrStopTorrent(tor); + tor->stopped_.emit(tor); tor->session->announcer_->stopTorrent(tor); tor->session->closeTorrentFiles(tor); @@ -1174,7 +1174,7 @@ void tr_torrent::set_metainfo(tr_torrent_metainfo tm) metainfo_ = std::move(tm); torrentInitFromInfoDict(this); - tr_peerMgrOnTorrentGotMetainfo(this); + got_metainfo_.emit(this); session->onMetadataCompleted(this); this->set_dirty(); this->mark_edited(); @@ -1911,16 +1911,12 @@ void tr_torrent::recheck_completeness() this->doneDate = tr_time(); } - if (was_leeching && was_running) - { - /* clear interested flag on all peers */ - tr_peerMgrClearInterest(this); - } - if (this->current_dir() == this->incomplete_dir()) { this->set_location(this->download_dir(), true, nullptr, nullptr); } + + done_.emit(this, recent_change); } this->session->onTorrentCompletenessChanged(this, completeness, was_running); @@ -2168,7 +2164,7 @@ void tr_torrent::on_tracker_response(tr_tracker_event const* event) case tr_tracker_event::Type::Counts: if (is_private() && (event->leechers == 0)) { - tr_peerMgrSetSwarmIsAllSeeds(this); + swarm_is_all_seeds_.emit(this); } break; @@ -2318,7 +2314,10 @@ void onFileCompleted(tr_torrent* tor, tr_file_index_t i) void onPieceCompleted(tr_torrent* tor, tr_piece_index_t piece) { - tr_peerMgrPieceCompleted(tor, piece); + tor->piece_completed_.emit(tor, piece); + + // bookkeeping + tor->set_needs_completeness_check(); // if this piece completes any file, invoke the fileCompleted func for it auto const span = tor->fpm_.file_span(piece); @@ -2338,7 +2337,7 @@ void onPieceFailed(tr_torrent* tor, tr_piece_index_t piece) auto const n = tor->piece_size(piece); tor->corruptCur += n; tor->downloadedCur -= std::min(tor->downloadedCur, uint64_t{ n }); - tr_peerMgrGotBadPiece(tor, piece); + tor->got_bad_piece_.emit(tor, piece); tor->set_has_piece(piece, false); } } // namespace got_block_helpers diff --git a/libtransmission/torrent.h b/libtransmission/torrent.h index 6ddb0bbad..554b0f12c 100644 --- a/libtransmission/torrent.h +++ b/libtransmission/torrent.h @@ -27,6 +27,7 @@ #include "crypto-utils.h" #include "file-piece-map.h" #include "interned-string.h" +#include "observable.h" #include "log.h" #include "session.h" #include "torrent-metainfo.h" @@ -820,6 +821,15 @@ public: tr_bandwidth bandwidth_; + libtransmission::SimpleObservable done_; + libtransmission::SimpleObservable got_bad_piece_; + libtransmission::SimpleObservable piece_completed_; + libtransmission::SimpleObservable doomed_; + libtransmission::SimpleObservable got_metainfo_; + libtransmission::SimpleObservable started_; + libtransmission::SimpleObservable stopped_; + libtransmission::SimpleObservable swarm_is_all_seeds_; + tr_stat stats = {}; // TODO(ckerr): make private once some of torrent.cc's `tr_torrentFoo()` methods are member functions