clean up the shutdown code a little bit.

This commit is contained in:
Charles Kerr 2007-10-02 16:12:44 +00:00
parent c271cec6a3
commit cdabb1c0b1
3 changed files with 131 additions and 111 deletions

View File

@ -171,20 +171,26 @@ tr_peerIoNewOutgoing( struct tr_handle * handle,
tr_netOpenTCP( in_addr, port, 0 ) ); tr_netOpenTCP( in_addr, port, 0 ) );
} }
static void
io_dtor( void * vio )
{
tr_peerIo * io = vio;
bufferevent_free( io->bufev );
tr_netClose( io->socket );
tr_cryptoFree( io->crypto );
tr_free( io );
}
void void
tr_peerIoFree( tr_peerIo * c ) tr_peerIoFree( tr_peerIo * io )
{ {
if( c != NULL ) if( io != NULL )
{ {
c->canRead = NULL; io->canRead = NULL;
c->didWrite = NULL; io->didWrite = NULL;
c->gotError = NULL; io->gotError = NULL;
bufferevent_free( c->bufev ); tr_runInEventThread( io->handle, io_dtor, io );
tr_netClose( c->socket );
tr_cryptoFree( c->crypto );
tr_free( c );
} }
} }

View File

@ -96,6 +96,7 @@ struct peer_atom
typedef struct typedef struct
{ {
uint8_t hash[SHA_DIGEST_LENGTH]; uint8_t hash[SHA_DIGEST_LENGTH];
tr_ptrArray * outgoingHandshakes; /* tr_handshake */
tr_ptrArray * pool; /* struct peer_atom */ tr_ptrArray * pool; /* struct peer_atom */
tr_ptrArray * peers; /* tr_peer */ tr_ptrArray * peers; /* tr_peer */
tr_timer * reconnectTimer; tr_timer * reconnectTimer;
@ -112,10 +113,10 @@ Torrent;
struct tr_peerMgr struct tr_peerMgr
{ {
int connectionCount;
tr_handle * handle; tr_handle * handle;
tr_ptrArray * torrents; /* Torrent */ tr_ptrArray * torrents; /* Torrent */
int connectionCount; tr_ptrArray * incomingHandshakes; /* tr_handshake */
tr_ptrArray * handshakes; /* in-process */
}; };
/** /**
@ -195,9 +196,9 @@ handshakeCompare( const void * a, const void * b )
} }
static tr_handshake* static tr_handshake*
getExistingHandshake( tr_peerMgr * mgr, const struct in_addr * in_addr ) getExistingHandshake( tr_ptrArray * handshakes, const struct in_addr * in_addr )
{ {
return tr_ptrArrayFindSorted( mgr->handshakes, return tr_ptrArrayFindSorted( handshakes,
in_addr, in_addr,
handshakeCompareToAddr ); handshakeCompareToAddr );
} }
@ -284,12 +285,15 @@ peerIsKnown( const Torrent * t, const struct in_addr * addr )
} }
static int static int
peerIsInUse( const Torrent * t, const struct in_addr * addr ) peerIsInUse( const Torrent * ct, const struct in_addr * addr )
{ {
Torrent * t = (Torrent*) ct;
assert( torrentIsLocked ( t ) ); assert( torrentIsLocked ( t ) );
return ( getExistingPeer( (Torrent*)t, addr ) != NULL ) return ( getExistingPeer( t, addr ) != NULL )
|| ( getExistingHandshake( ((Torrent*)t)->manager, addr ) != NULL ); || ( getExistingHandshake( t->outgoingHandshakes, addr ) != NULL )
|| ( getExistingHandshake( t->manager->incomingHandshakes, addr ) != NULL );
} }
static tr_peer* static tr_peer*
@ -374,13 +378,16 @@ removeAllPeers( Torrent * t )
/* torrent must have already been removed from manager->torrents */ /* torrent must have already been removed from manager->torrents */
static void static void
freeTorrent( Torrent * t ) torrentDestructor( Torrent * t )
{ {
uint8_t hash[SHA_DIGEST_LENGTH]; uint8_t hash[SHA_DIGEST_LENGTH];
assert( t != NULL ); assert( t != NULL );
assert( !t->isRunning );
assert( t->peers != NULL ); assert( t->peers != NULL );
assert( torrentIsLocked( t ) ); assert( torrentIsLocked( t ) );
assert( tr_ptrArrayEmpty( t->outgoingHandshakes ) );
assert( tr_ptrArrayEmpty( t->peers ) );
memcpy( hash, t->hash, SHA_DIGEST_LENGTH ); memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
@ -390,10 +397,50 @@ freeTorrent( Torrent * t )
tr_bitfieldFree( t->requested ); tr_bitfieldFree( t->requested );
tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free ); tr_ptrArrayFree( t->pool, (PtrArrayForeachFunc)tr_free );
tr_ptrArrayFree( t->peers, (PtrArrayForeachFunc)freePeer ); tr_ptrArrayFree( t->outgoingHandshakes, NULL );
tr_free( t ); tr_free( t );
} }
static int reconnectPulse( void * vtorrent );
static void
restartReconnectTimer( Torrent * t )
{
tr_timerFree( &t->reconnectTimer );
if( t->isRunning )
t->reconnectTimer = tr_timerNew( t->manager->handle, reconnectPulse, t, RECONNECT_PERIOD_MSEC );
}
static int rechokePulse( void * vtorrent );
static void
restartChokeTimer( Torrent * t )
{
tr_timerFree( &t->rechokeTimer );
if( t->isRunning )
t->rechokeTimer = tr_timerNew( t->manager->handle, rechokePulse, t, RECHOKE_PERIOD_MSEC );
}
static Torrent*
torrentConstructor( tr_peerMgr * manager, tr_torrent * tor )
{
Torrent * t;
t = tr_new0( Torrent, 1 );
t->manager = manager;
t->tor = tor;
t->pool = tr_ptrArrayNew( );
t->peers = tr_ptrArrayNew( );
t->outgoingHandshakes = tr_ptrArrayNew( );
t->requested = tr_bitfieldNew( tor->blockCount );
restartChokeTimer( t );
restartReconnectTimer( t );
memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
return t;
}
/** /**
*** ***
**/ **/
@ -464,7 +511,7 @@ tr_peerMgrNew( tr_handle * handle )
tr_peerMgr * m = tr_new0( tr_peerMgr, 1 ); tr_peerMgr * m = tr_new0( tr_peerMgr, 1 );
m->handle = handle; m->handle = handle;
m->torrents = tr_ptrArrayNew( ); m->torrents = tr_ptrArrayNew( );
m->handshakes = tr_ptrArrayNew( ); m->incomingHandshakes = tr_ptrArrayNew( );
return m; return m;
} }
@ -474,13 +521,13 @@ tr_peerMgrFree( tr_peerMgr * manager )
managerLock( manager ); managerLock( manager );
/* free the torrents. */ /* free the torrents. */
tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)freeTorrent ); tr_ptrArrayFree( manager->torrents, (PtrArrayForeachFunc)torrentDestructor );
/* free the handshakes. Abort invokes handshakeDoneCB(), which removes /* free the handshakes. Abort invokes handshakeDoneCB(), which removes
* the item from manager->handshakes, so this is a little roundabout... */ * the item from manager->handshakes, so this is a little roundabout... */
while( !tr_ptrArrayEmpty( manager->handshakes ) ) while( !tr_ptrArrayEmpty( manager->incomingHandshakes ) )
tr_handshakeAbort( tr_ptrArrayNth( manager->handshakes, 0 ) ); tr_handshakeAbort( tr_ptrArrayNth( manager->incomingHandshakes, 0 ) );
tr_ptrArrayFree( manager->handshakes, NULL ); tr_ptrArrayFree( manager->incomingHandshakes, NULL );
managerUnlock( manager ); managerUnlock( manager );
tr_free( manager ); tr_free( manager );
@ -761,34 +808,6 @@ broadcastGotBlock( Torrent * t, uint32_t index, uint32_t offset, uint32_t length
tr_free( peers ); tr_free( peers );
} }
/**
***
**/
static int reconnectPulse( void * vtorrent );
static void
restartReconnectTimer( Torrent * t )
{
tr_timerFree( &t->reconnectTimer );
if( t->isRunning )
t->reconnectTimer = tr_timerNew( t->manager->handle, reconnectPulse, t, RECONNECT_PERIOD_MSEC );
}
/**
***
**/
static int rechokePulse( void * vtorrent );
static void
restartChokeTimer( Torrent * t )
{
tr_timerFree( &t->rechokeTimer );
if( t->isRunning )
t->rechokeTimer = tr_timerNew( t->manager->handle, rechokePulse, t, RECHOKE_PERIOD_MSEC );
}
static void static void
msgsCallbackFunc( void * vpeer, void * vevent, void * vt ) msgsCallbackFunc( void * vpeer, void * vevent, void * vt )
{ {
@ -853,6 +872,7 @@ ensureAtomExists( Torrent * t, const struct in_addr * addr, uint16_t port, uint8
} }
} }
/* FIXME: this is kind of a mess. */
static void static void
myHandshakeDoneCB( tr_handshake * handshake, myHandshakeDoneCB( tr_handshake * handshake,
tr_peerIo * io, tr_peerIo * io,
@ -864,34 +884,33 @@ myHandshakeDoneCB( tr_handshake * handshake,
uint16_t port; uint16_t port;
const struct in_addr * in_addr; const struct in_addr * in_addr;
tr_peerMgr * manager = (tr_peerMgr*) vmanager; tr_peerMgr * manager = (tr_peerMgr*) vmanager;
const uint8_t * hash = NULL;
Torrent * t; Torrent * t;
tr_handshake * ours; tr_handshake * ours;
assert( io != NULL ); assert( io != NULL );
assert( isConnected==0 || isConnected==1 ); assert( isConnected==0 || isConnected==1 );
ours = tr_ptrArrayRemoveSorted( manager->handshakes, t = tr_peerIoHasTorrentHash( io )
handshake, ? getExistingTorrent( manager, tr_peerIoGetTorrentHash( io ) )
handshakeCompare ); : NULL;
if( tr_peerIoIsIncoming ( io ) )
ours = tr_ptrArrayRemoveSorted( manager->incomingHandshakes,
handshake, handshakeCompare );
else if( t != NULL )
ours = tr_ptrArrayRemoveSorted( t->outgoingHandshakes,
handshake, handshakeCompare );
else
ours = handshake;
assert( ours != NULL ); assert( ours != NULL );
assert( ours == handshake ); assert( ours == handshake );
in_addr = tr_peerIoGetAddress( io, &port );
if( !tr_peerIoHasTorrentHash( io ) ) /* incoming connection gone wrong? */
{
tr_peerIoFree( io );
--manager->connectionCount;
return;
}
hash = tr_peerIoGetTorrentHash( io );
t = getExistingTorrent( manager, hash );
if( t != NULL ) if( t != NULL )
torrentLock( t ); torrentLock( t );
in_addr = tr_peerIoGetAddress( io, &port );
if( !t || !t->isRunning ) if( !t || !t->isRunning )
{ {
tr_peerIoFree( io ); tr_peerIoFree( io );
@ -930,22 +949,6 @@ myHandshakeDoneCB( tr_handshake * handshake,
torrentUnlock( t ); torrentUnlock( t );
} }
static void
initiateHandshake( tr_peerMgr * manager, tr_peerIo * io )
{
tr_handshake * handshake;
assert( io != NULL );
handshake = tr_handshakeNew( io,
manager->handle->encryptionMode,
myHandshakeDoneCB,
manager );
++manager->connectionCount;
tr_ptrArrayInsertSorted( manager->handshakes, handshake, handshakeCompare );
}
void void
tr_peerMgrAddIncoming( tr_peerMgr * manager, tr_peerMgrAddIncoming( tr_peerMgr * manager,
struct in_addr * addr, struct in_addr * addr,
@ -954,10 +957,17 @@ tr_peerMgrAddIncoming( tr_peerMgr * manager,
{ {
managerLock( manager ); managerLock( manager );
if( getExistingHandshake( manager, addr ) == NULL ) if( getExistingHandshake( manager->incomingHandshakes, addr ) == NULL )
{ {
tr_peerIo * io = tr_peerIoNewIncoming( manager->handle, addr, port, socket ); tr_peerIo * io = tr_peerIoNewIncoming( manager->handle, addr, port, socket );
initiateHandshake( manager, io );
tr_handshake * handshake = tr_handshakeNew( io,
manager->handle->encryptionMode,
myHandshakeDoneCB,
manager );
++manager->connectionCount;
tr_ptrArrayInsertSorted( manager->incomingHandshakes, handshake, handshakeCompare );
} }
managerUnlock( manager ); managerUnlock( manager );
@ -1104,20 +1114,20 @@ tr_peerMgrStartTorrent( tr_peerMgr * manager,
static void static void
stopTorrent( Torrent * t ) stopTorrent( Torrent * t )
{ {
int i, size;
tr_peer ** peers;
assert( torrentIsLocked( t ) ); assert( torrentIsLocked( t ) );
t->isRunning = 0; t->isRunning = 0;
tr_timerFree( &t->rechokeTimer ); tr_timerFree( &t->rechokeTimer );
tr_timerFree( &t->reconnectTimer ); tr_timerFree( &t->reconnectTimer );
peers = getConnectedPeers( t, &size ); /* disconnect the peers. */
for( i=0; i<size; ++i ) tr_ptrArrayForeach( t->peers, (PtrArrayForeachFunc)freePeer );
disconnectPeer( peers[i] ); tr_ptrArrayClear( t->peers );
tr_free( peers ); /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB(),
* which removes the handshake from t->outgoingHandshakes... */
while( !tr_ptrArrayEmpty( t->outgoingHandshakes ) )
tr_handshakeAbort( tr_ptrArrayNth( t->outgoingHandshakes, 0 ) );
} }
void void
tr_peerMgrStopTorrent( tr_peerMgr * manager, tr_peerMgrStopTorrent( tr_peerMgr * manager,
@ -1141,16 +1151,7 @@ tr_peerMgrAddTorrent( tr_peerMgr * manager,
assert( tor != NULL ); assert( tor != NULL );
assert( getExistingTorrent( manager, tor->info.hash ) == NULL ); assert( getExistingTorrent( manager, tor->info.hash ) == NULL );
t = tr_new0( Torrent, 1 ); t = torrentConstructor( manager, tor );
t->manager = manager;
t->tor = tor;
t->pool = tr_ptrArrayNew( );
t->peers = tr_ptrArrayNew( );
t->requested = tr_bitfieldNew( tor->blockCount );
restartChokeTimer( t );
restartReconnectTimer( t );
memcpy( t->hash, tor->info.hash, SHA_DIGEST_LENGTH );
tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare ); tr_ptrArrayInsertSorted( manager->torrents, t, torrentCompare );
managerUnlock( manager ); managerUnlock( manager );
@ -1168,7 +1169,7 @@ tr_peerMgrRemoveTorrent( tr_peerMgr * manager,
assert( t != NULL ); assert( t != NULL );
stopTorrent( t ); stopTorrent( t );
tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare ); tr_ptrArrayRemoveSorted( manager->torrents, t, torrentCompare );
freeTorrent( t ); torrentDestructor( t );
managerUnlock( manager ); managerUnlock( manager );
} }
@ -1612,11 +1613,24 @@ reconnectPulse( void * vtorrent )
/* add some new ones */ /* add some new ones */
nAdd = MAX_CONNECTED_PEERS_PER_TORRENT - peerCount; nAdd = MAX_CONNECTED_PEERS_PER_TORRENT - peerCount;
for( i=0; i<nAdd && i<nCandidates && i<MAX_RECONNECTIONS_PER_PULSE; ++i ) { for( i=0; i<nAdd && i<nCandidates && i<MAX_RECONNECTIONS_PER_PULSE; ++i )
{
tr_peerMgr * mgr = t->manager;
struct peer_atom * atom = candidates[i]; struct peer_atom * atom = candidates[i];
tr_peerIo * io = tr_peerIoNewOutgoing( t->manager->handle, &atom->addr, atom->port, t->hash );
tordbg( t, "RECONNECT adding an outgoing connection...\n" ); tr_peerIo * io = tr_peerIoNewOutgoing( mgr->handle, &atom->addr, atom->port, t->hash );
initiateHandshake( t->manager, io );
tr_handshake * handshake = tr_handshakeNew( io,
mgr->handle->encryptionMode,
myHandshakeDoneCB,
mgr );
++mgr->connectionCount;
assert( tr_peerIoGetTorrentHash( io ) != NULL );
tr_ptrArrayInsertSorted( t->outgoingHandshakes, handshake, handshakeCompare );
atom->time = time( NULL ); atom->time = time( NULL );
} }

View File

@ -312,7 +312,7 @@ tr_closeImpl( void * vh )
void void
tr_close( tr_handle * h ) tr_close( tr_handle * h )
{ {
assert( tr_torrentCount( h ) == 0 ); //assert( tr_torrentCount( h ) == 0 );
tr_runInEventThread( h, tr_closeImpl, h ); tr_runInEventThread( h, tr_closeImpl, h );
while( !h->isClosed ) while( !h->isClosed )