Commit 94bb9f5be3c3ca878b9ed5d6b95a7e3156c29ea0

Authored by ShvaykaD
1 parent a3b005e4

update the fix in the API

@@ -138,31 +138,31 @@ class DefaultTbContext implements TbContext { @@ -138,31 +138,31 @@ class DefaultTbContext implements TbContext {
138 138
139 @Override 139 @Override
140 public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) { 140 public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) {
141 - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); 141 + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator());
142 enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), failureMessage, null, null); 142 enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), failureMessage, null, null);
143 } 143 }
144 144
145 @Override 145 @Override
146 public void enqueueForTellNext(TbMsg tbMsg, String relationType) { 146 public void enqueueForTellNext(TbMsg tbMsg, String relationType) {
147 - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); 147 + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator());
148 enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, null, null); 148 enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, null, null);
149 } 149 }
150 150
151 @Override 151 @Override
152 public void enqueueForTellNext(TbMsg tbMsg, Set<String> relationTypes) { 152 public void enqueueForTellNext(TbMsg tbMsg, Set<String> relationTypes) {
153 - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); 153 + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator());
154 enqueueForTellNext(tpi, tbMsg, relationTypes, null, null, null); 154 enqueueForTellNext(tpi, tbMsg, relationTypes, null, null, null);
155 } 155 }
156 156
157 @Override 157 @Override
158 public void enqueueForTellNext(TbMsg tbMsg, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) { 158 public void enqueueForTellNext(TbMsg tbMsg, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) {
159 - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); 159 + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator());
160 enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure); 160 enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure);
161 } 161 }
162 162
163 @Override 163 @Override
164 public void enqueueForTellNext(TbMsg tbMsg, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) { 164 public void enqueueForTellNext(TbMsg tbMsg, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) {
165 - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); 165 + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator());
166 enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure); 166 enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure);
167 } 167 }
168 168
@@ -113,7 +113,7 @@ public class TbSendRPCRequestNode implements TbNode { @@ -113,7 +113,7 @@ public class TbSendRPCRequestNode implements TbNode {
113 ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> { 113 ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> {
114 if (!ruleEngineDeviceRpcResponse.getError().isPresent()) { 114 if (!ruleEngineDeviceRpcResponse.getError().isPresent()) {
115 TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}")); 115 TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}"));
116 - ctx.enqueueForTellNext(next, next.getQueueName(), TbRelationTypes.SUCCESS, null, null); 116 + ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS);
117 } else { 117 } else {
118 TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name())); 118 TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));
119 ctx.tellFailure(next, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name())); 119 ctx.tellFailure(next, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name()));