refactor: add lightweight observable / observer for decoupling (#5716)

This commit is contained in:
Charles Kerr 2023-07-04 15:47:18 -05:00 committed by GitHub
parent 6a2a4c3032
commit 56b27057fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 270 additions and 170 deletions

View File

@ -75,6 +75,7 @@ target_sources(${TR_NAME}
mime-types.h
net.cc
net.h
observable.h
open-files.cc
open-files.h
peer-common.h

View File

@ -0,0 +1,110 @@
// This file Copyright © 2023 Mnemosyne LLC.
// It may be used under GPLv2 (SPDX: GPL-2.0-only), GPLv3 (SPDX: GPL-3.0-only),
// or any future license endorsed by Mnemosyne LLC.
// License text can be found in the licenses/ folder.
#pragma once
#ifndef __TRANSMISSION__
#error only libtransmission should #include this header.
#endif
#include <algorithm> // for std::move
#include <cstddef> // for size_t
#include <functional>
#include <small/map.hpp>
#include "tr-assert.h"
namespace libtransmission
{
// An RAII-based subscription to an Observable.
// Returned by SimpleObservable::observe().
// Let it go out-of-scope to cancel the subscription.
class ObserverTag
{
public:
using Callback = std::function<void()>;
ObserverTag() = default;
ObserverTag(ObserverTag&& that)
{
on_destroy_ = std::move(that.on_destroy_);
that.on_destroy_ = nullptr;
}
ObserverTag& operator=(ObserverTag&& that)
{
on_destroy_ = std::move(that.on_destroy_);
that.on_destroy_ = nullptr;
return *this;
}
ObserverTag(ObserverTag const&) = delete;
ObserverTag& operator=(ObserverTag const&) = delete;
ObserverTag(Callback on_destroy)
: on_destroy_{ std::move(on_destroy) }
{
}
~ObserverTag()
{
if (on_destroy_)
on_destroy_();
}
private:
Callback on_destroy_;
};
// A simple observer/observable implementation.
// Intentionally avoids edge cases like thread safety and
// remove-during-emit; this is meant to be as lightweight
// as possible for very basic use cases.
template<typename... Args>
class SimpleObservable
{
using Key = size_t;
public:
using Observer = std::function<void(Args...)>;
~SimpleObservable()
{
TR_ASSERT(std::empty(observers_));
}
auto observe(Observer observer)
{
auto const key = next_key_++;
observers_.emplace(key, std::move(observer));
return ObserverTag{ [this, key]()
{
remove(key);
} };
}
void emit(Args... args) const
{
for (auto& [tag, observer] : observers_)
{
observer((args)...);
}
}
private:
void remove(Key key)
{
[[maybe_unused]] auto const n_removed = observers_.erase(key);
TR_ASSERT(n_removed == 1U);
}
static auto inline next_key_ = Key{ 1U };
small::map<Key, Observer, 64U> observers_;
};
} // namespace libtransmission

View File

@ -317,7 +317,18 @@ public:
tr_swarm(tr_peerMgr* manager_in, tr_torrent* tor_in) noexcept
: manager{ manager_in }
, tor{ tor_in }
, tags_{ {
tor_in->done_.observe([this](tr_torrent*, bool) { on_torrent_done(); }),
tor_in->doomed_.observe([this](tr_torrent*) { on_torrent_doomed(); }),
tor_in->got_bad_piece_.observe([this](tr_torrent*, tr_piece_index_t p) { on_got_bad_piece(p); }),
tor_in->got_metainfo_.observe([this](tr_torrent*) { on_got_metainfo(); }),
tor_in->piece_completed_.observe([this](tr_torrent*, tr_piece_index_t p) { on_piece_completed(p); }),
tor_in->started_.observe([this](tr_torrent*) { on_torrent_started(); }),
tor_in->stopped_.observe([this](tr_torrent*) { on_torrent_stopped(); }),
tor_in->swarm_is_all_seeds_.observe([this](tr_torrent* /*tor*/) { on_swarm_is_all_seeds(); }),
} }
{
rebuildWebseeds();
}
@ -471,11 +482,6 @@ public:
return *pool_is_all_seeds_;
}
void markAllSeedsFlagDirty() noexcept
{
pool_is_all_seeds_.reset();
}
[[nodiscard]] peer_atom* get_existing_atom(std::pair<tr_address, tr_port> const& socket_address) noexcept
{
auto&& it = pool.find(socket_address);
@ -509,7 +515,7 @@ public:
atom->flags |= flags;
}
markAllSeedsFlagDirty();
mark_all_seeds_flag_dirty();
return atom;
}
@ -518,7 +524,7 @@ public:
{
tr_logAddTraceSwarm(this, fmt::format("marking peer {} as a seed", atom.display_name()));
atom.flags |= ADDED_F_SEED_FLAG;
markAllSeedsFlagDirty();
mark_all_seeds_flag_dirty();
}
static void peerCallbackFunc(tr_peer* peer, tr_peer_event const& event, void* vs)
@ -664,12 +670,108 @@ private:
}
}
void mark_all_seeds_flag_dirty() noexcept
{
pool_is_all_seeds_.reset();
}
void on_torrent_doomed()
{
auto const lock = tor->unique_lock();
stop();
tor->swarm = nullptr;
delete this;
}
void on_torrent_done()
{
std::for_each(std::begin(peers), std::end(peers), [](auto* const peer) { peer->set_interested(false); });
}
void on_swarm_is_all_seeds()
{
auto const lock = tor->unique_lock();
for (auto& [socket_address, atom] : pool)
{
mark_atom_as_seed(atom);
}
mark_all_seeds_flag_dirty();
}
void on_piece_completed(tr_piece_index_t piece)
{
bool piece_came_from_peers = false;
for (auto* const peer : peers)
{
// notify the peer that we now have this piece
peer->on_piece_completed(piece);
if (!piece_came_from_peers)
{
piece_came_from_peers = peer->blame.test(piece);
}
}
if (piece_came_from_peers) /* webseed downloads don't belong in announce totals */
{
tr_announcerAddBytes(tor, TR_ANN_DOWN, tor->piece_size(piece));
}
}
void on_got_bad_piece(tr_piece_index_t piece)
{
auto const byte_count = tor->piece_size(piece);
for (auto* const peer : peers)
{
if (peer->blame.test(piece))
{
tr_logAddTraceSwarm(
this,
fmt::format(
"peer {} contributed to corrupt piece ({}); now has {} strikes",
peer->display_name(),
piece,
peer->strikes + 1));
addStrike(peer);
}
}
tr_announcerAddBytes(tor, TR_ANN_CORRUPT, byte_count);
}
void on_got_metainfo()
{
// the webseed list may have changed...
rebuildWebseeds();
// some peer_msgs' progress fields may not be accurate if we
// didn't have the metadata before now... so refresh them all...
for (auto* peer : peers)
{
peer->onTorrentGotMetainfo();
if (peer->isSeed())
{
mark_atom_as_seed(*peer->atom);
}
}
}
void on_torrent_started();
void on_torrent_stopped();
// number of bad pieces a peer is allowed to send before we ban them
static auto constexpr MaxBadPiecesPerPeer = int{ 5 };
// how long we'll let requests we've made linger before we cancel them
static auto constexpr RequestTtlSecs = int{ 90 };
std::array<libtransmission::ObserverTag, 8> const tags_;
mutable std::optional<bool> pool_is_all_seeds_;
bool is_endgame_ = false;
@ -683,6 +785,7 @@ struct tr_peerMgr
, bandwidth_timer_{ session->timerMaker().create([this]() { bandwidthPulse(); }) }
, rechoke_timer_{ session->timerMaker().create([this]() { rechokePulseMarshall(); }) }
, refill_upkeep_timer_{ session->timerMaker().create([this]() { refillUpkeep(); }) }
, blocklist_tag_{ session->blocklist_changed_.observe([this]() { on_blocklist_changed(); }) }
{
bandwidth_timer_->start_repeating(BandwidthPeriod);
rechoke_timer_->start_repeating(RechokePeriod);
@ -734,10 +837,25 @@ private:
rechoke_timer_->set_interval(RechokePeriod);
}
void on_blocklist_changed() const
{
/* we cache whether or not a peer is blocklisted...
since the blocklist has changed, erase that cached value */
for (auto* const tor : session->torrents())
{
for (auto& [socket_address, atom] : tor->swarm->pool)
{
atom.setBlocklistedDirty();
}
}
}
std::unique_ptr<libtransmission::Timer> const bandwidth_timer_;
std::unique_ptr<libtransmission::Timer> const rechoke_timer_;
std::unique_ptr<libtransmission::Timer> const refill_upkeep_timer_;
libtransmission::ObserverTag const blocklist_tag_;
static auto constexpr BandwidthPeriod = 500ms;
static auto constexpr RechokePeriod = 10s;
static auto constexpr RefillUpkeepPeriod = 10s;
@ -787,21 +905,6 @@ void tr_peerMgrFree(tr_peerMgr* manager)
// ---
void tr_peerMgrOnBlocklistChanged(tr_peerMgr* mgr)
{
/* we cache whether or not a peer is blocklisted...
since the blocklist has changed, erase that cached value */
for (auto* const tor : mgr->session->torrents())
{
for (auto& [socket_address, atom] : tor->swarm->pool)
{
atom.setBlocklistedDirty();
}
}
}
// ---
void tr_peerMgrSetUtpSupported(tr_torrent* tor, std::pair<tr_address, tr_port> const& socket_address)
{
if (auto* const atom = tor->swarm->get_existing_atom(socket_address); atom != nullptr)
@ -943,30 +1046,6 @@ void tr_peerMgr::refillUpkeep() const
}
}
void tr_peerMgrPieceCompleted(tr_torrent* tor, tr_piece_index_t p)
{
bool piece_came_from_peers = false;
for (auto* const peer : tor->swarm->peers)
{
// notify the peer that we now have this piece
peer->on_piece_completed(p);
if (!piece_came_from_peers)
{
piece_came_from_peers = peer->blame.test(p);
}
}
if (piece_came_from_peers) /* webseed downloads don't belong in announce totals */
{
tr_announcerAddBytes(tor, TR_ANN_DOWN, tor->piece_size(p));
}
// bookkeeping
tor->set_needs_completeness_check();
}
namespace
{
namespace handshake_helpers
@ -1122,20 +1201,6 @@ void tr_peerMgrAddIncoming(tr_peerMgr* manager, tr_peer_socket&& socket)
}
}
void tr_peerMgrSetSwarmIsAllSeeds(tr_torrent* tor)
{
auto const lock = tor->unique_lock();
auto* const swarm = tor->swarm;
for (auto& [socket_address, atom] : swarm->pool)
{
swarm->mark_atom_as_seed(atom);
}
swarm->markAllSeedsFlagDirty();
}
size_t tr_peerMgrAddPex(tr_torrent* tor, uint8_t from, tr_pex const* pex, size_t n_pex)
{
size_t n_used = 0;
@ -1205,29 +1270,6 @@ std::vector<tr_pex> tr_pex::from_compact_ipv6(
// ---
void tr_peerMgrGotBadPiece(tr_torrent* tor, tr_piece_index_t piece_index)
{
auto* const swarm = tor->swarm;
auto const byte_count = tor->piece_size(piece_index);
for (auto* const peer : swarm->peers)
{
if (peer->blame.test(piece_index))
{
tr_logAddTraceSwarm(
swarm,
fmt::format(
"peer {} contributed to corrupt piece ({}); now has {} strikes",
peer->display_name(),
piece_index,
peer->strikes + 1));
swarm->addStrike(peer);
}
}
tr_announcerAddBytes(tor, TR_ANN_CORRUPT, byte_count);
}
namespace
{
namespace get_peers_helpers
@ -1354,23 +1396,16 @@ std::vector<tr_pex> tr_peerMgrGetPeers(tr_torrent const* tor, uint8_t address_ty
return pex;
}
void tr_peerMgrStartTorrent(tr_torrent* tor)
void tr_swarm::on_torrent_started()
{
TR_ASSERT(tr_isTorrent(tor));
auto const lock = tor->unique_lock();
tr_swarm* const swarm = tor->swarm;
swarm->is_running = true;
swarm->manager->rechokeSoon();
is_running = true;
manager->rechokeSoon();
}
void tr_peerMgrStopTorrent(tr_torrent* tor)
void tr_swarm::on_torrent_stopped()
{
TR_ASSERT(tr_isTorrent(tor));
tor->swarm->stop();
stop();
}
void tr_peerMgrAddTorrent(tr_peerMgr* manager, tr_torrent* tor)
@ -1382,36 +1417,6 @@ void tr_peerMgrAddTorrent(tr_peerMgr* manager, tr_torrent* tor)
tor->swarm = new tr_swarm{ manager, tor };
}
void tr_peerMgrRemoveTorrent(tr_torrent* tor)
{
TR_ASSERT(tr_isTorrent(tor));
auto const lock = tor->unique_lock();
tor->swarm->stop();
delete tor->swarm;
tor->swarm = nullptr;
}
void tr_peerMgrOnTorrentGotMetainfo(tr_torrent* tor)
{
auto* const swarm = tor->swarm;
/* the webseed list may have changed... */
swarm->rebuildWebseeds();
/* some peer_msgs' progress fields may not be accurate if we
didn't have the metadata before now... so refresh them all... */
for (auto* peer : swarm->peers)
{
peer->onTorrentGotMetainfo();
if (peer->isSeed())
{
swarm->mark_atom_as_seed(*peer->atom);
}
}
}
int8_t tr_peerMgrPieceAvailability(tr_torrent const* tor, tr_piece_index_t piece)
{
if (!tor->has_metainfo())
@ -1644,15 +1649,6 @@ tr_peer_stat* tr_peerMgrPeerStats(tr_torrent const* tor, size_t* setme_count)
return ret;
}
void tr_peerMgrClearInterest(tr_torrent* tor)
{
TR_ASSERT(tr_isTorrent(tor));
auto const lock = tor->unique_lock();
auto& peers = tor->swarm->peers;
std::for_each(std::begin(peers), std::end(peers), [](auto* const peer) { peer->set_interested(false); });
}
namespace
{
namespace update_interest_helpers

View File

@ -179,8 +179,6 @@ void tr_peerMgrAddIncoming(tr_peerMgr* manager, tr_peer_socket&& socket);
size_t tr_peerMgrAddPex(tr_torrent* tor, uint8_t from, tr_pex const* pex, size_t n_pex);
void tr_peerMgrSetSwarmIsAllSeeds(tr_torrent* tor);
enum
{
TR_PEERS_CONNECTED,
@ -193,14 +191,8 @@ enum
uint8_t peer_list_mode,
size_t max_peer_count);
void tr_peerMgrStartTorrent(tr_torrent* tor);
void tr_peerMgrStopTorrent(tr_torrent* tor);
void tr_peerMgrAddTorrent(tr_peerMgr* manager, struct tr_torrent* tor);
void tr_peerMgrRemoveTorrent(tr_torrent* tor);
// return the number of connected peers that have `piece`, or -1 if we already have it
[[nodiscard]] int8_t tr_peerMgrPieceAvailability(tr_torrent const* tor, tr_piece_index_t piece);
@ -208,18 +200,8 @@ void tr_peerMgrTorrentAvailability(tr_torrent const* tor, int8_t* tab, unsigned
[[nodiscard]] uint64_t tr_peerMgrGetDesiredAvailable(tr_torrent const* tor);
void tr_peerMgrOnTorrentGotMetainfo(tr_torrent* tor);
void tr_peerMgrOnBlocklistChanged(tr_peerMgr* mgr);
[[nodiscard]] struct tr_peer_stat* tr_peerMgrPeerStats(tr_torrent const* tor, size_t* setme_count);
[[nodiscard]] tr_webseed_view tr_peerMgrWebseed(tr_torrent const* tor, size_t i);
void tr_peerMgrClearInterest(tr_torrent* tor);
void tr_peerMgrGotBadPiece(tr_torrent* tor, tr_piece_index_t piece_index);
void tr_peerMgrPieceCompleted(tr_torrent* tor, tr_piece_index_t pieceIndex);
/* @} */

View File

@ -1617,10 +1617,7 @@ void tr_sessionReloadBlocklists(tr_session* session)
{
session->blocklists_ = libtransmission::Blocklist::loadBlocklists(session->blocklist_dir_, session->useBlocklist());
if (session->peer_mgr_)
{
tr_peerMgrOnBlocklistChanged(session->peer_mgr_.get());
}
session->blocklist_changed_.emit();
}
size_t tr_blocklistGetRuleCount(tr_session const* session)

View File

@ -38,6 +38,7 @@
#include "global-ip-cache.h"
#include "interned-string.h"
#include "net.h" // tr_socket_t
#include "observable.h"
#include "open-files.h"
#include "port-forwarding.h"
#include "quark.h"
@ -1113,6 +1114,10 @@ private:
std::vector<libtransmission::Blocklist> blocklists_;
public:
libtransmission::SimpleObservable<> blocklist_changed_;
private:
/// other fields
// depends-on: session_thread_, settings_.bind_address_ipv4, local_peer_port_, global_ip_cache (via tr_session::bind_address())
@ -1162,7 +1167,7 @@ public:
std::unique_ptr<Cache> cache = std::make_unique<Cache>(torrents_, 1024 * 1024 * 2);
private:
// depends-on: timer_maker_, top_bandwidth_, utp_context, torrents_, web_
// depends-on: timer_maker_, top_bandwidth_, utp_context, torrents_, web_, blocklist_changed_
std::unique_ptr<struct tr_peerMgr, void (*)(struct tr_peerMgr*)> peer_mgr_;
// depends-on: peer_mgr_, advertised_peer_port_, torrents_

View File

@ -720,7 +720,7 @@ void torrentStartImpl(tr_torrent* const tor)
torrentResetTransferStats(tor);
tor->session->announcer_->startTorrent(tor);
tor->lpdAnnounceAt = now;
tr_peerMgrStartTorrent(tor);
tor->started_.emit(tor);
}
bool removeTorrentFile(char const* filename, void* /*user_data*/, tr_error** error)
@ -763,7 +763,7 @@ void freeTorrent(tr_torrent* tor)
tr_session* session = tor->session;
tr_peerMgrRemoveTorrent(tor);
tor->doomed_.emit(tor);
session->announcer_->removeTorrent(tor);
@ -869,7 +869,7 @@ void torrentStop(tr_torrent* const tor)
tor->session->verifyRemove(tor);
tr_peerMgrStopTorrent(tor);
tor->stopped_.emit(tor);
tor->session->announcer_->stopTorrent(tor);
tor->session->closeTorrentFiles(tor);
@ -1174,7 +1174,7 @@ void tr_torrent::set_metainfo(tr_torrent_metainfo tm)
metainfo_ = std::move(tm);
torrentInitFromInfoDict(this);
tr_peerMgrOnTorrentGotMetainfo(this);
got_metainfo_.emit(this);
session->onMetadataCompleted(this);
this->set_dirty();
this->mark_edited();
@ -1911,16 +1911,12 @@ void tr_torrent::recheck_completeness()
this->doneDate = tr_time();
}
if (was_leeching && was_running)
{
/* clear interested flag on all peers */
tr_peerMgrClearInterest(this);
}
if (this->current_dir() == this->incomplete_dir())
{
this->set_location(this->download_dir(), true, nullptr, nullptr);
}
done_.emit(this, recent_change);
}
this->session->onTorrentCompletenessChanged(this, completeness, was_running);
@ -2168,7 +2164,7 @@ void tr_torrent::on_tracker_response(tr_tracker_event const* event)
case tr_tracker_event::Type::Counts:
if (is_private() && (event->leechers == 0))
{
tr_peerMgrSetSwarmIsAllSeeds(this);
swarm_is_all_seeds_.emit(this);
}
break;
@ -2318,7 +2314,10 @@ void onFileCompleted(tr_torrent* tor, tr_file_index_t i)
void onPieceCompleted(tr_torrent* tor, tr_piece_index_t piece)
{
tr_peerMgrPieceCompleted(tor, piece);
tor->piece_completed_.emit(tor, piece);
// bookkeeping
tor->set_needs_completeness_check();
// if this piece completes any file, invoke the fileCompleted func for it
auto const span = tor->fpm_.file_span(piece);
@ -2338,7 +2337,7 @@ void onPieceFailed(tr_torrent* tor, tr_piece_index_t piece)
auto const n = tor->piece_size(piece);
tor->corruptCur += n;
tor->downloadedCur -= std::min(tor->downloadedCur, uint64_t{ n });
tr_peerMgrGotBadPiece(tor, piece);
tor->got_bad_piece_.emit(tor, piece);
tor->set_has_piece(piece, false);
}
} // namespace got_block_helpers

View File

@ -27,6 +27,7 @@
#include "crypto-utils.h"
#include "file-piece-map.h"
#include "interned-string.h"
#include "observable.h"
#include "log.h"
#include "session.h"
#include "torrent-metainfo.h"
@ -820,6 +821,15 @@ public:
tr_bandwidth bandwidth_;
libtransmission::SimpleObservable<tr_torrent*, bool /*because_downloaded_last_piece*/> done_;
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t> got_bad_piece_;
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t> piece_completed_;
libtransmission::SimpleObservable<tr_torrent*> doomed_;
libtransmission::SimpleObservable<tr_torrent*> got_metainfo_;
libtransmission::SimpleObservable<tr_torrent*> started_;
libtransmission::SimpleObservable<tr_torrent*> stopped_;
libtransmission::SimpleObservable<tr_torrent*> swarm_is_all_seeds_;
tr_stat stats = {};
// TODO(ckerr): make private once some of torrent.cc's `tr_torrentFoo()` methods are member functions