transmission/libtransmission/peer-msgs.c

2394 lines
71 KiB
C

/*
* This file Copyright (C) 2007-2010 Mnemosyne LLC
*
* This file is licensed by the GPL version 2. Works owned by the
* Transmission project are granted a special exemption to clause 2(b)
* so that the bulk of its code can remain under the MIT license.
* This exemption does not extend to derived works not owned by
* the Transmission project.
*
* $Id$
*/
#include <assert.h>
#include <errno.h>
#include <limits.h> /* INT_MAX */
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <event.h>
#include "transmission.h"
#include "bencode.h"
#include "completion.h"
#include "crypto.h"
#include "inout.h"
#ifdef WIN32
#include "net.h" /* for ECONN */
#endif
#include "peer-io.h"
#include "peer-mgr.h"
#include "peer-msgs.h"
#include "session.h"
#include "stats.h"
#include "torrent.h"
#include "torrent-magnet.h"
#include "tr-dht.h"
#include "utils.h"
#include "version.h"
/**
***
**/
enum
{
BT_CHOKE = 0,
BT_UNCHOKE = 1,
BT_INTERESTED = 2,
BT_NOT_INTERESTED = 3,
BT_HAVE = 4,
BT_BITFIELD = 5,
BT_REQUEST = 6,
BT_PIECE = 7,
BT_CANCEL = 8,
BT_PORT = 9,
BT_FEXT_SUGGEST = 13,
BT_FEXT_HAVE_ALL = 14,
BT_FEXT_HAVE_NONE = 15,
BT_FEXT_REJECT = 16,
BT_FEXT_ALLOWED_FAST = 17,
BT_LTEP = 20,
LTEP_HANDSHAKE = 0,
UT_PEX_ID = 1,
UT_METADATA_ID = 3,
MAX_PEX_PEER_COUNT = 50,
MIN_CHOKE_PERIOD_SEC = 10,
/* idle seconds before we send a keepalive */
KEEPALIVE_INTERVAL_SECS = 100,
PEX_INTERVAL_SECS = 90, /* sec between sendPex() calls */
REQQ = 512,
METADATA_REQQ = 64,
MAX_BLOCK_SIZE = ( 1024 * 16 ),
/* used in lowering the outMessages queue period */
IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
HIGH_PRIORITY_INTERVAL_SECS = 2,
LOW_PRIORITY_INTERVAL_SECS = 10,
/* number of pieces to remove from the bitfield when
* lazy bitfields are turned on */
LAZY_PIECE_COUNT = 26,
/* number of pieces we'll allow in our fast set */
MAX_FAST_SET_SIZE = 3,
/* defined in BEP #9 */
METADATA_MSG_TYPE_REQUEST = 0,
METADATA_MSG_TYPE_DATA = 1,
METADATA_MSG_TYPE_REJECT = 2
};
/* Values stored in tr_peermsgs.state: which part of an incoming
 * message the reader is waiting for next. */
enum
{
/* waiting for the 4-byte length prefix (see readBtLength()) */
AWAITING_BT_LENGTH,
/* length is known; waiting for the 1-byte message id (see readBtId()) */
AWAITING_BT_ID,
/* id is known; waiting for the body of a non-piece message */
AWAITING_BT_MESSAGE,
/* id was BT_PIECE; block header and data are read by readBtPiece() */
AWAITING_BT_PIECE
};
/**
***
**/
/* One block request, in the form the request/cancel/reject messages carry it. */
struct peer_request
{
uint32_t index;  /* piece index */
uint32_t offset; /* byte offset of the block inside that piece */
uint32_t length; /* number of bytes in the block */
};
/**
 * Returns the byte offset of block `b`, measured from the start of the
 * piece that contains it.
 *
 * @param tor torrent whose pieceSize / blockSize describe the layout
 * @param b   block index
 */
static uint32_t
getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
{
    /* widen the operands first so both products are evaluated in 64 bits
     * even when pieceSize and the piece index are narrower types */
    const uint64_t piecePos = (uint64_t)tor->info.pieceSize * (uint64_t)tr_torBlockPiece( tor, b );
    const uint64_t blockPos = (uint64_t)tor->blockSize * b;

    assert( blockPos >= piecePos );

    return (uint32_t)( blockPos - piecePos );
}
/* Translate a block index into the (piece, offset, length) triple
 * carried by request-style messages and store it in `setme`. */
static void
blockToReq( const tr_torrent     * tor,
            tr_block_index_t       block,
            struct peer_request  * setme )
{
    struct peer_request r;

    assert( setme != NULL );

    r.index  = tr_torBlockPiece( tor, block );
    r.offset = getBlockOffsetInPiece( tor, block );
    r.length = tr_torBlockCountBytes( tor, block );

    *setme = r;
}
/**
***
**/
/* this is raw, unchanged data from the peer regarding
* the current message that it's sending us. */
struct tr_incoming
{
uint8_t id; /* message id byte, stored by readBtId() */
uint32_t length; /* includes the +1 for id length */
/* blockReq.length doubles as a flag in readBtPiece(): zero means the
 * block's index/offset header has not been read yet */
struct peer_request blockReq; /* metadata for incoming blocks */
struct evbuffer * block; /* piece data for incoming blocks */
};
/**
* Low-level communication state information about a connected peer.
*
* This structure remembers the low-level protocol states that we're
* in with this peer, such as active requests, pex messages, and so on.
* Its fields are all private to peer-msgs.c.
*
* Data not directly involved with sending & receiving messages is
* stored in tr_peer, where it can be accessed by both peermsgs and
* the peer manager.
*
* @see struct peer_atom
* @see tr_peer
*/
struct tr_peermsgs
{
/* set from the "ut_pex" entry of the peer's ltep handshake, or when a ut_pex message arrives */
tr_bool peerSupportsPex;
/* set from the "ut_metadata" entry of the peer's ltep handshake, or when a ut_metadata message arrives */
tr_bool peerSupportsMetadataXfer;
/* guards sendLtepHandshake() so our handshake is only queued once */
tr_bool clientSentLtepHandshake;
/* set as soon as parseLtepHandshake() has read the peer's handshake */
tr_bool peerSentLtepHandshake;
/*tr_bool haveFastSet;*/
int desiredRequestCount;
/* how many entries at the front of peerAskedFor[] were already handed to tr_ioPrefetch() */
int prefetchCount;
/* how long the outMessages batch should be allowed to grow before
* it's flushed -- some messages (like requests >:) should be sent
* very quickly; others aren't as urgent. */
int8_t outMessagesBatchPeriod;
/* one of the AWAITING_BT_* values */
uint8_t state;
/* ltep sub-ids taken from the "m" dict of the peer's handshake;
 * ut_metadata_id is the id used when we write ut_metadata replies */
uint8_t ut_pex_id;
uint8_t ut_metadata_id;
uint16_t pexCount;
uint16_t pexCount6;
#if 0
size_t fastsetSize;
tr_piece_index_t fastset[MAX_FAST_SET_SIZE];
#endif
tr_peer * peer;
tr_torrent * torrent;
/* events built by the fire*() helpers are published through this */
tr_publisher publisher;
struct evbuffer * outMessages; /* all the non-piece messages */
/* block requests accepted from the peer; the number in use is peer->pendingReqsToClient */
struct peer_request peerAskedFor[REQQ];
/* metadata piece numbers the peer requested, and how many are queued */
int peerAskedForMetadata[METADATA_REQQ];
int peerAskedForMetadataCount;
tr_pex * pex;
tr_pex * pex6;
/*time_t clientSentPexAt;*/
time_t clientSentAnythingAt;
/* when we started batching the outMessages */
time_t outMessagesBatchedAt;
/* the message currently being read from the peer */
struct tr_incoming incoming;
/* if the peer supports the Extension Protocol in BEP 10 and
supplied a reqq argument, it's stored here. otherwise the
value is zero and should be ignored. */
int64_t reqq;
struct event pexTimer;
};
/**
***
**/
/* Compiled out. Would lazily create msgs->peer->have, sized to the
 * torrent's piece count, and return it. */
#if 0
static tr_bitfield*
getHave( const struct tr_peermsgs * msgs )
{
if( msgs->peer->have == NULL )
msgs->peer->have = tr_bitfieldNew( msgs->torrent->info.pieceCount );
return msgs->peer->have;
}
#endif
static inline tr_session*
getSession( struct tr_peermsgs * msgs )
{
return msgs->torrent->session;
}
/**
***
**/
/**
 * Backend of the dbgmsg() macro: writes one line to the stream returned
 * by tr_getLog(), made of a prefix (timestamp, torrent name, peer
 * address, peer client), the caller's printf-style message, and the
 * originating source file and line.
 *
 * Does nothing when tr_getLog() returns NULL.
 */
static void
myDebug( const char * file, int line,
         const struct tr_peermsgs * msgs,
         const char * fmt, ... )
{
    FILE * const fp = tr_getLog( );
    struct evbuffer * line_buf;
    char * base;
    char timestr[64];
    va_list args;

    if( fp == NULL )
        return;

    line_buf = evbuffer_new( );
    base = tr_basename( file );

    /* prefix */
    evbuffer_add_printf( line_buf, "[%s] %s - %s [%s]: ",
                         tr_getLogTimeStr( timestr, sizeof( timestr ) ),
                         tr_torrentName( msgs->torrent ),
                         tr_peerIoGetAddrStr( msgs->peer->io ),
                         msgs->peer->client );

    /* caller's message */
    va_start( args, fmt );
    evbuffer_add_vprintf( line_buf, fmt, args );
    va_end( args );

    /* suffix */
    evbuffer_add_printf( line_buf, " (%s:%d)\n", base, line );

    /* FIXME(libevent2) tr_getLog() should return an fd, then use evbuffer_write() here */
    fwrite( EVBUFFER_DATA( line_buf ), 1, EVBUFFER_LENGTH( line_buf ), fp );

    tr_free( base );
    evbuffer_free( line_buf );
}
/* Log a printf-style message about `msgs` through myDebug(), tagged with
 * the call site's file and line. The arguments are only evaluated when
 * tr_deepLoggingIsActive() returns true. */
#define dbgmsg( msgs, ... ) \
do { \
if( tr_deepLoggingIsActive( ) ) \
myDebug( __FILE__, __LINE__, msgs, __VA_ARGS__ ); \
} while( 0 )
/**
***
**/
/* Shorten the outMessages batch period to `interval` seconds.
 * The period is only ever lowered here, never raised. */
static void
pokeBatchPeriod( tr_peermsgs * msgs,
                 int           interval )
{
    if( !( msgs->outMessagesBatchPeriod > interval ) )
        return;

    msgs->outMessagesBatchPeriod = interval;
    dbgmsg( msgs, "lowering batch interval to %d seconds", interval );
}
/* Debug-log how many bytes are currently queued in msgs->outMessages. */
static inline void
dbgOutMessageLen( tr_peermsgs * msgs )
{
    struct evbuffer * const queued = msgs->outMessages;

    dbgmsg( msgs, "outMessage size is now %zu", EVBUFFER_LENGTH( queued ) );
}
/* Queue a BT_FEXT_REJECT message for `req`.
 * Asserts that the peer's io supports FEXT. */
static void
protocolSendReject( tr_peermsgs * msgs, const struct peer_request * req )
{
    /* id byte + index + offset + length */
    const uint32_t msglen = sizeof( uint8_t ) + 3 * sizeof( uint32_t );
    struct evbuffer * const out = msgs->outMessages;
    tr_peerIo * const io = msgs->peer->io;

    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );

    tr_peerIoWriteUint32( io, out, msglen );
    tr_peerIoWriteUint8 ( io, out, BT_FEXT_REJECT );
    tr_peerIoWriteUint32( io, out, req->index );
    tr_peerIoWriteUint32( io, out, req->offset );
    tr_peerIoWriteUint32( io, out, req->length );

    dbgmsg( msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length );
    dbgOutMessageLen( msgs );
}
/* Queue a BT_REQUEST message for `req` and ask for the outgoing
 * batch to be flushed immediately. */
static void
protocolSendRequest( tr_peermsgs               * msgs,
                     const struct peer_request * req )
{
    /* id byte + index + offset + length */
    const uint32_t msglen = sizeof( uint8_t ) + 3 * sizeof( uint32_t );
    struct evbuffer * const out = msgs->outMessages;
    tr_peerIo * const io = msgs->peer->io;

    tr_peerIoWriteUint32( io, out, msglen );
    tr_peerIoWriteUint8 ( io, out, BT_REQUEST );
    tr_peerIoWriteUint32( io, out, req->index );
    tr_peerIoWriteUint32( io, out, req->offset );
    tr_peerIoWriteUint32( io, out, req->length );

    dbgmsg( msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length );
    dbgOutMessageLen( msgs );
    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
}
/* Queue a BT_CANCEL message for `req` and ask for the outgoing
 * batch to be flushed immediately. */
