Commit cdb2903f95e542821a52b6318cca377f5eefe5c9

Authored by Andrew Shvayka
1 parent f0186c90

Fixed initialization order

... ... @@ -39,6 +39,7 @@ import org.thingsboard.server.common.data.Tenant;
39 39 import org.thingsboard.server.common.data.id.TenantId;
40 40 import org.thingsboard.server.common.data.page.PageDataIterable;
41 41 import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
  42 +import org.thingsboard.server.common.msg.MsgType;
42 43 import org.thingsboard.server.common.msg.TbActorMsg;
43 44 import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
44 45 import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
... ... @@ -58,6 +59,7 @@ public class AppActor extends RuleChainManagerActor {
58 59 private static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID);
59 60 private final TenantService tenantService;
60 61 private final BiMap<TenantId, ActorRef> tenantActors;
  62 + private boolean ruleChainsInitialized;
61 63
62 64 private AppActor(ActorSystemContext systemContext) {
63 65 super(systemContext, new SystemRuleChainManager(systemContext));
... ... @@ -72,26 +74,20 @@ public class AppActor extends RuleChainManagerActor {
72 74
73 75 @Override
74 76 public void preStart() {
75   - log.info("Starting main system actor.");
76   - try {
77   - initRuleChains();
78   - if (systemContext.isTenantComponentsInitEnabled()) {
79   - PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
80   - for (Tenant tenant : tenantIterator) {
81   - log.debug("[{}] Creating tenant actor", tenant.getId());
82   - getOrCreateTenantActor(tenant.getId());
83   - log.debug("Tenant actor created.");
84   - }
85   - }
86   - log.info("Main system actor started.");
87   - } catch (Exception e) {
88   - log.warn("Unknown failure", e);
89   - }
90 77 }
91 78
92 79 @Override
93 80 protected boolean process(TbActorMsg msg) {
  81 + if (!ruleChainsInitialized) {
  82 + initRuleChainsAndTenantActors();
  83 + ruleChainsInitialized = true;
  84 + if (msg.getMsgType() != MsgType.APP_INIT_MSG) {
  85 + log.warn("Rule Chains initialized by unexpected message: {}", msg);
  86 + }
  87 + }
94 88 switch (msg.getMsgType()) {
  89 + case APP_INIT_MSG:
  90 + break;
95 91 case SEND_TO_CLUSTER_MSG:
96 92 onPossibleClusterMsg((SendToClusterMsg) msg);
97 93 break;
... ... @@ -119,6 +115,24 @@ public class AppActor extends RuleChainManagerActor {
119 115 return true;
120 116 }
121 117
  118 + private void initRuleChainsAndTenantActors() {
  119 + log.info("Starting main system actor.");
  120 + try {
  121 + initRuleChains();
  122 + if (systemContext.isTenantComponentsInitEnabled()) {
  123 + PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
  124 + for (Tenant tenant : tenantIterator) {
  125 + log.debug("[{}] Creating tenant actor", tenant.getId());
  126 + getOrCreateTenantActor(tenant.getId());
  127 + log.debug("Tenant actor created.");
  128 + }
  129 + }
  130 + log.info("Main system actor started.");
  131 + } catch (Exception e) {
  132 + log.warn("Unknown failure", e);
  133 + }
  134 + }
  135 +
122 136 private void onPossibleClusterMsg(SendToClusterMsg msg) {
123 137 Optional<ServerAddress> address = systemContext.getRoutingService().resolveById(msg.getEntityId());
124 138 if (address.isPresent()) {
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.actors.app;
  17 +
  18 +import org.thingsboard.server.common.msg.MsgType;
  19 +import org.thingsboard.server.common.msg.TbActorMsg;
  20 +
  21 +public class AppInitMsg implements TbActorMsg {
  22 +
  23 + @Override
  24 + public MsgType getMsgType() {
  25 + return MsgType.APP_INIT_MSG;
  26 + }
  27 +}
... ...
... ... @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ListenableFuture;
24 24 import com.google.gson.Gson;
25 25 import com.google.gson.JsonObject;
26 26 import com.google.gson.JsonParser;
  27 +import com.google.protobuf.InvalidProtocolBufferException;
27 28 import lombok.extern.slf4j.Slf4j;
28 29 import org.thingsboard.rule.engine.api.RpcError;
29 30 import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
... ... @@ -483,19 +484,19 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
483 484 }
484 485 }
485 486
486   - private void handleSessionActivity(ActorContext context, SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto subscriptionInfo) {
487   - UUID sessionId = getSessionId(sessionInfo);
488   - SessionInfoMetaData sessionMD = sessions.get(sessionId);
489   - if (sessionMD != null) {
490   - sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime());
491   - sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription());
492   - sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription());
493   - if (subscriptionInfo.getAttributeSubscription()) {
494   - attributeSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
495   - }
496   - if (subscriptionInfo.getRpcSubscription()) {
497   - rpcSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
498   - }
  487 + private void handleSessionActivity(ActorContext context, SessionInfoProto sessionInfoProto, TransportProtos.SubscriptionInfoProto subscriptionInfo) {
  488 + UUID sessionId = getSessionId(sessionInfoProto);
  489 + SessionInfoMetaData sessionMD = sessions.computeIfAbsent(sessionId,
  490 + id -> new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfoProto.getNodeId()), 0L));
  491 +
  492 + sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime());
  493 + sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription());
  494 + sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription());
  495 + if (subscriptionInfo.getAttributeSubscription()) {
  496 + attributeSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
  497 + }
  498 + if (subscriptionInfo.getRpcSubscription()) {
  499 + rpcSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
499 500 }
500 501 dumpSessions();
501 502 }
... ... @@ -629,8 +630,14 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
629 630
630 631 private void restoreSessions() {
631 632 log.debug("[{}] Restoring sessions from cache", deviceId);
632   - TransportProtos.DeviceSessionsCacheEntry sessionsDump = systemContext.getDeviceSessionCacheService().get(deviceId);
633   - if (sessionsDump.getSerializedSize() == 0) {
  633 + TransportProtos.DeviceSessionsCacheEntry sessionsDump = null;
  634 + try {
  635 + sessionsDump = TransportProtos.DeviceSessionsCacheEntry.parseFrom(systemContext.getDeviceSessionCacheService().get(deviceId));
  636 + } catch (InvalidProtocolBufferException e) {
  637 + log.warn("[{}] Failed to decode device sessions from cache", deviceId);
  638 + return;
  639 + }
  640 + if (sessionsDump.getSessionsCount() == 0) {
634 641 log.debug("[{}] No session information found", deviceId);
635 642 return;
636 643 }
... ... @@ -677,7 +684,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
677 684 });
678 685 systemContext.getDeviceSessionCacheService()
679 686 .put(deviceId, TransportProtos.DeviceSessionsCacheEntry.newBuilder()
680   - .addAllSessions(sessionsList).build());
  687 + .addAllSessions(sessionsList).build().toByteArray());
