(trunk libT) jch's patches 0001 through 0004 for ticket #2576, IPv6 support for DHT (BEP #32)

This commit is contained in:
Charles Kerr 2009-11-24 01:59:51 +00:00
parent 4322837189
commit 1cbbcf9fa1
11 changed files with 1510 additions and 633 deletions

View File

@ -1650,15 +1650,29 @@ announceMore( tr_announcer * announcer )
tor = NULL;
while(( tor = tr_torrentNext( announcer->session, tor ))) {
if( tor->dhtAnnounceAt <= now ) {
int rc = 1;
if( tor->isRunning && tr_torrentAllowsDHT(tor) )
rc = tr_dhtAnnounce(tor, 1);
if(rc == 0)
/* The DHT is not ready yet. Try again soon. */
tor->dhtAnnounceAt = now + 5 + tr_cryptoWeakRandInt( 5 );
else
/* We should announce at least once every 30 minutes. */
tor->dhtAnnounceAt = now + 25 * 60 + tr_cryptoWeakRandInt( 3 * 60 );
if( tor->isRunning && tr_torrentAllowsDHT(tor) ) {
int rc;
rc = tr_dhtAnnounce(tor, AF_INET, 1);
if(rc == 0)
/* The DHT is not ready yet. Try again soon. */
tor->dhtAnnounceAt = now + 5 + tr_cryptoWeakRandInt( 5 );
else
/* We should announce at least once every 30 minutes. */
tor->dhtAnnounceAt =
now + 25 * 60 + tr_cryptoWeakRandInt( 3 * 60 );
}
}
if( tor->dhtAnnounce6At <= now ) {
if( tor->isRunning && tr_torrentAllowsDHT(tor) ) {
int rc;
rc = tr_dhtAnnounce(tor, AF_INET6, 1);
if(rc == 0)
tor->dhtAnnounce6At = now + 5 + tr_cryptoWeakRandInt( 5 );
else
tor->dhtAnnounce6At =
now + 25 * 60 + tr_cryptoWeakRandInt( 3 * 60 );
}
}
}
}

View File

