Commit e76d082b0e9a7e6efc7b52ce42b02054e8fdb09d

Authored by 黄 x
2 parents f7d4708f 17653700

Merge remote-tracking branch 'origin/master'

Showing 73 changed files with 224 additions and 42 deletions
... ... @@ -66,6 +66,7 @@ public class AppActor extends ContextAwareActor {
66 66
67 67 @Override
68 68 protected boolean doProcess(TbActorMsg msg) {
  69 + log.info("演员【AppActor】消息消费: {}", msg);
69 70 if (!ruleChainsInitialized) {
70 71 initTenantActors();
71 72 ruleChainsInitialized = true;
... ... @@ -145,6 +146,7 @@ public class AppActor extends ContextAwareActor {
145 146 msg.getMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!"));
146 147 } else {
147 148 if (!deletedTenants.contains(msg.getTenantId())) {
  149 + log.info("演员消息生产——普通消息内容:【{}】",msg);
148 150 getOrCreateTenantActor(msg.getTenantId()).tell(msg);
149 151 } else {
150 152 msg.getMsg().getCallback().onSuccess();
... ... @@ -173,6 +175,7 @@ public class AppActor extends ContextAwareActor {
173 175 }
174 176 }
175 177 if (target != null) {
  178 + log.info("演员消息生产——高级消息内容:【{}】", msg);
176 179 target.tellWithHighPriority(msg);
177 180 } else {
178 181 log.debug("[{}] Invalid component lifecycle msg: {}", msg.getTenantId(), msg);
... ... @@ -183,8 +186,10 @@ public class AppActor extends ContextAwareActor {
183 186 if (!deletedTenants.contains(msg.getTenantId())) {
184 187 TbActorRef tenantActor = getOrCreateTenantActor(msg.getTenantId());
185 188 if (priority) {
  189 + log.info("演员消息生产——高级消息内容:【{}】", msg);
186 190 tenantActor.tellWithHighPriority(msg);
187 191 } else {
  192 + log.info("演员消息生产——普通消息内容:【{}】",msg);
188 193 tenantActor.tell(msg);
189 194 }
190 195 } else {
... ... @@ -208,6 +213,7 @@ public class AppActor extends ContextAwareActor {
208 213 target = getOrCreateTenantActor(msg.getTenantId());
209 214 }
210 215 if (target != null) {
  216 + log.info("演员消息生产——高级消息内容:【{}】", msg);
211 217 target.tellWithHighPriority(msg);
212 218 } else {
213 219 log.debug("[{}] Invalid edge event update msg: {}", msg.getTenantId(), msg);
... ...
... ... @@ -57,6 +57,7 @@ public class DeviceActor extends ContextAwareActor {
57 57
58 58 @Override
59 59 protected boolean doProcess(TbActorMsg msg) {
  60 + log.info("演员【DeviceActor】消息消费: {}", msg);
60 61 switch (msg.getMsgType()) {
61 62 case TRANSPORT_TO_DEVICE_ACTOR_MSG:
62 63 processor.process(ctx, (TransportToDeviceActorMsgWrapper) msg);
... ...
... ... @@ -33,6 +33,7 @@ import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg;
33 33 import org.thingsboard.server.actors.ActorSystemContext;
34 34 import org.thingsboard.server.actors.TbActorCtx;
35 35 import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
  36 +import org.thingsboard.server.actors.stats.StatsPersistTick;
36 37 import org.thingsboard.server.common.data.DataConstants;
37 38 import org.thingsboard.server.common.data.Device;
38 39 import org.thingsboard.server.common.data.StringUtils;
... ... @@ -301,6 +302,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
301 302 private void registerPendingRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) {
302 303 toDeviceRpcPendingMap.put(rpcRequest.getRequestId(), new ToDeviceRpcRequestMetadata(msg, sent));
303 304 DeviceActorServerSideRpcTimeoutMsg timeoutMsg = new DeviceActorServerSideRpcTimeoutMsg(rpcRequest.getRequestId(), timeout);
  305 + log.info("演员消息生产——高级消息内容:【{}】", timeoutMsg);
304 306 scheduleMsgWithDelay(context, timeoutMsg, timeoutMsg.getTimeout());
305 307 }
306 308
... ... @@ -931,6 +933,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
931 933 }
932 934
933 935 void init(TbActorCtx ctx) {
  936 + log.info("演员消息生产——高级消息内容:【{}】", SessionTimeoutCheckMsg.instance());
934 937 schedulePeriodicMsgWithDelay(ctx, SessionTimeoutCheckMsg.instance(), systemContext.getSessionReportTimeout(), systemContext.getSessionReportTimeout());
935 938 PageLink pageLink = new PageLink(1024, 0, null, new SortOrder("createdTime"));
936 939 PageData<Rpc> pageData;
... ...
... ... @@ -110,6 +110,7 @@ class DefaultTbContext implements TbContext {
110 110 tellNext(msg, Collections.singleton(TbRelationTypes.SUCCESS), null);
111 111 }
112 112
  113 +
113 114 @Override
114 115 public void tellNext(TbMsg msg, String relationType) {
115 116 tellNext(msg, Collections.singleton(relationType), null);
... ... @@ -253,6 +254,7 @@ class DefaultTbContext implements TbContext {
253 254
254 255 @Override
255 256 public void tellFailure(TbMsg msg, Throwable th) {
  257 + log.info("演员消息生产——普通消息内容:tellFailure【{}】",msg);
256 258 if (nodeCtx.getSelf().isDebugMode()) {
257 259 mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, TbRelationTypes.FAILURE, th);
258 260 }
... ...
... ... @@ -15,6 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.actors.ruleChain;
17 17
  18 +import lombok.extern.slf4j.Slf4j;
18 19 import org.thingsboard.server.actors.ActorSystemContext;
19 20 import org.thingsboard.server.actors.TbActor;
20 21 import org.thingsboard.server.actors.TbActorCtx;
... ... @@ -29,7 +30,7 @@ import org.thingsboard.server.common.msg.TbActorMsg;
29 30 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
30 31 import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
31 32 import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
32   -
  33 +@Slf4j
33 34 public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMessageProcessor> {
34 35
35 36 private final RuleChain ruleChain;
... ... @@ -47,6 +48,7 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
47 48
48 49 @Override
49 50 protected boolean doProcess(TbActorMsg msg) {
  51 + log.info("演员【RuleChainActor】消息消费: {}", msg);
50 52 switch (msg.getMsgType()) {
51 53 case COMPONENT_LIFE_CYCLE_MSG:
52 54 onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
... ...
... ... @@ -133,6 +133,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
133 133 } else {
134 134 log.trace("[{}][{}] Updating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
135 135 existing.setSelf(ruleNode);
  136 + log.info("演员消息生产——高级消息内容:RuleNodeUpdatedMsg【{}】", new RuleNodeUpdatedMsg(tenantId, existing.getSelf().getId()));
136 137 existing.getSelfActor().tellWithHighPriority(new RuleNodeUpdatedMsg(tenantId, existing.getSelf().getId()));
137 138 }
138 139 }
... ... @@ -142,6 +143,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
142 143 removedRules.forEach(ruleNodeId -> {
143 144 log.trace("[{}][{}] Removing rule node [{}]", tenantId, entityId, ruleNodeId);
144 145 RuleNodeCtx removed = nodeActors.remove(ruleNodeId);
  146 + log.info("演员消息生产——高级消息内容:ComponentLifecycleMsg【{}】", new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED));
145 147 removed.getSelfActor().tellWithHighPriority(new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED));
146 148 });
147 149
... ... @@ -160,7 +162,10 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
160 162
161 163 @Override
162 164 public void onPartitionChangeMsg(PartitionChangeMsg msg) {
163   - nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).forEach(actorRef -> actorRef.tellWithHighPriority(msg));
  165 + nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).forEach(actorRef -> {
  166 + log.info("演员消息生产——高级消息内容:【{}】", msg);
  167 + actorRef.tellWithHighPriority(msg);
  168 + });
164 169 }
165 170
166 171 private TbActorRef createRuleNodeActor(TbActorCtx ctx, RuleNode ruleNode) {
... ... @@ -313,6 +318,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
313 318 pushMsgToNode(nodeActors.get(new RuleNodeId(target.getId())), msg, fromRelationType);
314 319 break;
315 320 case RULE_CHAIN:
  321 + log.info("演员消息生产——普通消息内容:RuleChainToRuleChainMsg【{}】",msg);
316 322 parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, fromRelationType));
317 323 break;
318 324 }
... ... @@ -344,6 +350,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
344 350
345 351 private void pushMsgToNode(RuleNodeCtx nodeCtx, TbMsg msg, String fromRelationType) {
346 352 if (nodeCtx != null) {
  353 + log.info("演员消息生产——普通消息内容:RuleChainToRuleNodeMsg【{}】",msg);
347 354 nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, ruleChainName, nodeCtx), msg, fromRelationType));
348 355 } else {
349 356 log.error("[{}][{}] RuleNodeCtx is empty", entityId, ruleChainName);
... ...
... ... @@ -51,6 +51,7 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
51 51
52 52 @Override
53 53 protected boolean doProcess(TbActorMsg msg) {
  54 + log.info("演员【RuleNodeActor】消息消费: {}", msg);
54 55 switch (msg.getMsgType()) {
55 56 case COMPONENT_LIFE_CYCLE_MSG:
56 57 case RULE_NODE_UPDATED_MSG:
... ...
... ... @@ -140,6 +140,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
140 140
141 141 protected void onStatsPersistTick(EntityId entityId) {
142 142 try {
  143 + log.info("演员消息生产——普通消息内容:StatsPersistMsg【{}】",new StatsPersistMsg(messagesProcessed, errorsOccurred, tenantId, entityId));
143 144 systemContext.getStatsActor().tell(new StatsPersistMsg(messagesProcessed, errorsOccurred, tenantId, entityId));
144 145 resetStatsCounters();
145 146 } catch (Exception e) {
... ...
... ... @@ -32,6 +32,7 @@ import org.thingsboard.server.actors.TbActorSystemSettings;
32 32 import org.thingsboard.server.actors.app.AppActor;
33 33 import org.thingsboard.server.actors.app.AppInitMsg;
34 34 import org.thingsboard.server.actors.stats.StatsActor;
  35 +import org.thingsboard.server.common.msg.plugin.RuleNodeUpdatedMsg;
35 36 import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
36 37 import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
37 38 import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
... ... @@ -117,12 +118,14 @@ public class DefaultActorService extends TbApplicationEventListener<PartitionCha
117 118 @Order(value = 2)
118 119 public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
119 120 log.info("Received application ready event. Sending application init message to actor system");
  121 + log.info("演员消息生产——高级消息内容:AppInitMsg【{}】", new AppInitMsg());
120 122 appActor.tellWithHighPriority(new AppInitMsg());
121 123 }
122 124
123 125 @Override
124 126 protected void onTbApplicationEvent(PartitionChangeEvent event) {
125 127 log.info("Received partition change event.");
  128 + log.info("演员消息生产——高级消息内容:PartitionChangeMsg【{}】", new PartitionChangeMsg(event.getServiceQueueKey(), event.getPartitions()));
126 129 this.appActor.tellWithHighPriority(new PartitionChangeMsg(event.getServiceQueueKey(), event.getPartitions()));
127 130 }
128 131
... ...
... ... @@ -18,6 +18,7 @@ package org.thingsboard.server.actors.shared;
18 18 import lombok.extern.slf4j.Slf4j;
19 19 import org.thingsboard.server.actors.ActorSystemContext;
20 20 import org.thingsboard.server.actors.TbActorCtx;
  21 +import org.thingsboard.server.actors.device.SessionTimeoutCheckMsg;
21 22 import org.thingsboard.server.actors.stats.StatsPersistTick;
22 23 import org.thingsboard.server.common.data.TenantProfile;
23 24 import org.thingsboard.server.common.data.id.EntityId;
... ... @@ -79,6 +80,7 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
79 80 }
80 81
81 82 public void scheduleStatsPersistTick(TbActorCtx context, long statsPersistFrequency) {
  83 + log.info("演员消息生产——高级消息内容:ComponentActor【StatsPersistTick】");
82 84 schedulePeriodicMsgWithDelay(context, new StatsPersistTick(), statsPersistFrequency, statsPersistFrequency);
83 85 }
84 86
... ...
... ... @@ -41,7 +41,7 @@ public class StatsActor extends ContextAwareActor {
41 41
42 42 @Override
43 43 protected boolean doProcess(TbActorMsg msg) {
44   - log.debug("Received message: {}", msg);
  44 + log.info("演员【StatsActor】消息消费: {}", msg);
45 45 if (msg.getMsgType().equals(MsgType.STATS_PERSIST_MSG)) {
46 46 onStatsPersistMsg((StatsPersistMsg) msg);
47 47 return true;
... ...
... ... @@ -25,6 +25,7 @@ import org.thingsboard.server.actors.TbActorNotRegisteredException;
25 25 import org.thingsboard.server.actors.TbActorRef;
26 26 import org.thingsboard.server.actors.TbEntityActorId;
27 27 import org.thingsboard.server.actors.TbEntityTypeActorIdPredicate;
  28 +import org.thingsboard.server.actors.app.AppInitMsg;
28 29 import org.thingsboard.server.actors.device.DeviceActorCreator;
29 30 import org.thingsboard.server.actors.ruleChain.RuleChainManagerActor;
30 31 import org.thingsboard.server.actors.service.ContextBasedCreator;
... ... @@ -193,6 +194,7 @@ public class TenantActor extends RuleChainManagerActor {
193 194 if (apiUsageState.isReExecEnabled()) {
194 195 if (tbMsg.getRuleChainId() == null) {
195 196 if (getRootChainActor() != null) {
  197 + log.info("演员消息生产——普通消息内容:【{}】",msg);
196 198 getRootChainActor().tell(msg);
197 199 } else {
198 200 tbMsg.getCallback().onFailure(new RuleEngineException("No Root Rule Chain available!"));
... ... @@ -200,6 +202,7 @@ public class TenantActor extends RuleChainManagerActor {
200 202 }
201 203 } else {
202 204 try {
  205 + log.info("演员消息生产——普通消息内容:TbEntityActorId【{}】",msg);
203 206 ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg);
204 207 } catch (TbActorNotRegisteredException ex) {
205 208 log.trace("Received message for non-existing rule chain: [{}]", tbMsg.getRuleChainId());
... ... @@ -215,6 +218,7 @@ public class TenantActor extends RuleChainManagerActor {
215 218
216 219 private void onRuleChainMsg(RuleChainAwareMsg msg) {
217 220 if (apiUsageState.isReExecEnabled()) {
  221 + log.info("演员消息生产——普通消息内容:【{}】",msg);
218 222 getOrCreateActor(msg.getRuleChainId()).tell(msg);
219 223 }
220 224 }
... ... @@ -225,8 +229,10 @@ public class TenantActor extends RuleChainManagerActor {
225 229 }
226 230 TbActorRef deviceActor = getOrCreateDeviceActor(msg.getDeviceId());
227 231 if (priority) {
  232 + log.info("演员消息生产——高级消息内容:【{}】", msg);
228 233 deviceActor.tellWithHighPriority(msg);
229 234 } else {
  235 + log.info("演员消息生产——普通消息内容:【{}】",msg);
230 236 deviceActor.tell(msg);
231 237 }
232 238 }
... ... @@ -263,6 +269,7 @@ public class TenantActor extends RuleChainManagerActor {
263 269 visit(ruleChain, target);
264 270 }
265 271 }
  272 + log.info("演员消息生产——高级消息内容:【{}】",msg);
266 273 target.tellWithHighPriority(msg);
267 274 } else {
268 275 log.debug("[{}] Invalid component lifecycle msg: {}", tenantId, msg);
... ...
... ... @@ -245,7 +245,9 @@ public class DeviceProvisionServiceImpl implements DeviceProvisionService {
245 245 TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder().setTbMsg(TbMsg.toByteString(tbMsg))
246 246 .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
247 247 .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build();
248   - ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback);
  248 + TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(tbMsg.getId(), msg);
  249 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
  250 + ruleEngineMsgProducer.send(tpi,queueMsg , callback);
249 251 }
250 252
251 253 private TbMsgMetaData createTbMsgMetaData(Device device) {
... ...
... ... @@ -251,7 +251,9 @@ public class DefaultOtaPackageStateService implements OtaPackageStateService {
251 251 }
252 252
253 253 TopicPartitionInfo tpi = new TopicPartitionInfo(otaPackageStateMsgProducer.getDefaultTopic(), null, null, false);
254   - otaPackageStateMsgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), null);
  254 + TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), msg);
  255 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
  256 + otaPackageStateMsgProducer.send(tpi, queueMsg, null);
255 257
256 258 List<TsKvEntry> telemetry = new ArrayList<>();
257 259 telemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(getTargetTelemetryKey(firmware.getType(), TITLE), firmware.getTitle())));
... ...
... ... @@ -95,13 +95,17 @@ public class DefaultTbClusterService implements TbClusterService {
95 95 @Override
96 96 public void pushMsgToCore(TenantId tenantId, EntityId entityId, ToCoreMsg msg, TbQueueCallback callback) {
97 97 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
98   - producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), callback);
  98 + TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), msg);
  99 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
  100 + producerProvider.getTbCoreMsgProducer().send(tpi, queueMsg, callback);
99 101 toCoreMsgs.incrementAndGet();
100 102 }
101 103
102 104 @Override
103 105 public void pushMsgToCore(TopicPartitionInfo tpi, UUID msgId, ToCoreMsg msg, TbQueueCallback callback) {
104   - producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(msgId, msg), callback);
  106 + TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(msgId, msg);
  107 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
  108 + producerProvider.getTbCoreMsgProducer().send(tpi,queueMsg , callback);
105 109 toCoreMsgs.incrementAndGet();
106 110 }
107 111
... ... @@ -111,6 +115,7 @@ public class DefaultTbClusterService implements TbClusterService {
111 115 log.trace("PUSHING msg: {} to:{}", msg, tpi);
112 116 byte[] msgBytes = encodingService.encode(msg);
113 117 ToCoreMsg toCoreMsg = ToCoreMsg.newBuilder().setToDeviceActorNotificationMsg(ByteString.copyFrom(msgBytes)).build();
  118 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),new TbProtoQueueMsg<>(msg.getDeviceId().getId(), toCoreMsg));
114 119 producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(msg.getDeviceId().getId(), toCoreMsg), callback);
115 120 toCoreMsgs.incrementAndGet();
116 121 }
... ... @@ -125,14 +130,18 @@ public class DefaultTbClusterService implements TbClusterService {
125 130 .setError(response.getError().isPresent() ? response.getError().get().ordinal() : -1);
126 131 response.getResponse().ifPresent(builder::setResponse);
127 132 ToCoreNotificationMsg msg = ToCoreNotificationMsg.newBuilder().setFromDeviceRpcResponse(builder).build();
128   - producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(response.getId(), msg), callback);
  133 + TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(response.getId(), msg);
  134 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
  135 + producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, queueMsg, callback);
129 136 toCoreNfs.incrementAndGet();
130 137 }
131 138
132 139 @Override
133 140 public void pushMsgToRuleEngine(TopicPartitionInfo tpi, UUID msgId, ToRuleEngineMsg msg, TbQueueCallback callback) {
134 141 log.trace("PUSHING msg: {} to:{}", msg, tpi);
135   - producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(msgId, msg), callback);
  142 + TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(msgId, msg);
  143 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
  144 + producerProvider.getRuleEngineMsgProducer().send(tpi, queueMsg, callback);
