Commit 4a3a8a6f71eaccd6e7d3a274d78a08436f95a706

Authored by Andrew Shvayka
1 parent 280a752f

Device State Service

@@ -64,6 +64,7 @@ import org.thingsboard.server.service.executors.DbCallbackExecutorService; @@ -64,6 +64,7 @@ import org.thingsboard.server.service.executors.DbCallbackExecutorService;
64 import org.thingsboard.server.service.mail.MailExecutorService; 64 import org.thingsboard.server.service.mail.MailExecutorService;
65 import org.thingsboard.server.service.rpc.DeviceRpcService; 65 import org.thingsboard.server.service.rpc.DeviceRpcService;
66 import org.thingsboard.server.service.script.JsExecutorService; 66 import org.thingsboard.server.service.script.JsExecutorService;
  67 +import org.thingsboard.server.service.state.DeviceStateService;
67 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; 68 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
68 69
69 import java.io.IOException; 70 import java.io.IOException;
@@ -192,6 +193,10 @@ public class ActorSystemContext { @@ -192,6 +193,10 @@ public class ActorSystemContext {
192 @Getter 193 @Getter
193 private MsgQueue msgQueue; 194 private MsgQueue msgQueue;
194 195
  196 + @Autowired
  197 + @Getter
  198 + private DeviceStateService deviceStateService;
  199 +
195 @Value("${actors.session.sync.timeout}") 200 @Value("${actors.session.sync.timeout}")
196 @Getter 201 @Getter
197 private long syncSessionTimeout; 202 private long syncSessionTimeout;
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * 3 + * <p>
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * 7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -265,17 +265,32 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -265,17 +265,32 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
265 break; 265 break;
266 case POST_ATTRIBUTES_REQUEST: 266 case POST_ATTRIBUTES_REQUEST:
267 handlePostAttributesRequest(context, msg); 267 handlePostAttributesRequest(context, msg);
  268 + reportActivity();
268 break; 269 break;
269 case POST_TELEMETRY_REQUEST: 270 case POST_TELEMETRY_REQUEST:
270 handlePostTelemetryRequest(context, msg); 271 handlePostTelemetryRequest(context, msg);
  272 + reportActivity();
271 break; 273 break;
272 case TO_SERVER_RPC_REQUEST: 274 case TO_SERVER_RPC_REQUEST:
273 handleClientSideRPCRequest(context, msg); 275 handleClientSideRPCRequest(context, msg);
  276 + reportActivity();
274 break; 277 break;
275 } 278 }
276 } 279 }
277 } 280 }
278 281
  282 + private void reportActivity() {
  283 + systemContext.getDeviceStateService().onDeviceActivity(deviceId);
  284 + }
  285 +
  286 + private void reportSessionOpen() {
  287 + systemContext.getDeviceStateService().onDeviceConnect(deviceId);
  288 + }
  289 +
  290 + private void reportSessionClose() {
  291 + systemContext.getDeviceStateService().onDeviceDisconnect(deviceId);
  292 + }
  293 +
279 private void handleGetAttributesRequest(DeviceToDeviceActorMsg src) { 294 private void handleGetAttributesRequest(DeviceToDeviceActorMsg src) {
280 GetAttributesRequest request = (GetAttributesRequest) src.getPayload(); 295 GetAttributesRequest request = (GetAttributesRequest) src.getPayload();
281 ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, request.getClientAttributeNames()); 296 ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, request.getClientAttributeNames());
@@ -488,11 +503,17 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -488,11 +503,17 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
488 if (inMsg instanceof SessionOpenMsg) { 503 if (inMsg instanceof SessionOpenMsg) {
489 logger.debug("[{}] Processing new session [{}]", deviceId, sessionId); 504 logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
490 sessions.put(sessionId, new SessionInfo(SessionType.ASYNC, msg.getServerAddress())); 505 sessions.put(sessionId, new SessionInfo(SessionType.ASYNC, msg.getServerAddress()));
  506 + if (sessions.size() == 1) {
  507 + reportSessionOpen();
  508 + }
491 } else if (inMsg instanceof SessionCloseMsg) { 509 } else if (inMsg instanceof SessionCloseMsg) {
492 logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId); 510 logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
493 sessions.remove(sessionId); 511 sessions.remove(sessionId);
494 attributeSubscriptions.remove(sessionId); 512 attributeSubscriptions.remove(sessionId);
495 rpcSubscriptions.remove(sessionId); 513 rpcSubscriptions.remove(sessionId);
  514 + if (sessions.isEmpty()) {
  515 + reportSessionClose();
  516 + }
496 } 517 }
497 } 518 }
498 519
@@ -32,4 +32,5 @@ public interface ActorService extends SessionMsgProcessor, WebSocketMsgProcessor @@ -32,4 +32,5 @@ public interface ActorService extends SessionMsgProcessor, WebSocketMsgProcessor
32 void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId); 32 void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId);
33 33
34 void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType); 34 void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType);
  35 +
