Commit 177c0f46ad0171bd2c40f3fcb87806fc94b6c1d6

Authored by Sergey Matvienko
Committed by Andrew Shvayka
1 parent bbb7e898

enqueueForTellNext for specific queue produce new message with the new queue fro…

…m parameter. Instead old queue from old Msg. It affects rule node statistics before. This fix is related to Checkpoint node
... ... @@ -190,13 +190,13 @@ class DefaultTbContext implements TbContext {
190 190 @Override
191 191 public void enqueueForTellNext(TbMsg tbMsg, String queueName, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) {
192 192 TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName);
193   - enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure);
  193 + enqueueForTellNext(tpi, queueName, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure);
194 194 }
195 195
196 196 @Override
197 197 public void enqueueForTellNext(TbMsg tbMsg, String queueName, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) {
198 198 TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName);
199   - enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure);
  199 + enqueueForTellNext(tpi, queueName, tbMsg, relationTypes, null, onSuccess, onFailure);
200 200 }
201 201
202 202 private TopicPartitionInfo resolvePartition(TbMsg tbMsg, String queueName) {
... ... @@ -211,9 +211,13 @@ class DefaultTbContext implements TbContext {
211 211 }
212 212
213 213 private void enqueueForTellNext(TopicPartitionInfo tpi, TbMsg source, Set<String> relationTypes, String failureMessage, Runnable onSuccess, Consumer<Throwable> onFailure) {
  214 + enqueueForTellNext(tpi, source.getQueueName(), source, relationTypes, failureMessage, onSuccess, onFailure);
  215 + }
  216 +
  217 + private void enqueueForTellNext(TopicPartitionInfo tpi, String queueName, TbMsg source, Set<String> relationTypes, String failureMessage, Runnable onSuccess, Consumer<Throwable> onFailure) {
214 218 RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId();
215 219 RuleNodeId ruleNodeId = nodeCtx.getSelf().getId();
216   - TbMsg tbMsg = TbMsg.newMsg(source, ruleChainId, ruleNodeId);
  220 + TbMsg tbMsg = TbMsg.newMsg(source, queueName, ruleChainId, ruleNodeId);
217 221 TransportProtos.ToRuleEngineMsg.Builder msg = TransportProtos.ToRuleEngineMsg.newBuilder()
218 222 .setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
219 223 .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
... ...
... ... @@ -136,8 +136,9 @@ public final class TbMsg implements Serializable {
136 136 tbMsg.data, ruleChainId, null, tbMsg.ruleNodeExecCounter.get(), tbMsg.getCallback());
137 137 }
138 138
139   - public static TbMsg newMsg(TbMsg tbMsg, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
140   - return new TbMsg(tbMsg.getQueueName(), UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.customerId, tbMsg.getMetaData().copy(),
  139 + //used for enqueueForTellNext
  140 + public static TbMsg newMsg(TbMsg tbMsg, String queueName, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
  141 + return new TbMsg(queueName, UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.customerId, tbMsg.getMetaData().copy(),
141 142 tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, tbMsg.ruleNodeExecCounter.get(), TbMsgCallback.EMPTY);
142 143 }
143 144
... ...