Commit 3ba15599c55e1a4109518f30fb1e2248e975f0a5

Authored by Andrew Shvayka
1 parent c94ef878

Logging improvements and termination behaviour

Showing 19 changed files with 232 additions and 166 deletions
... ... @@ -25,6 +25,9 @@ import akka.actor.Terminated;
25 25 import akka.event.Logging;
26 26 import akka.event.LoggingAdapter;
27 27 import akka.japi.Function;
  28 +import com.google.common.collect.BiMap;
  29 +import com.google.common.collect.HashBiMap;
  30 +import lombok.extern.slf4j.Slf4j;
28 31 import org.thingsboard.server.actors.ActorSystemContext;
29 32 import org.thingsboard.server.actors.ruleChain.RuleChainManagerActor;
30 33 import org.thingsboard.server.actors.service.ContextBasedCreator;
... ... @@ -48,18 +51,17 @@ import java.util.HashMap;
48 51 import java.util.Map;
49 52 import java.util.Optional;
50 53
  54 +@Slf4j
51 55 public class AppActor extends RuleChainManagerActor {
52 56
53   - private final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
54   -
55   - public static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID);
  57 + private static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID);
56 58 private final TenantService tenantService;
57   - private final Map<TenantId, ActorRef> tenantActors;
  59 + private final BiMap<TenantId, ActorRef> tenantActors;
58 60
59 61 private AppActor(ActorSystemContext systemContext) {
60 62 super(systemContext, new SystemRuleChainManager(systemContext));
61 63 this.tenantService = systemContext.getTenantService();
62   - this.tenantActors = new HashMap<>();
  64 + this.tenantActors = HashBiMap.create();
63 65 }
64 66
65 67 @Override
... ... @@ -69,22 +71,20 @@ public class AppActor extends RuleChainManagerActor {
69 71
70 72 @Override
71 73 public void preStart() {
72   - logger.info("Starting main system actor.");
  74 + log.info("Starting main system actor.");
73 75 try {
74 76 initRuleChains();
75   -
76 77 if (systemContext.isTenantComponentsInitEnabled()) {
77 78 PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
78 79 for (Tenant tenant : tenantIterator) {
79   - logger.debug("[{}] Creating tenant actor", tenant.getId());
  80 + log.debug("[{}] Creating tenant actor", tenant.getId());
80 81 getOrCreateTenantActor(tenant.getId());
81   - logger.debug("Tenant actor created.");
  82 + log.debug("Tenant actor created.");
82 83 }
83 84 }
84   -
85   - logger.info("Main system actor started.");
  85 + log.info("Main system actor started.");
86 86 } catch (Exception e) {
87   - logger.error(e, "Unknown failure");
  87 + log.warn("Unknown failure", e);
88 88 }
89 89 }
90 90
... ... @@ -130,7 +130,7 @@ public class AppActor extends RuleChainManagerActor {
130 130
131 131 private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
132 132 if (SYSTEM_TENANT.equals(msg.getTenantId())) {
133   - //TODO: ashvayka handle this.
  133 + log.warn("[{}] Invalid service to rule engine msg called. System messages are not supported yet", SYSTEM_TENANT);
134 134 } else {
135 135 getOrCreateTenantActor(msg.getTenantId()).tell(msg, self());
136 136 }
... ... @@ -152,7 +152,7 @@ public class AppActor extends RuleChainManagerActor {
152 152 if (target != null) {
153 153 target.tell(msg, ActorRef.noSender());
154 154 } else {
155   - logger.debug("Invalid component lifecycle msg: {}", msg);
  155 + log.debug("[{}] Invalid component lifecycle msg: {}", msg.getTenantId(), msg);
156 156 }
157 157 }
158 158
... ... @@ -161,14 +161,26 @@ public class AppActor extends RuleChainManagerActor {
161 161 }
162 162
163 163 private ActorRef getOrCreateTenantActor(TenantId tenantId) {
164   - return tenantActors.computeIfAbsent(tenantId, k -> context().actorOf(Props.create(new TenantActor.ActorCreator(systemContext, tenantId))
165   - .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), tenantId.toString()));
  164 + return tenantActors.computeIfAbsent(tenantId, k -> {
  165 + log.debug("[{}] Creating tenant actor.", tenantId);
  166 + ActorRef tenantActor = context().actorOf(Props.create(new TenantActor.ActorCreator(systemContext, tenantId))
  167 + .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), tenantId.toString());
  168 + context().watch(tenantActor);
  169 + log.debug("[{}] Created tenant actor: {}.", tenantId, tenantActor);
  170 + return tenantActor;
  171 + });
166 172 }
167 173
168   - private void processTermination(Terminated message) {
  174 + @Override
  175 + protected void processTermination(Terminated message) {
169 176 ActorRef terminated = message.actor();
170 177 if (terminated instanceof LocalActorRef) {
171   - logger.debug("Removed actor: {}", terminated);
  178 + boolean removed = tenantActors.inverse().remove(terminated) != null;
  179 + if (removed) {
  180 + log.debug("[{}] Removed actor:", terminated);
  181 + } else {
  182 + log.warn("[{}] Removed actor was not found in the tenant map!");
  183 + }
172 184 } else {
173 185 throw new IllegalStateException("Remote actors are not supported!");
174 186 }
... ... @@ -182,20 +194,17 @@ public class AppActor extends RuleChainManagerActor {
182 194 }
183 195
184 196 @Override
185   - public AppActor create() throws Exception {
  197 + public AppActor create() {
186 198 return new AppActor(context);
187 199 }
188 200 }
189 201
190   - private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), new Function<Throwable, Directive>() {
191   - @Override
192   - public Directive apply(Throwable t) {
193   - logger.error(t, "Unknown failure");
194   - if (t instanceof RuntimeException) {
195   - return SupervisorStrategy.restart();
196   - } else {
197   - return SupervisorStrategy.stop();
198   - }
  202 + private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), t -> {
  203 + log.warn("Unknown failure", t);
  204 + if (t instanceof RuntimeException) {
  205 + return SupervisorStrategy.restart();
  206 + } else {
  207 + return SupervisorStrategy.stop();
199 208 }
200 209 });
201 210 }
... ...
... ... @@ -15,42 +15,38 @@
15 15 */
16 16 package org.thingsboard.server.actors.device;
17 17
18   -import akka.event.Logging;
19   -import akka.event.LoggingAdapter;
  18 +import lombok.extern.slf4j.Slf4j;
