1
0
Fork 0
mirror of https://github.com/borgbackup/borg.git synced 2025-01-01 12:45:34 +00:00

remote logging/progress: use callback to send queued records, fixes #7662

This commit is contained in:
Thomas Waldmann 2023-06-22 13:03:52 +02:00
parent 96076a71d2
commit 3aec98ada9
No known key found for this signature in database
GPG key ID: 243ACFA951F78E01
2 changed files with 17 additions and 0 deletions

View file

@ -358,6 +358,7 @@ def open(
storage_quota=self.storage_quota, storage_quota=self.storage_quota,
exclusive=exclusive, exclusive=exclusive,
make_parent_dirs=make_parent_dirs, make_parent_dirs=make_parent_dirs,
send_log_cb=self.send_queued_log,
) )
self.repository.__enter__() # clean exit handled by serve() method self.repository.__enter__() # clean exit handled by serve() method
return self.repository.id return self.repository.id

View file

@ -178,10 +178,16 @@ def __init__(
append_only=False, append_only=False,
storage_quota=None, storage_quota=None,
make_parent_dirs=False, make_parent_dirs=False,
send_log_cb=None,
): ):
self.path = os.path.abspath(path) self.path = os.path.abspath(path)
self._location = Location("file://%s" % self.path) self._location = Location("file://%s" % self.path)
self.version = None self.version = None
# long-running repository methods which emit log or progress output are responsible for calling
# the ._send_log method periodically to get log and progress output transferred to the borg client
# in a timely manner, in case we have a RemoteRepository.
# for local repositories ._send_log can be called also (it will just do nothing in that case).
self._send_log = send_log_cb or (lambda: None)
self.io = None # type: LoggedIO self.io = None # type: LoggedIO
self.lock = None self.lock = None
self.index = None self.index = None
@ -785,6 +791,7 @@ def complete_xfer(intermediate=True):
logger.warning("Segment %d not found, but listed in compaction data", segment) logger.warning("Segment %d not found, but listed in compaction data", segment)
del self.compact[segment] del self.compact[segment]
pi.show() pi.show()
self._send_log()
continue continue
segment_size = self.io.segment_size(segment) segment_size = self.io.segment_size(segment)
freeable_ratio = 1.0 * freeable_space / segment_size freeable_ratio = 1.0 * freeable_space / segment_size
@ -798,6 +805,7 @@ def complete_xfer(intermediate=True):
freeable_space, freeable_space,
) )
pi.show() pi.show()
self._send_log()
continue continue
segments.setdefault(segment, 0) segments.setdefault(segment, 0)
logger.debug( logger.debug(
@ -905,7 +913,9 @@ def complete_xfer(intermediate=True):
assert segments[segment] == 0, "Corrupted segment reference count - corrupted index or hints" assert segments[segment] == 0, "Corrupted segment reference count - corrupted index or hints"
unused.append(segment) unused.append(segment)
pi.show() pi.show()
self._send_log()
pi.finish() pi.finish()
self._send_log()
complete_xfer(intermediate=False) complete_xfer(intermediate=False)
self.io.clear_empty_dirs() self.io.clear_empty_dirs()
quota_use_after = self.storage_quota_use quota_use_after = self.storage_quota_use
@ -924,6 +934,7 @@ def replay_segments(self, index_transaction_id, segments_transaction_id):
) )
for i, (segment, filename) in enumerate(self.io.segment_iterator()): for i, (segment, filename) in enumerate(self.io.segment_iterator()):
pi.show(i) pi.show(i)
self._send_log()
if index_transaction_id is not None and segment <= index_transaction_id: if index_transaction_id is not None and segment <= index_transaction_id:
continue continue
if segment > segments_transaction_id: if segment > segments_transaction_id:
@ -931,6 +942,7 @@ def replay_segments(self, index_transaction_id, segments_transaction_id):
objects = self.io.iter_objects(segment) objects = self.io.iter_objects(segment)
self._update_index(segment, objects) self._update_index(segment, objects)
pi.finish() pi.finish()
self._send_log()
self.write_index() self.write_index()
finally: finally:
self.exclusive = remember_exclusive self.exclusive = remember_exclusive
@ -1061,6 +1073,7 @@ def report_error(msg):
segment = -1 # avoid uninitialized variable if there are no segment files at all segment = -1 # avoid uninitialized variable if there are no segment files at all
for i, (segment, filename) in enumerate(self.io.segment_iterator()): for i, (segment, filename) in enumerate(self.io.segment_iterator()):
pi.show(i) pi.show(i)
self._send_log()
if segment <= last_segment_checked: if segment <= last_segment_checked:
continue continue
if segment > transaction_id: if segment > transaction_id:
@ -1087,6 +1100,7 @@ def report_error(msg):
self.save_config(self.path, self.config) self.save_config(self.path, self.config)
pi.finish() pi.finish()
self._send_log()
# self.index, self.segments, self.compact now reflect the state of the segment files up to <transaction_id>. # self.index, self.segments, self.compact now reflect the state of the segment files up to <transaction_id>.
# We might need to add a commit tag if no committed segment is found. # We might need to add a commit tag if no committed segment is found.
if repair and segments_transaction_id is None: if repair and segments_transaction_id is None:
@ -1110,12 +1124,14 @@ def report_error(msg):
current_value = current_index.get(key, not_found) current_value = current_index.get(key, not_found)
if current_value != value: if current_value != value:
logger.warning(line_format, bin_to_hex(key), value, current_value) logger.warning(line_format, bin_to_hex(key), value, current_value)
self._send_log()
for key, current_value in current_index.iteritems(): for key, current_value in current_index.iteritems():
if key in self.index: if key in self.index:
continue continue
value = self.index.get(key, not_found) value = self.index.get(key, not_found)
if current_value != value: if current_value != value:
logger.warning(line_format, bin_to_hex(key), value, current_value) logger.warning(line_format, bin_to_hex(key), value, current_value)
self._send_log()
if repair: if repair:
self.write_index() self.write_index()
self.rollback() self.rollback()