From a980b0a76f47d227539fac5c55687810722153e7 Mon Sep 17 00:00:00 2001 From: M66B Date: Sat, 4 Aug 2018 22:35:47 +0000 Subject: [PATCH] Rewritten operation processing --- .../java/eu/faircode/email/DaoOperation.java | 6 + .../eu/faircode/email/EntityOperation.java | 5 +- .../eu/faircode/email/ServiceSynchronize.java | 412 ++++++++++-------- 3 files changed, 244 insertions(+), 179 deletions(-) diff --git a/app/src/main/java/eu/faircode/email/DaoOperation.java b/app/src/main/java/eu/faircode/email/DaoOperation.java index a3fd36f3a0..8d0f359e36 100644 --- a/app/src/main/java/eu/faircode/email/DaoOperation.java +++ b/app/src/main/java/eu/faircode/email/DaoOperation.java @@ -37,6 +37,12 @@ public interface DaoOperation { " ORDER BY operation.id") List getOperations(long folder); + @Query("SELECT COUNT(operation.id) FROM operation" + + " JOIN message ON message.id = operation.message" + + " WHERE folder = :folder" + + " ORDER BY operation.id") + int getOperationCount(long folder); + @Query("DELETE FROM operation WHERE id = :id") void deleteOperation(long id); diff --git a/app/src/main/java/eu/faircode/email/EntityOperation.java b/app/src/main/java/eu/faircode/email/EntityOperation.java index cdaa32e7cc..8b19b14b14 100644 --- a/app/src/main/java/eu/faircode/email/EntityOperation.java +++ b/app/src/main/java/eu/faircode/email/EntityOperation.java @@ -99,6 +99,9 @@ public class EntityOperation { LocalBroadcastManager lbm = LocalBroadcastManager.getInstance(context); lbm.sendBroadcast( - new Intent(ServiceSynchronize.ACTION_PROCESS_OPERATIONS + message.folder)); + new Intent(SEND.equals(name) + ? ServiceSynchronize.ACTION_PROCESS_OUTBOX + : ServiceSynchronize.ACTION_PROCESS_FOLDER) + .putExtra("folder", message.folder)); } } diff --git a/app/src/main/java/eu/faircode/email/ServiceSynchronize.java b/app/src/main/java/eu/faircode/email/ServiceSynchronize.java index 3c1fdd872e..6c5703320e 100644 --- a/app/src/main/java/eu/faircode/email/ServiceSynchronize.java +++ b/app/src/main/java/eu/faircode/email/ServiceSynchronize.java @@ -56,7 +56,9 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.Calendar; import java.util.Date; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -98,7 +100,8 @@ public class ServiceSynchronize extends LifecycleService { private static final int FETCH_BATCH_SIZE = 10; private static final int DOWNLOAD_BUFFER_SIZE = 8192; // bytes - static final String ACTION_PROCESS_OPERATIONS = BuildConfig.APPLICATION_ID + ".PROCESS_OPERATIONS."; + static final String ACTION_PROCESS_OUTBOX = BuildConfig.APPLICATION_ID + ".PROCESS_OUTBOX"; + static final String ACTION_PROCESS_FOLDER = BuildConfig.APPLICATION_ID + ".PROCESS_FOLDER"; private class ServiceState { boolean running = false; @@ -278,6 +281,7 @@ public class ServiceSynchronize extends LifecycleService { // Listen for connection changes istore.addConnectionListener(new ConnectionAdapter() { List folderThreads = new ArrayList<>(); + Map mapFolder = new HashMap<>(); @Override public void opened(ConnectionEvent e) { @@ -291,8 +295,25 @@ public class ServiceSynchronize extends LifecycleService { Thread thread = new Thread(new Runnable() { @Override public void run() { + IMAPFolder ifolder = null; try { - monitorFolder(account, folder, fstore); + Log.i(Helper.TAG, folder.name + " start"); + + ifolder = (IMAPFolder) fstore.getFolder(folder.name); + ifolder.open(Folder.READ_WRITE); + + synchronized (mapFolder) { + mapFolder.put(folder.id, ifolder); + } + + monitorFolder(account, folder, fstore, ifolder); + + } catch (FolderNotFoundException ex) { + Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); + + // Disable synchronization + folder.synchronize = false; + DB.getInstance(ServiceSynchronize.this).folder().updateFolder(folder); } catch (Throwable ex) { Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); reportError(account.name, folder.name, ex); @@ -303,12 +324,30 @@ public class ServiceSynchronize extends LifecycleService { } catch (MessagingException e1) { Log.w(Helper.TAG, account.name + " " + e1 + "\n" + Log.getStackTraceString(e1)); } + } finally { + if (ifolder != null && ifolder.isOpen()) { + try { + ifolder.close(false); + } catch (MessagingException ex) { + Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); + } + } + Log.i(Helper.TAG, folder.name + " stop"); } } }, "sync.folder." + folder.id); folderThreads.add(thread); thread.start(); } + + LocalBroadcastManager lbm = LocalBroadcastManager.getInstance(ServiceSynchronize.this); + lbm.registerReceiver(offline, new IntentFilter(ACTION_PROCESS_FOLDER)); + Log.i(Helper.TAG, "listen process folder"); + for (final EntityFolder folder : db.folder().getFolders(account.id, false)) + if (!EntityFolder.TYPE_OUTBOX.equals(folder.type) && + db.operation().getOperationCount(folder.id) > 0) + lbm.sendBroadcast(new Intent(ACTION_PROCESS_FOLDER).putExtra("folder", folder.id)); + } catch (Throwable ex) { Log.e(Helper.TAG, account.name + " " + ex + "\n" + Log.getStackTraceString(ex)); reportError(account.name, null, ex); @@ -326,6 +365,13 @@ public class ServiceSynchronize extends LifecycleService { public void disconnected(ConnectionEvent e) { Log.e(Helper.TAG, account.name + " disconnected"); + LocalBroadcastManager lbm = LocalBroadcastManager.getInstance(ServiceSynchronize.this); + lbm.unregisterReceiver(offline); + + synchronized (mapFolder) { + mapFolder.clear(); + } + // Check connection synchronized (state) { state.notifyAll(); @@ -336,11 +382,71 @@ public class ServiceSynchronize extends LifecycleService { public void closed(ConnectionEvent e) { Log.e(Helper.TAG, account.name + " closed"); + LocalBroadcastManager lbm = LocalBroadcastManager.getInstance(ServiceSynchronize.this); + lbm.unregisterReceiver(offline); + + synchronized (mapFolder) { + mapFolder.clear(); + } + // Check connection synchronized (state) { state.notifyAll(); } } + + BroadcastReceiver offline = new BroadcastReceiver() { + @Override + public void onReceive(Context context, Intent intent) { + final long fid = intent.getLongExtra("folder", -1); + + IMAPFolder x; + synchronized (mapFolder) { + x = mapFolder.get(fid); + } + final boolean shouldClose = (x == null); + final IMAPFolder ffolder = x; + + Log.i(Helper.TAG, "run operations folder=" + fid + " offline=" + shouldClose); + executor.submit(new Runnable() { + @Override + public void run() { + EntityFolder folder = DB.getInstance(ServiceSynchronize.this).folder().getFolder(fid); + IMAPFolder ifolder = ffolder; + try { + Log.i(Helper.TAG, folder.name + " start operations"); + if (ifolder == null) { + ifolder = (IMAPFolder) fstore.getFolder(folder.name); + ifolder.open(Folder.READ_WRITE); + } + + processOperations(folder, fstore, ifolder); + } catch (Throwable ex) { + Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); + reportError(account.name, folder.name, ex); + + // Cascade up + try { + fstore.close(); + } catch (MessagingException e1) { + Log.w(Helper.TAG, folder.name + " " + e1 + "\n" + Log.getStackTraceString(e1)); + } + } finally { + Log.i(Helper.TAG, folder.name + " start operations"); + if (shouldClose) + if (ifolder != null && ifolder.isOpen()) { + try { + ifolder.close(false); + } catch (MessagingException ex) { + Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); + } + } + Log.i(Helper.TAG, folder.name + " stop operations"); + } + } + }); + } + }; }); // Initiate connection @@ -392,181 +498,127 @@ public class ServiceSynchronize extends LifecycleService { Log.i(Helper.TAG, account.name + " stopped"); } - private void monitorFolder(final EntityAccount account, final EntityFolder folder, final IMAPStore istore) throws MessagingException, JSONException, IOException { - IMAPFolder ifolder = null; - try { - Log.i(Helper.TAG, folder.name + " start"); - - ifolder = (IMAPFolder) istore.getFolder(folder.name); - final IMAPFolder ffolder = ifolder; - ifolder.open(Folder.READ_WRITE); - - // Listen for new and deleted messages - ifolder.addMessageCountListener(new MessageCountAdapter() { - @Override - public void messagesAdded(MessageCountEvent e) { - try { - Log.i(Helper.TAG, folder.name + " messages added"); - for (Message imessage : e.getMessages()) - synchronizeMessage(folder, ffolder, (IMAPMessage) imessage); - } catch (MessageRemovedException ex) { - Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); - } catch (Throwable ex) { - Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); - reportError(account.name, folder.name, ex); - - // Cascade up - try { - istore.close(); - } catch (MessagingException e1) { - Log.w(Helper.TAG, folder.name + " " + e1 + "\n" + Log.getStackTraceString(e1)); - } - } - } - - @Override - public void messagesRemoved(MessageCountEvent e) { - try { - Log.i(Helper.TAG, folder.name + " messages removed"); - for (Message imessage : e.getMessages()) - try { - long uid = ffolder.getUID(imessage); - DB db = DB.getInstance(ServiceSynchronize.this); - db.message().deleteMessage(folder.id, uid); - Log.i(Helper.TAG, "Deleted uid=" + uid); - } catch (MessageRemovedException ex) { - Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); - } - } catch (Throwable ex) { - Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); - reportError(account.name, folder.name, ex); - - // Cascade up - try { - istore.close(); - } catch (MessagingException e1) { - Log.w(Helper.TAG, folder.name + " " + e1 + "\n" + Log.getStackTraceString(e1)); - } - } - } - }); - - // Fetch e-mail - synchronizeMessages(folder, ifolder); - - // Flags (like "seen") at the remote could be changed while synchronizing - - // Listen for changed messages - ifolder.addMessageChangedListener(new MessageChangedListener() { - @Override - public void messageChanged(MessageChangedEvent e) { - try { - Log.i(Helper.TAG, folder.name + " message changed"); - synchronizeMessage(folder, ffolder, (IMAPMessage) e.getMessage()); - } catch (MessageRemovedException ex) { - Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); - } catch (Throwable ex) { - Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); - reportError(account.name, folder.name, ex); - - // Cascade up - try { - istore.close(); - } catch (MessagingException e1) { - Log.w(Helper.TAG, folder.name + " " + e1 + "\n" + Log.getStackTraceString(e1)); - } - } - } - }); - - BroadcastReceiver receiver = new BroadcastReceiver() { - @Override - public void onReceive(Context context, Intent intent) { - Log.i(Helper.TAG, folder.name + " submit process id=" + folder.id); - executor.submit(new Runnable() { - @Override - public void run() { - try { - synchronized (folder) { - processOperations(folder, istore, ffolder); - } - } catch (Throwable ex) { - Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); - reportError(account.name, folder.name, ex); - - // Cascade up - try { - istore.close(); - } catch (MessagingException e1) { - Log.w(Helper.TAG, folder.name + " " + e1 + "\n" + Log.getStackTraceString(e1)); - } - } - } - }); - } - }; - - // Listen for process operations requests - LocalBroadcastManager lbm = LocalBroadcastManager.getInstance(this); - lbm.registerReceiver(receiver, new IntentFilter(ACTION_PROCESS_OPERATIONS + folder.id)); - Log.i(Helper.TAG, folder.name + " listen process id=" + folder.id); - try { - lbm.sendBroadcast(new Intent(ACTION_PROCESS_OPERATIONS + folder.id)); - - // Keep alive - Thread thread = new Thread(new Runnable() { - @Override - public void run() { - try { - boolean open; - do { - try { - Thread.sleep(NOOP_INTERVAL); - } catch (InterruptedException ex) { - Log.w(Helper.TAG, folder.name + " " + ex.toString()); - } - open = ffolder.isOpen(); - if (open) - noop(folder, ffolder); - } while (open); - } catch (Throwable ex) { - Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); - reportError(account.name, folder.name, ex); - - // Cascade up - try { - istore.close(); - } catch (MessagingException e1) { - Log.w(Helper.TAG, folder.name + " " + e1 + "\n" + Log.getStackTraceString(e1)); - } - } - } - }, "sync.noop." + folder.id); - thread.start(); - - // Idle - while (state.running) { - Log.i(Helper.TAG, folder.name + " start idle"); - ifolder.idle(false); - Log.i(Helper.TAG, folder.name + " end idle"); - } - } finally { - lbm.unregisterReceiver(receiver); - Log.i(Helper.TAG, folder.name + " unlisten process id=" + folder.id); - } - } catch (FolderNotFoundException ex) { - Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); - folder.synchronize = false; - DB.getInstance(this).folder().updateFolder(folder); - } finally { - if (ifolder != null && ifolder.isOpen()) { + private void monitorFolder(final EntityAccount account, final EntityFolder folder, final IMAPStore istore, final IMAPFolder ifolder) throws MessagingException, JSONException, IOException { + // Listen for new and deleted messages + ifolder.addMessageCountListener(new MessageCountAdapter() { + @Override + public void messagesAdded(MessageCountEvent e) { try { - ifolder.close(false); - } catch (MessagingException ex) { + Log.i(Helper.TAG, folder.name + " messages added"); + for (Message imessage : e.getMessages()) + synchronizeMessage(folder, ifolder, (IMAPMessage) imessage); + } catch (MessageRemovedException ex) { Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); + } catch (Throwable ex) { + Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); + reportError(account.name, folder.name, ex); + + // Cascade up + try { + istore.close(); + } catch (MessagingException e1) { + Log.w(Helper.TAG, folder.name + " " + e1 + "\n" + Log.getStackTraceString(e1)); + } } } - Log.i(Helper.TAG, folder.name + " stop"); + + @Override + public void messagesRemoved(MessageCountEvent e) { + try { + Log.i(Helper.TAG, folder.name + " messages removed"); + for (Message imessage : e.getMessages()) + try { + long uid = ifolder.getUID(imessage); + DB db = DB.getInstance(ServiceSynchronize.this); + db.message().deleteMessage(folder.id, uid); + Log.i(Helper.TAG, "Deleted uid=" + uid); + } catch (MessageRemovedException ex) { + Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); + } + } catch (Throwable ex) { + Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); + reportError(account.name, folder.name, ex); + + // Cascade up + try { + istore.close(); + } catch (MessagingException e1) { + Log.w(Helper.TAG, folder.name + " " + e1 + "\n" + Log.getStackTraceString(e1)); + } + } + } + }); + + // Fetch e-mail + synchronizeMessages(folder, ifolder); + + // Flags (like "seen") at the remote could be changed while synchronizing + + // Listen for changed messages + ifolder.addMessageChangedListener(new MessageChangedListener() { + @Override + public void messageChanged(MessageChangedEvent e) { + try { + Log.i(Helper.TAG, folder.name + " message changed"); + synchronizeMessage(folder, ifolder, (IMAPMessage) e.getMessage()); + } catch (MessageRemovedException ex) { + Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); + } catch (Throwable ex) { + Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); + reportError(account.name, folder.name, ex); + + // Cascade up + try { + istore.close(); + } catch (MessagingException e1) { + Log.w(Helper.TAG, folder.name + " " + e1 + "\n" + Log.getStackTraceString(e1)); + } + } + } + }); + + // Listen for process operations requests + Log.i(Helper.TAG, folder.name + " start"); + try { + // Keep alive + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + try { + boolean open; + do { + try { + Thread.sleep(NOOP_INTERVAL); + } catch (InterruptedException ex) { + Log.w(Helper.TAG, folder.name + " " + ex.toString()); + } + open = ifolder.isOpen(); + if (open) + noop(folder, ifolder); + } while (open); + } catch (Throwable ex) { + Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); + reportError(account.name, folder.name, ex); + + // Cascade up + try { + istore.close(); + } catch (MessagingException e1) { + Log.w(Helper.TAG, folder.name + " " + e1 + "\n" + Log.getStackTraceString(e1)); + } + } + } + }, "sync.noop." + folder.id); + thread.start(); + + // Idle + while (state.running) { + Log.i(Helper.TAG, folder.name + " start idle"); + ifolder.idle(false); + Log.i(Helper.TAG, folder.name + " end idle"); + } + } finally { + Log.i(Helper.TAG, folder.name + " end"); } } @@ -693,6 +745,7 @@ public class ServiceSynchronize extends LifecycleService { } finally { itransport.close(); } + // TODO: cache transport? } else if (EntityOperation.ATTACHMENT.equals(op.name)) { int sequence = jargs.getInt(0); @@ -1026,9 +1079,9 @@ public class ServiceSynchronize extends LifecycleService { outbox = db.folder().getOutbox(); if (outbox != null) { LocalBroadcastManager lbm = LocalBroadcastManager.getInstance(ServiceSynchronize.this); - lbm.registerReceiver(receiverOutbox, new IntentFilter(ACTION_PROCESS_OPERATIONS + outbox.id)); - Log.i(Helper.TAG, outbox.name + " listen process id=" + outbox.id); - lbm.sendBroadcast(new Intent(ACTION_PROCESS_OPERATIONS + outbox.id)); + lbm.registerReceiver(receiverOutbox, new IntentFilter(ACTION_PROCESS_OUTBOX)); + Log.i(Helper.TAG, outbox.name + " listen operations"); + lbm.sendBroadcast(new Intent(ACTION_PROCESS_OUTBOX)); } } @@ -1052,24 +1105,27 @@ public class ServiceSynchronize extends LifecycleService { if (outbox != null) { LocalBroadcastManager lbm = LocalBroadcastManager.getInstance(ServiceSynchronize.this); lbm.unregisterReceiver(receiverOutbox); - Log.i(Helper.TAG, outbox.name + " unlisten process id=" + outbox.id); + Log.i(Helper.TAG, outbox.name + " unlisten operations"); } } BroadcastReceiver receiverOutbox = new BroadcastReceiver() { @Override public void onReceive(Context context, Intent intent) { - Log.i(Helper.TAG, outbox.name + " submit process id=" + outbox.id); + Log.i(Helper.TAG, outbox.name + " run operations"); executor.submit(new Runnable() { @Override public void run() { try { + Log.i(Helper.TAG, outbox.name + " start operations"); synchronized (outbox) { processOperations(outbox, null, null); } } catch (Throwable ex) { Log.e(Helper.TAG, outbox.name + " " + ex + "\n" + Log.getStackTraceString(ex)); reportError(null, outbox.name, ex); + } finally { + Log.i(Helper.TAG, outbox.name + " end operations"); } } });