From 7c014e3256499b61e7684f671d99f39155c880a5 Mon Sep 17 00:00:00 2001 From: Charles Kerr Date: Mon, 29 Aug 2022 15:58:18 -0500 Subject: [PATCH] Refactor/tr peer io member funcs (#3734) * refactor: make tr_peerIoSetEnabled() a member method * refactor: make tr_peerIoFlushOutgoingProtocolMsgs() a member method * refactor: make tr_peerIoFlush() a member method * refactor: make tr_peerWriteBytes() a member method * refactor: make tr_peerWriteBuf() a member method * refactor: make tr_peerIoGetWriteBufferSpace() a member method * chore: remove unused declaration * refactor: make tr_peerIoUtpInit() a member method * refactor: make tr_peerIoNew() a member method * refactor: make tr_peerIoNewOutgoing() a member method * refactor: make tr_peerIoNewIncoming() a member method --- libtransmission/bandwidth.cc | 6 +- libtransmission/handshake.cc | 18 +-- libtransmission/peer-io.cc | 100 +++++++--------- libtransmission/peer-io.h | 151 ++++++++++-------------- libtransmission/peer-mgr.cc | 8 +- libtransmission/peer-msgs.cc | 8 +- libtransmission/tr-utp.cc | 2 +- tests/libtransmission/handshake-test.cc | 4 +- 8 files changed, 133 insertions(+), 164 deletions(-) diff --git a/libtransmission/bandwidth.cc b/libtransmission/bandwidth.cc index 7ee7f1774..048a07309 100644 --- a/libtransmission/bandwidth.cc +++ b/libtransmission/bandwidth.cc @@ -182,7 +182,7 @@ void tr_bandwidth::phaseOne(std::vector& peer_array, tr_direction di * out in a timely manner. */ size_t const increment = 3000; - int const bytes_used = tr_peerIoFlush(peer_array[i], dir, increment); + int const bytes_used = peer_array[i]->flush(dir, increment); tr_logAddTrace(fmt::format("peer #{} of {} used {} bytes in this pass", i, n, bytes_used)); @@ -212,7 +212,7 @@ void tr_bandwidth::allocate(tr_direction dir, unsigned int period_msec) for (auto* io : tmp) { tr_peerIoRef(io); - tr_peerIoFlushOutgoingProtocolMsgs(io); + io->flushOutgoingProtocolMsgs(); switch (io->priority) { @@ -243,7 +243,7 @@ void tr_bandwidth::allocate(tr_direction dir, unsigned int period_msec) * or (2) the next tr_bandwidth::allocate () call, when we start over again. */ for (auto* io : tmp) { - tr_peerIoSetEnabled(io, dir, io->hasBandwidthLeft(dir)); + io->setEnabled(dir, io->hasBandwidthLeft(dir)); } for (auto* io : tmp) diff --git a/libtransmission/handshake.cc b/libtransmission/handshake.cc index 87dfb6bc3..33e30528c 100644 --- a/libtransmission/handshake.cc +++ b/libtransmission/handshake.cc @@ -310,7 +310,7 @@ static void sendPublicKeyAndPad(tr_handshake* handshake) auto walk = data; walk = std::copy(std::begin(public_key), std::end(public_key), walk); walk += handshake->mediator->pad(walk, PadMax); - tr_peerIoWriteBytes(handshake->io, data, walk - data, false); + handshake->io->writeBytes(data, walk - data, false); } // 1 A->B: our public key (Ya) and some padding (PadA) @@ -439,7 +439,7 @@ static ReadState readYb(tr_handshake* handshake, struct evbuffer* inbuf) /* ENCRYPT(VC, crypto_provide, len(PadC), PadC * PadC is reserved for future extensions to the handshake... * standard practice at this time is for it to be zero-length */ - tr_peerIoWriteBuf(handshake->io, outbuf, false); + handshake->io->writeBuf(outbuf, false); handshake->io->encryptInit(handshake->io->isIncoming(), handshake->dh, *info_hash); evbuffer_add(outbuf, std::data(VC), std::size(VC)); evbuffer_add_uint32(outbuf, getCryptoProvide(handshake)); @@ -460,7 +460,7 @@ static ReadState readYb(tr_handshake* handshake, struct evbuffer* inbuf) /* send it */ handshake->io->decryptInit(handshake->io->isIncoming(), handshake->dh, *info_hash); setReadState(handshake, AWAITING_VC); - tr_peerIoWriteBuf(handshake->io, outbuf, false); + handshake->io->writeBuf(outbuf, false); /* cleanup */ evbuffer_free(outbuf); @@ -664,7 +664,7 @@ static ReadState readHandshake(tr_handshake* handshake, struct evbuffer* inbuf) return tr_handshakeDone(handshake, false); } - tr_peerIoWriteBytes(handshake->io, std::data(msg), std::size(msg), false); + handshake->io->writeBytes(std::data(msg), std::size(msg), false); handshake->haveSentBitTorrentHandshake = true; } @@ -891,7 +891,7 @@ static ReadState readIA(tr_handshake* handshake, struct evbuffer const* inbuf) /* maybe de-encrypt our connection */ if (crypto_select == CRYPTO_PROVIDE_PLAINTEXT) { - tr_peerIoWriteBuf(handshake->io, outbuf, false); + handshake->io->writeBuf(outbuf, false); } tr_logAddTraceHand(handshake, "sending handshake"); @@ -908,7 +908,7 @@ static ReadState readIA(tr_handshake* handshake, struct evbuffer const* inbuf) } /* send it out */ - tr_peerIoWriteBuf(handshake->io, outbuf, false); + handshake->io->writeBuf(outbuf, false); evbuffer_free(outbuf); /* now await the handshake */ @@ -1100,7 +1100,7 @@ static void gotError(tr_peerIo* io, short what, void* vhandshake) buildHandshakeMessage(handshake, std::data(msg)); handshake->haveSentBitTorrentHandshake = true; setReadState(handshake, AWAITING_HANDSHAKE); - tr_peerIoWriteBytes(handshake->io, std::data(msg), std::size(msg), false); + handshake->io->writeBytes(std::data(msg), std::size(msg), false); } } @@ -1116,7 +1116,7 @@ static void gotError(tr_peerIo* io, short what, void* vhandshake) buildHandshakeMessage(handshake, std::data(msg)); handshake->haveSentBitTorrentHandshake = true; setReadState(handshake, AWAITING_HANDSHAKE); - tr_peerIoWriteBytes(handshake->io, std::data(msg), std::size(msg), false); + handshake->io->writeBytes(std::data(msg), std::size(msg), false); } else { @@ -1164,7 +1164,7 @@ tr_handshake* tr_handshakeNew( handshake->haveSentBitTorrentHandshake = true; setReadState(handshake, AWAITING_HANDSHAKE); - tr_peerIoWriteBytes(handshake->io, std::data(msg), std::size(msg), false); + handshake->io->writeBytes(std::data(msg), std::size(msg), false); } return handshake; diff --git a/libtransmission/peer-io.cc b/libtransmission/peer-io.cc index c2cc7fa1c..6c28833eb 100644 --- a/libtransmission/peer-io.cc +++ b/libtransmission/peer-io.cc @@ -205,7 +205,7 @@ static void event_read_cb(evutil_socket_t fd, short /*event*/, void* vio) /* if we don't have any bandwidth left, stop reading */ if (howmuch < 1) { - tr_peerIoSetEnabled(io, dir, false); + io->setEnabled(dir, false); return; } @@ -215,7 +215,7 @@ static void event_read_cb(evutil_socket_t fd, short /*event*/, void* vio) if (res > 0) { - tr_peerIoSetEnabled(io, dir, true); + io->setEnabled(dir, true); /* Invoke the user callback - must always be called last */ canReadWrapper(io); @@ -232,7 +232,7 @@ static void event_read_cb(evutil_socket_t fd, short /*event*/, void* vio) { if (e == EAGAIN || e == EINTR) { - tr_peerIoSetEnabled(io, dir, true); + io->setEnabled(dir, true); return; } @@ -282,7 +282,7 @@ static void event_write_cb(evutil_socket_t fd, short /*event*/, void* vio) /* if we don't have any bandwidth left, stop writing */ if (howmuch < 1) { - tr_peerIoSetEnabled(io, dir, false); + io->setEnabled(dir, false); return; } @@ -313,7 +313,7 @@ static void event_write_cb(evutil_socket_t fd, short /*event*/, void* vio) if (evbuffer_get_length(io->outbuf.get()) != 0) { - tr_peerIoSetEnabled(io, dir, true); + io->setEnabled(dir, true); } didWriteWrapper(io, res); @@ -322,7 +322,7 @@ static void event_write_cb(evutil_socket_t fd, short /*event*/, void* vio) RESCHEDULE: if (evbuffer_get_length(io->outbuf.get()) != 0) { - tr_peerIoSetEnabled(io, dir, true); + io->setEnabled(dir, true); } return; @@ -360,7 +360,7 @@ void tr_peerIo::readBufferAdd(void const* data, size_t n_bytes) return; } - tr_peerIoSetEnabled(this, TR_DOWN, true); + setEnabled(TR_DOWN, true); canReadWrapper(this); } @@ -379,7 +379,7 @@ static void utp_on_writable(tr_peerIo* io) tr_logAddTraceIo(io, "libutp says this peer is ready to write"); int const n = tr_peerIoTryWrite(io, SIZE_MAX); - tr_peerIoSetEnabled(io, TR_UP, n != 0 && evbuffer_get_length(io->outbuf.get()) != 0); + io->setEnabled(TR_UP, n != 0 && evbuffer_get_length(io->outbuf.get()) != 0); } static void utp_on_state_change(tr_peerIo* const io, int const state) @@ -491,7 +491,7 @@ static uint64 utp_callback(utp_callback_arguments* args) #endif /* #ifdef WITH_UTP */ -tr_peerIo* tr_peerIoNew( +tr_peerIo* tr_peerIo::create( tr_session* session, tr_bandwidth* parent, tr_address const* addr, @@ -548,7 +548,7 @@ tr_peerIo* tr_peerIoNew( return io; } -void tr_peerIoUtpInit([[maybe_unused]] struct_utp_context* ctx) +void tr_peerIo::utpInit([[maybe_unused]] struct_utp_context* ctx) { #ifdef WITH_UTP @@ -563,7 +563,7 @@ void tr_peerIoUtpInit([[maybe_unused]] struct_utp_context* ctx) #endif } -tr_peerIo* tr_peerIoNewIncoming( +tr_peerIo* tr_peerIo::newIncoming( tr_session* session, tr_bandwidth* parent, tr_address const* addr, @@ -574,10 +574,10 @@ tr_peerIo* tr_peerIoNewIncoming( TR_ASSERT(session != nullptr); TR_ASSERT(tr_address_is_valid(addr)); - return tr_peerIoNew(session, parent, addr, port, current_time, nullptr, true, false, socket); + return tr_peerIo::create(session, parent, addr, port, current_time, nullptr, true, false, socket); } -tr_peerIo* tr_peerIoNewOutgoing( +tr_peerIo* tr_peerIo::newOutgoing( tr_session* session, tr_bandwidth* parent, tr_address const* addr, @@ -611,7 +611,7 @@ tr_peerIo* tr_peerIoNewOutgoing( return nullptr; } - return tr_peerIoNew(session, parent, addr, port, current_time, &torrent_hash, false, is_seed, socket); + return create(session, parent, addr, port, current_time, &torrent_hash, false, is_seed, socket); } /*** @@ -696,22 +696,21 @@ static void event_disable(tr_peerIo* io, short event) } } -void tr_peerIoSetEnabled(tr_peerIo* io, tr_direction dir, bool isEnabled) +void tr_peerIo::setEnabled(tr_direction dir, bool is_enabled) { - TR_ASSERT(tr_isPeerIo(io)); TR_ASSERT(tr_isDirection(dir)); - TR_ASSERT(tr_amInEventThread(io->session)); - TR_ASSERT(io->session->events != nullptr); + TR_ASSERT(tr_amInEventThread(session)); + TR_ASSERT(session->events != nullptr); short const event = dir == TR_UP ? EV_WRITE : EV_READ; - if (isEnabled) + if (is_enabled) { - event_enable(io, event); + event_enable(this, event); } else { - event_disable(io, event); + event_disable(this, event); } } @@ -825,8 +824,8 @@ void tr_peerIo::setCallbacks(tr_can_read_cb readcb, tr_did_write_cb writecb, tr_ void tr_peerIo::clear() { setCallbacks(nullptr, nullptr, nullptr, nullptr); - tr_peerIoSetEnabled(this, TR_UP, false); - tr_peerIoSetEnabled(this, TR_DOWN, false); + setEnabled(TR_UP, false); + setEnabled(TR_DOWN, false); io_close_socket(this); } @@ -876,18 +875,11 @@ static unsigned int getDesiredOutputBufferSize(tr_peerIo const* io, uint64_t now return std::max(ceiling, currentSpeed_Bps * period); } -size_t tr_peerIoGetWriteBufferSpace(tr_peerIo const* io, uint64_t now) +size_t tr_peerIo::getWriteBufferSpace(uint64_t now) const { - size_t const desiredLen = getDesiredOutputBufferSize(io, now); - size_t const currentLen = evbuffer_get_length(io->outbuf.get()); - size_t freeSpace = 0; - - if (desiredLen > currentLen) - { - freeSpace = desiredLen - currentLen; - } - - return freeSpace; + size_t const desired_len = getDesiredOutputBufferSize(this, now); + size_t const current_len = evbuffer_get_length(outbuf.get()); + return desired_len > current_len ? desired_len - current_len : 0U; } /** @@ -917,36 +909,35 @@ static inline void processBuffer(tr_peerIo& io, evbuffer* buffer, size_t offset, TR_ASSERT(size == 0); } -void tr_peerIoWriteBuf(tr_peerIo* io, struct evbuffer* buf, bool isPieceData) +void tr_peerIo::writeBuf(struct evbuffer* buf, bool isPieceData) { size_t const byteCount = evbuffer_get_length(buf); - if (io->isEncrypted()) + if (isEncrypted()) { - processBuffer(*io, buf, 0, byteCount); + processBuffer(*this, buf, 0, byteCount); } - evbuffer_add_buffer(io->outbuf.get(), buf); - io->outbuf_info.emplace_back(byteCount, isPieceData); + evbuffer_add_buffer(outbuf.get(), buf); + outbuf_info.emplace_back(byteCount, isPieceData); } -void tr_peerIoWriteBytes(tr_peerIo* io, void const* writeme, size_t writeme_len, bool is_piece_data) +void tr_peerIo::writeBytes(void const* writeme, size_t writeme_len, bool is_piece_data) { struct evbuffer_iovec iovec; - evbuffer_reserve_space(io->outbuf.get(), writeme_len, &iovec, 1); + evbuffer_reserve_space(outbuf.get(), writeme_len, &iovec, 1); iovec.iov_len = writeme_len; - memcpy(iovec.iov_base, writeme, iovec.iov_len); - if (io->isEncrypted()) + if (isEncrypted()) { - io->encrypt(iovec.iov_len, iovec.iov_base); + encrypt(iovec.iov_len, iovec.iov_base); } - evbuffer_commit_space(io->outbuf.get(), &iovec, 1); + evbuffer_commit_space(outbuf.get(), &iovec, 1); - io->outbuf_info.emplace_back(writeme_len, is_piece_data); + outbuf_info.emplace_back(writeme_len, is_piece_data); } /*** @@ -1155,31 +1146,30 @@ static int tr_peerIoTryWrite(tr_peerIo* io, size_t howmuch) return n; } -int tr_peerIoFlush(tr_peerIo* io, tr_direction dir, size_t limit) +int tr_peerIo::flush(tr_direction dir, size_t limit) { - TR_ASSERT(tr_isPeerIo(io)); TR_ASSERT(tr_isDirection(dir)); - int const bytes_used = dir == TR_DOWN ? tr_peerIoTryRead(io, limit) : tr_peerIoTryWrite(io, limit); - tr_logAddTraceIo(io, fmt::format("flushing peer-io, direction:{}, limit:{}, byte_used:{}", dir, limit, bytes_used)); + int const bytes_used = dir == TR_DOWN ? tr_peerIoTryRead(this, limit) : tr_peerIoTryWrite(this, limit); + tr_logAddTraceIo(this, fmt::format("flushing peer-io, direction:{}, limit:{}, byte_used:{}", dir, limit, bytes_used)); return bytes_used; } -int tr_peerIoFlushOutgoingProtocolMsgs(tr_peerIo* io) +int tr_peerIo::flushOutgoingProtocolMsgs() { - size_t byteCount = 0; + size_t byte_count = 0; /* count up how many bytes are used by non-piece-data messages at the front of our outbound queue */ - for (auto const& [n_bytes, is_piece_data] : io->outbuf_info) + for (auto const& [n_bytes, is_piece_data] : outbuf_info) { if (is_piece_data) { break; } - byteCount += n_bytes; + byte_count += n_bytes; } - return tr_peerIoFlush(io, TR_UP, byteCount); + return flush(TR_UP, byte_count); } diff --git a/libtransmission/peer-io.h b/libtransmission/peer-io.h index a4aaaf332..7a67fdb0d 100644 --- a/libtransmission/peer-io.h +++ b/libtransmission/peer-io.h @@ -72,28 +72,37 @@ class tr_peerIo using Filter = tr_message_stream_encryption::Filter; public: - tr_peerIo( - tr_session* session_in, + // TODO: 8 constructor args is too many; maybe a builder object? + static tr_peerIo* newOutgoing( + tr_session* session, + tr_bandwidth* parent, + struct tr_address const* addr, + tr_port port, + time_t current_time, + tr_sha1_digest_t const& torrent_hash, + bool is_seed, + bool utp); + + static tr_peerIo* newIncoming( + tr_session* session, + tr_bandwidth* parent, + struct tr_address const* addr, + tr_port port, + time_t current_time, + struct tr_peer_socket const socket); + + // this is only public for testing purposes. + // production code should use newOutgoing() or newIncoming() + static tr_peerIo* create( + tr_session* session, + tr_bandwidth* parent, + tr_address const* addr, + tr_port port, + time_t current_time, tr_sha1_digest_t const* torrent_hash, bool is_incoming, - tr_address const& addr, - tr_port port, bool is_seed, - time_t current_time, - tr_bandwidth* parent_bandwidth) - : session{ session_in } - , time_created{ current_time } - , bandwidth_{ parent_bandwidth } - , addr_{ addr } - , port_{ port } - , is_seed_{ is_seed } - , is_incoming_{ is_incoming } - { - if (torrent_hash != nullptr) - { - torrent_hash_ = *torrent_hash; - } - } + struct tr_peer_socket const socket); void clear(); @@ -109,6 +118,8 @@ public: int reconnect(); + void setEnabled(tr_direction dir, bool is_enabled); + [[nodiscard]] constexpr tr_address const& address() const noexcept { return addr_; @@ -135,6 +146,13 @@ public: void readBufferAdd(void const* data, size_t n_bytes); + int flushOutgoingProtocolMsgs(); + int flush(tr_direction dir, size_t byte_limit); + + void writeBytes(void const* writeme, size_t writeme_len, bool is_piece_data); + void writeBuf(struct evbuffer* buf, bool isPieceData); + size_t getWriteBufferSpace(uint64_t now) const; + [[nodiscard]] auto hasBandwidthLeft(tr_direction dir) noexcept { return bandwidth_.clamp(dir, 1024) > 0; @@ -276,10 +294,31 @@ public: return filter_.get() != nullptr; } -private: - tr_bandwidth bandwidth_; + static void utpInit(struct_utp_context* ctx); - std::unique_ptr filter_; +private: + tr_peerIo( + tr_session* session_in, + tr_sha1_digest_t const* torrent_hash, + bool is_incoming, + tr_address const& addr, + tr_port port, + bool is_seed, + time_t current_time, + tr_bandwidth* parent_bandwidth) + : session{ session_in } + , time_created{ current_time } + , bandwidth_{ parent_bandwidth } + , addr_{ addr } + , port_{ port } + , is_seed_{ is_seed } + , is_incoming_{ is_incoming } + { + if (torrent_hash != nullptr) + { + torrent_hash_ = *torrent_hash; + } + } Filter& filter() { @@ -291,6 +330,10 @@ private: return *filter_; } + tr_bandwidth bandwidth_; + + std::unique_ptr filter_; + std::optional torrent_hash_; tr_address const addr_; @@ -304,44 +347,6 @@ private: bool fast_extension_supported_ = false; }; -/** -*** -**/ - -void tr_peerIoUtpInit(struct_utp_context* ctx); - -// TODO: 8 constructor args is too many; maybe a builder object? -tr_peerIo* tr_peerIoNewOutgoing( - tr_session* session, - tr_bandwidth* parent, - struct tr_address const* addr, - tr_port port, - time_t current_time, - tr_sha1_digest_t const& torrent_hash, - bool is_seed, - bool utp); - -tr_peerIo* tr_peerIoNewIncoming( - tr_session* session, - tr_bandwidth* parent, - struct tr_address const* addr, - tr_port port, - time_t current_time, - struct tr_peer_socket const socket); - -// this is only public for testing purposes. -// production code should use tr_peerIoNewOutgoing() or tr_peerIoNewIncoming() -tr_peerIo* tr_peerIoNew( - tr_session* session, - tr_bandwidth* parent, - tr_address const* addr, - tr_port port, - time_t current_time, - tr_sha1_digest_t const* torrent_hash, - bool is_incoming, - bool is_seed, - struct tr_peer_socket const socket); - void tr_peerIoRefImpl(char const* file, int line, tr_peerIo* io); #define tr_peerIoRef(io) tr_peerIoRefImpl(__FILE__, __LINE__, (io)) @@ -360,14 +365,6 @@ constexpr bool tr_isPeerIo(tr_peerIo const* io) *** **/ -void tr_peerIoWriteBytes(tr_peerIo* io, void const* writeme, size_t writeme_len, bool is_piece_data); - -void tr_peerIoWriteBuf(tr_peerIo* io, struct evbuffer* buf, bool isPieceData); - -/** -*** -**/ - void evbuffer_add_uint8(struct evbuffer* outbuf, uint8_t addme); void evbuffer_add_uint16(struct evbuffer* outbuf, uint16_t hs); void evbuffer_add_uint32(struct evbuffer* outbuf, uint32_t hl); @@ -377,22 +374,4 @@ void evbuffer_add_hton_16(struct evbuffer* buf, uint16_t val); void evbuffer_add_hton_32(struct evbuffer* buf, uint32_t val); void evbuffer_add_hton_64(struct evbuffer* buf, uint64_t val); -/** -*** -**/ - -size_t tr_peerIoGetWriteBufferSpace(tr_peerIo const* io, uint64_t now); - -void tr_peerIoBandwidthUsed(tr_peerIo* io, tr_direction direction, size_t byteCount, int isPieceData); - -/** -*** -**/ - -void tr_peerIoSetEnabled(tr_peerIo* io, tr_direction dir, bool isEnabled); - -int tr_peerIoFlush(tr_peerIo* io, tr_direction dir, size_t byteLimit); - -int tr_peerIoFlushOutgoingProtocolMsgs(tr_peerIo* io); - /* @} */ diff --git a/libtransmission/peer-mgr.cc b/libtransmission/peer-mgr.cc index 1a2cce67a..83c54b307 100644 --- a/libtransmission/peer-mgr.cc +++ b/libtransmission/peer-mgr.cc @@ -1263,10 +1263,10 @@ void tr_peerMgrAddIncoming(tr_peerMgr* manager, tr_address const* addr, tr_port else /* we don't have a connection to them yet... */ { auto mediator = std::make_shared(*session); - tr_peerIo* const io = tr_peerIoNewIncoming(session, &session->top_bandwidth_, addr, port, tr_time(), socket); + tr_peerIo* const io = tr_peerIo::newIncoming(session, &session->top_bandwidth_, addr, port, tr_time(), socket); tr_handshake* const handshake = tr_handshakeNew(mediator, io, session->encryptionMode(), on_handshake_done, manager); - tr_peerIoUnref(io); /* balanced by the implicit ref in tr_peerIoNewIncoming() */ + tr_peerIoUnref(io); /* balanced by the implicit ref in tr_peerIo::NewIncoming() */ manager->incoming_handshakes.add(*addr, handshake); } @@ -2836,7 +2836,7 @@ void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, peer_atom& atom) tr_logAddTraceSwarm(s, fmt::format("Starting an OUTGOING {} connection with {}", utp ? " µTP" : "TCP", atom.readable())); - tr_peerIo* const io = tr_peerIoNewOutgoing( + tr_peerIo* const io = tr_peerIo::newOutgoing( mgr->session, &mgr->session->top_bandwidth_, &atom.addr, @@ -2859,7 +2859,7 @@ void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, peer_atom& atom) TR_ASSERT(io->torrentHash()); - tr_peerIoUnref(io); /* balanced by the initial ref in tr_peerIoNewOutgoing() */ + tr_peerIoUnref(io); /* balanced by the initial ref in tr_peerIo::newOutgoing() */ s->outgoing_handshakes.add(atom.addr, handshake); } diff --git a/libtransmission/peer-msgs.cc b/libtransmission/peer-msgs.cc index 8edc3ea16..9e5b290b4 100644 --- a/libtransmission/peer-msgs.cc +++ b/libtransmission/peer-msgs.cc @@ -2212,7 +2212,7 @@ static size_t fillOutputBuffer(tr_peerMsgsImpl* msgs, time_t now) size_t const len = evbuffer_get_length(msgs->outMessages); /* flush the protocol messages */ logtrace(msgs, fmt::format(FMT_STRING("flushing outMessages... to {:p} (length is {:d})"), fmt::ptr(msgs->io), len)); - tr_peerIoWriteBuf(msgs->io, msgs->outMessages, false); + msgs->io->writeBuf(msgs->outMessages, false); msgs->clientSentAnythingAt = now; msgs->outMessagesBatchedAt = 0; msgs->outMessagesBatchPeriod = LowPriorityIntervalSecs; @@ -2224,7 +2224,7 @@ static size_t fillOutputBuffer(tr_peerMsgsImpl* msgs, time_t now) **/ auto piece = int{}; - if (tr_peerIoGetWriteBufferSpace(msgs->io, now) >= METADATA_PIECE_SIZE && popNextMetadataRequest(msgs, &piece)) + if (msgs->io->getWriteBufferSpace(now) >= METADATA_PIECE_SIZE && popNextMetadataRequest(msgs, &piece)) { auto ok = bool{ false }; @@ -2283,7 +2283,7 @@ static size_t fillOutputBuffer(tr_peerMsgsImpl* msgs, time_t now) *** Data Blocks **/ - if (tr_peerIoGetWriteBufferSpace(msgs->io, now) >= tr_block_info::BlockSize && !std::empty(msgs->peer_requested_)) + if (msgs->io->getWriteBufferSpace(now) >= tr_block_info::BlockSize && !std::empty(msgs->peer_requested_)) { req = msgs->peer_requested_.front(); msgs->peer_requested_.erase(std::begin(msgs->peer_requested_)); @@ -2333,7 +2333,7 @@ static size_t fillOutputBuffer(tr_peerMsgsImpl* msgs, time_t now) size_t const n = evbuffer_get_length(out); logtrace(msgs, fmt::format(FMT_STRING("sending block {:d}:{:d}->{:d}"), req.index, req.offset, req.length)); TR_ASSERT(n == msglen); - tr_peerIoWriteBuf(msgs->io, out, true); + msgs->io->writeBuf(out, true); bytesWritten += n; msgs->clientSentAnythingAt = now; msgs->blocks_sent_to_peer.add(tr_time(), 1); diff --git a/libtransmission/tr-utp.cc b/libtransmission/tr-utp.cc index 9a5587a57..b238ac8d3 100644 --- a/libtransmission/tr-utp.cc +++ b/libtransmission/tr-utp.cc @@ -214,7 +214,7 @@ void tr_utpInit(tr_session* session) utp_set_callback(ctx, UTP_ON_ACCEPT, &utp_callback); utp_set_callback(ctx, UTP_SENDTO, &utp_callback); - tr_peerIoUtpInit(ctx); + tr_peerIo::utpInit(ctx); #ifdef TR_UTP_TRACE diff --git a/tests/libtransmission/handshake-test.cc b/tests/libtransmission/handshake-test.cc index ace44f2e0..995d79c51 100644 --- a/tests/libtransmission/handshake-test.cc +++ b/tests/libtransmission/handshake-test.cc @@ -163,7 +163,7 @@ auto createIncomingIo(tr_session* session) auto const now = tr_time(); auto const peer_socket = tr_peer_socket_tcp_create(sockpair[0]); auto* const - io = tr_peerIoNewIncoming(session, &session->top_bandwidth_, &DefaultPeerAddr, DefaultPeerPort, now, peer_socket); + io = tr_peerIo::newIncoming(session, &session->top_bandwidth_, &DefaultPeerAddr, DefaultPeerPort, now, peer_socket); return std::make_pair(io, sockpair[1]); } @@ -173,7 +173,7 @@ auto createOutgoingIo(tr_session* session, tr_sha1_digest_t const& info_hash) EXPECT_EQ(0, evutil_socketpair(LOCAL_SOCKETPAIR_AF, SOCK_STREAM, 0, std::data(sockpair))) << tr_strerror(errno); auto const now = tr_time(); auto const peer_socket = tr_peer_socket_tcp_create(sockpair[0]); - auto* const io = tr_peerIoNew( + auto* const io = tr_peerIo::create( session, &session->top_bandwidth_, &DefaultPeerAddr,