static void
protocolSendCancel( tr_peermsgs               * msgs,
                    const struct peer_request * req )
{
    /* id byte + index + offset + length */
    const uint32_t msglen = sizeof( uint8_t ) + 3 * sizeof( uint32_t );
    struct evbuffer * const out = msgs->outMessages;
    tr_peerIo * const io = msgs->peer->io;

    tr_peerIoWriteUint32( io, out, msglen );
    tr_peerIoWriteUint8 ( io, out, BT_CANCEL );
    tr_peerIoWriteUint32( io, out, req->index );
    tr_peerIoWriteUint32( io, out, req->offset );
    tr_peerIoWriteUint32( io, out, req->length );

    dbgmsg( msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length );
    dbgOutMessageLen( msgs );
    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
}
/* Queue a BT_PORT message carrying `port`.
 * Unlike the other senders, this one leaves the batch period untouched. */
static void
protocolSendPort(tr_peermsgs *msgs, uint16_t port)
{
    /* id byte + 16-bit port */
    const uint32_t msglen = sizeof( uint8_t ) + sizeof( uint16_t );
    struct evbuffer * const out = msgs->outMessages;
    tr_peerIo * const io = msgs->peer->io;

    dbgmsg( msgs, "sending Port %u", port);

    tr_peerIoWriteUint32( io, out, msglen );
    tr_peerIoWriteUint8 ( io, out, BT_PORT );
    tr_peerIoWriteUint16( io, out, port);
}
/* Queue a BT_HAVE message for piece `index`; this is low-priority traffic. */
static void
protocolSendHave( tr_peermsgs * msgs,
                  uint32_t      index )
{
    /* id byte + piece index */
    const uint32_t msglen = sizeof(uint8_t) + sizeof(uint32_t);
    struct evbuffer * const out = msgs->outMessages;
    tr_peerIo * const io = msgs->peer->io;

    tr_peerIoWriteUint32( io, out, msglen );
    tr_peerIoWriteUint8 ( io, out, BT_HAVE );
    tr_peerIoWriteUint32( io, out, index );

    dbgmsg( msgs, "sending Have %u", index );
    dbgOutMessageLen( msgs );
    pokeBatchPeriod( msgs, LOW_PRIORITY_INTERVAL_SECS );
}
/* Compiled out. Would queue a BT_FEXT_ALLOWED_FAST message for
 * `pieceIndex` after asserting that the peer's io supports FEXT. */
#if 0
static void
protocolSendAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
{
tr_peerIo * io = msgs->peer->io;
struct evbuffer * out = msgs->outMessages;
assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );
tr_peerIoWriteUint32( io, out, sizeof(uint8_t) + sizeof(uint32_t) );
tr_peerIoWriteUint8 ( io, out, BT_FEXT_ALLOWED_FAST );
tr_peerIoWriteUint32( io, out, pieceIndex );
dbgmsg( msgs, "sending Allowed Fast %u...", pieceIndex );
dbgOutMessageLen( msgs );
}
#endif
/* Queue BT_CHOKE when `choke` is nonzero, otherwise BT_UNCHOKE, and ask
 * for the outgoing batch to be flushed immediately. */
static void
protocolSendChoke( tr_peermsgs * msgs,
                   int           choke )
{
    struct evbuffer * const out = msgs->outMessages;
    tr_peerIo * const io = msgs->peer->io;
    uint8_t id;
    const char * label;

    if( choke ) {
        id = BT_CHOKE;
        label = "Choke";
    } else {
        id = BT_UNCHOKE;
        label = "Unchoke";
    }

    tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) );
    tr_peerIoWriteUint8 ( io, out, id );

    dbgmsg( msgs, "sending %s...", label );
    dbgOutMessageLen( msgs );
    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
}
/* Queue a BT_FEXT_HAVE_ALL message (id byte only) for immediate delivery.
 * Asserts that the peer's io supports FEXT. */
static void
protocolSendHaveAll( tr_peermsgs * msgs )
{
    const uint32_t msglen = sizeof( uint8_t );
    struct evbuffer * const out = msgs->outMessages;
    tr_peerIo * const io = msgs->peer->io;

    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );

    tr_peerIoWriteUint32( io, out, msglen );
    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_ALL );

    dbgmsg( msgs, "sending HAVE_ALL..." );
    dbgOutMessageLen( msgs );
    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
}
/* Queue a BT_FEXT_HAVE_NONE message (id byte only) for immediate delivery.
 * Asserts that the peer's io supports FEXT. */
static void
protocolSendHaveNone( tr_peermsgs * msgs )
{
    const uint32_t msglen = sizeof( uint8_t );
    struct evbuffer * const out = msgs->outMessages;
    tr_peerIo * const io = msgs->peer->io;

    assert( tr_peerIoSupportsFEXT( msgs->peer->io ) );

    tr_peerIoWriteUint32( io, out, msglen );
    tr_peerIoWriteUint8 ( io, out, BT_FEXT_HAVE_NONE );

    dbgmsg( msgs, "sending HAVE_NONE..." );
    dbgOutMessageLen( msgs );
    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
}
/**
*** EVENTS
**/
/* All-zero event; each fire*() helper copies it and then fills in only
 * the fields relevant to the event it publishes. */
static const tr_peer_event blankEvent = { 0, 0, 0, 0, 0.0f, 0, 0, 0 };
/* Hand event `e` to the subscribers of msgs->publisher, naming
 * msgs->peer as the source. */
static void
publish( tr_peermsgs * msgs, tr_peer_event * e )
{
    tr_peer * const peer = msgs->peer;

    assert( peer );
    assert( peer->msgs == msgs );

    tr_publisherPublish( &msgs->publisher, peer, e );
}
/* Publish a TR_PEER_ERROR event carrying the errno-style code `err`. */
static void
fireError( tr_peermsgs * msgs, int err )
{
    tr_peer_event ev = blankEvent;

    ev.err = err;
    ev.eventType = TR_PEER_ERROR;

    publish( msgs, &ev );
}
/* Publish a TR_PEER_PEER_PROGRESS event carrying the peer's current progress. */
static void
firePeerProgress( tr_peermsgs * msgs )
{
    tr_peer_event ev = blankEvent;

    ev.progress = msgs->peer->progress;
    ev.eventType = TR_PEER_PEER_PROGRESS;

    publish( msgs, &ev );
}
/* Publish a TR_PEER_CLIENT_GOT_BLOCK event describing the block in `req`. */
static void
fireGotBlock( tr_peermsgs * msgs, const struct peer_request * req )
{
    tr_peer_event ev = blankEvent;

    ev.pieceIndex = req->index;
    ev.offset     = req->offset;
    ev.length     = req->length;
    ev.eventType  = TR_PEER_CLIENT_GOT_BLOCK;

    publish( msgs, &ev );
}
/* Publish a TR_PEER_CLIENT_GOT_REJ event describing the request in `req`. */
static void
fireGotRej( tr_peermsgs * msgs, const struct peer_request * req )
{
    tr_peer_event ev = blankEvent;

    ev.pieceIndex = req->index;
    ev.offset     = req->offset;
    ev.length     = req->length;
    ev.eventType  = TR_PEER_CLIENT_GOT_REJ;

    publish( msgs, &ev );
}
/* Publish a TR_PEER_CLIENT_GOT_CHOKE event; it carries no payload. */
static void
fireGotChoke( tr_peermsgs * msgs )
{
    tr_peer_event ev = blankEvent;

    ev.eventType = TR_PEER_CLIENT_GOT_CHOKE;

    publish( msgs, &ev );
}
/* Publish a TR_PEER_CLIENT_GOT_DATA event for `length` bytes;
 * `wasPieceData` is forwarded unchanged in the event. */
static void
fireClientGotData( tr_peermsgs * msgs,
                   uint32_t      length,
                   int           wasPieceData )
{
    tr_peer_event ev = blankEvent;

    ev.eventType    = TR_PEER_CLIENT_GOT_DATA;
    ev.wasPieceData = wasPieceData;
    ev.length       = length;

    publish( msgs, &ev );
}
/* Publish a TR_PEER_CLIENT_GOT_SUGGEST event for piece `pieceIndex`. */
static void
fireClientGotSuggest( tr_peermsgs * msgs, uint32_t pieceIndex )
{
    tr_peer_event ev = blankEvent;

    ev.pieceIndex = pieceIndex;
    ev.eventType  = TR_PEER_CLIENT_GOT_SUGGEST;

    publish( msgs, &ev );
}
/* Publish a TR_PEER_CLIENT_GOT_PORT event carrying `port` as given. */
static void
fireClientGotPort( tr_peermsgs * msgs, tr_port port )
{
    tr_peer_event ev = blankEvent;

    ev.port      = port;
    ev.eventType = TR_PEER_CLIENT_GOT_PORT;

    publish( msgs, &ev );
}
/* Publish a TR_PEER_CLIENT_GOT_ALLOWED_FAST event for piece `pieceIndex`. */
static void
fireClientGotAllowedFast( tr_peermsgs * msgs, uint32_t pieceIndex )
{
    tr_peer_event ev = blankEvent;

    ev.pieceIndex = pieceIndex;
    ev.eventType  = TR_PEER_CLIENT_GOT_ALLOWED_FAST;

    publish( msgs, &ev );
}
/* Publish a TR_PEER_PEER_GOT_DATA event for `length` bytes;
 * `wasPieceData` is forwarded unchanged in the event. */
static void
firePeerGotData( tr_peermsgs * msgs,
                 uint32_t      length,
                 int           wasPieceData )
{
    tr_peer_event ev = blankEvent;

    ev.eventType    = TR_PEER_PEER_GOT_DATA;
    ev.wasPieceData = wasPieceData;
    ev.length       = length;

    publish( msgs, &ev );
}
/**
*** ALLOWED FAST SET
*** For explanation, see http://www.bittorrent.org/beps/bep_0006.html
**/
/**
 * Builds the "allowed fast" piece set for a peer, following the numbered
 * steps of the BEP 6 reference algorithm (see the link above).
 *
 * @param setmePieces    output array with room for at least desiredSetSize entries
 * @param desiredSetSize how many distinct piece indices to generate;
 *                       clamped to pieceCount
 * @param pieceCount     number of pieces in the torrent
 * @param infohash       the torrent's SHA_DIGEST_LENGTH-byte info hash
 * @param addr           the peer's address
 * @return the number of entries written to setmePieces. Only TR_AF_INET
 *         addresses are handled; any other address type yields 0.
 */
size_t
tr_generateAllowedSet( tr_piece_index_t * setmePieces,
                       size_t             desiredSetSize,
                       size_t             pieceCount,
                       const uint8_t    * infohash,
                       const tr_address * addr )
{
    size_t setSize = 0;

    assert( setmePieces );
    assert( desiredSetSize <= pieceCount );
    assert( desiredSetSize );
    assert( pieceCount );
    assert( infohash );
    assert( addr );

    /* when asserts are compiled out, asking for more distinct pieces than
     * exist would loop forever, and pieceCount == 0 would divide by zero */
    if( desiredSetSize > pieceCount )
        desiredSetSize = pieceCount;

    if( addr->type == TR_AF_INET )
    {
        uint8_t w[SHA_DIGEST_LENGTH + 4], *walk=w;
        uint8_t x[SHA_DIGEST_LENGTH];

        /* mask off the last octet of the address */
        uint32_t ui32 = ntohl( htonl( addr->addr.addr4.s_addr ) & 0xffffff00 );   /* (1) */
        memcpy( w, &ui32, sizeof( uint32_t ) );
        walk += sizeof( uint32_t );
        memcpy( walk, infohash, SHA_DIGEST_LENGTH );                 /* (2) */
        walk += SHA_DIGEST_LENGTH;
        tr_sha1( x, w, walk-w, NULL );                               /* (3) */
        assert( sizeof( w ) == (size_t)( walk-w ) );

        while( setSize<desiredSetSize )
        {
            int i;
            for( i=0; i<5 && setSize<desiredSetSize; ++i )           /* (4) */
            {
                size_t k;
                const size_t j = (size_t)i * 4;                      /* (5) */
                uint32_t y;
                uint32_t index;

                /* x is a byte array with no alignment guarantee, so copy
                 * the four bytes out instead of reading through a
                 * uint32_t pointer */
                memcpy( &y, x + j, sizeof( y ) );
                y = ntohl( y );                                      /* (6) */
                index = y % pieceCount;                              /* (7) */

                for( k=0; k<setSize; ++k )                           /* (8) */
                    if( setmePieces[k] == index )
                        break;

                if( k == setSize )
                    setmePieces[setSize++] = index;                  /* (9) */
            }

            tr_sha1( x, x, sizeof( x ), NULL );                      /* (3) */
        }
    }

    return setSize;
}
/* Currently a no-op: the whole body is compiled out with #if 0.
 * The disabled code would, for a FEXT peer with progress below 0.10 that
 * has no fast set yet, generate one with tr_generateAllowedSet() and
 * announce each piece via protocolSendAllowedFast(). */
