avoid stale filehandle issues, fixes #3265

This commit is contained in:
Thomas Waldmann 2018-06-13 02:43:34 +02:00
parent 86a91af67a
commit 5b5546d7e9
3 changed files with 34 additions and 7 deletions

View File

@ -52,6 +52,8 @@ LIST_SCAN_LIMIT = 100000
DEFAULT_SEGMENTS_PER_DIR = 1000
FD_MAX_AGE = 4 * 60 # 4 minutes
CHUNK_MIN_EXP = 19 # 2**19 == 512kiB
CHUNK_MAX_EXP = 23 # 2**23 == 8MiB
HASH_WINDOW_SIZE = 0xfff # 4095B

View File

@ -39,6 +39,12 @@ class LRUCache:
self._lru.append(key)
return value
def upd(self, key, value):
# special use only: update the value for an existing key without having to dispose it first
# this method complements __setitem__ which should be used for the normal use case.
assert key in self._cache, "Unexpected attempt to update a non-existing item."
self._cache[key] = value
def clear(self):
for value in self._cache.values():
self._dispose(value)

View File

@ -3,6 +3,7 @@ import mmap
import os
import shutil
import struct
import time
from binascii import hexlify, unhexlify
from collections import defaultdict
from configparser import ConfigParser
@ -1135,8 +1136,7 @@ class LoggedIO:
def __init__(self, path, limit, segments_per_dir, capacity=90):
self.path = path
self.fds = LRUCache(capacity,
dispose=self.close_fd)
self.fds = LRUCache(capacity, dispose=self._close_fd)
self.segment = 0
self.limit = limit
self.segments_per_dir = segments_per_dir
@ -1148,7 +1148,8 @@ class LoggedIO:
self.fds.clear()
self.fds = None # Just to make sure we're disabled
def close_fd(self, fd):
def _close_fd(self, ts_fd):
ts, fd = ts_fd
safe_fadvise(fd.fileno(), 0, 0, 'DONTNEED')
fd.close()
@ -1262,13 +1263,31 @@ class LoggedIO:
return self._write_fd
def get_fd(self, segment):
try:
return self.fds[segment]
except KeyError:
# note: get_fd() returns a fd with undefined file pointer position,
# so callers must always seek() to desired position afterwards.
now = time.monotonic()
def open_fd():
fd = open(self.segment_filename(segment), 'rb')
self.fds[segment] = fd
self.fds[segment] = (now, fd)
return fd
try:
ts, fd = self.fds[segment]
except KeyError:
fd = open_fd()
else:
if now - ts > FD_MAX_AGE:
# we do not want to touch long-unused file handles to
# avoid ESTALE issues (e.g. on network filesystems).
del self.fds[segment]
fd = open_fd()
else:
# fd is fresh enough, so we use it.
# also, we update the timestamp of the lru cache entry.
self.fds.upd(segment, (now, fd))
return fd
def close_segment(self):
# set self._write_fd to None early to guard against reentry from error handling code paths:
fd, self._write_fd = self._write_fd, None