diff --git a/CMakeLists.txt b/CMakeLists.txt index 7ae7d1a8a..6305f3dd8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -112,7 +112,7 @@ set_property(GLOBAL PROPERTY USE_FOLDERS ON) set(CURL_MINIMUM 7.28.0) set(CYASSL_MINIMUM 3.0) set(DEFLATE_MINIMUM 1.9) -set(EVENT2_MINIMUM 2.0.10) +set(EVENT2_MINIMUM 2.1.0) set(GIO_MINIMUM 2.26.0) set(GLIB_MINIMUM 2.50.1) set(GTK_MINIMUM 3.24.0) diff --git a/cmake/FindEVENT2.cmake b/cmake/FindEVENT2.cmake index b1b9492b1..7c60bf387 100644 --- a/cmake/FindEVENT2.cmake +++ b/cmake/FindEVENT2.cmake @@ -13,7 +13,7 @@ if(UNIX) endif() find_path(EVENT2_INCLUDE_DIR NAMES event2/event.h HINTS ${_EVENT2_INCLUDEDIR}) -find_library(EVENT2_LIBRARY NAMES event-2.1 event-2.0 event HINTS ${_EVENT2_LIBDIR}) +find_library(EVENT2_LIBRARY NAMES event-2.1 event HINTS ${_EVENT2_LIBDIR}) if(EVENT2_INCLUDE_DIR) if(_EVENT2_VERSION) diff --git a/libtransmission/platform.cc b/libtransmission/platform.cc index 4225167a4..abc64746f 100644 --- a/libtransmission/platform.cc +++ b/libtransmission/platform.cc @@ -109,6 +109,11 @@ static tr_thread_id tr_getCurrentThread() #endif } +unsigned long tr_threadCurrentId() +{ + return (unsigned long)tr_getCurrentThread(); +} + static bool tr_areThreadsEqual(tr_thread_id a, tr_thread_id b) { #ifdef _WIN32 diff --git a/libtransmission/platform.h b/libtransmission/platform.h index eefd46901..72d3bf003 100644 --- a/libtransmission/platform.h +++ b/libtransmission/platform.h @@ -49,6 +49,8 @@ struct tr_thread; /** @brief Instantiate a new process thread */ tr_thread* tr_threadNew(void (*func)(void*), void* arg); +unsigned long tr_threadCurrentId(); + /** @brief Return nonzero if this function is being called from `thread' @param thread the thread being tested */ bool tr_amInThread(tr_thread const* thread); diff --git a/libtransmission/trevent.cc b/libtransmission/trevent.cc index 8bf5e95ea..36509a594 100644 --- a/libtransmission/trevent.cc +++ b/libtransmission/trevent.cc @@ -3,20 +3,20 @@ // or any future license endorsed by Mnemosyne LLC. // License text can be found in the licenses/ folder. -#include -#include +#include +#include #include +#include #include #ifdef _WIN32 #include -#else -#include /* read(), write(), pipe() */ #endif #include #include +#include #include "transmission.h" @@ -28,118 +28,105 @@ #include "trevent.h" #include "utils.h" -#ifdef _WIN32 +/*** +**** +***/ -using tr_pipe_end_t = SOCKET; +#include -static int pgpipe(tr_pipe_end_t handles[2]) +namespace { - SOCKET s; - struct sockaddr_in serv_addr; - int len = sizeof(serv_addr); +namespace impl +{ +void* lock_alloc(unsigned /*locktype*/) +{ + return new std::recursive_mutex{}; +} - handles[0] = handles[1] = INVALID_SOCKET; +void lock_free(void* lock_, unsigned /*locktype*/) +{ + delete static_cast(lock_); +} - if ((s = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) +int lock_lock(unsigned mode, void* lock_) +{ + auto* lock = static_cast(lock_); + if (mode & EVTHREAD_TRY) { - tr_logAddDebug("pgpipe failed to create socket: %ui", WSAGetLastError()); - return -1; + auto const success = lock->try_lock(); + return success ? 0 : -1; } - - memset(&serv_addr, 0, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_port = htons(0); - serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - - if (bind(s, (SOCKADDR*)&serv_addr, len) == SOCKET_ERROR) - { - tr_logAddDebug("pgpipe failed to bind: %ui", WSAGetLastError()); - closesocket(s); - return -1; - } - - if (listen(s, 1) == SOCKET_ERROR) - { - tr_logAddNamedDbg("event", "pgpipe failed to listen: %ui", WSAGetLastError()); - closesocket(s); - return -1; - } - - if (getsockname(s, (SOCKADDR*)&serv_addr, &len) == SOCKET_ERROR) - { - tr_logAddDebug("pgpipe failed to getsockname: %ui", WSAGetLastError()); - closesocket(s); - return -1; - } - - if ((handles[1] = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) - { - tr_logAddDebug("pgpipe failed to create socket 2: %ui", WSAGetLastError()); - closesocket(s); - return -1; - } - - if (connect(handles[1], (SOCKADDR*)&serv_addr, len) == SOCKET_ERROR) - { - tr_logAddDebug("pgpipe failed to connect socket: %ui", WSAGetLastError()); - closesocket(s); - return -1; - } - - if ((handles[0] = accept(s, (SOCKADDR*)&serv_addr, &len)) == INVALID_SOCKET) - { - tr_logAddDebug("pgpipe failed to accept socket: %ui", WSAGetLastError()); - closesocket(handles[1]); - handles[1] = INVALID_SOCKET; - closesocket(s); - return -1; - } - - closesocket(s); + lock->lock(); return 0; } -static int piperead(tr_pipe_end_t s, void* buf, int len) +int lock_unlock(unsigned /*mode*/, void* lock_) { - int ret = recv(s, static_cast(buf), len, 0); + static_cast(lock_)->unlock(); + return 0; +} - if (ret == -1) +void* cond_alloc(unsigned /*condflags*/) +{ + return new std::condition_variable_any(); +} + +void cond_free(void* cond_) +{ + delete static_cast(cond_); +} + +int cond_signal(void* cond_, int broadcast) +{ + auto* cond = static_cast(cond_); + if (broadcast) { - int const werror = WSAGetLastError(); - - switch (werror) - { - /* simplified error mapping (not valid for connect) */ - case WSAEWOULDBLOCK: - errno = EAGAIN; - break; - - case WSAECONNRESET: - /* EOF on the pipe! (win32 socket based implementation) */ - ret = 0; - [[fallthrough]]; - - default: - errno = werror; - break; - } + cond->notify_all(); } else { - errno = 0; + cond->notify_one(); } - - return ret; + return 0; } -#define pipe(a) pgpipe(a) -#define pipewrite(a, b, c) send(a, (char*)b, c, 0) +int cond_wait(void* cond_, void* lock_, struct timeval const* tv) +{ + auto* cond = static_cast(cond_); + auto* lock = static_cast(lock_); + if (tv == nullptr) + { + cond->wait(*lock); + return 0; + } -#else -using tr_pipe_end_t = int; -#define piperead(a, b, c) read(a, b, c) -#define pipewrite(a, b, c) write(a, b, c) -#endif + auto const duration = std::chrono::seconds(tv->tv_sec) + std::chrono::microseconds(tv->tv_usec); + auto const success = cond->wait_for(*lock, duration); + return success == std::cv_status::timeout ? 1 : 0; +} + +} // namespace impl + +void tr_evthread_init() +{ + // evthread_enable_lock_debugging(); + + evthread_lock_callbacks constexpr lock_cbs{ EVTHREAD_LOCK_API_VERSION, EVTHREAD_LOCKTYPE_RECURSIVE, + impl::lock_alloc, impl::lock_free, + impl::lock_lock, impl::lock_unlock }; + evthread_set_lock_callbacks(&lock_cbs); + + evthread_condition_callbacks constexpr cond_cbs{ EVTHREAD_CONDITION_API_VERSION, + impl::cond_alloc, + impl::cond_free, + impl::cond_signal, + impl::cond_wait }; + evthread_set_condition_callbacks(&cond_cbs); + + evthread_set_id_callback(tr_threadCurrentId); +} + +} // namespace /*** **** @@ -147,121 +134,87 @@ using tr_pipe_end_t = int; struct tr_event_handle { - std::recursive_mutex fds_mutex; - tr_pipe_end_t fds[2] = {}; + // would it be more expensive to use std::function here? + struct callback + { + callback(void (*func)(void*) = nullptr, void* user_data = nullptr) + : func_{ func } + , user_data_{ user_data } + { + } - struct event* pipeEvent = nullptr; - struct event_base* base = nullptr; + void invoke() const + { + if (func_ != nullptr) + { + func_(user_data_); + } + } + + void (*func_)(void*); + void* user_data_; + }; + + using work_queue_t = std::list; + work_queue_t work_queue; + std::mutex work_queue_mutex; + event* work_queue_event = nullptr; + + event_base* base = nullptr; tr_session* session = nullptr; tr_thread* thread = nullptr; - - bool die = false; }; -struct tr_run_data +static void onWorkAvailable(evutil_socket_t /*fd*/, short /*flags*/, void* vsession) { - void (*func)(void*); - void* user_data; -}; + // invariant + auto* const session = static_cast(vsession); + TR_ASSERT(tr_amInEventThread(session)); -#define dbgmsg(...) tr_logAddDeepNamed("event", __VA_ARGS__) + // steal the work queue + auto* events = session->events; + auto work_queue_lock = std::unique_lock(events->work_queue_mutex); + auto work_queue = tr_event_handle::work_queue_t{}; + std::swap(work_queue, events->work_queue); + work_queue_lock.unlock(); -static void readFromPipe(evutil_socket_t fd, short eventType, void* veh) -{ - auto* eh = static_cast(veh); - - dbgmsg("readFromPipe: eventType is %hd", eventType); - - /* read the command type */ - char ch = '\0'; - - int ret = 0; - do + // process the work queue + for (auto const& work : work_queue) { - ret = piperead(fd, &ch, 1); - } while (!eh->die && ret == -1 && errno == EAGAIN); - - dbgmsg("command is [%c], ret is %d, errno is %d", ch, ret, (int)errno); - - switch (ch) - { - case 'r': /* run in libevent thread */ - { - struct tr_run_data data; - size_t const nwant = sizeof(data); - ev_ssize_t const ngot = piperead(fd, &data, nwant); - - if (!eh->die && ngot == (ev_ssize_t)nwant) - { - dbgmsg("invoking function in libevent thread"); - (*data.func)(data.user_data); - } - - break; - } - - case '\0': /* eof */ - { - dbgmsg("pipe eof reached... removing event listener"); - event_free(eh->pipeEvent); - tr_netCloseSocket(eh->fds[0]); - event_base_loopexit(eh->base, nullptr); - break; - } - - default: - { - TR_ASSERT_MSG(false, "unhandled command type %d", (int)ch); - break; - } + work.invoke(); } } -static void logFunc(int severity, char const* message) +static void libeventThreadFunc(void* vevents) { - if (severity >= _EVENT_LOG_ERR) - { - tr_logAddError("%s", message); - } - else - { - tr_logAddDebug("%s", message); - } -} - -static void libeventThreadFunc(void* veh) -{ - auto* eh = static_cast(veh); + auto* const events = static_cast(vevents); #ifndef _WIN32 /* Don't exit when writing on a broken socket */ signal(SIGPIPE, SIG_IGN); #endif - /* create the libevent bases */ - struct event_base* base = event_base_new(); + tr_evthread_init(); - /* set the struct's fields */ - eh->base = base; - eh->session->event_base = base; - eh->session->evdns_base = evdns_base_new(base, true); - eh->session->events = eh; + // create the libevent base + auto* const base = event_base_new(); - /* listen to the pipe's read fd */ - eh->pipeEvent = event_new(base, eh->fds[0], EV_READ | EV_PERSIST, readFromPipe, veh); - event_add(eh->pipeEvent, nullptr); - event_set_log_callback(logFunc); + // initialize the session struct's event fields + events->base = base; + events->work_queue_event = event_new(base, -1, 0, onWorkAvailable, events->session); + events->session->event_base = base; + events->session->evdns_base = evdns_base_new(base, true); + events->session->events = events; - /* loop until all the events are done */ - while (!eh->die) - { - event_base_dispatch(base); - } + // loop until `tr_eventClose()` kills the loop + event_base_loop(base, EVLOOP_NO_EXIT_ON_EMPTY); - /* shut down the thread */ + // shut down the thread event_base_free(base); - eh->session->events = nullptr; - delete eh; + events->session->event_base = nullptr; + events->session->evdns_base = nullptr; + events->session->events = nullptr; + delete events; tr_logAddDebug("Closing libevent thread"); } @@ -269,17 +222,11 @@ void tr_eventInit(tr_session* session) { session->events = nullptr; - auto* const eh = new tr_event_handle{}; + auto* const events = new tr_event_handle(); + events->session = session; + events->thread = tr_threadNew(libeventThreadFunc, events); - if (pipe(eh->fds) == -1) - { - tr_logAddError("Unable to write to pipe() in libtransmission: %s", tr_strerror(errno)); - } - - eh->session = session; - eh->thread = tr_threadNew(libeventThreadFunc, eh); - - /* wait until the libevent thread is running */ + // wait until the libevent thread is running while (session->events == nullptr) { tr_wait_msec(100); @@ -290,18 +237,18 @@ void tr_eventClose(tr_session* session) { TR_ASSERT(tr_isSession(session)); - if (session->events == nullptr) + auto* events = session->events; + if (events == nullptr) { return; } - session->events->die = true; + event_base_loopexit(events->base, nullptr); + if (tr_logGetDeepEnabled()) { tr_logAddDeep(__FILE__, __LINE__, nullptr, "closing trevent pipe"); } - - tr_netCloseSocket(session->events->fds[1]); } /** @@ -323,29 +270,19 @@ bool tr_amInEventThread(tr_session const* session) void tr_runInEventThread(tr_session* session, void (*func)(void*), void* user_data) { TR_ASSERT(tr_isSession(session)); - TR_ASSERT(session->events != nullptr); + auto* events = session->events; + TR_ASSERT(events != nullptr); - if (tr_amInThread(session->events->thread)) + if (tr_amInThread(events->thread)) { (*func)(user_data); } else { - tr_event_handle* e = session->events; - auto const lock = std::unique_lock(e->fds_mutex); - auto data = tr_run_data{}; + auto lock = std::unique_lock(events->work_queue_mutex); + events->work_queue.emplace_back(func, user_data); + lock.unlock(); - tr_pipe_end_t const fd = e->fds[1]; - char ch = 'r'; - ev_ssize_t const res_1 = pipewrite(fd, &ch, 1); - - data.func = func; - data.user_data = user_data; - ev_ssize_t const res_2 = pipewrite(fd, &data, sizeof(data)); - - if (res_1 == -1 || res_2 == -1) - { - tr_logAddError("Unable to write to libtransmisison event queue: %s", tr_strerror(errno)); - } + event_active(events->work_queue_event, 0, {}); } }