/* * This file Copyright (C) 2007-2014 Mnemosyne LLC * * It may be used under the GNU GPL versions 2 or 3 * or any future license endorsed by Mnemosyne LLC. * */ #include #include #include #include #include #include #include #include "transmission.h" #include "cache.h" #include "completion.h" #include "file.h" #include "log.h" #include "peer-io.h" #include "peer-mgr.h" #include "peer-msgs.h" #include "session.h" #include "torrent.h" #include "torrent-magnet.h" #include "tr-assert.h" #include "tr-dht.h" #include "utils.h" #include "variant.h" #include "version.h" #ifndef EBADMSG #define EBADMSG EINVAL #endif /** *** **/ enum { BT_CHOKE = 0, BT_UNCHOKE = 1, BT_INTERESTED = 2, BT_NOT_INTERESTED = 3, BT_HAVE = 4, BT_BITFIELD = 5, BT_REQUEST = 6, BT_PIECE = 7, BT_CANCEL = 8, BT_PORT = 9, /* */ BT_FEXT_SUGGEST = 13, BT_FEXT_HAVE_ALL = 14, BT_FEXT_HAVE_NONE = 15, BT_FEXT_REJECT = 16, BT_FEXT_ALLOWED_FAST = 17, /* */ BT_LTEP = 20, /* */ LTEP_HANDSHAKE = 0, /* */ UT_PEX_ID = 1, UT_METADATA_ID = 3, /* */ MAX_PEX_PEER_COUNT = 50, /* */ MIN_CHOKE_PERIOD_SEC = 10, /* idle seconds before we send a keepalive */ KEEPALIVE_INTERVAL_SECS = 100, /* */ PEX_INTERVAL_SECS = 90, /* sec between sendPex() calls */ /* */ REQQ = 512, /* */ METADATA_REQQ = 64, /* */ MAGIC_NUMBER = 21549, /* used in lowering the outMessages queue period */ IMMEDIATE_PRIORITY_INTERVAL_SECS = 0, HIGH_PRIORITY_INTERVAL_SECS = 2, LOW_PRIORITY_INTERVAL_SECS = 10, /* number of pieces we'll allow in our fast set */ MAX_FAST_SET_SIZE = 3, /* how many blocks to keep prefetched per peer */ PREFETCH_SIZE = 18, /* when we're making requests from another peer, batch them together to send enough requests to meet our bandwidth goals for the next N seconds */ REQUEST_BUF_SECS = 10, /* defined in BEP #9 */ METADATA_MSG_TYPE_REQUEST = 0, METADATA_MSG_TYPE_DATA = 1, METADATA_MSG_TYPE_REJECT = 2 }; enum { AWAITING_BT_LENGTH, AWAITING_BT_ID, AWAITING_BT_MESSAGE, AWAITING_BT_PIECE }; typedef enum { ENCRYPTION_PREFERENCE_UNKNOWN, ENCRYPTION_PREFERENCE_YES, ENCRYPTION_PREFERENCE_NO } encryption_preference_t; /** *** **/ struct peer_request { uint32_t index; uint32_t offset; uint32_t length; }; static void blockToReq(tr_torrent const* tor, tr_block_index_t block, struct peer_request* setme) { tr_torrentGetBlockLocation(tor, block, &setme->index, &setme->offset, &setme->length); } /** *** **/ /* this is raw, unchanged data from the peer regarding * the current message that it's sending us. */ struct tr_incoming { uint8_t id; uint32_t length; /* includes the +1 for id length */ struct peer_request blockReq; /* metadata for incoming blocks */ struct evbuffer* block; /* piece data for incoming blocks */ }; /** * Low-level communication state information about a connected peer. * * This structure remembers the low-level protocol states that we're * in with this peer, such as active requests, pex messages, and so on. * Its fields are all private to peer-msgs.c. * * Data not directly involved with sending & receiving messages is * stored in tr_peer, where it can be accessed by both peermsgs and * the peer manager. * * @see struct peer_atom * @see tr_peer */ struct tr_peerMsgs { struct tr_peer peer; /* parent */ uint16_t magic_number; /* Whether or not we've choked this peer. */ bool peer_is_choked; /* whether or not the peer has indicated it will download from us. */ bool peer_is_interested; /* whether or not the peer is choking us. */ bool client_is_choked; /* whether or not we've indicated to the peer that we would download from them if unchoked. */ bool client_is_interested; bool peerSupportsPex; bool peerSupportsMetadataXfer; bool clientSentLtepHandshake; bool peerSentLtepHandshake; /*bool haveFastSet;*/ int desiredRequestCount; int prefetchCount; bool is_active[2]; /* how long the outMessages batch should be allowed to grow before * it's flushed -- some messages (like requests >:) should be sent * very quickly; others aren't as urgent. */ int8_t outMessagesBatchPeriod; uint8_t state; uint8_t ut_pex_id; uint8_t ut_metadata_id; uint16_t pexCount; uint16_t pexCount6; tr_port dht_port; encryption_preference_t encryption_preference; size_t metadata_size_hint; #if 0 size_t fastsetSize; tr_piece_index_t fastset[MAX_FAST_SET_SIZE]; #endif tr_torrent* torrent; tr_peer_callback callback; void* callbackData; struct evbuffer* outMessages; /* all the non-piece messages */ struct peer_request peerAskedFor[REQQ]; int peerAskedForMetadata[METADATA_REQQ]; int peerAskedForMetadataCount; tr_pex* pex; tr_pex* pex6; /*time_t clientSentPexAt;*/ time_t clientSentAnythingAt; time_t chokeChangedAt; /* when we started batching the outMessages */ time_t outMessagesBatchedAt; struct tr_incoming incoming; /* if the peer supports the Extension Protocol in BEP 10 and supplied a reqq argument, it's stored here. Otherwise, the value is zero and should be ignored. */ int64_t reqq; struct event* pexTimer; struct tr_peerIo* io; }; /** *** **/ static inline tr_session* getSession(struct tr_peerMsgs* msgs) { return msgs->torrent->session; } /** *** **/ static void myDebug(char const* file, int line, struct tr_peerMsgs const* msgs, char const* fmt, ...) TR_GNUC_PRINTF(4, 5); static void myDebug(char const* file, int line, struct tr_peerMsgs const* msgs, char const* fmt, ...) { tr_sys_file_t const fp = tr_logGetFile(); if (fp != TR_BAD_SYS_FILE) { va_list args; char timestr[64]; struct evbuffer* buf = evbuffer_new(); char* base = tr_sys_path_basename(file, NULL); char* message; evbuffer_add_printf(buf, "[%s] %s - %s [%s]: ", tr_logGetTimeStr(timestr, sizeof(timestr)), tr_torrentName( msgs->torrent), tr_peerIoGetAddrStr(msgs->io), tr_quark_get_string(msgs->peer.client, NULL)); va_start(args, fmt); evbuffer_add_vprintf(buf, fmt, args); va_end(args); evbuffer_add_printf(buf, " (%s:%d)", base, line); message = evbuffer_free_to_str(buf, NULL); tr_sys_file_write_line(fp, message, NULL); tr_free(base); tr_free(message); } } #define dbgmsg(msgs, ...) \ do \ { \ if (tr_logGetDeepEnabled()) \ { \ myDebug(__FILE__, __LINE__, msgs, __VA_ARGS__); \ } \ } \ while (0) /** *** **/ static void pokeBatchPeriod(tr_peerMsgs* msgs, int interval) { if (msgs->outMessagesBatchPeriod > interval) { msgs->outMessagesBatchPeriod = interval; dbgmsg(msgs, "lowering batch interval to %d seconds", interval); } } static void dbgOutMessageLen(tr_peerMsgs* msgs) { dbgmsg(msgs, "outMessage size is now %zu", evbuffer_get_length(msgs->outMessages)); } static void protocolSendReject(tr_peerMsgs* msgs, struct peer_request const* req) { TR_ASSERT(tr_peerIoSupportsFEXT(msgs->io)); struct evbuffer* out = msgs->outMessages; evbuffer_add_uint32(out, sizeof(uint8_t) + 3 * sizeof(uint32_t)); evbuffer_add_uint8(out, BT_FEXT_REJECT); evbuffer_add_uint32(out, req->index); evbuffer_add_uint32(out, req->offset); evbuffer_add_uint32(out, req->length); dbgmsg(msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length); dbgOutMessageLen(msgs); } static void protocolSendRequest(tr_peerMsgs* msgs, struct peer_request const* req) { struct evbuffer* out = msgs->outMessages; evbuffer_add_uint32(out, sizeof(uint8_t) + 3 * sizeof(uint32_t)); evbuffer_add_uint8(out, BT_REQUEST); evbuffer_add_uint32(out, req->index); evbuffer_add_uint32(out, req->offset); evbuffer_add_uint32(out, req->length); dbgmsg(msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length); dbgOutMessageLen(msgs); pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS); } static void protocolSendCancel(tr_peerMsgs* msgs, struct peer_request const* req) { struct evbuffer* out = msgs->outMessages; evbuffer_add_uint32(out, sizeof(uint8_t) + 3 * sizeof(uint32_t)); evbuffer_add_uint8(out, BT_CANCEL); evbuffer_add_uint32(out, req->index); evbuffer_add_uint32(out, req->offset); evbuffer_add_uint32(out, req->length); dbgmsg(msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length); dbgOutMessageLen(msgs); pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS); } static void protocolSendPort(tr_peerMsgs* msgs, uint16_t port) { struct evbuffer* out = msgs->outMessages; dbgmsg(msgs, "sending Port %u", port); evbuffer_add_uint32(out, 3); evbuffer_add_uint8(out, BT_PORT); evbuffer_add_uint16(out, port); } static void protocolSendHave(tr_peerMsgs* msgs, uint32_t index) { struct evbuffer* out = msgs->outMessages; evbuffer_add_uint32(out, sizeof(uint8_t) + sizeof(uint32_t)); evbuffer_add_uint8(out, BT_HAVE); evbuffer_add_uint32(out, index); dbgmsg(msgs, "sending Have %u", index); dbgOutMessageLen(msgs); pokeBatchPeriod(msgs, LOW_PRIORITY_INTERVAL_SECS); } #if 0 static void protocolSendAllowedFast(tr_peerMsgs* msgs, uint32_t pieceIndex) { TR_ASSERT(tr_peerIoSupportsFEXT(msgs->io)); tr_peerIo* io = msgs->io; struct evbuffer* out = msgs->outMessages; evbuffer_add_uint32(io, out, sizeof(uint8_t) + sizeof(uint32_t)); evbuffer_add_uint8(io, out, BT_FEXT_ALLOWED_FAST); evbuffer_add_uint32(io, out, pieceIndex); dbgmsg(msgs, "sending Allowed Fast %u...", pieceIndex); dbgOutMessageLen(msgs); } #endif static void protocolSendChoke(tr_peerMsgs* msgs, bool choke) { struct evbuffer* out = msgs->outMessages; evbuffer_add_uint32(out, sizeof(uint8_t)); evbuffer_add_uint8(out, choke ? BT_CHOKE : BT_UNCHOKE); dbgmsg(msgs, "sending %s...", choke ? "Choke" : "Unchoke"); dbgOutMessageLen(msgs); pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS); } static void protocolSendHaveAll(tr_peerMsgs* msgs) { TR_ASSERT(tr_peerIoSupportsFEXT(msgs->io)); struct evbuffer* out = msgs->outMessages; evbuffer_add_uint32(out, sizeof(uint8_t)); evbuffer_add_uint8(out, BT_FEXT_HAVE_ALL); dbgmsg(msgs, "sending HAVE_ALL..."); dbgOutMessageLen(msgs); pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS); } static void protocolSendHaveNone(tr_peerMsgs* msgs) { TR_ASSERT(tr_peerIoSupportsFEXT(msgs->io)); struct evbuffer* out = msgs->outMessages; evbuffer_add_uint32(out, sizeof(uint8_t)); evbuffer_add_uint8(out, BT_FEXT_HAVE_NONE); dbgmsg(msgs, "sending HAVE_NONE..."); dbgOutMessageLen(msgs); pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS); } /** *** EVENTS **/ static void publish(tr_peerMsgs* msgs, tr_peer_event* e) { if (msgs->callback != NULL) { (*msgs->callback)(&msgs->peer, e, msgs->callbackData); } } static void fireError(tr_peerMsgs* msgs, int err) { tr_peer_event e = TR_PEER_EVENT_INIT; e.eventType = TR_PEER_ERROR; e.err = err; publish(msgs, &e); } static void fireGotBlock(tr_peerMsgs* msgs, struct peer_request const* req) { tr_peer_event e = TR_PEER_EVENT_INIT; e.eventType = TR_PEER_CLIENT_GOT_BLOCK; e.pieceIndex = req->index; e.offset = req->offset; e.length = req->length; publish(msgs, &e); } static void fireGotRej(tr_peerMsgs* msgs, struct peer_request const* req) { tr_peer_event e = TR_PEER_EVENT_INIT; e.eventType = TR_PEER_CLIENT_GOT_REJ; e.pieceIndex = req->index; e.offset = req->offset; e.length = req->length; publish(msgs, &e); } static void fireGotChoke(tr_peerMsgs* msgs) { tr_peer_event e = TR_PEER_EVENT_INIT; e.eventType = TR_PEER_CLIENT_GOT_CHOKE; publish(msgs, &e); } static void fireClientGotHaveAll(tr_peerMsgs* msgs) { tr_peer_event e = TR_PEER_EVENT_INIT; e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL; publish(msgs, &e); } static void fireClientGotHaveNone(tr_peerMsgs* msgs) { tr_peer_event e = TR_PEER_EVENT_INIT; e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE; publish(msgs, &e); } static void fireClientGotPieceData(tr_peerMsgs* msgs, uint32_t length) { tr_peer_event e = TR_PEER_EVENT_INIT; e.length = length; e.eventType = TR_PEER_CLIENT_GOT_PIECE_DATA; publish(msgs, &e); } static void firePeerGotPieceData(tr_peerMsgs* msgs, uint32_t length) { tr_peer_event e = TR_PEER_EVENT_INIT; e.length = length; e.eventType = TR_PEER_PEER_GOT_PIECE_DATA; publish(msgs, &e); } static void fireClientGotSuggest(tr_peerMsgs* msgs, uint32_t pieceIndex) { tr_peer_event e = TR_PEER_EVENT_INIT; e.eventType = TR_PEER_CLIENT_GOT_SUGGEST; e.pieceIndex = pieceIndex; publish(msgs, &e); } static void fireClientGotPort(tr_peerMsgs* msgs, tr_port port) { tr_peer_event e = TR_PEER_EVENT_INIT; e.eventType = TR_PEER_CLIENT_GOT_PORT; e.port = port; publish(msgs, &e); } static void fireClientGotAllowedFast(tr_peerMsgs* msgs, uint32_t pieceIndex) { tr_peer_event e = TR_PEER_EVENT_INIT; e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST; e.pieceIndex = pieceIndex; publish(msgs, &e); } static void fireClientGotBitfield(tr_peerMsgs* msgs, tr_bitfield* bitfield) { tr_peer_event e = TR_PEER_EVENT_INIT; e.eventType = TR_PEER_CLIENT_GOT_BITFIELD; e.bitfield = bitfield; publish(msgs, &e); } static void fireClientGotHave(tr_peerMsgs* msgs, tr_piece_index_t index) { tr_peer_event e = TR_PEER_EVENT_INIT; e.eventType = TR_PEER_CLIENT_GOT_HAVE; e.pieceIndex = index; publish(msgs, &e); } /** *** ALLOWED FAST SET *** For explanation, see http://www.bittorrent.org/beps/bep_0006.html **/ #if 0 size_t tr_generateAllowedSet(tr_piece_index_t* setmePieces, size_t desiredSetSize, size_t pieceCount, uint8_t const* infohash, tr_address const* addr) { TR_ASSERT(setmePieces != NULL); TR_ASSERT(desiredSetSize <= pieceCount); TR_ASSERT(desiredSetSize != 0); TR_ASSERT(pieceCount != 0); TR_ASSERT(infohash != NULL); TR_ASSERT(addr != NULL); size_t setSize = 0; if (addr->type == TR_AF_INET) { uint8_t w[SHA_DIGEST_LENGTH + 4]; uint8_t* walk = w; uint8_t x[SHA_DIGEST_LENGTH]; uint32_t ui32 = ntohl(htonl(addr->addr.addr4.s_addr) & 0xffffff00); /* (1) */ memcpy(w, &ui32, sizeof(uint32_t)); walk += sizeof(uint32_t); memcpy(walk, infohash, SHA_DIGEST_LENGTH); /* (2) */ walk += SHA_DIGEST_LENGTH; tr_sha1(x, w, walk - w, NULL); /* (3) */ TR_ASSERT(sizeof(w) == walk - w); while (setSize < desiredSetSize) { for (int i = 0; i < 5 && setSize < desiredSetSize; ++i) /* (4) */ { uint32_t j = i * 4; /* (5) */ uint32_t y = ntohl(*(uint32_t*)(x + j)); /* (6) */ uint32_t index = y % pieceCount; /* (7) */ bool found = false; for (size_t k = 0; !found && k < setSize; ++k) /* (8) */ { found = setmePieces[k] == index; } if (!found) { setmePieces[setSize++] = index; /* (9) */ } } tr_sha1(x, x, sizeof(x), NULL); /* (3) */ } } return setSize; } static void updateFastSet(tr_peerMsgs* msgs) { TR_UNUSED(msgs); bool const fext = tr_peerIoSupportsFEXT(msgs->io); bool const peerIsNeedy = msgs->peer->progress < 0.10; if (fext && peerIsNeedy && !msgs->haveFastSet) { struct tr_address const* addr = tr_peerIoGetAddress(msgs->io, NULL); tr_info const* inf = &msgs->torrent->info; size_t const numwant = MIN(MAX_FAST_SET_SIZE, inf->pieceCount); /* build the fast set */ msgs->fastsetSize = tr_generateAllowedSet(msgs->fastset, numwant, inf->pieceCount, inf->hash, addr); msgs->haveFastSet = true; /* send it to the peer */ for (size_t i = 0; i < msgs->fastsetSize; ++i) { protocolSendAllowedFast(msgs, msgs->fastset[i]); } } } #endif /*** **** ACTIVE ***/ static bool tr_peerMsgsCalculateActive(tr_peerMsgs const* msgs, tr_direction direction) { TR_ASSERT(tr_isPeerMsgs(msgs)); TR_ASSERT(tr_isDirection(direction)); bool is_active; if (direction == TR_CLIENT_TO_PEER) { is_active = tr_peerMsgsIsPeerInterested(msgs) && !tr_peerMsgsIsPeerChoked(msgs); /* FIXME: https://trac.transmissionbt.com/ticket/5505 if (is_active) { TR_ASSERT(!tr_peerIsSeed(&msgs->peer)); } */ } else /* TR_PEER_TO_CLIENT */ { if (!tr_torrentHasMetadata(msgs->torrent)) { is_active = true; } else { is_active = tr_peerMsgsIsClientInterested(msgs) && !tr_peerMsgsIsClientChoked(msgs); if (is_active) { TR_ASSERT(!tr_torrentIsSeed(msgs->torrent)); } } } return is_active; } bool tr_peerMsgsIsActive(tr_peerMsgs const* msgs, tr_direction direction) { TR_ASSERT(tr_isPeerMsgs(msgs)); TR_ASSERT(tr_isDirection(direction)); bool is_active = msgs->is_active[direction]; TR_ASSERT(is_active == tr_peerMsgsCalculateActive(msgs, direction)); return is_active; } static void tr_peerMsgsSetActive(tr_peerMsgs* msgs, tr_direction direction, bool is_active) { dbgmsg(msgs, "direction [%d] is_active [%d]", (int)direction, (int)is_active); if (msgs->is_active[direction] != is_active) { msgs->is_active[direction] = is_active; tr_swarmIncrementActivePeers(msgs->torrent->swarm, direction, is_active); } } void tr_peerMsgsUpdateActive(tr_peerMsgs* msgs, tr_direction direction) { bool const is_active = tr_peerMsgsCalculateActive(msgs, direction); tr_peerMsgsSetActive(msgs, direction, is_active); } /** *** INTEREST **/ static void sendInterest(tr_peerMsgs* msgs, bool b) { TR_ASSERT(msgs != NULL); struct evbuffer* out = msgs->outMessages; msgs->client_is_interested = b; dbgmsg(msgs, "Sending %s", b ? "Interested" : "Not Interested"); evbuffer_add_uint32(out, sizeof(uint8_t)); evbuffer_add_uint8(out, b ? BT_INTERESTED : BT_NOT_INTERESTED); pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS); dbgOutMessageLen(msgs); } static void updateInterest(tr_peerMsgs* msgs) { TR_UNUSED(msgs); /* FIXME -- might need to poke the mgr on startup */ } void tr_peerMsgsSetInterested(tr_peerMsgs* msgs, bool b) { if (msgs->client_is_interested != b) { sendInterest(msgs, b); tr_peerMsgsUpdateActive(msgs, TR_PEER_TO_CLIENT); } } static bool popNextMetadataRequest(tr_peerMsgs* msgs, int* piece) { if (msgs->peerAskedForMetadataCount == 0) { return false; } *piece = msgs->peerAskedForMetadata[0]; tr_removeElementFromArray(msgs->peerAskedForMetadata, 0, sizeof(int), msgs->peerAskedForMetadataCount); --msgs->peerAskedForMetadataCount; return true; } static bool popNextRequest(tr_peerMsgs* msgs, struct peer_request* setme) { if (msgs->peer.pendingReqsToClient == 0) { return false; } *setme = msgs->peerAskedFor[0]; tr_removeElementFromArray(msgs->peerAskedFor, 0, sizeof(struct peer_request), msgs->peer.pendingReqsToClient); --msgs->peer.pendingReqsToClient; return true; } static void cancelAllRequestsToClient(tr_peerMsgs* msgs) { struct peer_request req; bool const mustSendCancel = tr_peerIoSupportsFEXT(msgs->io); while (popNextRequest(msgs, &req)) { if (mustSendCancel) { protocolSendReject(msgs, &req); } } } void tr_peerMsgsSetChoke(tr_peerMsgs* msgs, bool peer_is_choked) { TR_ASSERT(msgs != NULL); time_t const now = tr_time(); time_t const fibrillationTime = now - MIN_CHOKE_PERIOD_SEC; if (msgs->chokeChangedAt > fibrillationTime) { dbgmsg(msgs, "Not changing choke to %d to avoid fibrillation", peer_is_choked); } else if (msgs->peer_is_choked != peer_is_choked) { msgs->peer_is_choked = peer_is_choked; if (peer_is_choked) { cancelAllRequestsToClient(msgs); } protocolSendChoke(msgs, peer_is_choked); msgs->chokeChangedAt = now; tr_peerMsgsUpdateActive(msgs, TR_CLIENT_TO_PEER); } } /** *** **/ void tr_peerMsgsHave(tr_peerMsgs* msgs, uint32_t index) { protocolSendHave(msgs, index); /* since we have more pieces now, we might not be interested in this peer */ updateInterest(msgs); } /** *** **/ static bool reqIsValid(tr_peerMsgs const* peer, uint32_t index, uint32_t offset, uint32_t length) { return tr_torrentReqIsValid(peer->torrent, index, offset, length); } static bool requestIsValid(tr_peerMsgs const* msgs, struct peer_request const* req) { return reqIsValid(msgs, req->index, req->offset, req->length); } void tr_peerMsgsCancel(tr_peerMsgs* msgs, tr_block_index_t block) { struct peer_request req; // fprintf(stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n", // (size_t)block, msgs->peer); blockToReq(msgs->torrent, block, &req); protocolSendCancel(msgs, &req); } /** *** **/ static void sendLtepHandshake(tr_peerMsgs* msgs) { tr_variant val; bool allow_pex; bool allow_metadata_xfer; struct evbuffer* payload; struct evbuffer* out = msgs->outMessages; unsigned char const* ipv6 = tr_globalIPv6(); static tr_quark version_quark = 0; if (msgs->clientSentLtepHandshake) { return; } if (version_quark == 0) { version_quark = tr_quark_new(TR_NAME " " USERAGENT_PREFIX, TR_BAD_SIZE); } dbgmsg(msgs, "sending an ltep handshake"); msgs->clientSentLtepHandshake = true; /* decide if we want to advertise metadata xfer support (BEP 9) */ if (tr_torrentIsPrivate(msgs->torrent)) { allow_metadata_xfer = false; } else { allow_metadata_xfer = true; } /* decide if we want to advertise pex support */ if (!tr_torrentAllowsPex(msgs->torrent)) { allow_pex = false; } else if (msgs->peerSentLtepHandshake) { allow_pex = msgs->peerSupportsPex; } else { allow_pex = true; } tr_variantInitDict(&val, 8); tr_variantDictAddBool(&val, TR_KEY_e, getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED); if (ipv6 != NULL) { tr_variantDictAddRaw(&val, TR_KEY_ipv6, ipv6, 16); } if (allow_metadata_xfer && tr_torrentHasMetadata(msgs->torrent) && msgs->torrent->infoDictLength > 0) { tr_variantDictAddInt(&val, TR_KEY_metadata_size, msgs->torrent->infoDictLength); } tr_variantDictAddInt(&val, TR_KEY_p, tr_sessionGetPublicPeerPort(getSession(msgs))); tr_variantDictAddInt(&val, TR_KEY_reqq, REQQ); tr_variantDictAddBool(&val, TR_KEY_upload_only, tr_torrentIsSeed(msgs->torrent)); tr_variantDictAddQuark(&val, TR_KEY_v, version_quark); if (allow_metadata_xfer || allow_pex) { tr_variant* m = tr_variantDictAddDict(&val, TR_KEY_m, 2); if (allow_metadata_xfer) { tr_variantDictAddInt(m, TR_KEY_ut_metadata, UT_METADATA_ID); } if (allow_pex) { tr_variantDictAddInt(m, TR_KEY_ut_pex, UT_PEX_ID); } } payload = tr_variantToBuf(&val, TR_VARIANT_FMT_BENC); evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload)); evbuffer_add_uint8(out, BT_LTEP); evbuffer_add_uint8(out, LTEP_HANDSHAKE); evbuffer_add_buffer(out, payload); pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS); dbgOutMessageLen(msgs); /* cleanup */ evbuffer_free(payload); tr_variantFree(&val); } static void parseLtepHandshake(tr_peerMsgs* msgs, uint32_t len, struct evbuffer* inbuf) { int64_t i; tr_variant val; tr_variant* sub; uint8_t* tmp = tr_new(uint8_t, len); uint8_t const* addr; size_t addr_len; tr_pex pex; int8_t seedProbability = -1; memset(&pex, 0, sizeof(tr_pex)); tr_peerIoReadBytes(msgs->io, inbuf, tmp, len); msgs->peerSentLtepHandshake = true; if (tr_variantFromBenc(&val, tmp, len) != 0 || !tr_variantIsDict(&val)) { dbgmsg(msgs, "GET extended-handshake, couldn't get dictionary"); tr_free(tmp); return; } /* arbitrary limit, should be more than enough */ if (len <= 4096) { dbgmsg(msgs, "here is the handshake: [%*.*s]", (int)len, (int)len, tmp); } else { dbgmsg(msgs, "handshake length is too big (%" PRIu32 "), printing skipped", len); } /* does the peer prefer encrypted connections? */ if (tr_variantDictFindInt(&val, TR_KEY_e, &i)) { msgs->encryption_preference = i != 0 ? ENCRYPTION_PREFERENCE_YES : ENCRYPTION_PREFERENCE_NO; if (i != 0) { pex.flags |= ADDED_F_ENCRYPTION_FLAG; } } /* check supported messages for utorrent pex */ msgs->peerSupportsPex = false; msgs->peerSupportsMetadataXfer = false; if (tr_variantDictFindDict(&val, TR_KEY_m, &sub)) { if (tr_variantDictFindInt(sub, TR_KEY_ut_pex, &i)) { msgs->peerSupportsPex = i != 0; msgs->ut_pex_id = (uint8_t)i; dbgmsg(msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id); } if (tr_variantDictFindInt(sub, TR_KEY_ut_metadata, &i)) { msgs->peerSupportsMetadataXfer = i != 0; msgs->ut_metadata_id = (uint8_t)i; dbgmsg(msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id); } if (tr_variantDictFindInt(sub, TR_KEY_ut_holepunch, &i)) { /* Mysterious µTorrent extension that we don't grok. However, it implies support for µTP, so use it to indicate that. */ tr_peerMgrSetUtpFailed(msgs->torrent, tr_peerIoGetAddress(msgs->io, NULL), false); } } /* look for metainfo size (BEP 9) */ if (tr_variantDictFindInt(&val, TR_KEY_metadata_size, &i)) { if (tr_torrentSetMetadataSizeHint(msgs->torrent, i)) { msgs->metadata_size_hint = (size_t)i; } } /* look for upload_only (BEP 21) */ if (tr_variantDictFindInt(&val, TR_KEY_upload_only, &i)) { seedProbability = i == 0 ? 0 : 100; } /* get peer's listening port */ if (tr_variantDictFindInt(&val, TR_KEY_p, &i)) { pex.port = htons((uint16_t)i); fireClientGotPort(msgs, pex.port); dbgmsg(msgs, "peer's port is now %d", (int)i); } if (tr_peerIoIsIncoming(msgs->io) && tr_variantDictFindRaw(&val, TR_KEY_ipv4, &addr, &addr_len) && addr_len == 4) { pex.addr.type = TR_AF_INET; memcpy(&pex.addr.addr.addr4, addr, 4); tr_peerMgrAddPex(msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability); } if (tr_peerIoIsIncoming(msgs->io) && tr_variantDictFindRaw(&val, TR_KEY_ipv6, &addr, &addr_len) && addr_len == 16) { pex.addr.type = TR_AF_INET6; memcpy(&pex.addr.addr.addr6, addr, 16); tr_peerMgrAddPex(msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability); } /* get peer's maximum request queue size */ if (tr_variantDictFindInt(&val, TR_KEY_reqq, &i)) { msgs->reqq = i; } tr_variantFree(&val); tr_free(tmp); } static void parseUtMetadata(tr_peerMsgs* msgs, uint32_t msglen, struct evbuffer* inbuf) { tr_variant dict; char* msg_end; char const* benc_end; int64_t msg_type = -1; int64_t piece = -1; int64_t total_size = 0; uint8_t* tmp = tr_new(uint8_t, msglen); tr_peerIoReadBytes(msgs->io, inbuf, tmp, msglen); msg_end = (char*)tmp + msglen; if (tr_variantFromBencFull(&dict, tmp, msglen, NULL, &benc_end) == 0) { (void)tr_variantDictFindInt(&dict, TR_KEY_msg_type, &msg_type); (void)tr_variantDictFindInt(&dict, TR_KEY_piece, &piece); (void)tr_variantDictFindInt(&dict, TR_KEY_total_size, &total_size); tr_variantFree(&dict); } dbgmsg(msgs, "got ut_metadata msg: type %d, piece %d, total_size %d", (int)msg_type, (int)piece, (int)total_size); if (msg_type == METADATA_MSG_TYPE_REJECT) { /* NOOP */ } if (msg_type == METADATA_MSG_TYPE_DATA && !tr_torrentHasMetadata(msgs->torrent) && msg_end - benc_end <= METADATA_PIECE_SIZE && piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size) { int const pieceLen = msg_end - benc_end; tr_torrentSetMetadataPiece(msgs->torrent, piece, benc_end, pieceLen); } if (msg_type == METADATA_MSG_TYPE_REQUEST) { if (piece >= 0 && tr_torrentHasMetadata(msgs->torrent) && !tr_torrentIsPrivate(msgs->torrent) && msgs->peerAskedForMetadataCount < METADATA_REQQ) { msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece; } else { tr_variant v; struct evbuffer* payload; struct evbuffer* out = msgs->outMessages; /* build the rejection message */ tr_variantInitDict(&v, 2); tr_variantDictAddInt(&v, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT); tr_variantDictAddInt(&v, TR_KEY_piece, piece); payload = tr_variantToBuf(&v, TR_VARIANT_FMT_BENC); /* write it out as a LTEP message to our outMessages buffer */ evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload)); evbuffer_add_uint8(out, BT_LTEP); evbuffer_add_uint8(out, msgs->ut_metadata_id); evbuffer_add_buffer(out, payload); pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS); dbgOutMessageLen(msgs); /* cleanup */ evbuffer_free(payload); tr_variantFree(&v); } } tr_free(tmp); } static void parseUtPex(tr_peerMsgs* msgs, uint32_t msglen, struct evbuffer* inbuf) { tr_torrent* tor = msgs->torrent; if (!tr_torrentAllowsPex(tor)) { return; } uint8_t* tmp = tr_new(uint8_t, msglen); tr_peerIoReadBytes(msgs->io, inbuf, tmp, msglen); tr_variant val; bool const loaded = tr_variantFromBenc(&val, tmp, msglen) == 0; tr_free(tmp); if (!loaded) { return; } uint8_t const* added; size_t added_len; if (tr_variantDictFindRaw(&val, TR_KEY_added, &added, &added_len)) { tr_pex* pex; size_t n; size_t added_f_len; uint8_t const* added_f; if (!tr_variantDictFindRaw(&val, TR_KEY_added_f, &added_f, &added_f_len)) { added_f_len = 0; added_f = NULL; } pex = tr_peerMgrCompactToPex(added, added_len, added_f, added_f_len, &n); n = MIN(n, MAX_PEX_PEER_COUNT); for (size_t i = 0; i < n; ++i) { int seedProbability = -1; if (i < added_f_len) { seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) != 0 ? 100 : 0; } tr_peerMgrAddPex(tor, TR_PEER_FROM_PEX, pex + i, seedProbability); } tr_free(pex); } if (tr_variantDictFindRaw(&val, TR_KEY_added6, &added, &added_len)) { tr_pex* pex; size_t n; size_t added_f_len; uint8_t const* added_f; if (!tr_variantDictFindRaw(&val, TR_KEY_added6_f, &added_f, &added_f_len)) { added_f_len = 0; added_f = NULL; } pex = tr_peerMgrCompact6ToPex(added, added_len, added_f, added_f_len, &n); n = MIN(n, MAX_PEX_PEER_COUNT); for (size_t i = 0; i < n; ++i) { int seedProbability = -1; if (i < added_f_len) { seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) != 0 ? 100 : 0; } tr_peerMgrAddPex(tor, TR_PEER_FROM_PEX, pex + i, seedProbability); } tr_free(pex); } tr_variantFree(&val); } static void sendPex(tr_peerMsgs* msgs); static void parseLtep(tr_peerMsgs* msgs, uint32_t msglen, struct evbuffer* inbuf) { TR_ASSERT(msglen > 0); uint8_t ltep_msgid; tr_peerIoReadUint8(msgs->io, inbuf, <ep_msgid); msglen--; if (ltep_msgid == LTEP_HANDSHAKE) { dbgmsg(msgs, "got ltep handshake"); parseLtepHandshake(msgs, msglen, inbuf); if (tr_peerIoSupportsLTEP(msgs->io)) { sendLtepHandshake(msgs); sendPex(msgs); } } else if (ltep_msgid == UT_PEX_ID) { dbgmsg(msgs, "got ut pex"); msgs->peerSupportsPex = true; parseUtPex(msgs, msglen, inbuf); } else if (ltep_msgid == UT_METADATA_ID) { dbgmsg(msgs, "got ut metadata"); msgs->peerSupportsMetadataXfer = true; parseUtMetadata(msgs, msglen, inbuf); } else { dbgmsg(msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid); evbuffer_drain(inbuf, msglen); } } static int readBtLength(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t inlen) { uint32_t len; if (inlen < sizeof(len)) { return READ_LATER; } tr_peerIoReadUint32(msgs->io, inbuf, &len); if (len == 0) /* peer sent us a keepalive message */ { dbgmsg(msgs, "got KeepAlive"); } else { msgs->incoming.length = len; msgs->state = AWAITING_BT_ID; } return READ_NOW; } static int readBtMessage(tr_peerMsgs*, struct evbuffer*, size_t); static int readBtId(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t inlen) { uint8_t id; if (inlen < sizeof(uint8_t)) { return READ_LATER; } tr_peerIoReadUint8(msgs->io, inbuf, &id); msgs->incoming.id = id; dbgmsg(msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length); if (id == BT_PIECE) { msgs->state = AWAITING_BT_PIECE; return READ_NOW; } else if (msgs->incoming.length != 1) { msgs->state = AWAITING_BT_MESSAGE; return READ_NOW; } else { return readBtMessage(msgs, inbuf, inlen - 1); } } static void updatePeerProgress(tr_peerMsgs* msgs) { tr_peerUpdateProgress(msgs->torrent, &msgs->peer); /* updateFastSet(msgs); */ updateInterest(msgs); } static void prefetchPieces(tr_peerMsgs* msgs) { if (!getSession(msgs)->isPrefetchEnabled) { return; } for (int i = msgs->prefetchCount; i < msgs->peer.pendingReqsToClient && i < PREFETCH_SIZE; ++i) { struct peer_request const* req = msgs->peerAskedFor + i; if (requestIsValid(msgs, req)) { tr_cachePrefetchBlock(getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length); ++msgs->prefetchCount; } } } static void peerMadeRequest(tr_peerMsgs* msgs, struct peer_request const* req) { bool const fext = tr_peerIoSupportsFEXT(msgs->io); bool const reqIsValid = requestIsValid(msgs, req); bool const clientHasPiece = reqIsValid && tr_torrentPieceIsComplete(msgs->torrent, req->index); bool const peerIsChoked = msgs->peer_is_choked; bool allow = false; if (!reqIsValid) { dbgmsg(msgs, "rejecting an invalid request."); } else if (!clientHasPiece) { dbgmsg(msgs, "rejecting request for a piece we don't have."); } else if (peerIsChoked) { dbgmsg(msgs, "rejecting request from choked peer"); } else if (msgs->peer.pendingReqsToClient + 1 >= REQQ) { dbgmsg(msgs, "rejecting request ... reqq is full"); } else { allow = true; } if (allow) { msgs->peerAskedFor[msgs->peer.pendingReqsToClient++] = *req; prefetchPieces(msgs); } else if (fext) { protocolSendReject(msgs, req); } } static bool messageLengthIsCorrect(tr_peerMsgs const* msg, uint8_t id, uint32_t len) { switch (id) { case BT_CHOKE: case BT_UNCHOKE: case BT_INTERESTED: case BT_NOT_INTERESTED: case BT_FEXT_HAVE_ALL: case BT_FEXT_HAVE_NONE: return len == 1; case BT_HAVE: case BT_FEXT_SUGGEST: case BT_FEXT_ALLOWED_FAST: return len == 5; case BT_BITFIELD: if (tr_torrentHasMetadata(msg->torrent)) { return len == (msg->torrent->info.pieceCount >> 3) + ((msg->torrent->info.pieceCount & 7) != 0 ? 1 : 0) + 1U; } /* we don't know the piece count yet, so we can only guess whether to send true or false */ if (msg->metadata_size_hint > 0) { return len <= msg->metadata_size_hint; } return true; case BT_REQUEST: case BT_CANCEL: case BT_FEXT_REJECT: return len == 13; case BT_PIECE: return len > 9 && len <= 16393; case BT_PORT: return len == 3; case BT_LTEP: return len >= 2; default: return false; } } static int clientGotBlock(tr_peerMsgs* msgs, struct evbuffer* block, struct peer_request const* req); static int readBtPiece(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t inlen, size_t* setme_piece_bytes_read) { TR_ASSERT(evbuffer_get_length(inbuf) >= inlen); dbgmsg(msgs, "In readBtPiece"); struct peer_request* req = &msgs->incoming.blockReq; if (req->length == 0) { if (inlen < 8) { return READ_LATER; } tr_peerIoReadUint32(msgs->io, inbuf, &req->index); tr_peerIoReadUint32(msgs->io, inbuf, &req->offset); req->length = msgs->incoming.length - 9; dbgmsg(msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length); return READ_NOW; } else { int err; size_t n; size_t nLeft; struct evbuffer* block_buffer; if (msgs->incoming.block == NULL) { msgs->incoming.block = evbuffer_new(); } block_buffer = msgs->incoming.block; /* read in another chunk of data */ nLeft = req->length - evbuffer_get_length(block_buffer); n = MIN(nLeft, inlen); tr_peerIoReadBytesToBuf(msgs->io, inbuf, block_buffer, n); fireClientGotPieceData(msgs, n); *setme_piece_bytes_read += n; dbgmsg(msgs, "got %zu bytes for block %u:%u->%u ... %d remain", n, req->index, req->offset, req->length, (int)(req->length - evbuffer_get_length(block_buffer))); if (evbuffer_get_length(block_buffer) < req->length) { return READ_LATER; } /* pass the block along... */ err = clientGotBlock(msgs, block_buffer, req); evbuffer_drain(block_buffer, evbuffer_get_length(block_buffer)); /* cleanup */ req->length = 0; msgs->state = AWAITING_BT_LENGTH; return err != 0 ? READ_ERR : READ_NOW; } } static void updateDesiredRequestCount(tr_peerMsgs* msgs); static int readBtMessage(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t inlen) { uint8_t const id = msgs->incoming.id; #ifdef TR_ENABLE_ASSERTS size_t const startBufLen = evbuffer_get_length(inbuf); #endif bool const fext = tr_peerIoSupportsFEXT(msgs->io); uint32_t ui32; uint32_t msglen = msgs->incoming.length; TR_ASSERT(msglen > 0); --msglen; /* id length */ dbgmsg(msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen); if (inlen < msglen) { return READ_LATER; } if (!messageLengthIsCorrect(msgs, id, msglen + 1)) { dbgmsg(msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen); fireError(msgs, EMSGSIZE); return READ_ERR; } switch (id) { case BT_CHOKE: dbgmsg(msgs, "got Choke"); msgs->client_is_choked = true; if (!fext) { fireGotChoke(msgs); } tr_peerMsgsUpdateActive(msgs, TR_PEER_TO_CLIENT); break; case BT_UNCHOKE: dbgmsg(msgs, "got Unchoke"); msgs->client_is_choked = false; tr_peerMsgsUpdateActive(msgs, TR_PEER_TO_CLIENT); updateDesiredRequestCount(msgs); break; case BT_INTERESTED: dbgmsg(msgs, "got Interested"); msgs->peer_is_interested = true; tr_peerMsgsUpdateActive(msgs, TR_CLIENT_TO_PEER); break; case BT_NOT_INTERESTED: dbgmsg(msgs, "got Not Interested"); msgs->peer_is_interested = false; tr_peerMsgsUpdateActive(msgs, TR_CLIENT_TO_PEER); break; case BT_HAVE: tr_peerIoReadUint32(msgs->io, inbuf, &ui32); dbgmsg(msgs, "got Have: %u", ui32); if (tr_torrentHasMetadata(msgs->torrent) && ui32 >= msgs->torrent->info.pieceCount) { fireError(msgs, ERANGE); return READ_ERR; } /* a peer can send the same HAVE message twice... */ if (!tr_bitfieldHas(&msgs->peer.have, ui32)) { tr_bitfieldAdd(&msgs->peer.have, ui32); fireClientGotHave(msgs, ui32); } updatePeerProgress(msgs); break; case BT_BITFIELD: { uint8_t* tmp = tr_new(uint8_t, msglen); dbgmsg(msgs, "got a bitfield"); tr_peerIoReadBytes(msgs->io, inbuf, tmp, msglen); tr_bitfieldSetRaw(&msgs->peer.have, tmp, msglen, tr_torrentHasMetadata(msgs->torrent)); fireClientGotBitfield(msgs, &msgs->peer.have); updatePeerProgress(msgs); tr_free(tmp); break; } case BT_REQUEST: { struct peer_request r; tr_peerIoReadUint32(msgs->io, inbuf, &r.index); tr_peerIoReadUint32(msgs->io, inbuf, &r.offset); tr_peerIoReadUint32(msgs->io, inbuf, &r.length); dbgmsg(msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length); peerMadeRequest(msgs, &r); break; } case BT_CANCEL: { struct peer_request r; tr_peerIoReadUint32(msgs->io, inbuf, &r.index); tr_peerIoReadUint32(msgs->io, inbuf, &r.offset); tr_peerIoReadUint32(msgs->io, inbuf, &r.length); tr_historyAdd(&msgs->peer.cancelsSentToClient, tr_time(), 1); dbgmsg(msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length); for (int i = 0; i < msgs->peer.pendingReqsToClient; ++i) { struct peer_request const* req = msgs->peerAskedFor + i; if (req->index == r.index && req->offset == r.offset && req->length == r.length) { tr_removeElementFromArray(msgs->peerAskedFor, i, sizeof(struct peer_request), msgs->peer.pendingReqsToClient); --msgs->peer.pendingReqsToClient; break; } } break; } case BT_PIECE: TR_ASSERT(false); /* handled elsewhere! */ break; case BT_PORT: dbgmsg(msgs, "Got a BT_PORT"); tr_peerIoReadUint16(msgs->io, inbuf, &msgs->dht_port); if (msgs->dht_port > 0) { tr_dhtAddNode(getSession(msgs), tr_peerAddress(&msgs->peer), msgs->dht_port, false); } break; case BT_FEXT_SUGGEST: dbgmsg(msgs, "Got a BT_FEXT_SUGGEST"); tr_peerIoReadUint32(msgs->io, inbuf, &ui32); if (fext) { fireClientGotSuggest(msgs, ui32); } else { fireError(msgs, EMSGSIZE); return READ_ERR; } break; case BT_FEXT_ALLOWED_FAST: dbgmsg(msgs, "Got a BT_FEXT_ALLOWED_FAST"); tr_peerIoReadUint32(msgs->io, inbuf, &ui32); if (fext) { fireClientGotAllowedFast(msgs, ui32); } else { fireError(msgs, EMSGSIZE); return READ_ERR; } break; case BT_FEXT_HAVE_ALL: dbgmsg(msgs, "Got a BT_FEXT_HAVE_ALL"); if (fext) { tr_bitfieldSetHasAll(&msgs->peer.have); TR_ASSERT(tr_bitfieldHasAll(&msgs->peer.have)); fireClientGotHaveAll(msgs); updatePeerProgress(msgs); } else { fireError(msgs, EMSGSIZE); return READ_ERR; } break; case BT_FEXT_HAVE_NONE: dbgmsg(msgs, "Got a BT_FEXT_HAVE_NONE"); if (fext) { tr_bitfieldSetHasNone(&msgs->peer.have); fireClientGotHaveNone(msgs); updatePeerProgress(msgs); } else { fireError(msgs, EMSGSIZE); return READ_ERR; } break; case BT_FEXT_REJECT: { struct peer_request r; dbgmsg(msgs, "Got a BT_FEXT_REJECT"); tr_peerIoReadUint32(msgs->io, inbuf, &r.index); tr_peerIoReadUint32(msgs->io, inbuf, &r.offset); tr_peerIoReadUint32(msgs->io, inbuf, &r.length); if (fext) { fireGotRej(msgs, &r); } else { fireError(msgs, EMSGSIZE); return READ_ERR; } break; } case BT_LTEP: dbgmsg(msgs, "Got a BT_LTEP"); parseLtep(msgs, msglen, inbuf); break; default: dbgmsg(msgs, "peer sent us an UNKNOWN: %d", (int)id); tr_peerIoDrain(msgs->io, inbuf, msglen); break; } TR_ASSERT(msglen + 1 == msgs->incoming.length); TR_ASSERT(evbuffer_get_length(inbuf) == startBufLen - msglen); msgs->state = AWAITING_BT_LENGTH; return READ_NOW; } /* returns 0 on success, or an errno on failure */ static int clientGotBlock(tr_peerMsgs* msgs, struct evbuffer* data, struct peer_request const* req) { TR_ASSERT(msgs != NULL); TR_ASSERT(req != NULL); int err; tr_torrent* tor = msgs->torrent; tr_block_index_t const block = _tr_block(tor, req->index, req->offset); if (!requestIsValid(msgs, req)) { dbgmsg(msgs, "dropping invalid block %u:%u->%u", req->index, req->offset, req->length); return EBADMSG; } if (req->length != tr_torBlockCountBytes(msgs->torrent, block)) { dbgmsg(msgs, "wrong block size -- expected %u, got %d", tr_torBlockCountBytes(msgs->torrent, block), req->length); return EMSGSIZE; } dbgmsg(msgs, "got block %u:%u->%u", req->index, req->offset, req->length); if (!tr_peerMgrDidPeerRequest(msgs->torrent, &msgs->peer, block)) { dbgmsg(msgs, "we didn't ask for this message..."); return 0; } if (tr_torrentPieceIsComplete(msgs->torrent, req->index)) { dbgmsg(msgs, "we did ask for this message, but the piece is already complete..."); return 0; } /** *** Save the block **/ if ((err = tr_cacheWriteBlock(getSession(msgs)->cache, tor, req->index, req->offset, req->length, data)) != 0) { return err; } tr_bitfieldAdd(&msgs->peer.blame, req->index); fireGotBlock(msgs, req); return 0; } static void peerPulse(void* vmsgs); static void didWrite(tr_peerIo* io, size_t bytesWritten, bool wasPieceData, void* vmsgs) { tr_peerMsgs* msgs = vmsgs; if (wasPieceData) { firePeerGotPieceData(msgs, bytesWritten); } if (tr_isPeerIo(io) && io->userData != NULL) { peerPulse(msgs); } } static ReadState canRead(tr_peerIo* io, void* vmsgs, size_t* piece) { ReadState ret; tr_peerMsgs* msgs = vmsgs; struct evbuffer* in = tr_peerIoGetReadBuffer(io); size_t const inlen = evbuffer_get_length(in); dbgmsg(msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state); if (inlen == 0) { ret = READ_LATER; } else if (msgs->state == AWAITING_BT_PIECE) { ret = readBtPiece(msgs, in, inlen, piece); } else { switch (msgs->state) { case AWAITING_BT_LENGTH: ret = readBtLength(msgs, in, inlen); break; case AWAITING_BT_ID: ret = readBtId(msgs, in, inlen); break; case AWAITING_BT_MESSAGE: ret = readBtMessage(msgs, in, inlen); break; default: ret = READ_ERR; TR_ASSERT_MSG(false, "unhandled peer messages state %d", (int)msgs->state); } } dbgmsg(msgs, "canRead: ret is %d", (int)ret); return ret; } bool tr_peerMsgsIsReadingBlock(tr_peerMsgs const* msgs, tr_block_index_t block) { if (msgs->state != AWAITING_BT_PIECE) { return false; } return block == _tr_block(msgs->torrent, msgs->incoming.blockReq.index, msgs->incoming.blockReq.offset); } /** *** **/ static void updateDesiredRequestCount(tr_peerMsgs* msgs) { tr_torrent* const torrent = msgs->torrent; /* there are lots of reasons we might not want to request any blocks... */ if (tr_torrentIsSeed(torrent) || !tr_torrentHasMetadata(torrent) || msgs->client_is_choked || !msgs->client_is_interested) { msgs->desiredRequestCount = 0; } else { int estimatedBlocksInPeriod; unsigned int rate_Bps; unsigned int irate_Bps; int const floor = 4; int const seconds = REQUEST_BUF_SECS; uint64_t const now = tr_time_msec(); /* Get the rate limit we should use. * FIXME: this needs to consider all the other peers as well... */ rate_Bps = tr_peerGetPieceSpeed_Bps(&msgs->peer, now, TR_PEER_TO_CLIENT); if (tr_torrentUsesSpeedLimit(torrent, TR_PEER_TO_CLIENT)) { rate_Bps = MIN(rate_Bps, tr_torrentGetSpeedLimit_Bps(torrent, TR_PEER_TO_CLIENT)); } /* honor the session limits, if enabled */ if (tr_torrentUsesSessionLimits(torrent) && tr_sessionGetActiveSpeedLimit_Bps(torrent->session, TR_PEER_TO_CLIENT, &irate_Bps)) { rate_Bps = MIN(rate_Bps, irate_Bps); } /* use this desired rate to figure out how * many requests we should send to this peer */ estimatedBlocksInPeriod = (rate_Bps * seconds) / torrent->blockSize; msgs->desiredRequestCount = MAX(floor, estimatedBlocksInPeriod); /* honor the peer's maximum request count, if specified */ if (msgs->reqq > 0) { if (msgs->desiredRequestCount > msgs->reqq) { msgs->desiredRequestCount = msgs->reqq; } } } } static void updateMetadataRequests(tr_peerMsgs* msgs, time_t now) { int piece; if (msgs->peerSupportsMetadataXfer && tr_torrentGetNextMetadataRequest(msgs->torrent, now, &piece)) { tr_variant tmp; struct evbuffer* payload; struct evbuffer* out = msgs->outMessages; /* build the data message */ tr_variantInitDict(&tmp, 3); tr_variantDictAddInt(&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REQUEST); tr_variantDictAddInt(&tmp, TR_KEY_piece, piece); payload = tr_variantToBuf(&tmp, TR_VARIANT_FMT_BENC); dbgmsg(msgs, "requesting metadata piece #%d", piece); /* write it out as a LTEP message to our outMessages buffer */ evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload)); evbuffer_add_uint8(out, BT_LTEP); evbuffer_add_uint8(out, msgs->ut_metadata_id); evbuffer_add_buffer(out, payload); pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS); dbgOutMessageLen(msgs); /* cleanup */ evbuffer_free(payload); tr_variantFree(&tmp); } } static void updateBlockRequests(tr_peerMsgs* msgs) { if (tr_torrentIsPieceTransferAllowed(msgs->torrent, TR_PEER_TO_CLIENT) && msgs->desiredRequestCount > 0 && msgs->peer.pendingReqsToPeer <= msgs->desiredRequestCount * 0.66) { TR_ASSERT(tr_peerMsgsIsClientInterested(msgs)); TR_ASSERT(!tr_peerMsgsIsClientChoked(msgs)); int n; tr_block_index_t* blocks; int const numwant = msgs->desiredRequestCount - msgs->peer.pendingReqsToPeer; blocks = tr_new(tr_block_index_t, numwant); tr_peerMgrGetNextRequests(msgs->torrent, &msgs->peer, numwant, blocks, &n, false); for (int i = 0; i < n; ++i) { struct peer_request req; blockToReq(msgs->torrent, blocks[i], &req); protocolSendRequest(msgs, &req); } tr_free(blocks); } } static size_t fillOutputBuffer(tr_peerMsgs* msgs, time_t now) { int piece; size_t bytesWritten = 0; struct peer_request req; bool const haveMessages = evbuffer_get_length(msgs->outMessages) != 0; bool const fext = tr_peerIoSupportsFEXT(msgs->io); /** *** Protocol messages **/ if (haveMessages && msgs->outMessagesBatchedAt == 0) /* fresh batch */ { dbgmsg(msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length(msgs->outMessages)); msgs->outMessagesBatchedAt = now; } else if (haveMessages && now - msgs->outMessagesBatchedAt >= msgs->outMessagesBatchPeriod) { size_t const len = evbuffer_get_length(msgs->outMessages); /* flush the protocol messages */ dbgmsg(msgs, "flushing outMessages... to %p (length is %zu)", (void*)msgs->io, len); tr_peerIoWriteBuf(msgs->io, msgs->outMessages, false); msgs->clientSentAnythingAt = now; msgs->outMessagesBatchedAt = 0; msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS; bytesWritten += len; } /** *** Metadata Pieces **/ if (tr_peerIoGetWriteBufferSpace(msgs->io, now) >= METADATA_PIECE_SIZE && popNextMetadataRequest(msgs, &piece)) { char* data; size_t dataLen; bool ok = false; data = tr_torrentGetMetadataPiece(msgs->torrent, piece, &dataLen); if (data != NULL) { tr_variant tmp; struct evbuffer* payload; struct evbuffer* out = msgs->outMessages; /* build the data message */ tr_variantInitDict(&tmp, 3); tr_variantDictAddInt(&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_DATA); tr_variantDictAddInt(&tmp, TR_KEY_piece, piece); tr_variantDictAddInt(&tmp, TR_KEY_total_size, msgs->torrent->infoDictLength); payload = tr_variantToBuf(&tmp, TR_VARIANT_FMT_BENC); /* write it out as a LTEP message to our outMessages buffer */ evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload) + dataLen); evbuffer_add_uint8(out, BT_LTEP); evbuffer_add_uint8(out, msgs->ut_metadata_id); evbuffer_add_buffer(out, payload); evbuffer_add(out, data, dataLen); pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS); dbgOutMessageLen(msgs); evbuffer_free(payload); tr_variantFree(&tmp); tr_free(data); ok = true; } if (!ok) /* send a rejection message */ { tr_variant tmp; struct evbuffer* payload; struct evbuffer* out = msgs->outMessages; /* build the rejection message */ tr_variantInitDict(&tmp, 2); tr_variantDictAddInt(&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT); tr_variantDictAddInt(&tmp, TR_KEY_piece, piece); payload = tr_variantToBuf(&tmp, TR_VARIANT_FMT_BENC); /* write it out as a LTEP message to our outMessages buffer */ evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload)); evbuffer_add_uint8(out, BT_LTEP); evbuffer_add_uint8(out, msgs->ut_metadata_id); evbuffer_add_buffer(out, payload); pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS); dbgOutMessageLen(msgs); evbuffer_free(payload); tr_variantFree(&tmp); } } /** *** Data Blocks **/ if (tr_peerIoGetWriteBufferSpace(msgs->io, now) >= msgs->torrent->blockSize && popNextRequest(msgs, &req)) { --msgs->prefetchCount; if (requestIsValid(msgs, &req) && tr_torrentPieceIsComplete(msgs->torrent, req.index)) { bool err; uint32_t const msglen = 4 + 1 + 4 + 4 + req.length; struct evbuffer* out; struct evbuffer_iovec iovec[1]; out = evbuffer_new(); evbuffer_expand(out, msglen); evbuffer_add_uint32(out, sizeof(uint8_t) + 2 * sizeof(uint32_t) + req.length); evbuffer_add_uint8(out, BT_PIECE); evbuffer_add_uint32(out, req.index); evbuffer_add_uint32(out, req.offset); evbuffer_reserve_space(out, req.length, iovec, 1); err = tr_cacheReadBlock(getSession(msgs)->cache, msgs->torrent, req.index, req.offset, req.length, iovec[0].iov_base) != 0; iovec[0].iov_len = req.length; evbuffer_commit_space(out, iovec, 1); /* check the piece if it needs checking... */ if (!err && tr_torrentPieceNeedsCheck(msgs->torrent, req.index)) { err = !tr_torrentCheckPiece(msgs->torrent, req.index); if (err) { tr_torrentSetLocalError(msgs->torrent, _("Please Verify Local Data! Piece #%zu is corrupt."), (size_t)req.index); } } if (err) { if (fext) { protocolSendReject(msgs, &req); } } else { size_t const n = evbuffer_get_length(out); dbgmsg(msgs, "sending block %u:%u->%u", req.index, req.offset, req.length); TR_ASSERT(n == msglen); tr_peerIoWriteBuf(msgs->io, out, true); bytesWritten += n; msgs->clientSentAnythingAt = now; tr_historyAdd(&msgs->peer.blocksSentToPeer, tr_time(), 1); } evbuffer_free(out); if (err) { bytesWritten = 0; msgs = NULL; } } else if (fext) /* peer needs a reject message */ { protocolSendReject(msgs, &req); } if (msgs != NULL) { prefetchPieces(msgs); } } /** *** Keepalive **/ if (msgs != NULL && msgs->clientSentAnythingAt != 0 && now - msgs->clientSentAnythingAt > KEEPALIVE_INTERVAL_SECS) { dbgmsg(msgs, "sending a keepalive message"); evbuffer_add_uint32(msgs->outMessages, 0); pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS); } return bytesWritten; } static void peerPulse(void* vmsgs) { tr_peerMsgs* msgs = vmsgs; time_t const now = tr_time(); if (tr_isPeerIo(msgs->io)) { updateDesiredRequestCount(msgs); updateBlockRequests(msgs); updateMetadataRequests(msgs, now); } for (;;) { if (fillOutputBuffer(msgs, now) < 1) { break; } } } void tr_peerMsgsPulse(tr_peerMsgs* msgs) { if (msgs != NULL) { peerPulse(msgs); } } static void gotError(tr_peerIo* io, short what, void* vmsgs) { TR_UNUSED(io); if ((what & BEV_EVENT_TIMEOUT) != 0) { dbgmsg(vmsgs, "libevent got a timeout, what=%hd", what); } if ((what & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) != 0) { dbgmsg(vmsgs, "libevent got an error! what=%hd, errno=%d (%s)", what, errno, tr_strerror(errno)); } fireError(vmsgs, ENOTCONN); } static void sendBitfield(tr_peerMsgs* msgs) { TR_ASSERT(tr_torrentHasMetadata(msgs->torrent)); void* bytes; size_t byte_count = 0; struct evbuffer* out = msgs->outMessages; bytes = tr_torrentCreatePieceBitfield(msgs->torrent, &byte_count); evbuffer_add_uint32(out, sizeof(uint8_t) + byte_count); evbuffer_add_uint8(out, BT_BITFIELD); evbuffer_add(out, bytes, byte_count); dbgmsg(msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length(out)); pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS); tr_free(bytes); } static void tellPeerWhatWeHave(tr_peerMsgs* msgs) { bool const fext = tr_peerIoSupportsFEXT(msgs->io); if (fext && tr_torrentHasAll(msgs->torrent)) { protocolSendHaveAll(msgs); } else if (fext && tr_torrentHasNone(msgs->torrent)) { protocolSendHaveNone(msgs); } else if (!tr_torrentHasNone(msgs->torrent)) { sendBitfield(msgs); } } /** *** **/ /* some peers give us error messages if we send more than this many peers in a single pex message http://wiki.theory.org/BitTorrentPeerExchangeConventions */ #define MAX_PEX_ADDED 50 #define MAX_PEX_DROPPED 50 typedef struct { tr_pex* added; tr_pex* dropped; tr_pex* elements; int addedCount; int droppedCount; int elementCount; } PexDiffs; static void pexAddedCb(void* vpex, void* userData) { PexDiffs* diffs = userData; tr_pex* pex = vpex; if (diffs->addedCount < MAX_PEX_ADDED) { diffs->added[diffs->addedCount++] = *pex; diffs->elements[diffs->elementCount++] = *pex; } } static inline void pexDroppedCb(void* vpex, void* userData) { PexDiffs* diffs = userData; tr_pex* pex = vpex; if (diffs->droppedCount < MAX_PEX_DROPPED) { diffs->dropped[diffs->droppedCount++] = *pex; } } static inline void pexElementCb(void* vpex, void* userData) { PexDiffs* diffs = userData; tr_pex* pex = vpex; diffs->elements[diffs->elementCount++] = *pex; } typedef void (* tr_set_func)(void* element, void* userData); /** * @brief find the differences and commonalities in two sorted sets * @param a the first set * @param aCount the number of elements in the set 'a' * @param b the second set * @param bCount the number of elements in the set 'b' * @param compare the sorting method for both sets * @param elementSize the sizeof the element in the two sorted sets * @param in_a called for items in set 'a' but not set 'b' * @param in_b called for items in set 'b' but not set 'a' * @param in_both called for items that are in both sets * @param userData user data passed along to in_a, in_b, and in_both */ static void tr_set_compare(void const* va, size_t aCount, void const* vb, size_t bCount, tr_voidptr_compare_func compare, size_t elementSize, tr_set_func in_a_cb, tr_set_func in_b_cb, tr_set_func in_both_cb, void* userData) { uint8_t const* a = va; uint8_t const* b = vb; uint8_t const* aend = a + elementSize * aCount; uint8_t const* bend = b + elementSize * bCount; while (a != aend || b != bend) { if (a == aend) { (*in_b_cb)((void*)b, userData); b += elementSize; } else if (b == bend) { (*in_a_cb)((void*)a, userData); a += elementSize; } else { int const val = (*compare)(a, b); if (val == 0) { (*in_both_cb)((void*)a, userData); a += elementSize; b += elementSize; } else if (val < 0) { (*in_a_cb)((void*)a, userData); a += elementSize; } else if (val > 0) { (*in_b_cb)((void*)b, userData); b += elementSize; } } } } static void sendPex(tr_peerMsgs* msgs) { if (msgs->peerSupportsPex && tr_torrentAllowsPex(msgs->torrent)) { PexDiffs diffs; PexDiffs diffs6; tr_pex* newPex = NULL; tr_pex* newPex6 = NULL; int const newCount = tr_peerMgrGetPeers(msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT); int const newCount6 = tr_peerMgrGetPeers(msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT); /* build the diffs */ diffs.added = tr_new(tr_pex, newCount); diffs.addedCount = 0; diffs.dropped = tr_new(tr_pex, msgs->pexCount); diffs.droppedCount = 0; diffs.elements = tr_new(tr_pex, newCount + msgs->pexCount); diffs.elementCount = 0; tr_set_compare(msgs->pex, msgs->pexCount, newPex, newCount, tr_pexCompare, sizeof(tr_pex), pexDroppedCb, pexAddedCb, pexElementCb, &diffs); diffs6.added = tr_new(tr_pex, newCount6); diffs6.addedCount = 0; diffs6.dropped = tr_new(tr_pex, msgs->pexCount6); diffs6.droppedCount = 0; diffs6.elements = tr_new(tr_pex, newCount6 + msgs->pexCount6); diffs6.elementCount = 0; tr_set_compare(msgs->pex6, msgs->pexCount6, newPex6, newCount6, tr_pexCompare, sizeof(tr_pex), pexDroppedCb, pexAddedCb, pexElementCb, &diffs6); dbgmsg(msgs, "pex: old peer count %d+%d, new peer count %d+%d, added %d+%d, removed %d+%d", msgs->pexCount, msgs->pexCount6, newCount, newCount6, diffs.addedCount, diffs6.addedCount, diffs.droppedCount, diffs6.droppedCount); if (diffs.addedCount == 0 && diffs.droppedCount == 0 && diffs6.addedCount == 0 && diffs6.droppedCount == 0) { tr_free(diffs.elements); tr_free(diffs6.elements); } else { tr_variant val; uint8_t* tmp; uint8_t* walk; struct evbuffer* payload; struct evbuffer* out = msgs->outMessages; /* update peer */ tr_free(msgs->pex); msgs->pex = diffs.elements; msgs->pexCount = diffs.elementCount; tr_free(msgs->pex6); msgs->pex6 = diffs6.elements; msgs->pexCount6 = diffs6.elementCount; /* build the pex payload */ tr_variantInitDict(&val, 3); /* ipv6 support: left as 3: speed vs. likelihood? */ if (diffs.addedCount > 0) { /* "added" */ tmp = walk = tr_new(uint8_t, diffs.addedCount * 6); for (int i = 0; i < diffs.addedCount; ++i) { memcpy(walk, &diffs.added[i].addr.addr, 4); walk += 4; memcpy(walk, &diffs.added[i].port, 2); walk += 2; } TR_ASSERT(walk - tmp == diffs.addedCount * 6); tr_variantDictAddRaw(&val, TR_KEY_added, tmp, walk - tmp); tr_free(tmp); /* "added.f" * unset each holepunch flag because we don't support it. */ tmp = walk = tr_new(uint8_t, diffs.addedCount); for (int i = 0; i < diffs.addedCount; ++i) { *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH; } TR_ASSERT(walk - tmp == diffs.addedCount); tr_variantDictAddRaw(&val, TR_KEY_added_f, tmp, walk - tmp); tr_free(tmp); } if (diffs.droppedCount > 0) { /* "dropped" */ tmp = walk = tr_new(uint8_t, diffs.droppedCount * 6); for (int i = 0; i < diffs.droppedCount; ++i) { memcpy(walk, &diffs.dropped[i].addr.addr, 4); walk += 4; memcpy(walk, &diffs.dropped[i].port, 2); walk += 2; } TR_ASSERT(walk - tmp == diffs.droppedCount * 6); tr_variantDictAddRaw(&val, TR_KEY_dropped, tmp, walk - tmp); tr_free(tmp); } if (diffs6.addedCount > 0) { /* "added6" */ tmp = walk = tr_new(uint8_t, diffs6.addedCount * 18); for (int i = 0; i < diffs6.addedCount; ++i) { memcpy(walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16); walk += 16; memcpy(walk, &diffs6.added[i].port, 2); walk += 2; } TR_ASSERT(walk - tmp == diffs6.addedCount * 18); tr_variantDictAddRaw(&val, TR_KEY_added6, tmp, walk - tmp); tr_free(tmp); /* "added6.f" * unset each holepunch flag because we don't support it. */ tmp = walk = tr_new(uint8_t, diffs6.addedCount); for (int i = 0; i < diffs6.addedCount; ++i) { *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH; } TR_ASSERT(walk - tmp == diffs6.addedCount); tr_variantDictAddRaw(&val, TR_KEY_added6_f, tmp, walk - tmp); tr_free(tmp); } if (diffs6.droppedCount > 0) { /* "dropped6" */ tmp = walk = tr_new(uint8_t, diffs6.droppedCount * 18); for (int i = 0; i < diffs6.droppedCount; ++i) { memcpy(walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16); walk += 16; memcpy(walk, &diffs6.dropped[i].port, 2); walk += 2; } TR_ASSERT(walk - tmp == diffs6.droppedCount * 18); tr_variantDictAddRaw(&val, TR_KEY_dropped6, tmp, walk - tmp); tr_free(tmp); } /* write the pex message */ payload = tr_variantToBuf(&val, TR_VARIANT_FMT_BENC); evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload)); evbuffer_add_uint8(out, BT_LTEP); evbuffer_add_uint8(out, msgs->ut_pex_id); evbuffer_add_buffer(out, payload); pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS); dbgmsg(msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length(out)); dbgOutMessageLen(msgs); evbuffer_free(payload); tr_variantFree(&val); } /* cleanup */ tr_free(diffs.added); tr_free(diffs.dropped); tr_free(newPex); tr_free(diffs6.added); tr_free(diffs6.dropped); tr_free(newPex6); /* msgs->clientSentPexAt = tr_time(); */ } } static void pexPulse(evutil_socket_t fd, short what, void* vmsgs) { TR_UNUSED(fd); TR_UNUSED(what); struct tr_peerMsgs* msgs = vmsgs; sendPex(msgs); TR_ASSERT(msgs->pexTimer != NULL); tr_timerAdd(msgs->pexTimer, PEX_INTERVAL_SECS, 0); } /*** **** tr_peer virtual functions ***/ static bool peermsgs_is_transferring_pieces(struct tr_peer const* peer, uint64_t now, tr_direction direction, unsigned int* setme_Bps) { unsigned int Bps = 0; if (tr_isPeerMsgs(peer)) { tr_peerMsgs const* msgs = (tr_peerMsgs const*)peer; Bps = tr_peerIoGetPieceSpeed_Bps(msgs->io, now, direction); } if (setme_Bps != NULL) { *setme_Bps = Bps; } return Bps > 0; } static void peermsgs_destruct(tr_peer* peer) { tr_peerMsgs* msgs = PEER_MSGS(peer); TR_ASSERT(msgs != NULL); tr_peerMsgsSetActive(msgs, TR_UP, false); tr_peerMsgsSetActive(msgs, TR_DOWN, false); if (msgs->pexTimer != NULL) { event_free(msgs->pexTimer); } if (msgs->incoming.block != NULL) { evbuffer_free(msgs->incoming.block); } if (msgs->io != NULL) { tr_peerIoClear(msgs->io); tr_peerIoUnref(msgs->io); /* balanced by the ref in handshakeDoneCB() */ } evbuffer_free(msgs->outMessages); tr_free(msgs->pex6); tr_free(msgs->pex); tr_peerDestruct(&msgs->peer); memset(msgs, ~0, sizeof(tr_peerMsgs)); } static struct tr_peer_virtual_funcs const my_funcs = { .destruct = peermsgs_destruct, .is_transferring_pieces = peermsgs_is_transferring_pieces }; /*** **** ***/ time_t tr_peerMsgsGetConnectionAge(tr_peerMsgs const* msgs) { TR_ASSERT(tr_isPeerMsgs(msgs)); return tr_peerIoGetAge(msgs->io); } bool tr_peerMsgsIsPeerChoked(tr_peerMsgs const* msgs) { TR_ASSERT(tr_isPeerMsgs(msgs)); return msgs->peer_is_choked; } bool tr_peerMsgsIsPeerInterested(tr_peerMsgs const* msgs) { TR_ASSERT(tr_isPeerMsgs(msgs)); return msgs->peer_is_interested; } bool tr_peerMsgsIsClientChoked(tr_peerMsgs const* msgs) { TR_ASSERT(tr_isPeerMsgs(msgs)); return msgs->client_is_choked; } bool tr_peerMsgsIsClientInterested(tr_peerMsgs const* msgs) { TR_ASSERT(tr_isPeerMsgs(msgs)); return msgs->client_is_interested; } bool tr_peerMsgsIsUtpConnection(tr_peerMsgs const* msgs) { TR_ASSERT(tr_isPeerMsgs(msgs)); return msgs->io->socket.type == TR_PEER_SOCKET_TYPE_UTP; } bool tr_peerMsgsIsEncrypted(tr_peerMsgs const* msgs) { TR_ASSERT(tr_isPeerMsgs(msgs)); return tr_peerIoIsEncrypted(msgs->io); } bool tr_peerMsgsIsIncomingConnection(tr_peerMsgs const* msgs) { TR_ASSERT(tr_isPeerMsgs(msgs)); return tr_peerIoIsIncoming(msgs->io); } /*** **** ***/ bool tr_isPeerMsgs(void const* msgs) { /* FIXME: this is pretty crude */ return msgs != NULL && ((struct tr_peerMsgs*)msgs)->magic_number == MAGIC_NUMBER; } tr_peerMsgs* tr_peerMsgsCast(void* vm) { return tr_isPeerMsgs(vm) ? vm : NULL; } tr_peerMsgs* tr_peerMsgsNew(struct tr_torrent* torrent, struct tr_peerIo* io, tr_peer_callback callback, void* callbackData) { TR_ASSERT(io != NULL); tr_peerMsgs* m = tr_new0(tr_peerMsgs, 1); tr_peerConstruct(&m->peer, torrent); m->peer.funcs = &my_funcs; m->magic_number = MAGIC_NUMBER; m->client_is_choked = true; m->peer_is_choked = true; m->client_is_interested = false; m->peer_is_interested = false; m->is_active[TR_UP] = false; m->is_active[TR_DOWN] = false; m->callback = callback; m->callbackData = callbackData; m->io = io; m->torrent = torrent; m->state = AWAITING_BT_LENGTH; m->outMessages = evbuffer_new(); m->outMessagesBatchedAt = 0; m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS; if (tr_torrentAllowsPex(torrent)) { m->pexTimer = evtimer_new(torrent->session->event_base, pexPulse, m); tr_timerAdd(m->pexTimer, PEX_INTERVAL_SECS, 0); } if (tr_peerIoSupportsUTP(m->io)) { tr_address const* addr = tr_peerIoGetAddress(m->io, NULL); tr_peerMgrSetUtpSupported(torrent, addr); tr_peerMgrSetUtpFailed(torrent, addr, false); } if (tr_peerIoSupportsLTEP(m->io)) { sendLtepHandshake(m); } tellPeerWhatWeHave(m); if (tr_dhtEnabled(torrent->session) && tr_peerIoSupportsDHT(m->io)) { /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */ struct tr_address const* addr = tr_peerIoGetAddress(m->io, NULL); if (addr->type == TR_AF_INET || tr_globalIPv6() != NULL) { protocolSendPort(m, tr_dhtPort(torrent->session)); } } tr_peerIoSetIOFuncs(m->io, canRead, didWrite, gotError, m); updateDesiredRequestCount(m); return m; }