diff --git a/attic/_chunker.c b/attic/_chunker.c index 94d4e47ae..167f6f2c3 100644 --- a/attic/_chunker.c +++ b/attic/_chunker.c @@ -1,4 +1,5 @@ #include +#include /* Cyclic polynomial / buzhash: https://en.wikipedia.org/wiki/Rolling_hash */ @@ -78,8 +79,9 @@ typedef struct { int window_size, chunk_mask, min_size; size_t buf_size; uint32_t *table; - uint8_t *data; + uint8_t *data, *read_buf; PyObject *fd; + int fh; int done, eof; size_t remaining, bytes_read, bytes_yielded, position, last; } Chunker; @@ -94,15 +96,17 @@ chunker_init(int window_size, int chunk_mask, int min_size, uint32_t seed) c->table = buzhash_init_table(seed); c->buf_size = 10 * 1024 * 1024; c->data = malloc(c->buf_size); + c->read_buf = malloc(c->buf_size); return c; } static void -chunker_set_fd(Chunker *c, PyObject *fd) +chunker_set_fd(Chunker *c, PyObject *fd, int fh) { Py_XDECREF(c->fd); c->fd = fd; Py_INCREF(fd); + c->fh = fh; c->done = 0; c->remaining = 0; c->bytes_read = 0; @@ -118,6 +122,7 @@ chunker_free(Chunker *c) Py_XDECREF(c->fd); free(c->table); free(c->data); + free(c->read_buf); free(c); } @@ -133,20 +138,48 @@ chunker_fill(Chunker *c) if(c->eof || n == 0) { return 1; } - data = PyObject_CallMethod(c->fd, "read", "i", n); - if(!data) { - return 0; - } - n = PyBytes_Size(data); - if(n) { - memcpy(c->data + c->position + c->remaining, PyBytes_AsString(data), n); - c->remaining += n; - c->bytes_read += n; + if(c->fh >= 0) { + // if we have a os-level file descriptor, use os-level API + n = read(c->fh, c->read_buf, n); + if(n > 0) { + memcpy(c->data + c->position + c->remaining, c->read_buf, n); + c->remaining += n; + c->bytes_read += n; + } + else + if(n == 0) { + c->eof = 1; + } + else { + // some error happened + return 0; + } + #if ( _XOPEN_SOURCE >= 600 || _POSIX_C_SOURCE >= 200112L ) + // We tell the OS that we do not need the data of this file any more + // that it maybe has in the cache. This avoids that we spoil the + // complete cache with data that we only read once and (due to cache + // size limit) kick out data from the cache that might be still useful + // for the OS or other processes. + posix_fadvise(c->fh, (off_t) 0, (off_t) 0, POSIX_FADV_DONTNEED); + #endif } else { - c->eof = 1; + // no os-level file descriptor, use Python file object API + data = PyObject_CallMethod(c->fd, "read", "i", n); + if(!data) { + return 0; + } + n = PyBytes_Size(data); + if(n) { + memcpy(c->data + c->position + c->remaining, PyBytes_AsString(data), n); + c->remaining += n; + c->bytes_read += n; + } + else { + c->eof = 1; + } + Py_DECREF(data); } - Py_DECREF(data); return 1; } diff --git a/attic/archive.py b/attic/archive.py index 6b6f791ce..f4b3ed0fa 100644 --- a/attic/archive.py +++ b/attic/archive.py @@ -464,9 +464,10 @@ def process_file(self, path, st, cache): status = 'A' # regular file, added # Only chunkify the file if needed if chunks is None: - with Archive._open_rb(path, st) as fd: + fh = Archive._open_rb(path, st) + with os.fdopen(fh, 'rb') as fd: chunks = [] - for chunk in self.chunker.chunkify(fd): + for chunk in self.chunker.chunkify(fd, fh): chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)) cache.memorize_file(path_hash, st, [c[0] for c in chunks]) status = status or 'M' # regular file, modified (if not 'A' already) @@ -488,12 +489,10 @@ def _open_rb(path, st): euid = None def open_simple(p, s): - fd = os.open(p, flags_normal) - return os.fdopen(fd, 'rb') + return os.open(p, flags_normal) def open_noatime(p, s): - fd = os.open(p, flags_noatime) - return os.fdopen(fd, 'rb') + return os.open(p, flags_noatime) def open_noatime_if_owner(p, s): if euid == 0 or s.st_uid == euid: @@ -515,7 +514,7 @@ def open_noatime_with_fallback(p, s): # So in future, let's check whether the file is owned by us # before attempting to use O_NOATIME. Archive._open_rb = open_noatime_if_owner - return os.fdopen(fd, 'rb') + return fd if flags_noatime != flags_normal: # Always use O_NOATIME version. diff --git a/attic/chunker.pyx b/attic/chunker.pyx index 44ec31fc7..3ca4b1a7e 100644 --- a/attic/chunker.pyx +++ b/attic/chunker.pyx @@ -9,7 +9,7 @@ cdef extern from "_chunker.c": ctypedef struct _Chunker "Chunker": pass _Chunker *chunker_init(int window_size, int chunk_mask, int min_size, uint32_t seed) - void chunker_set_fd(_Chunker *chunker, object fd) + void chunker_set_fd(_Chunker *chunker, object f, int fd) void chunker_free(_Chunker *chunker) object chunker_process(_Chunker *chunker) uint32_t *buzhash_init_table(uint32_t seed) @@ -23,8 +23,15 @@ cdef class Chunker: def __cinit__(self, window_size, chunk_mask, min_size, seed): self.chunker = chunker_init(window_size, chunk_mask, min_size, seed & 0xffffffff) - def chunkify(self, fd): - chunker_set_fd(self.chunker, fd) + def chunkify(self, fd, fh=-1): + """ + Cut a file into chunks. + + :param fd: Python file object + :param fh: OS-level file handle (if available), + defaults to -1 which means not to use OS-level fd. + """ + chunker_set_fd(self.chunker, fd, fh) return self def __dealloc__(self): @@ -52,4 +59,4 @@ def buzhash_update(uint32_t sum, unsigned char remove, unsigned char add, size_t table = buzhash_init_table(seed & 0xffffffff) sum = c_buzhash_update(sum, remove, add, len, table) free(table) - return sum \ No newline at end of file + return sum diff --git a/attic/repository.py b/attic/repository.py index 255382f61..991a2fdb3 100644 --- a/attic/repository.py +++ b/attic/repository.py @@ -555,6 +555,10 @@ def write_put(self, id, data): header = self.header_no_crc_fmt.pack(size, TAG_PUT) crc = self.crc_fmt.pack(crc32(data, crc32(id, crc32(header))) & 0xffffffff) fd.write(b''.join((crc, header, id, data))) + if hasattr(os, 'posix_fadvise'): # python >= 3.3, only on UNIX + # tell the OS that it does not need to cache what we just wrote, + # avoids spoiling the cache for the OS and other processes. + os.posix_fadvise(fd.fileno(), 0, 0, os.POSIX_FADV_DONTNEED) self.offset += size return self.segment, offset