Use critical section for room tracking

This commit is contained in:
M66B 2023-11-08 13:00:09 +01:00
parent 0c98fa8fd5
commit 2219fc19f1
1 changed files with 39 additions and 39 deletions

View File

@ -29,9 +29,6 @@ import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import eu.faircode.email.EntityLog;
/** /**
* A LiveData implementation that closely works with {@link InvalidationTracker} to implement * A LiveData implementation that closely works with {@link InvalidationTracker} to implement
@ -62,8 +59,8 @@ class RoomTrackingLiveData<T> extends LiveData<T> {
@SuppressWarnings("WeakerAccess") @SuppressWarnings("WeakerAccess")
final InvalidationTracker.Observer mObserver; final InvalidationTracker.Observer mObserver;
final AtomicInteger queued = new AtomicInteger(0); private int queued = 0;
final AtomicInteger running = new AtomicInteger(0); private final Object lock = new Object();
@SuppressWarnings("WeakerAccess") @SuppressWarnings("WeakerAccess")
final AtomicBoolean mRegisteredObserver = new AtomicBoolean(false); final AtomicBoolean mRegisteredObserver = new AtomicBoolean(false);
@ -73,39 +70,41 @@ class RoomTrackingLiveData<T> extends LiveData<T> {
@WorkerThread @WorkerThread
@Override @Override
public void run() { public void run() {
synchronized (lock) {
queued--;
if (queued < 0) {
eu.faircode.email.Log.e(mComputeFunction + " queued=" + queued);
queued = 0;
}
}
if (mRegisteredObserver.compareAndSet(false, true)) { if (mRegisteredObserver.compareAndSet(false, true)) {
mDatabase.getInvalidationTracker().addWeakObserver(mObserver); mDatabase.getInvalidationTracker().addWeakObserver(mObserver);
} }
try {
running.incrementAndGet();
T value = null; T value = null;
boolean computed = false; boolean computed = false;
synchronized (mComputeFunction) { synchronized (mComputeFunction) {
int retry = 0; int retry = 0;
while (!computed) { while (!computed) {
try {
value = mComputeFunction.call();
computed = true;
} catch (Throwable e) {
if (++retry > 5) {
eu.faircode.email.Log.e(e);
break;
}
eu.faircode.email.Log.w(e);
try { try {
value = mComputeFunction.call(); Thread.sleep(2000L);
computed = true; } catch (InterruptedException ignored) {
} catch (Throwable e) {
if (++retry > 5) {
eu.faircode.email.Log.e(e);
break;
}
eu.faircode.email.Log.w(e);
try {
Thread.sleep(2000L);
} catch (InterruptedException ignored) {
}
} }
} }
} }
if (computed) { }
postValue(value); if (computed) {
} postValue(value);
} finally {
queued.decrementAndGet();
running.decrementAndGet();
} }
} }
}; };
@ -115,16 +114,17 @@ class RoomTrackingLiveData<T> extends LiveData<T> {
@MainThread @MainThread
@Override @Override
public void run() { public void run() {
if (running.get() == 0 && queued.get() > 0) {
eu.faircode.email.Log.persist(EntityLog.Type.Debug,
mComputeFunction + " running=" + running + " queued=" + queued);
return;
}
boolean isActive = hasActiveObservers(); boolean isActive = hasActiveObservers();
if (isActive) { if (isActive)
queued.incrementAndGet(); synchronized (lock) {
getQueryExecutor().execute(mRefreshRunnable); if (queued > 0)
} eu.faircode.email.Log.persist(eu.faircode.email.EntityLog.Type.Debug,
mComputeFunction + " queued=" + queued);
else {
queued++;
getQueryExecutor().execute(mRefreshRunnable);
}
}
} }
}; };