Commit 8ad7558b3e6a15642bb4639dcd3e0043ec4bd4d9

Authored by Andrew Shvayka
1 parent 62230018

Cluster mode for DeviceStateService and multiple fixes

... ... @@ -241,6 +241,9 @@ public class DefaultActorService implements ActorService {
241 241 case CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE:
242 242 actorContext.getDeviceRpcService().processRemoteResponseFromDevice(serverAddress, msg.getPayload().toByteArray());
243 243 break;
  244 + case CLUSTER_DEVICE_STATE_SERVICE_MESSAGE:
  245 + actorContext.getDeviceStateService().onRemoteMsg(serverAddress, msg.getPayload().toByteArray());
  246 + break;
244 247 }
245 248 }
246 249
... ...
... ... @@ -36,6 +36,7 @@ import org.springframework.context.annotation.Lazy;
36 36 import org.springframework.stereotype.Service;
37 37 import org.springframework.util.Assert;
38 38 import org.thingsboard.server.common.msg.cluster.ServerAddress;
  39 +import org.thingsboard.server.service.state.DeviceStateService;
39 40 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
40 41 import org.thingsboard.server.utils.MiscUtils;
41 42
... ... @@ -74,6 +75,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
74 75 @Lazy
75 76 private TelemetrySubscriptionService tsSubService;
76 77
  78 + @Autowired
  79 + @Lazy
  80 + private DeviceStateService deviceStateService;
  81 +
