Showing
24 changed files
with
189 additions
and
83 deletions
... | ... | @@ -15,7 +15,6 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.actors.device; |
17 | 17 | |
18 | -import lombok.extern.slf4j.Slf4j; | |
19 | 18 | import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; |
20 | 19 | import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg; |
21 | 20 | import org.thingsboard.server.actors.ActorSystemContext; |
... | ... | @@ -29,7 +28,6 @@ import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; |
29 | 28 | import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg; |
30 | 29 | import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; |
31 | 30 | |
32 | -@Slf4j | |
33 | 31 | public class DeviceActor extends ContextAwareActor { |
34 | 32 | |
35 | 33 | private final DeviceActorMessageProcessor processor; | ... | ... |
... | ... | @@ -348,9 +348,12 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { |
348 | 348 | int requestId = msg.getMsg().getRequestId(); |
349 | 349 | ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(requestId); |
350 | 350 | if (data != null) { |
351 | + log.debug("[{}] Pushing reply to [{}][{}]!", deviceId, data.getNodeId(), data.getSessionId()); | |
351 | 352 | sendToTransport(TransportProtos.ToServerRpcResponseMsg.newBuilder() |
352 | 353 | .setRequestId(requestId).setPayload(msg.getMsg().getData()).build() |
353 | 354 | , data.getSessionId(), data.getNodeId()); |
355 | + } else { | |
356 | + log.debug("[{}][{}] Pending RPC request to server not found!", deviceId, requestId); | |
354 | 357 | } |
355 | 358 | } |
356 | 359 | ... | ... |
... | ... | @@ -40,7 +40,7 @@ public class BasicRpcSessionListener implements GrpcSessionListener { |
40 | 40 | |
41 | 41 | @Override |
42 | 42 | public void onConnected(GrpcSession session) { |
43 | - log.info("{} session started -> {}", getType(session), session.getRemoteServer()); | |
43 | + log.info("[{}][{}] session started", session.getRemoteServer(), getType(session)); | |
44 | 44 | if (!session.isClient()) { |
45 | 45 | manager.tell(new RpcSessionConnectedMsg(session.getRemoteServer(), session.getSessionId()), self); |
46 | 46 | } |
... | ... | @@ -48,21 +48,19 @@ public class BasicRpcSessionListener implements GrpcSessionListener { |
48 | 48 | |
49 | 49 | @Override |
50 | 50 | public void onDisconnected(GrpcSession session) { |
51 | - log.info("{} session closed -> {}", getType(session), session.getRemoteServer()); | |
51 | + log.info("[{}][{}] session closed", session.getRemoteServer(), getType(session)); | |
52 | 52 | manager.tell(new RpcSessionDisconnectedMsg(session.isClient(), session.getRemoteServer()), self); |
53 | 53 | } |
54 | 54 | |
55 | 55 | @Override |
56 | 56 | public void onReceiveClusterGrpcMsg(GrpcSession session, ClusterAPIProtos.ClusterMessage clusterMessage) { |
57 | - log.trace("{} Service [{}] received session actor msg {}", getType(session), | |
58 | - session.getRemoteServer(), | |
59 | - clusterMessage); | |
57 | + log.trace("Received session actor msg from [{}][{}]: {}", session.getRemoteServer(), getType(session), clusterMessage); | |
60 | 58 | service.onReceivedMsg(session.getRemoteServer(), clusterMessage); |
61 | 59 | } |
62 | 60 | |
63 | 61 | @Override |
64 | 62 | public void onError(GrpcSession session, Throwable t) { |
65 | - log.warn("{} session got error -> {}", getType(session), session.getRemoteServer(), t); | |
63 | + log.warn("[{}][{}] session got error -> {}", session.getRemoteServer(), getType(session), t); | |
66 | 64 | manager.tell(new RpcSessionClosedMsg(session.isClient(), session.getRemoteServer()), self); |
67 | 65 | session.close(); |
68 | 66 | } | ... | ... |
... | ... | @@ -69,6 +69,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh |
69 | 69 | private RuleNodeId firstId; |
70 | 70 | private RuleNodeCtx firstNode; |
71 | 71 | private boolean started; |
72 | + private String ruleChainName; | |
72 | 73 | |
73 | 74 | RuleChainActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, ActorSystemContext systemContext |
74 | 75 | , ActorRef parent, ActorRef self) { |
... | ... | @@ -78,15 +79,24 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh |
78 | 79 | this.nodeActors = new HashMap<>(); |
79 | 80 | this.nodeRoutes = new HashMap<>(); |
80 | 81 | this.service = systemContext.getRuleChainService(); |
82 | + this.ruleChainName = ruleChainId.toString(); | |
81 | 83 | } |
82 | 84 | |
83 | 85 | @Override |
84 | - public void start(ActorContext context) throws Exception { | |
86 | + public String getComponentName() { | |
87 | + return null; | |
88 | + } | |
89 | + | |
90 | + @Override | |
91 | + public void start(ActorContext context) { | |
85 | 92 | if (!started) { |
86 | 93 | RuleChain ruleChain = service.findRuleChainById(entityId); |
94 | + ruleChainName = ruleChain.getName(); | |
87 | 95 | List<RuleNode> ruleNodeList = service.getRuleChainNodes(entityId); |
96 | + log.trace("[{}][{}] Starting rule chain with {} nodes", tenantId, entityId, ruleNodeList.size()); | |
88 | 97 | // Creating and starting the actors; |
89 | 98 | for (RuleNode ruleNode : ruleNodeList) { |
99 | + log.trace("[{}][{}] Creating rule node [{}]: {}", tenantId, entityId, ruleNode.getName(), ruleNode); | |
90 | 100 | ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode); |
91 | 101 | nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode)); |
92 | 102 | } |
... | ... | @@ -98,16 +108,19 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh |
98 | 108 | } |
99 | 109 | |
100 | 110 | @Override |
101 | - public void onUpdate(ActorContext context) throws Exception { | |
111 | + public void onUpdate(ActorContext context) { | |
102 | 112 | RuleChain ruleChain = service.findRuleChainById(entityId); |
113 | + ruleChainName = ruleChain.getName(); | |
103 | 114 | List<RuleNode> ruleNodeList = service.getRuleChainNodes(entityId); |
104 | - | |
115 | + log.trace("[{}][{}] Updating rule chain with {} nodes", tenantId, entityId, ruleNodeList.size()); | |
105 | 116 | for (RuleNode ruleNode : ruleNodeList) { |
106 | 117 | RuleNodeCtx existing = nodeActors.get(ruleNode.getId()); |
107 | 118 | if (existing == null) { |
119 | + log.trace("[{}][{}] Creating rule node [{}]: {}", tenantId, entityId, ruleNode.getName(), ruleNode); | |
108 | 120 | ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode); |
109 | 121 | nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode)); |
110 | 122 | } else { |
123 | + log.trace("[{}][{}] Updating rule node [{}]: {}", tenantId, entityId, ruleNode.getName(), ruleNode); | |
111 | 124 | existing.setSelf(ruleNode); |
112 | 125 | existing.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, existing.getSelf().getId(), ComponentLifecycleEvent.UPDATED), self); |
113 | 126 | } |
... | ... | @@ -116,6 +129,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh |
116 | 129 | Set<RuleNodeId> existingNodes = ruleNodeList.stream().map(RuleNode::getId).collect(Collectors.toSet()); |
117 | 130 | List<RuleNodeId> removedRules = nodeActors.keySet().stream().filter(node -> !existingNodes.contains(node)).collect(Collectors.toList()); |
118 | 131 | removedRules.forEach(ruleNodeId -> { |
132 | + log.trace("[{}][{}] Removing rule node [{}]", tenantId, entityId, ruleNodeId); | |
119 | 133 | RuleNodeCtx removed = nodeActors.remove(ruleNodeId); |
120 | 134 | removed.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED), self); |
121 | 135 | }); |
... | ... | @@ -124,7 +138,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh |
124 | 138 | } |
125 | 139 | |
126 | 140 | @Override |
127 | - public void stop(ActorContext context) throws Exception { | |
141 | + public void stop(ActorContext context) { | |
142 | + log.trace("[{}][{}] Stopping rule chain with {} nodes", tenantId, entityId, nodeActors.size()); | |
128 | 143 | nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).forEach(context::stop); |
129 | 144 | nodeActors.clear(); |
130 | 145 | nodeRoutes.clear(); |
... | ... | @@ -133,7 +148,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh |
133 | 148 | } |
134 | 149 | |
135 | 150 | @Override |
136 | - public void onClusterEventMsg(ClusterEventMsg msg) throws Exception { | |
151 | + public void onClusterEventMsg(ClusterEventMsg msg) { | |
137 | 152 | |
138 | 153 | } |
139 | 154 | |
... | ... | @@ -150,10 +165,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh |
150 | 165 | // Populating the routes map; |
151 | 166 | for (RuleNode ruleNode : ruleNodeList) { |
152 | 167 | List<EntityRelation> relations = service.getRuleNodeRelations(ruleNode.getId()); |
168 | + log.trace("[{}][{}][{}] Processing rule node relations [{}]", tenantId, entityId, ruleNode.getId(), relations.size()); | |
153 | 169 | if (relations.size() == 0) { |
154 | 170 | nodeRoutes.put(ruleNode.getId(), Collections.emptyList()); |
155 | 171 | } else { |
156 | 172 | for (EntityRelation relation : relations) { |
173 | + log.trace("[{}][{}][{}] Processing rule node relation [{}]", tenantId, entityId, ruleNode.getId(), relation.getTo()); | |
157 | 174 | if (relation.getTo().getEntityType() == EntityType.RULE_NODE) { |
158 | 175 | RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId())); |
159 | 176 | if (ruleNodeCtx == null) { |
... | ... | @@ -232,17 +249,20 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh |
232 | 249 | int relationsCount = relations.size(); |
233 | 250 | EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId(); |
234 | 251 | if (relationsCount == 0) { |
252 | + log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId()); | |
235 | 253 | if (ackId != null) { |
236 | 254 | // TODO: Ack this message in Kafka |
237 | 255 | // queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition()); |
238 | 256 | } |
239 | 257 | } else if (relationsCount == 1) { |
240 | 258 | for (RuleNodeRelation relation : relations) { |
259 | + log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut()); | |
241 | 260 | pushToTarget(msg, relation.getOut(), relation.getType()); |
242 | 261 | } |
243 | 262 | } else { |
244 | 263 | for (RuleNodeRelation relation : relations) { |
245 | 264 | EntityId target = relation.getOut(); |
265 | + log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relation.getOut()); | |
246 | 266 | switch (target.getEntityType()) { |
247 | 267 | case RULE_NODE: |
248 | 268 | enqueueAndForwardMsgCopyToNode(msg, target, relation.getType()); | ... | ... |
... | ... | @@ -15,7 +15,6 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.actors.ruleChain; |
17 | 17 | |
18 | -import lombok.extern.slf4j.Slf4j; | |
19 | 18 | import org.thingsboard.server.actors.ActorSystemContext; |
20 | 19 | import org.thingsboard.server.actors.service.ComponentActor; |
21 | 20 | import org.thingsboard.server.actors.service.ContextBasedCreator; |
... | ... | @@ -25,7 +24,6 @@ import org.thingsboard.server.common.data.id.TenantId; |
25 | 24 | import org.thingsboard.server.common.msg.TbActorMsg; |
26 | 25 | import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; |
27 | 26 | |
28 | -@Slf4j | |
29 | 27 | public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessageProcessor> { |
30 | 28 | |
31 | 29 | private final RuleChainId ruleChainId; |
... | ... | @@ -62,7 +60,9 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa |
62 | 60 | } |
63 | 61 | |
64 | 62 | private void onRuleNodeToSelfMsg(RuleNodeToSelfMsg msg) { |
65 | - log.debug("[{}] Going to process rule msg: {}", id, msg.getMsg()); | |
63 | + if (log.isDebugEnabled()) { | |
64 | + log.debug("[{}][{}][{}] Going to process rule msg: {}", ruleChainId, id, processor.getComponentName(), msg.getMsg()); | |
65 | + } | |
66 | 66 | try { |
67 | 67 | processor.onRuleToSelfMsg(msg); |
68 | 68 | increaseMessagesProcessedCount(); |
... | ... | @@ -72,7 +72,9 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa |
72 | 72 | } |
73 | 73 | |
74 | 74 | private void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) { |
75 | - log.debug("[{}] Going to process rule msg: {}", id, msg.getMsg()); | |
75 | + if (log.isDebugEnabled()) { | |
76 | + log.debug("[{}][{}][{}] Going to process rule msg: {}", ruleChainId, id, processor.getComponentName(), msg.getMsg()); | |
77 | + } | |
76 | 78 | try { |
77 | 79 | processor.onRuleChainToRuleNodeMsg(msg); |
78 | 80 | increaseMessagesProcessedCount(); | ... | ... |
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
... | ... | @@ -75,7 +75,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod |
75 | 75 | } |
76 | 76 | |
77 | 77 | @Override |
78 | - public void stop(ActorContext context) throws Exception { | |
78 | + public void stop(ActorContext context) { | |
79 | 79 | if (tbNode != null) { |
80 | 80 | tbNode.destroy(); |
81 | 81 | } |
... | ... | @@ -83,7 +83,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod |
83 | 83 | } |
84 | 84 | |
85 | 85 | @Override |
86 | - public void onClusterEventMsg(ClusterEventMsg msg) throws Exception { | |
86 | + public void onClusterEventMsg(ClusterEventMsg msg) { | |
87 | 87 | |
88 | 88 | } |
89 | 89 | |
... | ... | @@ -111,6 +111,11 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod |
111 | 111 | } |
112 | 112 | } |
113 | 113 | |
114 | + @Override | |
115 | + public String getComponentName() { | |
116 | + return ruleNode.getName(); | |
117 | + } | |
118 | + | |
114 | 119 | private TbNode initComponent(RuleNode ruleNode) throws Exception { |
115 | 120 | Class<?> componentClazz = Class.forName(ruleNode.getType()); |
116 | 121 | TbNode tbNode = (TbNode) (componentClazz.newInstance()); | ... | ... |
... | ... | @@ -31,7 +31,6 @@ import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; |
31 | 31 | /** |
32 | 32 | * @author Andrew Shvayka |
33 | 33 | */ |
34 | -@Slf4j | |
35 | 34 | public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgProcessor<T>> extends ContextAwareActor { |
36 | 35 | |
37 | 36 | private long lastPersistedErrorTs = 0L; |
... | ... | @@ -54,6 +53,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP |
54 | 53 | @Override |
55 | 54 | public void preStart() { |
56 | 55 | try { |
56 | + log.debug("[{}][{}][{}] Starting processor.", tenantId, id, id.getEntityType()); | |
57 | 57 | processor.start(context()); |
58 | 58 | logLifecycleEvent(ComponentLifecycleEvent.STARTED); |
59 | 59 | if (systemContext.isStatisticsEnabled()) { |
... | ... | @@ -78,6 +78,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP |
78 | 78 | @Override |
79 | 79 | public void postStop() { |
80 | 80 | try { |
81 | + log.debug("[{}][{}] Stopping processor.", tenantId, id, id.getEntityType()); | |
81 | 82 | processor.stop(context()); |
82 | 83 | logLifecycleEvent(ComponentLifecycleEvent.STOPPED); |
83 | 84 | } catch (Exception e) { |
... | ... | @@ -88,6 +89,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP |
88 | 89 | } |
89 | 90 | |
90 | 91 | protected void onComponentLifecycleMsg(ComponentLifecycleMsg msg) { |
92 | + log.debug("[{}][{}][{}] onComponentLifecycleMsg: [{}]", tenantId, id, id.getEntityType(), msg.getEvent()); | |
91 | 93 | try { |
92 | 94 | switch (msg.getEvent()) { |
93 | 95 | case CREATED: |
... | ... | @@ -148,9 +150,9 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP |
148 | 150 | private void logAndPersist(String method, Exception e, boolean critical) { |
149 | 151 | errorsOccurred++; |
150 | 152 | if (critical) { |
151 | - log.warn("[{}][{}] Failed to process {} msg: {}", id, tenantId, method, e); | |
153 | + log.warn("[{}][{}][{}] Failed to process {} msg: {}", id, tenantId, processor.getComponentName(), method, e); | |
152 | 154 | } else { |
153 | - log.debug("[{}][{}] Failed to process {} msg: {}", id, tenantId, method, e); | |
155 | + log.debug("[{}][{}][{}] Failed to process {} msg: {}", id, tenantId, processor.getComponentName(), method, e); | |
154 | 156 | } |
155 | 157 | long ts = System.currentTimeMillis(); |
156 | 158 | if (ts - lastPersistedErrorTs > getErrorPersistFrequency()) { | ... | ... |
... | ... | @@ -17,15 +17,16 @@ package org.thingsboard.server.actors.service; |
17 | 17 | |
18 | 18 | import akka.actor.Terminated; |
19 | 19 | import akka.actor.UntypedActor; |
20 | -import akka.event.Logging; | |
21 | -import akka.event.LoggingAdapter; | |
22 | -import lombok.extern.slf4j.Slf4j; | |
20 | +import org.slf4j.Logger; | |
21 | +import org.slf4j.LoggerFactory; | |
23 | 22 | import org.thingsboard.server.actors.ActorSystemContext; |
24 | 23 | import org.thingsboard.server.common.msg.TbActorMsg; |
25 | 24 | |
26 | -@Slf4j | |
25 | + | |
27 | 26 | public abstract class ContextAwareActor extends UntypedActor { |
28 | 27 | |
28 | + protected final Logger log = LoggerFactory.getLogger(getClass()); | |
29 | + | |
29 | 30 | public static final int ENTITY_PACK_LIMIT = 1024; |
30 | 31 | |
31 | 32 | protected final ActorSystemContext systemContext; | ... | ... |
... | ... | @@ -44,6 +44,8 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract |
44 | 44 | this.entityId = id; |
45 | 45 | } |
46 | 46 | |
47 | + public abstract String getComponentName(); | |
48 | + | |
47 | 49 | public abstract void start(ActorContext context) throws Exception; |
48 | 50 | |
49 | 51 | public abstract void stop(ActorContext context) throws Exception; | ... | ... |
... | ... | @@ -71,7 +71,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { |
71 | 71 | private int maxErrors; |
72 | 72 | |
73 | 73 | private TbKafkaRequestTemplate<JsInvokeProtos.RemoteJsRequest, JsInvokeProtos.RemoteJsResponse> kafkaTemplate; |
74 | - protected Map<UUID, String> scriptIdToBodysMap = new ConcurrentHashMap<>(); | |
74 | + private Map<UUID, String> scriptIdToBodysMap = new ConcurrentHashMap<>(); | |
75 | 75 | |
76 | 76 | @PostConstruct |
77 | 77 | public void init() { |
... | ... | @@ -100,7 +100,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { |
100 | 100 | responseBuilder.settings(kafkaSettings); |
101 | 101 | responseBuilder.topic(responseTopicPrefix + "." + nodeIdProvider.getNodeId()); |
102 | 102 | responseBuilder.clientId("js-" + nodeIdProvider.getNodeId()); |
103 | - responseBuilder.groupId("rule-engine-node"); | |
103 | + responseBuilder.groupId("rule-engine-node-" + nodeIdProvider.getNodeId()); | |
104 | 104 | responseBuilder.autoCommit(true); |
105 | 105 | responseBuilder.autoCommitIntervalMs(autoCommitInterval); |
106 | 106 | responseBuilder.decoder(new RemoteJsResponseDecoder()); | ... | ... |
... | ... | @@ -149,6 +149,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ |
149 | 149 | records.forEach(record -> { |
150 | 150 | try { |
151 | 151 | ToRuleEngineMsg toRuleEngineMsg = ruleEngineConsumer.decode(record); |
152 | + log.trace("Forwarding message to rule engine {}", toRuleEngineMsg); | |
152 | 153 | if (toRuleEngineMsg.hasToDeviceActorMsg()) { |
153 | 154 | forwardToDeviceActor(toRuleEngineMsg.getToDeviceActorMsg()); |
154 | 155 | } |
... | ... | @@ -175,18 +176,21 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ |
175 | 176 | |
176 | 177 | @Override |
177 | 178 | public void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) { |
178 | - notificationsProducer.send(notificationsTopic + "." + nodeId, | |
179 | - new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB()).toString(), | |
180 | - ToTransportMsg.newBuilder().setToDeviceSessionMsg(msg).build() | |
181 | - , new QueueCallbackAdaptor(onSuccess, onFailure)); | |
179 | + String topic = notificationsTopic + "." + nodeId; | |
180 | + UUID sessionId = new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB()); | |
181 | + ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setToDeviceSessionMsg(msg).build(); | |
182 | + log.trace("[{}][{}] Pushing session data to topic: {}", topic, sessionId, transportMsg); | |
183 | + notificationsProducer.send(topic, sessionId.toString(), transportMsg, new QueueCallbackAdaptor(onSuccess, onFailure)); | |
182 | 184 | } |
183 | 185 | |
184 | 186 | private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg) { |
185 | 187 | TransportToDeviceActorMsgWrapper wrapper = new TransportToDeviceActorMsgWrapper(toDeviceActorMsg); |
186 | 188 | Optional<ServerAddress> address = routingService.resolveById(wrapper.getDeviceId()); |
187 | 189 | if (address.isPresent()) { |
190 | + log.trace("[{}] Pushing message to remote server: {}", address.get(), toDeviceActorMsg); | |
188 | 191 | rpcService.tell(encodingService.convertToProtoDataMessage(address.get(), wrapper)); |
189 | 192 | } else { |
193 | + log.trace("Pushing message to local server: {}", toDeviceActorMsg); | |
190 | 194 | actorContext.getAppActor().tell(wrapper, ActorRef.noSender()); |
191 | 195 | } |
192 | 196 | } | ... | ... |
... | ... | @@ -77,9 +77,10 @@ public class TBKafkaProducerTemplate<T> { |
77 | 77 | result.all().get(); |
78 | 78 | } catch (Exception e) { |
79 | 79 | if ((e instanceof TopicExistsException) || (e.getCause() != null && e.getCause() instanceof TopicExistsException)) { |
80 | - log.trace("[{}] Topic already exists: ", defaultTopic); | |
80 | + log.trace("[{}] Topic already exists.", defaultTopic); | |
81 | 81 | } else { |
82 | - log.trace("[{}] Failed to create topic: {}", defaultTopic, e.getMessage(), e); | |
82 | + log.info("[{}] Failed to create topic: {}", defaultTopic, e.getMessage(), e); | |
83 | + throw new RuntimeException(e); | |
83 | 84 | } |
84 | 85 | } |
85 | 86 | //Maybe this should not be cached, but we don't plan to change size of partitions | ... | ... |
... | ... | @@ -23,6 +23,9 @@ import lombok.extern.slf4j.Slf4j; |
23 | 23 | import org.apache.kafka.clients.admin.CreateTopicsResult; |
24 | 24 | import org.apache.kafka.clients.admin.NewTopic; |
25 | 25 | import org.apache.kafka.clients.consumer.ConsumerRecords; |
26 | +import org.apache.kafka.clients.producer.Callback; | |
27 | +import org.apache.kafka.clients.producer.RecordMetadata; | |
28 | +import org.apache.kafka.common.errors.TopicExistsException; | |
26 | 29 | import org.apache.kafka.common.header.Header; |
27 | 30 | import org.apache.kafka.common.header.internals.RecordHeader; |
28 | 31 | |
... | ... | @@ -83,7 +86,13 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe |
83 | 86 | CreateTopicsResult result = admin.createTopic(new NewTopic(responseTemplate.getTopic(), 1, (short) 1)); |
84 | 87 | result.all().get(); |
85 | 88 | } catch (Exception e) { |
86 | - log.trace("Failed to create topic: {}", e.getMessage(), e); | |
89 | + if ((e instanceof TopicExistsException) || (e.getCause() != null && e.getCause() instanceof TopicExistsException)) { | |
90 | + log.trace("[{}] Topic already exists. ", responseTemplate.getTopic()); | |
91 | + } else { | |
92 | + log.info("[{}] Failed to create topic: {}", responseTemplate.getTopic(), e.getMessage(), e); | |
93 | + throw new RuntimeException(e); | |
94 | + } | |
95 | + | |
87 | 96 | } |
88 | 97 | this.requestTemplate.init(); |
89 | 98 | tickTs = System.currentTimeMillis(); |
... | ... | @@ -96,6 +105,7 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe |
96 | 105 | log.trace("Polling responses completed, consumer records count [{}]", responses.count()); |
97 | 106 | } |
98 | 107 | responses.forEach(response -> { |
108 | + log.trace("Received response to Kafka Template request: {}", response); | |
99 | 109 | Header requestIdHeader = response.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER); |
100 | 110 | Response decodedResponse = null; |
101 | 111 | UUID requestId = null; |
... | ... | @@ -167,7 +177,13 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe |
167 | 177 | pendingRequests.putIfAbsent(requestId, responseMetaData); |
168 | 178 | request = requestTemplate.enrich(request, responseTemplate.getTopic(), requestId); |
169 | 179 | log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, key, responseMetaData.expTime); |
170 | - requestTemplate.send(key, request, headers, null); | |
180 | + requestTemplate.send(key, request, headers, (metadata, exception) -> { | |
181 | + if (exception != null) { | |
182 | + log.trace("[{}] Failed to post the request", requestId, exception); | |
183 | + } else { | |
184 | + log.trace("[{}] Posted the request", requestId, metadata); | |
185 | + } | |
186 | + }); | |
171 | 187 | return future; |
172 | 188 | } |
173 | 189 | ... | ... |
... | ... | @@ -20,14 +20,29 @@ import lombok.Data; |
20 | 20 | import lombok.NoArgsConstructor; |
21 | 21 | import org.thingsboard.server.common.data.EntityType; |
22 | 22 | |
23 | +import javax.persistence.Column; | |
24 | +import javax.persistence.Embeddable; | |
25 | +import javax.persistence.EnumType; | |
26 | +import javax.persistence.Enumerated; | |
23 | 27 | import java.io.Serializable; |
24 | 28 | |
29 | +import static org.thingsboard.server.dao.model.ModelConstants.ATTRIBUTE_KEY_COLUMN; | |
30 | +import static org.thingsboard.server.dao.model.ModelConstants.ATTRIBUTE_TYPE_COLUMN; | |
31 | +import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_ID_COLUMN; | |
32 | +import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_TYPE_COLUMN; | |
33 | + | |
25 | 34 | @Data |
26 | 35 | @AllArgsConstructor |
27 | 36 | @NoArgsConstructor |
37 | +@Embeddable | |
28 | 38 | public class AttributeKvCompositeKey implements Serializable { |
39 | + @Enumerated(EnumType.STRING) | |
40 | + @Column(name = ENTITY_TYPE_COLUMN) | |
29 | 41 | private EntityType entityType; |
42 | + @Column(name = ENTITY_ID_COLUMN) | |
30 | 43 | private String entityId; |
44 | + @Column(name = ATTRIBUTE_TYPE_COLUMN) | |
31 | 45 | private String attributeType; |
46 | + @Column(name = ATTRIBUTE_KEY_COLUMN) | |
32 | 47 | private String attributeKey; |
33 | 48 | } | ... | ... |
... | ... | @@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.kv.StringDataEntry; |
27 | 27 | import org.thingsboard.server.dao.model.ToData; |
28 | 28 | |
29 | 29 | import javax.persistence.Column; |
30 | +import javax.persistence.EmbeddedId; | |
30 | 31 | import javax.persistence.Entity; |
31 | 32 | import javax.persistence.EnumType; |
32 | 33 | import javax.persistence.Enumerated; |
... | ... | @@ -48,25 +49,10 @@ import static org.thingsboard.server.dao.model.ModelConstants.STRING_VALUE_COLUM |
48 | 49 | @Data |
49 | 50 | @Entity |
50 | 51 | @Table(name = "attribute_kv") |
51 | -@IdClass(AttributeKvCompositeKey.class) | |
52 | 52 | public class AttributeKvEntity implements ToData<AttributeKvEntry>, Serializable { |
53 | 53 | |
54 | - @Id | |
55 | - @Enumerated(EnumType.STRING) | |
56 | - @Column(name = ENTITY_TYPE_COLUMN) | |
57 | - private EntityType entityType; | |
58 | - | |
59 | - @Id | |
60 | - @Column(name = ENTITY_ID_COLUMN) | |
61 | - private String entityId; | |
62 | - | |
63 | - @Id | |
64 | - @Column(name = ATTRIBUTE_TYPE_COLUMN) | |
65 | - private String attributeType; | |
66 | - | |
67 | - @Id | |
68 | - @Column(name = ATTRIBUTE_KEY_COLUMN) | |
69 | - private String attributeKey; | |
54 | + @EmbeddedId | |
55 | + private AttributeKvCompositeKey id; | |
70 | 56 | |
71 | 57 | @Column(name = BOOLEAN_VALUE_COLUMN) |
72 | 58 | private Boolean booleanValue; |
... | ... | @@ -87,13 +73,13 @@ public class AttributeKvEntity implements ToData<AttributeKvEntry>, Serializable |
87 | 73 | public AttributeKvEntry toData() { |
88 | 74 | KvEntry kvEntry = null; |
89 | 75 | if (strValue != null) { |
90 | - kvEntry = new StringDataEntry(attributeKey, strValue); | |
76 | + kvEntry = new StringDataEntry(id.getAttributeKey(), strValue); | |
91 | 77 | } else if (booleanValue != null) { |
92 | - kvEntry = new BooleanDataEntry(attributeKey, booleanValue); | |
78 | + kvEntry = new BooleanDataEntry(id.getAttributeKey(), booleanValue); | |
93 | 79 | } else if (doubleValue != null) { |
94 | - kvEntry = new DoubleDataEntry(attributeKey, doubleValue); | |
80 | + kvEntry = new DoubleDataEntry(id.getAttributeKey(), doubleValue); | |
95 | 81 | } else if (longValue != null) { |
96 | - kvEntry = new LongDataEntry(attributeKey, longValue); | |
82 | + kvEntry = new LongDataEntry(id.getAttributeKey(), longValue); | |
97 | 83 | } |
98 | 84 | return new BaseAttributeKvEntry(kvEntry, lastUpdateTs); |
99 | 85 | } | ... | ... |
... | ... | @@ -15,7 +15,9 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.dao.sql.attributes; |
17 | 17 | |
18 | +import org.springframework.data.jpa.repository.Query; | |
18 | 19 | import org.springframework.data.repository.CrudRepository; |
20 | +import org.springframework.data.repository.query.Param; | |
19 | 21 | import org.thingsboard.server.common.data.EntityType; |
20 | 22 | import org.thingsboard.server.dao.model.sql.AttributeKvCompositeKey; |
21 | 23 | import org.thingsboard.server.dao.model.sql.AttributeKvEntity; |
... | ... | @@ -26,8 +28,11 @@ import java.util.List; |
26 | 28 | @SqlDao |
27 | 29 | public interface AttributeKvRepository extends CrudRepository<AttributeKvEntity, AttributeKvCompositeKey> { |
28 | 30 | |
29 | - List<AttributeKvEntity> findAllByEntityTypeAndEntityIdAndAttributeType(EntityType entityType, | |
30 | - String entityId, | |
31 | - String attributeType); | |
31 | + @Query("SELECT a FROM AttributeKvEntity a WHERE a.id.entityType = :entityType " + | |
32 | + "AND a.id.entityId = :entityId " + | |
33 | + "AND a.id.attributeType = :attributeType") | |
34 | + List<AttributeKvEntity> findAllByEntityTypeAndEntityIdAndAttributeType(@Param("entityType") EntityType entityType, | |
35 | + @Param("entityId") String entityId, | |
36 | + @Param("attributeType") String attributeType); | |
32 | 37 | } |
33 | 38 | ... | ... |
... | ... | @@ -79,10 +79,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl |
79 | 79 | @Override |
80 | 80 | public ListenableFuture<Void> save(EntityId entityId, String attributeType, AttributeKvEntry attribute) { |
81 | 81 | AttributeKvEntity entity = new AttributeKvEntity(); |
82 | - entity.setEntityType(entityId.getEntityType()); | |
83 | - entity.setEntityId(fromTimeUUID(entityId.getId())); | |
84 | - entity.setAttributeType(attributeType); | |
85 | - entity.setAttributeKey(attribute.getKey()); | |
82 | + entity.setId(new AttributeKvCompositeKey(entityId.getEntityType(), fromTimeUUID(entityId.getId()), attributeType, attribute.getKey())); | |
86 | 83 | entity.setLastUpdateTs(attribute.getLastUpdateTs()); |
87 | 84 | entity.setStrValue(attribute.getStrValue().orElse(null)); |
88 | 85 | entity.setDoubleValue(attribute.getDoubleValue().orElse(null)); |
... | ... | @@ -100,10 +97,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl |
100 | 97 | .stream() |
101 | 98 | .map(key -> { |
102 | 99 | AttributeKvEntity entityToDelete = new AttributeKvEntity(); |
103 | - entityToDelete.setEntityType(entityId.getEntityType()); | |
104 | - entityToDelete.setEntityId(fromTimeUUID(entityId.getId())); | |
105 | - entityToDelete.setAttributeType(attributeType); | |
106 | - entityToDelete.setAttributeKey(key); | |
100 | + entityToDelete.setId(new AttributeKvCompositeKey(entityId.getEntityType(), fromTimeUUID(entityId.getId()), attributeType, key)); | |
107 | 101 | return entityToDelete; |
108 | 102 | }).collect(Collectors.toList()); |
109 | 103 | ... | ... |
... | ... | @@ -18,4 +18,6 @@ As result, in REPOSITORY column, next images should be present: |
18 | 18 | |
19 | 19 | - Run the black box tests in the [msa/black-box-tests](../black-box-tests) directory: |
20 | 20 | |
21 | - mvn clean install -DblackBoxTests.skip=false | |
\ No newline at end of file | ||
21 | + mvn clean install -DblackBoxTests.skip=false | |
22 | + | |
23 | + | ... | ... |
... | ... | @@ -33,6 +33,9 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; |
33 | 33 | import org.apache.http.ssl.SSLContextBuilder; |
34 | 34 | import org.apache.http.ssl.SSLContexts; |
35 | 35 | import org.junit.*; |
36 | +import org.junit.rules.TestRule; | |
37 | +import org.junit.rules.TestWatcher; | |
38 | +import org.junit.runner.Description; | |
36 | 39 | import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; |
37 | 40 | import org.thingsboard.client.tools.RestClient; |
38 | 41 | import org.thingsboard.server.common.data.Device; |
... | ... | @@ -60,6 +63,33 @@ public abstract class AbstractContainerTest { |
60 | 63 | restClient.getRestTemplate().setRequestFactory(getRequestFactoryForSelfSignedCert()); |
61 | 64 | } |
62 | 65 | |
66 | + @Rule | |
67 | + public TestRule watcher = new TestWatcher() { | |
68 | + protected void starting(Description description) { | |
69 | + log.info("================================================="); | |
70 | + log.info("STARTING TEST: {}" , description.getMethodName()); | |
71 | + log.info("================================================="); | |
72 | + } | |
73 | + | |
74 | + /** | |
75 | + * Invoked when a test succeeds | |
76 | + */ | |
77 | + protected void succeeded(Description description) { | |
78 | + log.info("================================================="); | |
79 | + log.info("SUCCEEDED TEST: {}" , description.getMethodName()); | |
80 | + log.info("================================================="); | |
81 | + } | |
82 | + | |
83 | + /** | |
84 | + * Invoked when a test fails | |
85 | + */ | |
86 | + protected void failed(Throwable e, Description description) { | |
87 | + log.info("================================================="); | |
88 | + log.info("FAILED TEST: {}" , description.getMethodName(), e); | |
89 | + log.info("================================================="); | |
90 | + } | |
91 | + }; | |
92 | + | |
63 | 93 | protected Device createDevice(String name) { |
64 | 94 | return restClient.createDevice(name + RandomStringUtils.randomAlphanumeric(7), "DEFAULT"); |
65 | 95 | } |
... | ... | @@ -82,6 +112,7 @@ public abstract class AbstractContainerTest { |
82 | 112 | JsonObject wsRequest = new JsonObject(); |
83 | 113 | wsRequest.add(property.toString(), cmd); |
84 | 114 | wsClient.send(wsRequest.toString()); |
115 | + wsClient.waitForFirstReply(); | |
85 | 116 | return wsClient; |
86 | 117 | } |
87 | 118 | ... | ... |
... | ... | @@ -31,9 +31,11 @@ public class WsClient extends WebSocketClient { |
31 | 31 | private static final ObjectMapper mapper = new ObjectMapper(); |
32 | 32 | private WsTelemetryResponse message; |
33 | 33 | |
34 | - private CountDownLatch latch = new CountDownLatch(1);; | |
34 | + private volatile boolean firstReplyReceived; | |
35 | + private CountDownLatch firstReply = new CountDownLatch(1); | |
36 | + private CountDownLatch latch = new CountDownLatch(1); | |
35 | 37 | |
36 | - public WsClient(URI serverUri) { | |
38 | + WsClient(URI serverUri) { | |
37 | 39 | super(serverUri); |
38 | 40 | } |
39 | 41 | |
... | ... | @@ -43,14 +45,19 @@ public class WsClient extends WebSocketClient { |
43 | 45 | |
44 | 46 | @Override |
45 | 47 | public void onMessage(String message) { |
46 | - try { | |
47 | - WsTelemetryResponse response = mapper.readValue(message, WsTelemetryResponse.class); | |
48 | - if (!response.getData().isEmpty()) { | |
49 | - this.message = response; | |
50 | - latch.countDown(); | |
48 | + if (!firstReplyReceived) { | |
49 | + firstReplyReceived = true; | |
50 | + firstReply.countDown(); | |
51 | + } else { | |
52 | + try { | |
53 | + WsTelemetryResponse response = mapper.readValue(message, WsTelemetryResponse.class); | |
54 | + if (!response.getData().isEmpty()) { | |
55 | + this.message = response; | |
56 | + latch.countDown(); | |
57 | + } | |
58 | + } catch (IOException e) { | |
59 | + log.error("ws message can't be read"); | |
51 | 60 | } |
52 | - } catch (IOException e) { | |
53 | - log.error("ws message can't be read"); | |
54 | 61 | } |
55 | 62 | } |
56 | 63 | |
... | ... | @@ -73,4 +80,13 @@ public class WsClient extends WebSocketClient { |
73 | 80 | } |
74 | 81 | return null; |
75 | 82 | } |
83 | + | |
84 | + void waitForFirstReply() { | |
85 | + try { | |
86 | + firstReply.await(10, TimeUnit.SECONDS); | |
87 | + } catch (InterruptedException e) { | |
88 | + log.error("Timeout, ws message wasn't received"); | |
89 | + throw new RuntimeException(e); | |
90 | + } | |
91 | + } | |
76 | 92 | } | ... | ... |
... | ... | @@ -28,6 +28,9 @@ import lombok.Data; |
28 | 28 | import lombok.extern.slf4j.Slf4j; |
29 | 29 | import org.apache.commons.lang3.RandomStringUtils; |
30 | 30 | import org.junit.*; |
31 | +import org.junit.rules.TestRule; | |
32 | +import org.junit.rules.TestWatcher; | |
33 | +import org.junit.runner.Description; | |
31 | 34 | import org.springframework.core.ParameterizedTypeReference; |
32 | 35 | import org.springframework.http.HttpMethod; |
33 | 36 | import org.springframework.http.ResponseEntity; |
... | ... | @@ -65,6 +68,7 @@ public class MqttClientTest extends AbstractContainerTest { |
65 | 68 | MqttClient mqttClient = getMqttClient(deviceCredentials, null); |
66 | 69 | mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload().toString().getBytes())); |
67 | 70 | WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage(); |
71 | + log.info("Received telemetry: {}", actualLatestTelemetry); | |
68 | 72 | wsClient.closeBlocking(); |
69 | 73 | |
70 | 74 | Assert.assertEquals(4, actualLatestTelemetry.getData().size()); |
... | ... | @@ -91,6 +95,7 @@ public class MqttClientTest extends AbstractContainerTest { |
91 | 95 | MqttClient mqttClient = getMqttClient(deviceCredentials, null); |
92 | 96 | mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload(ts).toString().getBytes())); |
93 | 97 | WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage(); |
98 | + log.info("Received telemetry: {}", actualLatestTelemetry); | |
94 | 99 | wsClient.closeBlocking(); |
95 | 100 | |
96 | 101 | Assert.assertEquals(4, actualLatestTelemetry.getData().size()); |
... | ... | @@ -120,6 +125,7 @@ public class MqttClientTest extends AbstractContainerTest { |
120 | 125 | clientAttributes.addProperty("attr4", 73); |
121 | 126 | mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes())); |
122 | 127 | WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage(); |
128 | + log.info("Received telemetry: {}", actualLatestTelemetry); | |
123 | 129 | wsClient.closeBlocking(); |
124 | 130 | |
125 | 131 | Assert.assertEquals(4, actualLatestTelemetry.getData().size()); |
... | ... | @@ -168,6 +174,7 @@ public class MqttClientTest extends AbstractContainerTest { |
168 | 174 | mqttClient.publish("v1/devices/me/attributes/request/" + new Random().nextInt(100), Unpooled.wrappedBuffer(request.toString().getBytes())); |
169 | 175 | MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS); |
170 | 176 | AttributesResponse attributes = mapper.readValue(Objects.requireNonNull(event).getMessage(), AttributesResponse.class); |
177 | + log.info("Received telemetry: {}", attributes); | |
171 | 178 | |
172 | 179 | Assert.assertEquals(1, attributes.getClient().size()); |
173 | 180 | Assert.assertEquals(clientAttributeValue, attributes.getClient().get("clientAttr")); |
... | ... | @@ -281,6 +288,7 @@ public class MqttClientTest extends AbstractContainerTest { |
281 | 288 | // Create a new root rule chain |
282 | 289 | RuleChainId ruleChainId = createRootRuleChainForRpcResponse(); |
283 | 290 | |
291 | + TimeUnit.SECONDS.sleep(3); | |
284 | 292 | // Send the request to the server |
285 | 293 | JsonObject clientRequest = new JsonObject(); |
286 | 294 | clientRequest.addProperty("method", "getResponse"); |
... | ... | @@ -360,12 +368,12 @@ public class MqttClientTest extends AbstractContainerTest { |
360 | 368 | return defaultRuleChain.get().getId(); |
361 | 369 | } |
362 | 370 | |
363 | - private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException { | |
371 | + private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException, ExecutionException { | |
364 | 372 | MqttClientConfig clientConfig = new MqttClientConfig(); |
365 | 373 | clientConfig.setClientId("MQTT client from test"); |
366 | 374 | clientConfig.setUsername(deviceCredentials.getCredentialsId()); |
367 | 375 | MqttClient mqttClient = MqttClient.create(clientConfig, listener); |
368 | - mqttClient.connect("localhost", 1883).sync(); | |
376 | + mqttClient.connect("localhost", 1883).get(); | |
369 | 377 | return mqttClient; |
370 | 378 | } |
371 | 379 | ... | ... |