From e6d75a4b7753f1a1228fb2870eb465c81df3a03b Mon Sep 17 00:00:00 2001 From: Charles Kerr Date: Fri, 16 Dec 2022 01:23:12 -0600 Subject: [PATCH] refactor: tr_peerIo (#4372) --- libtransmission/announcer.cc | 2 +- libtransmission/bandwidth.cc | 10 +- libtransmission/handshake.cc | 169 +++-- libtransmission/handshake.h | 8 +- libtransmission/net.cc | 24 - libtransmission/peer-io.cc | 832 +++++++++--------------- libtransmission/peer-io.h | 239 ++++--- libtransmission/peer-mgr.cc | 18 +- libtransmission/peer-msgs.cc | 142 ++-- libtransmission/peer-socket.cc | 98 ++- libtransmission/peer-socket.h | 40 +- libtransmission/tr-buffer.h | 18 +- libtransmission/tr-utp.cc | 20 +- tests/libtransmission/handshake-test.cc | 32 +- 14 files changed, 781 insertions(+), 871 deletions(-) diff --git a/libtransmission/announcer.cc b/libtransmission/announcer.cc index 35ca45e32..b1d9feba5 100644 --- a/libtransmission/announcer.cc +++ b/libtransmission/announcer.cc @@ -245,7 +245,7 @@ private: { for (auto& stop : stops_) { - announce(stop, [](tr_announce_response const&) {}); + announce(stop, [](tr_announce_response const& /*response*/) {}); } stops_.clear(); diff --git a/libtransmission/bandwidth.cc b/libtransmission/bandwidth.cc index 663398a75..ea0bfee41 100644 --- a/libtransmission/bandwidth.cc +++ b/libtransmission/bandwidth.cc @@ -150,10 +150,10 @@ void tr_bandwidth::allocateBandwidth( bandwidth.bytes_left_ = next_pulse_speed * period_msec / 1000U; } - /* add this bandwidth's peer, if any, to the peer pool */ + // add this bandwidth's peer, if any, to the peer pool if (auto shared = this->peer_.lock(); shared) { - shared->priority = priority; + shared->set_priority(priority); peer_pool.push_back(std::move(shared)); } @@ -213,9 +213,9 @@ void tr_bandwidth::allocate(tr_direction dir, unsigned int period_msec) for (auto const& io : refs) { - io->flushOutgoingProtocolMsgs(); + io->flush_outgoing_protocol_msgs(); - switch (io->priority) + switch (io->priority()) { case TR_PRI_HIGH: high.push_back(io.get()); @@ -244,7 +244,7 @@ void tr_bandwidth::allocate(tr_direction dir, unsigned int period_msec) * or (2) the next tr_bandwidth::allocate () call, when we start over again. */ for (auto const& io : refs) { - io->setEnabled(dir, io->hasBandwidthLeft(dir)); + io->set_enabled(dir, io->has_bandwidth_left(dir)); } } diff --git a/libtransmission/handshake.cc b/libtransmission/handshake.cc index a2bb23417..cb314ca37 100644 --- a/libtransmission/handshake.cc +++ b/libtransmission/handshake.cc @@ -32,7 +32,7 @@ using DH = tr_message_stream_encryption::DH; bool tr_handshake::build_handshake_message(tr_peerIo* io, uint8_t* buf) const { - auto const& info_hash = io->torrentHash(); + auto const& info_hash = io->torrent_hash(); TR_ASSERT_MSG(info_hash != tr_sha1_digest_t{}, "build_handshake_message requires an info_hash"); auto const info = mediator_->torrent(info_hash); @@ -62,16 +62,16 @@ bool tr_handshake::build_handshake_message(tr_peerIo* io, uint8_t* buf) const tr_handshake::ParseResult tr_handshake::parse_handshake(tr_peerIo* peer_io) { - tr_logAddTraceHand(this, fmt::format("payload: need {}, got {}", HandshakeSize, peer_io->readBufferSize())); + tr_logAddTraceHand(this, fmt::format("payload: need {}, got {}", HandshakeSize, peer_io->read_buffer_size())); - if (peer_io->readBufferSize() < HandshakeSize) + if (peer_io->read_buffer_size() < HandshakeSize) { return ParseResult::EncryptionWrong; } /* confirm the protocol */ auto name = decltype(HandshakeName){}; - peer_io->readBytes(std::data(name), std::size(name)); + peer_io->read_bytes(std::data(name), std::size(name)); if (name != HandshakeName) { return ParseResult::EncryptionWrong; @@ -80,16 +80,16 @@ tr_handshake::ParseResult tr_handshake::parse_handshake(tr_peerIo* peer_io) /* read the reserved bytes */ auto flags = tr_bitfield{ HandshakeFlagsBits }; auto reserved = std::array{}; - peer_io->readBytes(std::data(reserved), std::size(reserved)); + peer_io->read_bytes(std::data(reserved), std::size(reserved)); flags.setRaw(std::data(reserved), std::size(reserved)); - peer_io->enableDHT(flags.test(DhtFlag)); - peer_io->enableLTEP(flags.test(LtepFlag)); - peer_io->enableFEXT(flags.test(FextFlag)); + peer_io->set_supports_dht(flags.test(DhtFlag)); + peer_io->set_supports_ltep(flags.test(LtepFlag)); + peer_io->set_supports_fext(flags.test(FextFlag)); // torrent hash auto info_hash = tr_sha1_digest_t{}; - peer_io->readBytes(std::data(info_hash), std::size(info_hash)); - if (info_hash == tr_sha1_digest_t{} || info_hash != peer_io->torrentHash()) + peer_io->read_bytes(std::data(info_hash), std::size(info_hash)); + if (info_hash == tr_sha1_digest_t{} || info_hash != peer_io->torrent_hash()) { tr_logAddTraceHand(this, "peer returned the wrong hash. wtf?"); return ParseResult::BadTorrent; @@ -97,7 +97,7 @@ tr_handshake::ParseResult tr_handshake::parse_handshake(tr_peerIo* peer_io) // peer_id auto peer_id = tr_peer_id_t{}; - peer_io->readBytes(std::data(peer_id), std::size(peer_id)); + peer_io->read_bytes(std::data(peer_id), std::size(peer_id)); set_peer_id(peer_id); /* peer id */ @@ -161,14 +161,14 @@ void tr_handshake::send_ya(tr_peerIo* io) ReadState tr_handshake::read_yb(tr_peerIo* peer_io) { - if (peer_io->readBufferSize() < std::size(HandshakeName)) + if (peer_io->read_buffer_size() < std::size(HandshakeName)) { return READ_LATER; } - bool const is_encrypted = !peer_io->readBufferStartsWith(HandshakeName); + bool const is_encrypted = !peer_io->read_buffer_starts_with(HandshakeName); auto peer_public_key = DH::key_bigend_t{}; - if (is_encrypted && (peer_io->readBufferSize() < std::size(peer_public_key))) + if (is_encrypted && (peer_io->read_buffer_size() < std::size(peer_public_key))) { return READ_LATER; } @@ -184,7 +184,7 @@ ReadState tr_handshake::read_yb(tr_peerIo* peer_io) set_have_read_anything_from_peer(true); // get the peer's public key - peer_io->readBytes(std::data(peer_public_key), std::size(peer_public_key)); + peer_io->read_bytes(std::data(peer_public_key), std::size(peer_public_key)); dh_.setPeerPublicKey(peer_public_key); /* now send these: HASH('req1', S), HASH('req2', SKEY) xor HASH('req3', S), @@ -194,7 +194,7 @@ ReadState tr_handshake::read_yb(tr_peerIo* peer_io) /* HASH('req1', S) */ outbuf.add(tr_sha1::digest("req1"sv, dh_.secret())); - auto const& info_hash = peer_io->torrentHash(); + auto const& info_hash = peer_io->torrent_hash(); TR_ASSERT_MSG(info_hash != tr_sha1_digest_t{}, "readYb requires an info_hash"); /* HASH('req2', SKEY) xor HASH('req3', S) */ @@ -214,7 +214,7 @@ ReadState tr_handshake::read_yb(tr_peerIo* peer_io) * PadC is reserved for future extensions to the handshake... * standard practice at this time is for it to be zero-length */ peer_io->write(outbuf, false); - peer_io->encryptInit(peer_io->isIncoming(), dh_, info_hash); + peer_io->encrypt_init(peer_io->is_incoming(), dh_, info_hash); outbuf.add(VC); outbuf.addUint32(crypto_provide()); outbuf.addUint16(0); @@ -241,7 +241,7 @@ ReadState tr_handshake::read_yb(tr_peerIo* peer_io) // A will be able to resynchronize on ENCRYPT(VC)" ReadState tr_handshake::read_vc(tr_peerIo* peer_io) { - auto const info_hash = peer_io->torrentHash(); + auto const info_hash = peer_io->torrent_hash(); TR_ASSERT_MSG(info_hash != tr_sha1_digest_t{}, "readVC requires an info_hash"); // find the end of PadB by looking for `ENCRYPT(VC)` @@ -252,24 +252,24 @@ ReadState tr_handshake::read_vc(tr_peerIo* peer_io) for (size_t i = 0; i < PadbMaxlen; ++i) { - if (peer_io->readBufferSize() < std::size(needle)) + if (peer_io->read_buffer_size() < std::size(needle)) { tr_logAddTraceHand(this, "not enough bytes... returning read_more"); return READ_LATER; } - if (peer_io->readBufferStartsWith(needle)) + if (peer_io->read_buffer_starts_with(needle)) { tr_logAddTraceHand(this, "got it!"); // We already know it's a match; now we just need to // consume it from the read buffer. - peer_io->decryptInit(peer_io->isIncoming(), dh_, info_hash); - peer_io->readBytes(std::data(needle), std::size(needle)); + peer_io->decrypt_init(peer_io->is_incoming(), dh_, info_hash); + peer_io->read_bytes(std::data(needle), std::size(needle)); set_state(tr_handshake::State::AwaitingCryptoSelect); return READ_NOW; } - peer_io->readBufferDrain(1); + peer_io->read_buffer_drain(1); } tr_logAddTraceHand(this, "couldn't find ENCRYPT(VC)"); @@ -278,13 +278,13 @@ ReadState tr_handshake::read_vc(tr_peerIo* peer_io) ReadState tr_handshake::read_crypto_select(tr_peerIo* peer_io) { - if (static size_t constexpr NeedLen = sizeof(uint32_t) + sizeof(uint16_t); peer_io->readBufferSize() < NeedLen) + if (static size_t constexpr NeedLen = sizeof(uint32_t) + sizeof(uint16_t); peer_io->read_buffer_size() < NeedLen) { return READ_LATER; } auto crypto_select = uint32_t{}; - peer_io->readUint32(&crypto_select); + peer_io->read_uint32(&crypto_select); crypto_select_ = crypto_select; tr_logAddTraceHand(this, fmt::format("crypto select is {}", crypto_select)); @@ -295,7 +295,7 @@ ReadState tr_handshake::read_crypto_select(tr_peerIo* peer_io) } uint16_t pad_d_len = 0; - peer_io->readUint16(&pad_d_len); + peer_io->read_uint16(&pad_d_len); tr_logAddTraceHand(this, fmt::format("pad_d_len is {}", pad_d_len)); if (pad_d_len > 512) @@ -313,13 +313,13 @@ ReadState tr_handshake::read_crypto_select(tr_peerIo* peer_io) ReadState tr_handshake::read_pad_d(tr_peerIo* peer_io) { size_t const needlen = pad_d_len_; - tr_logAddTraceHand(this, fmt::format("pad d: need {}, got {}", needlen, peer_io->readBufferSize())); - if (peer_io->readBufferSize() < needlen) + tr_logAddTraceHand(this, fmt::format("pad d: need {}, got {}", needlen, peer_io->read_buffer_size())); + if (peer_io->read_buffer_size() < needlen) { return READ_LATER; } - peer_io->readBufferDrain(needlen); + peer_io->read_buffer_drain(needlen); set_state(tr_handshake::State::AwaitingHandshake); return READ_NOW; @@ -334,15 +334,15 @@ ReadState tr_handshake::read_pad_d(tr_peerIo* peer_io) ReadState tr_handshake::read_handshake(tr_peerIo* peer_io) { static auto constexpr Needlen = IncomingHandshakeLen; - tr_logAddTraceHand(this, fmt::format("payload: need {}, got {}", Needlen, peer_io->readBufferSize())); - if (peer_io->readBufferSize() < Needlen) + tr_logAddTraceHand(this, fmt::format("payload: need {}, got {}", Needlen, peer_io->read_buffer_size())); + if (peer_io->read_buffer_size() < Needlen) { return READ_LATER; } set_have_read_anything_from_peer(true); - if (peer_io->readBufferStartsWith(HandshakeName)) // unencrypted + if (peer_io->read_buffer_starts_with(HandshakeName)) // unencrypted { if (encryption_mode_ == TR_ENCRYPTION_REQUIRED) { @@ -361,7 +361,7 @@ ReadState tr_handshake::read_handshake(tr_peerIo* peer_io) } auto name = decltype(HandshakeName){}; - peer_io->readBytes(std::data(name), std::size(name)); + peer_io->read_bytes(std::data(name), std::size(name)); if (name != HandshakeName) { return done(false); @@ -370,15 +370,15 @@ ReadState tr_handshake::read_handshake(tr_peerIo* peer_io) // reserved bytes / flags auto reserved = std::array{}; auto flags = tr_bitfield{ HandshakeFlagsBits }; - peer_io->readBytes(std::data(reserved), std::size(reserved)); + peer_io->read_bytes(std::data(reserved), std::size(reserved)); flags.setRaw(std::data(reserved), std::size(reserved)); - peer_io->enableDHT(flags.test(DhtFlag)); - peer_io->enableLTEP(flags.test(LtepFlag)); - peer_io->enableFEXT(flags.test(FextFlag)); + peer_io->set_supports_dht(flags.test(DhtFlag)); + peer_io->set_supports_ltep(flags.test(LtepFlag)); + peer_io->set_supports_fext(flags.test(FextFlag)); /* torrent hash */ auto hash = tr_sha1_digest_t{}; - peer_io->readBytes(std::data(hash), std::size(hash)); + peer_io->read_bytes(std::data(hash), std::size(hash)); if (is_incoming()) { @@ -388,11 +388,11 @@ ReadState tr_handshake::read_handshake(tr_peerIo* peer_io) return done(false); } - peer_io->setTorrentHash(hash); + peer_io->set_torrent_hash(hash); } else // outgoing { - if (peer_io->torrentHash() != hash) + if (peer_io->torrent_hash() != hash) { tr_logAddTraceHand(this, "peer returned the wrong hash. wtf?"); return done(false); @@ -412,7 +412,7 @@ ReadState tr_handshake::read_handshake(tr_peerIo* peer_io) return done(false); } - peer_io->writeBytes(std::data(msg), std::size(msg), false); + peer_io->write_bytes(std::data(msg), std::size(msg), false); have_sent_bittorrent_handshake_ = true; } @@ -424,11 +424,11 @@ ReadState tr_handshake::read_peer_id(tr_peerIo* peer_io) { // read the peer_id auto peer_id = tr_peer_id_t{}; - if (peer_io->readBufferSize() < std::size(peer_id)) + if (peer_io->read_buffer_size() < std::size(peer_id)) { return READ_LATER; } - peer_io->readBytes(std::data(peer_id), std::size(peer_id)); + peer_io->read_bytes(std::data(peer_id), std::size(peer_id)); set_peer_id(peer_id); auto client = std::array{}; @@ -436,7 +436,7 @@ ReadState tr_handshake::read_peer_id(tr_peerIo* peer_io) tr_logAddTraceHand(this, fmt::format("peer-id is '{}' ... isIncoming is {}", std::data(client), is_incoming())); // if we've somehow connected to ourselves, don't keep the connection - auto const info_hash = peer_io_->torrentHash(); + auto const info_hash = peer_io_->torrent_hash(); auto const info = mediator_->torrent(info_hash); auto const connected_to_self = info && info->client_peer_id == peer_id; @@ -448,15 +448,15 @@ ReadState tr_handshake::read_ya(tr_peerIo* peer_io) auto peer_public_key = DH::key_bigend_t{}; tr_logAddTraceHand( this, - fmt::format("in readYa... need {}, have {}", std::size(peer_public_key), peer_io->readBufferSize())); + fmt::format("in readYa... need {}, have {}", std::size(peer_public_key), peer_io->read_buffer_size())); - if (peer_io->readBufferSize() < std::size(peer_public_key)) + if (peer_io->read_buffer_size() < std::size(peer_public_key)) { return READ_LATER; } /* read the incoming peer's public key */ - peer_io->readBytes(std::data(peer_public_key), std::size(peer_public_key)); + peer_io->read_bytes(std::data(peer_public_key), std::size(peer_public_key)); dh_.setPeerPublicKey(peer_public_key); // send our public key to the peer @@ -474,21 +474,21 @@ ReadState tr_handshake::read_pad_a(tr_peerIo* peer_io) for (size_t i = 0; i < PadaMaxlen; ++i) { - if (peer_io->readBufferSize() < std::size(needle)) + if (peer_io->read_buffer_size() < std::size(needle)) { tr_logAddTraceHand(this, "not enough bytes... returning read_more"); return READ_LATER; } - if (peer_io->readBufferStartsWith(needle)) + if (peer_io->read_buffer_starts_with(needle)) { tr_logAddTraceHand(this, "found it... looking setting to awaiting_crypto_provide"); - peer_io->readBufferDrain(std::size(needle)); + peer_io->read_buffer_drain(std::size(needle)); set_state(State::AwaitingCryptoProvide); return READ_NOW; } - peer_io->readBufferDrain(1U); + peer_io->read_buffer_drain(1U); } tr_logAddTraceHand(this, "couldn't find HASH('req', S)"); @@ -505,7 +505,7 @@ ReadState tr_handshake::read_crypto_provide(tr_peerIo* peer_io) size_t const needlen = sizeof(obfuscated_hash) + /* HASH('req2', SKEY) xor HASH('req3', S) */ std::size(VC) + sizeof(crypto_provide) + sizeof(padc_len); - if (peer_io->readBufferSize() < needlen) + if (peer_io->read_buffer_size() < needlen) { return READ_LATER; } @@ -515,7 +515,7 @@ ReadState tr_handshake::read_crypto_provide(tr_peerIo* peer_io) * by building the latter and xor'ing it with what the peer sent us */ tr_logAddTraceHand(this, "reading obfuscated torrent hash..."); auto req2 = tr_sha1_digest_t{}; - peer_io->readBytes(std::data(req2), std::size(req2)); + peer_io->read_bytes(std::data(req2), std::size(req2)); auto const req3 = tr_sha1::digest("req3"sv, dh_.secret()); for (size_t i = 0; i < std::size(obfuscated_hash); ++i) @@ -528,7 +528,7 @@ ReadState tr_handshake::read_crypto_provide(tr_peerIo* peer_io) bool const client_is_seed = info->is_done; bool const peer_is_seed = mediator_->is_peer_known_seed(info->id, peer_io->address()); tr_logAddTraceHand(this, fmt::format("got INCOMING connection's encrypted handshake for torrent [{}]", info->id)); - peer_io->setTorrentHash(info->info_hash); + peer_io->set_torrent_hash(info->info_hash); if (client_is_seed && peer_is_seed) { @@ -544,18 +544,18 @@ ReadState tr_handshake::read_crypto_provide(tr_peerIo* peer_io) /* next part: ENCRYPT(VC, crypto_provide, len(PadC), */ - auto const& info_hash = peer_io->torrentHash(); + auto const& info_hash = peer_io->torrent_hash(); TR_ASSERT_MSG(info_hash != tr_sha1_digest_t{}, "readCryptoProvide requires an info_hash"); - peer_io->decryptInit(peer_io->isIncoming(), dh_, info_hash); + peer_io->decrypt_init(peer_io->is_incoming(), dh_, info_hash); auto vc_in = vc_t{}; - peer_io->readBytes(std::data(vc_in), std::size(vc_in)); + peer_io->read_bytes(std::data(vc_in), std::size(vc_in)); - peer_io->readUint32(&crypto_provide); + peer_io->read_uint32(&crypto_provide); crypto_provide_ = crypto_provide; tr_logAddTraceHand(this, fmt::format("crypto_provide is {}", crypto_provide)); - peer_io->readUint16(&padc_len); + peer_io->read_uint16(&padc_len); tr_logAddTraceHand(this, fmt::format("padc is {}", padc_len)); if (padc_len > PadcMaxlen) { @@ -570,18 +570,18 @@ ReadState tr_handshake::read_crypto_provide(tr_peerIo* peer_io) ReadState tr_handshake::read_pad_c(tr_peerIo* peer_io) { - if (auto const needlen = pad_c_len_ + sizeof(uint16_t); peer_io->readBufferSize() < needlen) + if (auto const needlen = pad_c_len_ + sizeof(uint16_t); peer_io->read_buffer_size() < needlen) { return READ_LATER; } // read the throwaway padc auto pad_c = std::array{}; - peer_io->readBytes(std::data(pad_c), pad_c_len_); + peer_io->read_bytes(std::data(pad_c), pad_c_len_); /* read ia_len */ uint16_t ia_len = 0; - peer_io->readUint16(&ia_len); + peer_io->read_uint16(&ia_len); tr_logAddTraceHand(this, fmt::format("ia_len is {}", ia_len)); ia_len_ = ia_len; set_state(State::AwaitingIa); @@ -592,9 +592,9 @@ ReadState tr_handshake::read_ia(tr_peerIo* peer_io) { size_t const needlen = ia_len_; - tr_logAddTraceHand(this, fmt::format("reading IA... have {}, need {}", peer_io->readBufferSize(), needlen)); + tr_logAddTraceHand(this, fmt::format("reading IA... have {}, need {}", peer_io->read_buffer_size(), needlen)); - if (peer_io->readBufferSize() < needlen) + if (peer_io->read_buffer_size() < needlen) { return READ_LATER; } @@ -603,9 +603,9 @@ ReadState tr_handshake::read_ia(tr_peerIo* peer_io) *** B->A: ENCRYPT(VC, crypto_select, len(padD), padD), ENCRYPT2(Payload Stream) **/ - auto const& info_hash = peer_io->torrentHash(); + auto const& info_hash = peer_io->torrent_hash(); TR_ASSERT_MSG(info_hash != tr_sha1_digest_t{}, "readIA requires an info_hash"); - peer_io->encryptInit(peer_io->isIncoming(), dh_, info_hash); + peer_io->encrypt_init(peer_io->is_incoming(), dh_, info_hash); auto outbuf = libtransmission::Buffer{}; // send VC @@ -664,8 +664,8 @@ ReadState tr_handshake::read_ia(tr_peerIo* peer_io) ReadState tr_handshake::read_payload_stream(tr_peerIo* peer_io) { static auto constexpr Needlen = HandshakeSize; - tr_logAddTraceHand(this, fmt::format("reading payload stream... have {}, need {}", peer_io->readBufferSize(), Needlen)); - if (peer_io->readBufferSize() < Needlen) + tr_logAddTraceHand(this, fmt::format("reading payload stream... have {}, need {}", peer_io->read_buffer_size(), Needlen)); + if (peer_io->read_buffer_size() < Needlen) { return READ_LATER; } @@ -690,8 +690,6 @@ ReadState tr_handshake::read_payload_stream(tr_peerIo* peer_io) ReadState tr_handshake::can_read(tr_peerIo* peer_io, void* vhandshake, size_t* piece) { - TR_ASSERT(tr_isPeerIo(peer_io)); - auto* handshake = static_cast(vhandshake); bool ready_for_more = true; @@ -771,46 +769,45 @@ ReadState tr_handshake::can_read(tr_peerIo* peer_io, void* vhandshake, size_t* p } else if (handshake->is_state(State::AwaitingPadC)) { - ready_for_more = peer_io->readBufferSize() >= handshake->pad_c_len_; + ready_for_more = peer_io->read_buffer_size() >= handshake->pad_c_len_; } else if (handshake->is_state(State::AwaitingPadD)) { - ready_for_more = peer_io->readBufferSize() >= handshake->pad_d_len_; + ready_for_more = peer_io->read_buffer_size() >= handshake->pad_d_len_; } else if (handshake->is_state(State::AwaitingIa)) { - ready_for_more = peer_io->readBufferSize() >= handshake->ia_len_; + ready_for_more = peer_io->read_buffer_size() >= handshake->ia_len_; } } return ret; } -void tr_handshake::on_error(tr_peerIo* io, short what, void* vhandshake) +void tr_handshake::on_error(tr_peerIo* io, tr_error const& error, void* vhandshake) { - auto const errcode = errno; auto* handshake = static_cast(vhandshake); - if (io->socket.is_utp() && !io->isIncoming() && handshake->is_state(State::AwaitingYb)) + if (io->is_utp() && !io->is_incoming() && handshake->is_state(State::AwaitingYb)) { // the peer probably doesn't speak µTP. - auto const info_hash = io->torrentHash(); + auto const info_hash = io->torrent_hash(); auto const info = handshake->mediator_->torrent(info_hash); /* Don't mark a peer as non-µTP unless it's really a connect failure. */ - if ((errcode == ETIMEDOUT || errcode == ECONNREFUSED) && info) + if ((error.code == ETIMEDOUT || error.code == ECONNREFUSED) && info) { handshake->mediator_->set_utp_failed(info_hash, io->address()); } - if (handshake->mediator_->allows_tcp() && io->reconnect() == 0) + if (handshake->mediator_->allows_tcp() && io->reconnect()) { auto msg = std::array{}; handshake->build_handshake_message(io, std::data(msg)); handshake->have_sent_bittorrent_handshake_ = true; handshake->set_state(State::AwaitingHandshake); - io->writeBytes(std::data(msg), std::size(msg), false); + io->write_bytes(std::data(msg), std::size(msg), false); } } @@ -818,20 +815,18 @@ void tr_handshake::on_error(tr_peerIo* io, short what, void* vhandshake) * have encountered a peer that doesn't do encryption... reconnect and * try a plaintext handshake */ if ((handshake->is_state(State::AwaitingYb) || handshake->is_state(State::AwaitingVc)) && - handshake->encryption_mode_ != TR_ENCRYPTION_REQUIRED && handshake->mediator_->allows_tcp() && io->reconnect() == 0) + handshake->encryption_mode_ != TR_ENCRYPTION_REQUIRED && handshake->mediator_->allows_tcp() && io->reconnect()) { auto msg = std::array{}; tr_logAddTraceHand(handshake, "handshake failed, trying plaintext..."); handshake->build_handshake_message(io, std::data(msg)); handshake->have_sent_bittorrent_handshake_ = true; handshake->set_state(State::AwaitingHandshake); - io->writeBytes(std::data(msg), std::size(msg), false); + io->write_bytes(std::data(msg), std::size(msg), false); } else { - tr_logAddTraceHand( - handshake, - fmt::format("libevent got an error: what={:d}, errno={:d} ({:s})", what, errcode, tr_strerror(errcode))); + tr_logAddTraceHand(handshake, fmt::format("handshake socket err: {:s} ({:d})", error.message, error.code)); handshake->done(false); } } @@ -924,7 +919,7 @@ tr_handshake::tr_handshake(Mediator* mediator, std::shared_ptr peer_i { timeout_timer_->startSingleShot(HandshakeTimeoutSec); - peer_io_->setCallbacks(&tr_handshake::can_read, nullptr, &tr_handshake::on_error, this); + peer_io_->set_callbacks(&tr_handshake::can_read, nullptr, &tr_handshake::on_error, this); if (is_incoming()) { @@ -941,6 +936,6 @@ tr_handshake::tr_handshake(Mediator* mediator, std::shared_ptr peer_i have_sent_bittorrent_handshake_ = true; set_state(State::AwaitingHandshake); - peer_io_->writeBytes(std::data(msg), std::size(msg), false); + peer_io_->write_bytes(std::data(msg), std::size(msg), false); } } diff --git a/libtransmission/handshake.h b/libtransmission/handshake.h index 9277006a9..a9a6573c4 100644 --- a/libtransmission/handshake.h +++ b/libtransmission/handshake.h @@ -108,7 +108,7 @@ private: static ReadState can_read(tr_peerIo* peer_io, void* vhandshake, size_t* piece); - static void on_error(tr_peerIo* io, short what, void* vhandshake); + static void on_error(tr_peerIo* io, tr_error const&, void* vhandshake); bool build_handshake_message(tr_peerIo* io, uint8_t* buf) const; @@ -141,13 +141,13 @@ private: ReadState done(bool is_connected) { - peer_io_->clearCallbacks(); + peer_io_->clear_callbacks(); return fire_done(is_connected) ? READ_LATER : READ_ERR; } [[nodiscard]] auto is_incoming() const noexcept { - return peer_io_->isIncoming(); + return peer_io_->is_incoming(); } [[nodiscard]] constexpr auto state() const noexcept @@ -181,7 +181,7 @@ private: auto walk = data; walk = std::copy(std::begin(public_key), std::end(public_key), walk); walk += mediator_->pad(walk, PadMax); - io->writeBytes(data, walk - data, false); + io->write_bytes(data, walk - data, false); } bool fire_done(bool is_connected); diff --git a/libtransmission/net.cc b/libtransmission/net.cc index 0a5ecdc83..2e99d54f5 100644 --- a/libtransmission/net.cc +++ b/libtransmission/net.cc @@ -268,30 +268,6 @@ tr_peer_socket tr_netOpenPeerSocket(tr_session* session, tr_address const& addr, return ret; } -tr_peer_socket tr_netOpenPeerUTPSocket(tr_session* session, tr_address const& addr, tr_port port, bool /*client_is_seed*/) -{ - auto ret = tr_peer_socket{}; - - if (session->utp_context != nullptr && addr.is_valid_for_peers(port)) - { - auto const [ss, sslen] = addr.to_sockaddr(port); - - if (auto* const sock = utp_create_socket(session->utp_context); sock != nullptr) - { - if (utp_connect(sock, reinterpret_cast(&ss), sslen) != -1) - { - ret = tr_peer_socket{ addr, port, sock }; - } - else - { - utp_close(sock); - } - } - } - - return ret; -} - static tr_socket_t tr_netBindTCPImpl(tr_address const& addr, tr_port port, bool suppress_msgs, int* err_out) { TR_ASSERT(addr.is_valid()); diff --git a/libtransmission/peer-io.cc b/libtransmission/peer-io.cc index d20f22e58..ba6a831f8 100644 --- a/libtransmission/peer-io.cc +++ b/libtransmission/peer-io.cc @@ -38,93 +38,60 @@ #define EPIPE WSAECONNRESET #endif -/* The amount of read bufferring that we allow for µTP sockets. */ - -static constexpr auto UtpReadBufferSize = 256 * 1024; - #define tr_logAddErrorIo(io, msg) tr_logAddError(msg, (io)->display_name()) #define tr_logAddWarnIo(io, msg) tr_logAddWarn(msg, (io)->display_name()) #define tr_logAddDebugIo(io, msg) tr_logAddDebug(msg, (io)->display_name()) #define tr_logAddTraceIo(io, msg) tr_logAddTrace(msg, (io)->display_name()) -static constexpr size_t guessPacketOverhead(size_t d) -{ - /** - * https://web.archive.org/web/20140912230020/http://sd.wareonearth.com:80/~phil/net/overhead/ - * - * TCP over Ethernet: - * Assuming no header compression (e.g. not PPP) - * Add 20 IPv4 header or 40 IPv6 header (no options) - * Add 20 TCP header - * Add 12 bytes optional TCP timestamps - * Max TCP Payload data rates over ethernet are thus: - * (1500-40)/ (38+1500) = 94.9285 % IPv4, minimal headers - * (1500-52)/ (38+1500) = 94.1482 % IPv4, TCP timestamps - * (1500-52)/ (42+1500) = 93.9040 % 802.1q, IPv4, TCP timestamps - * (1500-60)/ (38+1500) = 93.6281 % IPv6, minimal headers - * (1500-72)/ (38+1500) = 92.8479 % IPv6, TCP timestamps - * (1500-72)/ (42+1500) = 92.6070 % 802.1q, IPv6, TCP timestamps - */ - double const assumed_payload_data_rate = 94.0; - - return (size_t)(d * (100.0 / assumed_payload_data_rate) - d); -} - /*** **** ***/ -static void didWriteWrapper(tr_peerIo* io, size_t bytes_transferred) +void tr_peerIo::did_write_wrapper(size_t bytes_transferred) { - while (bytes_transferred != 0 && tr_isPeerIo(io) && !std::empty(io->outbuf_info)) + auto const keep_alive = shared_from_this(); + + while (bytes_transferred != 0 && !std::empty(outbuf_info_)) { - auto& [n_bytes_left, is_piece_data] = io->outbuf_info.front(); + auto& [n_bytes_left, is_piece_data] = outbuf_info_.front(); size_t const payload = std::min(uint64_t{ n_bytes_left }, uint64_t{ bytes_transferred }); /* For µTP sockets, the overhead is computed in utp_on_overhead. */ - size_t const overhead = io->socket.is_tcp() ? guessPacketOverhead(payload) : 0; + size_t const overhead = socket_.guess_packet_overhead(payload); uint64_t const now = tr_time_msec(); - io->bandwidth().notifyBandwidthConsumed(TR_UP, payload, is_piece_data, now); + bandwidth().notifyBandwidthConsumed(TR_UP, payload, is_piece_data, now); if (overhead > 0) { - io->bandwidth().notifyBandwidthConsumed(TR_UP, overhead, false, now); + bandwidth().notifyBandwidthConsumed(TR_UP, overhead, false, now); } - if (io->didWrite != nullptr) + if (did_write_ != nullptr) { - io->didWrite(io, payload, is_piece_data, io->userData); - } - - if (!tr_isPeerIo(io)) - { - break; + did_write_(this, payload, is_piece_data, user_data_); } bytes_transferred -= payload; n_bytes_left -= payload; if (n_bytes_left == 0) { - io->outbuf_info.pop_front(); + outbuf_info_.pop_front(); } } } -static void canReadWrapper(tr_peerIo* io_in) +void tr_peerIo::can_read_wrapper() { - auto const io = io_in->shared_from_this(); - tr_logAddTraceIo(io, "canRead"); + // try to consume the input buffer - tr_session const* const session = io->session; - - /* try to consume the input buffer */ - if (io->canRead == nullptr) + if (can_read_ == nullptr) { return; } - auto const lock = session->unique_lock(); + auto const lock = session_->unique_lock(); + auto const keep_alive = shared_from_this(); auto const now = tr_time_msec(); auto done = bool{ false }; @@ -133,33 +100,33 @@ static void canReadWrapper(tr_peerIo* io_in) while (!done && !err) { size_t piece = 0; - auto const old_len = io->readBufferSize(); - auto const read_state = io->canRead == nullptr ? READ_ERR : io->canRead(io.get(), io->userData, &piece); - auto const used = old_len - io->readBufferSize(); - auto const overhead = guessPacketOverhead(used); + auto const old_len = read_buffer_size(); + auto const read_state = can_read_ != nullptr ? can_read_(this, user_data_, &piece) : READ_ERR; + auto const used = old_len - read_buffer_size(); + auto const overhead = socket_.guess_packet_overhead(used); if (piece != 0 || piece != used) { if (piece != 0) { - io->bandwidth().notifyBandwidthConsumed(TR_DOWN, piece, true, now); + bandwidth().notifyBandwidthConsumed(TR_DOWN, piece, true, now); } if (used != piece) { - io->bandwidth().notifyBandwidthConsumed(TR_DOWN, used - piece, false, now); + bandwidth().notifyBandwidthConsumed(TR_DOWN, used - piece, false, now); } } if (overhead > 0) { - io->bandwidth().notifyBandwidthConsumed(TR_UP, overhead, false, now); + bandwidth().notifyBandwidthConsumed(TR_UP, overhead, false, now); } switch (read_state) { case READ_NOW: - if (io->readBufferSize() != 0) + if (!std::empty(inbuf_)) { continue; } @@ -178,130 +145,44 @@ static void canReadWrapper(tr_peerIo* io_in) } } -static void event_read_cb(evutil_socket_t fd, short /*event*/, void* vio) -{ - auto* io = static_cast(vio); - - TR_ASSERT(tr_isPeerIo(io)); - TR_ASSERT(io->socket.is_tcp()); - - /* Limit the input buffer to 256K, so it doesn't grow too large */ - tr_direction const dir = TR_DOWN; - size_t const max = 256 * 1024; - - io->pendingEvents &= ~EV_READ; - - auto const curlen = io->readBufferSize(); - auto howmuch = curlen >= max ? 0 : max - curlen; - howmuch = io->bandwidth().clamp(TR_DOWN, howmuch); - - tr_logAddTraceIo(io, "libevent says this peer is ready to read"); - - /* if we don't have any bandwidth left, stop reading */ - if (howmuch < 1) - { - io->setEnabled(dir, false); - return; - } - - tr_error* error = nullptr; - if (auto const res = io->inbuf.addSocket(fd, howmuch, &error); res > 0) - { - io->setEnabled(dir, true); - - /* Invoke the user callback - must always be called last */ - canReadWrapper(io); - } - else - { - short what = BEV_EVENT_READING; - - if (res == 0) /* EOF */ - { - what |= BEV_EVENT_EOF; - } - if (error != nullptr) - { - if (error->code == EAGAIN || error->code == EINTR) - { - io->setEnabled(dir, true); - return; - } - - what |= BEV_EVENT_ERROR; - - tr_logAddDebugIo( - io, - fmt::format("event_read_cb err: res:{}, what:{}, errno:{} ({})", res, what, error->code, error->message)); - } - - io->call_error_callback(what); - } - - tr_error_clear(&error); -} - // Helps us to ignore errors that say "try again later" // since that's what peer-io does by default anyway. -[[nodiscard]] static auto constexpr canRetryFromError(int error_code) +[[nodiscard]] static auto constexpr canRetryFromError(int error_code) noexcept { return error_code == 0 || error_code == EAGAIN || error_code == EINTR || error_code == EINPROGRESS; } -static void event_write_cb(evutil_socket_t fd, short /*event*/, void* vio) +void tr_peerIo::event_read_cb(evutil_socket_t fd, short /*event*/, void* vio) { - auto* io = static_cast(vio); + static auto constexpr MaxLen = RcvBuf; - TR_ASSERT(tr_isPeerIo(io)); - TR_ASSERT(io->socket.is_tcp()); + auto* const io = static_cast(vio); + tr_logAddTraceIo(io, "libevent says this peer socket is ready for reading"); - io->pendingEvents &= ~EV_WRITE; + TR_ASSERT(io->socket_.is_tcp()); + TR_ASSERT(io->socket_.handle.tcp == fd); - tr_logAddTraceIo(io, "libevent says this peer is ready to write"); + io->pending_events_ &= ~EV_READ; - /* Write as much as possible, since the socket is non-blocking, write() will - * return if it can't write any more data without blocking */ - auto constexpr Dir = TR_UP; - auto const howmuch = io->bandwidth().clamp(Dir, std::size(io->outbuf)); + // if we don't have any bandwidth left, stop reading + auto const n_used = std::size(io->inbuf_); + auto const n_left = n_used >= MaxLen ? 0 : MaxLen - n_used; + io->try_read(n_left); +} - // if we don't have any bandwidth left, stop writing - if (howmuch < 1) - { - io->setEnabled(Dir, false); - return; - } +void tr_peerIo::event_write_cb(evutil_socket_t fd, short /*event*/, void* vio) +{ + auto* const io = static_cast(vio); + tr_logAddTraceIo(io, "libevent says this peer socket is ready for writing"); - tr_error* error = nullptr; - auto const n_written = io->outbuf.toSocket(fd, howmuch, &error); - auto const should_retry = (error == nullptr) || canRetryFromError(error->code); + TR_ASSERT(io->socket_.is_tcp()); + TR_ASSERT(io->socket_.handle.tcp == fd); - // schedule another write if we have more data to write & think future writes would succeed - if (!std::empty(io->outbuf) && (n_written > 0 || should_retry)) - { - io->setEnabled(Dir, true); - } + io->pending_events_ &= ~EV_WRITE; - if (n_written > 0) - { - didWriteWrapper(io, n_written); - } - else - { - auto const what = BEV_EVENT_WRITING | (error != nullptr ? BEV_EVENT_ERROR : BEV_EVENT_EOF); - - tr_logAddDebugIo( - io, - fmt::format( - "event_write_cb got an err. n_written:{}, what:{}, errno:{} ({})", - n_written, - what, - (error != nullptr ? error->code : 0), - (error != nullptr ? error->message : "EOF"))); - - io->call_error_callback(what); - } - - tr_error_clear(&error); + // Write as much as possible. Since the socket is non-blocking, + // write() will return if it can't write any more without blocking + io->try_write(SIZE_MAX); } /** @@ -311,168 +192,83 @@ static void event_write_cb(evutil_socket_t fd, short /*event*/, void* vio) #ifdef WITH_UTP /* µTP callbacks */ -void tr_peerIo::readBufferAdd(void const* data, size_t n_bytes) -{ - inbuf.add(data, n_bytes); - setEnabled(TR_DOWN, true); - canReadWrapper(this); -} - -static size_t utp_get_rb_size(tr_peerIo* const io) -{ - auto const bytes = io->bandwidth().clamp(TR_DOWN, UtpReadBufferSize); - - tr_logAddTraceIo(io, fmt::format("utp_get_rb_size is saying it's ready to read {} bytes", bytes)); - return UtpReadBufferSize - bytes; -} - -static size_t tr_peerIoTryWrite(tr_peerIo* io, size_t howmuch, tr_error** error = nullptr); - -static void utp_on_writable(tr_peerIo* io) -{ - tr_logAddTraceIo(io, "libutp says this peer is ready to write"); - - auto const n = tr_peerIoTryWrite(io, SIZE_MAX); - io->setEnabled(TR_UP, n != 0 && !std::empty(io->outbuf)); -} - -static void utp_on_state_change(tr_peerIo* const io, int const state) +void tr_peerIo::on_utp_state_change(int state) { if (state == UTP_STATE_CONNECT) { - tr_logAddTraceIo(io, "utp_on_state_change -- changed to connected"); - io->utp_supported_ = true; + tr_logAddTraceIo(this, "utp_on_state_change -- changed to connected"); + utp_supported_ = true; } else if (state == UTP_STATE_WRITABLE) { - tr_logAddTraceIo(io, "utp_on_state_change -- changed to writable"); + tr_logAddTraceIo(this, "utp_on_state_change -- changed to writable"); - if ((io->pendingEvents & EV_WRITE) != 0) + if ((pending_events_ & EV_WRITE) != 0) { - utp_on_writable(io); + try_write(SIZE_MAX); } } else if (state == UTP_STATE_EOF) { - io->call_error_callback(BEV_EVENT_EOF); + tr_error* error = nullptr; + tr_error_set(&error, ENOTCONN, tr_strerror(ENOTCONN)); + call_error_callback(*error); + tr_error_clear(&error); } else if (state == UTP_STATE_DESTROYING) { - tr_logAddErrorIo(io, "Impossible state UTP_STATE_DESTROYING"); + tr_logAddErrorIo(this, "Impossible state UTP_STATE_DESTROYING"); } else { - tr_logAddErrorIo(io, fmt::format(_("Unknown state: {state}"), fmt::arg("state", state))); + tr_logAddErrorIo(this, fmt::format(_("Unknown state: {state}"), fmt::arg("state", state))); } } -static void utp_on_error(tr_peerIo* const io, int const errcode) +void tr_peerIo::on_utp_error(int errcode) { - if (errcode == UTP_ETIMEDOUT) + tr_logAddTraceIo(this, fmt::format("utp_on_error -- {}", utp_error_code_names[errcode])); + + if (got_error_ != nullptr) { - // high frequency error: we log as trace - tr_logAddTraceIo(io, fmt::format("utp_on_error -- UTP_ETIMEDOUT")); + tr_error* error = nullptr; + tr_error_set(&error, errcode, utp_error_code_names[errcode]); + call_error_callback(*error); + tr_error_clear(&error); } - else - { - tr_logAddDebugIo(io, fmt::format("utp_on_error -- {}", utp_error_code_names[errcode])); - } - - if (io->gotError != nullptr) - { - errno = errcode; - io->call_error_callback(BEV_EVENT_ERROR); - } -} - -static void utp_on_overhead(tr_peerIo* const io, bool const send, size_t const count, int /*type*/) -{ - tr_logAddTraceIo(io, fmt::format("utp_on_overhead -- count is {}", count)); - - io->bandwidth().notifyBandwidthConsumed(send ? TR_UP : TR_DOWN, count, false, tr_time_msec()); -} - -static uint64 utp_callback(utp_callback_arguments* args) -{ - auto* const io = static_cast(utp_get_userdata(args->socket)); - - if (io == nullptr) - { -#ifdef TR_UTP_TRACE - - if (args->callback_type != UTP_ON_STATE_CHANGE || args->u1.state != UTP_STATE_DESTROYING) - { - fmt::print( - stderr, - FMT_STRING("[µTP] [{}:{}] [{}] io is null! buf={}, len={}, flags={}, send/error_code/state={}, type={}\n"), - fmt::ptr(args->context), - fmt::ptr(args->socket), - utp_callback_names[args->callback_type], - fmt::ptr(args->buf), - args->len, - args->flags, - args->u1.send, - args->u2.type); - } - -#endif - - return 0; - } - - TR_ASSERT(tr_isPeerIo(io)); - TR_ASSERT(io->socket.handle.utp == args->socket); - - switch (args->callback_type) - { - case UTP_ON_READ: - io->readBufferAdd(args->buf, args->len); - break; - - case UTP_GET_READ_BUFFER_SIZE: - return utp_get_rb_size(io); - - case UTP_ON_STATE_CHANGE: - utp_on_state_change(io, args->u1.state); - break; - - case UTP_ON_ERROR: - utp_on_error(io, args->u1.error_code); - break; - - case UTP_ON_OVERHEAD_STATISTICS: - utp_on_overhead(io, args->u1.send != 0, args->len, args->u2.type); - break; - } - - return 0; } #endif /* #ifdef WITH_UTP */ tr_peerIo::tr_peerIo( - tr_session* session_in, - tr_sha1_digest_t const* torrent_hash, + tr_session* session, + tr_sha1_digest_t const* info_hash, bool is_incoming, bool is_seed, - tr_bandwidth* parent_bandwidth, - tr_peer_socket sock) - : socket{ std::move(sock) } - , session{ session_in } - , bandwidth_{ parent_bandwidth } - , torrent_hash_{ torrent_hash != nullptr ? *torrent_hash : tr_sha1_digest_t{} } + tr_bandwidth* parent_bandwidth) + : bandwidth_{ parent_bandwidth } + , info_hash_{ info_hash != nullptr ? *info_hash : tr_sha1_digest_t{} } + , session_{ session } , is_seed_{ is_seed } , is_incoming_{ is_incoming } { - if (socket.is_tcp()) +} + +void tr_peerIo::set_socket(tr_peer_socket socket_in) +{ + close(); // tear down the previous socket, if any + + socket_ = std::move(socket_in); + + if (socket_.is_tcp()) { - event_read.reset(event_new(session->eventBase(), socket.handle.tcp, EV_READ, event_read_cb, this)); - event_write.reset(event_new(session->eventBase(), socket.handle.tcp, EV_WRITE, event_write_cb, this)); + event_read_.reset(event_new(session_->eventBase(), socket_.handle.tcp, EV_READ, &tr_peerIo::event_read_cb, this)); + event_write_.reset(event_new(session_->eventBase(), socket_.handle.tcp, EV_WRITE, &tr_peerIo::event_write_cb, this)); } #ifdef WITH_UTP - else if (socket.is_utp()) + else if (socket_.is_utp()) { - utp_set_userdata(socket.handle.utp, this); + utp_set_userdata(socket_.handle.utp, this); } #endif else @@ -484,51 +280,107 @@ tr_peerIo::tr_peerIo( std::shared_ptr tr_peerIo::create( tr_session* session, tr_bandwidth* parent, - tr_sha1_digest_t const* torrent_hash, + tr_sha1_digest_t const* info_hash, bool is_incoming, - bool is_seed, - tr_peer_socket sock) + bool is_seed) { TR_ASSERT(session != nullptr); auto lock = session->unique_lock(); - TR_ASSERT(sock.is_valid()); - TR_ASSERT(session->allowsTCP() || !sock.is_tcp()); - - auto io = std::make_shared(session, torrent_hash, is_incoming, is_seed, parent, std::move(sock)); + auto io = std::make_shared(session, info_hash, is_incoming, is_seed, parent); io->bandwidth().setPeer(io); tr_logAddTraceIo(io, fmt::format("bandwidth is {}; its parent is {}", fmt::ptr(&io->bandwidth()), fmt::ptr(parent))); return io; } -void tr_peerIo::utpInit([[maybe_unused]] struct_utp_context* ctx) +void tr_peerIo::utp_init([[maybe_unused]] struct_utp_context* ctx) { #ifdef WITH_UTP + utp_context_set_option(ctx, UTP_RCVBUF, RcvBuf); - utp_set_callback(ctx, UTP_ON_READ, &utp_callback); - utp_set_callback(ctx, UTP_GET_READ_BUFFER_SIZE, &utp_callback); - utp_set_callback(ctx, UTP_ON_STATE_CHANGE, &utp_callback); - utp_set_callback(ctx, UTP_ON_ERROR, &utp_callback); - utp_set_callback(ctx, UTP_ON_OVERHEAD_STATISTICS, &utp_callback); + // note: all the callback handlers here need to check `userdata` for nullptr + // because libutp can fire callbacks on a socket after utp_close() is called - utp_context_set_option(ctx, UTP_RCVBUF, UtpReadBufferSize); + utp_set_callback( + ctx, + UTP_ON_READ, + [](utp_callback_arguments* args) -> uint64 + { + if (auto* const io = static_cast(utp_get_userdata(args->socket)); io != nullptr) + { + io->inbuf_.add(args->buf, args->len); + io->set_enabled(TR_DOWN, true); + io->can_read_wrapper(); + } + return {}; + }); + utp_set_callback( + ctx, + UTP_GET_READ_BUFFER_SIZE, + [](utp_callback_arguments* args) -> uint64 + { + if (auto const* const io = static_cast(utp_get_userdata(args->socket)); io != nullptr) + { + return std::size(io->inbuf_); + } + return {}; + }); + + utp_set_callback( + ctx, + UTP_ON_ERROR, + [](utp_callback_arguments* args) -> uint64 + { + if (auto* const io = static_cast(utp_get_userdata(args->socket)); io != nullptr) + { + io->on_utp_error(args->u1.error_code); + } + return {}; + }); + + utp_set_callback( + ctx, + UTP_ON_OVERHEAD_STATISTICS, + [](utp_callback_arguments* args) -> uint64 + { + if (auto* const io = static_cast(utp_get_userdata(args->socket)); io != nullptr) + { + tr_logAddTraceIo(io, fmt::format("{:d} overhead bytes via utp", args->len)); + io->bandwidth().notifyBandwidthConsumed(args->u1.send != 0 ? TR_UP : TR_DOWN, args->len, false, tr_time_msec()); + } + return {}; + }); + + utp_set_callback( + ctx, + UTP_ON_STATE_CHANGE, + [](utp_callback_arguments* args) -> uint64 + { + if (auto* const io = static_cast(utp_get_userdata(args->socket)); io != nullptr) + { + io->on_utp_state_change(args->u1.state); + } + return {}; + }); #endif } -std::shared_ptr tr_peerIo::newIncoming(tr_session* session, tr_bandwidth* parent, tr_peer_socket socket) +std::shared_ptr tr_peerIo::new_incoming(tr_session* session, tr_bandwidth* parent, tr_peer_socket socket) { TR_ASSERT(session != nullptr); - return tr_peerIo::create(session, parent, nullptr, true, false, std::move(socket)); + auto peer_io = tr_peerIo::create(session, parent, nullptr, true, false); + peer_io->set_socket(std::move(socket)); + return peer_io; } -std::shared_ptr tr_peerIo::newOutgoing( +std::shared_ptr tr_peerIo::new_outgoing( tr_session* session, tr_bandwidth* parent, tr_address const& addr, tr_port port, - tr_sha1_digest_t const& torrent_hash, + tr_sha1_digest_t const& info_hash, bool is_seed, bool utp) { @@ -536,96 +388,109 @@ std::shared_ptr tr_peerIo::newOutgoing( TR_ASSERT(addr.is_valid()); TR_ASSERT(utp || session->allowsTCP()); - auto socket = tr_peer_socket{}; + if (!addr.is_valid_for_peers(port)) + { + return {}; + } + auto peer_io = tr_peerIo::create(session, parent, &info_hash, false, is_seed); + +#ifdef WITH_UTP if (utp) { - socket = tr_netOpenPeerUTPSocket(session, addr, port, is_seed); - } + auto* const sock = utp_create_socket(session->utp_context); + utp_set_userdata(sock, peer_io.get()); + peer_io->set_socket(tr_peer_socket{ addr, port, sock }); - if (!socket.is_valid()) + auto const [ss, sslen] = addr.to_sockaddr(port); + if (utp_connect(sock, reinterpret_cast(&ss), sslen) == 0) + { + return peer_io; + } + } +#endif + + if (!peer_io->socket_.is_valid()) { - socket = tr_netOpenPeerSocket(session, addr, port, is_seed); - tr_logAddDebug(fmt::format("tr_netOpenPeerSocket returned {}", socket.is_tcp() ? socket.handle.tcp : TR_BAD_SOCKET)); + if (auto sock = tr_netOpenPeerSocket(session, addr, port, is_seed); sock.is_valid()) + { + peer_io->set_socket(std::move(sock)); + return peer_io; + } } - if (!socket.is_valid()) - { - return nullptr; - } - - return create(session, parent, &torrent_hash, false, is_seed, std::move(socket)); + return {}; } /*** **** ***/ -static void event_enable(tr_peerIo* io, short event) +void tr_peerIo::event_enable(short event) { - TR_ASSERT(io->session != nullptr); + TR_ASSERT(session_ != nullptr); - bool const need_events = io->socket.is_tcp(); - TR_ASSERT(!need_events || io->event_read); - TR_ASSERT(!need_events || io->event_write); + bool const need_events = socket_.is_tcp(); + TR_ASSERT(!need_events || event_read_); + TR_ASSERT(!need_events || event_write_); - if ((event & EV_READ) != 0 && (io->pendingEvents & EV_READ) == 0) + if ((event & EV_READ) != 0 && (pending_events_ & EV_READ) == 0) { - tr_logAddTraceIo(io, "enabling ready-to-read polling"); + tr_logAddTraceIo(this, "enabling ready-to-read polling"); if (need_events) { - event_add(io->event_read.get(), nullptr); + event_add(event_read_.get(), nullptr); } - io->pendingEvents |= EV_READ; + pending_events_ |= EV_READ; } - if ((event & EV_WRITE) != 0 && (io->pendingEvents & EV_WRITE) == 0) + if ((event & EV_WRITE) != 0 && (pending_events_ & EV_WRITE) == 0) { - tr_logAddTraceIo(io, "enabling ready-to-write polling"); + tr_logAddTraceIo(this, "enabling ready-to-write polling"); if (need_events) { - event_add(io->event_write.get(), nullptr); + event_add(event_write_.get(), nullptr); } - io->pendingEvents |= EV_WRITE; + pending_events_ |= EV_WRITE; } } -static void event_disable(tr_peerIo* io, short event) +void tr_peerIo::event_disable(short event) { - bool const need_events = io->socket.is_tcp(); - TR_ASSERT(!need_events || io->event_read); - TR_ASSERT(!need_events || io->event_write); + bool const need_events = socket_.is_tcp(); + TR_ASSERT(!need_events || event_read_); + TR_ASSERT(!need_events || event_write_); - if ((event & EV_READ) != 0 && (io->pendingEvents & EV_READ) != 0) + if ((event & EV_READ) != 0 && (pending_events_ & EV_READ) != 0) { - tr_logAddTraceIo(io, "disabling ready-to-read polling"); + tr_logAddTraceIo(this, "disabling ready-to-read polling"); if (need_events) { - event_del(io->event_read.get()); + event_del(event_read_.get()); } - io->pendingEvents &= ~EV_READ; + pending_events_ &= ~EV_READ; } - if ((event & EV_WRITE) != 0 && (io->pendingEvents & EV_WRITE) != 0) + if ((event & EV_WRITE) != 0 && (pending_events_ & EV_WRITE) != 0) { - tr_logAddTraceIo(io, "disabling ready-to-write polling"); + tr_logAddTraceIo(this, "disabling ready-to-write polling"); if (need_events) { - event_del(io->event_write.get()); + event_del(event_write_.get()); } - io->pendingEvents &= ~EV_WRITE; + pending_events_ &= ~EV_WRITE; } } -void tr_peerIo::setEnabled(tr_direction dir, bool is_enabled) +void tr_peerIo::set_enabled(tr_direction dir, bool is_enabled) { TR_ASSERT(tr_isDirection(dir)); @@ -633,11 +498,11 @@ void tr_peerIo::setEnabled(tr_direction dir, bool is_enabled) if (is_enabled) { - event_enable(this, event); + event_enable(event); } else { - event_disable(this, event); + event_disable(event); } } @@ -645,65 +510,63 @@ void tr_peerIo::setEnabled(tr_direction dir, bool is_enabled) **** ***/ -static void io_close_socket(tr_peerIo* io) +void tr_peerIo::close() { - io->socket.close(io->session); - io->event_write.reset(); - io->event_read.reset(); - io->socket = {}; + socket_.close(session_); + event_write_.reset(); + event_read_.reset(); } tr_peerIo::~tr_peerIo() { - auto const lock = session->unique_lock(); + auto const lock = session_->unique_lock(); - clearCallbacks(); + clear_callbacks(); tr_logAddTraceIo(this, "in tr_peerIo destructor"); - event_disable(this, EV_READ | EV_WRITE); - io_close_socket(this); + event_disable(EV_READ | EV_WRITE); + close(); } -void tr_peerIo::setCallbacks(tr_can_read_cb readcb, tr_did_write_cb writecb, tr_net_error_cb errcb, void* user_data) +void tr_peerIo::set_callbacks(CanRead can_read, DidWrite did_write, GotError got_error, void* user_data) { - this->canRead = readcb; - this->didWrite = writecb; - this->gotError = errcb; - this->userData = user_data; + can_read_ = can_read; + did_write_ = did_write; + got_error_ = got_error; + user_data_ = user_data; } void tr_peerIo::clear() { - clearCallbacks(); - setEnabled(TR_UP, false); - setEnabled(TR_DOWN, false); - io_close_socket(this); + clear_callbacks(); + set_enabled(TR_UP, false); + set_enabled(TR_DOWN, false); + close(); } -int tr_peerIo::reconnect() +bool tr_peerIo::reconnect() { - TR_ASSERT(tr_isPeerIo(this)); - TR_ASSERT(!this->isIncoming()); - TR_ASSERT(this->session->allowsTCP()); + TR_ASSERT(!this->is_incoming()); + TR_ASSERT(this->session_->allowsTCP()); - short int const pending_events = this->pendingEvents; - event_disable(this, EV_READ | EV_WRITE); + short int const pending_events = this->pending_events_; + event_disable(EV_READ | EV_WRITE); - io_close_socket(this); + close(); - auto const [addr, port] = socketAddress(); - this->socket = tr_netOpenPeerSocket(session, addr, port, this->isSeed()); + auto const [addr, port] = socket_address(); + socket_ = tr_netOpenPeerSocket(session_, addr, port, is_seed()); - if (!this->socket.is_tcp()) + if (!socket_.is_tcp()) { - return -1; + return false; } - this->event_read.reset(event_new(session->eventBase(), this->socket.handle.tcp, EV_READ, event_read_cb, this)); - this->event_write.reset(event_new(session->eventBase(), this->socket.handle.tcp, EV_WRITE, event_write_cb, this)); + this->event_read_.reset(event_new(session_->eventBase(), socket_.handle.tcp, EV_READ, event_read_cb, this)); + this->event_write_.reset(event_new(session_->eventBase(), socket_.handle.tcp, EV_WRITE, event_write_cb, this)); - event_enable(this, pending_events); + event_enable(pending_events); - return 0; + return true; } /** @@ -723,10 +586,10 @@ static size_t getDesiredOutputBufferSize(tr_peerIo const* io, uint64_t now) return std::max(ceiling, current_speed_bytes_per_second * period); } -size_t tr_peerIo::getWriteBufferSpace(uint64_t now) const noexcept +size_t tr_peerIo::get_write_buffer_space(uint64_t now) const noexcept { size_t const desired_len = getDesiredOutputBufferSize(this, now); - size_t const current_len = std::size(outbuf); + size_t const current_len = std::size(outbuf_); return desired_len > current_len ? desired_len - current_len : 0U; } @@ -738,63 +601,63 @@ void tr_peerIo::write(libtransmission::Buffer& buf, bool is_piece_data) { auto [bytes, len] = buf.pullup(); encrypt(len, bytes); - outbuf_info.emplace_back(std::size(buf), is_piece_data); - outbuf.add(buf); + outbuf_info_.emplace_back(std::size(buf), is_piece_data); + outbuf_.add(buf); } -void tr_peerIo::writeBytes(void const* bytes, size_t n_bytes, bool is_piece_data) +void tr_peerIo::write_bytes(void const* bytes, size_t n_bytes, bool is_piece_data) { - auto const old_size = std::size(outbuf); + auto const old_size = std::size(outbuf_); - outbuf.reserve(old_size + n_bytes); - outbuf.add(bytes, n_bytes); + outbuf_.reserve(old_size + n_bytes); + outbuf_.add(bytes, n_bytes); - for (auto iter = std::begin(outbuf) + old_size, end = std::end(outbuf); iter != end; ++iter) + for (auto iter = std::begin(outbuf_) + old_size, end = std::end(outbuf_); iter != end; ++iter) { encrypt(1, &*iter); } - outbuf_info.emplace_back(n_bytes, is_piece_data); + outbuf_info_.emplace_back(n_bytes, is_piece_data); } /*** **** ***/ -void tr_peerIo::readBytes(void* bytes, size_t byte_count) +void tr_peerIo::read_bytes(void* bytes, size_t byte_count) { - TR_ASSERT(readBufferSize() >= byte_count); + TR_ASSERT(read_buffer_size() >= byte_count); - inbuf.toBuf(bytes, byte_count); + inbuf_.toBuf(bytes, byte_count); - if (isEncrypted()) + if (is_encrypted()) { decrypt(byte_count, bytes); } } -void tr_peerIo::readUint16(uint16_t* setme) +void tr_peerIo::read_uint16(uint16_t* setme) { auto tmp = uint16_t{}; - readBytes(&tmp, sizeof(tmp)); + read_bytes(&tmp, sizeof(tmp)); *setme = ntohs(tmp); } -void tr_peerIo::readUint32(uint32_t* setme) +void tr_peerIo::read_uint32(uint32_t* setme) { auto tmp = uint32_t{}; - readBytes(&tmp, sizeof(tmp)); + read_bytes(&tmp, sizeof(tmp)); *setme = ntohl(tmp); } -void tr_peerIo::readBufferDrain(size_t byte_count) +void tr_peerIo::read_buffer_drain(size_t byte_count) { auto buf = std::array{}; while (byte_count > 0) { auto const this_pass = std::min(byte_count, std::size(buf)); - readBytes(std::data(buf), this_pass); + read_bytes(std::data(buf), this_pass); byte_count -= this_pass; } } @@ -803,153 +666,104 @@ void tr_peerIo::readBufferDrain(size_t byte_count) **** ***/ -static size_t tr_peerIoTryRead(tr_peerIo* io, size_t howmuch, tr_error** error) +size_t tr_peerIo::try_read(size_t max) { - auto n_read = size_t{ 0U }; + static auto constexpr Dir = TR_DOWN; - howmuch = io->bandwidth().clamp(TR_DOWN, howmuch); - if (howmuch == 0) + if (max == 0) { - return n_read; + return {}; } - TR_ASSERT(io->socket.is_valid()); - if (io->socket.is_tcp()) + // Do not write more than the bandwidth allows. + // If there is no bandwidth left available, disable writes. + max = bandwidth().clamp(TR_DOWN, max); + if (max == 0) { - tr_error* my_error = nullptr; - n_read = io->inbuf.addSocket(io->socket.handle.tcp, howmuch, &my_error); - if (io->readBufferSize() != 0) - { - canReadWrapper(io); - } - - if (my_error != nullptr) - { - if (canRetryFromError(my_error->code)) - { - tr_error_clear(&my_error); - } - else - { - short const what = BEV_EVENT_READING | BEV_EVENT_ERROR | (n_read == 0 ? BEV_EVENT_EOF : 0); - auto const msg = fmt::format( - "tr_peerIoTryRead err: res:{} what:{}, errno:{} ({})", - n_read, - what, - my_error->code, - my_error->message); - tr_logAddTraceIo(io, msg); - - io->call_error_callback(what); - - tr_error_propagate(error, &my_error); - } - } + set_enabled(Dir, false); + return {}; } -#ifdef WITH_UTP - else if (io->socket.is_utp()) + + auto& buf = inbuf_; + tr_error* error = nullptr; + auto const n_read = socket_.try_read(buf, max, &error); + set_enabled(Dir, error == nullptr || canRetryFromError(error->code)); + + if (error != nullptr) { - // UTP_RBDrained notifies libutp that your read buffer is empty. - // It opens up the congestion window by sending an ACK (soonish) - // if one was not going to be sent. - if (io->readBufferSize() == 0) + if (!canRetryFromError(error->code)) { - utp_read_drained(io->socket.handle.utp); + tr_logAddTraceIo(this, fmt::format("try_read err: n_read:{} errno:{} ({})", n_read, error->code, error->message)); + call_error_callback(*error); } + + tr_error_clear(&error); + } + else if (!std::empty(buf)) + { + can_read_wrapper(); } -#endif return n_read; } -static size_t tr_peerIoTryWrite(tr_peerIo* io, size_t howmuch, tr_error** error) +size_t tr_peerIo::try_write(size_t max) { - auto n_written = size_t{ 0U }; + static auto constexpr Dir = TR_UP; - auto const old_len = std::size(io->outbuf); - - howmuch = std::min(howmuch, old_len); - howmuch = io->bandwidth().clamp(TR_UP, howmuch); - if (howmuch == 0) + if (max == 0) { - return n_written; + return {}; } - if (io->socket.is_tcp()) + auto& buf = outbuf_; + max = std::min(max, std::size(buf)); + max = bandwidth().clamp(Dir, max); + if (max == 0) { - tr_error* my_error = nullptr; - n_written = io->outbuf.toSocket(io->socket.handle.tcp, howmuch, &my_error); - - if (n_written > 0) - { - didWriteWrapper(io, n_written); - } - - if (my_error != nullptr) - { - if (canRetryFromError(my_error->code)) - { - tr_error_clear(&my_error); - } - else - { - short constexpr What = BEV_EVENT_WRITING | BEV_EVENT_ERROR; - tr_logAddTraceIo( - io, - fmt::format( - "tr_peerIoTryWrite err: res:{}, what:{}, errno:{} ({})", - n_written, - What, - my_error->code, - my_error->message)); - - io->call_error_callback(What); - - tr_error_propagate(error, &my_error); - } - } + set_enabled(Dir, false); + return {}; } -#ifdef WITH_UTP - else if (io->socket.is_utp()) + + tr_error* error = nullptr; + auto const n_written = socket_.try_write(buf, max, &error); + // enable further writes if there's more data to write + set_enabled(Dir, !std::empty(buf) && (error == nullptr || canRetryFromError(error->code))); + + if (error != nullptr) { - auto iov = io->outbuf.vecs(howmuch); - errno = 0; - auto const n = utp_writev(io->socket.handle.utp, reinterpret_cast(std::data(iov)), std::size(iov)); - auto const error_code = errno; - if (n > 0) + if (!canRetryFromError(error->code)) { - n_written = static_cast(n); - io->outbuf.drain(n); - didWriteWrapper(io, n); - } - else if (n < 0 && !canRetryFromError(error_code)) - { - tr_error_set(error, error_code, tr_strerror(error_code)); + tr_logAddTraceIo( + this, + fmt::format("try_write err: wrote:{}, errno:{} ({})", n_written, error->code, error->message)); + call_error_callback(*error); } + + tr_error_clear(&error); + } + else if (n_written > 0U) + { + did_write_wrapper(n_written); } -#endif return n_written; } -size_t tr_peerIo::flush(tr_direction dir, size_t limit, tr_error** error) +size_t tr_peerIo::flush(tr_direction dir, size_t limit) { TR_ASSERT(tr_isDirection(dir)); - auto const bytes_used = dir == TR_DOWN ? tr_peerIoTryRead(this, limit, error) : tr_peerIoTryWrite(this, limit, error); - tr_logAddTraceIo( - this, - fmt::format("flushing peer-io, direction:{}, limit:{}, byte_used:{}", static_cast(dir), limit, bytes_used)); - return bytes_used; + return dir == TR_DOWN ? try_read(limit) : try_write(limit); } -size_t tr_peerIo::flushOutgoingProtocolMsgs(tr_error** error) +size_t tr_peerIo::flush_outgoing_protocol_msgs() { size_t byte_count = 0; /* count up how many bytes are used by non-piece-data messages at the front of our outbound queue */ - for (auto const& [n_bytes, is_piece_data] : outbuf_info) + for (auto const& [n_bytes, is_piece_data] : outbuf_info_) { if (is_piece_data) { @@ -959,5 +773,5 @@ size_t tr_peerIo::flushOutgoingProtocolMsgs(tr_error** error) byte_count += n_bytes; } - return flush(TR_UP, byte_count, error); + return flush(TR_UP, byte_count); } diff --git a/libtransmission/peer-io.h b/libtransmission/peer-io.h index f08d8b826..f0003f48c 100644 --- a/libtransmission/peer-io.h +++ b/libtransmission/peer-io.h @@ -31,10 +31,14 @@ #include "tr-buffer.h" #include "utils-ev.h" -class tr_peerIo; struct tr_bandwidth; struct struct_utp_context; +namespace libtransmission::test +{ +class HandshakeTest; +} // namespace libtransmission::test + /** * @addtogroup networked_io Networked IO * @{ @@ -47,137 +51,144 @@ enum ReadState READ_ERR }; -auto inline constexpr PEER_IO_MAGIC_NUMBER = 206745; - -namespace libtransmission::test -{ - -class HandshakeTest; - -} // namespace libtransmission::test - class tr_peerIo final : public std::enable_shared_from_this { using DH = tr_message_stream_encryption::DH; using Filter = tr_message_stream_encryption::Filter; public: + using CanRead = ReadState (*)(tr_peerIo* io, void* user_data, size_t* setme_piece_byte_count); + using DidWrite = void (*)(tr_peerIo* io, size_t bytesWritten, bool wasPieceData, void* userData); + using GotError = void (*)(tr_peerIo* io, tr_error const& error, void* userData); + + tr_peerIo( + tr_session* session_in, + tr_sha1_digest_t const* info_hash, + bool is_incoming, + bool is_seed, + tr_bandwidth* parent_bandwidth); + ~tr_peerIo(); - static std::shared_ptr newOutgoing( + static std::shared_ptr new_outgoing( tr_session* session, tr_bandwidth* parent, tr_address const& addr, tr_port port, - tr_sha1_digest_t const& torrent_hash, + tr_sha1_digest_t const& info_hash, bool is_seed, bool utp); - static std::shared_ptr newIncoming(tr_session* session, tr_bandwidth* parent, tr_peer_socket socket); + static std::shared_ptr new_incoming(tr_session* session, tr_bandwidth* parent, tr_peer_socket socket); + + void set_socket(tr_peer_socket); + + [[nodiscard]] constexpr auto is_utp() const noexcept + { + return socket_.is_utp(); + } void clear(); - void readBytes(void* bytes, size_t byte_count); + void read_bytes(void* bytes, size_t byte_count); - void readUint8(uint8_t* setme) + void read_uint8(uint8_t* setme) { - readBytes(setme, sizeof(uint8_t)); + read_bytes(setme, sizeof(uint8_t)); } - void readUint16(uint16_t* setme); - void readUint32(uint32_t* setme); + void read_uint16(uint16_t* setme); + void read_uint32(uint32_t* setme); - int reconnect(); + [[nodiscard]] bool reconnect(); - void setEnabled(tr_direction dir, bool is_enabled); + void set_enabled(tr_direction dir, bool is_enabled); [[nodiscard]] constexpr auto const& address() const noexcept { - return socket.address(); + return socket_.address(); } - [[nodiscard]] constexpr auto socketAddress() const noexcept + [[nodiscard]] constexpr auto socket_address() const noexcept { - return socket.socketAddress(); + return socket_.socketAddress(); } [[nodiscard]] auto display_name() const { - return socket.display_name(); + return socket_.display_name(); } - void readBufferDrain(size_t byte_count); + void read_buffer_drain(size_t byte_count); - [[nodiscard]] auto readBufferSize() const noexcept + [[nodiscard]] auto read_buffer_size() const noexcept { - return std::size(inbuf); + return std::size(inbuf_); } template - [[nodiscard]] auto readBufferStartsWith(T const& t) const noexcept + [[nodiscard]] auto read_buffer_starts_with(T const& t) const noexcept { - return inbuf.startsWith(t); + return inbuf_.startsWith(t); } - void readBufferAdd(void const* data, size_t n_bytes); + size_t flush_outgoing_protocol_msgs(); + size_t flush(tr_direction dir, size_t byte_limit); - size_t flushOutgoingProtocolMsgs(tr_error** error = nullptr); - size_t flush(tr_direction dir, size_t byte_limit, tr_error** error = nullptr); - - void writeBytes(void const* bytes, size_t n_bytes, bool is_piece_data); + void write_bytes(void const* bytes, size_t n_bytes, bool is_piece_data); // Write all the data from `buf`. // This is a destructive add: `buf` is empty after this call. void write(libtransmission::Buffer& buf, bool is_piece_data); - [[nodiscard]] size_t getWriteBufferSpace(uint64_t now) const noexcept; + [[nodiscard]] size_t get_write_buffer_space(uint64_t now) const noexcept; - [[nodiscard]] auto hasBandwidthLeft(tr_direction dir) noexcept + [[nodiscard]] auto has_bandwidth_left(tr_direction dir) noexcept { return bandwidth_.clamp(dir, 1024) > 0; } - [[nodiscard]] auto getPieceSpeedBytesPerSecond(uint64_t now, tr_direction dir) noexcept + [[nodiscard]] auto get_piece_speed_bytes_per_second(uint64_t now, tr_direction dir) noexcept { return bandwidth_.getPieceSpeedBytesPerSecond(now, dir); } - constexpr void enableFEXT(bool flag) noexcept + constexpr void set_supports_fext(bool flag) noexcept { fast_extension_supported_ = flag; } - [[nodiscard]] constexpr auto supportsFEXT() const noexcept + [[nodiscard]] constexpr auto supports_fext() const noexcept { return fast_extension_supported_; } - constexpr void enableLTEP(bool flag) noexcept + constexpr void set_supports_ltep(bool flag) noexcept { extended_protocol_supported_ = flag; } - [[nodiscard]] constexpr auto supportsLTEP() const noexcept + [[nodiscard]] constexpr auto supports_ltep() const noexcept { return extended_protocol_supported_; } - constexpr void enableDHT(bool flag) noexcept + constexpr void set_supports_dht(bool flag) noexcept { dht_supported_ = flag; } - [[nodiscard]] constexpr auto supportsDHT() const noexcept + [[nodiscard]] constexpr auto supports_dht() const noexcept { return dht_supported_; } - [[nodiscard]] constexpr auto supportsUTP() const noexcept + [[nodiscard]] constexpr auto supports_utp() const noexcept { return utp_supported_; } - [[nodiscard]] constexpr auto isSeed() const noexcept + [[nodiscard]] constexpr auto is_seed() const noexcept { return is_seed_; } @@ -192,68 +203,49 @@ public: return bandwidth_; } - void setParent(tr_bandwidth* parent) + void set_parent(tr_bandwidth* parent) { bandwidth_.setParent(parent); } - [[nodiscard]] constexpr auto isIncoming() const noexcept + [[nodiscard]] constexpr auto is_incoming() const noexcept { return is_incoming_; } - void setTorrentHash(tr_sha1_digest_t hash) noexcept + void set_torrent_hash(tr_sha1_digest_t hash) noexcept { - torrent_hash_ = hash; + info_hash_ = hash; } - [[nodiscard]] constexpr auto const& torrentHash() const noexcept + [[nodiscard]] constexpr auto const& torrent_hash() const noexcept { - return torrent_hash_; + return info_hash_; } - using tr_can_read_cb = ReadState (*)(tr_peerIo* io, void* user_data, size_t* setme_piece_byte_count); - using tr_did_write_cb = void (*)(tr_peerIo* io, size_t bytesWritten, bool wasPieceData, void* userData); - using tr_net_error_cb = void (*)(tr_peerIo* io, short what, void* userData); - void setCallbacks(tr_can_read_cb readcb, tr_did_write_cb writecb, tr_net_error_cb errcb, void* user_data); + void set_callbacks(CanRead can_read, DidWrite did_write, GotError got_error, void* user_data); - void clearCallbacks() + void clear_callbacks() { - setCallbacks(nullptr, nullptr, nullptr, nullptr); + set_callbacks(nullptr, nullptr, nullptr, nullptr); } - tr_peer_socket socket = {}; - - tr_session* const session; - - tr_can_read_cb canRead = nullptr; - tr_did_write_cb didWrite = nullptr; - tr_net_error_cb gotError = nullptr; - void* userData = nullptr; - - void call_error_callback(short what) + [[nodiscard]] constexpr auto priority() const noexcept { - if (gotError != nullptr) - { - gotError(this, what, userData); - } + return priority_; } - libtransmission::Buffer inbuf; - libtransmission::Buffer outbuf; + [[nodiscard]] constexpr auto is_encrypted() const noexcept + { + return filter_.is_active(); + } - std::deque> outbuf_info; + constexpr void set_priority(tr_priority_t priority) + { + priority_ = priority; + } - libtransmission::evhelpers::event_unique_ptr event_read; - libtransmission::evhelpers::event_unique_ptr event_write; - - short int pendingEvents = 0; - - tr_priority_t priority = TR_PRI_NORMAL; - - bool utp_supported_ = false; - - void decryptInit(bool is_incoming, DH const& dh, tr_sha1_digest_t const& info_hash) + void decrypt_init(bool is_incoming, DH const& dh, tr_sha1_digest_t const& info_hash) { filter_.decryptInit(is_incoming, dh, info_hash); } @@ -263,53 +255,88 @@ public: filter_.decrypt(buflen, buf); } - void encryptInit(bool is_incoming, DH const& dh, tr_sha1_digest_t const& info_hash) + void encrypt_init(bool is_incoming, DH const& dh, tr_sha1_digest_t const& info_hash) { filter_.encryptInit(is_incoming, dh, info_hash); } + static void utp_init(struct_utp_context* ctx); + +private: + static constexpr auto RcvBuf = size_t{ 256 * 1024 }; + + friend class libtransmission::test::HandshakeTest; + + void call_error_callback(tr_error const& error) + { + if (got_error_ != nullptr) + { + got_error_(this, error, user_data_); + } + } + void encrypt(size_t buflen, void* buf) { filter_.encrypt(buflen, buf); } - [[nodiscard]] bool isEncrypted() const noexcept - { - return filter_.is_active(); - } + void on_utp_state_change(int new_state); + void on_utp_error(int errcode); - static void utpInit(struct_utp_context* ctx); + void close(); - tr_peerIo( - tr_session* session_in, - tr_sha1_digest_t const* torrent_hash, - bool is_incoming, - bool is_seed, - tr_bandwidth* parent_bandwidth, - tr_peer_socket sock); + static void event_read_cb(evutil_socket_t fd, short /*event*/, void* vio); + static void event_write_cb(evutil_socket_t fd, short /*event*/, void* vio); -private: - friend class libtransmission::test::HandshakeTest; + void event_enable(short event); + void event_disable(short event); + + void can_read_wrapper(); + void did_write_wrapper(size_t bytes_transferred); + + size_t try_read(size_t max); + size_t try_write(size_t max); // this is only public for testing purposes. - // production code should use newOutgoing() or newIncoming() + // production code should use new_outgoing() or new_incoming() static std::shared_ptr create( tr_session* session, tr_bandwidth* parent, - tr_sha1_digest_t const* torrent_hash, + tr_sha1_digest_t const* info_hash, bool is_incoming, - bool is_seed, - tr_peer_socket socket); - - tr_bandwidth bandwidth_; + bool is_seed); Filter filter_; - tr_sha1_digest_t torrent_hash_; + std::deque> outbuf_info_; + + tr_peer_socket socket_ = {}; + + tr_bandwidth bandwidth_; + + tr_sha1_digest_t info_hash_; + + libtransmission::Buffer inbuf_; + libtransmission::Buffer outbuf_; + + tr_session* const session_; + + CanRead can_read_ = nullptr; + DidWrite did_write_ = nullptr; + GotError got_error_ = nullptr; + void* user_data_ = nullptr; + + libtransmission::evhelpers::event_unique_ptr event_read_; + libtransmission::evhelpers::event_unique_ptr event_write_; + + short int pending_events_ = 0; + + tr_priority_t priority_ = TR_PRI_NORMAL; bool const is_seed_; bool const is_incoming_; + bool utp_supported_ = false; bool dht_supported_ = false; bool extended_protocol_supported_ = false; bool fast_extension_supported_ = false; diff --git a/libtransmission/peer-mgr.cc b/libtransmission/peer-mgr.cc index 1acfc4640..46dec84bd 100644 --- a/libtransmission/peer-mgr.cc +++ b/libtransmission/peer-mgr.cc @@ -1089,11 +1089,11 @@ static bool on_handshake_done(tr_peerMgr* manager, tr_handshake::Result const& r bool const ok = result.is_connected; bool success = false; - tr_swarm* const s = getExistingSwarm(manager, result.io->torrentHash()); + tr_swarm* const s = getExistingSwarm(manager, result.io->torrent_hash()); - auto const [addr, port] = result.io->socketAddress(); + auto const [addr, port] = result.io->socket_address(); - if (result.io->isIncoming()) + if (result.io->is_incoming()) { manager->incoming_handshakes.erase(addr); } @@ -1135,7 +1135,7 @@ static bool on_handshake_done(tr_peerMgr* manager, tr_handshake::Result const& r atom->piece_data_time = 0; atom->lastConnectionAt = tr_time(); - if (!result.io->isIncoming()) + if (!result.io->is_incoming()) { atom->flags |= ADDED_F_CONNECTABLE; atom->flags2 &= ~MyflagUnreachable; @@ -1143,7 +1143,7 @@ static bool on_handshake_done(tr_peerMgr* manager, tr_handshake::Result const& r /* In principle, this flag specifies whether the peer groks µTP, not whether it's currently connected over µTP. */ - if (result.io->socket.is_utp()) + if (result.io->is_utp()) { atom->flags |= ADDED_F_UTP_FLAGS; } @@ -1152,7 +1152,7 @@ static bool on_handshake_done(tr_peerMgr* manager, tr_handshake::Result const& r { tr_logAddTraceSwarm(s, fmt::format("banned peer {} tried to reconnect", atom->display_name())); } - else if (result.io->isIncoming() && s->peerCount() >= s->tor->peerLimit()) + else if (result.io->is_incoming() && s->peerCount() >= s->tor->peerLimit()) { /* too many peers already */ } @@ -1170,7 +1170,7 @@ static bool on_handshake_done(tr_peerMgr* manager, tr_handshake::Result const& r client = tr_quark_new(std::data(buf)); } - result.io->setParent(&s->tor->bandwidth_); + result.io->set_parent(&s->tor->bandwidth_); createBitTorrentPeer(s->tor, result.io, atom, client); success = true; @@ -1202,7 +1202,7 @@ void tr_peerMgrAddIncoming(tr_peerMgr* manager, tr_peer_socket&& socket) manager->incoming_handshakes.try_emplace( address, &manager->handshake_mediator_, - tr_peerIo::newIncoming(session, &session->top_bandwidth_, std::move(socket)), + tr_peerIo::new_incoming(session, &session->top_bandwidth_, std::move(socket)), session->encryptionMode(), [manager](tr_handshake::Result const& result) { return on_handshake_done(manager, result); }); } @@ -2759,7 +2759,7 @@ void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, peer_atom& atom) s, fmt::format("Starting an OUTGOING {} connection with {}", utp ? " µTP" : "TCP", atom.display_name())); - auto peer_io = tr_peerIo::newOutgoing( + auto peer_io = tr_peerIo::new_outgoing( mgr->session, &mgr->session->top_bandwidth_, atom.addr, diff --git a/libtransmission/peer-msgs.cc b/libtransmission/peer-msgs.cc index 754fe25c3..e89c9c82a 100644 --- a/libtransmission/peer-msgs.cc +++ b/libtransmission/peer-msgs.cc @@ -203,7 +203,7 @@ class tr_peerMsgsImpl; static ReadState canRead(tr_peerIo* io, void* vmsgs, size_t* piece); static void cancelAllRequestsToClient(tr_peerMsgsImpl* msgs); static void didWrite(tr_peerIo* io, size_t bytes_written, bool was_piece_data, void* vmsgs); -static void gotError(tr_peerIo* io, short what, void* vmsgs); +static void gotError(tr_peerIo* io, tr_error const& err, void* vmsgs); static void peerPulse(void* vmsgs); static void protocolSendCancel(tr_peerMsgsImpl* msgs, struct peer_request const& req); static void protocolSendChoke(tr_peerMsgsImpl* msgs, bool choke); @@ -268,20 +268,20 @@ public: pex_timer_->startRepeating(SendPexInterval); } - if (io->supportsUTP()) + if (io->supports_utp()) { tr_peerMgrSetUtpSupported(torrent, io->address()); tr_peerMgrSetUtpFailed(torrent, io->address(), false); } - if (io->supportsLTEP()) + if (io->supports_ltep()) { sendLtepHandshake(this); } tellPeerWhatWeHave(this); - if (session->allowsDHT() && io->supportsDHT()) + if (session->allowsDHT() && io->supports_dht()) { // only send PORT over IPv6 iff IPv6 DHT is running (BEP-32). if (io->address().is_ipv4() || tr_globalIPv6(nullptr).has_value()) @@ -290,7 +290,7 @@ public: } } - io->setCallbacks(canRead, didWrite, gotError, this); + io->set_callbacks(canRead, didWrite, gotError, this); updateDesiredRequestCount(this); } @@ -326,7 +326,7 @@ public: bool isTransferringPieces(uint64_t now, tr_direction dir, tr_bytes_per_second_t* setme_bytes_per_second) const override { - auto const bytes_per_second = io->getPieceSpeedBytesPerSecond(now, dir); + auto const bytes_per_second = io->get_piece_speed_bytes_per_second(now, dir); if (setme_bytes_per_second != nullptr) { @@ -374,17 +374,17 @@ public: [[nodiscard]] bool is_utp_connection() const noexcept override { - return io->socket.is_utp(); + return io->is_utp(); } [[nodiscard]] bool is_encrypted() const override { - return io->isEncrypted(); + return io->is_encrypted(); } [[nodiscard]] bool is_incoming_connection() const override { - return io->isIncoming(); + return io->is_incoming(); } [[nodiscard]] tr_bandwidth& bandwidth() noexcept override @@ -409,7 +409,7 @@ public: [[nodiscard]] std::pair socketAddress() const override { - return io->socketAddress(); + return io->socket_address(); } [[nodiscard]] std::string display_name() const override @@ -729,7 +729,7 @@ tr_peerMsgs* tr_peerMsgsNew( static void protocolSendReject(tr_peerMsgsImpl* msgs, struct peer_request const* req) { - TR_ASSERT(msgs->io->supportsFEXT()); + TR_ASSERT(msgs->io->supports_fext()); auto& out = msgs->outMessages; @@ -795,7 +795,7 @@ static void protocolSendChoke(tr_peerMsgsImpl* msgs, bool choke) static void protocolSendHaveAll(tr_peerMsgsImpl* msgs) { - TR_ASSERT(msgs->io->supportsFEXT()); + TR_ASSERT(msgs->io->supports_fext()); auto& out = msgs->outMessages; @@ -809,7 +809,7 @@ static void protocolSendHaveAll(tr_peerMsgsImpl* msgs) static void protocolSendHaveNone(tr_peerMsgsImpl* msgs) { - TR_ASSERT(msgs->io->supportsFEXT()); + TR_ASSERT(msgs->io->supports_fext()); auto& out = msgs->outMessages; @@ -854,7 +854,7 @@ static bool popNextMetadataRequest(tr_peerMsgsImpl* msgs, int* setme) static void cancelAllRequestsToClient(tr_peerMsgsImpl* msgs) { - if (auto const must_send_rej = msgs->io->supportsFEXT(); must_send_rej) + if (auto const must_send_rej = msgs->io->supports_fext(); must_send_rej) { for (auto const& req : msgs->peer_requested_) { @@ -872,7 +872,7 @@ static void cancelAllRequestsToClient(tr_peerMsgsImpl* msgs) static void sendLtepHandshake(tr_peerMsgsImpl* msgs) { auto& out = msgs->outMessages; - auto const ipv6 = tr_globalIPv6(msgs->io->session); + auto const ipv6 = tr_globalIPv6(msgs->session); static tr_quark version_quark = 0; if (msgs->clientSentLtepHandshake) @@ -987,7 +987,7 @@ static void parseLtepHandshake(tr_peerMsgsImpl* msgs, uint32_t len) // so try using a strbuf to handle it on the stack auto tmp = tr_strbuf{}; tmp.resize(len); - msgs->io->readBytes(std::data(tmp), std::size(tmp)); + msgs->io->read_bytes(std::data(tmp), std::size(tmp)); auto const handshake_sv = tmp.sv(); auto val = tr_variant{}; @@ -1062,14 +1062,14 @@ static void parseLtepHandshake(tr_peerMsgsImpl* msgs, uint32_t len) uint8_t const* addr = nullptr; auto addr_len = size_t{}; - if (msgs->io->isIncoming() && tr_variantDictFindRaw(&val, TR_KEY_ipv4, &addr, &addr_len) && addr_len == 4) + if (msgs->io->is_incoming() && tr_variantDictFindRaw(&val, TR_KEY_ipv4, &addr, &addr_len) && addr_len == 4) { pex.addr.type = TR_AF_INET; memcpy(&pex.addr.addr.addr4, addr, 4); tr_peerMgrAddPex(msgs->torrent, TR_PEER_FROM_LTEP, &pex, 1); } - if (msgs->io->isIncoming() && tr_variantDictFindRaw(&val, TR_KEY_ipv6, &addr, &addr_len) && addr_len == 16) + if (msgs->io->is_incoming() && tr_variantDictFindRaw(&val, TR_KEY_ipv6, &addr, &addr_len) && addr_len == 16) { pex.addr.type = TR_AF_INET6; memcpy(&pex.addr.addr.addr6, addr, 16); @@ -1093,7 +1093,7 @@ static void parseUtMetadata(tr_peerMsgsImpl* msgs, uint32_t msglen) auto tmp = std::vector{}; tmp.resize(msglen); - msgs->io->readBytes(std::data(tmp), std::size(tmp)); + msgs->io->read_bytes(std::data(tmp), std::size(tmp)); char const* const msg_end = std::data(tmp) + std::size(tmp); auto dict = tr_variant{}; @@ -1164,7 +1164,7 @@ static void parseUtPex(tr_peerMsgsImpl* msgs, uint32_t msglen) auto tmp = std::vector{}; tmp.resize(msglen); - msgs->io->readBytes(std::data(tmp), std::size(tmp)); + msgs->io->read_bytes(std::data(tmp), std::size(tmp)); if (tr_variant val; tr_variantFromBuf(&val, TR_VARIANT_PARSE_BENC | TR_VARIANT_PARSE_INPLACE, tmp)) { @@ -1209,7 +1209,7 @@ static void parseLtep(tr_peerMsgsImpl* msgs, uint32_t msglen) TR_ASSERT(msglen > 0); auto ltep_msgid = uint8_t{}; - msgs->io->readUint8(<ep_msgid); + msgs->io->read_uint8(<ep_msgid); msglen--; if (ltep_msgid == LtepMessages::Handshake) @@ -1217,7 +1217,7 @@ static void parseLtep(tr_peerMsgsImpl* msgs, uint32_t msglen) logtrace(msgs, "got ltep handshake"); parseLtepHandshake(msgs, msglen); - if (msgs->io->supportsLTEP()) + if (msgs->io->supports_ltep()) { sendLtepHandshake(msgs); msgs->sendPex(); @@ -1238,7 +1238,7 @@ static void parseLtep(tr_peerMsgsImpl* msgs, uint32_t msglen) else { logtrace(msgs, fmt::format(FMT_STRING("skipping unknown ltep message ({:d})"), static_cast(ltep_msgid))); - msgs->io->readBufferDrain(msglen); + msgs->io->read_buffer_drain(msglen); } } @@ -1250,7 +1250,7 @@ static ReadState readBtLength(tr_peerMsgsImpl* msgs, size_t inlen) return READ_LATER; } - msgs->io->readUint32(&len); + msgs->io->read_uint32(&len); if (len == 0) /* peer sent us a keepalive message */ { logtrace(msgs, "got KeepAlive"); @@ -1274,7 +1274,7 @@ static ReadState readBtId(tr_peerMsgsImpl* msgs, size_t inlen) } auto id = uint8_t{}; - msgs->io->readUint8(&id); + msgs->io->read_uint8(&id); msgs->incoming.id = id; logtrace( msgs, @@ -1350,7 +1350,7 @@ static void peerMadeRequest(tr_peerMsgsImpl* msgs, struct peer_request const* re msgs->peer_requested_.emplace_back(*req); prefetchPieces(msgs); } - else if (msgs->io->supportsFEXT()) + else if (msgs->io->supports_fext()) { protocolSendReject(msgs, req); } @@ -1411,7 +1411,7 @@ static int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptrio->readBufferSize() >= inlen); + TR_ASSERT(msgs->io->read_buffer_size() >= inlen); logtrace(msgs, "In readBtPiece"); @@ -1424,8 +1424,8 @@ static ReadState readBtPiece(tr_peerMsgsImpl* msgs, size_t inlen, size_t* setme_ } auto req = peer_request{}; - msgs->io->readUint32(&req.index); - msgs->io->readUint32(&req.offset); + msgs->io->read_uint32(&req.index); + msgs->io->read_uint32(&req.offset); req.length = msgs->incoming.length - 9; logtrace(msgs, fmt::format(FMT_STRING("got incoming block header {:d}:{:d}->{:d}"), req.index, req.offset, req.length)); msgs->incoming.block_req = req; @@ -1449,7 +1449,7 @@ static ReadState readBtPiece(tr_peerMsgsImpl* msgs, size_t inlen, size_t* setme_ auto const n_to_read = std::min({ n_left_in_block, n_left_in_req, inlen }); auto const old_length = std::size(*block_buf); block_buf->resize(old_length + n_to_read); - msgs->io->readBytes(&((*block_buf)[old_length]), n_to_read); + msgs->io->read_bytes(&((*block_buf)[old_length]), n_to_read); msgs->publish(tr_peer_event::GotPieceData(n_to_read)); *setme_piece_bytes_read += n_to_read; @@ -1493,9 +1493,9 @@ static ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) { uint8_t const id = msgs->incoming.id; #ifdef TR_ENABLE_ASSERTS - auto const start_buflen = msgs->io->readBufferSize(); + auto const start_buflen = msgs->io->read_buffer_size(); #endif - bool const fext = msgs->io->supportsFEXT(); + bool const fext = msgs->io->supports_fext(); auto ui32 = uint32_t{}; auto msglen = uint32_t{ msgs->incoming.length }; @@ -1556,7 +1556,7 @@ static ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) break; case BtPeerMsgs::Have: - msgs->io->readUint32(&ui32); + msgs->io->read_uint32(&ui32); logtrace(msgs, fmt::format(FMT_STRING("got Have: {:d}"), ui32)); if (msgs->torrent->hasMetainfo() && ui32 >= msgs->torrent->pieceCount()) @@ -1579,7 +1579,7 @@ static ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) { logtrace(msgs, "got a bitfield"); auto tmp = std::vector(msglen); - msgs->io->readBytes(std::data(tmp), std::size(tmp)); + msgs->io->read_bytes(std::data(tmp), std::size(tmp)); msgs->have_ = tr_bitfield{ msgs->torrent->hasMetainfo() ? msgs->torrent->pieceCount() : std::size(tmp) * 8 }; msgs->have_.setRaw(std::data(tmp), std::size(tmp)); msgs->publish(tr_peer_event::GotBitfield(&msgs->have_)); @@ -1590,9 +1590,9 @@ static ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) case BtPeerMsgs::Request: { struct peer_request r; - msgs->io->readUint32(&r.index); - msgs->io->readUint32(&r.offset); - msgs->io->readUint32(&r.length); + msgs->io->read_uint32(&r.index); + msgs->io->read_uint32(&r.offset); + msgs->io->read_uint32(&r.length); logtrace(msgs, fmt::format(FMT_STRING("got Request: {:d}:{:d}->{:d}"), r.index, r.offset, r.length)); peerMadeRequest(msgs, &r); break; @@ -1601,9 +1601,9 @@ static ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) case BtPeerMsgs::Cancel: { struct peer_request r; - msgs->io->readUint32(&r.index); - msgs->io->readUint32(&r.offset); - msgs->io->readUint32(&r.length); + msgs->io->read_uint32(&r.index); + msgs->io->read_uint32(&r.offset); + msgs->io->read_uint32(&r.length); msgs->cancels_sent_to_client.add(tr_time(), 1); logtrace(msgs, fmt::format(FMT_STRING("got a Cancel {:d}:{:d}->{:d}"), r.index, r.offset, r.length)); @@ -1638,7 +1638,7 @@ static ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) logtrace(msgs, "Got a BtPeerMsgs::Port"); auto hport = uint16_t{}; - msgs->io->readUint16(&hport); // readUint16 performs ntoh + msgs->io->read_uint16(&hport); // read_uint16 performs ntoh if (auto const dht_port = tr_port::fromHost(hport); !std::empty(dht_port)) { msgs->dht_port = dht_port; @@ -1649,7 +1649,7 @@ static ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) case BtPeerMsgs::FextSuggest: logtrace(msgs, "Got a BtPeerMsgs::FextSuggest"); - msgs->io->readUint32(&ui32); + msgs->io->read_uint32(&ui32); if (fext) { @@ -1665,7 +1665,7 @@ static ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) case BtPeerMsgs::FextAllowedFast: logtrace(msgs, "Got a BtPeerMsgs::FextAllowedFast"); - msgs->io->readUint32(&ui32); + msgs->io->read_uint32(&ui32); if (fext) { @@ -1717,9 +1717,9 @@ static ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) { struct peer_request r; logtrace(msgs, "Got a BtPeerMsgs::FextReject"); - msgs->io->readUint32(&r.index); - msgs->io->readUint32(&r.offset); - msgs->io->readUint32(&r.length); + msgs->io->read_uint32(&r.index); + msgs->io->read_uint32(&r.offset); + msgs->io->read_uint32(&r.length); if (fext) { @@ -1742,12 +1742,12 @@ static ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) default: logtrace(msgs, fmt::format(FMT_STRING("peer sent us an UNKNOWN: {:d}"), static_cast(id))); - msgs->io->readBufferDrain(msglen); + msgs->io->read_buffer_drain(msglen); break; } TR_ASSERT(msglen + 1 == msgs->incoming.length); - TR_ASSERT(msgs->io->readBufferSize() == start_buflen - msglen); + TR_ASSERT(msgs->io->read_buffer_size() == start_buflen - msglen); msgs->state = AwaitingBt::Length; return READ_NOW; @@ -1808,25 +1808,22 @@ static int clientGotBlock( return 0; } -static void didWrite(tr_peerIo* io, size_t bytes_written, bool was_piece_data, void* vmsgs) +static void didWrite(tr_peerIo* /*io*/, size_t bytes_written, bool was_piece_data, void* vmsgs) { - auto* msgs = static_cast(vmsgs); + auto* const msgs = static_cast(vmsgs); if (was_piece_data) { msgs->publish(tr_peer_event::SentPieceData(bytes_written)); } - if (tr_isPeerIo(io) && io->userData != nullptr) - { - peerPulse(msgs); - } + peerPulse(msgs); } static ReadState canRead(tr_peerIo* io, void* vmsgs, size_t* piece) { auto* msgs = static_cast(vmsgs); - size_t const inlen = io->readBufferSize(); + size_t const inlen = io->read_buffer_size(); logtrace( msgs, @@ -1947,7 +1944,7 @@ static size_t fillOutputBuffer(tr_peerMsgsImpl* msgs, time_t now) size_t bytes_written = 0; struct peer_request req; bool const have_messages = !std::empty(msgs->outMessages); - bool const fext = msgs->io->supportsFEXT(); + bool const fext = msgs->io->supports_fext(); /** *** Protocol messages @@ -1974,7 +1971,8 @@ static size_t fillOutputBuffer(tr_peerMsgsImpl* msgs, time_t now) *** Metadata Pieces **/ - if (auto piece = int{}; msgs->io->getWriteBufferSpace(now) >= METADATA_PIECE_SIZE && popNextMetadataRequest(msgs, &piece)) + if (auto piece = int{}; + msgs->io->get_write_buffer_space(now) >= METADATA_PIECE_SIZE && popNextMetadataRequest(msgs, &piece)) { auto ok = bool{ false }; @@ -2031,7 +2029,7 @@ static size_t fillOutputBuffer(tr_peerMsgsImpl* msgs, time_t now) *** Data Blocks **/ - if (msgs->io->getWriteBufferSpace(now) >= tr_block_info::BlockSize && !std::empty(msgs->peer_requested_)) + if (msgs->io->get_write_buffer_space(now) >= tr_block_info::BlockSize && !std::empty(msgs->peer_requested_)) { req = msgs->peer_requested_.front(); msgs->peer_requested_.erase(std::begin(msgs->peer_requested_)); @@ -2133,31 +2131,9 @@ static void peerPulse(void* vmsgs) } } -static void gotError(tr_peerIo* io, short what, void* vmsgs) +static void gotError(tr_peerIo* /*io*/, tr_error const& /*error*/, void* vmsgs) { - auto* msgs = static_cast(vmsgs); - - if ((what & BEV_EVENT_TIMEOUT) != 0) - { - logdbg(msgs, fmt::format(FMT_STRING("libevent got a timeout, what={:d}"), what)); - } - - if ((what & BEV_EVENT_EOF) != 0) - { - logdbg(msgs, fmt::format("peer closed connection. {:s}", io->display_name())); - } - else if (what == BEV_EVENT_ERROR) - { - // exact BEV_EVENT_ERROR are high frequency errors from utp_on_error which were already logged appropriately - logtrace(msgs, fmt::format("libevent got an error! what={:d}, errno={:d} ({:s})", what, errno, tr_strerror(errno))); - } - else if ((what & BEV_EVENT_ERROR) != 0) - { - // read or write error - logdbg(msgs, fmt::format("libevent got an error! what={:d}, errno={:d} ({:s})", what, errno, tr_strerror(errno))); - } - - msgs->publish(tr_peer_event::GotError(ENOTCONN)); + static_cast(vmsgs)->publish(tr_peer_event::GotError(ENOTCONN)); } static void sendBitfield(tr_peerMsgsImpl* msgs) @@ -2176,7 +2152,7 @@ static void sendBitfield(tr_peerMsgsImpl* msgs) static void tellPeerWhatWeHave(tr_peerMsgsImpl* msgs) { - bool const fext = msgs->io->supportsFEXT(); + bool const fext = msgs->io->supports_fext(); if (fext && msgs->torrent->hasAll()) { diff --git a/libtransmission/peer-socket.cc b/libtransmission/peer-socket.cc index 7248cada8..31f1d9b5a 100644 --- a/libtransmission/peer-socket.cc +++ b/libtransmission/peer-socket.cc @@ -49,7 +49,7 @@ tr_peer_socket::tr_peer_socket(tr_address const& address, tr_port port, struct U void tr_peer_socket::close(tr_session* session) { - if (is_tcp()) + if (is_tcp() && (handle.tcp != TR_BAD_SOCKET)) { tr_netClose(session, handle.tcp); } @@ -64,3 +64,99 @@ void tr_peer_socket::close(tr_session* session) type_ = Type::None; handle = {}; } + +size_t tr_peer_socket::try_write(Buffer& buf, size_t max, tr_error** error) const +{ + if (max == size_t{}) + { + return {}; + } + + if (is_tcp()) + { + return buf.toSocket(handle.tcp, max, error); + } + +#ifdef WITH_UTP + if (is_utp()) + { + auto iov = buf.vecs(max); + + errno = 0; + auto const n = utp_writev(handle.utp, reinterpret_cast(std::data(iov)), std::size(iov)); + auto const error_code = errno; + + if (n > 0) + { + buf.drain(n); + return static_cast(n); + } + + if (n < 0 && error_code != 0) + { + tr_error_set(error, error_code, tr_strerror(error_code)); + } + } +#endif + + return {}; +} + +size_t tr_peer_socket::try_read(Buffer& buf, size_t max, tr_error** error) const +{ + if (max == size_t{}) + { + return {}; + } + + if (is_tcp()) + { + return buf.addSocket(handle.tcp, max, error); + } + +#ifdef WITH_UTP + if (is_utp()) + { + // utp_read_drained() notifies libutp that this read buffer is + // empty. It opens up the congestion window by sending an ACK + // (soonish) if one was not going to be sent. + if (std::empty(buf)) + { + utp_read_drained(handle.utp); + } + } +#endif + + return {}; +} + +tr_peer_socket tr_netOpenPeerUTPSocket( + tr_session* session, + tr_address const& addr, + tr_port port, + bool /*client_is_seed*/, + void* userdata) +{ + auto ret = tr_peer_socket{}; + + if (session->utp_context != nullptr && addr.is_valid_for_peers(port)) + { + auto const [ss, sslen] = addr.to_sockaddr(port); + + if (auto* const sock = utp_create_socket(session->utp_context); sock != nullptr) + { + utp_set_userdata(sock, userdata); + + if (utp_connect(sock, reinterpret_cast(&ss), sslen) != -1) + { + ret = tr_peer_socket{ addr, port, sock }; + } + else + { + utp_close(sock); + } + } + } + + return ret; +} diff --git a/libtransmission/peer-socket.h b/libtransmission/peer-socket.h index 7b5a84a85..9b4715c4b 100644 --- a/libtransmission/peer-socket.h +++ b/libtransmission/peer-socket.h @@ -15,8 +15,10 @@ #include "transmission.h" +#include "error.h" #include "net.h" #include "tr-assert.h" +#include "tr-buffer.h" struct UTPSocket; struct tr_session; @@ -24,6 +26,8 @@ struct tr_session; class tr_peer_socket { public: + using Buffer = libtransmission::Buffer; + tr_peer_socket() = default; tr_peer_socket(tr_session* session, tr_address const& address, tr_port port, tr_socket_t sock); tr_peer_socket(tr_address const& address, tr_port port, struct UTPSocket* const sock); @@ -35,6 +39,9 @@ public: void close(tr_session* session); + size_t try_write(Buffer& buf, size_t max, tr_error** error) const; + size_t try_read(Buffer& buf, size_t max, tr_error** error) const; + [[nodiscard]] constexpr std::pair socketAddress() const noexcept { return std::make_pair(address_, port_); @@ -85,6 +92,32 @@ public: #endif } + [[nodiscard]] constexpr size_t guess_packet_overhead(size_t n_bytes) const noexcept + { + if (is_tcp()) + { + // https://web.archive.org/web/20140912230020/http://sd.wareonearth.com:80/~phil/net/overhead/ + // TCP over Ethernet: + // Assuming no header compression (e.g. not PPP) + // Add 20 IPv4 header or 40 IPv6 header (no options) + // Add 20 TCP header + // Add 12 bytes optional TCP timestamps + // Max TCP Payload data rates over ethernet are thus: + // (1500-40)/ (38+1500) = 94.9285 % IPv4, minimal headers + // (1500-52)/ (38+1500) = 94.1482 % IPv4, TCP timestamps + // (1500-52)/ (42+1500) = 93.9040 % 802.1q, IPv4, TCP timestamps + // (1500-60)/ (38+1500) = 93.6281 % IPv6, minimal headers + // (1500-72)/ (38+1500) = 92.8479 % IPv6, TCP timestamps + // (1500-72)/ (42+1500) = 92.6070 % 802.1q, IPv6, TCP timestamps + + // So, let's guess around 7% overhead + return n_bytes / 14U; + } + + // We only guess for TCP; uTP tracks its overhead via UTP_ON_OVERHEAD_STATISTICS + return {}; + } + union { tr_socket_t tcp; @@ -106,4 +139,9 @@ private: }; tr_peer_socket tr_netOpenPeerSocket(tr_session* session, tr_address const& addr, tr_port port, bool client_is_seed); -tr_peer_socket tr_netOpenPeerUTPSocket(tr_session* session, tr_address const& addr, tr_port port, bool client_is_seed); +tr_peer_socket tr_netOpenPeerUTPSocket( + tr_session* session, + tr_address const& addr, + tr_port port, + bool client_is_seed, + void* userdata); diff --git a/libtransmission/tr-buffer.h b/libtransmission/tr-buffer.h index a0d8d81c9..5f7d98963 100644 --- a/libtransmission/tr-buffer.h +++ b/libtransmission/tr-buffer.h @@ -281,17 +281,27 @@ public: evbuffer_expand(buf_.get(), n_bytes - size()); } - // -1 on error, 0 on eof, >0 for num bytes read - auto addSocket(tr_socket_t sockfd, size_t n_bytes, tr_error** error = nullptr) + size_t addSocket(tr_socket_t sockfd, size_t n_bytes, tr_error** error = nullptr) { EVUTIL_SET_SOCKET_ERROR(0); auto const res = evbuffer_read(buf_.get(), sockfd, static_cast(n_bytes)); auto const err = EVUTIL_SOCKET_ERROR(); - if (res == -1) + + if (res > 0) + { + return static_cast(res); + } + + if (res == 0) + { + tr_error_set(error, ENOTCONN, tr_strerror(ENOTCONN)); + } + else { tr_error_set(error, err, tr_net_strerror(err)); } - return res; + + return {}; } // Move all data from one buffer into another. diff --git a/libtransmission/tr-utp.cc b/libtransmission/tr-utp.cc index 79b6676a5..ef343af55 100644 --- a/libtransmission/tr-utp.cc +++ b/libtransmission/tr-utp.cc @@ -112,15 +112,6 @@ static void utp_send_to( ss->udp_core_->sendto(buf, buflen, to, tolen); } -#ifdef TR_UTP_TRACE - -static void utp_log(tr_session* const /*session*/, char const* const msg) -{ - fmt::print(stderr, FMT_STRING("[µTP] {}\n"), msg); -} - -#endif - static uint64 utp_callback(utp_callback_arguments* args) { auto* const session = static_cast(utp_context_get_userdata(args->context)); @@ -131,11 +122,9 @@ static uint64 utp_callback(utp_callback_arguments* args) switch (args->callback_type) { #ifdef TR_UTP_TRACE - case UTP_LOG: - utp_log(session, args->buf); + fmt::print(stderr, FMT_STRING("[µTP] {}\n"), args->buf); break; - #endif case UTP_ON_ACCEPT: @@ -204,20 +193,15 @@ void tr_utpInit(tr_session* session) } utp_context_set_userdata(ctx, session); - utp_set_callback(ctx, UTP_ON_ACCEPT, &utp_callback); utp_set_callback(ctx, UTP_SENDTO, &utp_callback); - - tr_peerIo::utpInit(ctx); + tr_peerIo::utp_init(ctx); #ifdef TR_UTP_TRACE - utp_set_callback(ctx, UTP_LOG, &utp_callback); - utp_context_set_option(ctx, UTP_LOG_NORMAL, 1); utp_context_set_option(ctx, UTP_LOG_MTU, 1); utp_context_set_option(ctx, UTP_LOG_DEBUG, 1); - #endif session->utp_context = ctx; diff --git a/tests/libtransmission/handshake-test.cc b/tests/libtransmission/handshake-test.cc index 0e18c96ad..d06c0f2e6 100644 --- a/tests/libtransmission/handshake-test.cc +++ b/tests/libtransmission/handshake-test.cc @@ -159,7 +159,7 @@ public: auto sockpair = std::array{ -1, -1 }; EXPECT_EQ(0, evutil_socketpair(LOCAL_SOCKETPAIR_AF, SOCK_STREAM, 0, std::data(sockpair))) << tr_strerror(errno); return std::make_pair( - tr_peerIo::newIncoming( + tr_peerIo::new_incoming( session, &session->top_bandwidth_, tr_peer_socket(session, DefaultPeerAddr, DefaultPeerPort, sockpair[0])), @@ -170,15 +170,9 @@ public: { auto sockpair = std::array{ -1, -1 }; EXPECT_EQ(0, evutil_socketpair(LOCAL_SOCKETPAIR_AF, SOCK_STREAM, 0, std::data(sockpair))) << tr_strerror(errno); - return std::make_pair( - tr_peerIo::create( - session, - &session->top_bandwidth_, - &info_hash, - false /*is_incoming*/, - false /*is_seed*/, - tr_peer_socket(session, DefaultPeerAddr, DefaultPeerPort, sockpair[0])), - sockpair[1]); + auto peer_io = tr_peerIo::create(session, &session->top_bandwidth_, &info_hash, false /*incoming*/, false /*seed*/); + peer_io->set_socket(tr_peer_socket(session, DefaultPeerAddr, DefaultPeerPort, sockpair[0])); + return std::make_pair(std::move(peer_io), sockpair[1]); } static constexpr auto makePeerId(std::string_view sv) @@ -250,7 +244,7 @@ TEST_F(HandshakeTest, incomingPlaintext) EXPECT_EQ(io, res->io); EXPECT_TRUE(res->peer_id); EXPECT_EQ(peer_id, res->peer_id); - EXPECT_EQ(TorrentWeAreSeeding.info_hash, io->torrentHash()); + EXPECT_EQ(TorrentWeAreSeeding.info_hash, io->torrent_hash()); evutil_closesocket(sock); } @@ -276,7 +270,7 @@ TEST_F(HandshakeTest, incomingPlaintextUnknownInfoHash) EXPECT_TRUE(res->read_anything_from_peer); EXPECT_EQ(io, res->io); EXPECT_FALSE(res->peer_id); - EXPECT_EQ(tr_sha1_digest_t{}, io->torrentHash()); + EXPECT_EQ(tr_sha1_digest_t{}, io->torrent_hash()); evutil_closesocket(sock); } @@ -302,8 +296,8 @@ TEST_F(HandshakeTest, outgoingPlaintext) EXPECT_EQ(io, res->io); EXPECT_TRUE(res->peer_id); EXPECT_EQ(peer_id, res->peer_id); - EXPECT_EQ(UbuntuTorrent.info_hash, io->torrentHash()); - EXPECT_EQ(tr_sha1_to_string(UbuntuTorrent.info_hash), tr_sha1_to_string(io->torrentHash())); + EXPECT_EQ(UbuntuTorrent.info_hash, io->torrent_hash()); + EXPECT_EQ(tr_sha1_to_string(UbuntuTorrent.info_hash), tr_sha1_to_string(io->torrent_hash())); evutil_closesocket(sock); } @@ -340,8 +334,8 @@ TEST_F(HandshakeTest, incomingEncrypted) EXPECT_EQ(io, res->io); EXPECT_TRUE(res->peer_id); EXPECT_EQ(ExpectedPeerId, res->peer_id); - EXPECT_EQ(UbuntuTorrent.info_hash, io->torrentHash()); - EXPECT_EQ(tr_sha1_to_string(UbuntuTorrent.info_hash), tr_sha1_to_string(io->torrentHash())); + EXPECT_EQ(UbuntuTorrent.info_hash, io->torrent_hash()); + EXPECT_EQ(tr_sha1_to_string(UbuntuTorrent.info_hash), tr_sha1_to_string(io->torrent_hash())); evutil_closesocket(sock); } @@ -374,7 +368,7 @@ TEST_F(HandshakeTest, incomingEncryptedUnknownInfoHash) EXPECT_TRUE(res); EXPECT_FALSE(res->is_connected); EXPECT_TRUE(res->read_anything_from_peer); - EXPECT_EQ(tr_sha1_digest_t{}, io->torrentHash()); + EXPECT_EQ(tr_sha1_digest_t{}, io->torrent_hash()); evutil_closesocket(sock); } @@ -416,8 +410,8 @@ TEST_F(HandshakeTest, outgoingEncrypted) EXPECT_EQ(io, res->io); EXPECT_TRUE(res->peer_id); EXPECT_EQ(ExpectedPeerId, res->peer_id); - EXPECT_EQ(UbuntuTorrent.info_hash, io->torrentHash()); - EXPECT_EQ(tr_sha1_to_string(UbuntuTorrent.info_hash), tr_sha1_to_string(io->torrentHash())); + EXPECT_EQ(UbuntuTorrent.info_hash, io->torrent_hash()); + EXPECT_EQ(tr_sha1_to_string(UbuntuTorrent.info_hash), tr_sha1_to_string(io->torrent_hash())); evutil_closesocket(sock); }