static void
updateFastSet( tr_peermsgs * msgs UNUSED )
{
#if 0
const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
const int peerIsNeedy = msgs->peer->progress < 0.10;
if( fext && peerIsNeedy && !msgs->haveFastSet )
{
size_t i;
const struct tr_address * addr = tr_peerIoGetAddress( msgs->peer->io, NULL );
const tr_info * inf = &msgs->torrent->info;
const size_t numwant = MIN( MAX_FAST_SET_SIZE, inf->pieceCount );
/* build the fast set */
msgs->fastsetSize = tr_generateAllowedSet( msgs->fastset, numwant, inf->pieceCount, inf->hash, addr );
msgs->haveFastSet = 1;
/* send it to the peer */
for( i=0; i<msgs->fastsetSize; ++i )
protocolSendAllowedFast( msgs, msgs->fastset[i] );
}
#endif
}
/**
*** INTEREST
**/
/**
 * Records our new interest state on the peer and queues the matching
 * BT_INTERESTED / BT_NOT_INTERESTED message as high-priority traffic.
 *
 * @param msgs               the peer connection; must not be NULL
 * @param clientIsInterested whether we are now interested in this peer
 */
static void
sendInterest( tr_peermsgs * msgs, tr_bool clientIsInterested )
{
    struct evbuffer * out;

    /* check msgs before its first dereference so that a NULL argument
     * trips the assertion rather than crashing in an initializer */
    assert( msgs );
    assert( tr_isBool( clientIsInterested ) );

    out = msgs->outMessages;

    msgs->peer->clientIsInterested = clientIsInterested;
    dbgmsg( msgs, "Sending %s", clientIsInterested ? "Interested" : "Not Interested" );

    tr_peerIoWriteUint32( msgs->peer->io, out, sizeof( uint8_t ) );
    tr_peerIoWriteUint8 ( msgs->peer->io, out, clientIsInterested ? BT_INTERESTED : BT_NOT_INTERESTED );

    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
    dbgOutMessageLen( msgs );
}
/* Placeholder that currently does nothing. It is still called after the
 * peer's progress changes and after we announce a new piece. */
static void
updateInterest( tr_peermsgs * msgs UNUSED )
{
/* FIXME -- might need to poke the mgr on startup */
}
/* Tell the peer whether we are interested in it.
 * Nothing is sent when the state is already `isInterested`. */
void
tr_peerMsgsSetInterested( tr_peermsgs * msgs, int isInterested )
{
    assert( tr_isBool( isInterested ) );

    if( isInterested == msgs->peer->clientIsInterested )
        return;

    sendInterest( msgs, isInterested );
}
/* Remove the oldest queued metadata request and store its piece number
 * in `*piece`. Returns FALSE, leaving `*piece` untouched, when the
 * queue is empty. */
static tr_bool
popNextMetadataRequest( tr_peermsgs * msgs, int * piece )
{
    const int queued = msgs->peerAskedForMetadataCount;

    if( queued == 0 )
        return FALSE;

    *piece = msgs->peerAskedForMetadata[0];

    /* the count is lowered before the call; the callee gets the old count */
    msgs->peerAskedForMetadataCount = queued - 1;
    tr_removeElementFromArray( msgs->peerAskedForMetadata, 0, sizeof( int ),
                               queued );

    return TRUE;
}
/* Remove the oldest block request the peer queued with us and copy it
 * into `*setme`. Returns FALSE, leaving `*setme` untouched, when nothing
 * is pending. */
static tr_bool
popNextRequest( tr_peermsgs * msgs, struct peer_request * setme )
{
    tr_peer * const peer = msgs->peer;

    if( peer->pendingReqsToClient != 0 )
    {
        *setme = msgs->peerAskedFor[0];
        tr_removeElementFromArray( msgs->peerAskedFor, 0, sizeof( struct peer_request ),
                                   peer->pendingReqsToClient-- );
        return TRUE;
    }

    return FALSE;
}
/* Drop every block request the peer has queued with us. When the peer's
 * io supports FEXT, a reject message is queued for each dropped request. */
static void
cancelAllRequestsToClient( tr_peermsgs * msgs )
{
    const int rejectEach = tr_peerIoSupportsFEXT( msgs->peer->io );
    struct peer_request dropped;

    for( ;; )
    {
        if( !popNextRequest( msgs, &dropped ) )
            break;

        if( rejectEach )
            protocolSendReject( msgs, &dropped );
    }
}
/**
 * Choke (1) or unchoke (0) the peer.
 *
 * The request is ignored when the choke state last changed less than
 * MIN_CHOKE_PERIOD_SEC seconds ago, or when the peer is already in the
 * requested state. Choking also drops the peer's queued requests.
 */
void
tr_peerMsgsSetChoke( tr_peermsgs * msgs,
                     int           choke )
{
    const time_t now = tr_time( );
    const time_t fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
    tr_peer * peer;

    assert( msgs );
    assert( msgs->peer );
    assert( choke == 0 || choke == 1 );

    peer = msgs->peer;

    if( peer->chokeChangedAt > fibrillationTime )
    {
        dbgmsg( msgs, "Not changing choke to %d to avoid fibrillation", choke );
        return;
    }

    if( peer->peerIsChoked == choke )
        return;

    peer->peerIsChoked = choke;

    if( choke )
        cancelAllRequestsToClient( msgs );

    protocolSendChoke( msgs, choke );
    peer->chokeChangedAt = now;
}
/**
***
**/
/* Announce to the peer that we now have piece `index`. */
void
tr_peerMsgsHave( tr_peermsgs * msgs,
                 uint32_t      index )
{
    protocolSendHave( msgs, index );

    /* owning another piece may mean this peer no longer
     * has anything we want */
    updateInterest( msgs );
}
/**
***
**/
/* Ask the torrent layer whether (index, offset, length) is a valid
 * request for this connection's torrent. */
static tr_bool
reqIsValid( const tr_peermsgs * peer,
            uint32_t            index,
            uint32_t            offset,
            uint32_t            length )
{
    const tr_bool ok = tr_torrentReqIsValid( peer->torrent, index, offset, length );

    return ok;
}
/* Same check as reqIsValid(), taking the request as a struct. */
static tr_bool
requestIsValid( const tr_peermsgs * msgs, const struct peer_request * req )
{
    const uint32_t index  = req->index;
    const uint32_t offset = req->offset;
    const uint32_t length = req->length;

    return reqIsValid( msgs, index, offset, length );
}
/* Queue a cancel message for a block we previously requested from this peer. */
void
tr_peerMsgsCancel( tr_peermsgs * msgs, tr_block_index_t block )
{
    struct peer_request cancelMe;

    blockToReq( msgs->torrent, block, &cancelMe );
    protocolSendCancel( msgs, &cancelMe );
}
/**
***
**/
/**
 * Builds our ltep handshake dictionary and queues it for immediate
 * delivery. Only the first call sends anything; later calls return early
 * because clientSentLtepHandshake is already set.
 */
static void
sendLtepHandshake( tr_peermsgs * msgs )
{
    tr_benc handshake;
    tr_benc * supported;
    char * payload;
    int payloadLen;
    tr_bool allow_pex;
    tr_bool allow_metadata_xfer;
    struct evbuffer * out = msgs->outMessages;
    const unsigned char * ipv6 = tr_globalIPv6();

    if( msgs->clientSentLtepHandshake )
        return;

    dbgmsg( msgs, "sending an ltep handshake" );
    msgs->clientSentLtepHandshake = 1;

    /* metadata xfer (BEP 9) is never advertised for private torrents */
    allow_metadata_xfer = tr_torrentIsPrivate( msgs->torrent ) ? 0 : 1;

    /* pex: the torrent must allow it, and if the peer has already told
     * us what it supports we mirror its answer */
    if( !tr_torrentAllowsPex( msgs->torrent ) )
        allow_pex = 0;
    else if( !msgs->peerSentLtepHandshake )
        allow_pex = 1;
    else
        allow_pex = msgs->peerSupportsPex ? 1 : 0;

    /* top-level keys */
    tr_bencInitDict( &handshake, 8 );
    tr_bencDictAddInt( &handshake, "e", getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED );
    if( ipv6 != NULL )
        tr_bencDictAddRaw( &handshake, "ipv6", ipv6, 16 );
    if( allow_metadata_xfer && tr_torrentHasMetadata( msgs->torrent )
                            && ( msgs->torrent->infoDictLength > 0 ) )
        tr_bencDictAddInt( &handshake, "metadata_size", msgs->torrent->infoDictLength );
    tr_bencDictAddInt( &handshake, "p", tr_sessionGetPeerPort( getSession(msgs) ) );
    tr_bencDictAddInt( &handshake, "reqq", REQQ );
    tr_bencDictAddInt( &handshake, "upload_only", tr_torrentIsSeed( msgs->torrent ) );
    tr_bencDictAddStr( &handshake, "v", TR_NAME " " USERAGENT_PREFIX );

    /* the extensions we accept, keyed by name, with our ids for them */
    supported = tr_bencDictAddDict( &handshake, "m", 2 );
    if( allow_metadata_xfer )
        tr_bencDictAddInt( supported, "ut_metadata", UT_METADATA_ID );
    if( allow_pex )
        tr_bencDictAddInt( supported, "ut_pex", UT_PEX_ID );

    /* serialize, then frame as: length, BT_LTEP, LTEP_HANDSHAKE, payload */
    payload = tr_bencToStr( &handshake, TR_FMT_BENC, &payloadLen );

    tr_peerIoWriteUint32( msgs->peer->io, out, 2 * sizeof( uint8_t ) + payloadLen );
    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_LTEP );
    tr_peerIoWriteUint8 ( msgs->peer->io, out, LTEP_HANDSHAKE );
    tr_peerIoWriteBytes ( msgs->peer->io, out, payload, payloadLen );
    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
    dbgOutMessageLen( msgs );

    /* cleanup */
    tr_bencFree( &handshake );
    tr_free( payload );
}
/**
 * Reads the peer's ltep handshake (`len` bytes) from `inbuf` and records
 * what it says: encryption preference, supported extensions and their
 * ids, metadata size hint, upload_only, listening port, alternate
 * addresses (incoming connections only) and request queue size.
 *
 * The bytes are always consumed and peerSentLtepHandshake is always set,
 * even when the payload cannot be parsed as a bencoded dictionary.
 */
static void
parseLtepHandshake( tr_peermsgs *     msgs,
                    int               len,
                    struct evbuffer * inbuf )
{
    int64_t i;
    tr_benc val, * sub;
    uint8_t * tmp = tr_new( uint8_t, len );
    const uint8_t *addr;
    size_t addr_len;
    tr_pex pex;
    int8_t seedProbability = -1;

