From 1a1f195216e6e2e668477a6ab0f90066e689d4c5 Mon Sep 17 00:00:00 2001 From: Charles Kerr Date: Wed, 16 Jul 2008 17:47:20 +0000 Subject: [PATCH] (rpc) better handling of multiple concurrent RPC connections --- libtransmission/rpc-server.c | 170 ++++++++++++++++++++++++----------- 1 file changed, 117 insertions(+), 53 deletions(-) diff --git a/libtransmission/rpc-server.c b/libtransmission/rpc-server.c index a1c6c4d24..1ec4d2c8b 100644 --- a/libtransmission/rpc-server.c +++ b/libtransmission/rpc-server.c @@ -24,6 +24,7 @@ #include "transmission.h" #include "bencode.h" +#include "list.h" #include "platform.h" #include "rpc.h" #include "rpc-server.h" @@ -32,9 +33,8 @@ #define MY_NAME "RPC Server" #define MY_REALM "Transmission RPC Server" -#define BUSY_INTERVAL_MSEC 30 -#define IDLE_INTERVAL_MSEC 66 -#define UNUSED_INTERVAL_MSEC 100 +#define ACTIVE_INTERVAL_MSEC 40 +#define INACTIVE_INTERVAL_MSEC 500 struct tr_rpc_server { @@ -42,13 +42,12 @@ struct tr_rpc_server time_t lastRequestTime; struct shttpd_ctx * ctx; tr_handle * session; - struct evbuffer * in; - struct evbuffer * out; struct event timer; int isPasswordEnabled; char * username; char * password; char * acl; + tr_list * connections; }; #define dbgmsg(fmt...) tr_deepLog(__FILE__, __LINE__, MY_NAME, ##fmt ) @@ -57,27 +56,91 @@ static const char* tr_memmem( const char * s1, size_t l1, const char * s2, size_t l2 ) { - if (!l2) return s1; - while (l1 >= l2) { - l1--; - if (!memcmp(s1,s2,l2)) - return s1; - s1++; - } - return NULL; + if (!l2) return s1; + while (l1 >= l2) { + l1--; + if (!memcmp(s1,s2,l2)) + return s1; + s1++; + } + return NULL; } +/** +*** +**/ + +struct ConnBuf +{ + char * key; + time_t lastActivity; + struct evbuffer * in; + struct evbuffer * out; +}; + +static char* +buildKey( struct shttpd_arg * arg ) +{ + return tr_strdup_printf( "%s %s", + shttpd_get_env( arg, "REMOTE_ADDR" ), + shttpd_get_env( arg, "REQUEST_URI" ) ); +} + +static struct ConnBuf* +getBuffer( tr_rpc_server * server, struct shttpd_arg * arg ) +{ + tr_list * l; + char * key = buildKey( arg ); + struct ConnBuf * found = NULL; + + for( l=server->connections; l && !found; l=l->next ) + { + struct ConnBuf * buf = l->data; + if( !strcmp( key, buf->key ) ) + found = buf; + } + + if( found == NULL ) + { + found = tr_new0( struct ConnBuf, 1 ); + found->lastActivity = time( NULL ); + found->key = tr_strdup( key ); + found->in = evbuffer_new( ); + found->out = evbuffer_new( ); + tr_list_append( &server->connections, found ); + } + + tr_free( key ); + return found; +} + +static void +pruneBuf( tr_rpc_server * server, struct ConnBuf * buf ) +{ + tr_list_remove_data( &server->connections, buf ); + + evbuffer_free( buf->in ); + evbuffer_free( buf->out ); + tr_free( buf->key ); + tr_free( buf ); +} + +/** +*** +**/ + static void handle_upload( struct shttpd_arg * arg ) { struct tr_rpc_server * s = arg->user_data; s->lastRequestTime = time( NULL ); + struct ConnBuf * cbuf = getBuffer( s, arg ); /* if we haven't parsed the POST, do that now */ - if( !EVBUFFER_LENGTH( s->out ) ) + if( !EVBUFFER_LENGTH( cbuf->out ) ) { /* if we haven't finished reading the POST, read more now */ - evbuffer_add( s->in, arg->in.buf, arg->in.len ); + evbuffer_add( cbuf->in, arg->in.buf, arg->in.len ); arg->in.num_bytes = arg->in.len; if( arg->flags & SHTTPD_MORE_POST_DATA ) return; @@ -85,8 +148,8 @@ handle_upload( struct shttpd_arg * arg ) const char * query_string = shttpd_get_env( arg, "QUERY_STRING" ); const char * content_type = shttpd_get_header( arg, "Content-Type" ); const char * delim; - const char * in = (const char *) EVBUFFER_DATA( s->in ); - size_t inlen = EVBUFFER_LENGTH( s->in ); + const char * in = (const char *) EVBUFFER_DATA( cbuf->in ); + size_t inlen = EVBUFFER_LENGTH( cbuf->in ); char * boundary = tr_strdup_printf( "--%s", strstr( content_type, "boundary=" ) + strlen( "boundary=" ) ); const size_t boundary_len = strlen( boundary ); char buf[64]; @@ -142,7 +205,7 @@ handle_upload( struct shttpd_arg * arg ) } while( delim ); - evbuffer_drain( s->in, EVBUFFER_LENGTH( s->in ) ); + evbuffer_drain( cbuf->in, EVBUFFER_LENGTH( cbuf->in ) ); tr_free( boundary ); { @@ -150,24 +213,27 @@ handle_upload( struct shttpd_arg * arg ) * see http://www.malsup.com/jquery/form/#sample7 for details */ const char * response = "success"; const int len = strlen( response ); - evbuffer_add_printf( s->out, "HTTP/1.1 200 OK\r\n" - "Content-Type: text/xml\r\n" - "Content-Length: %d\r\n" - "\r\n" - "%s\r\n", len, response ); + evbuffer_add_printf( cbuf->out, "HTTP/1.1 200 OK\r\n" + "Content-Type: text/xml\r\n" + "Content-Length: %d\r\n" + "\r\n" + "%s\r\n", len, response ); } } - if( EVBUFFER_LENGTH( s->out ) ) + if( EVBUFFER_LENGTH( cbuf->out ) ) { - const int n = MIN( ( int )EVBUFFER_LENGTH( s->out ), arg->out.len ); - memcpy( arg->out.buf, EVBUFFER_DATA( s->out ), n ); - evbuffer_drain( s->out, n ); + const int n = MIN( ( int )EVBUFFER_LENGTH( cbuf->out ), arg->out.len ); + memcpy( arg->out.buf, EVBUFFER_DATA( cbuf->out ), n ); + evbuffer_drain( cbuf->out, n ); arg->out.num_bytes = n; } - if( !EVBUFFER_LENGTH( s->out ) ) + if( !EVBUFFER_LENGTH( cbuf->out ) ) + { arg->flags |= SHTTPD_END_OF_OUTPUT; + pruneBuf( s, cbuf ); + } } static void @@ -175,8 +241,9 @@ handle_rpc( struct shttpd_arg * arg ) { struct tr_rpc_server * s = arg->user_data; s->lastRequestTime = time( NULL ); + struct ConnBuf * cbuf = getBuffer( s, arg ); - if( !EVBUFFER_LENGTH( s->out ) ) + if( !EVBUFFER_LENGTH( cbuf->out ) ) { int len = 0; char * response = NULL; @@ -190,35 +257,38 @@ handle_rpc( struct shttpd_arg * arg ) &len ); else if( !strcmp( request_method, "POST" ) ) { - evbuffer_add( s->in, arg->in.buf, arg->in.len ); + evbuffer_add( cbuf->in, arg->in.buf, arg->in.len ); arg->in.num_bytes = arg->in.len; if( arg->flags & SHTTPD_MORE_POST_DATA ) return; response = tr_rpc_request_exec_json( s->session, - EVBUFFER_DATA( s->in ), - EVBUFFER_LENGTH( s->in ), + EVBUFFER_DATA( cbuf->in ), + EVBUFFER_LENGTH( cbuf->in ), &len ); - evbuffer_drain( s->in, EVBUFFER_LENGTH( s->in ) ); + evbuffer_drain( cbuf->in, EVBUFFER_LENGTH( cbuf->in ) ); } - evbuffer_add_printf( s->out, "HTTP/1.1 200 OK\r\n" - "Content-Type: application/json\r\n" - "Content-Length: %d\r\n" - "\r\n" - "%*.*s", len, len, len, response ); + evbuffer_add_printf( cbuf->out, "HTTP/1.1 200 OK\r\n" + "Content-Type: application/json\r\n" + "Content-Length: %d\r\n" + "\r\n" + "%*.*s", len, len, len, response ); tr_free( response ); } - if( EVBUFFER_LENGTH( s->out ) ) + if( EVBUFFER_LENGTH( cbuf->out ) ) { - const int n = MIN( ( int )EVBUFFER_LENGTH( s->out ), arg->out.len ); - memcpy( arg->out.buf, EVBUFFER_DATA( s->out ), n ); - evbuffer_drain( s->out, n ); + const int n = MIN( ( int )EVBUFFER_LENGTH( cbuf->out ), arg->out.len ); + memcpy( arg->out.buf, EVBUFFER_DATA( cbuf->out ), n ); + evbuffer_drain( cbuf->out, n ); arg->out.num_bytes = n; } - if( !EVBUFFER_LENGTH( s->out ) ) + if( !EVBUFFER_LENGTH( cbuf->out ) ) + { arg->flags |= SHTTPD_END_OF_OUTPUT; + pruneBuf( s, cbuf ); + } } static void @@ -235,12 +305,10 @@ rpcPulse( int socket UNUSED, short action UNUSED, void * vserver ) shttpd_poll( server->ctx, 1 ); /* set a timer for the next pulse */ - if( EVBUFFER_LENGTH( server->in ) || EVBUFFER_LENGTH( server->out ) ) - interval = BUSY_INTERVAL_MSEC; - else if( now - server->lastRequestTime < 300 ) - interval = IDLE_INTERVAL_MSEC; + if( now - server->lastRequestTime < 300 ) + interval = ACTIVE_INTERVAL_MSEC; else - interval = UNUSED_INTERVAL_MSEC; + interval = INACTIVE_INTERVAL_MSEC; tv = tr_timevalMsec( interval ); evtimer_add( &server->timer, &tv ); } @@ -263,7 +331,7 @@ startServer( tr_rpc_server * server ) char ports[128]; char passwd[MAX_PATH_LENGTH]; const char * clutchDir = tr_getClutchDir( server->session ); - struct timeval tv = tr_timevalMsec( UNUSED_INTERVAL_MSEC ); + struct timeval tv = tr_timevalMsec( INACTIVE_INTERVAL_MSEC ); getPasswordFile( server, passwd, sizeof( passwd ) ); if( !server->isPasswordEnabled ) @@ -584,8 +652,6 @@ tr_rpcClose( tr_rpc_server ** ps ) *ps = NULL; stopServer( s ); - evbuffer_free( s->in ); - evbuffer_free( s->out ); tr_free( s->acl ); tr_free( s ); } @@ -613,8 +679,6 @@ tr_rpcInit( tr_handle * session, s = tr_new0( tr_rpc_server, 1 ); s->session = session; s->port = port; - s->in = evbuffer_new( ); - s->out = evbuffer_new( ); s->acl = tr_strdup( acl ); s->username = tr_strdup( username ); s->password = tr_strdup( password );