Commit bbb7e8988128363860804e62b3404b3d5de0f1ad
Committed by
GitHub
Merge pull request #5357 from smatvienko-tb/transport_from_core_skipping_message_without_nodeId
Transport - skipping messages to transport without nodeId
Showing
3 changed files
with
18 additions
and
0 deletions
@@ -196,6 +196,13 @@ public class DefaultTbClusterService implements TbClusterService { | @@ -196,6 +196,13 @@ public class DefaultTbClusterService implements TbClusterService { | ||
196 | 196 | ||
197 | @Override | 197 | @Override |
198 | public void pushNotificationToTransport(String serviceId, ToTransportMsg response, TbQueueCallback callback) { | 198 | public void pushNotificationToTransport(String serviceId, ToTransportMsg response, TbQueueCallback callback) { |
199 | + if (serviceId == null || serviceId.isEmpty()){ | ||
200 | + log.trace("pushNotificationToTransport: skipping message without serviceId [{}], (ToTransportMsg) response [{}]", serviceId, response); | ||
201 | + if (callback != null) { | ||
202 | + callback.onSuccess(null); //callback that message already sent, no useful payload expected | ||
203 | + } | ||
204 | + return; | ||
205 | + } | ||
199 | TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, serviceId); | 206 | TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, serviceId); |
200 | log.trace("PUSHING msg: {} to:{}", response, tpi); | 207 | log.trace("PUSHING msg: {} to:{}", response, tpi); |
201 | producerProvider.getTransportNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), response), callback); | 208 | producerProvider.getTransportNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), response), callback); |
@@ -87,6 +87,10 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi | @@ -87,6 +87,10 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi | ||
87 | 87 | ||
88 | @Override | 88 | @Override |
89 | public void sendRpcReplyToDevice(String serviceId, UUID sessionId, int requestId, String body) { | 89 | public void sendRpcReplyToDevice(String serviceId, UUID sessionId, int requestId, String body) { |
90 | + if (serviceId == null || serviceId.isEmpty()){ | ||
91 | + log.trace("sendRpcReplyToDevice: skipping message without serviceId [{}], sessionId[{}], requestId[{}], body[{}]", serviceId, sessionId, requestId, body); | ||
92 | + return; | ||
93 | + } | ||
90 | TransportProtos.ToServerRpcResponseMsg responseMsg = TransportProtos.ToServerRpcResponseMsg.newBuilder() | 94 | TransportProtos.ToServerRpcResponseMsg responseMsg = TransportProtos.ToServerRpcResponseMsg.newBuilder() |
91 | .setRequestId(requestId) | 95 | .setRequestId(requestId) |
92 | .setPayload(body).build(); | 96 | .setPayload(body).build(); |
@@ -54,6 +54,13 @@ public class DefaultTbCoreToTransportService implements TbCoreToTransportService | @@ -54,6 +54,13 @@ public class DefaultTbCoreToTransportService implements TbCoreToTransportService | ||
54 | 54 | ||
55 | @Override | 55 | @Override |
56 | public void process(String nodeId, ToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) { | 56 | public void process(String nodeId, ToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) { |
57 | + if (nodeId == null || nodeId.isEmpty()){ | ||
58 | + log.trace("process: skipping message without nodeId [{}], (ToTransportMsg) msg [{}]", nodeId, msg); | ||
59 | + if (onSuccess != null) { | ||
60 | + onSuccess.run(); | ||
61 | + } | ||
62 | + return; | ||
63 | + } | ||
57 | TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, nodeId); | 64 | TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, nodeId); |
58 | UUID sessionId = new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB()); | 65 | UUID sessionId = new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB()); |
59 | log.trace("[{}][{}] Pushing session data to topic: {}", tpi.getFullTopicName(), sessionId, msg); | 66 | log.trace("[{}][{}] Pushing session data to topic: {}", tpi.getFullTopicName(), sessionId, msg); |