From cb6d1b4c84d70e66ee0ebbd2b910d828bde99d12 Mon Sep 17 00:00:00 2001
From: gergo
Date: Tue, 21 Oct 2025 20:49:33 +0200
Subject: [PATCH 1/2] Log consumers pause after repeated transient data exceptions

---
Note: sleepAfterError clamps the backoff exponent before shifting, so long runs of
consecutive failures can no longer overflow the computed delay. The Exceptions.java
blob id on the index line below was not recomputed for this revision.

 .../actor/ActorTypePartitionConsumer.java   |  1 +
 .../qfoundation/execution/Exceptions.java   | 40 +++++++++++++++++++
 .../gitlab/qfoundation/log/LogConsumer.java |  4 ++
 .../messaging/QueuePartitionConsumer.java   |  1 +
 .../messaging/TaskPartitionConsumer.java    |  1 +
 .../messaging/TopicPartitionConsumer.java   |  1 +
 .../messaging/TriggerPartitionConsumer.java |  1 +
 7 files changed, 49 insertions(+)

diff --git a/runtime/src/main/java/io/gitlab/qfoundation/actor/ActorTypePartitionConsumer.java b/runtime/src/main/java/io/gitlab/qfoundation/actor/ActorTypePartitionConsumer.java
index 412e4beb..217867f5 100644
--- a/runtime/src/main/java/io/gitlab/qfoundation/actor/ActorTypePartitionConsumer.java
+++ b/runtime/src/main/java/io/gitlab/qfoundation/actor/ActorTypePartitionConsumer.java
@@ -314,6 +314,7 @@ public class ActorTypePartitionConsumer implements LogConsumer {
         fence = null;
 
         if (Exceptions.isTransientError(error)) {
+            Exceptions.sleepAfterError(consecutiveFailures);
             return LogRollbackAction.KEEP;
         }
 
diff --git a/runtime/src/main/java/io/gitlab/qfoundation/execution/Exceptions.java b/runtime/src/main/java/io/gitlab/qfoundation/execution/Exceptions.java
index 19c01e85..f69b79fa 100644
--- a/runtime/src/main/java/io/gitlab/qfoundation/execution/Exceptions.java
+++ b/runtime/src/main/java/io/gitlab/qfoundation/execution/Exceptions.java
@@ -4,6 +4,7 @@ import io.gitlab.qfoundation.partition.PartitionNotOwnedException;
 import io.gitlab.qfoundation.partition.PartitionOwnershipChangedException;
 
 import javax.transaction.xa.XAException;
+import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -13,6 +14,15 @@ import java.util.concurrent.ExecutionException;
  */
 public final class Exceptions {
 
+    private static final long BASE_DELAY_MS = 100;
+
+    private static final long MAX_DELAY_MS = 10000;
+
+    // Upper bound for the doubling exponent so that BASE_DELAY_MS << exponent cannot overflow.
+    private static final int MAX_BACKOFF_EXPONENT = 16;
+
+    private static final Random RANDOM = new Random();
+
     private Exceptions() {
     }
 
@@ -99,4 +109,34 @@ public final class Exceptions {
     public interface CheckedRunnable {
         void run() throws Exception;
     }
+
+    /**
+     * Pauses the calling thread after an error, backing off exponentially with random jitter.
+     * <p>
+     * A count of at most 1 only yields the thread. From 2 upwards the thread sleeps for a random
+     * duration in {@code [0, delay)} milliseconds, where {@code delay} starts at
+     * {@code BASE_DELAY_MS}, doubles with every further error and is capped at
+     * {@code MAX_DELAY_MS}.
+     *
+     * @param consecutiveErrorCount number of consecutive errors reported by the caller
+     * @throws RuntimeInterruptedException if the thread is interrupted while sleeping; the
+     *         interrupt flag is set again before throwing
+     */
+    public static void sleepAfterError(int consecutiveErrorCount) {
+        try {
+            if (consecutiveErrorCount <= 1) {
+                Thread.yield();
+            } else {
+                // Clamp the exponent before shifting: an unclamped shift overflows long for
+                // large counts and produces a zero or negative bound that nextLong rejects.
+                int exponent = Math.min(consecutiveErrorCount - 2, MAX_BACKOFF_EXPONENT);
+                long delay = Math.min(BASE_DELAY_MS << exponent, MAX_DELAY_MS);
+                long jitteredDelay = RANDOM.nextLong(delay);
+                Thread.sleep(jitteredDelay);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeInterruptedException();
+        }
+    }
 }
diff --git a/runtime/src/main/java/io/gitlab/qfoundation/log/LogConsumer.java b/runtime/src/main/java/io/gitlab/qfoundation/log/LogConsumer.java
index def22392..fc90b29d 100644
--- a/runtime/src/main/java/io/gitlab/qfoundation/log/LogConsumer.java
+++ b/runtime/src/main/java/io/gitlab/qfoundation/log/LogConsumer.java
@@ -2,10 +2,12 @@ package io.gitlab.qfoundation.log;
 
 import com.apple.foundationdb.tuple.Versionstamp;
 import io.gitlab.qfoundation.database.Priority;
+import io.gitlab.qfoundation.execution.RuntimeInterruptedException;
 import jakarta.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.List;
+import java.util.Random;
 
 /**
  * Provides logic for a LogWorker when processing entries. It contains settings and lifecycle hooks.
@@ -97,4 +99,6 @@ public interface LogConsumer {
     default Priority priority() {
         return Priority.NORMAL;
     }
+
+
 }
diff --git a/runtime/src/main/java/io/gitlab/qfoundation/messaging/QueuePartitionConsumer.java b/runtime/src/main/java/io/gitlab/qfoundation/messaging/QueuePartitionConsumer.java
index 4e3cdf3f..4f2694bf 100644
--- a/runtime/src/main/java/io/gitlab/qfoundation/messaging/QueuePartitionConsumer.java
+++ b/runtime/src/main/java/io/gitlab/qfoundation/messaging/QueuePartitionConsumer.java
@@ -188,6 +188,7 @@ public class QueuePartitionConsumer implements LogConsumer {
         errorCounter.increment();
 
         if (Exceptions.isTransientError(error)) {
+            Exceptions.sleepAfterError(consecutiveFailures);
             return LogRollbackAction.KEEP;
         }
 
diff --git a/runtime/src/main/java/io/gitlab/qfoundation/messaging/TaskPartitionConsumer.java b/runtime/src/main/java/io/gitlab/qfoundation/messaging/TaskPartitionConsumer.java
index 59040f6a..5daf19b1 100644
--- a/runtime/src/main/java/io/gitlab/qfoundation/messaging/TaskPartitionConsumer.java
+++ b/runtime/src/main/java/io/gitlab/qfoundation/messaging/TaskPartitionConsumer.java
@@ -326,6 +326,7 @@ public class TaskPartitionConsumer implements LogConsumer {
         errorCounter.increment();
 
         if (Exceptions.isTransientError(error)) {
+            Exceptions.sleepAfterError(consecutiveFailures);
             return LogRollbackAction.KEEP;
         }
 
diff --git a/runtime/src/main/java/io/gitlab/qfoundation/messaging/TopicPartitionConsumer.java b/runtime/src/main/java/io/gitlab/qfoundation/messaging/TopicPartitionConsumer.java
index 8f3cfc29..e73608e2 100644
--- a/runtime/src/main/java/io/gitlab/qfoundation/messaging/TopicPartitionConsumer.java
+++ b/runtime/src/main/java/io/gitlab/qfoundation/messaging/TopicPartitionConsumer.java
@@ -291,6 +291,7 @@ public class TopicPartitionConsumer implements LogConsumer {
         errorCounter.increment();
 
         if (Exceptions.isTransientError(error)) {
+            Exceptions.sleepAfterError(consecutiveFailures);
             return LogRollbackAction.KEEP;
         }
 
diff --git a/runtime/src/main/java/io/gitlab/qfoundation/messaging/TriggerPartitionConsumer.java b/runtime/src/main/java/io/gitlab/qfoundation/messaging/TriggerPartitionConsumer.java
index a993d0c7..9a96161c 100644
--- a/runtime/src/main/java/io/gitlab/qfoundation/messaging/TriggerPartitionConsumer.java
+++ b/runtime/src/main/java/io/gitlab/qfoundation/messaging/TriggerPartitionConsumer.java
@@ -275,6 +275,7 @@ public class TriggerPartitionConsumer implements LogConsumer {
         errorCounter.increment();
 
         if (Exceptions.isTransientError(error)) {
+            Exceptions.sleepAfterError(consecutiveFailures);
             return LogRollbackAction.KEEP;
         }
 
-- 
GitLab


From ef1d3d334c0f40fe4c2ff86c5b0e3e697bae2d2b Mon Sep 17 00:00:00 2001
From: gergo
Date: Tue, 21 Oct 2025 20:50:08 +0200
Subject: [PATCH 2/2] Log consumers pause after repeated transient data exceptions

---
 .../src/main/java/io/gitlab/qfoundation/log/LogConsumer.java | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/runtime/src/main/java/io/gitlab/qfoundation/log/LogConsumer.java b/runtime/src/main/java/io/gitlab/qfoundation/log/LogConsumer.java
index fc90b29d..def22392 100644
--- a/runtime/src/main/java/io/gitlab/qfoundation/log/LogConsumer.java
+++ b/runtime/src/main/java/io/gitlab/qfoundation/log/LogConsumer.java
@@ -2,12 +2,10 @@ package io.gitlab.qfoundation.log;
 
 import com.apple.foundationdb.tuple.Versionstamp;
 import io.gitlab.qfoundation.database.Priority;
-import io.gitlab.qfoundation.execution.RuntimeInterruptedException;
 import jakarta.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.List;
-import java.util.Random;
 
 /**
  * Provides logic for a LogWorker when processing entries. It contains settings and lifecycle hooks.
@@ -99,6 +97,4 @@ public interface LogConsumer {
     default Priority priority() {
         return Priority.NORMAL;
     }
-
-
 }
-- 
GitLab