20 19 import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
21 20 import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg;
22 21 import org.thingsboard.server.actors.ActorSystemContext;
23 22 import org.thingsboard.server.actors.service.ContextAwareActor;
24   -import org.thingsboard.server.actors.service.ContextBasedCreator;
25 23 import org.thingsboard.server.common.data.id.DeviceId;
26 24 import org.thingsboard.server.common.data.id.TenantId;
27 25 import org.thingsboard.server.common.msg.TbActorMsg;
28   -import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
29 26 import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
30 27 import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
31 28 import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
32 29 import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
33 30 import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
34 31
  32 +@Slf4j
35 33 public class DeviceActor extends ContextAwareActor {
36 34
37   - private final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
38   -
39 35 private final DeviceActorMessageProcessor processor;
40 36
41   - private DeviceActor(ActorSystemContext systemContext, TenantId tenantId, DeviceId deviceId) {
  37 + DeviceActor(ActorSystemContext systemContext, TenantId tenantId, DeviceId deviceId) {
42 38 super(systemContext);
43   - this.processor = new DeviceActorMessageProcessor(systemContext, logger, tenantId, deviceId);
  39 + this.processor = new DeviceActorMessageProcessor(systemContext, tenantId, deviceId);
44 40 }
45 41
46 42 @Override
47 43 public void preStart() {
48   - logger.debug("[{}][{}] Starting device actor.", processor.tenantId, processor.deviceId);
  44 + log.debug("[{}][{}] Starting device actor.", processor.tenantId, processor.deviceId);
49 45 try {
50 46 processor.initSessionTimeout(context());
51   - logger.debug("[{}][{}] Device actor started.", processor.tenantId, processor.deviceId);
  47 + log.debug("[{}][{}] Device actor started.", processor.tenantId, processor.deviceId);
52 48 } catch (Exception e) {
53   - logger.error(e, "[{}][{}] Unknown failure", processor.tenantId, processor.deviceId);
  49 + log.warn("[{}][{}] Unknown failure", processor.tenantId, processor.deviceId, e);
54 50 }
55 51 }
56 52
... ... @@ -90,22 +86,4 @@ public class DeviceActor extends ContextAwareActor {
90 86 return true;
91 87 }
92 88
93   - public static class ActorCreator extends ContextBasedCreator<DeviceActor> {
94   - private static final long serialVersionUID = 1L;
95   -
96   - private final TenantId tenantId;
97   - private final DeviceId deviceId;
98   -
99   - public ActorCreator(ActorSystemContext context, TenantId tenantId, DeviceId deviceId) {
100   - super(context);
101   - this.tenantId = tenantId;
102   - this.deviceId = deviceId;
103   - }
104   -
105   - @Override
106   - public DeviceActor create() throws Exception {
107   - return new DeviceActor(context, tenantId, deviceId);
108   - }
109   - }
110   -
111 89 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.actors.device;
  17 +
  18 +import org.thingsboard.server.actors.ActorSystemContext;
  19 +import org.thingsboard.server.actors.service.ContextBasedCreator;
  20 +import org.thingsboard.server.common.data.id.DeviceId;
  21 +import org.thingsboard.server.common.data.id.TenantId;
  22 +
  23 +public class DeviceActorCreator extends ContextBasedCreator<DeviceActor> {
  24 + private static final long serialVersionUID = 1L;
  25 +
  26 + private final TenantId tenantId;
  27 + private final DeviceId deviceId;
  28 +
  29 + public DeviceActorCreator(ActorSystemContext context, TenantId tenantId, DeviceId deviceId) {
  30 + super(context);
  31 + this.tenantId = tenantId;
  32 + this.deviceId = deviceId;
  33 + }
  34 +
  35 + @Override
  36 + public DeviceActor create() {
  37 + return new DeviceActor(context, tenantId, deviceId);
  38 + }
  39 +}
... ...
... ... @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ListenableFuture;
24 24 import com.google.gson.Gson;
25 25 import com.google.gson.JsonObject;
26 26 import com.google.gson.JsonParser;
  27 +import lombok.extern.slf4j.Slf4j;
27 28 import org.thingsboard.rule.engine.api.RpcError;
28 29 import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
29 30 import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg;
... ... @@ -88,6 +89,7 @@ import java.util.stream.Collectors;
88 89 /**
89 90 * @author Andrew Shvayka
90 91 */
  92 +@Slf4j
91 93 class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
92 94
93 95 final TenantId tenantId;
... ... @@ -106,8 +108,8 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
106 108 private String deviceType;
107 109 private TbMsgMetaData defaultMetaData;
108 110
109   - DeviceActorMessageProcessor(ActorSystemContext systemContext, LoggingAdapter logger, TenantId tenantId, DeviceId deviceId) {
110   - super(systemContext, logger);
  111 + DeviceActorMessageProcessor(ActorSystemContext systemContext, TenantId tenantId, DeviceId deviceId) {
  112 + super(systemContext);
111 113 this.tenantId = tenantId;
112 114 this.deviceId = deviceId;
113 115 this.sessions = new LinkedHashMap<>();
... ... @@ -136,30 +138,30 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
136 138
137 139 long timeout = request.getExpirationTime() - System.currentTimeMillis();
138 140 if (timeout <= 0) {
139   - logger.debug("[{}][{}] Ignoring message due to exp time reached", deviceId, request.getId(), request.getExpirationTime());
  141 + log.debug("[{}][{}] Ignoring message due to exp time reached, {}", deviceId, request.getId(), request.getExpirationTime());
140 142 return;
141 143 }
142 144
143 145 boolean sent = rpcSubscriptions.size() > 0;
144 146 Set<UUID> syncSessionSet = new HashSet<>();
145   - rpcSubscriptions.entrySet().forEach(sub -> {
146   - sendToTransport(rpcRequest, sub.getKey(), sub.getValue().getNodeId());
147   - if (TransportProtos.SessionType.SYNC == sub.getValue().getType()) {
148   - syncSessionSet.add(sub.getKey());
  147 + rpcSubscriptions.forEach((key, value) -> {
  148 + sendToTransport(rpcRequest, key, value.getNodeId());
  149 + if (TransportProtos.SessionType.SYNC == value.getType()) {
  150 + syncSessionSet.add(key);
149 151 }
150 152 });
151 153 syncSessionSet.forEach(rpcSubscriptions::remove);
152 154
153 155 if (request.isOneway() && sent) {
154   - logger.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId());
  156 + log.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId());
155 157 systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), null, null));
156 158 } else {
157 159 registerPendingRpcRequest(context, msg, sent, rpcRequest, timeout);
158 160 }
159 161 if (sent) {
160   - logger.debug("[{}] RPC request {} is sent!", deviceId, request.getId());
  162 + log.debug("[{}] RPC request {} is sent!", deviceId, request.getId());
161 163 } else {
162   - logger.debug("[{}] RPC request {} is NOT sent!", deviceId, request.getId());
  164 + log.debug("[{}] RPC request {} is NOT sent!", deviceId, request.getId());
163 165 }
164 166 }
165 167
... ... @@ -172,7 +174,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
172 174 void processServerSideRpcTimeout(ActorContext context, DeviceActorServerSideRpcTimeoutMsg msg) {
173 175 ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId());
174 176 if (requestMd != null) {
175   - logger.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
  177 + log.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
176 178 systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
177 179 null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
178 180 }
... ... @@ -181,13 +183,13 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
181 183 private void sendPendingRequests(ActorContext context, UUID sessionId, SessionInfoProto sessionInfo) {
182 184 TransportProtos.SessionType sessionType = getSessionType(sessionId);
183 185 if (!toDeviceRpcPendingMap.isEmpty()) {
184   - logger.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId);
  186 + log.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId);
185 187 if (sessionType == TransportProtos.SessionType.SYNC) {
186   - logger.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId);
  188 + log.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId);
187 189 rpcSubscriptions.remove(sessionId);
188 190 }
189 191 } else {
190   - logger.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId);
  192 + log.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId);
191 193 }
192 194 Set<Integer> sentOneWayIds = new HashSet<>();
193 195 if (sessionType == TransportProtos.SessionType.ASYNC) {
... ... @@ -335,7 +337,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
335 337 void processClientSideRpcTimeout(ActorContext context, DeviceActorClientSideRpcTimeoutMsg msg) {
336 338 ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getId());
337 339 if (data != null) {
338   - logger.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId());
  340 + log.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId());
339 341 sendToTransport(TransportProtos.ToServerRpcResponseMsg.newBuilder()
340 342 .setRequestId(msg.getId()).setError("timeout").build()
341 343 , data.getSessionId(), data.getNodeId());
... ... @@ -380,7 +382,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
380 382 hasNotificationData = true;
381 383 }
382 384 } else {
383   - logger.debug("[{}] No public server side attributes changed!", deviceId);
  385 + log.debug("[{}] No public server side attributes changed!", deviceId);
384 386 }
385 387 }
386 388 }
... ... @@ -391,27 +393,27 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
391 393 });
392 394 }
393 395 } else {
394   - logger.debug("[{}] No registered attributes subscriptions to process!", deviceId);
  396 + log.debug("[{}] No registered attributes subscriptions to process!", deviceId);
395 397 }
396 398 }
397 399
398 400 private void processRpcResponses(ActorContext context, SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg responseMsg) {
399 401 UUID sessionId = getSessionId(sessionInfo);
400   - logger.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId);
  402 + log.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId);
401 403 ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
402 404 boolean success = requestMd != null;
403 405 if (success) {
404 406 systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
405 407 responseMsg.getPayload(), null));
406 408 } else {
407   - logger.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
  409 + log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
408 410 }
409 411 }
410 412
411 413 private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) {
412 414 UUID sessionId = getSessionId(sessionInfo);
413 415 if (subscribeCmd.getUnsubscribe()) {
414   - logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
  416 + log.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
415 417 attributeSubscriptions.remove(sessionId);
416 418 } else {
417 419 SessionInfoMetaData sessionMD = sessions.get(sessionId);
... ... @@ -419,7 +421,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
419 421 sessionMD = new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId()));
420 422 }
421 423 sessionMD.setSubscribedToAttributes(true);
422   - logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
  424 + log.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
423 425 attributeSubscriptions.put(sessionId, sessionMD.getSessionInfo());
424 426 dumpSessions();
425 427 }
... ... @@ -432,7 +434,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
432 434 private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToRPCMsg subscribeCmd) {
433 435 UUID sessionId = getSessionId(sessionInfo);
434 436 if (subscribeCmd.getUnsubscribe()) {
435   - logger.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId);
  437 + log.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId);
436 438 rpcSubscriptions.remove(sessionId);
437 439 } else {
438 440 SessionInfoMetaData sessionMD = sessions.get(sessionId);
... ... @@ -440,7 +442,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
440 442 sessionMD = new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId()));
441 443 }
442 444 sessionMD.setSubscribedToRPC(true);
443   - logger.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
  445 + log.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
444 446 rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo());
445 447 sendPendingRequests(context, sessionId, sessionInfo);
446 448 dumpSessions();
... ... @@ -451,10 +453,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
451 453 UUID sessionId = getSessionId(sessionInfo);
452 454 if (msg.getEvent() == SessionEvent.OPEN) {
453 455 if (sessions.containsKey(sessionId)) {
454   - logger.debug("[{}] Received duplicate session open event [{}]", deviceId, sessionId);
  456 + log.debug("[{}] Received duplicate session open event [{}]", deviceId, sessionId);
455 457 return;
456 458 }
457   - logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
  459 + log.debug("[{}] Processing new session [{}]", deviceId, sessionId);
458 460 if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) {
459 461 UUID sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null);
460 462 if (sessionIdToRemove != null) {
... ... @@ -467,7 +469,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
467 469 }
468 470 dumpSessions();
469 471 } else if (msg.getEvent() == SessionEvent.CLOSED) {
470   - logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
  472 + log.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
471 473 sessions.remove(sessionId);
472 474 attributeSubscriptions.remove(sessionId);
473 475 rpcSubscriptions.remove(sessionId);
... ... @@ -623,10 +625,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
623 625 }
624 626
625 627 private void restoreSessions() {
626   - logger.debug("[{}] Restoring sessions from cache", deviceId);
  628 + log.debug("[{}] Restoring sessions from cache", deviceId);
627 629 TransportProtos.DeviceSessionsCacheEntry sessionsDump = systemContext.getDeviceSessionCacheService().get(deviceId);
628 630 if (sessionsDump.getSerializedSize() == 0) {
629   - logger.debug("[{}] No session information found", deviceId);
  631 + log.debug("[{}] No session information found", deviceId);
630 632 return;
631 633 }
632 634 for (TransportProtos.SessionSubscriptionInfoProto sessionSubscriptionInfoProto : sessionsDump.getSessionsList()) {
... ... @@ -644,13 +646,13 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
644 646 rpcSubscriptions.put(sessionId, sessionInfo);
645 647 sessionMD.setSubscribedToRPC(true);
646 648 }
647   - logger.debug("[{}] Restored session: {}", deviceId, sessionMD);
  649 + log.debug("[{}] Restored session: {}", deviceId, sessionMD);
648 650 }
649   - logger.debug("[{}] Restored sessions: {}, rpc subscriptions: {}, attribute subscriptions: {}", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
  651 + log.debug("[{}] Restored sessions: {}, rpc subscriptions: {}, attribute subscriptions: {}", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
650 652 }
651 653
652 654 private void dumpSessions() {
653   - logger.debug("[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
  655 + log.debug("[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
654 656 List<TransportProtos.SessionSubscriptionInfoProto> sessionsList = new ArrayList<>(sessions.size());
655 657 sessions.forEach((uuid, sessionMD) -> {
656 658 if (sessionMD.getSessionInfo().getType() == TransportProtos.SessionType.SYNC) {
... ... @@ -668,7 +670,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
668 670 sessionsList.add(TransportProtos.SessionSubscriptionInfoProto.newBuilder()
669 671 .setSessionInfo(sessionInfoProto)
670 672 .setSubscriptionInfo(subscriptionInfoProto).build());
671   - logger.debug("[{}] Dumping session: {}", deviceId, sessionMD);
  673 + log.debug("[{}] Dumping session: {}", deviceId, sessionMD);
672 674 });
673 675 systemContext.getDeviceSessionCacheService()
674 676 .put(deviceId, TransportProtos.DeviceSessionsCacheEntry.newBuilder()
... ...
... ... @@ -19,6 +19,7 @@ import akka.actor.ActorRef;
19 19 import akka.actor.Props;
20 20 import akka.event.Logging;
21 21 import akka.event.LoggingAdapter;
  22 +import lombok.extern.slf4j.Slf4j;
22 23 import org.thingsboard.server.actors.ActorSystemContext;
23 24 import org.thingsboard.server.actors.service.ContextAwareActor;
24 25 import org.thingsboard.server.actors.service.ContextBasedCreator;
... ... @@ -35,17 +36,16 @@ import java.util.*;
35 36 /**
36 37 * @author Andrew Shvayka
37 38 */
  39 +@Slf4j
38 40 public class RpcManagerActor extends ContextAwareActor {
39 41
40   - private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
41   -
42 42 private final Map<ServerAddress, SessionActorInfo> sessionActors;
43 43
44 44 private final Map<ServerAddress, Queue<ClusterAPIProtos.ClusterMessage>> pendingMsgs;
45 45
46 46 private final ServerAddress instance;
47 47
48   - RpcManagerActor(ActorSystemContext systemContext) {
  48 + private RpcManagerActor(ActorSystemContext systemContext) {
49 49 super(systemContext);
50 50 this.sessionActors = new HashMap<>();
51 51 this.pendingMsgs = new HashMap<>();
... ... @@ -116,7 +116,7 @@ public class RpcManagerActor extends ContextAwareActor {
116 116 queue.add(msg);
117 117 }
118 118 } else {
119   - logger.warning("Cluster msg doesn't have server address [{}]", msg);
  119 + log.warn("Cluster msg doesn't have server address [{}]", msg);
120 120 }
121 121 }
122 122
... ... @@ -207,7 +207,7 @@ public class RpcManagerActor extends ContextAwareActor {
207 207 }
208 208
209 209 @Override
210   - public RpcManagerActor create() throws Exception {
  210 + public RpcManagerActor create() {
211 211 return new RpcManagerActor(context);
212 212 }
213 213 }
... ...
... ... @@ -33,7 +33,7 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
33 33 private RuleChainActor(ActorSystemContext systemContext, TenantId tenantId, RuleChainId ruleChainId) {
34 34 super(systemContext, tenantId, ruleChainId);
35 35 setProcessor(new RuleChainActorMessageProcessor(tenantId, ruleChainId, systemContext,
36   - logger, context().parent(), context().self()));
  36 + context().parent(), context().self()));
37 37 }
38 38
39 39 @Override
... ... @@ -79,7 +79,7 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
79 79 }
80 80
81 81 @Override
82   - public RuleChainActor create() throws Exception {
  82 + public RuleChainActor create() {
83 83 return new RuleChainActor(context, tenantId, ruleChainId);
84 84 }
85 85 }
... ...
... ... @@ -23,6 +23,7 @@ import com.datastax.driver.core.utils.UUIDs;
23 23
24 24 import java.util.Optional;
25 25
  26 +import lombok.extern.slf4j.Slf4j;
26 27 import org.thingsboard.server.actors.ActorSystemContext;
27 28 import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg;
28 29 import org.thingsboard.server.actors.service.DefaultActorService;
... ... @@ -55,6 +56,7 @@ import java.util.stream.Collectors;
55 56 /**
56 57 * @author Andrew Shvayka
57 58 */
  59 +@Slf4j
58 60 public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleChainId> {
59 61
60 62 private static final long DEFAULT_CLUSTER_PARTITION = 0L;
... ... @@ -69,8 +71,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
69 71 private boolean started;
70 72
71 73 RuleChainActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, ActorSystemContext systemContext
72   - , LoggingAdapter logger, ActorRef parent, ActorRef self) {
73   - super(systemContext, logger, tenantId, ruleChainId);
  74 + , ActorRef parent, ActorRef self) {
  75 + super(systemContext, tenantId, ruleChainId);
74 76 this.parent = parent;
75 77 this.self = self;
76 78 this.nodeActors = new HashMap<>();
... ... @@ -216,7 +218,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
216 218
217 219 private void onRemoteTellNext(ServerAddress serverAddress, RuleNodeToRuleChainTellNextMsg envelope) {
218 220 TbMsg msg = envelope.getMsg();
219   - logger.debug("Forwarding [{}] msg to remote server [{}] due to changed originator id: [{}]", msg.getId(), serverAddress, msg.getOriginator());
  221 + log.debug("Forwarding [{}] msg to remote server [{}] due to changed originator id: [{}]", msg.getId(), serverAddress, msg.getOriginator());
220 222 envelope = new RemoteToRuleChainTellNextMsg(envelope, tenantId, entityId);
221 223 systemContext.getRpcService().tell(systemContext.getEncodingService().convertToProtoDataMessage(serverAddress, envelope));
222 224 }
... ...
... ... @@ -15,6 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.actors.ruleChain;
17 17
  18 +import lombok.extern.slf4j.Slf4j;
18 19 import org.thingsboard.server.actors.ActorSystemContext;
19 20 import org.thingsboard.server.actors.service.ComponentActor;
20 21 import org.thingsboard.server.actors.service.ContextBasedCreator;
... ... @@ -24,6 +25,7 @@ import org.thingsboard.server.common.data.id.TenantId;
24 25 import org.thingsboard.server.common.msg.TbActorMsg;
25 26 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
26 27
  28 +@Slf4j
27 29 public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessageProcessor> {
28 30
29 31 private final RuleChainId ruleChainId;
... ... @@ -32,7 +34,7 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
32 34 super(systemContext, tenantId, ruleNodeId);
33 35 this.ruleChainId = ruleChainId;
34 36 setProcessor(new RuleNodeActorMessageProcessor(tenantId, ruleChainId, ruleNodeId, systemContext,
35   - logger, context().parent(), context().self()));
  37 + context().parent(), context().self()));
36 38 }
37 39
38 40 @Override
... ... @@ -60,7 +62,7 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
60 62 }
61 63
62 64 private void onRuleNodeToSelfMsg(RuleNodeToSelfMsg msg) {
63   - logger.debug("[{}] Going to process rule msg: {}", id, msg.getMsg());
  65 + log.debug("[{}] Going to process rule msg: {}", id, msg.getMsg());
64 66 try {
65 67 processor.onRuleToSelfMsg(msg);
66 68 increaseMessagesProcessedCount();
... ... @@ -70,7 +72,7 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
70 72 }
71 73
72 74 private void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) {
73   - logger.debug("[{}] Going to process rule msg: {}", id, msg.getMsg());
  75 + log.debug("[{}] Going to process rule msg: {}", id, msg.getMsg());
74 76 try {
75 77 processor.onRuleChainToRuleNodeMsg(msg);
76 78 increaseMessagesProcessedCount();
... ...
... ... @@ -44,8 +44,8 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
44 44 private TbContext defaultCtx;
45 45
46 46 RuleNodeActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId, ActorSystemContext systemContext
47   - , LoggingAdapter logger, ActorRef parent, ActorRef self) {
48   - super(systemContext, logger, tenantId, ruleNodeId);
  47 + , ActorRef parent, ActorRef self) {
  48 + super(systemContext, tenantId, ruleNodeId);
49 49 this.parent = parent;
50 50 this.self = self;
51 51 this.service = systemContext.getRuleChainService();
... ...
... ... @@ -18,6 +18,7 @@ package org.thingsboard.server.actors.service;
18 18 import akka.actor.ActorRef;
19 19 import akka.event.Logging;
20 20 import akka.event.LoggingAdapter;
  21 +import lombok.extern.slf4j.Slf4j;
21 22 import org.thingsboard.server.actors.ActorSystemContext;
22 23 import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
23 24 import org.thingsboard.server.actors.stats.StatsPersistMsg;
... ... @@ -30,10 +31,9 @@ import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
30 31 /**
31 32 * @author Andrew Shvayka
32 33 */
  34 +@Slf4j
33 35 public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgProcessor<T>> extends ContextAwareActor {
34 36
35   - protected final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
36   -
37 37 private long lastPersistedErrorTs = 0L;
38 38 protected final TenantId tenantId;
39 39 protected final T id;
... ... @@ -60,7 +60,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
60 60 scheduleStatsPersistTick();
61 61 }
62 62 } catch (Exception e) {
63   - logger.warning("[{}][{}] Failed to start {} processor: {}", tenantId, id, id.getEntityType(), e);
  63 + log.warn("[{}][{}] Failed to start {} processor: {}", tenantId, id, id.getEntityType(), e);
64 64 logAndPersist("OnStart", e, true);
65 65 logLifecycleEvent(ComponentLifecycleEvent.STARTED, e);
66 66 }
... ... @@ -70,7 +70,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
70 70 try {
71 71 processor.scheduleStatsPersistTick(context(), systemContext.getStatisticsPersistFrequency());
72 72 } catch (Exception e) {
73   - logger.error("[{}][{}] Failed to schedule statistics store message. No statistics is going to be stored: {}", tenantId, id, e.getMessage());
  73 + log.error("[{}][{}] Failed to schedule statistics store message. No statistics is going to be stored: {}", tenantId, id, e.getMessage());
74 74 logAndPersist("onScheduleStatsPersistMsg", e);
75 75 }
76 76 }
... ... @@ -81,7 +81,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
81 81 processor.stop(context());
82 82 logLifecycleEvent(ComponentLifecycleEvent.STOPPED);
83 83 } catch (Exception e) {
84   - logger.warning("[{}][{}] Failed to stop {} processor: {}", tenantId, id, id.getEntityType(), e.getMessage());
  84 + log.warn("[{}][{}] Failed to stop {} processor: {}", tenantId, id, id.getEntityType(), e.getMessage());
85 85 logAndPersist("OnStop", e, true);
86 86 logLifecycleEvent(ComponentLifecycleEvent.STOPPED, e);
87 87 }
... ... @@ -148,9 +148,9 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
148 148 private void logAndPersist(String method, Exception e, boolean critical) {
149 149 errorsOccurred++;
150 150 if (critical) {
151   - logger.warning("[{}][{}] Failed to process {} msg: {}", id, tenantId, method, e);
  151 + log.warn("[{}][{}] Failed to process {} msg: {}", id, tenantId, method, e);
152 152 } else {
153   - logger.debug("[{}][{}] Failed to process {} msg: {}", id, tenantId, method, e);
  153 + log.debug("[{}][{}] Failed to process {} msg: {}", id, tenantId, method, e);
154 154 }
155 155 long ts = System.currentTimeMillis();
156 156 if (ts - lastPersistedErrorTs > getErrorPersistFrequency()) {
... ...
... ... @@ -15,14 +15,16 @@
15 15 */
16 16 package org.thingsboard.server.actors.service;
17 17
  18 +import akka.actor.Terminated;
18 19 import akka.actor.UntypedActor;
19 20 import akka.event.Logging;
20 21 import akka.event.LoggingAdapter;
  22 +import lombok.extern.slf4j.Slf4j;
21 23 import org.thingsboard.server.actors.ActorSystemContext;
22 24 import org.thingsboard.server.common.msg.TbActorMsg;
23 25
  26 +@Slf4j
24 27 public abstract class ContextAwareActor extends UntypedActor {
25   - protected final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
26 28
27 29 public static final int ENTITY_PACK_LIMIT = 1024;
28 30
... ... @@ -35,21 +37,26 @@ public abstract class ContextAwareActor extends UntypedActor {
35 37
36 38 @Override
37 39 public void onReceive(Object msg) throws Exception {
38   - if (logger.isDebugEnabled()) {
39   - logger.debug("Processing msg: {}", msg);
  40 + if (log.isDebugEnabled()) {
  41 + log.debug("Processing msg: {}", msg);
40 42 }
41 43 if (msg instanceof TbActorMsg) {
42 44 try {
43 45 if (!process((TbActorMsg) msg)) {
44   - logger.warning("Unknown message: {}!", msg);
  46 + log.warn("Unknown message: {}!", msg);
45 47 }
46 48 } catch (Exception e) {
47 49 throw e;
48 50 }
  51 + } else if (msg instanceof Terminated) {
  52 + processTermination((Terminated) msg);
49 53 } else {
50   - logger.warning("Unknown message: {}!", msg);
  54 + log.warn("Unknown message: {}!", msg);
51 55 }
52 56 }
53 57
  58 + protected void processTermination(Terminated msg) {
  59 + }
  60 +
54 61 protected abstract boolean process(TbActorMsg msg);
55 62 }
... ...
... ... @@ -22,22 +22,22 @@ import akka.event.LoggingAdapter;
22 22 import com.fasterxml.jackson.databind.ObjectMapper;
23 23 import lombok.AllArgsConstructor;
24 24 import lombok.Data;
  25 +import lombok.extern.slf4j.Slf4j;
25 26 import org.thingsboard.server.actors.ActorSystemContext;
26 27 import scala.concurrent.ExecutionContextExecutor;
27 28 import scala.concurrent.duration.Duration;
28 29
29 30 import java.util.concurrent.TimeUnit;
30 31
  32 +@Slf4j
31 33 public abstract class AbstractContextAwareMsgProcessor {
32 34
33 35 protected final ActorSystemContext systemContext;
34   - protected final LoggingAdapter logger;
35 36 protected final ObjectMapper mapper = new ObjectMapper();
36 37
37   - protected AbstractContextAwareMsgProcessor(ActorSystemContext systemContext, LoggingAdapter logger) {
  38 + protected AbstractContextAwareMsgProcessor(ActorSystemContext systemContext) {
38 39 super();
39 40 this.systemContext = systemContext;
40   - this.logger = logger;
41 41 }
42 42
43 43 private Scheduler getScheduler() {
... ... @@ -53,7 +53,7 @@ public abstract class AbstractContextAwareMsgProcessor {
53 53 }
54 54
55 55 private void schedulePeriodicMsgWithDelay(Object msg, long delayInMs, long periodInMs, ActorRef target) {
56   - logger.debug("Scheduling periodic msg {} every {} ms with delay {} ms", msg, periodInMs, delayInMs);
  56 + log.debug("Scheduling periodic msg {} every {} ms with delay {} ms", msg, periodInMs, delayInMs);
57 57 getScheduler().schedule(Duration.create(delayInMs, TimeUnit.MILLISECONDS), Duration.create(periodInMs, TimeUnit.MILLISECONDS), target, msg, getSystemDispatcher(), null);
58 58 }
59 59
... ... @@ -62,7 +62,7 @@ public abstract class AbstractContextAwareMsgProcessor {
62 62 }
63 63
64 64 private void scheduleMsgWithDelay(Object msg, long delayInMs, ActorRef target) {
65   - logger.debug("Scheduling msg {} with delay {} ms", msg, delayInMs);
  65 + log.debug("Scheduling msg {} with delay {} ms", msg, delayInMs);
66 66 getScheduler().scheduleOnce(Duration.create(delayInMs, TimeUnit.MILLISECONDS), target, msg, getSystemDispatcher(), null);
67 67 }
68 68
... ...
... ... @@ -19,6 +19,7 @@ import akka.actor.ActorContext;
19 19 import akka.event.LoggingAdapter;
20 20 import com.google.common.util.concurrent.FutureCallback;
21 21 import com.google.common.util.concurrent.Futures;
  22 +import lombok.extern.slf4j.Slf4j;
22 23 import org.thingsboard.server.actors.ActorSystemContext;
23 24 import org.thingsboard.server.actors.stats.StatsPersistTick;
24 25 import org.thingsboard.server.common.data.id.EntityId;
... ... @@ -30,14 +31,15 @@ import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
30 31 import javax.annotation.Nullable;
31 32 import java.util.function.Consumer;
32 33
  34 +@Slf4j
33 35 public abstract class ComponentMsgProcessor<T extends EntityId> extends AbstractContextAwareMsgProcessor {
34 36
35 37 protected final TenantId tenantId;
36 38 protected final T entityId;
37 39 protected ComponentLifecycleState state;
38 40
39   - protected ComponentMsgProcessor(ActorSystemContext systemContext, LoggingAdapter logger, TenantId tenantId, T id) {
40   - super(systemContext, logger);
  41 + protected ComponentMsgProcessor(ActorSystemContext systemContext, TenantId tenantId, T id) {
  42 + super(systemContext);
41 43 this.tenantId = tenantId;
42 44 this.entityId = id;
43 45 }
... ... @@ -79,7 +81,7 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
79 81
80 82 protected void checkActive() {
81 83 if (state != ComponentLifecycleState.ACTIVE) {
82   - logger.warning("Rule chain is not active. Current state [{}] for processor [{}] tenant [{}]", state, tenantId, entityId);
  84 + log.warn("Rule chain is not active. Current state [{}] for processor [{}] tenant [{}]", state, tenantId, entityId);
83 85 throw new IllegalStateException("Rule chain is not active! " + entityId + " - " + tenantId);
84 86 }
85 87 }
... ...
... ... @@ -20,6 +20,8 @@ import akka.actor.ActorRef;
20 20 import akka.actor.Props;
21 21 import akka.actor.UntypedActor;
22 22 import akka.japi.Creator;
  23 +import com.google.common.collect.BiMap;
  24 +import com.google.common.collect.HashBiMap;
23 25 import lombok.extern.slf4j.Slf4j;
24 26 import org.thingsboard.server.actors.ActorSystemContext;
25 27 import org.thingsboard.server.actors.service.ContextAwareActor;
... ... @@ -39,11 +41,11 @@ import java.util.Map;
39 41 public abstract class EntityActorsManager<T extends EntityId, A extends UntypedActor, M extends SearchTextBased<? extends UUIDBased>> {
40 42
41 43 protected final ActorSystemContext systemContext;
42   - protected final Map<T, ActorRef> actors;
  44 + protected final BiMap<T, ActorRef> actors;
43 45
44 46 public EntityActorsManager(ActorSystemContext systemContext) {
45 47 this.systemContext = systemContext;
46   - this.actors = new HashMap<>();
  48 + this.actors = HashBiMap.create();
47 49 }
48 50
49 51 protected abstract TenantId getTenantId();
... ... @@ -65,7 +67,8 @@ public abstract class EntityActorsManager<T extends EntityId, A extends UntypedA
65 67 }
66 68 }
67 69
68   - public void visit(M entity, ActorRef actorRef) {}
  70 + public void visit(M entity, ActorRef actorRef) {
  71 + }
69 72
70 73 public ActorRef getOrCreateActor(ActorContext context, T entityId) {
71 74 return actors.computeIfAbsent(entityId, eId ->
... ...
... ... @@ -15,10 +15,9 @@
15 15 */
16 16 package org.thingsboard.server.actors.stats;
17 17
18   -import akka.event.Logging;
19   -import akka.event.LoggingAdapter;
20 18 import com.fasterxml.jackson.databind.JsonNode;
21 19 import com.fasterxml.jackson.databind.ObjectMapper;
  20 +import lombok.extern.slf4j.Slf4j;
22 21 import org.thingsboard.server.actors.ActorSystemContext;
23 22 import org.thingsboard.server.actors.service.ContextAwareActor;
24 23 import org.thingsboard.server.actors.service.ContextBasedCreator;
... ... @@ -27,9 +26,9 @@ import org.thingsboard.server.common.data.Event;
27 26 import org.thingsboard.server.common.msg.TbActorMsg;
28 27 import org.thingsboard.server.common.msg.cluster.ServerAddress;
29 28
  29 +@Slf4j
30 30 public class StatsActor extends ContextAwareActor {
31 31
32   - private final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
33 32 private final ObjectMapper mapper = new ObjectMapper();
34 33
35 34 public StatsActor(ActorSystemContext context) {
... ... @@ -43,13 +42,13 @@ public class StatsActor extends ContextAwareActor {
43 42 }
44 43
45 44 @Override
46   - public void onReceive(Object msg) throws Exception {
47   - logger.debug("Received message: {}", msg);
  45 + public void onReceive(Object msg) {
  46 + log.debug("Received message: {}", msg);
48 47 if (msg instanceof StatsPersistMsg) {
49 48 try {
50 49 onStatsPersistMsg((StatsPersistMsg) msg);
51 50 } catch (Exception e) {
52   - logger.warning("Failed to persist statistics: {}", msg, e);
  51 + log.warn("Failed to persist statistics: {}", msg, e);
53 52 }
54 53 }
55 54 }
... ... @@ -75,7 +74,7 @@ public class StatsActor extends ContextAwareActor {
75 74 }
76 75
77 76 @Override
78   - public StatsActor create() throws Exception {
  77 + public StatsActor create() {
79 78 return new StatsActor(context);
80 79 }
81 80 }
... ...
... ... @@ -17,15 +17,19 @@ package org.thingsboard.server.actors.tenant;
17 17
18 18 import akka.actor.ActorInitializationException;
19 19 import akka.actor.ActorRef;
  20 +import akka.actor.LocalActorRef;
20 21 import akka.actor.OneForOneStrategy;
21 22 import akka.actor.Props;
22 23 import akka.actor.SupervisorStrategy;
  24 +import akka.actor.Terminated;
23 25 import akka.japi.Function;
  26 +import com.google.common.collect.BiMap;
  27 +import com.google.common.collect.HashBiMap;
  28 +import lombok.extern.slf4j.Slf4j;
24 29 import org.thingsboard.server.actors.ActorSystemContext;
25   -import org.thingsboard.server.actors.device.DeviceActor;
  30 +import org.thingsboard.server.actors.device.DeviceActorCreator;
26 31 import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg;
27 32 import org.thingsboard.server.actors.ruleChain.RuleChainManagerActor;
28   -import org.thingsboard.server.actors.ruleChain.RuleChainToRuleChainMsg;
29 33 import org.thingsboard.server.actors.service.ContextBasedCreator;
30 34 import org.thingsboard.server.actors.service.DefaultActorService;
31 35 import org.thingsboard.server.actors.shared.rulechain.TenantRuleChainManager;
... ... @@ -44,18 +48,18 @@ import scala.concurrent.duration.Duration;
44 48 import java.util.HashMap;
45 49 import java.util.Map;
46 50
  51 +@Slf4j
47 52 public class TenantActor extends RuleChainManagerActor {
48 53
49 54 private final TenantId tenantId;
50   - private final Map<DeviceId, ActorRef> deviceActors;
  55 + private final BiMap<DeviceId, ActorRef> deviceActors;
51 56
52 57 private TenantActor(ActorSystemContext systemContext, TenantId tenantId) {
53 58 super(systemContext, new TenantRuleChainManager(systemContext, tenantId));
54 59 this.tenantId = tenantId;
55   - this.deviceActors = new HashMap<>();
  60 + this.deviceActors = HashBiMap.create();
56 61 }
57 62
58   -
59 63 @Override
60 64 public SupervisorStrategy supervisorStrategy() {
61 65 return strategy;
... ... @@ -63,12 +67,12 @@ public class TenantActor extends RuleChainManagerActor {
63 67
64 68 @Override
65 69 public void preStart() {
66   - logger.info("[{}] Starting tenant actor.", tenantId);
  70 + log.info("[{}] Starting tenant actor.", tenantId);
67 71 try {
68 72 initRuleChains();
69   - logger.info("[{}] Tenant actor started.", tenantId);
  73 + log.info("[{}] Tenant actor started.", tenantId);
70 74 } catch (Exception e) {
71   - logger.error(e, "[{}] Unknown failure", tenantId);
  75 + log.warn("[{}] Unknown failure", tenantId, e);
72 76 }
73 77 }
74 78
... ... @@ -105,22 +109,20 @@ public class TenantActor extends RuleChainManagerActor {
105 109 return true;
106 110 }
107 111
108   - @Override
109   - protected void broadcast(Object msg) {
110   - super.broadcast(msg);
111   -// deviceActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
112   - }
113   -
114 112 private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
115   - if (ruleChainManager.getRootChainActor()!=null)
116   - ruleChainManager.getRootChainActor().tell(msg, self());
117   - else logger.info("[{}] No Root Chain", msg);
  113 + if (ruleChainManager.getRootChainActor() != null) {
  114 + ruleChainManager.getRootChainActor().tell(msg, self());
  115 + } else {
  116 + log.info("[{}] No Root Chain: {}", tenantId, msg);
  117 + }
118 118 }
119 119
120 120 private void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg msg) {
121   - if (ruleChainManager.getRootChainActor()!=null)
122   - ruleChainManager.getRootChainActor().tell(msg, self());
123   - else logger.info("[{}] No Root Chain", msg);
  121 + if (ruleChainManager.getRootChainActor() != null) {
  122 + ruleChainManager.getRootChainActor().tell(msg, self());
  123 + } else {
  124 + log.info("[{}] No Root Chain: {}", tenantId, msg);
  125 + }
124 126 }
125 127
126 128 private void onRuleChainMsg(RuleChainAwareMsg msg) {
... ... @@ -141,13 +143,35 @@ public class TenantActor extends RuleChainManagerActor {
141 143 }
142 144 target.tell(msg, ActorRef.noSender());
143 145 } else {
144   - logger.debug("Invalid component lifecycle msg: {}", msg);
  146 + log.debug("[{}] Invalid component lifecycle msg: {}", tenantId, msg);
145 147 }
146 148 }
147 149
148 150 private ActorRef getOrCreateDeviceActor(DeviceId deviceId) {
149   - return deviceActors.computeIfAbsent(deviceId, k -> context().actorOf(Props.create(new DeviceActor.ActorCreator(systemContext, tenantId, deviceId))
150   - .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), deviceId.toString()));
  151 + return deviceActors.computeIfAbsent(deviceId, k -> {
  152 + log.debug("[{}][{}] Creating device actor.", tenantId, deviceId);
  153 + ActorRef deviceActor = context().actorOf(Props.create(new DeviceActorCreator(systemContext, tenantId, deviceId))
  154 + .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME)
  155 + , deviceId.toString());
  156 + context().watch(deviceActor);
  157 + log.debug("[{}][{}] Created device actor: {}.", tenantId, deviceId, deviceActor);
  158 + return deviceActor;
  159 + });
  160 + }
  161 +
  162 + @Override
  163 + protected void processTermination(Terminated message) {
  164 + ActorRef terminated = message.actor();
  165 + if (terminated instanceof LocalActorRef) {
  166 + boolean removed = deviceActors.inverse().remove(terminated) != null;
  167 + if (removed) {
  168 + log.debug("[{}] Removed actor:", terminated);
  169 + } else {
  170 + log.warn("[{}] Removed actor was not found in the device map!");
  171 + }
  172 + } else {
  173 + throw new IllegalStateException("Remote actors are not supported!");
  174 + }
151 175 }
152 176
153 177 public static class ActorCreator extends ContextBasedCreator<TenantActor> {
... ... @@ -161,7 +185,7 @@ public class TenantActor extends RuleChainManagerActor {
161 185 }
162 186
163 187 @Override
164   - public TenantActor create() throws Exception {
  188 + public TenantActor create() {
165 189 return new TenantActor(context, tenantId);
166 190 }
167 191 }
... ... @@ -169,8 +193,8 @@ public class TenantActor extends RuleChainManagerActor {
169 193 private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), new Function<Throwable, SupervisorStrategy.Directive>() {
170 194 @Override
171 195 public SupervisorStrategy.Directive apply(Throwable t) {
172   - logger.error(t, "Unknown failure");
173   - if(t instanceof ActorInitializationException){
  196 + log.warn("[{}] Unknown failure", tenantId, t);
  197 + if (t instanceof ActorInitializationException) {
174 198 return SupervisorStrategy.stop();
175 199 } else {
176 200 return SupervisorStrategy.resume();
... ...
... ... @@ -52,7 +52,6 @@ public class DashboardController extends BaseController {
52 52 public static final String DASHBOARD_ID = "dashboardId";
53 53
54 54 @Value("${dashboard.max_datapoints_limit}")
55   - @Getter
56 55 private long maxDatapointsLimit;
57 56
58 57
... ...
... ... @@ -22,6 +22,7 @@ import io.jsonwebtoken.Jwts;
22 22 import io.jsonwebtoken.MalformedJwtException;
23 23 import io.jsonwebtoken.SignatureException;
24 24 import io.jsonwebtoken.UnsupportedJwtException;
  25 +import lombok.extern.slf4j.Slf4j;
25 26 import org.slf4j.Logger;
26 27 import org.slf4j.LoggerFactory;
27 28 import org.springframework.security.authentication.BadCredentialsException;
... ... @@ -29,12 +30,11 @@ import org.thingsboard.server.service.security.exception.JwtExpiredTokenExceptio
29 30
30 31 import java.io.Serializable;
31 32
  33 +@Slf4j
32 34 public class RawAccessJwtToken implements JwtToken, Serializable {
33 35
34 36 private static final long serialVersionUID = -797397445703066079L;
35 37
36   - private static Logger logger = LoggerFactory.getLogger(RawAccessJwtToken.class);
37   -
38 38 private String token;
39 39
40 40 public RawAccessJwtToken(String token) {
... ... @@ -52,10 +52,10 @@ public class RawAccessJwtToken implements JwtToken, Serializable {
52 52 try {
53 53 return Jwts.parser().setSigningKey(signingKey).parseClaimsJws(this.token);
54 54 } catch (UnsupportedJwtException | MalformedJwtException | IllegalArgumentException | SignatureException ex) {
55   - logger.error("Invalid JWT Token", ex);
  55 + log.error("Invalid JWT Token", ex);
56 56 throw new BadCredentialsException("Invalid JWT token: ", ex);
57 57 } catch (ExpiredJwtException expiredEx) {
58   - logger.info("JWT Token is expired", expiredEx);
  58 + log.info("JWT Token is expired", expiredEx);
59 59 throw new JwtExpiredTokenException(this, "JWT Token expired", expiredEx);
60 60 }
61 61 }
... ...
... ... @@ -19,7 +19,7 @@ akka {
19 19 # JVM shutdown, System.exit(-1), in case of a fatal error,
20 20 # such as OutOfMemoryError
21 21 jvm-exit-on-fatal-error = off
22   - loglevel = "DEBUG"
  22 + loglevel = "INFO"
23 23 loggers = ["akka.event.slf4j.Slf4jLogger"]
24 24 }
25 25
... ...