681 688 }
682 689
683 690 void initSessionTimeout(ActorContext context) {
... ...
... ... @@ -22,11 +22,14 @@ import akka.actor.Terminated;
22 22 import com.google.protobuf.ByteString;
23 23 import lombok.extern.slf4j.Slf4j;
24 24 import org.springframework.beans.factory.annotation.Autowired;
  25 +import org.springframework.boot.context.event.ApplicationReadyEvent;
  26 +import org.springframework.context.event.EventListener;
25 27 import org.springframework.stereotype.Service;
26 28 import org.thingsboard.rule.engine.api.msg.DeviceCredentialsUpdateNotificationMsg;
27 29 import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg;
28 30 import org.thingsboard.server.actors.ActorSystemContext;
29 31 import org.thingsboard.server.actors.app.AppActor;
  32 +import org.thingsboard.server.actors.app.AppInitMsg;
30 33 import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
31 34 import org.thingsboard.server.actors.rpc.RpcManagerActor;
32 35 import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
... ... @@ -54,6 +57,12 @@ import scala.concurrent.duration.Duration;
54 57 import javax.annotation.PostConstruct;
55 58 import javax.annotation.PreDestroy;
56 59
  60 +import java.util.Arrays;
  61 +import java.util.UUID;
  62 +import java.util.concurrent.Executors;
  63 +import java.util.concurrent.ScheduledExecutorService;
  64 +import java.util.concurrent.TimeUnit;
  65 +
57 66 import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_ACTOR_MESSAGE;
58 67
59 68 @Service
... ... @@ -86,6 +95,8 @@ public class DefaultActorService implements ActorService {
86 95
87 96 private ActorRef rpcManagerActor;
88 97
  98 + private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  99 +
89 100 @PostConstruct
90 101 public void initActorSystem() {
91 102 log.info("Initializing Actor system. {}", actorContext.getRuleChainService());
... ... @@ -106,6 +117,12 @@ public class DefaultActorService implements ActorService {
106 117 log.info("Actor system initialized.");
107 118 }
108 119
  120 + @EventListener(ApplicationReadyEvent.class)
  121 + public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
  122 + log.info("Received application ready event. Sending application init message to actor system");
  123 + appActor.tell(new AppInitMsg(), ActorRef.noSender());
  124 + }
  125 +
109 126 @PreDestroy
110 127 public void stopActorSystem() {
111 128 Future<Terminated> status = system.terminate();
... ...
... ... @@ -38,6 +38,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
38 38 import org.springframework.boot.context.event.ApplicationReadyEvent;
39 39 import org.springframework.context.ApplicationListener;
40 40 import org.springframework.context.annotation.Lazy;
  41 +import org.springframework.context.event.EventListener;
41 42 import org.springframework.stereotype.Service;
42 43 import org.springframework.util.Assert;
43 44 import org.thingsboard.server.actors.service.ActorService;
... ... @@ -61,7 +62,7 @@ import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.
61 62 @Service
62 63 @ConditionalOnProperty(prefix = "zk", value = "enabled", havingValue = "true", matchIfMissing = false)
63 64 @Slf4j
64   -public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheListener, ApplicationListener<ApplicationReadyEvent> {
  65 +public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheListener {
65 66
66 67 @Value("${zk.url}")
67 68 private String zkUrl;
... ... @@ -232,8 +233,9 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
232 233 .collect(Collectors.toList());
233 234 }
234 235
235   - @Override
  236 + @EventListener(ApplicationReadyEvent.class)
236 237 public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
  238 + log.info("Received application ready event. Starting current ZK node.");
237 239 if (stopped) {
238 240 log.debug("Ignoring application ready event. Service is stopped.");
239 241 return;
... ...
... ... @@ -22,7 +22,6 @@ import org.springframework.stereotype.Service;
22 22 import org.thingsboard.server.common.data.id.DeviceId;
23 23 import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry;
24 24
25   -import java.util.ArrayList;
26 25 import java.util.Collections;
27 26
28 27 import static org.thingsboard.server.common.data.CacheConstants.SESSIONS_CACHE;
... ... @@ -35,16 +34,17 @@ import static org.thingsboard.server.common.data.CacheConstants.SESSIONS_CACHE;
35 34 public class DefaultDeviceSessionCacheService implements DeviceSessionCacheService {
36 35
37 36 @Override
38   - @Cacheable(cacheNames = SESSIONS_CACHE, key = "#deviceId")
39   - public DeviceSessionsCacheEntry get(DeviceId deviceId) {
  37 + @Cacheable(cacheNames = SESSIONS_CACHE, key = "#deviceId.toString()")
  38 + public byte[] get(DeviceId deviceId) {
40 39 log.debug("[{}] Fetching session data from cache", deviceId);
41   - return DeviceSessionsCacheEntry.newBuilder().addAllSessions(Collections.emptyList()).build();
  40 + return DeviceSessionsCacheEntry.newBuilder().addAllSessions(Collections.emptyList()).build().toByteArray();
42 41 }
43 42
44 43 @Override
45   - @CachePut(cacheNames = SESSIONS_CACHE, key = "#deviceId")
46   - public DeviceSessionsCacheEntry put(DeviceId deviceId, DeviceSessionsCacheEntry sessions) {
47   - log.debug("[{}] Pushing session data from cache: {}", deviceId, sessions);
  44 + @CachePut(cacheNames = SESSIONS_CACHE, key = "#deviceId.toString()")
  45 + public byte[] put(DeviceId deviceId, byte[] sessions) {
  46 + log.debug("[{}] Pushing session data to cache: {}", deviceId, sessions);
48 47 return sessions;
49 48 }
  49 +
50 50 }
... ...
... ... @@ -23,8 +23,8 @@ import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheE
23 23 */
24 24 public interface DeviceSessionCacheService {
25 25
26   - DeviceSessionsCacheEntry get(DeviceId deviceId);
  26 + byte[] get(DeviceId deviceId);
27 27
28   - DeviceSessionsCacheEntry put(DeviceId deviceId, DeviceSessionsCacheEntry sessions);
  28 + byte[] put(DeviceId deviceId, byte[] sessions);
29 29
30 30 }
... ...
... ... @@ -29,6 +29,9 @@ import org.apache.kafka.clients.producer.RecordMetadata;
29 29 import org.springframework.beans.factory.annotation.Autowired;
30 30 import org.springframework.beans.factory.annotation.Value;
31 31 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  32 +import org.springframework.boot.context.event.ApplicationReadyEvent;
  33 +import org.springframework.context.event.ContextRefreshedEvent;
  34 +import org.springframework.context.event.EventListener;
32 35 import org.springframework.stereotype.Service;
33 36 import org.thingsboard.server.actors.ActorSystemContext;
34 37 import org.thingsboard.server.actors.service.ActorService;
... ... @@ -127,7 +130,11 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
127 130
128 131 ruleEngineConsumer = ruleEngineConsumerBuilder.build();
129 132 ruleEngineConsumer.subscribe();
  133 + }
130 134
  135 + @EventListener(ApplicationReadyEvent.class)
  136 + public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
  137 + log.info("Received application ready event. Starting polling for events.");
131 138 LocalBucketBuilder builder = Bucket4j.builder();
132 139 builder.addLimit(Bandwidth.simple(pollRecordsPerSecond, Duration.ofSeconds(1)));
133 140 builder.addLimit(Bandwidth.simple(pollRecordsPerMinute, Duration.ofMinutes(1)));
... ...
... ... @@ -19,6 +19,8 @@ import lombok.extern.slf4j.Slf4j;
19 19 import org.springframework.beans.factory.annotation.Autowired;
20 20 import org.springframework.beans.factory.annotation.Value;
21 21 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  22 +import org.springframework.boot.context.event.ApplicationReadyEvent;
  23 +import org.springframework.context.event.EventListener;
22 24 import org.springframework.stereotype.Component;
23 25 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
24 26 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
... ... @@ -93,6 +95,11 @@ public class RemoteTransportApiService {
93 95 builder.executor(transportCallbackExecutor);
94 96 builder.handler(transportApiService);
95 97 transportApiTemplate = builder.build();
  98 + }
  99 +
  100 + @EventListener(ApplicationReadyEvent.class)
  101 + public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
  102 + log.info("Received application ready event. Starting polling for events.");
96 103 transportApiTemplate.init();
97 104 }
98 105
... ...
... ... @@ -28,6 +28,8 @@ public enum MsgType {
28 28 */
29 29 CLUSTER_EVENT_MSG,
30 30
  31 + APP_INIT_MSG,
  32 +
31 33 /**
32 34 * All messages, could be send to cluster
33 35 */
... ...
... ... @@ -141,7 +141,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
141 141 processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
142 142 break;
143 143 case PINGREQ:
144   - if (checkConnected(ctx)) {
  144 + if (checkConnected(ctx, msg)) {
145 145 ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
146 146 transportService.reportActivity(sessionInfo);
147 147 if (gatewaySessionHandler != null) {
... ... @@ -150,7 +150,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
150 150 }
151 151 break;
152 152 case DISCONNECT:
153   - if (checkConnected(ctx)) {
  153 + if (checkConnected(ctx, msg)) {
154 154 processDisconnect(ctx);
155 155 }
156 156 break;
... ... @@ -161,7 +161,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
161 161 }
162 162
163 163 private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
164   - if (!checkConnected(ctx)) {
  164 + if (!checkConnected(ctx, mqttMsg)) {
165 165 return;
166 166 }
167 167 String topicName = mqttMsg.variableHeader().topicName();
... ... @@ -248,7 +248,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
248 248 }
249 249
250 250 private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
251   - if (!checkConnected(ctx)) {
  251 + if (!checkConnected(ctx, mqttMsg)) {
252 252 return;
253 253 }
254 254 log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
... ... @@ -293,7 +293,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
293 293 }
294 294
295 295 private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
296   - if (!checkConnected(ctx)) {
  296 + if (!checkConnected(ctx, mqttMsg)) {
297 297 return;
298 298 }
299 299 log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
... ... @@ -444,11 +444,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
444 444 return new MqttPubAckMessage(mqttFixedHeader, mqttMsgIdVariableHeader);
445 445 }
446 446
447   - private boolean checkConnected(ChannelHandlerContext ctx) {
  447 + private boolean checkConnected(ChannelHandlerContext ctx, MqttMessage msg) {
448 448 if (deviceSessionCtx.isConnected()) {
449 449 return true;
450 450 } else {
451   - log.info("[{}] Closing current session due to invalid msg order [{}][{}]", sessionId);
  451 + log.info("[{}] Closing current session due to invalid msg order: {}", sessionId, msg);
452 452 ctx.close();
453 453 return false;
454 454 }
... ... @@ -496,6 +496,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
496 496 transportService.registerAsyncSession(sessionInfo, this);
497 497 checkGatewaySession();
498 498 ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
  499 + log.info("[{}] Client connected!", sessionId);
499 500 }
500 501 }
501 502
... ...