mirror of https://github.com/M66B/FairEmail.git
parent
5b210a238e
commit
e6dff96061
|
@ -405,14 +405,10 @@ open class InvalidationTracker @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX
|
|||
|
||||
private fun checkUpdatedTable(): Set<Int> {
|
||||
val invalidatedTableIds = buildSet {
|
||||
try {
|
||||
database.query(SimpleSQLiteQuery(SELECT_UPDATED_TABLES_SQL)).useCursor { cursor ->
|
||||
while (cursor.moveToNext()) {
|
||||
add(cursor.getInt(0))
|
||||
}
|
||||
database.query(SimpleSQLiteQuery(SELECT_UPDATED_TABLES_SQL)).useCursor { cursor ->
|
||||
while (cursor.moveToNext()) {
|
||||
add(cursor.getInt(0))
|
||||
}
|
||||
} catch (ex: Throwable) {
|
||||
eu.faircode.email.Log.w(ex)
|
||||
}
|
||||
}
|
||||
if (invalidatedTableIds.isNotEmpty()) {
|
||||
|
|
|
@ -54,70 +54,62 @@ internal class RoomTrackingLiveData<T> (
|
|||
val invalid = AtomicBoolean(true)
|
||||
val computing = AtomicBoolean(false)
|
||||
val registeredObserver = AtomicBoolean(false)
|
||||
val queued = eu.faircode.email.ObjectHolder<Int>(0)
|
||||
val lock = Object()
|
||||
val refreshRunnable = Runnable {
|
||||
synchronized(lock) {
|
||||
queued.value--
|
||||
if (queued.value < 0) {
|
||||
eu.faircode.email.Log.e("$computeFunction queued=$queued.value")
|
||||
queued.value = 0
|
||||
}
|
||||
}
|
||||
|
||||
if (registeredObserver.compareAndSet(false, true)) {
|
||||
database.invalidationTracker.addWeakObserver(observer)
|
||||
}
|
||||
|
||||
var value: T? = null
|
||||
var computed = false
|
||||
synchronized(computeFunction) {
|
||||
var retry = 0
|
||||
while (!computed) {
|
||||
try {
|
||||
value = computeFunction.call()
|
||||
computed = true
|
||||
} catch (e: Throwable) {
|
||||
if (++retry > 5) {
|
||||
eu.faircode.email.Log.e(e)
|
||||
break
|
||||
}
|
||||
eu.faircode.email.Log.w(e)
|
||||
try {
|
||||
Thread.sleep(2000L)
|
||||
} catch (ignored: InterruptedException) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (computed) {
|
||||
postValue(value)
|
||||
}
|
||||
}
|
||||
if (registeredObserver.compareAndSet(false, true)) {
|
||||
database.invalidationTracker.addWeakObserver(observer)
|
||||
}
|
||||
var computed: Boolean
|
||||
do {
|
||||
computed = false
|
||||
// compute can happen only in 1 thread but no reason to lock others.
|
||||
if (computing.compareAndSet(false, true)) {
|
||||
// as long as it is invalid, keep computing.
|
||||
try {
|
||||
var value: T? = null
|
||||
while (invalid.compareAndSet(true, false)) {
|
||||
computed = true
|
||||
try {
|
||||
value = computeFunction.call()
|
||||
} catch (e: Exception) {
|
||||
throw RuntimeException(
|
||||
"Exception while computing database live data.",
|
||||
e
|
||||
)
|
||||
}
|
||||
}
|
||||
if (computed) {
|
||||
postValue(value)
|
||||
}
|
||||
} finally {
|
||||
// release compute lock
|
||||
computing.set(false)
|
||||
}
|
||||
}
|
||||
// check invalid after releasing compute lock to avoid the following scenario.
|
||||
// Thread A runs compute()
|
||||
// Thread A checks invalid, it is false
|
||||
// Main thread sets invalid to true
|
||||
// Thread B runs, fails to acquire compute lock and skips
|
||||
// Thread A releases compute lock
|
||||
// We've left invalid in set state. The check below recovers.
|
||||
} while (computed && invalid.get())
|
||||
}
|
||||
|
||||
val invalidationRunnable = Runnable {
|
||||
val isActive = hasActiveObservers()
|
||||
if (isActive) {
|
||||
synchronized(lock) {
|
||||
if (queued.value > 0) {
|
||||
eu.faircode.email.Log.persist(eu.faircode.email.EntityLog.Type.Debug, "$computeFunction queued=$queued.value")
|
||||
} else {
|
||||
queued.value++
|
||||
queryExecutor.execute(refreshRunnable)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
val isActive = hasActiveObservers()
|
||||
if (invalid.compareAndSet(false, true)) {
|
||||
if (isActive) {
|
||||
queryExecutor.execute(refreshRunnable)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
override fun onActive() {
|
||||
super.onActive()
|
||||
container.onActive(this as LiveData<Any>)
|
||||
synchronized(lock) {
|
||||
queued.value++
|
||||
queryExecutor.execute(refreshRunnable)
|
||||
}
|
||||
}
|
||||
super.onActive()
|
||||
container.onActive(this as LiveData<Any>)
|
||||
queryExecutor.execute(refreshRunnable)
|
||||
}
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
override fun onInactive() {
|
||||
|
|
|
@ -153,7 +153,7 @@ public abstract class LimitOffsetDataSource<T> extends androidx.paging.Positiona
|
|||
@NonNull LoadInitialCallback<T> callback) {
|
||||
registerObserverIfNecessary();
|
||||
List<T> list = Collections.emptyList();
|
||||
int totalCount = 0;
|
||||
int totalCount;
|
||||
int firstLoadPosition = 0;
|
||||
RoomSQLiteQuery sqLiteQuery = null;
|
||||
Cursor cursor = null;
|
||||
|
@ -171,8 +171,6 @@ public abstract class LimitOffsetDataSource<T> extends androidx.paging.Positiona
|
|||
mDb.setTransactionSuccessful();
|
||||
list = rows;
|
||||
}
|
||||
} catch (Throwable ex) {
|
||||
eu.faircode.email.Log.w(ex);
|
||||
} finally {
|
||||
if (cursor != null) {
|
||||
cursor.close();
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
--- /home/marcel/support/room/runtime/src/main/java/androidx/room/paging/LimitOffsetDataSource.java 2020-05-18 15:59:35.380887546 +0200
|
||||
+++ /home/marcel/email/app/src/main/java/androidx/room/paging/LimitOffsetDataSource.java 2020-06-15 16:11:37.701097921 +0200
|
||||
@@ -20,6 +20,7 @@ import android.database.Cursor;
|
||||
|
||||
import androidx.annotation.NonNull;
|
||||
import androidx.annotation.RestrictTo;
|
||||
+import androidx.paging.PositionalDataSource;
|
||||
import androidx.room.InvalidationTracker;
|
||||
import androidx.room.RoomDatabase;
|
||||
import androidx.room.RoomSQLiteQuery;
|
||||
@@ -42,9 +43,8 @@ import java.util.Set;
|
||||
*
|
||||
* @hide
|
||||
*/
|
||||
-@SuppressWarnings("deprecation")
|
||||
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX)
|
||||
-public abstract class LimitOffsetDataSource<T> extends androidx.paging.PositionalDataSource<T> {
|
||||
+public abstract class LimitOffsetDataSource<T> extends PositionalDataSource<T> {
|
||||
private final RoomSQLiteQuery mSourceQuery;
|
||||
private final String mCountQuery;
|
||||
private final String mLimitOffsetQuery;
|
||||
@@ -128,6 +128,8 @@ public abstract class LimitOffsetDataSou
|
||||
mDb.setTransactionSuccessful();
|
||||
list = rows;
|
||||
}
|
||||
+ } catch (Throwable ex) {
|
||||
+ eu.faircode.email.Log.w(ex);
|
||||
} finally {
|
||||
if (cursor != null) {
|
||||
cursor.close();
|
|
@ -0,0 +1,130 @@
|
|||
diff --git a/app/src/main/java/androidx/room/InvalidationTracker.java b/app/src/main/java/androidx/room/InvalidationTracker.java
|
||||
index 99e69154b1..3a0c5ca530 100644
|
||||
--- a/app/src/main/java/androidx/room/InvalidationTracker.java
|
||||
+++ b/app/src/main/java/androidx/room/InvalidationTracker.java
|
||||
@@ -464,8 +464,6 @@ public class InvalidationTracker {
|
||||
final int tableId = cursor.getInt(0);
|
||||
invalidatedTableIds.add(tableId);
|
||||
}
|
||||
- } catch (Throwable ex) {
|
||||
- eu.faircode.email.Log.w(ex);
|
||||
} finally {
|
||||
cursor.close();
|
||||
}
|
||||
diff --git a/app/src/main/java/androidx/room/RoomTrackingLiveData.java b/app/src/main/java/androidx/room/RoomTrackingLiveData.java
|
||||
index 10c4fbcf8b..8df1014a41 100644
|
||||
--- a/app/src/main/java/androidx/room/RoomTrackingLiveData.java
|
||||
+++ b/app/src/main/java/androidx/room/RoomTrackingLiveData.java
|
||||
@@ -29,9 +29,6 @@ import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
-import java.util.concurrent.atomic.AtomicInteger;
|
||||
-
|
||||
-import eu.faircode.email.EntityLog;
|
||||
|
||||
/**
|
||||
* A LiveData implementation that closely works with {@link InvalidationTracker} to implement
|
||||
@@ -62,8 +59,11 @@ class RoomTrackingLiveData<T> extends LiveData<T> {
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
final InvalidationTracker.Observer mObserver;
|
||||
|
||||
- final AtomicInteger queued = new AtomicInteger(0);
|
||||
- final AtomicInteger running = new AtomicInteger(0);
|
||||
+ @SuppressWarnings("WeakerAccess")
|
||||
+ final AtomicBoolean mInvalid = new AtomicBoolean(true);
|
||||
+
|
||||
+ @SuppressWarnings("WeakerAccess")
|
||||
+ final AtomicBoolean mComputing = new AtomicBoolean(false);
|
||||
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
final AtomicBoolean mRegisteredObserver = new AtomicBoolean(false);
|
||||
@@ -76,37 +76,39 @@ class RoomTrackingLiveData<T> extends LiveData<T> {
|
||||
if (mRegisteredObserver.compareAndSet(false, true)) {
|
||||
mDatabase.getInvalidationTracker().addWeakObserver(mObserver);
|
||||
}
|
||||
- try {
|
||||
- running.incrementAndGet();
|
||||
-
|
||||
- T value = null;
|
||||
- boolean computed = false;
|
||||
- synchronized (mComputeFunction) {
|
||||
- int retry = 0;
|
||||
- while (!computed) {
|
||||
- try {
|
||||
- value = mComputeFunction.call();
|
||||
+ boolean computed;
|
||||
+ do {
|
||||
+ computed = false;
|
||||
+ // compute can happen only in 1 thread but no reason to lock others.
|
||||
+ if (mComputing.compareAndSet(false, true)) {
|
||||
+ // as long as it is invalid, keep computing.
|
||||
+ try {
|
||||
+ T value = null;
|
||||
+ while (mInvalid.compareAndSet(true, false)) {
|
||||
computed = true;
|
||||
- } catch (Throwable e) {
|
||||
- if (++retry > 5) {
|
||||
- eu.faircode.email.Log.e(e);
|
||||
- break;
|
||||
- }
|
||||
- eu.faircode.email.Log.w(e);
|
||||
try {
|
||||
- Thread.sleep(2000L);
|
||||
- } catch (InterruptedException ignored) {
|
||||
+ value = mComputeFunction.call();
|
||||
+ } catch (Exception e) {
|
||||
+ throw new RuntimeException("Exception while computing database"
|
||||
+ + " live data.", e);
|
||||
}
|
||||
}
|
||||
+ if (computed) {
|
||||
+ postValue(value);
|
||||
+ }
|
||||
+ } finally {
|
||||
+ // release compute lock
|
||||
+ mComputing.set(false);
|
||||
}
|
||||
}
|
||||
- if (computed) {
|
||||
- postValue(value);
|
||||
- }
|
||||
- } finally {
|
||||
- queued.decrementAndGet();
|
||||
- running.decrementAndGet();
|
||||
- }
|
||||
+ // check invalid after releasing compute lock to avoid the following scenario.
|
||||
+ // Thread A runs compute()
|
||||
+ // Thread A checks invalid, it is false
|
||||
+ // Main thread sets invalid to true
|
||||
+ // Thread B runs, fails to acquire compute lock and skips
|
||||
+ // Thread A releases compute lock
|
||||
+ // We've left invalid in set state. The check below recovers.
|
||||
+ } while (computed && mInvalid.get());
|
||||
}
|
||||
};
|
||||
|
||||
@@ -115,19 +117,14 @@ class RoomTrackingLiveData<T> extends LiveData<T> {
|
||||
@MainThread
|
||||
@Override
|
||||
public void run() {
|
||||
- if (running.get() == 0 && queued.get() > 0) {
|
||||
- eu.faircode.email.Log.persist(EntityLog.Type.Debug,
|
||||
- mComputeFunction + " running=" + running + " queued=" + queued);
|
||||
- return;
|
||||
- }
|
||||
boolean isActive = hasActiveObservers();
|
||||
- if (isActive) {
|
||||
- queued.incrementAndGet();
|
||||
- getQueryExecutor().execute(mRefreshRunnable);
|
||||
+ if (mInvalid.compareAndSet(false, true)) {
|
||||
+ if (isActive) {
|
||||
+ getQueryExecutor().execute(mRefreshRunnable);
|
||||
+ }
|
||||
}
|
||||
}
|
||||
};
|
||||
-
|
||||
@SuppressLint("RestrictedApi")
|
||||
RoomTrackingLiveData(
|
||||
RoomDatabase database,
|
|
@ -1,169 +0,0 @@
|
|||
diff --git a/app/src/main/java/androidx/room/InvalidationTracker.kt b/app/src/main/java/androidx/room/InvalidationTracker.kt
|
||||
index 38067b702f..76cf95815c 100644
|
||||
--- a/app/src/main/java/androidx/room/InvalidationTracker.kt
|
||||
+++ b/app/src/main/java/androidx/room/InvalidationTracker.kt
|
||||
@@ -405,10 +405,14 @@ open class InvalidationTracker @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX
|
||||
|
||||
private fun checkUpdatedTable(): Set<Int> {
|
||||
val invalidatedTableIds = buildSet {
|
||||
- database.query(SimpleSQLiteQuery(SELECT_UPDATED_TABLES_SQL)).useCursor { cursor ->
|
||||
- while (cursor.moveToNext()) {
|
||||
- add(cursor.getInt(0))
|
||||
+ try {
|
||||
+ database.query(SimpleSQLiteQuery(SELECT_UPDATED_TABLES_SQL)).useCursor { cursor ->
|
||||
+ while (cursor.moveToNext()) {
|
||||
+ add(cursor.getInt(0))
|
||||
+ }
|
||||
}
|
||||
+ } catch (ex: Throwable) {
|
||||
+ eu.faircode.email.Log.w(ex)
|
||||
}
|
||||
}
|
||||
if (invalidatedTableIds.isNotEmpty()) {
|
||||
diff --git a/app/src/main/java/androidx/room/RoomTrackingLiveData.kt b/app/src/main/java/androidx/room/RoomTrackingLiveData.kt
|
||||
index 171b57d16e..85c14c1a93 100644
|
||||
--- a/app/src/main/java/androidx/room/RoomTrackingLiveData.kt
|
||||
+++ b/app/src/main/java/androidx/room/RoomTrackingLiveData.kt
|
||||
@@ -54,62 +54,70 @@ internal class RoomTrackingLiveData<T> (
|
||||
val invalid = AtomicBoolean(true)
|
||||
val computing = AtomicBoolean(false)
|
||||
val registeredObserver = AtomicBoolean(false)
|
||||
+ val queued = eu.faircode.email.ObjectHolder<Int>(0)
|
||||
+ val lock = Object()
|
||||
val refreshRunnable = Runnable {
|
||||
- if (registeredObserver.compareAndSet(false, true)) {
|
||||
- database.invalidationTracker.addWeakObserver(observer)
|
||||
- }
|
||||
- var computed: Boolean
|
||||
- do {
|
||||
- computed = false
|
||||
- // compute can happen only in 1 thread but no reason to lock others.
|
||||
- if (computing.compareAndSet(false, true)) {
|
||||
- // as long as it is invalid, keep computing.
|
||||
- try {
|
||||
- var value: T? = null
|
||||
- while (invalid.compareAndSet(true, false)) {
|
||||
- computed = true
|
||||
- try {
|
||||
- value = computeFunction.call()
|
||||
- } catch (e: Exception) {
|
||||
- throw RuntimeException(
|
||||
- "Exception while computing database live data.",
|
||||
- e
|
||||
- )
|
||||
- }
|
||||
- }
|
||||
- if (computed) {
|
||||
- postValue(value)
|
||||
- }
|
||||
- } finally {
|
||||
- // release compute lock
|
||||
- computing.set(false)
|
||||
- }
|
||||
- }
|
||||
- // check invalid after releasing compute lock to avoid the following scenario.
|
||||
- // Thread A runs compute()
|
||||
- // Thread A checks invalid, it is false
|
||||
- // Main thread sets invalid to true
|
||||
- // Thread B runs, fails to acquire compute lock and skips
|
||||
- // Thread A releases compute lock
|
||||
- // We've left invalid in set state. The check below recovers.
|
||||
- } while (computed && invalid.get())
|
||||
- }
|
||||
+ synchronized(lock) {
|
||||
+ queued.value--
|
||||
+ if (queued.value < 0) {
|
||||
+ eu.faircode.email.Log.e("$computeFunction queued=$queued.value")
|
||||
+ queued.value = 0
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
+ if (registeredObserver.compareAndSet(false, true)) {
|
||||
+ database.invalidationTracker.addWeakObserver(observer)
|
||||
+ }
|
||||
+
|
||||
+ var value: T? = null
|
||||
+ var computed = false
|
||||
+ synchronized(computeFunction) {
|
||||
+ var retry = 0
|
||||
+ while (!computed) {
|
||||
+ try {
|
||||
+ value = computeFunction.call()
|
||||
+ computed = true
|
||||
+ } catch (e: Throwable) {
|
||||
+ if (++retry > 5) {
|
||||
+ eu.faircode.email.Log.e(e)
|
||||
+ break
|
||||
+ }
|
||||
+ eu.faircode.email.Log.w(e)
|
||||
+ try {
|
||||
+ Thread.sleep(2000L)
|
||||
+ } catch (ignored: InterruptedException) {
|
||||
+ }
|
||||
+ }
|
||||
+ }
|
||||
+ }
|
||||
+ if (computed) {
|
||||
+ postValue(value)
|
||||
+ }
|
||||
+ }
|
||||
|
||||
val invalidationRunnable = Runnable {
|
||||
- val isActive = hasActiveObservers()
|
||||
- if (invalid.compareAndSet(false, true)) {
|
||||
- if (isActive) {
|
||||
- queryExecutor.execute(refreshRunnable)
|
||||
- }
|
||||
- }
|
||||
- }
|
||||
+ val isActive = hasActiveObservers()
|
||||
+ if (isActive) {
|
||||
+ synchronized(lock) {
|
||||
+ if (queued.value > 0) {
|
||||
+ eu.faircode.email.Log.persist(eu.faircode.email.EntityLog.Type.Debug, "$computeFunction queued=$queued.value")
|
||||
+ } else {
|
||||
+ queued.value++
|
||||
+ queryExecutor.execute(refreshRunnable)
|
||||
+ }
|
||||
+ }
|
||||
+ }
|
||||
+ }
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
override fun onActive() {
|
||||
- super.onActive()
|
||||
- container.onActive(this as LiveData<Any>)
|
||||
- queryExecutor.execute(refreshRunnable)
|
||||
- }
|
||||
+ super.onActive()
|
||||
+ container.onActive(this as LiveData<Any>)
|
||||
+ synchronized(lock) {
|
||||
+ queued.value++
|
||||
+ queryExecutor.execute(refreshRunnable)
|
||||
+ }
|
||||
+ }
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
override fun onInactive() {
|
||||
diff --git a/app/src/main/java/androidx/room/paging/LimitOffsetDataSource.java b/app/src/main/java/androidx/room/paging/LimitOffsetDataSource.java
|
||||
index 2b5c391dbc..077ae233e8 100644
|
||||
--- a/app/src/main/java/androidx/room/paging/LimitOffsetDataSource.java
|
||||
+++ b/app/src/main/java/androidx/room/paging/LimitOffsetDataSource.java
|
||||
@@ -153,7 +153,7 @@ public abstract class LimitOffsetDataSource<T> extends androidx.paging.Positiona
|
||||
@NonNull LoadInitialCallback<T> callback) {
|
||||
registerObserverIfNecessary();
|
||||
List<T> list = Collections.emptyList();
|
||||
- int totalCount;
|
||||
+ int totalCount = 0;
|
||||
int firstLoadPosition = 0;
|
||||
RoomSQLiteQuery sqLiteQuery = null;
|
||||
Cursor cursor = null;
|
||||
@@ -171,6 +171,8 @@ public abstract class LimitOffsetDataSource<T> extends androidx.paging.Positiona
|
||||
mDb.setTransactionSuccessful();
|
||||
list = rows;
|
||||
}
|
||||
+ } catch (Throwable ex) {
|
||||
+ eu.faircode.email.Log.w(ex);
|
||||
} finally {
|
||||
if (cursor != null) {
|
||||
cursor.close();
|
Loading…
Reference in New Issue