136 145 toRuleEngineMsgs.incrementAndGet();
137 146 }
138 147
... ... @@ -158,7 +167,9 @@ public class DefaultTbClusterService implements TbClusterService {
158 167 .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
159 168 .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
160 169 .setTbMsg(TbMsg.toByteString(tbMsg)).build();
161   - producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback);
  170 + TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(tbMsg.getId(), msg);
  171 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
  172 + producerProvider.getRuleEngineMsgProducer().send(tpi, queueMsg, callback);
162 173 toRuleEngineMsgs.incrementAndGet();
163 174 }
164 175
... ... @@ -190,7 +201,9 @@ public class DefaultTbClusterService implements TbClusterService {
190 201 .setError(response.getError().isPresent() ? response.getError().get().ordinal() : -1);
191 202 response.getResponse().ifPresent(builder::setResponse);
192 203 ToRuleEngineNotificationMsg msg = ToRuleEngineNotificationMsg.newBuilder().setFromDeviceRpcResponse(builder).build();
193   - producerProvider.getRuleEngineNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(response.getId(), msg), callback);
  204 + TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(response.getId(), msg);
  205 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
  206 + producerProvider.getRuleEngineNotificationsMsgProducer().send(tpi, queueMsg, callback);
194 207 toRuleEngineNfs.incrementAndGet();
195 208 }
196 209
... ... @@ -205,7 +218,9 @@ public class DefaultTbClusterService implements TbClusterService {
205 218 }
206 219 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, serviceId);
207 220 log.trace("PUSHING msg: {} to:{}", response, tpi);
208   - producerProvider.getTransportNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), response), callback);
  221 + TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), response);
  222 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
  223 + producerProvider.getTransportNotificationsMsgProducer().send(tpi, queueMsg, callback);
