(trunk libT) better shutdown management of libutp and UDP trackers in tr_sessionClose().

This is a little overlapping since the utp code can be closed more-or-less immediately, but the udp manager needs to stay open in order to process the udp tracker connection requests before sending out event=stopped. Moreover DNS resolver can be shut down after the UDP tracker is shutdown.
This commit is contained in:
Jordan Lee 2011-03-17 18:51:31 +00:00
parent 78a4865aa1
commit f4b4ddd231
9 changed files with 139 additions and 33 deletions

View File

@ -238,6 +238,6 @@ void tr_tracker_udp_announce( tr_session * session,
tr_announce_response_func response_func,
void * user_data );
void tr_tracker_udp_upkeep( tr_session * session );
void tr_tracker_udp_start_shutdown( tr_session * session );
#endif /* _TR_ANNOUNCER_COMMON_H_ */

View File

@ -19,6 +19,7 @@
#include <event2/util.h>
#include "transmission.h"
#include "announcer.h"
#include "announcer-common.h"
#include "crypto.h"
#include "peer-io.h"
@ -435,13 +436,14 @@ struct tau_tracker
tau_connection_t connection_id;
tau_transaction_t connection_transaction_id;
time_t close_at;
tr_ptrArray announces;
tr_ptrArray scrapes;
};
static void tau_tracker_upkeep( struct tau_tracker * );
#if 0
static void
tau_tracker_free( struct tau_tracker * t )
{
@ -453,7 +455,6 @@ tau_tracker_free( struct tau_tracker * t )
tr_free( t->key );
tr_free( t );
}
#endif
static void
tau_tracker_fail_all( struct tau_tracker * tracker,
@ -523,6 +524,13 @@ tau_tracker_send_request( struct tau_tracker * tracker,
evbuffer_free( buf );
}
static tr_bool
tau_tracker_is_empty( const struct tau_tracker * tracker )
{
return tr_ptrArrayEmpty( &tracker->announces )
&& tr_ptrArrayEmpty( &tracker->scrapes );
}
static void
tau_tracker_upkeep( struct tau_tracker * tracker )
{
@ -540,8 +548,7 @@ tau_tracker_upkeep( struct tau_tracker * tracker )
}
/* are there any requests pending? */
if( tr_ptrArrayEmpty( &tracker->announces ) &&
tr_ptrArrayEmpty( &tracker->scrapes ) )
if( tau_tracker_is_empty( tracker ) )
return;
/* if we don't have an address yet, try & get one now. */
@ -583,14 +590,21 @@ tau_tracker_upkeep( struct tau_tracker * tracker )
reqs = &tracker->announces;
for( i=0, n=tr_ptrArraySize(reqs); i<n; ++i )
{
tr_bool remove_request = FALSE;
struct tau_announce_request * req = tr_ptrArrayNth( reqs, i );
if( is_connected && !req->sent_at ) {
dbgmsg( tracker->key, "Sending an announce request" );
req->sent_at = now;
tau_tracker_send_request( tracker, req->payload, req->payload_len );
remove_request = req->callback == NULL;
}
else if( req->created_at + TAU_REQUEST_TTL < now ) {
tau_announce_request_fail( tracker->session, req, FALSE, TRUE, NULL );
remove_request = TRUE;
}
if( tracker->close_at && ( tracker->close_at <= time(NULL) ) )
remove_request = TRUE;
if( remove_request ) {
tau_announce_request_free( req );
tr_ptrArrayRemove( reqs, i );
--i;
@ -602,14 +616,21 @@ tau_tracker_upkeep( struct tau_tracker * tracker )
reqs = &tracker->scrapes;
for( i=0, n=tr_ptrArraySize(reqs); i<n; ++i )
{
tr_bool remove_request = FALSE;
struct tau_scrape_request * req = tr_ptrArrayNth( reqs, i );
if( is_connected && !req->sent_at ) {
dbgmsg( tracker->key, "Sending a scrape request" );
req->sent_at = now;
tau_tracker_send_request( tracker, req->payload, req->payload_len );
remove_request = req->callback == NULL;
}
else if( req->created_at + TAU_REQUEST_TTL < now ) {
tau_scrape_request_fail( tracker->session, req, FALSE, TRUE, NULL );
remove_request = TRUE;
}
if( tracker->close_at && ( tracker->close_at <= time(NULL) ) )
remove_request = TRUE;
if( remove_request ) {
tau_scrape_request_free( req );
tr_ptrArrayRemove( reqs, i );
--i;
@ -741,6 +762,57 @@ tr_tracker_udp_upkeep( tr_session * session )
(PtrArrayForeachFunc)tau_tracker_upkeep );
}
tr_bool
tr_tracker_udp_is_empty( const tr_session * session )
{
int i;
int n;
struct tr_announcer_udp * tau = session->announcer_udp;
if( tau != NULL )
for( i=0, n=tr_ptrArraySize(&tau->trackers); i<n; ++i )
if( !tau_tracker_is_empty( tr_ptrArrayNth( &tau->trackers, i ) ) )
return FALSE;
return TRUE;
}
/* drop dead now. */
void
tr_tracker_udp_close( tr_session * session )
{
struct tr_announcer_udp * tau = session->announcer_udp;
if( tau != NULL )
{
session->announcer_udp = NULL;
tr_ptrArrayDestruct( &tau->trackers, (PtrArrayForeachFunc)tau_tracker_free );
tr_free( tau );
}
}
/* start shutting down.
This doesn't destroy everything if there are requests,
but sets a deadline on how much longer to wait for the remaining ones */
void
tr_tracker_udp_start_shutdown( tr_session * session )
{
const time_t now = time( NULL );
struct tr_announcer_udp * tau = session->announcer_udp;
if( tau != NULL )
{
int i, n;
for( i=0, n=tr_ptrArraySize(&tau->trackers); i<n; ++i )
{
struct tau_tracker * tracker = tr_ptrArrayNth( &tau->trackers, i );
tracker->close_at = now + 3;
tau_tracker_upkeep( tracker );
}
}
}
/* @brief process an incoming udp message if it's a tracker response.
* @return true if msg was a tracker response; false otherwise */
tr_bool

View File

@ -124,7 +124,6 @@ typedef struct tr_announcer
struct event * upkeepTimer;
int slotsAvailable;
int key;
time_t lpdUpkeepAt;
time_t tauUpkeepAt;
}
tr_announcer;
@ -135,14 +134,6 @@ tr_announcerHasBacklog( const struct tr_announcer * announcer )
return announcer->slotsAvailable < 1;
}
static inline time_t
jitterize( const int val )
{
const double jitter = 0.1;
assert( val > 0 );
return val + tr_cryptoWeakRandInt((int)(1 + val * jitter));
}
static void
onUpkeepTimer( int foo UNUSED, short bar UNUSED, void * vannouncer );
@ -158,7 +149,6 @@ tr_announcerInit( tr_session * session )
a->key = tr_cryptoRandInt( INT_MAX );
a->session = session;
a->slotsAvailable = MAX_CONCURRENT_TASKS;
a->lpdUpkeepAt = tr_time() + jitterize(5);
a->upkeepTimer = evtimer_new( session->event_base, onUpkeepTimer, a );
tr_timerAdd( a->upkeepTimer, UPKEEP_INTERVAL_SECS, 0 );
@ -174,6 +164,8 @@ tr_announcerClose( tr_session * session )
flushCloseMessages( announcer );
tr_tracker_udp_start_shutdown( session );
event_free( announcer->upkeepTimer );
announcer->upkeepTimer = NULL;
@ -1491,26 +1483,30 @@ announceMore( tr_announcer * announcer )
static void
onUpkeepTimer( int foo UNUSED, short bar UNUSED, void * vannouncer )
{
const time_t now = tr_time( );
tr_announcer * announcer = vannouncer;
tr_sessionLock( announcer->session );
tr_session * session = announcer->session;
const tr_bool is_closing = session->isClosed;
const time_t now = tr_time( );
tr_sessionLock( session );
/* maybe send out some "stopped" messages for closed torrents */
flushCloseMessages( announcer );
/* maybe send out some announcements to trackers */
announceMore( announcer );
if( !is_closing )
announceMore( announcer );
/* TAU upkeep */
if( announcer->tauUpkeepAt <= now ) {
announcer->tauUpkeepAt = now + TAU_UPKEEP_INTERVAL_SECS;
tr_tracker_udp_upkeep( announcer->session );
tr_tracker_udp_upkeep( session );
}
/* set up the next timer */
tr_timerAdd( announcer->upkeepTimer, UPKEEP_INTERVAL_SECS, 0 );
tr_sessionUnlock( announcer->session );
tr_sessionUnlock( session );
}
/***

View File

@ -104,5 +104,16 @@ tr_tracker_stat * tr_announcerStats( const tr_torrent * torrent,
void tr_announcerStatsFree( tr_tracker_stat * trackers,
int trackerCount );
/***
****
***/
void tr_tracker_udp_upkeep( tr_session * session );
void tr_tracker_udp_close( tr_session * session );
tr_bool tr_tracker_udp_is_empty( const tr_session * session );
#endif /* _TR_ANNOUNCER_H_ */

View File

@ -21,6 +21,7 @@
#include <unistd.h> /* stat */
#include <dirent.h> /* opendir */
#include <event2/dns.h> /* evdns_base_free() */
#include <event2/event.h>
//#define TR_SHOW_DEPRECATED
@ -1733,7 +1734,7 @@ sessionCloseImpl( void * vsession )
tr_lpdUninit( session );
tr_utpClose( session );
tr_udpUninit( session );
tr_dhtUninit( session );
event_free( session->saveTimer );
session->saveTimer = NULL;
@ -1758,12 +1759,33 @@ sessionCloseImpl( void * vsession )
tr_torrentFree( torrents[i] );
tr_free( torrents );
/* Close the announcer *after* closing the torrents
so that all the &event=stopped messages will be
queued to be sent by tr_announcerClose() */
tr_announcerClose( session );
/* and this goes *after* announcer close so that
it won't be idle until the announce events are sent... */
tr_webClose( session, TR_WEB_CLOSE_WHEN_IDLE );
tr_cacheFree( session->cache );
session->cache = NULL;
tr_announcerClose( session );
/* gotta keep udp running long enough to send out all
the &event=stopped UDP tracker messages */
while( !tr_tracker_udp_is_empty( session ) ) {
tr_tracker_udp_upkeep( session );
tr_wait_msec( 100 );
}
/* we had to wait until UDP trackers were closed before closing these: */
evdns_base_free( session->evdns_base, 0 );
session->evdns_base = NULL;
tr_tracker_udp_close( session );
tr_udpUninit( session );
tr_statsClose( session );
tr_peerMgrFree( session->peerMgr );
tr_webClose( session, TR_WEB_CLOSE_WHEN_IDLE );
closeBlocklists( session );
@ -1801,7 +1823,7 @@ tr_sessionClose( tr_session * session )
* so we need to keep the transmission thread alive
* for a bit while they tell the router & tracker
* that we're closing now */
while( ( session->shared || session->web || session->announcer )
while( ( session->shared || session->web || session->announcer || session->announcer_udp )
&& !deadlineReached( deadline ) )
{
dbgmsg( "waiting on port unmap (%p) or announcer (%p)... now %zu deadline %zu",

View File

@ -289,8 +289,6 @@ tr_udpInit(tr_session *ss)
void
tr_udpUninit(tr_session *ss)
{
tr_dhtUninit(ss);
if(ss->udp_socket >= 0) {
tr_netCloseSocket( ss->udp_socket );
ss->udp_socket = -1;

View File

@ -169,7 +169,7 @@ tr_utpPacket(const unsigned char *buf, size_t buflen,
const struct sockaddr *from, socklen_t fromlen,
tr_session *ss)
{
if(utp_timer == NULL)
if( !ss->isClosed && !utp_timer )
{
utp_timer = evtimer_new( ss->event_base, timer_callback, ss );
if(utp_timer == NULL)

View File

@ -224,7 +224,6 @@ static void
libeventThreadFunc( void * veh )
{
struct event_base * base;
struct evdns_base * evdns_base;
tr_event_handle * eh = veh;
#ifndef WIN32
@ -234,12 +233,11 @@ libeventThreadFunc( void * veh )
/* create the libevent bases */
base = event_base_new( );
evdns_base = evdns_base_new( base, TRUE );
/* set the struct's fields */
eh->base = base;
eh->session->event_base = base;
eh->session->evdns_base = evdns_base;
eh->session->evdns_base = evdns_base_new( base, TRUE );
eh->session->events = eh;
/* listen to the pipe's read fd */
@ -253,7 +251,6 @@ libeventThreadFunc( void * veh )
/* shut down the thread */
tr_lockFree( eh->lock );
evdns_base_free( evdns_base, FALSE );
event_base_free( base );
eh->session->events = NULL;
tr_free( eh );

View File

@ -330,7 +330,7 @@ tr_webThreadFunc( void * vsession )
if( web->close_mode == TR_WEB_CLOSE_NOW )
break;
if( ( web->close_mode == TR_WEB_CLOSE_WHEN_IDLE ) && !taskCount )
if( ( web->close_mode == TR_WEB_CLOSE_WHEN_IDLE ) && ( web->tasks == NULL ) )
break;
/* add tasks from the queue */
@ -349,6 +349,8 @@ tr_webThreadFunc( void * vsession )
curl_multi_timeout( multi, &msec );
if( msec < 0 )
msec = THREADFUNC_MAX_SLEEP_MSEC;
if( session->isClosed )
msec = 100; /* on shutdown, call perform() more frequently */
if( msec > 0 )
{
int usec;
@ -368,7 +370,6 @@ tr_webThreadFunc( void * vsession )
usec = msec * 1000;
t.tv_sec = usec / 1000000;
t.tv_usec = usec % 1000000;
tr_select( max_fd+1, &r_fd_set, &w_fd_set, &c_fd_set, &t );
}
@ -399,6 +400,15 @@ tr_webThreadFunc( void * vsession )
--taskCount;
}
}
#if 0
{
tr_list * l;
for( l=web->tasks; l!=NULL; l=l->next )
fprintf( stderr, "still pending: %s\n", ((struct tr_web_task*)l->data)->url );
}
fprintf( stderr, "loop is ending... web is closing\n" );
#endif
}
/* Discard any remaining tasks.