refactor: timer pt 2 (#3617)

* feat: add convenience variants of TimerMaker::create()

* refactor: use libtransmission::Timer in peer-mgr

* refactor: use libtransmission::Timer in peer-msgs

* refactor: use libtransmission::Timer in tr-utp

* refactor: use libtransmission::Timer in tr-dht

* refactor: use libtransmission::Timer in port-forwarding

* refactor: use libtransmission::Timer in webseed

* refactor: use libtransmission::Timer in tr-lpd

* refactor: use libtransmission::Timer in rpc-server

* chore: remove unused function tr_timerAdd()

* chore: remove unused function tr_gettimeofday()

* fixup! chore: remove unused function tr_timerAdd()

* fixup! refactor: use libtransmission::Timer in port-forwarding
This commit is contained in:
Charles Kerr 2022-08-11 12:28:37 -05:00 committed by GitHub
parent 7c152c1db4
commit 63eab54fd5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 214 additions and 340 deletions

View File

@ -181,7 +181,7 @@ jobs:
run: brew install cmake gettext libdeflate libevent libnatpmp libpsl miniupnpc ninja
- name: Get Dependencies (GTK)
if: ${{ needs.what-to-make.outputs.make-gtk == 'true' }}
run: brew install gtkmm3
run: brew install gtkmm3 libjpeg
- name: Get Dependencies (Qt)
if: ${{ needs.what-to-make.outputs.make-qt == 'true' }}
run: brew install qt@5
@ -349,7 +349,7 @@ jobs:
run: brew install cmake gettext libdeflate libevent libnatpmp libpsl miniupnpc ninja
- name: Get Dependencies (GTK)
if: ${{ needs.what-to-make.outputs.make-gtk == 'true' }}
run: brew install gtkmm3
run: brew install gtkmm3 libjpeg
- name: Get Dependencies (Qt)
if: ${{ needs.what-to-make.outputs.make-qt == 'true' }}
run: brew install qt@5

View File

@ -5,6 +5,7 @@
#include <algorithm>
#include <cerrno> /* error codes ERANGE, ... */
#include <chrono>
#include <climits> /* INT_MAX */
#include <cmath>
#include <cstdint>
@ -18,8 +19,6 @@
#include <utility>
#include <vector>
#include <event2/event.h>
#include <fmt/format.h>
#define LIBTRANSMISSION_PEER_MODULE
@ -48,6 +47,8 @@
#include "utils.h"
#include "webseed.h"
using namespace std::literals;
// use for bitwise operations w/peer_atom.flags2
static auto constexpr MyflagBanned = int{ 1 };
@ -560,27 +561,17 @@ private:
bool is_endgame_ = false;
};
struct EventDeleter
{
void operator()(struct event* ev) const
{
event_free(ev);
}
};
using UniqueTimer = std::unique_ptr<struct event, EventDeleter>;
struct tr_peerMgr
{
explicit tr_peerMgr(tr_session* session_in)
: session{ session_in }
, bandwidth_timer_{ evtimer_new(session->eventBase(), bandwidthPulseMarshall, this) }
, rechoke_timer_{ evtimer_new(session->eventBase(), rechokePulseMarshall, this) }
, refill_upkeep_timer_{ evtimer_new(session->eventBase(), refillUpkeepMarshall, this) }
, bandwidth_timer_{ session->timerMaker().create([this]() { bandwidthPulse(); }) }
, rechoke_timer_{ session->timerMaker().create([this]() { rechokePulseMarshall(); }) }
, refill_upkeep_timer_{ session->timerMaker().create([this]() { refillUpkeep(); }) }
{
tr_timerAddMsec(*bandwidth_timer_, BandwidthPeriodMsec);
tr_timerAddMsec(*rechoke_timer_, RechokePeriodMsec);
tr_timerAddMsec(*refill_upkeep_timer_, RefillUpkeepPeriodMsec);
bandwidth_timer_->startRepeating(BandwidthPeriod);
rechoke_timer_->startSingleShot(RechokePeriod);
refill_upkeep_timer_->startRepeating(RefillUpkeepPeriod);
}
tr_peerMgr(tr_peerMgr&&) = delete;
@ -601,7 +592,7 @@ struct tr_peerMgr
void rechokeSoon() noexcept
{
tr_timerAddMsec(*rechoke_timer_, 100);
rechoke_timer_->startSingleShot(100ms);
}
void bandwidthPulse();
@ -614,34 +605,19 @@ struct tr_peerMgr
Handshakes incoming_handshakes;
private:
static void bandwidthPulseMarshall(evutil_socket_t, short /*reason*/, void* vmgr)
void rechokePulseMarshall()
{
auto* const self = static_cast<tr_peerMgr*>(vmgr);
self->bandwidthPulse();
tr_timerAddMsec(*self->bandwidth_timer_, BandwidthPeriodMsec);
rechokePulse();
rechoke_timer_->startSingleShot(RechokePeriod);
}
static void rechokePulseMarshall(evutil_socket_t, short /*reason*/, void* vmgr)
{
auto* const self = static_cast<tr_peerMgr*>(vmgr);
self->rechokePulse();
tr_timerAddMsec(*self->rechoke_timer_, RechokePeriodMsec);
}
std::unique_ptr<libtransmission::Timer> const bandwidth_timer_;
std::unique_ptr<libtransmission::Timer> const rechoke_timer_;
std::unique_ptr<libtransmission::Timer> const refill_upkeep_timer_;
static void refillUpkeepMarshall(evutil_socket_t, short /*reason*/, void* vmgr)
{
auto* const self = static_cast<tr_peerMgr*>(vmgr);
self->refillUpkeep();
tr_timerAddMsec(*self->refill_upkeep_timer_, RefillUpkeepPeriodMsec);
}
UniqueTimer const bandwidth_timer_;
UniqueTimer const rechoke_timer_;
UniqueTimer const refill_upkeep_timer_;
static auto constexpr BandwidthPeriodMsec = int{ 500 };
static auto constexpr RechokePeriodMsec = int{ 10 * 1000 };
static auto constexpr RefillUpkeepPeriodMsec = int{ 10 * 1000 };
static auto constexpr BandwidthPeriod = 500ms;
static auto constexpr RechokePeriod = 10s;
static auto constexpr RefillUpkeepPeriod = 10s;
// how frequently to decide which peers live and die
static auto constexpr ReconnectPeriodMsec = int{ 500 };
@ -2579,8 +2555,9 @@ void tr_peerMgr::bandwidthPulse()
pumpAllPeers(this);
/* allocate bandwidth to the peers */
session->top_bandwidth_.allocate(TR_UP, BandwidthPeriodMsec);
session->top_bandwidth_.allocate(TR_DOWN, BandwidthPeriodMsec);
auto const msec = std::chrono::duration_cast<std::chrono::milliseconds>(BandwidthPeriod).count();
session->top_bandwidth_.allocate(TR_UP, msec);
session->top_bandwidth_.allocate(TR_DOWN, msec);
/* torrent upkeep */
for (auto* const tor : session->torrents())

View File

@ -4,7 +4,9 @@
// License text can be found in the licenses/ folder.
#include <algorithm>
#include <array>
#include <cerrno>
#include <chrono>
#include <cstring>
#include <ctime>
#include <iterator>
@ -15,7 +17,6 @@
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/event.h>
#include <fmt/format.h>
@ -43,6 +44,8 @@
#define EBADMSG EINVAL
#endif
using namespace std::literals;
/**
***
**/
@ -113,9 +116,6 @@ auto constexpr Reject = int{ 2 };
} // namespace MetadataMsgType
// seconds between sendPex() calls
static auto constexpr PexIntervalSecs = int{ 90 };
static auto constexpr MinChokePeriodSec = int{ 10 };
// idle seconds before we send a keepalive
@ -203,7 +203,6 @@ static void cancelAllRequestsToClient(tr_peerMsgsImpl* msgs);
static void didWrite(tr_peerIo* io, size_t bytesWritten, bool wasPieceData, void* vmsgs);
static void gotError(tr_peerIo* io, short what, void* vmsgs);
static void peerPulse(void* vmsgs);
static void pexPulse(evutil_socket_t fd, short what, void* vmsgs);
static void protocolSendCancel(tr_peerMsgsImpl* msgs, struct peer_request const& req);
static void protocolSendChoke(tr_peerMsgsImpl* msgs, bool choke);
static void protocolSendHave(tr_peerMsgsImpl* msgs, tr_piece_index_t index);
@ -212,17 +211,6 @@ static void sendInterest(tr_peerMsgsImpl* msgs, bool b);
static void sendLtepHandshake(tr_peerMsgsImpl* msgs);
static void tellPeerWhatWeHave(tr_peerMsgsImpl* msgs);
static void updateDesiredRequestCount(tr_peerMsgsImpl* msgs);
//zzz
struct EventDeleter
{
void operator()(struct event* ev) const
{
event_free(ev);
}
};
using UniqueTimer = std::unique_ptr<struct event, EventDeleter>;
#define myLogMacro(msgs, level, text) \
do \
@ -270,8 +258,8 @@ public:
{
if (torrent->allowsPex())
{
pex_timer.reset(evtimer_new(torrent->session->eventBase(), pexPulse, this));
tr_timerAdd(*pex_timer, PexIntervalSecs, 0);
pex_timer_ = torrent->session->timerMaker().create([this]() { sendPex(); });
pex_timer_->startRepeating(SendPexInterval);
}
if (io->supportsUTP())
@ -671,6 +659,8 @@ public:
return RequestLimit{ max_blocks, max_blocks };
}
void sendPex();
private:
[[nodiscard]] size_t maxAvailableReqs() const
{
@ -856,17 +846,20 @@ public:
supplied a reqq argument, it's stored here. */
std::optional<size_t> reqq;
UniqueTimer pex_timer;
std::unique_ptr<libtransmission::Timer> pex_timer_;
tr_bitfield have_;
private:
bool is_active_[2] = { false, false };
std::array<bool, 2> is_active_ = { false, false };
tr_peer_callback const callback_;
void* const callbackData_;
mutable std::optional<float> percent_done_;
// seconds between periodic sendPex() calls
static auto constexpr SendPexInterval = 90s;
};
tr_peerMsgs* tr_peerMsgsNew(tr_torrent* torrent, peer_atom* atom, tr_peerIo* io, tr_peer_callback callback, void* callback_data)
@ -1461,8 +1454,6 @@ static void parseUtPex(tr_peerMsgsImpl* msgs, uint32_t msglen, struct evbuffer*
}
}
static void sendPex(tr_peerMsgsImpl* msgs);
static void parseLtep(tr_peerMsgsImpl* msgs, uint32_t msglen, struct evbuffer* inbuf)
{
TR_ASSERT(msglen > 0);
@ -1479,7 +1470,7 @@ static void parseLtep(tr_peerMsgsImpl* msgs, uint32_t msglen, struct evbuffer* i
if (msgs->io->supportsLTEP())
{
sendLtepHandshake(msgs);
sendPex(msgs);
msgs->sendPex();
}
}
else if (ltep_msgid == UT_PEX_ID)
@ -2453,31 +2444,31 @@ static void tellPeerWhatWeHave(tr_peerMsgsImpl* msgs)
}
}
static void sendPex(tr_peerMsgsImpl* msgs)
void tr_peerMsgsImpl::sendPex()
{
// only send pex if both the torrent and peer support it
if (!msgs->peerSupportsPex || !msgs->torrent->allowsPex())
if (!this->peerSupportsPex || !this->torrent->allowsPex())
{
return;
}
auto& old = msgs->pex;
auto pex = tr_peerMgrGetPeers(msgs->torrent, TR_AF_INET, TR_PEERS_CONNECTED, MaxPexPeerCount);
auto& old4 = this->pex;
auto new4 = tr_peerMgrGetPeers(this->torrent, TR_AF_INET, TR_PEERS_CONNECTED, MaxPexPeerCount);
auto added = std::vector<tr_pex>{};
added.reserve(std::size(pex));
std::set_difference(std::begin(pex), std::end(pex), std::begin(old), std::end(old), std::back_inserter(added));
added.reserve(std::size(new4));
std::set_difference(std::begin(new4), std::end(new4), std::begin(old4), std::end(old4), std::back_inserter(added));
auto dropped = std::vector<tr_pex>{};
dropped.reserve(std::size(old));
std::set_difference(std::begin(old), std::end(old), std::begin(pex), std::end(pex), std::back_inserter(dropped));
dropped.reserve(std::size(old4));
std::set_difference(std::begin(old4), std::end(old4), std::begin(new4), std::end(new4), std::back_inserter(dropped));
auto& old6 = msgs->pex6;
auto pex6 = tr_peerMgrGetPeers(msgs->torrent, TR_AF_INET6, TR_PEERS_CONNECTED, MaxPexPeerCount);
auto& old6 = this->pex6;
auto new6 = tr_peerMgrGetPeers(this->torrent, TR_AF_INET6, TR_PEERS_CONNECTED, MaxPexPeerCount);
auto added6 = std::vector<tr_pex>{};
added6.reserve(std::size(pex6));
std::set_difference(std::begin(pex6), std::end(pex6), std::begin(old6), std::end(old6), std::back_inserter(added6));
added6.reserve(std::size(new6));
std::set_difference(std::begin(new6), std::end(new6), std::begin(old6), std::end(old6), std::back_inserter(added6));
auto dropped6 = std::vector<tr_pex>{};
dropped6.reserve(std::size(old6));
std::set_difference(std::begin(old6), std::end(old6), std::begin(pex6), std::end(pex6), std::back_inserter(dropped6));
std::set_difference(std::begin(old6), std::end(old6), std::begin(new6), std::end(new6), std::back_inserter(dropped6));
// Some peers give us error messages if we send
// more than this many peers in a single pex message.
@ -2490,13 +2481,13 @@ static void sendPex(tr_peerMsgsImpl* msgs)
dropped6.resize(std::min(std::size(dropped6), MaxPexDropped));
logtrace(
msgs,
this,
fmt::format(
FMT_STRING("pex: old peer count {:d}+{:d}, new peer count {:d}+{:d}, added {:d}+{:d}, dropped {:d}+{:d}"),
std::size(old),
std::size(old4),
std::size(old6),
std::size(pex),
std::size(pex6),
std::size(new4),
std::size(new6),
std::size(added),
std::size(added6),
std::size(dropped),
@ -2508,11 +2499,11 @@ static void sendPex(tr_peerMsgsImpl* msgs)
return;
}
evbuffer* const out = msgs->outMessages;
evbuffer* const out = this->outMessages;
// update msgs
std::swap(old, pex);
std::swap(old6, pex6);
std::swap(old4, new4);
std::swap(old6, new6);
// build the pex payload
auto val = tr_variant{};
@ -2622,22 +2613,12 @@ static void sendPex(tr_peerMsgsImpl* msgs)
auto* const payload = tr_variantToBuf(&val, TR_VARIANT_FMT_BENC);
evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload));
evbuffer_add_uint8(out, BtPeerMsgs::Ltep);
evbuffer_add_uint8(out, msgs->ut_pex_id);
evbuffer_add_uint8(out, this->ut_pex_id);
evbuffer_add_buffer(out, payload);
msgs->pokeBatchPeriod(HighPriorityIntervalSecs);
logtrace(msgs, fmt::format(FMT_STRING("sending a pex message; outMessage size is now {:d}"), evbuffer_get_length(out)));
msgs->dbgOutMessageLen();
this->pokeBatchPeriod(HighPriorityIntervalSecs);
logtrace(this, fmt::format(FMT_STRING("sending a pex message; outMessage size is now {:d}"), evbuffer_get_length(out)));
this->dbgOutMessageLen();
evbuffer_free(payload);
tr_variantFree(&val);
}
static void pexPulse(evutil_socket_t /*fd*/, short /*what*/, void* vmsgs)
{
auto* msgs = static_cast<tr_peerMsgsImpl*>(vmsgs);
sendPex(msgs);
TR_ASSERT(msgs->pex_timer);
tr_timerAdd(*msgs->pex_timer, PexIntervalSecs, 0);
}

