Commit dfd3252313759d039c9e4b9616aa9ed7d4a48b91

Authored by Igor Kulikov
2 parents e8496ffe 44c7d936

Merge branch 'develop/1.5' of github.com:thingsboard/thingsboard into develop/1.5

... ... @@ -65,6 +65,7 @@ import org.thingsboard.server.service.executors.ExternalCallExecutorService;
65 65 import org.thingsboard.server.service.mail.MailExecutorService;
66 66 import org.thingsboard.server.service.rpc.DeviceRpcService;
67 67 import org.thingsboard.server.service.script.JsExecutorService;
  68 +import org.thingsboard.server.service.state.DeviceStateService;
68 69 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
69 70
70 71 import java.io.IOException;
... ... @@ -197,6 +198,10 @@ public class ActorSystemContext {
197 198 @Getter
198 199 private MsgQueue msgQueue;
199 200
  201 + @Autowired
  202 + @Getter
  203 + private DeviceStateService deviceStateService;
  204 +
200 205 @Value("${actors.session.sync.timeout}")
201 206 @Getter
202 207 private long syncSessionTimeout;
... ...
... ... @@ -265,17 +265,32 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
265 265 break;
266 266 case POST_ATTRIBUTES_REQUEST:
267 267 handlePostAttributesRequest(context, msg);
  268 + reportActivity();
268 269 break;
269 270 case POST_TELEMETRY_REQUEST:
270 271 handlePostTelemetryRequest(context, msg);
  272 + reportActivity();
271 273 break;
272 274 case TO_SERVER_RPC_REQUEST:
273 275 handleClientSideRPCRequest(context, msg);
  276 + reportActivity();
274 277 break;
275 278 }
276 279 }
277 280 }
278 281
  282 + private void reportActivity() {
  283 + systemContext.getDeviceStateService().onDeviceActivity(deviceId);
  284 + }
  285 +
  286 + private void reportSessionOpen() {
  287 + systemContext.getDeviceStateService().onDeviceConnect(deviceId);
  288 + }
  289 +
  290 + private void reportSessionClose() {
  291 + systemContext.getDeviceStateService().onDeviceDisconnect(deviceId);
  292 + }
  293 +
279 294 private void handleGetAttributesRequest(DeviceToDeviceActorMsg src) {
280 295 GetAttributesRequest request = (GetAttributesRequest) src.getPayload();
281 296 ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, request.getClientAttributeNames());
... ... @@ -488,11 +503,17 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
488 503 if (inMsg instanceof SessionOpenMsg) {
489 504 logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
490 505 sessions.put(sessionId, new SessionInfo(SessionType.ASYNC, msg.getServerAddress()));
  506 + if (sessions.size() == 1) {
  507 + reportSessionOpen();
  508 + }
491 509 } else if (inMsg instanceof SessionCloseMsg) {
492 510 logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
493 511 sessions.remove(sessionId);
494 512 attributeSubscriptions.remove(sessionId);
495 513 rpcSubscriptions.remove(sessionId);
  514 + if (sessions.isEmpty()) {
  515 + reportSessionClose();
  516 + }
496 517 }
497 518 }
498 519
... ...
... ... @@ -32,4 +32,5 @@ public interface ActorService extends SessionMsgProcessor, WebSocketMsgProcessor
32 32 void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId);
33 33
34 34 void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType);
  35 +
35 36 }
... ...
... ... @@ -63,6 +63,7 @@ import org.thingsboard.server.exception.ThingsboardErrorResponseHandler;
63 63 import org.thingsboard.server.common.data.exception.ThingsboardException;
64 64 import org.thingsboard.server.service.component.ComponentDiscoveryService;
65 65 import org.thingsboard.server.service.security.model.SecurityUser;
  66 +import org.thingsboard.server.service.state.DeviceStateService;
