Repository.compact_segments: always save_space

This commit is contained in:
Marian Beermann 2016-07-14 01:54:47 +02:00
parent 3bdfe2a564
commit 389503db60
2 changed files with 20 additions and 14 deletions

View File

@ -248,9 +248,10 @@ class Repository:
def commit(self, save_space=False): def commit(self, save_space=False):
"""Commit transaction """Commit transaction
""" """
# save_space is not used anymore, but stays for RPC/API compatibility.
self.io.write_commit() self.io.write_commit()
if not self.append_only: if not self.append_only:
self.compact_segments(save_space=save_space) self.compact_segments()
self.write_index() self.write_index()
self.rollback() self.rollback()
@ -348,7 +349,7 @@ class Repository:
os.unlink(os.path.join(self.path, name)) os.unlink(os.path.join(self.path, name))
self.index = None self.index = None
def compact_segments(self, save_space=False): def compact_segments(self):
"""Compact sparse segments by copying data into new segments """Compact sparse segments by copying data into new segments
""" """
if not self.compact: if not self.compact:
@ -357,12 +358,11 @@ class Repository:
segments = self.segments segments = self.segments
unused = [] # list of segments, that are not used anymore unused = [] # list of segments, that are not used anymore
def complete_xfer(): def complete_xfer(intermediate=True):
# complete the transfer (usually exactly when some target segment # complete the current transfer (when some target segment is full)
# is full, or at the very end when everything is processed)
nonlocal unused nonlocal unused
# commit the new, compact, used segments # commit the new, compact, used segments
self.io.write_commit() self.io.write_commit(intermediate=intermediate)
# get rid of the old, sparse, unused segments. free space. # get rid of the old, sparse, unused segments. free space.
for segment in unused: for segment in unused:
assert self.segments.pop(segment) == 0 assert self.segments.pop(segment) == 0
@ -383,7 +383,7 @@ class Repository:
for tag, key, offset, data in self.io.iter_objects(segment, include_data=True): for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
if tag == TAG_PUT and self.index.get(key, (-1, -1)) == (segment, offset): if tag == TAG_PUT and self.index.get(key, (-1, -1)) == (segment, offset):
try: try:
new_segment, offset = self.io.write_put(key, data, raise_full=save_space) new_segment, offset = self.io.write_put(key, data, raise_full=True)
except LoggedIO.SegmentFull: except LoggedIO.SegmentFull:
complete_xfer() complete_xfer()
new_segment, offset = self.io.write_put(key, data) new_segment, offset = self.io.write_put(key, data)
@ -394,13 +394,13 @@ class Repository:
elif tag == TAG_DELETE: elif tag == TAG_DELETE:
if index_transaction_id is None or segment > index_transaction_id: if index_transaction_id is None or segment > index_transaction_id:
try: try:
self.io.write_delete(key, raise_full=save_space) self.io.write_delete(key, raise_full=True)
except LoggedIO.SegmentFull: except LoggedIO.SegmentFull:
complete_xfer() complete_xfer()
self.io.write_delete(key) self.io.write_delete(key)
assert segments[segment] == 0 assert segments[segment] == 0
unused.append(segment) unused.append(segment)
complete_xfer() complete_xfer(intermediate=False)
def replay_segments(self, index_transaction_id, segments_transaction_id): def replay_segments(self, index_transaction_id, segments_transaction_id):
self.prepare_txn(index_transaction_id, do_cleanup=False) self.prepare_txn(index_transaction_id, do_cleanup=False)
@ -536,7 +536,7 @@ class Repository:
if current_index.get(key, (-1, -1)) != value: if current_index.get(key, (-1, -1)) != value:
report_error('Index mismatch for key {}. {} != {}'.format(key, value, current_index.get(key, (-1, -1)))) report_error('Index mismatch for key {}. {} != {}'.format(key, value, current_index.get(key, (-1, -1))))
if repair: if repair:
self.compact_segments(save_space=save_space) self.compact_segments()
self.write_index() self.write_index()
self.rollback() self.rollback()
if error_found: if error_found:
@ -898,9 +898,15 @@ class LoggedIO:
self.offset += self.put_header_fmt.size self.offset += self.put_header_fmt.size
return self.segment, self.put_header_fmt.size return self.segment, self.put_header_fmt.size
def write_commit(self): def write_commit(self, intermediate=False):
self.close_segment() if intermediate:
fd = self.get_write_fd() # Intermediate commits go directly into the current segment - this makes checking their validity more
# expensive, but is faster and reduces clobber.
fd = self.get_write_fd()
fd.sync()
else:
self.close_segment()
fd = self.get_write_fd()
header = self.header_no_crc_fmt.pack(self.header_fmt.size, TAG_COMMIT) header = self.header_no_crc_fmt.pack(self.header_fmt.size, TAG_COMMIT)
crc = self.crc_fmt.pack(crc32(header) & 0xffffffff) crc = self.crc_fmt.pack(crc32(header) & 0xffffffff)
fd.write(b''.join((crc, header))) fd.write(b''.join((crc, header)))

View File

@ -449,7 +449,7 @@ class RepositoryCheckTestCase(RepositoryTestCaseBase):
# Simulate a crash before compact # Simulate a crash before compact
with patch.object(Repository, 'compact_segments') as compact: with patch.object(Repository, 'compact_segments') as compact:
self.repository.commit() self.repository.commit()
compact.assert_called_once_with(save_space=False) compact.assert_called_once_with()
self.reopen() self.reopen()
with self.repository: with self.repository:
self.check(repair=True) self.check(repair=True)