refactor: group related tr_peerIo methods together (#4380)

* refactor: group related functions together in the source

* refactor: make tr_peerIo::decrypt() private

* refactor: make tr_peerIo::is_seed() private

* refactor: remove unused tr_isPeerIo()
This commit is contained in:
Charles Kerr 2022-12-16 17:21:01 -06:00 committed by GitHub
parent 67c4e4081c
commit 616e79c0a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 560 additions and 570 deletions

View File

@ -43,9 +43,178 @@
#define tr_logAddDebugIo(io, msg) tr_logAddDebug(msg, (io)->display_name())
#define tr_logAddTraceIo(io, msg) tr_logAddTrace(msg, (io)->display_name())
/***
****
***/
// Construct a peer I/O object. No socket is opened here; callers attach one
// later via set_socket(). `info_hash` may be nullptr (e.g. an incoming peer
// whose torrent is not yet known); a zero-filled digest is stored in that case.
tr_peerIo::tr_peerIo(
tr_session* session,
tr_sha1_digest_t const* info_hash,
bool is_incoming,
bool is_seed,
tr_bandwidth* parent_bandwidth)
: bandwidth_{ parent_bandwidth }
, info_hash_{ info_hash != nullptr ? *info_hash : tr_sha1_digest_t{} }
, session_{ session }
, is_seed_{ is_seed }
, is_incoming_{ is_incoming }
{
}
// Factory: build a shared tr_peerIo and register it as the peer of its
// bandwidth object. Holds the session lock so construction and the
// bandwidth/peer wiring happen atomically.
std::shared_ptr<tr_peerIo> tr_peerIo::create(
tr_session* session,
tr_bandwidth* parent,
tr_sha1_digest_t const* info_hash,
bool is_incoming,
bool is_seed)
{
TR_ASSERT(session != nullptr);
auto lock = session->unique_lock();
auto io = std::make_shared<tr_peerIo>(session, info_hash, is_incoming, is_seed, parent);
io->bandwidth().setPeer(io);
tr_logAddTraceIo(io, fmt::format("bandwidth is {}; its parent is {}", fmt::ptr(&io->bandwidth()), fmt::ptr(parent)));
return io;
}
// Wrap an already-accepted socket from a remote peer.
// The info-hash is unknown at this point (nullptr), `is_incoming` is true,
// and `is_seed` defaults to false until the handshake says otherwise.
std::shared_ptr<tr_peerIo> tr_peerIo::new_incoming(tr_session* session, tr_bandwidth* parent, tr_peer_socket socket)
{
TR_ASSERT(session != nullptr);
auto peer_io = tr_peerIo::create(session, parent, nullptr, true, false);
peer_io->set_socket(std::move(socket));
return peer_io;
}
// Open an outgoing connection to `addr:port`.
// Tries µTP first when requested (and compiled in); falls back to a TCP
// socket otherwise. Returns an empty shared_ptr if the address is not
// usable for peers or no socket could be opened.
std::shared_ptr<tr_peerIo> tr_peerIo::new_outgoing(
tr_session* session,
tr_bandwidth* parent,
tr_address const& addr,
tr_port port,
tr_sha1_digest_t const& info_hash,
bool is_seed,
bool utp)
{
TR_ASSERT(session != nullptr);
TR_ASSERT(addr.is_valid());
TR_ASSERT(utp || session->allowsTCP());
if (!addr.is_valid_for_peers(port))
{
return {};
}
auto peer_io = tr_peerIo::create(session, parent, &info_hash, false, is_seed);
#ifdef WITH_UTP
if (utp)
{
auto* const sock = utp_create_socket(session->utp_context);
utp_set_userdata(sock, peer_io.get());
peer_io->set_socket(tr_peer_socket{ addr, port, sock });
auto const [ss, sslen] = addr.to_sockaddr(port);
// utp_connect() returns 0 on success
if (utp_connect(sock, reinterpret_cast<sockaddr const*>(&ss), sslen) == 0)
{
return peer_io;
}
}
#endif
// µTP wasn't requested or didn't connect; fall back to TCP
if (!peer_io->socket_.is_valid())
{
if (auto sock = tr_netOpenPeerSocket(session, addr, port, is_seed); sock.is_valid())
{
peer_io->set_socket(std::move(sock));
return peer_io;
}
}
return {};
}
// Destructor: under the session lock, drop user callbacks, stop libevent
// polling, and close the underlying socket.
tr_peerIo::~tr_peerIo()
{
auto const lock = session_->unique_lock();
clear_callbacks();
tr_logAddTraceIo(this, "in tr_peerIo destructor");
event_disable(EV_READ | EV_WRITE);
close();
}
///
// Attach a socket to this peer-io, replacing (and closing) any previous one.
// TCP sockets get libevent read/write events wired to the static callbacks;
// µTP sockets get `this` stored as the libutp userdata so utp_init()'s
// callbacks can find us. Any other socket type is a programming error.
void tr_peerIo::set_socket(tr_peer_socket socket_in)
{
close(); // tear down the previous socket, if any
socket_ = std::move(socket_in);
if (socket_.is_tcp())
{
event_read_.reset(event_new(session_->eventBase(), socket_.handle.tcp, EV_READ, &tr_peerIo::event_read_cb, this));
event_write_.reset(event_new(session_->eventBase(), socket_.handle.tcp, EV_WRITE, &tr_peerIo::event_write_cb, this));
}
#ifdef WITH_UTP
else if (socket_.is_utp())
{
utp_set_userdata(socket_.handle.utp, this);
}
#endif
else
{
TR_ASSERT_MSG(false, "unsupported peer socket type");
}
}
// Close the socket and release the libevent read/write events.
void tr_peerIo::close()
{
socket_.close(session_);
event_write_.reset();
event_read_.reset();
}
// Full teardown: drop callbacks, disable both transfer directions,
// then close the socket.
void tr_peerIo::clear()
{
clear_callbacks();
set_enabled(TR_UP, false);
set_enabled(TR_DOWN, false);
close();
}
bool tr_peerIo::reconnect()
{
TR_ASSERT(!this->is_incoming());
TR_ASSERT(this->session_->allowsTCP());
short int const pending_events = this->pending_events_;
event_disable(EV_READ | EV_WRITE);
close();
auto const [addr, port] = socket_address();
socket_ = tr_netOpenPeerSocket(session_, addr, port, is_seed());
if (!socket_.is_tcp())
{
return false;
}
this->event_read_.reset(event_new(session_->eventBase(), socket_.handle.tcp, EV_READ, event_read_cb, this));
this->event_write_.reset(event_new(session_->eventBase(), socket_.handle.tcp, EV_WRITE, event_write_cb, this));
event_enable(pending_events);
return true;
}
///
// Helps us to ignore errors that say "try again later"
// since that's what peer-io does by default anyway.
[[nodiscard]] static auto constexpr canRetryFromError(int error_code) noexcept
{
    switch (error_code)
    {
    case 0: // not an error at all
    case EAGAIN:
    case EINTR:
    case EINPROGRESS:
        return true;
    default:
        return false;
    }
}
///
void tr_peerIo::did_write_wrapper(size_t bytes_transferred)
{
@ -81,6 +250,66 @@ void tr_peerIo::did_write_wrapper(size_t bytes_transferred)
}
}
// Flush up to `max` bytes from the outbound buffer to the socket.
// The write size is clamped by the upload bandwidth allotment; when no
// bandwidth is left, upload polling is disabled until more is allotted.
// Non-retryable socket errors are reported through the error callback.
// Returns the number of bytes actually written.
size_t tr_peerIo::try_write(size_t max)
{
static auto constexpr Dir = TR_UP;
if (max == 0)
{
return {};
}
auto& buf = outbuf_;
max = std::min(max, std::size(buf));
max = bandwidth().clamp(Dir, max);
if (max == 0)
{
set_enabled(Dir, false);
return {};
}
tr_error* error = nullptr;
auto const n_written = socket_.try_write(buf, max, &error);
// enable further writes if there's more data to write
set_enabled(Dir, !std::empty(buf) && (error == nullptr || canRetryFromError(error->code)));
if (error != nullptr)
{
if (!canRetryFromError(error->code))
{
tr_logAddTraceIo(
this,
fmt::format("try_write err: wrote:{}, errno:{} ({})", n_written, error->code, error->message));
call_error_callback(*error);
}
tr_error_clear(&error);
}
else if (n_written > 0U)
{
did_write_wrapper(n_written);
}
return n_written;
}
// libevent callback: the TCP socket is writable. `vio` is the tr_peerIo*.
// Clears the pending EV_WRITE flag and flushes as much as bandwidth allows.
void tr_peerIo::event_write_cb(evutil_socket_t fd, short /*event*/, void* vio)
{
auto* const io = static_cast<tr_peerIo*>(vio);
tr_logAddTraceIo(io, "libevent says this peer socket is ready for writing");
TR_ASSERT(io->socket_.is_tcp());
TR_ASSERT(io->socket_.handle.tcp == fd);
io->pending_events_ &= ~EV_WRITE;
// Write as much as possible. Since the socket is non-blocking,
// write() will return if it can't write any more without blocking
io->try_write(SIZE_MAX);
}
///
void tr_peerIo::can_read_wrapper()
{
// try to consume the input buffer
@ -145,11 +374,45 @@ void tr_peerIo::can_read_wrapper()
}
}
// Helps us to ignore errors that say "try again later"
// since that's what peer-io does by default anyway.
[[nodiscard]] static auto constexpr canRetryFromError(int error_code) noexcept
size_t tr_peerIo::try_read(size_t max)
{
return error_code == 0 || error_code == EAGAIN || error_code == EINTR || error_code == EINPROGRESS;
static auto constexpr Dir = TR_DOWN;
if (max == 0)
{
return {};
}
// Do not write more than the bandwidth allows.
// If there is no bandwidth left available, disable writes.
max = bandwidth().clamp(TR_DOWN, max);
if (max == 0)
{
set_enabled(Dir, false);
return {};
}
auto& buf = inbuf_;
tr_error* error = nullptr;
auto const n_read = socket_.try_read(buf, max, &error);
set_enabled(Dir, error == nullptr || canRetryFromError(error->code));
if (error != nullptr)
{
if (!canRetryFromError(error->code))
{
tr_logAddTraceIo(this, fmt::format("try_read err: n_read:{} errno:{} ({})", n_read, error->code, error->message));
call_error_callback(*error);
}
tr_error_clear(&error);
}
else if (!std::empty(buf))
{
can_read_wrapper();
}
return n_read;
}
void tr_peerIo::event_read_cb(evutil_socket_t fd, short /*event*/, void* vio)
@ -170,261 +433,7 @@ void tr_peerIo::event_read_cb(evutil_socket_t fd, short /*event*/, void* vio)
io->try_read(n_left);
}
// (pre-refactor copy of the same callback, shown here in its old position)
// libevent callback: the TCP socket is writable. `vio` is the tr_peerIo*.
void tr_peerIo::event_write_cb(evutil_socket_t fd, short /*event*/, void* vio)
{
auto* const io = static_cast<tr_peerIo*>(vio);
tr_logAddTraceIo(io, "libevent says this peer socket is ready for writing");
TR_ASSERT(io->socket_.is_tcp());
TR_ASSERT(io->socket_.handle.tcp == fd);
io->pending_events_ &= ~EV_WRITE;
// Write as much as possible. Since the socket is non-blocking,
// write() will return if it can't write any more without blocking
io->try_write(SIZE_MAX);
}
/**
***
**/
#ifdef WITH_UTP
/* µTP callbacks */
// libutp state-change handler (invoked via the UTP_ON_STATE_CHANGE callback
// registered in utp_init()).
// - CONNECT: mark µTP as working for this peer
// - WRITABLE: resume writing if a write was pending
// - EOF: report a not-connected error through the error callback
// - DESTROYING: should never reach us; logged as an error
void tr_peerIo::on_utp_state_change(int state)
{
if (state == UTP_STATE_CONNECT)
{
tr_logAddTraceIo(this, "utp_on_state_change -- changed to connected");
utp_supported_ = true;
}
else if (state == UTP_STATE_WRITABLE)
{
tr_logAddTraceIo(this, "utp_on_state_change -- changed to writable");
if ((pending_events_ & EV_WRITE) != 0)
{
try_write(SIZE_MAX);
}
}
else if (state == UTP_STATE_EOF)
{
tr_error* error = nullptr;
tr_error_set(&error, ENOTCONN, tr_strerror(ENOTCONN));
call_error_callback(*error);
tr_error_clear(&error);
}
else if (state == UTP_STATE_DESTROYING)
{
tr_logAddErrorIo(this, "Impossible state UTP_STATE_DESTROYING");
}
else
{
tr_logAddErrorIo(this, fmt::format(_("Unknown state: {state}"), fmt::arg("state", state)));
}
}
// libutp error handler: translate the libutp error code into a tr_error
// and forward it to the registered error callback, if any.
void tr_peerIo::on_utp_error(int errcode)
{
tr_logAddTraceIo(this, fmt::format("utp_on_error -- {}", utp_error_code_names[errcode]));
if (got_error_ != nullptr)
{
tr_error* error = nullptr;
tr_error_set(&error, errcode, utp_error_code_names[errcode]);
call_error_callback(*error);
tr_error_clear(&error);
}
}
#endif /* #ifdef WITH_UTP */
// (pre-refactor copy of the constructor, shown here in its old position)
// Construct a peer I/O object; `info_hash` may be nullptr, in which case
// a zero-filled digest is stored.
tr_peerIo::tr_peerIo(
tr_session* session,
tr_sha1_digest_t const* info_hash,
bool is_incoming,
bool is_seed,
tr_bandwidth* parent_bandwidth)
: bandwidth_{ parent_bandwidth }
, info_hash_{ info_hash != nullptr ? *info_hash : tr_sha1_digest_t{} }
, session_{ session }
, is_seed_{ is_seed }
, is_incoming_{ is_incoming }
{
}
// (pre-refactor copy of set_socket(), shown here in its old position)
// Attach a socket, replacing any previous one; wires libevent events for
// TCP or libutp userdata for µTP.
void tr_peerIo::set_socket(tr_peer_socket socket_in)
{
close(); // tear down the previous socket, if any
socket_ = std::move(socket_in);
if (socket_.is_tcp())
{
event_read_.reset(event_new(session_->eventBase(), socket_.handle.tcp, EV_READ, &tr_peerIo::event_read_cb, this));
event_write_.reset(event_new(session_->eventBase(), socket_.handle.tcp, EV_WRITE, &tr_peerIo::event_write_cb, this));
}
#ifdef WITH_UTP
else if (socket_.is_utp())
{
utp_set_userdata(socket_.handle.utp, this);
}
#endif
else
{
TR_ASSERT_MSG(false, "unsupported peer socket type");
}
}
// (pre-refactor copy of create(), shown here in its old position)
// Factory: build a shared tr_peerIo and register it with its bandwidth
// object under the session lock.
std::shared_ptr<tr_peerIo> tr_peerIo::create(
tr_session* session,
tr_bandwidth* parent,
tr_sha1_digest_t const* info_hash,
bool is_incoming,
bool is_seed)
{
TR_ASSERT(session != nullptr);
auto lock = session->unique_lock();
auto io = std::make_shared<tr_peerIo>(session, info_hash, is_incoming, is_seed, parent);
io->bandwidth().setPeer(io);
tr_logAddTraceIo(io, fmt::format("bandwidth is {}; its parent is {}", fmt::ptr(&io->bandwidth()), fmt::ptr(parent)));
return io;
}
// One-time libutp context setup: sets the receive-buffer option and
// registers the per-socket callbacks (read, read-buffer-size query, error,
// overhead accounting, state change). A no-op when built without µTP.
void tr_peerIo::utp_init([[maybe_unused]] struct_utp_context* ctx)
{
#ifdef WITH_UTP
utp_context_set_option(ctx, UTP_RCVBUF, RcvBuf);
// note: all the callback handlers here need to check `userdata` for nullptr
// because libutp can fire callbacks on a socket after utp_close() is called
utp_set_callback(
ctx,
UTP_ON_READ,
[](utp_callback_arguments* args) -> uint64
{
if (auto* const io = static_cast<tr_peerIo*>(utp_get_userdata(args->socket)); io != nullptr)
{
// incoming payload: append to the input buffer and process it
io->inbuf_.add(args->buf, args->len);
io->set_enabled(TR_DOWN, true);
io->can_read_wrapper();
}
return {};
});
utp_set_callback(
ctx,
UTP_GET_READ_BUFFER_SIZE,
[](utp_callback_arguments* args) -> uint64
{
// libutp asks how much is buffered so it can manage flow control
if (auto const* const io = static_cast<tr_peerIo*>(utp_get_userdata(args->socket)); io != nullptr)
{
return std::size(io->inbuf_);
}
return {};
});
utp_set_callback(
ctx,
UTP_ON_ERROR,
[](utp_callback_arguments* args) -> uint64
{
if (auto* const io = static_cast<tr_peerIo*>(utp_get_userdata(args->socket)); io != nullptr)
{
io->on_utp_error(args->u1.error_code);
}
return {};
});
utp_set_callback(
ctx,
UTP_ON_OVERHEAD_STATISTICS,
[](utp_callback_arguments* args) -> uint64
{
// protocol overhead bytes: charge them against bandwidth accounting
if (auto* const io = static_cast<tr_peerIo*>(utp_get_userdata(args->socket)); io != nullptr)
{
tr_logAddTraceIo(io, fmt::format("{:d} overhead bytes via utp", args->len));
io->bandwidth().notifyBandwidthConsumed(args->u1.send != 0 ? TR_UP : TR_DOWN, args->len, false, tr_time_msec());
}
return {};
});
utp_set_callback(
ctx,
UTP_ON_STATE_CHANGE,
[](utp_callback_arguments* args) -> uint64
{
if (auto* const io = static_cast<tr_peerIo*>(utp_get_userdata(args->socket)); io != nullptr)
{
io->on_utp_state_change(args->u1.state);
}
return {};
});
#endif
}
// (pre-refactor copy of new_incoming(), shown here in its old position)
// Wrap an already-accepted socket; info-hash is unknown at this point.
std::shared_ptr<tr_peerIo> tr_peerIo::new_incoming(tr_session* session, tr_bandwidth* parent, tr_peer_socket socket)
{
TR_ASSERT(session != nullptr);
auto peer_io = tr_peerIo::create(session, parent, nullptr, true, false);
peer_io->set_socket(std::move(socket));
return peer_io;
}
// (pre-refactor copy of new_outgoing(), shown here in its old position)
// Open an outgoing connection: µTP first if requested and compiled in,
// then TCP; empty shared_ptr on failure.
std::shared_ptr<tr_peerIo> tr_peerIo::new_outgoing(
tr_session* session,
tr_bandwidth* parent,
tr_address const& addr,
tr_port port,
tr_sha1_digest_t const& info_hash,
bool is_seed,
bool utp)
{
TR_ASSERT(session != nullptr);
TR_ASSERT(addr.is_valid());
TR_ASSERT(utp || session->allowsTCP());
if (!addr.is_valid_for_peers(port))
{
return {};
}
auto peer_io = tr_peerIo::create(session, parent, &info_hash, false, is_seed);
#ifdef WITH_UTP
if (utp)
{
auto* const sock = utp_create_socket(session->utp_context);
utp_set_userdata(sock, peer_io.get());
peer_io->set_socket(tr_peer_socket{ addr, port, sock });
auto const [ss, sslen] = addr.to_sockaddr(port);
if (utp_connect(sock, reinterpret_cast<sockaddr const*>(&ss), sslen) == 0)
{
return peer_io;
}
}
#endif
if (!peer_io->socket_.is_valid())
{
if (auto sock = tr_netOpenPeerSocket(session, addr, port, is_seed); sock.is_valid())
{
peer_io->set_socket(std::move(sock));
return peer_io;
}
}
return {};
}
/***
****
***/
///
void tr_peerIo::event_enable(short event)
{
@ -506,97 +515,56 @@ void tr_peerIo::set_enabled(tr_direction dir, bool is_enabled)
}
}
/***
****
***/
void tr_peerIo::close()
size_t tr_peerIo::flush(tr_direction dir, size_t limit)
{
socket_.close(session_);
event_write_.reset();
event_read_.reset();
TR_ASSERT(tr_isDirection(dir));
return dir == TR_DOWN ? try_read(limit) : try_write(limit);
}
tr_peerIo::~tr_peerIo()
size_t tr_peerIo::flush_outgoing_protocol_msgs()
{
auto const lock = session_->unique_lock();
size_t byte_count = 0;
clear_callbacks();
tr_logAddTraceIo(this, "in tr_peerIo destructor");
event_disable(EV_READ | EV_WRITE);
close();
}
void tr_peerIo::set_callbacks(CanRead can_read, DidWrite did_write, GotError got_error, void* user_data)
{
can_read_ = can_read;
did_write_ = did_write;
got_error_ = got_error;
user_data_ = user_data;
}
void tr_peerIo::clear()
{
clear_callbacks();
set_enabled(TR_UP, false);
set_enabled(TR_DOWN, false);
close();
}
bool tr_peerIo::reconnect()
{
TR_ASSERT(!this->is_incoming());
TR_ASSERT(this->session_->allowsTCP());
short int const pending_events = this->pending_events_;
event_disable(EV_READ | EV_WRITE);
close();
auto const [addr, port] = socket_address();
socket_ = tr_netOpenPeerSocket(session_, addr, port, is_seed());
if (!socket_.is_tcp())
/* count up how many bytes are used by non-piece-data messages
at the front of our outbound queue */
for (auto const& [n_bytes, is_piece_data] : outbuf_info_)
{
return false;
if (is_piece_data)
{
break;
}
byte_count += n_bytes;
}
this->event_read_.reset(event_new(session_->eventBase(), socket_.handle.tcp, EV_READ, event_read_cb, this));
this->event_write_.reset(event_new(session_->eventBase(), socket_.handle.tcp, EV_WRITE, event_write_cb, this));
event_enable(pending_events);
return true;
return flush(TR_UP, byte_count);
}
/**
***
**/
///
static size_t getDesiredOutputBufferSize(tr_peerIo const* io, uint64_t now)
static size_t get_desired_output_buffer_size(tr_peerIo const* io, uint64_t now)
{
/* this is all kind of arbitrary, but what seems to work well is
* being large enough to hold the next 20 seconds' worth of input,
* or a few blocks, whichever is bigger.
* It's okay to tweak this as needed */
auto const current_speed_bytes_per_second = io->bandwidth().getPieceSpeedBytesPerSecond(now, TR_UP);
unsigned int const period = 15U; /* arbitrary */
/* the 3 is arbitrary; the .5 is to leave room for messages */
static auto const ceiling = static_cast<size_t>(tr_block_info::BlockSize * 3.5);
return std::max(ceiling, current_speed_bytes_per_second * period);
// this is all kind of arbitrary, but what seems to work well is
// being large enough to hold the next 20 seconds' worth of input,
// or a few blocks, whichever is bigger. OK to tweak this as needed.
static auto constexpr PeriodSecs = 15U;
// the 3 is an arbitrary number of blocks;
// the .5 is to leave room for protocol messages
static auto constexpr Floor = static_cast<size_t>(tr_block_info::BlockSize * 3.5);
auto const current_speed_bytes_per_second = io->get_piece_speed_bytes_per_second(now, TR_UP);
return std::max(Floor, current_speed_bytes_per_second * PeriodSecs);
}
size_t tr_peerIo::get_write_buffer_space(uint64_t now) const noexcept
{
size_t const desired_len = getDesiredOutputBufferSize(this, now);
size_t const desired_len = get_desired_output_buffer_size(this, now);
size_t const current_len = std::size(outbuf_);
return desired_len > current_len ? desired_len - current_len : 0U;
}
/**
***
**/
void tr_peerIo::write(libtransmission::Buffer& buf, bool is_piece_data)
{
auto [bytes, len] = buf.pullup();
@ -620,9 +588,7 @@ void tr_peerIo::write_bytes(void const* bytes, size_t n_bytes, bool is_piece_dat
outbuf_info_.emplace_back(n_bytes, is_piece_data);
}
/***
****
***/
///
void tr_peerIo::read_bytes(void* bytes, size_t byte_count)
{
@ -662,116 +628,127 @@ void tr_peerIo::read_buffer_drain(size_t byte_count)
}
}
/***
****
***/
/// UTP
size_t tr_peerIo::try_read(size_t max)
#ifdef WITH_UTP
void tr_peerIo::on_utp_state_change(int state)
{
static auto constexpr Dir = TR_DOWN;
if (max == 0)
if (state == UTP_STATE_CONNECT)
{
return {};
tr_logAddTraceIo(this, "utp_on_state_change -- changed to connected");
utp_supported_ = true;
}
// Do not write more than the bandwidth allows.
// If there is no bandwidth left available, disable writes.
max = bandwidth().clamp(TR_DOWN, max);
if (max == 0)
else if (state == UTP_STATE_WRITABLE)
{
set_enabled(Dir, false);
return {};
}
tr_logAddTraceIo(this, "utp_on_state_change -- changed to writable");
auto& buf = inbuf_;
tr_error* error = nullptr;
auto const n_read = socket_.try_read(buf, max, &error);
set_enabled(Dir, error == nullptr || canRetryFromError(error->code));
if (error != nullptr)
{
if (!canRetryFromError(error->code))
if ((pending_events_ & EV_WRITE) != 0)
{
tr_logAddTraceIo(this, fmt::format("try_read err: n_read:{} errno:{} ({})", n_read, error->code, error->message));
call_error_callback(*error);
try_write(SIZE_MAX);
}
}
else if (state == UTP_STATE_EOF)
{
tr_error* error = nullptr;
tr_error_set(&error, ENOTCONN, tr_strerror(ENOTCONN));
call_error_callback(*error);
tr_error_clear(&error);
}
else if (!std::empty(buf))
else if (state == UTP_STATE_DESTROYING)
{
can_read_wrapper();
tr_logAddErrorIo(this, "Impossible state UTP_STATE_DESTROYING");
}
else
{
tr_logAddErrorIo(this, fmt::format(_("Unknown state: {state}"), fmt::arg("state", state)));
}
return n_read;
}
size_t tr_peerIo::try_write(size_t max)
void tr_peerIo::on_utp_error(int errcode)
{
static auto constexpr Dir = TR_UP;
tr_logAddTraceIo(this, fmt::format("utp_on_error -- {}", utp_error_code_names[errcode]));
if (max == 0)
if (got_error_ != nullptr)
{
return {};
}
auto& buf = outbuf_;
max = std::min(max, std::size(buf));
max = bandwidth().clamp(Dir, max);
if (max == 0)
{
set_enabled(Dir, false);
return {};
}
tr_error* error = nullptr;
auto const n_written = socket_.try_write(buf, max, &error);
// enable further writes if there's more data to write
set_enabled(Dir, !std::empty(buf) && (error == nullptr || canRetryFromError(error->code)));
if (error != nullptr)
{
if (!canRetryFromError(error->code))
{
tr_logAddTraceIo(
this,
fmt::format("try_write err: wrote:{}, errno:{} ({})", n_written, error->code, error->message));
call_error_callback(*error);
}
tr_error* error = nullptr;
tr_error_set(&error, errcode, utp_error_code_names[errcode]);
call_error_callback(*error);
tr_error_clear(&error);
}
else if (n_written > 0U)
{
did_write_wrapper(n_written);
}
return n_written;
}
size_t tr_peerIo::flush(tr_direction dir, size_t limit)
#endif /* #ifdef WITH_UTP */
void tr_peerIo::utp_init([[maybe_unused]] struct_utp_context* ctx)
{
TR_ASSERT(tr_isDirection(dir));
#ifdef WITH_UTP
utp_context_set_option(ctx, UTP_RCVBUF, RcvBuf);
return dir == TR_DOWN ? try_read(limit) : try_write(limit);
}
// note: all the callback handlers here need to check `userdata` for nullptr
// because libutp can fire callbacks on a socket after utp_close() is called
size_t tr_peerIo::flush_outgoing_protocol_msgs()
{
size_t byte_count = 0;
/* count up how many bytes are used by non-piece-data messages
at the front of our outbound queue */
for (auto const& [n_bytes, is_piece_data] : outbuf_info_)
{
if (is_piece_data)
utp_set_callback(
ctx,
UTP_ON_READ,
[](utp_callback_arguments* args) -> uint64
{
break;
}
if (auto* const io = static_cast<tr_peerIo*>(utp_get_userdata(args->socket)); io != nullptr)
{
io->inbuf_.add(args->buf, args->len);
io->set_enabled(TR_DOWN, true);
io->can_read_wrapper();
}
return {};
});
byte_count += n_bytes;
}
utp_set_callback(
ctx,
UTP_GET_READ_BUFFER_SIZE,
[](utp_callback_arguments* args) -> uint64
{
if (auto const* const io = static_cast<tr_peerIo*>(utp_get_userdata(args->socket)); io != nullptr)
{
return std::size(io->inbuf_);
}
return {};
});
return flush(TR_UP, byte_count);
utp_set_callback(
ctx,
UTP_ON_ERROR,
[](utp_callback_arguments* args) -> uint64
{
if (auto* const io = static_cast<tr_peerIo*>(utp_get_userdata(args->socket)); io != nullptr)
{
io->on_utp_error(args->u1.error_code);
}
return {};
});
utp_set_callback(
ctx,
UTP_ON_OVERHEAD_STATISTICS,
[](utp_callback_arguments* args) -> uint64
{
if (auto* const io = static_cast<tr_peerIo*>(utp_get_userdata(args->socket)); io != nullptr)
{
tr_logAddTraceIo(io, fmt::format("{:d} overhead bytes via utp", args->len));
io->bandwidth().notifyBandwidthConsumed(args->u1.send != 0 ? TR_UP : TR_DOWN, args->len, false, tr_time_msec());
}
return {};
});
utp_set_callback(
ctx,
UTP_ON_STATE_CHANGE,
[](utp_callback_arguments* args) -> uint64
{
if (auto* const io = static_cast<tr_peerIo*>(utp_get_userdata(args->socket)); io != nullptr)
{
io->on_utp_state_change(args->u1.state);
}
return {};
});
#endif
}

