diff --git a/libtransmission/peer-io.c b/libtransmission/peer-io.c index b5bda5f01..71de57ec1 100644 --- a/libtransmission/peer-io.c +++ b/libtransmission/peer-io.c @@ -171,20 +171,26 @@ tr_peerIoNewOutgoing( struct tr_handle * handle, tr_netOpenTCP( in_addr, port, 0 ) ); } -void -tr_peerIoFree( tr_peerIo * c ) +static void +io_dtor( void * vio ) { - if( c != NULL ) + tr_peerIo * io = vio; + + bufferevent_free( io->bufev ); + tr_netClose( io->socket ); + tr_cryptoFree( io->crypto ); + tr_free( io ); +} + +void +tr_peerIoFree( tr_peerIo * io ) +{ + if( io != NULL ) { - c->canRead = NULL; - c->didWrite = NULL; - c->gotError = NULL; - bufferevent_free( c->bufev ); - tr_netClose( c->socket ); - - tr_cryptoFree( c->crypto ); - - tr_free( c ); + io->canRead = NULL; + io->didWrite = NULL; + io->gotError = NULL; + tr_runInEventThread( io->handle, io_dtor, io ); } } diff --git a/libtransmission/peer-mgr.c b/libtransmission/peer-mgr.c index 5f956b8bd..7365d0090 100644 --- a/libtransmission/peer-mgr.c +++ b/libtransmission/peer-mgr.c @@ -96,6 +96,7 @@ struct peer_atom typedef struct { uint8_t hash[SHA_DIGEST_LENGTH]; + tr_ptrArray * outgoingHandshakes; /* tr_handshake */ tr_ptrArray * pool; /* struct peer_atom */ tr_ptrArray * peers; /* tr_peer */ tr_timer * reconnectTimer; @@ -112,10 +113,10 @@ Torrent; struct tr_peerMgr { + int connectionCount; tr_handle * handle; tr_ptrArray * torrents; /* Torrent */ - int connectionCount; - tr_ptrArray * handshakes; /* in-process */ + tr_ptrArray * incomingHandshakes; /* tr_handshake */ }; /** @@ -195,9 +196,9 @@ handshakeCompare( const void * a, const void * b ) } static tr_handshake* -getExistingHandshake( tr_peerMgr * mgr, const struct in_addr * in_addr ) +getExistingHandshake( tr_ptrArray * handshakes, const struct in_addr * in_addr ) { - return tr_ptrArrayFindSorted( mgr->handshakes, + return tr_ptrArrayFindSorted( handshakes, in_addr, handshakeCompareToAddr ); } @@ -284,12 +285,15 @@ peerIsKnown( const Torrent * t, const struct in_addr * addr ) } static int -peerIsInUse( const Torrent * t, const struct in_addr * addr ) +peerIsInUse( const Torrent * ct, const struct in_addr * addr ) { + Torrent * t = (Torrent*) ct; + assert( torrentIsLocked ( t ) ); - return ( getExistingPeer( (Torrent*)t, addr ) != NULL ) - || ( getExistingHandshake( ((Torrent*)t)->manager, addr ) != NULL ); + return ( getExistingPeer( t, addr ) != NULL ) + || ( getExistingHandshake( t->outgoingHandshakes, addr ) != NULL ) + || ( getExistingHandshake( t->manager->incomingHandshakes, addr ) != NULL ); } static tr_peer* @@ -374,13 +378,16 @@ removeAllPeers( Torrent * t ) /* torrent must have already been removed from manager->torrents */ static void -freeTorrent( Torrent * t ) +torrentDestructor( Torrent * t ) { uint8_t hash[SHA_DIGEST_LENGTH]; assert( t != NULL ); + assert( !t->isRunning ); assert( t->peers != NULL ); assert( torrentIsLocked( t ) ); + assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) ); + assert( tr_ptrArrayEmpty( t->peers ) ); memcpy( hash, t->hash, SHA_DIGEST_LENGTH ); @@ -390,10 +397,50 @@ freeTorrent( Torrent * t ) tr_bitfieldFree( t->requested ); tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free ); - tr_ptrArrayFree( t->peers, (PtrArrayForeachFunc)freePeer ); + tr_ptrArrayFree( t->outgoingHandshakes, NULL ); + tr_free( t ); } +static int reconnectPulse( void * vtorrent ); + +static void +restartReconnectTimer( Torrent * t ) +{ + tr_timerFree( &t->reconnectTimer ); + if( t->isRunning ) + t->reconnectTimer = tr_timerNew( t->manager->handle, reconnectPulse, t, RECONNECT_PERIOD_MSEC ); +} + +static int rechokePulse( void * vtorrent ); + +static void +restartChokeTimer( Torrent * t ) +{ + tr_timerFree( &t->rechokeTimer ); + if( t->isRunning ) + t->rechokeTimer = tr_timerNew( t->manager->handle, rechokePulse, t, RECHOKE_PERIOD_MSEC ); +} + +static Torrent* +torrentConstructor( tr_peerMgr * manager, tr_torrent * tor ) +{ + Torrent * t; + + t = tr_new0( Torrent, 1 ); + t->manager = manager; + t->tor = tor; + t->pool = tr_ptrArrayNew( ); + t->peers = tr_ptrArrayNew( ); + t->outgoingHandshakes = tr_ptrArrayNew( ); + t->requested = tr_bitfieldNew( tor->blockCount ); + restartChokeTimer( t ); + restartReconnectTimer( t ); + memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH ); + + return t; +} + /** *** **/ @@ -464,7 +511,7 @@ tr_peerMgrNew( tr_handle * handle ) tr_peerMgr * m = tr_new0( tr_peerMgr, 1 ); m->handle = handle; m->torrents = tr_ptrArrayNew( ); - m->handshakes = tr_ptrArrayNew( ); + m->incomingHandshakes = tr_ptrArrayNew( ); return m; } @@ -474,13 +521,13 @@ tr_peerMgrFree( tr_peerMgr * manager ) managerLock( manager ); /* free the torrents. */ - tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)freeTorrent ); + tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)torrentDestructor ); /* free the handshakes. Abort invokes handshakeDoneCB(), which removes * the item from manager->handshakes, so this is a little roundabout... */ - while( !tr_ptrArrayEmpty( manager->handshakes ) ) - tr_handshakeAbort( tr_ptrArrayNth( manager->handshakes, 0 ) ); - tr_ptrArrayFree( manager->handshakes, NULL ); + while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) ) + tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) ); + tr_ptrArrayFree( manager->incomingHandshakes, NULL ); managerUnlock( manager ); tr_free( manager ); @@ -761,34 +808,6 @@ broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length tr_free( peers ); } -/** -*** -**/ - -static int reconnectPulse( void * vtorrent ); - -static void -restartReconnectTimer( Torrent * t ) -{ - tr_timerFree( &t->reconnectTimer ); - if( t->isRunning ) - t->reconnectTimer = tr_timerNew( t->manager->handle, reconnectPulse, t, RECONNECT_PERIOD_MSEC ); -} - -/** -*** -**/ - -static int rechokePulse( void * vtorrent ); - -static void -restartChokeTimer( Torrent * t ) -{ - tr_timerFree( &t->rechokeTimer ); - if( t->isRunning ) - t->rechokeTimer = tr_timerNew( t->manager->handle, rechokePulse, t, RECHOKE_PERIOD_MSEC ); -} - static void msgsCallbackFunc( void * vpeer, void * vevent, void * vt ) { @@ -853,6 +872,7 @@ ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8 } } +/* FIXME: this is kind of a mess. */ static void myHandshakeDoneCB( tr_handshake * handshake, tr_peerIo * io, @@ -864,34 +884,33 @@ myHandshakeDoneCB( tr_handshake * handshake, uint16_t port; const struct in_addr * in_addr; tr_peerMgr * manager = (tr_peerMgr*) vmanager; - const uint8_t * hash = NULL; Torrent * t; tr_handshake * ours; assert( io != NULL ); assert( isConnected==0 || isConnected==1 ); - ours = tr_ptrArrayRemoveSorted( manager->handshakes, - handshake, - handshakeCompare ); + t = tr_peerIoHasTorrentHash( io ) + ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) ) + : NULL; + + if( tr_peerIoIsIncoming ( io ) ) + ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes, + handshake, handshakeCompare ); + else if( t != NULL ) + ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes, + handshake, handshakeCompare ); + else + ours = handshake; + assert( ours != NULL ); assert( ours == handshake ); - in_addr = tr_peerIoGetAddress( io, &port ); - - if( !tr_peerIoHasTorrentHash( io ) ) /* incoming connection gone wrong? */ - { - tr_peerIoFree( io ); - --manager->connectionCount; - return; - } - - hash = tr_peerIoGetTorrentHash( io ); - t = getExistingTorrent( manager, hash ); - if( t != NULL ) torrentLock( t ); + in_addr = tr_peerIoGetAddress( io, &port ); + if( !t || !t->isRunning ) { tr_peerIoFree( io ); @@ -930,22 +949,6 @@ myHandshakeDoneCB( tr_handshake * handshake, torrentUnlock( t ); } -static void -initiateHandshake( tr_peerMgr * manager, tr_peerIo * io ) -{ - tr_handshake * handshake; - - assert( io != NULL ); - - handshake = tr_handshakeNew( io, - manager->handle->encryptionMode, - myHandshakeDoneCB, - manager ); - ++manager->connectionCount; - - tr_ptrArrayInsertSorted( manager->handshakes, handshake, handshakeCompare ); -} - void tr_peerMgrAddIncoming( tr_peerMgr * manager, struct in_addr * addr, @@ -954,10 +957,17 @@ tr_peerMgrAddIncoming( tr_peerMgr * manager, { managerLock( manager ); - if( getExistingHandshake( manager, addr ) == NULL ) + if( getExistingHandshake( manager->incomingHandshakes, addr ) == NULL ) { tr_peerIo * io = tr_peerIoNewIncoming( manager->handle, addr, port, socket ); - initiateHandshake( manager, io ); + + tr_handshake * handshake = tr_handshakeNew( io, + manager->handle->encryptionMode, + myHandshakeDoneCB, + manager ); + ++manager->connectionCount; + + tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake, handshakeCompare ); } managerUnlock( manager ); @@ -1104,20 +1114,20 @@ tr_peerMgrStartTorrent( tr_peerMgr * manager, static void stopTorrent( Torrent * t ) { - int i, size; - tr_peer ** peers; - assert( torrentIsLocked( t ) ); t->isRunning = 0; tr_timerFree( &t->rechokeTimer ); tr_timerFree( &t->reconnectTimer ); - peers = getConnectedPeers( t, &size ); - for( i=0; ipeers, (PtrArrayForeachFunc)freePeer ); + tr_ptrArrayClear( t->peers ); - tr_free( peers ); + /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB(), + * which removes the handshake from t->outgoingHandshakes... */ + while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) ) + tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) ); } void tr_peerMgrStopTorrent( tr_peerMgr * manager, @@ -1141,16 +1151,7 @@ tr_peerMgrAddTorrent( tr_peerMgr * manager, assert( tor != NULL ); assert( getExistingTorrent( manager, tor->info.hash ) == NULL ); - t = tr_new0( Torrent, 1 ); - t->manager = manager; - t->tor = tor; - t->pool = tr_ptrArrayNew( ); - t->peers = tr_ptrArrayNew( ); - t->requested = tr_bitfieldNew( tor->blockCount ); - restartChokeTimer( t ); - restartReconnectTimer( t ); - - memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH ); + t = torrentConstructor( manager, tor ); tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare ); managerUnlock( manager ); @@ -1168,7 +1169,7 @@ tr_peerMgrRemoveTorrent( tr_peerMgr * manager, assert( t != NULL ); stopTorrent( t ); tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare ); - freeTorrent( t ); + torrentDestructor( t ); managerUnlock( manager ); } @@ -1612,11 +1613,24 @@ reconnectPulse( void * vtorrent ) /* add some new ones */ nAdd = MAX_CONNECTED_PEERS_PER_TORRENT - peerCount; - for( i=0; imanager; + struct peer_atom * atom = candidates[i]; - tr_peerIo * io = tr_peerIoNewOutgoing( t->manager->handle, &atom->addr, atom->port, t->hash ); - tordbg( t, "RECONNECT adding an outgoing connection...\n" ); - initiateHandshake( t->manager, io ); + + tr_peerIo * io = tr_peerIoNewOutgoing( mgr->handle, &atom->addr, atom->port, t->hash ); + + tr_handshake * handshake = tr_handshakeNew( io, + mgr->handle->encryptionMode, + myHandshakeDoneCB, + mgr ); + ++mgr->connectionCount; + + assert( tr_peerIoGetTorrentHash( io ) != NULL ); + + tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake, handshakeCompare ); + atom->time = time( NULL ); } diff --git a/libtransmission/transmission.c b/libtransmission/transmission.c index 4689dd2a7..56839d061 100644 --- a/libtransmission/transmission.c +++ b/libtransmission/transmission.c @@ -312,7 +312,7 @@ tr_closeImpl( void * vh ) void tr_close( tr_handle * h ) { - assert( tr_torrentCount( h ) == 0 ); + //assert( tr_torrentCount( h ) == 0 ); tr_runInEventThread( h, tr_closeImpl, h ); while( !h->isClosed )