    memset( &pex, 0, sizeof( tr_pex ) );

    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, len );
    msgs->peerSentLtepHandshake = 1;

    if( tr_bencLoad( tmp, len, &val, NULL ) )
    {
        dbgmsg( msgs, "GET extended-handshake, couldn't get dictionary" );
        tr_free( tmp );
        return;
    }

    if( !tr_bencIsDict( &val ) )
    {
        /* the payload parsed, just not as a dictionary; the parsed value
         * still has to be released before bailing out */
        dbgmsg( msgs, "GET extended-handshake, couldn't get dictionary" );
        tr_bencFree( &val );
        tr_free( tmp );
        return;
    }

    dbgmsg( msgs, "here is the handshake: [%*.*s]", len, len, tmp );

    /* does the peer prefer encrypted connections? */
    if( tr_bencDictFindInt( &val, "e", &i ) ) {
        msgs->peer->encryption_preference = i ? ENCRYPTION_PREFERENCE_YES
                                              : ENCRYPTION_PREFERENCE_NO;
        if( i )
            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
    }

    /* check supported messages for utorrent pex */
    msgs->peerSupportsPex = 0;
    msgs->peerSupportsMetadataXfer = 0;

    if( tr_bencDictFindDict( &val, "m", &sub ) ) {
        if( tr_bencDictFindInt( sub, "ut_pex", &i ) ) {
            msgs->peerSupportsPex = i != 0;
            msgs->ut_pex_id = (uint8_t) i;
            dbgmsg( msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id );
        }
        if( tr_bencDictFindInt( sub, "ut_metadata", &i ) ) {
            msgs->peerSupportsMetadataXfer = i != 0;
            msgs->ut_metadata_id = (uint8_t) i;
            dbgmsg( msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id );
        }
    }

    /* look for metainfo size (BEP 9) */
    if( tr_bencDictFindInt( &val, "metadata_size", &i ) )
        tr_torrentSetMetadataSizeHint( msgs->torrent, i );

    /* look for upload_only (BEP 21) */
    if( tr_bencDictFindInt( &val, "upload_only", &i ) )
        seedProbability = i==0 ? 0 : 100;

    /* get peer's listening port */
    if( tr_bencDictFindInt( &val, "p", &i ) ) {
        pex.port = htons( (uint16_t)i );
        fireClientGotPort( msgs, pex.port );
        dbgmsg( msgs, "peer's port is now %d", (int)i );
    }

    /* on incoming connections, hand any advertised alternate address
     * to the peer manager, paired with the port learned above */
    if( tr_peerIoIsIncoming( msgs->peer->io )
        && tr_bencDictFindRaw( &val, "ipv4", &addr, &addr_len )
        && ( addr_len == 4 ) )
    {
        pex.addr.type = TR_AF_INET;
        memcpy( &pex.addr.addr.addr4, addr, 4 );
        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
    }

    if( tr_peerIoIsIncoming( msgs->peer->io )
        && tr_bencDictFindRaw( &val, "ipv6", &addr, &addr_len )
        && ( addr_len == 16 ) )
    {
        pex.addr.type = TR_AF_INET6;
        memcpy( &pex.addr.addr.addr6, addr, 16 );
        tr_peerMgrAddPex( msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability );
    }

    /* get peer's maximum request queue size */
    if( tr_bencDictFindInt( &val, "reqq", &i ) )
        msgs->reqq = i;

    tr_bencFree( &val );
    tr_free( tmp );
}
/**
 * Handles one ut_metadata (BEP 9) message of `msglen` bytes read from `inbuf`.
 *
 * - DATA:    while we still lack the torrent's metadata, pass the bytes
 *            that follow the bencoded header to tr_torrentSetMetadataPiece(),
 *            provided the piece number and sizes are consistent.
 * - REQUEST: queue the piece number for later sending, or answer with a
 *            REJECT when we can't or won't serve it.
 * - REJECT:  ignored.
 *
 * All numeric fields come from the peer and are range-checked before use.
 */
