Commit f1b82c2d0537d1ab6f1ae501ecac9f460b58e27f

Authored by Dmytro Shvaika
1 parent 6a1c6593

changed logic to multiplication pause

... ... @@ -27,7 +27,6 @@ import java.util.UUID;
27 27 import java.util.concurrent.ConcurrentHashMap;
28 28 import java.util.concurrent.ConcurrentMap;
29 29 import java.util.concurrent.TimeUnit;
30   -import java.util.concurrent.atomic.AtomicInteger;
31 30
32 31 @Component
33 32 @Slf4j
... ... @@ -57,13 +56,11 @@ public class TbRuleEngineProcessingStrategyFactory {
57 56 private final boolean retryTimeout;
58 57 private final int maxRetries;
59 58 private final double maxAllowedFailurePercentage;
60   - private final long pauseBetweenRetries;
61 59
62   - private final boolean expPauseBetweenRetries;
  60 + private final boolean multiplyPauseBetweenRetries;
  61 + private final long maxPauseBetweenRetries;
63 62
64   - private long maxExpPauseBetweenRetries;
65   - private double maxExpDegreeValue;
66   - private AtomicInteger expDegreeStep;
  63 + private long pauseBetweenRetries;
67 64
68 65 private int initialTotalCount;
69 66 private int retryCount;
... ... @@ -76,12 +73,8 @@ public class TbRuleEngineProcessingStrategyFactory {
76 73 this.maxRetries = configuration.getRetries();
77 74 this.maxAllowedFailurePercentage = configuration.getFailurePercentage();
78 75 this.pauseBetweenRetries = configuration.getPauseBetweenRetries();
79   - this.expPauseBetweenRetries = configuration.isExpPauseBetweenRetries();
80   - if (this.expPauseBetweenRetries) {
81   - this.expDegreeStep = new AtomicInteger(1);
82   - this.maxExpPauseBetweenRetries = configuration.getMaxExpPauseBetweenRetries();
83   - this.maxExpDegreeValue = Math.log(maxExpPauseBetweenRetries) / Math.log(pauseBetweenRetries);
84   - }
  76 + this.multiplyPauseBetweenRetries = configuration.isMultiplyPauseBetweenRetries();
  77 + this.maxPauseBetweenRetries = configuration.getMaxPauseBetweenRetries();
85 78 }
86 79
87 80 @Override
... ... @@ -116,24 +109,14 @@ public class TbRuleEngineProcessingStrategyFactory {
116 109 toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
117 110 }
118 111 if (pauseBetweenRetries > 0) {
119   - if (expPauseBetweenRetries) {
120   - long pause;
121   - if (maxExpDegreeValue > expDegreeStep.get()) {
122   - pause = new Double(Math.pow(pauseBetweenRetries, expDegreeStep.getAndIncrement())).longValue();
123   - } else {
124   - pause = maxExpPauseBetweenRetries;
125   - }
126   - try {
127   - Thread.sleep(TimeUnit.SECONDS.toMillis(
128   - pause));
129   - } catch (InterruptedException e) {
130   - throw new RuntimeException(e);
131   - }
132   - } else {
133   - try {
134   - Thread.sleep(TimeUnit.SECONDS.toMillis(pauseBetweenRetries));
135   - } catch (InterruptedException e) {
136   - throw new RuntimeException(e);
  112 + try {
  113 + Thread.sleep(TimeUnit.SECONDS.toMillis(pauseBetweenRetries));
  114 + } catch (InterruptedException e) {
  115 + throw new RuntimeException(e);
  116 + }
  117 + if (multiplyPauseBetweenRetries && maxPauseBetweenRetries > 0) {
  118 + if (pauseBetweenRetries != maxPauseBetweenRetries) {
  119 + pauseBetweenRetries = Math.min(maxPauseBetweenRetries, pauseBetweenRetries * pauseBetweenRetries);
137 120 }
138 121 }
139 122 }
... ...
... ... @@ -725,8 +725,8 @@ queue:
725 725 retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited
726 726 failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
727 727 pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries;
728   - exp-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_EXP_RETRY_PAUSE:false}"# Parameter to enable/disable exponential increase of pause between retries;
729   - max-exp-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_MAX_EXP_RETRY_PAUSE:25}"# Max allowed time in seconds for pause between retries.
  728 + multiply-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_MUL_RETRY_PAUSE:false}"# Parameter to enable/disable multiplication of pause value between retries on each iteration;
  729 + max-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_MAX_MUL_RETRY_PAUSE:25}"# Max allowed time in seconds for pause between retries.
730 730 - name: "${TB_QUEUE_RE_HP_QUEUE_NAME:HighPriority}"
731 731 topic: "${TB_QUEUE_RE_HP_TOPIC:tb_rule_engine.hp}"
732 732 poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}"
... ... @@ -742,8 +742,8 @@ queue:
742 742 retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRIES:0}" # Number of retries, 0 is unlimited
743 743 failure-percentage: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
744 744 pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries;
745   - exp-pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_EXP_RETRY_PAUSE:false}"# Parameter to enable/disable exponential increase of pause between retries;
746   - max-exp-pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_MAX_EXP_RETRY_PAUSE:120}"# Max allowed time in seconds for pause between retries.
  745 + multiply-pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_MUL_RETRY_PAUSE:false}"# Parameter to enable/disable multiplication of pause value between retries on each iteration;
  746 + max-pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_MAX_MUL_RETRY_PAUSE:120}"# Max allowed time in seconds for pause between retries.
747 747 - name: "${TB_QUEUE_RE_SQ_QUEUE_NAME:SequentialByOriginator}"
748 748 topic: "${TB_QUEUE_RE_SQ_TOPIC:tb_rule_engine.sq}"
749 749 poll-interval: "${TB_QUEUE_RE_SQ_POLL_INTERVAL_MS:25}"
... ... @@ -759,8 +759,8 @@ queue:
759 759 retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited
760 760 failure-percentage: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
761 761 pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries;
762   - exp-pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_EXP_RETRY_PAUSE:false}"# Parameter to enable/disable exponential increase of pause between retries;
763   - max-exp-pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_MAX_EXP_RETRY_PAUSE:120}"# Max allowed time in seconds for pause between retries.
  762 + multiply-pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_MUL_RETRY_PAUSE:false}"# Parameter to enable/disable multiplication of pause value between retries on each iteration;
  763 + max-pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_MAX_MUL_RETRY_PAUSE:120}"# Max allowed time in seconds for pause between retries.
764 764 transport:
765 765 # For high priority notifications that require minimum latency and processing time
766 766 notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"
... ...
... ... @@ -24,7 +24,7 @@ public class TbRuleEngineQueueAckStrategyConfiguration {
24 24 private int retries;
25 25 private double failurePercentage;
26 26 private long pauseBetweenRetries;
27   - private boolean expPauseBetweenRetries;
28   - private long maxExpPauseBetweenRetries;
  27 + private boolean multiplyPauseBetweenRetries;
  28 + private long maxPauseBetweenRetries;
29 29
30 30 }
... ...