1
0
Fork 0
mirror of https://github.com/transmission/transmission synced 2024-12-26 01:27:28 +00:00

Refactor/tr peer io member funcs (#3734)

* refactor: make tr_peerIoSetEnabled() a member method

* refactor: make tr_peerIoFlushOutgoingProtocolMsgs() a member method

* refactor: make tr_peerIoFlush() a member method

* refactor: make tr_peerWriteBytes() a member method

* refactor: make tr_peerWriteBuf() a member method

* refactor: make tr_peerIoGetWriteBufferSpace() a member method

* chore: remove unused declaration

* refactor: make tr_peerIoUtpInit() a member method

* refactor: make tr_peerIoNew() a member method

* refactor: make tr_peerIoNewOutgoing() a member method

* refactor: make tr_peerIoNewIncoming() a member method
This commit is contained in:
Charles Kerr 2022-08-29 15:58:18 -05:00 committed by GitHub
parent 0ed595ca0e
commit 7c014e3256
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 133 additions and 164 deletions

View file

@ -182,7 +182,7 @@ void tr_bandwidth::phaseOne(std::vector<tr_peerIo*>& peer_array, tr_direction di
* out in a timely manner. */
size_t const increment = 3000;
int const bytes_used = tr_peerIoFlush(peer_array[i], dir, increment);
int const bytes_used = peer_array[i]->flush(dir, increment);
tr_logAddTrace(fmt::format("peer #{} of {} used {} bytes in this pass", i, n, bytes_used));
@ -212,7 +212,7 @@ void tr_bandwidth::allocate(tr_direction dir, unsigned int period_msec)
for (auto* io : tmp)
{
tr_peerIoRef(io);
tr_peerIoFlushOutgoingProtocolMsgs(io);
io->flushOutgoingProtocolMsgs();
switch (io->priority)
{
@ -243,7 +243,7 @@ void tr_bandwidth::allocate(tr_direction dir, unsigned int period_msec)
* or (2) the next tr_bandwidth::allocate () call, when we start over again. */
for (auto* io : tmp)
{
tr_peerIoSetEnabled(io, dir, io->hasBandwidthLeft(dir));
io->setEnabled(dir, io->hasBandwidthLeft(dir));
}
for (auto* io : tmp)

View file

@ -310,7 +310,7 @@ static void sendPublicKeyAndPad(tr_handshake* handshake)
auto walk = data;
walk = std::copy(std::begin(public_key), std::end(public_key), walk);
walk += handshake->mediator->pad(walk, PadMax);
tr_peerIoWriteBytes(handshake->io, data, walk - data, false);
handshake->io->writeBytes(data, walk - data, false);
}
// 1 A->B: our public key (Ya) and some padding (PadA)
@ -439,7 +439,7 @@ static ReadState readYb(tr_handshake* handshake, struct evbuffer* inbuf)
/* ENCRYPT(VC, crypto_provide, len(PadC), PadC
* PadC is reserved for future extensions to the handshake...
* standard practice at this time is for it to be zero-length */
tr_peerIoWriteBuf(handshake->io, outbuf, false);
handshake->io->writeBuf(outbuf, false);
handshake->io->encryptInit(handshake->io->isIncoming(), handshake->dh, *info_hash);
evbuffer_add(outbuf, std::data(VC), std::size(VC));
evbuffer_add_uint32(outbuf, getCryptoProvide(handshake));
@ -460,7 +460,7 @@ static ReadState readYb(tr_handshake* handshake, struct evbuffer* inbuf)
/* send it */
handshake->io->decryptInit(handshake->io->isIncoming(), handshake->dh, *info_hash);
setReadState(handshake, AWAITING_VC);
tr_peerIoWriteBuf(handshake->io, outbuf, false);
handshake->io->writeBuf(outbuf, false);
/* cleanup */
evbuffer_free(outbuf);
@ -664,7 +664,7 @@ static ReadState readHandshake(tr_handshake* handshake, struct evbuffer* inbuf)
return tr_handshakeDone(handshake, false);
}
tr_peerIoWriteBytes(handshake->io, std::data(msg), std::size(msg), false);
handshake->io->writeBytes(std::data(msg), std::size(msg), false);
handshake->haveSentBitTorrentHandshake = true;
}
@ -891,7 +891,7 @@ static ReadState readIA(tr_handshake* handshake, struct evbuffer const* inbuf)
/* maybe de-encrypt our connection */
if (crypto_select == CRYPTO_PROVIDE_PLAINTEXT)
{
tr_peerIoWriteBuf(handshake->io, outbuf, false);
handshake->io->writeBuf(outbuf, false);
}
tr_logAddTraceHand(handshake, "sending handshake");
@ -908,7 +908,7 @@ static ReadState readIA(tr_handshake* handshake, struct evbuffer const* inbuf)
}
/* send it out */
tr_peerIoWriteBuf(handshake->io, outbuf, false);
handshake->io->writeBuf(outbuf, false);
evbuffer_free(outbuf);
/* now await the handshake */
@ -1100,7 +1100,7 @@ static void gotError(tr_peerIo* io, short what, void* vhandshake)
buildHandshakeMessage(handshake, std::data(msg));
handshake->haveSentBitTorrentHandshake = true;
setReadState(handshake, AWAITING_HANDSHAKE);
tr_peerIoWriteBytes(handshake->io, std::data(msg), std::size(msg), false);
handshake->io->writeBytes(std::data(msg), std::size(msg), false);
}
}
@ -1116,7 +1116,7 @@ static void gotError(tr_peerIo* io, short what, void* vhandshake)
buildHandshakeMessage(handshake, std::data(msg));
handshake->haveSentBitTorrentHandshake = true;
setReadState(handshake, AWAITING_HANDSHAKE);
tr_peerIoWriteBytes(handshake->io, std::data(msg), std::size(msg), false);
handshake->io->writeBytes(std::data(msg), std::size(msg), false);
}
else
{
@ -1164,7 +1164,7 @@ tr_handshake* tr_handshakeNew(
handshake->haveSentBitTorrentHandshake = true;
setReadState(handshake, AWAITING_HANDSHAKE);
tr_peerIoWriteBytes(handshake->io, std::data(msg), std::size(msg), false);
handshake->io->writeBytes(std::data(msg), std::size(msg), false);
}
return handshake;

View file

@ -205,7 +205,7 @@ static void event_read_cb(evutil_socket_t fd, short /*event*/, void* vio)
/* if we don't have any bandwidth left, stop reading */
if (howmuch < 1)
{
tr_peerIoSetEnabled(io, dir, false);
io->setEnabled(dir, false);
return;
}
@ -215,7 +215,7 @@ static void event_read_cb(evutil_socket_t fd, short /*event*/, void* vio)
if (res > 0)
{
tr_peerIoSetEnabled(io, dir, true);
io->setEnabled(dir, true);
/* Invoke the user callback - must always be called last */
canReadWrapper(io);
@ -232,7 +232,7 @@ static void event_read_cb(evutil_socket_t fd, short /*event*/, void* vio)
{
if (e == EAGAIN || e == EINTR)
{
tr_peerIoSetEnabled(io, dir, true);
io->setEnabled(dir, true);
return;
}
@ -282,7 +282,7 @@ static void event_write_cb(evutil_socket_t fd, short /*event*/, void* vio)
/* if we don't have any bandwidth left, stop writing */
if (howmuch < 1)
{
tr_peerIoSetEnabled(io, dir, false);
io->setEnabled(dir, false);
return;
}
@ -313,7 +313,7 @@ static void event_write_cb(evutil_socket_t fd, short /*event*/, void* vio)
if (evbuffer_get_length(io->outbuf.get()) != 0)
{
tr_peerIoSetEnabled(io, dir, true);
io->setEnabled(dir, true);
}
didWriteWrapper(io, res);
@ -322,7 +322,7 @@ static void event_write_cb(evutil_socket_t fd, short /*event*/, void* vio)
RESCHEDULE:
if (evbuffer_get_length(io->outbuf.get()) != 0)
{
tr_peerIoSetEnabled(io, dir, true);
io->setEnabled(dir, true);
}
return;
@ -360,7 +360,7 @@ void tr_peerIo::readBufferAdd(void const* data, size_t n_bytes)
return;
}
tr_peerIoSetEnabled(this, TR_DOWN, true);
setEnabled(TR_DOWN, true);
canReadWrapper(this);
}
@ -379,7 +379,7 @@ static void utp_on_writable(tr_peerIo* io)
tr_logAddTraceIo(io, "libutp says this peer is ready to write");
int const n = tr_peerIoTryWrite(io, SIZE_MAX);
tr_peerIoSetEnabled(io, TR_UP, n != 0 && evbuffer_get_length(io->outbuf.get()) != 0);
io->setEnabled(TR_UP, n != 0 && evbuffer_get_length(io->outbuf.get()) != 0);
}
static void utp_on_state_change(tr_peerIo* const io, int const state)
@ -491,7 +491,7 @@ static uint64 utp_callback(utp_callback_arguments* args)
#endif /* #ifdef WITH_UTP */
tr_peerIo* tr_peerIoNew(
tr_peerIo* tr_peerIo::create(
tr_session* session,
tr_bandwidth* parent,
tr_address const* addr,
@ -548,7 +548,7 @@ tr_peerIo* tr_peerIoNew(
return io;
}
void tr_peerIoUtpInit([[maybe_unused]] struct_utp_context* ctx)
void tr_peerIo::utpInit([[maybe_unused]] struct_utp_context* ctx)
{
#ifdef WITH_UTP
@ -563,7 +563,7 @@ void tr_peerIoUtpInit([[maybe_unused]] struct_utp_context* ctx)
#endif
}
tr_peerIo* tr_peerIoNewIncoming(
tr_peerIo* tr_peerIo::newIncoming(
tr_session* session,
tr_bandwidth* parent,
tr_address const* addr,
@ -574,10 +574,10 @@ tr_peerIo* tr_peerIoNewIncoming(
TR_ASSERT(session != nullptr);
TR_ASSERT(tr_address_is_valid(addr));
return tr_peerIoNew(session, parent, addr, port, current_time, nullptr, true, false, socket);
return tr_peerIo::create(session, parent, addr, port, current_time, nullptr, true, false, socket);
}
tr_peerIo* tr_peerIoNewOutgoing(
tr_peerIo* tr_peerIo::newOutgoing(
tr_session* session,
tr_bandwidth* parent,
tr_address const* addr,
@ -611,7 +611,7 @@ tr_peerIo* tr_peerIoNewOutgoing(
return nullptr;
}
return tr_peerIoNew(session, parent, addr, port, current_time, &torrent_hash, false, is_seed, socket);
return create(session, parent, addr, port, current_time, &torrent_hash, false, is_seed, socket);
}
/***
@ -696,22 +696,21 @@ static void event_disable(tr_peerIo* io, short event)
}
}
void tr_peerIoSetEnabled(tr_peerIo* io, tr_direction dir, bool isEnabled)
void tr_peerIo::setEnabled(tr_direction dir, bool is_enabled)
{
TR_ASSERT(tr_isPeerIo(io));
TR_ASSERT(tr_isDirection(dir));
TR_ASSERT(tr_amInEventThread(io->session));
TR_ASSERT(io->session->events != nullptr);
TR_ASSERT(tr_amInEventThread(session));
TR_ASSERT(session->events != nullptr);
short const event = dir == TR_UP ? EV_WRITE : EV_READ;
if (isEnabled)
if (is_enabled)
{
event_enable(io, event);
event_enable(this, event);
}
else
{
event_disable(io, event);
event_disable(this, event);
}
}
@ -825,8 +824,8 @@ void tr_peerIo::setCallbacks(tr_can_read_cb readcb, tr_did_write_cb writecb, tr_
void tr_peerIo::clear()
{
setCallbacks(nullptr, nullptr, nullptr, nullptr);
tr_peerIoSetEnabled(this, TR_UP, false);
tr_peerIoSetEnabled(this, TR_DOWN, false);
setEnabled(TR_UP, false);
setEnabled(TR_DOWN, false);
io_close_socket(this);
}
@ -876,18 +875,11 @@ static unsigned int getDesiredOutputBufferSize(tr_peerIo const* io, uint64_t now
return std::max(ceiling, currentSpeed_Bps * period);
}
size_t tr_peerIoGetWriteBufferSpace(tr_peerIo const* io, uint64_t now)
size_t tr_peerIo::getWriteBufferSpace(uint64_t now) const
{
size_t const desiredLen = getDesiredOutputBufferSize(io, now);
size_t const currentLen = evbuffer_get_length(io->outbuf.get());
size_t freeSpace = 0;
if (desiredLen > currentLen)
{
freeSpace = desiredLen - currentLen;
}
return freeSpace;
size_t const desired_len = getDesiredOutputBufferSize(this, now);
size_t const current_len = evbuffer_get_length(outbuf.get());
return desired_len > current_len ? desired_len - current_len : 0U;
}
/**
@ -917,36 +909,35 @@ static inline void processBuffer(tr_peerIo& io, evbuffer* buffer, size_t offset,
TR_ASSERT(size == 0);
}
void tr_peerIoWriteBuf(tr_peerIo* io, struct evbuffer* buf, bool isPieceData)
void tr_peerIo::writeBuf(struct evbuffer* buf, bool isPieceData)
{
size_t const byteCount = evbuffer_get_length(buf);
if (io->isEncrypted())
if (isEncrypted())
{
processBuffer(*io, buf, 0, byteCount);
processBuffer(*this, buf, 0, byteCount);
}
evbuffer_add_buffer(io->outbuf.get(), buf);
io->outbuf_info.emplace_back(byteCount, isPieceData);
evbuffer_add_buffer(outbuf.get(), buf);
outbuf_info.emplace_back(byteCount, isPieceData);
}
void tr_peerIoWriteBytes(tr_peerIo* io, void const* writeme, size_t writeme_len, bool is_piece_data)
void tr_peerIo::writeBytes(void const* writeme, size_t writeme_len, bool is_piece_data)
{
struct evbuffer_iovec iovec;
evbuffer_reserve_space(io->outbuf.get(), writeme_len, &iovec, 1);
evbuffer_reserve_space(outbuf.get(), writeme_len, &iovec, 1);
iovec.iov_len = writeme_len;
memcpy(iovec.iov_base, writeme, iovec.iov_len);
if (io->isEncrypted())
if (isEncrypted())
{
io->encrypt(iovec.iov_len, iovec.iov_base);
encrypt(iovec.iov_len, iovec.iov_base);
}
evbuffer_commit_space(io->outbuf.get(), &iovec, 1);
evbuffer_commit_space(outbuf.get(), &iovec, 1);
io->outbuf_info.emplace_back(writeme_len, is_piece_data);
outbuf_info.emplace_back(writeme_len, is_piece_data);
}
/***
@ -1155,31 +1146,30 @@ static int tr_peerIoTryWrite(tr_peerIo* io, size_t howmuch)
return n;
}
int tr_peerIoFlush(tr_peerIo* io, tr_direction dir, size_t limit)
int tr_peerIo::flush(tr_direction dir, size_t limit)
{
TR_ASSERT(tr_isPeerIo(io));
TR_ASSERT(tr_isDirection(dir));
int const bytes_used = dir == TR_DOWN ? tr_peerIoTryRead(io, limit) : tr_peerIoTryWrite(io, limit);
tr_logAddTraceIo(io, fmt::format("flushing peer-io, direction:{}, limit:{}, byte_used:{}", dir, limit, bytes_used));
int const bytes_used = dir == TR_DOWN ? tr_peerIoTryRead(this, limit) : tr_peerIoTryWrite(this, limit);
tr_logAddTraceIo(this, fmt::format("flushing peer-io, direction:{}, limit:{}, byte_used:{}", dir, limit, bytes_used));
return bytes_used;
}
int tr_peerIoFlushOutgoingProtocolMsgs(tr_peerIo* io)
int tr_peerIo::flushOutgoingProtocolMsgs()
{
size_t byteCount = 0;
size_t byte_count = 0;
/* count up how many bytes are used by non-piece-data messages
at the front of our outbound queue */
for (auto const& [n_bytes, is_piece_data] : io->outbuf_info)
for (auto const& [n_bytes, is_piece_data] : outbuf_info)
{
if (is_piece_data)
{
break;
}
byteCount += n_bytes;
byte_count += n_bytes;
}
return tr_peerIoFlush(io, TR_UP, byteCount);
return flush(TR_UP, byte_count);
}

View file

@ -72,28 +72,37 @@ class tr_peerIo
using Filter = tr_message_stream_encryption::Filter;
public:
tr_peerIo(
tr_session* session_in,
// TODO: 8 constructor args is too many; maybe a builder object?
static tr_peerIo* newOutgoing(
tr_session* session,
tr_bandwidth* parent,
struct tr_address const* addr,
tr_port port,
time_t current_time,
tr_sha1_digest_t const& torrent_hash,
bool is_seed,
bool utp);
static tr_peerIo* newIncoming(
tr_session* session,
tr_bandwidth* parent,
struct tr_address const* addr,
tr_port port,
time_t current_time,
struct tr_peer_socket const socket);
// this is only public for testing purposes.
// production code should use newOutgoing() or newIncoming()
static tr_peerIo* create(
tr_session* session,
tr_bandwidth* parent,
tr_address const* addr,
tr_port port,
time_t current_time,
tr_sha1_digest_t const* torrent_hash,
bool is_incoming,
tr_address const& addr,
tr_port port,
bool is_seed,
time_t current_time,
tr_bandwidth* parent_bandwidth)
: session{ session_in }
, time_created{ current_time }
, bandwidth_{ parent_bandwidth }
, addr_{ addr }
, port_{ port }
, is_seed_{ is_seed }
, is_incoming_{ is_incoming }
{
if (torrent_hash != nullptr)
{
torrent_hash_ = *torrent_hash;
}
}
struct tr_peer_socket const socket);
void clear();
@ -109,6 +118,8 @@ public:
int reconnect();
void setEnabled(tr_direction dir, bool is_enabled);
[[nodiscard]] constexpr tr_address const& address() const noexcept
{
return addr_;
@ -135,6 +146,13 @@ public:
void readBufferAdd(void const* data, size_t n_bytes);
int flushOutgoingProtocolMsgs();
int flush(tr_direction dir, size_t byte_limit);
void writeBytes(void const* writeme, size_t writeme_len, bool is_piece_data);
void writeBuf(struct evbuffer* buf, bool isPieceData);
size_t getWriteBufferSpace(uint64_t now) const;
[[nodiscard]] auto hasBandwidthLeft(tr_direction dir) noexcept
{
return bandwidth_.clamp(dir, 1024) > 0;
@ -276,10 +294,31 @@ public:
return filter_.get() != nullptr;
}
private:
tr_bandwidth bandwidth_;
static void utpInit(struct_utp_context* ctx);
std::unique_ptr<tr_message_stream_encryption::Filter> filter_;
private:
tr_peerIo(
tr_session* session_in,
tr_sha1_digest_t const* torrent_hash,
bool is_incoming,
tr_address const& addr,
tr_port port,
bool is_seed,
time_t current_time,
tr_bandwidth* parent_bandwidth)
: session{ session_in }
, time_created{ current_time }
, bandwidth_{ parent_bandwidth }
, addr_{ addr }
, port_{ port }
, is_seed_{ is_seed }
, is_incoming_{ is_incoming }
{
if (torrent_hash != nullptr)
{
torrent_hash_ = *torrent_hash;
}
}
Filter& filter()
{
@ -291,6 +330,10 @@ private:
return *filter_;
}
tr_bandwidth bandwidth_;
std::unique_ptr<tr_message_stream_encryption::Filter> filter_;
std::optional<tr_sha1_digest_t> torrent_hash_;
tr_address const addr_;
@ -304,44 +347,6 @@ private:
bool fast_extension_supported_ = false;
};
/**
***
**/
void tr_peerIoUtpInit(struct_utp_context* ctx);
// TODO: 8 constructor args is too many; maybe a builder object?
tr_peerIo* tr_peerIoNewOutgoing(
tr_session* session,
tr_bandwidth* parent,
struct tr_address const* addr,
tr_port port,
time_t current_time,
tr_sha1_digest_t const& torrent_hash,
bool is_seed,
bool utp);
tr_peerIo* tr_peerIoNewIncoming(
tr_session* session,
tr_bandwidth* parent,
struct tr_address const* addr,
tr_port port,
time_t current_time,
struct tr_peer_socket const socket);
// this is only public for testing purposes.
// production code should use tr_peerIoNewOutgoing() or tr_peerIoNewIncoming()
tr_peerIo* tr_peerIoNew(
tr_session* session,
tr_bandwidth* parent,
tr_address const* addr,
tr_port port,
time_t current_time,
tr_sha1_digest_t const* torrent_hash,
bool is_incoming,
bool is_seed,
struct tr_peer_socket const socket);
void tr_peerIoRefImpl(char const* file, int line, tr_peerIo* io);
#define tr_peerIoRef(io) tr_peerIoRefImpl(__FILE__, __LINE__, (io))
@ -360,14 +365,6 @@ constexpr bool tr_isPeerIo(tr_peerIo const* io)
***
**/
void tr_peerIoWriteBytes(tr_peerIo* io, void const* writeme, size_t writeme_len, bool is_piece_data);
void tr_peerIoWriteBuf(tr_peerIo* io, struct evbuffer* buf, bool isPieceData);
/**
***
**/
void evbuffer_add_uint8(struct evbuffer* outbuf, uint8_t addme);
void evbuffer_add_uint16(struct evbuffer* outbuf, uint16_t hs);
void evbuffer_add_uint32(struct evbuffer* outbuf, uint32_t hl);
@ -377,22 +374,4 @@ void evbuffer_add_hton_16(struct evbuffer* buf, uint16_t val);
void evbuffer_add_hton_32(struct evbuffer* buf, uint32_t val);
void evbuffer_add_hton_64(struct evbuffer* buf, uint64_t val);
/**
***
**/
size_t tr_peerIoGetWriteBufferSpace(tr_peerIo const* io, uint64_t now);
void tr_peerIoBandwidthUsed(tr_peerIo* io, tr_direction direction, size_t byteCount, int isPieceData);
/**
***
**/
void tr_peerIoSetEnabled(tr_peerIo* io, tr_direction dir, bool isEnabled);
int tr_peerIoFlush(tr_peerIo* io, tr_direction dir, size_t byteLimit);
int tr_peerIoFlushOutgoingProtocolMsgs(tr_peerIo* io);
/* @} */

View file

@ -1263,10 +1263,10 @@ void tr_peerMgrAddIncoming(tr_peerMgr* manager, tr_address const* addr, tr_port
else /* we don't have a connection to them yet... */
{
auto mediator = std::make_shared<tr_handshake_mediator_impl>(*session);
tr_peerIo* const io = tr_peerIoNewIncoming(session, &session->top_bandwidth_, addr, port, tr_time(), socket);
tr_peerIo* const io = tr_peerIo::newIncoming(session, &session->top_bandwidth_, addr, port, tr_time(), socket);
tr_handshake* const handshake = tr_handshakeNew(mediator, io, session->encryptionMode(), on_handshake_done, manager);
tr_peerIoUnref(io); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
tr_peerIoUnref(io); /* balanced by the implicit ref in tr_peerIo::NewIncoming() */
manager->incoming_handshakes.add(*addr, handshake);
}
@ -2836,7 +2836,7 @@ void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, peer_atom& atom)
tr_logAddTraceSwarm(s, fmt::format("Starting an OUTGOING {} connection with {}", utp ? " µTP" : "TCP", atom.readable()));
tr_peerIo* const io = tr_peerIoNewOutgoing(
tr_peerIo* const io = tr_peerIo::newOutgoing(
mgr->session,
&mgr->session->top_bandwidth_,
&atom.addr,
@ -2859,7 +2859,7 @@ void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, peer_atom& atom)
TR_ASSERT(io->torrentHash());
tr_peerIoUnref(io); /* balanced by the initial ref in tr_peerIoNewOutgoing() */
tr_peerIoUnref(io); /* balanced by the initial ref in tr_peerIo::newOutgoing() */
s->outgoing_handshakes.add(atom.addr, handshake);
}

View file

@ -2212,7 +2212,7 @@ static size_t fillOutputBuffer(tr_peerMsgsImpl* msgs, time_t now)
size_t const len = evbuffer_get_length(msgs->outMessages);
/* flush the protocol messages */
logtrace(msgs, fmt::format(FMT_STRING("flushing outMessages... to {:p} (length is {:d})"), fmt::ptr(msgs->io), len));
tr_peerIoWriteBuf(msgs->io, msgs->outMessages, false);
msgs->io->writeBuf(msgs->outMessages, false);
msgs->clientSentAnythingAt = now;
msgs->outMessagesBatchedAt = 0;
msgs->outMessagesBatchPeriod = LowPriorityIntervalSecs;
@ -2224,7 +2224,7 @@ static size_t fillOutputBuffer(tr_peerMsgsImpl* msgs, time_t now)
**/
auto piece = int{};
if (tr_peerIoGetWriteBufferSpace(msgs->io, now) >= METADATA_PIECE_SIZE && popNextMetadataRequest(msgs, &piece))
if (msgs->io->getWriteBufferSpace(now) >= METADATA_PIECE_SIZE && popNextMetadataRequest(msgs, &piece))
{
auto ok = bool{ false };
@ -2283,7 +2283,7 @@ static size_t fillOutputBuffer(tr_peerMsgsImpl* msgs, time_t now)
*** Data Blocks
**/
if (tr_peerIoGetWriteBufferSpace(msgs->io, now) >= tr_block_info::BlockSize && !std::empty(msgs->peer_requested_))
if (msgs->io->getWriteBufferSpace(now) >= tr_block_info::BlockSize && !std::empty(msgs->peer_requested_))
{
req = msgs->peer_requested_.front();
msgs->peer_requested_.erase(std::begin(msgs->peer_requested_));
@ -2333,7 +2333,7 @@ static size_t fillOutputBuffer(tr_peerMsgsImpl* msgs, time_t now)
size_t const n = evbuffer_get_length(out);
logtrace(msgs, fmt::format(FMT_STRING("sending block {:d}:{:d}->{:d}"), req.index, req.offset, req.length));
TR_ASSERT(n == msglen);
tr_peerIoWriteBuf(msgs->io, out, true);
msgs->io->writeBuf(out, true);
bytesWritten += n;
msgs->clientSentAnythingAt = now;
msgs->blocks_sent_to_peer.add(tr_time(), 1);

View file

@ -214,7 +214,7 @@ void tr_utpInit(tr_session* session)
utp_set_callback(ctx, UTP_ON_ACCEPT, &utp_callback);
utp_set_callback(ctx, UTP_SENDTO, &utp_callback);
tr_peerIoUtpInit(ctx);
tr_peerIo::utpInit(ctx);
#ifdef TR_UTP_TRACE

View file

@ -163,7 +163,7 @@ auto createIncomingIo(tr_session* session)
auto const now = tr_time();
auto const peer_socket = tr_peer_socket_tcp_create(sockpair[0]);
auto* const
io = tr_peerIoNewIncoming(session, &session->top_bandwidth_, &DefaultPeerAddr, DefaultPeerPort, now, peer_socket);
io = tr_peerIo::newIncoming(session, &session->top_bandwidth_, &DefaultPeerAddr, DefaultPeerPort, now, peer_socket);
return std::make_pair(io, sockpair[1]);
}
@ -173,7 +173,7 @@ auto createOutgoingIo(tr_session* session, tr_sha1_digest_t const& info_hash)
EXPECT_EQ(0, evutil_socketpair(LOCAL_SOCKETPAIR_AF, SOCK_STREAM, 0, std::data(sockpair))) << tr_strerror(errno);
auto const now = tr_time();
auto const peer_socket = tr_peer_socket_tcp_create(sockpair[0]);
auto* const io = tr_peerIoNew(
auto* const io = tr_peerIo::create(
session,
&session->top_bandwidth_,
&DefaultPeerAddr,