209 224 toTransportNfs.incrementAndGet();
210 225 }
211 226
... ... @@ -327,6 +342,7 @@ public class DefaultTbClusterService implements TbClusterService {
327 342 for (String serviceId : tbCoreServices) {
328 343 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceId);
329 344 ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setEdgeEventUpdateMsg(ByteString.copyFrom(msgBytes)).build();
  345 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),new TbProtoQueueMsg<>(msg.getEdgeId().getId(), toCoreMsg));
330 346 toCoreNfProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEdgeId().getId(), toCoreMsg), null);
331 347 toCoreNfs.incrementAndGet();
332 348 }
... ... @@ -348,6 +364,7 @@ public class DefaultTbClusterService implements TbClusterService {
348 364 for (String serviceId : tbCoreServices) {
349 365 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceId);
350 366 ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setComponentLifecycleMsg(ByteString.copyFrom(msgBytes)).build();
  367 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),new TbProtoQueueMsg<>(msg.getEntityId().getId(), toCoreMsg));
351 368 toCoreNfProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEntityId().getId(), toCoreMsg), null);
352 369 toCoreNfs.incrementAndGet();
353 370 }
... ... @@ -357,6 +374,7 @@ public class DefaultTbClusterService implements TbClusterService {
357 374 for (String serviceId : tbRuleEngineServices) {
358 375 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceId);
359 376 ToRuleEngineNotificationMsg toRuleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().setComponentLifecycleMsg(ByteString.copyFrom(msgBytes)).build();
  377 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),new TbProtoQueueMsg<>(msg.getEntityId().getId(), toRuleEngineMsg));
360 378 toRuleEngineProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEntityId().getId(), toRuleEngineMsg), null);
361 379 toRuleEngineNfs.incrementAndGet();
362 380 }
... ...
... ... @@ -199,6 +199,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
199 199 if (msgs.isEmpty()) {
200 200 continue;
201 201 }
  202 + log.info("队列消息消费 :launchMainConsumers【{}】 ", msgs);
202 203 List<IdMsgPair<ToCoreMsg>> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).collect(Collectors.toList());
203 204 ConcurrentMap<UUID, TbProtoQueueMsg<ToCoreMsg>> pendingMap = orderedMsgList.stream().collect(
204 205 Collectors.toConcurrentMap(IdMsgPair::getUuid, IdMsgPair::getMsg));
... ... @@ -235,6 +236,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
235 236 tbCoreDeviceRpcService.forwardRpcRequestToDeviceActor((ToDeviceRpcRequestActorMsg) tbActorMsg);
236 237 } else {
237 238 log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
  239 + log.info("演员消息生产——普通消息内容:【{}】",msg);
238 240 actorContext.tell(actorMsg.get());
239 241 }
240 242 }
... ... @@ -308,6 +310,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
308 310 Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreNotification.getEdgeEventUpdateMsg().toByteArray());
309 311 if (actorMsg.isPresent()) {
310 312 log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
  313 + log.info("演员消息生产——高级消息内容:【{}】", actorMsg.get());
311 314 actorContext.tellWithHighPriority(actorMsg.get());
312 315 }
313 316 callback.onSuccess();
... ... @@ -325,6 +328,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
325 328 if (msgs.isEmpty()) {
326 329 continue;
327 330 }
  331 + log.info("队列消息消费 :launchUsageStatsConsumer【{}】 ", msgs);
328 332 ConcurrentMap<UUID, TbProtoQueueMsg<ToUsageStatsServiceMsg>> pendingMap = msgs.stream().collect(
329 333 Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity()));
330 334 CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
... ... @@ -369,6 +373,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
369 373 if (msgs.isEmpty()) {
370 374 continue;
371 375 }
  376 + log.info("队列消息消费:launchOtaPackageUpdateNotificationConsumer【{}】 ", msgs);
372 377 long timeToSleep = maxProcessingTimeoutPerRecord;
373 378 for (TbProtoQueueMsg<ToOtaPackageStateServiceMsg> msg : msgs) {
374 379 try {
... ... @@ -505,6 +510,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
505 510 if (statsEnabled) {
506 511 stats.log(toDeviceActorMsg);
507 512 }
  513 + log.info("演员消息生产——普通消息内容:TransportToDeviceActorMsgWrapper【{}】",new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback));
508 514 actorContext.tell(new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback));
509 515 }
510 516
... ...
... ... @@ -253,6 +253,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
253 253 if (msgs.isEmpty()) {
254 254 continue;
255 255 }
  256 + log.info("队列消息消费 {} ", msgs);
256 257 final TbRuleEngineSubmitStrategy submitStrategy = getSubmitStrategy(configuration);
257 258 final TbRuleEngineProcessingStrategy ackStrategy = getAckStrategy(configuration);
258 259 submitStrategy.init(msgs);
... ... @@ -399,6 +400,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
399 400 }
400 401 }
401 402 msg = new QueueToRuleEngineMsg(tenantId, tbMsg, relationTypes, toRuleEngineMsg.getFailureMessage());
  403 + log.info("演员消息生产——普通消息内容:【{}】",msg);
402 404 actorContext.tell(msg);
403 405 }
404 406
... ...
... ... @@ -115,6 +115,7 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
115 115 if (msgs.isEmpty()) {
116 116 continue;
117 117 }
  118 + log.info("队列消息消费 {} ", msgs);
118 119 ConcurrentMap<UUID, TbProtoQueueMsg<N>> pendingMap = msgs.stream().collect(
119 120 Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity()));
120 121 CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
... ... @@ -184,6 +185,7 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
184 185 }
185 186 }
186 187 log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg);
  188 + log.info("演员消息生产——高级消息内容:【{}】", actorMsg);
187 189 actorContext.tellWithHighPriority(actorMsg);
188 190 }
189 191 }
... ...
... ... @@ -123,6 +123,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
123 123 log.trace("[{}][{}] Processing local rpc call to device actor [{}]", request.getTenantId(), request.getId(), request.getDeviceId());
124 124 UUID requestId = request.getId();
125 125 localToDeviceRpcRequests.put(requestId, rpcMsg);
  126 + log.info("演员消息生产——高级消息内容:【{}】", rpcMsg);
126 127 actorContext.tellWithHighPriority(rpcMsg);
127 128 scheduleToDeviceTimeout(request, requestId);
128 129 }
... ... @@ -142,6 +143,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
142 143 @Override
143 144 public void processRemoveRpc(RemoveRpcActorMsg removeRpcMsg) {
144 145 log.trace("[{}][{}] Processing remove RPC [{}]", removeRpcMsg.getTenantId(), removeRpcMsg.getRequestId(), removeRpcMsg.getDeviceId());
  146 + log.info("演员消息生产——高级消息内容:【{}】", removeRpcMsg);
145 147 actorContext.tellWithHighPriority(removeRpcMsg);
146 148 }
147 149
... ...
... ... @@ -352,7 +352,9 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
352 352 localSubscriptionService.onSubscriptionUpdate(s.getSessionId(), update, TbCallback.EMPTY);
353 353 } else {
354 354 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, s.getServiceId());
355   - toCoreNotificationsProducer.send(tpi, toProto(s, subscriptionUpdate, ignoreEmptyUpdates), null);
  355 + TbProtoQueueMsg queueMsg =toProto(s, subscriptionUpdate, ignoreEmptyUpdates);
  356 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
  357 + toCoreNotificationsProducer.send(tpi, queueMsg, null);
