diff --git a/runtime/src/main/java/io/gitlab/qfoundation/actor/ActorTypePartitionConsumer.java b/runtime/src/main/java/io/gitlab/qfoundation/actor/ActorTypePartitionConsumer.java index 412e4bebd810c049d9ed12ac274211da4cd135a2..217867f5feba805f68aa13669bd006c2e5cfff84 100644 --- a/runtime/src/main/java/io/gitlab/qfoundation/actor/ActorTypePartitionConsumer.java +++ b/runtime/src/main/java/io/gitlab/qfoundation/actor/ActorTypePartitionConsumer.java @@ -314,6 +314,7 @@ public class ActorTypePartitionConsumer implements LogConsumer { fence = null; if (Exceptions.isTransientError(error)) { + Exceptions.sleepAfterError(consecutiveFailures); return LogRollbackAction.KEEP; } diff --git a/runtime/src/main/java/io/gitlab/qfoundation/execution/Exceptions.java b/runtime/src/main/java/io/gitlab/qfoundation/execution/Exceptions.java index 19c01e8589ce236accac572d004ace290ebb8cb5..f69b79fad85b01c2b0cb929c882e3adf9347c8c8 100644 --- a/runtime/src/main/java/io/gitlab/qfoundation/execution/Exceptions.java +++ b/runtime/src/main/java/io/gitlab/qfoundation/execution/Exceptions.java @@ -4,6 +4,7 @@ import io.gitlab.qfoundation.partition.PartitionNotOwnedException; import io.gitlab.qfoundation.partition.PartitionOwnershipChangedException; import javax.transaction.xa.XAException; +import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; @@ -13,6 +14,12 @@ import java.util.concurrent.ExecutionException; */ public final class Exceptions { + private static final long BASE_DELAY_MS = 100; + + private static final long MAX_DELAY_MS = 10000; + + private final static Random RANDOM = new Random(); + private Exceptions() { } @@ -99,4 +106,19 @@ public final class Exceptions { public interface CheckedRunnable { void run() throws Exception; } + + public static void sleepAfterError(int consecutiveErrorCount) { + try { + if (consecutiveErrorCount <= 1) { + Thread.yield(); + } else { + long delay = Math.min(BASE_DELAY_MS * (1L << (consecutiveErrorCount - 2)), MAX_DELAY_MS); + long jitteredDelay = RANDOM.nextLong(delay); + Thread.sleep(jitteredDelay); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeInterruptedException(); + } + } } diff --git a/runtime/src/main/java/io/gitlab/qfoundation/messaging/QueuePartitionConsumer.java b/runtime/src/main/java/io/gitlab/qfoundation/messaging/QueuePartitionConsumer.java index 4e3cdf3fc3d361a591a13d79bff67b3c87489f4a..4f2694bfdf1a4185ca6b2242cbc22c0f46c911af 100644 --- a/runtime/src/main/java/io/gitlab/qfoundation/messaging/QueuePartitionConsumer.java +++ b/runtime/src/main/java/io/gitlab/qfoundation/messaging/QueuePartitionConsumer.java @@ -188,6 +188,7 @@ public class QueuePartitionConsumer implements LogConsumer { errorCounter.increment(); if (Exceptions.isTransientError(error)) { + Exceptions.sleepAfterError(consecutiveFailures); return LogRollbackAction.KEEP; } diff --git a/runtime/src/main/java/io/gitlab/qfoundation/messaging/TaskPartitionConsumer.java b/runtime/src/main/java/io/gitlab/qfoundation/messaging/TaskPartitionConsumer.java index 59040f6abde2d49d22476eb4513400186d871b63..5daf19b121f6a52664468cb67e8925015e13eec5 100644 --- a/runtime/src/main/java/io/gitlab/qfoundation/messaging/TaskPartitionConsumer.java +++ b/runtime/src/main/java/io/gitlab/qfoundation/messaging/TaskPartitionConsumer.java @@ -326,6 +326,7 @@ public class TaskPartitionConsumer implements LogConsumer { errorCounter.increment(); if (Exceptions.isTransientError(error)) { + Exceptions.sleepAfterError(consecutiveFailures); return LogRollbackAction.KEEP; } diff --git a/runtime/src/main/java/io/gitlab/qfoundation/messaging/TopicPartitionConsumer.java b/runtime/src/main/java/io/gitlab/qfoundation/messaging/TopicPartitionConsumer.java index 8f3cfc2998def10bb6f9032649bffa4be68e36d3..e73608e2a2f486c481e1ee531a76efb8e74dd353 100644 --- a/runtime/src/main/java/io/gitlab/qfoundation/messaging/TopicPartitionConsumer.java +++ b/runtime/src/main/java/io/gitlab/qfoundation/messaging/TopicPartitionConsumer.java @@ -291,6 +291,7 @@ public class TopicPartitionConsumer implements LogConsumer { errorCounter.increment(); if (Exceptions.isTransientError(error)) { + Exceptions.sleepAfterError(consecutiveFailures); return LogRollbackAction.KEEP; } diff --git a/runtime/src/main/java/io/gitlab/qfoundation/messaging/TriggerPartitionConsumer.java b/runtime/src/main/java/io/gitlab/qfoundation/messaging/TriggerPartitionConsumer.java index a993d0c7321d733ce1f2ece18bc4462fad108bdd..9a96161c7ef058ff7a91bcf26e11b91e8bc2261d 100644 --- a/runtime/src/main/java/io/gitlab/qfoundation/messaging/TriggerPartitionConsumer.java +++ b/runtime/src/main/java/io/gitlab/qfoundation/messaging/TriggerPartitionConsumer.java @@ -275,6 +275,7 @@ public class TriggerPartitionConsumer implements LogConsumer { errorCounter.increment(); if (Exceptions.isTransientError(error)) { + Exceptions.sleepAfterError(consecutiveFailures); return LogRollbackAction.KEEP; }