Showing
1 changed file
with
16 additions
and
9 deletions
@@ -66,7 +66,6 @@ import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; | @@ -66,7 +66,6 @@ import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; | ||
66 | 66 | ||
67 | import java.util.Collections; | 67 | import java.util.Collections; |
68 | import java.util.Set; | 68 | import java.util.Set; |
69 | -import java.util.concurrent.TimeUnit; | ||
70 | import java.util.function.Consumer; | 69 | import java.util.function.Consumer; |
71 | 70 | ||
72 | /** | 71 | /** |
@@ -121,7 +120,7 @@ class DefaultTbContext implements TbContext { | @@ -121,7 +120,7 @@ class DefaultTbContext implements TbContext { | ||
121 | 120 | ||
122 | @Override | 121 | @Override |
123 | public void enqueue(TbMsg tbMsg, String queueName, Runnable onSuccess, Consumer<Throwable> onFailure) { | 122 | public void enqueue(TbMsg tbMsg, String queueName, Runnable onSuccess, Consumer<Throwable> onFailure) { |
124 | - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator()); | 123 | + TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName); |
125 | enqueue(tpi, tbMsg, onFailure, onSuccess); | 124 | enqueue(tpi, tbMsg, onFailure, onSuccess); |
126 | } | 125 | } |
127 | 126 | ||
@@ -138,46 +137,54 @@ class DefaultTbContext implements TbContext { | @@ -138,46 +137,54 @@ class DefaultTbContext implements TbContext { | ||
138 | 137 | ||
139 | @Override | 138 | @Override |
140 | public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) { | 139 | public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) { |
141 | - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator()); | 140 | + TopicPartitionInfo tpi = resolvePartition(tbMsg); |
142 | enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), failureMessage, null, null); | 141 | enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), failureMessage, null, null); |
143 | } | 142 | } |
144 | 143 | ||
145 | @Override | 144 | @Override |
146 | public void enqueueForTellNext(TbMsg tbMsg, String relationType) { | 145 | public void enqueueForTellNext(TbMsg tbMsg, String relationType) { |
147 | - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator()); | 146 | + TopicPartitionInfo tpi = resolvePartition(tbMsg); |
148 | enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, null, null); | 147 | enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, null, null); |
149 | } | 148 | } |
150 | 149 | ||
151 | @Override | 150 | @Override |
152 | public void enqueueForTellNext(TbMsg tbMsg, Set<String> relationTypes) { | 151 | public void enqueueForTellNext(TbMsg tbMsg, Set<String> relationTypes) { |
153 | - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator()); | 152 | + TopicPartitionInfo tpi = resolvePartition(tbMsg); |
154 | enqueueForTellNext(tpi, tbMsg, relationTypes, null, null, null); | 153 | enqueueForTellNext(tpi, tbMsg, relationTypes, null, null, null); |
155 | } | 154 | } |
156 | 155 | ||
157 | @Override | 156 | @Override |
158 | public void enqueueForTellNext(TbMsg tbMsg, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) { | 157 | public void enqueueForTellNext(TbMsg tbMsg, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) { |
159 | - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator()); | 158 | + TopicPartitionInfo tpi = resolvePartition(tbMsg); |
160 | enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure); | 159 | enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure); |
161 | } | 160 | } |
162 | 161 | ||
163 | @Override | 162 | @Override |
164 | public void enqueueForTellNext(TbMsg tbMsg, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) { | 163 | public void enqueueForTellNext(TbMsg tbMsg, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) { |
165 | - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator()); | 164 | + TopicPartitionInfo tpi = resolvePartition(tbMsg); |
166 | enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure); | 165 | enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure); |
167 | } | 166 | } |
168 | 167 | ||
169 | @Override | 168 | @Override |
170 | public void enqueueForTellNext(TbMsg tbMsg, String queueName, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) { | 169 | public void enqueueForTellNext(TbMsg tbMsg, String queueName, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) { |
171 | - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator()); | 170 | + TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName); |
172 | enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure); | 171 | enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure); |
173 | } | 172 | } |
174 | 173 | ||
175 | @Override | 174 | @Override |
176 | public void enqueueForTellNext(TbMsg tbMsg, String queueName, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) { | 175 | public void enqueueForTellNext(TbMsg tbMsg, String queueName, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) { |
177 | - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator()); | 176 | + TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName); |
178 | enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure); | 177 | enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure); |
179 | } | 178 | } |
180 | 179 | ||
180 | + private TopicPartitionInfo resolvePartition(TbMsg tbMsg, String queueName) { | ||
181 | + return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator()); | ||
182 | + } | ||
183 | + | ||
184 | + private TopicPartitionInfo resolvePartition(TbMsg tbMsg) { | ||
185 | + return resolvePartition(tbMsg, tbMsg.getQueueName()); | ||
186 | + } | ||
187 | + | ||
181 | private void enqueueForTellNext(TopicPartitionInfo tpi, TbMsg source, Set<String> relationTypes, String failureMessage, Runnable onSuccess, Consumer<Throwable> onFailure) { | 188 | private void enqueueForTellNext(TopicPartitionInfo tpi, TbMsg source, Set<String> relationTypes, String failureMessage, Runnable onSuccess, Consumer<Throwable> onFailure) { |
182 | RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId(); | 189 | RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId(); |
183 | RuleNodeId ruleNodeId = nodeCtx.getSelf().getId(); | 190 | RuleNodeId ruleNodeId = nodeCtx.getSelf().getId(); |