Merge pull request #3901 from ThomasWaldmann/avoid-stale-repo-files

avoid stale filehandle issues, fixes #3265
This commit is contained in:
TW 2018-06-29 23:46:28 +02:00 committed by GitHub
commit 15d1699d48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 34 additions and 7 deletions

View File

@ -52,6 +52,8 @@ LIST_SCAN_LIMIT = 100000
DEFAULT_SEGMENTS_PER_DIR = 1000
FD_MAX_AGE = 4 * 60  # 4 minutes
CHUNK_MIN_EXP = 19  # 2**19 == 512kiB
CHUNK_MAX_EXP = 23  # 2**23 == 8MiB
HASH_WINDOW_SIZE = 0xfff  # 4095B

View File

@ -39,6 +39,12 @@ class LRUCache:
self._lru.append(key) self._lru.append(key)
return value return value
def upd(self, key, value):
    """Update the value stored for an existing *key* in place.

    Special use only: unlike ``__setitem__``, this does NOT dispose the old
    value first and does NOT touch the LRU ordering — it only swaps the
    stored value. ``__setitem__`` remains the normal-use entry point.

    :param key: key that MUST already be present in the cache
    :param value: new value to associate with *key*
    :raises AssertionError: if *key* is not already cached
    """
    # internal invariant check — callers are expected to only update keys
    # they just looked up, so a miss here indicates a programming error.
    assert key in self._cache, "Unexpected attempt to update a non-existing item."
    self._cache[key] = value
def clear(self): def clear(self):
for value in self._cache.values(): for value in self._cache.values():
self._dispose(value) self._dispose(value)

View File

@ -3,6 +3,7 @@ import mmap
import os
import shutil
import struct
import time
from binascii import hexlify, unhexlify
from collections import defaultdict
from configparser import ConfigParser
@ -1135,8 +1136,7 @@ class LoggedIO:
def __init__(self, path, limit, segments_per_dir, capacity=90): def __init__(self, path, limit, segments_per_dir, capacity=90):
self.path = path self.path = path
self.fds = LRUCache(capacity, self.fds = LRUCache(capacity, dispose=self._close_fd)
dispose=self.close_fd)
self.segment = 0 self.segment = 0
self.limit = limit self.limit = limit
self.segments_per_dir = segments_per_dir self.segments_per_dir = segments_per_dir
@ -1148,7 +1148,8 @@ class LoggedIO:
self.fds.clear() self.fds.clear()
self.fds = None # Just to make sure we're disabled self.fds = None # Just to make sure we're disabled
def _close_fd(self, ts_fd):
    """Dispose callback for the fd LRU cache: close one cached segment fd.

    :param ts_fd: ``(timestamp, fd)`` tuple as stored in ``self.fds``
                  (the timestamp records when the fd was last used).
    """
    ts, fd = ts_fd
    # hint the OS that the cached pages for this segment are no longer
    # needed, so they can be dropped before we close the file.
    safe_fadvise(fd.fileno(), 0, 0, 'DONTNEED')
    fd.close()
@ -1262,13 +1263,31 @@ class LoggedIO:
return self._write_fd return self._write_fd
def get_fd(self, segment):
    """Return an open read-only fd for *segment*, using the fd LRU cache.

    Cached fds older than ``FD_MAX_AGE`` are closed and reopened to avoid
    stale-filehandle (ESTALE) issues, e.g. on network filesystems.

    note: get_fd() returns a fd with undefined file pointer position,
    so callers must always seek() to desired position afterwards.

    :param segment: segment number whose file should be opened
    :return: a binary file object positioned arbitrarily
    """
    now = time.monotonic()

    def open_fd():
        # open freshly and remember it (with its creation time) in the cache
        fd = open(self.segment_filename(segment), 'rb')
        self.fds[segment] = (now, fd)
        return fd

    try:
        ts, fd = self.fds[segment]
    except KeyError:
        fd = open_fd()
    else:
        if now - ts > FD_MAX_AGE:
            # we do not want to touch long-unused file handles to
            # avoid ESTALE issues (e.g. on network filesystems).
            del self.fds[segment]
            fd = open_fd()
        else:
            # fd is fresh enough, so we use it.
            # also, we update the timestamp of the lru cache entry.
            self.fds.upd(segment, (now, fd))
    return fd
def close_segment(self): def close_segment(self):
# set self._write_fd to None early to guard against reentry from error handling code paths: # set self._write_fd to None early to guard against reentry from error handling code paths:
fd, self._write_fd = self._write_fd, None fd, self._write_fd = self._write_fd, None