Commit 7604b78f0e0b5df6b9810f26dec60599a98eaf7d
Committed by
GitHub
Merge pull request #3405 from ShvaykaD/improvements/queue-reprocessing-strategy
[2.5.5] Improvements/queue reprocessing strategy
Showing
3 changed files
with
11 additions
and
1 deletions
@@ -56,7 +56,9 @@ public class TbRuleEngineProcessingStrategyFactory { | @@ -56,7 +56,9 @@ public class TbRuleEngineProcessingStrategyFactory { | ||
56 | private final boolean retryTimeout; | 56 | private final boolean retryTimeout; |
57 | private final int maxRetries; | 57 | private final int maxRetries; |
58 | private final double maxAllowedFailurePercentage; | 58 | private final double maxAllowedFailurePercentage; |
59 | - private final long pauseBetweenRetries; | 59 | + private final long maxPauseBetweenRetries; |
60 | + | ||
61 | + private long pauseBetweenRetries; | ||
60 | 62 | ||
61 | private int initialTotalCount; | 63 | private int initialTotalCount; |
62 | private int retryCount; | 64 | private int retryCount; |
@@ -69,6 +71,7 @@ public class TbRuleEngineProcessingStrategyFactory { | @@ -69,6 +71,7 @@ public class TbRuleEngineProcessingStrategyFactory { | ||
69 | this.maxRetries = configuration.getRetries(); | 71 | this.maxRetries = configuration.getRetries(); |
70 | this.maxAllowedFailurePercentage = configuration.getFailurePercentage(); | 72 | this.maxAllowedFailurePercentage = configuration.getFailurePercentage(); |
71 | this.pauseBetweenRetries = configuration.getPauseBetweenRetries(); | 73 | this.pauseBetweenRetries = configuration.getPauseBetweenRetries(); |
74 | + this.maxPauseBetweenRetries = configuration.getMaxPauseBetweenRetries(); | ||
72 | } | 75 | } |
73 | 76 | ||
74 | @Override | 77 | @Override |
@@ -108,6 +111,9 @@ public class TbRuleEngineProcessingStrategyFactory { | @@ -108,6 +111,9 @@ public class TbRuleEngineProcessingStrategyFactory { | ||
108 | } catch (InterruptedException e) { | 111 | } catch (InterruptedException e) { |
109 | throw new RuntimeException(e); | 112 | throw new RuntimeException(e); |
110 | } | 113 | } |
114 | + if (maxPauseBetweenRetries > pauseBetweenRetries) { | ||
115 | + pauseBetweenRetries = Math.min(maxPauseBetweenRetries, pauseBetweenRetries * 2); | ||
116 | + } | ||
111 | } | 117 | } |
112 | return new TbRuleEngineProcessingDecision(false, toReprocess); | 118 | return new TbRuleEngineProcessingDecision(false, toReprocess); |
113 | } | 119 | } |
@@ -729,6 +729,7 @@ queue: | @@ -729,6 +729,7 @@ queue: | ||
729 | retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited | 729 | retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited |
730 | failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; | 730 | failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; |
731 | pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries; | 731 | pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries; |
732 | + max-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:3}"# Max allowed time in seconds for pause between retries. | ||
732 | - name: "${TB_QUEUE_RE_HP_QUEUE_NAME:HighPriority}" | 733 | - name: "${TB_QUEUE_RE_HP_QUEUE_NAME:HighPriority}" |
733 | topic: "${TB_QUEUE_RE_HP_TOPIC:tb_rule_engine.hp}" | 734 | topic: "${TB_QUEUE_RE_HP_TOPIC:tb_rule_engine.hp}" |
734 | poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}" | 735 | poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}" |
@@ -744,6 +745,7 @@ queue: | @@ -744,6 +745,7 @@ queue: | ||
744 | retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRIES:0}" # Number of retries, 0 is unlimited | 745 | retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRIES:0}" # Number of retries, 0 is unlimited |
745 | failure-percentage: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; | 746 | failure-percentage: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; |
746 | pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries; | 747 | pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries; |
748 | + max-pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:5}"# Max allowed time in seconds for pause between retries. | ||
747 | - name: "${TB_QUEUE_RE_SQ_QUEUE_NAME:SequentialByOriginator}" | 749 | - name: "${TB_QUEUE_RE_SQ_QUEUE_NAME:SequentialByOriginator}" |
748 | topic: "${TB_QUEUE_RE_SQ_TOPIC:tb_rule_engine.sq}" | 750 | topic: "${TB_QUEUE_RE_SQ_TOPIC:tb_rule_engine.sq}" |
749 | poll-interval: "${TB_QUEUE_RE_SQ_POLL_INTERVAL_MS:25}" | 751 | poll-interval: "${TB_QUEUE_RE_SQ_POLL_INTERVAL_MS:25}" |
@@ -759,6 +761,7 @@ queue: | @@ -759,6 +761,7 @@ queue: | ||
759 | retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited | 761 | retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited |
760 | failure-percentage: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; | 762 | failure-percentage: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; |
761 | pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries; | 763 | pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries; |
764 | + max-pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:5}"# Max allowed time in seconds for pause between retries. | ||
762 | transport: | 765 | transport: |
763 | # For high priority notifications that require minimum latency and processing time | 766 | # For high priority notifications that require minimum latency and processing time |
764 | notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}" | 767 | notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}" |
@@ -24,5 +24,6 @@ public class TbRuleEngineQueueAckStrategyConfiguration { | @@ -24,5 +24,6 @@ public class TbRuleEngineQueueAckStrategyConfiguration { | ||
24 | private int retries; | 24 | private int retries; |
25 | private double failurePercentage; | 25 | private double failurePercentage; |
26 | private long pauseBetweenRetries; | 26 | private long pauseBetweenRetries; |
27 | + private long maxPauseBetweenRetries; | ||
27 | 28 | ||
28 | } | 29 | } |