feat: tr_runInEventThread() now takes std::function, typesafe args (#2739)

This commit is contained in:
Charles Kerr 2022-03-04 20:26:03 -08:00 committed by GitHub
parent 8d8ea2f4df
commit cdd819772d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 72 additions and 126 deletions

View File

@ -27,7 +27,6 @@
#include "peer-mgr.h" /* pex */
#include "quark.h"
#include "torrent.h"
#include "trevent.h" /* tr_runInEventThread() */
#include "utils.h"
#include "web-utils.h"
#include "web.h"

View File

@ -853,10 +853,8 @@ static void io_close_socket(tr_peerIo* io)
}
}
static void io_dtor(void* vio)
static void io_dtor(tr_peerIo* const io)
{
auto* io = static_cast<tr_peerIo*>(vio);
TR_ASSERT(tr_isPeerIo(io));
TR_ASSERT(tr_amInEventThread(io->session));
TR_ASSERT(io->session->events != nullptr);

View File

@ -703,11 +703,11 @@ static bool bindUnixSocket(
#endif
}
static void startServer(void* vserver);
static void startServer(tr_rpc_server* server);
static void rpc_server_on_start_retry(evutil_socket_t /*fd*/, short /*type*/, void* context)
{
startServer(context);
startServer(static_cast<tr_rpc_server*>(context));
}
static int rpc_server_start_retry(tr_rpc_server* server)
@ -737,10 +737,8 @@ static void rpc_server_start_retry_cancel(tr_rpc_server* server)
server->start_retry_counter = 0;
}
static void startServer(void* vserver)
static void startServer(tr_rpc_server* server)
{
auto* server = static_cast<tr_rpc_server*>(vserver);
if (server->httpd != nullptr)
{
return;
@ -816,10 +814,8 @@ static void stopServer(tr_rpc_server* server)
tr_logAddNamedDbg(MyName, "Stopped listening on %s", tr_rpc_address_with_port(server).c_str());
}
static void onEnabledChanged(void* vserver)
static void onEnabledChanged(tr_rpc_server* const server)
{
auto* server = static_cast<tr_rpc_server*>(vserver);
if (!server->isEnabled)
{
stopServer(server);
@ -842,10 +838,8 @@ bool tr_rpcIsEnabled(tr_rpc_server const* server)
return server->isEnabled;
}
static void restartServer(void* vserver)
static void restartServer(tr_rpc_server* const server)
{
auto* server = static_cast<tr_rpc_server*>(vserver);
if (server->isEnabled)
{
stopServer(server);

View File

@ -161,18 +161,7 @@ void tr_session::WebMediator::notifyBandwidthConsumed(int torrent_id, size_t byt
void tr_session::WebMediator::run(tr_web::FetchDoneFunc&& func, tr_web::FetchResponse&& response) const
{
// marshall the `func` call into the libtransmission thread
using wrapper_t = std::pair<tr_web::FetchDoneFunc, tr_web::FetchResponse>;
auto constexpr callback = [](void* vwrapped)
{
auto* const wrapped = static_cast<wrapper_t*>(vwrapped);
wrapped->first(wrapped->second);
delete wrapped;
};
tr_runInEventThread(session_, callback, new wrapper_t{ func, std::move(response) });
tr_runInEventThread(session_, std::move(func), std::move(response));
}
void tr_sessionFetch(tr_session* session, tr_web::FetchOptions&& options)
@ -589,8 +578,6 @@ static void onSaveTimer(evutil_socket_t /*fd*/, short /*what*/, void* vsession)
****
***/
static void tr_sessionInitImpl(void* /*vdata*/);
struct init_data
{
bool messageQueuingEnabled;
@ -600,6 +587,8 @@ struct init_data
std::condition_variable_any done_cv;
};
static void tr_sessionInitImpl(init_data* data);
tr_session* tr_sessionInit(char const* config_dir, bool messageQueuingEnabled, tr_variant* clientSettings)
{
TR_ASSERT(tr_variantIsDict(clientSettings));
@ -705,9 +694,8 @@ static void onNowTimer(evutil_socket_t /*fd*/, short /*what*/, void* vsession)
static void loadBlocklists(tr_session* session);
static void tr_sessionInitImpl(void* vdata)
static void tr_sessionInitImpl(init_data* data)
{
auto* data = static_cast<struct init_data*>(vdata);
tr_variant const* const clientSettings = data->clientSettings;
tr_session* session = data->session;
@ -781,11 +769,10 @@ static void tr_sessionInitImpl(void* vdata)
static void turtleBootstrap(tr_session* /*session*/, struct tr_turtle_info* /*turtle*/);
static void setPeerPort(tr_session* session, tr_port port);
static void sessionSetImpl(void* vdata)
static void sessionSetImpl(struct init_data* const data)
{
auto* data = static_cast<struct init_data*>(vdata);
tr_session* session = data->session;
tr_variant* settings = data->clientSettings;
tr_session* const session = data->session;
tr_variant* const settings = data->clientSettings;
TR_ASSERT(tr_isSession(session));
TR_ASSERT(tr_variantIsDict(settings));
@ -1232,9 +1219,8 @@ bool tr_sessionIsIncompleteDirEnabled(tr_session const* session)
**** Peer Port
***/
static void peerPortChanged(void* vsession)
static void peerPortChanged(tr_session* const session)
{
auto* session = static_cast<tr_session*>(vsession);
TR_ASSERT(tr_isSession(session));
close_incoming_peer_port(session);
@ -1441,10 +1427,8 @@ static void turtleUpdateTable(struct tr_turtle_info* t)
}
}
static void altSpeedToggled(void* vsession)
static void altSpeedToggled(tr_session* const session)
{
auto* session = static_cast<tr_session*>(vsession);
TR_ASSERT(tr_isSession(session));
updateBandwidth(session, TR_UP);
@ -1932,10 +1916,8 @@ static void sessionCloseImplFinish(tr_session* session)
session->isClosed = true;
}
static void sessionCloseImpl(void* vsession)
static void sessionCloseImpl(tr_session* const session)
{
auto* session = static_cast<tr_session*>(vsession);
TR_ASSERT(tr_isSession(session));
sessionCloseImplStart(session);
@ -2028,9 +2010,8 @@ struct sessionLoadTorrentsData
bool done;
};
static void sessionLoadTorrents(void* vdata)
static void sessionLoadTorrents(struct sessionLoadTorrentsData* const data)
{
auto* data = static_cast<struct sessionLoadTorrentsData*>(vdata);
TR_ASSERT(tr_isSession(data->session));
tr_sys_path_info info;
@ -2143,10 +2124,8 @@ bool tr_sessionIsDHTEnabled(tr_session const* session)
return session->isDHTEnabled;
}
static void toggleDHTImpl(void* vsession)
static void toggleDHTImpl(tr_session* const session)
{
auto* session = static_cast<tr_session*>(vsession);
TR_ASSERT(tr_isSession(session));
tr_udpUninit(session);
@ -2179,10 +2158,8 @@ bool tr_sessionIsUTPEnabled(tr_session const* session)
#endif
}
static void toggle_utp(void* vsession)
static void toggle_utp(tr_session* const session)
{
auto* session = static_cast<tr_session*>(vsession);
TR_ASSERT(tr_isSession(session));
session->isUTPEnabled = !session->isUTPEnabled;
@ -2209,10 +2186,8 @@ void tr_sessionSetUTPEnabled(tr_session* session, bool enabled)
****
***/
static void toggleLPDImpl(void* vsession)
static void toggleLPDImpl(tr_session* const session)
{
auto* session = static_cast<tr_session*>(vsession);
TR_ASSERT(tr_isSession(session));
if (session->isLPDEnabled)
@ -2309,9 +2284,8 @@ struct port_forwarding_data
struct tr_shared* shared;
};
static void setPortForwardingEnabled(void* vdata)
static void setPortForwardingEnabled(struct port_forwarding_data* const data)
{
auto* data = static_cast<struct port_forwarding_data*>(vdata);
tr_sharedTraversalEnable(data->shared, data->enabled);
tr_free(data);
}

View File

@ -897,10 +897,8 @@ void tr_torrentChangeMyPort(tr_torrent* tor)
}
}
static inline void tr_torrentManualUpdateImpl(void* vtor)
static inline void tr_torrentManualUpdateImpl(tr_torrent* const tor)
{
auto* tor = static_cast<tr_torrent*>(vtor);
TR_ASSERT(tr_isTorrent(tor));
if (tor->isRunning)
@ -1321,9 +1319,8 @@ static void freeTorrent(tr_torrent* tor)
static void torrentSetQueued(tr_torrent* tor, bool queued);
static void torrentStartImpl(void* vtor)
static void torrentStartImpl(tr_torrent* const tor)
{
auto* tor = static_cast<tr_torrent*>(vtor);
auto const lock = tor->unique_lock();
TR_ASSERT(tr_isTorrent(tor));
@ -1432,9 +1429,8 @@ void tr_torrentStartNow(tr_torrent* tor)
}
}
static void onVerifyDoneThreadFunc(void* vtor)
static void onVerifyDoneThreadFunc(tr_torrent* const tor)
{
auto* const tor = static_cast<tr_torrent*>(vtor);
TR_ASSERT(tr_amInEventThread(tor->session));
if (tor->isDeleting)
@ -1461,9 +1457,8 @@ static void onVerifyDone(tr_torrent* tor, bool aborted, void* /*unused*/)
tr_runInEventThread(tor->session, onVerifyDoneThreadFunc, tor);
}
static void verifyTorrent(void* vtor)
static void verifyTorrent(tr_torrent* const tor)
{
auto* tor = static_cast<tr_torrent*>(vtor);
TR_ASSERT(tr_amInEventThread(tor->session));
auto const lock = tor->unique_lock();
@ -1509,9 +1504,8 @@ void tr_torrentSave(tr_torrent* tor)
}
}
static void stopTorrent(void* vtor)
static void stopTorrent(tr_torrent* const tor)
{
auto* tor = static_cast<tr_torrent*>(vtor);
TR_ASSERT(tr_isTorrent(tor));
TR_ASSERT(tr_amInEventThread(tor->session));
auto const lock = tor->unique_lock();
@ -1559,9 +1553,8 @@ void tr_torrentStop(tr_torrent* tor)
tr_runInEventThread(tor->session, stopTorrent, tor);
}
static void closeTorrent(void* vtor)
static void closeTorrent(tr_torrent* const tor)
{
auto* const tor = static_cast<tr_torrent*>(vtor);
TR_ASSERT(tr_isTorrent(tor));
TR_ASSERT(tr_amInEventThread(tor->session));
@ -1607,9 +1600,8 @@ struct remove_data
static void tr_torrentDeleteLocalData(tr_torrent* /*tor*/, tr_fileFunc /*func*/);
static void removeTorrent(void* vdata)
static void removeTorrent(struct remove_data* const data)
{
auto* const data = static_cast<struct remove_data*>(vdata);
auto const lock = data->tor->unique_lock();
if (data->deleteFlag)
@ -2336,10 +2328,9 @@ struct LocationData
bool move_from_old_location = false;
};
static void setLocationImpl(void* vdata)
static void setLocationImpl(struct LocationData* const data)
{
auto* data = static_cast<struct LocationData*>(vdata);
tr_torrent* tor = data->tor;
auto* const tor = data->tor;
TR_ASSERT(tr_isTorrent(tor));
auto const lock = tor->unique_lock();
@ -2982,11 +2973,9 @@ struct rename_data
void* callback_user_data;
};
static void torrentRenamePath(void* vdata)
static void torrentRenamePath(struct rename_data* data)
{
auto* data = static_cast<struct rename_data*>(vdata);
tr_torrent* const tor = data->tor;
auto* const tor = data->tor;
TR_ASSERT(tr_isTorrent(tor));
/***

View File

@ -465,10 +465,8 @@ struct getstatus_closure
sig_atomic_t count;
};
static void getstatus(void* cl)
static void getstatus(getstatus_closure* const closure)
{
auto* closure = static_cast<struct getstatus_closure*>(cl);
int good = 0;
int dubious = 0;
int incoming = 0;

View File

@ -4,6 +4,7 @@
// License text can be found in the licenses/ folder.
#include <condition_variable>
#include <functional>
#include <list>
#include <mutex>
#include <shared_mutex>
@ -139,26 +140,7 @@ void tr_evthread_init()
struct tr_event_handle
{
// would it be more expensive to use std::function here?
struct callback
{
callback(void (*func)(void*) = nullptr, void* user_data = nullptr)
: func_{ func }
, user_data_{ user_data }
{
}
void invoke() const
{
if (func_ != nullptr)
{
func_(user_data_);
}
}
void (*func_)(void*);
void* user_data_;
};
using callback = std::function<void(void)>;
using work_queue_t = std::list<callback>;
work_queue_t work_queue;
@ -185,9 +167,9 @@ static void onWorkAvailable(evutil_socket_t /*fd*/, short /*flags*/, void* vsess
work_queue_lock.unlock();
// process the work queue
for (auto const& work : work_queue)
for (auto const& func : work_queue)
{
work.invoke();
func();
}
}
@ -281,7 +263,7 @@ bool tr_amInEventThread(tr_session const* session)
***
**/
void tr_runInEventThread(tr_session* session, void (*func)(void*), void* user_data)
void tr_runInEventThread(tr_session* session, std::function<void(void)>&& func)
{
TR_ASSERT(tr_isSession(session));
auto* events = session->events;
@ -289,12 +271,12 @@ void tr_runInEventThread(tr_session* session, void (*func)(void*), void* user_da
if (tr_amInEventThread(session))
{
(*func)(user_data);
func();
}
else
{
auto lock = std::unique_lock(events->work_queue_mutex);
events->work_queue.emplace_back(func, user_data);
events->work_queue.emplace_back(std::move(func));
lock.unlock();
event_active(events->work_queue_event, 0, {});

View File

@ -9,12 +9,29 @@
#error only libtransmission should #include this header.
#endif
#include <functional>
#include <tuple>
#include <utility>
#include "tr-macros.h"
void tr_eventInit(tr_session*);
struct tr_session;
void tr_eventClose(tr_session*);
void tr_eventInit(tr_session* session);
bool tr_amInEventThread(tr_session const*);
void tr_eventClose(tr_session* session);
void tr_runInEventThread(tr_session*, void (*func)(void*), void* user_data);
bool tr_amInEventThread(tr_session const* session);
void tr_runInEventThread(tr_session* session, std::function<void(void)>&& func);
template<typename Func, typename... Args>
void tr_runInEventThread(tr_session* session, Func&& func, Args&&... args)
{
tr_runInEventThread(
session,
std::function<void(void)>{ [func = std::move(func), args = std::make_tuple(std::forward<Args>(args)...)]()
{
std::apply(std::move(func), std::move(args));
} });
}

View File

@ -295,25 +295,20 @@ public:
return content_.get();
}
static void write_block_func(void* vdata)
void write_block_func()
{
auto* const data = static_cast<write_block_data*>(vdata);
auto* const webseed = data->webseed;
auto* const buf = data->content();
auto* const tor = tr_torrentFindFromId(data->session, data->torrent_id);
if (tor == nullptr)
auto* const buf = this->content();
auto* const tor = tr_torrentFindFromId(this->session, this->torrent_id);
if (tor != nullptr)
{
delete data;
return;
auto const len = evbuffer_get_length(buf);
TR_ASSERT(tor->blockSize(this->loc.block) == len);
tr_cacheWriteBlock(tor->session->cache, tor, this->loc, len, buf);
webseed->publishGotBlock(tor, this->loc);
TR_ASSERT(evbuffer_get_length(buf) == 0);
}
auto const len = evbuffer_get_length(buf);
TR_ASSERT(tor->blockSize(data->loc.block) == len);
tr_cacheWriteBlock(tor->session->cache, tor, data->loc, len, buf);
webseed->publishGotBlock(tor, data->loc);
TR_ASSERT(evbuffer_get_length(buf) == 0);
delete data;
delete this;
}
private:
@ -352,7 +347,7 @@ void useFetchedBlocks(tr_webseed_task* task)
{
auto* const data = new write_block_data{ session, tor->uniqueId, webseed, task->loc };
evbuffer_remove_buffer(task->content(), data->content(), block_size);
tr_runInEventThread(session, write_block_data::write_block_func, data);
tr_runInEventThread(session, &write_block_data::write_block_func, data);
}
task->loc = tor->byteLoc(task->loc.byte + block_size);