diff --git a/Transmission.xcodeproj/project.pbxproj b/Transmission.xcodeproj/project.pbxproj index bd7d75dc0..f43d831a4 100644 --- a/Transmission.xcodeproj/project.pbxproj +++ b/Transmission.xcodeproj/project.pbxproj @@ -86,7 +86,6 @@ A209EBF91142FEEE002B02D1 /* InfoOptionsViewController.mm in Sources */ = {isa = PBXBuildFile; fileRef = A209EBF81142FEEE002B02D1 /* InfoOptionsViewController.mm */; }; A209EC12114301C6002B02D1 /* InfoOptionsView.xib in Resources */ = {isa = PBXBuildFile; fileRef = A209EC11114301C6002B02D1 /* InfoOptionsView.xib */; }; A209ECA2114319C3002B02D1 /* InfoWindow.xib in Resources */ = {isa = PBXBuildFile; fileRef = A209ECA1114319C3002B02D1 /* InfoWindow.xib */; }; - A209EE5C1144B51E002B02D1 /* history.cc in Sources */ = {isa = PBXBuildFile; fileRef = A209EE5A1144B51E002B02D1 /* history.cc */; }; A209EE5D1144B51E002B02D1 /* history.h in Headers */ = {isa = PBXBuildFile; fileRef = A209EE5B1144B51E002B02D1 /* history.h */; }; A20BFFB70D091CC700CE5D2B /* ToolbarSegmentedCell.mm in Sources */ = {isa = PBXBuildFile; fileRef = A20BFFB60D091CC700CE5D2B /* ToolbarSegmentedCell.mm */; }; A21282A80CA6C66800EAEE0F /* StatusBarView.mm in Sources */ = {isa = PBXBuildFile; fileRef = A21282A60CA6C66800EAEE0F /* StatusBarView.mm */; }; @@ -599,7 +598,6 @@ A209EBF81142FEEE002B02D1 /* InfoOptionsViewController.mm */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.objcpp; path = InfoOptionsViewController.mm; sourceTree = ""; }; A209EC13114301C6002B02D1 /* en */ = {isa = PBXFileReference; lastKnownFileType = file.xib; name = en; path = en.lproj/InfoOptionsView.xib; sourceTree = ""; }; A209ECA1114319C3002B02D1 /* InfoWindow.xib */ = {isa = PBXFileReference; lastKnownFileType = file.xib; path = InfoWindow.xib; sourceTree = ""; }; - A209EE5A1144B51E002B02D1 /* history.cc */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.cpp; path = history.cc; sourceTree = ""; }; A209EE5B1144B51E002B02D1 /* history.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = history.h; sourceTree = ""; }; A20BFFB50D091CC700CE5D2B /* ToolbarSegmentedCell.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = ToolbarSegmentedCell.h; sourceTree = ""; }; A20BFFB60D091CC700CE5D2B /* ToolbarSegmentedCell.mm */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.objcpp; path = ToolbarSegmentedCell.mm; sourceTree = ""; }; @@ -1466,7 +1464,6 @@ A21FBBA90EDA78C300BC3C51 /* bandwidth.h */, A21FBBAA0EDA78C300BC3C51 /* bandwidth.cc */, A209EE5B1144B51E002B02D1 /* history.h */, - A209EE5A1144B51E002B02D1 /* history.cc */, A23547E011CD0B090046EAE6 /* cache.cc */, A23547E111CD0B090046EAE6 /* cache.h */, BEFC1E020C07861A00B0BB3C /* platform.h */, @@ -2483,7 +2480,6 @@ A25964A6106D73A800453B31 /* announcer.cc in Sources */, 4D8017EA10BBC073008A4AF2 /* torrent-magnet.cc in Sources */, 4D80185910BBC0B0008A4AF2 /* magnet.cc in Sources */, - A209EE5C1144B51E002B02D1 /* history.cc in Sources */, A220EC5B118C8A060022B4BE /* tr-lpd.cc in Sources */, C1FEE57A1C3223CC00D62832 /* watchdir.cc in Sources */, A23547E211CD0B090046EAE6 /* cache.cc in Sources */, diff --git a/libtransmission/CMakeLists.txt b/libtransmission/CMakeLists.txt index 22f4983ec..a6d7ee1d4 100644 --- a/libtransmission/CMakeLists.txt +++ b/libtransmission/CMakeLists.txt @@ -27,7 +27,6 @@ set(PROJECT_FILES file-posix.cc file-win32.cc handshake.cc - history.cc inout.cc log.cc magnet.cc diff --git a/libtransmission/history.cc b/libtransmission/history.cc deleted file mode 100644 index 952e5cc28..000000000 --- a/libtransmission/history.cc +++ /dev/null @@ -1,60 +0,0 @@ -/* - * This file Copyright (C) 2010-2014 Mnemosyne LLC - * - * It may be used under the GNU GPL versions 2 or 3 - * or any future license endorsed by Mnemosyne LLC. - * - */ - -#include /* memset() */ - -#include "transmission.h" -#include "history.h" -#include "utils.h" - -void tr_historyAdd(tr_recentHistory* h, time_t now, unsigned int n) -{ - if (h->slices[h->newest].date == now) - { - h->slices[h->newest].n += n; - } - else - { - if (++h->newest == TR_RECENT_HISTORY_PERIOD_SEC) - { - h->newest = 0; - } - - h->slices[h->newest].date = now; - h->slices[h->newest].n = n; - } -} - -unsigned int tr_historyGet(tr_recentHistory const* h, time_t now, unsigned int sec) -{ - unsigned int n = 0; - time_t const cutoff = (now != 0 ? now : tr_time()) - sec; - int i = h->newest; - - for (;;) - { - if (h->slices[i].date <= cutoff) - { - break; - } - - n += h->slices[i].n; - - if (--i == -1) - { - i = TR_RECENT_HISTORY_PERIOD_SEC - 1; /* circular history */ - } - - if (i == h->newest) - { - break; /* we've come all the way around */ - } - } - - return n; -} diff --git a/libtransmission/history.h b/libtransmission/history.h index 12f35c0f6..899aa919e 100644 --- a/libtransmission/history.h +++ b/libtransmission/history.h @@ -12,47 +12,61 @@ #error only libtransmission should #include this header. #endif -#include /* time_t */ - -#include "tr-macros.h" +#include +#include // size_t +#include // time_t +#include // std::accumulate /** - * A generic short-term memory object that remembers how many times - * something happened over the last N seconds. - * - * For example, it could count how many are bytes transferred - * to estimate the speed over the last N seconds. + * A short-term memory object that remembers how many times something + * happened over the last N seconds. tr_peer uses it to count how many + * bytes transferred to estimate the speed over the last N seconds. */ - -enum +class tr_recentHistory { - TR_RECENT_HISTORY_PERIOD_SEC = 60 -}; - -struct tr_recentHistory -{ - /* these are PRIVATE IMPLEMENTATION details included for composition only. - * Don't access these directly! */ - - int newest; - - struct +public: + /** + * @brief add a counter to the recent history object. + * @param when the current time in sec, such as from tr_time() + * @param n how many items to add to the history's counter + */ + void add(time_t now, size_t n) { - unsigned int n; - time_t date; - } slices[TR_RECENT_HISTORY_PERIOD_SEC]; + if (slices[newest].time != now) + { + newest = (newest + 1) % TR_RECENT_HISTORY_PERIOD_SEC; + slices[newest].time = now; + } + + slices[newest].n += n; + } + + /** + * @brief count how many events have occurred in the last N seconds. + * @param when the current time in sec, such as from tr_time() + * @param seconds how many seconds to count back through. + */ + size_t count(time_t now, unsigned int age_sec) const + { + time_t const oldest = now - age_sec; + + return std::accumulate( + std::begin(slices), + std::end(slices), + size_t{ 0 }, + [&oldest](size_t sum, auto const& slice) { return slice.time >= oldest ? sum + slice.n : sum; }); + } + +private: + inline auto static constexpr TR_RECENT_HISTORY_PERIOD_SEC = size_t{ 60 }; + + int newest = 0; + + struct slice_t + { + size_t n = 0; + time_t time = 0; + }; + + std::array slices = {}; }; - -/** - * @brief add a counter to the recent history object. - * @param when the current time in sec, such as from tr_time() - * @param n how many items to add to the history's counter - */ -void tr_historyAdd(tr_recentHistory*, time_t when, unsigned int n); - -/** - * @brief count how many events have occurred in the last N seconds. - * @param when the current time in sec, such as from tr_time() - * @param seconds how many seconds to count back through. - */ -unsigned int tr_historyGet(tr_recentHistory const*, time_t when, unsigned int seconds); diff --git a/libtransmission/peer-common.h b/libtransmission/peer-common.h index f3de52c78..b9e208333 100644 --- a/libtransmission/peer-common.h +++ b/libtransmission/peer-common.h @@ -22,16 +22,14 @@ * @{ */ -struct tr_peer; +class tr_peer; struct tr_swarm; +struct peer_atom; -enum -{ - /* this is the maximum size of a block request. - most bittorrent clients will reject requests - larger than this size. */ - MAX_BLOCK_SIZE = (1024 * 16) -}; +/* This is the maximum size of a block request. + most bittorrent clients will reject requests + larger than this size. */ +auto inline constexpr MAX_BLOCK_SIZE = 1024 * 16; /** *** Peer Publish / Subscribe @@ -66,22 +64,7 @@ struct tr_peer_event tr_port port; /* for GOT_PORT */ }; -using tr_peer_callback = void (*)(struct tr_peer* peer, tr_peer_event const* event, void* client_data); - -/*** -**** -***/ - -using tr_peer_destruct_func = void (*)(struct tr_peer* peer); - -using tr_peer_is_transferring_pieces_func = - bool (*)(tr_peer const* peer, uint64_t now, tr_direction direction, unsigned int* Bps); - -struct tr_peer_virtual_funcs -{ - tr_peer_destruct_func destruct; - tr_peer_is_transferring_pieces_func is_transferring_pieces; -}; +using tr_peer_callback = void (*)(tr_peer* peer, tr_peer_event const* event, void* client_data); /** * State information about a connected peer. @@ -89,53 +72,55 @@ struct tr_peer_virtual_funcs * @see struct peer_atom * @see tr_peerMsgs */ -struct tr_peer +class tr_peer { +public: + tr_peer(tr_torrent const* tor, peer_atom* atom = nullptr); + virtual ~tr_peer(); + + virtual bool is_transferring_pieces(uint64_t now, tr_direction direction, unsigned int* setme_Bps) const = 0; + /* whether or not we should free this peer soon. NOTE: private to peer-mgr.c */ - bool doPurge; + bool doPurge = false; /* number of bad pieces they've contributed to */ - uint8_t strikes; + uint8_t strikes = 0; /* how many requests the peer has made that we haven't responded to yet */ - int pendingReqsToClient; + int pendingReqsToClient = 0; /* how many requests we've made and are currently awaiting a response for */ - int pendingReqsToPeer; + int pendingReqsToPeer = 0; + + tr_session* const session; /* Hook to private peer-mgr information */ - struct peer_atom* atom; + peer_atom* const atom; - struct tr_swarm* swarm; + tr_swarm* const swarm; /** how complete the peer's copy of the torrent is. [0.0...1.0] */ - float progress; + float progress = 0.0f; - struct tr_bitfield blame; - struct tr_bitfield have; + struct tr_bitfield blame = {}; + struct tr_bitfield have = {}; /* the client name. For BitTorrent peers, this is the app name derived from the `v' string in LTEP's handshake dictionary */ - tr_quark client; + tr_quark client = TR_KEY_NONE; tr_recentHistory blocksSentToClient; tr_recentHistory blocksSentToPeer; tr_recentHistory cancelsSentToClient; tr_recentHistory cancelsSentToPeer; - - struct tr_peer_virtual_funcs const* funcs; }; -void tr_peerConstruct(struct tr_peer* peer, tr_torrent const* tor); - -void tr_peerDestruct(struct tr_peer* peer); - /** Update the tr_peer.progress field based on the 'have' bitset. */ -void tr_peerUpdateProgress(tr_torrent* tor, struct tr_peer*); +void tr_peerUpdateProgress(tr_torrent* tor, tr_peer*); -bool tr_peerIsSeed(struct tr_peer const* peer); +bool tr_peerIsSeed(tr_peer const* peer); /*** **** diff --git a/libtransmission/peer-mgr.cc b/libtransmission/peer-mgr.cc index 0cfacb57a..fbaaeaf39 100644 --- a/libtransmission/peer-mgr.cc +++ b/libtransmission/peer-mgr.cc @@ -227,60 +227,37 @@ struct tr_peerMgr *** tr_peer virtual functions **/ -static bool tr_peerIsTransferringPieces(tr_peer const* peer, uint64_t now, tr_direction direction, unsigned int* Bps) -{ - TR_ASSERT(peer != nullptr); - TR_ASSERT(peer->funcs != nullptr); - - return (*peer->funcs->is_transferring_pieces)(peer, now, direction, Bps); -} - unsigned int tr_peerGetPieceSpeed_Bps(tr_peer const* peer, uint64_t now, tr_direction direction) { unsigned int Bps = 0; - tr_peerIsTransferringPieces(peer, now, direction, &Bps); + peer->is_transferring_pieces(now, direction, &Bps); return Bps; } -static void tr_peerFree(tr_peer* peer) +tr_peer::tr_peer(tr_torrent const* tor, peer_atom* atom_in) + : session{ tor->session } + , atom{ atom_in } + , swarm{ tor->swarm } { - TR_ASSERT(peer != nullptr); - TR_ASSERT(peer->funcs != nullptr); - - (*peer->funcs->destruct)(peer); - - tr_free(peer); -} - -void tr_peerConstruct(tr_peer* peer, tr_torrent const* tor) -{ - TR_ASSERT(peer != nullptr); - TR_ASSERT(tr_isTorrent(tor)); - - *peer = {}; - peer->client = TR_KEY_NONE; - peer->swarm = tor->swarm; - tr_bitfieldConstruct(&peer->have, tor->info.pieceCount); - tr_bitfieldConstruct(&peer->blame, tor->blockCount); + tr_bitfieldConstruct(&have, tor->info.pieceCount); + tr_bitfieldConstruct(&blame, tor->blockCount); } static void peerDeclinedAllRequests(tr_swarm*, tr_peer const*); -void tr_peerDestruct(tr_peer* peer) +tr_peer::~tr_peer() { - TR_ASSERT(peer != nullptr); - - if (peer->swarm != nullptr) + if (swarm != nullptr) { - peerDeclinedAllRequests(peer->swarm, peer); + peerDeclinedAllRequests(swarm, this); } - tr_bitfieldDestruct(&peer->have); - tr_bitfieldDestruct(&peer->blame); + tr_bitfieldDestruct(&have); + tr_bitfieldDestruct(&blame); - if (peer->atom != nullptr) + if (atom != nullptr) { - peer->atom->peer = nullptr; + atom->peer = nullptr; } } @@ -451,7 +428,7 @@ static void swarmFree(void* vs) TR_ASSERT(tr_ptrArrayEmpty(&s->outgoingHandshakes)); TR_ASSERT(tr_ptrArrayEmpty(&s->peers)); - tr_ptrArrayDestruct(&s->webseeds, (PtrArrayForeachFunc)tr_peerFree); + tr_ptrArrayDestruct(&s->webseeds, [](void* peer) { delete static_cast(peer); }); tr_ptrArrayDestruct(&s->pool, (PtrArrayForeachFunc)tr_free); tr_ptrArrayDestruct(&s->outgoingHandshakes, nullptr); tr_ptrArrayDestruct(&s->peers, nullptr); @@ -471,14 +448,14 @@ static void rebuildWebseedArray(tr_swarm* s, tr_torrent* tor) tr_info const* inf = &tor->info; /* clear the array */ - tr_ptrArrayDestruct(&s->webseeds, (PtrArrayForeachFunc)tr_peerFree); + tr_ptrArrayDestruct(&s->webseeds, [](void* peer) { delete static_cast(peer); }); s->webseeds = {}; s->stats.activeWebseedCount = 0; /* repopulate it */ for (unsigned int i = 0; i < inf->webseedCount; ++i) { - tr_webseed* w = tr_webseedNew(tor, inf->webseeds[i], peerCallbackFunc, s); + tr_peer* w = tr_webseedNew(tor, inf->webseeds[i], peerCallbackFunc, s); tr_ptrArrayAppend(&s->webseeds, w); } } @@ -781,11 +758,7 @@ static int countActiveWebseeds(tr_swarm* s) for (int i = 0, n = tr_ptrArraySize(&s->webseeds); i < n; ++i) { - if (tr_peerIsTransferringPieces( - static_cast(tr_ptrArrayNth(&s->webseeds, i)), - now, - TR_DOWN, - nullptr)) + if (static_cast(tr_ptrArrayNth(&s->webseeds, i))->is_transferring_pieces(now, TR_DOWN, nullptr)) { ++activeCount; } @@ -1512,9 +1485,9 @@ static void refillUpkeep(evutil_socket_t fd, short what, void* vmgr) for (int i = 0; i < n; ++i) { struct block_request const* const request = &s->requests[i]; - tr_peerMsgs const* const msgs = PEER_MSGS(request->peer); + auto const* const msgs = dynamic_cast(request->peer); - if (msgs != nullptr && request->sentAt <= too_old && !tr_peerMsgsIsReadingBlock(msgs, request->block)) + if (msgs != nullptr && request->sentAt <= too_old && !msgs->is_reading_block(request->block)) { TR_ASSERT(cancel != nullptr); TR_ASSERT(cancelCount < cancel_buflen); @@ -1528,7 +1501,7 @@ static void refillUpkeep(evutil_socket_t fd, short what, void* vmgr) s->requests[keepCount] = *request; } - keepCount++; + ++keepCount; } } @@ -1539,12 +1512,11 @@ static void refillUpkeep(evutil_socket_t fd, short what, void* vmgr) for (int i = 0; i < cancelCount; ++i) { struct block_request const* const request = &cancel[i]; - tr_peerMsgs* msgs = PEER_MSGS(request->peer); - + auto* msgs = dynamic_cast(request->peer); if (msgs != nullptr) { - tr_historyAdd(&request->peer->cancelsSentToPeer, now, 1); - tr_peerMsgsCancel(msgs, request->block); + request->peer->cancelsSentToPeer.add(now, 1); + msgs->cancel_block_request(request->block); decrementPendingReqCount(request); } } @@ -1671,12 +1643,15 @@ static void peerDeclinedAllRequests(tr_swarm* s, tr_peer const* peer) static void cancelAllRequestsForBlock(tr_swarm* s, tr_block_index_t block, tr_peer* no_notify) { + auto const now = tr_time(); + for (auto* p : getBlockRequestPeers(s, block)) { - if (p != no_notify && tr_isPeerMsgs(p)) + auto* msgs = dynamic_cast(p); + if ((msgs != nullptr) && (msgs != no_notify)) { - tr_historyAdd(&p->cancelsSentToPeer, tr_time(), 1); - tr_peerMsgsCancel(PEER_MSGS(p), block); + msgs->cancelsSentToPeer.add(now, 1); + msgs->cancel_block_request(block); } removeRequestFromTables(s, block, p); @@ -1691,10 +1666,10 @@ void tr_peerMgrPieceCompleted(tr_torrent* tor, tr_piece_index_t p) /* walk through our peers */ for (int i = 0, n = tr_ptrArraySize(&s->peers); i < n; ++i) { - auto* peer = static_cast(tr_ptrArrayNth(&s->peers, i)); + auto* peer = static_cast(tr_ptrArrayNth(&s->peers, i)); - /* notify the peer that we now have this piece */ - tr_peerMsgsHave(PEER_MSGS(peer), p); + // notify the peer that we now have this piece + peer->on_piece_completed(p); if (!pieceCameFromPeers) { @@ -1835,7 +1810,7 @@ static void peerCallbackFunc(tr_peer* peer, tr_peer_event const* e, void* vs) tr_piece_index_t const p = e->pieceIndex; tr_block_index_t const block = _tr_block(tor, p, e->offset); cancelAllRequestsForBlock(s, block, peer); - tr_historyAdd(&peer->blocksSentToClient, tr_time(), 1); + peer->blocksSentToClient.add(tr_time(), 1); pieceListResortPiece(s, pieceListLookup(s, p)); tr_torrentGotBlock(tor, block); break; @@ -1957,8 +1932,7 @@ static void createBitTorrentPeer(tr_torrent* tor, struct tr_peerIo* io, struct p tr_swarm* swarm = tor->swarm; - tr_peer* peer = (tr_peer*)tr_peerMsgsNew(tor, io, peerCallbackFunc, swarm); - peer->atom = atom; + auto* peer = tr_peerMsgsNew(tor, atom, io, peerCallbackFunc, swarm); peer->client = client; atom->peer = peer; @@ -1969,9 +1943,10 @@ static void createBitTorrentPeer(tr_torrent* tor, struct tr_peerIo* io, struct p TR_ASSERT(swarm->stats.peerCount == tr_ptrArraySize(&swarm->peers)); TR_ASSERT(swarm->stats.peerFromCount[atom->fromFirst] <= swarm->stats.peerCount); - tr_peerMsgs* msgs = PEER_MSGS(peer); - tr_peerMsgsUpdateActive(msgs, TR_UP); - tr_peerMsgsUpdateActive(msgs, TR_DOWN); + // TODO is this needed? + // isn't it already initialized in tr_peerMsgsImpl's ctor? + peer->update_active(TR_UP); + peer->update_active(TR_DOWN); } /* FIXME: this is kind of a mess. */ @@ -2597,8 +2572,9 @@ void tr_peerMgrOnTorrentGotMetainfo(tr_torrent* tor) /* update the bittorrent peers' willingnes... */ for (int i = 0; i < peerCount; ++i) { - tr_peerMsgsUpdateActive(tr_peerMsgsCast(peers[i]), TR_UP); - tr_peerMsgsUpdateActive(tr_peerMsgsCast(peers[i]), TR_DOWN); + auto* msgs = static_cast(peers[i]); + msgs->update_active(TR_UP); + msgs->update_active(TR_DOWN); } } @@ -2759,7 +2735,7 @@ double* tr_peerMgrWebSpeeds_KBps(tr_torrent const* tor) unsigned int Bps = 0; auto const* const peer = static_cast(tr_ptrArrayNth(&s->webseeds, i)); - if (tr_peerIsTransferringPieces(peer, now, TR_DOWN, &Bps)) + if (peer->is_transferring_pieces(now, TR_DOWN, &Bps)) { ret[i] = Bps / (double)tr_speed_K; } @@ -2789,7 +2765,7 @@ struct tr_peer_stat* tr_peerMgrPeerStats(tr_torrent const* tor, int* setmeCount) { char* pch; tr_peer* peer = peers[i]; - tr_peerMsgs const* const msgs = PEER_MSGS(peer); + auto const* const msgs = dynamic_cast(peer); struct peer_atom const* atom = peer->atom; tr_peer_stat* stat = ret + i; @@ -2798,23 +2774,23 @@ struct tr_peer_stat* tr_peerMgrPeerStats(tr_torrent const* tor, int* setmeCount) stat->port = ntohs(peer->atom->port); stat->from = atom->fromFirst; stat->progress = peer->progress; - stat->isUTP = tr_peerMsgsIsUtpConnection(msgs); - stat->isEncrypted = tr_peerMsgsIsEncrypted(msgs); + stat->isUTP = msgs->is_utp_connection(); + stat->isEncrypted = msgs->is_encrypted(); stat->rateToPeer_KBps = toSpeedKBps(tr_peerGetPieceSpeed_Bps(peer, now_msec, TR_CLIENT_TO_PEER)); stat->rateToClient_KBps = toSpeedKBps(tr_peerGetPieceSpeed_Bps(peer, now_msec, TR_PEER_TO_CLIENT)); - stat->peerIsChoked = tr_peerMsgsIsPeerChoked(msgs); - stat->peerIsInterested = tr_peerMsgsIsPeerInterested(msgs); - stat->clientIsChoked = tr_peerMsgsIsClientChoked(msgs); - stat->clientIsInterested = tr_peerMsgsIsClientInterested(msgs); - stat->isIncoming = tr_peerMsgsIsIncomingConnection(msgs); - stat->isDownloadingFrom = tr_peerMsgsIsActive(msgs, TR_PEER_TO_CLIENT); - stat->isUploadingTo = tr_peerMsgsIsActive(msgs, TR_CLIENT_TO_PEER); + stat->peerIsChoked = msgs->is_peer_choked(); + stat->peerIsInterested = msgs->is_peer_interested(); + stat->clientIsChoked = msgs->is_client_choked(); + stat->clientIsInterested = msgs->is_client_interested(); + stat->isIncoming = msgs->is_incoming_connection(); + stat->isDownloadingFrom = msgs->is_active(TR_PEER_TO_CLIENT); + stat->isUploadingTo = msgs->is_active(TR_CLIENT_TO_PEER); stat->isSeed = tr_peerIsSeed(peer); - stat->blocksToPeer = tr_historyGet(&peer->blocksSentToPeer, now, CANCEL_HISTORY_SEC); - stat->blocksToClient = tr_historyGet(&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC); - stat->cancelsToPeer = tr_historyGet(&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC); - stat->cancelsToClient = tr_historyGet(&peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC); + stat->blocksToPeer = peer->blocksSentToPeer.count(now, CANCEL_HISTORY_SEC); + stat->blocksToClient = peer->blocksSentToClient.count(now, CANCEL_HISTORY_SEC); + stat->cancelsToPeer = peer->cancelsSentToPeer.count(now, CANCEL_HISTORY_SEC); + stat->cancelsToClient = peer->cancelsSentToClient.count(now, CANCEL_HISTORY_SEC); stat->pendingReqsToPeer = peer->pendingReqsToPeer; stat->pendingReqsToClient = peer->pendingReqsToClient; @@ -2900,7 +2876,7 @@ void tr_peerMgrClearInterest(tr_torrent* tor) for (int i = 0; i < peerCount; ++i) { - tr_peerMsgsSetInterested(static_cast(tr_ptrArrayNth(&s->peers, i)), false); + static_cast(tr_ptrArrayNth(&s->peers, i))->set_interested(false); } } @@ -2936,7 +2912,7 @@ enum tr_rechoke_state struct tr_rechoke_info { - tr_peer* peer; + tr_peerMsgs* peer; int salt; int rechoke_state; }; @@ -2998,8 +2974,8 @@ static void rechokeDownloads(tr_swarm* s) for (int i = 0; i < peerCount; ++i) { auto const* const peer = static_cast(tr_ptrArrayNth(&s->peers, i)); - int const b = tr_historyGet(&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC); - int const c = tr_historyGet(&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC); + auto const b = peer->blocksSentToClient.count(now, CANCEL_HISTORY_SEC); + auto const c = peer->cancelsSentToPeer.count(now, CANCEL_HISTORY_SEC); if (b == 0) /* ignore unresponsive peers, as described above */ { @@ -3072,17 +3048,17 @@ static void rechokeDownloads(tr_swarm* s) /* decide WHICH peers to be interested in (based on their cancel-to-block ratio) */ for (int i = 0; i < peerCount; ++i) { - auto* const peer = static_cast(tr_ptrArrayNth(&s->peers, i)); + auto* const peer = static_cast(tr_ptrArrayNth(&s->peers, i)); if (!isPeerInteresting(s->tor, piece_is_interesting, peer)) { - tr_peerMsgsSetInterested(PEER_MSGS(peer), false); + peer->set_interested(false); } else { tr_rechoke_state rechoke_state; - int const blocks = tr_historyGet(&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC); - int const cancels = tr_historyGet(&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC); + auto const blocks = peer->blocksSentToClient.count(now, CANCEL_HISTORY_SEC); + auto const cancels = peer->cancelsSentToPeer.count(now, CANCEL_HISTORY_SEC); if (blocks == 0 && cancels == 0) { @@ -3131,7 +3107,7 @@ static void rechokeDownloads(tr_swarm* s) for (int i = 0; i < rechoke_count; ++i) { - tr_peerMsgsSetInterested(PEER_MSGS(rechoke[i].peer), i < s->interestedCount); + rechoke[i].peer->set_interested(i < s->interestedCount); } /* cleanup */ @@ -3178,7 +3154,7 @@ static int compareChoke(void const* va, void const* vb) /* is this a new connection? */ static bool isNew(tr_peerMsgs const* msgs) { - return msgs != nullptr && tr_peerMsgsGetConnectionAge(msgs) < 45; + return msgs != nullptr && msgs->get_connection_age() < 45; } /* get a rate for deciding which peers to choke and unchoke. */ @@ -3248,27 +3224,25 @@ static void rechokeUploads(tr_swarm* s, uint64_t const now) /* sort the peers by preference and rate */ for (int i = 0; i < peerCount; ++i) { - tr_peer* peer = peers[i]; - tr_peerMsgs* msgs = PEER_MSGS(peer); - - struct peer_atom* atom = peer->atom; + auto* const peer = dynamic_cast(peers[i]); + struct peer_atom* const atom = peer->atom; if (tr_peerIsSeed(peer)) { /* choke seeds and partial seeds */ - tr_peerMsgsSetChoke(PEER_MSGS(peer), true); + peer->set_choke(true); } else if (chokeAll) { /* choke everyone if we're not uploading */ - tr_peerMsgsSetChoke(PEER_MSGS(peer), true); + peer->set_choke(true); } - else if (msgs != s->optimistic) + else if (peer != s->optimistic) { struct ChokeData* n = &choke[size++]; - n->msgs = msgs; - n->isInterested = tr_peerMsgsIsPeerInterested(msgs); - n->wasChoked = tr_peerMsgsIsPeerChoked(msgs); + n->msgs = peer; + n->isInterested = peer->is_peer_interested(); + n->wasChoked = peer->is_peer_choked(); n->rate = getRate(s->tor, atom, now); n->salt = tr_rand_int_weak(INT_MAX); n->isChoked = true; @@ -3338,7 +3312,7 @@ static void rechokeUploads(tr_swarm* s, uint64_t const now) for (int i = 0; i < size; ++i) { - tr_peerMsgsSetChoke(choke[i].msgs, choke[i].isChoked); + choke[i].msgs->set_choke(choke[i].isChoked); } /* cleanup */ @@ -3426,7 +3400,7 @@ static tr_peer** getPeersToClose(tr_swarm* s, time_t const now_sec, int* setmeSi int peerCount; int outsize = 0; - struct tr_peer** ret = nullptr; + tr_peer** ret = nullptr; tr_peer** peers = (tr_peer**)tr_ptrArrayPeek(&s->peers, &peerCount); for (int i = 0; i < peerCount; ++i) @@ -3527,7 +3501,7 @@ static void removePeer(tr_swarm* s, tr_peer* peer) TR_ASSERT(s->stats.peerCount == tr_ptrArraySize(&s->peers)); TR_ASSERT(s->stats.peerFromCount[atom->fromFirst] >= 0); - tr_peerFree(peer); + delete peer; } static void closePeer(tr_swarm* s, tr_peer* peer) @@ -3570,7 +3544,7 @@ static void closeBadPeers(tr_swarm* s, time_t const now_sec) if (!tr_ptrArrayEmpty(&s->peers)) { int peerCount; - struct tr_peer** peers; + tr_peer** peers; peers = getPeersToClose(s, now_sec, &peerCount); @@ -3793,7 +3767,7 @@ static void pumpAllPeers(tr_peerMgr* mgr) for (int j = 0, n = tr_ptrArraySize(&s->peers); j < n; ++j) { - tr_peerMsgsPulse(static_cast(tr_ptrArrayNth(&s->peers, j))); + static_cast(tr_ptrArrayNth(&s->peers, j))->pulse(); } } } diff --git a/libtransmission/peer-mgr.h b/libtransmission/peer-mgr.h index cd4d0d61e..5df2ccfe9 100644 --- a/libtransmission/peer-mgr.h +++ b/libtransmission/peer-mgr.h @@ -28,6 +28,7 @@ * @{ */ +class tr_peerMsgs; struct UTPSocket; struct peer_atom; struct tr_peerIo; diff --git a/libtransmission/peer-msgs.cc b/libtransmission/peer-msgs.cc index 8ebb6c126..5309a4867 100644 --- a/libtransmission/peer-msgs.cc +++ b/libtransmission/peer-msgs.cc @@ -11,6 +11,7 @@ #include #include #include +#include // std::unique_ptr #include #include @@ -129,9 +130,11 @@ struct peer_request uint32_t length; }; -static void blockToReq(tr_torrent const* tor, tr_block_index_t block, struct peer_request* setme) +static peer_request blockToReq(tr_torrent const* tor, tr_block_index_t block) { - tr_torrentGetBlockLocation(tor, block, &setme->index, &setme->offset, &setme->length); + auto ret = peer_request{}; + tr_torrentGetBlockLocation(tor, block, &ret.index, &ret.offset, &ret.length); + return ret; } /** @@ -142,12 +145,40 @@ static void blockToReq(tr_torrent const* tor, tr_block_index_t block, struct pee * the current message that it's sending us. */ struct tr_incoming { - uint8_t id; - uint32_t length; /* includes the +1 for id length */ - struct peer_request blockReq; /* metadata for incoming blocks */ - struct evbuffer* block; /* piece data for incoming blocks */ + uint8_t id = 0; + uint32_t length = 0; /* includes the +1 for id length */ + struct peer_request blockReq = {}; /* metadata for incoming blocks */ + struct evbuffer* block = nullptr; /* piece data for incoming blocks */ }; +class tr_peerMsgsImpl; +// TODO: make these to be member functions +static ReadState canRead(tr_peerIo* io, void* vmsgs, size_t* piece); +static void cancelAllRequestsToClient(tr_peerMsgsImpl* msgs); +static void didWrite(tr_peerIo* io, size_t bytesWritten, bool wasPieceData, void* vmsgs); +static void gotError(tr_peerIo* io, short what, void* vmsgs); +static void peerPulse(void* vmsgs); +static void pexPulse(evutil_socket_t fd, short what, void* vmsgs); +static void protocolSendCancel(tr_peerMsgsImpl* msgs, struct peer_request const& req); +static void protocolSendChoke(tr_peerMsgsImpl* msgs, bool choke); +static void protocolSendHave(tr_peerMsgsImpl* msgs, tr_piece_index_t index); +static void protocolSendPort(tr_peerMsgsImpl* msgs, uint16_t port); +static void sendInterest(tr_peerMsgsImpl* msgs, bool b); +static void sendLtepHandshake(tr_peerMsgsImpl* msgs); +static void tellPeerWhatWeHave(tr_peerMsgsImpl* msgs); +static void updateDesiredRequestCount(tr_peerMsgsImpl* msgs); +//zzz + +struct EventDeleter +{ + void operator()(struct event* ev) const + { + event_free(ev); + } +}; + +using UniqueTimer = std::unique_ptr; + /** * Low-level communication state information about a connected peer. * @@ -162,106 +193,444 @@ struct tr_incoming * @see struct peer_atom * @see tr_peer */ -struct tr_peerMsgs +class tr_peerMsgsImpl : public tr_peerMsgs { - struct tr_peer peer; /* parent */ +public: + tr_peerMsgsImpl(tr_torrent* torrent_in, peer_atom* atom_in, tr_peerIo* io_in, tr_peer_callback callback, void* callbackData) + : tr_peerMsgs{ torrent_in, atom_in } + , outMessagesBatchPeriod{ LOW_PRIORITY_INTERVAL_SECS } + , state{ AWAITING_BT_LENGTH } + , torrent{ torrent_in } + , outMessages{ evbuffer_new() } + , outMessagesBatchedAt{ 0 } + , io{ io_in } + , callback_{ callback } + , callbackData_{ callbackData } + { + if (tr_torrentAllowsPex(torrent)) + { + pex_timer.reset(evtimer_new(torrent->session->event_base, pexPulse, this)); + tr_timerAdd(pex_timer.get(), PEX_INTERVAL_SECS, 0); + } - uint16_t magic_number; + if (tr_peerIoSupportsUTP(io)) + { + tr_address const* addr = tr_peerIoGetAddress(io, nullptr); + tr_peerMgrSetUtpSupported(torrent, addr); + tr_peerMgrSetUtpFailed(torrent, addr, false); + } + if (tr_peerIoSupportsLTEP(io)) + { + sendLtepHandshake(this); + } + + tellPeerWhatWeHave(this); + + if (tr_dhtEnabled(torrent->session) && tr_peerIoSupportsDHT(io)) + { + /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */ + struct tr_address const* addr = tr_peerIoGetAddress(io, nullptr); + + if (addr->type == TR_AF_INET || tr_globalIPv6() != nullptr) + { + protocolSendPort(this, tr_dhtPort(torrent->session)); + } + } + + tr_peerIoSetIOFuncs(io, canRead, didWrite, gotError, this); + updateDesiredRequestCount(this); + } + + ~tr_peerMsgsImpl() override + { + set_active(TR_UP, false); + set_active(TR_DOWN, false); + + if (this->incoming.block != nullptr) + { + evbuffer_free(this->incoming.block); + } + + if (this->io != nullptr) + { + tr_peerIoClear(this->io); + tr_peerIoUnref(this->io); /* balanced by the ref in handshakeDoneCB() */ + } + + evbuffer_free(this->outMessages); + tr_free(this->pex6); + tr_free(this->pex); + } + + bool is_transferring_pieces(uint64_t now, tr_direction direction, unsigned int* setme_Bps) const override + { + auto const Bps = tr_peerIoGetPieceSpeed_Bps(io, now, direction); + + if (setme_Bps != nullptr) + { + *setme_Bps = Bps; + } + + return Bps > 0; + } + + bool is_peer_choked() const override + { + return peer_is_choked_; + } + + bool is_peer_interested() const override + { + return peer_is_interested_; + } + + bool is_client_choked() const override + { + return client_is_choked_; + } + + bool is_client_interested() const override + { + return client_is_interested_; + } + + bool is_utp_connection() const override + { + return io->socket.type == TR_PEER_SOCKET_TYPE_UTP; + } + + bool is_encrypted() const override + { + return tr_peerIoIsEncrypted(io); + } + + bool is_incoming_connection() const override + { + return tr_peerIoIsIncoming(io); + } + + bool is_active(tr_direction direction) const override + { + TR_ASSERT(tr_isDirection(direction)); + auto const active = is_active_[direction]; + TR_ASSERT(active == calculate_active(direction)); + return active; + } + + void update_active(tr_direction direction) override + { + TR_ASSERT(tr_isDirection(direction)); + + set_active(direction, calculate_active(direction)); + } + + time_t get_connection_age() const override + { + return tr_peerIoGetAge(io); + } + + bool is_reading_block(tr_block_index_t block) const override + { + return state == AWAITING_BT_PIECE && block == _tr_block(torrent, incoming.blockReq.index, incoming.blockReq.offset); + } + + void cancel_block_request(tr_block_index_t block) override + { + protocolSendCancel(this, blockToReq(torrent, block)); + } + + void set_choke(bool peer_is_choked) override + { + time_t const now = tr_time(); + time_t const fibrillationTime = now - MIN_CHOKE_PERIOD_SEC; + + if (chokeChangedAt > fibrillationTime) + { + // TODO dbgmsg(msgs, "Not changing choke to %d to avoid fibrillation", peer_is_choked); + } + else if (peer_is_choked_ != peer_is_choked) + { + peer_is_choked_ = peer_is_choked; + + if (peer_is_choked_) + { + cancelAllRequestsToClient(this); + } + + protocolSendChoke(this, peer_is_choked_); + chokeChangedAt = now; + update_active(TR_CLIENT_TO_PEER); + } + } + + void pulse() override + { + peerPulse(this); + } + + void on_piece_completed(tr_piece_index_t piece) override + { + protocolSendHave(this, piece); + + // since we have more pieces now, we might not be interested in this peer + update_interest(); + } + + void set_interested(bool interested) override + { + if (client_is_interested_ != interested) + { + client_is_interested_ = interested; + sendInterest(this, interested); + update_active(TR_PEER_TO_CLIENT); + } + } + + void update_interest() + { + // TODO -- might need to poke the mgr on startup + } + + // publishing events + + void publishError(int err) + { + auto e = tr_peer_event{}; + e.eventType = TR_PEER_ERROR; + e.err = err; + publish(e); + } + + void publishGotBlock(struct peer_request const* req) + { + auto e = tr_peer_event{}; + e.eventType = TR_PEER_CLIENT_GOT_BLOCK; + e.pieceIndex = req->index; + e.offset = req->offset; + e.length = req->length; + publish(e); + } + + void publishGotRej(struct peer_request const* req) + { + auto e = tr_peer_event{}; + e.eventType = TR_PEER_CLIENT_GOT_REJ; + e.pieceIndex = req->index; + e.offset = req->offset; + e.length = req->length; + publish(e); + } + + void publishGotChoke() + { + auto e = tr_peer_event{}; + e.eventType = TR_PEER_CLIENT_GOT_CHOKE; + publish(e); + } + + void publishClientGotHaveAll() + { + auto e = tr_peer_event{}; + e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL; + publish(e); + } + + void publishClientGotHaveNone() + { + auto e = tr_peer_event{}; + e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE; + publish(e); + } + + void publishClientGotPieceData(uint32_t length) + { + auto e = tr_peer_event{}; + e.length = length; + e.eventType = TR_PEER_CLIENT_GOT_PIECE_DATA; + publish(e); + } + + void publishPeerGotPieceData(uint32_t length) + { + auto e = tr_peer_event{}; + e.length = length; + e.eventType = TR_PEER_PEER_GOT_PIECE_DATA; + publish(e); + } + + void publishClientGotSuggest(tr_piece_index_t pieceIndex) + { + auto e = tr_peer_event{}; + e.eventType = TR_PEER_CLIENT_GOT_SUGGEST; + e.pieceIndex = pieceIndex; + publish(e); + } + + void publishClientGotPort(tr_port port) + { + auto e = tr_peer_event{}; + e.eventType = TR_PEER_CLIENT_GOT_PORT; + e.port = port; + publish(e); + } + + void publishClientGotAllowedFast(tr_piece_index_t pieceIndex) + { + auto e = tr_peer_event{}; + e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST; + e.pieceIndex = pieceIndex; + publish(e); + } + + void publishClientGotBitfield(tr_bitfield* bitfield) + { + auto e = tr_peer_event{}; + e.eventType = TR_PEER_CLIENT_GOT_BITFIELD; + e.bitfield = bitfield; + publish(e); + } + + void publishClientGotHave(tr_piece_index_t index) + { + auto e = tr_peer_event{}; + e.eventType = TR_PEER_CLIENT_GOT_HAVE; + e.pieceIndex = index; + publish(e); + } + +private: + bool calculate_active(tr_direction direction) const + { + if (direction == TR_CLIENT_TO_PEER) + { + return is_peer_interested() && !is_peer_choked(); + } + + // TR_PEER_TO_CLIENT + + if (!tr_torrentHasMetadata(torrent)) + { + return true; + } + + auto const active = is_client_interested() && !is_client_choked(); + TR_ASSERT(!active || !tr_torrentIsSeed(torrent)); + return active; + } + + void set_active(tr_direction direction, bool active) + { + // TODO dbgmsg(msgs, "direction [%d] is_active [%d]", (int)direction, (int)is_active); + auto& val = is_active_[direction]; + if (val != active) + { + val = active; + + tr_swarmIncrementActivePeers(torrent->swarm, direction, active); + } + } + + void publish(tr_peer_event const& e) + { + if (callback_ != nullptr) + { + (*callback_)(this, &e, callbackData_); + } + } + +public: /* Whether or not we've choked this peer. */ - bool peer_is_choked; + bool peer_is_choked_ = true; /* whether or not the peer has indicated it will download from us. */ - bool peer_is_interested; + bool peer_is_interested_ = false; /* whether or not the peer is choking us. */ - bool client_is_choked; + bool client_is_choked_ = true; /* whether or not we've indicated to the peer that we would download from them if unchoked. */ - bool client_is_interested; + bool client_is_interested_ = false; - bool peerSupportsPex; - bool peerSupportsMetadataXfer; - bool clientSentLtepHandshake; - bool peerSentLtepHandshake; + bool peerSupportsPex = false; + bool peerSupportsMetadataXfer = false; + bool clientSentLtepHandshake = false; + bool peerSentLtepHandshake = false; - int desiredRequestCount; + int desiredRequestCount = 0; - int prefetchCount; - - bool is_active[2]; + int prefetchCount = 0; /* how long the outMessages batch should be allowed to grow before * it's flushed -- some messages (like requests >:) should be sent * very quickly; others aren't as urgent. */ int8_t outMessagesBatchPeriod; - uint8_t state; - uint8_t ut_pex_id; - uint8_t ut_metadata_id; - uint16_t pexCount; - uint16_t pexCount6; + uint8_t state = AWAITING_BT_LENGTH; + uint8_t ut_pex_id = 0; + uint8_t ut_metadata_id = 0; + uint16_t pexCount = 0; + uint16_t pexCount6 = 0; - tr_port dht_port; + tr_port dht_port = 0; - encryption_preference_t encryption_preference; + encryption_preference_t encryption_preference = ENCRYPTION_PREFERENCE_UNKNOWN; - size_t metadata_size_hint; + size_t metadata_size_hint = 0; #if 0 size_t fastsetSize; tr_piece_index_t fastset[MAX_FAST_SET_SIZE]; #endif - tr_torrent* torrent; + tr_torrent* const torrent; - tr_peer_callback callback; - void* callbackData; + evbuffer* const outMessages; /* all the non-piece messages */ - struct evbuffer* outMessages; /* all the non-piece messages */ + struct peer_request peerAskedFor[REQQ] = {}; - struct peer_request peerAskedFor[REQQ]; + int peerAskedForMetadata[METADATA_REQQ] = {}; + int peerAskedForMetadataCount = 0; - int peerAskedForMetadata[METADATA_REQQ]; - int peerAskedForMetadataCount; + tr_pex* pex = nullptr; + tr_pex* pex6 = nullptr; - tr_pex* pex; - tr_pex* pex6; + time_t clientSentAnythingAt = 0; - time_t clientSentAnythingAt; - - time_t chokeChangedAt; + time_t chokeChangedAt = 0; /* when we started batching the outMessages */ - time_t outMessagesBatchedAt; + time_t outMessagesBatchedAt = 0; - struct tr_incoming incoming; + struct tr_incoming incoming = {}; /* if the peer supports the Extension Protocol in BEP 10 and supplied a reqq argument, it's stored here. Otherwise, the value is zero and should be ignored. */ - int64_t reqq; + int64_t reqq = 0; - struct event* pexTimer; + UniqueTimer pex_timer; - struct tr_peerIo* io; + struct tr_peerIo* io = nullptr; + +private: + bool is_active_[2] = { false, false }; + + tr_peer_callback const callback_; + void* const callbackData_; }; -/** -*** -**/ - -static constexpr tr_session* getSession(struct tr_peerMsgs* msgs) +tr_peerMsgs* tr_peerMsgsNew(tr_torrent* torrent, peer_atom* atom, tr_peerIo* io, tr_peer_callback callback, void* callbackData) { - return msgs->torrent->session; + return new tr_peerMsgsImpl(torrent, atom, io, callback, callbackData); } /** *** **/ -static void myDebug(char const* file, int line, struct tr_peerMsgs const* msgs, char const* fmt, ...) TR_GNUC_PRINTF(4, 5); +static void myDebug(char const* file, int line, tr_peerMsgsImpl const* msgs, char const* fmt, ...) TR_GNUC_PRINTF(4, 5); -static void myDebug(char const* file, int line, struct tr_peerMsgs const* msgs, char const* fmt, ...) +static void myDebug(char const* file, int line, tr_peerMsgsImpl const* msgs, char const* fmt, ...) { tr_sys_file_t const fp = tr_logGetFile(); @@ -280,7 +649,7 @@ static void myDebug(char const* file, int line, struct tr_peerMsgs const* msgs, tr_logGetTimeStr(timestr, sizeof(timestr)), tr_torrentName(msgs->torrent), tr_peerIoGetAddrStr(msgs->io, addrstr, sizeof(addrstr)), - tr_quark_get_string(msgs->peer.client, nullptr)); + tr_quark_get_string(msgs->client, nullptr)); va_start(args, fmt); evbuffer_add_vprintf(buf, fmt, args); va_end(args); @@ -307,7 +676,7 @@ static void myDebug(char const* file, int line, struct tr_peerMsgs const* msgs, *** **/ -static void pokeBatchPeriod(tr_peerMsgs* msgs, int interval) +static void pokeBatchPeriod(tr_peerMsgsImpl* msgs, int interval) { if (msgs->outMessagesBatchPeriod > interval) { @@ -316,12 +685,12 @@ static void pokeBatchPeriod(tr_peerMsgs* msgs, int interval) } } -static void dbgOutMessageLen(tr_peerMsgs* msgs) +static void dbgOutMessageLen(tr_peerMsgsImpl* msgs) { dbgmsg(msgs, "outMessage size is now %zu", evbuffer_get_length(msgs->outMessages)); } -static void protocolSendReject(tr_peerMsgs* msgs, struct peer_request const* req) +static void protocolSendReject(tr_peerMsgsImpl* msgs, struct peer_request const* req) { TR_ASSERT(tr_peerIoSupportsFEXT(msgs->io)); @@ -337,37 +706,37 @@ static void protocolSendReject(tr_peerMsgs* msgs, struct peer_request const* req dbgOutMessageLen(msgs); } -static void protocolSendRequest(tr_peerMsgs* msgs, struct peer_request const* req) +static void protocolSendRequest(tr_peerMsgsImpl* msgs, struct peer_request const& req) { struct evbuffer* out = msgs->outMessages; evbuffer_add_uint32(out, sizeof(uint8_t) + 3 * sizeof(uint32_t)); evbuffer_add_uint8(out, BT_REQUEST); - evbuffer_add_uint32(out, req->index); - evbuffer_add_uint32(out, req->offset); - evbuffer_add_uint32(out, req->length); + evbuffer_add_uint32(out, req.index); + evbuffer_add_uint32(out, req.offset); + evbuffer_add_uint32(out, req.length); - dbgmsg(msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length); + dbgmsg(msgs, "requesting %u:%u->%u...", req.index, req.offset, req.length); dbgOutMessageLen(msgs); pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS); } -static void protocolSendCancel(tr_peerMsgs* msgs, struct peer_request const* req) +static void protocolSendCancel(tr_peerMsgsImpl* msgs, peer_request const& req) { struct evbuffer* out = msgs->outMessages; evbuffer_add_uint32(out, sizeof(uint8_t) + 3 * sizeof(uint32_t)); evbuffer_add_uint8(out, BT_CANCEL); - evbuffer_add_uint32(out, req->index); - evbuffer_add_uint32(out, req->offset); - evbuffer_add_uint32(out, req->length); + evbuffer_add_uint32(out, req.index); + evbuffer_add_uint32(out, req.offset); + evbuffer_add_uint32(out, req.length); - dbgmsg(msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length); + dbgmsg(msgs, "cancelling %u:%u->%u...", req.index, req.offset, req.length); dbgOutMessageLen(msgs); pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS); } -static void protocolSendPort(tr_peerMsgs* msgs, uint16_t port) +static void protocolSendPort(tr_peerMsgsImpl* msgs, uint16_t port) { struct evbuffer* out = msgs->outMessages; @@ -377,7 +746,7 @@ static void protocolSendPort(tr_peerMsgs* msgs, uint16_t port) evbuffer_add_uint16(out, port); } -static void protocolSendHave(tr_peerMsgs* msgs, uint32_t index) +static void protocolSendHave(tr_peerMsgsImpl* msgs, tr_piece_index_t index) { struct evbuffer* out = msgs->outMessages; @@ -409,7 +778,7 @@ static void protocolSendAllowedFast(tr_peerMsgs* msgs, uint32_t pieceIndex) #endif -static void protocolSendChoke(tr_peerMsgs* msgs, bool choke) +static void protocolSendChoke(tr_peerMsgsImpl* msgs, bool choke) { struct evbuffer* out = msgs->outMessages; @@ -421,7 +790,7 @@ static void protocolSendChoke(tr_peerMsgs* msgs, bool choke) pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS); } -static void protocolSendHaveAll(tr_peerMsgs* msgs) +static void protocolSendHaveAll(tr_peerMsgsImpl* msgs) { TR_ASSERT(tr_peerIoSupportsFEXT(msgs->io)); @@ -435,7 +804,7 @@ static void protocolSendHaveAll(tr_peerMsgs* msgs) pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS); } -static void protocolSendHaveNone(tr_peerMsgs* msgs) +static void protocolSendHaveNone(tr_peerMsgsImpl* msgs) { TR_ASSERT(tr_peerIoSupportsFEXT(msgs->io)); @@ -449,123 +818,6 @@ static void protocolSendHaveNone(tr_peerMsgs* msgs) pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS); } -/** -*** EVENTS -**/ - -static void publish(tr_peerMsgs* msgs, tr_peer_event* e) -{ - if (msgs->callback != nullptr) - { - (*msgs->callback)(&msgs->peer, e, msgs->callbackData); - } -} - -static void fireError(tr_peerMsgs* msgs, int err) -{ - auto e = tr_peer_event{}; - e.eventType = TR_PEER_ERROR; - e.err = err; - publish(msgs, &e); -} - -static void fireGotBlock(tr_peerMsgs* msgs, struct peer_request const* req) -{ - auto e = tr_peer_event{}; - e.eventType = TR_PEER_CLIENT_GOT_BLOCK; - e.pieceIndex = req->index; - e.offset = req->offset; - e.length = req->length; - publish(msgs, &e); -} - -static void fireGotRej(tr_peerMsgs* msgs, struct peer_request const* req) -{ - auto e = tr_peer_event{}; - e.eventType = TR_PEER_CLIENT_GOT_REJ; - e.pieceIndex = req->index; - e.offset = req->offset; - e.length = req->length; - publish(msgs, &e); -} - -static void fireGotChoke(tr_peerMsgs* msgs) -{ - auto e = tr_peer_event{}; - e.eventType = TR_PEER_CLIENT_GOT_CHOKE; - publish(msgs, &e); -} - -static void fireClientGotHaveAll(tr_peerMsgs* msgs) -{ - auto e = tr_peer_event{}; - e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL; - publish(msgs, &e); -} - -static void fireClientGotHaveNone(tr_peerMsgs* msgs) -{ - auto e = tr_peer_event{}; - e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE; - publish(msgs, &e); -} - -static void fireClientGotPieceData(tr_peerMsgs* msgs, uint32_t length) -{ - auto e = tr_peer_event{}; - e.length = length; - e.eventType = TR_PEER_CLIENT_GOT_PIECE_DATA; - publish(msgs, &e); -} - -static void firePeerGotPieceData(tr_peerMsgs* msgs, uint32_t length) -{ - auto e = tr_peer_event{}; - e.length = length; - e.eventType = TR_PEER_PEER_GOT_PIECE_DATA; - publish(msgs, &e); -} - -static void fireClientGotSuggest(tr_peerMsgs* msgs, uint32_t pieceIndex) -{ - auto e = tr_peer_event{}; - e.eventType = TR_PEER_CLIENT_GOT_SUGGEST; - e.pieceIndex = pieceIndex; - publish(msgs, &e); -} - -static void fireClientGotPort(tr_peerMsgs* msgs, tr_port port) -{ - auto e = tr_peer_event{}; - e.eventType = TR_PEER_CLIENT_GOT_PORT; - e.port = port; - publish(msgs, &e); -} - -static void fireClientGotAllowedFast(tr_peerMsgs* msgs, uint32_t pieceIndex) -{ - auto e = tr_peer_event{}; - e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST; - e.pieceIndex = pieceIndex; - publish(msgs, &e); -} - -static void fireClientGotBitfield(tr_peerMsgs* msgs, tr_bitfield* bitfield) -{ - auto e = tr_peer_event{}; - e.eventType = TR_PEER_CLIENT_GOT_BITFIELD; - e.bitfield = bitfield; - publish(msgs, &e); -} - -static void fireClientGotHave(tr_peerMsgs* msgs, tr_piece_index_t index) -{ - auto e = tr_peer_event{}; - e.eventType = TR_PEER_CLIENT_GOT_HAVE; - e.pieceIndex = index; - publish(msgs, &e); -} - /** *** ALLOWED FAST SET *** For explanation, see http://www.bittorrent.org/beps/bep_0006.html @@ -652,84 +904,16 @@ static void updateFastSet(tr_peerMsgs* msgs) } #endif - -/*** -**** ACTIVE -***/ - -static bool tr_peerMsgsCalculateActive(tr_peerMsgs const* msgs, tr_direction direction) -{ - TR_ASSERT(tr_isPeerMsgs(msgs)); - TR_ASSERT(tr_isDirection(direction)); - - bool is_active; - - if (direction == TR_CLIENT_TO_PEER) - { - is_active = tr_peerMsgsIsPeerInterested(msgs) && !tr_peerMsgsIsPeerChoked(msgs); - } - else /* TR_PEER_TO_CLIENT */ - { - if (!tr_torrentHasMetadata(msgs->torrent)) - { - is_active = true; - } - else - { - is_active = tr_peerMsgsIsClientInterested(msgs) && !tr_peerMsgsIsClientChoked(msgs); - - if (is_active) - { - TR_ASSERT(!tr_torrentIsSeed(msgs->torrent)); - } - } - } - - return is_active; -} - -bool tr_peerMsgsIsActive(tr_peerMsgs const* msgs, tr_direction direction) -{ - TR_ASSERT(tr_isPeerMsgs(msgs)); - TR_ASSERT(tr_isDirection(direction)); - - bool is_active = msgs->is_active[direction]; - - TR_ASSERT(is_active == tr_peerMsgsCalculateActive(msgs, direction)); - - return is_active; -} - -static void tr_peerMsgsSetActive(tr_peerMsgs* msgs, tr_direction direction, bool is_active) -{ - dbgmsg(msgs, "direction [%d] is_active [%d]", (int)direction, (int)is_active); - - if (msgs->is_active[direction] != is_active) - { - msgs->is_active[direction] = is_active; - - tr_swarmIncrementActivePeers(msgs->torrent->swarm, direction, is_active); - } -} - -void tr_peerMsgsUpdateActive(tr_peerMsgs* msgs, tr_direction direction) -{ - bool const is_active = tr_peerMsgsCalculateActive(msgs, direction); - - tr_peerMsgsSetActive(msgs, direction, is_active); -} - /** *** INTEREST **/ -static void sendInterest(tr_peerMsgs* msgs, bool b) +static void sendInterest(tr_peerMsgsImpl* msgs, bool b) { TR_ASSERT(msgs != nullptr); struct evbuffer* out = msgs->outMessages; - msgs->client_is_interested = b; dbgmsg(msgs, "Sending %s", b ? "Interested" : "Not Interested"); evbuffer_add_uint32(out, sizeof(uint8_t)); evbuffer_add_uint8(out, b ? BT_INTERESTED : BT_NOT_INTERESTED); @@ -738,24 +922,7 @@ static void sendInterest(tr_peerMsgs* msgs, bool b) dbgOutMessageLen(msgs); } -static void updateInterest(tr_peerMsgs* msgs) -{ - TR_UNUSED(msgs); - - /* FIXME -- might need to poke the mgr on startup */ -} - -void tr_peerMsgsSetInterested(tr_peerMsgs* msgs, bool b) -{ - if (msgs->client_is_interested != b) - { - sendInterest(msgs, b); - - tr_peerMsgsUpdateActive(msgs, TR_PEER_TO_CLIENT); - } -} - -static bool popNextMetadataRequest(tr_peerMsgs* msgs, int* piece) +static bool popNextMetadataRequest(tr_peerMsgsImpl* msgs, int* piece) { if (msgs->peerAskedForMetadataCount == 0) { @@ -770,22 +937,22 @@ static bool popNextMetadataRequest(tr_peerMsgs* msgs, int* piece) return true; } -static bool popNextRequest(tr_peerMsgs* msgs, struct peer_request* setme) +static bool popNextRequest(tr_peerMsgsImpl* msgs, struct peer_request* setme) { - if (msgs->peer.pendingReqsToClient == 0) + if (msgs->pendingReqsToClient == 0) { return false; } *setme = msgs->peerAskedFor[0]; - tr_removeElementFromArray(msgs->peerAskedFor, 0, sizeof(struct peer_request), msgs->peer.pendingReqsToClient); - --msgs->peer.pendingReqsToClient; + tr_removeElementFromArray(msgs->peerAskedFor, 0, sizeof(struct peer_request), msgs->pendingReqsToClient); + --msgs->pendingReqsToClient; return true; } -static void cancelAllRequestsToClient(tr_peerMsgs* msgs) +static void cancelAllRequestsToClient(tr_peerMsgsImpl* msgs) { struct peer_request req; bool const mustSendCancel = tr_peerIoSupportsFEXT(msgs->io); @@ -799,70 +966,25 @@ static void cancelAllRequestsToClient(tr_peerMsgs* msgs) } } -void tr_peerMsgsSetChoke(tr_peerMsgs* msgs, bool peer_is_choked) -{ - TR_ASSERT(msgs != nullptr); - - time_t const now = tr_time(); - time_t const fibrillationTime = now - MIN_CHOKE_PERIOD_SEC; - - if (msgs->chokeChangedAt > fibrillationTime) - { - dbgmsg(msgs, "Not changing choke to %d to avoid fibrillation", peer_is_choked); - } - else if (msgs->peer_is_choked != peer_is_choked) - { - msgs->peer_is_choked = peer_is_choked; - - if (peer_is_choked) - { - cancelAllRequestsToClient(msgs); - } - - protocolSendChoke(msgs, peer_is_choked); - msgs->chokeChangedAt = now; - tr_peerMsgsUpdateActive(msgs, TR_CLIENT_TO_PEER); - } -} - /** *** **/ -void tr_peerMsgsHave(tr_peerMsgs* msgs, uint32_t index) -{ - protocolSendHave(msgs, index); - - /* since we have more pieces now, we might not be interested in this peer */ - updateInterest(msgs); -} - -/** -*** -**/ - -static bool reqIsValid(tr_peerMsgs const* peer, uint32_t index, uint32_t offset, uint32_t length) +static bool reqIsValid(tr_peerMsgsImpl const* peer, uint32_t index, uint32_t offset, uint32_t length) { return tr_torrentReqIsValid(peer->torrent, index, offset, length); } -static bool requestIsValid(tr_peerMsgs const* msgs, struct peer_request const* req) +static bool requestIsValid(tr_peerMsgsImpl const* msgs, struct peer_request const* req) { return reqIsValid(msgs, req->index, req->offset, req->length); } -void tr_peerMsgsCancel(tr_peerMsgs* msgs, tr_block_index_t block) -{ - struct peer_request req; - blockToReq(msgs->torrent, block, &req); - protocolSendCancel(msgs, &req); -} - /** *** **/ -static void sendLtepHandshake(tr_peerMsgs* msgs) +static void sendLtepHandshake(tr_peerMsgsImpl* msgs) { tr_variant val; bool allow_pex; @@ -902,7 +1024,7 @@ static void sendLtepHandshake(tr_peerMsgs* msgs) } tr_variantInitDict(&val, 8); - tr_variantDictAddBool(&val, TR_KEY_e, getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED); + tr_variantDictAddBool(&val, TR_KEY_e, msgs->session->encryptionMode != TR_CLEAR_PREFERRED); if (ipv6 != nullptr) { @@ -914,7 +1036,7 @@ static void sendLtepHandshake(tr_peerMsgs* msgs) tr_variantDictAddInt(&val, TR_KEY_metadata_size, msgs->torrent->infoDictLength); } - tr_variantDictAddInt(&val, TR_KEY_p, tr_sessionGetPublicPeerPort(getSession(msgs))); + tr_variantDictAddInt(&val, TR_KEY_p, tr_sessionGetPublicPeerPort(msgs->session)); tr_variantDictAddInt(&val, TR_KEY_reqq, REQQ); tr_variantDictAddBool(&val, TR_KEY_upload_only, tr_torrentIsSeed(msgs->torrent)); tr_variantDictAddQuark(&val, TR_KEY_v, version_quark); @@ -948,7 +1070,7 @@ static void sendLtepHandshake(tr_peerMsgs* msgs) tr_variantFree(&val); } -static void parseLtepHandshake(tr_peerMsgs* msgs, uint32_t len, struct evbuffer* inbuf) +static void parseLtepHandshake(tr_peerMsgsImpl* msgs, uint32_t len, struct evbuffer* inbuf) { int64_t i; tr_variant val; @@ -1035,7 +1157,7 @@ static void parseLtepHandshake(tr_peerMsgs* msgs, uint32_t len, struct evbuffer* if (tr_variantDictFindInt(&val, TR_KEY_p, &i)) { pex.port = htons((uint16_t)i); - fireClientGotPort(msgs, pex.port); + msgs->publishClientGotPort(pex.port); dbgmsg(msgs, "peer's port is now %d", (int)i); } @@ -1063,7 +1185,7 @@ static void parseLtepHandshake(tr_peerMsgs* msgs, uint32_t len, struct evbuffer* tr_free(tmp); } -static void parseUtMetadata(tr_peerMsgs* msgs, uint32_t msglen, struct evbuffer* inbuf) +static void parseUtMetadata(tr_peerMsgsImpl* msgs, uint32_t msglen, struct evbuffer* inbuf) { int64_t msg_type = -1; int64_t piece = -1; @@ -1133,7 +1255,7 @@ static void parseUtMetadata(tr_peerMsgs* msgs, uint32_t msglen, struct evbuffer* tr_free(tmp); } -static void parseUtPex(tr_peerMsgs* msgs, uint32_t msglen, struct evbuffer* inbuf) +static void parseUtPex(tr_peerMsgsImpl* msgs, uint32_t msglen, struct evbuffer* inbuf) { tr_torrent* tor = msgs->torrent; if (!tr_torrentAllowsPex(tor)) @@ -1200,9 +1322,9 @@ static void parseUtPex(tr_peerMsgs* msgs, uint32_t msglen, struct evbuffer* inbu tr_variantFree(&val); } -static void sendPex(tr_peerMsgs* msgs); +static void sendPex(tr_peerMsgsImpl* msgs); -static void parseLtep(tr_peerMsgs* msgs, uint32_t msglen, struct evbuffer* inbuf) +static void parseLtep(tr_peerMsgsImpl* msgs, uint32_t msglen, struct evbuffer* inbuf) { TR_ASSERT(msglen > 0); @@ -1241,7 +1363,7 @@ static void parseLtep(tr_peerMsgs* msgs, uint32_t msglen, struct evbuffer* inbuf } } -static ReadState readBtLength(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t inlen) +static ReadState readBtLength(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, size_t inlen) { uint32_t len; @@ -1265,9 +1387,9 @@ static ReadState readBtLength(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t return READ_NOW; } -static ReadState readBtMessage(tr_peerMsgs*, struct evbuffer*, size_t); +static ReadState readBtMessage(tr_peerMsgsImpl*, struct evbuffer*, size_t); -static ReadState readBtId(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t inlen) +static ReadState readBtId(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, size_t inlen) { uint8_t id; @@ -1296,38 +1418,38 @@ static ReadState readBtId(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t inle } } -static void updatePeerProgress(tr_peerMsgs* msgs) +static void updatePeerProgress(tr_peerMsgsImpl* msgs) { - tr_peerUpdateProgress(msgs->torrent, &msgs->peer); + tr_peerUpdateProgress(msgs->torrent, msgs); - updateInterest(msgs); + msgs->update_interest(); } -static void prefetchPieces(tr_peerMsgs* msgs) +static void prefetchPieces(tr_peerMsgsImpl* msgs) { - if (!getSession(msgs)->isPrefetchEnabled) + if (!msgs->session->isPrefetchEnabled) { return; } - for (int i = msgs->prefetchCount; i < msgs->peer.pendingReqsToClient && i < PREFETCH_SIZE; ++i) + for (int i = msgs->prefetchCount; i < msgs->pendingReqsToClient && i < PREFETCH_SIZE; ++i) { struct peer_request const* req = msgs->peerAskedFor + i; if (requestIsValid(msgs, req)) { - tr_cachePrefetchBlock(getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length); + tr_cachePrefetchBlock(msgs->session->cache, msgs->torrent, req->index, req->offset, req->length); ++msgs->prefetchCount; } } } -static void peerMadeRequest(tr_peerMsgs* msgs, struct peer_request const* req) +static void peerMadeRequest(tr_peerMsgsImpl* msgs, struct peer_request const* req) { bool const fext = tr_peerIoSupportsFEXT(msgs->io); bool const reqIsValid = requestIsValid(msgs, req); bool const clientHasPiece = reqIsValid && tr_torrentPieceIsComplete(msgs->torrent, req->index); - bool const peerIsChoked = msgs->peer_is_choked; + bool const peerIsChoked = msgs->peer_is_choked_; bool allow = false; @@ -1343,7 +1465,7 @@ static void peerMadeRequest(tr_peerMsgs* msgs, struct peer_request const* req) { dbgmsg(msgs, "rejecting request from choked peer"); } - else if (msgs->peer.pendingReqsToClient + 1 >= REQQ) + else if (msgs->pendingReqsToClient + 1 >= REQQ) { dbgmsg(msgs, "rejecting request ... reqq is full"); } @@ -1354,7 +1476,7 @@ static void peerMadeRequest(tr_peerMsgs* msgs, struct peer_request const* req) if (allow) { - msgs->peerAskedFor[msgs->peer.pendingReqsToClient++] = *req; + msgs->peerAskedFor[msgs->pendingReqsToClient++] = *req; prefetchPieces(msgs); } else if (fext) @@ -1363,7 +1485,7 @@ static void peerMadeRequest(tr_peerMsgs* msgs, struct peer_request const* req) } } -static bool messageLengthIsCorrect(tr_peerMsgs const* msg, uint8_t id, uint32_t len) +static bool messageLengthIsCorrect(tr_peerMsgsImpl const* msg, uint8_t id, uint32_t len) { switch (id) { @@ -1414,9 +1536,9 @@ static bool messageLengthIsCorrect(tr_peerMsgs const* msg, uint8_t id, uint32_t } } -static int clientGotBlock(tr_peerMsgs* msgs, struct evbuffer* block, struct peer_request const* req); +static int clientGotBlock(tr_peerMsgsImpl* msgs, struct evbuffer* block, struct peer_request const* req); -static ReadState readBtPiece(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t inlen, size_t* setme_piece_bytes_read) +static ReadState readBtPiece(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, size_t inlen, size_t* setme_piece_bytes_read) { TR_ASSERT(evbuffer_get_length(inbuf) >= inlen); @@ -1457,7 +1579,7 @@ static ReadState readBtPiece(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t i tr_peerIoReadBytesToBuf(msgs->io, inbuf, block_buffer, n); - fireClientGotPieceData(msgs, n); + msgs->publishClientGotPieceData(n); *setme_piece_bytes_read += n; dbgmsg( msgs, @@ -1484,9 +1606,7 @@ static ReadState readBtPiece(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t i } } -static void updateDesiredRequestCount(tr_peerMsgs* msgs); - -static ReadState readBtMessage(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t inlen) +static ReadState readBtMessage(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, size_t inlen) { uint8_t const id = msgs->incoming.id; #ifdef TR_ENABLE_ASSERTS @@ -1511,7 +1631,7 @@ static ReadState readBtMessage(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t if (!messageLengthIsCorrect(msgs, id, msglen + 1)) { dbgmsg(msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen); - fireError(msgs, EMSGSIZE); + msgs->publishError(EMSGSIZE); return READ_ERR; } @@ -1519,33 +1639,33 @@ static ReadState readBtMessage(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t { case BT_CHOKE: dbgmsg(msgs, "got Choke"); - msgs->client_is_choked = true; + msgs->client_is_choked_ = true; if (!fext) { - fireGotChoke(msgs); + msgs->publishGotChoke(); } - tr_peerMsgsUpdateActive(msgs, TR_PEER_TO_CLIENT); + msgs->update_active(TR_PEER_TO_CLIENT); break; case BT_UNCHOKE: dbgmsg(msgs, "got Unchoke"); - msgs->client_is_choked = false; - tr_peerMsgsUpdateActive(msgs, TR_PEER_TO_CLIENT); + msgs->client_is_choked_ = false; + msgs->update_active(TR_PEER_TO_CLIENT); updateDesiredRequestCount(msgs); break; case BT_INTERESTED: dbgmsg(msgs, "got Interested"); - msgs->peer_is_interested = true; - tr_peerMsgsUpdateActive(msgs, TR_CLIENT_TO_PEER); + msgs->peer_is_interested_ = true; + msgs->update_active(TR_CLIENT_TO_PEER); break; case BT_NOT_INTERESTED: dbgmsg(msgs, "got Not Interested"); - msgs->peer_is_interested = false; - tr_peerMsgsUpdateActive(msgs, TR_CLIENT_TO_PEER); + msgs->peer_is_interested_ = false; + msgs->update_active(TR_CLIENT_TO_PEER); break; case BT_HAVE: @@ -1554,15 +1674,15 @@ static ReadState readBtMessage(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t if (tr_torrentHasMetadata(msgs->torrent) && ui32 >= msgs->torrent->info.pieceCount) { - fireError(msgs, ERANGE); + msgs->publishError(ERANGE); return READ_ERR; } /* a peer can send the same HAVE message twice... */ - if (!tr_bitfieldHas(&msgs->peer.have, ui32)) + if (!tr_bitfieldHas(&msgs->have, ui32)) { - tr_bitfieldAdd(&msgs->peer.have, ui32); - fireClientGotHave(msgs, ui32); + tr_bitfieldAdd(&msgs->have, ui32); + msgs->publishClientGotHave(ui32); } updatePeerProgress(msgs); @@ -1573,8 +1693,8 @@ static ReadState readBtMessage(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t uint8_t* tmp = tr_new(uint8_t, msglen); dbgmsg(msgs, "got a bitfield"); tr_peerIoReadBytes(msgs->io, inbuf, tmp, msglen); - tr_bitfieldSetRaw(&msgs->peer.have, tmp, msglen, tr_torrentHasMetadata(msgs->torrent)); - fireClientGotBitfield(msgs, &msgs->peer.have); + tr_bitfieldSetRaw(&msgs->have, tmp, msglen, tr_torrentHasMetadata(msgs->torrent)); + msgs->publishClientGotBitfield(&msgs->have); updatePeerProgress(msgs); tr_free(tmp); break; @@ -1597,21 +1717,17 @@ static ReadState readBtMessage(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t tr_peerIoReadUint32(msgs->io, inbuf, &r.index); tr_peerIoReadUint32(msgs->io, inbuf, &r.offset); tr_peerIoReadUint32(msgs->io, inbuf, &r.length); - tr_historyAdd(&msgs->peer.cancelsSentToClient, tr_time(), 1); + msgs->cancelsSentToClient.add(tr_time(), 1); dbgmsg(msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length); - for (int i = 0; i < msgs->peer.pendingReqsToClient; ++i) + for (int i = 0; i < msgs->pendingReqsToClient; ++i) { struct peer_request const* req = msgs->peerAskedFor + i; if (req->index == r.index && req->offset == r.offset && req->length == r.length) { - tr_removeElementFromArray( - msgs->peerAskedFor, - i, - sizeof(struct peer_request), - msgs->peer.pendingReqsToClient); - --msgs->peer.pendingReqsToClient; + tr_removeElementFromArray(msgs->peerAskedFor, i, sizeof(struct peer_request), msgs->pendingReqsToClient); + --msgs->pendingReqsToClient; break; } } @@ -1629,7 +1745,7 @@ static ReadState readBtMessage(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t if (msgs->dht_port > 0) { - tr_dhtAddNode(getSession(msgs), tr_peerAddress(&msgs->peer), msgs->dht_port, false); + tr_dhtAddNode(msgs->session, tr_peerAddress(msgs), msgs->dht_port, false); } break; @@ -1640,11 +1756,11 @@ static ReadState readBtMessage(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t if (fext) { - fireClientGotSuggest(msgs, ui32); + msgs->publishClientGotSuggest(ui32); } else { - fireError(msgs, EMSGSIZE); + msgs->publishError(EMSGSIZE); return READ_ERR; } @@ -1656,11 +1772,11 @@ static ReadState readBtMessage(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t if (fext) { - fireClientGotAllowedFast(msgs, ui32); + msgs->publishClientGotAllowedFast(ui32); } else { - fireError(msgs, EMSGSIZE); + msgs->publishError(EMSGSIZE); return READ_ERR; } @@ -1671,14 +1787,14 @@ static ReadState readBtMessage(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t if (fext) { - tr_bitfieldSetHasAll(&msgs->peer.have); - TR_ASSERT(tr_bitfieldHasAll(&msgs->peer.have)); - fireClientGotHaveAll(msgs); + tr_bitfieldSetHasAll(&msgs->have); + TR_ASSERT(tr_bitfieldHasAll(&msgs->have)); + msgs->publishClientGotHaveAll(); updatePeerProgress(msgs); } else { - fireError(msgs, EMSGSIZE); + msgs->publishError(EMSGSIZE); return READ_ERR; } @@ -1689,13 +1805,13 @@ static ReadState readBtMessage(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t if (fext) { - tr_bitfieldSetHasNone(&msgs->peer.have); - fireClientGotHaveNone(msgs); + tr_bitfieldSetHasNone(&msgs->have); + msgs->publishClientGotHaveNone(); updatePeerProgress(msgs); } else { - fireError(msgs, EMSGSIZE); + msgs->publishError(EMSGSIZE); return READ_ERR; } @@ -1711,11 +1827,11 @@ static ReadState readBtMessage(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t if (fext) { - fireGotRej(msgs, &r); + msgs->publishGotRej(&r); } else { - fireError(msgs, EMSGSIZE); + msgs->publishError(EMSGSIZE); return READ_ERR; } @@ -1741,7 +1857,7 @@ static ReadState readBtMessage(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t } /* returns 0 on success, or an errno on failure */ -static int clientGotBlock(tr_peerMsgs* msgs, struct evbuffer* data, struct peer_request const* req) +static int clientGotBlock(tr_peerMsgsImpl* msgs, struct evbuffer* data, struct peer_request const* req) { TR_ASSERT(msgs != nullptr); TR_ASSERT(req != nullptr); @@ -1764,7 +1880,7 @@ static int clientGotBlock(tr_peerMsgs* msgs, struct evbuffer* data, struct peer_ dbgmsg(msgs, "got block %u:%u->%u", req->index, req->offset, req->length); - if (!tr_peerMgrDidPeerRequest(msgs->torrent, &msgs->peer, block)) + if (!tr_peerMgrDidPeerRequest(msgs->torrent, msgs, block)) { dbgmsg(msgs, "we didn't ask for this message..."); return 0; @@ -1780,25 +1896,23 @@ static int clientGotBlock(tr_peerMsgs* msgs, struct evbuffer* data, struct peer_ *** Save the block **/ - if ((err = tr_cacheWriteBlock(getSession(msgs)->cache, tor, req->index, req->offset, req->length, data)) != 0) + if ((err = tr_cacheWriteBlock(msgs->session->cache, tor, req->index, req->offset, req->length, data)) != 0) { return err; } - tr_bitfieldAdd(&msgs->peer.blame, req->index); - fireGotBlock(msgs, req); + tr_bitfieldAdd(&msgs->blame, req->index); + msgs->publishGotBlock(req); return 0; } -static void peerPulse(void* vmsgs); - static void didWrite(tr_peerIo* io, size_t bytesWritten, bool wasPieceData, void* vmsgs) { - auto* msgs = static_cast(vmsgs); + auto* msgs = static_cast(vmsgs); if (wasPieceData) { - firePeerGotPieceData(msgs, bytesWritten); + msgs->publishPeerGotPieceData(bytesWritten); } if (tr_isPeerIo(io) && io->userData != nullptr) @@ -1810,7 +1924,7 @@ static void didWrite(tr_peerIo* io, size_t bytesWritten, bool wasPieceData, void static ReadState canRead(tr_peerIo* io, void* vmsgs, size_t* piece) { ReadState ret; - auto* msgs = static_cast(vmsgs); + auto* msgs = static_cast(vmsgs); struct evbuffer* in = tr_peerIoGetReadBuffer(io); size_t const inlen = evbuffer_get_length(in); @@ -1855,26 +1969,16 @@ static ReadState canRead(tr_peerIo* io, void* vmsgs, size_t* piece) return ret; } -bool tr_peerMsgsIsReadingBlock(tr_peerMsgs const* msgs, tr_block_index_t block) -{ - if (msgs->state != AWAITING_BT_PIECE) - { - return false; - } - - return block == _tr_block(msgs->torrent, msgs->incoming.blockReq.index, msgs->incoming.blockReq.offset); -} - /** *** **/ -static void updateDesiredRequestCount(tr_peerMsgs* msgs) +static void updateDesiredRequestCount(tr_peerMsgsImpl* msgs) { tr_torrent const* const torrent = msgs->torrent; /* there are lots of reasons we might not want to request any blocks... */ - if (tr_torrentIsSeed(torrent) || !tr_torrentHasMetadata(torrent) || msgs->client_is_choked || !msgs->client_is_interested) + if (tr_torrentIsSeed(torrent) || !tr_torrentHasMetadata(torrent) || msgs->client_is_choked_ || !msgs->client_is_interested_) { msgs->desiredRequestCount = 0; } @@ -1889,7 +1993,7 @@ static void updateDesiredRequestCount(tr_peerMsgs* msgs) /* Get the rate limit we should use. * FIXME: this needs to consider all the other peers as well... */ - rate_Bps = tr_peerGetPieceSpeed_Bps(&msgs->peer, now, TR_PEER_TO_CLIENT); + rate_Bps = tr_peerGetPieceSpeed_Bps(msgs, now, TR_PEER_TO_CLIENT); if (tr_torrentUsesSpeedLimit(torrent, TR_PEER_TO_CLIENT)) { @@ -1916,7 +2020,7 @@ static void updateDesiredRequestCount(tr_peerMsgs* msgs) } } -static void updateMetadataRequests(tr_peerMsgs* msgs, time_t now) +static void updateMetadataRequests(tr_peerMsgsImpl* msgs, time_t now) { int piece; @@ -1948,33 +2052,31 @@ static void updateMetadataRequests(tr_peerMsgs* msgs, time_t now) } } -static void updateBlockRequests(tr_peerMsgs* msgs) +static void updateBlockRequests(tr_peerMsgsImpl* msgs) { if (tr_torrentIsPieceTransferAllowed(msgs->torrent, TR_PEER_TO_CLIENT) && msgs->desiredRequestCount > 0 && - msgs->peer.pendingReqsToPeer <= msgs->desiredRequestCount * 0.66) + msgs->pendingReqsToPeer <= msgs->desiredRequestCount * 0.66) { - TR_ASSERT(tr_peerMsgsIsClientInterested(msgs)); - TR_ASSERT(!tr_peerMsgsIsClientChoked(msgs)); + TR_ASSERT(msgs->is_client_interested()); + TR_ASSERT(!msgs->is_client_choked()); int n; tr_block_index_t* blocks; - int const numwant = msgs->desiredRequestCount - msgs->peer.pendingReqsToPeer; + int const numwant = msgs->desiredRequestCount - msgs->pendingReqsToPeer; blocks = tr_new(tr_block_index_t, numwant); - tr_peerMgrGetNextRequests(msgs->torrent, &msgs->peer, numwant, blocks, &n, false); + tr_peerMgrGetNextRequests(msgs->torrent, msgs, numwant, blocks, &n, false); for (int i = 0; i < n; ++i) { - struct peer_request req; - blockToReq(msgs->torrent, blocks[i], &req); - protocolSendRequest(msgs, &req); + protocolSendRequest(msgs, blockToReq(msgs->torrent, blocks[i])); } tr_free(blocks); } } -static size_t fillOutputBuffer(tr_peerMsgs* msgs, time_t now) +static size_t fillOutputBuffer(tr_peerMsgsImpl* msgs, time_t now) { int piece; size_t bytesWritten = 0; @@ -2093,7 +2195,7 @@ static size_t fillOutputBuffer(tr_peerMsgs* msgs, time_t now) evbuffer_reserve_space(out, req.length, iovec, 1); err = tr_cacheReadBlock( - getSession(msgs)->cache, + msgs->session->cache, msgs->torrent, req.index, req.offset, @@ -2131,7 +2233,7 @@ static size_t fillOutputBuffer(tr_peerMsgs* msgs, time_t now) tr_peerIoWriteBuf(msgs->io, out, true); bytesWritten += n; msgs->clientSentAnythingAt = now; - tr_historyAdd(&msgs->peer.blocksSentToPeer, tr_time(), 1); + msgs->blocksSentToPeer.add(tr_time(), 1); } evbuffer_free(out); @@ -2169,7 +2271,7 @@ static size_t fillOutputBuffer(tr_peerMsgs* msgs, time_t now) static void peerPulse(void* vmsgs) { - auto* msgs = static_cast(vmsgs); + auto* msgs = static_cast(vmsgs); time_t const now = tr_time(); if (tr_isPeerIo(msgs->io)) @@ -2188,17 +2290,9 @@ static void peerPulse(void* vmsgs) } } -void tr_peerMsgsPulse(tr_peerMsgs* msgs) -{ - if (msgs != nullptr) - { - peerPulse(msgs); - } -} - static void gotError(tr_peerIo* io, short what, void* vmsgs) { - auto* msgs = static_cast(vmsgs); + auto* msgs = static_cast(vmsgs); TR_UNUSED(io); @@ -2212,10 +2306,10 @@ static void gotError(tr_peerIo* io, short what, void* vmsgs) dbgmsg(msgs, "libevent got an error! what=%hd, errno=%d (%s)", what, errno, tr_strerror(errno)); } - fireError(msgs, ENOTCONN); + msgs->publishError(ENOTCONN); } -static void sendBitfield(tr_peerMsgs* msgs) +static void sendBitfield(tr_peerMsgsImpl* msgs) { TR_ASSERT(tr_torrentHasMetadata(msgs->torrent)); @@ -2233,7 +2327,7 @@ static void sendBitfield(tr_peerMsgs* msgs) tr_free(bytes); } -static void tellPeerWhatWeHave(tr_peerMsgs* msgs) +static void tellPeerWhatWeHave(tr_peerMsgsImpl* msgs) { bool const fext = tr_peerIoSupportsFEXT(msgs->io); @@ -2370,7 +2464,7 @@ static void tr_set_compare( } } -static void sendPex(tr_peerMsgs* msgs) +static void sendPex(tr_peerMsgsImpl* msgs) { if (msgs->peerSupportsPex && tr_torrentAllowsPex(msgs->torrent)) { @@ -2579,213 +2673,10 @@ static void pexPulse(evutil_socket_t fd, short what, void* vmsgs) TR_UNUSED(fd); TR_UNUSED(what); - auto* msgs = static_cast(vmsgs); + auto* msgs = static_cast(vmsgs); sendPex(msgs); - TR_ASSERT(msgs->pexTimer != nullptr); - tr_timerAdd(msgs->pexTimer, PEX_INTERVAL_SECS, 0); -} - -/*** -**** tr_peer virtual functions -***/ - -static bool peermsgs_is_transferring_pieces( - struct tr_peer const* peer, - uint64_t now, - tr_direction direction, - unsigned int* setme_Bps) -{ - unsigned int Bps = 0; - - if (tr_isPeerMsgs(peer)) - { - tr_peerMsgs const* msgs = (tr_peerMsgs const*)peer; - Bps = tr_peerIoGetPieceSpeed_Bps(msgs->io, now, direction); - } - - if (setme_Bps != nullptr) - { - *setme_Bps = Bps; - } - - return Bps > 0; -} - -static void peermsgs_destruct(tr_peer* peer) -{ - tr_peerMsgs* const msgs = PEER_MSGS(peer); - TR_ASSERT(msgs != nullptr); - if (msgs != nullptr) - { - tr_peerMsgsSetActive(msgs, TR_UP, false); - tr_peerMsgsSetActive(msgs, TR_DOWN, false); - - if (msgs->pexTimer != nullptr) - { - event_free(msgs->pexTimer); - } - - if (msgs->incoming.block != nullptr) - { - evbuffer_free(msgs->incoming.block); - } - - if (msgs->io != nullptr) - { - tr_peerIoClear(msgs->io); - tr_peerIoUnref(msgs->io); /* balanced by the ref in handshakeDoneCB() */ - } - - evbuffer_free(msgs->outMessages); - tr_free(msgs->pex6); - tr_free(msgs->pex); - - tr_peerDestruct(&msgs->peer); - - memset(msgs, ~0, sizeof(tr_peerMsgs)); - } -} - -static struct tr_peer_virtual_funcs const my_funcs = { - peermsgs_destruct, - peermsgs_is_transferring_pieces, -}; - -/*** -**** -***/ - -time_t tr_peerMsgsGetConnectionAge(tr_peerMsgs const* msgs) -{ - TR_ASSERT(tr_isPeerMsgs(msgs)); - - return tr_peerIoGetAge(msgs->io); -} - -bool tr_peerMsgsIsPeerChoked(tr_peerMsgs const* msgs) -{ - TR_ASSERT(tr_isPeerMsgs(msgs)); - - return msgs->peer_is_choked; -} - -bool tr_peerMsgsIsPeerInterested(tr_peerMsgs const* msgs) -{ - TR_ASSERT(tr_isPeerMsgs(msgs)); - - return msgs->peer_is_interested; -} - -bool tr_peerMsgsIsClientChoked(tr_peerMsgs const* msgs) -{ - TR_ASSERT(tr_isPeerMsgs(msgs)); - - return msgs->client_is_choked; -} - -bool tr_peerMsgsIsClientInterested(tr_peerMsgs const* msgs) -{ - TR_ASSERT(tr_isPeerMsgs(msgs)); - - return msgs->client_is_interested; -} - -bool tr_peerMsgsIsUtpConnection(tr_peerMsgs const* msgs) -{ - TR_ASSERT(tr_isPeerMsgs(msgs)); - - return msgs->io->socket.type == TR_PEER_SOCKET_TYPE_UTP; -} - -bool tr_peerMsgsIsEncrypted(tr_peerMsgs const* msgs) -{ - TR_ASSERT(tr_isPeerMsgs(msgs)); - - return tr_peerIoIsEncrypted(msgs->io); -} - -bool tr_peerMsgsIsIncomingConnection(tr_peerMsgs const* msgs) -{ - TR_ASSERT(tr_isPeerMsgs(msgs)); - - return tr_peerIoIsIncoming(msgs->io); -} - -/*** -**** -***/ - -bool tr_isPeerMsgs(void const* msgs) -{ - return msgs != nullptr && ((struct tr_peerMsgs const*)msgs)->magic_number == MAGIC_NUMBER; -} - -tr_peerMsgs* tr_peerMsgsCast(void* vm) -{ - auto* m = static_cast(vm); - return tr_isPeerMsgs(m) ? m : nullptr; -} - -tr_peerMsgs* tr_peerMsgsNew(struct tr_torrent* torrent, struct tr_peerIo* io, tr_peer_callback callback, void* callbackData) -{ - TR_ASSERT(io != nullptr); - - tr_peerMsgs* m = tr_new0(tr_peerMsgs, 1); - - tr_peerConstruct(&m->peer, torrent); - m->peer.funcs = &my_funcs; - - m->magic_number = MAGIC_NUMBER; - m->client_is_choked = true; - m->peer_is_choked = true; - m->client_is_interested = false; - m->peer_is_interested = false; - m->is_active[TR_UP] = false; - m->is_active[TR_DOWN] = false; - m->callback = callback; - m->callbackData = callbackData; - m->io = io; - m->torrent = torrent; - m->state = AWAITING_BT_LENGTH; - m->outMessages = evbuffer_new(); - m->outMessagesBatchedAt = 0; - m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS; - - if (tr_torrentAllowsPex(torrent)) - { - m->pexTimer = evtimer_new(torrent->session->event_base, pexPulse, m); - tr_timerAdd(m->pexTimer, PEX_INTERVAL_SECS, 0); - } - - if (tr_peerIoSupportsUTP(m->io)) - { - tr_address const* addr = tr_peerIoGetAddress(m->io, nullptr); - tr_peerMgrSetUtpSupported(torrent, addr); - tr_peerMgrSetUtpFailed(torrent, addr, false); - } - - if (tr_peerIoSupportsLTEP(m->io)) - { - sendLtepHandshake(m); - } - - tellPeerWhatWeHave(m); - - if (tr_dhtEnabled(torrent->session) && tr_peerIoSupportsDHT(m->io)) - { - /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */ - struct tr_address const* addr = tr_peerIoGetAddress(m->io, nullptr); - - if (addr->type == TR_AF_INET || tr_globalIPv6() != nullptr) - { - protocolSendPort(m, tr_dhtPort(torrent->session)); - } - } - - tr_peerIoSetIOFuncs(m->io, canRead, didWrite, gotError, m); - updateDesiredRequestCount(m); - - return m; + TR_ASSERT(msgs->pex_timer); + tr_timerAdd(msgs->pex_timer.get(), PEX_INTERVAL_SECS, 0); } diff --git a/libtransmission/peer-msgs.h b/libtransmission/peer-msgs.h index 57245f203..5ffae70c6 100644 --- a/libtransmission/peer-msgs.h +++ b/libtransmission/peer-msgs.h @@ -15,9 +15,9 @@ #include #include "peer-common.h" +class tr_peer; struct tr_address; struct tr_bitfield; -struct tr_peer; struct tr_peerIo; struct tr_torrent; @@ -26,47 +26,47 @@ struct tr_torrent; * @{ */ -struct tr_peerMsgs; +class tr_peerMsgs : public tr_peer +{ +public: + tr_peerMsgs(tr_torrent* torrent, peer_atom* atom_in) + : tr_peer{ torrent, atom_in } + { + } -#define PEER_MSGS(o) (tr_peerMsgsCast(o)) + virtual ~tr_peerMsgs() override = default; -bool tr_isPeerMsgs(void const* msgs); + virtual bool is_peer_choked() const = 0; + virtual bool is_peer_interested() const = 0; + virtual bool is_client_choked() const = 0; + virtual bool is_client_interested() const = 0; -tr_peerMsgs* tr_peerMsgsCast(void* msgs); + virtual bool is_utp_connection() const = 0; + virtual bool is_encrypted() const = 0; + virtual bool is_incoming_connection() const = 0; -tr_peerMsgs* tr_peerMsgsNew(struct tr_torrent* torrent, struct tr_peerIo* io, tr_peer_callback callback, void* callback_data); + virtual bool is_active(tr_direction direction) const = 0; + virtual void update_active(tr_direction direction) = 0; -bool tr_peerMsgsIsPeerChoked(tr_peerMsgs const* msgs); + virtual time_t get_connection_age() const = 0; + virtual bool is_reading_block(tr_block_index_t block) const = 0; -bool tr_peerMsgsIsPeerInterested(tr_peerMsgs const* msgs); + virtual void cancel_block_request(tr_block_index_t block) = 0; -bool tr_peerMsgsIsClientChoked(tr_peerMsgs const* msgs); + virtual void set_choke(bool peer_is_choked) = 0; + virtual void set_interested(bool client_is_interested) = 0; -bool tr_peerMsgsIsClientInterested(tr_peerMsgs const* msgs); + virtual void pulse() = 0; -bool tr_peerMsgsIsActive(tr_peerMsgs const* msgs, tr_direction direction); + virtual void on_piece_completed(tr_piece_index_t) = 0; +}; -void tr_peerMsgsUpdateActive(tr_peerMsgs* msgs, tr_direction direction); - -time_t tr_peerMsgsGetConnectionAge(tr_peerMsgs const* msgs); - -bool tr_peerMsgsIsUtpConnection(tr_peerMsgs const* msgs); - -bool tr_peerMsgsIsEncrypted(tr_peerMsgs const* msgs); - -bool tr_peerMsgsIsIncomingConnection(tr_peerMsgs const* msgs); - -void tr_peerMsgsSetChoke(tr_peerMsgs* msgs, bool peerIsChoked); - -bool tr_peerMsgsIsReadingBlock(tr_peerMsgs const* msgs, tr_block_index_t block); - -void tr_peerMsgsSetInterested(tr_peerMsgs* msgs, bool clientIsInterested); - -void tr_peerMsgsHave(tr_peerMsgs* msgs, uint32_t pieceIndex); - -void tr_peerMsgsPulse(tr_peerMsgs* msgs); - -void tr_peerMsgsCancel(tr_peerMsgs* msgs, tr_block_index_t block); +tr_peerMsgs* tr_peerMsgsNew( + tr_torrent* torrent, + peer_atom* atom, + tr_peerIo* io, + tr_peer_callback callback, + void* callback_data); size_t tr_generateAllowedSet( tr_piece_index_t* setmePieces, diff --git a/libtransmission/webseed.cc b/libtransmission/webseed.cc index 900fe8fcb..f822b0f38 100644 --- a/libtransmission/webseed.cc +++ b/libtransmission/webseed.cc @@ -24,6 +24,11 @@ #include "web.h" #include "webseed.h" +namespace +{ + +struct tr_webseed; + struct tr_webseed_task { bool dead; @@ -40,36 +45,83 @@ struct tr_webseed_task long response_code; }; -struct tr_webseed +auto constexpr TR_IDLE_TIMER_MSEC = 2000; + +auto constexpr FAILURE_RETRY_INTERVAL = 150; + +auto constexpr MAX_CONSECUTIVE_FAILURES = 5; + +auto constexpr MAX_WEBSEED_CONNECTIONS = 4; + +void webseed_timer_func(evutil_socket_t fd, short what, void* vw); + +struct tr_webseed : public tr_peer { - tr_peer parent; - tr_bandwidth bandwidth; - tr_session* session; - tr_peer_callback callback; - void* callback_data; +public: + tr_webseed(struct tr_torrent* tor, std::string_view url, tr_peer_callback callback_in, void* callback_data_in) + : tr_peer{ tor } + , torrent_id{ tr_torrentId(tor) } + , base_url{ url } + , callback{ callback_in } + , callback_data{ callback_data_in } + { + // init parent bits + tr_bitfieldSetHasAll(&have); + tr_peerUpdateProgress(tor, this); + + file_urls.resize(tr_torrentInfo(tor)->fileCount); + + tr_bandwidthConstruct(&bandwidth, &tor->bandwidth); + timer = evtimer_new(session->event_base, webseed_timer_func, this); + tr_timerAddMsec(timer, TR_IDLE_TIMER_MSEC); + } + + ~tr_webseed() override + { + // flag all the pending tasks as dead + std::for_each(std::begin(tasks), std::end(tasks), [](auto* task) { task->dead = true; }); + tasks.clear(); + + event_free(timer); + tr_bandwidthDestruct(&bandwidth); + } + + bool is_transferring_pieces(uint64_t now, tr_direction direction, unsigned int* setme_Bps) const override + { + unsigned int Bps = 0; + bool is_active = false; + + if (direction == TR_DOWN) + { + is_active = !std::empty(tasks); + Bps = tr_bandwidthGetPieceSpeed_Bps(&bandwidth, now, direction); + } + + if (setme_Bps != nullptr) + { + *setme_Bps = Bps; + } + + return is_active; + } + + int const torrent_id; + std::string const base_url; + tr_peer_callback const callback; + void* const callback_data; + + tr_bandwidth bandwidth = {}; std::set tasks; - struct event* timer; - char* base_url; - size_t base_url_len; - int torrent_id; - int consecutive_failures; - int retry_tickcount; - int retry_challenge; - int idle_connections; - int active_transfers; - char** file_urls; + struct event* timer = nullptr; + int consecutive_failures = 0; + int retry_tickcount = 0; + int retry_challenge = 0; + int idle_connections = 0; + int active_transfers = 0; + std::vector file_urls; }; -enum -{ - TR_IDLE_TIMER_MSEC = 2000, - /* */ - FAILURE_RETRY_INTERVAL = 150, - /* */ - MAX_CONSECUTIVE_FAILURES = 5, - /* */ - MAX_WEBSEED_CONNECTIONS = 4 -}; +} // namespace /*** **** @@ -79,7 +131,7 @@ static void publish(tr_webseed* w, tr_peer_event* e) { if (w->callback != nullptr) { - (*w->callback)(&w->parent, e, w->callback_data); + (*w->callback)(w, e, w->callback_data); } } @@ -210,8 +262,7 @@ static void connection_succeeded(void* vdata) tr_file_index_t file_index; tr_ioFindFileLocation(tor, data->piece_index, data->piece_offset, &file_index, &file_offset); - tr_free(w->file_urls[file_index]); - w->file_urls[file_index] = data->real_url; + w->file_urls[file_index].assign(data->real_url); data->real_url = nullptr; } } @@ -228,7 +279,7 @@ static void on_content_changed(struct evbuffer* buf, struct evbuffer_cb_info con { size_t const n_added = info->n_added; auto* task = static_cast(vtask); - tr_session* session = task->session; + auto* session = task->session; tr_sessionLock(session); @@ -256,7 +307,7 @@ static void on_content_changed(struct evbuffer* buf, struct evbuffer_cb_info con /* processing this uses a tr_torrent pointer, so push the work to the libevent thread... */ - tr_runInEventThread(w->session, connection_succeeded, data); + tr_runInEventThread(session, connection_succeeded, data); } } @@ -264,10 +315,10 @@ static void on_content_changed(struct evbuffer* buf, struct evbuffer_cb_info con { /* once we've got at least one full block, save it */ - struct write_block_data* data; uint32_t const block_size = task->block_size; tr_block_index_t const completed = len / block_size; + struct write_block_data* data; data = tr_new(struct write_block_data, 1); data->webseed = task->webseed; data->piece_index = task->piece_index; @@ -322,7 +373,7 @@ static void on_idle(tr_webseed* w) tr_block_index_t* blocks = nullptr; blocks = tr_new(tr_block_index_t, want * 2); - tr_peerMgrGetNextRequests(tor, &w->parent, want, blocks, &got, true); + tr_peerMgrGetNextRequests(tor, w, want, blocks, &got, true); w->idle_connections -= std::min(w->idle_connections, got); @@ -450,19 +501,21 @@ static void web_response_func( } } -static struct evbuffer* make_url(tr_webseed* w, tr_file const* file) +static std::string make_url(tr_webseed* w, tr_file const* file) { struct evbuffer* buf = evbuffer_new(); - evbuffer_add(buf, w->base_url, w->base_url_len); + evbuffer_add(buf, std::data(w->base_url), std::size(w->base_url)); /* if url ends with a '/', add the torrent name */ - if (w->base_url[w->base_url_len - 1] == '/' && file->name != nullptr) + if (*std::rbegin(w->base_url) == '/' && file->name != nullptr) { tr_http_escape(buf, file->name, strlen(file->name), false); } - return buf; + auto const url = std::string{ (char const*)evbuffer_pullup(buf, -1), evbuffer_get_length(buf) }; + evbuffer_free(buf); + return url; } static void task_request_next_chunk(struct tr_webseed_task* t) @@ -472,8 +525,7 @@ static void task_request_next_chunk(struct tr_webseed_task* t) if (tor != nullptr) { - char range[64]; - char** urls = t->webseed->file_urls; + auto& urls = t->webseed->file_urls; tr_info const* inf = tr_torrentInfo(tor); uint64_t const remain = t->length - t->blocks_done * tor->blockSize - evbuffer_get_length(t->content); @@ -491,14 +543,15 @@ static void task_request_next_chunk(struct tr_webseed_task* t) file = &inf->files[file_index]; this_pass = std::min(remain, file->length - file_offset); - if (urls[file_index] == nullptr) + if (std::empty(urls[file_index])) { - urls[file_index] = evbuffer_free_to_str(make_url(t->webseed, file), nullptr); + urls[file_index] = make_url(t->webseed, file); } + char range[64]; tr_snprintf(range, sizeof(range), "%" PRIu64 "-%" PRIu64, file_offset, file_offset + this_pass - 1); - t->web_task = tr_webRunWebseed(tor, urls[file_index], range, web_response_func, t, t->content); + t->web_task = tr_webRunWebseed(tor, urls[file_index].c_str(), range, web_response_func, t, t->content); } } @@ -506,7 +559,10 @@ static void task_request_next_chunk(struct tr_webseed_task* t) **** ***/ -static void webseed_timer_func(evutil_socket_t fd, short what, void* vw) +namespace +{ + +void webseed_timer_func(evutil_socket_t fd, short what, void* vw) { TR_UNUSED(fd); TR_UNUSED(what); @@ -523,99 +579,9 @@ static void webseed_timer_func(evutil_socket_t fd, short what, void* vw) tr_timerAddMsec(w->timer, TR_IDLE_TIMER_MSEC); } -/*** -**** tr_peer virtual functions -***/ +} // unnamed namespace -static bool webseed_is_transferring_pieces(tr_peer const* peer, uint64_t now, tr_direction direction, unsigned int* setme_Bps) +tr_peer* tr_webseedNew(struct tr_torrent* torrent, std::string_view url, tr_peer_callback callback, void* callback_data) { - unsigned int Bps = 0; - bool is_active = false; - - if (direction == TR_DOWN) - { - tr_webseed const* w = (tr_webseed const*)peer; - is_active = !std::empty(w->tasks); - Bps = tr_bandwidthGetPieceSpeed_Bps(&w->bandwidth, now, direction); - } - - if (setme_Bps != nullptr) - { - *setme_Bps = Bps; - } - - return is_active; -} - -static void webseed_destruct(tr_peer* peer) -{ - tr_webseed* w = (tr_webseed*)peer; - - /* flag all the pending tasks as dead */ - auto& src = w->tasks; - std::for_each(std::begin(src), std::end(src), [](auto* task) { task->dead = true; }); - // Manually destructing is unfortunately necessary until we C++ify - // the tr_peer / tr_peerMsgs / tr_webseed inheritance. Peers are - // curently tr_free()d in tr_peerFree() so we can't new/delete them. - using type = decltype(w->tasks); - w->tasks.~type(); - - /* if we have an array of file URLs, free it */ - if (w->file_urls != nullptr) - { - tr_torrent const* const tor = tr_torrentFindFromId(w->session, w->torrent_id); - tr_info const* const inf = tr_torrentInfo(tor); - - for (tr_file_index_t i = 0; i < inf->fileCount; ++i) - { - tr_free(w->file_urls[i]); - } - - tr_free(w->file_urls); - } - - /* webseed destruct */ - event_free(w->timer); - tr_bandwidthDestruct(&w->bandwidth); - tr_free(w->base_url); - - /* parent class destruct */ - tr_peerDestruct(&w->parent); -} - -static struct tr_peer_virtual_funcs const my_funcs = { - webseed_destruct, - webseed_is_transferring_pieces, -}; - -/*** -**** -***/ - -tr_webseed* tr_webseedNew(struct tr_torrent* tor, char const* url, tr_peer_callback callback, void* callback_data) -{ - tr_webseed* w = tr_new0(tr_webseed, 1); - tr_peer* peer = &w->parent; - tr_info const* inf = tr_torrentInfo(tor); - - /* construct parent class */ - tr_peerConstruct(peer, tor); - peer->client = TR_KEY_webseeds; - peer->funcs = &my_funcs; - tr_bitfieldSetHasAll(&peer->have); - tr_peerUpdateProgress(tor, peer); - - using type = decltype(w->tasks); - new (&w->tasks) type; - w->torrent_id = tr_torrentId(tor); - w->session = tor->session; - w->base_url_len = strlen(url); - w->base_url = tr_strndup(url, w->base_url_len); - w->callback = callback; - w->callback_data = callback_data; - w->file_urls = tr_new0(char*, inf->fileCount); - tr_bandwidthConstruct(&w->bandwidth, &tor->bandwidth); - w->timer = evtimer_new(w->session->event_base, webseed_timer_func, w); - tr_timerAddMsec(w->timer, TR_IDLE_TIMER_MSEC); - return w; + return new tr_webseed(torrent, url, callback, callback_data); } diff --git a/libtransmission/webseed.h b/libtransmission/webseed.h index 5a289db19..23a18b9c3 100644 --- a/libtransmission/webseed.h +++ b/libtransmission/webseed.h @@ -12,8 +12,8 @@ #error only libtransmission should #include this header. #endif -struct tr_webseed; +#include #include "peer-common.h" -tr_webseed* tr_webseedNew(struct tr_torrent* torrent, char const* url, tr_peer_callback callback, void* callback_data); +tr_peer* tr_webseedNew(struct tr_torrent* torrent, std::string_view, tr_peer_callback callback, void* callback_data); diff --git a/tests/libtransmission/history-test.cc b/tests/libtransmission/history-test.cc index 521f8c45c..84b4b2d62 100644 --- a/tests/libtransmission/history-test.cc +++ b/tests/libtransmission/history-test.cc @@ -15,13 +15,13 @@ TEST(History, recentHistory) { auto h = tr_recentHistory{}; - tr_historyAdd(&h, 10000, 1); - EXPECT_EQ(0, tr_historyGet(&h, 12000, 1000)); - EXPECT_EQ(1, tr_historyGet(&h, 12000, 3000)); - EXPECT_EQ(1, tr_historyGet(&h, 12000, 5000)); - tr_historyAdd(&h, 20000, 1); - EXPECT_EQ(0, tr_historyGet(&h, 22000, 1000)); - EXPECT_EQ(1, tr_historyGet(&h, 22000, 3000)); - EXPECT_EQ(2, tr_historyGet(&h, 22000, 15000)); - EXPECT_EQ(2, tr_historyGet(&h, 22000, 20000)); + h.add(10000, 1); + EXPECT_EQ(0, h.count(12000, 1000)); + EXPECT_EQ(1, h.count(12000, 3000)); + EXPECT_EQ(1, h.count(12000, 5000)); + h.add(20000, 1); + EXPECT_EQ(0, h.count(22000, 1000)); + EXPECT_EQ(1, h.count(22000, 3000)); + EXPECT_EQ(2, h.count(22000, 15000)); + EXPECT_EQ(2, h.count(22000, 20000)); }