View File

@ -4,10 +4,7 @@
// License text can be found in the licenses/ folder.
#include <algorithm>
#include <sys/types.h>
#include <event2/event.h>
#include <chrono>
#include <fmt/core.h>
@ -21,22 +18,30 @@
#include "torrent.h"
#include "tr-assert.h"
#include "upnp.h"
#include "utils.h"
#include "utils.h" // for _()
using namespace std::literals;
struct tr_shared
{
bool isEnabled;
bool isShuttingDown;
bool doPortCheck;
tr_shared(tr_session* session_in)
: session{ session_in }
{
}
tr_port_forwarding natpmpStatus;
tr_port_forwarding upnpStatus;
tr_session* const session = nullptr;
tr_upnp* upnp;
tr_natpmp* natpmp;
tr_session* session;
bool isEnabled = false;
bool isShuttingDown = false;
bool doPortCheck = false;
struct event* timer;
tr_port_forwarding natpmpStatus = TR_PORT_UNMAPPED;
tr_port_forwarding upnpStatus = TR_PORT_UNMAPPED;
tr_upnp* upnp = nullptr;
tr_natpmp* natpmp = nullptr;
std::unique_ptr<libtransmission::Timer> timer;
};
/***
@ -109,51 +114,56 @@ static void natPulse(tr_shared* s, bool do_check)
}
}
static void set_evtimer_from_status(tr_shared* s)
static void restartTimer(tr_shared* s)
{
int sec = 0;
int msec = 0;
auto& timer = s->timer;
if (!timer)
{
return;
}
/* when to wake up again */
// when to wake up again
switch (tr_sharedTraversalStatus(s))
{
case TR_PORT_MAPPED:
/* if we're mapped, everything is fine... check back at renew_time
* to renew the port forwarding if it's expired */
// if we're mapped, everything is fine... check back at `renew_time`
// to renew the port forwarding if it's expired
s->doPortCheck = true;
sec = std::max(0, int(s->natpmp->renew_time - tr_time()));
if (auto const now = tr_time(); s->natpmp->renew_time > now)
{
timer->startSingleShot(std::chrono::seconds{ s->natpmp->renew_time - now });
}
else // ???
{
timer->startSingleShot(1min);
}
break;
case TR_PORT_ERROR:
/* some kind of an error. wait 60 seconds and retry */
sec = 60;
// some kind of an error. wait a minute and retry
timer->startSingleShot(1min);
break;
default:
/* in progress. pulse frequently. */
msec = 333000;
// in progress. pulse frequently.
timer->startSingleShot(333ms);
break;
}
if (s->timer != nullptr)
{
tr_timerAdd(*s->timer, sec, msec);
}
}
static void onTimer(evutil_socket_t /*fd*/, short /*what*/, void* vshared)
static void onTimer(void* vshared)
{
auto* s = static_cast<tr_shared*>(vshared);
TR_ASSERT(s != nullptr);
TR_ASSERT(s->timer != nullptr);
TR_ASSERT(s->timer);
/* do something */
natPulse(s, s->doPortCheck);
s->doPortCheck = false;
/* set up the timer for the next pulse */
set_evtimer_from_status(s);
restartTimer(s);
}
/***
@ -162,34 +172,12 @@ static void onTimer(evutil_socket_t /*fd*/, short /*what*/, void* vshared)
tr_shared* tr_sharedInit(tr_session* session)
{
auto* const s = tr_new0(tr_shared, 1);
s->session = session;
s->isEnabled = false;
s->upnpStatus = TR_PORT_UNMAPPED;
s->natpmpStatus = TR_PORT_UNMAPPED;
#if 0
if (isEnabled)
{
s->timer = tr_new0(struct event, 1);
evtimer_set(s->timer, onTimer, s);
tr_timerAdd(s->timer, 0, 333000);
}
#endif
return s;
return new tr_shared{ session };
}
static void stop_timer(tr_shared* s)
{
if (s->timer != nullptr)
{
event_free(s->timer);
s->timer = nullptr;
}
s->timer.reset();
}
static void stop_forwarding(tr_shared* s)
@ -210,18 +198,18 @@ static void stop_forwarding(tr_shared* s)
void tr_sharedClose(tr_session* session)
{
tr_shared* s = session->shared;
tr_shared* shared = session->shared;
s->isShuttingDown = true;
stop_forwarding(s);
s->session->shared = nullptr;
tr_free(s);
shared->isShuttingDown = true;
stop_forwarding(shared);
shared->session->shared = nullptr;
delete shared;
}
static void start_timer(tr_shared* s)
{
s->timer = evtimer_new(s->session->eventBase(), onTimer, s);
set_evtimer_from_status(s);
s->timer = s->session->timerMaker().create(onTimer, s);
restartTimer(s);
}
void tr_sharedTraversalEnable(tr_shared* s, bool is_enable)