66 67
67 68 import javax.mail.MessagingException;
68 69 import javax.servlet.http.HttpServletRequest;
... ... @@ -137,6 +138,9 @@ public abstract class BaseController {
137 138 @Autowired
138 139 protected DeviceOfflineService offlineService;
139 140
  141 + @Autowired
  142 + protected DeviceStateService deviceStateService;
  143 +
140 144 @ExceptionHandler(ThingsboardException.class)
141 145 public void handleThingsboardException(ThingsboardException ex, HttpServletResponse response) {
142 146 errorResponseHandler.handle(ex, response);
... ...
... ... @@ -90,6 +90,11 @@ public class DeviceController extends BaseController {
90 90 savedDevice.getCustomerId(),
91 91 device.getId() == null ? ActionType.ADDED : ActionType.UPDATED, null);
92 92
  93 + if (device.getId() == null) {
  94 + deviceStateService.onDeviceAdded(savedDevice);
  95 + } else {
  96 + deviceStateService.onDeviceUpdated(savedDevice);
  97 + }
93 98 return savedDevice;
94 99 } catch (Exception e) {
95 100 logEntityAction(emptyId(EntityType.DEVICE), device,
... ... @@ -112,6 +117,7 @@ public class DeviceController extends BaseController {
112 117 device.getCustomerId(),
113 118 ActionType.DELETED, null, strDeviceId);
114 119
  120 + deviceStateService.onDeviceDeleted(device);
115 121 } catch (Exception e) {
116 122 logEntityAction(emptyId(EntityType.DEVICE),
117 123 null,
... ... @@ -387,7 +393,7 @@ public class DeviceController extends BaseController {
387 393 @RequestMapping(value = "/device/online", method = RequestMethod.GET)
388 394 @ResponseBody
389 395 public List<Device> getOnlineDevices(@RequestParam("contactType") DeviceStatusQuery.ContactType contactType,
390   - @RequestParam("threshold") long threshold) throws ThingsboardException {
  396 + @RequestParam("threshold") long threshold) throws ThingsboardException {
391 397 try {
392 398 TenantId tenantId = getCurrentUser().getTenantId();
393 399 ListenableFuture<List<Device>> offlineDevices = offlineService.findOnlineDevices(tenantId.getId(), contactType, threshold);
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.state;
  17 +
  18 +import com.datastax.driver.core.utils.UUIDs;
  19 +import com.fasterxml.jackson.databind.ObjectMapper;
  20 +import com.google.common.base.Function;
  21 +import com.google.common.util.concurrent.FutureCallback;
  22 +import com.google.common.util.concurrent.Futures;
  23 +import com.google.common.util.concurrent.ListenableFuture;
  24 +import com.google.common.util.concurrent.ListeningScheduledExecutorService;
  25 +import com.google.common.util.concurrent.MoreExecutors;
  26 +import lombok.Getter;
  27 +import lombok.extern.slf4j.Slf4j;
  28 +import org.springframework.beans.factory.annotation.Autowired;
  29 +import org.springframework.beans.factory.annotation.Value;
  30 +import org.springframework.stereotype.Service;
  31 +import org.thingsboard.server.actors.service.ActorService;
  32 +import org.thingsboard.server.common.data.DataConstants;
  33 +import org.thingsboard.server.common.data.Device;
  34 +import org.thingsboard.server.common.data.Tenant;
  35 +import org.thingsboard.server.common.data.id.DeviceId;
  36 +import org.thingsboard.server.common.data.id.TenantId;
  37 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  38 +import org.thingsboard.server.common.data.page.TextPageLink;
  39 +import org.thingsboard.server.common.msg.TbMsg;
  40 +import org.thingsboard.server.common.msg.TbMsgDataType;
  41 +import org.thingsboard.server.common.msg.TbMsgMetaData;
  42 +import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
  43 +import org.thingsboard.server.dao.attributes.AttributesService;
  44 +import org.thingsboard.server.dao.device.DeviceService;
  45 +import org.thingsboard.server.dao.tenant.TenantService;
  46 +import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
  47 +
  48 +import javax.annotation.Nullable;
  49 +import javax.annotation.PostConstruct;
  50 +import javax.annotation.PreDestroy;
  51 +import java.util.ArrayList;
  52 +import java.util.Arrays;
  53 +import java.util.HashSet;
  54 +import java.util.List;
  55 +import java.util.Optional;
  56 +import java.util.Set;
  57 +import java.util.concurrent.ConcurrentHashMap;
  58 +import java.util.concurrent.ConcurrentMap;
  59 +import java.util.concurrent.ExecutionException;
  60 +import java.util.concurrent.Executors;
  61 +import java.util.concurrent.TimeUnit;
  62 +
  63 +import static org.thingsboard.server.common.data.DataConstants.ACTIVITY_EVENT;
  64 +import static org.thingsboard.server.common.data.DataConstants.CONNECT_EVENT;
  65 +import static org.thingsboard.server.common.data.DataConstants.DISCONNECT_EVENT;
  66 +import static org.thingsboard.server.common.data.DataConstants.INACTIVITY_EVENT;
  67 +
  68 +/**
  69 + * Created by ashvayka on 01.05.18.
  70 + */
  71 +@Service
  72 +@Slf4j
  73 +//TODO: refactor to use page links as cursor and not fetch all
  74 +public class DefaultDeviceStateService implements DeviceStateService {
  75 +
  76 + private static final ObjectMapper json = new ObjectMapper();
  77 + public static final String ACTIVITY_STATE = "active";
  78 + public static final String LAST_CONNECT_TIME = "lastConnectTime";
  79 + public static final String LAST_DISCONNECT_TIME = "lastDisconnectTime";
  80 + public static final String LAST_ACTIVITY_TIME = "lastActivityTime";
  81 + public static final String INACTIVITY_ALARM_TIME = "inactivityAlarmTime";
  82 + public static final String INACTIVITY_TIMEOUT = "inactivityTimeout";
  83 +
  84 + public static final List<String> PERSISTENT_ATTRIBUTES = Arrays.asList(ACTIVITY_STATE, LAST_CONNECT_TIME, LAST_DISCONNECT_TIME, LAST_ACTIVITY_TIME, INACTIVITY_ALARM_TIME, INACTIVITY_TIMEOUT);
  85 +
  86 + @Autowired
  87 + private TenantService tenantService;
  88 +
  89 + @Autowired
  90 + private DeviceService deviceService;
  91 +
  92 + @Autowired
  93 + private AttributesService attributesService;
  94 +
  95 + @Autowired
  96 + private ActorService actorService;
  97 +
  98 + @Autowired
  99 + private TelemetrySubscriptionService tsSubService;
  100 +
  101 + @Value("${state.defaultInactivityTimeoutInSec}")
  102 + @Getter
  103 + private long defaultInactivityTimeoutInSec;
  104 +
  105 + @Value("${state.defaultStateCheckIntervalInSec}")
  106 + @Getter
  107 + private long defaultStateCheckIntervalInSec;
  108 +
  109 +// TODO in v2.1
  110 +// @Value("${state.defaultStatePersistenceIntervalInSec}")
  111 +// @Getter
  112 +// private long defaultStatePersistenceIntervalInSec;
  113 +//
  114 +// @Value("${state.defaultStatePersistencePack}")
  115 +// @Getter
  116 +// private long defaultStatePersistencePack;
  117 +
  118 + private ListeningScheduledExecutorService queueExecutor;
  119 +
  120 + private ConcurrentMap<TenantId, Set<DeviceId>> tenantDevices = new ConcurrentHashMap<>();
  121 + private ConcurrentMap<DeviceId, DeviceStateData> deviceStates = new ConcurrentHashMap<>();
  122 +
  123 + @PostConstruct
  124 + public void init() {
  125 + // Should be always single threaded due to absence of locks.
  126 + queueExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor());
  127 + queueExecutor.submit(this::initStateFromDB);
  128 + queueExecutor.scheduleAtFixedRate(this::updateState, defaultStateCheckIntervalInSec, defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
  129 + //TODO: schedule persistence in v2.1;
  130 + }
  131 +
  132 + @PreDestroy
  133 + public void stop() {
  134 + if (queueExecutor != null) {
  135 + queueExecutor.shutdownNow();
  136 + }
  137 + }
  138 +
  139 + @Override
  140 + public void onDeviceAdded(Device device) {
  141 + queueExecutor.submit(() -> onDeviceAddedSync(device));
  142 + }
  143 +
  144 + @Override
  145 + public void onDeviceUpdated(Device device) {
  146 + queueExecutor.submit(() -> onDeviceUpdatedSync(device));
  147 + }
  148 +
  149 + @Override
  150 + public void onDeviceConnect(DeviceId deviceId) {
  151 + queueExecutor.submit(() -> onDeviceConnectSync(deviceId));
  152 + }
  153 +
  154 + @Override
  155 + public void onDeviceActivity(DeviceId deviceId) {
  156 + queueExecutor.submit(() -> onDeviceActivitySync(deviceId));
  157 + }
  158 +
  159 + @Override
  160 + public void onDeviceDisconnect(DeviceId deviceId) {
  161 + queueExecutor.submit(() -> onDeviceDisconnectSync(deviceId));
  162 + }
  163 +
  164 + @Override
  165 + public void onDeviceDeleted(Device device) {
  166 + queueExecutor.submit(() -> onDeviceDeleted(device.getTenantId(), device.getId()));
  167 + }
  168 +
  169 + @Override
  170 + public void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) {
  171 + queueExecutor.submit(() -> onInactivityTimeoutUpdate(deviceId, inactivityTimeout));
  172 + }
  173 +
  174 + @Override
  175 + public Optional<DeviceState> getDeviceState(DeviceId deviceId) {
  176 + DeviceStateData state = deviceStates.get(deviceId);
  177 + if (state != null) {
  178 + return Optional.of(state.getState());
  179 + } else {
  180 + return Optional.empty();
  181 + }
  182 + }
  183 +
  184 + private void initStateFromDB() {
  185 + List<Tenant> tenants = tenantService.findTenants(new TextPageLink(Integer.MAX_VALUE)).getData();
  186 + for (Tenant tenant : tenants) {
  187 + List<ListenableFuture<DeviceStateData>> fetchFutures = new ArrayList<>();
  188 + List<Device> devices = deviceService.findDevicesByTenantId(tenant.getId(), new TextPageLink(Integer.MAX_VALUE)).getData();
  189 + for (Device device : devices) {
  190 + fetchFutures.add(fetchDeviceState(device));
  191 + }
  192 + try {
  193 + Futures.successfulAsList(fetchFutures).get().forEach(this::addDeviceUsingState);
  194 + } catch (InterruptedException | ExecutionException e) {
  195 + log.warn("Failed to init device state service from DB", e);
  196 + }
  197 + }
  198 + }
  199 +
  200 + private void addDeviceUsingState(DeviceStateData state) {
  201 + tenantDevices.computeIfAbsent(state.getTenantId(), id -> ConcurrentHashMap.newKeySet()).add(state.getDeviceId());
  202 + deviceStates.put(state.getDeviceId(), state);
  203 + }
  204 +
  205 + private void updateState() {
  206 + long ts = System.currentTimeMillis();
  207 + Set<DeviceId> deviceIds = new HashSet<>(deviceStates.keySet());
  208 + for (DeviceId deviceId : deviceIds) {
  209 + DeviceStateData stateData = deviceStates.get(deviceId);
  210 + DeviceState state = stateData.getState();
  211 + state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
  212 + if (!state.isActive() && state.getLastInactivityAlarmTime() < state.getLastActivityTime()) {
  213 + state.setLastInactivityAlarmTime(ts);
  214 + pushRuleEngineMessage(stateData, INACTIVITY_EVENT);
  215 + saveAttribute(deviceId, INACTIVITY_ALARM_TIME, ts);
  216 + saveAttribute(deviceId, ACTIVITY_STATE, state.isActive());
  217 + }
  218 + }
  219 + }
  220 +
  221 + private void onDeviceConnectSync(DeviceId deviceId) {
  222 + DeviceStateData stateData = deviceStates.get(deviceId);
  223 + if (stateData != null) {
  224 + long ts = System.currentTimeMillis();
  225 + stateData.getState().setLastConnectTime(ts);
  226 + pushRuleEngineMessage(stateData, CONNECT_EVENT);
  227 + saveAttribute(deviceId, LAST_CONNECT_TIME, ts);
  228 + }
  229 + }
  230 +
  231 + private void onDeviceDisconnectSync(DeviceId deviceId) {
  232 + DeviceStateData stateData = deviceStates.get(deviceId);
  233 + if (stateData != null) {
  234 + long ts = System.currentTimeMillis();
  235 + stateData.getState().setLastDisconnectTime(ts);
  236 + pushRuleEngineMessage(stateData, DISCONNECT_EVENT);
  237 + saveAttribute(deviceId, LAST_DISCONNECT_TIME, ts);
  238 + }
  239 + }
  240 +
  241 + private void onDeviceActivitySync(DeviceId deviceId) {
  242 + DeviceStateData stateData = deviceStates.get(deviceId);
  243 + if (stateData != null) {
  244 + DeviceState state = stateData.getState();
  245 + long ts = System.currentTimeMillis();
  246 + state.setActive(true);
  247 + stateData.getState().setLastActivityTime(ts);
  248 + pushRuleEngineMessage(stateData, ACTIVITY_EVENT);
  249 + saveAttribute(deviceId, LAST_ACTIVITY_TIME, ts);
  250 + saveAttribute(deviceId, ACTIVITY_STATE, state.isActive());
  251 + }
  252 + }
  253 +
  254 + private void onInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) {
  255 + if (inactivityTimeout == 0L) {
  256 + return;
  257 + }
  258 + DeviceStateData stateData = deviceStates.get(deviceId);
  259 + if (stateData != null) {
  260 + long ts = System.currentTimeMillis();
  261 + DeviceState state = stateData.getState();
  262 + state.setInactivityTimeout(inactivityTimeout);
  263 + boolean oldActive = state.isActive();
  264 + state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
  265 + if (!oldActive && state.isActive()) {
  266 + saveAttribute(deviceId, ACTIVITY_STATE, state.isActive());
  267 + }
  268 + }
  269 + }
  270 +
  271 + private void onDeviceAddedSync(Device device) {
  272 + Futures.addCallback(fetchDeviceState(device), new FutureCallback<DeviceStateData>() {
  273 + @Override
  274 + public void onSuccess(@Nullable DeviceStateData state) {
  275 + addDeviceUsingState(state);
  276 + }
  277 +
  278 + @Override
  279 + public void onFailure(Throwable t) {
  280 + log.warn("Failed to register device to the state service", t);
  281 + }
  282 + });
  283 + }
  284 +
  285 + private void onDeviceUpdatedSync(Device device) {
  286 + DeviceStateData stateData = deviceStates.get(device.getId());
  287 + if (stateData != null) {
  288 + TbMsgMetaData md = new TbMsgMetaData();
  289 + md.putValue("deviceName", device.getName());
  290 + md.putValue("deviceType", device.getType());
  291 + stateData.setMetaData(md);
  292 + }
  293 + }
  294 +
  295 + private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {
  296 + deviceStates.remove(deviceId);
  297 + Set<DeviceId> deviceIds = tenantDevices.get(tenantId);
  298 + if (deviceIds != null) {
  299 + deviceIds.remove(deviceId);
  300 + if (deviceIds.isEmpty()) {
  301 + tenantDevices.remove(tenantId);
  302 + }
  303 + }
  304 + }
  305 +
  306 + private ListenableFuture<DeviceStateData> fetchDeviceState(Device device) {
  307 + ListenableFuture<List<AttributeKvEntry>> attributes = attributesService.find(device.getId(), DataConstants.SERVER_SCOPE, PERSISTENT_ATTRIBUTES);
  308 + return Futures.transform(attributes, new Function<List<AttributeKvEntry>, DeviceStateData>() {
  309 + @Nullable
  310 + @Override
  311 + public DeviceStateData apply(@Nullable List<AttributeKvEntry> attributes) {
  312 + long lastActivityTime = getAttributeValue(attributes, LAST_ACTIVITY_TIME, 0L);
  313 + long inactivityAlarmTime = getAttributeValue(attributes, INACTIVITY_ALARM_TIME, 0L);
  314 + long inactivityTimeout = getAttributeValue(attributes, INACTIVITY_TIMEOUT, TimeUnit.SECONDS.toMillis(defaultInactivityTimeoutInSec));
  315 + boolean active = System.currentTimeMillis() < lastActivityTime + inactivityTimeout;
  316 + DeviceState deviceState = DeviceState.builder()
  317 + .active(active)
  318 + .lastConnectTime(getAttributeValue(attributes, LAST_CONNECT_TIME, 0L))
  319 + .lastDisconnectTime(getAttributeValue(attributes, LAST_DISCONNECT_TIME, 0L))
  320 + .lastActivityTime(lastActivityTime)
  321 + .lastInactivityAlarmTime(inactivityAlarmTime)
  322 + .inactivityTimeout(inactivityTimeout)
  323 + .build();
  324 + TbMsgMetaData md = new TbMsgMetaData();
  325 + md.putValue("deviceName", device.getName());
  326 + md.putValue("deviceType", device.getType());
  327 + return DeviceStateData.builder()
  328 + .tenantId(device.getTenantId())
  329 + .deviceId(device.getId())
  330 + .metaData(md)
  331 + .state(deviceState).build();
  332 + }
  333 + });
  334 + }
  335 +
  336 + private long getLastPersistTime(List<AttributeKvEntry> attributes) {
  337 + return attributes.stream().map(AttributeKvEntry::getLastUpdateTs).max(Long::compare).orElse(0L);
  338 + }
  339 +
  340 + private long getAttributeValue(List<AttributeKvEntry> attributes, String attributeName, long defaultValue) {
  341 + for (AttributeKvEntry attribute : attributes) {
  342 + if (attribute.getKey().equals(attributeName)) {
  343 + return attribute.getLongValue().orElse(defaultValue);
  344 + }
  345 + }
  346 + return defaultValue;
  347 + }
  348 +
  349 + private void pushRuleEngineMessage(DeviceStateData stateData, String msgType) {
  350 + DeviceState state = stateData.getState();
  351 + try {
  352 + TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), msgType, stateData.getDeviceId(), stateData.getMetaData(), TbMsgDataType.JSON
  353 + , json.writeValueAsString(state)
  354 + , null, null, 0L);
  355 + actorService.onMsg(new ServiceToRuleEngineMsg(stateData.getTenantId(), tbMsg));
  356 + } catch (Exception e) {
  357 + log.warn("[{}] Failed to push inactivity alarm: {}", stateData.getDeviceId(), state, e);
  358 + }
  359 + }
  360 +
  361 + private void saveAttribute(DeviceId deviceId, String key, long value) {
  362 + tsSubService.saveAttrAndNotify(deviceId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback(deviceId, key, value));
  363 + }
  364 +
  365 + private void saveAttribute(DeviceId deviceId, String key, boolean value) {
  366 + tsSubService.saveAttrAndNotify(deviceId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback(deviceId, key, value));
  367 + }
  368 +
  369 + private class AttributeSaveCallback implements FutureCallback<Void> {
  370 + private final DeviceId deviceId;
  371 + private final String key;
  372 + private final Object value;
  373 +
  374 + AttributeSaveCallback(DeviceId deviceId, String key, Object value) {
  375 + this.deviceId = deviceId;
  376 + this.key = key;
  377 + this.value = value;
  378 + }
  379 +
  380 + @Override
  381 + public void onSuccess(@Nullable Void result) {
  382 + log.trace("[{}] Successfully updated attribute [{}] with value [{}]", deviceId, key, value);
  383 + }
  384 +
  385 + @Override
  386 + public void onFailure(Throwable t) {
  387 + log.warn("[{}] Failed to update attribute [{}] with value [{}]", deviceId, key, value, t);
  388 + }
  389 + }
  390 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.state;
  17 +
  18 +import lombok.Builder;
  19 +import lombok.Data;
  20 +
  21 +/**
  22 + * Created by ashvayka on 01.05.18.
  23 + */
  24 +@Data
  25 +@Builder
  26 +public class DeviceState {
  27 +
  28 + private boolean active;
  29 + private long lastConnectTime;
  30 + private long lastActivityTime;
  31 + private long lastDisconnectTime;
  32 + private long lastInactivityAlarmTime;
  33 + private long inactivityTimeout;
  34 +
  35 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.state;
  17 +
  18 +import lombok.Builder;
  19 +import lombok.Data;
  20 +import org.thingsboard.server.common.data.id.DeviceId;
  21 +import org.thingsboard.server.common.data.id.TenantId;
  22 +import org.thingsboard.server.common.msg.TbMsgMetaData;
  23 +
  24 +/**
  25 + * Created by ashvayka on 01.05.18.
  26 + */
  27 +@Data
  28 +@Builder
  29 +class DeviceStateData {
  30 +
  31 + private final TenantId tenantId;
  32 + private final DeviceId deviceId;
  33 +
  34 + private TbMsgMetaData metaData;
  35 + private final DeviceState state;
  36 +
  37 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.state;
  17 +
  18 +import org.thingsboard.server.common.data.Device;
  19 +import org.thingsboard.server.common.data.id.DeviceId;
  20 +
  21 +import java.util.Optional;
  22 +
  23 +/**
  24 + * Created by ashvayka on 01.05.18.
  25 + */
  26 +public interface DeviceStateService {
  27 +
  28 + void onDeviceAdded(Device device);
  29 +
  30 + void onDeviceUpdated(Device device);
  31 +
  32 + void onDeviceDeleted(Device device);
  33 +
  34 + void onDeviceConnect(DeviceId deviceId);
  35 +
  36 + void onDeviceActivity(DeviceId deviceId);
  37 +
  38 + void onDeviceDisconnect(DeviceId deviceId);
  39 +
  40 + void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout);
  41 +
  42 + Optional<DeviceState> getDeviceState(DeviceId deviceId);
  43 +
  44 +}
... ...
... ... @@ -20,11 +20,20 @@ import com.google.common.util.concurrent.Futures;
20 20 import com.google.common.util.concurrent.ListenableFuture;
21 21 import lombok.extern.slf4j.Slf4j;
22 22 import org.springframework.beans.factory.annotation.Autowired;
  23 +import org.springframework.context.annotation.Lazy;
23 24 import org.springframework.stereotype.Service;
24 25 import org.springframework.util.StringUtils;
  26 +import org.thingsboard.server.common.data.DataConstants;
  27 +import org.thingsboard.server.common.data.EntityType;
  28 +import org.thingsboard.server.common.data.id.DeviceId;
25 29 import org.thingsboard.server.common.data.id.EntityId;
26 30 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  31 +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
27 32 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
  33 +import org.thingsboard.server.common.data.kv.BooleanDataEntry;
  34 +import org.thingsboard.server.common.data.kv.DoubleDataEntry;
  35 +import org.thingsboard.server.common.data.kv.LongDataEntry;
  36 +import org.thingsboard.server.common.data.kv.StringDataEntry;
28 37 import org.thingsboard.server.common.data.kv.TsKvEntry;
29 38 import org.thingsboard.server.common.msg.cluster.ServerAddress;
30 39 import org.thingsboard.server.dao.attributes.AttributesService;
... ... @@ -34,11 +43,14 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
34 43 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
35 44 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
36 45 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
  46 +import org.thingsboard.server.service.state.DefaultDeviceStateService;
  47 +import org.thingsboard.server.service.state.DeviceStateService;
37 48
38 49 import javax.annotation.Nullable;
39 50 import javax.annotation.PostConstruct;
40 51 import javax.annotation.PreDestroy;
41 52 import java.util.ArrayList;
  53 +import java.util.Collections;
42 54 import java.util.HashMap;
43 55 import java.util.HashSet;
44 56 import java.util.List;
... ... @@ -70,6 +82,10 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
70 82 @Autowired
71 83 private ClusterRoutingService routingService;
72 84
  85 + @Autowired
  86 + @Lazy
  87 + private DeviceStateService stateService;
  88 +
73 89 private ExecutorService tsCallBackExecutor;
74 90 private ExecutorService wsCallBackExecutor;
75 91
... ... @@ -149,10 +165,41 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
149 165 addWsCallback(saveFuture, success -> onAttributesUpdate(entityId, scope, attributes));
150 166 }
151 167
  168 + @Override
  169 + public void saveAttrAndNotify(EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback) {
  170 + saveAndNotify(entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(key, value)
  171 + , System.currentTimeMillis())), callback);
  172 + }
  173 +
  174 + @Override
  175 + public void saveAttrAndNotify(EntityId entityId, String scope, String key, String value, FutureCallback<Void> callback) {
  176 + saveAndNotify(entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry(key, value)
  177 + , System.currentTimeMillis())), callback);
  178 + }
  179 +
  180 + @Override
  181 + public void saveAttrAndNotify(EntityId entityId, String scope, String key, double value, FutureCallback<Void> callback) {
  182 + saveAndNotify(entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new DoubleDataEntry(key, value)
  183 + , System.currentTimeMillis())), callback);
  184 + }
  185 +
  186 + @Override
  187 + public void saveAttrAndNotify(EntityId entityId, String scope, String key, boolean value, FutureCallback<Void> callback) {
  188 + saveAndNotify(entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new BooleanDataEntry(key, value)
  189 + , System.currentTimeMillis())), callback);
  190 + }
  191 +
