refactor: trevent use std:: tools intead of pipes (#2520)

This commit is contained in:
Charles Kerr 2022-01-28 17:56:46 -06:00 committed by GitHub
parent 9e264250d1
commit 374c24dde1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 164 additions and 220 deletions

View File

@ -112,7 +112,7 @@ set_property(GLOBAL PROPERTY USE_FOLDERS ON)
set(CURL_MINIMUM 7.28.0)
set(CYASSL_MINIMUM 3.0)
set(DEFLATE_MINIMUM 1.9)
set(EVENT2_MINIMUM 2.0.10)
set(EVENT2_MINIMUM 2.1.0)
set(GIO_MINIMUM 2.26.0)
set(GLIB_MINIMUM 2.50.1)
set(GTK_MINIMUM 3.24.0)

View File

@ -13,7 +13,7 @@ if(UNIX)
endif()
find_path(EVENT2_INCLUDE_DIR NAMES event2/event.h HINTS ${_EVENT2_INCLUDEDIR})
find_library(EVENT2_LIBRARY NAMES event-2.1 event-2.0 event HINTS ${_EVENT2_LIBDIR})
find_library(EVENT2_LIBRARY NAMES event-2.1 event HINTS ${_EVENT2_LIBDIR})
if(EVENT2_INCLUDE_DIR)
if(_EVENT2_VERSION)

View File

@ -109,6 +109,11 @@ static tr_thread_id tr_getCurrentThread()
#endif
}
unsigned long tr_threadCurrentId()
{
return (unsigned long)tr_getCurrentThread();
}
static bool tr_areThreadsEqual(tr_thread_id a, tr_thread_id b)
{
#ifdef _WIN32

View File

@ -49,6 +49,8 @@ struct tr_thread;
/** @brief Instantiate a new process thread */
tr_thread* tr_threadNew(void (*func)(void*), void* arg);
unsigned long tr_threadCurrentId();
/** @brief Return nonzero if this function is being called from `thread'
@param thread the thread being tested */
bool tr_amInThread(tr_thread const* thread);

View File

@ -3,20 +3,20 @@
// or any future license endorsed by Mnemosyne LLC.
// License text can be found in the licenses/ folder.
#include <cerrno>
#include <cstring>
#include <condition_variable>
#include <list>
#include <mutex>
#include <shared_mutex>
#include <csignal>
#ifdef _WIN32
#include <winsock2.h>
#else
#include <unistd.h> /* read(), write(), pipe() */
#endif
#include <event2/dns.h>
#include <event2/event.h>
#include <event2/thread.h>
#include "transmission.h"
@ -28,118 +28,105 @@
#include "trevent.h"
#include "utils.h"
#ifdef _WIN32
/***
****
***/
using tr_pipe_end_t = SOCKET;
#include <iostream>
static int pgpipe(tr_pipe_end_t handles[2])
namespace
{
SOCKET s;
struct sockaddr_in serv_addr;
int len = sizeof(serv_addr);
namespace impl
{
void* lock_alloc(unsigned /*locktype*/)
{
return new std::recursive_mutex{};
}
handles[0] = handles[1] = INVALID_SOCKET;
void lock_free(void* lock_, unsigned /*locktype*/)
{
delete static_cast<std::recursive_mutex*>(lock_);
}
if ((s = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
int lock_lock(unsigned mode, void* lock_)
{
auto* lock = static_cast<std::recursive_mutex*>(lock_);
if (mode & EVTHREAD_TRY)
{
tr_logAddDebug("pgpipe failed to create socket: %ui", WSAGetLastError());
return -1;
auto const success = lock->try_lock();
return success ? 0 : -1;
}
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(0);
serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
if (bind(s, (SOCKADDR*)&serv_addr, len) == SOCKET_ERROR)
{
tr_logAddDebug("pgpipe failed to bind: %ui", WSAGetLastError());
closesocket(s);
return -1;
}
if (listen(s, 1) == SOCKET_ERROR)
{
tr_logAddNamedDbg("event", "pgpipe failed to listen: %ui", WSAGetLastError());
closesocket(s);
return -1;
}
if (getsockname(s, (SOCKADDR*)&serv_addr, &len) == SOCKET_ERROR)
{
tr_logAddDebug("pgpipe failed to getsockname: %ui", WSAGetLastError());
closesocket(s);
return -1;
}
if ((handles[1] = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
{
tr_logAddDebug("pgpipe failed to create socket 2: %ui", WSAGetLastError());
closesocket(s);
return -1;
}
if (connect(handles[1], (SOCKADDR*)&serv_addr, len) == SOCKET_ERROR)
{
tr_logAddDebug("pgpipe failed to connect socket: %ui", WSAGetLastError());
closesocket(s);
return -1;
}
if ((handles[0] = accept(s, (SOCKADDR*)&serv_addr, &len)) == INVALID_SOCKET)
{
tr_logAddDebug("pgpipe failed to accept socket: %ui", WSAGetLastError());
closesocket(handles[1]);
handles[1] = INVALID_SOCKET;
closesocket(s);
return -1;
}
closesocket(s);
lock->lock();
return 0;
}
static int piperead(tr_pipe_end_t s, void* buf, int len)
int lock_unlock(unsigned /*mode*/, void* lock_)
{
int ret = recv(s, static_cast<char*>(buf), len, 0);
static_cast<std::recursive_mutex*>(lock_)->unlock();
return 0;
}
if (ret == -1)
void* cond_alloc(unsigned /*condflags*/)
{
return new std::condition_variable_any();
}
void cond_free(void* cond_)
{
delete static_cast<std::condition_variable_any*>(cond_);
}
int cond_signal(void* cond_, int broadcast)
{
auto* cond = static_cast<std::condition_variable_any*>(cond_);
if (broadcast)
{
int const werror = WSAGetLastError();
switch (werror)
{
/* simplified error mapping (not valid for connect) */
case WSAEWOULDBLOCK:
errno = EAGAIN;
break;
case WSAECONNRESET:
/* EOF on the pipe! (win32 socket based implementation) */
ret = 0;
[[fallthrough]];
default:
errno = werror;
break;
}
cond->notify_all();
}
else
{
errno = 0;
cond->notify_one();
}
return ret;
return 0;
}
#define pipe(a) pgpipe(a)
#define pipewrite(a, b, c) send(a, (char*)b, c, 0)
int cond_wait(void* cond_, void* lock_, struct timeval const* tv)
{
auto* cond = static_cast<std::condition_variable_any*>(cond_);
auto* lock = static_cast<std::recursive_mutex*>(lock_);
if (tv == nullptr)
{
cond->wait(*lock);
return 0;
}
#else
using tr_pipe_end_t = int;
#define piperead(a, b, c) read(a, b, c)
#define pipewrite(a, b, c) write(a, b, c)
#endif
auto const duration = std::chrono::seconds(tv->tv_sec) + std::chrono::microseconds(tv->tv_usec);
auto const success = cond->wait_for(*lock, duration);
return success == std::cv_status::timeout ? 1 : 0;
}
} // namespace impl
void tr_evthread_init()
{
// evthread_enable_lock_debugging();
evthread_lock_callbacks constexpr lock_cbs{ EVTHREAD_LOCK_API_VERSION, EVTHREAD_LOCKTYPE_RECURSIVE,
impl::lock_alloc, impl::lock_free,
impl::lock_lock, impl::lock_unlock };
evthread_set_lock_callbacks(&lock_cbs);
evthread_condition_callbacks constexpr cond_cbs{ EVTHREAD_CONDITION_API_VERSION,
impl::cond_alloc,
impl::cond_free,
impl::cond_signal,
impl::cond_wait };
evthread_set_condition_callbacks(&cond_cbs);
evthread_set_id_callback(tr_threadCurrentId);
}
} // namespace
/***
****
@ -147,121 +134,87 @@ using tr_pipe_end_t = int;
struct tr_event_handle
{
std::recursive_mutex fds_mutex;
tr_pipe_end_t fds[2] = {};
// would it be more expensive to use std::function here?
struct callback
{
callback(void (*func)(void*) = nullptr, void* user_data = nullptr)
: func_{ func }
, user_data_{ user_data }
{
}
struct event* pipeEvent = nullptr;
struct event_base* base = nullptr;
void invoke() const
{
if (func_ != nullptr)
{
func_(user_data_);
}
}
void (*func_)(void*);
void* user_data_;
};
using work_queue_t = std::list<callback>;
work_queue_t work_queue;
std::mutex work_queue_mutex;
event* work_queue_event = nullptr;
event_base* base = nullptr;
tr_session* session = nullptr;
tr_thread* thread = nullptr;
bool die = false;
};
struct tr_run_data
static void onWorkAvailable(evutil_socket_t /*fd*/, short /*flags*/, void* vsession)
{
void (*func)(void*);
void* user_data;
};
// invariant
auto* const session = static_cast<tr_session*>(vsession);
TR_ASSERT(tr_amInEventThread(session));
#define dbgmsg(...) tr_logAddDeepNamed("event", __VA_ARGS__)
// steal the work queue
auto* events = session->events;
auto work_queue_lock = std::unique_lock(events->work_queue_mutex);
auto work_queue = tr_event_handle::work_queue_t{};
std::swap(work_queue, events->work_queue);
work_queue_lock.unlock();
static void readFromPipe(evutil_socket_t fd, short eventType, void* veh)
{
auto* eh = static_cast<tr_event_handle*>(veh);
dbgmsg("readFromPipe: eventType is %hd", eventType);
/* read the command type */
char ch = '\0';
int ret = 0;
do
// process the work queue
for (auto const& work : work_queue)
{
ret = piperead(fd, &ch, 1);
} while (!eh->die && ret == -1 && errno == EAGAIN);
dbgmsg("command is [%c], ret is %d, errno is %d", ch, ret, (int)errno);
switch (ch)
{
case 'r': /* run in libevent thread */
{
struct tr_run_data data;
size_t const nwant = sizeof(data);
ev_ssize_t const ngot = piperead(fd, &data, nwant);
if (!eh->die && ngot == (ev_ssize_t)nwant)
{
dbgmsg("invoking function in libevent thread");
(*data.func)(data.user_data);
}
break;
}
case '\0': /* eof */
{
dbgmsg("pipe eof reached... removing event listener");
event_free(eh->pipeEvent);
tr_netCloseSocket(eh->fds[0]);
event_base_loopexit(eh->base, nullptr);
break;
}
default:
{
TR_ASSERT_MSG(false, "unhandled command type %d", (int)ch);
break;
}
work.invoke();
}
}
static void logFunc(int severity, char const* message)
static void libeventThreadFunc(void* vevents)
{
if (severity >= _EVENT_LOG_ERR)
{
tr_logAddError("%s", message);
}
else
{
tr_logAddDebug("%s", message);
}
}
static void libeventThreadFunc(void* veh)
{
auto* eh = static_cast<tr_event_handle*>(veh);
auto* const events = static_cast<tr_event_handle*>(vevents);
#ifndef _WIN32
/* Don't exit when writing on a broken socket */
signal(SIGPIPE, SIG_IGN);
#endif
/* create the libevent bases */
struct event_base* base = event_base_new();
tr_evthread_init();
/* set the struct's fields */
eh->base = base;
eh->session->event_base = base;
eh->session->evdns_base = evdns_base_new(base, true);
eh->session->events = eh;
// create the libevent base
auto* const base = event_base_new();
/* listen to the pipe's read fd */
eh->pipeEvent = event_new(base, eh->fds[0], EV_READ | EV_PERSIST, readFromPipe, veh);
event_add(eh->pipeEvent, nullptr);
event_set_log_callback(logFunc);
// initialize the session struct's event fields
events->base = base;
events->work_queue_event = event_new(base, -1, 0, onWorkAvailable, events->session);
events->session->event_base = base;
events->session->evdns_base = evdns_base_new(base, true);
events->session->events = events;
/* loop until all the events are done */
while (!eh->die)
{
event_base_dispatch(base);
}
// loop until `tr_eventClose()` kills the loop
event_base_loop(base, EVLOOP_NO_EXIT_ON_EMPTY);
/* shut down the thread */
// shut down the thread
event_base_free(base);
eh->session->events = nullptr;
delete eh;
events->session->event_base = nullptr;
events->session->evdns_base = nullptr;
events->session->events = nullptr;
delete events;
tr_logAddDebug("Closing libevent thread");
}
@ -269,17 +222,11 @@ void tr_eventInit(tr_session* session)
{
session->events = nullptr;
auto* const eh = new tr_event_handle{};
auto* const events = new tr_event_handle();
events->session = session;
events->thread = tr_threadNew(libeventThreadFunc, events);
if (pipe(eh->fds) == -1)
{
tr_logAddError("Unable to write to pipe() in libtransmission: %s", tr_strerror(errno));
}
eh->session = session;
eh->thread = tr_threadNew(libeventThreadFunc, eh);
/* wait until the libevent thread is running */
// wait until the libevent thread is running
while (session->events == nullptr)
{
tr_wait_msec(100);
@ -290,18 +237,18 @@ void tr_eventClose(tr_session* session)
{
TR_ASSERT(tr_isSession(session));
if (session->events == nullptr)
auto* events = session->events;
if (events == nullptr)
{
return;
}
session->events->die = true;
event_base_loopexit(events->base, nullptr);
if (tr_logGetDeepEnabled())
{
tr_logAddDeep(__FILE__, __LINE__, nullptr, "closing trevent pipe");
}
tr_netCloseSocket(session->events->fds[1]);
}
/**
@ -323,29 +270,19 @@ bool tr_amInEventThread(tr_session const* session)
void tr_runInEventThread(tr_session* session, void (*func)(void*), void* user_data)
{
TR_ASSERT(tr_isSession(session));
TR_ASSERT(session->events != nullptr);
auto* events = session->events;
TR_ASSERT(events != nullptr);
if (tr_amInThread(session->events->thread))
if (tr_amInThread(events->thread))
{
(*func)(user_data);
}
else
{
tr_event_handle* e = session->events;
auto const lock = std::unique_lock(e->fds_mutex);
auto data = tr_run_data{};
auto lock = std::unique_lock(events->work_queue_mutex);
events->work_queue.emplace_back(func, user_data);
lock.unlock();
tr_pipe_end_t const fd = e->fds[1];
char ch = 'r';
ev_ssize_t const res_1 = pipewrite(fd, &ch, 1);
data.func = func;
data.user_data = user_data;
ev_ssize_t const res_2 = pipewrite(fd, &data, sizeof(data));
if (res_1 == -1 || res_2 == -1)
{
tr_logAddError("Unable to write to libtransmisison event queue: %s", tr_strerror(errno));
}
event_active(events->work_queue_event, 0, {});
}
}