Introduce peer socket struct to improve readability

This commit is contained in:
Mike Gelfand 2017-06-28 18:46:06 +03:00
parent 74548e01b6
commit 18aabdeb06
14 changed files with 263 additions and 155 deletions

View File

@ -348,6 +348,7 @@
C1425B351EE9C5F5001DB85F /* tr-assert.c in Sources */ = {isa = PBXBuildFile; fileRef = C1425B321EE9C5EA001DB85F /* tr-assert.c */; };
C1425B361EE9C605001DB85F /* tr-assert.h in Headers */ = {isa = PBXBuildFile; fileRef = C1425B331EE9C5EA001DB85F /* tr-assert.h */; };
C1425B371EE9C705001DB85F /* tr-macros.h in Headers */ = {isa = PBXBuildFile; fileRef = C1425B341EE9C5EA001DB85F /* tr-macros.h */; };
C1425B381EE9C805001DB85F /* peer-socket.h in Headers */ = {isa = PBXBuildFile; fileRef = C1425B351EE9C5EA001DB85F /* peer-socket.h */; };
C1639A741A55F4E000E42033 /* libb64.a in Frameworks */ = {isa = PBXBuildFile; fileRef = C1639A6F1A55F4D600E42033 /* libb64.a */; };
C1639A781A55F56600E42033 /* cdecode.c in Sources */ = {isa = PBXBuildFile; fileRef = C1639A761A55F56600E42033 /* cdecode.c */; };
C1639A791A55F56600E42033 /* cencode.c in Sources */ = {isa = PBXBuildFile; fileRef = C1639A771A55F56600E42033 /* cencode.c */; };
@ -992,6 +993,7 @@
C1425B321EE9C5EA001DB85F /* tr-assert.c */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.c; name = "tr-assert.c"; path = "libtransmission/tr-assert.c"; sourceTree = "<group>"; };
C1425B331EE9C5EA001DB85F /* tr-assert.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; name = "tr-assert.h"; path = "libtransmission/tr-assert.h"; sourceTree = "<group>"; };
C1425B341EE9C5EA001DB85F /* tr-macros.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; name = "tr-macros.h"; path = "libtransmission/tr-macros.h"; sourceTree = "<group>"; };
C1425B351EE9C5EA001DB85F /* peer-socket.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; name = "peer-socket.h"; path = "libtransmission/peer-socket.h"; sourceTree = "<group>"; };
C1639A6F1A55F4D600E42033 /* libb64.a */ = {isa = PBXFileReference; explicitFileType = archive.ar; includeInIndex = 0; path = libb64.a; sourceTree = BUILT_PRODUCTS_DIR; };
C1639A761A55F56600E42033 /* cdecode.c */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.c; name = cdecode.c; path = "third-party/libb64/src/cdecode.c"; sourceTree = "<group>"; };
C1639A771A55F56600E42033 /* cencode.c */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.c; name = cencode.c; path = "third-party/libb64/src/cencode.c"; sourceTree = "<group>"; };
@ -1364,6 +1366,7 @@
A22CFCA60FC24ED80009BD3E /* tr-dht.c */,
A22CFCA70FC24ED80009BD3E /* tr-dht.h */,
C1425B341EE9C5EA001DB85F /* tr-macros.h */,
C1425B351EE9C5EA001DB85F /* peer-socket.h */,
A284214212DA663E00FBDDBB /* tr-udp.c */,
A284214312DA663E00FBDDBB /* tr-udp.h */,
A2679292130E00A000CB7464 /* tr-utp.c */,
@ -1785,6 +1788,7 @@
BEFC1E3B0C07861A00B0BB3C /* platform.h in Headers */,
C1425B361EE9C605001DB85F /* tr-assert.h in Headers */,
C1425B371EE9C705001DB85F /* tr-macros.h in Headers */,
C1425B381EE9C805001DB85F /* peer-socket.h in Headers */,
BEFC1E450C07861A00B0BB3C /* net.h in Headers */,
BEFC1E490C07861A00B0BB3C /* metainfo.h in Headers */,
BEFC1E4D0C07861A00B0BB3C /* session.h in Headers */,

View File

