(trunk libT) #117 "UDP tracker support (BEP #15)" -- added request timeout

UDP announce and scrapes now have a 120 second TTL.
This commit is contained in:
Jordan Lee 2011-03-13 06:38:54 +00:00
parent 7a24257d00
commit 2cca699f06
4 changed files with 116 additions and 76 deletions

View File

@ -238,4 +238,6 @@ void tr_tracker_udp_announce( tr_session * session,
tr_announce_response_func response_func,
void * user_data );
void tr_tracker_udp_upkeep( tr_session * session );
#endif /* _TR_ANNOUNCER_COMMON_H_ */

View File

@ -120,6 +120,11 @@ typedef enum
}
tau_action_t;
enum
{
TAU_REQUEST_TTL = 120
};
/****
*****
***** SCRAPE
@ -157,7 +162,7 @@ tau_scrape_request_new( const tr_scrape_request * in,
memcpy( req->response.rows[i].info_hash,
in->info_hash[i], SHA_DIGEST_LENGTH );
/* build the scrape payload */
/* build the payload */
buf = evbuffer_new( );
evbuffer_add_hton_32( buf, TAU_ACTION_SCRAPE );
evbuffer_add_hton_32( buf, req->transaction_id );
@ -223,7 +228,6 @@ on_scrape_response( tr_session * session,
row->seeders = evbuffer_read_ntoh_32( buf );
row->downloads = evbuffer_read_ntoh_32( buf );
row->leechers = evbuffer_read_ntoh_32( buf );
}
tau_scrape_request_finished( session, request );
@ -259,10 +263,10 @@ struct tau_announce_request
typedef enum
{
/* used in the "event" field of an announce request */
UDP_TRACKER_EVENT_NONE = 0,
UDP_TRACKER_EVENT_COMPLETED = 1,
UDP_TRACKER_EVENT_STARTED = 2,
UDP_TRACKER_EVENT_STOPPED = 3
TAU_ANNOUNCE_EVENT_NONE = 0,
TAU_ANNOUNCE_EVENT_COMPLETED = 1,
TAU_ANNOUNCE_EVENT_STARTED = 2,
TAU_ANNOUNCE_EVENT_STOPPED = 3
}
tau_announce_event;
@ -271,10 +275,10 @@ get_tau_announce_event( tr_announce_event e )
{
switch( e )
{
case TR_ANNOUNCE_EVENT_COMPLETED: return UDP_TRACKER_EVENT_COMPLETED;
case TR_ANNOUNCE_EVENT_STARTED: return UDP_TRACKER_EVENT_STARTED;
case TR_ANNOUNCE_EVENT_STOPPED: return UDP_TRACKER_EVENT_STOPPED;
default: return UDP_TRACKER_EVENT_NONE;
case TR_ANNOUNCE_EVENT_COMPLETED: return TAU_ANNOUNCE_EVENT_COMPLETED;
case TR_ANNOUNCE_EVENT_STARTED: return TAU_ANNOUNCE_EVENT_STARTED;
case TR_ANNOUNCE_EVENT_STOPPED: return TAU_ANNOUNCE_EVENT_STOPPED;
default: return TAU_ANNOUNCE_EVENT_NONE;
}
}
@ -285,16 +289,16 @@ tau_announce_request_new( const tr_announce_request * in,
{
struct evbuffer * buf;
struct tau_announce_request * req = tr_new0( struct tau_announce_request, 1 );
req->transaction_id = tau_transaction_new( );
req->callback = callback;
req->user_data = user_data;
memcpy( req->response.info_hash, in->info_hash, SHA_DIGEST_LENGTH );
struct tau_announce_request * r = tr_new0( struct tau_announce_request, 1 );
r->transaction_id = tau_transaction_new( );
r->callback = callback;
r->user_data = user_data;
memcpy( r->response.info_hash, in->info_hash, SHA_DIGEST_LENGTH );
/* build the announce payload */
buf = evbuffer_new( );
evbuffer_add_hton_32( buf, TAU_ACTION_ANNOUNCE );
evbuffer_add_hton_32( buf, req->transaction_id );
evbuffer_add_hton_32( buf, r->transaction_id );
evbuffer_add ( buf, in->info_hash, SHA_DIGEST_LENGTH );
evbuffer_add ( buf, in->peer_id, PEER_ID_LEN );
evbuffer_add_hton_64( buf, in->down );
@ -305,11 +309,11 @@ tau_announce_request_new( const tr_announce_request * in,
evbuffer_add_hton_32( buf, in->key );
evbuffer_add_hton_32( buf, in->numwant );
evbuffer_add_hton_16( buf, in->port );
req->payload_len = evbuffer_get_length( buf );
req->payload = tr_memdup( evbuffer_pullup( buf, -1 ), req->payload_len );
r->payload_len = evbuffer_get_length( buf );
r->payload = tr_memdup( evbuffer_pullup( buf, -1 ), r->payload_len );
evbuffer_free( buf );
return req;
return r;
}
static void
@ -494,8 +498,7 @@ tau_tracker_upkeep( struct tau_tracker * tracker )
int n;
tr_ptrArray * reqs;
const time_t now = tr_time( );
/* FIXME: look for timed-out requests */
const tr_bool is_connected = tracker->connection_expiration_time > now;
/* if the address info is too old, expire it */
if( tracker->addr && ( tracker->addr_expiration_time <= now ) ) {
@ -504,66 +507,78 @@ tau_tracker_upkeep( struct tau_tracker * tracker )
tracker->addr = NULL;
}
/* if no requests, there's nothing to do */
if( tr_ptrArrayEmpty( &tracker->announces ) && tr_ptrArrayEmpty( &tracker->scrapes ) )
/* are there any requests pending? */
if( tr_ptrArrayEmpty( &tracker->announces ) &&
tr_ptrArrayEmpty( &tracker->scrapes ) )
return;
/* can't do anything without an address */
if( !tracker->addr ) {
if( !tracker->is_asking_dns ) {
struct evutil_addrinfo hints;
memset( &hints, 0, sizeof( hints ) );
hints.ai_family = AF_UNSPEC;
hints.ai_flags = EVUTIL_AI_CANONNAME;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_protocol = IPPROTO_UDP;
tracker->is_asking_dns = TRUE;
dbgmsg( tracker->host, "Trying a new DNS lookup" );
evdns_getaddrinfo( tracker->session->evdns_base,
tracker->host, NULL, &hints, tau_tracker_on_dns, tracker );
}
/* if we don't have an address yet, try & get one now. */
if( !tracker->addr && !tracker->is_asking_dns )
{
struct evutil_addrinfo hints;
memset( &hints, 0, sizeof( hints ) );
hints.ai_family = AF_UNSPEC;
hints.ai_flags = EVUTIL_AI_CANONNAME;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_protocol = IPPROTO_UDP;
tracker->is_asking_dns = TRUE;
dbgmsg( tracker->host, "Trying a new DNS lookup" );
evdns_getaddrinfo( tracker->session->evdns_base,
tracker->host, NULL, &hints,
tau_tracker_on_dns, tracker );
return;
}
/* also need a valid connection ID... */
if( tracker->connection_expiration_time < now ) {
if( !tracker->is_connecting ) {
struct evbuffer * buf = evbuffer_new( );
tracker->is_connecting = TRUE;
tracker->connection_transaction_id = tau_transaction_new( );
dbgmsg( tracker->key, "Trying to connect. Transaction ID is %u",
tracker->connection_transaction_id );
evbuffer_add_hton_64( buf, 0x41727101980LL );
evbuffer_add_hton_32( buf, TAU_ACTION_CONNECT );
evbuffer_add_hton_32( buf, tracker->connection_transaction_id );
tau_sendto( tracker->session, tracker->addr, tracker->port,
evbuffer_pullup( buf, -1 ),
evbuffer_get_length( buf ) );
evbuffer_free( buf );
}
if( !is_connected && !tracker->is_connecting )
{
struct evbuffer * buf = evbuffer_new( );
tracker->is_connecting = TRUE;
tracker->connection_transaction_id = tau_transaction_new( );
dbgmsg( tracker->key, "Trying to connect. Transaction ID is %u",
tracker->connection_transaction_id );
evbuffer_add_hton_64( buf, 0x41727101980LL );
evbuffer_add_hton_32( buf, TAU_ACTION_CONNECT );
evbuffer_add_hton_32( buf, tracker->connection_transaction_id );
tau_sendto( tracker->session, tracker->addr, tracker->port,
evbuffer_pullup( buf, -1 ),
evbuffer_get_length( buf ) );
evbuffer_free( buf );
return;
}
/* send the announce requests */
reqs = &tracker->announces;
for( i=0, n=tr_ptrArraySize(reqs); i<n; ++i ) {
for( i=0, n=tr_ptrArraySize(reqs); i<n; ++i )
{
struct tau_announce_request * req = tr_ptrArrayNth( reqs, i );
if( req->sent_at == 0 ) {
if( is_connected && !req->sent_at ) {
dbgmsg( tracker->key, "Sending an announce request" );
req->sent_at = now;
tau_tracker_send_request( tracker, req->payload, req->payload_len );
}
else if( req->sent_at && ( req->sent_at + TAU_REQUEST_TTL < now ) ) {
tau_announce_request_fail( tracker->session, req, FALSE, TRUE, NULL );
tau_announce_request_free( req );
tr_ptrArrayRemove( reqs, i-- );
}
}
/* send the scrape requests */
reqs = &tracker->scrapes;
for( i=0, n=tr_ptrArraySize(reqs); i<n; ++i ) {
for( i=0, n=tr_ptrArraySize(reqs); i<n; ++i )
{
struct tau_scrape_request * req = tr_ptrArrayNth( reqs, i );
if( req->sent_at == 0 ) {
if( is_connected && !req->sent_at ) {
dbgmsg( tracker->key, "Sending a scrape request" );
req->sent_at = now;
tau_tracker_send_request( tracker, req->payload, req->payload_len );
}
else if( req->sent_at && ( req->sent_at + TAU_REQUEST_TTL < now ) ) {
tau_scrape_request_fail( tracker->session, req, FALSE, TRUE, NULL );
tau_scrape_request_free( req );
tr_ptrArrayRemove( reqs, i-- );
}
}
}
@ -673,6 +688,16 @@ tau_session_get_tracker( struct tr_announcer_udp * tau, const char * url )
*****
****/
void
tr_tracker_udp_upkeep( tr_session * session )
{
struct tr_announcer_udp * tau = session->announcer_udp;
if( tau != NULL )
tr_ptrArrayForeach( &tau->trackers,
(PtrArrayForeachFunc)tau_tracker_upkeep );
}
tr_bool
tau_handle_message( tr_session * session,
const uint8_t * msg,
@ -711,7 +736,8 @@ tau_handle_message( tr_session * session,
struct tau_tracker * tracker = tr_ptrArrayNth( &tau->trackers, i );
/* is it a connection response? */
if( tracker->is_connecting && ( transaction_id == tracker->connection_transaction_id ) )
if( tracker->is_connecting
&& ( transaction_id == tracker->connection_transaction_id ) )
{
dbgmsg( tracker->key, "%"PRIu32" matches my connection request!", transaction_id );
on_tracker_connection_response( tracker, action_id, buf );
@ -761,7 +787,9 @@ tr_tracker_udp_announce( tr_session * session,
{
struct tr_announcer_udp * tau = announcer_udp_get( session );
struct tau_tracker * tracker = tau_session_get_tracker( tau, request->url );
struct tau_announce_request * r = tau_announce_request_new( request, response_func, user_data );
struct tau_announce_request * r = tau_announce_request_new( request,
response_func,
user_data );
tr_ptrArrayAppend( &tracker->announces, r );
tau_tracker_upkeep( tracker );
}
@ -774,7 +802,9 @@ tr_tracker_udp_scrape( tr_session * session,
{
struct tr_announcer_udp * tau = announcer_udp_get( session );
struct tau_tracker * tracker = tau_session_get_tracker( tau, request->url );
struct tau_scrape_request * r = tau_scrape_request_new( request, response_func, user_data );
struct tau_scrape_request * r = tau_scrape_request_new( request,
response_func,
user_data );
tr_ptrArrayAppend( &tracker->scrapes, r );
tau_tracker_upkeep( tracker );
}

View File

@ -64,7 +64,10 @@ enum
UPKEEP_INTERVAL_SECS = 1,
/* this is an upper limit for the frequency of LDS announces */
LPD_HOUSEKEEPING_INTERVAL_SECS = 5
LPD_HOUSEKEEPING_INTERVAL_SECS = 5,
/* this is how often to call the UDP tracker upkeep */
TAU_UPKEEP_INTERVAL_SECS = 5
};
/***
@ -128,7 +131,8 @@ typedef struct tr_announcer
struct event * upkeepTimer;
int slotsAvailable;
int key;
time_t lpdHouseKeepingAt;
time_t lpdUpkeepAt;
time_t tauUpkeepAt;
}
tr_announcer;
@ -139,7 +143,7 @@ tr_announcerHasBacklog( const struct tr_announcer * announcer )
}
static inline time_t
calcRescheduleWithJitter( const int minPeriod )
valPlusJitter( const int minPeriod )
{
const double jitter = 0.1;
@ -158,8 +162,7 @@ tr_announcerInit( tr_session * session )
{
tr_announcer * a;
const time_t lpdAt =
calcRescheduleWithJitter( LPD_HOUSEKEEPING_INTERVAL_SECS / 3 );
const time_t lpdAt = valPlusJitter( LPD_HOUSEKEEPING_INTERVAL_SECS / 3 );
assert( tr_isSession( session ) );
@ -168,7 +171,7 @@ tr_announcerInit( tr_session * session )
a->key = tr_cryptoRandInt( INT_MAX );
a->session = session;
a->slotsAvailable = MAX_CONCURRENT_TASKS;
a->lpdHouseKeepingAt = lpdAt;
a->lpdUpkeepAt = lpdAt;
a->upkeepTimer = evtimer_new( session->event_base, onUpkeepTimer, a );
tr_timerAdd( a->upkeepTimer, UPKEEP_INTERVAL_SECS, 0 );
@ -1414,21 +1417,12 @@ fprintf( stderr, "[%s] announce.c has %d requests ready to send (announce: %d, s
}
}
}
/* Local Peer Discovery */
if( announcer->lpdHouseKeepingAt <= now )
{
tr_lpdAnnounceMore( now, LPD_HOUSEKEEPING_INTERVAL_SECS );
/* reschedule more LDS announces for ( the future + jitter ) */
announcer->lpdHouseKeepingAt =
calcRescheduleWithJitter( LPD_HOUSEKEEPING_INTERVAL_SECS );
}
}
static void
onUpkeepTimer( int foo UNUSED, short bar UNUSED, void * vannouncer )
{
const time_t now = tr_time( );
tr_announcer * announcer = vannouncer;
tr_sessionLock( announcer->session );
@ -1438,6 +1432,19 @@ onUpkeepTimer( int foo UNUSED, short bar UNUSED, void * vannouncer )
/* maybe send out some announcements to trackers */
announceMore( announcer );
/* LPD upkeep */
if( announcer->lpdUpkeepAt <= now ) {
const int seconds = LPD_HOUSEKEEPING_INTERVAL_SECS;
announcer->lpdUpkeepAt = valPlusJitter( seconds );
tr_lpdAnnounceMore( now, seconds );
}
/* TAU upkeep */
if( announcer->tauUpkeepAt <= now ) {
announcer->tauUpkeepAt = now + TAU_UPKEEP_INTERVAL_SECS;
tr_tracker_udp_upkeep( announcer->session );
}
/* set up the next timer */
tr_timerAdd( announcer->upkeepTimer, UPKEEP_INTERVAL_SECS, 0 );

View File

@ -32,6 +32,7 @@ void tr_udpInit( tr_session * );
void tr_udpUninit( tr_session * );
void tr_udpSetSocketBuffers(tr_session *);
tr_bool tau_handle_message( tr_session * session, const uint8_t * msg, size_t msglen );
tr_bool tau_handle_message( tr_session * session,
const uint8_t * msg, size_t msglen );
#endif /* #ifndef TR_UDP_H */