@ -2128,10 +2128,9 @@ tr_peerMsgsNew( struct tr_torrent * torrent,
sendLtepHandshake( m );
if(tr_peerIoSupportsDHT(peer->io)) {
/* We don't have an IPv6 DHT yet.
* According to BEP-32, we can't send PORT over IPv6. */
/* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
const struct tr_address *addr = tr_peerIoGetAddress( peer->io, NULL );
if( addr->type == TR_AF_INET ) {
if( addr->type == TR_AF_INET || tr_globalIPv6() ) {
protocolSendPort( m, tr_dhtPort( torrent->session ) );
}
}

View File

@ -1319,6 +1319,7 @@ checkAndStartImpl( void * vtor )
tr_torrentResetTransferStats( tor );
tr_announcerTorrentStarted( tor );
tor->dhtAnnounceAt = now + tr_cryptoWeakRandInt( 20 );
tor->dhtAnnounce6At = now + tr_cryptoWeakRandInt( 20 );
tr_peerMgrStartTorrent( tor );
}

View File

@ -181,7 +181,9 @@ struct tr_torrent
struct tr_publisher_tag * tiersSubscription;
time_t dhtAnnounceAt;
time_t dhtAnnounce6At;
tr_bool dhtAnnounceInProgress;
tr_bool dhtAnnounce6InProgress;
uint64_t downloadedCur;
uint64_t downloadedPrev;

View File

@ -30,6 +30,7 @@ THE SOFTWARE.
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h> /* socket(), bind() */
#include <netdb.h>
#include <unistd.h> /* close() */
/* third party */
@ -50,8 +51,8 @@ THE SOFTWARE.
#include "utils.h"
#include "version.h"
static int dht_socket;
static struct event dht_event;
static int dht_socket = -1, dht6_socket = -1;
static struct event dht_event, dht6_event;
static tr_port dht_port;
static unsigned char myid[20];
static tr_session *session = NULL;
@ -61,56 +62,238 @@ static void event_callback(int s, short type, void *ignore);
struct bootstrap_closure {
tr_session *session;
uint8_t *nodes;
size_t len;
uint8_t *nodes6;
size_t len, len6;
};
static int
bootstrap_done( tr_session *session, int af )
{
int status;
if(af == 0)
return
bootstrap_done(session, AF_INET) &&
bootstrap_done(session, AF_INET6);
status = tr_dhtStatus(session, af, NULL);
return status == TR_DHT_STOPPED || status >= TR_DHT_FIREWALLED;
}
static void
nap( int roughly )
{
struct timeval tv;
tr_timevalSet( &tv, roughly / 2 + tr_cryptoWeakRandInt( roughly ),
tr_cryptoWeakRandInt( 1000000 ) );
select( 0, NULL, NULL, NULL, &tv );
}
static int
bootstrap_af(tr_session *session)
{
if( bootstrap_done(session, AF_INET6) )
return AF_INET;
else if ( bootstrap_done(session, AF_INET) )
return AF_INET6;
else
return 0;
}
static void
bootstrap_from_name( const char *name, short int port, int af )
{
struct addrinfo hints, *info, *infop;
char pp[10];
int rc;
memset(&hints, 0, sizeof(hints));
hints.ai_socktype = SOCK_DGRAM;
hints.ai_family = af;
/* No, just passing p + 1 to gai won't work. */
snprintf(pp, 10, "%d", port);
rc = getaddrinfo(name, pp, &hints, &info);
if(rc != 0) {
tr_nerr("DHT", "%s:%s: %s", name, pp, gai_strerror(rc));
return;
}
infop = info;
while(infop) {
dht_ping_node(infop->ai_addr, infop->ai_addrlen);
nap(15);
if(bootstrap_done(session, af))
break;
infop = infop->ai_next;
}
freeaddrinfo(info);
}
static void
dht_bootstrap(void *closure)
{
struct bootstrap_closure *cl = closure;
size_t i;
int i;
int num = cl->len / 6, num6 = cl->len6 / 18;
if(session != cl->session)
return;
for(i = 0; i < cl->len; i += 6)
{
struct timeval tv;
tr_port port;
struct tr_address addr;
int status;
if(cl->len > 0)
tr_ninf( "DHT", "Bootstrapping from %d nodes", num );
memset(&addr, 0, sizeof(addr));
addr.type = TR_AF_INET;
memcpy(&addr.addr.addr4, &cl->nodes[i], 4);
memcpy(&port, &cl->nodes[i + 4], 2);
port = ntohs(port);
/* There's no race here -- if we uninit between the test and the
AddNode, the AddNode will be ignored. */
status = tr_dhtStatus(cl->session, NULL);
if(status == TR_DHT_STOPPED || status >= TR_DHT_FIREWALLED)
if(cl->len6 > 0)
tr_ninf( "DHT", "Bootstrapping from %d IPv6 nodes", num6 );
for(i = 0; i < MAX(num, num6); i++) {
if( i < num && !bootstrap_done(cl->session, AF_INET) ) {
tr_port port;
struct tr_address addr;
memset(&addr, 0, sizeof(addr));
addr.type = TR_AF_INET;
memcpy(&addr.addr.addr4, &cl->nodes[i * 6], 4);
memcpy(&port, &cl->nodes[i * 6 + 4], 2);
port = ntohs(port);
tr_dhtAddNode(cl->session, &addr, port, 1);
}
if( i < num6 && !bootstrap_done(cl->session, AF_INET6) ) {
tr_port port;
struct tr_address addr;
memset(&addr, 0, sizeof(addr));
addr.type = TR_AF_INET6;
memcpy(&addr.addr.addr6, &cl->nodes6[i * 18], 16);
memcpy(&port, &cl->nodes6[i * 18 + 16], 2);
port = ntohs(port);
tr_dhtAddNode(cl->session, &addr, port, 1);
}
/* Our DHT code is able to take up to 9 nodes in a row without
dropping any. After that, it takes some time to split buckets.
So ping the first 8 nodes quickly, then slow down. */
if(i < 8)
nap(2);
else
nap(15);
if(bootstrap_done( session, 0 ))
break;
tr_dhtAddNode(cl->session, &addr, port, 1);
tr_timevalSet( &tv, 2 + tr_cryptoWeakRandInt( 5 ), tr_cryptoWeakRandInt( 1000000 ) );
select( 0, NULL, NULL, NULL, &tv );
}
tr_free( cl->nodes );
if(!bootstrap_done(cl->session, 0)) {
char *bootstrap_file;
FILE *f = NULL;
bootstrap_file =
tr_buildPath(cl->session->configDir, "dht.bootstrap", NULL);
if(bootstrap_file)
f = fopen(bootstrap_file, "r");
if(f) {
tr_ninf("DHT", "Attempting manual bootstrap");
while(1) {
char buf[201];
char *p;
int port = 0;
p = fgets(buf, 200, f);
if( p == NULL )
break;
p = memchr(buf, ' ', strlen(buf));
if(p != NULL)
port = atoi(p + 1);
if(p == NULL || port <= 0 || port >= 0x10000) {
tr_nerr("DHT", "Couldn't parse %s", buf);
continue;
}
*p = '\0';
bootstrap_from_name( buf, port, bootstrap_af(session) );
if(bootstrap_done(cl->session, 0))
break;
}
}
}
/* We really don't want to abuse our bootstrap nodes.
Be glacially slow. */
if(!bootstrap_done(cl->session, 0))
nap(30);
if(!bootstrap_done(cl->session, 0)) {
tr_ninf("DHT", "Attempting bootstrap from dht.transmissionbt.com");
bootstrap_from_name( "dht.transmissionbt.com", 6881,
bootstrap_af(session) );
}
if( cl->nodes )
tr_free( cl->nodes );
if( cl->nodes6 )
tr_free( cl->nodes6 );
tr_free( closure );
tr_ndbg( "DHT", "Finished bootstrapping" );
}
/* BEP-32 has a rather nice explanation of why we need to bind to one
IPv6 address, if I may say so myself. */
static int
rebind_ipv6(int force)
{
struct sockaddr_in6 sin6;
const unsigned char *ipv6 = tr_globalIPv6();
static unsigned char *last_bound = NULL;
int rc;
if(dht6_socket < 0)
return 0;
if(!force &&
((ipv6 == NULL && last_bound == NULL) ||
(ipv6 != NULL && last_bound != NULL &&
memcmp(ipv6, last_bound, 16) == 0)))
return 0;
memset(&sin6, 0, sizeof(sin6));
sin6.sin6_family = AF_INET6;
if(ipv6)
memcpy(&sin6.sin6_addr, ipv6, 16);
sin6.sin6_port = htons(dht_port);
rc = bind(dht6_socket, (struct sockaddr*)&sin6, sizeof(sin6));
if(last_bound)
free(last_bound);
last_bound = NULL;
if(rc >= 0 && ipv6) {
last_bound = malloc(16);
if(last_bound)
memcpy(last_bound, ipv6, 16);
}
return rc;
}
int
tr_dhtInit(tr_session *ss, tr_address * tr_addr)
tr_dhtInit(tr_session *ss, const tr_address * tr_addr)
{
struct sockaddr_in sin;
tr_benc benc;
int rc;
tr_bool have_id = FALSE;
char * dat_file;
uint8_t * nodes = NULL;
uint8_t * nodes = NULL, * nodes6 = NULL;
const uint8_t * raw;
size_t len;
size_t len, len6;
char v[5];
struct bootstrap_closure * cl;
if( session ) /* already initialized */
return -1;
@ -127,12 +310,21 @@ tr_dhtInit(tr_session *ss, tr_address * tr_addr)
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
memcpy(&(sin.sin_addr), &(tr_addr->addr.addr4), sizeof (struct in_addr));
memcpy(&sin.sin_addr, &tr_addr->addr.addr4, sizeof (struct in_addr));
sin.sin_port = htons(dht_port);
rc = bind(dht_socket, (struct sockaddr*)&sin, sizeof(sin));
if(rc < 0)
goto fail;
if(tr_globalIPv6()) {
dht6_socket = socket(PF_INET6, SOCK_DGRAM, 0);
if(dht6_socket < 0)
goto fail;
rebind_ipv6(1);
}
if( getenv( "TR_DHT_VERBOSE" ) != NULL )
dht_debug = stderr;
@ -140,15 +332,25 @@ tr_dhtInit(tr_session *ss, tr_address * tr_addr)
rc = tr_bencLoadFile( &benc, TR_FMT_BENC, dat_file );
tr_free( dat_file );
if(rc == 0) {
if(( have_id = tr_bencDictFindRaw( &benc, "id", &raw, &len ) && len==20 ))
have_id = tr_bencDictFindRaw(&benc, "id", &raw, &len);
if( have_id && len==20 )
memcpy( myid, raw, len );
if( tr_bencDictFindRaw( &benc, "nodes", &raw, &len ) && !(len%6) ) {
nodes = tr_memdup( raw, len );
tr_ninf( "DHT", "Bootstrapping from %d old nodes", (int)(len/6) );
if( dht_socket >= 0 &&
tr_bencDictFindRaw( &benc, "nodes", &raw, &len ) && !(len%6) ) {
nodes = tr_memdup( raw, len );
}
if( dht6_socket > 0 &&
tr_bencDictFindRaw( &benc, "nodes6", &raw, &len6 ) && !(len6%18) ) {
nodes6 = tr_memdup( raw, len6 );
}
tr_bencFree( &benc );
}
if(nodes == NULL)
len = 0;
if(nodes6 == NULL)
len6 = 0;
if( have_id )
tr_ninf( "DHT", "Reusing old id" );
else {
@ -162,22 +364,24 @@ tr_dhtInit(tr_session *ss, tr_address * tr_addr)
v[1] = 'R';
v[2] = (SVN_REVISION_NUM >> 8) & 0xFF;
v[3] = SVN_REVISION_NUM & 0xFF;
rc = dht_init( dht_socket, myid, (const unsigned char*)v );
rc = dht_init( dht_socket, dht6_socket, myid, (const unsigned char*)v );
if(rc < 0)
goto fail;
session = ss;
if(nodes) {
struct bootstrap_closure * cl = tr_new( struct bootstrap_closure, 1 );
cl->session = session;
cl->nodes = nodes;
cl->len = len;
tr_threadNew( dht_bootstrap, cl );
}
cl = tr_new( struct bootstrap_closure, 1 );
cl->session = session;
cl->nodes = nodes;
cl->nodes6 = nodes6;
cl->len = len;
cl->len6 = len6;
tr_threadNew( dht_bootstrap, cl );
event_set( &dht_event, dht_socket, EV_READ, event_callback, NULL );
event_set( &dht6_event, dht6_socket, EV_READ, event_callback, NULL );
tr_timerAdd( &dht_event, 0, tr_cryptoWeakRandInt( 1000000 ) );
tr_timerAdd( &dht6_event, 0, tr_cryptoWeakRandInt( 1000000 ) );
tr_ndbg( "DHT", "DHT initialized" );
@ -187,7 +391,9 @@ tr_dhtInit(tr_session *ss, tr_address * tr_addr)
{
const int save = errno;
close(dht_socket);
dht_socket = -1;
if( dht6_socket >= 0 )
close(dht6_socket);
dht_socket = dht6_socket = -1;
session = NULL;
tr_ndbg( "DHT", "DHT initialization failed (errno = %d)", save );
errno = save;
@ -205,37 +411,51 @@ tr_dhtUninit(tr_session *ss)
tr_ndbg( "DHT", "Uninitializing DHT" );
event_del(&dht_event);
event_del(&dht6_event);
/* Since we only save known good nodes, avoid erasing older data if we
don't know enough nodes. */
if(tr_dhtStatus(ss, NULL) < TR_DHT_FIREWALLED)
if(tr_dhtStatus(ss, AF_INET, NULL) < TR_DHT_FIREWALLED)
tr_ninf( "DHT", "Not saving nodes, DHT not ready" );
else {
tr_benc benc;
struct sockaddr_in sins[300];
char compact[300 * 6];
struct sockaddr_in6 sins6[300];
char compact[300 * 6], compact6[300 * 18];
char *dat_file;
int i;
int n = dht_get_nodes(sins, 300);
int j = 0;
int i, j, num = 300, num6 = 300;
int n = dht_get_nodes(sins, &num, sins6, &num6);
tr_ninf( "DHT", "Saving %d nodes", n );
for( i=0; i<n; ++i ) {
tr_ninf( "DHT", "Saving %d (%d + %d) nodes", n, num, num6 );
j = 0;
for( i=0; i<num; ++i ) {
memcpy( compact + j, &sins[i].sin_addr, 4 );
memcpy( compact + j + 4, &sins[i].sin_port, 2 );
j += 6;
}
tr_bencInitDict( &benc, 2 );
j = 0;
for( i=0; i<num6; ++i ) {
memcpy( compact6 + j, &sins6[i].sin6_addr, 16 );
memcpy( compact6 + j + 16, &sins6[i].sin6_port, 2 );
j += 18;
}
tr_bencInitDict( &benc, 3 );
tr_bencDictAddRaw( &benc, "id", myid, 20 );
tr_bencDictAddRaw( &benc, "nodes", compact, j );
if(num > 0)
tr_bencDictAddRaw( &benc, "nodes", compact, num * 6 );
if(num6 > 0)
tr_bencDictAddRaw( &benc, "nodes6", compact6, num6 * 18 );
dat_file = tr_buildPath( ss->configDir, "dht.dat", NULL );
tr_bencToFile( &benc, TR_FMT_BENC, dat_file );
tr_bencFree( &benc );
tr_free( dat_file );
}
dht_uninit( dht_socket, 0 );
dht_uninit( 1 );
tr_netCloseSocket( dht_socket );
if(dht6_socket > 0)
tr_netCloseSocket( dht6_socket );
tr_ndbg("DHT", "Done uninitializing DHT");
@ -243,57 +463,67 @@ tr_dhtUninit(tr_session *ss)
}
tr_bool
tr_dhtEnabled( const tr_session * ss )
tr_dhtEnabled( tr_session * ss )
{
return ss && ( ss == session );
}
struct getstatus_closure
{
int af;
sig_atomic_t status;
sig_atomic_t count;
};
static void
getstatus( void * closure )
getstatus( void * cl )
{
struct getstatus_closure * ret = closure;
struct getstatus_closure * closure = cl;
int good, dubious, incoming;
dht_nodes( &good, &dubious, NULL, &incoming );
dht_nodes( closure->af, &good, &dubious, NULL, &incoming );
ret->count = good + dubious;
closure->count = good + dubious;
if( good < 4 || good + dubious <= 8 )
ret->status = TR_DHT_BROKEN;
closure->status = TR_DHT_BROKEN;
else if( good < 40 )
ret->status = TR_DHT_POOR;
closure->status = TR_DHT_POOR;
else if( incoming < 8 )
ret->status = TR_DHT_FIREWALLED;
closure->status = TR_DHT_FIREWALLED;
else
ret->status = TR_DHT_GOOD;
closure->status = TR_DHT_GOOD;
}
int
tr_dhtStatus( tr_session * ss, int * nodes_return )
tr_dhtStatus( tr_session * ss, int af, int * nodes_return )
{
struct getstatus_closure ret = { -1, - 1 };
struct getstatus_closure closure = { af, -1, -1 };
if( !tr_dhtEnabled( ss ) )
return TR_DHT_STOPPED;
tr_runInEventThread( ss, getstatus, &ret );
while( ret.status < 0 )
if(dht_socket < 0 && dht6_socket < 0)
return TR_DHT_STOPPED;
if(af == AF_INET && dht_socket < 0)
return TR_DHT_STOPPED;
if(af == AF_INET6 && dht6_socket < 0)
return TR_DHT_STOPPED;
tr_runInEventThread( ss, getstatus, &closure );
while( closure.status < 0 )
tr_wait( 10 /*msec*/ );
if( nodes_return )
*nodes_return = ret.count;
*nodes_return = closure.count;
return ret.status;
return closure.status;
}
tr_port
tr_dhtPort( const tr_session *ss )
tr_dhtPort( tr_session *ss )
{
return tr_dhtEnabled( ss ) ? dht_port : 0;
}
@ -304,28 +534,38 @@ tr_dhtAddNode( tr_session * ss,
tr_port port,
tr_bool bootstrap )
{
struct sockaddr_in sin;
int af = address->type == TR_AF_INET ? AF_INET : AF_INET6;
if( !tr_dhtEnabled( ss ) )
return 0;
if( address->type != TR_AF_INET )
return 0;
/* Since we don't want to abuse our bootstrap nodes,
* we don't ping them if the DHT is in a good state. */
if(bootstrap) {
if(tr_dhtStatus(ss, NULL) >= TR_DHT_FIREWALLED)
if(tr_dhtStatus(ss, af, NULL) >= TR_DHT_FIREWALLED)
return 0;
}
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
memcpy(&sin.sin_addr, &address->addr.addr4, 4);
sin.sin_port = htons(port);
dht_ping_node(dht_socket, &sin);
if( address->type == TR_AF_INET ) {
struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
memcpy(&sin.sin_addr, &address->addr.addr4, 4);
sin.sin_port = htons(port);
dht_ping_node((struct sockaddr*)&sin, sizeof(sin));
return 1;
} else if( address->type == TR_AF_INET6 ) {
struct sockaddr_in6 sin6;
memset(&sin6, 0, sizeof(sin6));
sin6.sin6_family = AF_INET6;
memcpy(&sin6.sin6_addr, &address->addr.addr6, 16);
sin6.sin6_port = htons(port);
dht_ping_node((struct sockaddr*)&sin6, sizeof(sin6));
return 1;
}
return 1;
return 0;
}
const char *
@ -345,69 +585,85 @@ static void
callback( void *ignore UNUSED, int event,
unsigned char *info_hash, void *data, size_t data_len )
{
if( event == DHT_EVENT_VALUES )
{
if( event == DHT_EVENT_VALUES || event == DHT_EVENT_VALUES6 ) {
tr_torrent *tor;
tr_globalLock( session );
tor = tr_torrentFindFromHash( session, info_hash );
if( tor && tr_torrentAllowsDHT( tor ))
{
size_t i, n;
tr_pex * pex = tr_peerMgrCompactToPex(data, data_len, NULL, 0, &n);
tr_pex * pex;
if( event == DHT_EVENT_VALUES )
pex = tr_peerMgrCompactToPex(data, data_len, NULL, 0, &n);
else
pex = tr_peerMgrCompact6ToPex(data, data_len, NULL, 0, &n);
for( i=0; i<n; ++i )
tr_peerMgrAddPex( tor, TR_PEER_FROM_DHT, pex+i );
tr_free(pex);
tr_torinf(tor, "Learned %d peers from DHT", (int)n);
tr_torinf(tor, "Learned %d%s peers from DHT",
(int)n,
event == DHT_EVENT_VALUES6 ? " IPv6" : "");
}
tr_globalUnlock( session );
}
else if( event == DHT_EVENT_SEARCH_DONE )
{
} else if( event == DHT_EVENT_SEARCH_DONE ||
event == DHT_EVENT_SEARCH_DONE6) {
tr_torrent * tor = tr_torrentFindFromHash( session, info_hash );
if( tor ) {
tr_torinf(tor, "DHT announce done");
tor->dhtAnnounceInProgress = 0;
if( event == DHT_EVENT_SEARCH_DONE ) {
tr_torinf(tor, "DHT announce done");
tor->dhtAnnounceInProgress = 0;
} else {
tr_torinf(tor, "IPv6 DHT announce done");
tor->dhtAnnounce6InProgress = 0;
}
}
}
}
int
tr_dhtAnnounce(tr_torrent *tor, tr_bool announce)
tr_dhtAnnounce(tr_torrent *tor, int af, tr_bool announce)
{
int rc, status, numnodes;
int rc, status, numnodes, ret = 0;
if( !tr_torrentAllowsDHT( tor ) )
return -1;
status = tr_dhtStatus( tor->session, &numnodes );
if(status < TR_DHT_POOR ) {
tr_tordbg(tor, "DHT not ready (%s, %d nodes)",
tr_dhtPrintableStatus(status), numnodes);
return 0;
}
rc = dht_search( dht_socket, tor->info.hash,
announce ? tr_sessionGetPeerPort(session) : 0,
callback, NULL);
if( rc >= 1 ) {
tr_torinf(tor, "Starting DHT announce (%s, %d nodes)",
tr_dhtPrintableStatus(status), numnodes);
tor->dhtAnnounceInProgress = TRUE;
status = tr_dhtStatus( tor->session, af, &numnodes );
if(status >= TR_DHT_POOR ) {
rc = dht_search( tor->info.hash,
announce ? tr_sessionGetPeerPort(session) : 0,
af, callback, NULL);
if( rc >= 1 ) {
tr_torinf(tor, "Starting%s DHT announce (%s, %d nodes)",
af == AF_INET6 ? " IPv6" : "",
tr_dhtPrintableStatus(status), numnodes);
if(af == AF_INET)
tor->dhtAnnounceInProgress = TRUE;
else
tor->dhtAnnounce6InProgress = TRUE;
ret = 1;
} else {
tr_torerr(tor, "%sDHT announce failed, errno = %d (%s, %d nodes)",
af == AF_INET6 ? "IPv6 " : "",
errno, tr_dhtPrintableStatus(status), numnodes);
}
} else {
tr_torerr(tor, "DHT announce failed, errno = %d (%s, %d nodes)",
errno, tr_dhtPrintableStatus(status), numnodes);
tr_tordbg(tor, "%sDHT not ready (%s, %d nodes)",
af == AF_INET6 ? "IPv6 " : "",
tr_dhtPrintableStatus(status), numnodes);
}
return 1;
return ret;
}
static void
event_callback(int s, short type, void *ignore UNUSED )
{
struct event *event = (s == dht_socket) ? &dht_event : &dht6_event;
time_t tosleep;
static int count = 0;
if( dht_periodic(s, type == EV_READ, &tosleep, callback, NULL) < 0 ) {
if( dht_periodic( type == EV_READ, &tosleep, callback, NULL) < 0 ) {
if(errno == EINTR) {
tosleep = 0;
} else {
@ -418,9 +674,17 @@ event_callback(int s, short type, void *ignore UNUSED )
}
}
/* Only do this once in a while. Counting rather than measuring time
avoids a system call. */
count++;
if(count >= 128) {
rebind_ipv6(0);
count = 0;
}
/* Being slightly late is fine,
and has the added benefit of adding some jitter. */
tr_timerAdd( &dht_event, tosleep, tr_cryptoWeakRandInt( 1000000 ) );
tr_timerAdd( event, tosleep, tr_cryptoWeakRandInt( 1000000 ) );
}
void

View File

@ -26,11 +26,11 @@ THE SOFTWARE.
#define TR_DHT_FIREWALLED 3
#define TR_DHT_GOOD 4
int tr_dhtInit( tr_session *, tr_address * );
int tr_dhtInit( tr_session *, const tr_address * );
void tr_dhtUninit( tr_session * );
tr_bool tr_dhtEnabled( const tr_session * );
tr_port tr_dhtPort ( const tr_session * );
int tr_dhtStatus( tr_session *, int * setme_nodeCount );
tr_bool tr_dhtEnabled( tr_session * );
tr_port tr_dhtPort ( tr_session * );
int tr_dhtStatus( tr_session *, int af, int * setme_nodeCount );
const char *tr_dhtPrintableStatus(int status);
int tr_dhtAddNode( tr_session *, const tr_address *, tr_port, tr_bool bootstrap );
int tr_dhtAnnounce( tr_torrent *, tr_bool announce );
int tr_dhtAnnounce( tr_torrent *, int af, tr_bool announce );

View File

@ -1,3 +1,16 @@
22 November 2009: dht-0.11
* Implemented IPv6 support (BEP-32).
* Fixed a bug which could cause us to never mark a search as finished.
* Fixed a bug that could cause us to send incomplete lists in response to
find_nodes.
* Limit the number of hashes that we're willing to track.
* Made bucket maintenance slightly more aggressive.
* Produce on-the-wire error messages to give a hint to the other side.
* Added a bunch of options to dht-example to make it useful as
a bootstrap node.
* Send version "JC\0\0" when using dht-example.
18 October 2009: dht-0.10
* Send nodes even when sending values. This is a violation of the

View File

@ -19,14 +19,18 @@ Initialisation
* dht_init
This must be called before using the library. You pass it a bound IPv4
datagram socket, and your node id, a 20-octet array that should be globally
unique.
datagram socket, a bound IPv6 datagram socket, and your node id, a 20-octet
array that should be globally unique.
If you're on a multi-homed host, you should bind the sockets to one of your
addresses.
Node ids must be well distributed, so you cannot just use your Bittorrent
id; you should either generate a truly random value (using plenty of
entropy), or at least take the SHA-1 of something. However, it is a good
idea to keep the id stable, so you may want to store it in stable storage
at client shutdown.
* dht_uninit
@ -147,14 +151,14 @@ Functions provided by you
* The callback function
The callback function is called with 5 arguments. Closure is simply the
value that you passed to dht_periodic. Event is one of DHT_EVENT_VALUES,
which indicates that we have new values, or DHT_EVENT_SEARCH_DONE, which
indicates that a search has completed. In either case, info_hash is set to
the info-hash of the search.
value that you passed to dht_periodic. Event is one of DHT_EVENT_VALUES or
DHT_EVENT_VALUES6, which indicates that we have new values, or
DHT_EVENT_SEARCH_DONE or DHT_EVENT_SEARCH_DONE6, which indicates that
a search has completed. In either case, info_hash is set to the info-hash
of the search.
In the case of DHT_EVENT_VALUES, data is a list of nodes in ``compact''
format -- 6 bytes per node, 4 for the IP address and 2 for the port. It's
length in bytes is in data_len.
format -- 6 or 18 bytes per node. Its length in bytes is in data_len.
* dht_hash
@ -186,9 +190,6 @@ make most full cone NATs happy.
Some of the code has had very little testing. If it breaks, you get to
keep both pieces.
IPv6 support is deliberately not included: designing a double-stack
distributed hash table raises some tricky issues, and doing it naively may
break connectivity for everyone.
Juliusz Chroboczek
<jch@pps.jussieu.fr>

View File

@ -20,7 +20,7 @@
#include "dht.h"
#define MAX_BOOTSTRAP_NODES 20
static struct sockaddr_in bootstrap_nodes[MAX_BOOTSTRAP_NODES];
static struct sockaddr_storage bootstrap_nodes[MAX_BOOTSTRAP_NODES];
static int num_bootstrap_nodes = 0;
static volatile sig_atomic_t dumping = 0, searching = 0, exiting = 0;
@ -92,15 +92,61 @@ int
main(int argc, char **argv)
{
int i, rc, fd;
int s, port;
int s = -1, s6 = -1, port;
int have_id = 0;
unsigned char myid[20];
time_t tosleep = 0;
char *id_file = "dht-example.id";
int opt;
int quiet = 0, ipv4 = 1, ipv6 = 1;
struct sockaddr_in sin;
struct sockaddr_in6 sin6;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
memset(&sin6, 0, sizeof(sin6));
sin6.sin6_family = AF_INET6;
while(1) {
opt = getopt(argc, argv, "q46b:i:");
if(opt < 0)
break;
switch(opt) {
case 'q': quiet = 1; break;
case '4': ipv6 = 0; break;
case '6': ipv4 = 0; break;
case 'b': {
char buf[16];
int rc;
rc = inet_pton(AF_INET, optarg, buf);
if(rc == 1) {
memcpy(&sin.sin_addr, buf, 4);
break;
}
rc = inet_pton(AF_INET6, optarg, buf);
if(rc == 1) {
memcpy(&sin6.sin6_addr, buf, 16);
break;
}
goto usage;
}
break;
case 'i':
id_file = optarg;
break;
default:
goto usage;
}
}
/* Ids need to be distributed evenly, so you cannot just use your
bittorrent id. Either generate it randomly, or take the SHA-1 of
something. */
fd = open("dht-example.id", O_RDONLY);
fd = open(id_file, O_RDONLY);
if(fd >= 0) {
rc = read(fd, myid, 20);
if(rc == 20)
@ -108,12 +154,15 @@ main(int argc, char **argv)
close(fd);
}
fd = open("/dev/urandom", O_RDONLY);
if(fd < 0) {
perror("open(random)");
exit(1);
}
if(!have_id) {
fd = open("/dev/urandom", O_RDONLY);
if(fd < 0) {
perror("open(random)");
exit(1);
}
int ofd;
rc = read(fd, myid, 20);
if(rc < 0) {
perror("read(random)");
@ -122,19 +171,27 @@ main(int argc, char **argv)
have_id = 1;
close(fd);
fd = open("dht-example.id", O_WRONLY | O_CREAT | O_TRUNC, 0666);
if(fd >= 0) {
rc = write(fd, myid, 20);
ofd = open(id_file, O_WRONLY | O_CREAT | O_TRUNC, 0666);
if(ofd >= 0) {
rc = write(ofd, myid, 20);
if(rc < 20)
unlink("dht-example.id");
close(fd);
unlink(id_file);
close(ofd);
}
}
{
unsigned seed;
read(fd, &seed, sizeof(seed));
srandom(seed);
}
close(fd);
if(argc < 2)
goto usage;
i = 1;
i = optind;
if(argc < i + 1)
goto usage;
@ -146,9 +203,14 @@ main(int argc, char **argv)
while(i < argc) {
struct addrinfo hints, *info, *infop;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_DGRAM;
rc = getaddrinfo(argv[i], NULL, &hints, &info);
if(!ipv6)
hints.ai_family = AF_INET;
else if(!ipv4)
hints.ai_family = AF_INET6;
else
hints.ai_family = 0;
rc = getaddrinfo(argv[i], argv[i + 1], &hints, &info);
if(rc != 0) {
fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rc));
exit(1);
@ -160,14 +222,10 @@ main(int argc, char **argv)
infop = info;
while(infop) {
if(infop->ai_addr->sa_family == AF_INET) {
struct sockaddr_in sin;
memcpy(&sin, infop->ai_addr, infop->ai_addrlen);
sin.sin_port = htons(atoi(argv[i]));
bootstrap_nodes[num_bootstrap_nodes] = sin;
num_bootstrap_nodes++;
}
memcpy(&bootstrap_nodes[num_bootstrap_nodes],
infop->ai_addr, infop->ai_addrlen);
infop = infop->ai_next;
num_bootstrap_nodes++;
}
freeaddrinfo(info);
@ -176,30 +234,66 @@ main(int argc, char **argv)
/* If you set dht_debug to a stream, every action taken by the DHT will
be logged. */
dht_debug = stdout;
if(!quiet)
dht_debug = stdout;
/* We need an IPv4 socket, bound to a stable port. Rumour has it that
uTorrent works better when it is the same as your Bittorrent port. */
s = socket(PF_INET, SOCK_DGRAM, 0);
if(s < 0) {
perror("socket");
/* We need an IPv4 and an IPv6 socket, bound to a stable port. Rumour
has it that uTorrent works better when it is the same as your
Bittorrent port. */
if(ipv4) {
s = socket(PF_INET, SOCK_DGRAM, 0);
if(s < 0) {
perror("socket(IPv4)");
}
}
if(ipv6) {
s6 = socket(PF_INET6, SOCK_DGRAM, 0);
if(s6 < 0) {
perror("socket(IPv6)");
}
}
if(s < 0 && s6 < 0) {
fprintf(stderr, "Eek!");
exit(1);
}
{
struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
if(s >= 0) {
sin.sin_port = htons(port);
rc = bind(s, (struct sockaddr*)&sin, sizeof(sin));
if(rc < 0) {
perror("bind");
perror("bind(IPv4)");
exit(1);
}
}
if(s6 >= 0) {
int rc;
int val = 1;
rc = setsockopt(s6, IPPROTO_IPV6, IPV6_V6ONLY,
(char *)&val, sizeof(val));
if(rc < 0) {
perror("setsockopt(IPV6_V6ONLY)");
exit(1);
}
/* BEP-32 mandates that we should bind this socket to one of our
global IPv6 addresses. In this simple example, this only
happens if the user used the -b flag. */
sin6.sin6_port = htons(port);
rc = bind(s6, (struct sockaddr*)&sin6, sizeof(sin6));
if(rc < 0) {
perror("bind(IPv6)");
exit(1);
}
}
/* Init the dht. This sets the socket into non-blocking mode. */
rc = dht_init(s, myid, NULL);
rc = dht_init(s, s6, myid, (unsigned char*)"JC\0\0");
if(rc < 0) {
perror("dht_init");
exit(1);
@ -217,7 +311,8 @@ main(int argc, char **argv)
a dump) and you already know their ids, it's better to use
dht_insert_node. If the ids are incorrect, the DHT will recover. */
for(i = 0; i < num_bootstrap_nodes; i++) {
dht_ping_node(s, &bootstrap_nodes[i]);
dht_ping_node((struct sockaddr*)&bootstrap_nodes[i],
sizeof(bootstrap_nodes[i]));
usleep(random() % 100000);
}
@ -228,8 +323,11 @@ main(int argc, char **argv)
tv.tv_usec = random() % 1000000;
FD_ZERO(&readfds);
FD_SET(s, &readfds);
rc = select(s + 1, &readfds, NULL, NULL, &tv);
if(s >= 0)
FD_SET(s, &readfds);
if(s6 >= 0)
FD_SET(s6, &readfds);
rc = select(s > s6 ? s + 1 : s6 + 1, &readfds, NULL, NULL, &tv);
if(rc < 0) {
if(errno != EINTR) {
perror("select");
@ -240,7 +338,7 @@ main(int argc, char **argv)
if(exiting)
break;
rc = dht_periodic(s, rc > 0, &tosleep, callback, NULL);
rc = dht_periodic(rc > 0, &tosleep, callback, NULL);
if(rc < 0) {
if(errno == EINTR) {
continue;
@ -253,11 +351,14 @@ main(int argc, char **argv)
}
/* This is how you trigger a search for a torrent hash. If port
(the third argument) is non-zero, it also performs an announce.
(the second argument) is non-zero, it also performs an announce.
Since peers expire announced data after 30 minutes, it's a good
idea to reannounce every 28 minutes or so. */
if(searching) {
dht_search(s, hash, 0, callback, NULL);
if(s >= 0)
dht_search(hash, 0, AF_INET, callback, NULL);
if(s6 >= 0)
dht_search(hash, 0, AF_INET6, callback, NULL);
searching = 0;
}
@ -269,17 +370,20 @@ main(int argc, char **argv)
}
{
struct sockaddr_in sins[500];
struct sockaddr_in sin[500];
struct sockaddr_in6 sin6[500];
int num = 500, num6 = 500;
int i;
i = dht_get_nodes(sins, 500);
printf("Found %d good nodes.\n", i);
i = dht_get_nodes(sin, &num, sin6, &num6);
printf("Found %d (%d + %d) good nodes.\n", i, num, num6);
}
dht_uninit(s, 1);
dht_uninit(1);
return 0;
usage:
fprintf(stderr, "Foo!\n");
printf("Usage: dht-example [-q] [-4] [-6] [-i filename] [-b address]...\n"
" port [address port]...\n");
exit(1);
}

1359
third-party/dht/dht.c vendored

File diff suppressed because it is too large Load Diff

22
third-party/dht/dht.h vendored
View File

@ -27,22 +27,26 @@ dht_callback(void *closure, int event,
#define DHT_EVENT_NONE 0
#define DHT_EVENT_VALUES 1
#define DHT_EVENT_SEARCH_DONE 2
#define DHT_EVENT_VALUES6 2
#define DHT_EVENT_SEARCH_DONE 3
#define DHT_EVENT_SEARCH_DONE6 4
extern FILE *dht_debug;
int dht_init(int s, const unsigned char *id, const unsigned char *v);
int dht_insert_node(int s, const unsigned char *id, struct sockaddr_in *sin);
int dht_ping_node(int s, struct sockaddr_in *sin);
int dht_periodic(int s, int available, time_t *tosleep,
int dht_init(int s, int s6, const unsigned char *id, const unsigned char *v);
int dht_insert_node(const unsigned char *id, struct sockaddr *sa, int salen);
int dht_ping_node(struct sockaddr *sa, int salen);
int dht_periodic(int available, time_t *tosleep,
dht_callback *callback, void *closure);
int dht_search(int s, const unsigned char *id, int port,
int dht_search(const unsigned char *id, int port, int af,
dht_callback *callback, void *closure);
int dht_nodes(int *good_return, int *dubious_return, int *cached_return,
int dht_nodes(int af,
int *good_return, int *dubious_return, int *cached_return,
int *incoming_return);
void dht_dump_tables(FILE *f);
int dht_get_nodes(struct sockaddr_in *sins, int num);
int dht_uninit(int s, int dofree);
int dht_get_nodes(struct sockaddr_in *sin, int *num,
struct sockaddr_in6 *sin6, int *num6);
int dht_uninit(int dofree);
/* This must be provided by the user. */
void dht_hash(void *hash_return, int hash_size,