static void
parseUtMetadata( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
{
    tr_benc dict;
    char * msg_end;
    char * benc_end = NULL;
    int64_t msg_type = -1;
    int64_t piece = -1;
    int64_t total_size = 0;
    uint8_t * tmp = tr_new( uint8_t, msglen );

    tr_peerIoReadBytes( msgs->peer->io, inbuf, tmp, msglen );
    msg_end = (char*)tmp + msglen;

    if( !tr_bencLoad( tmp, msglen, &dict, &benc_end ) )
    {
        tr_bencDictFindInt( &dict, "msg_type", &msg_type );
        tr_bencDictFindInt( &dict, "piece", &piece );
        tr_bencDictFindInt( &dict, "total_size", &total_size );
        tr_bencFree( &dict );
    }

    dbgmsg( msgs, "got ut_metadata msg: type %d, piece %d, total_size %d",
            (int)msg_type, (int)piece, (int)total_size );

    if( msg_type == METADATA_MSG_TYPE_REJECT )
    {
        /* NOOP */
    }

    if( ( msg_type == METADATA_MSG_TYPE_DATA )
        && ( benc_end != NULL )
        && ( !tr_torrentHasMetadata( msgs->torrent ) ) )
    {
        /* the piece's bytes are whatever follows the bencoded header */
        const int64_t pieceLen = msg_end - benc_end;

        /* Same acceptance rule as
         *   piece * METADATA_PIECE_SIZE + pieceLen <= total_size
         * but rearranged so that a hostile `piece` cannot overflow the
         * multiplication, and so that negative or int-overflowing piece
         * numbers are refused outright. */
        if( ( pieceLen >= 0 )
            && ( pieceLen <= METADATA_PIECE_SIZE )
            && ( piece >= 0 )
            && ( piece <= INT_MAX )
            && ( total_size >= pieceLen )
            && ( piece <= ( total_size - pieceLen ) / (int64_t)METADATA_PIECE_SIZE ) )
        {
            tr_torrentSetMetadataPiece( msgs->torrent, (int)piece, benc_end, (int)pieceLen );
        }
    }

    if( msg_type == METADATA_MSG_TYPE_REQUEST )
    {
        if( ( piece >= 0 )
            && ( piece <= INT_MAX ) /* the queue stores plain ints */
            && tr_torrentHasMetadata( msgs->torrent )
            && !tr_torrentIsPrivate( msgs->torrent )
            && ( msgs->peerAskedForMetadataCount < METADATA_REQQ ) )
        {
            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = (int)piece;
        }
        else
        {
            tr_benc reject;
            int payloadLen;
            char * payload;
            tr_peerIo * io = msgs->peer->io;
            struct evbuffer * out = msgs->outMessages;

            /* build the rejection message */
            tr_bencInitDict( &reject, 2 );
            tr_bencDictAddInt( &reject, "msg_type", METADATA_MSG_TYPE_REJECT );
            tr_bencDictAddInt( &reject, "piece", piece );
            payload = tr_bencToStr( &reject, TR_FMT_BENC, &payloadLen );
            tr_bencFree( &reject );

            /* write it out as a LTEP message to our outMessages buffer */
            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
            dbgOutMessageLen( msgs );

            tr_free( payload );
        }
    }

    tr_free( tmp );
}
/**
 * Reads one ut_pex message of `msglen` bytes from `inbuf`. When the
 * torrent allows pex and the payload parses, the peers in its "added"
 * and "added6" lists are passed to the peer manager, at most
 * MAX_PEX_PEER_COUNT per list.
 *
 * The bytes are consumed from `inbuf` whether or not pex is allowed.
 */
static void
parseUtPex( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
{
    tr_torrent * tor = msgs->torrent;
    uint8_t * payload = tr_new( uint8_t, msglen );
    int parsed = 0;
    tr_benc val;
    const uint8_t * added;
    size_t added_len;

    tr_peerIoReadBytes( msgs->peer->io, inbuf, payload, msglen );

    /* only bother parsing when the torrent accepts pex at all */
    if( tr_torrentAllowsPex( tor ) )
        parsed = !tr_bencLoad( payload, msglen, &val, NULL );

    if( parsed )
    {
        /* IPv4 peers */
        if( tr_bencDictFindRaw( &val, "added", &added, &added_len ) )
        {
            tr_pex * peers;
            size_t idx, count;
            const uint8_t * flags = NULL;
            size_t flags_len = 0;

            tr_bencDictFindRaw( &val, "added.f", &flags, &flags_len );
            peers = tr_peerMgrCompactToPex( added, added_len, flags, flags_len, &count );

            if( count > MAX_PEX_PEER_COUNT )
                count = MAX_PEX_PEER_COUNT;

            for( idx=0; idx<count; ++idx )
            {
                /* -1 when the peer has no flags byte */
                int seedProbability = -1;

                if( idx < flags_len )
                {
                    if( flags[idx] & ADDED_F_SEED_FLAG )
                        seedProbability = 100;
                    else
                        seedProbability = 0;
                }

                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, &peers[idx], seedProbability );
            }

            tr_free( peers );
        }

        /* IPv6 peers */
        if( tr_bencDictFindRaw( &val, "added6", &added, &added_len ) )
        {
            tr_pex * peers;
            size_t idx, count;
            const uint8_t * flags = NULL;
            size_t flags_len = 0;

            tr_bencDictFindRaw( &val, "added6.f", &flags, &flags_len );
            peers = tr_peerMgrCompact6ToPex( added, added_len, flags, flags_len, &count );

            if( count > MAX_PEX_PEER_COUNT )
                count = MAX_PEX_PEER_COUNT;

            for( idx=0; idx<count; ++idx )
            {
                /* -1 when the peer has no flags byte */
                int seedProbability = -1;

                if( idx < flags_len )
                {
                    if( flags[idx] & ADDED_F_SEED_FLAG )
                        seedProbability = 100;
                    else
                        seedProbability = 0;
                }

                tr_peerMgrAddPex( tor, TR_PEER_FROM_PEX, &peers[idx], seedProbability );
            }

            tr_free( peers );
        }

        tr_bencFree( &val );
    }

    tr_free( payload );
}
static void sendPex( tr_peermsgs * msgs );

/* Read the ltep sub-id byte from `inbuf` and hand the remaining
 * `msglen - 1` bytes to the matching parser. Messages with an unknown
 * sub-id are drained from the buffer and ignored. */
static void
parseLtep( tr_peermsgs * msgs, int msglen, struct evbuffer * inbuf )
{
    uint8_t ltep_msgid;

    tr_peerIoReadUint8( msgs->peer->io, inbuf, &ltep_msgid );
    msglen--;

    switch( ltep_msgid )
    {
        case LTEP_HANDSHAKE:
            dbgmsg( msgs, "got ltep handshake" );
            parseLtepHandshake( msgs, msglen, inbuf );
            if( tr_peerIoSupportsLTEP( msgs->peer->io ) )
            {
                sendLtepHandshake( msgs );
                sendPex( msgs );
            }
            break;

        case UT_PEX_ID:
            dbgmsg( msgs, "got ut pex" );
            msgs->peerSupportsPex = 1;
            parseUtPex( msgs, msglen, inbuf );
            break;

        case UT_METADATA_ID:
            dbgmsg( msgs, "got ut metadata" );
            msgs->peerSupportsMetadataXfer = 1;
            parseUtMetadata( msgs, msglen, inbuf );
            break;

        default:
            dbgmsg( msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid );
            evbuffer_drain( inbuf, msglen );
            break;
    }
}
/* Read the 4-byte length prefix of the next message. A zero length is a
 * keepalive and leaves the state unchanged; any other value is stored
 * and the reader moves on to AWAITING_BT_ID. Returns READ_LATER until
 * four bytes are available. */
static int
readBtLength( tr_peermsgs *     msgs,
              struct evbuffer * inbuf,
              size_t            inlen )
{
    uint32_t len;

    if( inlen < sizeof( len ) )
        return READ_LATER;

    tr_peerIoReadUint32( msgs->peer->io, inbuf, &len );

    if( len != 0 )
    {
        msgs->incoming.length = len;
        msgs->state = AWAITING_BT_ID;
    }
    else /* peer sent us a keepalive message */
    {
        dbgmsg( msgs, "got KeepAlive" );
    }

    return READ_NOW;
}
static int readBtMessage( tr_peermsgs     * msgs,
                          struct evbuffer * inbuf,
                          size_t            inlen );

/* Read the 1-byte message id and choose the next state: BT_PIECE goes
 * to AWAITING_BT_PIECE, other messages with a body go to
 * AWAITING_BT_MESSAGE, and bodiless messages (length 1) are handed to
 * readBtMessage() right away. */
static int
readBtId( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
{
    uint8_t id;

    if( inlen < sizeof( uint8_t ) )
        return READ_LATER;

    tr_peerIoReadUint8( msgs->peer->io, inbuf, &id );
    msgs->incoming.id = id;
    dbgmsg( msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length );

    if( id == BT_PIECE )
    {
        msgs->state = AWAITING_BT_PIECE;
        return READ_NOW;
    }

    /* nothing follows the id, so the message can be handled now */
    if( msgs->incoming.length == 1 )
        return readBtMessage( msgs, inbuf, inlen - 1 );

    msgs->state = AWAITING_BT_MESSAGE;
    return READ_NOW;
}
/* Recompute the peer's progress from its `have` bitset, then run the
 * follow-up hooks and publish the new value. */
static void
updatePeerProgress( tr_peermsgs * msgs )
{
    tr_peer * const peer = msgs->peer;

    peer->progress = tr_bitsetPercent( &peer->have );
    dbgmsg( msgs, "peer progress is %f", peer->progress );

    updateFastSet( msgs );
    updateInterest( msgs );
    firePeerProgress( msgs );
}
/* Call tr_ioPrefetch() for queued peer requests that haven't been
 * prefetched yet, stopping once 12 have been prefetched in total. */
static void
prefetchPieces( tr_peermsgs *msgs )
{
    /* Maintain 12 prefetched blocks per unchoked peer */
    while( ( msgs->prefetchCount < msgs->peer->pendingReqsToClient )
        && ( msgs->prefetchCount < 12 ) )
    {
        const struct peer_request * next = &msgs->peerAskedFor[msgs->prefetchCount];

        tr_ioPrefetch( msgs->torrent, next->index, next->offset, next->length );
        ++msgs->prefetchCount;
    }
}
/**
 * Handles a block request from the peer. The request is queued when it
 * is valid, we have the piece, the peer is unchoked and the queue has
 * room. Otherwise it is dropped, and a reject message is queued if the
 * peer's io supports FEXT.
 */
static void
peerMadeRequest( tr_peermsgs               * msgs,
                 const struct peer_request * req )
{
    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
    const int valid = requestIsValid( msgs, req );
    const int haveThePiece = valid && tr_cpPieceIsComplete( &msgs->torrent->completion, req->index );
    const int choked = msgs->peer->peerIsChoked;
    const char * refusal = NULL;

    /* the first failing check decides the reason that gets logged */
    if( !valid )
        refusal = "rejecting an invalid request.";
    else if( !haveThePiece )
        refusal = "rejecting request for a piece we don't have.";
    else if( choked )
        refusal = "rejecting request from choked peer";
    else if( msgs->peer->pendingReqsToClient + 1 >= REQQ )
        refusal = "rejecting request ... reqq is full";

    if( refusal == NULL )
    {
        msgs->peerAskedFor[msgs->peer->pendingReqsToClient++] = *req;
        prefetchPieces( msgs );
        return;
    }

    dbgmsg( msgs, "%s", refusal );

    if( fext )
        protocolSendReject( msgs, req );
}
static tr_bool
messageLengthIsCorrect( const tr_peermsgs * msg, uint8_t id, uint32_t len )
{
switch( id )
{
case BT_CHOKE:
case BT_UNCHOKE:
case BT_INTERESTED:
case BT_NOT_INTERESTED:
case BT_FEXT_HAVE_ALL:
case BT_FEXT_HAVE_NONE:
return len == 1;
case BT_HAVE:
case BT_FEXT_SUGGEST:
case BT_FEXT_ALLOWED_FAST:
return len == 5;
case BT_BITFIELD:
return len == ( msg->torrent->info.pieceCount + 7u ) / 8u + 1u;
case BT_REQUEST:
case BT_CANCEL:
case BT_FEXT_REJECT:
return len == 13;
case BT_PIECE:
return len > 9 && len <= 16393;
case BT_PORT:
return len == 3;
case BT_LTEP:
return len >= 2;
default:
return FALSE;
}
}
static int clientGotBlock( tr_peermsgs               * msgs,
                           const uint8_t             * block,
                           const struct peer_request * req );

/**
 * Reads a BT_PIECE message in two phases, tracked by
 * incoming.blockReq.length: while that is zero the 8-byte index/offset
 * header is still due; afterwards block data is appended to
 * incoming.block until the whole block has arrived, at which point it
 * is passed to clientGotBlock().
 *
 * @param inlen                  number of readable bytes in `inbuf`
 * @param setme_piece_bytes_read incremented by the block bytes consumed
 * @return READ_NOW when progress was made, READ_LATER when more input is
 *         needed, READ_ERR on a bad length or when clientGotBlock() fails
 */
static int
readBtPiece( tr_peermsgs      * msgs,
             struct evbuffer  * inbuf,
             size_t             inlen,
             size_t           * setme_piece_bytes_read )
{
    struct peer_request * req = &msgs->incoming.blockReq;

    assert( EVBUFFER_LENGTH( inbuf ) >= inlen );
    dbgmsg( msgs, "In readBtPiece" );

    if( !req->length )
    {
        /* The declared length comes straight from the peer. Unless it is
         * checked, a value below 9 wraps `length - 9` around to ~4 GiB,
         * exactly 9 yields a zero-byte block that leaves this parser
         * stuck in the header phase, and a huge value makes us buffer
         * without bound. */
        if( !messageLengthIsCorrect( msgs, BT_PIECE, msgs->incoming.length ) )
        {
            dbgmsg( msgs, "bad packet - BT message #%d with a length of %d",
                    (int)BT_PIECE, (int)msgs->incoming.length );
            fireError( msgs, EMSGSIZE );
            return READ_ERR;
        }

        if( inlen < 8 )
            return READ_LATER;

        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->index );
        tr_peerIoReadUint32( msgs->peer->io, inbuf, &req->offset );
        req->length = msgs->incoming.length - 9; /* minus id, index and offset */
        dbgmsg( msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length );
        return READ_NOW;
    }
    else
    {
        int err;

        /* read in another chunk of data */
        const size_t nLeft = req->length - EVBUFFER_LENGTH( msgs->incoming.block );
        size_t n = MIN( nLeft, inlen );
        size_t i = n;
        void * buf = tr_sessionGetBuffer( getSession( msgs ) );
        const size_t buflen = SESSION_BUFFER_SIZE;

        /* move the data via the session's scratch buffer, one
         * buffer-sized pass at a time */
        while( i > 0 )
        {
            const size_t thisPass = MIN( i, buflen );
            tr_peerIoReadBytes( msgs->peer->io, inbuf, buf, thisPass );
            evbuffer_add( msgs->incoming.block, buf, thisPass );
            i -= thisPass;
        }

        tr_sessionReleaseBuffer( getSession( msgs ) );
        buf = NULL;

        fireClientGotData( msgs, n, TRUE );
        *setme_piece_bytes_read += n;
        dbgmsg( msgs, "got %zu bytes for block %u:%u->%u ... %d remain",
               n, req->index, req->offset, req->length,
               (int)( req->length - EVBUFFER_LENGTH( msgs->incoming.block ) ) );
        if( EVBUFFER_LENGTH( msgs->incoming.block ) < req->length )
            return READ_LATER;

        /* we've got the whole block ... process it */
        err = clientGotBlock( msgs, EVBUFFER_DATA( msgs->incoming.block ), req );

        /* cleanup */
        evbuffer_free( msgs->incoming.block );
        msgs->incoming.block = evbuffer_new( );
        req->length = 0;
        msgs->state = AWAITING_BT_LENGTH;
        return err ? READ_ERR : READ_NOW;
    }
}
static void updateDesiredRequestCount( tr_peermsgs * msgs, uint64_t now );
/**
 * Parses and acts on one complete BT message other than BT_PIECE.
 *
 * The message's length and id have already been stored in msgs->incoming.
 * Nothing is consumed until the whole payload (length minus the id byte)
 * is available in `inbuf'.
 *
 * @param msgs   the peer connection being read
 * @param inbuf  the buffer holding the peer's incoming bytes
 * @param inlen  how many bytes of `inbuf' are available
 * @return READ_LATER if the payload has not fully arrived;
 *         READ_ERR (after fireError()) if the length is wrong for the id,
 *         a Have index is rejected, or a fast-extension message arrives
 *         from a peer that did not negotiate that extension;
 *         READ_NOW once the message has been handled, in which case
 *         msgs->state has been reset to AWAITING_BT_LENGTH
 */
static int
readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
{
uint32_t ui32;
uint32_t msglen = msgs->incoming.length;
const uint8_t id = msgs->incoming.id;
#ifndef NDEBUG
/* only used by the byte-accounting assert at the end of the function */
const size_t startBufLen = EVBUFFER_LENGTH( inbuf );
#endif
const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );
/* from here on, msglen is the payload size without the id byte */
--msglen; /* id length */
dbgmsg( msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen );
/* wait until the entire payload is buffered */
if( inlen < msglen )
return READ_LATER;
/* the validator is given the full length, id byte included */
if( !messageLengthIsCorrect( msgs, id, msglen + 1 ) )
{
dbgmsg( msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen );
fireError( msgs, EMSGSIZE );
return READ_ERR;
}
switch( id )
{
case BT_CHOKE:
dbgmsg( msgs, "got Choke" );
msgs->peer->clientIsChoked = 1;
/* NOTE(review): the choke event is only fired for non-FEXT peers;
 * presumably FEXT peers reject outstanding requests explicitly -- confirm */
if( !fext )
fireGotChoke( msgs );
break;
case BT_UNCHOKE:
dbgmsg( msgs, "got Unchoke" );
msgs->peer->clientIsChoked = 0;
/* being unchoked changes how many requests we want outstanding */
updateDesiredRequestCount( msgs, tr_date( ) );
break;
case BT_INTERESTED:
dbgmsg( msgs, "got Interested" );
msgs->peer->peerIsInterested = 1;
break;
case BT_NOT_INTERESTED:
dbgmsg( msgs, "got Not Interested" );
msgs->peer->peerIsInterested = 0;
break;
case BT_HAVE:
tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
dbgmsg( msgs, "got Have: %u", ui32 );
/* the piece count is only checked when the torrent's metadata is available */
if( tr_torrentHasMetadata( msgs->torrent )
&& ( ui32 >= msgs->torrent->info.pieceCount ) )
{
fireError( msgs, ERANGE );
return READ_ERR;
}
/* a nonzero return from tr_bitsetAdd() is treated as an error */
if( tr_bitsetAdd( &msgs->peer->have, ui32 ) )
{
fireError( msgs, ERANGE );
return READ_ERR;
}
updatePeerProgress( msgs );
break;
case BT_BITFIELD: {
/* without metadata, size the bitset from the payload itself */
const size_t bitCount = tr_torrentHasMetadata( msgs->torrent )
? msgs->torrent->info.pieceCount
: msglen * 8;
dbgmsg( msgs, "got a bitfield" );
tr_bitsetReserve( &msgs->peer->have, bitCount );
/* the payload is read straight into the bitset's storage */
tr_peerIoReadBytes( msgs->peer->io, inbuf,
msgs->peer->have.bitfield.bits, msglen );
updatePeerProgress( msgs );
break;
}
case BT_REQUEST:
{
struct peer_request r;
tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
dbgmsg( msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length );
peerMadeRequest( msgs, &r );
break;
}
case BT_CANCEL:
{
int i;
struct peer_request r;
tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
tr_historyAdd( msgs->peer->cancelsSentToClient, tr_date( ), 1 );
dbgmsg( msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length );
/* look for the first pending request that matches exactly */
for( i=0; i<msgs->peer->pendingReqsToClient; ++i ) {
const struct peer_request * req = msgs->peerAskedFor + i;
if( ( req->index == r.index ) && ( req->offset == r.offset ) && ( req->length == r.length ) )
break;
}
/* if one was found, remove it and shrink the pending count */
if( i < msgs->peer->pendingReqsToClient )
tr_removeElementFromArray( msgs->peerAskedFor, i, sizeof( struct peer_request ),
msgs->peer->pendingReqsToClient-- );
break;
}
case BT_PIECE:
assert( 0 ); /* handled elsewhere! */
break;
case BT_PORT:
dbgmsg( msgs, "Got a BT_PORT" );
tr_peerIoReadUint16( msgs->peer->io, inbuf, &msgs->peer->dht_port );
/* a port of zero is not passed on to the DHT */
if( msgs->peer->dht_port > 0 )
tr_dhtAddNode( getSession(msgs),
tr_peerAddress( msgs->peer ),
msgs->peer->dht_port, 0 );
break;
/* For each BT_FEXT_* message below: the payload (if any) is consumed
 * first, then the message is honored only if the peer supports the
 * fast extension; otherwise it is reported as an error. */
case BT_FEXT_SUGGEST:
dbgmsg( msgs, "Got a BT_FEXT_SUGGEST" );
tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
if( fext )
fireClientGotSuggest( msgs, ui32 );
else {
fireError( msgs, EMSGSIZE );
return READ_ERR;
}
break;
case BT_FEXT_ALLOWED_FAST:
dbgmsg( msgs, "Got a BT_FEXT_ALLOWED_FAST" );
tr_peerIoReadUint32( msgs->peer->io, inbuf, &ui32 );
if( fext )
fireClientGotAllowedFast( msgs, ui32 );
else {
fireError( msgs, EMSGSIZE );
return READ_ERR;
}
break;
case BT_FEXT_HAVE_ALL:
dbgmsg( msgs, "Got a BT_FEXT_HAVE_ALL" );
if( fext ) {
tr_bitsetSetHaveAll( &msgs->peer->have );
updatePeerProgress( msgs );
} else {
fireError( msgs, EMSGSIZE );
return READ_ERR;
}
break;
case BT_FEXT_HAVE_NONE:
dbgmsg( msgs, "Got a BT_FEXT_HAVE_NONE" );
if( fext ) {
tr_bitsetSetHaveNone( &msgs->peer->have );
updatePeerProgress( msgs );
} else {
fireError( msgs, EMSGSIZE );
return READ_ERR;
}
break;
case BT_FEXT_REJECT:
{
struct peer_request r;
dbgmsg( msgs, "Got a BT_FEXT_REJECT" );
tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.index );
tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.offset );
tr_peerIoReadUint32( msgs->peer->io, inbuf, &r.length );
if( fext )
fireGotRej( msgs, &r );
else {
fireError( msgs, EMSGSIZE );
return READ_ERR;
}
break;
}
case BT_LTEP:
dbgmsg( msgs, "Got a BT_LTEP" );
parseLtep( msgs, msglen, inbuf );
break;
default:
/* unrecognized id: skip over its payload */
dbgmsg( msgs, "peer sent us an UNKNOWN: %d", (int)id );
tr_peerIoDrain( msgs->peer->io, inbuf, msglen );
break;
}
/* debug-build sanity checks: exactly `msglen' bytes must have been consumed */
assert( msglen + 1 == msgs->incoming.length );
assert( EVBUFFER_LENGTH( inbuf ) == startBufLen - msglen );
/* ready for the next message's length prefix */
msgs->state = AWAITING_BT_LENGTH;
return READ_NOW;
}
/**
 * Marks piece `index' in this peer's blame bitfield.
 * The bitfield is allocated on first use, sized to the torrent's piece count.
 */
