Commit b93f3d18c049ff9c83a0278e975f5ba018948dfc
1 parent
8ec23f22
added ability to use exp-pause-between-retries on queue msgs reprocessing
Showing
3 changed files
with
40 additions
and
4 deletions
@@ -27,6 +27,7 @@ import java.util.UUID; | @@ -27,6 +27,7 @@ import java.util.UUID; | ||
27 | import java.util.concurrent.ConcurrentHashMap; | 27 | import java.util.concurrent.ConcurrentHashMap; |
28 | import java.util.concurrent.ConcurrentMap; | 28 | import java.util.concurrent.ConcurrentMap; |
29 | import java.util.concurrent.TimeUnit; | 29 | import java.util.concurrent.TimeUnit; |
30 | +import java.util.concurrent.atomic.AtomicInteger; | ||
30 | 31 | ||
31 | @Component | 32 | @Component |
32 | @Slf4j | 33 | @Slf4j |
@@ -58,6 +59,12 @@ public class TbRuleEngineProcessingStrategyFactory { | @@ -58,6 +59,12 @@ public class TbRuleEngineProcessingStrategyFactory { | ||
58 | private final double maxAllowedFailurePercentage; | 59 | private final double maxAllowedFailurePercentage; |
59 | private final long pauseBetweenRetries; | 60 | private final long pauseBetweenRetries; |
60 | 61 | ||
62 | + private final boolean expPauseBetweenRetries; | ||
63 | + | ||
64 | + private long maxExpPauseBetweenRetries; | ||
65 | + private int maxExpDegreeValue; | ||
66 | + private AtomicInteger expDegreeStep; | ||
67 | + | ||
61 | private int initialTotalCount; | 68 | private int initialTotalCount; |
62 | private int retryCount; | 69 | private int retryCount; |
63 | 70 | ||
@@ -69,6 +76,12 @@ public class TbRuleEngineProcessingStrategyFactory { | @@ -69,6 +76,12 @@ public class TbRuleEngineProcessingStrategyFactory { | ||
69 | this.maxRetries = configuration.getRetries(); | 76 | this.maxRetries = configuration.getRetries(); |
70 | this.maxAllowedFailurePercentage = configuration.getFailurePercentage(); | 77 | this.maxAllowedFailurePercentage = configuration.getFailurePercentage(); |
71 | this.pauseBetweenRetries = configuration.getPauseBetweenRetries(); | 78 | this.pauseBetweenRetries = configuration.getPauseBetweenRetries(); |
79 | + this.expPauseBetweenRetries = configuration.isExpPauseBetweenRetries(); | ||
80 | + if (this.expPauseBetweenRetries) { | ||
81 | + this.expDegreeStep = new AtomicInteger(1); | ||
82 | + this.maxExpPauseBetweenRetries = configuration.getMaxExpPauseBetweenRetries(); | ||
83 | + this.maxExpDegreeValue = new Double(Math.log(maxExpPauseBetweenRetries) / Math.log(pauseBetweenRetries)).intValue(); | ||
84 | + } | ||
72 | } | 85 | } |
73 | 86 | ||
74 | @Override | 87 | @Override |
@@ -103,10 +116,25 @@ public class TbRuleEngineProcessingStrategyFactory { | @@ -103,10 +116,25 @@ public class TbRuleEngineProcessingStrategyFactory { | ||
103 | toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); | 116 | toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); |
104 | } | 117 | } |
105 | if (pauseBetweenRetries > 0) { | 118 | if (pauseBetweenRetries > 0) { |
106 | - try { | ||
107 | - Thread.sleep(TimeUnit.SECONDS.toMillis(pauseBetweenRetries)); | ||
108 | - } catch (InterruptedException e) { | ||
109 | - throw new RuntimeException(e); | 119 | + if (expPauseBetweenRetries) { |
120 | + long pause; | ||
121 | + if (maxExpDegreeValue > expDegreeStep.get()) { | ||
122 | + pause = new Double(Math.pow(pauseBetweenRetries, expDegreeStep.getAndIncrement())).longValue(); | ||
123 | + } else { | ||
124 | + pause = maxExpPauseBetweenRetries; | ||
125 | + } | ||
126 | + try { | ||
127 | + Thread.sleep(TimeUnit.SECONDS.toMillis( | ||
128 | + pause)); | ||
129 | + } catch (InterruptedException e) { | ||
130 | + throw new RuntimeException(e); | ||
131 | + } | ||
132 | + } else { | ||
133 | + try { | ||
134 | + Thread.sleep(TimeUnit.SECONDS.toMillis(pauseBetweenRetries)); | ||
135 | + } catch (InterruptedException e) { | ||
136 | + throw new RuntimeException(e); | ||
137 | + } | ||
110 | } | 138 | } |
111 | } | 139 | } |
112 | return new TbRuleEngineProcessingDecision(false, toReprocess); | 140 | return new TbRuleEngineProcessingDecision(false, toReprocess); |
@@ -725,6 +725,8 @@ queue: | @@ -725,6 +725,8 @@ queue: | ||
725 | retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited | 725 | retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited |
726 | failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; | 726 | failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; |
727 | pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries; | 727 | pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries; |
728 | + exp-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_EXP_RETRY_PAUSE:false}"# Parameter to enable/disable exponential increase of pause between retries; | ||
729 | + max-exp-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_MAX_EXP_RETRY_PAUSE:86400}"# Max allowed time in seconds for pause between retries. | ||
728 | - name: "${TB_QUEUE_RE_HP_QUEUE_NAME:HighPriority}" | 730 | - name: "${TB_QUEUE_RE_HP_QUEUE_NAME:HighPriority}" |
729 | topic: "${TB_QUEUE_RE_HP_TOPIC:tb_rule_engine.hp}" | 731 | topic: "${TB_QUEUE_RE_HP_TOPIC:tb_rule_engine.hp}" |
730 | poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}" | 732 | poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}" |
@@ -740,6 +742,8 @@ queue: | @@ -740,6 +742,8 @@ queue: | ||
740 | retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRIES:0}" # Number of retries, 0 is unlimited | 742 | retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRIES:0}" # Number of retries, 0 is unlimited |
741 | failure-percentage: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; | 743 | failure-percentage: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; |
742 | pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries; | 744 | pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries; |
745 | + exp-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_EXP_RETRY_PAUSE:false}"# Parameter to enable/disable exponential increase of pause between retries; | ||
746 | + max-exp-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_MAX_EXP_RETRY_PAUSE:86400}"# Max allowed time in seconds for pause between retries. | ||
743 | - name: "${TB_QUEUE_RE_SQ_QUEUE_NAME:SequentialByOriginator}" | 747 | - name: "${TB_QUEUE_RE_SQ_QUEUE_NAME:SequentialByOriginator}" |
744 | topic: "${TB_QUEUE_RE_SQ_TOPIC:tb_rule_engine.sq}" | 748 | topic: "${TB_QUEUE_RE_SQ_TOPIC:tb_rule_engine.sq}" |
745 | poll-interval: "${TB_QUEUE_RE_SQ_POLL_INTERVAL_MS:25}" | 749 | poll-interval: "${TB_QUEUE_RE_SQ_POLL_INTERVAL_MS:25}" |
@@ -755,6 +759,8 @@ queue: | @@ -755,6 +759,8 @@ queue: | ||
755 | retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited | 759 | retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited |
756 | failure-percentage: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; | 760 | failure-percentage: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; |
757 | pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries; | 761 | pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries; |
762 | + exp-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_EXP_RETRY_PAUSE:false}"# Parameter to enable/disable exponential increase of pause between retries; | ||
763 | + max-exp-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_MAX_EXP_RETRY_PAUSE:86400}"# Max allowed time in seconds for pause between retries. | ||
758 | transport: | 764 | transport: |
759 | # For high priority notifications that require minimum latency and processing time | 765 | # For high priority notifications that require minimum latency and processing time |
760 | notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}" | 766 | notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}" |
@@ -24,5 +24,7 @@ public class TbRuleEngineQueueAckStrategyConfiguration { | @@ -24,5 +24,7 @@ public class TbRuleEngineQueueAckStrategyConfiguration { | ||
24 | private int retries; | 24 | private int retries; |
25 | private double failurePercentage; | 25 | private double failurePercentage; |
26 | private long pauseBetweenRetries; | 26 | private long pauseBetweenRetries; |
27 | + private boolean expPauseBetweenRetries; | ||
28 | + private long maxExpPauseBetweenRetries; | ||
27 | 29 | ||
28 | } | 30 | } |