View File

@ -6,6 +6,7 @@
#include <algorithm>
#include <array>
#include <cerrno>
#include <chrono>
#include <cstring> /* memcpy */
#include <ctime>
#include <string>
@ -19,7 +20,6 @@
#endif
#include <event2/buffer.h>
#include <event2/event.h>
#include <event2/http.h>
#include <event2/http_struct.h> /* TODO: eventually remove this */
#include <event2/listener.h>
@ -549,9 +549,8 @@ static void handle_request(struct evhttp_request* req, void* arg)
}
static auto constexpr ServerStartRetryCount = int{ 10 };
static auto constexpr ServerStartRetryDelayIncrement = int{ 5 };
static auto constexpr ServerStartRetryDelayStep = int{ 3 };
static auto constexpr ServerStartRetryMaxDelay = int{ 60 };
static auto constexpr ServerStartRetryDelayIncrement = 5s;
static auto constexpr ServerStartRetryMaxDelay = 60s;
static char const* tr_rpc_address_to_string(tr_rpc_address const& addr, char* buf, size_t buflen)
{
@ -664,35 +663,22 @@ static bool bindUnixSocket(
static void startServer(tr_rpc_server* server);
static void rpc_server_on_start_retry(evutil_socket_t /*fd*/, short /*type*/, void* context)
static auto rpc_server_start_retry(tr_rpc_server* server)
{
startServer(static_cast<tr_rpc_server*>(context));
}
static int rpc_server_start_retry(tr_rpc_server* server)
{
int retry_delay = (server->start_retry_counter / ServerStartRetryDelayStep + 1) * ServerStartRetryDelayIncrement;
retry_delay = std::min(retry_delay, int{ ServerStartRetryMaxDelay });
if (server->start_retry_timer == nullptr)
if (!server->start_retry_timer)
{
server->start_retry_timer = evtimer_new(server->session->eventBase(), rpc_server_on_start_retry, server);
server->start_retry_timer = server->session->timerMaker().create([server]() { startServer(server); });
}
tr_timerAdd(*server->start_retry_timer, retry_delay, 0);
++server->start_retry_counter;
return retry_delay;
auto const interval = std::min(ServerStartRetryDelayIncrement * server->start_retry_counter, ServerStartRetryMaxDelay);
server->start_retry_timer->startSingleShot(std::chrono::duration_cast<std::chrono::milliseconds>(interval));
return interval;
}
static void rpc_server_start_retry_cancel(tr_rpc_server* server)
{
if (server->start_retry_timer != nullptr)
{
event_free(server->start_retry_timer);
server->start_retry_timer = nullptr;
}
server->start_retry_timer.reset();
server->start_retry_counter = 0;
}
@ -723,9 +709,9 @@ static void startServer(tr_rpc_server* server)
if (server->start_retry_counter < ServerStartRetryCount)
{
int const retry_delay = rpc_server_start_retry(server);
tr_logAddDebug(fmt::format("Couldn't bind to {}, retrying in {} seconds", addr_port_str, retry_delay));
auto const retry_delay = rpc_server_start_retry(server);
auto const seconds = std::chrono::duration_cast<std::chrono::seconds>(retry_delay).count();
tr_logAddDebug(fmt::format("Couldn't bind to {}, retrying in {} seconds", addr_port_str, seconds));
return;
}

View File

@ -17,6 +17,7 @@
#include "transmission.h"
#include "net.h"
#include "timer.h"
struct event;
struct evhttp;
@ -130,7 +131,7 @@ public:
std::unique_ptr<struct tr_rpc_address> bindAddress;
struct event* start_retry_timer = nullptr;
std::unique_ptr<libtransmission::Timer> start_retry_timer;
struct evhttp* httpd = nullptr;
tr_session* const session;

View File

@ -678,17 +678,15 @@ static void onNowTimer(void* vsession)
}
}
// set the timer to kick again right after the next second
auto const tv = tr_gettimeofday();
int constexpr Min = 100;
int constexpr Max = 999999;
auto const next_second_occurs_in_usec = std::chrono::microseconds{ std::clamp(int(1000000 - tv.tv_usec), Min, Max) };
auto next_second_occurs_in_msec = std::chrono::duration_cast<std::chrono::milliseconds>(next_second_occurs_in_usec);
if (next_second_occurs_in_msec < 100ms)
// set the timer to kick again right after (10ms after) the next second
auto const now = std::chrono::system_clock::now();
auto const target_time = std::chrono::time_point_cast<std::chrono::seconds>(now) + 1s + 10ms;
auto target_interval = target_time - now;
if (target_interval < 100ms)
{
next_second_occurs_in_msec += 1s;
target_interval += 1s;
}
session->now_timer_->setInterval(next_second_occurs_in_msec);
session->now_timer_->setInterval(std::chrono::duration_cast<std::chrono::milliseconds>(target_interval));
}
static void loadBlocklists(tr_session* session);

