1
0
Fork 0
mirror of https://github.com/transmission/transmission synced 2024-12-26 01:27:28 +00:00

(trunk libT) #2987 "push libcurl into its own worker thread" -- committed to trunk

This commit is contained in:
Charles Kerr 2010-03-06 15:05:05 +00:00
parent 095cc3e05f
commit e8cb5367e1
3 changed files with 198 additions and 552 deletions

View file

@ -613,7 +613,7 @@ tr_sessionInitImpl( void * vdata )
tr_statsInit( session ); tr_statsInit( session );
session->web = tr_webInit( session ); tr_webInit( session );
tr_sessionSet( session, &settings ); tr_sessionSet( session, &settings );
@ -713,7 +713,7 @@ sessionSetImpl( void * vdata )
b.addr = tr_inaddr_any; b.addr = tr_inaddr_any;
b.socket = -1; b.socket = -1;
session->public_ipv4 = tr_memdup( &b, sizeof( struct tr_bindinfo ) ); session->public_ipv4 = tr_memdup( &b, sizeof( struct tr_bindinfo ) );
tr_webSetInterface( session->web, &session->public_ipv4->addr ); tr_webSetInterface( session, &session->public_ipv4->addr );
str = TR_PREFS_KEY_BIND_ADDRESS_IPV6; str = TR_PREFS_KEY_BIND_ADDRESS_IPV6;
tr_bencDictFindStr( settings, TR_PREFS_KEY_BIND_ADDRESS_IPV6, &str ); tr_bencDictFindStr( settings, TR_PREFS_KEY_BIND_ADDRESS_IPV6, &str );
@ -1456,7 +1456,7 @@ tr_sessionGetPieceSpeed( const tr_session * session, tr_direction dir )
double double
tr_sessionGetRawSpeed( const tr_session * session, tr_direction dir ) tr_sessionGetRawSpeed( const tr_session * session, tr_direction dir )
{ {
return tr_isSession( session ) ? tr_bandwidthGetPieceSpeed( session->bandwidth, 0, dir ) : 0.0; return tr_isSession( session ) ? tr_bandwidthGetRawSpeed( session->bandwidth, 0, dir ) : 0.0;
} }
int int
@ -1524,9 +1524,9 @@ sessionCloseImpl( void * vsession )
tr_announcerClose( session ); tr_announcerClose( session );
tr_statsClose( session ); tr_statsClose( session );
tr_peerMgrFree( session->peerMgr ); tr_peerMgrFree( session->peerMgr );
tr_webClose( session, TR_WEB_CLOSE_WHEN_IDLE );
closeBlocklists( session ); closeBlocklists( session );
tr_webClose( &session->web );
tr_fdClose( session ); tr_fdClose( session );
@ -1562,14 +1562,16 @@ tr_sessionClose( tr_session * session )
* so we need to keep the transmission thread alive * so we need to keep the transmission thread alive
* for a bit while they tell the router & tracker * for a bit while they tell the router & tracker
* that we're closing now */ * that we're closing now */
while( ( session->shared while( ( session->shared || session->web || session->announcer )
|| session->announcer ) && !deadlineReached( deadline ) ) && !deadlineReached( deadline ) )
{ {
dbgmsg( "waiting on port unmap (%p) or announcer (%p)", dbgmsg( "waiting on port unmap (%p) or announcer (%p)",
session->shared, session->announcer ); session->shared, session->announcer );
tr_wait_msec( 100 ); tr_wait_msec( 100 );
} }
tr_webClose( session, TR_WEB_CLOSE_NOW );
/* close the libtransmission thread */ /* close the libtransmission thread */
tr_eventClose( session ); tr_eventClose( session );
while( session->events != NULL ) while( session->events != NULL )

View file

