Commit 0356431856774952d762b1f5c2392411c2c7e74b
Committed by
GitHub
Merge pull request #3548 from ShvaykaD/improvements/TbSendRPCRequestNode
[2.5.5] fix enqueue methods impl in DefaultTbContext
Showing
1 changed file
with
16 additions
and
9 deletions
... | ... | @@ -66,7 +66,6 @@ import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; |
66 | 66 | |
67 | 67 | import java.util.Collections; |
68 | 68 | import java.util.Set; |
69 | -import java.util.concurrent.TimeUnit; | |
70 | 69 | import java.util.function.Consumer; |
71 | 70 | |
72 | 71 | /** |
... | ... | @@ -121,7 +120,7 @@ class DefaultTbContext implements TbContext { |
121 | 120 | |
122 | 121 | @Override |
123 | 122 | public void enqueue(TbMsg tbMsg, String queueName, Runnable onSuccess, Consumer<Throwable> onFailure) { |
124 | - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator()); | |
123 | + TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName); | |
125 | 124 | enqueue(tpi, tbMsg, onFailure, onSuccess); |
126 | 125 | } |
127 | 126 | |
... | ... | @@ -138,46 +137,54 @@ class DefaultTbContext implements TbContext { |
138 | 137 | |
139 | 138 | @Override |
140 | 139 | public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) { |
141 | - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); | |
140 | + TopicPartitionInfo tpi = resolvePartition(tbMsg); | |
142 | 141 | enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), failureMessage, null, null); |
143 | 142 | } |
144 | 143 | |
145 | 144 | @Override |
146 | 145 | public void enqueueForTellNext(TbMsg tbMsg, String relationType) { |
147 | - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); | |
146 | + TopicPartitionInfo tpi = resolvePartition(tbMsg); | |
148 | 147 | enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, null, null); |
149 | 148 | } |
150 | 149 | |
151 | 150 | @Override |
152 | 151 | public void enqueueForTellNext(TbMsg tbMsg, Set<String> relationTypes) { |
153 | - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); | |
152 | + TopicPartitionInfo tpi = resolvePartition(tbMsg); | |
154 | 153 | enqueueForTellNext(tpi, tbMsg, relationTypes, null, null, null); |
155 | 154 | } |
156 | 155 | |
157 | 156 | @Override |
158 | 157 | public void enqueueForTellNext(TbMsg tbMsg, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) { |
159 | - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); | |
158 | + TopicPartitionInfo tpi = resolvePartition(tbMsg); | |
160 | 159 | enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure); |
161 | 160 | } |
162 | 161 | |
163 | 162 | @Override |
164 | 163 | public void enqueueForTellNext(TbMsg tbMsg, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) { |
165 | - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); | |
164 | + TopicPartitionInfo tpi = resolvePartition(tbMsg); | |
166 | 165 | enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure); |
167 | 166 | } |
168 | 167 | |
169 | 168 | @Override |
170 | 169 | public void enqueueForTellNext(TbMsg tbMsg, String queueName, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) { |
171 | - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator()); | |
170 | + TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName); | |
172 | 171 | enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure); |
173 | 172 | } |
174 | 173 | |
175 | 174 | @Override |
176 | 175 | public void enqueueForTellNext(TbMsg tbMsg, String queueName, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) { |
177 | - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator()); | |
176 | + TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName); | |
178 | 177 | enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure); |
179 | 178 | } |
180 | 179 | |
180 | + private TopicPartitionInfo resolvePartition(TbMsg tbMsg, String queueName) { | |
181 | + return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator()); | |
182 | + } | |
183 | + | |
184 | + private TopicPartitionInfo resolvePartition(TbMsg tbMsg) { | |
185 | + return resolvePartition(tbMsg, tbMsg.getQueueName()); | |
186 | + } | |
187 | + | |
181 | 188 | private void enqueueForTellNext(TopicPartitionInfo tpi, TbMsg source, Set<String> relationTypes, String failureMessage, Runnable onSuccess, Consumer<Throwable> onFailure) { |
182 | 189 | RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId(); |
183 | 190 | RuleNodeId ruleNodeId = nodeCtx.getSelf().getId(); | ... | ... |