(rpc) better handling of multiple concurrent RPC connections

This commit is contained in:
Charles Kerr 2008-07-16 17:47:20 +00:00
parent 5df61e8781
commit 1a1f195216
1 changed files with 117 additions and 53 deletions

View File

@ -24,6 +24,7 @@
#include "transmission.h"
#include "bencode.h"
#include "list.h"
#include "platform.h"
#include "rpc.h"
#include "rpc-server.h"
@ -32,9 +33,8 @@
#define MY_NAME "RPC Server"
#define MY_REALM "Transmission RPC Server"
#define BUSY_INTERVAL_MSEC 30
#define IDLE_INTERVAL_MSEC 66
#define UNUSED_INTERVAL_MSEC 100
#define ACTIVE_INTERVAL_MSEC 40
#define INACTIVE_INTERVAL_MSEC 500
struct tr_rpc_server
{
@ -42,13 +42,12 @@ struct tr_rpc_server
time_t lastRequestTime;
struct shttpd_ctx * ctx;
tr_handle * session;
struct evbuffer * in;
struct evbuffer * out;
struct event timer;
int isPasswordEnabled;
char * username;
char * password;
char * acl;
tr_list * connections;
};
#define dbgmsg(fmt...) tr_deepLog(__FILE__, __LINE__, MY_NAME, ##fmt )
@ -57,27 +56,91 @@ static const char*
tr_memmem( const char * s1, size_t l1,
const char * s2, size_t l2 )
{
if (!l2) return s1;
while (l1 >= l2) {
l1--;
if (!memcmp(s1,s2,l2))
return s1;
s1++;
}
return NULL;
if (!l2) return s1;
while (l1 >= l2) {
l1--;
if (!memcmp(s1,s2,l2))
return s1;
s1++;
}
return NULL;
}
/**
***
**/
struct ConnBuf
{
char * key;
time_t lastActivity;
struct evbuffer * in;
struct evbuffer * out;
};
static char*
buildKey( struct shttpd_arg * arg )
{
return tr_strdup_printf( "%s %s",
shttpd_get_env( arg, "REMOTE_ADDR" ),
shttpd_get_env( arg, "REQUEST_URI" ) );
}
static struct ConnBuf*
getBuffer( tr_rpc_server * server, struct shttpd_arg * arg )
{
tr_list * l;
char * key = buildKey( arg );
struct ConnBuf * found = NULL;
for( l=server->connections; l && !found; l=l->next )
{
struct ConnBuf * buf = l->data;
if( !strcmp( key, buf->key ) )
found = buf;
}
if( found == NULL )
{
found = tr_new0( struct ConnBuf, 1 );
found->lastActivity = time( NULL );
found->key = tr_strdup( key );
found->in = evbuffer_new( );
found->out = evbuffer_new( );
tr_list_append( &server->connections, found );
}
tr_free( key );
return found;
}
static void
pruneBuf( tr_rpc_server * server, struct ConnBuf * buf )
{
tr_list_remove_data( &server->connections, buf );
evbuffer_free( buf->in );
evbuffer_free( buf->out );
tr_free( buf->key );
tr_free( buf );
}
/**
***
**/
static void
handle_upload( struct shttpd_arg * arg )
{
struct tr_rpc_server * s = arg->user_data;
s->lastRequestTime = time( NULL );
struct ConnBuf * cbuf = getBuffer( s, arg );
/* if we haven't parsed the POST, do that now */
if( !EVBUFFER_LENGTH( s->out ) )
if( !EVBUFFER_LENGTH( cbuf->out ) )
{
/* if we haven't finished reading the POST, read more now */
evbuffer_add( s->in, arg->in.buf, arg->in.len );
evbuffer_add( cbuf->in, arg->in.buf, arg->in.len );
arg->in.num_bytes = arg->in.len;
if( arg->flags & SHTTPD_MORE_POST_DATA )
return;
@ -85,8 +148,8 @@ handle_upload( struct shttpd_arg * arg )
const char * query_string = shttpd_get_env( arg, "QUERY_STRING" );
const char * content_type = shttpd_get_header( arg, "Content-Type" );
const char * delim;
const char * in = (const char *) EVBUFFER_DATA( s->in );
size_t inlen = EVBUFFER_LENGTH( s->in );
const char * in = (const char *) EVBUFFER_DATA( cbuf->in );
size_t inlen = EVBUFFER_LENGTH( cbuf->in );
char * boundary = tr_strdup_printf( "--%s", strstr( content_type, "boundary=" ) + strlen( "boundary=" ) );
const size_t boundary_len = strlen( boundary );
char buf[64];
@ -142,7 +205,7 @@ handle_upload( struct shttpd_arg * arg )
}
while( delim );
evbuffer_drain( s->in, EVBUFFER_LENGTH( s->in ) );
evbuffer_drain( cbuf->in, EVBUFFER_LENGTH( cbuf->in ) );
tr_free( boundary );
{
@ -150,24 +213,27 @@ handle_upload( struct shttpd_arg * arg )
* see http://www.malsup.com/jquery/form/#sample7 for details */
const char * response = "<result>success</result>";
const int len = strlen( response );
evbuffer_add_printf( s->out, "HTTP/1.1 200 OK\r\n"
"Content-Type: text/xml\r\n"
"Content-Length: %d\r\n"
"\r\n"
"%s\r\n", len, response );
evbuffer_add_printf( cbuf->out, "HTTP/1.1 200 OK\r\n"
"Content-Type: text/xml\r\n"
"Content-Length: %d\r\n"
"\r\n"
"%s\r\n", len, response );
}
}
if( EVBUFFER_LENGTH( s->out ) )
if( EVBUFFER_LENGTH( cbuf->out ) )
{
const int n = MIN( ( int )EVBUFFER_LENGTH( s->out ), arg->out.len );
memcpy( arg->out.buf, EVBUFFER_DATA( s->out ), n );
evbuffer_drain( s->out, n );
const int n = MIN( ( int )EVBUFFER_LENGTH( cbuf->out ), arg->out.len );
memcpy( arg->out.buf, EVBUFFER_DATA( cbuf->out ), n );
evbuffer_drain( cbuf->out, n );
arg->out.num_bytes = n;
}
if( !EVBUFFER_LENGTH( s->out ) )
if( !EVBUFFER_LENGTH( cbuf->out ) )
{
arg->flags |= SHTTPD_END_OF_OUTPUT;
pruneBuf( s, cbuf );
}
}
static void
@ -175,8 +241,9 @@ handle_rpc( struct shttpd_arg * arg )
{
struct tr_rpc_server * s = arg->user_data;
s->lastRequestTime = time( NULL );
struct ConnBuf * cbuf = getBuffer( s, arg );
if( !EVBUFFER_LENGTH( s->out ) )
if( !EVBUFFER_LENGTH( cbuf->out ) )
{
int len = 0;
char * response = NULL;
@ -190,35 +257,38 @@ handle_rpc( struct shttpd_arg * arg )
&len );
else if( !strcmp( request_method, "POST" ) )
{
evbuffer_add( s->in, arg->in.buf, arg->in.len );
evbuffer_add( cbuf->in, arg->in.buf, arg->in.len );
arg->in.num_bytes = arg->in.len;
if( arg->flags & SHTTPD_MORE_POST_DATA )
return;
response = tr_rpc_request_exec_json( s->session,
EVBUFFER_DATA( s->in ),
EVBUFFER_LENGTH( s->in ),
EVBUFFER_DATA( cbuf->in ),
EVBUFFER_LENGTH( cbuf->in ),
&len );
evbuffer_drain( s->in, EVBUFFER_LENGTH( s->in ) );
evbuffer_drain( cbuf->in, EVBUFFER_LENGTH( cbuf->in ) );
}
evbuffer_add_printf( s->out, "HTTP/1.1 200 OK\r\n"
"Content-Type: application/json\r\n"
"Content-Length: %d\r\n"
"\r\n"
"%*.*s", len, len, len, response );
evbuffer_add_printf( cbuf->out, "HTTP/1.1 200 OK\r\n"
"Content-Type: application/json\r\n"
"Content-Length: %d\r\n"
"\r\n"
"%*.*s", len, len, len, response );
tr_free( response );
}
if( EVBUFFER_LENGTH( s->out ) )
if( EVBUFFER_LENGTH( cbuf->out ) )
{
const int n = MIN( ( int )EVBUFFER_LENGTH( s->out ), arg->out.len );
memcpy( arg->out.buf, EVBUFFER_DATA( s->out ), n );
evbuffer_drain( s->out, n );
const int n = MIN( ( int )EVBUFFER_LENGTH( cbuf->out ), arg->out.len );
memcpy( arg->out.buf, EVBUFFER_DATA( cbuf->out ), n );
evbuffer_drain( cbuf->out, n );
arg->out.num_bytes = n;
}
if( !EVBUFFER_LENGTH( s->out ) )
if( !EVBUFFER_LENGTH( cbuf->out ) )
{
arg->flags |= SHTTPD_END_OF_OUTPUT;
pruneBuf( s, cbuf );
}
}
static void
@ -235,12 +305,10 @@ rpcPulse( int socket UNUSED, short action UNUSED, void * vserver )
shttpd_poll( server->ctx, 1 );
/* set a timer for the next pulse */
if( EVBUFFER_LENGTH( server->in ) || EVBUFFER_LENGTH( server->out ) )
interval = BUSY_INTERVAL_MSEC;
else if( now - server->lastRequestTime < 300 )
interval = IDLE_INTERVAL_MSEC;
if( now - server->lastRequestTime < 300 )
interval = ACTIVE_INTERVAL_MSEC;
else
interval = UNUSED_INTERVAL_MSEC;
interval = INACTIVE_INTERVAL_MSEC;
tv = tr_timevalMsec( interval );
evtimer_add( &server->timer, &tv );
}
@ -263,7 +331,7 @@ startServer( tr_rpc_server * server )
char ports[128];
char passwd[MAX_PATH_LENGTH];
const char * clutchDir = tr_getClutchDir( server->session );
struct timeval tv = tr_timevalMsec( UNUSED_INTERVAL_MSEC );
struct timeval tv = tr_timevalMsec( INACTIVE_INTERVAL_MSEC );
getPasswordFile( server, passwd, sizeof( passwd ) );
if( !server->isPasswordEnabled )
@ -584,8 +652,6 @@ tr_rpcClose( tr_rpc_server ** ps )
*ps = NULL;
stopServer( s );
evbuffer_free( s->in );
evbuffer_free( s->out );
tr_free( s->acl );
tr_free( s );
}
@ -613,8 +679,6 @@ tr_rpcInit( tr_handle * session,
s = tr_new0( tr_rpc_server, 1 );
s->session = session;
s->port = port;
s->in = evbuffer_new( );
s->out = evbuffer_new( );
s->acl = tr_strdup( acl );
s->username = tr_strdup( username );
s->password = tr_strdup( password );