transmission/libtransmission/cache.cc

/*
 * This file Copyright (C) 2010-2014 Mnemosyne LLC
 *
 * It may be used under the GNU GPL versions 2 or 3
 * or any future license endorsed by Mnemosyne LLC.
 *
 */

#include <cstdlib> /* qsort() */

#include <event2/buffer.h>

#include "transmission.h"
#include "cache.h"
#include "inout.h"
#include "log.h"
#include "peer-common.h" /* MAX_BLOCK_SIZE */
#include "ptrarray.h"
#include "torrent.h"
#include "tr-assert.h"
#include "trevent.h"
#include "utils.h"

#define MY_NAME "Cache"

#define dbgmsg(...) tr_logAddDeepNamed(MY_NAME, __VA_ARGS__)
/****
*****
****/
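/* one block of payload that a peer sent us and that hasn't been written
 * to disk yet; the data is held in `evbuf` until the cache flushes it */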
struct cache_block
{
    tr_torrent* tor;
    tr_piece_index_t piece;
    uint32_t offset;
    uint32_t length;
    time_t time;
    tr_block_index_t block;
    struct evbuffer* evbuf;
};

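/* the write cache itself: an array of cache_blocks kept sorted by
 * (torrent, block), the configured size limit in bytes and in blocks,
 * and counters recording how much was written to the cache vs. to disk */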
struct tr_cache
{
    tr_ptrArray blocks;
    int max_blocks;
    size_t max_bytes;
    size_t disk_writes;
    size_t disk_write_bytes;
    size_t cache_writes;
    size_t cache_write_bytes;
};

/****
*****
****/
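/* describes one contiguous run of cached blocks: where it starts in the
 * blocks array, how long it is, the write time of its last block, and
 * the flags used to rank it when deciding what to flush first */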
struct run_info
{
    int pos;
    int rank;
    time_t last_block_time;
    bool is_multi_piece;
    bool is_piece_done;
    unsigned int len;
};

/* return a count of how many contiguous blocks there are starting at this pos */
static int getBlockRun(tr_cache const* cache, int pos, struct run_info* info)
{
    int const n = tr_ptrArraySize(&cache->blocks);
    struct cache_block const* const* blocks = (struct cache_block const* const*)tr_ptrArrayBase(&cache->blocks);
    struct cache_block const* ref = blocks[pos];
    tr_block_index_t block = ref->block;
    int len = 0;

    for (int i = pos; i < n; ++i)
    {
        struct cache_block const* b = blocks[i];

        if (b->block != block)
        {
            break;
        }

        if (b->tor != ref->tor)
        {
            break;
        }

        ++block;
        ++len;
    }

    if (info != nullptr)
    {
        struct cache_block const* b = blocks[pos + len - 1];

        info->last_block_time = b->time;
        info->is_piece_done = b->tor->hasPiece(b->piece);
        info->is_multi_piece = b->piece != blocks[pos]->piece;
        info->len = len;
        info->pos = pos;
    }

    return len;
}

/* higher rank comes before lower rank */
static int compareRuns(void const* va, void const* vb)
{
    auto* a = static_cast<struct run_info const*>(va);
    auto* b = static_cast<struct run_info const*>(vb);
    return b->rank - a->rank;
}

enum
{
    MULTIFLAG = 0x1000,
    DONEFLAG = 0x2000,
    SESSIONFLAG = 0x4000
};

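/* These flag bits are OR'ed into a run's rank so that flagged runs always
 * sort ahead of unflagged ones; the low bits hold the run's length plus an
 * aging bonus. For example, a 4-block run of an already-completed piece
 * that has sat in the cache for two minutes gets
 * rank = 4 + 120 / 32 = 7, then 7 | DONEFLAG = 0x2007,
 * which outranks any unflagged run. */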
/* Calculate runs
 * - Stale runs, runs sitting in cache for a long time or runs not growing, get priority.
 * Returns the number of runs.
 */
// TODO: return std::vector
static int calcRuns(tr_cache const* cache, struct run_info* runs)
{
    int const n = tr_ptrArraySize(&cache->blocks);
    int i = 0;
    time_t const now = tr_time();

    for (int pos = 0; pos < n; pos += runs[i++].len)
    {
        int rank = getBlockRun(cache, pos, &runs[i]);

        /* This adds ~2 to the relative length of a run for every minute it has
         * languished in the cache. */
        rank += (now - runs[i].last_block_time) / 32;

        /* Flushing stale blocks should be a top priority as the probability of them
         * growing is very small for blocks on piece boundaries, and nonexistent for
         * blocks inside pieces. */
        rank |= runs[i].is_piece_done ? DONEFLAG : 0;

        /* Move the multi piece runs higher */
        rank |= runs[i].is_multi_piece ? MULTIFLAG : 0;

        runs[i].rank = rank;
    }

    qsort(runs, i, sizeof(struct run_info), compareRuns);
    return i;
}

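/* copy `n` consecutive cached blocks starting at `pos` into one contiguous
 * buffer, remove them from the cache, and write the whole buffer to disk
 * with a single tr_ioWrite() call */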
static int flushContiguous(tr_cache* cache, int pos, int n)
{
    int err = 0;
    uint8_t* buf = tr_new(uint8_t, n * MAX_BLOCK_SIZE);
    uint8_t* walk = buf;
    struct cache_block** blocks = (struct cache_block**)tr_ptrArrayBase(&cache->blocks);

    struct cache_block* b = blocks[pos];
    tr_torrent* tor = b->tor;
    tr_piece_index_t const piece = b->piece;
    uint32_t const offset = b->offset;

    for (int i = 0; i < n; ++i)
    {
        b = blocks[pos + i];
        evbuffer_copyout(b->evbuf, walk, b->length);
        walk += b->length;
        evbuffer_free(b->evbuf);
        tr_free(b);
    }

    tr_ptrArrayErase(&cache->blocks, pos, pos + n);

    err = tr_ioWrite(tor, piece, offset, walk - buf, buf);

    tr_free(buf);

    ++cache->disk_writes;
    cache->disk_write_bytes += walk - buf;
    return err;
}

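/* flush the first `n` runs in `runs`; each flush removes blocks from the
 * array, so the positions of the runs not yet flushed are shifted down
 * to compensate */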
static int flushRuns(tr_cache* cache, struct run_info* runs, int n)
{
    int err = 0;

    for (int i = 0; err == 0 && i < n; i++)
    {
        err = flushContiguous(cache, runs[i].pos, runs[i].len);

        for (int j = i + 1; j < n; j++)
        {
            if (runs[j].pos > runs[i].pos)
            {
                runs[j].pos -= runs[i].len;
            }
        }
    }

    return err;
}

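/* if the cache has grown past its block limit, flush the highest-ranked
 * runs until roughly a quarter of the limit's worth of blocks has been
 * written out */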
static int cacheTrim(tr_cache* cache)
{
    int err = 0;

    if (tr_ptrArraySize(&cache->blocks) > cache->max_blocks)
    {
        /* Amount of cache that should be removed by the flush. This influences how large
         * runs can grow as well as how often flushes will happen. */
        int const cacheCutoff = 1 + cache->max_blocks / 4;
        struct run_info* runs = tr_new(struct run_info, tr_ptrArraySize(&cache->blocks));
        int i = 0;
        int j = 0;

        calcRuns(cache, runs);

        while (j < cacheCutoff)
        {
            j += runs[i++].len;
        }

        err = flushRuns(cache, runs, i);
        tr_free(runs);
    }

    return err;
}

/***
****
***/
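/* translate a byte limit into a count of MAX_BLOCK_SIZE blocks */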
static int getMaxBlocks(int64_t max_bytes)
{
    return max_bytes / (double)MAX_BLOCK_SIZE;
}

int tr_cacheSetLimit(tr_cache* cache, int64_t max_bytes)
{
    char buf[128];

    cache->max_bytes = max_bytes;
    cache->max_blocks = getMaxBlocks(max_bytes);

    tr_formatter_mem_B(buf, cache->max_bytes, sizeof(buf));
    tr_logAddNamedDbg(MY_NAME, "Maximum cache size set to %s (%d blocks)", buf, cache->max_blocks);

    return cacheTrim(cache);
}

int64_t tr_cacheGetLimit(tr_cache const* cache)
{
    return cache->max_bytes;
}

tr_cache* tr_cacheNew(int64_t max_bytes)
{
    tr_cache* cache = tr_new0(tr_cache, 1);
    cache->blocks = {};
    cache->max_bytes = max_bytes;
    cache->max_blocks = getMaxBlocks(max_bytes);
    return cache;
}

void tr_cacheFree(tr_cache* cache)
{
    // FIXME(ckerr): this assertion isn't _always_ going to be true,
    // e.g. if writing to disk failed due to disk full / permission error etc
    // then there is still going to be data sitting in the cache on shutdown.
    // Make this assertion smarter or remove it.
    TR_ASSERT(tr_ptrArrayEmpty(&cache->blocks));
    tr_ptrArrayDestruct(&cache->blocks, nullptr);
    tr_free(cache);
}

/***
****
***/
static int cache_block_compare(void const* va, void const* vb)
{
    auto const* a = static_cast<struct cache_block const*>(va);
    auto const* b = static_cast<struct cache_block const*>(vb);

    /* primary key: torrent id */
    if (a->tor->uniqueId != b->tor->uniqueId)
    {
        return a->tor->uniqueId < b->tor->uniqueId ? -1 : 1;
    }

    /* secondary key: block # */
    if (a->block != b->block)
    {
        return a->block < b->block ? -1 : 1;
    }

    /* they're equal */
    return 0;
}

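/* find the cached block that starts at (piece, offset) in `torrent`,
 * or nullptr if that block isn't in the cache */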
static struct cache_block* findBlock(tr_cache* cache, tr_torrent* torrent, tr_piece_index_t piece, uint32_t offset)
{
    struct cache_block key;
    key.tor = torrent;
    key.block = torrent->blockOf(piece, offset);
    return static_cast<struct cache_block*>(tr_ptrArrayFindSorted(&cache->blocks, &key, cache_block_compare));
}

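/* add one block of incoming data to the cache, replacing any stale copy of
 * the same block, then trim the cache if it's over its limit; returns 0 on
 * success or the error from any disk write the trim triggered */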
int tr_cacheWriteBlock(
    tr_cache* cache,
    tr_torrent* torrent,
    tr_piece_index_t piece,
    uint32_t offset,
    uint32_t length,
    struct evbuffer* writeme)
{
    TR_ASSERT(tr_amInEventThread(torrent->session));

    struct cache_block* cb = findBlock(cache, torrent, piece, offset);

    if (cb == nullptr)
    {
        cb = tr_new(struct cache_block, 1);
        cb->tor = torrent;
        cb->piece = piece;
        cb->offset = offset;
        cb->length = length;
        cb->block = torrent->blockOf(piece, offset);
        cb->evbuf = evbuffer_new();
        tr_ptrArrayInsertSorted(&cache->blocks, cb, cache_block_compare);
    }

    TR_ASSERT(cb->length == length);

    cb->time = tr_time();

    evbuffer_drain(cb->evbuf, evbuffer_get_length(cb->evbuf));
    evbuffer_remove_buffer(writeme, cb->evbuf, cb->length);

    cache->cache_writes++;
    cache->cache_write_bytes += cb->length;

    return cacheTrim(cache);
}

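/* read `len` bytes starting at (piece, offset) into `setme`, taking the data
 * from the cache if the block is cached and from disk otherwise */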
int tr_cacheReadBlock(
    tr_cache* cache,
    tr_torrent* torrent,
    tr_piece_index_t piece,
    uint32_t offset,
    uint32_t len,
    uint8_t* setme)
{
    int err = 0;
    struct cache_block* cb = findBlock(cache, torrent, piece, offset);

    if (cb != nullptr)
    {
        evbuffer_copyout(cb->evbuf, setme, len);
    }
    else
    {
        err = tr_ioRead(torrent, piece, offset, len, setme);
    }

    return err;
}

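/* hint that a block will be needed soon: if it isn't already in the cache,
 * ask the disk layer to prefetch it */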
int tr_cachePrefetchBlock(tr_cache* cache, tr_torrent* torrent, tr_piece_index_t piece, uint32_t offset, uint32_t len)
{
    int err = 0;
    struct cache_block const* const cb = findBlock(cache, torrent, piece, offset);

    if (cb == nullptr)
    {
        err = tr_ioPrefetch(torrent, piece, offset, len);
    }

    return err;
}

/***
****
***/
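/* return the lowest position in the cache's sorted block array at which
 * a block matching (torrent, block) could appear */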
static int findBlockPos(tr_cache const* cache, tr_torrent* torrent, tr_block_index_t block)
{
    struct cache_block key;
    key.tor = torrent;
    key.block = block;
    return tr_ptrArrayLowerBound(&cache->blocks, &key, cache_block_compare, nullptr);
}

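/* flush the runs that are unlikely to grow any further: runs whose pieces
 * are already complete and runs that span a piece boundary */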
int tr_cacheFlushDone(tr_cache* cache)
{
    int err = 0;

    if (tr_ptrArraySize(&cache->blocks) > 0)
    {
        auto* const runs = tr_new(struct run_info, tr_ptrArraySize(&cache->blocks));
        int i = 0;
        int const n = calcRuns(cache, runs);

        while (i < n && (runs[i].is_piece_done || runs[i].is_multi_piece))
        {
            runs[i++].rank |= SESSIONFLAG;
        }

        err = flushRuns(cache, runs, i);
        tr_free(runs);
    }

    return err;
}

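/* write all of `torrent`'s cached blocks that fall within file `i` out to disk */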
int tr_cacheFlushFile(tr_cache* cache, tr_torrent* torrent, tr_file_index_t i)
{
    auto const [begin, end] = tr_torGetFileBlockSpan(torrent, i);
    int pos = findBlockPos(cache, torrent, begin);

    dbgmsg("flushing file %d from cache to disk: blocks [%zu...%zu)", (int)i, (size_t)begin, (size_t)end);

    /* flush out all the blocks in that file */
    int err = 0;

    while (err == 0 && pos < tr_ptrArraySize(&cache->blocks))
    {
        auto const* b = static_cast<struct cache_block const*>(tr_ptrArrayNth(&cache->blocks, pos));

        if (b->tor != torrent)
        {
            break;
        }

        if (b->block < begin || b->block >= end)
        {
            break;
        }

        err = flushContiguous(cache, pos, getBlockRun(cache, pos, nullptr));
    }

    return err;
}

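/* write all of `torrent`'s cached blocks out to disk */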
int tr_cacheFlushTorrent(tr_cache* cache, tr_torrent* torrent)
{
    int err = 0;
    int const pos = findBlockPos(cache, torrent, 0);

    /* flush out all the blocks in that torrent */
    while (err == 0 && pos < tr_ptrArraySize(&cache->blocks))
    {
        auto const* b = static_cast<struct cache_block const*>(tr_ptrArrayNth(&cache->blocks, pos));

        if (b->tor != torrent)
        {
            break;
        }

        err = flushContiguous(cache, pos, getBlockRun(cache, pos, nullptr));
    }

    return err;
}