/* * This file Copyright (C) Mnemosyne LLC * * This file is licensed by the GPL version 2. Works owned by the * Transmission project are granted a special exemption to clause 2(b) * so that the bulk of its code can remain under the MIT license. * This exemption does not extend to derived works not owned by * the Transmission project. * * $Id$ */ #include #include #include #include #include #include #include #include "transmission.h" #include "session.h" #include "bandwidth.h" #include "crypto.h" #include "net.h" #include "peer-common.h" /* MAX_BLOCK_SIZE */ #include "peer-io.h" #include "trevent.h" /* tr_runInEventThread() */ #include "tr-utp.h" #include "utils.h" #ifdef WIN32 #define EAGAIN WSAEWOULDBLOCK #define EINTR WSAEINTR #define EINPROGRESS WSAEINPROGRESS #define EPIPE WSAECONNRESET #endif /* The amount of read bufferring that we allow for uTP sockets. */ #define UTP_READ_BUFFER_SIZE (256 * 1024) static size_t guessPacketOverhead( size_t d ) { /** * http://sd.wareonearth.com/~phil/net/overhead/ * * TCP over Ethernet: * Assuming no header compression (e.g. not PPP) * Add 20 IPv4 header or 40 IPv6 header (no options) * Add 20 TCP header * Add 12 bytes optional TCP timestamps * Max TCP Payload data rates over ethernet are thus: * (1500-40)/(38+1500) = 94.9285 % IPv4, minimal headers * (1500-52)/(38+1500) = 94.1482 % IPv4, TCP timestamps * (1500-52)/(42+1500) = 93.9040 % 802.1q, IPv4, TCP timestamps * (1500-60)/(38+1500) = 93.6281 % IPv6, minimal headers * (1500-72)/(38+1500) = 92.8479 % IPv6, TCP timestamps * (1500-72)/(42+1500) = 92.6070 % 802.1q, IPv6, ICP timestamps */ const double assumed_payload_data_rate = 94.0; return (unsigned int)( d * ( 100.0 / assumed_payload_data_rate ) - d ); } /** *** **/ #define dbgmsg( io, ... ) \ do { \ if( tr_deepLoggingIsActive( ) ) \ tr_deepLog( __FILE__, __LINE__, tr_peerIoGetAddrStr( io ), __VA_ARGS__ ); \ } while( 0 ) /** *** **/ struct tr_datatype { struct tr_datatype * next; size_t length; bool isPieceData; }; static struct tr_datatype * datatype_pool = NULL; static const struct tr_datatype TR_DATATYPE_INIT = { NULL, 0, false }; static struct tr_datatype * datatype_new( void ) { struct tr_datatype * ret; if( datatype_pool == NULL ) ret = tr_new( struct tr_datatype, 1 ); else { ret = datatype_pool; datatype_pool = datatype_pool->next; } *ret = TR_DATATYPE_INIT; return ret; } static void datatype_free( struct tr_datatype * datatype ) { datatype->next = datatype_pool; datatype_pool = datatype; } static void peer_io_pull_datatype( tr_peerIo * io ) { struct tr_datatype * tmp; if(( tmp = io->outbuf_datatypes )) { io->outbuf_datatypes = tmp->next; datatype_free( tmp ); } } static void peer_io_push_datatype( tr_peerIo * io, struct tr_datatype * datatype ) { struct tr_datatype * tmp; if(( tmp = io->outbuf_datatypes )) { while( tmp->next != NULL ) tmp = tmp->next; tmp->next = datatype; } else { io->outbuf_datatypes = datatype; } } /*** **** ***/ static void didWriteWrapper( tr_peerIo * io, unsigned int bytes_transferred ) { while( bytes_transferred && tr_isPeerIo( io ) ) { struct tr_datatype * next = io->outbuf_datatypes; const unsigned int payload = MIN( next->length, bytes_transferred ); /* For uTP sockets, the overhead is computed in utp_on_overhead. */ const unsigned int overhead = io->socket ? guessPacketOverhead( payload ) : 0; const uint64_t now = tr_time_msec( ); tr_bandwidthUsed( &io->bandwidth, TR_UP, payload, next->isPieceData, now ); if( overhead > 0 ) tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now ); if( io->didWrite ) io->didWrite( io, payload, next->isPieceData, io->userData ); if( tr_isPeerIo( io ) ) { bytes_transferred -= payload; next->length -= payload; if( !next->length ) peer_io_pull_datatype( io ); } } } static void canReadWrapper( tr_peerIo * io ) { bool err = 0; bool done = 0; tr_session * session; dbgmsg( io, "canRead" ); tr_peerIoRef( io ); session = io->session; /* try to consume the input buffer */ if( io->canRead ) { const uint64_t now = tr_time_msec( ); tr_sessionLock( session ); while( !done && !err ) { size_t piece = 0; const size_t oldLen = evbuffer_get_length( io->inbuf ); const int ret = io->canRead( io, io->userData, &piece ); const size_t used = oldLen - evbuffer_get_length( io->inbuf ); const unsigned int overhead = guessPacketOverhead( used ); if( piece || (piece!=used) ) { if( piece ) tr_bandwidthUsed( &io->bandwidth, TR_DOWN, piece, true, now ); if( used != piece ) tr_bandwidthUsed( &io->bandwidth, TR_DOWN, used - piece, false, now ); } if( overhead > 0 ) tr_bandwidthUsed( &io->bandwidth, TR_UP, overhead, false, now ); switch( ret ) { case READ_NOW: if( evbuffer_get_length( io->inbuf ) ) continue; done = 1; break; case READ_LATER: done = 1; break; case READ_ERR: err = 1; break; } assert( tr_isPeerIo( io ) ); } tr_sessionUnlock( session ); } tr_peerIoUnref( io ); } static void event_read_cb( int fd, short event UNUSED, void * vio ) { int res; int e; tr_peerIo * io = vio; /* Limit the input buffer to 256K, so it doesn't grow too large */ unsigned int howmuch; unsigned int curlen; const tr_direction dir = TR_DOWN; const unsigned int max = 256 * 1024; assert( tr_isPeerIo( io ) ); assert( io->socket >= 0 ); io->pendingEvents &= ~EV_READ; curlen = evbuffer_get_length( io->inbuf ); howmuch = curlen >= max ? 0 : max - curlen; howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch ); dbgmsg( io, "libevent says this peer is ready to read" ); /* if we don't have any bandwidth left, stop reading */ if( howmuch < 1 ) { tr_peerIoSetEnabled( io, dir, false ); return; } EVUTIL_SET_SOCKET_ERROR( 0 ); res = evbuffer_read( io->inbuf, fd, (int)howmuch ); e = EVUTIL_SOCKET_ERROR( ); if( res > 0 ) { tr_peerIoSetEnabled( io, dir, true ); /* Invoke the user callback - must always be called last */ canReadWrapper( io ); } else { char errstr[512]; short what = BEV_EVENT_READING; if( res == 0 ) /* EOF */ what |= BEV_EVENT_EOF; else if( res == -1 ) { if( e == EAGAIN || e == EINTR ) { tr_peerIoSetEnabled( io, dir, true ); return; } what |= BEV_EVENT_ERROR; } dbgmsg( io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) ); if( io->gotError != NULL ) io->gotError( io, what, io->userData ); } } static int tr_evbuffer_write( tr_peerIo * io, int fd, size_t howmuch ) { int e; int n; char errstr[256]; EVUTIL_SET_SOCKET_ERROR( 0 ); n = evbuffer_write_atmost( io->outbuf, fd, howmuch ); e = EVUTIL_SOCKET_ERROR( ); dbgmsg( io, "wrote %d to peer (%s)", n, (n==-1?tr_net_strerror(errstr,sizeof(errstr),e):"") ); return n; } static void event_write_cb( int fd, short event UNUSED, void * vio ) { int res = 0; int e; short what = BEV_EVENT_WRITING; tr_peerIo * io = vio; size_t howmuch; const tr_direction dir = TR_UP; char errstr[1024]; assert( tr_isPeerIo( io ) ); assert( io->socket >= 0 ); io->pendingEvents &= ~EV_WRITE; dbgmsg( io, "libevent says this peer is ready to write" ); /* Write as much as possible, since the socket is non-blocking, write() will * return if it can't write any more data without blocking */ howmuch = tr_bandwidthClamp( &io->bandwidth, dir, evbuffer_get_length( io->outbuf ) ); /* if we don't have any bandwidth left, stop writing */ if( howmuch < 1 ) { tr_peerIoSetEnabled( io, dir, false ); return; } EVUTIL_SET_SOCKET_ERROR( 0 ); res = tr_evbuffer_write( io, fd, howmuch ); e = EVUTIL_SOCKET_ERROR( ); if (res == -1) { if (!e || e == EAGAIN || e == EINTR || e == EINPROGRESS) goto reschedule; /* error case */ what |= BEV_EVENT_ERROR; } else if (res == 0) { /* eof case */ what |= BEV_EVENT_EOF; } if (res <= 0) goto error; if( evbuffer_get_length( io->outbuf ) ) tr_peerIoSetEnabled( io, dir, true ); didWriteWrapper( io, res ); return; reschedule: if( evbuffer_get_length( io->outbuf ) ) tr_peerIoSetEnabled( io, dir, true ); return; error: tr_net_strerror( errstr, sizeof( errstr ), e ); dbgmsg( io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr ); if( io->gotError != NULL ) io->gotError( io, what, io->userData ); } /** *** **/ static void maybeSetCongestionAlgorithm( int socket, const char * algorithm ) { if( algorithm && *algorithm ) { const int rc = tr_netSetCongestionControl( socket, algorithm ); if( rc < 0 ) tr_ninf( "Net", "Can't set congestion control algorithm '%s': %s", algorithm, tr_strerror( errno )); } } #ifdef WITH_UTP /* UTP callbacks */ static void utp_on_read(void *closure, const unsigned char *buf, size_t buflen) { int rc; tr_peerIo *io = closure; assert( tr_isPeerIo( io ) ); rc = evbuffer_add( io->inbuf, buf, buflen ); dbgmsg( io, "utp_on_read got %zu bytes", buflen ); if( rc < 0 ) { tr_nerr( "UTP", "On read evbuffer_add" ); return; } tr_peerIoSetEnabled( io, TR_DOWN, true ); canReadWrapper( io ); } static void utp_on_write(void *closure, unsigned char *buf, size_t buflen) { int rc; tr_peerIo *io = closure; assert( tr_isPeerIo( io ) ); rc = evbuffer_remove( io->outbuf, buf, buflen ); dbgmsg( io, "utp_on_write sending %zu bytes... evbuffer_remove returned %d", buflen, rc ); assert( rc == (int)buflen ); /* if this fails, we've corrupted our bookkeeping somewhere */ if( rc < (long)buflen ) { tr_nerr( "UTP", "Short write: %d < %ld", rc, (long)buflen); } didWriteWrapper( io, buflen ); } static size_t utp_get_rb_size(void *closure) { size_t bytes; tr_peerIo *io = closure; assert( tr_isPeerIo( io ) ); bytes = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, UTP_READ_BUFFER_SIZE ); dbgmsg( io, "utp_get_rb_size is saying it's ready to read %zu bytes", bytes ); return UTP_READ_BUFFER_SIZE - bytes; } static void utp_on_state_change(void *closure, int state) { tr_peerIo *io = closure; assert( tr_isPeerIo( io ) ); if( state == UTP_STATE_CONNECT ) { dbgmsg( io, "utp_on_state_change -- changed to connected" ); io->utpSupported = true; } else if( state == UTP_STATE_WRITABLE ) { dbgmsg( io, "utp_on_state_change -- changed to writable" ); } else if( state == UTP_STATE_EOF ) { if( io->gotError ) io->gotError( io, BEV_EVENT_EOF, io->userData ); } else if( state == UTP_STATE_DESTROYING ) { tr_nerr( "UTP", "Impossible state UTP_STATE_DESTROYING" ); return; } else { tr_nerr( "UTP", "Unknown state %d", state ); } } static void utp_on_error(void *closure, int errcode) { tr_peerIo *io = closure; assert( tr_isPeerIo( io ) ); dbgmsg( io, "utp_on_error -- errcode is %d", errcode ); if( io->gotError ) { errno = errcode; io->gotError( io, BEV_EVENT_ERROR, io->userData ); } } static void utp_on_overhead(void *closure, bool send, size_t count, int type UNUSED) { tr_peerIo *io = closure; assert( tr_isPeerIo( io ) ); dbgmsg( io, "utp_on_overhead -- count is %zu", count ); tr_bandwidthUsed( &io->bandwidth, send ? TR_UP : TR_DOWN, count, false, tr_time_msec() ); } static struct UTPFunctionTable utp_function_table = { .on_read = utp_on_read, .on_write = utp_on_write, .get_rb_size = utp_get_rb_size, .on_state = utp_on_state_change, .on_error = utp_on_error, .on_overhead = utp_on_overhead }; /* Dummy UTP callbacks. */ /* We switch a UTP socket to use these after the associated peerIo has been destroyed -- see io_dtor. */ static void dummy_read( void * closure UNUSED, const unsigned char *buf UNUSED, size_t buflen UNUSED ) { /* This cannot happen, as far as I'm aware. */ tr_nerr( "UTP", "On_read called on closed socket" ); } static void dummy_write(void * closure UNUSED, unsigned char *buf, size_t buflen) { /* This can very well happen if we've shut down a peer connection that had unflushed buffers. Complain and send zeroes. */ tr_ndbg( "UTP", "On_write called on closed socket" ); memset( buf, 0, buflen ); } static size_t dummy_get_rb_size( void * closure UNUSED ) { return 0; } static void dummy_on_state_change(void * closure UNUSED, int state UNUSED ) { return; } static void dummy_on_error( void * closure UNUSED, int errcode UNUSED ) { return; } static void dummy_on_overhead( void *closure UNUSED, bool send UNUSED, size_t count UNUSED, int type UNUSED ) { return; } static struct UTPFunctionTable dummy_utp_function_table = { .on_read = dummy_read, .on_write = dummy_write, .get_rb_size = dummy_get_rb_size, .on_state = dummy_on_state_change, .on_error = dummy_on_error, .on_overhead = dummy_on_overhead }; #endif /* #ifdef WITH_UTP */ static tr_peerIo* tr_peerIoNew( tr_session * session, tr_bandwidth * parent, const tr_address * addr, tr_port port, const uint8_t * torrentHash, bool isIncoming, bool isSeed, int socket, struct UTPSocket * utp_socket) { tr_peerIo * io; assert( session != NULL ); assert( session->events != NULL ); assert( tr_isBool( isIncoming ) ); assert( tr_isBool( isSeed ) ); assert( tr_amInEventThread( session ) ); assert( (socket < 0) == (utp_socket != NULL) ); #ifndef WITH_UTP assert( socket >= 0 ); #endif if( socket >= 0 ) { tr_netSetTOS( socket, session->peerSocketTOS ); maybeSetCongestionAlgorithm( socket, session->peer_congestion_algorithm ); } io = tr_new0( tr_peerIo, 1 ); io->magicNumber = PEER_IO_MAGIC_NUMBER; io->refCount = 1; tr_cryptoConstruct( &io->crypto, torrentHash, isIncoming ); io->session = session; io->addr = *addr; io->isSeed = isSeed; io->port = port; io->socket = socket; io->utp_socket = utp_socket; io->isIncoming = isIncoming != 0; io->timeCreated = tr_time( ); io->inbuf = evbuffer_new( ); io->outbuf = evbuffer_new( ); tr_bandwidthConstruct( &io->bandwidth, session, parent ); tr_bandwidthSetPeer( &io->bandwidth, io ); dbgmsg( io, "bandwidth is %p; its parent is %p", &io->bandwidth, parent ); dbgmsg( io, "socket is %d, utp_socket is %p", socket, utp_socket ); if( io->socket >= 0 ) { io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io ); io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io ); } #ifdef WITH_UTP else { UTP_SetSockopt( utp_socket, SO_RCVBUF, UTP_READ_BUFFER_SIZE ); dbgmsg( io, "%s", "calling UTP_SetCallbacks &utp_function_table" ); UTP_SetCallbacks( utp_socket, &utp_function_table, io ); if( !isIncoming ) { dbgmsg( io, "%s", "calling UTP_Connect" ); UTP_Connect( utp_socket ); } } #endif return io; } tr_peerIo* tr_peerIoNewIncoming( tr_session * session, tr_bandwidth * parent, const tr_address * addr, tr_port port, int fd, struct UTPSocket * utp_socket ) { assert( session ); assert( tr_address_is_valid( addr ) ); return tr_peerIoNew( session, parent, addr, port, NULL, true, false, fd, utp_socket ); } tr_peerIo* tr_peerIoNewOutgoing( tr_session * session, tr_bandwidth * parent, const tr_address * addr, tr_port port, const uint8_t * torrentHash, bool isSeed, bool utp ) { int fd = -1; struct UTPSocket * utp_socket = NULL; assert( session ); assert( tr_address_is_valid( addr ) ); assert( torrentHash ); if( utp ) utp_socket = tr_netOpenPeerUTPSocket( session, addr, port, isSeed ); if( !utp_socket ) { fd = tr_netOpenPeerSocket( session, addr, port, isSeed ); dbgmsg( NULL, "tr_netOpenPeerSocket returned fd %d", fd ); } if( fd < 0 && utp_socket == NULL ) return NULL; return tr_peerIoNew( session, parent, addr, port, torrentHash, false, isSeed, fd, utp_socket ); } /*** **** ***/ static void event_enable( tr_peerIo * io, short event ) { assert( tr_amInEventThread( io->session ) ); assert( io->session != NULL ); assert( io->session->events != NULL ); if( io->socket < 0 ) return; assert( io->session->events != NULL ); assert( event_initialized( io->event_read ) ); assert( event_initialized( io->event_write ) ); if( ( event & EV_READ ) && ! ( io->pendingEvents & EV_READ ) ) { dbgmsg( io, "enabling libevent ready-to-read polling" ); event_add( io->event_read, NULL ); io->pendingEvents |= EV_READ; } if( ( event & EV_WRITE ) && ! ( io->pendingEvents & EV_WRITE ) ) { dbgmsg( io, "enabling libevent ready-to-write polling" ); event_add( io->event_write, NULL ); io->pendingEvents |= EV_WRITE; } } static void event_disable( struct tr_peerIo * io, short event ) { assert( tr_amInEventThread( io->session ) ); assert( io->session != NULL ); if( io->socket < 0 ) return; assert( io->session->events != NULL ); assert( event_initialized( io->event_read ) ); assert( event_initialized( io->event_write ) ); if( ( event & EV_READ ) && ( io->pendingEvents & EV_READ ) ) { dbgmsg( io, "disabling libevent ready-to-read polling" ); event_del( io->event_read ); io->pendingEvents &= ~EV_READ; } if( ( event & EV_WRITE ) && ( io->pendingEvents & EV_WRITE ) ) { dbgmsg( io, "disabling libevent ready-to-write polling" ); event_del( io->event_write ); io->pendingEvents &= ~EV_WRITE; } } void tr_peerIoSetEnabled( tr_peerIo * io, tr_direction dir, bool isEnabled ) { const short event = dir == TR_UP ? EV_WRITE : EV_READ; assert( tr_isPeerIo( io ) ); assert( tr_isDirection( dir ) ); assert( tr_amInEventThread( io->session ) ); assert( io->session->events != NULL ); if( isEnabled ) event_enable( io, event ); else event_disable( io, event ); } /*** **** ***/ static void io_close_socket( tr_peerIo * io ) { if( io->socket >= 0 ) { tr_netClose( io->session, io->socket ); io->socket = -1; } if( io->event_read != NULL) { event_free( io->event_read ); io->event_read = NULL; } if( io->event_write != NULL) { event_free( io->event_write ); io->event_write = NULL; } #ifdef WITH_UTP if( io->utp_socket ) { UTP_SetCallbacks( io->utp_socket, &dummy_utp_function_table, NULL ); UTP_Close( io->utp_socket ); io->utp_socket = NULL; } #endif } static void io_dtor( void * vio ) { tr_peerIo * io = vio; assert( tr_isPeerIo( io ) ); assert( tr_amInEventThread( io->session ) ); assert( io->session->events != NULL ); dbgmsg( io, "in tr_peerIo destructor" ); event_disable( io, EV_READ | EV_WRITE ); tr_bandwidthDestruct( &io->bandwidth ); evbuffer_free( io->outbuf ); evbuffer_free( io->inbuf ); io_close_socket( io ); tr_cryptoDestruct( &io->crypto ); while( io->outbuf_datatypes != NULL ) peer_io_pull_datatype( io ); memset( io, ~0, sizeof( tr_peerIo ) ); tr_free( io ); } static void tr_peerIoFree( tr_peerIo * io ) { if( io ) { dbgmsg( io, "in tr_peerIoFree" ); io->canRead = NULL; io->didWrite = NULL; io->gotError = NULL; tr_runInEventThread( io->session, io_dtor, io ); } } void tr_peerIoRefImpl( const char * file, int line, tr_peerIo * io ) { assert( tr_isPeerIo( io ) ); dbgmsg( io, "%s:%d is incrementing the IO's refcount from %d to %d", file, line, io->refCount, io->refCount+1 ); ++io->refCount; } void tr_peerIoUnrefImpl( const char * file, int line, tr_peerIo * io ) { assert( tr_isPeerIo( io ) ); dbgmsg( io, "%s:%d is decrementing the IO's refcount from %d to %d", file, line, io->refCount, io->refCount-1 ); if( !--io->refCount ) tr_peerIoFree( io ); } const tr_address* tr_peerIoGetAddress( const tr_peerIo * io, tr_port * port ) { assert( tr_isPeerIo( io ) ); if( port ) *port = io->port; return &io->addr; } const char* tr_peerIoAddrStr( const tr_address * addr, tr_port port ) { static char buf[512]; tr_snprintf( buf, sizeof( buf ), "[%s]:%u", tr_address_to_string( addr ), ntohs( port ) ); return buf; } const char* tr_peerIoGetAddrStr( const tr_peerIo * io ) { return tr_isPeerIo( io ) ? tr_peerIoAddrStr( &io->addr, io->port ) : "error"; } void tr_peerIoSetIOFuncs( tr_peerIo * io, tr_can_read_cb readcb, tr_did_write_cb writecb, tr_net_error_cb errcb, void * userData ) { io->canRead = readcb; io->didWrite = writecb; io->gotError = errcb; io->userData = userData; } void tr_peerIoClear( tr_peerIo * io ) { tr_peerIoSetIOFuncs( io, NULL, NULL, NULL, NULL ); tr_peerIoSetEnabled( io, TR_UP, false ); tr_peerIoSetEnabled( io, TR_DOWN, false ); } int tr_peerIoReconnect( tr_peerIo * io ) { short int pendingEvents; tr_session * session; assert( tr_isPeerIo( io ) ); assert( !tr_peerIoIsIncoming( io ) ); session = tr_peerIoGetSession( io ); pendingEvents = io->pendingEvents; event_disable( io, EV_READ | EV_WRITE ); io_close_socket( io ); io->socket = tr_netOpenPeerSocket( session, &io->addr, io->port, io->isSeed ); io->event_read = event_new( session->event_base, io->socket, EV_READ, event_read_cb, io ); io->event_write = event_new( session->event_base, io->socket, EV_WRITE, event_write_cb, io ); if( io->socket >= 0 ) { event_enable( io, pendingEvents ); tr_netSetTOS( io->socket, session->peerSocketTOS ); maybeSetCongestionAlgorithm( io->socket, session->peer_congestion_algorithm ); return 0; } return -1; } /** *** **/ void tr_peerIoSetTorrentHash( tr_peerIo * io, const uint8_t * hash ) { assert( tr_isPeerIo( io ) ); tr_cryptoSetTorrentHash( &io->crypto, hash ); } const uint8_t* tr_peerIoGetTorrentHash( tr_peerIo * io ) { assert( tr_isPeerIo( io ) ); return tr_cryptoGetTorrentHash( &io->crypto ); } int tr_peerIoHasTorrentHash( const tr_peerIo * io ) { assert( tr_isPeerIo( io ) ); return tr_cryptoHasTorrentHash( &io->crypto ); } /** *** **/ void tr_peerIoSetPeersId( tr_peerIo * io, const uint8_t * peer_id ) { assert( tr_isPeerIo( io ) ); if( ( io->peerIdIsSet = peer_id != NULL ) ) memcpy( io->peerId, peer_id, 20 ); else memset( io->peerId, 0, 20 ); } /** *** **/ static unsigned int getDesiredOutputBufferSize( const tr_peerIo * io, uint64_t now ) { /* this is all kind of arbitrary, but what seems to work well is * being large enough to hold the next 20 seconds' worth of input, * or a few blocks, whichever is bigger. * It's okay to tweak this as needed */ const unsigned int currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps( &io->bandwidth, now, TR_UP ); const unsigned int period = 15u; /* arbitrary */ /* the 3 is arbitrary; the .5 is to leave room for messages */ static const unsigned int ceiling = (unsigned int)( MAX_BLOCK_SIZE * 3.5 ); return MAX( ceiling, currentSpeed_Bps*period ); } size_t tr_peerIoGetWriteBufferSpace( const tr_peerIo * io, uint64_t now ) { const size_t desiredLen = getDesiredOutputBufferSize( io, now ); const size_t currentLen = evbuffer_get_length( io->outbuf ); size_t freeSpace = 0; if( desiredLen > currentLen ) freeSpace = desiredLen - currentLen; return freeSpace; } /** *** **/ void tr_peerIoSetEncryption( tr_peerIo * io, tr_encryption_type encryption_type ) { assert( tr_isPeerIo( io ) ); assert( encryption_type == PEER_ENCRYPTION_NONE || encryption_type == PEER_ENCRYPTION_RC4 ); io->encryption_type = encryption_type; } /** *** **/ static void addDatatype( tr_peerIo * io, size_t byteCount, bool isPieceData ) { struct tr_datatype * d; d = datatype_new( ); d->isPieceData = isPieceData != 0; d->length = byteCount; peer_io_push_datatype( io, d ); } static void maybeEncryptBuffer( tr_peerIo * io, struct evbuffer * buf ) { if( io->encryption_type == PEER_ENCRYPTION_RC4 ) { struct evbuffer_ptr pos; struct evbuffer_iovec iovec; evbuffer_ptr_set( buf, &pos, 0, EVBUFFER_PTR_SET ); do { evbuffer_peek( buf, -1, &pos, &iovec, 1 ); tr_cryptoEncrypt( &io->crypto, iovec.iov_len, iovec.iov_base, iovec.iov_base ); } while( !evbuffer_ptr_set( buf, &pos, iovec.iov_len, EVBUFFER_PTR_ADD ) ); } } void tr_peerIoWriteBuf( tr_peerIo * io, struct evbuffer * buf, bool isPieceData ) { const size_t byteCount = evbuffer_get_length( buf ); maybeEncryptBuffer( io, buf ); evbuffer_add_buffer( io->outbuf, buf ); addDatatype( io, byteCount, isPieceData ); } void tr_peerIoWriteBytes( tr_peerIo * io, const void * bytes, size_t byteCount, bool isPieceData ) { struct evbuffer_iovec iovec; evbuffer_reserve_space( io->outbuf, byteCount, &iovec, 1 ); iovec.iov_len = byteCount; if( io->encryption_type == PEER_ENCRYPTION_RC4 ) tr_cryptoEncrypt( &io->crypto, iovec.iov_len, bytes, iovec.iov_base ); else memcpy( iovec.iov_base, bytes, iovec.iov_len ); evbuffer_commit_space( io->outbuf, &iovec, 1 ); addDatatype( io, byteCount, isPieceData ); } /*** **** ***/ void evbuffer_add_uint8( struct evbuffer * outbuf, uint8_t byte ) { evbuffer_add( outbuf, &byte, 1 ); } void evbuffer_add_uint16( struct evbuffer * outbuf, uint16_t addme_hs ) { const uint16_t ns = htons( addme_hs ); evbuffer_add( outbuf, &ns, sizeof( ns ) ); } void evbuffer_add_uint32( struct evbuffer * outbuf, uint32_t addme_hl ) { const uint32_t nl = htonl( addme_hl ); evbuffer_add( outbuf, &nl, sizeof( nl ) ); } void evbuffer_add_uint64( struct evbuffer * outbuf, uint64_t addme_hll ) { const uint64_t nll = tr_htonll( addme_hll ); evbuffer_add( outbuf, &nll, sizeof( nll ) ); } /*** **** ***/ void tr_peerIoReadBytesToBuf( tr_peerIo * io, struct evbuffer * inbuf, struct evbuffer * outbuf, size_t byteCount ) { struct evbuffer * tmp; const size_t old_length = evbuffer_get_length( outbuf ); assert( tr_isPeerIo( io ) ); assert( evbuffer_get_length( inbuf ) >= byteCount ); /* append it to outbuf */ tmp = evbuffer_new( ); evbuffer_remove_buffer( inbuf, tmp, byteCount ); evbuffer_add_buffer( outbuf, tmp ); evbuffer_free( tmp ); /* decrypt if needed */ if( io->encryption_type == PEER_ENCRYPTION_RC4 ) { struct evbuffer_ptr pos; struct evbuffer_iovec iovec; evbuffer_ptr_set( outbuf, &pos, old_length, EVBUFFER_PTR_SET ); do { evbuffer_peek( outbuf, byteCount, &pos, &iovec, 1 ); tr_cryptoDecrypt( &io->crypto, iovec.iov_len, iovec.iov_base, iovec.iov_base ); byteCount -= iovec.iov_len; } while( !evbuffer_ptr_set( outbuf, &pos, iovec.iov_len, EVBUFFER_PTR_ADD ) ); } } void tr_peerIoReadBytes( tr_peerIo * io, struct evbuffer * inbuf, void * bytes, size_t byteCount ) { assert( tr_isPeerIo( io ) ); assert( evbuffer_get_length( inbuf ) >= byteCount ); switch( io->encryption_type ) { case PEER_ENCRYPTION_NONE: evbuffer_remove( inbuf, bytes, byteCount ); break; case PEER_ENCRYPTION_RC4: evbuffer_remove( inbuf, bytes, byteCount ); tr_cryptoDecrypt( &io->crypto, byteCount, bytes, bytes ); break; default: assert( 0 ); } } void tr_peerIoReadUint16( tr_peerIo * io, struct evbuffer * inbuf, uint16_t * setme ) { uint16_t tmp; tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint16_t ) ); *setme = ntohs( tmp ); } void tr_peerIoReadUint32( tr_peerIo * io, struct evbuffer * inbuf, uint32_t * setme ) { uint32_t tmp; tr_peerIoReadBytes( io, inbuf, &tmp, sizeof( uint32_t ) ); *setme = ntohl( tmp ); } void tr_peerIoDrain( tr_peerIo * io, struct evbuffer * inbuf, size_t byteCount ) { char buf[4096]; const size_t buflen = sizeof( buf ); while( byteCount > 0 ) { const size_t thisPass = MIN( byteCount, buflen ); tr_peerIoReadBytes( io, inbuf, buf, thisPass ); byteCount -= thisPass; } } /*** **** ***/ static int tr_peerIoTryRead( tr_peerIo * io, size_t howmuch ) { int res = 0; if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_DOWN, howmuch ))) { if( io->utp_socket != NULL ) /* utp peer connection */ { /* UTP_RBDrained notifies libutp that your read buffer is emtpy. * It opens up the congestion window by sending an ACK (soonish) * if one was not going to be sent. */ if( evbuffer_get_length( io->inbuf ) == 0 ) UTP_RBDrained( io->utp_socket ); } else /* tcp peer connection */ { int e; EVUTIL_SET_SOCKET_ERROR( 0 ); res = evbuffer_read( io->inbuf, io->socket, (int)howmuch ); e = EVUTIL_SOCKET_ERROR( ); dbgmsg( io, "read %d from peer (%s)", res, (res==-1?tr_strerror(e):"") ); if( evbuffer_get_length( io->inbuf ) ) canReadWrapper( io ); if( ( res <= 0 ) && ( io->gotError ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) ) { char errstr[512]; short what = BEV_EVENT_READING | BEV_EVENT_ERROR; if( res == 0 ) what |= BEV_EVENT_EOF; dbgmsg( io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) ); io->gotError( io, what, io->userData ); } } } return res; } static int tr_peerIoTryWrite( tr_peerIo * io, size_t howmuch ) { int n = 0; const size_t old_len = evbuffer_get_length( io->outbuf ); dbgmsg( io, "in tr_peerIoTryWrite %zu", howmuch ); if( howmuch > old_len ) howmuch = old_len; if(( howmuch = tr_bandwidthClamp( &io->bandwidth, TR_UP, howmuch ))) { if( io->utp_socket != NULL ) /* utp peer connection */ { const size_t old_len = evbuffer_get_length( io->outbuf ); UTP_Write( io->utp_socket, howmuch ); n = old_len - evbuffer_get_length( io->outbuf ); } else { int e; EVUTIL_SET_SOCKET_ERROR( 0 ); n = tr_evbuffer_write( io, io->socket, howmuch ); e = EVUTIL_SOCKET_ERROR( ); if( n > 0 ) didWriteWrapper( io, n ); if( ( n < 0 ) && ( io->gotError ) && e && ( e != EPIPE ) && ( e != EAGAIN ) && ( e != EINTR ) && ( e != EINPROGRESS ) ) { char errstr[512]; const short what = BEV_EVENT_WRITING | BEV_EVENT_ERROR; dbgmsg( io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e, tr_net_strerror( errstr, sizeof( errstr ), e ) ); if( io->gotError != NULL ) io->gotError( io, what, io->userData ); } } } return n; } int tr_peerIoFlush( tr_peerIo * io, tr_direction dir, size_t limit ) { int bytesUsed = 0; assert( tr_isPeerIo( io ) ); assert( tr_isDirection( dir ) ); if( dir == TR_DOWN ) bytesUsed = tr_peerIoTryRead( io, limit ); else bytesUsed = tr_peerIoTryWrite( io, limit ); dbgmsg( io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed ); return bytesUsed; } int tr_peerIoFlushOutgoingProtocolMsgs( tr_peerIo * io ) { size_t byteCount = 0; const struct tr_datatype * it; /* count up how many bytes are used by non-piece-data messages at the front of our outbound queue */ for( it=io->outbuf_datatypes; it!=NULL; it=it->next ) if( it->isPieceData ) break; else byteCount += it->length; return tr_peerIoFlush( io, TR_UP, byteCount ); }