356 358 }
357 359 }
358 360 });
... ... @@ -375,7 +377,9 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
375 377 localSubscriptionService.onSubscriptionUpdate(s.getSessionId(), update, TbCallback.EMPTY);
376 378 } else {
377 379 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, s.getServiceId());
378   - toCoreNotificationsProducer.send(tpi, toProto(s, alarm, deleted), null);
  380 + TbProtoQueueMsg queueMsg =toProto(s, alarm, deleted);
  381 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
  382 + toCoreNotificationsProducer.send(tpi, queueMsg, null);
379 383 }
380 384 }
381 385 });
... ... @@ -416,7 +420,9 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
416 420 });
417 421 if (!missedUpdates.isEmpty()) {
418 422 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, subscription.getServiceId());
419   - toCoreNotificationsProducer.send(tpi, toProto(subscription, missedUpdates), null);
  423 + TbProtoQueueMsg queueMsg =toProto(subscription, missedUpdates);
  424 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
  425 + toCoreNotificationsProducer.send(tpi, queueMsg, null);
420 426 }
421 427 },
422 428 e -> log.error("Failed to fetch missed updates.", e), tsCallBackExecutor);
... ... @@ -439,6 +445,8 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
439 445 missedUpdates -> {
440 446 if (missedUpdates != null && !missedUpdates.isEmpty()) {
441 447 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, subscription.getServiceId());
  448 + TbProtoQueueMsg queueMsg =toProto(subscription, missedUpdates);
  449 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
442 450 toCoreNotificationsProducer.send(tpi, toProto(subscription, missedUpdates), null);
443 451 }
444 452 },
... ... @@ -458,7 +466,9 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
458 466 missedUpdates -> {
459 467 if (missedUpdates != null && !missedUpdates.isEmpty()) {
460 468 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, subscription.getServiceId());
461   - toCoreNotificationsProducer.send(tpi, toProto(subscription, missedUpdates), null);
  469 + TbProtoQueueMsg queueMsg =toProto(subscription, missedUpdates);
  470 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
  471 + toCoreNotificationsProducer.send(tpi, queueMsg, null);
462 472 }
463 473 },
464 474 e -> log.error("Failed to fetch missed updates.", e),
... ...
... ... @@ -65,6 +65,7 @@ public class DefaultTbCoreToTransportService implements TbCoreToTransportService
65 65 UUID sessionId = new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB());
66 66 log.trace("[{}][{}] Pushing session data to topic: {}", tpi.getFullTopicName(), sessionId, msg);
67 67 TbProtoQueueMsg<ToTransportMsg> queueMsg = new TbProtoQueueMsg<>(NULL_UUID, msg);
  68 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
68 69 tbTransportProducer.send(tpi, queueMsg, new QueueCallbackAdaptor(onSuccess, onFailure));
69 70 }
70 71
... ...
... ... @@ -25,19 +25,19 @@ import org.thingsboard.common.util.ThingsBoardExecutors;
25 25 import org.thingsboard.server.common.stats.MessagesStats;
26 26 import org.thingsboard.server.common.stats.StatsFactory;
27 27 import org.thingsboard.server.common.stats.StatsType;
  28 +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
  29 +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
28 30 import org.thingsboard.server.queue.TbQueueConsumer;
29 31 import org.thingsboard.server.queue.TbQueueProducer;
30 32 import org.thingsboard.server.queue.TbQueueResponseTemplate;
31 33 import org.thingsboard.server.queue.common.DefaultTbQueueResponseTemplate;
32 34 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
33   -import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
34   -import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
35 35 import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
36 36 import org.thingsboard.server.queue.util.TbCoreComponent;
37 37
38 38 import javax.annotation.PostConstruct;
39 39 import javax.annotation.PreDestroy;
40   -import java.util.concurrent.*;
  40 +import java.util.concurrent.ExecutorService;
41 41
42 42 /**
43 43 * Created by ashvayka on 05.10.18.
... ...
... ... @@ -1121,4 +1121,4 @@ file:
1121 1121 randomFileName: ${file.storage.randomFileName}
1122 1122 logging:
1123 1123 level:
1124   - org.thingsboard.server.dao.yunteng.mapper: debug
  1124 + org.thingsboard.server.dao.yunteng.mapper: error
... ...
... ... @@ -133,6 +133,7 @@ public class DefaultTbActorSystem implements TbActorSystem {
133 133
134 134 @Override
135 135 public void tellWithHighPriority(TbActorId target, TbActorMsg actorMsg) {
  136 + log.info("演员消息生产——高级消息内容:【{}】",actorMsg);
136 137 tell(target, actorMsg, true);
137 138 }
138 139
... ... @@ -146,6 +147,7 @@ public class DefaultTbActorSystem implements TbActorSystem {
146 147 if (mailbox == null) {
147 148 throw new TbActorNotRegisteredException(target, "Actor with id [" + target + "] is not registered!");
148 149 }
  150 +
149 151 if (highPriority) {
150 152 mailbox.tellWithHighPriority(actorMsg);
151 153 } else {
... ... @@ -163,7 +165,7 @@ public class DefaultTbActorSystem implements TbActorSystem {
163 165 public void broadcastToChildren(TbActorId parent, Predicate<TbActorId> childFilter, TbActorMsg msg) {
164 166 Set<TbActorId> children = parentChildMap.get(parent);
165 167 if (children != null) {
166   - children.stream().filter(childFilter).forEach(id -> tell(id, msg));
  168 + children.stream().filter(childFilter).forEach(id -> {log.info("演员消息生产——普通消息内容:broadcastToChildren【{}】",msg);tell(id, msg);});
167 169 }
168 170 }
169 171
... ...
... ... @@ -20,6 +20,6 @@ package org.thingsboard.server.common.data.plugin;
20 20 */
21 21 public enum ComponentType {
22 22
23   - ENRICHMENT, FILTER, TRANSFORMATION, ACTION, EXTERNAL
  23 + ENRICHMENT, FILTER, TRANSFORMATION, ACTION, EXTERNAL,FLOW
24 24
25 25 }
... ...
... ... @@ -59,7 +59,9 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
59 59 log.error("trying subscribe, but consumer stopped for topic {}", topic);
60 60 return;
61 61 }
62   - subscribeQueue.add(Collections.singleton(new TopicPartitionInfo(topic, null, null, true)));
  62 + Set<TopicPartitionInfo> partitions =Collections.singleton(new TopicPartitionInfo(topic, null, null, true));
  63 + log.info("队列消息订阅 {} ", partitions);
  64 + subscribeQueue.add(partitions);
63 65 }
64 66
65 67 @Override
... ... @@ -69,6 +71,7 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
69 71 log.error("trying subscribe, but consumer stopped for topic {}", topic);
70 72 return;
71 73 }
  74 + log.info("队列消息订阅 {} ", partitions);
