mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
Server: some tx commit cleanup
This commit is contained in:
@@ -87,17 +87,15 @@ public class JObjectTxManager {
|
||||
for (var obj : state._writeObjects.entrySet()) {
|
||||
Log.debug("Committing " + obj.getKey().getMeta().getName() + " deleted=" + obj.getKey().getMeta().isDeleted() + " deletion-candidate=" + obj.getKey().getMeta().isDeletionCandidate());
|
||||
|
||||
var dataDiff = obj.getValue().snapshot == null
|
||||
|| obj.getKey().getMeta().changelogHash() != obj.getValue().snapshot.changelogHash()
|
||||
var dataDiff = obj.getKey().getMeta().changelogHash() != obj.getValue().snapshot.changelogHash()
|
||||
|| obj.getValue()._forceInvalidated;
|
||||
|
||||
if (refVerification) {
|
||||
boolean dataDiffReal = obj.getValue().snapshot == null
|
||||
|| (obj.getValue().snapshot.data() == null) != (obj.getKey().getData() == null)
|
||||
|| (obj.getKey().getData() != null &&
|
||||
!Objects.equals(obj.getValue().snapshot.data(), dataProtoSerializer.serialize(obj.getKey().getData())));
|
||||
// Null check in case of object not being resolved before (though then we can't check this)
|
||||
boolean dataDiffReal = obj.getValue().snapshot.data() != null
|
||||
&& !Objects.equals(obj.getValue().snapshot.data(), obj.getKey().getData() == null ? null : dataProtoSerializer.serialize(obj.getKey().getData()));
|
||||
|
||||
if (dataDiffReal && !dataDiff && obj.getValue().snapshot.data() != null) {
|
||||
if (dataDiffReal && !dataDiff) {
|
||||
var msg = "Data diff not equal for " + obj.getKey().getMeta().getName() + " " + obj.getKey().getData() + " before = " + ((obj.getValue().snapshot != null) ? obj.getValue().snapshot.data() : null) + " after = " + ((obj.getKey().getData() != null) ? dataProtoSerializer.serialize(obj.getKey().getData()) : null);
|
||||
throw new IllegalStateException(msg);
|
||||
}
|
||||
@@ -125,12 +123,14 @@ public class JObjectTxManager {
|
||||
throw new IllegalStateException("Mutator could not be reapplied for object " + obj.getKey().getMeta().getName() + "\n old = " + cur + "\n reapplied = " + cur2 + "\n");
|
||||
}
|
||||
|
||||
notifyWrite(obj.getKey(),
|
||||
obj.getValue().snapshot == null || !Objects.equals(obj.getValue().snapshot.meta(), metaProtoSerializer.serialize(obj.getKey().getMeta())),
|
||||
obj.getValue().snapshot == null || dataDiff);
|
||||
obj.getValue()._metaSerialized = metaProtoSerializer.serialize(obj.getKey().getMeta());
|
||||
obj.getValue()._metaChanged = !Objects.equals(obj.getValue().snapshot.meta(), obj.getValue()._metaSerialized);
|
||||
obj.getValue()._dataChanged = dataDiff;
|
||||
|
||||
notifyWrite(obj.getKey(), obj.getValue()._metaChanged, dataDiff);
|
||||
|
||||
if (refVerification) {
|
||||
var oldRefs = (obj.getValue().snapshot == null || obj.getValue().snapshot.data() == null)
|
||||
var oldRefs = obj.getValue().snapshot.data() == null
|
||||
? null
|
||||
: ((JObjectData) dataProtoSerializer.deserialize(obj.getValue().snapshot.data())).extractRefs();
|
||||
verifyRefs(obj.getKey(), oldRefs);
|
||||
@@ -143,19 +143,23 @@ public class JObjectTxManager {
|
||||
|
||||
state._writeObjects.forEach((key, value) -> {
|
||||
try {
|
||||
var dataDiff = value.snapshot == null
|
||||
|| key.getMeta().changelogHash() != value.snapshot.changelogHash()
|
||||
|| value._forceInvalidated;
|
||||
|
||||
key.getMeta().setLastModifiedTx(bundle.getId());
|
||||
if (key.getMeta().isDeleted()) {
|
||||
bundle.delete(key);
|
||||
latch.countDown();
|
||||
} else if (key.getMeta().isHaveLocalCopy() && (key.getData() != null && dataDiff)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!value._dataChanged && !value._metaChanged) {
|
||||
latch.countDown();
|
||||
return;
|
||||
}
|
||||
|
||||
if (key.getMeta().isHaveLocalCopy() && value._dataChanged) {
|
||||
_serializerThreads.execute(() -> {
|
||||
try {
|
||||
bundle.commit(key,
|
||||
metaProtoSerializer.serialize(key.getMeta()),
|
||||
value._metaSerialized,
|
||||
dataProtoSerializer.serialize(key.getData())
|
||||
);
|
||||
} catch (Throwable t) {
|
||||
@@ -165,16 +169,11 @@ public class JObjectTxManager {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
} else if (key.getMeta().isHaveLocalCopy() && !dataDiff) {
|
||||
bundle.commitMetaChange(key,
|
||||
metaProtoSerializer.serialize(key.getMeta())
|
||||
);
|
||||
} else if (key.getMeta().isHaveLocalCopy() && !value._dataChanged) {
|
||||
bundle.commitMetaChange(key, value._metaSerialized);
|
||||
latch.countDown();
|
||||
} else if (!key.getMeta().isHaveLocalCopy()) {
|
||||
bundle.commit(key,
|
||||
metaProtoSerializer.serialize(key.getMeta()),
|
||||
null
|
||||
);
|
||||
bundle.commit(key, value._metaSerialized, null);
|
||||
latch.countDown();
|
||||
} else {
|
||||
throw new IllegalStateException("Unexpected object flush combination");
|
||||
@@ -406,6 +405,9 @@ public class JObjectTxManager {
|
||||
final List<JMutator<?>> _mutators = new LinkedList<>();
|
||||
boolean _forceInvalidated = false;
|
||||
final boolean _copy;
|
||||
ObjectMetadataP _metaSerialized; // Filled in when committing
|
||||
boolean _metaChanged = false; // Filled in when committing
|
||||
boolean _dataChanged = false; // Filled in when committing
|
||||
|
||||
private TxObjectState(JObjectSnapshot snapshot, boolean copy) {
|
||||
this.snapshot = snapshot;
|
||||
|
||||
Reference in New Issue
Block a user