152 192 private void onAttributesUpdate(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
153 193 Optional<ServerAddress> serverAddress = routingService.resolveById(entityId);
154 194 if (!serverAddress.isPresent()) {
155 195 onLocalAttributesUpdate(entityId, scope, attributes);
  196 + if (entityId.getEntityType() == EntityType.DEVICE && DataConstants.SERVER_SCOPE.equalsIgnoreCase(scope)) {
  197 + for (AttributeKvEntry attribute : attributes) {
  198 + if (attribute.getKey().equals(DefaultDeviceStateService.INACTIVITY_TIMEOUT)) {
  199 + stateService.onDeviceInactivityTimeoutUpdate(new DeviceId(entityId.getId()), attribute.getLongValue().orElse(0L));
  200 + }
  201 + }
  202 + }
156 203 } else {
157 204 // rpcHandler.onAttributesUpdate(ctx, serverAddress.get(), entityId, entries);
158 205 }
... ...
... ... @@ -359,4 +359,11 @@ audit_log:
359 359 host: "${AUDIT_LOG_SINK_HOST:localhost}"
360 360 port: "${AUDIT_LOG_SINK_POST:9200}"
361 361 user_name: "${AUDIT_LOG_SINK_USER_NAME:}"
362   - password: "${AUDIT_LOG_SINK_PASSWORD:}"
\ No newline at end of file
  362 + password: "${AUDIT_LOG_SINK_PASSWORD:}"
  363 +
  364 +state:
  365 + defaultInactivityTimeoutInSec: 10
  366 + defaultStateCheckIntervalInSec: 10
  367 +# TODO in v2.1
  368 +# defaultStatePersistenceIntervalInSec: 60
  369 +# defaultStatePersistencePack: 100
\ No newline at end of file
... ...
... ... @@ -45,4 +45,9 @@ public class DataConstants {
45 45 public static final String IN = "IN";
46 46 public static final String OUT = "OUT";
47 47
  48 + public static final String INACTIVITY_EVENT = "INACTIVITY_EVENT";
  49 + public static final String CONNECT_EVENT = "CONNECT_EVENT";
  50 + public static final String DISCONNECT_EVENT = "DISCONNECT_EVENT";
  51 + public static final String ACTIVITY_EVENT = "ACTIVITY_EVENT";
  52 +
48 53 }
... ...
... ... @@ -33,4 +33,12 @@ public interface RuleEngineTelemetryService {
33 33
34 34 void saveAndNotify(EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback);
35 35
  36 + void saveAttrAndNotify(EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback);
  37 +
  38 + void saveAttrAndNotify(EntityId entityId, String scope, String key, String value, FutureCallback<Void> callback);
  39 +
  40 + void saveAttrAndNotify(EntityId entityId, String scope, String key, double value, FutureCallback<Void> callback);
  41 +
  42 + void saveAttrAndNotify(EntityId entityId, String scope, String key, boolean value, FutureCallback<Void> callback);
  43 +
36 44 }
... ...
... ... @@ -18,6 +18,7 @@ package org.thingsboard.rule.engine.filter;
18 18 import lombok.extern.slf4j.Slf4j;
19 19 import org.thingsboard.rule.engine.TbNodeUtils;
20 20 import org.thingsboard.rule.engine.api.*;
  21 +import org.thingsboard.server.common.data.DataConstants;
21 22 import org.thingsboard.server.common.data.plugin.ComponentType;
22 23 import org.thingsboard.server.common.msg.TbMsg;
23 24 import org.thingsboard.server.common.msg.session.SessionMsgType;
... ... @@ -27,7 +28,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType;
27 28 type = ComponentType.FILTER,
28 29 name = "message type switch",
29 30 configClazz = EmptyNodeConfiguration.class,
30   - relationTypes = {"Post attributes", "Post telemetry", "RPC Request", "Other"},
  31 + relationTypes = {"Post attributes", "Post telemetry", "RPC Request", "Activity Event", "Inactivity Event", "Connect Event", "Disconnect Event", "Other"},
31 32 nodeDescription = "Route incoming messages by Message Type",
32 33 nodeDetails = "Sends messages with message types <b>\"Post attributes\", \"Post telemetry\", \"RPC Request\"</b> via corresponding chain, otherwise <b>Other</b> chain is used.",
33 34 uiResources = {"static/rulenode/rulenode-core-config.js"},
... ... @@ -50,7 +51,15 @@ public class TbMsgTypeSwitchNode implements TbNode {
50 51 relationType = "Post telemetry";
51 52 } else if (msg.getType().equals(SessionMsgType.TO_SERVER_RPC_REQUEST.name())) {
52 53 relationType = "RPC Request";
53   - } else {
  54 + } else if (msg.getType().equals(DataConstants.ACTIVITY_EVENT)) {
  55 + relationType = "Activity Event";
  56 + } else if (msg.getType().equals(DataConstants.INACTIVITY_EVENT)) {
  57 + relationType = "Inactivity Event";
  58 + } else if (msg.getType().equals(DataConstants.CONNECT_EVENT)) {
  59 + relationType = "Connect Event";
  60 + } else if (msg.getType().equals(DataConstants.DISCONNECT_EVENT)) {
  61 + relationType = "Disconnect Event";
  62 + } else {
54 63 relationType = "Other";
55 64 }
56 65 ctx.tellNext(msg, relationType);
... ...