1
0
Fork 0
mirror of https://github.com/transmission/transmission synced 2025-03-15 16:29:34 +00:00

refactor: tr_peerIo read buffer (#3732)

* refactor: make tr_peerIoReadBytes() a member method

* refactor: make tr_peerIoReadUint8() a member method

* refactor: make tr_peerIoReadUint16() a member method

* refactor: make tr_peerIoReadUint32() a member method

* refactor: make tr_peerIoSetIOFuncs() a member method

* refactor: make tr_peerIoReconnect() a member method

* refactor: make tr_peerIoClear() a member method

* refactor: make tr_peerIoDrain() a member method

* refactor: move evbuffer_add_hton_16() impl to cc
This commit is contained in:
Charles Kerr 2022-08-29 13:33:08 -05:00 committed by GitHub
parent cee339e10d
commit 0ed595ca0e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 151 additions and 164 deletions

View file

@ -250,7 +250,7 @@ static handshake_parse_err_t parseHandshake(tr_handshake* handshake, struct evbu
/* confirm the protocol */
auto name = std::array<uint8_t, HANDSHAKE_NAME_LEN>{};
tr_peerIoReadBytes(handshake->io, inbuf, std::data(name), std::size(name));
handshake->io->readBytes(std::data(name), std::size(name));
if (memcmp(std::data(name), HANDSHAKE_NAME, std::size(name)) != 0)
{
return HANDSHAKE_ENCRYPTION_WRONG;
@ -258,11 +258,11 @@ static handshake_parse_err_t parseHandshake(tr_handshake* handshake, struct evbu
/* read the reserved bytes */
auto reserved = std::array<uint8_t, HANDSHAKE_FLAGS_LEN>{};
tr_peerIoReadBytes(handshake->io, inbuf, std::data(reserved), std::size(reserved));
handshake->io->readBytes(std::data(reserved), std::size(reserved));
/* torrent hash */
auto hash = tr_sha1_digest_t{};
tr_peerIoReadBytes(handshake->io, inbuf, std::data(hash), std::size(hash));
handshake->io->readBytes(std::data(hash), std::size(hash));
if (auto const torrent_hash = handshake->io->torrentHash(); !torrent_hash || *torrent_hash != hash)
{
tr_logAddTraceHand(handshake, "peer returned the wrong hash. wtf?");
@ -271,7 +271,7 @@ static handshake_parse_err_t parseHandshake(tr_handshake* handshake, struct evbu
// peer_id
auto peer_id = tr_peer_id_t{};
tr_peerIoReadBytes(handshake->io, inbuf, std::data(peer_id), std::size(peer_id));
handshake->io->readBytes(std::data(peer_id), std::size(peer_id));
handshake->peer_id = peer_id;
/* peer id */
@ -491,7 +491,7 @@ static ReadState readVC(tr_handshake* handshake, struct evbuffer* inbuf)
tr_logAddTraceHand(handshake, "got it!");
// We already know it's a match; now we just need to
// consume it from the read buffer.
tr_peerIoReadBytes(handshake->io, inbuf, std::data(needle), std::size(needle));
handshake->io->readBytes(std::data(needle), std::size(needle));
setState(handshake, AWAITING_CRYPTO_SELECT);
return READ_NOW;
}
@ -513,7 +513,7 @@ static ReadState readCryptoSelect(tr_handshake* handshake, struct evbuffer* inbu
}
uint32_t crypto_select = 0;
tr_peerIoReadUint32(handshake->io, inbuf, &crypto_select);
handshake->io->readUint32(&crypto_select);
handshake->crypto_select = crypto_select;
tr_logAddTraceHand(handshake, fmt::format("crypto select is {}", crypto_select));
@ -524,7 +524,7 @@ static ReadState readCryptoSelect(tr_handshake* handshake, struct evbuffer* inbu
}
uint16_t pad_d_len = 0;
tr_peerIoReadUint16(handshake->io, inbuf, &pad_d_len);
handshake->io->readUint16(&pad_d_len);
tr_logAddTraceHand(handshake, fmt::format("pad_d_len is {}", pad_d_len));
if (pad_d_len > 512)
@ -550,7 +550,7 @@ static ReadState readPadD(tr_handshake* handshake, struct evbuffer* inbuf)
return READ_LATER;
}
tr_peerIoDrain(handshake->io, inbuf, needlen);
handshake->io->readBufferDrain(needlen);
setState(handshake, AWAITING_HANDSHAKE);
return READ_NOW;
@ -606,7 +606,7 @@ static ReadState readHandshake(tr_handshake* handshake, struct evbuffer* inbuf)
/* pstr (BitTorrent) */
TR_ASSERT(pstrlen == 19);
auto pstr = std::array<uint8_t, 20>{};
tr_peerIoReadBytes(handshake->io, inbuf, std::data(pstr), pstrlen);
handshake->io->readBytes(std::data(pstr), pstrlen);
pstr[pstrlen] = '\0';
if (strncmp(reinterpret_cast<char const*>(std::data(pstr)), "BitTorrent protocol", 19) != 0)
@ -616,7 +616,7 @@ static ReadState readHandshake(tr_handshake* handshake, struct evbuffer* inbuf)
/* reserved bytes */
auto reserved = std::array<uint8_t, HANDSHAKE_FLAGS_LEN>{};
tr_peerIoReadBytes(handshake->io, inbuf, std::data(reserved), std::size(reserved));
handshake->io->readBytes(std::data(reserved), std::size(reserved));
/**
*** Extensions
@ -628,7 +628,7 @@ static ReadState readHandshake(tr_handshake* handshake, struct evbuffer* inbuf)
/* torrent hash */
auto hash = tr_sha1_digest_t{};
tr_peerIoReadBytes(handshake->io, inbuf, std::data(hash), std::size(hash));
handshake->io->readBytes(std::data(hash), std::size(hash));
if (handshake->isIncoming())
{
@ -680,7 +680,7 @@ static ReadState readPeerId(tr_handshake* handshake, struct evbuffer* inbuf)
{
return READ_LATER;
}
tr_peerIoReadBytes(handshake->io, inbuf, std::data(peer_id), std::size(peer_id));
handshake->io->readBytes(std::data(peer_id), std::size(peer_id));
handshake->peer_id = peer_id;
auto client = std::array<char, 128>{};
@ -802,13 +802,13 @@ static ReadState readCryptoProvide(tr_handshake* handshake, struct evbuffer* inb
handshake->io->decryptInit(handshake->io->isIncoming(), handshake->dh, *handshake->io->torrentHash());
auto vc_in = vc_t{};
tr_peerIoReadBytes(handshake->io, inbuf, std::data(vc_in), std::size(vc_in));
handshake->io->readBytes(std::data(vc_in), std::size(vc_in));
tr_peerIoReadUint32(handshake->io, inbuf, &crypto_provide);
handshake->io->readUint32(&crypto_provide);
handshake->crypto_provide = crypto_provide;
tr_logAddTraceHand(handshake, fmt::format("crypto_provide is {}", crypto_provide));
tr_peerIoReadUint16(handshake->io, inbuf, &padc_len);
handshake->io->readUint16(&padc_len);
tr_logAddTraceHand(handshake, fmt::format("padc is {}", padc_len));
if (padc_len > PadC_MAXLEN)
{
@ -823,8 +823,6 @@ static ReadState readCryptoProvide(tr_handshake* handshake, struct evbuffer* inb
static ReadState readPadC(tr_handshake* handshake, struct evbuffer* inbuf)
{
uint16_t ia_len = 0;
if (auto const needlen = handshake->pad_c_len + sizeof(uint16_t); evbuffer_get_length(inbuf) < needlen)
{
return READ_LATER;
@ -832,10 +830,11 @@ static ReadState readPadC(tr_handshake* handshake, struct evbuffer* inbuf)
// read the throwaway padc
auto pad_c = std::array<char, PadC_MAXLEN>{};
tr_peerIoReadBytes(handshake->io, inbuf, std::data(pad_c), handshake->pad_c_len);
handshake->io->readBytes(std::data(pad_c), handshake->pad_c_len);
/* read ia_len */
tr_peerIoReadUint16(handshake->io, inbuf, &ia_len);
uint16_t ia_len = 0;
handshake->io->readUint16(&ia_len);
tr_logAddTraceHand(handshake, fmt::format("ia_len is {}", ia_len));
handshake->ia_len = ia_len;
setState(handshake, AWAITING_IA);
@ -955,7 +954,7 @@ static ReadState canRead(tr_peerIo* io, void* vhandshake, size_t* piece)
auto* handshake = static_cast<tr_handshake*>(vhandshake);
evbuffer* const inbuf = io->getReadBuffer();
auto* const inbuf = io->readBuffer();
bool readyForMore = true;
/* no piece data in handshake */
@ -1062,7 +1061,7 @@ static bool fireDoneFunc(tr_handshake* handshake, bool isConnected)
static ReadState tr_handshakeDone(tr_handshake* handshake, bool is_connected)
{
tr_logAddTraceHand(handshake, is_connected ? "handshakeDone: connected" : "handshakeDone: aborting");
tr_peerIoSetIOFuncs(handshake->io, nullptr, nullptr, nullptr, nullptr);
handshake->io->setCallbacks(nullptr, nullptr, nullptr, nullptr);
bool const success = fireDoneFunc(handshake, is_connected);
delete handshake;
@ -1095,7 +1094,7 @@ static void gotError(tr_peerIo* io, short what, void* vhandshake)
handshake->mediator->setUTPFailed(*hash, io->address());
}
if (handshake->mediator->allowsTCP() && tr_peerIoReconnect(handshake->io) == 0)
if (handshake->mediator->allowsTCP() && handshake->io->reconnect() == 0)
{
auto msg = std::array<uint8_t, HANDSHAKE_SIZE>{};
buildHandshakeMessage(handshake, std::data(msg));
@ -1110,7 +1109,7 @@ static void gotError(tr_peerIo* io, short what, void* vhandshake)
* try a plaintext handshake */
if ((handshake->state == AWAITING_YB || handshake->state == AWAITING_VC) &&
handshake->encryption_mode != TR_ENCRYPTION_REQUIRED && handshake->mediator->allowsTCP() &&
tr_peerIoReconnect(handshake->io) == 0)
handshake->io->reconnect() == 0)
{
auto msg = std::array<uint8_t, HANDSHAKE_SIZE>{};
tr_logAddTraceHand(handshake, "handshake failed, trying plaintext...");
@ -1148,7 +1147,7 @@ tr_handshake* tr_handshakeNew(
handshake->timeout_timer->startSingleShot(HandshakeTimeoutSec);
tr_peerIoRef(io); /* balanced by the unref in ~tr_handshake() */
tr_peerIoSetIOFuncs(handshake->io, canRead, nullptr, gotError, handshake);
handshake->io->setCallbacks(canRead, nullptr, gotError, handshake);
if (handshake->isIncoming())
{

View file

@ -814,49 +814,47 @@ std::string tr_peerIo::addrStr() const
return tr_isPeerIo(this) ? this->addr_.readable(this->port_) : "error";
}
void tr_peerIoSetIOFuncs(tr_peerIo* io, tr_can_read_cb readcb, tr_did_write_cb writecb, tr_net_error_cb errcb, void* user_data)
void tr_peerIo::setCallbacks(tr_can_read_cb readcb, tr_did_write_cb writecb, tr_net_error_cb errcb, void* user_data)
{
io->canRead = readcb;
io->didWrite = writecb;
io->gotError = errcb;
io->userData = user_data;
this->canRead = readcb;
this->didWrite = writecb;
this->gotError = errcb;
this->userData = user_data;
}
void tr_peerIoClear(tr_peerIo* io)
void tr_peerIo::clear()
{
tr_peerIoSetIOFuncs(io, nullptr, nullptr, nullptr, nullptr);
tr_peerIoSetEnabled(io, TR_UP, false);
tr_peerIoSetEnabled(io, TR_DOWN, false);
io_close_socket(io);
setCallbacks(nullptr, nullptr, nullptr, nullptr);
tr_peerIoSetEnabled(this, TR_UP, false);
tr_peerIoSetEnabled(this, TR_DOWN, false);
io_close_socket(this);
}
int tr_peerIoReconnect(tr_peerIo* io)
int tr_peerIo::reconnect()
{
TR_ASSERT(tr_isPeerIo(io));
TR_ASSERT(!io->isIncoming());
TR_ASSERT(io->session->allowsTCP());
TR_ASSERT(tr_isPeerIo(this));
TR_ASSERT(!this->isIncoming());
TR_ASSERT(this->session->allowsTCP());
tr_session* session = tr_peerIoGetSession(io);
short int const pending_events = this->pendingEvents;
event_disable(this, EV_READ | EV_WRITE);
short int const pendingEvents = io->pendingEvents;
event_disable(io, EV_READ | EV_WRITE);
io_close_socket(this);
io_close_socket(io);
auto const [addr, port] = this->socketAddress();
this->socket = tr_netOpenPeerSocket(session, &addr, port, this->isSeed());
auto const [addr, port] = io->socketAddress();
io->socket = tr_netOpenPeerSocket(session, &addr, port, io->isSeed());
if (io->socket.type != TR_PEER_SOCKET_TYPE_TCP)
if (this->socket.type != TR_PEER_SOCKET_TYPE_TCP)
{
return -1;
}
io->event_read = event_new(session->eventBase(), io->socket.handle.tcp, EV_READ, event_read_cb, io);
io->event_write = event_new(session->eventBase(), io->socket.handle.tcp, EV_WRITE, event_write_cb, io);
this->event_read = event_new(session->eventBase(), this->socket.handle.tcp, EV_READ, event_read_cb, this);
this->event_write = event_new(session->eventBase(), this->socket.handle.tcp, EV_WRITE, event_write_cb, this);
event_enable(io, pendingEvents);
io->session->setSocketTOS(io->socket.handle.tcp, addr.type);
maybeSetCongestionAlgorithm(io->socket.handle.tcp, session->peerCongestionAlgorithm());
event_enable(this, pending_events);
this->session->setSocketTOS(this->socket.handle.tcp, addr.type);
maybeSetCongestionAlgorithm(this->socket.handle.tcp, session->peerCongestionAlgorithm());
return 0;
}
@ -978,45 +976,59 @@ void evbuffer_add_uint64(struct evbuffer* outbuf, uint64_t addme_hll)
evbuffer_add(outbuf, &nll, sizeof(nll));
}
void evbuffer_add_hton_16(struct evbuffer* buf, uint16_t val)
{
evbuffer_add_uint16(buf, val);
}
void evbuffer_add_hton_32(struct evbuffer* buf, uint32_t val)
{
evbuffer_add_uint32(buf, val);
}
void evbuffer_add_hton_64(struct evbuffer* buf, uint64_t val)
{
evbuffer_add_uint64(buf, val);
}
/***
****
***/
void tr_peerIoReadBytes(tr_peerIo* io, struct evbuffer* inbuf, void* bytes, size_t byteCount)
void tr_peerIo::readBytes(void* bytes, size_t byte_count)
{
TR_ASSERT(tr_isPeerIo(io));
TR_ASSERT(evbuffer_get_length(inbuf) >= byteCount);
TR_ASSERT(readBufferSize() >= byte_count);
evbuffer_remove(inbuf, bytes, byteCount);
evbuffer_remove(inbuf.get(), bytes, byte_count);
if (io->isEncrypted())
if (isEncrypted())
{
io->decrypt(byteCount, bytes);
decrypt(byte_count, bytes);
}
}
void tr_peerIoReadUint16(tr_peerIo* io, struct evbuffer* inbuf, uint16_t* setme)
void tr_peerIo::readUint16(uint16_t* setme)
{
auto tmp = uint16_t{};
tr_peerIoReadBytes(io, inbuf, &tmp, sizeof(uint16_t));
readBytes(&tmp, sizeof(tmp));
*setme = ntohs(tmp);
}
void tr_peerIoReadUint32(tr_peerIo* io, struct evbuffer* inbuf, uint32_t* setme)
void tr_peerIo::readUint32(uint32_t* setme)
{
auto tmp = uint32_t{};
tr_peerIoReadBytes(io, inbuf, &tmp, sizeof(uint32_t));
readBytes(&tmp, sizeof(tmp));
*setme = ntohl(tmp);
}
void tr_peerIoDrain(tr_peerIo* io, struct evbuffer* inbuf, size_t byte_count)
void tr_peerIo::readBufferDrain(size_t byte_count)
{
auto buf = std::array<char, 4096>{};
while (byte_count > 0)
{
size_t const this_pass = std::min(byte_count, std::size(buf));
tr_peerIoReadBytes(io, inbuf, std::data(buf), this_pass);
auto const this_pass = std::min(byte_count, std::size(buf));
readBytes(std::data(buf), this_pass);
byte_count -= this_pass;
}
}

View file

@ -95,6 +95,20 @@ public:
}
}
void clear();
void readBytes(void* bytes, size_t byte_count);
void readUint8(uint8_t* setme)
{
readBytes(setme, sizeof(uint8_t));
}
void readUint16(uint16_t* setme);
void readUint32(uint32_t* setme);
int reconnect();
[[nodiscard]] constexpr tr_address const& address() const noexcept
{
return addr_;
@ -107,11 +121,18 @@ public:
std::string addrStr() const;
[[nodiscard]] auto getReadBuffer() noexcept
[[nodiscard]] auto readBuffer() noexcept
{
return inbuf.get();
}
void readBufferDrain(size_t byte_count);
[[nodiscard]] auto readBufferSize() const noexcept
{
return evbuffer_get_length(inbuf.get());
}
void readBufferAdd(void const* data, size_t n_bytes);
[[nodiscard]] auto hasBandwidthLeft(tr_direction dir) noexcept
@ -194,6 +215,8 @@ public:
return torrent_hash_;
}
void setCallbacks(tr_can_read_cb readcb, tr_did_write_cb writecb, tr_net_error_cb errcb, void* user_data);
// TODO(ckerr): yikes, unlike other class' magic_numbers it looks
// like this one isn't being used just for assertions, but also in
// didWriteWrapper() to see if the tr_peerIo got freed during the
@ -337,28 +360,6 @@ constexpr bool tr_isPeerIo(tr_peerIo const* io)
***
**/
constexpr tr_session* tr_peerIoGetSession(tr_peerIo* io)
{
TR_ASSERT(tr_isPeerIo(io));
TR_ASSERT(io->session != nullptr);
return io->session;
}
int tr_peerIoReconnect(tr_peerIo* io);
/**
***
**/
void tr_peerIoSetIOFuncs(tr_peerIo* io, tr_can_read_cb readcb, tr_did_write_cb writecb, tr_net_error_cb errcb, void* user_data);
void tr_peerIoClear(tr_peerIo* io);
/**
***
**/
void tr_peerIoWriteBytes(tr_peerIo* io, void const* writeme, size_t writeme_len, bool is_piece_data);
void tr_peerIoWriteBuf(tr_peerIo* io, struct evbuffer* buf, bool isPieceData);
@ -372,33 +373,9 @@ void evbuffer_add_uint16(struct evbuffer* outbuf, uint16_t hs);
void evbuffer_add_uint32(struct evbuffer* outbuf, uint32_t hl);
void evbuffer_add_uint64(struct evbuffer* outbuf, uint64_t hll);
static inline void evbuffer_add_hton_16(struct evbuffer* buf, uint16_t val)
{
evbuffer_add_uint16(buf, val);
}
static inline void evbuffer_add_hton_32(struct evbuffer* buf, uint32_t val)
{
evbuffer_add_uint32(buf, val);
}
static inline void evbuffer_add_hton_64(struct evbuffer* buf, uint64_t val)
{
evbuffer_add_uint64(buf, val);
}
void tr_peerIoReadBytes(tr_peerIo* io, struct evbuffer* inbuf, void* bytes, size_t byteCount);
static inline void tr_peerIoReadUint8(tr_peerIo* io, struct evbuffer* inbuf, uint8_t* setme)
{
tr_peerIoReadBytes(io, inbuf, setme, sizeof(uint8_t));
}
void tr_peerIoReadUint16(tr_peerIo* io, struct evbuffer* inbuf, uint16_t* setme);
void tr_peerIoReadUint32(tr_peerIo* io, struct evbuffer* inbuf, uint32_t* setme);
void tr_peerIoDrain(tr_peerIo* io, struct evbuffer* inbuf, size_t byte_count);
void evbuffer_add_hton_16(struct evbuffer* buf, uint16_t val);
void evbuffer_add_hton_32(struct evbuffer* buf, uint32_t val);
void evbuffer_add_hton_64(struct evbuffer* buf, uint64_t val);
/**
***

View file

@ -286,7 +286,7 @@ public:
}
}
tr_peerIoSetIOFuncs(io, canRead, didWrite, gotError, this);
io->setCallbacks(canRead, didWrite, gotError, this);
updateDesiredRequestCount(this);
}
@ -302,7 +302,7 @@ public:
if (this->io != nullptr)
{
tr_peerIoClear(this->io);
this->io->clear();
tr_peerIoUnref(this->io); /* balanced by the ref in handshakeDoneCB() */
}
@ -1228,7 +1228,7 @@ static void sendLtepHandshake(tr_peerMsgsImpl* msgs)
tr_variantClear(&val);
}
static void parseLtepHandshake(tr_peerMsgsImpl* msgs, uint32_t len, struct evbuffer* inbuf)
static void parseLtepHandshake(tr_peerMsgsImpl* msgs, uint32_t len)
{
msgs->peerSentLtepHandshake = true;
@ -1236,7 +1236,7 @@ static void parseLtepHandshake(tr_peerMsgsImpl* msgs, uint32_t len, struct evbuf
// so try using a strbuf to handle it on the stack
auto tmp = tr_strbuf<char, 512>{};
tmp.resize(len);
tr_peerIoReadBytes(msgs->io, inbuf, std::data(tmp), std::size(tmp));
msgs->io->readBytes(std::data(tmp), std::size(tmp));
auto const handshake_sv = tmp.sv();
auto val = tr_variant{};
@ -1334,7 +1334,7 @@ static void parseLtepHandshake(tr_peerMsgsImpl* msgs, uint32_t len, struct evbuf
tr_variantClear(&val);
}
static void parseUtMetadata(tr_peerMsgsImpl* msgs, uint32_t msglen, struct evbuffer* inbuf)
static void parseUtMetadata(tr_peerMsgsImpl* msgs, uint32_t msglen)
{
int64_t msg_type = -1;
int64_t piece = -1;
@ -1342,7 +1342,7 @@ static void parseUtMetadata(tr_peerMsgsImpl* msgs, uint32_t msglen, struct evbuf
auto tmp = std::vector<char>{};
tmp.resize(msglen);
tr_peerIoReadBytes(msgs->io, inbuf, std::data(tmp), std::size(tmp));
msgs->io->readBytes(std::data(tmp), std::size(tmp));
char const* const msg_end = std::data(tmp) + std::size(tmp);
auto dict = tr_variant{};
@ -1404,7 +1404,7 @@ static void parseUtMetadata(tr_peerMsgsImpl* msgs, uint32_t msglen, struct evbuf
}
}
static void parseUtPex(tr_peerMsgsImpl* msgs, uint32_t msglen, struct evbuffer* inbuf)
static void parseUtPex(tr_peerMsgsImpl* msgs, uint32_t msglen)
{
tr_torrent* tor = msgs->torrent;
if (!tor->allowsPex())
@ -1414,7 +1414,7 @@ static void parseUtPex(tr_peerMsgsImpl* msgs, uint32_t msglen, struct evbuffer*
auto tmp = std::vector<char>{};
tmp.resize(msglen);
tr_peerIoReadBytes(msgs->io, inbuf, std::data(tmp), std::size(tmp));
msgs->io->readBytes(std::data(tmp), std::size(tmp));
if (tr_variant val; tr_variantFromBuf(&val, TR_VARIANT_PARSE_BENC | TR_VARIANT_PARSE_INPLACE, tmp))
{
@ -1454,18 +1454,18 @@ static void parseUtPex(tr_peerMsgsImpl* msgs, uint32_t msglen, struct evbuffer*
}
}
static void parseLtep(tr_peerMsgsImpl* msgs, uint32_t msglen, struct evbuffer* inbuf)
static void parseLtep(tr_peerMsgsImpl* msgs, uint32_t msglen)
{
TR_ASSERT(msglen > 0);
auto ltep_msgid = uint8_t{};
tr_peerIoReadUint8(msgs->io, inbuf, &ltep_msgid);
msgs->io->readUint8(&ltep_msgid);
msglen--;
if (ltep_msgid == LtepMessages::Handshake)
{
logtrace(msgs, "got ltep handshake");
parseLtepHandshake(msgs, msglen, inbuf);
parseLtepHandshake(msgs, msglen);
if (msgs->io->supportsLTEP())
{
@ -1477,22 +1477,22 @@ static void parseLtep(tr_peerMsgsImpl* msgs, uint32_t msglen, struct evbuffer* i
{
logtrace(msgs, "got ut pex");
msgs->peerSupportsPex = true;
parseUtPex(msgs, msglen, inbuf);
parseUtPex(msgs, msglen);
}
else if (ltep_msgid == UT_METADATA_ID)
{
logtrace(msgs, "got ut metadata");
msgs->peerSupportsMetadataXfer = true;
parseUtMetadata(msgs, msglen, inbuf);
parseUtMetadata(msgs, msglen);
}
else
{
logtrace(msgs, fmt::format(FMT_STRING("skipping unknown ltep message ({:d})"), static_cast<int>(ltep_msgid)));
evbuffer_drain(inbuf, msglen);
msgs->io->readBufferDrain(msglen);
}
}
static ReadState readBtLength(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, size_t inlen)
static ReadState readBtLength(tr_peerMsgsImpl* msgs, size_t inlen)
{
auto len = uint32_t{};
if (inlen < sizeof(len))
@ -1500,7 +1500,7 @@ static ReadState readBtLength(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, siz
return READ_LATER;
}
tr_peerIoReadUint32(msgs->io, inbuf, &len);
msgs->io->readUint32(&len);
if (len == 0) /* peer sent us a keepalive message */
{
logtrace(msgs, "got KeepAlive");
@ -1514,9 +1514,9 @@ static ReadState readBtLength(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, siz
return READ_NOW;
}
static ReadState readBtMessage(tr_peerMsgsImpl* /*msgs*/, struct evbuffer* /*inbuf*/, size_t /*inlen*/);
static ReadState readBtMessage(tr_peerMsgsImpl* /*msgs*/, size_t /*inlen*/);
static ReadState readBtId(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, size_t inlen)
static ReadState readBtId(tr_peerMsgsImpl* msgs, size_t inlen)
{
if (inlen < sizeof(uint8_t))
{
@ -1524,7 +1524,7 @@ static ReadState readBtId(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, size_t
}
auto id = uint8_t{};
tr_peerIoReadUint8(msgs->io, inbuf, &id);
msgs->io->readUint8(&id);
msgs->incoming.id = id;
logtrace(
msgs,
@ -1542,7 +1542,7 @@ static ReadState readBtId(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, size_t
return READ_NOW;
}
return readBtMessage(msgs, inbuf, inlen - 1);
return readBtMessage(msgs, inlen - 1);
}
static void prefetchPieces(tr_peerMsgsImpl* msgs)
@ -1659,9 +1659,9 @@ static bool messageLengthIsCorrect(tr_peerMsgsImpl const* msg, uint8_t id, uint3
static int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr<std::vector<uint8_t>>& block_data, tr_block_index_t block);
static ReadState readBtPiece(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, size_t inlen, size_t* setme_piece_bytes_read)
static ReadState readBtPiece(tr_peerMsgsImpl* msgs, size_t inlen, size_t* setme_piece_bytes_read)
{
TR_ASSERT(evbuffer_get_length(inbuf) >= inlen);
TR_ASSERT(msgs->io->readBufferSize() >= inlen);
logtrace(msgs, "In readBtPiece");
@ -1674,8 +1674,8 @@ static ReadState readBtPiece(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, size
}
auto req = peer_request{};
tr_peerIoReadUint32(msgs->io, inbuf, &req.index);
tr_peerIoReadUint32(msgs->io, inbuf, &req.offset);
msgs->io->readUint32(&req.index);
msgs->io->readUint32(&req.offset);
req.length = msgs->incoming.length - 9;
logtrace(msgs, fmt::format(FMT_STRING("got incoming block header {:d}:{:d}->{:d}"), req.index, req.offset, req.length));
msgs->incoming.block_req = req;
@ -1699,7 +1699,7 @@ static ReadState readBtPiece(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, size
auto const n_to_read = std::min({ n_left_in_block, n_left_in_req, inlen });
auto const old_length = std::size(*block_buf);
block_buf->resize(old_length + n_to_read);
tr_peerIoReadBytes(msgs->io, inbuf, &((*block_buf)[old_length]), n_to_read);
msgs->io->readBytes(&((*block_buf)[old_length]), n_to_read);
msgs->publishClientGotPieceData(n_to_read);
*setme_piece_bytes_read += n_to_read;
@ -1744,11 +1744,11 @@ static ReadState readBtPiece(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, size
return err != 0 ? READ_ERR : READ_NOW;
}
static ReadState readBtMessage(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, size_t inlen)
static ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen)
{
uint8_t const id = msgs->incoming.id;
#ifdef TR_ENABLE_ASSERTS
size_t const startBufLen = evbuffer_get_length(inbuf);
auto const start_buflen = msgs->io->readBufferSize();
#endif
bool const fext = msgs->io->supportsFEXT();
@ -1811,7 +1811,7 @@ static ReadState readBtMessage(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, si
break;
case BtPeerMsgs::Have:
tr_peerIoReadUint32(msgs->io, inbuf, &ui32);
msgs->io->readUint32(&ui32);
logtrace(msgs, fmt::format(FMT_STRING("got Have: {:d}"), ui32));
if (msgs->torrent->hasMetainfo() && ui32 >= msgs->torrent->pieceCount())
@ -1834,7 +1834,7 @@ static ReadState readBtMessage(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, si
{
logtrace(msgs, "got a bitfield");
auto tmp = std::vector<uint8_t>(msglen);
tr_peerIoReadBytes(msgs->io, inbuf, std::data(tmp), std::size(tmp));
msgs->io->readBytes(std::data(tmp), std::size(tmp));
msgs->have_ = tr_bitfield{ msgs->torrent->hasMetainfo() ? msgs->torrent->pieceCount() : std::size(tmp) * 8 };
msgs->have_.setRaw(std::data(tmp), std::size(tmp));
msgs->publishClientGotBitfield(&msgs->have_);
@ -1845,9 +1845,9 @@ static ReadState readBtMessage(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, si
case BtPeerMsgs::Request:
{
struct peer_request r;
tr_peerIoReadUint32(msgs->io, inbuf, &r.index);
tr_peerIoReadUint32(msgs->io, inbuf, &r.offset);
tr_peerIoReadUint32(msgs->io, inbuf, &r.length);
msgs->io->readUint32(&r.index);
msgs->io->readUint32(&r.offset);
msgs->io->readUint32(&r.length);
logtrace(msgs, fmt::format(FMT_STRING("got Request: {:d}:{:d}->{:d}"), r.index, r.offset, r.length));
peerMadeRequest(msgs, &r);
break;
@ -1856,9 +1856,9 @@ static ReadState readBtMessage(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, si
case BtPeerMsgs::Cancel:
{
struct peer_request r;
tr_peerIoReadUint32(msgs->io, inbuf, &r.index);
tr_peerIoReadUint32(msgs->io, inbuf, &r.offset);
tr_peerIoReadUint32(msgs->io, inbuf, &r.length);
msgs->io->readUint32(&r.index);
msgs->io->readUint32(&r.offset);
msgs->io->readUint32(&r.length);
msgs->cancels_sent_to_client.add(tr_time(), 1);
logtrace(msgs, fmt::format(FMT_STRING("got a Cancel {:d}:{:d}->{:d}"), r.index, r.offset, r.length));
@ -1893,7 +1893,7 @@ static ReadState readBtMessage(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, si
logtrace(msgs, "Got a BtPeerMsgs::Port");
auto nport = uint16_t{};
tr_peerIoReadUint16(msgs->io, inbuf, &nport);
msgs->io->readUint16(&nport);
if (auto const dht_port = tr_port::fromNetwork(nport); !std::empty(dht_port))
{
msgs->dht_port = dht_port;
@ -1904,7 +1904,7 @@ static ReadState readBtMessage(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, si
case BtPeerMsgs::FextSuggest:
logtrace(msgs, "Got a BtPeerMsgs::FextSuggest");
tr_peerIoReadUint32(msgs->io, inbuf, &ui32);
msgs->io->readUint32(&ui32);
if (fext)
{
@ -1920,7 +1920,7 @@ static ReadState readBtMessage(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, si
case BtPeerMsgs::FextAllowedFast:
logtrace(msgs, "Got a BtPeerMsgs::FextAllowedFast");
tr_peerIoReadUint32(msgs->io, inbuf, &ui32);
msgs->io->readUint32(&ui32);
if (fext)
{
@ -1972,9 +1972,9 @@ static ReadState readBtMessage(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, si
{
struct peer_request r;
logtrace(msgs, "Got a BtPeerMsgs::FextReject");
tr_peerIoReadUint32(msgs->io, inbuf, &r.index);
tr_peerIoReadUint32(msgs->io, inbuf, &r.offset);
tr_peerIoReadUint32(msgs->io, inbuf, &r.length);
msgs->io->readUint32(&r.index);
msgs->io->readUint32(&r.offset);
msgs->io->readUint32(&r.length);
if (fext)
{
@ -1991,17 +1991,17 @@ static ReadState readBtMessage(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, si
case BtPeerMsgs::Ltep:
logtrace(msgs, "Got a BtPeerMsgs::Ltep");
parseLtep(msgs, msglen, inbuf);
parseLtep(msgs, msglen);
break;
default:
logtrace(msgs, fmt::format(FMT_STRING("peer sent us an UNKNOWN: {:d}"), static_cast<int>(id)));
tr_peerIoDrain(msgs->io, inbuf, msglen);
msgs->io->readBufferDrain(msglen);
break;
}
TR_ASSERT(msglen + 1 == msgs->incoming.length);
TR_ASSERT(evbuffer_get_length(inbuf) == startBufLen - msglen);
TR_ASSERT(msgs->io->readBufferSize() == start_buflen - msglen);
msgs->state = AwaitingBt::Length;
return READ_NOW;
@ -2070,8 +2070,7 @@ static void didWrite(tr_peerIo* io, size_t bytesWritten, bool wasPieceData, void
static ReadState canRead(tr_peerIo* io, void* vmsgs, size_t* piece)
{
auto* msgs = static_cast<tr_peerMsgsImpl*>(vmsgs);
evbuffer* const in = io->getReadBuffer();
size_t const inlen = evbuffer_get_length(in);
size_t const inlen = io->readBufferSize();
logtrace(
msgs,
@ -2084,22 +2083,22 @@ static ReadState canRead(tr_peerIo* io, void* vmsgs, size_t* piece)
}
else if (msgs->state == AwaitingBt::Piece)
{
ret = readBtPiece(msgs, in, inlen, piece);
ret = readBtPiece(msgs, inlen, piece);
}
else
{
switch (msgs->state)
{
case AwaitingBt::Length:
ret = readBtLength(msgs, in, inlen);
ret = readBtLength(msgs, inlen);
break;
case AwaitingBt::Id:
ret = readBtId(msgs, in, inlen);
ret = readBtId(msgs, inlen);
break;
case AwaitingBt::Message:
ret = readBtMessage(msgs, in, inlen);
ret = readBtMessage(msgs, inlen);
break;
default: