Commit f9810f43f7e8c5be806d447022e1b573e61dd068

Authored by xp.Huang
2 parents e76d082b f29fa66e

Merge branch 'ljl0209' into 'master'

Ljl0209

See merge request huang/thingsboard3.3.2!43
Showing 70 changed files with 140 additions and 196 deletions
... ... @@ -66,7 +66,6 @@ public class AppActor extends ContextAwareActor {
66 66
67 67 @Override
68 68 protected boolean doProcess(TbActorMsg msg) {
69   - log.info("演员【AppActor】消息消费: {}", msg);
70 69 if (!ruleChainsInitialized) {
71 70 initTenantActors();
72 71 ruleChainsInitialized = true;
... ... @@ -146,7 +145,6 @@ public class AppActor extends ContextAwareActor {
146 145 msg.getMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!"));
147 146 } else {
148 147 if (!deletedTenants.contains(msg.getTenantId())) {
149   - log.info("演员消息生产——普通消息内容:【{}】",msg);
150 148 getOrCreateTenantActor(msg.getTenantId()).tell(msg);
151 149 } else {
152 150 msg.getMsg().getCallback().onSuccess();
... ... @@ -175,7 +173,6 @@ public class AppActor extends ContextAwareActor {
175 173 }
176 174 }
177 175 if (target != null) {
178   - log.info("演员消息生产——高级消息内容:【{}】", msg);
179 176 target.tellWithHighPriority(msg);
180 177 } else {
181 178 log.debug("[{}] Invalid component lifecycle msg: {}", msg.getTenantId(), msg);
... ... @@ -186,10 +183,8 @@ public class AppActor extends ContextAwareActor {
186 183 if (!deletedTenants.contains(msg.getTenantId())) {
187 184 TbActorRef tenantActor = getOrCreateTenantActor(msg.getTenantId());
188 185 if (priority) {
189   - log.info("演员消息生产——高级消息内容:【{}】", msg);
190 186 tenantActor.tellWithHighPriority(msg);
191 187 } else {
192   - log.info("演员消息生产——普通消息内容:【{}】",msg);
193 188 tenantActor.tell(msg);
194 189 }
195 190 } else {
... ... @@ -213,7 +208,6 @@ public class AppActor extends ContextAwareActor {
213 208 target = getOrCreateTenantActor(msg.getTenantId());
214 209 }
215 210 if (target != null) {
216   - log.info("演员消息生产——高级消息内容:【{}】", msg);
217 211 target.tellWithHighPriority(msg);
218 212 } else {
219 213 log.debug("[{}] Invalid edge event update msg: {}", msg.getTenantId(), msg);
... ...
... ... @@ -57,7 +57,6 @@ public class DeviceActor extends ContextAwareActor {
57 57
58 58 @Override
59 59 protected boolean doProcess(TbActorMsg msg) {
60   - log.info("演员【DeviceActor】消息消费: {}", msg);
61 60 switch (msg.getMsgType()) {
62 61 case TRANSPORT_TO_DEVICE_ACTOR_MSG:
63 62 processor.process(ctx, (TransportToDeviceActorMsgWrapper) msg);
... ...
... ... @@ -302,7 +302,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
302 302 private void registerPendingRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) {
303 303 toDeviceRpcPendingMap.put(rpcRequest.getRequestId(), new ToDeviceRpcRequestMetadata(msg, sent));
304 304 DeviceActorServerSideRpcTimeoutMsg timeoutMsg = new DeviceActorServerSideRpcTimeoutMsg(rpcRequest.getRequestId(), timeout);
305   - log.info("演员消息生产——高级消息内容:【{}】", timeoutMsg);
306 305 scheduleMsgWithDelay(context, timeoutMsg, timeoutMsg.getTimeout());
307 306 }
308 307
... ... @@ -933,7 +932,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
933 932 }
934 933
935 934 void init(TbActorCtx ctx) {
936   - log.info("演员消息生产——高级消息内容:【{}】", SessionTimeoutCheckMsg.instance());
937 935 schedulePeriodicMsgWithDelay(ctx, SessionTimeoutCheckMsg.instance(), systemContext.getSessionReportTimeout(), systemContext.getSessionReportTimeout());
938 936 PageLink pageLink = new PageLink(1024, 0, null, new SortOrder("createdTime"));
939 937 PageData<Rpc> pageData;
... ...
... ... @@ -254,7 +254,6 @@ class DefaultTbContext implements TbContext {
254 254
255 255 @Override
256 256 public void tellFailure(TbMsg msg, Throwable th) {
257   - log.info("演员消息生产——普通消息内容:tellFailure【{}】",msg);
258 257 if (nodeCtx.getSelf().isDebugMode()) {
259 258 mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, TbRelationTypes.FAILURE, th);
260 259 }
... ...
... ... @@ -48,7 +48,6 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
48 48
49 49 @Override
50 50 protected boolean doProcess(TbActorMsg msg) {
51   - log.info("演员【RuleChainActor】消息消费: {}", msg);
52 51 switch (msg.getMsgType()) {
53 52 case COMPONENT_LIFE_CYCLE_MSG:
54 53 onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
... ...
... ... @@ -133,7 +133,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
133 133 } else {
134 134 log.trace("[{}][{}] Updating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
135 135 existing.setSelf(ruleNode);
136   - log.info("演员消息生产——高级消息内容:RuleNodeUpdatedMsg【{}】", new RuleNodeUpdatedMsg(tenantId, existing.getSelf().getId()));
137 136 existing.getSelfActor().tellWithHighPriority(new RuleNodeUpdatedMsg(tenantId, existing.getSelf().getId()));
138 137 }
139 138 }
... ... @@ -143,7 +142,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
143 142 removedRules.forEach(ruleNodeId -> {
144 143 log.trace("[{}][{}] Removing rule node [{}]", tenantId, entityId, ruleNodeId);
145 144 RuleNodeCtx removed = nodeActors.remove(ruleNodeId);
146   - log.info("演员消息生产——高级消息内容:ComponentLifecycleMsg【{}】", new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED));
147 145 removed.getSelfActor().tellWithHighPriority(new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED));
148 146 });
149 147
... ... @@ -163,7 +161,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
163 161 @Override
164 162 public void onPartitionChangeMsg(PartitionChangeMsg msg) {
165 163 nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).forEach(actorRef -> {
166   - log.info("演员消息生产——高级消息内容:【{}】", msg);
167 164 actorRef.tellWithHighPriority(msg);
168 165 });
169 166 }
... ... @@ -318,7 +315,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
318 315 pushMsgToNode(nodeActors.get(new RuleNodeId(target.getId())), msg, fromRelationType);
319 316 break;
320 317 case RULE_CHAIN:
321   - log.info("演员消息生产——普通消息内容:RuleChainToRuleChainMsg【{}】",msg);
322 318 parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, fromRelationType));
323 319 break;
324 320 }
... ... @@ -350,7 +346,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
350 346
351 347 private void pushMsgToNode(RuleNodeCtx nodeCtx, TbMsg msg, String fromRelationType) {
352 348 if (nodeCtx != null) {
353   - log.info("演员消息生产——普通消息内容:RuleChainToRuleNodeMsg【{}】",msg);
354 349 nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, ruleChainName, nodeCtx), msg, fromRelationType));
355 350 } else {
356 351 log.error("[{}][{}] RuleNodeCtx is empty", entityId, ruleChainName);
... ...
... ... @@ -51,7 +51,6 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
51 51
52 52 @Override
53 53 protected boolean doProcess(TbActorMsg msg) {
54   - log.info("演员【RuleNodeActor】消息消费: {}", msg);
55 54 switch (msg.getMsgType()) {
56 55 case COMPONENT_LIFE_CYCLE_MSG:
57 56 case RULE_NODE_UPDATED_MSG:
... ...
... ... @@ -140,7 +140,6 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
140 140
141 141 protected void onStatsPersistTick(EntityId entityId) {
142 142 try {
143   - log.info("演员消息生产——普通消息内容:StatsPersistMsg【{}】",new StatsPersistMsg(messagesProcessed, errorsOccurred, tenantId, entityId));
144 143 systemContext.getStatsActor().tell(new StatsPersistMsg(messagesProcessed, errorsOccurred, tenantId, entityId));
145 144 resetStatsCounters();
146 145 } catch (Exception e) {
... ...
... ... @@ -118,14 +118,12 @@ public class DefaultActorService extends TbApplicationEventListener<PartitionCha
118 118 @Order(value = 2)
119 119 public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
120 120 log.info("Received application ready event. Sending application init message to actor system");
121   - log.info("演员消息生产——高级消息内容:AppInitMsg【{}】", new AppInitMsg());
122 121 appActor.tellWithHighPriority(new AppInitMsg());
123 122 }
124 123
125 124 @Override
126 125 protected void onTbApplicationEvent(PartitionChangeEvent event) {
127 126 log.info("Received partition change event.");
128   - log.info("演员消息生产——高级消息内容:PartitionChangeMsg【{}】", new PartitionChangeMsg(event.getServiceQueueKey(), event.getPartitions()));
129 127 this.appActor.tellWithHighPriority(new PartitionChangeMsg(event.getServiceQueueKey(), event.getPartitions()));
130 128 }
131 129
... ...
... ... @@ -80,7 +80,6 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
80 80 }
81 81
82 82 public void scheduleStatsPersistTick(TbActorCtx context, long statsPersistFrequency) {
83   - log.info("演员消息生产——高级消息内容:ComponentActor【StatsPersistTick】");
84 83 schedulePeriodicMsgWithDelay(context, new StatsPersistTick(), statsPersistFrequency, statsPersistFrequency);
85 84 }
86 85
... ...
... ... @@ -41,7 +41,6 @@ public class StatsActor extends ContextAwareActor {
41 41
42 42 @Override
43 43 protected boolean doProcess(TbActorMsg msg) {
44   - log.info("演员【StatsActor】消息消费: {}", msg);
45 44 if (msg.getMsgType().equals(MsgType.STATS_PERSIST_MSG)) {
46 45 onStatsPersistMsg((StatsPersistMsg) msg);
47 46 return true;
... ...
... ... @@ -194,7 +194,6 @@ public class TenantActor extends RuleChainManagerActor {
194 194 if (apiUsageState.isReExecEnabled()) {
195 195 if (tbMsg.getRuleChainId() == null) {
196 196 if (getRootChainActor() != null) {
197   - log.info("演员消息生产——普通消息内容:【{}】",msg);
198 197 getRootChainActor().tell(msg);
199 198 } else {
200 199 tbMsg.getCallback().onFailure(new RuleEngineException("No Root Rule Chain available!"));
... ... @@ -202,7 +201,6 @@ public class TenantActor extends RuleChainManagerActor {
202 201 }
203 202 } else {
204 203 try {
205   - log.info("演员消息生产——普通消息内容:TbEntityActorId【{}】",msg);
206 204 ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg);
207 205 } catch (TbActorNotRegisteredException ex) {
208 206 log.trace("Received message for non-existing rule chain: [{}]", tbMsg.getRuleChainId());
... ... @@ -218,7 +216,6 @@ public class TenantActor extends RuleChainManagerActor {
218 216
219 217 private void onRuleChainMsg(RuleChainAwareMsg msg) {
220 218 if (apiUsageState.isReExecEnabled()) {
221   - log.info("演员消息生产——普通消息内容:【{}】",msg);
222 219 getOrCreateActor(msg.getRuleChainId()).tell(msg);
223 220 }
224 221 }
... ... @@ -229,10 +226,8 @@ public class TenantActor extends RuleChainManagerActor {
229 226 }
230 227 TbActorRef deviceActor = getOrCreateDeviceActor(msg.getDeviceId());
231 228 if (priority) {
232   - log.info("演员消息生产——高级消息内容:【{}】", msg);
233 229 deviceActor.tellWithHighPriority(msg);
234 230 } else {
235   - log.info("演员消息生产——普通消息内容:【{}】",msg);
236 231 deviceActor.tell(msg);
237 232 }
238 233 }
... ... @@ -269,7 +264,6 @@ public class TenantActor extends RuleChainManagerActor {
269 264 visit(ruleChain, target);
270 265 }
271 266 }
272   - log.info("演员消息生产——高级消息内容:【{}】",msg);
273 267 target.tellWithHighPriority(msg);
274 268 } else {
275 269 log.debug("[{}] Invalid component lifecycle msg: {}", tenantId, msg);
... ...
... ... @@ -246,7 +246,6 @@ public class DeviceProvisionServiceImpl implements DeviceProvisionService {
246 246 .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
247 247 .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build();
248 248 TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(tbMsg.getId(), msg);
249   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
250 249 ruleEngineMsgProducer.send(tpi,queueMsg , callback);
251 250 }
252 251
... ...
... ... @@ -252,7 +252,6 @@ public class DefaultOtaPackageStateService implements OtaPackageStateService {
252 252
253 253 TopicPartitionInfo tpi = new TopicPartitionInfo(otaPackageStateMsgProducer.getDefaultTopic(), null, null, false);
254 254 TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), msg);
255   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
256 255 otaPackageStateMsgProducer.send(tpi, queueMsg, null);
257 256
258 257 List<TsKvEntry> telemetry = new ArrayList<>();
... ...
... ... @@ -96,7 +96,6 @@ public class DefaultTbClusterService implements TbClusterService {
96 96 public void pushMsgToCore(TenantId tenantId, EntityId entityId, ToCoreMsg msg, TbQueueCallback callback) {
97 97 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
98 98 TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), msg);
99   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
100 99 producerProvider.getTbCoreMsgProducer().send(tpi, queueMsg, callback);
101 100 toCoreMsgs.incrementAndGet();
102 101 }
... ... @@ -104,7 +103,6 @@ public class DefaultTbClusterService implements TbClusterService {
104 103 @Override
105 104 public void pushMsgToCore(TopicPartitionInfo tpi, UUID msgId, ToCoreMsg msg, TbQueueCallback callback) {
106 105 TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(msgId, msg);
107   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
108 106 producerProvider.getTbCoreMsgProducer().send(tpi,queueMsg , callback);
109 107 toCoreMsgs.incrementAndGet();
110 108 }
... ... @@ -115,7 +113,6 @@ public class DefaultTbClusterService implements TbClusterService {
115 113 log.trace("PUSHING msg: {} to:{}", msg, tpi);
116 114 byte[] msgBytes = encodingService.encode(msg);
117 115 ToCoreMsg toCoreMsg = ToCoreMsg.newBuilder().setToDeviceActorNotificationMsg(ByteString.copyFrom(msgBytes)).build();
118   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),new TbProtoQueueMsg<>(msg.getDeviceId().getId(), toCoreMsg));
119 116 producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(msg.getDeviceId().getId(), toCoreMsg), callback);
120 117 toCoreMsgs.incrementAndGet();
121 118 }
... ... @@ -131,7 +128,6 @@ public class DefaultTbClusterService implements TbClusterService {
131 128 response.getResponse().ifPresent(builder::setResponse);
132 129 ToCoreNotificationMsg msg = ToCoreNotificationMsg.newBuilder().setFromDeviceRpcResponse(builder).build();
133 130 TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(response.getId(), msg);
134   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
135 131 producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, queueMsg, callback);
136 132 toCoreNfs.incrementAndGet();
137 133 }
... ... @@ -140,7 +136,6 @@ public class DefaultTbClusterService implements TbClusterService {
140 136 public void pushMsgToRuleEngine(TopicPartitionInfo tpi, UUID msgId, ToRuleEngineMsg msg, TbQueueCallback callback) {
141 137 log.trace("PUSHING msg: {} to:{}", msg, tpi);
142 138 TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(msgId, msg);
143   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
144 139 producerProvider.getRuleEngineMsgProducer().send(tpi, queueMsg, callback);
145 140 toRuleEngineMsgs.incrementAndGet();
146 141 }
... ... @@ -168,7 +163,6 @@ public class DefaultTbClusterService implements TbClusterService {
168 163 .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
169 164 .setTbMsg(TbMsg.toByteString(tbMsg)).build();
170 165 TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(tbMsg.getId(), msg);
171   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
172 166 producerProvider.getRuleEngineMsgProducer().send(tpi, queueMsg, callback);
173 167 toRuleEngineMsgs.incrementAndGet();
174 168 }
... ... @@ -202,7 +196,6 @@ public class DefaultTbClusterService implements TbClusterService {
202 196 response.getResponse().ifPresent(builder::setResponse);
203 197 ToRuleEngineNotificationMsg msg = ToRuleEngineNotificationMsg.newBuilder().setFromDeviceRpcResponse(builder).build();
204 198 TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(response.getId(), msg);
205   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
206 199 producerProvider.getRuleEngineNotificationsMsgProducer().send(tpi, queueMsg, callback);
207 200 toRuleEngineNfs.incrementAndGet();
208 201 }
... ... @@ -219,7 +212,6 @@ public class DefaultTbClusterService implements TbClusterService {
219 212 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, serviceId);
220 213 log.trace("PUSHING msg: {} to:{}", response, tpi);
221 214 TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), response);
222   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
223 215 producerProvider.getTransportNotificationsMsgProducer().send(tpi, queueMsg, callback);
224 216 toTransportNfs.incrementAndGet();
225 217 }
... ... @@ -342,7 +334,6 @@ public class DefaultTbClusterService implements TbClusterService {
342 334 for (String serviceId : tbCoreServices) {
343 335 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceId);
344 336 ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setEdgeEventUpdateMsg(ByteString.copyFrom(msgBytes)).build();
345   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),new TbProtoQueueMsg<>(msg.getEdgeId().getId(), toCoreMsg));
346 337 toCoreNfProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEdgeId().getId(), toCoreMsg), null);
347 338 toCoreNfs.incrementAndGet();
348 339 }
... ... @@ -364,7 +355,6 @@ public class DefaultTbClusterService implements TbClusterService {
364 355 for (String serviceId : tbCoreServices) {
365 356 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceId);
366 357 ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setComponentLifecycleMsg(ByteString.copyFrom(msgBytes)).build();
367   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),new TbProtoQueueMsg<>(msg.getEntityId().getId(), toCoreMsg));
368 358 toCoreNfProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEntityId().getId(), toCoreMsg), null);
369 359 toCoreNfs.incrementAndGet();
370 360 }
... ... @@ -374,7 +364,7 @@ public class DefaultTbClusterService implements TbClusterService {
374 364 for (String serviceId : tbRuleEngineServices) {
375 365 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceId);
376 366 ToRuleEngineNotificationMsg toRuleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().setComponentLifecycleMsg(ByteString.copyFrom(msgBytes)).build();
377   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),new TbProtoQueueMsg<>(msg.getEntityId().getId(), toRuleEngineMsg));
  367 +
378 368 toRuleEngineProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEntityId().getId(), toRuleEngineMsg), null);
379 369 toRuleEngineNfs.incrementAndGet();
380 370 }
... ...
... ... @@ -199,7 +199,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
199 199 if (msgs.isEmpty()) {
200 200 continue;
201 201 }
202   - log.info("队列消息消费 :launchMainConsumers【{}】 ", msgs);
203 202 List<IdMsgPair<ToCoreMsg>> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).collect(Collectors.toList());
204 203 ConcurrentMap<UUID, TbProtoQueueMsg<ToCoreMsg>> pendingMap = orderedMsgList.stream().collect(
205 204 Collectors.toConcurrentMap(IdMsgPair::getUuid, IdMsgPair::getMsg));
... ... @@ -236,7 +235,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
236 235 tbCoreDeviceRpcService.forwardRpcRequestToDeviceActor((ToDeviceRpcRequestActorMsg) tbActorMsg);
237 236 } else {
238 237 log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
239   - log.info("演员消息生产——普通消息内容:【{}】",msg);
240 238 actorContext.tell(actorMsg.get());
241 239 }
242 240 }
... ... @@ -310,7 +308,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
310 308 Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreNotification.getEdgeEventUpdateMsg().toByteArray());
311 309 if (actorMsg.isPresent()) {
312 310 log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
313   - log.info("演员消息生产——高级消息内容:【{}】", actorMsg.get());
314 311 actorContext.tellWithHighPriority(actorMsg.get());
315 312 }
316 313 callback.onSuccess();
... ... @@ -328,7 +325,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
328 325 if (msgs.isEmpty()) {
329 326 continue;
330 327 }
331   - log.info("队列消息消费 :launchUsageStatsConsumer【{}】 ", msgs);
332 328 ConcurrentMap<UUID, TbProtoQueueMsg<ToUsageStatsServiceMsg>> pendingMap = msgs.stream().collect(
333 329 Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity()));
334 330 CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
... ... @@ -373,7 +369,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
373 369 if (msgs.isEmpty()) {
374 370 continue;
375 371 }
376   - log.info("队列消息消费:launchOtaPackageUpdateNotificationConsumer【{}】 ", msgs);
377 372 long timeToSleep = maxProcessingTimeoutPerRecord;
378 373 for (TbProtoQueueMsg<ToOtaPackageStateServiceMsg> msg : msgs) {
379 374 try {
... ... @@ -510,7 +505,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
510 505 if (statsEnabled) {
511 506 stats.log(toDeviceActorMsg);
512 507 }
513   - log.info("演员消息生产——普通消息内容:TransportToDeviceActorMsgWrapper【{}】",new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback));
514 508 actorContext.tell(new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback));
515 509 }
516 510
... ...
... ... @@ -253,7 +253,6 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
253 253 if (msgs.isEmpty()) {
254 254 continue;
255 255 }
256   - log.info("队列消息消费 {} ", msgs);
257 256 final TbRuleEngineSubmitStrategy submitStrategy = getSubmitStrategy(configuration);
258 257 final TbRuleEngineProcessingStrategy ackStrategy = getAckStrategy(configuration);
259 258 submitStrategy.init(msgs);
... ... @@ -400,7 +399,6 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
400 399 }
401 400 }
402 401 msg = new QueueToRuleEngineMsg(tenantId, tbMsg, relationTypes, toRuleEngineMsg.getFailureMessage());
403   - log.info("演员消息生产——普通消息内容:【{}】",msg);
404 402 actorContext.tell(msg);
405 403 }
406 404
... ...
... ... @@ -115,7 +115,6 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
115 115 if (msgs.isEmpty()) {
116 116 continue;
117 117 }
118   - log.info("队列消息消费 {} ", msgs);
119 118 ConcurrentMap<UUID, TbProtoQueueMsg<N>> pendingMap = msgs.stream().collect(
120 119 Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity()));
121 120 CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
... ... @@ -185,7 +184,6 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
185 184 }
186 185 }
187 186 log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg);
188   - log.info("演员消息生产——高级消息内容:【{}】", actorMsg);
189 187 actorContext.tellWithHighPriority(actorMsg);
190 188 }
191 189 }
... ...
... ... @@ -123,7 +123,6 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
123 123 log.trace("[{}][{}] Processing local rpc call to device actor [{}]", request.getTenantId(), request.getId(), request.getDeviceId());
124 124 UUID requestId = request.getId();
125 125 localToDeviceRpcRequests.put(requestId, rpcMsg);
126   - log.info("演员消息生产——高级消息内容:【{}】", rpcMsg);
127 126 actorContext.tellWithHighPriority(rpcMsg);
128 127 scheduleToDeviceTimeout(request, requestId);
129 128 }
... ... @@ -143,7 +142,6 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
143 142 @Override
144 143 public void processRemoveRpc(RemoveRpcActorMsg removeRpcMsg) {
145 144 log.trace("[{}][{}] Processing remove RPC [{}]", removeRpcMsg.getTenantId(), removeRpcMsg.getRequestId(), removeRpcMsg.getDeviceId());
146   - log.info("演员消息生产——高级消息内容:【{}】", removeRpcMsg);
147 145 actorContext.tellWithHighPriority(removeRpcMsg);
148 146 }
149 147
... ...
... ... @@ -353,7 +353,6 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
353 353 } else {
354 354 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, s.getServiceId());
355 355 TbProtoQueueMsg queueMsg =toProto(s, subscriptionUpdate, ignoreEmptyUpdates);
356   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
357 356 toCoreNotificationsProducer.send(tpi, queueMsg, null);
358 357 }
359 358 }
... ... @@ -378,7 +377,6 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
378 377 } else {
379 378 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, s.getServiceId());
380 379 TbProtoQueueMsg queueMsg =toProto(s, alarm, deleted);
381   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
382 380 toCoreNotificationsProducer.send(tpi, queueMsg, null);
383 381 }
384 382 }
... ... @@ -421,7 +419,6 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
421 419 if (!missedUpdates.isEmpty()) {
422 420 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, subscription.getServiceId());
423 421 TbProtoQueueMsg queueMsg =toProto(subscription, missedUpdates);
424   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
425 422 toCoreNotificationsProducer.send(tpi, queueMsg, null);
426 423 }
427 424 },
... ... @@ -446,7 +443,6 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
446 443 if (missedUpdates != null && !missedUpdates.isEmpty()) {
447 444 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, subscription.getServiceId());
448 445 TbProtoQueueMsg queueMsg =toProto(subscription, missedUpdates);
449   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
450 446 toCoreNotificationsProducer.send(tpi, toProto(subscription, missedUpdates), null);
451 447 }
452 448 },
... ... @@ -467,7 +463,6 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
467 463 if (missedUpdates != null && !missedUpdates.isEmpty()) {
468 464 TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, subscription.getServiceId());
469 465 TbProtoQueueMsg queueMsg =toProto(subscription, missedUpdates);
470   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
471 466 toCoreNotificationsProducer.send(tpi, queueMsg, null);
472 467 }
473 468 },
... ...
... ... @@ -65,7 +65,6 @@ public class DefaultTbCoreToTransportService implements TbCoreToTransportService
65 65 UUID sessionId = new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB());
66 66 log.trace("[{}][{}] Pushing session data to topic: {}", tpi.getFullTopicName(), sessionId, msg);
67 67 TbProtoQueueMsg<ToTransportMsg> queueMsg = new TbProtoQueueMsg<>(NULL_UUID, msg);
68   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
69 68 tbTransportProducer.send(tpi, queueMsg, new QueueCallbackAdaptor(onSuccess, onFailure));
70 69 }
71 70
... ...
... ... @@ -133,7 +133,6 @@ public class DefaultTbActorSystem implements TbActorSystem {
133 133
134 134 @Override
135 135 public void tellWithHighPriority(TbActorId target, TbActorMsg actorMsg) {
136   - log.info("演员消息生产——高级消息内容:【{}】",actorMsg);
137 136 tell(target, actorMsg, true);
138 137 }
139 138
... ... @@ -165,7 +164,7 @@ public class DefaultTbActorSystem implements TbActorSystem {
165 164 public void broadcastToChildren(TbActorId parent, Predicate<TbActorId> childFilter, TbActorMsg msg) {
166 165 Set<TbActorId> children = parentChildMap.get(parent);
167 166 if (children != null) {
168   - children.stream().filter(childFilter).forEach(id -> {log.info("演员消息生产——普通消息内容:broadcastToChildren【{}】",msg);tell(id, msg);});
  167 + children.stream().filter(childFilter).forEach(id -> {tell(id, msg);});
169 168 }
170 169 }
171 170
... ...
... ... @@ -60,7 +60,7 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
60 60 return;
61 61 }
62 62 Set<TopicPartitionInfo> partitions =Collections.singleton(new TopicPartitionInfo(topic, null, null, true));
63   - log.info("队列消息订阅 {} ", partitions);
  63 +
64 64 subscribeQueue.add(partitions);
65 65 }
66 66
... ... @@ -71,7 +71,6 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
71 71 log.error("trying subscribe, but consumer stopped for topic {}", topic);
72 72 return;
73 73 }
74   - log.info("队列消息订阅 {} ", partitions);
75 74 subscribeQueue.add(partitions);
76 75 }
77 76
... ... @@ -97,7 +96,6 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
97 96 partitions = subscribeQueue.poll();
98 97 }
99 98 if (!subscribed) {
100   - log.info("队列消息订阅 {} ", partitions);
101 99 List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
102 100 doSubscribe(topicNames);
103 101 subscribed = true;
... ...
... ... @@ -113,7 +113,6 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
113 113 List<Response> responses = doPoll(); //poll js responses
114 114 log.trace("Completed template poll topic {}, for pendingRequests [{}], received [{}] responses", responseTemplate.getTopic(), pendingRequestsCount, responses.size());
115 115 if(!responses.isEmpty()){
116   - log.info("队列消息消费 fetchAndProcessResponses【{}】 ", responses);
117 116 }
118 117 responses.forEach(this::processResponse); //this can take a long time
119 118 responseTemplate.commit();
... ... @@ -254,7 +253,6 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
254 253 }
255 254
256 255 TopicPartitionInfo topicInfo = TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build();
257   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",topicInfo.getFullTopicName(),request);
258 256 requestTemplate.send(topicInfo, request, new TbQueueCallback() {
259 257 @Override
260 258 public void onSuccess(TbQueueMsgMetadata metadata) {
... ...
... ... @@ -94,7 +94,6 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
94 94 if (requests.isEmpty()) {
95 95 continue;
96 96 }
97   - log.info("队列消息消费 {} ", requests);
98 97
99 98 requests.forEach(request -> {
100 99 long currentTime = System.currentTimeMillis();
... ... @@ -120,7 +119,6 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
120 119 pendingRequestCount.decrementAndGet();
121 120 response.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
122 121 TopicPartitionInfo topicInfo = TopicPartitionInfo.builder().topic(responseTopic).build();
123   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",topicInfo.getFullTopicName(),response);
124 122 responseTemplate.send(topicInfo, response, null);
125 123 stats.incrementSuccessful();
126 124 },
... ...
... ... @@ -81,7 +81,6 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
81 81 ProducerRecord<String, byte[]> record;
82 82 Iterable<Header> headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList());
83 83 record = new ProducerRecord<>(tpi.getFullTopicName(), null, key, data, headers);
84   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),record);
85 84 producer.send(record, (metadata, exception) -> {
86 85 if (exception == null) {
87 86 if (callback != null) {
... ...
... ... @@ -124,7 +124,6 @@ public class DefaultTbApiUsageClient implements TbApiUsageClient {
124 124
125 125
126 126 TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), statsMsg.build());
127   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
128 127 msgProducer.send(tpi, queueMsg, null);
129 128 }));
130 129
... ...
... ... @@ -225,7 +225,6 @@ public class DefaultTransportService implements TransportService {
225 225 if (records.size() == 0) {
226 226 continue;
227 227 }
228   - log.info("队列消息消费 {} ", records);
229 228 records.forEach(record -> {
230 229 try {
231 230 processToTransportMsg(record.getValue());
... ... @@ -1034,7 +1033,6 @@ public class DefaultTransportService implements TransportService {
1034 1033 new TransportTbQueueCallback(callback) : null;
1035 1034 tbCoreProducerStats.incrementTotal();
1036 1035 StatsCallback wrappedCallback = new StatsCallback(transportTbQueueCallback, tbCoreProducerStats);
1037   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),new TbProtoQueueMsg<>(getRoutingKey(sessionInfo),ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()));
1038 1036 tbCoreMsgProducer.send(tpi
1039 1037 ,new TbProtoQueueMsg<>(getRoutingKey(sessionInfo),ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build())
1040 1038 ,wrappedCallback);
... ... @@ -1052,7 +1050,6 @@ public class DefaultTransportService implements TransportService {
1052 1050 StatsCallback wrappedCallback = new StatsCallback(callback, ruleEngineProducerStats);
1053 1051
1054 1052 TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(tbMsg.getId(), msg);
1055   - log.info("队列消息生产 ——主题【{}】,消息【{}】 ",tpi.getFullTopicName(),queueMsg);
1056 1053 ruleEngineMsgProducer.send(tpi, queueMsg, wrappedCallback);
1057 1054 }
1058 1055
... ...
... ... @@ -94,6 +94,8 @@ public class YtNoticeServiceImpl implements YtNoticeService {
94 94 */
95 95 private void noticeAll(String messageCode, List<AlarmContact> alarmContactList, AlarmInfoDTO alarmInfo, Organization organization,String tenantId) {
96 96 Optional.ofNullable(alarmContactList).ifPresent(contacts -> {
  97 +
  98 + /**可用的告警通知模板*/
97 99 QueryWrapper<MessageTemplate> messageTemplateQueryWrapper = new QueryWrapper<MessageTemplate>();
98 100 messageTemplateQueryWrapper.lambda()
99 101 .eq(MessageTemplate::getTenantId, tenantId)
... ... @@ -106,62 +108,143 @@ public class YtNoticeServiceImpl implements YtNoticeService {
106 108 templatesMap.put(item.getMessageType(), item.getId());
107 109 }
108 110 });
109   - List<String> emailReceivers = new ArrayList<>();
110   - contacts.stream().parallel().forEach(item -> {
111   - if (messageCode.contains(MessageTypeEnum.PHONE_MESSAGE.name())
112   - && templatesMap.containsKey(MessageTypeEnum.PHONE_MESSAGE.name())
113   - && !item.getPhone().isEmpty()) {
114   -
115   -
116   - SmsReqDTO info = new SmsReqDTO();
117   - info.setId(templatesMap.get(MessageTypeEnum.PHONE_MESSAGE.name()));
118   - info.setPhoneNumbers(item.getPhone());
119   - LinkedHashMap<String, String> params = new LinkedHashMap<>();
120   - //name-其他;device_name-其他;level-其他;location-其他;alarm_value-其他;
121   - params.put("type", alarmInfo.getType());
122   - params.put("device_name", alarmInfo.getDeviceName());
123   - params.put("severity", alarmInfo.getSeverity());
124   - params.put("organization", organization != null ? organization.getName() : "");
125   - params.put("createtime", YtDateTimeUtils.formate(alarmInfo.getCreateTs()));
126   - info.setParams(params);
127   - smsService.sendSms(info);
128   - }
129 111
130   - if (messageCode.contains(MessageTypeEnum.EMAIL_MESSAGE.name())
131   - && templatesMap.containsKey(MessageTypeEnum.EMAIL_MESSAGE.name())
132   - && !item.getEmail().isEmpty()) {
133   - emailReceivers.add(item.getEmail());
134   - }
135 112
  113 + if (messageCode.contains(MessageTypeEnum.PHONE_MESSAGE.name())
  114 + && templatesMap.containsKey(MessageTypeEnum.PHONE_MESSAGE.name())) {
  115 + sms4Alarm(alarmInfo,templatesMap.get(MessageTypeEnum.PHONE_MESSAGE.name()),organization,contacts);
  116 + }
136 117
137   - if (messageCode.contains(MessageTypeEnum.DING_TALK_MESSAGE.name())
138   - && templatesMap.containsKey(MessageTypeEnum.DING_TALK_MESSAGE.name())
139   - && !item.getDingtalk().isEmpty()) {
  118 + if (messageCode.contains(MessageTypeEnum.EMAIL_MESSAGE.name())
  119 + && templatesMap.containsKey(MessageTypeEnum.EMAIL_MESSAGE.name())) {
  120 + email4Alarm(alarmInfo,templatesMap.get(MessageTypeEnum.EMAIL_MESSAGE.name()),organization,contacts);
  121 + }
140 122
141   - }
142 123
143   - if (messageCode.contains(MessageTypeEnum.WECHAT_MESSAGE.name())
144   - && templatesMap.containsKey(MessageTypeEnum.WECHAT_MESSAGE.name())
145   - && !item.getWechat().isEmpty()) {
  124 + if (messageCode.contains(MessageTypeEnum.DING_TALK_MESSAGE.name())
  125 + && templatesMap.containsKey(MessageTypeEnum.DING_TALK_MESSAGE.name())) {
  126 + dingTalk4Alarm(alarmInfo,templatesMap.get(MessageTypeEnum.DING_TALK_MESSAGE.name()),organization,contacts);
  127 + }
146 128
147   - }
148   - });
149   - if (!emailReceivers.isEmpty()) {
150   - EmailReqDTO info = new EmailReqDTO();
151   - info.setTo(emailReceivers.toArray(new String[emailReceivers.size()]));
152   - info.setSubject(String.format("【%s】告警通知",alarmInfo.getDeviceName()));
153   - String body =String.format("%s位于【%s】的设备【%s】触发【%s】级的【%s】,请尽快处理!"
154   - ,YtDateTimeUtils.formate(alarmInfo.getCreateTs())
155   - , organization != null ? organization.getName() : ""
156   - ,alarmInfo.getDeviceName()
157   - ,alarmInfo.getSeverity()
158   - , alarmInfo.getType());
159   - info.setBody(body);
160   - info.setEmailFormatEnum(EmailFormatEnum.TEXT.name());
161   - info.setId(templatesMap.get(MessageTypeEnum.EMAIL_MESSAGE.name()));
162   -
163   - mailService.sendEmail(info);
  129 + if (messageCode.contains(MessageTypeEnum.WECHAT_MESSAGE.name())
  130 + && templatesMap.containsKey(MessageTypeEnum.WECHAT_MESSAGE.name())) {
  131 + weChat4Alarm(alarmInfo,templatesMap.get(MessageTypeEnum.WECHAT_MESSAGE.name()),organization,contacts);
164 132 }
165 133 });
166 134 }
  135 +
  136 + /**
  137 + * 短信通知设备告警信息
  138 + * @param alarmInfo 告警信息
  139 + * @param templateId 告警模板主键
  140 + * @param organization 设备所属组织
  141 + * @param contacts 设备告警联系人
  142 + */
  143 + private void sms4Alarm(AlarmInfoDTO alarmInfo,String templateId, Organization organization,List<AlarmContact> contacts){
  144 + SmsReqDTO info = new SmsReqDTO();
  145 + info.setId(templateId);
  146 + LinkedHashMap<String, String> params = new LinkedHashMap<>();
  147 + //name-其他;device_name-其他;level-其他;location-其他;alarm_value-其他;
  148 + params.put("type", alarmInfo.getType());
  149 + params.put("device_name", alarmInfo.getDeviceName());
  150 + params.put("severity", alarmInfo.getSeverity());
  151 + params.put("organization", organization != null ? organization.getName() : "");
  152 + params.put("createtime", YtDateTimeUtils.formate(alarmInfo.getCreateTs()));
  153 + info.setParams(params);
  154 + contacts.stream().parallel().forEach(item -> {
  155 + if (!item.getPhone().isEmpty()) {
  156 + info.setPhoneNumbers(item.getPhone());
  157 + smsService.sendSms(info);
  158 + }
  159 + });
  160 + }
  161 +
  162 + /**
  163 + * 邮件通知设备告警信息
  164 + * @param alarmInfo 告警信息
  165 + * @param templateId 告警模板主键
  166 + * @param organization 设备所属组织
  167 + * @param contacts 设备告警联系人
  168 + */
  169 + private void email4Alarm(AlarmInfoDTO alarmInfo,String templateId, Organization organization,List<AlarmContact> contacts){
  170 + List<String> emailReceivers = new ArrayList<>();
  171 + contacts.stream().parallel().forEach(item -> {
  172 + if (!item.getEmail().isEmpty()) {
  173 + emailReceivers.add(item.getEmail());
  174 + }
  175 + });
  176 + if (!emailReceivers.isEmpty()) {
  177 + EmailReqDTO info = new EmailReqDTO();
  178 + info.setTo(emailReceivers.toArray(new String[emailReceivers.size()]));
  179 + info.setSubject(String.format("【%s】告警通知",alarmInfo.getDeviceName()));
  180 + String body =String.format("%s位于【%s】的设备【%s】触发【%s】级的【%s】,请尽快处理!"
  181 + ,YtDateTimeUtils.formate(alarmInfo.getCreateTs())
  182 + , organization != null ? organization.getName() : ""
  183 + ,alarmInfo.getDeviceName()
  184 + ,alarmInfo.getSeverity()
  185 + , alarmInfo.getType());
  186 + info.setBody(body);
  187 + info.setEmailFormatEnum(EmailFormatEnum.TEXT.name());
  188 + info.setId(templateId);
  189 +
  190 + mailService.sendEmail(info);
  191 + }
  192 + }
  193 +
  194 + /**
  195 + * 钉钉通知设备告警信息
  196 + * @param alarmInfo 告警信息
  197 + * @param templateId 告警模板主键
  198 + * @param organization 设备所属组织
  199 + * @param contacts 设备告警联系人
  200 + */
  201 + private void dingTalk4Alarm(AlarmInfoDTO alarmInfo,String templateId, Organization organization,List<AlarmContact> contacts){
  202 + // TODO 推送钉钉消息
  203 + SmsReqDTO info = new SmsReqDTO();
  204 + info.setId(templateId);
  205 + LinkedHashMap<String, String> params = new LinkedHashMap<>();
  206 + //name-其他;device_name-其他;level-其他;location-其他;alarm_value-其他;
  207 + params.put("type", alarmInfo.getType());
  208 + params.put("device_name", alarmInfo.getDeviceName());
  209 + params.put("severity", alarmInfo.getSeverity());
  210 + params.put("organization", organization != null ? organization.getName() : "");
  211 + params.put("createtime", YtDateTimeUtils.formate(alarmInfo.getCreateTs()));
  212 + info.setParams(params);
  213 + contacts.stream().parallel().forEach(item -> {
  214 + if (!item.getDingtalk().isEmpty()) {
  215 + info.setPhoneNumbers(item.getDingtalk());
  216 + smsService.sendSms(info);
  217 + }
  218 + });
  219 +
  220 + }
  221 +
  222 + /**
  223 + * 微信通知设备告警信息
  224 + * @param alarmInfo 告警信息
  225 + * @param templateId 告警模板主键
  226 + * @param organization 设备所属组织
  227 + * @param contacts 设备告警联系人
  228 + */
  229 + private void weChat4Alarm(AlarmInfoDTO alarmInfo,String templateId, Organization organization,List<AlarmContact> contacts){
  230 + // TODO 推送微信通知
  231 + SmsReqDTO info = new SmsReqDTO();
  232 + info.setId(templateId);
  233 + LinkedHashMap<String, String> params = new LinkedHashMap<>();
  234 + //name-其他;device_name-其他;level-其他;location-其他;alarm_value-其他;
  235 + params.put("type", alarmInfo.getType());
  236 + params.put("device_name", alarmInfo.getDeviceName());
  237 + params.put("severity", alarmInfo.getSeverity());
  238 + params.put("organization", organization != null ? organization.getName() : "");
  239 + params.put("createtime", YtDateTimeUtils.formate(alarmInfo.getCreateTs()));
  240 + info.setParams(params);
  241 + contacts.stream().parallel().forEach(item -> {
  242 + if (!item.getWechat().isEmpty()) {
  243 + info.setPhoneNumbers(item.getWechat());
  244 + smsService.sendSms(info);
  245 + }
  246 + });
  247 + }
  248 +
  249 +
167 250 }
... ...
... ... @@ -53,10 +53,8 @@ public abstract class TbAbstractAlarmNode<C extends TbAbstractAlarmNodeConfigura
53 53
54 54 @Override
55 55 public void onMsg(TbContext ctx, TbMsg msg) {
56   - log.info("节点【TbAbstractAlarmNode】处理消费: {}", msg);
57 56 withCallback(processAlarm(ctx, msg),
58 57 alarmResult -> {
59   - log.info("演员消息生产——普通消息内容:【{}】",msg);
60 58 if (alarmResult.alarm == null) {
61 59 ctx.tellNext(msg, "False");
62 60 } else if (alarmResult.isCreated) {
... ... @@ -111,7 +109,7 @@ public abstract class TbAbstractAlarmNode<C extends TbAbstractAlarmNodeConfigura
111 109
112 110 private void tellNext(TbContext ctx, TbMsg msg, TbAlarmResult alarmResult, String entityAction, String alarmAction) {
113 111 ctx.enqueue(ctx.alarmActionMsg(alarmResult.alarm, ctx.getSelfId(), entityAction),
114   - () -> {log.info("演员消息生产——普通消息内容:transformSuccess【{}】",toAlarmMsg(ctx, alarmResult, msg));ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), alarmAction);},
  112 + () -> {ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), alarmAction);},
115 113 throwable -> ctx.tellFailure(toAlarmMsg(ctx, alarmResult, msg), throwable));
116 114 }
117 115 }
... ...
... ... @@ -64,7 +64,6 @@ public abstract class TbAbstractCustomerActionNode<C extends TbAbstractCustomerA
64 64 public void onMsg(TbContext ctx, TbMsg msg) {
65 65 withCallback(processCustomerAction(ctx, msg),
66 66 m -> {
67   - log.info("演员消息生产——普通消息内容:tellFailure【{}】",msg);
68 67 ctx.tellSuccess(msg);}
69 68 ,t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
70 69 }
... ...
... ... @@ -84,7 +84,6 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA
84 84 String relationType = processPattern(msg, config.getRelationType());
85 85 withCallback(processEntityRelationAction(ctx, msg, relationType),
86 86 filterResult -> {
87   - log.info("演员消息生产——普通消息内容:tellFailure【{}】",filterResult.getMsg());
88 87 ctx.tellNext(filterResult.getMsg(), filterResult.isResult() ? SUCCESS : FAILURE);}
89 88 , t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
90 89 }
... ...
... ... @@ -56,7 +56,6 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode<TbClearAlarmNodeConfig
56 56
57 57 @Override
58 58 protected ListenableFuture<TbAlarmResult> processAlarm(TbContext ctx, TbMsg msg) {
59   - log.info("节点【TbClearAlarmNode】处理消费: {}", msg);
60 59 String alarmType = TbNodeUtils.processPattern(this.config.getAlarmType(), msg);
61 60 ListenableFuture<Alarm> alarmFuture;
62 61 if (msg.getOriginator().getEntityType().equals(EntityType.ALARM)) {
... ...
... ... @@ -81,7 +81,6 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
81 81
82 82 @Override
83 83 protected ListenableFuture<TbAlarmResult> processAlarm(TbContext ctx, TbMsg msg) {
84   - log.info("节点【TbCreateAlarmNode】处理消费: {}", msg);
85 84 String alarmType;
86 85 final Alarm msgAlarm;
87 86
... ...
... ... @@ -64,7 +64,6 @@ public class TbLogNode implements TbNode {
64 64 public void onSuccess(@Nullable String result) {
65 65 ctx.logJsEvalResponse();
66 66 log.info(result);
67   - log.info("演员消息生产——普通消息内容:【{}】", msg);
68 67 ctx.tellSuccess(msg);
69 68 }
70 69
... ...
... ... @@ -96,7 +96,6 @@ public class TbMsgCountNode implements TbNode {
96 96 long curDelay = Math.max(0L, (lastScheduledTs - curTs));
97 97 TbMsg tickMsg = ctx.newMsg(ServiceQueue.MAIN, TB_MSG_COUNT_NODE_MSG, ctx.getSelfId(), msg != null ? msg.getCustomerId() : null, new TbMsgMetaData(), "");
98 98 nextTickId = tickMsg.getId();
99   - log.info("演员消息生产——高级消息内容:【{}】", tickMsg);
100 99 ctx.tellSelf(tickMsg, curDelay);
101 100 }
102 101
... ...
... ... @@ -105,7 +105,6 @@ public class TbMsgGeneratorNode implements TbNode {
105 105 @Override
106 106 public void onMsg(TbContext ctx, TbMsg msg) {
107 107 log.trace("onMsg, config {}, msg {}", config, msg);
108   - log.info("节点【TbMsgGeneratorNode】处理消费: {}", msg);
109 108 if (initialized.get() && msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) {
110 109 TbStopWatch sw = TbStopWatch.startNew();
111 110 withCallback(generate(ctx, msg),
... ... @@ -138,7 +137,6 @@ public class TbMsgGeneratorNode implements TbNode {
138 137 long curDelay = Math.max(0L, (lastScheduledTs - curTs));
139 138 TbMsg tickMsg = ctx.newMsg(ServiceQueue.MAIN, TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
140 139 nextTickId = tickMsg.getId();
141   - log.info("演员消息生产——高级消息内容:【{}】", tickMsg);
142 140 ctx.tellSelf(tickMsg, curDelay);
143 141 }
144 142
... ...
... ... @@ -74,7 +74,6 @@ public class TbMsgDelayNode implements TbNode {
74 74 if (pendingMsgs.size() < config.getMaxPendingMsgs()) {
75 75 pendingMsgs.put(msg.getId(), msg);
76 76 TbMsg tickMsg = ctx.newMsg(ServiceQueue.MAIN, TB_MSG_DELAY_NODE_MSG, ctx.getSelfId(), msg.getCustomerId(), new TbMsgMetaData(), msg.getId().toString());
77   - log.info("演员消息生产——高级消息内容:【{}】", msg);
78 77 ctx.tellSelf(tickMsg, getDelay(msg));
79 78 ctx.ack(msg);
80 79 } else {
... ...
... ... @@ -150,7 +150,6 @@ public class TbMsgPushToEdgeNode implements TbNode {
150 150 private void notifyEdge(TbContext ctx, TbMsg msg, EdgeEvent edgeEvent, EdgeId edgeId) {
151 151 edgeEvent.setEdgeId(edgeId);
152 152 ctx.getEdgeEventService().save(edgeEvent);
153   - log.info("演员消息生产——普通消息内容:【{}】",msg);
154 153 ctx.tellNext(msg, SUCCESS);
155 154 ctx.onEdgeEventUpdate(ctx.getTenantId(), edgeId);
156 155 }
... ...
... ... @@ -56,7 +56,6 @@ public class TbCheckAlarmStatusNode implements TbNode {
56 56
57 57 @Override
58 58 public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
59   - log.info("节点【TbCheckAlarmStatusNode】处理消费: {}", msg);
60 59 try {
61 60 Alarm alarm = mapper.readValue(msg.getData(), Alarm.class);
62 61
... ... @@ -73,7 +72,6 @@ public class TbCheckAlarmStatusNode implements TbNode {
73 72 break;
74 73 }
75 74 }
76   - log.info("演员消息生产——普通消息内容:【{}】", msg);
77 75 if (isPresent) {
78 76 ctx.tellNext(msg, "True");
79 77 } else {
... ...
... ... @@ -58,7 +58,6 @@ public class TbCheckMessageNode implements TbNode {
58 58 @Override
59 59 public void onMsg(TbContext ctx, TbMsg msg) {
60 60 try {
61   - log.info("演员消息生产——普通消息内容:【{}】", msg);
62 61 if (config.isCheckAllKeys()) {
63 62 ctx.tellNext(msg, allKeysData(msg) && allKeysMetadata(msg) ? "True" : "False");
64 63 } else {
... ...
... ... @@ -69,7 +69,7 @@ public class TbCheckRelationNode implements TbNode {
69 69 } else {
70 70 checkRelationFuture = processList(ctx, msg);
71 71 }
72   - withCallback(checkRelationFuture, filterResult -> {log.info("演员消息生产——普通消息内容:【{}】", msg);ctx.tellNext(msg, filterResult ? "True" : "False");}, t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
  72 + withCallback(checkRelationFuture, filterResult -> {ctx.tellNext(msg, filterResult ? "True" : "False");}, t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
73 73 }
74 74
75 75 private ListenableFuture<Boolean> processSingle(TbContext ctx, TbMsg msg) {
... ...
... ... @@ -59,7 +59,6 @@ public class TbJsFilterNode implements TbNode {
59 59 withCallback(jsEngine.executeFilterAsync(msg),
60 60 filterResult -> {
61 61 ctx.logJsEvalResponse();
62   - log.info("演员消息生产——普通消息内容:【{}】", msg);
63 62 ctx.tellNext(msg, filterResult ? "True" : "False");
64 63 },
65 64 t -> {
... ...
... ... @@ -77,7 +77,6 @@ public class TbJsSwitchNode implements TbNode {
77 77 }
78 78
79 79 private void processSwitch(TbContext ctx, TbMsg msg, Set<String> nextRelations) {
80   - log.info("演员消息生产——普通消息内容:processSwitch【{}】",msg);
81 80 ctx.tellNext(msg, nextRelations);
82 81 }
83 82
... ...
... ... @@ -49,7 +49,6 @@ public class TbMsgTypeFilterNode implements TbNode {
49 49
50 50 @Override
51 51 public void onMsg(TbContext ctx, TbMsg msg) {
52   - log.info("演员消息生产——普通消息内容:【{}】", msg);
53 52 ctx.tellNext(msg, config.getMessageTypes().contains(msg.getType()) ? "True" : "False");
54 53 }
55 54
... ...
... ... @@ -114,7 +114,6 @@ public class TbMsgTypeSwitchNode implements TbNode {
114 114 } else {
115 115 relationType = "Other";
116 116 }
117   - log.info("演员消息生产——普通消息内容:类型{}【{}】",msg.getType(), msg);
118 117 ctx.tellNext(msg, relationType);
119 118 }
120 119
... ...
... ... @@ -48,7 +48,6 @@ public class TbOriginatorTypeFilterNode implements TbNode {
48 48 @Override
49 49 public void onMsg(TbContext ctx, TbMsg msg) {
50 50 EntityType originatorType = msg.getOriginator().getEntityType();
51   - log.info("演员消息生产——普通消息内容:【{}】", msg);
52 51 ctx.tellNext(msg, config.getOriginatorTypes().contains(originatorType) ? "True" : "False");
53 52 }
54 53
... ...
... ... @@ -87,7 +87,6 @@ public class TbOriginatorTypeSwitchNode implements TbNode {
87 87 default:
88 88 throw new TbNodeException("Unsupported originator type: " + originatorType);
89 89 }
90   - log.info("演员消息生产——普通消息内容:【{}】", msg);
91 90 ctx.tellNext(msg, relationType);
92 91 }
93 92
... ...
... ... @@ -48,7 +48,6 @@ public class TbAckNode implements TbNode {
48 48 @Override
49 49 public void onMsg(TbContext ctx, TbMsg msg) {
50 50 ctx.ack(msg);
51   - log.info("演员消息生产——普通消息内容:【{}】", msg);
52 51 ctx.tellSuccess(msg);
53 52 }
54 53
... ...
... ... @@ -101,13 +101,11 @@ public class TbPubSubNode implements TbNode {
101 101 ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<String>() {
102 102 public void onSuccess(String messageId) {
103 103 TbMsg next = processPublishResult(ctx, msg, messageId);
104   - log.info("演员消息生产——普通消息内容:【{}】", msg);
105 104 ctx.tellSuccess(next);
106 105 }
107 106
108 107 public void onFailure(Throwable t) {
109 108 TbMsg next = processException(ctx, msg, t);
110   - log.info("演员消息生产——普通消息内容:【{}】", next);
111 109 ctx.tellFailure(next, t);
112 110 }
113 111 },
... ...
... ... @@ -97,7 +97,6 @@ public class TbGpsGeofencingActionNode extends AbstractGeofencingNode<TbGpsGeofe
97 97 }
98 98 }
99 99 if (!told) {
100   - log.info("演员消息生产——普通消息内容:【{}】", msg);
101 100 ctx.tellSuccess(msg);
102 101 }
103 102 }
... ...
... ... @@ -77,7 +77,7 @@ public class TbSendEmailNode implements TbNode {
77 77 sendEmail(ctx, msg, email);
78 78 return null;
79 79 }),
80   - ok -> {log.info("演员消息生产——普通消息内容:【{}】", msg);ctx.tellSuccess(msg);},
  80 + ok -> {ctx.tellSuccess(msg);},
81 81 fail -> ctx.tellFailure(msg, fail));
82 82 } catch (Exception ex) {
83 83 ctx.tellFailure(msg, ex);
... ...
... ... @@ -91,7 +91,6 @@ public class CalculateDeltaNode implements TbNode {
91 91 BigDecimal delta = BigDecimal.valueOf(previousData != null ? currentValue - previousData.value : 0.0);
92 92
93 93 if (config.isTellFailureIfDeltaIsNegative() && delta.doubleValue() < 0) {
94   - log.info("演员消息生产——普通消息内容:【{}】", msg);
95 94 ctx.tellNext(msg, TbRelationTypes.FAILURE);
96 95 return;
97 96 }
... ... @@ -112,16 +111,13 @@ public class CalculateDeltaNode implements TbNode {
112 111 long period = previousData != null ? currentTs - previousData.ts : 0;
113 112 result.put(config.getPeriodValueKey(), period);
114 113 }
115   - log.info("演员消息生产——普通消息内容:【{}】", TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), JacksonUtil.toString(result)));
116 114 ctx.tellSuccess(TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), JacksonUtil.toString(result)));
117 115 },
118 116 t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
119 117 } else {
120   - log.info("演员消息生产——普通消息内容:【{}】", msg);
121 118 ctx.tellNext(msg, "Other");
122 119 }
123 120 } else {
124   - log.info("演员消息生产——普通消息内容:【{}】", msg);
125 121 ctx.tellNext(msg, "Other");
126 122 }
127 123 }
... ...
... ... @@ -71,7 +71,6 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
71 71
72 72 @Override
73 73 public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
74   - log.info("节点【TbAbstractGetAttributesNode】处理消费: {}", msg);
75 74 try {
76 75 withCallback(
77 76 findEntityIdAsync(ctx, msg),
... ... @@ -90,7 +89,6 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
90 89
91 90 private void safePutAttributes(TbContext ctx, TbMsg msg, T entityId) {
92 91 if (entityId == null || entityId.isNullUid()) {
93   - log.info("演员消息生产——普通消息内容:【{}】", msg);
94 92 ctx.tellNext(msg, FAILURE);
95 93 return;
96 94 }
... ... @@ -105,7 +103,6 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
105 103 if (!failuresMap.isEmpty()) {
106 104 throw reportFailures(failuresMap);
107 105 }
108   - log.info("演员消息生产——普通消息内容:【{}】", msg);
109 106 ctx.tellSuccess(msg);
110 107 }, t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
111 108 }
... ...
... ... @@ -61,7 +61,6 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
61 61
62 62 private void safeGetAttributes(TbContext ctx, TbMsg msg, T entityId) {
63 63 if (entityId == null || entityId.isNullUid()) {
64   - log.info("演员消息生产——普通消息内容:transformSuccess【{}】",msg);
65 64 ctx.tellNext(msg, FAILURE);
66 65 return;
67 66 }
... ... @@ -89,7 +88,6 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
89 88 String attrName = config.getAttrMapping().get(r.getKey());
90 89 msg.getMetaData().putValue(attrName, r.getValueAsString());
91 90 });
92   - log.info("演员消息生产——普通消息内容:putAttributesAndTell【{}】", msg);
93 91 ctx.tellSuccess(msg);
94 92 }
95 93
... ...
... ... @@ -56,7 +56,7 @@ public class TbGetOriginatorFieldsNode implements TbNode {
56 56 public void onMsg(TbContext ctx, TbMsg msg) {
57 57 try {
58 58 withCallback(putEntityFields(ctx, msg.getOriginator(), msg),
59   - i ->{ log.info("演员消息生产——普通消息内容:【{}】", msg);ctx.tellSuccess(msg);}, t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
  59 + i ->{ctx.tellSuccess(msg);}, t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
60 60 } catch (Throwable th) {
61 61 ctx.tellFailure(msg, th);
62 62 }
... ...
... ... @@ -106,7 +106,6 @@ public class TbGetTelemetryNode implements TbNode {
106 106
107 107 @Override
108 108 public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
109   - log.info("节点【TbGetTelemetryNode】处理消费: {}", msg);
110 109 if (tsKeyNames.isEmpty()) {
111 110 ctx.tellFailure(msg, new IllegalStateException("Telemetry is not selected!"));
112 111 } else {
... ... @@ -118,7 +117,6 @@ public class TbGetTelemetryNode implements TbNode {
118 117 ListenableFuture<List<TsKvEntry>> list = ctx.getTimeseriesService().findAll(ctx.getTenantId(), msg.getOriginator(), buildQueries(msg, keys));
119 118 DonAsynchron.withCallback(list, data -> {
120 119 process(data, msg, keys);
121   - log.info("演员消息生产——普通消息内容:【{}】", ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), msg.getData()));
122 120 ctx.tellSuccess(ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), msg.getData()));
123 121 }, error -> ctx.tellFailure(msg, error), ctx.getDbCallbackExecutor());
124 122 } catch (Exception e) {
... ...
... ... @@ -75,12 +75,10 @@ public class TbMqttNode implements TbNode {
75 75
76 76 @Override
77 77 public void onMsg(TbContext ctx, TbMsg msg) {
78   - log.info("节点【TbMqttNode】处理消费: {}", msg);
79 78 String topic = TbNodeUtils.processPattern(this.mqttNodeConfiguration.getTopicPattern(), msg);
80 79 this.mqttClient.publish(topic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE)
81 80 .addListener(future -> {
82 81 if (future.isSuccess()) {
83   - log.info("演员消息生产——普通消息内容:【{}】", msg);
84 82 ctx.tellSuccess(msg);
85 83 } else {
86 84 TbMsg next = processException(ctx, msg, future.cause());
... ...
... ... @@ -67,7 +67,7 @@ public class AlarmNoticeNode implements TbNode {
67 67 posetApi(ctx, msg);
68 68 return null;
69 69 }),
70   - ok -> {log.info("演员消息生产——普通消息内容:【{}】", msg);ctx.tellSuccess(msg);},
  70 + ok -> {ctx.tellSuccess(msg);},
71 71 fail -> ctx.tellFailure(msg, fail));
72 72 } catch (Exception ex) {
73 73 ctx.tellFailure(msg, ex);
... ...
... ... @@ -157,7 +157,6 @@ class DeviceState {
157 157 if (msg.getType().equals(DataConstants.ENTITY_ASSIGNED) || msg.getType().equals(DataConstants.ENTITY_UNASSIGNED)) {
158 158 dynamicPredicateValueCtx.resetCustomer();
159 159 }
160   - log.info("演员消息生产——普通消息内容:{}【{}】",msg.getType(), msg);
161 160 ctx.tellSuccess(msg);
162 161 }
163 162 if (persistState && stateChanged) {
... ... @@ -183,7 +182,6 @@ class DeviceState {
183 182 a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm), dynamicPredicateValueCtx));
184 183 stateChanged |= alarmState.processAlarmClear(ctx, alarmNf);
185 184 }
186   - log.info("演员消息生产——普通消息内容:processAlarmClearNotification【{}】", msg);
187 185 ctx.tellSuccess(msg);
188 186 return stateChanged;
189 187 }
... ... @@ -195,14 +193,12 @@ class DeviceState {
195 193 a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm), dynamicPredicateValueCtx));
196 194 alarmState.processAckAlarm(alarmNf);
197 195 }
198   - log.info("演员消息生产——普通消息内容:processAlarmAckNotification【{}】", msg);
199 196 ctx.tellSuccess(msg);
200 197 }
201 198
202 199 private void processAlarmDeleteNotification(TbContext ctx, TbMsg msg) {
203 200 Alarm alarm = JacksonUtil.fromString(msg.getData(), Alarm.class);
204 201 alarmStates.values().removeIf(alarmState -> alarmState.getCurrentAlarm().getId().equals(alarm.getId()));
205   - log.info("演员消息生产——普通消息内容:processAlarmDeleteNotification【{}】", msg);
206 202 ctx.tellSuccess(msg);
207 203 }
208 204
... ... @@ -231,7 +227,6 @@ class DeviceState {
231 227 stateChanged |= alarmState.process(ctx, msg, latestValues, null);
232 228 }
233 229 }
234   - log.info("演员消息生产——普通消息内容:processAttributesDeleteNotification【{}】", msg);
235 230 ctx.tellSuccess(msg);
236 231 return stateChanged;
237 232 }
... ... @@ -251,7 +246,6 @@ class DeviceState {
251 246 stateChanged |= alarmState.process(ctx, msg, latestValues, update);
252 247 }
253 248 }
254   - log.info("演员消息生产——普通消息内容:processAttributes【{}】", msg);
255 249 ctx.tellSuccess(msg);
256 250 return stateChanged;
257 251 }
... ... @@ -277,7 +271,6 @@ class DeviceState {
277 271 }
278 272 }
279 273 }
280   - log.info("演员消息生产——普通消息内容:processTelemetry【{}】", msg);
281 274 ctx.tellSuccess(msg);
282 275 return stateChanged;
283 276 }
... ...
... ... @@ -106,7 +106,6 @@ public class TbDeviceProfileNode implements TbNode {
106 106
107 107 @Override
108 108 public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
109   - log.info("节点【TbDeviceProfileNode】处理消费: {}", msg);
110 109 EntityType originatorType = msg.getOriginator().getEntityType();
111 110 if (msg.getType().equals(PERIODIC_MSG_TYPE)) {
112 111 scheduleAlarmHarvesting(ctx, msg);
... ... @@ -126,11 +125,9 @@ public class TbDeviceProfileNode implements TbNode {
126 125 DeviceId deviceId = new DeviceId(msg.getOriginator().getId());
127 126 if (msg.getType().equals(DataConstants.ENTITY_UPDATED)) {
128 127 invalidateDeviceProfileCache(deviceId, msg.getData());
129   - log.info("演员消息生产——普通消息内容:ENTITY_UPDATED【{}】", msg);
130 128 ctx.tellSuccess(msg);
131 129 } else if (msg.getType().equals(DataConstants.ENTITY_DELETED)) {
132 130 removeDeviceState(deviceId);
133   - log.info("演员消息生产——普通消息内容:ENTITY_DELETED【{}】", msg);
134 131 ctx.tellSuccess(msg);
135 132 } else {
136 133 DeviceState deviceState = getOrCreateDeviceState(ctx, deviceId, null);
... ... @@ -142,7 +139,6 @@ public class TbDeviceProfileNode implements TbNode {
142 139 }
143 140 }
144 141 } else {
145   - log.info("演员消息生产——普通消息内容:{}【{}】", msg.getType(),msg);
146 142 ctx.tellSuccess(msg);
147 143 }
148 144 }
... ... @@ -174,7 +170,6 @@ public class TbDeviceProfileNode implements TbNode {
174 170
175 171 protected void scheduleAlarmHarvesting(TbContext ctx, TbMsg msg) {
176 172 TbMsg periodicCheck = TbMsg.newMsg(PERIODIC_MSG_TYPE, ctx.getTenantId(), msg != null ? msg.getCustomerId() : null, TbMsgMetaData.EMPTY, "{}");
177   - log.info("演员消息生产——高级消息内容:scheduleAlarmHarvesting【{}】", periodicCheck);
178 173 ctx.tellSelf(periodicCheck, TimeUnit.MINUTES.toMillis(1));
179 174 }
180 175
... ... @@ -199,7 +194,6 @@ public class TbDeviceProfileNode implements TbNode {
199 194 }
200 195
201 196 protected void onProfileUpdate(DeviceProfile profile) {
202   - log.info("演员消息生产——高级消息内容:onProfileUpdate【{}】", TbMsg.newMsg(PROFILE_UPDATE_MSG_TYPE, ctx.getTenantId(), TbMsgMetaData.EMPTY, profile.getId().getId().toString()));
203 197 ctx.tellSelf(TbMsg.newMsg(PROFILE_UPDATE_MSG_TYPE, ctx.getTenantId(), TbMsgMetaData.EMPTY, profile.getId().getId().toString()), 0L);
204 198 }
205 199
... ... @@ -209,7 +203,6 @@ public class TbDeviceProfileNode implements TbNode {
209 203 if (deviceProfile != null) {
210 204 msgData.put("deviceProfileId", deviceProfile.getId().getId().toString());
211 205 }
212   - log.info("演员消息生产——高级消息内容:onDeviceUpdate【{}】", TbMsg.newMsg(DEVICE_UPDATE_MSG_TYPE, ctx.getTenantId(), TbMsgMetaData.EMPTY, JacksonUtil.toString(msgData)));
213 206 ctx.tellSelf(TbMsg.newMsg(DEVICE_UPDATE_MSG_TYPE, ctx.getTenantId(), TbMsgMetaData.EMPTY, JacksonUtil.toString(msgData)), 0L);
214 207 }
215 208
... ...
... ... @@ -202,11 +202,9 @@ public class TbHttpClient {
202 202 public void onSuccess(ResponseEntity<String> responseEntity) {
203 203 if (responseEntity.getStatusCode().is2xxSuccessful()) {
204 204 TbMsg next = processResponse(ctx, msg, responseEntity);
205   - log.info("演员消息生产——普通消息内容:transformSuccess【{}】",next);
206 205 ctx.tellSuccess(next);
207 206 } else {
208 207 TbMsg next = processFailureResponse(ctx, msg, responseEntity);
209   - log.info("演员消息生产——普通消息内容:transformSuccess【{}】",next);
210 208 ctx.tellNext(next, TbRelationTypes.FAILURE);
211 209 }
212 210 }
... ...
... ... @@ -66,8 +66,6 @@ public class TbSendRPCReplyNode implements TbNode {
66 66 ctx.tellFailure(msg, new RuntimeException("Request body is empty!"));
67 67 } else {
68 68 ctx.getRpcService().sendRpcReplyToDevice(serviceIdStr, UUID.fromString(sessionIdStr), Integer.parseInt(requestIdStr), msg.getData());
69   -
70   - log.info("演员消息生产——普通消息内容:【{}】", msg);
71 69 ctx.tellSuccess(msg);
72 70 }
73 71 }
... ...
... ... @@ -58,7 +58,6 @@ public class TbMsgAttributesNode implements TbNode {
58 58
59 59 @Override
60 60 public void onMsg(TbContext ctx, TbMsg msg) {
61   - log.info("节点【TbMsgAttributesNode】处理消费: {}", msg);
62 61 if (!msg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name())) {
63 62 ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
64 63 return;
... ...
... ... @@ -72,7 +72,6 @@ public class TbMsgTimeseriesNode implements TbNode {
72 72
73 73 @Override
74 74 public void onMsg(TbContext ctx, TbMsg msg) {
75   - log.info("节点【TbMsgTimeseriesNode】处理消费: {}", msg);
76 75 if (!msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) {
77 76 ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
78 77 return;
... ...
... ... @@ -34,7 +34,6 @@ class TelemetryNodeCallback implements FutureCallback<Void> {
34 34
35 35 @Override
36 36 public void onSuccess(@Nullable Void result) {
37   - log.info("演员消息生产——普通消息内容:【{}】", msg);
38 37 ctx.tellSuccess(msg);
39 38 }
40 39
... ...
... ... @@ -46,7 +46,6 @@ public class TbSynchronizationBeginNode implements TbNode {
46 46 @Override
47 47 public void onMsg(TbContext ctx, TbMsg msg) {
48 48 log.warn("Synchronization Start/End nodes are deprecated since TB 2.5. Use queue with submit strategy SEQUENTIAL_BY_ORIGINATOR instead.");
49   - log.info("演员消息生产——普通消息内容:【{}】", msg);
50 49 ctx.tellSuccess(msg);
51 50 }
52 51
... ...
... ... @@ -45,8 +45,6 @@ public class TbSynchronizationEndNode implements TbNode {
45 45 @Override
46 46 public void onMsg(TbContext ctx, TbMsg msg) {
47 47 log.warn("Synchronization Start/End nodes are deprecated since TB 2.5. Use queue with submit strategy SEQUENTIAL_BY_ORIGINATOR instead.");
48   -
49   - log.info("演员消息生产——普通消息内容:【{}】", msg);
50 48 ctx.tellSuccess(msg);
51 49 }
52 50
... ...
... ... @@ -46,7 +46,6 @@ public abstract class TbAbstractTransformNode implements TbNode {
46 46
47 47 @Override
48 48 public void onMsg(TbContext ctx, TbMsg msg) {
49   - log.info("节点【TbAbstractTransformNode】处理消费: {}", msg);
50 49 withCallback(transform(ctx, msg),
51 50 m -> transformSuccess(ctx, msg, m),
52 51 t -> transformFailure(ctx, msg, t),
... ... @@ -58,7 +57,6 @@ public abstract class TbAbstractTransformNode implements TbNode {
58 57 }
59 58
60 59 protected void transformSuccess(TbContext ctx, TbMsg msg, TbMsg m) {
61   - log.info("演员消息生产——普通消息内容:transformSuccess【{}】",msg);
62 60 if (m != null) {
63 61 ctx.tellSuccess(m);
64 62 } else {
... ... @@ -69,7 +67,6 @@ public abstract class TbAbstractTransformNode implements TbNode {
69 67 protected void transformSuccess(TbContext ctx, TbMsg msg, List<TbMsg> msgs) {
70 68 if (msgs != null && !msgs.isEmpty()) {
71 69 if (msgs.size() == 1) {
72   - log.info("演员消息生产——普通消息内容:transformSuccess【{}】",msgs.get(0));
73 70 ctx.tellSuccess(msgs.get(0));
74 71 } else {
75 72 TbMsgCallbackWrapper wrapper = new MultipleTbMsgsCallbackWrapper(msgs.size(), new TbMsgCallback() {
... ... @@ -86,7 +83,6 @@ public abstract class TbAbstractTransformNode implements TbNode {
86 83 msgs.forEach(newMsg -> ctx.enqueueForTellNext(newMsg, "Success", wrapper::onSuccess, wrapper::onFailure));
87 84 }
88 85 } else {
89   - log.info("演员消息生产——普通消息内容:transformSuccess【{}】",msg);
90 86 ctx.tellNext(msg, FAILURE);
91 87 }
92 88 }
... ...
... ... @@ -169,7 +169,6 @@ public class TbGetCustomerAttributeNodeTest {
169 169
170 170
171 171 node.onMsg(ctx, msg);
172   - log.info("演员消息生产——普通消息内容:tellFailure【{}】",msg);
173 172 verify(ctx).tellNext(msg, FAILURE);
174 173 assertTrue(msg.getMetaData().getData().isEmpty());
175 174 }
... ... @@ -257,7 +256,6 @@ public class TbGetCustomerAttributeNodeTest {
257 256 .thenReturn(Futures.immediateFuture(timeseries));
258 257
259 258 node.onMsg(ctx, msg);
260   - log.info("演员消息生产——普通消息内容:deviceCustomerTelemetryFetched【{}】", msg);
261 259 verify(ctx).tellSuccess(msg);
262 260 assertEquals(msg.getMetaData().getValue("tempo"), "highest");
263 261 }
... ... @@ -270,7 +268,6 @@ public class TbGetCustomerAttributeNodeTest {
270 268 .thenReturn(Futures.immediateFuture(attributes));
271 269
272 270 node.onMsg(ctx, msg);
273   - log.info("演员消息生产——普通消息内容:entityAttributeFetched【{}】", msg);
274 271 verify(ctx).tellSuccess(msg);
275 272 assertEquals(msg.getMetaData().getValue("tempo"), "high");
276 273 }
... ...