Partition operations by priority

This commit is contained in:
M66B 2020-01-25 12:58:21 +01:00
parent 544d4dfd61
commit 08365a7f1d
1 changed files with 72 additions and 58 deletions

View File

@ -64,6 +64,7 @@ import java.util.Hashtable;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import javax.mail.AuthenticationFailedException; import javax.mail.AuthenticationFailedException;
@ -1111,86 +1112,99 @@ public class ServiceSynchronize extends ServiceBase implements SharedPreferences
@Override @Override
public void onChanged(final List<TupleOperationEx> _operations) { public void onChanged(final List<TupleOperationEx> _operations) {
List<Long> ops = new ArrayList<>(); List<Long> ops = new ArrayList<>();
List<TupleOperationEx> operations = new ArrayList<>(); List<TupleOperationEx> submit = new ArrayList<>();
for (TupleOperationEx op : _operations) { for (TupleOperationEx op : _operations) {
if (!handling.contains(op.id)) if (!handling.contains(op.id))
operations.add(op); submit.add(op);
ops.add(op.id); ops.add(op.id);
} }
handling = ops; handling = ops;
if (operations.size() > 0 ) { if (submit.size() > 0) {
Log.i(folder.name + " queuing operations=" + operations.size() + Log.i(folder.name + " queuing operations=" + submit.size() +
" init=" + folder.initialize + " poll=" + folder.poll); " init=" + folder.initialize + " poll=" + folder.poll);
executor.submit(new Helper.PriorityRunnable(operations.get(0).priority) { // Partition operations by priority
@Override Map<Integer, List<TupleOperationEx>> partitions = new TreeMap<>();
public void run() { for (TupleOperationEx op : submit) {
super.run(); if (!partitions.containsKey(op.priority))
try { partitions.put(op.priority, new ArrayList<>());
wlFolder.acquire(); partitions.get(op.priority).add(op);
Log.i(folder.name + " process"); }
// Get folder for (int priority : partitions.keySet()) {
Folder ifolder = mapFolders.get(folder); // null when polling List<TupleOperationEx> partition = partitions.get(priority);
boolean canOpen = (account.protocol == EntityAccount.TYPE_IMAP || EntityFolder.INBOX.equals(folder.type)); Log.i(folder.name + " queuing operations=" + partition.size() + " priority=" + priority);
final boolean shouldClose = (ifolder == null && canOpen);
executor.submit(new Helper.PriorityRunnable(priority) {
@Override
public void run() {
super.run();
try { try {
Log.i(folder.name + " run " + (shouldClose ? "offline" : "online")); wlFolder.acquire();
Log.i(folder.name + " process");
if (shouldClose) { // Get folder
// Prevent unnecessary folder connections Folder ifolder = mapFolders.get(folder); // null when polling
if (db.operation().getOperationCount(folder.id, null) == 0) boolean canOpen = (account.protocol == EntityAccount.TYPE_IMAP || EntityFolder.INBOX.equals(folder.type));
return; final boolean shouldClose = (ifolder == null && canOpen);
db.folder().setFolderState(folder.id, "connecting"); try {
Log.i(folder.name + " run " + (shouldClose ? "offline" : "online"));
ifolder = iservice.getStore().getFolder(folder.name); if (shouldClose) {
ifolder.open(Folder.READ_WRITE); // Prevent unnecessary folder connections
if (db.operation().getOperationCount(folder.id, null) == 0)
return;
db.folder().setFolderState(folder.id, "connected"); db.folder().setFolderState(folder.id, "connecting");
db.folder().setFolderError(folder.id, null); ifolder = iservice.getStore().getFolder(folder.name);
} ifolder.open(Folder.READ_WRITE);
Core.processOperations(ServiceSynchronize.this, db.folder().setFolderState(folder.id, "connected");
account, folder,
operations,
iservice.getStore(), ifolder,
state);
} catch (FolderNotFoundException ex) { db.folder().setFolderError(folder.id, null);
Log.w(folder.name, ex); }
db.folder().deleteFolder(folder.id);
} catch (Throwable ex) { Core.processOperations(ServiceSynchronize.this,
Log.e(folder.name, ex); account, folder,
EntityLog.log( partition,
ServiceSynchronize.this, iservice.getStore(), ifolder,
folder.name + " " + Log.formatThrowable(ex, false)); state);
db.folder().setFolderError(folder.id, Log.formatThrowable(ex));
state.error(ex); } catch (FolderNotFoundException ex) {
} finally { Log.w(folder.name, ex);
if (shouldClose) { db.folder().deleteFolder(folder.id);
if (ifolder != null && ifolder.isOpen()) { } catch (Throwable ex) {
db.folder().setFolderState(folder.id, "closing"); Log.e(folder.name, ex);
try { EntityLog.log(
ifolder.close(false); ServiceSynchronize.this,
} catch (MessagingException ex) { folder.name + " " + Log.formatThrowable(ex, false));
Log.w(folder.name, ex); db.folder().setFolderError(folder.id, Log.formatThrowable(ex));
} state.error(ex);
} finally {
if (shouldClose) {
if (ifolder != null && ifolder.isOpen()) {
db.folder().setFolderState(folder.id, "closing");
try {
ifolder.close(false);
} catch (MessagingException ex) {
Log.w(folder.name, ex);
}
}
if (folder.synchronize && (folder.poll || !capIdle))
db.folder().setFolderState(folder.id, "waiting");
else
db.folder().setFolderState(folder.id, null);
} }
if (folder.synchronize && (folder.poll || !capIdle))
db.folder().setFolderState(folder.id, "waiting");
else
db.folder().setFolderState(folder.id, null);
} }
} finally {
wlFolder.release();
} }
} finally {
wlFolder.release();
} }
} });
}); }
} }
} }
}); });