77 82 private final List<DiscoveryServiceListener> listeners = new CopyOnWriteArrayList<>();
78 83
79 84 private CuratorFramework client;
... ... @@ -203,6 +208,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
203 208 switch (pathChildrenCacheEvent.getType()) {
204 209 case CHILD_ADDED:
205 210 tsSubService.onClusterUpdate();
  211 + deviceStateService.onClusterUpdate();
206 212 listeners.forEach(listener -> listener.onServerAdded(instance));
207 213 break;
208 214 case CHILD_UPDATED:
... ... @@ -210,6 +216,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
210 216 break;
211 217 case CHILD_REMOVED:
212 218 tsSubService.onClusterUpdate();
  219 + deviceStateService.onClusterUpdate();
213 220 listeners.forEach(listener -> listener.onServerRemoved(instance));
214 221 break;
215 222 default:
... ...
... ... @@ -23,11 +23,13 @@ import com.google.common.util.concurrent.Futures;
23 23 import com.google.common.util.concurrent.ListenableFuture;
24 24 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
25 25 import com.google.common.util.concurrent.MoreExecutors;
  26 +import com.google.protobuf.InvalidProtocolBufferException;
26 27 import lombok.Getter;
27 28 import lombok.extern.slf4j.Slf4j;
28 29 import org.springframework.beans.factory.annotation.Autowired;
29 30 import org.springframework.beans.factory.annotation.Value;
30 31 import org.springframework.stereotype.Service;
  32 +import org.thingsboard.rule.engine.api.RpcError;
31 33 import org.thingsboard.server.actors.service.ActorService;
32 34 import org.thingsboard.server.common.data.DataConstants;
33 35 import org.thingsboard.server.common.data.Device;
... ... @@ -36,13 +38,19 @@ import org.thingsboard.server.common.data.id.DeviceId;
36 38 import org.thingsboard.server.common.data.id.TenantId;
37 39 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
38 40 import org.thingsboard.server.common.data.page.TextPageLink;
  41 +import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
39 42 import org.thingsboard.server.common.msg.TbMsg;
40 43 import org.thingsboard.server.common.msg.TbMsgDataType;
41 44 import org.thingsboard.server.common.msg.TbMsgMetaData;
  45 +import org.thingsboard.server.common.msg.cluster.ServerAddress;
42 46 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
43 47 import org.thingsboard.server.dao.attributes.AttributesService;
44 48 import org.thingsboard.server.dao.device.DeviceService;
45 49 import org.thingsboard.server.dao.tenant.TenantService;
  50 +import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
  51 +import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
  52 +import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
  53 +import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
46 54 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
47 55
48 56 import javax.annotation.Nullable;
... ... @@ -54,6 +62,7 @@ import java.util.HashSet;
54 62 import java.util.List;
55 63 import java.util.Optional;
56 64 import java.util.Set;
  65 +import java.util.UUID;
57 66 import java.util.concurrent.ConcurrentHashMap;
58 67 import java.util.concurrent.ConcurrentMap;
59 68 import java.util.concurrent.ExecutionException;
... ... @@ -98,6 +107,12 @@ public class DefaultDeviceStateService implements DeviceStateService {
98 107 @Autowired
99 108 private TelemetrySubscriptionService tsSubService;
100 109
  110 + @Autowired
  111 + private ClusterRoutingService routingService;
  112 +
  113 + @Autowired
  114 + private ClusterRpcService clusterRpcService;
  115 +
101 116 @Value("${state.defaultInactivityTimeoutInSec}")
102 117 @Getter
103 118 private long defaultInactivityTimeoutInSec;
... ... @@ -172,12 +187,57 @@ public class DefaultDeviceStateService implements DeviceStateService {
172 187 }
173 188
174 189 @Override
175   - public Optional<DeviceState> getDeviceState(DeviceId deviceId) {
176   - DeviceStateData state = deviceStates.get(deviceId);
177   - if (state != null) {
178   - return Optional.of(state.getState());
  190 + public void onClusterUpdate() {
  191 + queueExecutor.submit(this::onClusterUpdateSync);
  192 + }
  193 +
  194 + @Override
  195 + public void onRemoteMsg(ServerAddress serverAddress, byte[] data) {
  196 + ClusterAPIProtos.DeviceStateServiceMsgProto proto;
  197 + try {
  198 + proto = ClusterAPIProtos.DeviceStateServiceMsgProto.parseFrom(data);
  199 + } catch (InvalidProtocolBufferException e) {
  200 + throw new RuntimeException(e);
  201 + }
  202 + TenantId tenantId = new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB()));
  203 + DeviceId deviceId = new DeviceId(new UUID(proto.getDeviceIdMSB(), proto.getDeviceIdLSB()));
  204 + if (proto.getDeleted()) {
  205 + queueExecutor.submit(() -> onDeviceDeleted(tenantId, deviceId));
179 206 } else {
180   - return Optional.empty();
  207 + Device device = deviceService.findDeviceById(deviceId);
  208 + if (device != null) {
  209 + if (proto.getAdded()) {
  210 + onDeviceAdded(device);
  211 + } else if (proto.getUpdated()) {
  212 + onDeviceUpdated(device);
  213 + }
  214 + }
  215 + }
  216 + }
  217 +
  218 + private void onClusterUpdateSync() {
  219 + List<Tenant> tenants = tenantService.findTenants(new TextPageLink(Integer.MAX_VALUE)).getData();
  220 + for (Tenant tenant : tenants) {
  221 + List<ListenableFuture<DeviceStateData>> fetchFutures = new ArrayList<>();
  222 + List<Device> devices = deviceService.findDevicesByTenantId(tenant.getId(), new TextPageLink(Integer.MAX_VALUE)).getData();
  223 + for (Device device : devices) {
  224 + if (!routingService.resolveById(device.getId()).isPresent()) {
  225 + if (!deviceStates.containsKey(device.getId())) {
  226 + fetchFutures.add(fetchDeviceState(device));
  227 + }
  228 + } else {
  229 + Set<DeviceId> tenantDeviceSet = tenantDevices.get(tenant.getId());
  230 + if (tenantDeviceSet != null) {
  231 + tenantDeviceSet.remove(device.getId());
  232 + }
  233 + deviceStates.remove(device.getId());
  234 + }
  235 + }
  236 + try {
  237 + Futures.successfulAsList(fetchFutures).get().forEach(this::addDeviceUsingState);
  238 + } catch (InterruptedException | ExecutionException e) {
  239 + log.warn("Failed to init device state service from DB", e);
  240 + }
181 241 }
182 242 }
183 243
... ... @@ -187,7 +247,9 @@ public class DefaultDeviceStateService implements DeviceStateService {
187 247 List<ListenableFuture<DeviceStateData>> fetchFutures = new ArrayList<>();
188 248 List<Device> devices = deviceService.findDevicesByTenantId(tenant.getId(), new TextPageLink(Integer.MAX_VALUE)).getData();
189 249 for (Device device : devices) {
190   - fetchFutures.add(fetchDeviceState(device));
  250 + if (!routingService.resolveById(device.getId()).isPresent()) {
  251 + fetchFutures.add(fetchDeviceState(device));
  252 + }
191 253 }
192 254 try {
193 255 Futures.successfulAsList(fetchFutures).get().forEach(this::addDeviceUsingState);
... ... @@ -209,7 +271,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
209 271 DeviceStateData stateData = deviceStates.get(deviceId);
210 272 DeviceState state = stateData.getState();
211 273 state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
212   - if (!state.isActive() && state.getLastInactivityAlarmTime() < state.getLastActivityTime()) {
  274 + if (!state.isActive() && (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime())) {
213 275 state.setLastInactivityAlarmTime(ts);
214 276 pushRuleEngineMessage(stateData, INACTIVITY_EVENT);
215 277 saveAttribute(deviceId, INACTIVITY_ALARM_TIME, ts);
... ... @@ -219,7 +281,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
219 281 }
220 282
221 283 private void onDeviceConnectSync(DeviceId deviceId) {
222   - DeviceStateData stateData = deviceStates.get(deviceId);
  284 + DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
223 285 if (stateData != null) {
224 286 long ts = System.currentTimeMillis();
225 287 stateData.getState().setLastConnectTime(ts);
... ... @@ -229,7 +291,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
229 291 }
230 292
231 293 private void onDeviceDisconnectSync(DeviceId deviceId) {
232   - DeviceStateData stateData = deviceStates.get(deviceId);
  294 + DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
233 295 if (stateData != null) {
234 296 long ts = System.currentTimeMillis();
235 297 stateData.getState().setLastDisconnectTime(ts);
... ... @@ -239,7 +301,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
239 301 }
240 302
241 303 private void onDeviceActivitySync(DeviceId deviceId) {
242   - DeviceStateData stateData = deviceStates.get(deviceId);
  304 + DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
243 305 if (stateData != null) {
244 306 DeviceState state = stateData.getState();
245 307 long ts = System.currentTimeMillis();
... ... @@ -251,6 +313,23 @@ public class DefaultDeviceStateService implements DeviceStateService {
251 313 }
252 314 }
253 315
  316 + private DeviceStateData getOrFetchDeviceStateData(DeviceId deviceId) {
  317 + DeviceStateData deviceStateData = deviceStates.get(deviceId);
  318 + if (deviceStateData == null) {
  319 + if (!routingService.resolveById(deviceId).isPresent()) {
  320 + Device device = deviceService.findDeviceById(deviceId);
  321 + if (device != null) {
  322 + try {
  323 + deviceStateData = fetchDeviceState(device).get();
  324 + } catch (InterruptedException | ExecutionException e) {
  325 + log.debug("[{}] Failed to fetch device state!", deviceId, e);
  326 + }
  327 + }
  328 + }
  329 + }
  330 + return deviceStateData;
  331 + }
  332 +
254 333 private void onInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) {
255 334 if (inactivityTimeout == 0L) {
256 335 return;
... ... @@ -269,37 +348,65 @@ public class DefaultDeviceStateService implements DeviceStateService {
269 348 }
270 349
271 350 private void onDeviceAddedSync(Device device) {
272   - Futures.addCallback(fetchDeviceState(device), new FutureCallback<DeviceStateData>() {
273   - @Override
274   - public void onSuccess(@Nullable DeviceStateData state) {
275   - addDeviceUsingState(state);
276   - }
  351 + Optional<ServerAddress> address = routingService.resolveById(device.getId());
  352 + if (!address.isPresent()) {
  353 + Futures.addCallback(fetchDeviceState(device), new FutureCallback<DeviceStateData>() {
  354 + @Override
  355 + public void onSuccess(@Nullable DeviceStateData state) {
  356 + addDeviceUsingState(state);
  357 + }
  358 +
  359 + @Override
  360 + public void onFailure(Throwable t) {
  361 + log.warn("Failed to register device to the state service", t);
  362 + }
  363 + });
  364 + } else {
  365 + sendDeviceEvent(device.getTenantId(), device.getId(), address.get(), true, false, false);
  366 + }
  367 + }
277 368
278   - @Override
279   - public void onFailure(Throwable t) {
280   - log.warn("Failed to register device to the state service", t);
281   - }
282   - });
  369 + private void sendDeviceEvent(TenantId tenantId, DeviceId deviceId, ServerAddress address, boolean added, boolean updated, boolean deleted) {
  370 + log.trace("[{}][{}] Device is monitored on other server: {}", tenantId, deviceId, address);
  371 + ClusterAPIProtos.DeviceStateServiceMsgProto.Builder builder = ClusterAPIProtos.DeviceStateServiceMsgProto.newBuilder();
  372 + builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
  373 + builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
  374 + builder.setDeviceIdMSB(deviceId.getId().getMostSignificantBits());
  375 + builder.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits());
  376 + builder.setAdded(added);
  377 + builder.setUpdated(updated);
  378 + builder.setDeleted(deleted);
  379 + clusterRpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_DEVICE_STATE_SERVICE_MESSAGE, builder.build().toByteArray());
283 380 }
284 381
285 382 private void onDeviceUpdatedSync(Device device) {
286   - DeviceStateData stateData = deviceStates.get(device.getId());
287   - if (stateData != null) {
288   - TbMsgMetaData md = new TbMsgMetaData();
289   - md.putValue("deviceName", device.getName());
290   - md.putValue("deviceType", device.getType());
291   - stateData.setMetaData(md);
  383 + Optional<ServerAddress> address = routingService.resolveById(device.getId());
  384 + if (!address.isPresent()) {
  385 + DeviceStateData stateData = getOrFetchDeviceStateData(device.getId());
  386 + if (stateData != null) {
  387 + TbMsgMetaData md = new TbMsgMetaData();
  388 + md.putValue("deviceName", device.getName());
  389 + md.putValue("deviceType", device.getType());
  390 + stateData.setMetaData(md);
  391 + }
  392 + } else {
  393 + sendDeviceEvent(device.getTenantId(), device.getId(), address.get(), false, true, false);
292 394 }
293 395 }
294 396
295 397 private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {
296   - deviceStates.remove(deviceId);
297   - Set<DeviceId> deviceIds = tenantDevices.get(tenantId);
298   - if (deviceIds != null) {
299   - deviceIds.remove(deviceId);
300   - if (deviceIds.isEmpty()) {
301   - tenantDevices.remove(tenantId);
  398 + Optional<ServerAddress> address = routingService.resolveById(deviceId);
  399 + if (!address.isPresent()) {
  400 + deviceStates.remove(deviceId);
  401 + Set<DeviceId> deviceIds = tenantDevices.get(tenantId);
  402 + if (deviceIds != null) {
  403 + deviceIds.remove(deviceId);
  404 + if (deviceIds.isEmpty()) {
  405 + tenantDevices.remove(tenantId);
  406 + }
302 407 }
  408 + } else {
  409 + sendDeviceEvent(tenantId, deviceId, address.get(), false, false, true);
303 410 }
304 411 }
305 412
... ...
... ... @@ -17,6 +17,7 @@ package org.thingsboard.server.service.state;
17 17
18 18 import org.thingsboard.server.common.data.Device;
19 19 import org.thingsboard.server.common.data.id.DeviceId;
  20 +import org.thingsboard.server.common.msg.cluster.ServerAddress;
20 21
21 22 import java.util.Optional;
22 23
... ... @@ -39,6 +40,7 @@ public interface DeviceStateService {
39 40
40 41 void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout);
41 42
42   - Optional<DeviceState> getDeviceState(DeviceId deviceId);
  43 + void onClusterUpdate();
43 44
  45 + void onRemoteMsg(ServerAddress serverAddress, byte[] bytes);
44 46 }
... ...
... ... @@ -362,7 +362,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
362 362
363 363 DonAsynchron.withCallback(tsService.findAll(entityId, queries),
364 364 missedUpdates -> {
365   - if (!missedUpdates.isEmpty()) {
  365 + if (missedUpdates != null && !missedUpdates.isEmpty()) {
366 366 tellRemoteSubUpdate(address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
367 367 }
368 368 },
... ...
... ... @@ -57,6 +57,8 @@ enum MessageType {
57 57 CLUSTER_TELEMETRY_ATTR_UPDATE_MESSAGE = 10;
58 58 CLUSTER_TELEMETRY_TS_UPDATE_MESSAGE = 11;
59 59 CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE = 12;
  60 +
  61 + CLUSTER_DEVICE_STATE_SERVICE_MESSAGE = 13;
60 62 }
61 63
62 64 // Messages related to CLUSTER_TELEMETRY_MESSAGE
... ... @@ -128,3 +130,13 @@ message FromDeviceRPCResponseProto {
128 130 string response = 3;
129 131 int32 error = 4;
130 132 }
  133 +
  134 +message DeviceStateServiceMsgProto {
  135 + int64 tenantIdMSB = 1;
  136 + int64 tenantIdLSB = 2;
  137 + int64 deviceIdMSB = 3;
  138 + int64 deviceIdLSB = 4;
  139 + bool added = 5;
  140 + bool updated = 6;
  141 + bool deleted = 7;
  142 +}
\ No newline at end of file
... ...