View File

@ -9,17 +9,11 @@
#error only libtransmission should #include this header.
#endif
/**
***
**/
#include <cstddef> // size_t
#include <cstdint> // uintX_t
#include <ctime>
#include <deque>
#include <memory>
#include <string>
#include <utility> // std::make_pair
#include <utility> // std::pair
#include "transmission.h"
@ -27,11 +21,9 @@
#include "net.h" // tr_address
#include "peer-mse.h"
#include "peer-socket.h"
#include "tr-assert.h"
#include "tr-buffer.h"
#include "utils-ev.h"
struct tr_bandwidth;
struct struct_utp_context;
namespace libtransmission::test
@ -39,11 +31,6 @@ namespace libtransmission::test
class HandshakeTest;
} // namespace libtransmission::test
/**
* @addtogroup networked_io Networked IO
* @{
*/
enum ReadState
{
READ_NOW,
@ -55,12 +42,11 @@ class tr_peerIo final : public std::enable_shared_from_this<tr_peerIo>
{
using DH = tr_message_stream_encryption::DH;
using Filter = tr_message_stream_encryption::Filter;
public:
using CanRead = ReadState (*)(tr_peerIo* io, void* user_data, size_t* setme_piece_byte_count);
using DidWrite = void (*)(tr_peerIo* io, size_t bytesWritten, bool wasPieceData, void* userData);
using GotError = void (*)(tr_peerIo* io, tr_error const& error, void* userData);
public:
tr_peerIo(
tr_session* session_in,
tr_sha1_digest_t const* info_hash,
@ -81,6 +67,19 @@ public:
static std::shared_ptr<tr_peerIo> new_incoming(tr_session* session, tr_bandwidth* parent, tr_peer_socket socket);
constexpr void set_callbacks(CanRead can_read, DidWrite did_write, GotError got_error, void* user_data)
{
can_read_ = can_read;
did_write_ = did_write;
got_error_ = got_error;
user_data_ = user_data;
}
void clear_callbacks()
{
set_callbacks(nullptr, nullptr, nullptr, nullptr);
}
void set_socket(tr_peer_socket);
[[nodiscard]] constexpr auto is_utp() const noexcept
@ -90,6 +89,25 @@ public:
void clear();
[[nodiscard]] bool reconnect();
void set_enabled(tr_direction dir, bool is_enabled);
///
[[nodiscard]] auto read_buffer_size() const noexcept
{
return std::size(inbuf_);
}
template<typename T>
[[nodiscard]] auto read_buffer_starts_with(T const& t) const noexcept
{
return inbuf_.startsWith(t);
}
void read_buffer_drain(size_t byte_count);
void read_bytes(void* bytes, size_t byte_count);
void read_uint8(uint8_t* setme)
@ -98,11 +116,123 @@ public:
}
void read_uint16(uint16_t* setme);
void read_uint32(uint32_t* setme);
[[nodiscard]] bool reconnect();
///
void set_enabled(tr_direction dir, bool is_enabled);
[[nodiscard]] size_t get_write_buffer_space(uint64_t now) const noexcept;
void write_bytes(void const* bytes, size_t n_bytes, bool is_piece_data);
// Write all the data from `buf`.
// This is a destructive add: `buf` is empty after this call.
void write(libtransmission::Buffer& buf, bool is_piece_data);
size_t flush_outgoing_protocol_msgs();
size_t flush(tr_direction dir, size_t byte_limit);
///
[[nodiscard]] auto has_bandwidth_left(tr_direction dir) const noexcept
{
return bandwidth_.clamp(dir, 1024) > 0;
}
[[nodiscard]] auto get_piece_speed_bytes_per_second(uint64_t now, tr_direction dir) const noexcept
{
return bandwidth_.getPieceSpeedBytesPerSecond(now, dir);
}
///
[[nodiscard]] constexpr auto supports_fext() const noexcept
{
return fast_extension_supported_;
}
constexpr void set_supports_fext(bool flag) noexcept
{
fast_extension_supported_ = flag;
}
///
[[nodiscard]] constexpr auto supports_ltep() const noexcept
{
return extended_protocol_supported_;
}
constexpr void set_supports_ltep(bool flag) noexcept
{
extended_protocol_supported_ = flag;
}
///
[[nodiscard]] constexpr auto supports_dht() const noexcept
{
return dht_supported_;
}
constexpr void set_supports_dht(bool flag) noexcept
{
dht_supported_ = flag;
}
///
[[nodiscard]] constexpr auto const& bandwidth() const noexcept
{
return bandwidth_;
}
[[nodiscard]] constexpr auto& bandwidth() noexcept
{
return bandwidth_;
}
void set_bandwidth(tr_bandwidth* parent)
{
bandwidth_.setParent(parent);
}
///
[[nodiscard]] constexpr auto const& torrent_hash() const noexcept
{
return info_hash_;
}
void set_torrent_hash(tr_sha1_digest_t const& hash) noexcept
{
info_hash_ = hash;
}
///
[[nodiscard]] constexpr auto priority() const noexcept
{
return priority_;
}
constexpr void set_priority(tr_priority_t priority)
{
priority_ = priority;
}
///
[[nodiscard]] constexpr auto supports_utp() const noexcept
{
return utp_supported_;
}
[[nodiscard]] constexpr auto is_incoming() const noexcept
{
return is_incoming_;
}
[[nodiscard]] constexpr auto const& address() const noexcept
{
@ -119,147 +249,25 @@ public:
return socket_.display_name();
}
void read_buffer_drain(size_t byte_count);
[[nodiscard]] auto read_buffer_size() const noexcept
{
return std::size(inbuf_);
}
template<typename T>
[[nodiscard]] auto read_buffer_starts_with(T const& t) const noexcept
{
return inbuf_.startsWith(t);
}
size_t flush_outgoing_protocol_msgs();
size_t flush(tr_direction dir, size_t byte_limit);
void write_bytes(void const* bytes, size_t n_bytes, bool is_piece_data);
// Write all the data from `buf`.
// This is a destructive add: `buf` is empty after this call.
void write(libtransmission::Buffer& buf, bool is_piece_data);
[[nodiscard]] size_t get_write_buffer_space(uint64_t now) const noexcept;
[[nodiscard]] auto has_bandwidth_left(tr_direction dir) noexcept
{
return bandwidth_.clamp(dir, 1024) > 0;
}
[[nodiscard]] auto get_piece_speed_bytes_per_second(uint64_t now, tr_direction dir) noexcept
{
return bandwidth_.getPieceSpeedBytesPerSecond(now, dir);
}
constexpr void set_supports_fext(bool flag) noexcept
{
fast_extension_supported_ = flag;
}
[[nodiscard]] constexpr auto supports_fext() const noexcept
{
return fast_extension_supported_;
}
constexpr void set_supports_ltep(bool flag) noexcept
{
extended_protocol_supported_ = flag;
}
[[nodiscard]] constexpr auto supports_ltep() const noexcept
{
return extended_protocol_supported_;
}
constexpr void set_supports_dht(bool flag) noexcept
{
dht_supported_ = flag;
}
[[nodiscard]] constexpr auto supports_dht() const noexcept
{
return dht_supported_;
}
[[nodiscard]] constexpr auto supports_utp() const noexcept
{
return utp_supported_;
}
[[nodiscard]] constexpr auto is_seed() const noexcept
{
return is_seed_;
}
[[nodiscard]] constexpr auto const& bandwidth() const noexcept
{
return bandwidth_;
}
[[nodiscard]] constexpr auto& bandwidth() noexcept
{
return bandwidth_;
}
void set_parent(tr_bandwidth* parent)
{
bandwidth_.setParent(parent);
}
[[nodiscard]] constexpr auto is_incoming() const noexcept
{
return is_incoming_;
}
void set_torrent_hash(tr_sha1_digest_t hash) noexcept
{
info_hash_ = hash;
}
[[nodiscard]] constexpr auto const& torrent_hash() const noexcept
{
return info_hash_;
}
void set_callbacks(CanRead can_read, DidWrite did_write, GotError got_error, void* user_data);
void clear_callbacks()
{
set_callbacks(nullptr, nullptr, nullptr, nullptr);
}
[[nodiscard]] constexpr auto priority() const noexcept
{
return priority_;
}
///
[[nodiscard]] constexpr auto is_encrypted() const noexcept
{
return filter_.is_active();
}
constexpr void set_priority(tr_priority_t priority)
{
priority_ = priority;
}
void decrypt_init(bool is_incoming, DH const& dh, tr_sha1_digest_t const& info_hash)
{
filter_.decryptInit(is_incoming, dh, info_hash);
}
void decrypt(size_t buflen, void* buf)
{
filter_.decrypt(buflen, buf);
}
void encrypt_init(bool is_incoming, DH const& dh, tr_sha1_digest_t const& info_hash)
{
filter_.encryptInit(is_incoming, dh, info_hash);
}
///
static void utp_init(struct_utp_context* ctx);
private:
@ -267,6 +275,11 @@ private:
friend class libtransmission::test::HandshakeTest;
[[nodiscard]] constexpr auto is_seed() const noexcept
{
return is_seed_;
}
void call_error_callback(tr_error const& error)
{
if (got_error_ != nullptr)
@ -275,6 +288,11 @@ private:
}
}
void decrypt(size_t buflen, void* buf)
{
filter_.decrypt(buflen, buf);
}
void encrypt(size_t buflen, void* buf)
{
filter_.encrypt(buflen, buf);
@ -341,8 +359,3 @@ private:
bool extended_protocol_supported_ = false;
bool fast_extension_supported_ = false;
};
// Sanity check: non-null io with a valid peer address.
// NOTE(review): removed as unused by this commit.
constexpr bool tr_isPeerIo(tr_peerIo const* io)
{
return io != nullptr && io->address().is_valid();
}

View File

@ -1170,7 +1170,7 @@ static bool on_handshake_done(tr_peerMgr* manager, tr_handshake::Result const& r
client = tr_quark_new(std::data(buf));
}
result.io->set_parent(&s->tor->bandwidth_);
result.io->set_bandwidth(&s->tor->bandwidth_);
createBitTorrentPeer(s->tor, result.io, atom, client);
success = true;