35 } 36 }
@@ -63,6 +63,7 @@ import org.thingsboard.server.exception.ThingsboardErrorResponseHandler; @@ -63,6 +63,7 @@ import org.thingsboard.server.exception.ThingsboardErrorResponseHandler;
63 import org.thingsboard.server.common.data.exception.ThingsboardException; 63 import org.thingsboard.server.common.data.exception.ThingsboardException;
64 import org.thingsboard.server.service.component.ComponentDiscoveryService; 64 import org.thingsboard.server.service.component.ComponentDiscoveryService;
65 import org.thingsboard.server.service.security.model.SecurityUser; 65 import org.thingsboard.server.service.security.model.SecurityUser;
  66 +import org.thingsboard.server.service.state.DeviceStateService;
66 67
67 import javax.mail.MessagingException; 68 import javax.mail.MessagingException;
68 import javax.servlet.http.HttpServletRequest; 69 import javax.servlet.http.HttpServletRequest;
@@ -137,6 +138,9 @@ public abstract class BaseController { @@ -137,6 +138,9 @@ public abstract class BaseController {
137 @Autowired 138 @Autowired
138 protected DeviceOfflineService offlineService; 139 protected DeviceOfflineService offlineService;
139 140
  141 + @Autowired
  142 + protected DeviceStateService deviceStateService;
  143 +
140 @ExceptionHandler(ThingsboardException.class) 144 @ExceptionHandler(ThingsboardException.class)
141 public void handleThingsboardException(ThingsboardException ex, HttpServletResponse response) { 145 public void handleThingsboardException(ThingsboardException ex, HttpServletResponse response) {
142 errorResponseHandler.handle(ex, response); 146 errorResponseHandler.handle(ex, response);
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * 3 + * <p>
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * 7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -90,6 +90,11 @@ public class DeviceController extends BaseController { @@ -90,6 +90,11 @@ public class DeviceController extends BaseController {
90 savedDevice.getCustomerId(), 90 savedDevice.getCustomerId(),
91 device.getId() == null ? ActionType.ADDED : ActionType.UPDATED, null); 91 device.getId() == null ? ActionType.ADDED : ActionType.UPDATED, null);
92 92
  93 + if (device.getId() == null) {
  94 + deviceStateService.onDeviceAdded(savedDevice);
  95 + } else {
  96 + deviceStateService.onDeviceUpdated(savedDevice);
  97 + }
93 return savedDevice; 98 return savedDevice;
94 } catch (Exception e) { 99 } catch (Exception e) {
95 logEntityAction(emptyId(EntityType.DEVICE), device, 100 logEntityAction(emptyId(EntityType.DEVICE), device,
@@ -112,6 +117,7 @@ public class DeviceController extends BaseController { @@ -112,6 +117,7 @@ public class DeviceController extends BaseController {
112 device.getCustomerId(), 117 device.getCustomerId(),
113 ActionType.DELETED, null, strDeviceId); 118 ActionType.DELETED, null, strDeviceId);
114 119
  120 + deviceStateService.onDeviceDeleted(device);
115 } catch (Exception e) { 121 } catch (Exception e) {
116 logEntityAction(emptyId(EntityType.DEVICE), 122 logEntityAction(emptyId(EntityType.DEVICE),
117 null, 123 null,
@@ -387,7 +393,7 @@ public class DeviceController extends BaseController { @@ -387,7 +393,7 @@ public class DeviceController extends BaseController {
387 @RequestMapping(value = "/device/online", method = RequestMethod.GET) 393 @RequestMapping(value = "/device/online", method = RequestMethod.GET)
388 @ResponseBody 394 @ResponseBody
389 public List<Device> getOnlineDevices(@RequestParam("contactType") DeviceStatusQuery.ContactType contactType, 395 public List<Device> getOnlineDevices(@RequestParam("contactType") DeviceStatusQuery.ContactType contactType,
390 - @RequestParam("threshold") long threshold) throws ThingsboardException { 396 + @RequestParam("threshold") long threshold) throws ThingsboardException {
391 try { 397 try {
392 TenantId tenantId = getCurrentUser().getTenantId(); 398 TenantId tenantId = getCurrentUser().getTenantId();
393 ListenableFuture<List<Device>> offlineDevices = offlineService.findOnlineDevices(tenantId.getId(), contactType, threshold); 399 ListenableFuture<List<Device>> offlineDevices = offlineService.findOnlineDevices(tenantId.getId(), contactType, threshold);
  1 +package org.thingsboard.server.service.state;
  2 +
  3 +import com.datastax.driver.core.utils.UUIDs;
  4 +import com.fasterxml.jackson.databind.ObjectMapper;
  5 +import com.google.common.base.Function;
  6 +import com.google.common.util.concurrent.FutureCallback;
  7 +import com.google.common.util.concurrent.Futures;
  8 +import com.google.common.util.concurrent.ListenableFuture;
  9 +import com.google.common.util.concurrent.ListeningScheduledExecutorService;
  10 +import com.google.common.util.concurrent.MoreExecutors;
  11 +import lombok.Getter;
  12 +import lombok.extern.slf4j.Slf4j;
  13 +import org.springframework.beans.factory.annotation.Autowired;
  14 +import org.springframework.beans.factory.annotation.Value;
  15 +import org.springframework.stereotype.Service;
  16 +import org.thingsboard.server.actors.service.ActorService;
  17 +import org.thingsboard.server.common.data.DataConstants;
  18 +import org.thingsboard.server.common.data.Device;
  19 +import org.thingsboard.server.common.data.Tenant;
  20 +import org.thingsboard.server.common.data.id.DeviceId;
  21 +import org.thingsboard.server.common.data.id.TenantId;
  22 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  23 +import org.thingsboard.server.common.data.page.TextPageLink;
  24 +import org.thingsboard.server.common.msg.TbMsg;
  25 +import org.thingsboard.server.common.msg.TbMsgDataType;
  26 +import org.thingsboard.server.common.msg.TbMsgMetaData;
  27 +import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
  28 +import org.thingsboard.server.dao.attributes.AttributesService;
  29 +import org.thingsboard.server.dao.device.DeviceService;
  30 +import org.thingsboard.server.dao.tenant.TenantService;
  31 +import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
  32 +
  33 +import javax.annotation.Nullable;
  34 +import javax.annotation.PostConstruct;
  35 +import javax.annotation.PreDestroy;
  36 +import java.util.ArrayList;
  37 +import java.util.Arrays;
  38 +import java.util.HashSet;
  39 +import java.util.List;
  40 +import java.util.Optional;
  41 +import java.util.Set;
  42 +import java.util.concurrent.ConcurrentHashMap;
  43 +import java.util.concurrent.ConcurrentMap;
  44 +import java.util.concurrent.ExecutionException;
  45 +import java.util.concurrent.Executors;
  46 +import java.util.concurrent.TimeUnit;
  47 +
  48 +import static org.thingsboard.server.common.data.DataConstants.ACTIVITY_EVENT;
  49 +import static org.thingsboard.server.common.data.DataConstants.CONNECT_EVENT;
  50 +import static org.thingsboard.server.common.data.DataConstants.DISCONNECT_EVENT;
  51 +import static org.thingsboard.server.common.data.DataConstants.INACTIVITY_EVENT;
  52 +
  53 +/**
  54 + * Created by ashvayka on 01.05.18.
  55 + */
  56 +@Service
  57 +@Slf4j
  58 +//TODO: refactor to use page links as cursor and not fetch all
  59 +public class DefaultDeviceStateService implements DeviceStateService {
  60 +
  61 + private static final ObjectMapper json = new ObjectMapper();
  62 + public static final String ACTIVITY_STATE = "active";
  63 + public static final String LAST_CONNECT_TIME = "lastConnectTime";
  64 + public static final String LAST_DISCONNECT_TIME = "lastDisconnectTime";
  65 + public static final String LAST_ACTIVITY_TIME = "lastActivityTime";
  66 + public static final String INACTIVITY_ALARM_TIME = "inactivityAlarmTime";
  67 + public static final String INACTIVITY_TIMEOUT = "inactivityTimeout";
  68 +
  69 + public static final List<String> PERSISTENT_ATTRIBUTES = Arrays.asList(ACTIVITY_STATE, LAST_CONNECT_TIME, LAST_DISCONNECT_TIME, LAST_ACTIVITY_TIME, INACTIVITY_ALARM_TIME, INACTIVITY_TIMEOUT);
  70 +
  71 + @Autowired
  72 + private TenantService tenantService;
  73 +
  74 + @Autowired
  75 + private DeviceService deviceService;
  76 +
  77 + @Autowired
  78 + private AttributesService attributesService;
  79 +
  80 + @Autowired
  81 + private ActorService actorService;
  82 +
  83 + @Autowired
  84 + private TelemetrySubscriptionService tsSubService;
  85 +
  86 + @Value("${state.defaultInactivityTimeoutInSec}")
  87 + @Getter
  88 + private long defaultInactivityTimeoutInSec;
  89 +
  90 + @Value("${state.defaultStateCheckIntervalInSec}")
  91 + @Getter
  92 + private long defaultStateCheckIntervalInSec;
  93 +
  94 +// TODO in v2.1
  95 +// @Value("${state.defaultStatePersistenceIntervalInSec}")
  96 +// @Getter
  97 +// private long defaultStatePersistenceIntervalInSec;
  98 +//
  99 +// @Value("${state.defaultStatePersistencePack}")
  100 +// @Getter
  101 +// private long defaultStatePersistencePack;
  102 +
  103 + private ListeningScheduledExecutorService queueExecutor;
  104 +
  105 + private ConcurrentMap<TenantId, Set<DeviceId>> tenantDevices = new ConcurrentHashMap<>();
  106 + private ConcurrentMap<DeviceId, DeviceStateData> deviceStates = new ConcurrentHashMap<>();
  107 +
  108 + @PostConstruct
  109 + public void init() {
  110 + // Should be always single threaded due to absence of locks.
  111 + queueExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor());
  112 + queueExecutor.submit(this::initStateFromDB);
  113 + queueExecutor.scheduleAtFixedRate(this::updateState, defaultStateCheckIntervalInSec, defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
  114 + //TODO: schedule persistence in v2.1;
  115 + }
  116 +
  117 + @PreDestroy
  118 + public void stop() {
  119 + if (queueExecutor != null) {
  120 + queueExecutor.shutdownNow();
  121 + }
  122 + }
  123 +
  124 + @Override
  125 + public void onDeviceAdded(Device device) {
  126 + queueExecutor.submit(() -> onDeviceAddedSync(device));
  127 + }
  128 +
  129 + @Override
  130 + public void onDeviceUpdated(Device device) {
  131 + queueExecutor.submit(() -> onDeviceUpdatedSync(device));
  132 + }
  133 +
  134 + @Override
  135 + public void onDeviceConnect(DeviceId deviceId) {
  136 + queueExecutor.submit(() -> onDeviceConnectSync(deviceId));
  137 + }
  138 +
  139 + @Override
  140 + public void onDeviceActivity(DeviceId deviceId) {
  141 + queueExecutor.submit(() -> onDeviceActivitySync(deviceId));
  142 + }
  143 +
  144 + @Override
  145 + public void onDeviceDisconnect(DeviceId deviceId) {
  146 + queueExecutor.submit(() -> onDeviceDisconnectSync(deviceId));
  147 + }
  148 +
  149 + @Override
  150 + public void onDeviceDeleted(Device device) {
  151 + queueExecutor.submit(() -> onDeviceDeleted(device.getTenantId(), device.getId()));
  152 + }
  153 +
  154 + @Override
  155 + public void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) {
  156 + queueExecutor.submit(() -> onInactivityTimeoutUpdate(deviceId, inactivityTimeout));
  157 + }
  158 +
  159 + @Override
  160 + public Optional<DeviceState> getDeviceState(DeviceId deviceId) {
  161 + DeviceStateData state = deviceStates.get(deviceId);
  162 + if (state != null) {
  163 + return Optional.of(state.getState());
  164 + } else {
  165 + return Optional.empty();
  166 + }
  167 + }
  168 +
  169 + private void initStateFromDB() {
  170 + List<Tenant> tenants = tenantService.findTenants(new TextPageLink(Integer.MAX_VALUE)).getData();
  171 + for (Tenant tenant : tenants) {
  172 + List<ListenableFuture<DeviceStateData>> fetchFutures = new ArrayList<>();
  173 + List<Device> devices = deviceService.findDevicesByTenantId(tenant.getId(), new TextPageLink(Integer.MAX_VALUE)).getData();
  174 + for (Device device : devices) {
  175 + fetchFutures.add(fetchDeviceState(device));
  176 + }
  177 + try {
  178 + Futures.successfulAsList(fetchFutures).get().forEach(this::addDeviceUsingState);
  179 + } catch (InterruptedException | ExecutionException e) {
  180 + log.warn("Failed to init device state service from DB", e);
  181 + }
  182 + }
  183 + }
  184 +
  185 + private void addDeviceUsingState(DeviceStateData state) {
  186 + tenantDevices.computeIfAbsent(state.getTenantId(), id -> ConcurrentHashMap.newKeySet()).add(state.getDeviceId());
  187 + deviceStates.put(state.getDeviceId(), state);
  188 + }
  189 +
  190 + private void updateState() {
  191 + long ts = System.currentTimeMillis();
  192 + Set<DeviceId> deviceIds = new HashSet<>(deviceStates.keySet());
  193 + for (DeviceId deviceId : deviceIds) {
  194 + DeviceStateData stateData = deviceStates.get(deviceId);
  195 + DeviceState state = stateData.getState();
  196 + state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
  197 + if (!state.isActive() && state.getLastInactivityAlarmTime() < state.getLastActivityTime()) {
  198 + state.setLastInactivityAlarmTime(ts);
  199 + pushRuleEngineMessage(stateData, INACTIVITY_EVENT);
  200 + saveAttribute(deviceId, INACTIVITY_ALARM_TIME, ts);
  201 + saveAttribute(deviceId, ACTIVITY_STATE, state.isActive());
  202 + }
  203 + }
  204 + }
  205 +
  206 + private void onDeviceConnectSync(DeviceId deviceId) {
  207 + DeviceStateData stateData = deviceStates.get(deviceId);
  208 + if (stateData != null) {
  209 + long ts = System.currentTimeMillis();
  210 + stateData.getState().setLastConnectTime(ts);
  211 + pushRuleEngineMessage(stateData, CONNECT_EVENT);
  212 + saveAttribute(deviceId, LAST_CONNECT_TIME, ts);
  213 + }
  214 + }
  215 +
  216 + private void onDeviceDisconnectSync(DeviceId deviceId) {
  217 + DeviceStateData stateData = deviceStates.get(deviceId);
  218 + if (stateData != null) {
  219 + long ts = System.currentTimeMillis();
  220 + stateData.getState().setLastDisconnectTime(ts);
  221 + pushRuleEngineMessage(stateData, DISCONNECT_EVENT);
  222 + saveAttribute(deviceId, LAST_DISCONNECT_TIME, ts);
  223 + }
  224 + }
  225 +
  226 + private void onDeviceActivitySync(DeviceId deviceId) {
  227 + DeviceStateData stateData = deviceStates.get(deviceId);
  228 + if (stateData != null) {
  229 + DeviceState state = stateData.getState();
  230 + long ts = System.currentTimeMillis();
  231 + state.setActive(true);
  232 + stateData.getState().setLastActivityTime(ts);
  233 + pushRuleEngineMessage(stateData, ACTIVITY_EVENT);
  234 + saveAttribute(deviceId, LAST_ACTIVITY_TIME, ts);
  235 + saveAttribute(deviceId, ACTIVITY_STATE, state.isActive());
  236 + }
  237 + }
  238 +
  239 + private void onInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) {
  240 + if (inactivityTimeout == 0L) {
  241 + return;
  242 + }
  243 + DeviceStateData stateData = deviceStates.get(deviceId);
  244 + if (stateData != null) {
  245 + long ts = System.currentTimeMillis();
  246 + DeviceState state = stateData.getState();
  247 + state.setInactivityTimeout(inactivityTimeout);
  248 + boolean oldActive = state.isActive();
  249 + state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
  250 + if (!oldActive && state.isActive()) {
  251 + saveAttribute(deviceId, ACTIVITY_STATE, state.isActive());
  252 + }
  253 + }
  254 + }
  255 +
  256 + private void onDeviceAddedSync(Device device) {
  257 + Futures.addCallback(fetchDeviceState(device), new FutureCallback<DeviceStateData>() {
  258 + @Override
  259 + public void onSuccess(@Nullable DeviceStateData state) {
  260 + addDeviceUsingState(state);
  261 + }
  262 +
  263 + @Override
  264 + public void onFailure(Throwable t) {
  265 + log.warn("Failed to register device to the state service", t);
  266 + }
  267 + });
  268 + }
  269 +
  270 + private void onDeviceUpdatedSync(Device device) {
  271 + DeviceStateData stateData = deviceStates.get(device.getId());
  272 + if (stateData != null) {
  273 + TbMsgMetaData md = new TbMsgMetaData();
  274 + md.putValue("deviceName", device.getName());
  275 + md.putValue("deviceType", device.getType());
  276 + stateData.setMetaData(md);
  277 + }
  278 + }
  279 +
  280 + private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {
  281 + deviceStates.remove(deviceId);
  282 + Set<DeviceId> deviceIds = tenantDevices.get(tenantId);
  283 + if (deviceIds != null) {
  284 + deviceIds.remove(deviceId);
  285 + if (deviceIds.isEmpty()) {
  286 + tenantDevices.remove(tenantId);
  287 + }
  288 + }
  289 + }
  290 +
  291 + private ListenableFuture<DeviceStateData> fetchDeviceState(Device device) {
  292 + ListenableFuture<List<AttributeKvEntry>> attributes = attributesService.find(device.getId(), DataConstants.SERVER_SCOPE, PERSISTENT_ATTRIBUTES);
  293 + return Futures.transform(attributes, new Function<List<AttributeKvEntry>, DeviceStateData>() {
  294 + @Nullable
  295 + @Override
  296 + public DeviceStateData apply(@Nullable List<AttributeKvEntry> attributes) {
  297 + long lastActivityTime = getAttributeValue(attributes, LAST_ACTIVITY_TIME, 0L);
  298 + long inactivityAlarmTime = getAttributeValue(attributes, INACTIVITY_ALARM_TIME, 0L);
  299 + long inactivityTimeout = getAttributeValue(attributes, INACTIVITY_TIMEOUT, TimeUnit.SECONDS.toMillis(defaultInactivityTimeoutInSec));
  300 + boolean active = System.currentTimeMillis() < lastActivityTime + inactivityTimeout;
  301 + DeviceState deviceState = DeviceState.builder()
  302 + .active(active)
  303 + .lastConnectTime(getAttributeValue(attributes, LAST_CONNECT_TIME, 0L))
  304 + .lastDisconnectTime(getAttributeValue(attributes, LAST_DISCONNECT_TIME, 0L))
  305 + .lastActivityTime(lastActivityTime)
  306 + .lastInactivityAlarmTime(inactivityAlarmTime)
  307 + .inactivityTimeout(inactivityTimeout)
  308 + .build();
  309 + TbMsgMetaData md = new TbMsgMetaData();
  310 + md.putValue("deviceName", device.getName());
  311 + md.putValue("deviceType", device.getType());
  312 + return DeviceStateData.builder()
  313 + .tenantId(device.getTenantId())
  314 + .deviceId(device.getId())
  315 + .metaData(md)
  316 + .state(deviceState).build();
  317 + }
  318 + });
  319 + }
  320 +
  321 + private long getLastPersistTime(List<AttributeKvEntry> attributes) {
  322 + return attributes.stream().map(AttributeKvEntry::getLastUpdateTs).max(Long::compare).orElse(0L);
  323 + }
  324 +
  325 + private long getAttributeValue(List<AttributeKvEntry> attributes, String attributeName, long defaultValue) {
  326 + for (AttributeKvEntry attribute : attributes) {
  327 + if (attribute.getKey().equals(attributeName)) {
  328 + return attribute.getLongValue().orElse(defaultValue);
  329 + }
  330 + }
  331 + return defaultValue;
  332 + }
  333 +
  334 + private void pushRuleEngineMessage(DeviceStateData stateData, String msgType) {
  335 + DeviceState state = stateData.getState();
  336 + try {
  337 + TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), msgType, stateData.getDeviceId(), stateData.getMetaData(), TbMsgDataType.JSON
  338 + , json.writeValueAsString(state)
  339 + , null, null, 0L);
  340 + actorService.onMsg(new ServiceToRuleEngineMsg(stateData.getTenantId(), tbMsg));
  341 + } catch (Exception e) {
  342 + log.warn("[{}] Failed to push inactivity alarm: {}", stateData.getDeviceId(), state, e);
  343 + }
  344 + }
  345 +
  346 + private void saveAttribute(DeviceId deviceId, String key, long value) {
  347 + tsSubService.saveAttrAndNotify(deviceId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback(deviceId, key, value));
  348 + }
  349 +
  350 + private void saveAttribute(DeviceId deviceId, String key, boolean value) {
  351 + tsSubService.saveAttrAndNotify(deviceId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback(deviceId, key, value));
  352 + }
  353 +
  354 + private class AttributeSaveCallback implements FutureCallback<Void> {
  355 + private final DeviceId deviceId;
  356 + private final String key;
  357 + private final Object value;
  358 +
  359 + AttributeSaveCallback(DeviceId deviceId, String key, Object value) {
  360 + this.deviceId = deviceId;
  361 + this.key = key;
  362 + this.value = value;
  363 + }
  364 +
  365 + @Override
  366 + public void onSuccess(@Nullable Void result) {
  367 + log.trace("[{}] Successfully updated attribute [{}] with value [{}]", deviceId, key, value);
  368 + }
  369 +
  370 + @Override
  371 + public void onFailure(Throwable t) {
  372 + log.warn("[{}] Failed to update attribute [{}] with value [{}]", deviceId, key, value, t);
  373 + }
  374 + }
  375 +}
  1 +package org.thingsboard.server.service.state;
  2 +
  3 +import lombok.Builder;
  4 +import lombok.Data;
  5 +
  6 +/**
  7 + * Created by ashvayka on 01.05.18.
  8 + */
  9 +@Data
  10 +@Builder
  11 +public class DeviceState {
  12 +
  13 + private boolean active;
  14 + private long lastConnectTime;
  15 + private long lastActivityTime;
  16 + private long lastDisconnectTime;
  17 + private long lastInactivityAlarmTime;
  18 + private long inactivityTimeout;
  19 +
  20 +}
  1 +package org.thingsboard.server.service.state;
  2 +
  3 +import lombok.Builder;
  4 +import lombok.Data;
  5 +import org.thingsboard.server.common.data.id.DeviceId;
  6 +import org.thingsboard.server.common.data.id.TenantId;
  7 +import org.thingsboard.server.common.msg.TbMsgMetaData;
  8 +
  9 +/**
  10 + * Created by ashvayka on 01.05.18.
  11 + */
  12 +@Data
  13 +@Builder
  14 +class DeviceStateData {
  15 +
  16 + private final TenantId tenantId;
  17 + private final DeviceId deviceId;
  18 +
  19 + private TbMsgMetaData metaData;
  20 + private final DeviceState state;
  21 +
  22 +}
  1 +package org.thingsboard.server.service.state;
  2 +
  3 +import org.thingsboard.server.common.data.Device;
  4 +import org.thingsboard.server.common.data.id.DeviceId;
  5 +
  6 +import java.util.Optional;
  7 +
  8 +/**
  9 + * Created by ashvayka on 01.05.18.
  10 + */
  11 +public interface DeviceStateService {
  12 +
  13 + void onDeviceAdded(Device device);
  14 +
  15 + void onDeviceUpdated(Device device);
  16 +
  17 + void onDeviceDeleted(Device device);
  18 +
  19 + void onDeviceConnect(DeviceId deviceId);
  20 +
  21 + void onDeviceActivity(DeviceId deviceId);
  22 +
  23 + void onDeviceDisconnect(DeviceId deviceId);
  24 +
  25 + void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout);
  26 +
  27 + Optional<DeviceState> getDeviceState(DeviceId deviceId);
  28 +
  29 +}
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * 3 + * <p>
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * 7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,11 +20,20 @@ import com.google.common.util.concurrent.Futures; @@ -20,11 +20,20 @@ import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture; 20 import com.google.common.util.concurrent.ListenableFuture;
21 import lombok.extern.slf4j.Slf4j; 21 import lombok.extern.slf4j.Slf4j;
22 import org.springframework.beans.factory.annotation.Autowired; 22 import org.springframework.beans.factory.annotation.Autowired;
  23 +import org.springframework.context.annotation.Lazy;
23 import org.springframework.stereotype.Service; 24 import org.springframework.stereotype.Service;
24 import org.springframework.util.StringUtils; 25 import org.springframework.util.StringUtils;
  26 +import org.thingsboard.server.common.data.DataConstants;
  27 +import org.thingsboard.server.common.data.EntityType;
  28 +import org.thingsboard.server.common.data.id.DeviceId;
25 import org.thingsboard.server.common.data.id.EntityId; 29 import org.thingsboard.server.common.data.id.EntityId;
26 import org.thingsboard.server.common.data.kv.AttributeKvEntry; 30 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  31 +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
27 import org.thingsboard.server.common.data.kv.BasicTsKvEntry; 32 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
  33 +import org.thingsboard.server.common.data.kv.BooleanDataEntry;
  34 +import org.thingsboard.server.common.data.kv.DoubleDataEntry;
  35 +import org.thingsboard.server.common.data.kv.LongDataEntry;
  36 +import org.thingsboard.server.common.data.kv.StringDataEntry;
28 import org.thingsboard.server.common.data.kv.TsKvEntry; 37 import org.thingsboard.server.common.data.kv.TsKvEntry;
29 import org.thingsboard.server.common.msg.cluster.ServerAddress; 38 import org.thingsboard.server.common.msg.cluster.ServerAddress;
30 import org.thingsboard.server.dao.attributes.AttributesService; 39 import org.thingsboard.server.dao.attributes.AttributesService;
@@ -34,11 +43,14 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription; @@ -34,11 +43,14 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
34 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState; 43 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
35 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate; 44 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
36 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService; 45 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
  46 +import org.thingsboard.server.service.state.DefaultDeviceStateService;
  47 +import org.thingsboard.server.service.state.DeviceStateService;
37 48
38 import javax.annotation.Nullable; 49 import javax.annotation.Nullable;
39 import javax.annotation.PostConstruct; 50 import javax.annotation.PostConstruct;
40 import javax.annotation.PreDestroy; 51 import javax.annotation.PreDestroy;
41 import java.util.ArrayList; 52 import java.util.ArrayList;
  53 +import java.util.Collections;
42 import java.util.HashMap; 54 import java.util.HashMap;
43 import java.util.HashSet; 55 import java.util.HashSet;
44 import java.util.List; 56 import java.util.List;
@@ -70,6 +82,10 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio @@ -70,6 +82,10 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
70 @Autowired 82 @Autowired
71 private ClusterRoutingService routingService; 83 private ClusterRoutingService routingService;
72 84
  85 + @Autowired
  86 + @Lazy
  87 + private DeviceStateService stateService;
  88 +
73 private ExecutorService tsCallBackExecutor; 89 private ExecutorService tsCallBackExecutor;
74 private ExecutorService wsCallBackExecutor; 90 private ExecutorService wsCallBackExecutor;
75 91
@@ -149,10 +165,41 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio @@ -149,10 +165,41 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
149 addWsCallback(saveFuture, success -> onAttributesUpdate(entityId, scope, attributes)); 165 addWsCallback(saveFuture, success -> onAttributesUpdate(entityId, scope, attributes));
150 } 166 }
151 167
  168 + @Override
  169 + public void saveAttrAndNotify(EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback) {
  170 + saveAndNotify(entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(key, value)
  171 + , System.currentTimeMillis())), callback);
  172 + }
  173 +
  174 + @Override
  175 + public void saveAttrAndNotify(EntityId entityId, String scope, String key, String value, FutureCallback<Void> callback) {
  176 + saveAndNotify(entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry(key, value)
  177 + , System.currentTimeMillis())), callback);
  178 + }
  179 +
  180 + @Override
  181 + public void saveAttrAndNotify(EntityId entityId, String scope, String key, double value, FutureCallback<Void> callback) {
  182 + saveAndNotify(entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new DoubleDataEntry(key, value)
  183 + , System.currentTimeMillis())), callback);
  184 + }
  185 +
  186 + @Override
  187 + public void saveAttrAndNotify(EntityId entityId, String scope, String key, boolean value, FutureCallback<Void> callback) {
  188 + saveAndNotify(entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new BooleanDataEntry(key, value)
  189 + , System.currentTimeMillis())), callback);
  190 + }
  191 +
