From 18aabdeb069edd77157639f1c0bfa1485b7ae15a Mon Sep 17 00:00:00 2001 From: Mike Gelfand Date: Wed, 28 Jun 2017 18:46:06 +0300 Subject: [PATCH] Introduce peer socket struct to improve readability --- Transmission.xcodeproj/project.pbxproj | 4 + libtransmission/CMakeLists.txt | 1 + libtransmission/Makefile.am | 1 + libtransmission/handshake.c | 4 +- libtransmission/net.c | 33 ++-- libtransmission/net.h | 4 +- libtransmission/peer-io.c | 256 ++++++++++++++----------- libtransmission/peer-io.h | 6 +- libtransmission/peer-mgr.c | 50 +++-- libtransmission/peer-mgr.h | 3 +- libtransmission/peer-msgs.c | 2 +- libtransmission/peer-socket.h | 49 +++++ libtransmission/session.c | 2 +- libtransmission/tr-utp.c | 3 +- 14 files changed, 263 insertions(+), 155 deletions(-) create mode 100644 libtransmission/peer-socket.h diff --git a/Transmission.xcodeproj/project.pbxproj b/Transmission.xcodeproj/project.pbxproj index 6f12fd3ac..9474ce94e 100644 --- a/Transmission.xcodeproj/project.pbxproj +++ b/Transmission.xcodeproj/project.pbxproj @@ -348,6 +348,7 @@ C1425B351EE9C5F5001DB85F /* tr-assert.c in Sources */ = {isa = PBXBuildFile; fileRef = C1425B321EE9C5EA001DB85F /* tr-assert.c */; }; C1425B361EE9C605001DB85F /* tr-assert.h in Headers */ = {isa = PBXBuildFile; fileRef = C1425B331EE9C5EA001DB85F /* tr-assert.h */; }; C1425B371EE9C705001DB85F /* tr-macros.h in Headers */ = {isa = PBXBuildFile; fileRef = C1425B341EE9C5EA001DB85F /* tr-macros.h */; }; + C1425B381EE9C805001DB85F /* peer-socket.h in Headers */ = {isa = PBXBuildFile; fileRef = C1425B351EE9C5EA001DB85F /* peer-socket.h */; }; C1639A741A55F4E000E42033 /* libb64.a in Frameworks */ = {isa = PBXBuildFile; fileRef = C1639A6F1A55F4D600E42033 /* libb64.a */; }; C1639A781A55F56600E42033 /* cdecode.c in Sources */ = {isa = PBXBuildFile; fileRef = C1639A761A55F56600E42033 /* cdecode.c */; }; C1639A791A55F56600E42033 /* cencode.c in Sources */ = {isa = PBXBuildFile; fileRef = C1639A771A55F56600E42033 /* cencode.c */; }; @@ -992,6 +993,7 @@ C1425B321EE9C5EA001DB85F /* tr-assert.c */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.c; name = "tr-assert.c"; path = "libtransmission/tr-assert.c"; sourceTree = ""; }; C1425B331EE9C5EA001DB85F /* tr-assert.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; name = "tr-assert.h"; path = "libtransmission/tr-assert.h"; sourceTree = ""; }; C1425B341EE9C5EA001DB85F /* tr-macros.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; name = "tr-macros.h"; path = "libtransmission/tr-macros.h"; sourceTree = ""; }; + C1425B351EE9C5EA001DB85F /* peer-socket.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; name = "peer-socket.h"; path = "libtransmission/peer-socket.h"; sourceTree = ""; }; C1639A6F1A55F4D600E42033 /* libb64.a */ = {isa = PBXFileReference; explicitFileType = archive.ar; includeInIndex = 0; path = libb64.a; sourceTree = BUILT_PRODUCTS_DIR; }; C1639A761A55F56600E42033 /* cdecode.c */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.c; name = cdecode.c; path = "third-party/libb64/src/cdecode.c"; sourceTree = ""; }; C1639A771A55F56600E42033 /* cencode.c */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.c; name = cencode.c; path = "third-party/libb64/src/cencode.c"; sourceTree = ""; }; @@ -1364,6 +1366,7 @@ A22CFCA60FC24ED80009BD3E /* tr-dht.c */, A22CFCA70FC24ED80009BD3E /* tr-dht.h */, C1425B341EE9C5EA001DB85F /* tr-macros.h */, + C1425B351EE9C5EA001DB85F /* peer-socket.h */, A284214212DA663E00FBDDBB /* tr-udp.c */, A284214312DA663E00FBDDBB /* tr-udp.h */, A2679292130E00A000CB7464 /* tr-utp.c */, @@ -1785,6 +1788,7 @@ BEFC1E3B0C07861A00B0BB3C /* platform.h in Headers */, C1425B361EE9C605001DB85F /* tr-assert.h in Headers */, C1425B371EE9C705001DB85F /* tr-macros.h in Headers */, + C1425B381EE9C805001DB85F /* peer-socket.h in Headers */, BEFC1E450C07861A00B0BB3C /* net.h in Headers */, BEFC1E490C07861A00B0BB3C /* metainfo.h in Headers */, BEFC1E4D0C07861A00B0BB3C /* session.h in Headers */, diff --git a/libtransmission/CMakeLists.txt b/libtransmission/CMakeLists.txt index 8b9fe33b7..99854b8ff 100644 --- a/libtransmission/CMakeLists.txt +++ b/libtransmission/CMakeLists.txt @@ -144,6 +144,7 @@ set(${PROJECT_NAME}_PRIVATE_HEADERS peer-io.h peer-mgr.h peer-msgs.h + peer-socket.h platform.h platform-quota.h port-forwarding.h diff --git a/libtransmission/Makefile.am b/libtransmission/Makefile.am index f835d22b7..dfb66c7a9 100644 --- a/libtransmission/Makefile.am +++ b/libtransmission/Makefile.am @@ -145,6 +145,7 @@ noinst_HEADERS = \ peer-io.h \ peer-mgr.h \ peer-msgs.h \ + peer-socket.h \ platform.h \ platform-quota.h \ port-forwarding.h \ diff --git a/libtransmission/handshake.c b/libtransmission/handshake.c index 737b23633..a710ae070 100644 --- a/libtransmission/handshake.c +++ b/libtransmission/handshake.c @@ -1150,7 +1150,7 @@ static void gotError(tr_peerIo* io, short what, void* vhandshake) int errcode = errno; tr_handshake* handshake = vhandshake; - if (io->utp_socket != NULL && !io->isIncoming && handshake->state == AWAITING_YB) + if (io->socket.type == TR_PEER_SOCKET_TYPE_UTP && !io->isIncoming && handshake->state == AWAITING_YB) { /* This peer probably doesn't speak uTP. */ @@ -1171,7 +1171,7 @@ static void gotError(tr_peerIo* io, short what, void* vhandshake) tr_peerMgrSetUtpFailed(tor, tr_peerIoGetAddress(io, NULL), true); } - if (!tr_peerIoReconnect(handshake->io)) + if (tr_peerIoReconnect(handshake->io) == 0) { uint8_t msg[HANDSHAKE_SIZE]; buildHandshakeMessage(handshake, msg); diff --git a/libtransmission/net.c b/libtransmission/net.c index eae47c222..588dc2b27 100644 --- a/libtransmission/net.c +++ b/libtransmission/net.c @@ -240,10 +240,12 @@ static socklen_t setup_sockaddr(tr_address const* addr, tr_port port, struct soc } } -tr_socket_t tr_netOpenPeerSocket(tr_session* session, tr_address const* addr, tr_port port, bool clientIsSeed) +struct tr_peer_socket tr_netOpenPeerSocket(tr_session* session, tr_address const* addr, tr_port port, bool clientIsSeed) { TR_ASSERT(tr_address_is_valid(addr)); + struct tr_peer_socket ret = TR_PEER_SOCKET_INIT; + static int const domains[NUM_TR_AF_INET_TYPES] = { AF_INET, AF_INET6 }; tr_socket_t s; struct sockaddr_storage sock; @@ -255,14 +257,14 @@ tr_socket_t tr_netOpenPeerSocket(tr_session* session, tr_address const* addr, tr if (!tr_address_is_valid_for_peers(addr, port)) { - return TR_BAD_SOCKET; /* -EINVAL */ + return ret; } s = tr_fdSocketCreate(session, domains[addr->type], SOCK_STREAM); if (s == TR_BAD_SOCKET) { - return TR_BAD_SOCKET; + return ret; } /* seeds don't need much of a read buffer... */ @@ -280,7 +282,7 @@ tr_socket_t tr_netOpenPeerSocket(tr_session* session, tr_address const* addr, tr if (evutil_make_socket_nonblocking(s) == -1) { tr_netClose(session, s); - return TR_BAD_SOCKET; + return ret; } addrlen = setup_sockaddr(addr, port, &sock); @@ -295,7 +297,7 @@ tr_socket_t tr_netOpenPeerSocket(tr_session* session, tr_address const* addr, tr tr_logAddError(_("Couldn't set source address %s on %" PRIdMAX ": %s"), tr_address_to_string(source_addr), (intmax_t)s, tr_net_strerror(err_buf, sizeof(err_buf), sockerrno)); tr_netClose(session, s); - return TR_BAD_SOCKET; /* -errno */ + return ret; } if (connect(s, (struct sockaddr*)&sock, addrlen) == -1 && @@ -304,8 +306,7 @@ tr_socket_t tr_netOpenPeerSocket(tr_session* session, tr_address const* addr, tr #endif sockerrno != EINPROGRESS) { - int tmperrno; - tmperrno = sockerrno; + int const tmperrno = sockerrno; if ((tmperrno != ENETUNREACH && tmperrno != EHOSTUNREACH) || addr->type == TR_AF_INET) { @@ -314,24 +315,32 @@ tr_socket_t tr_netOpenPeerSocket(tr_session* session, tr_address const* addr, tr } tr_netClose(session, s); - s = TR_BAD_SOCKET; /* -tmperrno */ + } + else + { + ret = tr_peer_socket_tcp_create(s); } tr_logAddDeep(__FILE__, __LINE__, NULL, "New OUTGOING connection %" PRIdMAX " (%s)", (intmax_t)s, tr_peerIoAddrStr(addr, port)); - return s; + return ret; } -struct UTPSocket* tr_netOpenPeerUTPSocket(tr_session* session, tr_address const* addr, tr_port port, bool clientIsSeed UNUSED) +struct tr_peer_socket tr_netOpenPeerUTPSocket(tr_session* session, tr_address const* addr, tr_port port, bool clientIsSeed UNUSED) { - struct UTPSocket* ret = NULL; + struct tr_peer_socket ret = TR_PEER_SOCKET_INIT; if (tr_address_is_valid_for_peers(addr, port)) { struct sockaddr_storage ss; socklen_t const sslen = setup_sockaddr(addr, port, &ss); - ret = UTP_Create(tr_utpSendTo, session, (struct sockaddr*)&ss, sslen); + struct UTPSocket* const socket = UTP_Create(tr_utpSendTo, session, (struct sockaddr*)&ss, sslen); + + if (socket != NULL) + { + ret = tr_peer_socket_utp_create(socket); + } } return ret; diff --git a/libtransmission/net.h b/libtransmission/net.h index 0a066e68c..8973dc57a 100644 --- a/libtransmission/net.h +++ b/libtransmission/net.h @@ -120,9 +120,9 @@ static inline bool tr_address_is_valid(tr_address const* a) struct tr_session; -tr_socket_t tr_netOpenPeerSocket(tr_session* session, tr_address const* addr, tr_port port, bool clientIsSeed); +struct tr_peer_socket tr_netOpenPeerSocket(tr_session* session, tr_address const* addr, tr_port port, bool clientIsSeed); -struct UTPSocket* tr_netOpenPeerUTPSocket(tr_session* session, tr_address const* addr, tr_port port, bool clientIsSeed); +struct tr_peer_socket tr_netOpenPeerUTPSocket(tr_session* session, tr_address const* addr, tr_port port, bool clientIsSeed); tr_socket_t tr_netBindTCP(tr_address const* addr, tr_port port, bool suppressMsgs); diff --git a/libtransmission/peer-io.c b/libtransmission/peer-io.c index a96b7d749..969819b0c 100644 --- a/libtransmission/peer-io.c +++ b/libtransmission/peer-io.c @@ -157,7 +157,7 @@ static void didWriteWrapper(tr_peerIo* io, unsigned int bytes_transferred) unsigned int const payload = MIN(next->length, bytes_transferred); /* For uTP sockets, the overhead is computed in utp_on_overhead. */ - unsigned int const overhead = io->socket != TR_BAD_SOCKET ? guessPacketOverhead(payload) : 0; + unsigned int const overhead = io->socket.type == TR_PEER_SOCKET_TYPE_TCP ? guessPacketOverhead(payload) : 0; uint64_t const now = tr_time_msec(); tr_bandwidthUsed(&io->bandwidth, TR_UP, payload, next->isPieceData, now); @@ -264,7 +264,7 @@ static void event_read_cb(evutil_socket_t fd, short event UNUSED, void* vio) tr_peerIo* io = vio; TR_ASSERT(tr_isPeerIo(io)); - TR_ASSERT(io->socket != TR_BAD_SOCKET); + TR_ASSERT(io->socket.type == TR_PEER_SOCKET_TYPE_TCP); int res; int e; @@ -350,7 +350,7 @@ static void event_write_cb(evutil_socket_t fd, short event UNUSED, void* vio) tr_peerIo* io = vio; TR_ASSERT(tr_isPeerIo(io)); - TR_ASSERT(io->socket != TR_BAD_SOCKET); + TR_ASSERT(io->socket.type == TR_PEER_SOCKET_TYPE_TCP); int res = 0; int e; @@ -623,20 +623,22 @@ static struct UTPFunctionTable dummy_utp_function_table = #endif /* #ifdef WITH_UTP */ static tr_peerIo* tr_peerIoNew(tr_session* session, tr_bandwidth* parent, tr_address const* addr, tr_port port, - uint8_t const* torrentHash, bool isIncoming, bool isSeed, tr_socket_t socket, struct UTPSocket* utp_socket) + uint8_t const* torrentHash, bool isIncoming, bool isSeed, struct tr_peer_socket const socket) { TR_ASSERT(session != NULL); TR_ASSERT(session->events != NULL); TR_ASSERT(tr_amInEventThread(session)); - TR_ASSERT((socket == TR_BAD_SOCKET) == (utp_socket != NULL)); -#ifndef WITH_UTP - TR_ASSERT(socket != TR_BAD_SOCKET); + +#ifdef WITH_UTP + TR_ASSERT(socket.type == TR_PEER_SOCKET_TYPE_TCP || socket.type == TR_PEER_SOCKET_TYPE_UTP); +#else + TR_ASSERT(socket.type == TR_PEER_SOCKET_TYPE_TCP); #endif - if (socket != TR_BAD_SOCKET) + if (socket.type == TR_PEER_SOCKET_TYPE_TCP) { - tr_netSetTOS(socket, session->peerSocketTOS); - maybeSetCongestionAlgorithm(socket, session->peer_congestion_algorithm); + tr_netSetTOS(socket.handle.tcp, session->peerSocketTOS); + maybeSetCongestionAlgorithm(socket.handle.tcp, session->peer_congestion_algorithm); } tr_peerIo* io = tr_new0(tr_peerIo, 1); @@ -648,7 +650,6 @@ static tr_peerIo* tr_peerIoNew(tr_session* session, tr_bandwidth* parent, tr_add io->isSeed = isSeed; io->port = port; io->socket = socket; - io->utp_socket = utp_socket; io->isIncoming = isIncoming; io->timeCreated = tr_time(); io->inbuf = evbuffer_new(); @@ -656,41 +657,47 @@ static tr_peerIo* tr_peerIoNew(tr_session* session, tr_bandwidth* parent, tr_add tr_bandwidthConstruct(&io->bandwidth, session, parent); tr_bandwidthSetPeer(&io->bandwidth, io); dbgmsg(io, "bandwidth is %p; its parent is %p", (void*)&io->bandwidth, (void*)parent); - dbgmsg(io, "socket is %" PRIdMAX ", utp_socket is %p", (intmax_t)socket, (void*)utp_socket); - if (io->socket != TR_BAD_SOCKET) + switch (socket.type) { - io->event_read = event_new(session->event_base, io->socket, EV_READ, event_read_cb, io); - io->event_write = event_new(session->event_base, io->socket, EV_WRITE, event_write_cb, io); - } + case TR_PEER_SOCKET_TYPE_TCP: + dbgmsg(io, "socket (tcp) is %" PRIdMAX, (intmax_t)socket.handle.tcp); + io->event_read = event_new(session->event_base, socket.handle.tcp, EV_READ, event_read_cb, io); + io->event_write = event_new(session->event_base, socket.handle.tcp, EV_WRITE, event_write_cb, io); + break; #ifdef WITH_UTP - else - { - UTP_SetSockopt(utp_socket, SO_RCVBUF, UTP_READ_BUFFER_SIZE); + case TR_PEER_SOCKET_TYPE_UTP: + dbgmsg(io, "socket (utp) is %p", (void*)socket.handle.utp); + UTP_SetSockopt(socket.handle.utp, SO_RCVBUF, UTP_READ_BUFFER_SIZE); dbgmsg(io, "%s", "calling UTP_SetCallbacks &utp_function_table"); - UTP_SetCallbacks(utp_socket, &utp_function_table, io); + UTP_SetCallbacks(socket.handle.utp, &utp_function_table, io); if (!isIncoming) { dbgmsg(io, "%s", "calling UTP_Connect"); - UTP_Connect(utp_socket); + UTP_Connect(socket.handle.utp); } - } + + break; #endif + default: + TR_ASSERT_MSG(false, "unsupported peer socket type %d", socket.type); + } + return io; } -tr_peerIo* tr_peerIoNewIncoming(tr_session* session, tr_bandwidth* parent, tr_address const* addr, tr_port port, tr_socket_t fd, - struct UTPSocket* utp_socket) +tr_peerIo* tr_peerIoNewIncoming(tr_session* session, tr_bandwidth* parent, tr_address const* addr, tr_port port, + struct tr_peer_socket const socket) { TR_ASSERT(session != NULL); TR_ASSERT(tr_address_is_valid(addr)); - return tr_peerIoNew(session, parent, addr, port, NULL, true, false, fd, utp_socket); + return tr_peerIoNew(session, parent, addr, port, NULL, true, false, socket); } tr_peerIo* tr_peerIoNewOutgoing(tr_session* session, tr_bandwidth* parent, tr_address const* addr, tr_port port, @@ -700,26 +707,26 @@ tr_peerIo* tr_peerIoNewOutgoing(tr_session* session, tr_bandwidth* parent, tr_ad TR_ASSERT(tr_address_is_valid(addr)); TR_ASSERT(torrentHash != NULL); - tr_socket_t fd = TR_BAD_SOCKET; - struct UTPSocket* utp_socket = NULL; + struct tr_peer_socket socket = TR_PEER_SOCKET_INIT; if (utp) { - utp_socket = tr_netOpenPeerUTPSocket(session, addr, port, isSeed); + socket = tr_netOpenPeerUTPSocket(session, addr, port, isSeed); } - if (utp_socket == NULL) + if (socket.type == TR_PEER_SOCKET_TYPE_NONE) { - fd = tr_netOpenPeerSocket(session, addr, port, isSeed); - dbgmsg(NULL, "tr_netOpenPeerSocket returned fd %" PRIdMAX, (intmax_t)fd); + socket = tr_netOpenPeerSocket(session, addr, port, isSeed); + dbgmsg(NULL, "tr_netOpenPeerSocket returned fd %" PRIdMAX, (intmax_t)(socket.type != TR_PEER_SOCKET_TYPE_NONE ? + socket.handle.tcp : TR_BAD_SOCKET)); } - if (fd == TR_BAD_SOCKET && utp_socket == NULL) + if (socket.type == TR_PEER_SOCKET_TYPE_NONE) { return NULL; } - return tr_peerIoNew(session, parent, addr, port, torrentHash, false, isSeed, fd, utp_socket); + return tr_peerIoNew(session, parent, addr, port, torrentHash, false, isSeed, socket); } /*** @@ -732,7 +739,9 @@ static void event_enable(tr_peerIo* io, short event) TR_ASSERT(io->session != NULL); TR_ASSERT(io->session->events != NULL); - if (io->socket != TR_BAD_SOCKET) + bool const need_events = io->socket.type == TR_PEER_SOCKET_TYPE_TCP; + + if (need_events) { TR_ASSERT(event_initialized(io->event_read)); TR_ASSERT(event_initialized(io->event_write)); @@ -742,7 +751,7 @@ static void event_enable(tr_peerIo* io, short event) { dbgmsg(io, "enabling ready-to-read polling"); - if (io->socket != TR_BAD_SOCKET) + if (need_events) { event_add(io->event_read, NULL); } @@ -754,7 +763,7 @@ static void event_enable(tr_peerIo* io, short event) { dbgmsg(io, "enabling ready-to-write polling"); - if (io->socket != TR_BAD_SOCKET) + if (need_events) { event_add(io->event_write, NULL); } @@ -769,7 +778,9 @@ static void event_disable(struct tr_peerIo* io, short event) TR_ASSERT(io->session != NULL); TR_ASSERT(io->session->events != NULL); - if (io->socket != TR_BAD_SOCKET) + bool const need_events = io->socket.type == TR_PEER_SOCKET_TYPE_TCP; + + if (need_events) { TR_ASSERT(event_initialized(io->event_read)); TR_ASSERT(event_initialized(io->event_write)); @@ -779,7 +790,7 @@ static void event_disable(struct tr_peerIo* io, short event) { dbgmsg(io, "disabling ready-to-read polling"); - if (io->socket != TR_BAD_SOCKET) + if (need_events) { event_del(io->event_read); } @@ -791,7 +802,7 @@ static void event_disable(struct tr_peerIo* io, short event) { dbgmsg(io, "disabling ready-to-write polling"); - if (io->socket != TR_BAD_SOCKET) + if (need_events) { event_del(io->event_write); } @@ -825,12 +836,30 @@ void tr_peerIoSetEnabled(tr_peerIo* io, tr_direction dir, bool isEnabled) static void io_close_socket(tr_peerIo* io) { - if (io->socket != TR_BAD_SOCKET) + switch (io->socket.type) { - tr_netClose(io->session, io->socket); - io->socket = TR_BAD_SOCKET; + case TR_PEER_SOCKET_TYPE_NONE: + break; + + case TR_PEER_SOCKET_TYPE_TCP: + tr_netClose(io->session, io->socket.handle.tcp); + break; + +#ifdef WITH_UTP + + case TR_PEER_SOCKET_TYPE_UTP: + UTP_SetCallbacks(io->socket.handle.utp, &dummy_utp_function_table, NULL); + UTP_Close(io->socket.handle.utp); + break; + +#endif + + default: + TR_ASSERT_MSG(false, "unsupported peer socket type %d", io->socket.type); } + io->socket = TR_PEER_SOCKET_INIT; + if (io->event_read != NULL) { event_free(io->event_read); @@ -842,18 +871,6 @@ static void io_close_socket(tr_peerIo* io) event_free(io->event_write); io->event_write = NULL; } - -#ifdef WITH_UTP - - if (io->utp_socket != NULL) - { - UTP_SetCallbacks(io->utp_socket, &dummy_utp_function_table, NULL); - UTP_Close(io->utp_socket); - - io->utp_socket = NULL; - } - -#endif } static void io_dtor(void* vio) @@ -966,18 +983,20 @@ int tr_peerIoReconnect(tr_peerIo* io) io_close_socket(io); io->socket = tr_netOpenPeerSocket(session, &io->addr, io->port, io->isSeed); - io->event_read = event_new(session->event_base, io->socket, EV_READ, event_read_cb, io); - io->event_write = event_new(session->event_base, io->socket, EV_WRITE, event_write_cb, io); - if (io->socket != TR_BAD_SOCKET) + if (io->socket.type != TR_PEER_SOCKET_TYPE_TCP) { - event_enable(io, pendingEvents); - tr_netSetTOS(io->socket, session->peerSocketTOS); - maybeSetCongestionAlgorithm(io->socket, session->peer_congestion_algorithm); - return 0; + return -1; } - return -1; + io->event_read = event_new(session->event_base, io->socket.handle.tcp, EV_READ, event_read_cb, io); + io->event_write = event_new(session->event_base, io->socket.handle.tcp, EV_WRITE, event_write_cb, io); + + event_enable(io, pendingEvents); + tr_netSetTOS(io->socket.handle.tcp, session->peerSocketTOS); + maybeSetCongestionAlgorithm(io->socket.handle.tcp, session->peer_congestion_algorithm); + + return 0; } /** @@ -1254,46 +1273,55 @@ static int tr_peerIoTryRead(tr_peerIo* io, size_t howmuch) if ((howmuch = tr_bandwidthClamp(&io->bandwidth, TR_DOWN, howmuch)) != 0) { - if (io->utp_socket != NULL) /* utp peer connection */ + switch (io->socket.type) { + case TR_PEER_SOCKET_TYPE_UTP: /* UTP_RBDrained notifies libutp that your read buffer is emtpy. * It opens up the congestion window by sending an ACK (soonish) * if one was not going to be sent. */ if (evbuffer_get_length(io->inbuf) == 0) { - UTP_RBDrained(io->utp_socket); - } - } - else /* tcp peer connection */ - { - int e; - char err_buf[512]; - - EVUTIL_SET_SOCKET_ERROR(0); - res = evbuffer_read(io->inbuf, io->socket, (int)howmuch); - e = EVUTIL_SOCKET_ERROR(); - - dbgmsg(io, "read %d from peer (%s)", res, res == -1 ? tr_net_strerror(err_buf, sizeof(err_buf), e) : ""); - - if (evbuffer_get_length(io->inbuf)) - { - canReadWrapper(io); + UTP_RBDrained(io->socket.handle.utp); } - if (res <= 0 && io->gotError != NULL && e != EAGAIN && e != EINTR && e != EINPROGRESS) - { - short what = BEV_EVENT_READING | BEV_EVENT_ERROR; + break; - if (res == 0) + case TR_PEER_SOCKET_TYPE_TCP: + { + int e; + char err_buf[512]; + + EVUTIL_SET_SOCKET_ERROR(0); + res = evbuffer_read(io->inbuf, io->socket.handle.tcp, (int)howmuch); + e = EVUTIL_SOCKET_ERROR(); + + dbgmsg(io, "read %d from peer (%s)", res, res == -1 ? tr_net_strerror(err_buf, sizeof(err_buf), e) : ""); + + if (evbuffer_get_length(io->inbuf)) { - what |= BEV_EVENT_EOF; + canReadWrapper(io); } - dbgmsg(io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, - tr_net_strerror(err_buf, sizeof(err_buf), e)); + if (res <= 0 && io->gotError != NULL && e != EAGAIN && e != EINTR && e != EINPROGRESS) + { + short what = BEV_EVENT_READING | BEV_EVENT_ERROR; - io->gotError(io, what, io->userData); + if (res == 0) + { + what |= BEV_EVENT_EOF; + } + + dbgmsg(io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, + tr_net_strerror(err_buf, sizeof(err_buf), e)); + + io->gotError(io, what, io->userData); + } + + break; } + + default: + TR_ASSERT_MSG(false, "unsupported peer socket type %d", io->socket.type); } } @@ -1313,34 +1341,42 @@ static int tr_peerIoTryWrite(tr_peerIo* io, size_t howmuch) if ((howmuch = tr_bandwidthClamp(&io->bandwidth, TR_UP, howmuch)) != 0) { - if (io->utp_socket != NULL) /* utp peer connection */ + switch (io->socket.type) { - UTP_Write(io->utp_socket, howmuch); + case TR_PEER_SOCKET_TYPE_UTP: + UTP_Write(io->socket.handle.utp, howmuch); n = old_len - evbuffer_get_length(io->outbuf); - } - else - { - int e; + break; - EVUTIL_SET_SOCKET_ERROR(0); - n = tr_evbuffer_write(io, io->socket, howmuch); - e = EVUTIL_SOCKET_ERROR(); - - if (n > 0) + case TR_PEER_SOCKET_TYPE_TCP: { - didWriteWrapper(io, n); + int e; + + EVUTIL_SET_SOCKET_ERROR(0); + n = tr_evbuffer_write(io, io->socket.handle.tcp, howmuch); + e = EVUTIL_SOCKET_ERROR(); + + if (n > 0) + { + didWriteWrapper(io, n); + } + + if (n < 0 && io->gotError != NULL && e != 0 && e != EPIPE && e != EAGAIN && e != EINTR && e != EINPROGRESS) + { + char errstr[512]; + short const what = BEV_EVENT_WRITING | BEV_EVENT_ERROR; + + dbgmsg(io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, + tr_net_strerror(errstr, sizeof(errstr), e)); + + io->gotError(io, what, io->userData); + } + + break; } - if (n < 0 && io->gotError != NULL && e != 0 && e != EPIPE && e != EAGAIN && e != EINTR && e != EINPROGRESS) - { - char errstr[512]; - short const what = BEV_EVENT_WRITING | BEV_EVENT_ERROR; - - dbgmsg(io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, - tr_net_strerror(errstr, sizeof(errstr), e)); - - io->gotError(io, what, io->userData); - } + default: + TR_ASSERT_MSG(false, "unsupported peer socket type %d", io->socket.type); } } diff --git a/libtransmission/peer-io.h b/libtransmission/peer-io.h index ff05ffbca..471633128 100644 --- a/libtransmission/peer-io.h +++ b/libtransmission/peer-io.h @@ -22,6 +22,7 @@ #include "bandwidth.h" #include "crypto.h" #include "net.h" /* tr_address */ +#include "peer-socket.h" #include "utils.h" /* tr_time() */ struct evbuffer; @@ -76,8 +77,7 @@ typedef struct tr_peerIo bool isSeed; tr_port port; - tr_socket_t socket; - struct UTPSocket* utp_socket; + struct tr_peer_socket socket; int refCount; @@ -113,7 +113,7 @@ tr_peerIo* tr_peerIoNewOutgoing(tr_session* session, struct tr_bandwidth* parent uint8_t const* torrentHash, bool isSeed, bool utp); tr_peerIo* tr_peerIoNewIncoming(tr_session* session, struct tr_bandwidth* parent, struct tr_address const* addr, tr_port port, - tr_socket_t socket, struct UTPSocket* utp_socket); + struct tr_peer_socket socket); void tr_peerIoRefImpl(char const* file, int line, tr_peerIo* io); diff --git a/libtransmission/peer-mgr.c b/libtransmission/peer-mgr.c index d7283bb43..08cc18212 100644 --- a/libtransmission/peer-mgr.c +++ b/libtransmission/peer-mgr.c @@ -2071,7 +2071,7 @@ static bool myHandshakeDoneCB(tr_handshake* handshake, tr_peerIo* io, bool readA /* In principle, this flag specifies whether the peer groks uTP, not whether it's currently connected over uTP. */ - if (io->utp_socket != NULL) + if (io->socket.type == TR_PEER_SOCKET_TYPE_UTP) { atom->flags |= ADDED_F_UTP_FLAGS; } @@ -2123,8 +2123,31 @@ static bool myHandshakeDoneCB(tr_handshake* handshake, tr_peerIo* io, bool readA return success; } -void tr_peerMgrAddIncoming(tr_peerMgr* manager, tr_address* addr, tr_port port, tr_socket_t socket, - struct UTPSocket* utp_socket) +static void close_peer_socket(struct tr_peer_socket const socket, tr_session* session) +{ + switch (socket.type) + { + case TR_PEER_SOCKET_TYPE_NONE: + break; + + case TR_PEER_SOCKET_TYPE_TCP: + tr_netClose(session, socket.handle.tcp); + break; + +#ifdef WITH_UTP + + case TR_PEER_SOCKET_TYPE_UTP: + UTP_Close(socket.handle.utp); + break; + +#endif + + default: + TR_ASSERT_MSG(false, "unsupported peer socket type %d", socket.type); + } +} + +void tr_peerMgrAddIncoming(tr_peerMgr* manager, tr_address* addr, tr_port port, struct tr_peer_socket const socket) { TR_ASSERT(tr_isSession(manager->session)); @@ -2135,33 +2158,18 @@ void tr_peerMgrAddIncoming(tr_peerMgr* manager, tr_address* addr, tr_port port, if (tr_sessionIsAddressBlocked(session, addr)) { tr_logAddDebug("Banned IP address \"%s\" tried to connect to us", tr_address_to_string(addr)); - - if (socket != TR_BAD_SOCKET) - { - tr_netClose(session, socket); - } - else - { - UTP_Close(utp_socket); - } + close_peer_socket(socket, session); } else if (getExistingHandshake(&manager->incomingHandshakes, addr) != NULL) { - if (socket != TR_BAD_SOCKET) - { - tr_netClose(session, socket); - } - else - { - UTP_Close(utp_socket); - } + close_peer_socket(socket, session); } else /* we don't have a connection to them yet... */ { tr_peerIo* io; tr_handshake* handshake; - io = tr_peerIoNewIncoming(session, &session->bandwidth, addr, port, socket, utp_socket); + io = tr_peerIoNewIncoming(session, &session->bandwidth, addr, port, socket); handshake = tr_handshakeNew(io, session->encryptionMode, myHandshakeDoneCB, manager); diff --git a/libtransmission/peer-mgr.h b/libtransmission/peer-mgr.h index f6d70e5e9..8f0c53c02 100644 --- a/libtransmission/peer-mgr.h +++ b/libtransmission/peer-mgr.h @@ -87,8 +87,7 @@ bool tr_peerMgrDidPeerRequest(tr_torrent const* torrent, tr_peer const* peer, tr void tr_peerMgrRebuildRequests(tr_torrent* torrent); -void tr_peerMgrAddIncoming(tr_peerMgr* manager, tr_address* addr, tr_port port, tr_socket_t socket, - struct UTPSocket* utp_socket); +void tr_peerMgrAddIncoming(tr_peerMgr* manager, tr_address* addr, tr_port port, struct tr_peer_socket socket); tr_pex* tr_peerMgrCompactToPex(void const* compact, size_t compactLen, uint8_t const* added_f, size_t added_f_len, size_t* setme_pex_count); diff --git a/libtransmission/peer-msgs.c b/libtransmission/peer-msgs.c index d347de134..268a3878b 100644 --- a/libtransmission/peer-msgs.c +++ b/libtransmission/peer-msgs.c @@ -2655,7 +2655,7 @@ bool tr_peerMsgsIsUtpConnection(tr_peerMsgs const* msgs) { TR_ASSERT(tr_isPeerMsgs(msgs)); - return msgs->io->utp_socket != NULL; + return msgs->io->socket.type == TR_PEER_SOCKET_TYPE_UTP; } bool tr_peerMsgsIsEncrypted(tr_peerMsgs const* msgs) diff --git a/libtransmission/peer-socket.h b/libtransmission/peer-socket.h new file mode 100644 index 000000000..97cf8158e --- /dev/null +++ b/libtransmission/peer-socket.h @@ -0,0 +1,49 @@ +/* + * This file Copyright (C) 2017 Mnemosyne LLC + * + * It may be used under the GNU GPL versions 2 or 3 + * or any future license endorsed by Mnemosyne LLC. + * + */ + +#pragma once + +#ifndef __TRANSMISSION__ +#error only libtransmission should #include this header. +#endif + +#include "net.h" +#include "tr-assert.h" + +enum tr_peer_socket_type +{ + TR_PEER_SOCKET_TYPE_NONE, + TR_PEER_SOCKET_TYPE_TCP, + TR_PEER_SOCKET_TYPE_UTP +}; + +union tr_peer_socket_handle +{ + tr_socket_t tcp; + struct UTPSocket* utp; +}; + +struct tr_peer_socket +{ + enum tr_peer_socket_type type; + union tr_peer_socket_handle handle; +}; + +#define TR_PEER_SOCKET_INIT ((struct tr_peer_socket){ .type = TR_PEER_SOCKET_TYPE_NONE }) + +static inline struct tr_peer_socket tr_peer_socket_tcp_create(tr_socket_t const handle) +{ + TR_ASSERT(handle != TR_BAD_SOCKET); + return (struct tr_peer_socket){ .type = TR_PEER_SOCKET_TYPE_TCP, .handle.tcp = handle }; +} + +static inline struct tr_peer_socket tr_peer_socket_utp_create(struct UTPSocket* const handle) +{ + TR_ASSERT(handle != NULL); + return (struct tr_peer_socket){ .type = TR_PEER_SOCKET_TYPE_UTP, .handle.utp = handle }; +} diff --git a/libtransmission/session.c b/libtransmission/session.c index e9a5f6d0f..86d054f7f 100644 --- a/libtransmission/session.c +++ b/libtransmission/session.c @@ -175,7 +175,7 @@ static void accept_incoming_peer(evutil_socket_t fd, short what UNUSED, void* vs { tr_logAddDeep(__FILE__, __LINE__, NULL, "new incoming connection %" PRIdMAX " (%s)", (intmax_t)clientSocket, tr_peerIoAddrStr(&clientAddr, clientPort)); - tr_peerMgrAddIncoming(session->peerMgr, &clientAddr, clientPort, clientSocket, NULL); + tr_peerMgrAddIncoming(session->peerMgr, &clientAddr, clientPort, tr_peer_socket_tcp_create(clientSocket)); } } diff --git a/libtransmission/tr-utp.c b/libtransmission/tr-utp.c index 9d3579bb7..d85c9efbf 100644 --- a/libtransmission/tr-utp.c +++ b/libtransmission/tr-utp.c @@ -31,6 +31,7 @@ THE SOFTWARE. #include "session.h" #include "crypto-utils.h" /* tr_rand_int_weak() */ #include "peer-mgr.h" +#include "peer-socket.h" #include "tr-assert.h" #include "tr-utp.h" #include "utils.h" @@ -117,7 +118,7 @@ static void incoming(void* closure, struct UTPSocket* s) return; } - tr_peerMgrAddIncoming(ss->peerMgr, &addr, port, TR_BAD_SOCKET, s); + tr_peerMgrAddIncoming(ss->peerMgr, &addr, port, tr_peer_socket_utp_create(s)); } void tr_utpSendTo(void* closure, unsigned char const* buf, size_t buflen, struct sockaddr const* to, socklen_t tolen)