72 75 subscribeQueue.add(partitions);
73 76 }
74 77
... ... @@ -94,6 +97,7 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
94 97 partitions = subscribeQueue.poll();
95 98 }
96 99 if (!subscribed) {
  100 + log.info("队列消息订阅 {} ", partitions);
97 101 List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
98 102 doSubscribe(topicNames);
99 103 subscribed = true;
... ...
... ... @@ -112,6 +112,9 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
112 112 log.trace("Starting template pool topic {}, for pendingRequests {}", responseTemplate.getTopic(), pendingRequestsCount);
113 113 List<Response> responses = doPoll(); //poll js responses
114 114 log.trace("Completed template poll topic {}, for pendingRequests [{}], received [{}] responses", responseTemplate.getTopic(), pendingRequestsCount, responses.size());
  115 + if(!responses.isEmpty()){
  116 + log.info("队列消息消费 fetchAndProcessResponses【{}】 ", responses);
  117 + }
115 118 responses.forEach(this::processResponse); //this can take a long time
116 119 responseTemplate.commit();
117 120 tryCleanStaleRequests();
... ... @@ -249,7 +252,10 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
249 252 if (messagesStats != null) {
250 253 messagesStats.incrementTotal();
251 254 }
252   - requestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {
  255 +
  256 + TopicPartitionInfo topicInfo = TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build();
  257 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",topicInfo.getFullTopicName(),request);
  258 + requestTemplate.send(topicInfo, request, new TbQueueCallback() {
253 259 @Override
254 260 public void onSuccess(TbQueueMsgMetadata metadata) {
255 261 if (messagesStats != null) {
... ...
... ... @@ -94,6 +94,7 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
94 94 if (requests.isEmpty()) {
95 95 continue;
96 96 }
  97 + log.info("队列消息消费 {} ", requests);
97 98
98 99 requests.forEach(request -> {
99 100 long currentTime = System.currentTimeMillis();
... ... @@ -118,7 +119,9 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
118 119 response -> {
119 120 pendingRequestCount.decrementAndGet();
120 121 response.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
121   - responseTemplate.send(TopicPartitionInfo.builder().topic(responseTopic).build(), response, null);
  122 + TopicPartitionInfo topicInfo = TopicPartitionInfo.builder().topic(responseTopic).build();
  123 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",topicInfo.getFullTopicName(),response);
  124 + responseTemplate.send(topicInfo, response, null);
122 125 stats.incrementSuccessful();
123 126 },
124 127 e -> {
... ...
... ... @@ -81,6 +81,7 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
81 81 ProducerRecord<String, byte[]> record;
82 82 Iterable<Header> headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList());
83 83 record = new ProducerRecord<>(tpi.getFullTopicName(), null, key, data, headers);
  84 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),record);
84 85 producer.send(record, (metadata, exception) -> {
85 86 if (exception == null) {
86 87 if (callback != null) {
... ...
... ... @@ -16,12 +16,14 @@
16 16 package org.thingsboard.server.queue.memory;
17 17
18 18 import lombok.Data;
  19 +import lombok.extern.slf4j.Slf4j;
19 20 import org.thingsboard.server.queue.TbQueueCallback;
20 21 import org.thingsboard.server.queue.TbQueueMsg;
21 22 import org.thingsboard.server.queue.TbQueueProducer;
22 23 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
23 24
24 25 @Data
  26 +@Slf4j
25 27 public class InMemoryTbQueueProducer<T extends TbQueueMsg> implements TbQueueProducer<T> {
26 28
27 29 private final InMemoryStorage storage = InMemoryStorage.getInstance();
... ...
... ... @@ -121,7 +121,11 @@ public class DefaultTbApiUsageClient implements TbApiUsageClient {
121 121 TenantId tenantId = ownerId.getTenantId();
122 122 EntityId entityId = Optional.ofNullable(ownerId.getEntityId()).orElse(tenantId);
123 123 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId).newByTopic(msgProducer.getDefaultTopic());
124   - msgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), statsMsg.build()), null);
  124 +
  125 +
  126 + TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), statsMsg.build());
  127 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
  128 + msgProducer.send(tpi, queueMsg, null);
125 129 }));
126 130
127 131 if (!report.isEmpty()) {
... ...
... ... @@ -225,6 +225,7 @@ public class DefaultTransportService implements TransportService {
225 225 if (records.size() == 0) {
226 226 continue;
227 227 }
  228 + log.info("队列消息消费 {} ", records);
228 229 records.forEach(record -> {
229 230 try {
230 231 processToTransportMsg(record.getValue());
... ... @@ -1033,10 +1034,10 @@ public class DefaultTransportService implements TransportService {
1033 1034 new TransportTbQueueCallback(callback) : null;
1034 1035 tbCoreProducerStats.incrementTotal();
1035 1036 StatsCallback wrappedCallback = new StatsCallback(transportTbQueueCallback, tbCoreProducerStats);
1036   - tbCoreMsgProducer.send(tpi,
1037   - new TbProtoQueueMsg<>(getRoutingKey(sessionInfo),
1038   - ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()),
1039   - wrappedCallback);
  1037 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),new TbProtoQueueMsg<>(getRoutingKey(sessionInfo),ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()));
  1038 + tbCoreMsgProducer.send(tpi
  1039 + ,new TbProtoQueueMsg<>(getRoutingKey(sessionInfo),ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build())
  1040 + ,wrappedCallback);
1040 1041 }
1041 1042
1042 1043 private void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) {
... ... @@ -1049,7 +1050,10 @@ public class DefaultTransportService implements TransportService {
1049 1050 .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build();
1050 1051 ruleEngineProducerStats.incrementTotal();
1051 1052 StatsCallback wrappedCallback = new StatsCallback(callback, ruleEngineProducerStats);
1052   - ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), wrappedCallback);
  1053 +
  1054 + TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(tbMsg.getId(), msg);
  1055 + log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
  1056 + ruleEngineMsgProducer.send(tpi, queueMsg, wrappedCallback);
1053 1057 }
1054 1058
1055 1059 private void sendToRuleEngine(TenantId tenantId, DeviceId deviceId, CustomerId customerId, TransportProtos.SessionInfoProto sessionInfo, JsonObject json,
... ...
... ... @@ -53,8 +53,10 @@ public abstract class TbAbstractAlarmNode<C extends TbAbstractAlarmNodeConfigura
53 53
54 54 @Override
55 55 public void onMsg(TbContext ctx, TbMsg msg) {
  56 + log.info("节点【TbAbstractAlarmNode】处理消费: {}", msg);
56 57 withCallback(processAlarm(ctx, msg),
57 58 alarmResult -> {
  59 + log.info("演员消息生产——普通消息内容:【{}】",msg);
58 60 if (alarmResult.alarm == null) {
59 61 ctx.tellNext(msg, "False");
60 62 } else if (alarmResult.isCreated) {
... ... @@ -109,7 +111,7 @@ public abstract class TbAbstractAlarmNode<C extends TbAbstractAlarmNodeConfigura
109 111
110 112 private void tellNext(TbContext ctx, TbMsg msg, TbAlarmResult alarmResult, String entityAction, String alarmAction) {
111 113 ctx.enqueue(ctx.alarmActionMsg(alarmResult.alarm, ctx.getSelfId(), entityAction),
112   - () -> ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), alarmAction),
  114 + () -> {log.info("演员消息生产——普通消息内容:transformSuccess【{}】",toAlarmMsg(ctx, alarmResult, msg));ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), alarmAction);},
113 115 throwable -> ctx.tellFailure(toAlarmMsg(ctx, alarmResult, msg), throwable));
114 116 }
115 117 }
... ...
... ... @@ -63,8 +63,10 @@ public abstract class TbAbstractCustomerActionNode<C extends TbAbstractCustomerA
63 63 @Override
64 64 public void onMsg(TbContext ctx, TbMsg msg) {
65 65 withCallback(processCustomerAction(ctx, msg),
66   - m -> ctx.tellSuccess(msg),
67   - t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
  66 + m -> {
  67 + log.info("演员消息生产——普通消息内容:tellFailure【{}】",msg);
  68 + ctx.tellSuccess(msg);}
  69 + ,t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
68 70 }
69 71
70 72 private ListenableFuture<Void> processCustomerAction(TbContext ctx, TbMsg msg) {
... ...
... ... @@ -83,7 +83,10 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA
83 83 public void onMsg(TbContext ctx, TbMsg msg) {
84 84 String relationType = processPattern(msg, config.getRelationType());
85 85 withCallback(processEntityRelationAction(ctx, msg, relationType),
86   - filterResult -> ctx.tellNext(filterResult.getMsg(), filterResult.isResult() ? SUCCESS : FAILURE), t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
  86 + filterResult -> {
  87 + log.info("演员消息生产——普通消息内容:tellFailure【{}】",filterResult.getMsg());
  88 + ctx.tellNext(filterResult.getMsg(), filterResult.isResult() ? SUCCESS : FAILURE);}
  89 + , t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
87 90 }
88 91
89 92 @Override
... ...
... ... @@ -56,6 +56,7 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode<TbClearAlarmNodeConfig
56 56
57 57 @Override
58 58 protected ListenableFuture<TbAlarmResult> processAlarm(TbContext ctx, TbMsg msg) {
  59 + log.info("节点【TbClearAlarmNode】处理消费: {}", msg);
59 60 String alarmType = TbNodeUtils.processPattern(this.config.getAlarmType(), msg);
60 61 ListenableFuture<Alarm> alarmFuture;
61 62 if (msg.getOriginator().getEntityType().equals(EntityType.ALARM)) {
... ...
... ... @@ -81,6 +81,7 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
81 81
82 82 @Override
83 83 protected ListenableFuture<TbAlarmResult> processAlarm(TbContext ctx, TbMsg msg) {
  84 + log.info("节点【TbCreateAlarmNode】处理消费: {}", msg);
84 85 String alarmType;
85 86 final Alarm msgAlarm;
86 87
... ...
... ... @@ -64,6 +64,7 @@ public class TbLogNode implements TbNode {
64 64 public void onSuccess(@Nullable String result) {
65 65 ctx.logJsEvalResponse();
66 66 log.info(result);
  67 + log.info("演员消息生产——普通消息内容:【{}】", msg);
67 68 ctx.tellSuccess(msg);
68 69 }
69 70
... ...
... ... @@ -96,6 +96,7 @@ public class TbMsgCountNode implements TbNode {
96 96 long curDelay = Math.max(0L, (lastScheduledTs - curTs));
97 97 TbMsg tickMsg = ctx.newMsg(ServiceQueue.MAIN, TB_MSG_COUNT_NODE_MSG, ctx.getSelfId(), msg != null ? msg.getCustomerId() : null, new TbMsgMetaData(), "");
98 98 nextTickId = tickMsg.getId();
  99 + log.info("演员消息生产——高级消息内容:【{}】", tickMsg);
99 100 ctx.tellSelf(tickMsg, curDelay);
100 101 }
101 102
... ...
... ... @@ -105,6 +105,7 @@ public class TbMsgGeneratorNode implements TbNode {
105 105 @Override
106 106 public void onMsg(TbContext ctx, TbMsg msg) {
107 107 log.trace("onMsg, config {}, msg {}", config, msg);
  108 + log.info("节点【TbMsgGeneratorNode】处理消费: {}", msg);
108 109 if (initialized.get() && msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) {
109 110 TbStopWatch sw = TbStopWatch.startNew();
110 111 withCallback(generate(ctx, msg),
... ... @@ -137,6 +138,7 @@ public class TbMsgGeneratorNode implements TbNode {
137 138 long curDelay = Math.max(0L, (lastScheduledTs - curTs));
138 139 TbMsg tickMsg = ctx.newMsg(ServiceQueue.MAIN, TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
139 140 nextTickId = tickMsg.getId();
  141 + log.info("演员消息生产——高级消息内容:【{}】", tickMsg);
140 142 ctx.tellSelf(tickMsg, curDelay);
141 143 }
142 144
... ...
... ... @@ -74,6 +74,7 @@ public class TbMsgDelayNode implements TbNode {
74 74 if (pendingMsgs.size() < config.getMaxPendingMsgs()) {
75 75 pendingMsgs.put(msg.getId(), msg);
76 76 TbMsg tickMsg = ctx.newMsg(ServiceQueue.MAIN, TB_MSG_DELAY_NODE_MSG, ctx.getSelfId(), msg.getCustomerId(), new TbMsgMetaData(), msg.getId().toString());
  77 + log.info("演员消息生产——高级消息内容:【{}】", msg);
77 78 ctx.tellSelf(tickMsg, getDelay(msg));
78 79 ctx.ack(msg);
79 80 } else {
... ...
... ... @@ -150,6 +150,7 @@ public class TbMsgPushToEdgeNode implements TbNode {
150 150 private void notifyEdge(TbContext ctx, TbMsg msg, EdgeEvent edgeEvent, EdgeId edgeId) {
151 151 edgeEvent.setEdgeId(edgeId);
152 152 ctx.getEdgeEventService().save(edgeEvent);
  153 + log.info("演员消息生产——普通消息内容:【{}】",msg);
153 154 ctx.tellNext(msg, SUCCESS);
154 155 ctx.onEdgeEventUpdate(ctx.getTenantId(), edgeId);
155 156 }
... ...
... ... @@ -56,6 +56,7 @@ public class TbCheckAlarmStatusNode implements TbNode {
56 56
57 57 @Override
58 58 public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
  59 + log.info("节点【TbCheckAlarmStatusNode】处理消费: {}", msg);
59 60 try {
60 61 Alarm alarm = mapper.readValue(msg.getData(), Alarm.class);
61 62
... ... @@ -72,6 +73,7 @@ public class TbCheckAlarmStatusNode implements TbNode {
72 73 break;
73 74 }
74 75 }
  76 + log.info("演员消息生产——普通消息内容:【{}】", msg);
75 77 if (isPresent) {
76 78 ctx.tellNext(msg, "True");
77 79 } else {
... ...
... ... @@ -58,6 +58,7 @@ public class TbCheckMessageNode implements TbNode {
58 58 @Override
59 59 public void onMsg(TbContext ctx, TbMsg msg) {
60 60 try {
  61 + log.info("演员消息生产——普通消息内容:【{}】", msg);
61 62 if (config.isCheckAllKeys()) {
62 63 ctx.tellNext(msg, allKeysData(msg) && allKeysMetadata(msg) ? "True" : "False");
63 64 } else {
... ... @@ -131,4 +132,4 @@ public class TbCheckMessageNode implements TbNode {
131 132 return (Map<String, String>) gson.fromJson(msg.getData(), Map.class);
132 133 }
133 134
134   -}
\ No newline at end of file
  135 +}
... ...
... ... @@ -69,7 +69,7 @@ public class TbCheckRelationNode implements TbNode {
69 69 } else {
70 70 checkRelationFuture = processList(ctx, msg);
71 71 }
72   - withCallback(checkRelationFuture, filterResult -> ctx.tellNext(msg, filterResult ? "True" : "False"), t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
  72 + withCallback(checkRelationFuture, filterResult -> {log.info("演员消息生产——普通消息内容:【{}】", msg);ctx.tellNext(msg, filterResult ? "True" : "False");}, t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
73 73 }
74 74
75 75 private ListenableFuture<Boolean> processSingle(TbContext ctx, TbMsg msg) {
... ...
... ... @@ -59,6 +59,7 @@ public class TbJsFilterNode implements TbNode {
59 59 withCallback(jsEngine.executeFilterAsync(msg),
60 60 filterResult -> {
61 61 ctx.logJsEvalResponse();
  62 + log.info("演员消息生产——普通消息内容:【{}】", msg);
62 63 ctx.tellNext(msg, filterResult ? "True" : "False");
63 64 },
64 65 t -> {
... ...
... ... @@ -77,6 +77,7 @@ public class TbJsSwitchNode implements TbNode {
77 77 }
78 78
79 79 private void processSwitch(TbContext ctx, TbMsg msg, Set<String> nextRelations) {
  80 + log.info("演员消息生产——普通消息内容:processSwitch【{}】",msg);
80 81 ctx.tellNext(msg, nextRelations);
81 82 }
82 83
... ...
... ... @@ -49,6 +49,7 @@ public class TbMsgTypeFilterNode implements TbNode {
49 49
50 50 @Override
51 51 public void onMsg(TbContext ctx, TbMsg msg) {
  52 + log.info("演员消息生产——普通消息内容:【{}】", msg);
52 53 ctx.tellNext(msg, config.getMessageTypes().contains(msg.getType()) ? "True" : "False");
53 54 }
54 55
... ...
... ... @@ -114,6 +114,7 @@ public class TbMsgTypeSwitchNode implements TbNode {
114 114 } else {
115 115 relationType = "Other";
116 116 }
  117 + log.info("演员消息生产——普通消息内容:类型{}【{}】",msg.getType(), msg);
117 118 ctx.tellNext(msg, relationType);
118 119 }
119 120
... ...
... ... @@ -48,6 +48,7 @@ public class TbOriginatorTypeFilterNode implements TbNode {
48 48 @Override
49 49 public void onMsg(TbContext ctx, TbMsg msg) {
50 50 EntityType originatorType = msg.getOriginator().getEntityType();
  51 + log.info("演员消息生产——普通消息内容:【{}】", msg);
51 52 ctx.tellNext(msg, config.getOriginatorTypes().contains(originatorType) ? "True" : "False");
52 53 }
53 54
... ...
... ... @@ -87,6 +87,7 @@ public class TbOriginatorTypeSwitchNode implements TbNode {
87 87 default:
88 88 throw new TbNodeException("Unsupported originator type: " + originatorType);
89 89 }
  90 + log.info("演员消息生产——普通消息内容:【{}】", msg);
90 91 ctx.tellNext(msg, relationType);
91 92 }
92 93
... ...
... ... @@ -48,6 +48,7 @@ public class TbAckNode implements TbNode {
48 48 @Override
49 49 public void onMsg(TbContext ctx, TbMsg msg) {
50 50 ctx.ack(msg);
  51 + log.info("演员消息生产——普通消息内容:【{}】", msg);
51 52 ctx.tellSuccess(msg);
52 53 }
53 54
... ...
... ... @@ -101,11 +101,13 @@ public class TbPubSubNode implements TbNode {
101 101 ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<String>() {
102 102 public void onSuccess(String messageId) {
103 103 TbMsg next = processPublishResult(ctx, msg, messageId);
  104 + log.info("演员消息生产——普通消息内容:【{}】", msg);
104 105 ctx.tellSuccess(next);
105 106 }
106 107
107 108 public void onFailure(Throwable t) {
108 109 TbMsg next = processException(ctx, msg, t);
  110 + log.info("演员消息生产——普通消息内容:【{}】", next);
109 111 ctx.tellFailure(next, t);
110 112 }
111 113 },
... ...
... ... @@ -97,6 +97,7 @@ public class TbGpsGeofencingActionNode extends AbstractGeofencingNode<TbGpsGeofe
97 97 }
98 98 }
99 99 if (!told) {
  100 + log.info("演员消息生产——普通消息内容:【{}】", msg);
100 101 ctx.tellSuccess(msg);
101 102 }
102 103 }
... ...
... ... @@ -77,7 +77,7 @@ public class TbSendEmailNode implements TbNode {
77 77 sendEmail(ctx, msg, email);
78 78 return null;
79 79 }),
80   - ok -> ctx.tellSuccess(msg),
  80 + ok -> {log.info("演员消息生产——普通消息内容:【{}】", msg);ctx.tellSuccess(msg);},
81 81 fail -> ctx.tellFailure(msg, fail));
82 82 } catch (Exception ex) {
83 83 ctx.tellFailure(msg, ex);
... ...
... ... @@ -91,6 +91,7 @@ public class CalculateDeltaNode implements TbNode {
91 91 BigDecimal delta = BigDecimal.valueOf(previousData != null ? currentValue - previousData.value : 0.0);
92 92
93 93 if (config.isTellFailureIfDeltaIsNegative() && delta.doubleValue() < 0) {
  94 + log.info("演员消息生产——普通消息内容:【{}】", msg);
94 95 ctx.tellNext(msg, TbRelationTypes.FAILURE);
95 96 return;
96 97 }
... ... @@ -111,13 +112,16 @@ public class CalculateDeltaNode implements TbNode {
111 112 long period = previousData != null ? currentTs - previousData.ts : 0;
112 113 result.put(config.getPeriodValueKey(), period);
113 114 }
  115 + log.info("演员消息生产——普通消息内容:【{}】", TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), JacksonUtil.toString(result)));
114 116 ctx.tellSuccess(TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), JacksonUtil.toString(result)));
115 117 },
116 118 t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
117 119 } else {
  120 + log.info("演员消息生产——普通消息内容:【{}】", msg);
118 121 ctx.tellNext(msg, "Other");
119 122 }
120 123 } else {
  124 + log.info("演员消息生产——普通消息内容:【{}】", msg);
121 125 ctx.tellNext(msg, "Other");
122 126 }
123 127 }
... ...
... ... @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.Futures;
24 24 import com.google.common.util.concurrent.ListenableFuture;
25 25 import com.google.common.util.concurrent.MoreExecutors;
26 26 import com.google.gson.JsonParseException;
  27 +import lombok.extern.slf4j.Slf4j;
27 28 import org.apache.commons.collections.CollectionUtils;
28 29 import org.apache.commons.lang3.BooleanUtils;
29 30 import org.thingsboard.rule.engine.api.TbContext;
... ... @@ -49,7 +50,7 @@ import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE;
49 50 import static org.thingsboard.server.common.data.DataConstants.LATEST_TS;
50 51 import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
51 52 import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE;
52   -
  53 +@Slf4j
53 54 public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeConfiguration, T extends EntityId> implements TbNode {
54 55
55 56 private static ObjectMapper mapper = new ObjectMapper();
... ... @@ -70,6 +71,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
70 71
71 72 @Override
72 73 public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
  74 + log.info("节点【TbAbstractGetAttributesNode】处理消费: {}", msg);
73 75 try {
74 76 withCallback(
75 77 findEntityIdAsync(ctx, msg),
... ... @@ -88,6 +90,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
88 90
89 91 private void safePutAttributes(TbContext ctx, TbMsg msg, T entityId) {
90 92 if (entityId == null || entityId.isNullUid()) {
  93 + log.info("演员消息生产——普通消息内容:【{}】", msg);
91 94 ctx.tellNext(msg, FAILURE);
92 95 return;
93 96 }
... ... @@ -102,6 +105,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
102 105 if (!failuresMap.isEmpty()) {
103 106 throw reportFailures(failuresMap);
104 107 }
  108 + log.info("演员消息生产——普通消息内容:【{}】", msg);
105 109 ctx.tellSuccess(msg);
106 110 }, t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
107 111 }
... ...
... ... @@ -61,6 +61,7 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
61 61
62 62 private void safeGetAttributes(TbContext ctx, TbMsg msg, T entityId) {
63 63 if (entityId == null || entityId.isNullUid()) {
  64 + log.info("演员消息生产——普通消息内容:transformSuccess【{}】",msg);
64 65 ctx.tellNext(msg, FAILURE);
65 66 return;
66 67 }
... ... @@ -88,6 +89,7 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
88 89 String attrName = config.getAttrMapping().get(r.getKey());
89 90 msg.getMetaData().putValue(attrName, r.getValueAsString());
90 91 });
  92 + log.info("演员消息生产——普通消息内容:putAttributesAndTell【{}】", msg);
91 93 ctx.tellSuccess(msg);
92 94 }
93 95
... ...
... ... @@ -56,7 +56,7 @@ public class TbGetOriginatorFieldsNode implements TbNode {
56 56 public void onMsg(TbContext ctx, TbMsg msg) {
57 57 try {
58 58 withCallback(putEntityFields(ctx, msg.getOriginator(), msg),
59   - i -> ctx.tellSuccess(msg), t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
  59 + i ->{ log.info("演员消息生产——普通消息内容:【{}】", msg);ctx.tellSuccess(msg);}, t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
60 60 } catch (Throwable th) {
61 61 ctx.tellFailure(msg, th);
62 62 }
... ...
... ... @@ -106,6 +106,7 @@ public class TbGetTelemetryNode implements TbNode {
106 106
107 107 @Override
108 108 public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
  109 + log.info("节点【TbGetTelemetryNode】处理消费: {}", msg);
109 110 if (tsKeyNames.isEmpty()) {
110 111 ctx.tellFailure(msg, new IllegalStateException("Telemetry is not selected!"));
111 112 } else {
... ... @@ -117,6 +118,7 @@ public class TbGetTelemetryNode implements TbNode {
117 118 ListenableFuture<List<TsKvEntry>> list = ctx.getTimeseriesService().findAll(ctx.getTenantId(), msg.getOriginator(), buildQueries(msg, keys));
118 119 DonAsynchron.withCallback(list, data -> {
119 120 process(data, msg, keys);
  121 + log.info("演员消息生产——普通消息内容:【{}】", ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), msg.getData()));
120 122 ctx.tellSuccess(ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), msg.getData()));
121 123 }, error -> ctx.tellFailure(msg, error), ctx.getDbCallbackExecutor());
122 124 } catch (Exception e) {
... ...
... ... @@ -75,10 +75,12 @@ public class TbMqttNode implements TbNode {
75 75
76 76 @Override
77 77 public void onMsg(TbContext ctx, TbMsg msg) {
  78 + log.info("节点【TbMqttNode】处理消费: {}", msg);
78 79 String topic = TbNodeUtils.processPattern(this.mqttNodeConfiguration.getTopicPattern(), msg);
79 80 this.mqttClient.publish(topic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE)
80 81 .addListener(future -> {
81 82 if (future.isSuccess()) {
  83 + log.info("演员消息生产——普通消息内容:【{}】", msg);
82 84 ctx.tellSuccess(msg);
83 85 } else {
84 86 TbMsg next = processException(ctx, msg, future.cause());
... ...
... ... @@ -67,7 +67,7 @@ public class AlarmNoticeNode implements TbNode {
67 67 posetApi(ctx, msg);
68 68 return null;
69 69 }),
70   - ok -> ctx.tellSuccess(msg),
  70 + ok -> {log.info("演员消息生产——普通消息内容:【{}】", msg);ctx.tellSuccess(msg);},
71 71 fail -> ctx.tellFailure(msg, fail));
72 72 } catch (Exception ex) {
73 73 ctx.tellFailure(msg, ex);
... ...
... ... @@ -157,6 +157,7 @@ class DeviceState {
157 157 if (msg.getType().equals(DataConstants.ENTITY_ASSIGNED) || msg.getType().equals(DataConstants.ENTITY_UNASSIGNED)) {
158 158 dynamicPredicateValueCtx.resetCustomer();
159 159 }
  160 + log.info("演员消息生产——普通消息内容:{}【{}】",msg.getType(), msg);
160 161 ctx.tellSuccess(msg);
161 162 }
162 163 if (persistState && stateChanged) {
... ... @@ -182,6 +183,7 @@ class DeviceState {
182 183 a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm), dynamicPredicateValueCtx));
183 184 stateChanged |= alarmState.processAlarmClear(ctx, alarmNf);
184 185 }
  186 + log.info("演员消息生产——普通消息内容:processAlarmClearNotification【{}】", msg);
185 187 ctx.tellSuccess(msg);
186 188 return stateChanged;
187 189 }
... ... @@ -193,12 +195,14 @@ class DeviceState {
193 195 a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm), dynamicPredicateValueCtx));
194 196 alarmState.processAckAlarm(alarmNf);
195 197 }
  198 + log.info("演员消息生产——普通消息内容:processAlarmAckNotification【{}】", msg);
196 199 ctx.tellSuccess(msg);
197 200 }
198 201
199 202 private void processAlarmDeleteNotification(TbContext ctx, TbMsg msg) {
200 203 Alarm alarm = JacksonUtil.fromString(msg.getData(), Alarm.class);
201 204 alarmStates.values().removeIf(alarmState -> alarmState.getCurrentAlarm().getId().equals(alarm.getId()));
  205 + log.info("演员消息生产——普通消息内容:processAlarmDeleteNotification【{}】", msg);
202 206 ctx.tellSuccess(msg);
203 207 }
204 208
... ... @@ -227,6 +231,7 @@ class DeviceState {
227 231 stateChanged |= alarmState.process(ctx, msg, latestValues, null);
228 232 }
229 233 }
  234 + log.info("演员消息生产——普通消息内容:processAttributesDeleteNotification【{}】", msg);
230 235 ctx.tellSuccess(msg);
231 236 return stateChanged;
232 237 }
... ... @@ -246,6 +251,7 @@ class DeviceState {
246 251 stateChanged |= alarmState.process(ctx, msg, latestValues, update);
247 252 }
248 253 }
  254 + log.info("演员消息生产——普通消息内容:processAttributes【{}】", msg);
249 255 ctx.tellSuccess(msg);
250 256 return stateChanged;
251 257 }
... ... @@ -271,6 +277,7 @@ class DeviceState {
271 277 }
272 278 }
273 279 }
  280 + log.info("演员消息生产——普通消息内容:processTelemetry【{}】", msg);
274 281 ctx.tellSuccess(msg);
275 282 return stateChanged;
276 283 }
... ...
... ... @@ -106,6 +106,7 @@ public class TbDeviceProfileNode implements TbNode {
106 106
107 107 @Override
108 108 public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
  109 + log.info("节点【TbDeviceProfileNode】处理消费: {}", msg);
109 110 EntityType originatorType = msg.getOriginator().getEntityType();
110 111 if (msg.getType().equals(PERIODIC_MSG_TYPE)) {
111 112 scheduleAlarmHarvesting(ctx, msg);
... ... @@ -125,9 +126,11 @@ public class TbDeviceProfileNode implements TbNode {
125 126 DeviceId deviceId = new DeviceId(msg.getOriginator().getId());
126 127 if (msg.getType().equals(DataConstants.ENTITY_UPDATED)) {
127 128 invalidateDeviceProfileCache(deviceId, msg.getData());
  129 + log.info("演员消息生产——普通消息内容:ENTITY_UPDATED【{}】", msg);
128 130 ctx.tellSuccess(msg);
129 131 } else if (msg.getType().equals(DataConstants.ENTITY_DELETED)) {
130 132 removeDeviceState(deviceId);
  133 + log.info("演员消息生产——普通消息内容:ENTITY_DELETED【{}】", msg);
131 134 ctx.tellSuccess(msg);
132 135 } else {
133 136 DeviceState deviceState = getOrCreateDeviceState(ctx, deviceId, null);
... ... @@ -139,6 +142,7 @@ public class TbDeviceProfileNode implements TbNode {
139 142 }
140 143 }
141 144 } else {
  145 + log.info("演员消息生产——普通消息内容:{}【{}】", msg.getType(),msg);
142 146 ctx.tellSuccess(msg);
143 147 }
144 148 }
... ... @@ -170,6 +174,7 @@ public class TbDeviceProfileNode implements TbNode {
170 174
171 175 protected void scheduleAlarmHarvesting(TbContext ctx, TbMsg msg) {
172 176 TbMsg periodicCheck = TbMsg.newMsg(PERIODIC_MSG_TYPE, ctx.getTenantId(), msg != null ? msg.getCustomerId() : null, TbMsgMetaData.EMPTY, "{}");
  177 + log.info("演员消息生产——高级消息内容:scheduleAlarmHarvesting【{}】", periodicCheck);
173 178 ctx.tellSelf(periodicCheck, TimeUnit.MINUTES.toMillis(1));
174 179 }
175 180
... ... @@ -194,6 +199,7 @@ public class TbDeviceProfileNode implements TbNode {
194 199 }
195 200
196 201 protected void onProfileUpdate(DeviceProfile profile) {
  202 + log.info("演员消息生产——高级消息内容:onProfileUpdate【{}】", TbMsg.newMsg(PROFILE_UPDATE_MSG_TYPE, ctx.getTenantId(), TbMsgMetaData.EMPTY, profile.getId().getId().toString()));
197 203 ctx.tellSelf(TbMsg.newMsg(PROFILE_UPDATE_MSG_TYPE, ctx.getTenantId(), TbMsgMetaData.EMPTY, profile.getId().getId().toString()), 0L);
198 204 }
199 205
... ... @@ -203,6 +209,7 @@ public class TbDeviceProfileNode implements TbNode {
203 209 if (deviceProfile != null) {
204 210 msgData.put("deviceProfileId", deviceProfile.getId().getId().toString());
205 211 }
  212 + log.info("演员消息生产——高级消息内容:onDeviceUpdate【{}】", TbMsg.newMsg(DEVICE_UPDATE_MSG_TYPE, ctx.getTenantId(), TbMsgMetaData.EMPTY, JacksonUtil.toString(msgData)));
206 213 ctx.tellSelf(TbMsg.newMsg(DEVICE_UPDATE_MSG_TYPE, ctx.getTenantId(), TbMsgMetaData.EMPTY, JacksonUtil.toString(msgData)), 0L);
207 214 }
208 215
... ...
... ... @@ -202,9 +202,11 @@ public class TbHttpClient {
202 202 public void onSuccess(ResponseEntity<String> responseEntity) {
203 203 if (responseEntity.getStatusCode().is2xxSuccessful()) {
204 204 TbMsg next = processResponse(ctx, msg, responseEntity);
  205 + log.info("演员消息生产——普通消息内容:transformSuccess【{}】",next);
205 206 ctx.tellSuccess(next);
206 207 } else {
207 208 TbMsg next = processFailureResponse(ctx, msg, responseEntity);
  209 + log.info("演员消息生产——普通消息内容:transformSuccess【{}】",next);
208 210 ctx.tellNext(next, TbRelationTypes.FAILURE);
209 211 }
210 212 }
... ...
... ... @@ -66,6 +66,8 @@ public class TbSendRPCReplyNode implements TbNode {
66 66 ctx.tellFailure(msg, new RuntimeException("Request body is empty!"));
67 67 } else {
68 68 ctx.getRpcService().sendRpcReplyToDevice(serviceIdStr, UUID.fromString(sessionIdStr), Integer.parseInt(requestIdStr), msg.getData());
  69 +
  70 + log.info("演员消息生产——普通消息内容:【{}】", msg);
69 71 ctx.tellSuccess(msg);
70 72 }
71 73 }
... ...
... ... @@ -58,6 +58,7 @@ public class TbMsgAttributesNode implements TbNode {
58 58
59 59 @Override
60 60 public void onMsg(TbContext ctx, TbMsg msg) {
  61 + log.info("节点【TbMsgAttributesNode】处理消费: {}", msg);
61 62 if (!msg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name())) {
62 63 ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
63 64 return;
... ...
... ... @@ -72,6 +72,7 @@ public class TbMsgTimeseriesNode implements TbNode {
72 72
73 73 @Override
74 74 public void onMsg(TbContext ctx, TbMsg msg) {
  75 + log.info("节点【TbMsgTimeseriesNode】处理消费: {}", msg);
75 76 if (!msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) {
76 77 ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
77 78 return;
... ...
... ... @@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.telemetry;
17 17
18 18 import com.google.common.util.concurrent.FutureCallback;
19 19 import lombok.Data;
  20 +import lombok.extern.slf4j.Slf4j;
20 21 import org.thingsboard.rule.engine.api.TbContext;
21 22 import org.thingsboard.server.common.msg.TbMsg;
22 23
... ... @@ -26,12 +27,14 @@ import javax.annotation.Nullable;
26 27 * Created by ashvayka on 02.04.18.
27 28 */
28 29 @Data
  30 +@Slf4j
29 31 class TelemetryNodeCallback implements FutureCallback<Void> {
30 32 private final TbContext ctx;
31 33 private final TbMsg msg;
32 34
33 35 @Override
34 36 public void onSuccess(@Nullable Void result) {
  37 + log.info("演员消息生产——普通消息内容:【{}】", msg);
35 38 ctx.tellSuccess(msg);
36 39 }
37 40
... ...
... ... @@ -46,6 +46,7 @@ public class TbSynchronizationBeginNode implements TbNode {
46 46 @Override
47 47 public void onMsg(TbContext ctx, TbMsg msg) {
48 48 log.warn("Synchronization Start/End nodes are deprecated since TB 2.5. Use queue with submit strategy SEQUENTIAL_BY_ORIGINATOR instead.");
  49 + log.info("演员消息生产——普通消息内容:【{}】", msg);
49 50 ctx.tellSuccess(msg);
50 51 }
51 52
... ...
... ... @@ -45,6 +45,8 @@ public class TbSynchronizationEndNode implements TbNode {
45 45 @Override
46 46 public void onMsg(TbContext ctx, TbMsg msg) {
47 47 log.warn("Synchronization Start/End nodes are deprecated since TB 2.5. Use queue with submit strategy SEQUENTIAL_BY_ORIGINATOR instead.");
  48 +
  49 + log.info("演员消息生产——普通消息内容:【{}】", msg);
48 50 ctx.tellSuccess(msg);
49 51 }
50 52
... ...
... ... @@ -46,6 +46,7 @@ public abstract class TbAbstractTransformNode implements TbNode {
46 46
47 47 @Override
48 48 public void onMsg(TbContext ctx, TbMsg msg) {
  49 + log.info("节点【TbAbstractTransformNode】处理消费: {}", msg);
49 50 withCallback(transform(ctx, msg),
50 51 m -> transformSuccess(ctx, msg, m),
51 52 t -> transformFailure(ctx, msg, t),
... ... @@ -57,6 +58,7 @@ public abstract class TbAbstractTransformNode implements TbNode {
57 58 }
58 59
59 60 protected void transformSuccess(TbContext ctx, TbMsg msg, TbMsg m) {
  61 + log.info("演员消息生产——普通消息内容:transformSuccess【{}】",msg);
60 62 if (m != null) {
61 63 ctx.tellSuccess(m);
62 64 } else {
... ... @@ -67,6 +69,7 @@ public abstract class TbAbstractTransformNode implements TbNode {
67 69 protected void transformSuccess(TbContext ctx, TbMsg msg, List<TbMsg> msgs) {
68 70 if (msgs != null && !msgs.isEmpty()) {
69 71 if (msgs.size() == 1) {
  72 + log.info("演员消息生产——普通消息内容:transformSuccess【{}】",msgs.get(0));
70 73 ctx.tellSuccess(msgs.get(0));
71 74 } else {
72 75 TbMsgCallbackWrapper wrapper = new MultipleTbMsgsCallbackWrapper(msgs.size(), new TbMsgCallback() {
... ... @@ -83,6 +86,7 @@ public abstract class TbAbstractTransformNode implements TbNode {
83 86 msgs.forEach(newMsg -> ctx.enqueueForTellNext(newMsg, "Success", wrapper::onSuccess, wrapper::onFailure));
84 87 }
85 88 } else {
  89 + log.info("演员消息生产——普通消息内容:transformSuccess【{}】",msg);
86 90 ctx.tellNext(msg, FAILURE);
87 91 }
88 92 }
... ...
... ... @@ -19,6 +19,7 @@ import com.datastax.oss.driver.api.core.uuid.Uuids;
19 19 import com.fasterxml.jackson.databind.ObjectMapper;
20 20 import com.google.common.collect.Lists;
21 21 import com.google.common.util.concurrent.Futures;
  22 +import lombok.extern.slf4j.Slf4j;
22 23 import org.junit.Before;
23 24 import org.junit.Test;
24 25 import org.junit.runner.RunWith;
... ... @@ -65,7 +66,7 @@ import static org.mockito.Mockito.verify;
65 66 import static org.mockito.Mockito.when;
66 67 import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
67 68 import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
68   -
  69 +@Slf4j
69 70 @RunWith(MockitoJUnitRunner.class)
70 71 public class TbGetCustomerAttributeNodeTest {
71 72
... ... @@ -168,6 +169,7 @@ public class TbGetCustomerAttributeNodeTest {
168 169
169 170
170 171 node.onMsg(ctx, msg);
  172 + log.info("演员消息生产——普通消息内容:tellFailure【{}】",msg);
171 173 verify(ctx).tellNext(msg, FAILURE);
172 174 assertTrue(msg.getMetaData().getData().isEmpty());
173 175 }
... ... @@ -255,6 +257,7 @@ public class TbGetCustomerAttributeNodeTest {
255 257 .thenReturn(Futures.immediateFuture(timeseries));
256 258
257 259 node.onMsg(ctx, msg);
  260 + log.info("演员消息生产——普通消息内容:deviceCustomerTelemetryFetched【{}】", msg);
258 261 verify(ctx).tellSuccess(msg);
259 262 assertEquals(msg.getMetaData().getValue("tempo"), "highest");
260 263 }
... ... @@ -267,6 +270,7 @@ public class TbGetCustomerAttributeNodeTest {
267 270 .thenReturn(Futures.immediateFuture(attributes));
268 271
269 272 node.onMsg(ctx, msg);
  273 + log.info("演员消息生产——普通消息内容:entityAttributeFetched【{}】", msg);
270 274 verify(ctx).tellSuccess(msg);
271 275 assertEquals(msg.getMetaData().getValue("tempo"), "high");
272 276 }
... ...