@ -144,6 +144,7 @@ set(${PROJECT_NAME}_PRIVATE_HEADERS
peer-io.h
peer-mgr.h
peer-msgs.h
peer-socket.h
platform.h
platform-quota.h
port-forwarding.h

View File

@ -145,6 +145,7 @@ noinst_HEADERS = \
peer-io.h \
peer-mgr.h \
peer-msgs.h \
peer-socket.h \
platform.h \
platform-quota.h \
port-forwarding.h \

View File

@ -1150,7 +1150,7 @@ static void gotError(tr_peerIo* io, short what, void* vhandshake)
int errcode = errno;
tr_handshake* handshake = vhandshake;
if (io->utp_socket != NULL && !io->isIncoming && handshake->state == AWAITING_YB)
if (io->socket.type == TR_PEER_SOCKET_TYPE_UTP && !io->isIncoming && handshake->state == AWAITING_YB)
{
/* This peer probably doesn't speak uTP. */
@ -1171,7 +1171,7 @@ static void gotError(tr_peerIo* io, short what, void* vhandshake)
tr_peerMgrSetUtpFailed(tor, tr_peerIoGetAddress(io, NULL), true);
}
if (!tr_peerIoReconnect(handshake->io))
if (tr_peerIoReconnect(handshake->io) == 0)
{
uint8_t msg[HANDSHAKE_SIZE];
buildHandshakeMessage(handshake, msg);

View File

@ -240,10 +240,12 @@ static socklen_t setup_sockaddr(tr_address const* addr, tr_port port, struct soc
}
}
tr_socket_t tr_netOpenPeerSocket(tr_session* session, tr_address const* addr, tr_port port, bool clientIsSeed)
struct tr_peer_socket tr_netOpenPeerSocket(tr_session* session, tr_address const* addr, tr_port port, bool clientIsSeed)
{
TR_ASSERT(tr_address_is_valid(addr));
struct tr_peer_socket ret = TR_PEER_SOCKET_INIT;
static int const domains[NUM_TR_AF_INET_TYPES] = { AF_INET, AF_INET6 };
tr_socket_t s;
struct sockaddr_storage sock;
@ -255,14 +257,14 @@ tr_socket_t tr_netOpenPeerSocket(tr_session* session, tr_address const* addr, tr
if (!tr_address_is_valid_for_peers(addr, port))
{
return TR_BAD_SOCKET; /* -EINVAL */
return ret;
}
s = tr_fdSocketCreate(session, domains[addr->type], SOCK_STREAM);
if (s == TR_BAD_SOCKET)
{
return TR_BAD_SOCKET;
return ret;
}
/* seeds don't need much of a read buffer... */
@ -280,7 +282,7 @@ tr_socket_t tr_netOpenPeerSocket(tr_session* session, tr_address const* addr, tr
if (evutil_make_socket_nonblocking(s) == -1)
{
tr_netClose(session, s);
return TR_BAD_SOCKET;
return ret;
}
addrlen = setup_sockaddr(addr, port, &sock);
@ -295,7 +297,7 @@ tr_socket_t tr_netOpenPeerSocket(tr_session* session, tr_address const* addr, tr
tr_logAddError(_("Couldn't set source address %s on %" PRIdMAX ": %s"), tr_address_to_string(source_addr), (intmax_t)s,
tr_net_strerror(err_buf, sizeof(err_buf), sockerrno));
tr_netClose(session, s);
return TR_BAD_SOCKET; /* -errno */
return ret;
}
if (connect(s, (struct sockaddr*)&sock, addrlen) == -1 &&
@ -304,8 +306,7 @@ tr_socket_t tr_netOpenPeerSocket(tr_session* session, tr_address const* addr, tr
#endif
sockerrno != EINPROGRESS)
{
int tmperrno;
tmperrno = sockerrno;
int const tmperrno = sockerrno;
if ((tmperrno != ENETUNREACH && tmperrno != EHOSTUNREACH) || addr->type == TR_AF_INET)
{
@ -314,24 +315,32 @@ tr_socket_t tr_netOpenPeerSocket(tr_session* session, tr_address const* addr, tr
}
tr_netClose(session, s);
s = TR_BAD_SOCKET; /* -tmperrno */
}
else
{
ret = tr_peer_socket_tcp_create(s);
}
tr_logAddDeep(__FILE__, __LINE__, NULL, "New OUTGOING connection %" PRIdMAX " (%s)", (intmax_t)s,
tr_peerIoAddrStr(addr, port));
return s;
return ret;
}
struct UTPSocket* tr_netOpenPeerUTPSocket(tr_session* session, tr_address const* addr, tr_port port, bool clientIsSeed UNUSED)
struct tr_peer_socket tr_netOpenPeerUTPSocket(tr_session* session, tr_address const* addr, tr_port port, bool clientIsSeed UNUSED)
{
struct UTPSocket* ret = NULL;
struct tr_peer_socket ret = TR_PEER_SOCKET_INIT;
if (tr_address_is_valid_for_peers(addr, port))
{
struct sockaddr_storage ss;
socklen_t const sslen = setup_sockaddr(addr, port, &ss);
ret = UTP_Create(tr_utpSendTo, session, (struct sockaddr*)&ss, sslen);
struct UTPSocket* const socket = UTP_Create(tr_utpSendTo, session, (struct sockaddr*)&ss, sslen);
if (socket != NULL)
{
ret = tr_peer_socket_utp_create(socket);
}
}
return ret;

View File

@ -120,9 +120,9 @@ static inline bool tr_address_is_valid(tr_address const* a)
struct tr_session;
tr_socket_t tr_netOpenPeerSocket(tr_session* session, tr_address const* addr, tr_port port, bool clientIsSeed);
struct tr_peer_socket tr_netOpenPeerSocket(tr_session* session, tr_address const* addr, tr_port port, bool clientIsSeed);
struct UTPSocket* tr_netOpenPeerUTPSocket(tr_session* session, tr_address const* addr, tr_port port, bool clientIsSeed);
struct tr_peer_socket tr_netOpenPeerUTPSocket(tr_session* session, tr_address const* addr, tr_port port, bool clientIsSeed);
tr_socket_t tr_netBindTCP(tr_address const* addr, tr_port port, bool suppressMsgs);

View File

@ -157,7 +157,7 @@ static void didWriteWrapper(tr_peerIo* io, unsigned int bytes_transferred)
unsigned int const payload = MIN(next->length, bytes_transferred);
/* For uTP sockets, the overhead is computed in utp_on_overhead. */
unsigned int const overhead = io->socket != TR_BAD_SOCKET ? guessPacketOverhead(payload) : 0;
unsigned int const overhead = io->socket.type == TR_PEER_SOCKET_TYPE_TCP ? guessPacketOverhead(payload) : 0;
uint64_t const now = tr_time_msec();
tr_bandwidthUsed(&io->bandwidth, TR_UP, payload, next->isPieceData, now);
@ -264,7 +264,7 @@ static void event_read_cb(evutil_socket_t fd, short event UNUSED, void* vio)
tr_peerIo* io = vio;
TR_ASSERT(tr_isPeerIo(io));
TR_ASSERT(io->socket != TR_BAD_SOCKET);
TR_ASSERT(io->socket.type == TR_PEER_SOCKET_TYPE_TCP);
int res;
int e;
@ -350,7 +350,7 @@ static void event_write_cb(evutil_socket_t fd, short event UNUSED, void* vio)
tr_peerIo* io = vio;
TR_ASSERT(tr_isPeerIo(io));
TR_ASSERT(io->socket != TR_BAD_SOCKET);
TR_ASSERT(io->socket.type == TR_PEER_SOCKET_TYPE_TCP);
int res = 0;
int e;
@ -623,20 +623,22 @@ static struct UTPFunctionTable dummy_utp_function_table =
#endif /* #ifdef WITH_UTP */
static tr_peerIo* tr_peerIoNew(tr_session* session, tr_bandwidth* parent, tr_address const* addr, tr_port port,
uint8_t const* torrentHash, bool isIncoming, bool isSeed, tr_socket_t socket, struct UTPSocket* utp_socket)
uint8_t const* torrentHash, bool isIncoming, bool isSeed, struct tr_peer_socket const socket)
{
TR_ASSERT(session != NULL);
TR_ASSERT(session->events != NULL);
TR_ASSERT(tr_amInEventThread(session));
TR_ASSERT((socket == TR_BAD_SOCKET) == (utp_socket != NULL));
#ifndef WITH_UTP
TR_ASSERT(socket != TR_BAD_SOCKET);
#ifdef WITH_UTP
TR_ASSERT(socket.type == TR_PEER_SOCKET_TYPE_TCP || socket.type == TR_PEER_SOCKET_TYPE_UTP);
#else
TR_ASSERT(socket.type == TR_PEER_SOCKET_TYPE_TCP);
#endif
if (socket != TR_BAD_SOCKET)
if (socket.type == TR_PEER_SOCKET_TYPE_TCP)
{
tr_netSetTOS(socket, session->peerSocketTOS);
maybeSetCongestionAlgorithm(socket, session->peer_congestion_algorithm);
tr_netSetTOS(socket.handle.tcp, session->peerSocketTOS);
maybeSetCongestionAlgorithm(socket.handle.tcp, session->peer_congestion_algorithm);
}
tr_peerIo* io = tr_new0(tr_peerIo, 1);
@ -648,7 +650,6 @@ static tr_peerIo* tr_peerIoNew(tr_session* session, tr_bandwidth* parent, tr_add
io->isSeed = isSeed;
io->port = port;
io->socket = socket;
io->utp_socket = utp_socket;
io->isIncoming = isIncoming;
io->timeCreated = tr_time();
io->inbuf = evbuffer_new();
@ -656,41 +657,47 @@ static tr_peerIo* tr_peerIoNew(tr_session* session, tr_bandwidth* parent, tr_add
tr_bandwidthConstruct(&io->bandwidth, session, parent);
tr_bandwidthSetPeer(&io->bandwidth, io);
dbgmsg(io, "bandwidth is %p; its parent is %p", (void*)&io->bandwidth, (void*)parent);
dbgmsg(io, "socket is %" PRIdMAX ", utp_socket is %p", (intmax_t)socket, (void*)utp_socket);
if (io->socket != TR_BAD_SOCKET)
switch (socket.type)
{
io->event_read = event_new(session->event_base, io->socket, EV_READ, event_read_cb, io);
io->event_write = event_new(session->event_base, io->socket, EV_WRITE, event_write_cb, io);
}
case TR_PEER_SOCKET_TYPE_TCP:
dbgmsg(io, "socket (tcp) is %" PRIdMAX, (intmax_t)socket.handle.tcp);
io->event_read = event_new(session->event_base, socket.handle.tcp, EV_READ, event_read_cb, io);
io->event_write = event_new(session->event_base, socket.handle.tcp, EV_WRITE, event_write_cb, io);
break;
#ifdef WITH_UTP
else
{
UTP_SetSockopt(utp_socket, SO_RCVBUF, UTP_READ_BUFFER_SIZE);
case TR_PEER_SOCKET_TYPE_UTP:
dbgmsg(io, "socket (utp) is %p", (void*)socket.handle.utp);
UTP_SetSockopt(socket.handle.utp, SO_RCVBUF, UTP_READ_BUFFER_SIZE);
dbgmsg(io, "%s", "calling UTP_SetCallbacks &utp_function_table");
UTP_SetCallbacks(utp_socket, &utp_function_table, io);
UTP_SetCallbacks(socket.handle.utp, &utp_function_table, io);
if (!isIncoming)
{
dbgmsg(io, "%s", "calling UTP_Connect");
UTP_Connect(utp_socket);
UTP_Connect(socket.handle.utp);
}
}
break;
#endif
default:
TR_ASSERT_MSG(false, "unsupported peer socket type %d", socket.type);
}
return io;
}
tr_peerIo* tr_peerIoNewIncoming(tr_session* session, tr_bandwidth* parent, tr_address const* addr, tr_port port, tr_socket_t fd,
struct UTPSocket* utp_socket)
tr_peerIo* tr_peerIoNewIncoming(tr_session* session, tr_bandwidth* parent, tr_address const* addr, tr_port port,
struct tr_peer_socket const socket)
{
TR_ASSERT(session != NULL);
TR_ASSERT(tr_address_is_valid(addr));
return tr_peerIoNew(session, parent, addr, port, NULL, true, false, fd, utp_socket);
return tr_peerIoNew(session, parent, addr, port, NULL, true, false, socket);
}
tr_peerIo* tr_peerIoNewOutgoing(tr_session* session, tr_bandwidth* parent, tr_address const* addr, tr_port port,
@ -700,26 +707,26 @@ tr_peerIo* tr_peerIoNewOutgoing(tr_session* session, tr_bandwidth* parent, tr_ad
TR_ASSERT(tr_address_is_valid(addr));
TR_ASSERT(torrentHash != NULL);
tr_socket_t fd = TR_BAD_SOCKET;
struct UTPSocket* utp_socket = NULL;
struct tr_peer_socket socket = TR_PEER_SOCKET_INIT;
if (utp)
{
utp_socket = tr_netOpenPeerUTPSocket(session, addr, port, isSeed);
socket = tr_netOpenPeerUTPSocket(session, addr, port, isSeed);
}
if (utp_socket == NULL)
if (socket.type == TR_PEER_SOCKET_TYPE_NONE)
{
fd = tr_netOpenPeerSocket(session, addr, port, isSeed);
dbgmsg(NULL, "tr_netOpenPeerSocket returned fd %" PRIdMAX, (intmax_t)fd);
socket = tr_netOpenPeerSocket(session, addr, port, isSeed);
dbgmsg(NULL, "tr_netOpenPeerSocket returned fd %" PRIdMAX, (intmax_t)(socket.type != TR_PEER_SOCKET_TYPE_NONE ?
socket.handle.tcp : TR_BAD_SOCKET));
}
if (fd == TR_BAD_SOCKET && utp_socket == NULL)
if (socket.type == TR_PEER_SOCKET_TYPE_NONE)
{
return NULL;
}
return tr_peerIoNew(session, parent, addr, port, torrentHash, false, isSeed, fd, utp_socket);
return tr_peerIoNew(session, parent, addr, port, torrentHash, false, isSeed, socket);
}
/***
@ -732,7 +739,9 @@ static void event_enable(tr_peerIo* io, short event)
TR_ASSERT(io->session != NULL);
TR_ASSERT(io->session->events != NULL);
if (io->socket != TR_BAD_SOCKET)
bool const need_events = io->socket.type == TR_PEER_SOCKET_TYPE_TCP;
if (need_events)
{
TR_ASSERT(event_initialized(io->event_read));
TR_ASSERT(event_initialized(io->event_write));
@ -742,7 +751,7 @@ static void event_enable(tr_peerIo* io, short event)
{
dbgmsg(io, "enabling ready-to-read polling");
if (io->socket != TR_BAD_SOCKET)
if (need_events)
{
event_add(io->event_read, NULL);
}
@ -754,7 +763,7 @@ static void event_enable(tr_peerIo* io, short event)
{
dbgmsg(io, "enabling ready-to-write polling");
if (io->socket != TR_BAD_SOCKET)
if (need_events)
{
event_add(io->event_write, NULL);
}
@ -769,7 +778,9 @@ static void event_disable(struct tr_peerIo* io, short event)
TR_ASSERT(io->session != NULL);
TR_ASSERT(io->session->events != NULL);
if (io->socket != TR_BAD_SOCKET)
bool const need_events = io->socket.type == TR_PEER_SOCKET_TYPE_TCP;
if (need_events)
{
TR_ASSERT(event_initialized(io->event_read));
TR_ASSERT(event_initialized(io->event_write));
@ -779,7 +790,7 @@ static void event_disable(struct tr_peerIo* io, short event)
{
dbgmsg(io, "disabling ready-to-read polling");
if (io->socket != TR_BAD_SOCKET)
if (need_events)
{
event_del(io->event_read);
}
@ -791,7 +802,7 @@ static void event_disable(struct tr_peerIo* io, short event)
{
dbgmsg(io, "disabling ready-to-write polling");
if (io->socket != TR_BAD_SOCKET)
if (need_events)
{
event_del(io->event_write);
}
@ -825,12 +836,30 @@ void tr_peerIoSetEnabled(tr_peerIo* io, tr_direction dir, bool isEnabled)
static void io_close_socket(tr_peerIo* io)
{
if (io->socket != TR_BAD_SOCKET)
switch (io->socket.type)
{
tr_netClose(io->session, io->socket);
io->socket = TR_BAD_SOCKET;
case TR_PEER_SOCKET_TYPE_NONE:
break;
case TR_PEER_SOCKET_TYPE_TCP:
tr_netClose(io->session, io->socket.handle.tcp);
break;
#ifdef WITH_UTP
case TR_PEER_SOCKET_TYPE_UTP:
UTP_SetCallbacks(io->socket.handle.utp, &dummy_utp_function_table, NULL);
UTP_Close(io->socket.handle.utp);
break;
#endif
default:
TR_ASSERT_MSG(false, "unsupported peer socket type %d", io->socket.type);
}
io->socket = TR_PEER_SOCKET_INIT;
if (io->event_read != NULL)
{
event_free(io->event_read);
@ -842,18 +871,6 @@ static void io_close_socket(tr_peerIo* io)
event_free(io->event_write);
io->event_write = NULL;
}
#ifdef WITH_UTP
if (io->utp_socket != NULL)
{
UTP_SetCallbacks(io->utp_socket, &dummy_utp_function_table, NULL);
UTP_Close(io->utp_socket);
io->utp_socket = NULL;
}
#endif
}
static void io_dtor(void* vio)
@ -966,18 +983,20 @@ int tr_peerIoReconnect(tr_peerIo* io)
io_close_socket(io);
io->socket = tr_netOpenPeerSocket(session, &io->addr, io->port, io->isSeed);
io->event_read = event_new(session->event_base, io->socket, EV_READ, event_read_cb, io);
io->event_write = event_new(session->event_base, io->socket, EV_WRITE, event_write_cb, io);
if (io->socket != TR_BAD_SOCKET)
if (io->socket.type != TR_PEER_SOCKET_TYPE_TCP)
{
event_enable(io, pendingEvents);
tr_netSetTOS(io->socket, session->peerSocketTOS);
maybeSetCongestionAlgorithm(io->socket, session->peer_congestion_algorithm);
return 0;
return -1;
}
return -1;
io->event_read = event_new(session->event_base, io->socket.handle.tcp, EV_READ, event_read_cb, io);
io->event_write = event_new(session->event_base, io->socket.handle.tcp, EV_WRITE, event_write_cb, io);
event_enable(io, pendingEvents);
tr_netSetTOS(io->socket.handle.tcp, session->peerSocketTOS);
maybeSetCongestionAlgorithm(io->socket.handle.tcp, session->peer_congestion_algorithm);
return 0;
}
/**
@ -1254,46 +1273,55 @@ static int tr_peerIoTryRead(tr_peerIo* io, size_t howmuch)
if ((howmuch = tr_bandwidthClamp(&io->bandwidth, TR_DOWN, howmuch)) != 0)
{
if (io->utp_socket != NULL) /* utp peer connection */
switch (io->socket.type)
{
case TR_PEER_SOCKET_TYPE_UTP:
/* UTP_RBDrained notifies libutp that your read buffer is emtpy.
* It opens up the congestion window by sending an ACK (soonish)
* if one was not going to be sent. */
if (evbuffer_get_length(io->inbuf) == 0)
{
UTP_RBDrained(io->utp_socket);
}
}
else /* tcp peer connection */
{
int e;
char err_buf[512];
EVUTIL_SET_SOCKET_ERROR(0);
res = evbuffer_read(io->inbuf, io->socket, (int)howmuch);
e = EVUTIL_SOCKET_ERROR();
dbgmsg(io, "read %d from peer (%s)", res, res == -1 ? tr_net_strerror(err_buf, sizeof(err_buf), e) : "");
if (evbuffer_get_length(io->inbuf))
{
canReadWrapper(io);
UTP_RBDrained(io->socket.handle.utp);
}
if (res <= 0 && io->gotError != NULL && e != EAGAIN && e != EINTR && e != EINPROGRESS)
{
short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
break;
if (res == 0)
case TR_PEER_SOCKET_TYPE_TCP:
{
int e;
char err_buf[512];
EVUTIL_SET_SOCKET_ERROR(0);
res = evbuffer_read(io->inbuf, io->socket.handle.tcp, (int)howmuch);
e = EVUTIL_SOCKET_ERROR();
dbgmsg(io, "read %d from peer (%s)", res, res == -1 ? tr_net_strerror(err_buf, sizeof(err_buf), e) : "");
if (evbuffer_get_length(io->inbuf))
{
what |= BEV_EVENT_EOF;
canReadWrapper(io);
}
dbgmsg(io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e,
tr_net_strerror(err_buf, sizeof(err_buf), e));
if (res <= 0 && io->gotError != NULL && e != EAGAIN && e != EINTR && e != EINPROGRESS)
{
short what = BEV_EVENT_READING | BEV_EVENT_ERROR;
io->gotError(io, what, io->userData);
if (res == 0)
{
what |= BEV_EVENT_EOF;
}
dbgmsg(io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e,
tr_net_strerror(err_buf, sizeof(err_buf), e));
io->gotError(io, what, io->userData);
}
break;
}
default:
TR_ASSERT_MSG(false, "unsupported peer socket type %d", io->socket.type);
}
}
@ -1313,34 +1341,42 @@ static int tr_peerIoTryWrite(tr_peerIo* io, size_t howmuch)
if ((howmuch = tr_bandwidthClamp(&io->bandwidth, TR_UP, howmuch)) != 0)
{
if (io->utp_socket != NULL) /* utp peer connection */
switch (io->socket.type)
{
UTP_Write(io->utp_socket, howmuch);
case TR_PEER_SOCKET_TYPE_UTP:
UTP_Write(io->socket.handle.utp, howmuch);
n = old_len - evbuffer_get_length(io->outbuf);
}
else
{
int e;
break;
EVUTIL_SET_SOCKET_ERROR(0);
n = tr_evbuffer_write(io, io->socket, howmuch);
e = EVUTIL_SOCKET_ERROR();
if (n > 0)
case TR_PEER_SOCKET_TYPE_TCP:
{
didWriteWrapper(io, n);
int e;
EVUTIL_SET_SOCKET_ERROR(0);
n = tr_evbuffer_write(io, io->socket.handle.tcp, howmuch);
e = EVUTIL_SOCKET_ERROR();
if (n > 0)
{
didWriteWrapper(io, n);
}
if (n < 0 && io->gotError != NULL && e != 0 && e != EPIPE && e != EAGAIN && e != EINTR && e != EINPROGRESS)
{
char errstr[512];
short const what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
dbgmsg(io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e,
tr_net_strerror(errstr, sizeof(errstr), e));
io->gotError(io, what, io->userData);
}
break;
}
if (n < 0 && io->gotError != NULL && e != 0 && e != EPIPE && e != EAGAIN && e != EINTR && e != EINPROGRESS)
{
char errstr[512];
short const what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;
dbgmsg(io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e,
tr_net_strerror(errstr, sizeof(errstr), e));
io->gotError(io, what, io->userData);
}
default:
TR_ASSERT_MSG(false, "unsupported peer socket type %d", io->socket.type);
}
}

View File

@ -22,6 +22,7 @@
#include "bandwidth.h"
#include "crypto.h"
#include "net.h" /* tr_address */
#include "peer-socket.h"
#include "utils.h" /* tr_time() */
struct evbuffer;
@ -76,8 +77,7 @@ typedef struct tr_peerIo
bool isSeed;
tr_port port;
tr_socket_t socket;
struct UTPSocket* utp_socket;
struct tr_peer_socket socket;
int refCount;
@ -113,7 +113,7 @@ tr_peerIo* tr_peerIoNewOutgoing(tr_session* session, struct tr_bandwidth* parent
uint8_t const* torrentHash, bool isSeed, bool utp);
tr_peerIo* tr_peerIoNewIncoming(tr_session* session, struct tr_bandwidth* parent, struct tr_address const* addr, tr_port port,
tr_socket_t socket, struct UTPSocket* utp_socket);
struct tr_peer_socket socket);
void tr_peerIoRefImpl(char const* file, int line, tr_peerIo* io);

View File

@ -2071,7 +2071,7 @@ static bool myHandshakeDoneCB(tr_handshake* handshake, tr_peerIo* io, bool readA
/* In principle, this flag specifies whether the peer groks uTP,
not whether it's currently connected over uTP. */
if (io->utp_socket != NULL)
if (io->socket.type == TR_PEER_SOCKET_TYPE_UTP)
{
atom->flags |= ADDED_F_UTP_FLAGS;
}
@ -2123,8 +2123,31 @@ static bool myHandshakeDoneCB(tr_handshake* handshake, tr_peerIo* io, bool readA
return success;
}
void tr_peerMgrAddIncoming(tr_peerMgr* manager, tr_address* addr, tr_port port, tr_socket_t socket,
struct UTPSocket* utp_socket)
static void close_peer_socket(struct tr_peer_socket const socket, tr_session* session)
{
switch (socket.type)
{
case TR_PEER_SOCKET_TYPE_NONE:
break;
case TR_PEER_SOCKET_TYPE_TCP:
tr_netClose(session, socket.handle.tcp);
break;
#ifdef WITH_UTP
case TR_PEER_SOCKET_TYPE_UTP:
UTP_Close(socket.handle.utp);
break;
#endif
default:
TR_ASSERT_MSG(false, "unsupported peer socket type %d", socket.type);
}
}
void tr_peerMgrAddIncoming(tr_peerMgr* manager, tr_address* addr, tr_port port, struct tr_peer_socket const socket)
{
TR_ASSERT(tr_isSession(manager->session));
@ -2135,33 +2158,18 @@ void tr_peerMgrAddIncoming(tr_peerMgr* manager, tr_address* addr, tr_port port,
if (tr_sessionIsAddressBlocked(session, addr))
{
tr_logAddDebug("Banned IP address \"%s\" tried to connect to us", tr_address_to_string(addr));
if (socket != TR_BAD_SOCKET)
{
tr_netClose(session, socket);
}
else
{
UTP_Close(utp_socket);
}
close_peer_socket(socket, session);
}
else if (getExistingHandshake(&manager->incomingHandshakes, addr) != NULL)
{
if (socket != TR_BAD_SOCKET)
{
tr_netClose(session, socket);
}
else
{
UTP_Close(utp_socket);
}
close_peer_socket(socket, session);
}
else /* we don't have a connection to them yet... */
{
tr_peerIo* io;
tr_handshake* handshake;
io = tr_peerIoNewIncoming(session, &session->bandwidth, addr, port, socket, utp_socket);
io = tr_peerIoNewIncoming(session, &session->bandwidth, addr, port, socket);
handshake = tr_handshakeNew(io, session->encryptionMode, myHandshakeDoneCB, manager);

View File

@ -87,8 +87,7 @@ bool tr_peerMgrDidPeerRequest(tr_torrent const* torrent, tr_peer const* peer, tr
void tr_peerMgrRebuildRequests(tr_torrent* torrent);
void tr_peerMgrAddIncoming(tr_peerMgr* manager, tr_address* addr, tr_port port, tr_socket_t socket,
struct UTPSocket* utp_socket);
void tr_peerMgrAddIncoming(tr_peerMgr* manager, tr_address* addr, tr_port port, struct tr_peer_socket socket);
tr_pex* tr_peerMgrCompactToPex(void const* compact, size_t compactLen, uint8_t const* added_f, size_t added_f_len,
size_t* setme_pex_count);

View File

@ -2655,7 +2655,7 @@ bool tr_peerMsgsIsUtpConnection(tr_peerMsgs const* msgs)
{
TR_ASSERT(tr_isPeerMsgs(msgs));
return msgs->io->utp_socket != NULL;
return msgs->io->socket.type == TR_PEER_SOCKET_TYPE_UTP;
}
bool tr_peerMsgsIsEncrypted(tr_peerMsgs const* msgs)

View File

@ -0,0 +1,49 @@
/*
* This file Copyright (C) 2017 Mnemosyne LLC
*
* It may be used under the GNU GPL versions 2 or 3
* or any future license endorsed by Mnemosyne LLC.
*
*/
#pragma once
#ifndef __TRANSMISSION__
#error only libtransmission should #include this header.
#endif
#include "net.h"
#include "tr-assert.h"
enum tr_peer_socket_type
{
TR_PEER_SOCKET_TYPE_NONE,
TR_PEER_SOCKET_TYPE_TCP,
TR_PEER_SOCKET_TYPE_UTP
};
union tr_peer_socket_handle
{
tr_socket_t tcp;
struct UTPSocket* utp;
};
struct tr_peer_socket
{
enum tr_peer_socket_type type;
union tr_peer_socket_handle handle;
};
#define TR_PEER_SOCKET_INIT ((struct tr_peer_socket){ .type = TR_PEER_SOCKET_TYPE_NONE })
static inline struct tr_peer_socket tr_peer_socket_tcp_create(tr_socket_t const handle)
{
TR_ASSERT(handle != TR_BAD_SOCKET);
return (struct tr_peer_socket){ .type = TR_PEER_SOCKET_TYPE_TCP, .handle.tcp = handle };
}
static inline struct tr_peer_socket tr_peer_socket_utp_create(struct UTPSocket* const handle)
{
TR_ASSERT(handle != NULL);
return (struct tr_peer_socket){ .type = TR_PEER_SOCKET_TYPE_UTP, .handle.utp = handle };
}

View File

@ -175,7 +175,7 @@ static void accept_incoming_peer(evutil_socket_t fd, short what UNUSED, void* vs
{
tr_logAddDeep(__FILE__, __LINE__, NULL, "new incoming connection %" PRIdMAX " (%s)", (intmax_t)clientSocket,
tr_peerIoAddrStr(&clientAddr, clientPort));
tr_peerMgrAddIncoming(session->peerMgr, &clientAddr, clientPort, clientSocket, NULL);
tr_peerMgrAddIncoming(session->peerMgr, &clientAddr, clientPort, tr_peer_socket_tcp_create(clientSocket));
}
}

View File

@ -31,6 +31,7 @@ THE SOFTWARE.
#include "session.h"
#include "crypto-utils.h" /* tr_rand_int_weak() */
#include "peer-mgr.h"
#include "peer-socket.h"
#include "tr-assert.h"
#include "tr-utp.h"
#include "utils.h"
@ -117,7 +118,7 @@ static void incoming(void* closure, struct UTPSocket* s)
return;
}
tr_peerMgrAddIncoming(ss->peerMgr, &addr, port, TR_BAD_SOCKET, s);
tr_peerMgrAddIncoming(ss->peerMgr, &addr, port, tr_peer_socket_utp_create(s));
}
void tr_utpSendTo(void* closure, unsigned char const* buf, size_t buflen, struct sockaddr const* to, socklen_t tolen)