diff --git a/app/src/main/java/androidx/room/InvalidationTracker.kt b/app/src/main/java/androidx/room/InvalidationTracker.kt index 38067b702f..76cf95815c 100644 --- a/app/src/main/java/androidx/room/InvalidationTracker.kt +++ b/app/src/main/java/androidx/room/InvalidationTracker.kt @@ -405,10 +405,14 @@ open class InvalidationTracker @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX private fun checkUpdatedTable(): Set { val invalidatedTableIds = buildSet { - database.query(SimpleSQLiteQuery(SELECT_UPDATED_TABLES_SQL)).useCursor { cursor -> - while (cursor.moveToNext()) { - add(cursor.getInt(0)) + try { + database.query(SimpleSQLiteQuery(SELECT_UPDATED_TABLES_SQL)).useCursor { cursor -> + while (cursor.moveToNext()) { + add(cursor.getInt(0)) + } } + } catch (ex: Throwable) { + eu.faircode.email.Log.w(ex) } } if (invalidatedTableIds.isNotEmpty()) { diff --git a/app/src/main/java/androidx/room/RoomTrackingLiveData.kt b/app/src/main/java/androidx/room/RoomTrackingLiveData.kt index 171b57d16e..85c14c1a93 100644 --- a/app/src/main/java/androidx/room/RoomTrackingLiveData.kt +++ b/app/src/main/java/androidx/room/RoomTrackingLiveData.kt @@ -54,62 +54,70 @@ internal class RoomTrackingLiveData ( val invalid = AtomicBoolean(true) val computing = AtomicBoolean(false) val registeredObserver = AtomicBoolean(false) + val queued = eu.faircode.email.ObjectHolder(0) + val lock = Object() val refreshRunnable = Runnable { - if (registeredObserver.compareAndSet(false, true)) { - database.invalidationTracker.addWeakObserver(observer) - } - var computed: Boolean - do { - computed = false - // compute can happen only in 1 thread but no reason to lock others. - if (computing.compareAndSet(false, true)) { - // as long as it is invalid, keep computing. - try { - var value: T? = null - while (invalid.compareAndSet(true, false)) { - computed = true - try { - value = computeFunction.call() - } catch (e: Exception) { - throw RuntimeException( - "Exception while computing database live data.", - e - ) - } - } - if (computed) { - postValue(value) - } - } finally { - // release compute lock - computing.set(false) - } - } - // check invalid after releasing compute lock to avoid the following scenario. - // Thread A runs compute() - // Thread A checks invalid, it is false - // Main thread sets invalid to true - // Thread B runs, fails to acquire compute lock and skips - // Thread A releases compute lock - // We've left invalid in set state. The check below recovers. - } while (computed && invalid.get()) - } + synchronized(lock) { + queued.value-- + if (queued.value < 0) { + eu.faircode.email.Log.e("$computeFunction queued=$queued.value") + queued.value = 0 + } + } + + if (registeredObserver.compareAndSet(false, true)) { + database.invalidationTracker.addWeakObserver(observer) + } + + var value: T? = null + var computed = false + synchronized(computeFunction) { + var retry = 0 + while (!computed) { + try { + value = computeFunction.call() + computed = true + } catch (e: Throwable) { + if (++retry > 5) { + eu.faircode.email.Log.e(e) + break + } + eu.faircode.email.Log.w(e) + try { + Thread.sleep(2000L) + } catch (ignored: InterruptedException) { + } + } + } + } + if (computed) { + postValue(value) + } + } val invalidationRunnable = Runnable { - val isActive = hasActiveObservers() - if (invalid.compareAndSet(false, true)) { - if (isActive) { - queryExecutor.execute(refreshRunnable) - } - } - } + val isActive = hasActiveObservers() + if (isActive) { + synchronized(lock) { + if (queued.value > 0) { + eu.faircode.email.Log.persist(eu.faircode.email.EntityLog.Type.Debug, "$computeFunction queued=$queued.value") + } else { + queued.value++ + queryExecutor.execute(refreshRunnable) + } + } + } + } @Suppress("UNCHECKED_CAST") override fun onActive() { - super.onActive() - container.onActive(this as LiveData) - queryExecutor.execute(refreshRunnable) - } + super.onActive() + container.onActive(this as LiveData) + synchronized(lock) { + queued.value++ + queryExecutor.execute(refreshRunnable) + } + } @Suppress("UNCHECKED_CAST") override fun onInactive() { diff --git a/app/src/main/java/androidx/room/paging/LimitOffsetDataSource.java b/app/src/main/java/androidx/room/paging/LimitOffsetDataSource.java index 2b5c391dbc..077ae233e8 100644 --- a/app/src/main/java/androidx/room/paging/LimitOffsetDataSource.java +++ b/app/src/main/java/androidx/room/paging/LimitOffsetDataSource.java @@ -153,7 +153,7 @@ public abstract class LimitOffsetDataSource extends androidx.paging.Positiona @NonNull LoadInitialCallback callback) { registerObserverIfNecessary(); List list = Collections.emptyList(); - int totalCount; + int totalCount = 0; int firstLoadPosition = 0; RoomSQLiteQuery sqLiteQuery = null; Cursor cursor = null; @@ -171,6 +171,8 @@ public abstract class LimitOffsetDataSource extends androidx.paging.Positiona mDb.setTransactionSuccessful(); list = rows; } + } catch (Throwable ex) { + eu.faircode.email.Log.w(ex); } finally { if (cursor != null) { cursor.close(); diff --git a/patches/LimitOffsetDataSource.patch b/patches/LimitOffsetDataSource.patch deleted file mode 100644 index 5931692424..0000000000 --- a/patches/LimitOffsetDataSource.patch +++ /dev/null @@ -1,30 +0,0 @@ ---- /home/marcel/support/room/runtime/src/main/java/androidx/room/paging/LimitOffsetDataSource.java 2020-05-18 15:59:35.380887546 +0200 -+++ /home/marcel/email/app/src/main/java/androidx/room/paging/LimitOffsetDataSource.java 2020-06-15 16:11:37.701097921 +0200 -@@ -20,6 +20,7 @@ import android.database.Cursor; - - import androidx.annotation.NonNull; - import androidx.annotation.RestrictTo; -+import androidx.paging.PositionalDataSource; - import androidx.room.InvalidationTracker; - import androidx.room.RoomDatabase; - import androidx.room.RoomSQLiteQuery; -@@ -42,9 +43,8 @@ import java.util.Set; - * - * @hide - */ --@SuppressWarnings("deprecation") - @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX) --public abstract class LimitOffsetDataSource extends androidx.paging.PositionalDataSource { -+public abstract class LimitOffsetDataSource extends PositionalDataSource { - private final RoomSQLiteQuery mSourceQuery; - private final String mCountQuery; - private final String mLimitOffsetQuery; -@@ -128,6 +128,8 @@ public abstract class LimitOffsetDataSou - mDb.setTransactionSuccessful(); - list = rows; - } -+ } catch (Throwable ex) { -+ eu.faircode.email.Log.w(ex); - } finally { - if (cursor != null) { - cursor.close(); diff --git a/patches/room.patch b/patches/room.patch deleted file mode 100644 index 0bc877366d..0000000000 --- a/patches/room.patch +++ /dev/null @@ -1,130 +0,0 @@ -diff --git a/app/src/main/java/androidx/room/InvalidationTracker.java b/app/src/main/java/androidx/room/InvalidationTracker.java -index 99e69154b1..3a0c5ca530 100644 ---- a/app/src/main/java/androidx/room/InvalidationTracker.java -+++ b/app/src/main/java/androidx/room/InvalidationTracker.java -@@ -464,8 +464,6 @@ public class InvalidationTracker { - final int tableId = cursor.getInt(0); - invalidatedTableIds.add(tableId); - } -- } catch (Throwable ex) { -- eu.faircode.email.Log.w(ex); - } finally { - cursor.close(); - } -diff --git a/app/src/main/java/androidx/room/RoomTrackingLiveData.java b/app/src/main/java/androidx/room/RoomTrackingLiveData.java -index 10c4fbcf8b..8df1014a41 100644 ---- a/app/src/main/java/androidx/room/RoomTrackingLiveData.java -+++ b/app/src/main/java/androidx/room/RoomTrackingLiveData.java -@@ -29,9 +29,6 @@ import java.util.Set; - import java.util.concurrent.Callable; - import java.util.concurrent.Executor; - import java.util.concurrent.atomic.AtomicBoolean; --import java.util.concurrent.atomic.AtomicInteger; -- --import eu.faircode.email.EntityLog; - - /** - * A LiveData implementation that closely works with {@link InvalidationTracker} to implement -@@ -62,8 +59,11 @@ class RoomTrackingLiveData extends LiveData { - @SuppressWarnings("WeakerAccess") - final InvalidationTracker.Observer mObserver; - -- final AtomicInteger queued = new AtomicInteger(0); -- final AtomicInteger running = new AtomicInteger(0); -+ @SuppressWarnings("WeakerAccess") -+ final AtomicBoolean mInvalid = new AtomicBoolean(true); -+ -+ @SuppressWarnings("WeakerAccess") -+ final AtomicBoolean mComputing = new AtomicBoolean(false); - - @SuppressWarnings("WeakerAccess") - final AtomicBoolean mRegisteredObserver = new AtomicBoolean(false); -@@ -76,37 +76,39 @@ class RoomTrackingLiveData extends LiveData { - if (mRegisteredObserver.compareAndSet(false, true)) { - mDatabase.getInvalidationTracker().addWeakObserver(mObserver); - } -- try { -- running.incrementAndGet(); -- -- T value = null; -- boolean computed = false; -- synchronized (mComputeFunction) { -- int retry = 0; -- while (!computed) { -- try { -- value = mComputeFunction.call(); -+ boolean computed; -+ do { -+ computed = false; -+ // compute can happen only in 1 thread but no reason to lock others. -+ if (mComputing.compareAndSet(false, true)) { -+ // as long as it is invalid, keep computing. -+ try { -+ T value = null; -+ while (mInvalid.compareAndSet(true, false)) { - computed = true; -- } catch (Throwable e) { -- if (++retry > 5) { -- eu.faircode.email.Log.e(e); -- break; -- } -- eu.faircode.email.Log.w(e); - try { -- Thread.sleep(2000L); -- } catch (InterruptedException ignored) { -+ value = mComputeFunction.call(); -+ } catch (Exception e) { -+ throw new RuntimeException("Exception while computing database" -+ + " live data.", e); - } - } -+ if (computed) { -+ postValue(value); -+ } -+ } finally { -+ // release compute lock -+ mComputing.set(false); - } - } -- if (computed) { -- postValue(value); -- } -- } finally { -- queued.decrementAndGet(); -- running.decrementAndGet(); -- } -+ // check invalid after releasing compute lock to avoid the following scenario. -+ // Thread A runs compute() -+ // Thread A checks invalid, it is false -+ // Main thread sets invalid to true -+ // Thread B runs, fails to acquire compute lock and skips -+ // Thread A releases compute lock -+ // We've left invalid in set state. The check below recovers. -+ } while (computed && mInvalid.get()); - } - }; - -@@ -115,19 +117,14 @@ class RoomTrackingLiveData extends LiveData { - @MainThread - @Override - public void run() { -- if (running.get() == 0 && queued.get() > 0) { -- eu.faircode.email.Log.persist(EntityLog.Type.Debug, -- mComputeFunction + " running=" + running + " queued=" + queued); -- return; -- } - boolean isActive = hasActiveObservers(); -- if (isActive) { -- queued.incrementAndGet(); -- getQueryExecutor().execute(mRefreshRunnable); -+ if (mInvalid.compareAndSet(false, true)) { -+ if (isActive) { -+ getQueryExecutor().execute(mRefreshRunnable); -+ } - } - } - }; -- - @SuppressLint("RestrictedApi") - RoomTrackingLiveData( - RoomDatabase database, diff --git a/patches/roomkt.patch b/patches/roomkt.patch new file mode 100644 index 0000000000..bffa30ec1a --- /dev/null +++ b/patches/roomkt.patch @@ -0,0 +1,169 @@ +diff --git a/app/src/main/java/androidx/room/InvalidationTracker.kt b/app/src/main/java/androidx/room/InvalidationTracker.kt +index 38067b702f..76cf95815c 100644 +--- a/app/src/main/java/androidx/room/InvalidationTracker.kt ++++ b/app/src/main/java/androidx/room/InvalidationTracker.kt +@@ -405,10 +405,14 @@ open class InvalidationTracker @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX + + private fun checkUpdatedTable(): Set { + val invalidatedTableIds = buildSet { +- database.query(SimpleSQLiteQuery(SELECT_UPDATED_TABLES_SQL)).useCursor { cursor -> +- while (cursor.moveToNext()) { +- add(cursor.getInt(0)) ++ try { ++ database.query(SimpleSQLiteQuery(SELECT_UPDATED_TABLES_SQL)).useCursor { cursor -> ++ while (cursor.moveToNext()) { ++ add(cursor.getInt(0)) ++ } + } ++ } catch (ex: Throwable) { ++ eu.faircode.email.Log.w(ex) + } + } + if (invalidatedTableIds.isNotEmpty()) { +diff --git a/app/src/main/java/androidx/room/RoomTrackingLiveData.kt b/app/src/main/java/androidx/room/RoomTrackingLiveData.kt +index 171b57d16e..85c14c1a93 100644 +--- a/app/src/main/java/androidx/room/RoomTrackingLiveData.kt ++++ b/app/src/main/java/androidx/room/RoomTrackingLiveData.kt +@@ -54,62 +54,70 @@ internal class RoomTrackingLiveData ( + val invalid = AtomicBoolean(true) + val computing = AtomicBoolean(false) + val registeredObserver = AtomicBoolean(false) ++ val queued = eu.faircode.email.ObjectHolder(0) ++ val lock = Object() + val refreshRunnable = Runnable { +- if (registeredObserver.compareAndSet(false, true)) { +- database.invalidationTracker.addWeakObserver(observer) +- } +- var computed: Boolean +- do { +- computed = false +- // compute can happen only in 1 thread but no reason to lock others. +- if (computing.compareAndSet(false, true)) { +- // as long as it is invalid, keep computing. +- try { +- var value: T? = null +- while (invalid.compareAndSet(true, false)) { +- computed = true +- try { +- value = computeFunction.call() +- } catch (e: Exception) { +- throw RuntimeException( +- "Exception while computing database live data.", +- e +- ) +- } +- } +- if (computed) { +- postValue(value) +- } +- } finally { +- // release compute lock +- computing.set(false) +- } +- } +- // check invalid after releasing compute lock to avoid the following scenario. +- // Thread A runs compute() +- // Thread A checks invalid, it is false +- // Main thread sets invalid to true +- // Thread B runs, fails to acquire compute lock and skips +- // Thread A releases compute lock +- // We've left invalid in set state. The check below recovers. +- } while (computed && invalid.get()) +- } ++ synchronized(lock) { ++ queued.value-- ++ if (queued.value < 0) { ++ eu.faircode.email.Log.e("$computeFunction queued=$queued.value") ++ queued.value = 0 ++ } ++ } ++ ++ if (registeredObserver.compareAndSet(false, true)) { ++ database.invalidationTracker.addWeakObserver(observer) ++ } ++ ++ var value: T? = null ++ var computed = false ++ synchronized(computeFunction) { ++ var retry = 0 ++ while (!computed) { ++ try { ++ value = computeFunction.call() ++ computed = true ++ } catch (e: Throwable) { ++ if (++retry > 5) { ++ eu.faircode.email.Log.e(e) ++ break ++ } ++ eu.faircode.email.Log.w(e) ++ try { ++ Thread.sleep(2000L) ++ } catch (ignored: InterruptedException) { ++ } ++ } ++ } ++ } ++ if (computed) { ++ postValue(value) ++ } ++ } + + val invalidationRunnable = Runnable { +- val isActive = hasActiveObservers() +- if (invalid.compareAndSet(false, true)) { +- if (isActive) { +- queryExecutor.execute(refreshRunnable) +- } +- } +- } ++ val isActive = hasActiveObservers() ++ if (isActive) { ++ synchronized(lock) { ++ if (queued.value > 0) { ++ eu.faircode.email.Log.persist(eu.faircode.email.EntityLog.Type.Debug, "$computeFunction queued=$queued.value") ++ } else { ++ queued.value++ ++ queryExecutor.execute(refreshRunnable) ++ } ++ } ++ } ++ } + + @Suppress("UNCHECKED_CAST") + override fun onActive() { +- super.onActive() +- container.onActive(this as LiveData) +- queryExecutor.execute(refreshRunnable) +- } ++ super.onActive() ++ container.onActive(this as LiveData) ++ synchronized(lock) { ++ queued.value++ ++ queryExecutor.execute(refreshRunnable) ++ } ++ } + + @Suppress("UNCHECKED_CAST") + override fun onInactive() { +diff --git a/app/src/main/java/androidx/room/paging/LimitOffsetDataSource.java b/app/src/main/java/androidx/room/paging/LimitOffsetDataSource.java +index 2b5c391dbc..077ae233e8 100644 +--- a/app/src/main/java/androidx/room/paging/LimitOffsetDataSource.java ++++ b/app/src/main/java/androidx/room/paging/LimitOffsetDataSource.java +@@ -153,7 +153,7 @@ public abstract class LimitOffsetDataSource extends androidx.paging.Positiona + @NonNull LoadInitialCallback callback) { + registerObserverIfNecessary(); + List list = Collections.emptyList(); +- int totalCount; ++ int totalCount = 0; + int firstLoadPosition = 0; + RoomSQLiteQuery sqLiteQuery = null; + Cursor cursor = null; +@@ -171,6 +171,8 @@ public abstract class LimitOffsetDataSource extends androidx.paging.Positiona + mDb.setTransactionSuccessful(); + list = rows; + } ++ } catch (Throwable ex) { ++ eu.faircode.email.Log.w(ex); + } finally { + if (cursor != null) { + cursor.close();