152 private void onAttributesUpdate(EntityId entityId, String scope, List<AttributeKvEntry> attributes) { 192 private void onAttributesUpdate(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
153 Optional<ServerAddress> serverAddress = routingService.resolveById(entityId); 193 Optional<ServerAddress> serverAddress = routingService.resolveById(entityId);
154 if (!serverAddress.isPresent()) { 194 if (!serverAddress.isPresent()) {
155 onLocalAttributesUpdate(entityId, scope, attributes); 195 onLocalAttributesUpdate(entityId, scope, attributes);
  196 + if (entityId.getEntityType() == EntityType.DEVICE && DataConstants.SERVER_SCOPE.equalsIgnoreCase(scope)) {
  197 + for (AttributeKvEntry attribute : attributes) {
  198 + if (attribute.getKey().equals(DefaultDeviceStateService.INACTIVITY_TIMEOUT)) {
  199 + stateService.onDeviceInactivityTimeoutUpdate(new DeviceId(entityId.getId()), attribute.getLongValue().orElse(0L));
  200 + }
  201 + }
  202 + }
156 } else { 203 } else {
157 // rpcHandler.onAttributesUpdate(ctx, serverAddress.get(), entityId, entries); 204 // rpcHandler.onAttributesUpdate(ctx, serverAddress.get(), entityId, entries);
158 } 205 }
@@ -357,4 +357,11 @@ audit_log: @@ -357,4 +357,11 @@ audit_log:
357 host: "${AUDIT_LOG_SINK_HOST:localhost}" 357 host: "${AUDIT_LOG_SINK_HOST:localhost}"
358 port: "${AUDIT_LOG_SINK_POST:9200}" 358 port: "${AUDIT_LOG_SINK_POST:9200}"
359 user_name: "${AUDIT_LOG_SINK_USER_NAME:}" 359 user_name: "${AUDIT_LOG_SINK_USER_NAME:}"
360 - password: "${AUDIT_LOG_SINK_PASSWORD:}"  
  360 + password: "${AUDIT_LOG_SINK_PASSWORD:}"
  361 +
  362 +state:
  363 + defaultInactivityTimeoutInSec: 10
  364 + defaultStateCheckIntervalInSec: 10
  365 +# TODO in v2.1
  366 +# defaultStatePersistenceIntervalInSec: 60
  367 +# defaultStatePersistencePack: 100
@@ -45,4 +45,9 @@ public class DataConstants { @@ -45,4 +45,9 @@ public class DataConstants {
45 public static final String IN = "IN"; 45 public static final String IN = "IN";
46 public static final String OUT = "OUT"; 46 public static final String OUT = "OUT";
47 47
  48 + public static final String INACTIVITY_EVENT = "INACTIVITY_EVENT";
  49 + public static final String CONNECT_EVENT = "CONNECT_EVENT";
  50 + public static final String DISCONNECT_EVENT = "DISCONNECT_EVENT";
  51 + public static final String ACTIVITY_EVENT = "ACTIVITY_EVENT";
  52 +
48 } 53 }
@@ -33,4 +33,12 @@ public interface RuleEngineTelemetryService { @@ -33,4 +33,12 @@ public interface RuleEngineTelemetryService {
33 33
34 void saveAndNotify(EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback); 34 void saveAndNotify(EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback);
35 35
  36 + void saveAttrAndNotify(EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback);
  37 +
  38 + void saveAttrAndNotify(EntityId entityId, String scope, String key, String value, FutureCallback<Void> callback);
  39 +
  40 + void saveAttrAndNotify(EntityId entityId, String scope, String key, double value, FutureCallback<Void> callback);
  41 +
  42 + void saveAttrAndNotify(EntityId entityId, String scope, String key, boolean value, FutureCallback<Void> callback);
  43 +
36 } 44 }