static void
addPeerToBlamefield( tr_peermsgs * msgs, uint32_t index )
{
    struct tr_peer * const peer = msgs->peer;

    if( peer->blame == NULL )
    {
        peer->blame = tr_bitfieldNew( msgs->torrent->info.pieceCount );
    }

    tr_bitfieldAdd( peer->blame, index );
}
/**
 * Handles a fully-received block of piece data from the peer.
 *
 * The block is written to disk only if its size matches what the torrent
 * expects for that block, we actually requested it from this peer, and the
 * piece it belongs to is not already complete.  Unrequested or redundant
 * blocks are silently ignored.
 *
 * @param msgs  the peer connection the block arrived on
 * @param data  the block's bytes; req->length bytes are written
 * @param req   the piece index, offset and length parsed from the message
 * @return 0 on success (including the "ignored" cases), EMSGSIZE if the
 *         block has the wrong size, or the error returned by tr_ioWrite()
 */
static int
clientGotBlock( tr_peermsgs                * msgs,
                const uint8_t              * data,
                const struct peer_request  * req )
{
    int err;
    tr_torrent * tor;
    tr_block_index_t block;

    /* check the pointers before anything dereferences them */
    assert( msgs );
    assert( req );

    tor = msgs->torrent;
    block = _tr_block( tor, req->index, req->offset );

    if( req->length != tr_torBlockCountBytes( msgs->torrent, block ) ) {
        dbgmsg( msgs, "wrong block size -- expected %u, got %u",
                tr_torBlockCountBytes( msgs->torrent, block ), req->length );
        return EMSGSIZE;
    }

    dbgmsg( msgs, "got block %u:%u->%u", req->index, req->offset, req->length );

    if( !tr_peerMgrDidPeerRequest( msgs->torrent, msgs->peer, block ) ) {
        dbgmsg( msgs, "we didn't ask for this message..." );
        return 0;
    }
    if( tr_cpPieceIsComplete( &msgs->torrent->completion, req->index ) ) {
        dbgmsg( msgs, "we did ask for this message, but the piece is already complete..." );
        return 0;
    }

    /**
    ***  Save the block
    **/

    if(( err = tr_ioWrite( tor, req->index, req->offset, req->length, data )))
        return err;

    /* remember that this peer contributed to the piece, then announce the block */
    addPeerToBlamefield( msgs, req->index );
    fireGotBlock( msgs, req );
    return 0;
}
static int peerPulse( void * vmsgs );
/**
 * Peer-io callback invoked after bytes have been written to the peer.
 * Reports the transfer and, if `io' is still a valid peer-io with user
 * data attached, runs another pulse for this peer.
 */
static void
didWrite( tr_peerIo * io UNUSED, size_t bytesWritten, int wasPieceData, void * vmsgs )
{
    tr_peermsgs * const msgs = vmsgs;

    firePeerGotData( msgs, bytesWritten, wasPieceData );

    if( !tr_isPeerIo( io ) )
        return;
    if( !io->userData )
        return;

    peerPulse( msgs );
}
/**
 * Peer-io callback invoked when the peer's read buffer may have data.
 * Dispatches to the reader that matches the current parse state and, when
 * no error occurred, reports how many raw bytes were consumed.
 *
 * @param io     the peer-io whose read buffer is inspected
 * @param vmsgs  the tr_peermsgs registered with the io
 * @param piece  passed through to readBtPiece() to count block-data bytes
 * @return the ReadState produced by the selected reader, or READ_LATER
 *         when the buffer is empty
 */
static ReadState
canRead( tr_peerIo * io, void * vmsgs, size_t * piece )
{
    tr_peermsgs * const msgs = vmsgs;
    struct evbuffer * const in = tr_peerIoGetReadBuffer( io );
    const size_t before = EVBUFFER_LENGTH( in );
    ReadState ret;

    dbgmsg( msgs, "canRead: inlen is %zu, msgs->state is %d", before, msgs->state );

    if( before == 0 )
    {
        ret = READ_LATER;
    }
    else switch( msgs->state )
    {
        case AWAITING_BT_PIECE:
            ret = readBtPiece( msgs, in, before, piece );
            break;

        case AWAITING_BT_LENGTH:
            ret = readBtLength( msgs, in, before );
            break;

        case AWAITING_BT_ID:
            ret = readBtId( msgs, in, before );
            break;

        case AWAITING_BT_MESSAGE:
            ret = readBtMessage( msgs, in, before );
            break;

        default:
            ret = READ_ERR;
            assert( 0 );
    }

    dbgmsg( msgs, "canRead: ret is %d", (int)ret );

    /* log the raw data that was read */
    if( ret != READ_ERR )
    {
        const size_t after = EVBUFFER_LENGTH( in );

        if( after != before )
            fireClientGotData( msgs, before - after, FALSE );
    }

    return ret;
}
/**
 * Tells whether this connection is currently in the middle of receiving
 * the given block.
 *
 * @return nonzero if the parser is in the AWAITING_BT_PIECE state and the
 *         incoming block header maps to `block', zero otherwise
 */
int
tr_peerMsgsIsReadingBlock( const tr_peermsgs * msgs, tr_block_index_t block )
{
    if( msgs->state == AWAITING_BT_PIECE )
    {
        const tr_block_index_t incoming = _tr_block( msgs->torrent,
                                                     msgs->incoming.blockReq.index,
                                                     msgs->incoming.blockReq.offset );
        return block == incoming;
    }

    return FALSE;
}
/**
***
**/
/**
 * Recomputes msgs->desiredRequestCount, the number of block requests we
 * would like to have outstanding with this peer.
 *
 * The count is zero when we are seeding, choked by the peer, or not
 * interested in it.  Otherwise it is derived from the peer's recent piece
 * speed (capped by any torrent/session download limits), with a lower
 * bound of four requests and an upper bound of the peer's `reqq' when the
 * peer has advertised one.
 */
static void
updateDesiredRequestCount( tr_peermsgs * msgs, uint64_t now )
{
    const tr_torrent * const torrent = msgs->torrent;
    const int minRequests = 4;
    const int seconds = REQUEST_BUF_SECS;
    int sessionLimit;
    int blocksInPeriod;
    double rate;

    if( tr_torrentIsSeed( msgs->torrent )
        || msgs->peer->clientIsChoked
        || !msgs->peer->clientIsInterested )
    {
        msgs->desiredRequestCount = 0;
        return;
    }

    /* Get the rate limit we should use.
     * FIXME: this needs to consider all the other peers as well... */
    rate = tr_peerGetPieceSpeed( msgs->peer, now, TR_PEER_TO_CLIENT );
    if( tr_torrentUsesSpeedLimit( torrent, TR_PEER_TO_CLIENT ) )
    {
        rate = MIN( rate, tr_torrentGetSpeedLimit( torrent, TR_PEER_TO_CLIENT ) );
    }

    /* honor the session limits, if enabled */
    if( tr_torrentUsesSessionLimits( torrent )
        && tr_sessionGetActiveSpeedLimit( torrent->session, TR_PEER_TO_CLIENT, &sessionLimit ) )
    {
        rate = MIN( rate, sessionLimit );
    }

    /* turn that rate into a number of blocks for the buffering period,
     * but never ask for fewer than the minimum */
    blocksInPeriod = ( rate * seconds * 1024 ) / torrent->blockSize;
    msgs->desiredRequestCount = MAX( minRequests, blocksInPeriod );

    /* honor the peer's maximum request count, if specified */
    if( ( msgs->reqq > 0 ) && ( msgs->desiredRequestCount > msgs->reqq ) )
    {
        msgs->desiredRequestCount = msgs->reqq;
    }
}
/**
 * If the peer supports metadata transfer and the torrent wants another
 * metadata piece right now, queues an LTEP metadata-request message for
 * that piece in msgs->outMessages.
 */
static void
updateMetadataRequests( tr_peermsgs * msgs, time_t now )
{
    int wanted;
    int bencLen;
    char * benc;
    tr_benc dict;
    tr_peerIo * io;
    struct evbuffer * out;

    if( !msgs->peerSupportsMetadataXfer )
        return;
    if( !tr_torrentGetNextMetadataRequest( msgs->torrent, now, &wanted ) )
        return;

    io = msgs->peer->io;
    out = msgs->outMessages;

    /* build the data message */
    tr_bencInitDict( &dict, 3 );
    tr_bencDictAddInt( &dict, "msg_type", METADATA_MSG_TYPE_REQUEST );
    tr_bencDictAddInt( &dict, "piece", wanted );
    benc = tr_bencToStr( &dict, TR_FMT_BENC, &bencLen );
    tr_bencFree( &dict );

    dbgmsg( msgs, "requesting metadata piece #%d", wanted );

    /* write it out as a LTEP message to our outMessages buffer:
     * length prefix, BT_LTEP id, extension id, bencoded payload */
    tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + bencLen );
    tr_peerIoWriteUint8 ( io, out, BT_LTEP );
    tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
    tr_peerIoWriteBytes ( io, out, benc, bencLen );
    pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
    dbgOutMessageLen( msgs );

    tr_free( benc );
}
/**
 * Tops up our outstanding block requests to this peer.
 *
 * Nothing happens unless downloading piece data is allowed for the torrent,
 * we want at least one request, and the number already pending has dropped
 * to 66% of the desired count or below.  In that case the peer manager is
 * asked for enough blocks to reach the desired count and a request is sent
 * for each block it returns.
 */
static void
updateBlockRequests( tr_peermsgs * msgs )
{
    int i;
    int got;
    int numwant;
    tr_block_index_t * blocks;

    if( !tr_torrentIsPieceTransferAllowed( msgs->torrent, TR_PEER_TO_CLIENT ) )
        return;
    if( msgs->desiredRequestCount <= 0 )
        return;
    if( msgs->peer->pendingReqsToPeer > ( msgs->desiredRequestCount * 0.66 ) )
        return;

    numwant = msgs->desiredRequestCount - msgs->peer->pendingReqsToPeer;
    blocks = tr_new( tr_block_index_t, numwant );

    tr_peerMgrGetNextRequests( msgs->torrent, msgs->peer, numwant, blocks, &got );

    for( i = 0; i < got; ++i )
    {
        struct peer_request req;

        blockToReq( msgs->torrent, blocks[i], &req );
        protocolSendRequest( msgs, &req );
    }

    tr_free( blocks );
}
/**
 * Performs one pass of moving outgoing data towards the peer.
 *
 * In order, a pass may:
 *  1. start a new batch of protocol messages, or flush msgs->outMessages
 *     to the peer-io once the batch period has elapsed;
 *  2. answer one queued metadata-piece request with either the data or a
 *     rejection;
 *  3. answer one queued block request with the piece data (or a reject
 *     message for fast-extension peers);
 *  4. queue a keepalive if nothing has been sent for a while.
 *
 * @param msgs  the peer connection to service
 * @param now   the current time, in seconds
 * @return the number of bytes handed to the peer-io during this pass; zero
 *         if nothing was written or if reading a block from disk failed
 */