View File

@ -518,7 +518,7 @@ public:
struct event* udp6_event = nullptr;
struct struct_utp_context* utp_context = nullptr;
struct event* utp_timer = nullptr;
std::unique_ptr<libtransmission::Timer> utp_timer;
/* The open port on the local machine for incoming peer requests */
tr_port private_peer_port;

View File

@ -62,6 +62,20 @@ class TimerMaker
public:
virtual ~TimerMaker() = default;
[[nodiscard]] virtual std::unique_ptr<Timer> create() = 0;
[[nodiscard]] virtual std::unique_ptr<Timer> create(std::function<void()> callback)
{
auto timer = create();
timer->setCallback(std::move(callback));
return timer;
}
[[nodiscard]] virtual std::unique_ptr<Timer> create(Timer::CStyleCallback callback, void* user_data)
{
auto timer = create();
timer->setCallback(callback, user_data);
return timer;
}
};
} // namespace libtransmission

View File

@ -4,6 +4,7 @@
#include <algorithm>
#include <cerrno>
#include <chrono>
#include <csignal> /* sig_atomic_t */
#include <cstdio>
#include <cstring> /* memcpy(), memset() */
@ -20,7 +21,7 @@
#undef gai_strerror
#define gai_strerror gai_strerrorA
#else
#include <sys/time.h>
#include <sys/time.h> // for `struct timezone`
#include <sys/types.h>
#include <sys/socket.h> /* socket(), bind() */
#include <netdb.h>
@ -29,8 +30,6 @@
#include <dht/dht.h>
#include <event2/event.h>
#include <fmt/format.h>
#include "transmission.h"
@ -51,12 +50,10 @@
using namespace std::literals;
static struct event* dht_timer = nullptr;
static std::unique_ptr<libtransmission::Timer> dht_timer;
static unsigned char myid[20];
static tr_session* session_ = nullptr;
static void timer_callback(evutil_socket_t s, short type, void* session);
static bool bootstrap_done(tr_session* session, int af)
{
if (af == 0)
@ -325,8 +322,12 @@ int tr_dhtInit(tr_session* ss)
std::thread(dht_bootstrap, session_, nodes, nodes6).detach();
dht_timer = evtimer_new(session_->eventBase(), timer_callback, session_);
tr_timerAdd(*dht_timer, 0, tr_rand_int_weak(1000000));
dht_timer = session_->timerMaker().create([]() { tr_dhtCallback(nullptr, 0, nullptr, 0, session_); });
auto const random_percent = tr_rand_int_weak(1000) / 1000.0;
static auto constexpr MinInterval = 10ms;
static auto constexpr MaxInterval = 1s;
auto interval = MinInterval + random_percent * (MaxInterval - MinInterval);
dht_timer->startSingleShot(std::chrono::duration_cast<std::chrono::milliseconds>(interval));
tr_logAddDebug("DHT initialized");
@ -342,11 +343,7 @@ void tr_dhtUninit(tr_session* ss)
tr_logAddTrace("Uninitializing DHT");
if (dht_timer != nullptr)
{
event_free(dht_timer);
dht_timer = nullptr;
}
dht_timer.reset();
/* Since we only save known good nodes, avoid erasing older data if we
don't know enough nodes. */
@ -715,12 +712,11 @@ void tr_dhtCallback(unsigned char* buf, int buflen, struct sockaddr* from, sockl
/* Being slightly late is fine,
and has the added benefit of adding some jitter. */
tr_timerAdd(*dht_timer, (int)tosleep, tr_rand_int_weak(1000000));
}
static void timer_callback(evutil_socket_t /*s*/, short /*type*/, void* session)
{
tr_dhtCallback(nullptr, 0, nullptr, 0, session);
auto const random_percent = tr_rand_int_weak(1000) / 1000.0;
auto const min_interval = std::chrono::seconds{ tosleep };
auto const max_interval = std::chrono::seconds{ tosleep + 1 };
auto const interval = min_interval + random_percent * (max_interval - min_interval);
dht_timer->startSingleShot(std::chrono::duration_cast<std::chrono::milliseconds>(interval));
}
/* This function should return true when a node is blacklisted. We do
@ -759,10 +755,19 @@ int dht_sendto(int sockfd, void const* buf, int len, int flags, struct sockaddr
#if defined(_WIN32) && !defined(__MINGW32__)
extern "C" int dht_gettimeofday(struct timeval* tv, [[maybe_unused]] timezone* tz)
/***
****
***/
extern "C" int dht_gettimeofday(struct timeval* tv, [[maybe_unused]] struct timezone* tz)
{
TR_ASSERT(tz == nullptr);
*tv = tr_gettimeofday();
auto const d = std::chrono::system_clock::now().time_since_epoch();
auto const s = std::chrono::duration_cast<std::chrono::seconds>(d);
tv->tv_sec = s.count();
tv->tv_usec = std::chrono::duration_cast<std::chrono::microseconds>(d - s).count();
return 0;
}

View File

@ -4,6 +4,7 @@
#include <algorithm>
#include <cerrno>
#include <chrono>
#include <csignal> /* sig_atomic_t */
#include <cstring> /* strlen(), strncpy(), strstr(), memset() */
#include <type_traits>
@ -49,9 +50,9 @@ static auto constexpr SIZEOF_HASH_STRING = TR_SHA1_DIGEST_STRLEN;
static void event_callback(evutil_socket_t, short /*type*/, void* /*unused*/);
static auto constexpr UpkeepIntervalSecs = int{ 5 };
static auto constexpr UpkeepInterval = 5s;
static struct event* upkeep_timer = nullptr;
static std::unique_ptr<libtransmission::Timer> upkeep_timer;
static tr_socket_t lpd_socket; /**<separate multicast receive socket */
static tr_socket_t lpd_socket2; /**<and multicast send socket */
@ -229,7 +230,7 @@ static bool lpd_extractParam(char const* const str, char const* const name, int
/**
* @} */
static void on_upkeep_timer(evutil_socket_t, short /*unused*/, void* /*unused*/);
static void on_upkeep_timer();
/**
* @brief Initializes Local Peer Discovery for this node
@ -357,8 +358,8 @@ int tr_lpdInit(tr_session* ss, tr_address* /*tr_addr*/)
lpd_event = event_new(ss->eventBase(), lpd_socket, EV_READ | EV_PERSIST, event_callback, nullptr);
event_add(lpd_event, nullptr);
upkeep_timer = evtimer_new(ss->eventBase(), on_upkeep_timer, ss);
tr_timerAdd(*upkeep_timer, UpkeepIntervalSecs, 0);
upkeep_timer = ss->timerMaker().create([]() { on_upkeep_timer(); });
upkeep_timer->startRepeating(UpkeepInterval);
tr_logAddDebug("Local Peer Discovery initialised");
@ -393,8 +394,7 @@ void tr_lpdUninit(tr_session* ss)
event_free(lpd_event);
lpd_event = nullptr;
evtimer_del(upkeep_timer);
upkeep_timer = nullptr;
upkeep_timer.reset();
/* just shut down, we won't remember any former nodes */
evutil_closesocket(lpd_socket);
@ -616,11 +616,11 @@ static int tr_lpdAnnounceMore(time_t const now, int const interval)
return announcesSent;
}
static void on_upkeep_timer(evutil_socket_t /*s*/, short /*type*/, void* /*user_data*/)
static void on_upkeep_timer()
{
time_t const now = tr_time();
tr_lpdAnnounceMore(now, UpkeepIntervalSecs);
tr_timerAdd(*upkeep_timer, UpkeepIntervalSecs, 0);
auto const seconds = std::chrono::duration_cast<std::chrono::seconds>(UpkeepInterval).count();
tr_lpdAnnounceMore(now, seconds);
}
/**

View File

@ -3,8 +3,7 @@
// License text can be found in the licenses/ folder.
#include <cstdint>
#include <event2/event.h>
#include <chrono>
#include <fmt/core.h>
#include <fmt/format.h>
@ -23,6 +22,8 @@
#include "tr-utp.h"
#include "utils.h"
using namespace std::literals;
#ifndef WITH_UTP
void utp_close(UTPSocket* socket)
@ -72,7 +73,7 @@ void tr_utpClose(tr_session* /*session*/)
#else
/* Greg says 50ms works for them. */
static auto constexpr UtpIntervalUs = int{ 500000 };
static auto constexpr UtpInterval = 50ms;
static void utp_on_accept(tr_session* const session, UTPSocket* const s)
{
@ -155,15 +156,17 @@ static uint64 utp_callback(utp_callback_arguments* args)
return 0;
}
static void reset_timer(tr_session* ss)
static void reset_timer(tr_session* session)
{
int sec = 0;
int usec = 0;
auto interval = std::chrono::milliseconds{};
auto const random_percent = tr_rand_int_weak(1000) / 1000.0;
if (tr_sessionIsUTPEnabled(ss))
if (tr_sessionIsUTPEnabled(session))
{
sec = 0;
usec = UtpIntervalUs / 2 + tr_rand_int_weak(UtpIntervalUs);
static auto constexpr MinInterval = UtpInterval * 0.5;
static auto constexpr MaxInterval = UtpInterval * 1.5;
auto const target = MinInterval + random_percent * (MaxInterval - MinInterval);
interval = std::chrono::duration_cast<std::chrono::milliseconds>(target);
}
else
{
@ -172,14 +175,16 @@ static void reset_timer(tr_session* ss)
gracefully and so on. However, since we're not particularly
interested in that happening in a timely manner, we might as
well use a large timeout. */
sec = 2;
usec = tr_rand_int_weak(1000000);
static auto constexpr MinInterval = 2s;
static auto constexpr MaxInterval = 3s;
auto const target = MinInterval + random_percent * (MaxInterval - MinInterval);
interval = std::chrono::duration_cast<std::chrono::milliseconds>(target);
}
tr_timerAdd(*ss->utp_timer, sec, usec);
session->utp_timer->startSingleShot(interval);
}
static void timer_callback(evutil_socket_t /*s*/, short /*type*/, void* vsession)
static void timer_callback(void* vsession)
{
auto* session = static_cast<tr_session*>(vsession);
@ -226,15 +231,9 @@ void tr_utpInit(tr_session* session)
bool tr_utpPacket(unsigned char const* buf, size_t buflen, struct sockaddr const* from, socklen_t fromlen, tr_session* ss)
{
if (!ss->isClosed && ss->utp_timer == nullptr)
if (!ss->isClosed && !ss->utp_timer)
{
ss->utp_timer = evtimer_new(ss->eventBase(), timer_callback, ss);
if (ss->utp_timer == nullptr)
{
return false;
}
ss->utp_timer = ss->timerMaker().create(timer_callback, ss);
reset_timer(ss);
}
@ -248,11 +247,7 @@ bool tr_utpPacket(unsigned char const* buf, size_t buflen, struct sockaddr const
void tr_utpClose(tr_session* session)
{
if (session->utp_timer != nullptr)
{
evtimer_del(session->utp_timer);
session->utp_timer = nullptr;
}
session->utp_timer.reset();
if (session->utp_context != nullptr)
{

View File

@ -36,8 +36,6 @@
#define UTF_CPP_CPLUSPLUS 201703L
#include <utf8.h>
#include <event2/event.h>
#include <fmt/format.h>
#include <fast_float/fast_float.h>
@ -64,20 +62,6 @@ time_t __tr_current_time = 0;
****
***/
struct timeval tr_gettimeofday()
{
auto const d = std::chrono::system_clock::now().time_since_epoch();
auto const s = std::chrono::duration_cast<std::chrono::seconds>(d);
auto ret = timeval{};
ret.tv_sec = s.count();
ret.tv_usec = std::chrono::duration_cast<std::chrono::microseconds>(d - s).count();
return ret;
}
/***
****
***/
void* tr_malloc(size_t size)
{
return size != 0 ? malloc(size) : nullptr;
@ -108,30 +92,6 @@ void tr_free(void* p)
}
}
/***
****
***/
void tr_timerAdd(struct event& timer, int seconds, int microseconds)
{
auto tv = timeval{};
tv.tv_sec = seconds;
tv.tv_usec = microseconds;
TR_ASSERT(tv.tv_sec >= 0);
TR_ASSERT(tv.tv_usec >= 0);
TR_ASSERT(tv.tv_usec < 1000000);
evtimer_add(&timer, &tv);
}
void tr_timerAddMsec(struct event& timer, int milliseconds)
{
int const seconds = milliseconds / 1000;
int const usec = (milliseconds % 1000) * 1000;
tr_timerAdd(timer, seconds, usec);
}
/**
***
**/
@ -311,8 +271,7 @@ std::string_view tr_strvStrip(std::string_view str)
uint64_t tr_time_msec()
{
auto const tv = tr_gettimeofday();
return uint64_t(tv.tv_sec) * 1000 + (tv.tv_usec / 1000);
return std::chrono::system_clock::now().time_since_epoch() / 1ms;
}
void tr_wait_msec(long int delay_milliseconds)

View File

@ -88,21 +88,6 @@ constexpr auto tr_saveFile(std::string_view filename, ContiguousRange const& x,
*/
tr_disk_space tr_dirSpace(std::string_view directory);
/**
* @brief Convenience wrapper around timer_add() to have a timer wake up in a number of seconds and microseconds
* @param timer the timer to set
* @param seconds seconds to wait
* @param microseconds microseconds to wait
*/
void tr_timerAdd(struct event& timer, int seconds, int microseconds);
/**
* @brief Convenience wrapper around timer_add() to have a timer wake up in a number of milliseconds
* @param timer the timer to set
* @param milliseconds milliseconds to wait
*/
void tr_timerAddMsec(struct event& timer, int milliseconds);
/** @brief return the current date in milliseconds */
uint64_t tr_time_msec();
@ -300,9 +285,6 @@ std::string& tr_strvUtf8Clean(std::string_view cleanme, std::string& setme);
*/
[[nodiscard]] std::string tr_strratio(double ratio, char const* infinity);
/** @brief Portability wrapper for gettimeofday(), with tz argument dropped */
struct timeval tr_gettimeofday();
/**
* @brief move a file
* @return `True` on success, `false` otherwise (with `error` set accordingly).

View File

@ -4,6 +4,7 @@
// License text can be found in the licenses/ folder.
#include <algorithm>
#include <chrono>
#include <iterator>
#include <memory>
#include <numeric> // std::accumulate()
@ -13,7 +14,6 @@
#include <vector>
#include <event2/buffer.h>
#include <event2/event.h>
#include <fmt/format.h>
@ -164,9 +164,9 @@ public:
, callback{ callback_in }
, callback_data{ callback_data_in }
, bandwidth_(&tor->bandwidth_)
, pulse_timer(evtimer_new(session->eventBase(), &tr_webseed::onTimer, this), event_free)
, idle_timer(session->timerMaker().create([this]() { on_idle(this); }))
{
startTimer();
idle_timer->startRepeating(IdleTimerInterval);
}
tr_webseed(tr_webseed&&) = delete;
@ -338,21 +338,9 @@ private:
publish(&e);
}
void startTimer()
{
tr_timerAddMsec(*pulse_timer, IdleTimerMsec);
}
static void onTimer(evutil_socket_t /*fd*/, short /*what*/, void* vwebseed)
{
auto* const webseed = static_cast<tr_webseed*>(vwebseed);
on_idle(webseed);
webseed->startTimer();
}
tr_bandwidth bandwidth_;
std::shared_ptr<event> const pulse_timer;
static int constexpr IdleTimerMsec = 2000;
std::unique_ptr<libtransmission::Timer> idle_timer;
static auto constexpr IdleTimerInterval = 2s;
};
/***