writeback

This commit is contained in:
2025-01-02 10:40:42 +01:00
parent 23f5d60c61
commit 7b71d405e1
8 changed files with 396 additions and 460 deletions

View File

@@ -9,4 +9,8 @@ import java.io.Serializable;
// It is immutable, its version is filled in by the allocator from the AllocVersionProvider
public interface JData extends Serializable {
JObjectKey key();
default int estimateSize() {
return 100;
}
}

View File

@@ -1,7 +1,6 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.ObjectPersistentStore;
import com.usatiuk.dhfs.objects.persistence.TxManifest;
import com.usatiuk.dhfs.objects.transaction.*;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import com.usatiuk.dhfs.utils.DataLocker;
@@ -12,7 +11,6 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import java.io.Serializable;
import java.lang.ref.Cleaner;
import java.lang.ref.WeakReference;
import java.util.*;
@@ -33,6 +31,8 @@ public class JObjectManager {
ObjectSerializer<JDataVersionedWrapper> objectSerializer;
@Inject
TransactionFactory transactionFactory;
@Inject
TxWriteback txWriteback;
private final List<PreCommitTxHook> _preCommitTxHooks;
@@ -160,27 +160,6 @@ public class JObjectManager {
return transactionFactory.createTransaction(counter, new TransactionObjectSourceImpl(counter));
}
// FIXME:
private static class SimpleTxManifest implements Serializable, TxManifest {
private final List<JObjectKey> _written;
private final List<JObjectKey> _deleted;
public SimpleTxManifest(List<JObjectKey> written, List<JObjectKey> deleted) {
_written = written;
_deleted = deleted;
}
@Override
public List<JObjectKey> getWritten() {
return _written;
}
@Override
public List<JObjectKey> getDeleted() {
return _deleted;
}
}
public void commit(TransactionPrivate tx) {
Log.trace("Committing transaction " + tx.getId());
@@ -285,22 +264,19 @@ public class JObjectManager {
Log.tracef("Flushing transaction %d to storage", tx.getId());
var toDelete = new ArrayList<JObjectKey>();
var toWrite = new ArrayList<JObjectKey>();
var bundle = txWriteback.createBundle();
for (var action : current.entrySet()) {
switch (action.getValue()) {
case TxRecord.TxObjectRecordWrite<?> write -> {
Log.trace("Flushing object " + action.getKey());
toWrite.add(action.getKey());
var wrapped = new JDataVersionedWrapper<>(write.data(), tx.getId());
var data = objectSerializer.serialize(wrapped);
objectStorage.writeObject(action.getKey(), data);
bundle.commit(wrapped);
_objects.put(action.getKey(), new JDataWrapper<>(wrapped));
}
case TxRecord.TxObjectRecordDeleted deleted -> {
Log.trace("Deleting object " + action.getKey());
toDelete.add(action.getKey());
bundle.delete(action.getKey());
_objects.remove(action.getKey());
}
default -> {
@@ -310,8 +286,7 @@ public class JObjectManager {
}
Log.tracef("Committing transaction %d to storage", tx.getId());
objectStorage.commitTx(new SimpleTxManifest(toWrite, toDelete));
txWriteback.commitBundle(bundle);
} catch (
Throwable t) {
Log.error("Error when committing transaction", t);

View File

@@ -1,11 +1,12 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.objects.common.runtime.JData;
import com.usatiuk.objects.common.runtime.JObjectKey;
public interface TxBundle {
long getId();
void commit(JData obj);
void commit(JDataVersionedWrapper<?> obj);
void delete(JData obj);
void delete(JObjectKey obj);
}

View File

@@ -1,415 +1,370 @@
//package com.usatiuk.dhfs.objects;
//
//import com.usatiuk.dhfs.objects.persistence.ObjectPersistentStore;
//import com.usatiuk.dhfs.utils.VoidFn;
//import io.quarkus.logging.Log;
//import io.quarkus.runtime.ShutdownEvent;
//import io.quarkus.runtime.StartupEvent;
//import jakarta.annotation.Priority;
//import jakarta.enterprise.context.ApplicationScoped;
//import jakarta.enterprise.event.Observes;
//import jakarta.inject.Inject;
//import lombok.Getter;
//import org.apache.commons.lang3.concurrent.BasicThreadFactory;
//import org.eclipse.microprofile.config.inject.ConfigProperty;
//
//import java.util.*;
//import java.util.concurrent.ConcurrentLinkedQueue;
//import java.util.concurrent.CountDownLatch;
//import java.util.concurrent.ExecutorService;
//import java.util.concurrent.Executors;
//import java.util.concurrent.atomic.AtomicLong;
//import java.util.stream.Collectors;
//import java.util.stream.Stream;
//
//@ApplicationScoped
//public class TxWritebackImpl implements TxWriteback {
// private final LinkedList<TxBundle> _pendingBundles = new LinkedList<>();
// private final LinkedHashMap<Long, TxBundle> _notFlushedBundles = new LinkedHashMap<>();
//
// private final Object _flushWaitSynchronizer = new Object();
// private final AtomicLong _lastWrittenTx = new AtomicLong(-1);
// private final AtomicLong _counter = new AtomicLong();
// private final AtomicLong _waitedTotal = new AtomicLong(0);
// @Inject
// ObjectPersistentStore objectPersistentStore;
// @ConfigProperty(name = "dhfs.objects.writeback.limit")
// long sizeLimit;
// private long currentSize = 0;
// private ExecutorService _writebackExecutor;
// private ExecutorService _commitExecutor;
// private ExecutorService _statusExecutor;
// private volatile boolean _ready = false;
//
// void init(@Observes @Priority(110) StartupEvent event) {
// {
// BasicThreadFactory factory = new BasicThreadFactory.Builder()
// .namingPattern("tx-writeback-%d")
// .build();
//
// _writebackExecutor = Executors.newSingleThreadExecutor(factory);
// _writebackExecutor.submit(this::writeback);
// }
//
// {
// BasicThreadFactory factory = new BasicThreadFactory.Builder()
// .namingPattern("writeback-commit-%d")
// .build();
//
// _commitExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), factory);
// }
// _statusExecutor = Executors.newSingleThreadExecutor();
// _statusExecutor.submit(() -> {
// try {
// while (true) {
// Thread.sleep(1000);
// if (currentSize > 0)
// Log.info("Tx commit status: size="
// + currentSize / 1024 / 1024 + "MB");
// }
// } catch (InterruptedException ignored) {
// }
// });
// _ready = true;
// }
//
// void shutdown(@Observes @Priority(890) ShutdownEvent event) throws InterruptedException {
// Log.info("Waiting for all transactions to drain");
//
// synchronized (_flushWaitSynchronizer) {
// _ready = false;
// while (currentSize > 0) {
// _flushWaitSynchronizer.wait();
// }
// }
//
// _writebackExecutor.shutdownNow();
// Log.info("Total tx bundle wait time: " + _waitedTotal.get() + "ms");
// }
//
// private void verifyReady() {
// if (!_ready) throw new IllegalStateException("Not doing transactions while shutting down!");
// }
//
// private void writeback() {
// while (!Thread.interrupted()) {
// try {
// TxBundle bundle = new TxBundle(0);
// synchronized (_pendingBundles) {
// while (_pendingBundles.isEmpty() || !_pendingBundles.peek()._ready)
// _pendingBundles.wait();
//
// long diff = 0;
// while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
// var toCompress = _pendingBundles.poll();
// diff -= toCompress.calculateTotalSize();
// bundle.compress(toCompress);
// }
// diff += bundle.calculateTotalSize();
// synchronized (_flushWaitSynchronizer) {
// currentSize += diff;
// }
// }
//
// var latch = new CountDownLatch(bundle._committed.size() + bundle._meta.size());
// ConcurrentLinkedQueue<Throwable> errors = new ConcurrentLinkedQueue<>();
//
// for (var c : bundle._committed.values()) {
// _commitExecutor.execute(() -> {
// try {
// Log.trace("Writing new " + c.newMeta.getName());
// objectPersistentStore.writeNewObject(c.newMeta.getName(), c.newMeta, c.newData);
// } catch (Throwable t) {
// Log.error("Error writing " + c.newMeta.getName(), t);
// errors.add(t);
// } finally {
// latch.countDown();
// }
// });
// }
// for (var c : bundle._meta.values()) {
// _commitExecutor.execute(() -> {
// try {
// Log.trace("Writing (meta) " + c.newMeta.getName());
// objectPersistentStore.writeNewObjectMeta(c.newMeta.getName(), c.newMeta);
// } catch (Throwable t) {
// Log.error("Error writing " + c.newMeta.getName(), t);
// errors.add(t);
// } finally {
// latch.countDown();
// }
// });
// }
// if (Log.isDebugEnabled())
// for (var d : bundle._deleted.keySet())
// Log.debug("Deleting from persistent storage " + d.getMeta().getName()); // FIXME: For tests
//
// latch.await();
// if (!errors.isEmpty()) {
// throw new RuntimeException("Errors in writeback!");
// }
// objectPersistentStore.commitTx(
// new TxManifest(
// Stream.concat(bundle._committed.keySet().stream().map(t -> t.getMeta().getName()),
// bundle._meta.keySet().stream().map(t -> t.getMeta().getName())).collect(Collectors.toCollection(ArrayList::new)),
// bundle._deleted.keySet().stream().map(t -> t.getMeta().getName()).collect(Collectors.toCollection(ArrayList::new))
// ));
// Log.trace("Bundle " + bundle.getId() + " committed");
//
//
// List<List<VoidFn>> callbacks = new ArrayList<>();
// synchronized (_notFlushedBundles) {
// _lastWrittenTx.set(bundle.getId());
// while (!_notFlushedBundles.isEmpty() && _notFlushedBundles.firstEntry().getKey() <= bundle.getId()) {
// callbacks.add(_notFlushedBundles.pollFirstEntry().getValue().setCommitted());
// }
// }
// callbacks.forEach(l -> l.forEach(VoidFn::apply));
//
// synchronized (_flushWaitSynchronizer) {
// currentSize -= ((TxBundle) bundle).calculateTotalSize();
// // FIXME:
// if (currentSize <= sizeLimit || !_ready)
// _flushWaitSynchronizer.notifyAll();
// }
// } catch (InterruptedException ignored) {
// } catch (Exception e) {
// Log.error("Uncaught exception in writeback", e);
// } catch (Throwable o) {
// Log.error("Uncaught THROWABLE in writeback", o);
// }
// }
// Log.info("Writeback thread exiting");
// }
//
// @Override
// public com.usatiuk.dhfs.objects.jrepository.TxBundle createBundle() {
// verifyReady();
// boolean wait = false;
// while (true) {
// if (wait) {
// synchronized (_flushWaitSynchronizer) {
// long started = System.currentTimeMillis();
// while (currentSize > sizeLimit) {
// try {
// _flushWaitSynchronizer.wait();
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// }
// long waited = System.currentTimeMillis() - started;
// _waitedTotal.addAndGet(waited);
// if (Log.isTraceEnabled())
// Log.trace("Thread " + Thread.currentThread().getName() + " waited for tx bundle for " + waited + " ms");
// wait = false;
// }
// }
// synchronized (_pendingBundles) {
// synchronized (_flushWaitSynchronizer) {
// if (currentSize > sizeLimit) {
// if (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
// var target = _pendingBundles.poll();
//
// long diff = -target.calculateTotalSize();
// while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
// var toCompress = _pendingBundles.poll();
// diff -= toCompress.calculateTotalSize();
// target.compress(toCompress);
// }
// diff += target.calculateTotalSize();
// currentSize += diff;
// _pendingBundles.addFirst(target);
// }
// }
//
// if (currentSize > sizeLimit) {
// wait = true;
// continue;
// }
// }
// synchronized (_notFlushedBundles) {
// var bundle = new TxBundle(_counter.incrementAndGet());
// _pendingBundles.addLast(bundle);
// _notFlushedBundles.put(bundle.getId(), bundle);
// return bundle;
// }
// }
// }
// }
//
// @Override
// public void commitBundle(com.usatiuk.dhfs.objects.TxBundle bundle) {
// verifyReady();
// synchronized (_pendingBundles) {
// ((TxBundle) bundle).setReady();
// if (_pendingBundles.peek() == bundle)
// _pendingBundles.notify();
// synchronized (_flushWaitSynchronizer) {
// currentSize += ((TxBundle) bundle).calculateTotalSize();
// }
// }
// }
//
// @Override
// public void dropBundle(com.usatiuk.dhfs.objects.TxBundle bundle) {
// verifyReady();
// synchronized (_pendingBundles) {
// Log.warn("Dropped bundle: " + bundle);
// _pendingBundles.remove((TxBundle) bundle);
// synchronized (_flushWaitSynchronizer) {
// currentSize -= ((TxBundle) bundle).calculateTotalSize();
// }
// }
// }
//
// @Override
// public void fence(long bundleId) {
// var latch = new CountDownLatch(1);
// asyncFence(bundleId, latch::countDown);
// try {
// latch.await();
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// }
//
// @Override
// public void asyncFence(long bundleId, VoidFn fn) {
// verifyReady();
// if (bundleId < 0) throw new IllegalArgumentException("txId should be >0!");
// if (_lastWrittenTx.get() >= bundleId) {
// fn.apply();
// return;
// }
// synchronized (_notFlushedBundles) {
// if (_lastWrittenTx.get() >= bundleId) {
// fn.apply();
// return;
// }
// _notFlushedBundles.get(bundleId).addCallback(fn);
// }
// }
//
// @Getter
// private static class TxManifest implements com.usatiuk.dhfs.objects.repository.persistence.TxManifest {
// private final ArrayList<String> _written;
// private final ArrayList<String> _deleted;
//
// private TxManifest(ArrayList<String> written, ArrayList<String> deleted) {
// _written = written;
// _deleted = deleted;
// }
// }
//
// private class TxBundle implements com.usatiuk.dhfs.objects.jrepository.TxBundle {
// private final HashMap<JObject<?>, CommittedEntry> _committed = new HashMap<>();
// private final HashMap<JObject<?>, CommittedMeta> _meta = new HashMap<>();
// private final HashMap<JObject<?>, Integer> _deleted = new HashMap<>();
// private final ArrayList<VoidFn> _callbacks = new ArrayList<>();
// private long _txId;
// @Getter
// private volatile boolean _ready = false;
// private long _size = -1;
// private boolean _wasCommitted = false;
//
// private TxBundle(long txId) {_txId = txId;}
//
// @Override
// public long getId() {
// return _txId;
// }
//
// public void setReady() {
// _ready = true;
// }
//
// public void addCallback(VoidFn callback) {
// synchronized (_callbacks) {
// if (_wasCommitted) throw new IllegalStateException();
// _callbacks.add(callback);
// }
// }
//
// public List<VoidFn> setCommitted() {
// synchronized (_callbacks) {
// _wasCommitted = true;
// return Collections.unmodifiableList(_callbacks);
// }
// }
//
// @Override
// public void commit(JObject<?> obj, ObjectMetadataP meta, JObjectDataP data) {
// synchronized (_committed) {
// _committed.put(obj, new CommittedEntry(meta, data, obj.estimateSize()));
// }
// }
//
// @Override
// public void commitMetaChange(JObject<?> obj, ObjectMetadataP meta) {
// synchronized (_meta) {
// _meta.put(obj, new CommittedMeta(meta, obj.estimateSize()));
// }
// }
//
// @Override
// public void delete(JObject<?> obj) {
// synchronized (_deleted) {
// _deleted.put(obj, obj.estimateSize());
// }
// }
//
//
// public long calculateTotalSize() {
// if (_size >= 0) return _size;
// long out = 0;
// for (var c : _committed.values())
// out += c.size;
// for (var c : _meta.values())
// out += c.size;
// for (var c : _deleted.entrySet())
// out += c.getValue();
// _size = out;
// return _size;
// }
//
// public void compress(TxBundle other) {
// if (_txId >= other._txId)
// throw new IllegalArgumentException("Compressing an older bundle into newer");
//
// _txId = other._txId;
// _size = -1;
//
// for (var d : other._deleted.entrySet()) {
// _committed.remove(d.getKey());
// _meta.remove(d.getKey());
// _deleted.put(d.getKey(), d.getValue());
// }
//
// for (var c : other._committed.entrySet()) {
// _committed.put(c.getKey(), c.getValue());
// _meta.remove(c.getKey());
// _deleted.remove(c.getKey());
// }
//
// for (var m : other._meta.entrySet()) {
// var deleted = _deleted.remove(m.getKey());
// if (deleted != null) {
// _committed.put(m.getKey(), new CommittedEntry(m.getValue().newMeta, null, m.getKey().estimateSize()));
// continue;
// }
// var committed = _committed.remove(m.getKey());
// if (committed != null) {
// _committed.put(m.getKey(), new CommittedEntry(m.getValue().newMeta, committed.newData, m.getKey().estimateSize()));
// continue;
// }
// _meta.put(m.getKey(), m.getValue());
// }
// }
//
// private record CommittedEntry(ObjectMetadataP newMeta, JObjectDataP newData, int size) {}
//
// private record CommittedMeta(ObjectMetadataP newMeta, int size) {}
//
// private record Deleted(JObject<?> handle) {}
// }
//}
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.ObjectPersistentStore;
import com.usatiuk.dhfs.objects.persistence.TxManifest;
import com.usatiuk.dhfs.utils.VoidFn;
import com.usatiuk.objects.common.runtime.JObjectKey;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import lombok.Getter;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
@ApplicationScoped
public class TxWritebackImpl implements TxWriteback {
private final LinkedList<TxBundleImpl> _pendingBundles = new LinkedList<>();
private final LinkedHashMap<Long, TxBundleImpl> _notFlushedBundles = new LinkedHashMap<>();
private final Object _flushWaitSynchronizer = new Object();
private final AtomicLong _lastWrittenTx = new AtomicLong(-1);
private final AtomicLong _counter = new AtomicLong();
private final AtomicLong _waitedTotal = new AtomicLong(0);
@Inject
ObjectPersistentStore objectPersistentStore;
@Inject
ObjectSerializer<JDataVersionedWrapper> objectSerializer;
@ConfigProperty(name = "dhfs.objects.writeback.limit")
long sizeLimit;
private long currentSize = 0;
private ExecutorService _writebackExecutor;
private ExecutorService _commitExecutor;
private ExecutorService _statusExecutor;
private volatile boolean _ready = false;
void init(@Observes @Priority(110) StartupEvent event) {
{
BasicThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("tx-writeback-%d")
.build();
_writebackExecutor = Executors.newSingleThreadExecutor(factory);
_writebackExecutor.submit(this::writeback);
}
{
BasicThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("writeback-commit-%d")
.build();
_commitExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), factory);
}
_statusExecutor = Executors.newSingleThreadExecutor();
_statusExecutor.submit(() -> {
try {
while (true) {
Thread.sleep(1000);
if (currentSize > 0)
Log.info("Tx commit status: size="
+ currentSize / 1024 / 1024 + "MB");
}
} catch (InterruptedException ignored) {
}
});
_ready = true;
}
void shutdown(@Observes @Priority(890) ShutdownEvent event) throws InterruptedException {
Log.info("Waiting for all transactions to drain");
synchronized (_flushWaitSynchronizer) {
_ready = false;
while (currentSize > 0) {
_flushWaitSynchronizer.wait();
}
}
_writebackExecutor.shutdownNow();
Log.info("Total tx bundle wait time: " + _waitedTotal.get() + "ms");
}
private void verifyReady() {
if (!_ready) throw new IllegalStateException("Not doing transactions while shutting down!");
}
private void writeback() {
while (!Thread.interrupted()) {
try {
TxBundleImpl bundle = new TxBundleImpl(0);
synchronized (_pendingBundles) {
while (_pendingBundles.isEmpty() || !_pendingBundles.peek()._ready)
_pendingBundles.wait();
long diff = 0;
while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var toCompress = _pendingBundles.poll();
diff -= toCompress.calculateTotalSize();
bundle.compress(toCompress);
}
diff += bundle.calculateTotalSize();
synchronized (_flushWaitSynchronizer) {
currentSize += diff;
}
}
var latch = new CountDownLatch((int) bundle._entries.values().stream().filter(e -> e instanceof TxBundleImpl.CommittedEntry).count());
ConcurrentLinkedQueue<Throwable> errors = new ConcurrentLinkedQueue<>();
for (var e : bundle._entries.values()) {
switch (e) {
case TxBundleImpl.CommittedEntry c -> _commitExecutor.execute(() -> {
try {
Log.trace("Writing new " + c.key());
objectPersistentStore.writeObject(c.key(), objectSerializer.serialize(c.data()));
} catch (Throwable t) {
Log.error("Error writing " + c.key(), t);
errors.add(t);
} finally {
latch.countDown();
}
});
case TxBundleImpl.DeletedEntry d -> {
if (Log.isDebugEnabled())
Log.debug("Deleting from persistent storage " + d.key()); // FIXME: For tests
}
default -> throw new IllegalStateException("Unexpected value: " + e);
}
}
latch.await();
if (!errors.isEmpty()) {
throw new RuntimeException("Errors in writeback!");
}
objectPersistentStore.commitTx(
new TxManifest(
bundle._entries.values().stream().filter(e -> e instanceof TxBundleImpl.CommittedEntry).map(TxBundleImpl.BundleEntry::key).toList(),
bundle._entries.values().stream().filter(e -> e instanceof TxBundleImpl.DeletedEntry).map(TxBundleImpl.BundleEntry::key).toList()
));
Log.trace("Bundle " + bundle.getId() + " committed");
List<List<VoidFn>> callbacks = new ArrayList<>();
synchronized (_notFlushedBundles) {
_lastWrittenTx.set(bundle.getId());
while (!_notFlushedBundles.isEmpty() && _notFlushedBundles.firstEntry().getKey() <= bundle.getId()) {
callbacks.add(_notFlushedBundles.pollFirstEntry().getValue().setCommitted());
}
}
callbacks.forEach(l -> l.forEach(VoidFn::apply));
synchronized (_flushWaitSynchronizer) {
currentSize -= ((TxBundleImpl) bundle).calculateTotalSize();
// FIXME:
if (currentSize <= sizeLimit || !_ready)
_flushWaitSynchronizer.notifyAll();
}
} catch (InterruptedException ignored) {
} catch (Exception e) {
Log.error("Uncaught exception in writeback", e);
} catch (Throwable o) {
Log.error("Uncaught THROWABLE in writeback", o);
}
}
Log.info("Writeback thread exiting");
}
@Override
public TxBundle createBundle() {
verifyReady();
boolean wait = false;
while (true) {
if (wait) {
synchronized (_flushWaitSynchronizer) {
long started = System.currentTimeMillis();
while (currentSize > sizeLimit) {
try {
_flushWaitSynchronizer.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
long waited = System.currentTimeMillis() - started;
_waitedTotal.addAndGet(waited);
if (Log.isTraceEnabled())
Log.trace("Thread " + Thread.currentThread().getName() + " waited for tx bundle for " + waited + " ms");
wait = false;
}
}
synchronized (_pendingBundles) {
synchronized (_flushWaitSynchronizer) {
if (currentSize > sizeLimit) {
if (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var target = _pendingBundles.poll();
long diff = -target.calculateTotalSize();
while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var toCompress = _pendingBundles.poll();
diff -= toCompress.calculateTotalSize();
target.compress(toCompress);
}
diff += target.calculateTotalSize();
currentSize += diff;
_pendingBundles.addFirst(target);
}
}
if (currentSize > sizeLimit) {
wait = true;
continue;
}
}
synchronized (_notFlushedBundles) {
var bundle = new TxBundleImpl(_counter.incrementAndGet());
_pendingBundles.addLast(bundle);
_notFlushedBundles.put(bundle.getId(), bundle);
return bundle;
}
}
}
}
@Override
public void commitBundle(TxBundle bundle) {
verifyReady();
synchronized (_pendingBundles) {
((TxBundleImpl) bundle).setReady();
if (_pendingBundles.peek() == bundle)
_pendingBundles.notify();
synchronized (_flushWaitSynchronizer) {
currentSize += ((TxBundleImpl) bundle).calculateTotalSize();
}
}
}
@Override
public void dropBundle(TxBundle bundle) {
verifyReady();
synchronized (_pendingBundles) {
Log.warn("Dropped bundle: " + bundle);
_pendingBundles.remove((TxBundleImpl) bundle);
synchronized (_flushWaitSynchronizer) {
currentSize -= ((TxBundleImpl) bundle).calculateTotalSize();
}
}
}
@Override
public void fence(long bundleId) {
var latch = new CountDownLatch(1);
asyncFence(bundleId, latch::countDown);
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public void asyncFence(long bundleId, VoidFn fn) {
verifyReady();
if (bundleId < 0) throw new IllegalArgumentException("txId should be >0!");
if (_lastWrittenTx.get() >= bundleId) {
fn.apply();
return;
}
synchronized (_notFlushedBundles) {
if (_lastWrittenTx.get() >= bundleId) {
fn.apply();
return;
}
_notFlushedBundles.get(bundleId).addCallback(fn);
}
}
private class TxBundleImpl implements TxBundle {
private final LinkedHashMap<JObjectKey, BundleEntry> _entries = new LinkedHashMap<>();
private final ArrayList<VoidFn> _callbacks = new ArrayList<>();
private long _txId;
@Getter
private volatile boolean _ready = false;
private long _size = -1;
private boolean _wasCommitted = false;
private TxBundleImpl(long txId) {
_txId = txId;
}
@Override
public long getId() {
return _txId;
}
public void setReady() {
_ready = true;
}
public void addCallback(VoidFn callback) {
synchronized (_callbacks) {
if (_wasCommitted) throw new IllegalStateException();
_callbacks.add(callback);
}
}
public List<VoidFn> setCommitted() {
synchronized (_callbacks) {
_wasCommitted = true;
return Collections.unmodifiableList(_callbacks);
}
}
@Override
public void commit(JDataVersionedWrapper<?> obj) {
synchronized (_entries) {
_entries.put(obj.data().key(), new CommittedEntry(obj.data().key(), obj, obj.data().estimateSize()));
}
}
@Override
public void delete(JObjectKey obj) {
synchronized (_entries) {
_entries.put(obj, new DeletedEntry(obj));
}
}
public long calculateTotalSize() {
if (_size >= 0) return _size;
_size = _entries.values().stream().mapToInt(BundleEntry::size).sum();
return _size;
}
public void compress(TxBundleImpl other) {
if (_txId >= other._txId)
throw new IllegalArgumentException("Compressing an older bundle into newer");
_txId = other._txId;
_size = -1;
_entries.putAll(other._entries);
}
private interface BundleEntry {
JObjectKey key();
int size();
}
private record CommittedEntry(JObjectKey key, JDataVersionedWrapper<?> data, int size)
implements BundleEntry {
}
private record DeletedEntry(JObjectKey key)
implements BundleEntry {
@Override
public int size() {
return 64;
}
}
}
}

View File

@@ -244,17 +244,17 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
public void commitTxImpl(TxManifest manifest, boolean failIfNotFound) {
try {
if (manifest.getDeleted().isEmpty() && manifest.getWritten().isEmpty()) {
if (manifest.deleted().isEmpty() && manifest.written().isEmpty()) {
Log.debug("Empty manifest, skipping");
return;
}
putTxManifest(manifest);
var latch = new CountDownLatch(manifest.getWritten().size() + manifest.getDeleted().size());
var latch = new CountDownLatch(manifest.written().size() + manifest.deleted().size());
ConcurrentLinkedQueue<Throwable> errors = new ConcurrentLinkedQueue<>();
for (var n : manifest.getWritten()) {
for (var n : manifest.written()) {
_flushExecutor.execute(() -> {
try {
Files.move(getTmpObjPath(n), getObjPath(n), ATOMIC_MOVE, REPLACE_EXISTING);
@@ -267,7 +267,7 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
}
});
}
for (var d : manifest.getDeleted()) {
for (var d : manifest.deleted()) {
_flushExecutor.execute(() -> {
try {
deleteImpl(getObjPath(d));

View File

@@ -43,10 +43,10 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
@Override
public void commitTx(TxManifest names) {
synchronized (this) {
for (JObjectKey key : names.getWritten()) {
for (JObjectKey key : names.written()) {
_objects.put(key, _pending.get(key));
}
for (JObjectKey key : names.getDeleted()) {
for (JObjectKey key : names.deleted()) {
_objects.remove(key);
}
}

View File

@@ -3,11 +3,8 @@ package com.usatiuk.dhfs.objects.persistence;
import com.usatiuk.objects.common.runtime.JObjectKey;
import java.io.Serializable;
import java.util.List;
import java.util.Collection;
// FIXME: Serializable
public interface TxManifest extends Serializable {
List<JObjectKey> getWritten();
List<JObjectKey> getDeleted();
public record TxManifest(Collection<JObjectKey> written, Collection<JObjectKey> deleted) implements Serializable {
}

View File

@@ -6,7 +6,6 @@ import com.usatiuk.objects.common.runtime.JObjectKey;
import lombok.Builder;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
@Builder(toBuilder = true)
@@ -25,4 +24,9 @@ public record ChunkData(JObjectKey key, Collection<JObjectKey> refsFrom, boolean
public ChunkData withFrozen(boolean frozen) {
return this.toBuilder().frozen(frozen).build();
}
@Override
public int estimateSize() {
return data.size();
}
}