static size_t
fillOutputBuffer( tr_peermsgs * msgs, time_t now )
{
    int piece;
    size_t bytesWritten = 0;
    struct peer_request req;
    const tr_bool haveMessages = EVBUFFER_LENGTH( msgs->outMessages ) != 0;
    const tr_bool fext = tr_peerIoSupportsFEXT( msgs->peer->io );

    /**
    ***  Protocol messages
    **/

    if( haveMessages && !msgs->outMessagesBatchedAt ) /* fresh batch */
    {
        dbgmsg( msgs, "started an outMessages batch (length is %zu)", EVBUFFER_LENGTH( msgs->outMessages ) );
        msgs->outMessagesBatchedAt = now;
    }
    else if( haveMessages && ( ( now - msgs->outMessagesBatchedAt ) >= msgs->outMessagesBatchPeriod ) )
    {
        const size_t len = EVBUFFER_LENGTH( msgs->outMessages );
        /* flush the protocol messages */
        dbgmsg( msgs, "flushing outMessages... to %p (length is %zu)", msgs->peer->io, len );
        tr_peerIoWriteBuf( msgs->peer->io, msgs->outMessages, FALSE );
        msgs->clientSentAnythingAt = now;
        msgs->outMessagesBatchedAt = 0;
        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
        bytesWritten += len;
    }

    /**
    ***  Metadata Pieces
    **/

    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= METADATA_PIECE_SIZE )
        && popNextMetadataRequest( msgs, &piece ) )
    {
        /* start from a known length so that a failed lookup which leaves
         * the out-parameter untouched cannot be mistaken for success */
        int dataLen = 0;
        char * data = tr_torrentGetMetadataPiece( msgs->torrent, piece, &dataLen );
        tr_benc tmp;
        int payloadLen;
        char * payload;
        tr_peerIo * io = msgs->peer->io;
        struct evbuffer * out = msgs->outMessages;

        if( ( data != NULL ) && ( dataLen > 0 ) )
        {
            /* build the data message */
            tr_bencInitDict( &tmp, 3 );
            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_DATA );
            tr_bencDictAddInt( &tmp, "piece", piece );
            tr_bencDictAddInt( &tmp, "total_size", msgs->torrent->infoDictLength );
            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
            tr_bencFree( &tmp );

            /* write it out as a LTEP message to our outMessages buffer */
            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen + dataLen );
            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
            tr_peerIoWriteBytes ( io, out, data, dataLen );
            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
            dbgOutMessageLen( msgs );

            tr_free( payload );
        }
        else /* send a rejection message */
        {
            /* build the rejection message */
            tr_bencInitDict( &tmp, 2 );
            tr_bencDictAddInt( &tmp, "msg_type", METADATA_MSG_TYPE_REJECT );
            tr_bencDictAddInt( &tmp, "piece", piece );
            payload = tr_bencToStr( &tmp, TR_FMT_BENC, &payloadLen );
            tr_bencFree( &tmp );

            /* write it out as a LTEP message to our outMessages buffer */
            tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + payloadLen );
            tr_peerIoWriteUint8 ( io, out, BT_LTEP );
            tr_peerIoWriteUint8 ( io, out, msgs->ut_metadata_id );
            tr_peerIoWriteBytes ( io, out, payload, payloadLen );
            pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
            dbgOutMessageLen( msgs );

            tr_free( payload );
        }

        /* released on both paths so an unusable piece buffer is not leaked */
        tr_free( data );
    }

    /**
    ***  Data Blocks
    **/

    if( ( tr_peerIoGetWriteBufferSpace( msgs->peer->io, now ) >= msgs->torrent->blockSize )
        && popNextRequest( msgs, &req ) )
    {
        --msgs->prefetchCount;

        if( requestIsValid( msgs, &req )
            && tr_cpPieceIsComplete( &msgs->torrent->completion, req.index ) )
        {
            /* FIXME(libevent2) use evbuffer_reserve_space() + evbuffer_commit_space() */
            int err;
            /* length prefix + id + index + offset + block data */
            const uint32_t msglen = 4 + 1 + 4 + 4 + req.length;
            struct evbuffer * out;
            tr_peerIo * io = msgs->peer->io;

            out = evbuffer_new( );
            evbuffer_expand( out, msglen );

            tr_peerIoWriteUint32( io, out, sizeof( uint8_t ) + 2 * sizeof( uint32_t ) + req.length );
            tr_peerIoWriteUint8 ( io, out, BT_PIECE );
            tr_peerIoWriteUint32( io, out, req.index );
            tr_peerIoWriteUint32( io, out, req.offset );

            /* read the block from disk directly into the space after the header */
            err = tr_ioRead( msgs->torrent, req.index, req.offset, req.length, EVBUFFER_DATA(out)+EVBUFFER_LENGTH(out) );
            if( err )
            {
                if( fext )
                    protocolSendReject( msgs, &req );
            }
            else
            {
                dbgmsg( msgs, "sending block %u:%u->%u", req.index, req.offset, req.length );
                EVBUFFER_LENGTH(out) += req.length;
                assert( EVBUFFER_LENGTH( out ) == msglen );
                tr_peerIoWriteBuf( io, out, TRUE );
                /* Count the message by its known size rather than by
                 * re-measuring `out' after the write: the outMessages flush
                 * above likewise records the length before calling
                 * tr_peerIoWriteBuf(), which appears to consume the buffer. */
                bytesWritten += msglen;
                msgs->clientSentAnythingAt = now;
                tr_historyAdd( msgs->peer->blocksSentToPeer, tr_date( ), 1 );
            }

            evbuffer_free( out );

            if( err )
            {
                /* NOTE(review): after a failed disk read the pass reports
                 * no progress and stops touching `msgs'; presumably the
                 * failure may have torn the peer down -- confirm */
                bytesWritten = 0;
                msgs = NULL;
            }
        }
        else if( fext ) /* peer needs a reject message */
        {
            protocolSendReject( msgs, &req );
        }

        if( msgs != NULL )
            prefetchPieces( msgs );
    }

    /**
    ***  Keepalive
    **/

    if( ( msgs != NULL )
        && ( msgs->clientSentAnythingAt != 0 )
        && ( ( now - msgs->clientSentAnythingAt ) > KEEPALIVE_INTERVAL_SECS ) )
    {
        /* a keepalive is a message whose length prefix is zero */
        dbgmsg( msgs, "sending a keepalive message" );
        tr_peerIoWriteUint32( msgs->peer->io, msgs->outMessages, 0 );
        pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );
    }

    return bytesWritten;
}
/**
 * Runs one round of periodic work for a peer: refreshes the request
 * bookkeeping (only while the peer's io is valid) and then keeps calling
 * fillOutputBuffer() until a pass writes nothing.
 *
 * @param vmsgs  the tr_peermsgs to service
 * @return always TRUE
 */
static int
peerPulse( void * vmsgs )
{
    tr_peermsgs * const msgs = vmsgs;
    const time_t now = tr_time( );

    if( tr_isPeerIo( msgs->peer->io ) )
    {
        updateDesiredRequestCount( msgs, tr_date( ) );
        updateBlockRequests( msgs );
        updateMetadataRequests( msgs, now );
    }

    while( fillOutputBuffer( msgs, now ) >= 1 )
    {
        /* keep going for as long as each pass makes progress */
    }

    return TRUE; /* loop forever */
}
/**
 * Public entry point for pulsing a peer; a NULL `msgs' is ignored.
 */
void
tr_peerMsgsPulse( tr_peermsgs * msgs )
{
    if( msgs == NULL )
        return;

    peerPulse( msgs );
}
/**
 * Peer-io callback invoked when libevent reports a problem on the
 * connection.  The condition is logged and then ENOTCONN is published as
 * the peer's error, whatever bits were set in `what'.
 */
static void
gotError( tr_peerIo * io UNUSED,
          short       what,
          void      * vmsgs )
{
    if( ( what & EVBUFFER_TIMEOUT ) != 0 )
    {
        dbgmsg( vmsgs, "libevent got a timeout, what=%hd", what );
    }

    if( ( what & ( EVBUFFER_EOF | EVBUFFER_ERROR ) ) != 0 )
    {
        dbgmsg( vmsgs, "libevent got an error! what=%hd, errno=%d (%s)",
                what, errno, tr_strerror( errno ) );
    }

    fireError( vmsgs, ENOTCONN );
}
/**
 * Queues a BT_BITFIELD message describing the pieces we have.
 *
 * When the session has lazy bitfields enabled, up to LAZY_PIECE_COUNT
 * pieces that we do have are left out of the bitfield and announced
 * afterwards with individual Have messages instead.
 */
static void
sendBitfield( tr_peermsgs * msgs )
{
    struct evbuffer * out = msgs->outMessages;
    tr_bitfield * field;
    tr_piece_index_t withheld[LAZY_PIECE_COUNT];
    size_t withheldCount = 0;
    size_t i;

    /* work on a private copy so pieces can be removed from it */
    field = tr_bitfieldDup( tr_cpPieceBitfield( &msgs->torrent->completion ) );

    if( tr_sessionIsLazyBitfieldEnabled( getSession( msgs ) ) )
    {
        /** Lazy bitfields aren't a high priority or secure, so I'm opting for
            speed over a truly random sample -- let's limit the pool size to
            the first 1000 pieces so large torrents don't bog things down */
        const size_t poolLimit = MIN( msgs->torrent->info.pieceCount, 1000 );
        tr_piece_index_t * candidates = tr_new( tr_piece_index_t, poolLimit );
        size_t candidateCount = 0;

        /* collect the pieces we have among the first `poolLimit' pieces */
        for( i = 0; i < poolLimit; ++i )
        {
            if( tr_bitfieldHas( field, i ) )
            {
                candidates[candidateCount] = i;
                ++candidateCount;
            }
        }

        /* pull random piece indices from the pool; each pick is replaced
         * by the pool's last entry so no piece is chosen twice */
        while( ( candidateCount > 0 ) && ( withheldCount < LAZY_PIECE_COUNT ) )
        {
            const int pick = tr_cryptoWeakRandInt( candidateCount );
            const tr_piece_index_t piece = candidates[pick];

            tr_bitfieldRem( field, piece );
            withheld[withheldCount] = piece;
            ++withheldCount;

            --candidateCount;
            candidates[pick] = candidates[candidateCount];
        }

        tr_free( candidates );
    }

    /* length prefix, message id, then the raw bits */
    tr_peerIoWriteUint32( msgs->peer->io, out,
                          sizeof( uint8_t ) + field->byteCount );
    tr_peerIoWriteUint8 ( msgs->peer->io, out, BT_BITFIELD );
    /* FIXME(libevent2): use evbuffer_add_reference() */
    tr_peerIoWriteBytes ( msgs->peer->io, out, field->bits, field->byteCount );
    dbgmsg( msgs, "sending bitfield... outMessage size is now %zu",
            EVBUFFER_LENGTH( out ) );
    pokeBatchPeriod( msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS );

    /* follow up with a Have for every piece left out of the bitfield */
    for( i = 0; i < withheldCount; ++i )
    {
        protocolSendHave( msgs, withheld[i] );
    }

    tr_bitfieldFree( field );
}
/**
 * Announces our piece availability to the peer, using the most compact
 * message available: HaveAll or HaveNone for fast-extension peers when we
 * are a seed or have no valid data, and a bitfield in every other case.
 */
static void
tellPeerWhatWeHave( tr_peermsgs * msgs )
{
    if( !tr_peerIoSupportsFEXT( msgs->peer->io ) )
    {
        sendBitfield( msgs );
        return;
    }

    if( tr_cpGetStatus( &msgs->torrent->completion ) == TR_SEED )
    {
        protocolSendHaveAll( msgs );
        return;
    }

    if( tr_cpHaveValid( &msgs->torrent->completion ) == 0 )
    {
        protocolSendHaveNone( msgs );
        return;
    }

    sendBitfield( msgs );
}
/**
***
**/
/* Some peers give us error messages if we send more than this many
   peers in a single pex message; see
   http://wiki.theory.org/BitTorrentPeerExchangeConventions
   pexAddedCb() and pexDroppedCb() stop recording entries once these
   limits are reached. */
#define MAX_PEX_ADDED 50
#define MAX_PEX_DROPPED 50
/* Scratch state filled in by the pex*Cb() callbacks while sendPex()
   compares the previously-sent peer list against the current one. */
typedef struct
{
/* entries recorded by pexAddedCb(), at most MAX_PEX_ADDED of them */
tr_pex * added;
/* entries recorded by pexDroppedCb(), at most MAX_PEX_DROPPED of them */
tr_pex * dropped;
/* every entry seen by pexElementCb() plus every entry accepted by
   pexAddedCb(); sendPex() keeps this as the peer's new pex list */
tr_pex * elements;
/* number of entries in use in each of the arrays above */
int addedCount;
int droppedCount;
int elementCount;
}
PexDiffs;
/**
 * Set-comparison callback used by sendPex().  While fewer than
 * MAX_PEX_ADDED entries have been recorded, the entry is copied into both
 * the `added' and the `elements' arrays; otherwise it is ignored.
 */
static void
pexAddedCb( void * vpex,
            void * userData )
{
    PexDiffs * const diffs = userData;
    const tr_pex * const entry = vpex;

    if( diffs->addedCount >= MAX_PEX_ADDED )
        return;

    diffs->added[diffs->addedCount] = *entry;
    diffs->addedCount++;

    diffs->elements[diffs->elementCount] = *entry;
    diffs->elementCount++;
}
/**
 * Set-comparison callback used by sendPex().  The entry is copied into the
 * `dropped' array unless MAX_PEX_DROPPED entries have already been recorded.
 */
static inline void
pexDroppedCb( void * vpex,
              void * userData )
{
    PexDiffs * const diffs = userData;
    const tr_pex * const entry = vpex;

    if( diffs->droppedCount >= MAX_PEX_DROPPED )
        return;

    diffs->dropped[diffs->droppedCount] = *entry;
    diffs->droppedCount++;
}
/**
 * Set-comparison callback used by sendPex().  The entry is always appended
 * to the `elements' array.
 */