@ -10,28 +10,24 @@
* $Id$ * $Id$
*/ */
#include <assert.h> #include <sys/select.h>
#include <curl/curl.h> #include <curl/curl.h>
#include <event.h> #include <event.h>
#include <evdns.h>
#include "transmission.h" #include "transmission.h"
#include "net.h" #include "list.h"
#include "ptrarray.h" #include "net.h" /* tr_address */
#include "platform.h" /* mutex */
#include "session.h" #include "session.h"
#include "trevent.h" #include "trevent.h" /* tr_runInEventThread() */
#include "utils.h" #include "utils.h"
#include "version.h" #include "version.h" /* User-Agent */
#include "web.h" #include "web.h"
enum enum
{ {
TR_MEMORY_TRASH = 0xCC, THREADFUNC_MAX_SLEEP_MSEC = 1000,
DEFAULT_TIMER_MSEC = 250, /* arbitrary */
DNS_CACHE_FAIL_TTL = 120 /* seconds */
}; };
#if 0 #if 0
@ -54,30 +50,13 @@ enum
struct tr_web struct tr_web
{ {
tr_bool closing; int close_mode;
tr_bool haveAddr; tr_bool haveAddr;
int taskCount; tr_list * tasks;
long timer_msec; tr_lock * taskLock;
CURLM * multi;
tr_session * session;
tr_address addr; tr_address addr;
tr_ptrArray dns_cache;
struct event timer_event;
}; };
struct dns_cache_item;
static void dns_cache_item_free( struct dns_cache_item * );
static void
web_free( tr_web * g )
{
evdns_shutdown( TRUE );
curl_multi_cleanup( g->multi );
evtimer_del( &g->timer_event );
tr_ptrArrayDestruct( &g->dns_cache, (PtrArrayForeachFunc)dns_cache_item_free );
memset( g, TR_MEMORY_TRASH, sizeof( struct tr_web ) );
tr_free( g );
}
/*** /***
**** ****
@ -85,37 +64,21 @@ web_free( tr_web * g )
struct tr_web_task struct tr_web_task
{ {
int port; long code;
unsigned long tag;
struct curl_slist * slist;
struct evbuffer * response; struct evbuffer * response;
char * url; char * url;
char * resolved_url;
char * host;
const char * resolved_host;
char * range; char * range;
tr_session * session; tr_session * session;
tr_web_done_func * done_func; tr_web_done_func * done_func;
void * done_func_user_data; void * done_func_user_data;
struct event timer_event;
CURL * easy;
CURLM * multi;
tr_bool timer_event_isSet;
}; };
static void static void
task_free( struct tr_web_task * task ) task_free( struct tr_web_task * task )
{ {
if( task->slist != NULL )
curl_slist_free_all( task->slist );
if( task->timer_event_isSet )
evtimer_del( &task->timer_event );
evbuffer_free( task->response ); evbuffer_free( task->response );
tr_free( task->host );
tr_free( task->range ); tr_free( task->range );
tr_free( task->resolved_url );
tr_free( task->url ); tr_free( task->url );
memset( task, TR_MEMORY_TRASH, sizeof( struct tr_web_task ) );
tr_free( task ); tr_free( task );
} }
@ -123,135 +86,6 @@ task_free( struct tr_web_task * task )
**** ****
***/ ***/
struct dns_cache_item
{
char * host;
char * resolved_host;
time_t expiration;
tr_bool success;
};
static void
dns_cache_item_free( struct dns_cache_item * item )
{
if( item != NULL )
{
tr_free( item->host );
tr_free( item->resolved_host );
memset( item, TR_MEMORY_TRASH, sizeof( struct dns_cache_item ) );
tr_free( item );
}
}
static int
dns_cache_compare( const void * va, const void * vb )
{
const struct dns_cache_item * a = va;
const struct dns_cache_item * b = vb;
return strcmp( a->host, b->host );
}
typedef enum
{
TR_DNS_OK,
TR_DNS_FAIL,
TR_DNS_UNTESTED
}
tr_dns_result;
static void
dns_cache_clear_entry( struct tr_ptrArray * cache, const char * host )
{
struct dns_cache_item key;
key.host = (char*) host;
dns_cache_item_free( tr_ptrArrayRemoveSorted( cache, &key, dns_cache_compare ) );
}
static tr_dns_result
dns_cache_lookup( struct tr_web_task * task, const char * host, const char ** resolved )
{
tr_dns_result result = TR_DNS_UNTESTED;
if( task->session->web != NULL )
{
struct dns_cache_item key;
struct dns_cache_item * item;
tr_ptrArray * cache = &task->session->web->dns_cache;
key.host = (char*) host;
item = tr_ptrArrayFindSorted( cache, &key, dns_cache_compare );
/* has the ttl expired? */
if( ( item != NULL ) && ( item->expiration <= tr_time( ) ) )
{
dns_cache_clear_entry( cache, host );
item = NULL;
}
if( item != NULL )
{
result = item->success ? TR_DNS_OK : TR_DNS_FAIL;
if( result == TR_DNS_OK )
{
*resolved = item->resolved_host;
dbgmsg( "found cached dns entry for \"%s\": %s", host, *resolved );
}
}
}
return result;
}
static void
dns_cache_set_fail( struct tr_web_task * task, const char * host )
{
if( ( task->session->web != NULL ) && ( host != NULL ) )
{
struct dns_cache_item * item;
tr_ptrArray * cache = &task->session->web->dns_cache;
dns_cache_clear_entry( cache, host );
item = tr_new( struct dns_cache_item, 1 );
item->host = tr_strdup( host );
item->resolved_host = NULL;
item->expiration = tr_time( ) + DNS_CACHE_FAIL_TTL;
item->success = FALSE;
tr_ptrArrayInsertSorted( cache, item, dns_cache_compare );
}
}
static const char*
dns_cache_set_name( struct tr_web_task * task, const char * host,
const char * resolved, int ttl )
{
char * ret = NULL;
if( task->session->web != NULL )
{
struct dns_cache_item * item;
tr_ptrArray * cache = &task->session->web->dns_cache;
dns_cache_clear_entry( cache, host );
item = tr_new( struct dns_cache_item, 1 );
item->host = tr_strdup( host );
item->resolved_host = tr_strdup( resolved );
item->expiration = tr_time( ) + ttl;
item->success = TRUE;
tr_ptrArrayInsertSorted( cache, item, dns_cache_compare );
ret = item->resolved_host;
dbgmsg( "adding dns cache entry for \"%s\": %s", host, resolved );
}
return ret;
}
/***
****
***/
static size_t static size_t
writeFunc( void * ptr, size_t size, size_t nmemb, void * vtask ) writeFunc( void * ptr, size_t size, size_t nmemb, void * vtask )
{ {
@ -290,202 +124,57 @@ getCurlProxyType( tr_proxy_type t )
return CURLPROXY_HTTP; return CURLPROXY_HTTP;
} }
static int static long
getTimeoutFromURL( const char * url ) getTimeoutFromURL( const char * url )
{ {
if( strstr( url, "scrape" ) != NULL ) return 30; if( strstr( url, "scrape" ) != NULL ) return 30L;
if( strstr( url, "announce" ) != NULL ) return 90; if( strstr( url, "announce" ) != NULL ) return 90L;
return 240; return 240L;
} }
static void task_timeout_cb( int fd UNUSED, short what UNUSED, void * task ); static CURL *
static void task_finish( struct tr_web_task * task, long response_code ); createEasy( tr_session * s, struct tr_web * w, struct tr_web_task * task )
static void
addTask( void * vtask )
{ {
struct tr_web_task * task = vtask; CURL * e = curl_easy_init( );
const tr_session * session = task->session; const long verbose = getenv( "TR_CURL_VERBOSE" ) != NULL;
if( ( session == NULL ) || ( session->web == NULL ) ) if( !task->range && s->isProxyEnabled ) {
return; const long proxyType = getCurlProxyType( s->proxyType );
curl_easy_setopt( e, CURLOPT_PROXY, s->proxy );
if( !task->resolved_host ) curl_easy_setopt( e, CURLOPT_PROXYAUTH, CURLAUTH_ANY );
{ curl_easy_setopt( e, CURLOPT_PROXYPORT, s->proxyPort );
dbgmsg( "couldn't resolve host for \"%s\"... task failed", task->url ); curl_easy_setopt( e, CURLOPT_PROXYTYPE, proxyType );
task_finish( task, 0 );
}
else
{
CURL * e = curl_easy_init( );
struct tr_web * web = session->web;
const int timeout = getTimeoutFromURL( task->url );
const long verbose = getenv( "TR_CURL_VERBOSE" ) != NULL;
const char * user_agent = TR_NAME "/" SHORT_VERSION_STRING;
/* insert the resolved host into the URL s.t. curl's DNS won't block
* even if -- like on most OSes -- it wasn't built with C-Ares :(
* "http://www.craptrackular.org/announce?key=val&key2=..." becomes
* "http://127.0.0.1/announce?key=val&key2=..." */
{
char * host;
struct evbuffer * buf = evbuffer_new( );
char * pch = strstr( task->url, task->host );
char * tail = pch + strlen( task->host );
evbuffer_add( buf, task->url, pch - task->url );
evbuffer_add_printf( buf, "%s", task->resolved_host );
evbuffer_add_printf( buf, "%s", tail );
task->resolved_url = tr_strndup( EVBUFFER_DATA( buf ), EVBUFFER_LENGTH( buf ) );
dbgmsg( "old url: \"%s\" -- new url: \"%s\"", task->url, task->resolved_url );
evbuffer_free( buf );
/* Manually add a Host: argument that refers to the true URL */
if( ( ( task->port <= 0 ) ) ||
( ( task->port == 80 ) && !strncmp( task->url, "http://", 7 ) ) ||
( ( task->port == 443 ) && !strncmp( task->url, "https://", 8 ) ) )
host = tr_strdup_printf( "Host: %s", task->host );
else
host = tr_strdup_printf( "Host: %s:%d", task->host, task->port );
task->slist = curl_slist_append( NULL, host );
task->slist = curl_slist_append( task->slist, "Accept:" );
curl_easy_setopt( e, CURLOPT_HTTPHEADER, task->slist );
tr_free( host );
}
dbgmsg( "adding task #%lu [%s]", task->tag, task->resolved_url ? task->resolved_url : task->url );
if( !task->range && session->isProxyEnabled ) {
curl_easy_setopt( e, CURLOPT_PROXY, session->proxy );
curl_easy_setopt( e, CURLOPT_PROXYAUTH, CURLAUTH_ANY );
curl_easy_setopt( e, CURLOPT_PROXYPORT, session->proxyPort );
curl_easy_setopt( e, CURLOPT_PROXYTYPE,
getCurlProxyType( session->proxyType ) );
}
if( !task->range && session->isProxyAuthEnabled ) {
char * str = tr_strdup_printf( "%s:%s", session->proxyUsername,
session->proxyPassword );
curl_easy_setopt( e, CURLOPT_PROXYUSERPWD, str );
tr_free( str );
}
task->easy = e;
task->multi = web->multi;
/* use our own timeout instead of CURLOPT_TIMEOUT because the latter
* doesn't play nicely with curl_multi. See curl bug #2501457 */
task->timer_event_isSet = TRUE;
evtimer_set( &task->timer_event, task_timeout_cb, task );
tr_timerAdd( &task->timer_event, timeout, 0 );
curl_easy_setopt( e, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4 );
curl_easy_setopt( e, CURLOPT_SOCKOPTFUNCTION, sockoptfunction );
curl_easy_setopt( e, CURLOPT_SOCKOPTDATA, task );
curl_easy_setopt( e, CURLOPT_WRITEDATA, task );
curl_easy_setopt( e, CURLOPT_WRITEFUNCTION, writeFunc );
curl_easy_setopt( e, CURLOPT_FOLLOWLOCATION, 1L );
curl_easy_setopt( e, CURLOPT_AUTOREFERER, 1L );
curl_easy_setopt( e, CURLOPT_FORBID_REUSE, 1L );
curl_easy_setopt( e, CURLOPT_MAXREDIRS, -1L );
curl_easy_setopt( e, CURLOPT_PRIVATE, task );
curl_easy_setopt( e, CURLOPT_SSL_VERIFYHOST, 0L );
curl_easy_setopt( e, CURLOPT_SSL_VERIFYPEER, 0L );
curl_easy_setopt( e, CURLOPT_URL, task->resolved_url ? task->resolved_url : task->url );
curl_easy_setopt( e, CURLOPT_USERAGENT, user_agent );
curl_easy_setopt( e, CURLOPT_VERBOSE, verbose );
if( web->haveAddr )
curl_easy_setopt( e, CURLOPT_INTERFACE, tr_ntop_non_ts( &web->addr ) );
if( task->range )
curl_easy_setopt( e, CURLOPT_RANGE, task->range );
if( curl_multi_add_handle( web->multi, e ) == CURLM_OK )
++web->taskCount;
}
}
static void
dns_ipv6_done_cb( int err, char type, int count, int ttl, void * addresses, void * vtask )
{
struct tr_web_task * task = vtask;
if( !err && task->host && ( count>0 ) && ( ttl>=0 ) && ( type==DNS_IPv6_AAAA ) )
{
int i;
char buf[INET6_ADDRSTRLEN+1];
struct in6_addr *in6_addrs = addresses;
for( i=0; i<count; ++i ) {
const char * b = inet_ntop(AF_INET6, &in6_addrs[i], buf,sizeof(buf));
if( b != NULL ) {
/* FIXME: is there a better way to tell which one to use if count > 1? */
task->resolved_host = dns_cache_set_name( task, task->host, b, ttl );
break;
}
}
} }
if( task->resolved_host == NULL ) if( !task->range && s->isProxyAuthEnabled ) {
dns_cache_set_fail( task, task->host ); char * str = tr_strdup_printf( "%s:%s", s->proxyUsername,
s->proxyPassword );
addTask( task ); curl_easy_setopt( e, CURLOPT_PROXYUSERPWD, str );
} tr_free( str );
static void
dns_ipv4_done_cb( int err, char type, int count, int ttl, void * addresses, void * vtask )
{
struct tr_web_task * task = vtask;
if( !err && task->host && ( count>0 ) && ( ttl>=0 ) && ( type==DNS_IPv4_A ) )
{
struct in_addr * in_addrs = addresses;
const char * resolved = inet_ntoa( in_addrs[0] );
task->resolved_host = dns_cache_set_name( task, task->host, resolved, ttl );
/* FIXME: if count > 1, is there a way to decide which is best to use? */
} }
if( ( task->resolved_host != NULL ) curl_easy_setopt( e, CURLOPT_AUTOREFERER, 1L );
|| ( task->host == NULL ) curl_easy_setopt( e, CURLOPT_ENCODING, "gzip;q=1.0, deflate, identity" );
|| evdns_resolve_ipv6( task->host, 0, dns_ipv6_done_cb, task ) ) curl_easy_setopt( e, CURLOPT_FOLLOWLOCATION, 1L );
dns_ipv6_done_cb( DNS_ERR_UNKNOWN, DNS_IPv6_AAAA, 0, 0, NULL, task ); curl_easy_setopt( e, CURLOPT_MAXREDIRS, -1L );
} curl_easy_setopt( e, CURLOPT_NOSIGNAL, 1L );
curl_easy_setopt( e, CURLOPT_PRIVATE, task );
curl_easy_setopt( e, CURLOPT_SOCKOPTFUNCTION, sockoptfunction );
curl_easy_setopt( e, CURLOPT_SOCKOPTDATA, task );
curl_easy_setopt( e, CURLOPT_SSL_VERIFYHOST, 0L );
curl_easy_setopt( e, CURLOPT_SSL_VERIFYPEER, 0L );
curl_easy_setopt( e, CURLOPT_TIMEOUT, getTimeoutFromURL( task->url ) );
curl_easy_setopt( e, CURLOPT_URL, task->url );
curl_easy_setopt( e, CURLOPT_USERAGENT, TR_NAME "/" SHORT_VERSION_STRING );
curl_easy_setopt( e, CURLOPT_VERBOSE, verbose );
curl_easy_setopt( e, CURLOPT_WRITEDATA, task );
curl_easy_setopt( e, CURLOPT_WRITEFUNCTION, writeFunc );
if( w->haveAddr )
curl_easy_setopt( e, CURLOPT_INTERFACE, tr_ntop_non_ts( &w->addr ) );
if( task->range )
curl_easy_setopt( e, CURLOPT_RANGE, task->range );
static void return e;
doDNS( void * vtask )
{
tr_address addr;
int port = -1;
char * host = NULL;
struct tr_web_task * task = vtask;
tr_dns_result lookup_result = TR_DNS_UNTESTED;
assert( task->resolved_host == NULL );
if( !tr_urlParse( task->url, -1, NULL, &host, &port, NULL ) )
{
task->port = port;
task->host = host;
/* If 'host' is an IPv4 or IPv6 address in text form, use it as-is.
* Otherwise, see if its resolved name is in our DNS cache */
if( tr_pton( task->host, &addr ) != NULL )
{
task->resolved_host = task->host;
lookup_result = TR_DNS_OK;
}
else
{
lookup_result = dns_cache_lookup( task, host, &task->resolved_host );
}
}
if( lookup_result != TR_DNS_UNTESTED )
{
addTask( task );
}
else if( !host || evdns_resolve_ipv4( host, 0, dns_ipv4_done_cb, task ) )
{
dns_ipv4_done_cb( DNS_ERR_UNKNOWN, DNS_IPv4_A, 0, 0, NULL, task );
}
} }
/*** /***
@ -493,159 +182,21 @@ doDNS( void * vtask )
***/ ***/
static void static void
task_finish( struct tr_web_task * task, long response_code ) task_finish_func( void * vtask )
{ {
dbgmsg( "finished web task %lu; got %ld", task->tag, response_code ); struct tr_web_task * task = vtask;
dbgmsg( "finished web task %p; got %ld", task, task->code );
if( task->done_func != NULL ) if( task->done_func != NULL )
task->done_func( task->session, task->done_func( task->session,
response_code, task->code,
EVBUFFER_DATA( task->response ), EVBUFFER_DATA( task->response ),
EVBUFFER_LENGTH( task->response ), EVBUFFER_LENGTH( task->response ),
task->done_func_user_data ); task->done_func_user_data );
task_free( task ); task_free( task );
} }
static void
remove_task( struct tr_web_task * task )
{
long code;
curl_easy_getinfo( task->easy, CURLINFO_RESPONSE_CODE, &code );
curl_multi_remove_handle( task->multi, task->easy );
curl_easy_cleanup( task->easy );
task_finish( task, code );
}
static void
task_timeout_cb( int fd UNUSED, short what UNUSED, void * task )
{
remove_task( task );
}
static void
remove_finished_tasks( tr_web * g )
{
CURLMsg * msg;
int msgs_left;
while(( msg = curl_multi_info_read( g->multi, &msgs_left ))) {
if(( msg->msg == CURLMSG_DONE ) && ( msg->easy_handle != NULL )) {
struct tr_web_task * task;
CURL * e = msg->easy_handle;
curl_easy_getinfo( e, CURLINFO_PRIVATE, (void*)&task );
assert( e == task->easy );
remove_task( task );
}
}
}
static void
restart_timer( tr_web * g )
{
dbgmsg( "adding a timeout for %.1f seconds from now", g->timer_msec/1000.0 );
evtimer_del( &g->timer_event );
tr_timerAddMsec( &g->timer_event, g->timer_msec );
}
static void
tr_multi_perform( tr_web * g, int fd, int curl_what )
{
CURLMcode m;
dbgmsg( "check_run_count: %d taskCount", g->taskCount );
/* invoke libcurl's processing */
do
m = curl_multi_socket_action( g->multi, fd, curl_what, &g->taskCount );
while( m == CURLM_CALL_MULTI_SOCKET );
remove_finished_tasks( g );
if( g->closing && !g->taskCount )
web_free( g );
else
restart_timer( g );
}
/* libevent says that sock is ready to be processed, so wake up libcurl */
static void
event_cb( int fd, short ev_what, void * g )
{
int curl_what = 0;
if( ev_what & EV_READ ) curl_what |= CURL_POLL_IN;
if( ev_what & EV_WRITE ) curl_what |= CURL_POLL_OUT;
tr_multi_perform( g, fd, curl_what );
}
/* CURLMOPT_SOCKETFUNCTION */
static int
sock_cb( CURL * e UNUSED, curl_socket_t fd, int curl_what,
void * vweb, void * vevent )
{
/*static int num_events = 0;*/
struct tr_web * web = vweb;
struct event * io_event = vevent;
dbgmsg( "sock_cb: curl_what %d, fd %d, io_event %p",
curl_what, (int)fd, io_event );
if( io_event != NULL )
event_del( io_event );
if( curl_what & ( CURL_POLL_IN | CURL_POLL_OUT ) )
{
const short ev_what = EV_PERSIST
| (( curl_what & CURL_POLL_IN ) ? EV_READ : 0 )
| (( curl_what & CURL_POLL_OUT ) ? EV_WRITE : 0 );
if( io_event == NULL ) {
io_event = tr_new0( struct event, 1 );
curl_multi_assign( web->multi, fd, io_event );
/*fprintf( stderr, "+1 io_events to %d\n", ++num_events );*/
}
dbgmsg( "enabling (libevent %hd, libcurl %d) on io_event %p, fd %d",
ev_what, curl_what, io_event, fd );
event_set( io_event, fd, ev_what, event_cb, web );
assert( io_event->ev_base != NULL );
event_add( io_event, NULL );
}
if( ( io_event != NULL ) && ( curl_what & CURL_POLL_REMOVE ) )
{
memset( io_event, TR_MEMORY_TRASH, sizeof( struct event ) );
tr_free( io_event );
/*fprintf( stderr, "-1 io_events to %d\n", --num_events );*/
}
return 0; /* libcurl documentation: "The callback MUST return 0." */
}
/* libevent says that timer_msec have passed, so wake up libcurl */
static void
libevent_timer_cb( int fd UNUSED, short what UNUSED, void * g )
{
dbgmsg( "libevent timer is done" );
tr_multi_perform( g, CURL_SOCKET_TIMEOUT, 0 );
}
/* libcurl documentation: "If 0, it means you should proceed immediately
* without waiting for anything. If it returns -1, there's no timeout at all
* set ... (but) you must not wait too long (more than a few seconds perhaps)
* before you call curl_multi_perform() again." */
static void
multi_timer_cb( CURLM * multi UNUSED, long timer_msec, void * vg )
{
tr_web * g = vg;
g->timer_msec = timer_msec > 0 ? timer_msec : DEFAULT_TIMER_MSEC;
if( timer_msec < 1 )
tr_multi_perform( g, CURL_SOCKET_TIMEOUT, 0 );
else
restart_timer( g );
}
/**** /****
***** *****
****/ ****/
@ -657,32 +208,43 @@ tr_webRun( tr_session * session,
tr_web_done_func done_func, tr_web_done_func done_func,
void * done_func_user_data ) void * done_func_user_data )
{ {
if( session->web != NULL ) struct tr_web * web = session->web;
if( web != NULL )
{ {
static unsigned long tag = 0;
struct tr_web_task * task = tr_new0( struct tr_web_task, 1 ); struct tr_web_task * task = tr_new0( struct tr_web_task, 1 );
task->session = session; task->session = session;
task->url = tr_strdup( url ); task->url = tr_strdup( url );
task->range = tr_strdup( range ); task->range = tr_strdup( range );
task->done_func = done_func; task->done_func = done_func;
task->done_func_user_data = done_func_user_data; task->done_func_user_data = done_func_user_data;
task->tag = ++tag;
task->response = evbuffer_new( ); task->response = evbuffer_new( );
tr_runInEventThread( session, doDNS, task );
tr_lockLock( web->taskLock );
tr_list_append( &web->tasks, task );
tr_lockUnlock( web->taskLock );
} }
} }
void void
tr_webSetInterface( tr_web * web, const tr_address * addr ) tr_webSetInterface( tr_session * session, const tr_address * addr )
{ {
if(( web->haveAddr = ( addr != NULL ))) struct tr_web * web = session->web;
web->addr = *addr;
if( web != NULL )
if(( web->haveAddr = ( addr != NULL )))
web->addr = *addr;
} }
tr_web* static void
tr_webInit( tr_session * session ) tr_webThreadFunc( void * vsession )
{ {
tr_web * web; int unused;
CURLM * multi;
struct tr_web * web;
int taskCount = 0;
tr_session * session = vsession;
/* try to enable ssl for https support; but if that fails, /* try to enable ssl for https support; but if that fails,
* try a plain vanilla init */ * try a plain vanilla init */
@ -690,32 +252,106 @@ tr_webInit( tr_session * session )
curl_global_init( 0 ); curl_global_init( 0 );
web = tr_new0( struct tr_web, 1 ); web = tr_new0( struct tr_web, 1 );
web->dns_cache = TR_PTR_ARRAY_INIT; web->close_mode = ~0;
web->session = session; web->taskLock = tr_lockNew( );
web->timer_msec = DEFAULT_TIMER_MSEC; /* overwritten by multi_timer_cb() */ web->tasks = NULL;
evtimer_set( &web->timer_event, libevent_timer_cb, web ); multi = curl_multi_init( );
session->web = web;
web->multi = curl_multi_init( ); for( ;; )
{
long msec;
CURLMsg * msg;
CURLMcode mcode;
struct tr_web_task * task;
evdns_init( ); if( web->close_mode == TR_WEB_CLOSE_NOW )
break;
if( ( web->close_mode == TR_WEB_CLOSE_WHEN_IDLE ) && !taskCount )
break;
curl_multi_setopt( web->multi, CURLMOPT_SOCKETDATA, web ); /* add tasks from the queue */
curl_multi_setopt( web->multi, CURLMOPT_SOCKETFUNCTION, sock_cb ); tr_lockLock( web->taskLock );
curl_multi_setopt( web->multi, CURLMOPT_TIMERDATA, web ); while(( task = tr_list_pop_front( &web->tasks )))
curl_multi_setopt( web->multi, CURLMOPT_TIMERFUNCTION, multi_timer_cb ); {
curl_multi_add_handle( multi, createEasy( session, web, task ));
fprintf( stderr, "adding a task.. taskCount is now %d\n", taskCount );
++taskCount;
}
tr_lockUnlock( web->taskLock );
return web; /* maybe wait a little while before calling curl_multi_perform() */
msec = 0;
curl_multi_timeout( multi, &msec );
if( msec < 0 )
msec = THREADFUNC_MAX_SLEEP_MSEC;
if( msec > 0 )
{
int max_fd;
struct timeval t;
fd_set r_fd_set, w_fd_set, c_fd_set;
max_fd = 0;
FD_ZERO( &r_fd_set );
FD_ZERO( &w_fd_set );
FD_ZERO( &c_fd_set );
curl_multi_fdset( multi, &r_fd_set, &w_fd_set, &c_fd_set, &max_fd );
if( msec > THREADFUNC_MAX_SLEEP_MSEC )
msec = THREADFUNC_MAX_SLEEP_MSEC;
t.tv_sec = 0;
t.tv_usec = msec * 1000;
select( max_fd+1, &r_fd_set, &w_fd_set, &c_fd_set, &t );
}
/* call curl_multi_perform() */
do {
mcode = curl_multi_perform( multi, &unused );
} while( mcode == CURLM_CALL_MULTI_PERFORM );
/* pump completed tasks from the multi */
while(( msg = curl_multi_info_read( multi, &unused )))
{
if(( msg->msg == CURLMSG_DONE ) && ( msg->easy_handle != NULL ))
{
struct tr_web_task * task;
CURL * e = msg->easy_handle;
curl_easy_getinfo( e, CURLINFO_PRIVATE, (void*)&task );
curl_easy_getinfo( e, CURLINFO_RESPONSE_CODE, &task->code );
curl_multi_remove_handle( multi, e );
curl_easy_cleanup( e );
fprintf( stderr, "removing a completed task.. taskCount is now %d (response code: %d, response len: %d)\n", taskCount, (int)task->code, EVBUFFER_LENGTH(task->response) );
tr_runInEventThread( task->session, task_finish_func, task );
--taskCount;
}
}
}
/* cleanup */
curl_multi_cleanup( multi );
tr_lockFree( web->taskLock );
tr_free( web );
session->web = NULL;
} }
void void
tr_webClose( tr_web ** web_in ) tr_webInit( tr_session * session )
{ {
tr_web * web = *web_in; tr_threadNew( tr_webThreadFunc, session );
*web_in = NULL; }
if( web->taskCount < 1 )
web_free( web ); void
else tr_webClose( tr_session * session, tr_web_close_mode close_mode )
web->closing = 1; {
if( session->web != NULL )
{
session->web->close_mode = close_mode;
if( close_mode == TR_WEB_CLOSE_NOW )
while( session->web != NULL )
tr_wait_msec( 100 );
}
} }
/***** /*****
@ -777,20 +413,22 @@ void
tr_http_escape( struct evbuffer * out, tr_http_escape( struct evbuffer * out,
const char * str, int len, tr_bool escape_slashes ) const char * str, int len, tr_bool escape_slashes )
{ {
int i; const char * end;
if( ( len < 0 ) && ( str != NULL ) ) if( ( len < 0 ) && ( str != NULL ) )
len = strlen( str ); len = strlen( str );
for( i = 0; i < len; i++ ) { for( end=str+len; str!=end; ++str ) {
if( str[i] == ',' || str[i] == '-' || str[i] == '.' if( ( *str == ',' )
|| ( '0' <= str[i] && str[i] <= '9' ) || ( *str == '-' )
|| ( 'A' <= str[i] && str[i] <= 'Z' ) || ( *str == '.' )
|| ( 'a' <= str[i] && str[i] <= 'z' ) || ( ( '0' <= *str ) && ( *str <= '9' ) )
|| ( str[i] == '/' && !escape_slashes ) ) || ( ( 'A' <= *str ) && ( *str <= 'Z' ) )
evbuffer_add( out, &str[i], 1 ); || ( ( 'a' <= *str ) && ( *str <= 'z' ) )
|| ( ( *str == '/' ) && ( !escape_slashes ) ) )
evbuffer_add( out, str, 1 );
else else
evbuffer_add_printf( out, "%%%02X", (unsigned)(str[i]&0xFF) ); evbuffer_add_printf( out, "%%%02X", (unsigned)(*str&0xFF) );
} }
} }

View file

@ -14,13 +14,19 @@
#define TR_HTTP_H #define TR_HTTP_H
struct tr_address; struct tr_address;
typedef struct tr_web tr_web;
tr_web* tr_webInit( tr_session * session ); void tr_webInit( tr_session * session );
void tr_webClose( tr_web ** ); typedef enum
{
TR_WEB_CLOSE_WHEN_IDLE,
TR_WEB_CLOSE_NOW
}
tr_web_close_mode;
void tr_webSetInterface( tr_web * web, const struct tr_address * addr ); void tr_webClose( tr_session * session, tr_web_close_mode close_mode );
void tr_webSetInterface( tr_session * session, const struct tr_address * addr );
typedef void ( tr_web_done_func )( tr_session * session, typedef void ( tr_web_done_func )( tr_session * session,
long response_code, long response_code,