Prioritize operation execution

This commit is contained in:
M66B 2020-01-25 10:49:59 +01:00
parent 60a4f49477
commit 5669acad9e
7 changed files with 121 additions and 14 deletions

View File

@ -95,7 +95,7 @@ public class AdapterOperation extends RecyclerView.Adapter<AdapterOperation.View
view.setAlpha(operation.synchronize ? 1.0f : Helper.LOW_LIGHT);
StringBuilder sb = new StringBuilder();
sb.append(operation.name);
sb.append(operation.name).append(':').append(operation.priority);
try {
JSONArray jarray = new JSONArray(operation.args);
if (jarray.length() > 0)

View File

@ -133,7 +133,7 @@ class Core {
Log.i(folder.name + " start process");
DB db = DB.getInstance(context);
List<EntityOperation> ops = db.operation().getOperations(folder.id);
List<TupleOperationEx> ops = db.operation().getOperations(folder.id);
List<Long> processed = new ArrayList<>();
Log.i(folder.name + " pending operations=" + ops.size());

View File

@ -37,7 +37,9 @@ public interface DaoOperation {
" ELSE 0" +
" END";
@Query("SELECT operation.*, account.name AS accountName, folder.name AS folderName" +
@Query("SELECT operation.*" +
", " + priority + " AS priority" +
", account.name AS accountName, folder.name AS folderName" +
" ,((account.synchronize IS NULL OR account.synchronize)" +
" AND (NOT folder.account IS NULL OR identity.synchronize IS NULL OR identity.synchronize)) AS synchronize" +
" FROM operation" +
@ -48,7 +50,12 @@ public interface DaoOperation {
" ORDER BY " + priority + ", id")
LiveData<List<TupleOperationEx>> liveOperations();
String GET_OPS_FOLDER = "SELECT operation.* FROM operation" +
String GET_OPS_FOLDER = "SELECT operation.*" +
", " + priority + " AS priority" +
", account.name AS accountName, folder.name AS folderName" +
" ,((account.synchronize IS NULL OR account.synchronize)" +
" AND (NOT folder.account IS NULL OR identity.synchronize IS NULL OR identity.synchronize)) AS synchronize" +
" FROM operation" +
" JOIN folder ON folder.id = operation.folder" +
" LEFT JOIN message ON message.id = operation.message" +
" LEFT JOIN account ON account.id = operation.account" +
@ -59,10 +66,10 @@ public interface DaoOperation {
" ORDER BY " + priority + ", id";
@Query(GET_OPS_FOLDER)
List<EntityOperation> getOperations(Long folder);
List<TupleOperationEx> getOperations(Long folder);
@Query(GET_OPS_FOLDER)
LiveData<List<EntityOperation>> liveOperations(Long folder);
LiveData<List<TupleOperationEx>> liveOperations(Long folder);
@Query("SELECT COUNT(operation.id) AS pending" +
", SUM(CASE WHEN operation.error IS NULL THEN 0 ELSE 1 END) AS errors" +

View File

@ -90,6 +90,8 @@ import androidx.recyclerview.widget.RecyclerView;
import com.google.android.material.bottomnavigation.BottomNavigationView;
import org.jetbrains.annotations.NotNull;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
@ -105,17 +107,22 @@ import java.text.DateFormat;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import static android.os.Process.THREAD_PRIORITY_BACKGROUND;
@ -141,7 +148,7 @@ public class Helper {
static final String CROWDIN_URI = "https://crowdin.com/project/open-source-email";
static final String GRAVATAR_PRIVACY_URI = "https://meta.stackexchange.com/questions/44717/is-gravatar-a-privacy-risk";
static ExecutorService getBackgroundExecutor(int threads, String name) {
static ExecutorService getBackgroundExecutor(int threads, final String name) {
ThreadFactory factory = new ThreadFactory() {
private final AtomicInteger threadId = new AtomicInteger();
@ -160,6 +167,21 @@ public class Helper {
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
factory);
else if (threads == 1)
return new ThreadPoolExecutorEx(
threads, threads,
0L, TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<Runnable>(10, new PriorityComparator()),
factory) {
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
RunnableFuture<T> task = super.newTaskFor(runnable, value);
if (runnable instanceof PriorityRunnable)
return new PriorityFuture<T>(task, ((PriorityRunnable) runnable).getPriority());
else
return task;
}
};
else
return new ThreadPoolExecutorEx(
threads, threads,
@ -179,6 +201,81 @@ public class Helper {
}
}
private static class PriorityFuture<T> implements RunnableFuture<T> {
private int priority;
private RunnableFuture<T> wrapped;
PriorityFuture(RunnableFuture<T> wrapped, int priority) {
this.priority = priority;
this.wrapped = wrapped;
}
public int getPriority() {
return priority;
}
@Override
public void run() {
wrapped.run();
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return wrapped.cancel(mayInterruptIfRunning);
}
@Override
public boolean isCancelled() {
return wrapped.isCancelled();
}
@Override
public boolean isDone() {
return wrapped.isDone();
}
@Override
public T get() throws ExecutionException, InterruptedException {
return wrapped.get();
}
@Override
public T get(long timeout, @NotNull TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException {
return wrapped.get(timeout, unit);
}
}
private static class PriorityComparator implements Comparator<Runnable> {
@Override
public int compare(Runnable r1, Runnable r2) {
if (r1 instanceof PriorityFuture<?> && r2 instanceof PriorityFuture<?>) {
Integer p1 = ((PriorityFuture<?>) r1).getPriority();
Integer p2 = ((PriorityFuture<?>) r2).getPriority();
Log.i("Priority " + p1 + "/" + p2 + "=" + p1.compareTo(p2));
return p1.compareTo(p2);
} else
return 0;
}
}
static class PriorityRunnable implements Runnable {
private int priority;
int getPriority() {
return priority;
}
PriorityRunnable(int priority) {
this.priority = priority;
}
@Override
public void run() {
Log.i("Run priority=" + priority);
}
}
private static final ExecutorService executor = getBackgroundExecutor(1, "helper");
// Features

View File

@ -92,11 +92,11 @@ public class ServiceSend extends ServiceBase {
});
// Observe send operations
db.operation().liveOperations(null).observe(this, new Observer<List<EntityOperation>>() {
db.operation().liveOperations(null).observe(this, new Observer<List<TupleOperationEx>>() {
private List<Long> handling = new ArrayList<>();
@Override
public void onChanged(final List<EntityOperation> operations) {
public void onChanged(final List<TupleOperationEx> operations) {
boolean process = false;
List<Long> ops = new ArrayList<>();
for (EntityOperation op : operations) {
@ -245,7 +245,7 @@ public class ServiceSend extends ServiceBase {
db.folder().setFolderError(outbox.id, null);
db.folder().setFolderSyncState(outbox.id, "syncing");
List<EntityOperation> ops = db.operation().getOperations(outbox.id);
List<TupleOperationEx> ops = db.operation().getOperations(outbox.id);
Log.i(outbox.name + " pending operations=" + ops.size());
for (EntityOperation op : ops) {
EntityMessage message = null;

View File

@ -1103,13 +1103,13 @@ public class ServiceSynchronize extends ServiceBase implements SharedPreferences
cowners.add(cowner);
cowner.start();
db.operation().liveOperations(folder.id).observe(cowner, new Observer<List<EntityOperation>>() {
db.operation().liveOperations(folder.id).observe(cowner, new Observer<List<TupleOperationEx>>() {
private List<Long> handling = new ArrayList<>();
private final PowerManager.WakeLock wlFolder = pm.newWakeLock(
PowerManager.PARTIAL_WAKE_LOCK, BuildConfig.APPLICATION_ID + ":folder." + folder.id);
@Override
public void onChanged(final List<EntityOperation> operations) {
public void onChanged(final List<TupleOperationEx> operations) {
boolean process = false;
List<Long> ops = new ArrayList<>();
for (EntityOperation op : operations) {
@ -1123,9 +1123,10 @@ public class ServiceSynchronize extends ServiceBase implements SharedPreferences
Log.i(folder.name + " operations=" + operations.size() +
" init=" + folder.initialize + " poll=" + folder.poll);
executor.submit(new Runnable() {
executor.submit(new Helper.PriorityRunnable(operations.get(0).priority) {
@Override
public void run() {
super.run();
try {
wlFolder.acquire();
Log.i(folder.name + " process");

View File

@ -22,6 +22,7 @@ package eu.faircode.email;
import java.util.Objects;
public class TupleOperationEx extends EntityOperation {
public int priority;
public String accountName;
public String folderName;
public boolean synchronize;
@ -31,7 +32,8 @@ public class TupleOperationEx extends EntityOperation {
if (obj instanceof TupleOperationEx) {
TupleOperationEx other = (TupleOperationEx) obj;
return (super.equals(obj) &&
Objects.equals(accountName, other.accountName) &&
this.priority == other.priority &&
Objects.equals(this.accountName, other.accountName) &&
Objects.equals(this.folderName, other.folderName) &&
this.synchronize == other.synchronize);
} else