static inline void
pexElementCb( void * vpex,
              void * userData )
{
    PexDiffs * const diffs = userData;
    const tr_pex * const entry = vpex;
    const int slot = diffs->elementCount;

    diffs->elements[slot] = *entry;
    diffs->elementCount = slot + 1;
}
/**
 * Queues a peer-exchange (pex) message for this peer if anything changed
 * since the last one.
 *
 * Does nothing unless the peer supports pex and the torrent allows it.
 * Otherwise the currently connected IPv4 and IPv6 peers are compared with
 * the lists remembered in msgs->pex / msgs->pex6.  When entries were added
 * or dropped, the remembered lists are replaced and an LTEP message
 * carrying the differences is appended to msgs->outMessages.
 */
static void
sendPex( tr_peermsgs * msgs )
{
if( msgs->peerSupportsPex && tr_torrentAllowsPex( msgs->torrent ) )
{
PexDiffs diffs;
PexDiffs diffs6;
tr_pex * newPex = NULL;
tr_pex * newPex6 = NULL;
/* fetch up to MAX_PEX_PEER_COUNT connected peers per address family */
const int newCount = tr_peerMgrGetPeers( msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
const int newCount6 = tr_peerMgrGetPeers( msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT );
/* build the diffs */
/* IPv4: the arrays are sized for the worst case of each callback */
diffs.added = tr_new( tr_pex, newCount );
diffs.addedCount = 0;
diffs.dropped = tr_new( tr_pex, msgs->pexCount );
diffs.droppedCount = 0;
diffs.elements = tr_new( tr_pex, newCount + msgs->pexCount );
diffs.elementCount = 0;
/* NOTE(review): presumably the three callbacks receive, in order, the
 * entries only in the old list, only in the new list, and in both --
 * confirm against tr_set_compare() */
tr_set_compare( msgs->pex, msgs->pexCount,
newPex, newCount,
tr_pexCompare, sizeof( tr_pex ),
pexDroppedCb, pexAddedCb, pexElementCb, &diffs );
/* IPv6: same procedure against the remembered pex6 list */
diffs6.added = tr_new( tr_pex, newCount6 );
diffs6.addedCount = 0;
diffs6.dropped = tr_new( tr_pex, msgs->pexCount6 );
diffs6.droppedCount = 0;
diffs6.elements = tr_new( tr_pex, newCount6 + msgs->pexCount6 );
diffs6.elementCount = 0;
tr_set_compare( msgs->pex6, msgs->pexCount6,
newPex6, newCount6,
tr_pexCompare, sizeof( tr_pex ),
pexDroppedCb, pexAddedCb, pexElementCb, &diffs6 );
dbgmsg(
msgs,
"pex: old peer count %d+%d, new peer count %d+%d, "
"added %d+%d, removed %d+%d",
msgs->pexCount, msgs->pexCount6, newCount, newCount6,
diffs.addedCount, diffs6.addedCount,
diffs.droppedCount, diffs6.droppedCount );
/* nothing changed: discard the rebuilt lists and send nothing */
if( !diffs.addedCount && !diffs.droppedCount && !diffs6.addedCount &&
!diffs6.droppedCount )
{
tr_free( diffs.elements );
tr_free( diffs6.elements );
}
else
{
int i;
tr_benc val;
char * benc;
int bencLen;
uint8_t * tmp, *walk;
tr_peerIo * io = msgs->peer->io;
struct evbuffer * out = msgs->outMessages;
/* update peer */
/* the `elements' arrays become the remembered lists, so they are
 * not freed in the cleanup section below */
tr_free( msgs->pex );
msgs->pex = diffs.elements;
msgs->pexCount = diffs.elementCount;
tr_free( msgs->pex6 );
msgs->pex6 = diffs6.elements;
msgs->pexCount6 = diffs6.elementCount;
/* build the pex payload */
tr_bencInitDict( &val, 3 ); /* ipv6 support: left as 3:
* speed vs. likelihood? */
if( diffs.addedCount > 0)
{
/* "added" */
/* each IPv4 entry is 6 bytes: 4 of address followed by 2 of port */
tmp = walk = tr_new( uint8_t, diffs.addedCount * 6 );
for( i = 0; i < diffs.addedCount; ++i ) {
memcpy( walk, &diffs.added[i].addr.addr, 4 ); walk += 4;
memcpy( walk, &diffs.added[i].port, 2 ); walk += 2;
}
assert( ( walk - tmp ) == diffs.addedCount * 6 );
tr_bencDictAddRaw( &val, "added", tmp, walk - tmp );
tr_free( tmp );
/* "added.f" */
/* one flags byte per added peer, in the same order as "added" */
tmp = walk = tr_new( uint8_t, diffs.addedCount );
for( i = 0; i < diffs.addedCount; ++i )
*walk++ = diffs.added[i].flags;
assert( ( walk - tmp ) == diffs.addedCount );
tr_bencDictAddRaw( &val, "added.f", tmp, walk - tmp );
tr_free( tmp );
}
if( diffs.droppedCount > 0 )
{
/* "dropped" */
/* same 6-byte layout as "added" */
tmp = walk = tr_new( uint8_t, diffs.droppedCount * 6 );
for( i = 0; i < diffs.droppedCount; ++i ) {
memcpy( walk, &diffs.dropped[i].addr.addr, 4 ); walk += 4;
memcpy( walk, &diffs.dropped[i].port, 2 ); walk += 2;
}
assert( ( walk - tmp ) == diffs.droppedCount * 6 );
tr_bencDictAddRaw( &val, "dropped", tmp, walk - tmp );
tr_free( tmp );
}
if( diffs6.addedCount > 0 )
{
/* "added6" */
/* each IPv6 entry is 18 bytes: 16 of address followed by 2 of port */
tmp = walk = tr_new( uint8_t, diffs6.addedCount * 18 );
for( i = 0; i < diffs6.addedCount; ++i ) {
memcpy( walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16 );
walk += 16;
memcpy( walk, &diffs6.added[i].port, 2 );
walk += 2;
}
assert( ( walk - tmp ) == diffs6.addedCount * 18 );
tr_bencDictAddRaw( &val, "added6", tmp, walk - tmp );
tr_free( tmp );
/* "added6.f" */
/* one flags byte per added IPv6 peer, in the same order as "added6" */
tmp = walk = tr_new( uint8_t, diffs6.addedCount );
for( i = 0; i < diffs6.addedCount; ++i )
*walk++ = diffs6.added[i].flags;
assert( ( walk - tmp ) == diffs6.addedCount );
tr_bencDictAddRaw( &val, "added6.f", tmp, walk - tmp );
tr_free( tmp );
}
if( diffs6.droppedCount > 0 )
{
/* "dropped6" */
/* same 18-byte layout as "added6" */
tmp = walk = tr_new( uint8_t, diffs6.droppedCount * 18 );
for( i = 0; i < diffs6.droppedCount; ++i ) {
memcpy( walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16 );
walk += 16;
memcpy( walk, &diffs6.dropped[i].port, 2 );
walk += 2;
}
assert( ( walk - tmp ) == diffs6.droppedCount * 18);
tr_bencDictAddRaw( &val, "dropped6", tmp, walk - tmp );
tr_free( tmp );
}
/* write the pex message */
/* length prefix, BT_LTEP id, the pex extension id, bencoded payload */
benc = tr_bencToStr( &val, TR_FMT_BENC, &bencLen );
tr_peerIoWriteUint32( io, out, 2 * sizeof( uint8_t ) + bencLen );
tr_peerIoWriteUint8 ( io, out, BT_LTEP );
tr_peerIoWriteUint8 ( io, out, msgs->ut_pex_id );
tr_peerIoWriteBytes ( io, out, benc, bencLen );
pokeBatchPeriod( msgs, HIGH_PRIORITY_INTERVAL_SECS );
dbgmsg( msgs, "sending a pex message; outMessage size is now %zu", EVBUFFER_LENGTH( out ) );
dbgOutMessageLen( msgs );
tr_free( benc );
tr_bencFree( &val );
}
/* cleanup */
/* the scratch arrays and freshly fetched peer lists are always released */
tr_free( diffs.added );
tr_free( diffs.dropped );
tr_free( newPex );
tr_free( diffs6.added );
tr_free( diffs6.dropped );
tr_free( newPex6 );
/*msgs->clientSentPexAt = tr_time( );*/
}
}
/**
 * Timer callback for peer exchange: sends a pex update if one is due and
 * re-arms the timer for another PEX_INTERVAL_SECS seconds.
 */
static void
pexPulse( int foo UNUSED, short bar UNUSED, void * vmsgs )
{
    struct tr_peermsgs * const msgs = vmsgs;

    sendPex( msgs );

    /* schedule the next round */
    tr_timerAdd( &msgs->pexTimer, PEX_INTERVAL_SECS, 0 );
}
/**
***
**/
/**
 * Creates the message-handling state for a newly connected peer.
 *
 * Both sides start out choked and uninterested.  The new object is attached
 * to `peer', `func' is subscribed to its events, the opening messages
 * (LTEP handshake, DHT port, piece availability) are queued where
 * applicable, and the read/write/error callbacks are installed on the
 * peer's io.
 *
 * @param torrent   the torrent the peer belongs to
 * @param peer      the peer; peer->io must be set
 * @param func      callback subscribed to this object's published events
 * @param userData  passed along when subscribing `func'
 * @param setme     receives the subscription tag
 * @return the newly allocated tr_peermsgs
 */
tr_peermsgs*
tr_peerMsgsNew( struct tr_torrent    * torrent,
                struct tr_peer       * peer,
                tr_delivery_func       func,
                void                 * userData,
                tr_publisher_tag     * setme )
{
    tr_peermsgs * msgs;

    assert( peer );
    assert( peer->io );

    msgs = tr_new0( tr_peermsgs, 1 );
    msgs->publisher = TR_PUBLISHER_INIT;
    msgs->peer = peer;
    msgs->torrent = torrent;

    /* initial choke / interest state */
    msgs->peer->clientIsChoked = 1;
    msgs->peer->peerIsChoked = 1;
    msgs->peer->clientIsInterested = 0;
    msgs->peer->peerIsInterested = 0;

    /* parser state and buffers */
    msgs->state = AWAITING_BT_LENGTH;
    msgs->outMessages = evbuffer_new( );
    msgs->outMessagesBatchedAt = 0;
    msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
    msgs->incoming.block = evbuffer_new( );

    /* periodic peer exchange */
    evtimer_set( &msgs->pexTimer, pexPulse, msgs );
    tr_timerAdd( &msgs->pexTimer, PEX_INTERVAL_SECS, 0 );

    peer->msgs = msgs;
    *setme = tr_publisherSubscribe( &msgs->publisher, func, userData );

    if( tr_peerIoSupportsLTEP( peer->io ) )
    {
        sendLtepHandshake( msgs );
    }

    if( tr_dhtEnabled( torrent->session ) && tr_peerIoSupportsDHT( peer->io ) )
    {
        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
        const struct tr_address * addr = tr_peerIoGetAddress( peer->io, NULL );

        if( ( addr->type == TR_AF_INET ) || tr_globalIPv6( ) )
        {
            protocolSendPort( msgs, tr_dhtPort( torrent->session ) );
        }
    }

    tellPeerWhatWeHave( msgs );

    tr_peerIoSetIOFuncs( msgs->peer->io, canRead, didWrite, gotError, msgs );
    updateDesiredRequestCount( msgs, tr_date( ) );

    return msgs;
}
/**
 * Destroys a tr_peermsgs created by tr_peerMsgsNew(); NULL is ignored.
 * The pex timer is cancelled, the publisher and buffers are released, and
 * the structure is overwritten with all-ones bytes before being freed.
 */
void
tr_peerMsgsFree( tr_peermsgs* msgs )
{
    if( !msgs )
        return;

    evtimer_del( &msgs->pexTimer );
    tr_publisherDestruct( &msgs->publisher );

    evbuffer_free( msgs->incoming.block );
    evbuffer_free( msgs->outMessages );

    tr_free( msgs->pex6 );
    tr_free( msgs->pex );

    /* scribble over the struct before releasing it */
    memset( msgs, ~0, sizeof( tr_peermsgs ) );
    tr_free( msgs );
}
/**
 * Removes the subscription identified by `tag' from this object's publisher.
 */
void
tr_peerMsgsUnsubscribe( tr_peermsgs       * peer,
                        tr_publisher_tag    tag )
{
    tr_publisher * const publisher = &peer->publisher;

    tr_publisherUnsubscribe( publisher, tag );
}