Commit 494bb544e2918a6f17c9a6a79b31310bac3ec0a6

Authored by vparomskiy
1 parent 8bc49896

implement DeviceOfflineService

Showing 29 changed files with 752 additions and 36 deletions
... ... @@ -93,3 +93,5 @@ CREATE TABLE IF NOT EXISTS thingsboard.rule_node (
93 93 PRIMARY KEY (id)
94 94 );
95 95
  96 +ALTER TABLE thingsboard.device ADD last_connect bigint;
  97 +ALTER TABLE thingsboard.device ADD last_update bigint;
\ No newline at end of file
... ...
... ... @@ -34,4 +34,7 @@ CREATE TABLE IF NOT EXISTS rule_node (
34 34 name varchar(255),
35 35 debug_mode boolean,
36 36 search_text varchar(255)
37   -);
\ No newline at end of file
  37 +);
  38 +
  39 +ALTER TABLE device ADD COLUMN IF NOT EXISTS last_connect BIGINT;
  40 +ALTER TABLE device ADD COLUMN IF NOT EXISTS last_update BIGINT;
\ No newline at end of file
... ...
... ... @@ -45,6 +45,7 @@ import org.thingsboard.server.dao.audit.AuditLogService;
45 45 import org.thingsboard.server.dao.customer.CustomerService;
46 46 import org.thingsboard.server.dao.dashboard.DashboardService;
47 47 import org.thingsboard.server.dao.device.DeviceCredentialsService;
  48 +import org.thingsboard.server.dao.device.DeviceOfflineService;
48 49 import org.thingsboard.server.dao.device.DeviceService;
49 50 import org.thingsboard.server.dao.exception.DataValidationException;
50 51 import org.thingsboard.server.dao.exception.IncorrectParameterException;
... ... @@ -133,6 +134,9 @@ public abstract class BaseController {
133 134 @Autowired
134 135 protected AuditLogService auditLogService;
135 136
  137 + @Autowired
  138 + protected DeviceOfflineService offlineService;
  139 +
136 140 @ExceptionHandler(ThingsboardException.class)
137 141 public void handleThingsboardException(ThingsboardException ex, HttpServletResponse response) {
138 142 errorResponseHandler.handle(ex, response);
... ...
... ... @@ -25,6 +25,9 @@ import org.thingsboard.server.common.data.EntitySubtype;
25 25 import org.thingsboard.server.common.data.EntityType;
26 26 import org.thingsboard.server.common.data.audit.ActionType;
27 27 import org.thingsboard.server.common.data.device.DeviceSearchQuery;
  28 +import org.thingsboard.server.common.data.device.DeviceStatusQuery;
  29 +import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
  30 +import org.thingsboard.server.common.data.exception.ThingsboardException;
28 31 import org.thingsboard.server.common.data.id.CustomerId;
29 32 import org.thingsboard.server.common.data.id.DeviceId;
30 33 import org.thingsboard.server.common.data.id.TenantId;
... ... @@ -34,8 +37,6 @@ import org.thingsboard.server.common.data.security.Authority;
34 37 import org.thingsboard.server.common.data.security.DeviceCredentials;
35 38 import org.thingsboard.server.dao.exception.IncorrectParameterException;
36 39 import org.thingsboard.server.dao.model.ModelConstants;
37   -import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
38   -import org.thingsboard.server.common.data.exception.ThingsboardException;
39 40 import org.thingsboard.server.service.security.model.SecurityUser;
40 41
41 42 import java.util.ArrayList;
... ... @@ -69,7 +70,7 @@ public class DeviceController extends BaseController {
69 70 device.setTenantId(getCurrentUser().getTenantId());
70 71 if (getCurrentUser().getAuthority() == Authority.CUSTOMER_USER) {
71 72 if (device.getId() == null || device.getId().isNullUid() ||
72   - device.getCustomerId() == null || device.getCustomerId().isNullUid()) {
  73 + device.getCustomerId() == null || device.getCustomerId().isNullUid()) {
73 74 throw new ThingsboardException("You don't have permission to perform this operation!",
74 75 ThingsboardErrorCode.PERMISSION_DENIED);
75 76 } else {
... ... @@ -367,4 +368,32 @@ public class DeviceController extends BaseController {
367 368 throw handleException(e);
368 369 }
369 370 }
  371 +
  372 + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
  373 + @RequestMapping(value = "/device/offline", method = RequestMethod.GET)
  374 + @ResponseBody
  375 + public List<Device> getOfflineDevices(@RequestParam("contactType") DeviceStatusQuery.ContactType contactType,
  376 + @RequestParam("threshold") long threshold) throws ThingsboardException {
  377 + try {
  378 + TenantId tenantId = getCurrentUser().getTenantId();
  379 + ListenableFuture<List<Device>> offlineDevices = offlineService.findOfflineDevices(tenantId.getId(), contactType, threshold);
  380 + return checkNotNull(offlineDevices.get());
  381 + } catch (Exception e) {
  382 + throw handleException(e);
  383 + }
  384 + }
  385 +
  386 + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
  387 + @RequestMapping(value = "/device/online", method = RequestMethod.GET)
  388 + @ResponseBody
  389 + public List<Device> getOnlineDevices(@RequestParam("contactType") DeviceStatusQuery.ContactType contactType,
  390 + @RequestParam("threshold") long threshold) throws ThingsboardException {
  391 + try {
  392 + TenantId tenantId = getCurrentUser().getTenantId();
  393 + ListenableFuture<List<Device>> offlineDevices = offlineService.findOnlineDevices(tenantId.getId(), contactType, threshold);
  394 + return checkNotNull(offlineDevices.get());
  395 + } catch (Exception e) {
  396 + throw handleException(e);
  397 + }
  398 + }
370 399 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.system;
  17 +
  18 +import com.fasterxml.jackson.core.type.TypeReference;
  19 +import com.google.common.collect.ImmutableMap;
  20 +import org.junit.Before;
  21 +import org.junit.Test;
  22 +import org.thingsboard.server.common.data.Device;
  23 +import org.thingsboard.server.common.data.security.DeviceCredentials;
  24 +import org.thingsboard.server.controller.AbstractControllerTest;
  25 +
  26 +import java.util.List;
  27 +import java.util.UUID;
  28 +import java.util.concurrent.TimeUnit;
  29 +
  30 +import static org.junit.Assert.assertEquals;
  31 +
  32 +public class BaseDeviceOfflineTest extends AbstractControllerTest {
  33 +
  34 + private Device deviceA;
  35 + private Device deviceB;
  36 + private DeviceCredentials credA;
  37 + private DeviceCredentials credB;
  38 +
  39 + @Before
  40 + public void before() throws Exception {
  41 + loginTenantAdmin();
  42 + deviceA = createDevice("DevA", "VMS");
  43 + credA = getCredentials(deviceA.getUuidId());
  44 + deviceB = createDevice("DevB", "SOLAR");
  45 + credB = getCredentials(deviceB.getUuidId());
  46 + }
  47 +
  48 + @Test
  49 + public void offlineDevicesCanBeFoundByLastConnectField() throws Exception {
  50 + makeDeviceContact(credA);
  51 + Thread.sleep(1000);
  52 + makeDeviceContact(credB);
  53 + Thread.sleep(100);
  54 + List<Device> devices = doGetTyped("/api/device/offline?contactType=CONNECT&threshold=700", new TypeReference<List<Device>>() {
  55 + });
  56 +
  57 + assertEquals(devices.toString(),1, devices.size());
  58 + assertEquals("DevA", devices.get(0).getName());
  59 + }
  60 +
  61 + @Test
  62 + public void offlineDevicesCanBeFoundByLastUpdateField() throws Exception {
  63 + makeDeviceUpdate(credA);
  64 + Thread.sleep(1000);
  65 + makeDeviceUpdate(credB);
  66 + makeDeviceContact(credA);
  67 + Thread.sleep(100);
  68 + List<Device> devices = doGetTyped("/api/device/offline?contactType=UPLOAD&threshold=700", new TypeReference<List<Device>>() {
  69 + });
  70 +
  71 + assertEquals(devices.toString(),1, devices.size());
  72 + assertEquals("DevA", devices.get(0).getName());
  73 + }
  74 +
  75 + @Test
  76 + public void onlineDevicesCanBeFoundByLastConnectField() throws Exception {
  77 + makeDeviceContact(credB);
  78 + Thread.sleep(1000);
  79 + makeDeviceContact(credA);
  80 + Thread.sleep(100);
  81 + List<Device> devices = doGetTyped("/api/device/online?contactType=CONNECT&threshold=700", new TypeReference<List<Device>>() {
  82 + });
  83 +
  84 + assertEquals(devices.toString(),1, devices.size());
  85 + assertEquals("DevA", devices.get(0).getName());
  86 + }
  87 +
  88 + @Test
  89 + public void onlineDevicesCanBeFoundByLastUpdateField() throws Exception {
  90 + makeDeviceUpdate(credB);
  91 + Thread.sleep(1000);
  92 + makeDeviceUpdate(credA);
  93 + makeDeviceContact(credB);
  94 + Thread.sleep(100);
  95 + List<Device> devices = doGetTyped("/api/device/online?contactType=UPLOAD&threshold=700", new TypeReference<List<Device>>() {
  96 + });
  97 +
  98 + assertEquals(devices.toString(),1, devices.size());
  99 + assertEquals("DevA", devices.get(0).getName());
  100 + }
  101 +
  102 + private Device createDevice(String name, String type) throws Exception {
  103 + Device device = new Device();
  104 + device.setName(name);
  105 + device.setType(type);
  106 + long currentTime = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(10);
  107 + device.setLastConnectTs(currentTime);
  108 + device.setLastUpdateTs(currentTime);
  109 + return doPost("/api/device", device, Device.class);
  110 + }
  111 +
  112 + private DeviceCredentials getCredentials(UUID deviceId) throws Exception {
  113 + return doGet("/api/device/" + deviceId.toString() + "/credentials", DeviceCredentials.class);
  114 + }
  115 +
  116 + private void makeDeviceUpdate(DeviceCredentials credentials) throws Exception {
  117 + doPost("/api/v1/" + credentials.getCredentialsId() + "/attributes", ImmutableMap.of("keyA", "valueA"), new String[]{});
  118 + }
  119 +
  120 + private void makeDeviceContact(DeviceCredentials credentials) throws Exception {
  121 + doGet("/api/v1/" + credentials.getCredentialsId() + "/attributes?clientKeys=keyA,keyB,keyC");
  122 + }
  123 +}
... ...
... ... @@ -15,6 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.system;
17 17
  18 +import com.google.common.collect.ImmutableMap;
18 19 import org.junit.Before;
19 20 import org.junit.Test;
20 21 import org.springframework.test.web.servlet.ResultActions;
... ... @@ -28,6 +29,9 @@ import java.util.Map;
28 29 import java.util.Random;
29 30 import java.util.concurrent.atomic.AtomicInteger;
30 31
  32 +import static org.junit.Assert.assertEquals;
  33 +import static org.junit.Assert.assertNotNull;
  34 +import static org.junit.Assert.assertTrue;
31 35 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.*;
32 36 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.request;
33 37 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
... ... @@ -48,6 +52,9 @@ public abstract class BaseHttpDeviceApiTest extends AbstractControllerTest {
48 52 device = new Device();
49 53 device.setName("My device");
50 54 device.setType("default");
  55 + long currentTime = System.currentTimeMillis();
  56 + device.setLastConnectTs(currentTime);
  57 + device.setLastUpdateTs(currentTime);
51 58 device = doPost("/api/device", device, Device.class);
52 59
53 60 deviceCredentials =
... ... @@ -67,6 +74,34 @@ public abstract class BaseHttpDeviceApiTest extends AbstractControllerTest {
67 74 doGetAsync("/api/v1/" + deviceCredentials.getCredentialsId() + "/attributes?clientKeys=keyA,keyB,keyC").andExpect(status().isOk());
68 75 }
69 76
  77 + @Test
  78 + public void deviceLastContactAndUpdateFieldsAreUpdated() throws Exception {
  79 + Device actualDevice = doGet("/api/device/" + this.device.getId(), Device.class);
  80 + Long initConnectTs = actualDevice.getLastConnectTs();
  81 + Long initUpdateTs = actualDevice.getLastUpdateTs();
  82 + assertNotNull(initConnectTs);
  83 + assertNotNull(initUpdateTs);
  84 + Thread.sleep(50);
  85 +
  86 + doPost("/api/v1/" + deviceCredentials.getCredentialsId() + "/attributes", ImmutableMap.of("keyA", "valueA"), new String[]{});
  87 + actualDevice = doGet("/api/device/" + this.device.getId(), Device.class);
  88 + Long postConnectTs = actualDevice.getLastConnectTs();
  89 + Long postUpdateTs = actualDevice.getLastUpdateTs();
  90 + System.out.println(postConnectTs + " - " + postUpdateTs + " -> " + (postConnectTs - initConnectTs) + " : " + (postUpdateTs - initUpdateTs));
  91 + assertTrue(postConnectTs > initConnectTs);
  92 + assertEquals(postConnectTs, postUpdateTs);
  93 + Thread.sleep(50);
  94 +
  95 + doGet("/api/v1/" + deviceCredentials.getCredentialsId() + "/attributes?clientKeys=keyA,keyB,keyC");
  96 + Thread.sleep(50);
  97 + actualDevice = doGet("/api/device/" + this.device.getId(), Device.class);
  98 + Long getConnectTs = actualDevice.getLastConnectTs();
  99 + Long getUpdateTs = actualDevice.getLastUpdateTs();
  100 + assertTrue(getConnectTs > postConnectTs);
  101 + assertEquals(getUpdateTs, postUpdateTs);
  102 +
  103 + }
  104 +
70 105 protected ResultActions doGetAsync(String urlTemplate, Object... urlVariables) throws Exception {
71 106 MockHttpServletRequestBuilder getRequest;
72 107 getRequest = get(urlTemplate, urlVariables);
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.system.nosql;
  17 +
  18 +import org.thingsboard.server.dao.service.DaoNoSqlTest;
  19 +import org.thingsboard.server.system.BaseDeviceOfflineTest;
  20 +
  21 +@DaoNoSqlTest
  22 +public class DeviceOfflineNoSqlTest extends BaseDeviceOfflineTest {
  23 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.system.sql;
  17 +
  18 +import org.thingsboard.server.dao.service.DaoSqlTest;
  19 +import org.thingsboard.server.system.BaseDeviceOfflineTest;
  20 +
  21 +@DaoSqlTest
  22 +public class DeviceOfflineSqlTest extends BaseDeviceOfflineTest {
  23 +}
... ...
... ... @@ -31,6 +31,8 @@ public class Device extends SearchTextBasedWithAdditionalInfo<DeviceId> implemen
31 31 private CustomerId customerId;
32 32 private String name;
33 33 private String type;
  34 + private Long lastConnectTs;
  35 + private Long lastUpdateTs;
34 36
35 37 public Device() {
36 38 super();
... ... @@ -81,6 +83,22 @@ public class Device extends SearchTextBasedWithAdditionalInfo<DeviceId> implemen
81 83 this.type = type;
82 84 }
83 85
  86 + public Long getLastConnectTs() {
  87 + return lastConnectTs;
  88 + }
  89 +
  90 + public void setLastConnectTs(Long lastConnectTs) {
  91 + this.lastConnectTs = lastConnectTs;
  92 + }
  93 +
  94 + public Long getLastUpdateTs() {
  95 + return lastUpdateTs;
  96 + }
  97 +
  98 + public void setLastUpdateTs(Long lastUpdateTs) {
  99 + this.lastUpdateTs = lastUpdateTs;
  100 + }
  101 +
84 102 @Override
85 103 public String getSearchText() {
86 104 return getName();
... ... @@ -101,6 +119,10 @@ public class Device extends SearchTextBasedWithAdditionalInfo<DeviceId> implemen
101 119 builder.append(getAdditionalInfo());
102 120 builder.append(", createdTime=");
103 121 builder.append(createdTime);
  122 + builder.append(", lastUpdateTs=");
  123 + builder.append(lastUpdateTs);
  124 + builder.append(", lastConnectTs=");
  125 + builder.append(lastConnectTs);
104 126 builder.append(", id=");
105 127 builder.append(id);
106 128 builder.append("]");
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.data.device;
  17 +
  18 +import lombok.AllArgsConstructor;
  19 +import lombok.Data;
  20 +import lombok.ToString;
  21 +
  22 +@Data
  23 +@AllArgsConstructor
  24 +@ToString
  25 +public class DeviceStatusQuery {
  26 +
  27 + private Status status;
  28 + private ContactType contactType;
  29 + private long threshold;
  30 +
  31 +
  32 + public enum Status {
  33 + ONLINE, OFFLINE
  34 + }
  35 +
  36 + public enum ContactType {
  37 + CONNECT, UPLOAD
  38 + }
  39 +
  40 +}
... ...
... ... @@ -15,9 +15,9 @@
15 15 */
16 16 package org.thingsboard.server.dao.device;
17 17
18   -import com.datastax.driver.core.ResultSet;
19   -import com.datastax.driver.core.ResultSetFuture;
20   -import com.datastax.driver.core.Statement;
  18 +import com.datastax.driver.core.*;
  19 +import com.datastax.driver.core.querybuilder.Clause;
  20 +import com.datastax.driver.core.querybuilder.QueryBuilder;
21 21 import com.datastax.driver.core.querybuilder.Select;
22 22 import com.datastax.driver.mapping.Result;
23 23 import com.google.common.base.Function;
... ... @@ -28,9 +28,11 @@ import org.springframework.stereotype.Component;
28 28 import org.thingsboard.server.common.data.Device;
29 29 import org.thingsboard.server.common.data.EntitySubtype;
30 30 import org.thingsboard.server.common.data.EntityType;
  31 +import org.thingsboard.server.common.data.device.DeviceStatusQuery;
31 32 import org.thingsboard.server.common.data.page.TextPageLink;
32 33 import org.thingsboard.server.dao.DaoUtil;
33 34 import org.thingsboard.server.dao.model.EntitySubtypeEntity;
  35 +import org.thingsboard.server.dao.model.ModelConstants;
34 36 import org.thingsboard.server.dao.model.nosql.DeviceEntity;
35 37 import org.thingsboard.server.dao.nosql.CassandraAbstractSearchTextDao;
36 38 import org.thingsboard.server.dao.util.NoSqlDao;
... ... @@ -157,7 +159,7 @@ public class CassandraDeviceDao extends CassandraAbstractSearchTextDao<DeviceEnt
157 159 if (result != null) {
158 160 List<EntitySubtype> entitySubtypes = new ArrayList<>();
159 161 result.all().forEach((entitySubtypeEntity) ->
160   - entitySubtypes.add(entitySubtypeEntity.toEntitySubtype())
  162 + entitySubtypes.add(entitySubtypeEntity.toEntitySubtype())
161 163 );
162 164 return entitySubtypes;
163 165 } else {
... ... @@ -167,4 +169,68 @@ public class CassandraDeviceDao extends CassandraAbstractSearchTextDao<DeviceEnt
167 169 });
168 170 }
169 171
  172 + @Override
  173 + public ListenableFuture<List<Device>> findDevicesByTenantIdAndStatus(UUID tenantId, DeviceStatusQuery statusQuery) {
  174 + log.debug("Try to find [{}] devices by tenantId [{}]", statusQuery.getStatus(), tenantId);
  175 +
  176 + Select select = select().from(DEVICE_BY_TENANT_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME).allowFiltering();
  177 + Select.Where query = select.where();
  178 + query.and(eq(DEVICE_TENANT_ID_PROPERTY, tenantId));
  179 + Clause clause = statusClause(statusQuery);
  180 + query.and(clause);
  181 + return findListByStatementAsync(query);
  182 + }
  183 +
  184 + @Override
  185 + public ListenableFuture<List<Device>> findDevicesByTenantIdTypeAndStatus(UUID tenantId, String type, DeviceStatusQuery statusQuery) {
  186 + log.debug("Try to find [{}] devices by tenantId [{}] and type [{}]", statusQuery.getStatus(), tenantId, type);
  187 +
  188 + Select select = select().from(DEVICE_BY_TENANT_BY_TYPE_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME).allowFiltering();
  189 + Select.Where query = select.where()
  190 + .and(eq(DEVICE_TENANT_ID_PROPERTY, tenantId))
  191 + .and(eq(DEVICE_TYPE_PROPERTY, type));
  192 +
  193 + query.and(statusClause(statusQuery));
  194 + return findListByStatementAsync(query);
  195 + }
  196 +
  197 +
  198 + @Override
  199 + public void saveDeviceStatus(Device device) {
  200 + PreparedStatement statement = prepare("insert into " +
  201 + "device (id, tenant_id, customer_id, type, last_connect, last_update) values (?, ?, ?, ?, ?, ?)");
  202 + BoundStatement boundStatement = statement.bind(device.getUuidId(), device.getTenantId().getId(), device.getCustomerId().getId(),
  203 + device.getType(), device.getLastConnectTs(), device.getLastUpdateTs());
  204 + ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);
  205 + Futures.withFallback(resultSetFuture, t -> {
  206 + log.error("Can't update device status for [{}]", device, t);
  207 + throw new IllegalArgumentException("Can't update device status for {" + device + "}", t);
  208 + });
  209 + }
  210 +
  211 + private String getStatusProperty(DeviceStatusQuery statusQuery) {
  212 + switch (statusQuery.getContactType()) {
  213 + case UPLOAD:
  214 + return DEVICE_LAST_UPDATE_PROPERTY;
  215 + case CONNECT:
  216 + return DEVICE_LAST_CONNECT_PROPERTY;
  217 + }
  218 + return null;
  219 + }
  220 +
  221 + private Clause statusClause(DeviceStatusQuery statusQuery) {
  222 + long minTime = System.currentTimeMillis() - statusQuery.getThreshold();
  223 + String statusProperty = getStatusProperty(statusQuery);
  224 + if (statusProperty != null) {
  225 + switch (statusQuery.getStatus()) {
  226 + case ONLINE:
  227 + return gt(statusProperty, minTime);
  228 + case OFFLINE:
  229 + return lt(statusProperty, minTime);
  230 + }
  231 + }
  232 + log.error("Could not build status query from [{}]", statusQuery);
  233 + throw new IllegalStateException("Could not build status query for device []");
  234 + }
  235 +
170 236 }
... ...
... ... @@ -18,6 +18,7 @@ package org.thingsboard.server.dao.device;
18 18 import com.google.common.util.concurrent.ListenableFuture;
19 19 import org.thingsboard.server.common.data.Device;
20 20 import org.thingsboard.server.common.data.EntitySubtype;
  21 +import org.thingsboard.server.common.data.device.DeviceStatusQuery;
21 22 import org.thingsboard.server.common.data.page.TextPageLink;
22 23 import org.thingsboard.server.dao.Dao;
23 24
... ... @@ -27,7 +28,6 @@ import java.util.UUID;
27 28
28 29 /**
29 30 * The Interface DeviceDao.
30   - *
31 31 */
32 32 public interface DeviceDao extends Dao<Device> {
33 33
... ... @@ -52,7 +52,7 @@ public interface DeviceDao extends Dao<Device> {
52 52 * Find devices by tenantId, type and page link.
53 53 *
54 54 * @param tenantId the tenantId
55   - * @param type the type
  55 + * @param type the type
56 56 * @param pageLink the page link
57 57 * @return the list of device objects
58 58 */
... ... @@ -61,7 +61,7 @@ public interface DeviceDao extends Dao<Device> {
61 61 /**
62 62 * Find devices by tenantId and devices Ids.
63 63 *
64   - * @param tenantId the tenantId
  64 + * @param tenantId the tenantId
65 65 * @param deviceIds the device Ids
66 66 * @return the list of device objects
67 67 */
... ... @@ -70,9 +70,9 @@ public interface DeviceDao extends Dao<Device> {
70 70 /**
71 71 * Find devices by tenantId, customerId and page link.
72 72 *
73   - * @param tenantId the tenantId
  73 + * @param tenantId the tenantId
74 74 * @param customerId the customerId
75   - * @param pageLink the page link
  75 + * @param pageLink the page link
76 76 * @return the list of device objects
77 77 */
78 78 List<Device> findDevicesByTenantIdAndCustomerId(UUID tenantId, UUID customerId, TextPageLink pageLink);
... ... @@ -80,10 +80,10 @@ public interface DeviceDao extends Dao<Device> {
80 80 /**
81 81 * Find devices by tenantId, customerId, type and page link.
82 82 *
83   - * @param tenantId the tenantId
  83 + * @param tenantId the tenantId
84 84 * @param customerId the customerId
85   - * @param type the type
86   - * @param pageLink the page link
  85 + * @param type the type
  86 + * @param pageLink the page link
87 87 * @return the list of device objects
88 88 */
89 89 List<Device> findDevicesByTenantIdAndCustomerIdAndType(UUID tenantId, UUID customerId, String type, TextPageLink pageLink);
... ... @@ -92,9 +92,9 @@ public interface DeviceDao extends Dao<Device> {
92 92 /**
93 93 * Find devices by tenantId, customerId and devices Ids.
94 94 *
95   - * @param tenantId the tenantId
  95 + * @param tenantId the tenantId
96 96 * @param customerId the customerId
97   - * @param deviceIds the device Ids
  97 + * @param deviceIds the device Ids
98 98 * @return the list of device objects
99 99 */
100 100 ListenableFuture<List<Device>> findDevicesByTenantIdCustomerIdAndIdsAsync(UUID tenantId, UUID customerId, List<UUID> deviceIds);
... ... @@ -103,7 +103,7 @@ public interface DeviceDao extends Dao<Device> {
103 103 * Find devices by tenantId and device name.
104 104 *
105 105 * @param tenantId the tenantId
106   - * @param name the device name
  106 + * @param name the device name
107 107 * @return the optional device object
108 108 */
109 109 Optional<Device> findDeviceByTenantIdAndName(UUID tenantId, String name);
... ... @@ -114,4 +114,31 @@ public interface DeviceDao extends Dao<Device> {
114 114 * @return the list of tenant device type objects
115 115 */
116 116 ListenableFuture<List<EntitySubtype>> findTenantDeviceTypesAsync(UUID tenantId);
  117 +
  118 + /**
  119 + * Find devices by tenantId, statusQuery and page link.
  120 + *
  121 + * @param tenantId the tenantId
  122 + * @param statusQuery the page link
  123 + * @return the list of device objects
  124 + */
  125 + ListenableFuture<List<Device>> findDevicesByTenantIdAndStatus(UUID tenantId, DeviceStatusQuery statusQuery);
  126 +
  127 + /**
  128 + * Find devices by tenantId, type, statusQuery and page link.
  129 + *
  130 + * @param tenantId the tenantId
  131 + * @param type the type
  132 + * @param statusQuery the page link
  133 + * @return the list of device objects
  134 + */
  135 + ListenableFuture<List<Device>> findDevicesByTenantIdTypeAndStatus(UUID tenantId, String type, DeviceStatusQuery statusQuery);
  136 +
  137 +
  138 + /**
  139 + * Update device last contact and update timestamp async
  140 + *
  141 + * @param device the device object
  142 + */
  143 + void saveDeviceStatus(Device device);
117 144 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.device;
  17 +
  18 +import com.google.common.util.concurrent.ListenableFuture;
  19 +import org.thingsboard.server.common.data.Device;
  20 +import org.thingsboard.server.common.data.device.DeviceStatusQuery;
  21 +
  22 +import java.util.List;
  23 +import java.util.UUID;
  24 +
  25 +public interface DeviceOfflineService {
  26 +
  27 + void online(Device device, boolean isUpdate);
  28 +
  29 + void offline(Device device);
  30 +
  31 + ListenableFuture<List<Device>> findOfflineDevices(UUID tenantId, DeviceStatusQuery.ContactType contactType, long offlineThreshold);
  32 +
  33 + ListenableFuture<List<Device>> findOnlineDevices(UUID tenantId, DeviceStatusQuery.ContactType contactType, long offlineThreshold);
  34 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.device;
  17 +
  18 +import com.google.common.util.concurrent.ListenableFuture;
  19 +import org.springframework.beans.factory.annotation.Autowired;
  20 +import org.springframework.stereotype.Service;
  21 +import org.thingsboard.server.common.data.Device;
  22 +import org.thingsboard.server.common.data.device.DeviceStatusQuery;
  23 +
  24 +import java.util.List;
  25 +import java.util.UUID;
  26 +
  27 +import static org.thingsboard.server.common.data.device.DeviceStatusQuery.Status.OFFLINE;
  28 +import static org.thingsboard.server.common.data.device.DeviceStatusQuery.Status.ONLINE;
  29 +
  30 +@Service
  31 +public class DeviceOfflineServiceImpl implements DeviceOfflineService {
  32 +
  33 + @Autowired
  34 + private DeviceDao deviceDao;
  35 +
  36 + @Override
  37 + public void online(Device device, boolean isUpdate) {
  38 + long current = System.currentTimeMillis();
  39 + device.setLastConnectTs(current);
  40 + if(isUpdate) {
  41 + device.setLastUpdateTs(current);
  42 + }
  43 + deviceDao.saveDeviceStatus(device);
  44 + }
  45 +
  46 + @Override
  47 + public void offline(Device device) {
  48 + online(device, false);
  49 + }
  50 +
  51 + @Override
  52 + public ListenableFuture<List<Device>> findOfflineDevices(UUID tenantId, DeviceStatusQuery.ContactType contactType, long offlineThreshold) {
  53 + DeviceStatusQuery statusQuery = new DeviceStatusQuery(OFFLINE, contactType, offlineThreshold);
  54 + return deviceDao.findDevicesByTenantIdAndStatus(tenantId, statusQuery);
  55 + }
  56 +
  57 + @Override
  58 + public ListenableFuture<List<Device>> findOnlineDevices(UUID tenantId, DeviceStatusQuery.ContactType contactType, long offlineThreshold) {
  59 + DeviceStatusQuery statusQuery = new DeviceStatusQuery(ONLINE, contactType, offlineThreshold);
  60 + return deviceDao.findDevicesByTenantIdAndStatus(tenantId, statusQuery);
  61 + }
  62 +}
... ...
... ... @@ -133,6 +133,8 @@ public class ModelConstants {
133 133 public static final String DEVICE_NAME_PROPERTY = "name";
134 134 public static final String DEVICE_TYPE_PROPERTY = "type";
135 135 public static final String DEVICE_ADDITIONAL_INFO_PROPERTY = ADDITIONAL_INFO_PROPERTY;
  136 + public static final String DEVICE_LAST_CONNECT_PROPERTY = "last_connect";
  137 + public static final String DEVICE_LAST_UPDATE_PROPERTY = "last_update";
136 138
137 139 public static final String DEVICE_BY_TENANT_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "device_by_tenant_and_search_text";
138 140 public static final String DEVICE_BY_TENANT_BY_TYPE_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "device_by_tenant_by_type_and_search_text";
... ...
... ... @@ -63,6 +63,12 @@ public final class DeviceEntity implements SearchTextEntity<Device> {
63 63 @Column(name = DEVICE_ADDITIONAL_INFO_PROPERTY, codec = JsonCodec.class)
64 64 private JsonNode additionalInfo;
65 65
  66 + @Column(name = DEVICE_LAST_CONNECT_PROPERTY)
  67 + private Long lastConnectTs;
  68 +
  69 + @Column(name = DEVICE_LAST_UPDATE_PROPERTY)
  70 + private Long lastUpdateTs;
  71 +
66 72 public DeviceEntity() {
67 73 super();
68 74 }
... ... @@ -80,6 +86,8 @@ public final class DeviceEntity implements SearchTextEntity<Device> {
80 86 this.name = device.getName();
81 87 this.type = device.getType();
82 88 this.additionalInfo = device.getAdditionalInfo();
  89 + this.lastConnectTs = device.getLastConnectTs();
  90 + this.lastUpdateTs = device.getLastUpdateTs();
83 91 }
84 92
85 93 public UUID getId() {
... ... @@ -129,7 +137,23 @@ public final class DeviceEntity implements SearchTextEntity<Device> {
129 137 public void setAdditionalInfo(JsonNode additionalInfo) {
130 138 this.additionalInfo = additionalInfo;
131 139 }
132   -
  140 +
  141 + public Long getLastConnectTs() {
  142 + return lastConnectTs;
  143 + }
  144 +
  145 + public void setLastConnectTs(Long lastConnectTs) {
  146 + this.lastConnectTs = lastConnectTs;
  147 + }
  148 +
  149 + public Long getLastUpdateTs() {
  150 + return lastUpdateTs;
  151 + }
  152 +
  153 + public void setLastUpdateTs(Long lastUpdateTs) {
  154 + this.lastUpdateTs = lastUpdateTs;
  155 + }
  156 +
133 157 @Override
134 158 public String getSearchTextSource() {
135 159 return getName();
... ... @@ -157,6 +181,8 @@ public final class DeviceEntity implements SearchTextEntity<Device> {
157 181 device.setName(name);
158 182 device.setType(type);
159 183 device.setAdditionalInfo(additionalInfo);
  184 + device.setLastConnectTs(lastConnectTs);
  185 + device.setLastUpdateTs(lastUpdateTs);
160 186 return device;
161 187 }
162 188
... ...
... ... @@ -34,6 +34,9 @@ import javax.persistence.Column;
34 34 import javax.persistence.Entity;
35 35 import javax.persistence.Table;
36 36
  37 +import static org.thingsboard.server.dao.model.ModelConstants.DEVICE_LAST_CONNECT_PROPERTY;
  38 +import static org.thingsboard.server.dao.model.ModelConstants.DEVICE_LAST_UPDATE_PROPERTY;
  39 +
37 40 @Data
38 41 @EqualsAndHashCode(callSuper = true)
39 42 @Entity
... ... @@ -60,6 +63,12 @@ public final class DeviceEntity extends BaseSqlEntity<Device> implements SearchT
60 63 @Column(name = ModelConstants.DEVICE_ADDITIONAL_INFO_PROPERTY)
61 64 private JsonNode additionalInfo;
62 65
  66 + @Column(name = DEVICE_LAST_CONNECT_PROPERTY)
  67 + private Long lastConnectTs;
  68 +
  69 + @Column(name = DEVICE_LAST_UPDATE_PROPERTY)
  70 + private Long lastUpdateTs;
  71 +
63 72 public DeviceEntity() {
64 73 super();
65 74 }
... ... @@ -77,6 +86,8 @@ public final class DeviceEntity extends BaseSqlEntity<Device> implements SearchT
77 86 this.name = device.getName();
78 87 this.type = device.getType();
79 88 this.additionalInfo = device.getAdditionalInfo();
  89 + this.lastConnectTs = device.getLastConnectTs();
  90 + this.lastUpdateTs = device.getLastUpdateTs();
80 91 }
81 92
82 93 @Override
... ... @@ -102,6 +113,8 @@ public final class DeviceEntity extends BaseSqlEntity<Device> implements SearchT
102 113 device.setName(name);
103 114 device.setType(type);
104 115 device.setAdditionalInfo(additionalInfo);
  116 + device.setLastConnectTs(lastConnectTs);
  117 + device.setLastUpdateTs(lastUpdateTs);
105 118 return device;
106 119 }
107 120 }
\ No newline at end of file
... ...
... ... @@ -79,4 +79,28 @@ public interface DeviceRepository extends CrudRepository<DeviceEntity, String> {
79 79 List<DeviceEntity> findDevicesByTenantIdAndCustomerIdAndIdIn(String tenantId, String customerId, List<String> deviceIds);
80 80
81 81 List<DeviceEntity> findDevicesByTenantIdAndIdIn(String tenantId, List<String> deviceIds);
  82 +
  83 + @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId AND d.lastConnectTs > :time")
  84 + List<DeviceEntity> findConnectOnlineByTenantId(@Param("tenantId") String tenantId, @Param("time") long time);
  85 +
  86 + @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId AND d.lastConnectTs < :time")
  87 + List<DeviceEntity> findConnectOfflineByTenantId(@Param("tenantId") String tenantId, @Param("time") long time);
  88 +
  89 + @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId AND d.lastUpdateTs > :time")
  90 + List<DeviceEntity> findUpdateOnlineByTenantId(@Param("tenantId") String tenantId, @Param("time") long time);
  91 +
  92 + @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId AND d.lastUpdateTs < :time")
  93 + List<DeviceEntity> findUpdateOfflineByTenantId(@Param("tenantId") String tenantId, @Param("time") long time);
  94 +
  95 + @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId AND d.lastConnectTs > :time AND d.type = :type")
  96 + List<DeviceEntity> findConnectOnlineByTenantIdAndType(@Param("tenantId") String tenantId, @Param("time") long time, @Param("type") String type);
  97 +
  98 + @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId AND d.lastConnectTs < :time AND d.type = :type")
  99 + List<DeviceEntity> findConnectOfflineByTenantIdAndType(@Param("tenantId") String tenantId, @Param("time") long time, @Param("type") String type);
  100 +
  101 + @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId AND d.lastUpdateTs > :time AND d.type = :type")
  102 + List<DeviceEntity> findUpdateOnlineByTenantIdAndType(@Param("tenantId") String tenantId, @Param("time") long time, @Param("type") String type);
  103 +
  104 + @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId AND d.lastUpdateTs < :time AND d.type = :type")
  105 + List<DeviceEntity> findUpdateOfflineByTenantIdAndType(@Param("tenantId") String tenantId, @Param("time") long time, @Param("type") String type);
82 106 }
... ...
... ... @@ -15,7 +15,9 @@
15 15 */
16 16 package org.thingsboard.server.dao.sql.device;
17 17
  18 +import com.google.common.util.concurrent.Futures;
18 19 import com.google.common.util.concurrent.ListenableFuture;
  20 +import lombok.extern.slf4j.Slf4j;
19 21 import org.springframework.beans.factory.annotation.Autowired;
20 22 import org.springframework.data.domain.PageRequest;
21 23 import org.springframework.data.repository.CrudRepository;
... ... @@ -24,6 +26,7 @@ import org.thingsboard.server.common.data.Device;
24 26 import org.thingsboard.server.common.data.EntitySubtype;
25 27 import org.thingsboard.server.common.data.EntityType;
26 28 import org.thingsboard.server.common.data.UUIDConverter;
  29 +import org.thingsboard.server.common.data.device.DeviceStatusQuery;
27 30 import org.thingsboard.server.common.data.id.TenantId;
28 31 import org.thingsboard.server.common.data.page.TextPageLink;
29 32 import org.thingsboard.server.dao.DaoUtil;
... ... @@ -43,6 +46,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID_STR;
43 46 */
44 47 @Component
45 48 @SqlDao
  49 +@Slf4j
46 50 public class JpaDeviceDao extends JpaAbstractSearchTextDao<DeviceEntity, Device> implements DeviceDao {
47 51
48 52 @Autowired
... ... @@ -124,6 +128,73 @@ public class JpaDeviceDao extends JpaAbstractSearchTextDao<DeviceEntity, Device>
124 128 return service.submit(() -> convertTenantDeviceTypesToDto(tenantId, deviceRepository.findTenantDeviceTypes(fromTimeUUID(tenantId))));
125 129 }
126 130
  131 + @Override
  132 + public ListenableFuture<List<Device>> findDevicesByTenantIdAndStatus(UUID tenantId, DeviceStatusQuery statusQuery) {
  133 + String strTenantId = fromTimeUUID(tenantId);
  134 + long minTime = System.currentTimeMillis() - statusQuery.getThreshold();
  135 + switch (statusQuery.getStatus()) {
  136 + case OFFLINE: {
  137 + switch (statusQuery.getContactType()) {
  138 + case UPLOAD:
  139 + return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findUpdateOfflineByTenantId(strTenantId, minTime)));
  140 + case CONNECT:
  141 + return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findConnectOfflineByTenantId(strTenantId, minTime)));
  142 + }
  143 + break;
  144 + }
  145 + case ONLINE: {
  146 + switch (statusQuery.getContactType()) {
  147 + case UPLOAD:
  148 + return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findUpdateOnlineByTenantId(strTenantId, minTime)));
  149 + case CONNECT:
  150 + return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findConnectOnlineByTenantId(strTenantId, minTime)));
  151 + }
  152 + break;
  153 + }
  154 + }
  155 +
  156 + log.error("Could not build status query from [{}]", statusQuery);
  157 + throw new IllegalStateException("Could not build status query for device []");
  158 + }
  159 +
  160 + @Override
  161 + public ListenableFuture<List<Device>> findDevicesByTenantIdTypeAndStatus(UUID tenantId, String type, DeviceStatusQuery statusQuery) {
  162 + String strTenantId = fromTimeUUID(tenantId);
  163 + long minTime = System.currentTimeMillis() - statusQuery.getThreshold();
  164 + switch (statusQuery.getStatus()) {
  165 + case OFFLINE: {
  166 + switch (statusQuery.getContactType()) {
  167 + case UPLOAD:
  168 + return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findUpdateOfflineByTenantIdAndType(strTenantId, minTime, type)));
  169 + case CONNECT:
  170 + return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findConnectOfflineByTenantIdAndType(strTenantId, minTime, type)));
  171 + }
  172 + break;
  173 + }
  174 + case ONLINE: {
  175 + switch (statusQuery.getContactType()) {
  176 + case UPLOAD:
  177 + return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findUpdateOnlineByTenantIdAndType(strTenantId, minTime, type)));
  178 + case CONNECT:
  179 + return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findConnectOnlineByTenantIdAndType(strTenantId, minTime, type)));
  180 + }
  181 + break;
  182 + }
  183 + }
  184 +
  185 + log.error("Could not build status query from [{}]", statusQuery);
  186 + throw new IllegalStateException("Could not build status query for device []");
  187 + }
  188 +
  189 + @Override
  190 + public void saveDeviceStatus(Device device) {
  191 + ListenableFuture<Device> future = service.submit(() -> save(device));
  192 + Futures.withFallback(future, t -> {
  193 + log.error("Can't update device status for [{}]", device, t);
  194 + throw new IllegalArgumentException("Can't update device status for {" + device + "}", t);
  195 + });
  196 + }
  197 +
127 198 private List<EntitySubtype> convertTenantDeviceTypesToDto(UUID tenantId, List<String> types) {
128 199 List<EntitySubtype> list = Collections.emptyList();
129 200 if (types != null && !types.isEmpty()) {
... ...
... ... @@ -159,6 +159,8 @@ CREATE TABLE IF NOT EXISTS thingsboard.device (
159 159 type text,
160 160 search_text text,
161 161 additional_info text,
  162 + last_connect bigint,
  163 + last_update bigint,
162 164 PRIMARY KEY (id, tenant_id, customer_id, type)
163 165 );
164 166
... ...
... ... @@ -118,7 +118,9 @@ CREATE TABLE IF NOT EXISTS device (
118 118 type varchar(255),
119 119 name varchar(255),
120 120 search_text varchar(255),
121   - tenant_id varchar(31)
  121 + tenant_id varchar(31),
  122 + last_connect bigint,
  123 + last_update bigint
122 124 );
123 125
124 126 CREATE TABLE IF NOT EXISTS device_credentials (
... ...
... ... @@ -35,6 +35,7 @@ import org.thingsboard.server.common.transport.SessionMsgProcessor;
35 35 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
36 36 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
37 37 import org.thingsboard.server.common.transport.quota.QuotaService;
  38 +import org.thingsboard.server.dao.device.DeviceOfflineService;
38 39 import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
39 40 import org.thingsboard.server.transport.coap.session.CoapExchangeObserverProxy;
40 41 import org.thingsboard.server.transport.coap.session.CoapSessionCtx;
... ... @@ -53,15 +54,17 @@ public class CoapTransportResource extends CoapResource {
53 54 private final SessionMsgProcessor processor;
54 55 private final DeviceAuthService authService;
55 56 private final QuotaService quotaService;
  57 + private final DeviceOfflineService offlineService;
56 58 private final Field observerField;
57 59 private final long timeout;
58 60
59 61 public CoapTransportResource(SessionMsgProcessor processor, DeviceAuthService authService, CoapTransportAdaptor adaptor, String name,
60   - long timeout, QuotaService quotaService) {
  62 + long timeout, QuotaService quotaService, DeviceOfflineService offlineService) {
61 63 super(name);
62 64 this.processor = processor;
63 65 this.authService = authService;
64 66 this.quotaService = quotaService;
  67 + this.offlineService = offlineService;
65 68 this.adaptor = adaptor;
66 69 this.timeout = timeout;
67 70 // This is important to turn off existing observable logic in
... ... @@ -168,6 +171,7 @@ public class CoapTransportResource extends CoapResource {
168 171 case TO_SERVER_RPC_REQUEST:
169 172 ctx.setSessionType(SessionType.SYNC);
170 173 msg = adaptor.convertToActorMsg(ctx, type, request);
  174 + offlineService.online(ctx.getDevice(), true);
171 175 break;
172 176 case SUBSCRIBE_ATTRIBUTES_REQUEST:
173 177 case SUBSCRIBE_RPC_COMMANDS_REQUEST:
... ... @@ -175,11 +179,13 @@ public class CoapTransportResource extends CoapResource {
175 179 advanced.setObserver(new CoapExchangeObserverProxy(systemObserver, ctx));
176 180 ctx.setSessionType(SessionType.ASYNC);
177 181 msg = adaptor.convertToActorMsg(ctx, type, request);
  182 + offlineService.online(ctx.getDevice(), false);
178 183 break;
179 184 case UNSUBSCRIBE_ATTRIBUTES_REQUEST:
180 185 case UNSUBSCRIBE_RPC_COMMANDS_REQUEST:
181 186 ctx.setSessionType(SessionType.ASYNC);
182 187 msg = adaptor.convertToActorMsg(ctx, type, request);
  188 + offlineService.online(ctx.getDevice(), false);
183 189 break;
184 190 default:
185 191 log.trace("[{}] Unsupported msg type: {}", ctx.getSessionId(), type);
... ...
... ... @@ -27,6 +27,7 @@ import org.springframework.stereotype.Service;
27 27 import org.thingsboard.server.common.transport.SessionMsgProcessor;
28 28 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
29 29 import org.thingsboard.server.common.transport.quota.QuotaService;
  30 +import org.thingsboard.server.dao.device.DeviceOfflineService;
30 31 import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
31 32
32 33 import javax.annotation.PostConstruct;
... ... @@ -57,6 +58,9 @@ public class CoapTransportService {
57 58 @Autowired(required = false)
58 59 private QuotaService quotaService;
59 60
  61 + @Autowired(required = false)
  62 + private DeviceOfflineService offlineService;
  63 +
60 64
61 65 @Value("${coap.bind_address}")
62 66 private String host;
... ... @@ -86,7 +90,7 @@ public class CoapTransportService {
86 90
87 91 private void createResources() {
88 92 CoapResource api = new CoapResource(API);
89   - api.add(new CoapTransportResource(processor, authService, adaptor, V1, timeout, quotaService));
  93 + api.add(new CoapTransportResource(processor, authService, adaptor, V1, timeout, quotaService, offlineService));
90 94 server.add(api);
91 95 }
92 96
... ...
... ... @@ -15,6 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.transport.coap;
17 17
  18 +import com.google.common.util.concurrent.ListenableFuture;
18 19 import lombok.extern.slf4j.Slf4j;
19 20 import org.eclipse.californium.core.CoapClient;
20 21 import org.eclipse.californium.core.CoapResponse;
... ... @@ -31,6 +32,7 @@ import org.springframework.test.annotation.DirtiesContext;
31 32 import org.springframework.test.annotation.DirtiesContext.ClassMode;
32 33 import org.springframework.test.context.junit4.SpringRunner;
33 34 import org.thingsboard.server.common.data.Device;
  35 +import org.thingsboard.server.common.data.device.DeviceStatusQuery;
34 36 import org.thingsboard.server.common.data.id.CustomerId;
35 37 import org.thingsboard.server.common.data.id.DeviceId;
36 38 import org.thingsboard.server.common.data.id.TenantId;
... ... @@ -51,6 +53,7 @@ import org.thingsboard.server.common.transport.SessionMsgProcessor;
51 53 import org.thingsboard.server.common.transport.auth.DeviceAuthResult;
52 54 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
53 55 import org.thingsboard.server.common.transport.quota.QuotaService;
  56 +import org.thingsboard.server.dao.device.DeviceOfflineService;
54 57
55 58 import java.util.ArrayList;
56 59 import java.util.List;
... ... @@ -137,6 +140,31 @@ public class CoapServerTest {
137 140 public static QuotaService quotaService() {
138 141 return key -> false;
139 142 }
  143 +
  144 + @Bean
  145 + public static DeviceOfflineService offlineService() {
  146 + return new DeviceOfflineService() {
  147 + @Override
  148 + public void online(Device device, boolean isUpdate) {
  149 +
  150 + }
  151 +
  152 + @Override
  153 + public void offline(Device device) {
  154 +
  155 + }
  156 +
  157 + @Override
  158 + public ListenableFuture<List<Device>> findOfflineDevices(UUID tenantId, DeviceStatusQuery.ContactType contactType, long offlineThreshold) {
  159 + return null;
  160 + }
  161 +
  162 + @Override
  163 + public ListenableFuture<List<Device>> findOnlineDevices(UUID tenantId, DeviceStatusQuery.ContactType contactType, long offlineThreshold) {
  164 + return null;
  165 + }
  166 + };
  167 + }
140 168 }
141 169
142 170 @Autowired
... ...
... ... @@ -26,6 +26,7 @@ import org.springframework.http.ResponseEntity;
26 26 import org.springframework.util.StringUtils;
27 27 import org.springframework.web.bind.annotation.*;
28 28 import org.springframework.web.context.request.async.DeferredResult;
  29 +import org.thingsboard.server.common.data.Device;
29 30 import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
30 31 import org.thingsboard.server.common.msg.core.*;
31 32 import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
... ... @@ -36,6 +37,7 @@ import org.thingsboard.server.common.transport.SessionMsgProcessor;
36 37 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
37 38 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
38 39 import org.thingsboard.server.common.transport.quota.QuotaService;
  40 +import org.thingsboard.server.dao.device.DeviceOfflineService;
39 41 import org.thingsboard.server.transport.http.session.HttpSessionCtx;
40 42
41 43 import javax.servlet.http.HttpServletRequest;
... ... @@ -63,6 +65,9 @@ public class DeviceApiController {
63 65 @Autowired(required = false)
64 66 private QuotaService quotaService;
65 67
  68 + @Autowired(required = false)
  69 + private DeviceOfflineService offlineService;
  70 +
66 71 @RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.GET, produces = "application/json")
67 72 public DeferredResult<ResponseEntity> getDeviceAttributes(@PathVariable("deviceToken") String deviceToken,
68 73 @RequestParam(value = "clientKeys", required = false, defaultValue = "") String clientKeys,
... ... @@ -82,7 +87,7 @@ public class DeviceApiController {
82 87 Set<String> sharedKeySet = !StringUtils.isEmpty(sharedKeys) ? new HashSet<>(Arrays.asList(sharedKeys.split(","))) : null;
83 88 request = new BasicGetAttributesRequest(0, clientKeySet, sharedKeySet);
84 89 }
85   - process(ctx, request);
  90 + process(ctx, request, false);
86 91 } else {
87 92 responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
88 93 }
... ... @@ -100,7 +105,7 @@ public class DeviceApiController {
100 105 HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
101 106 if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
102 107 try {
103   - process(ctx, JsonConverter.convertToAttributes(new JsonParser().parse(json)));
  108 + process(ctx, JsonConverter.convertToAttributes(new JsonParser().parse(json)), true);
104 109 } catch (IllegalStateException | JsonSyntaxException ex) {
105 110 responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
106 111 }
... ... @@ -120,7 +125,7 @@ public class DeviceApiController {
120 125 HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
121 126 if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
122 127 try {
123   - process(ctx, JsonConverter.convertToTelemetry(new JsonParser().parse(json)));
  128 + process(ctx, JsonConverter.convertToTelemetry(new JsonParser().parse(json)), true);
124 129 } catch (IllegalStateException | JsonSyntaxException ex) {
125 130 responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
126 131 }
... ... @@ -150,7 +155,7 @@ public class DeviceApiController {
150 155 if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
151 156 try {
152 157 JsonObject response = new JsonParser().parse(json).getAsJsonObject();
153   - process(ctx, new ToDeviceRpcResponseMsg(requestId, response.toString()));
  158 + process(ctx, new ToDeviceRpcResponseMsg(requestId, response.toString()), true);
154 159 } catch (IllegalStateException | JsonSyntaxException ex) {
155 160 responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
156 161 }
... ... @@ -173,7 +178,7 @@ public class DeviceApiController {
173 178 JsonObject request = new JsonParser().parse(json).getAsJsonObject();
174 179 process(ctx, new ToServerRpcRequestMsg(0,
175 180 request.get("method").getAsString(),
176   - request.get("params").toString()));
  181 + request.get("params").toString()), true);
177 182 } catch (IllegalStateException | JsonSyntaxException ex) {
178 183 responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
179 184 }
... ... @@ -199,7 +204,7 @@ public class DeviceApiController {
199 204 HttpSessionCtx ctx = getHttpSessionCtx(responseWriter, timeout);
200 205 if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
201 206 try {
202   - process(ctx, msg);
  207 + process(ctx, msg, false);
203 208 } catch (IllegalStateException | JsonSyntaxException ex) {
204 209 responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
205 210 }
... ... @@ -217,9 +222,10 @@ public class DeviceApiController {
217 222 return new HttpSessionCtx(processor, authService, responseWriter, timeout != 0 ? timeout : defaultTimeout);
218 223 }
219 224
220   - private void process(HttpSessionCtx ctx, FromDeviceMsg request) {
  225 + private void process(HttpSessionCtx ctx, FromDeviceMsg request, boolean isUpdate) {
221 226 AdaptorToSessionActorMsg msg = new BasicAdaptorToSessionActorMsg(ctx, request);
222 227 processor.process(new BasicToDeviceActorSessionMsg(ctx.getDevice(), msg));
  228 + offlineService.online(ctx.getDevice(), isUpdate);
223 229 }
224 230
225 231 private boolean quotaExceeded(HttpServletRequest request, DeferredResult<ResponseEntity> responseWriter) {
... ...
... ... @@ -37,6 +37,7 @@ import org.thingsboard.server.common.transport.adaptor.AdaptorException;
37 37 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
38 38 import org.thingsboard.server.common.transport.quota.QuotaService;
39 39 import org.thingsboard.server.dao.EncryptionUtil;
  40 +import org.thingsboard.server.dao.device.DeviceOfflineService;
40 41 import org.thingsboard.server.dao.device.DeviceService;
41 42 import org.thingsboard.server.dao.relation.RelationService;
42 43 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
... ... @@ -72,13 +73,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
72 73 private final DeviceAuthService authService;
73 74 private final RelationService relationService;
74 75 private final QuotaService quotaService;
  76 + private final DeviceOfflineService offlineService;
75 77 private final SslHandler sslHandler;
76 78 private volatile boolean connected;
77 79 private volatile InetSocketAddress address;
78 80 private volatile GatewaySessionCtx gatewaySessionCtx;
79 81
80 82 public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
81   - MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService) {
  83 + MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService, DeviceOfflineService offlineService) {
82 84 this.processor = processor;
83 85 this.deviceService = deviceService;
84 86 this.relationService = relationService;
... ... @@ -88,6 +90,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
88 90 this.sessionId = deviceSessionCtx.getSessionId().toUidStr();
89 91 this.sslHandler = sslHandler;
90 92 this.quotaService = quotaService;
  93 + this.offlineService = offlineService;
91 94 }
92 95
93 96 @Override
... ... @@ -129,11 +132,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
129 132 case PINGREQ:
130 133 if (checkConnected(ctx)) {
131 134 ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
  135 + offlineService.online(deviceSessionCtx.getDevice(), false);
132 136 }
133 137 break;
134 138 case DISCONNECT:
135 139 if (checkConnected(ctx)) {
136 140 processDisconnect(ctx);
  141 + offlineService.offline(deviceSessionCtx.getDevice());
137 142 }
138 143 break;
139 144 default:
... ... @@ -185,23 +190,28 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
185 190 try {
186 191 if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) {
187 192 msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_TELEMETRY_REQUEST, mqttMsg);
  193 + offlineService.online(deviceSessionCtx.getDevice(), true);
188 194 } else if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
189 195 msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_ATTRIBUTES_REQUEST, mqttMsg);
  196 + offlineService.online(deviceSessionCtx.getDevice(), true);
190 197 } else if (topicName.startsWith(DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
191 198 msg = adaptor.convertToActorMsg(deviceSessionCtx, GET_ATTRIBUTES_REQUEST, mqttMsg);
192 199 if (msgId >= 0) {
193 200 ctx.writeAndFlush(createMqttPubAckMsg(msgId));
194 201 }
  202 + offlineService.online(deviceSessionCtx.getDevice(), false);
195 203 } else if (topicName.startsWith(DEVICE_RPC_RESPONSE_TOPIC)) {
196 204 msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_DEVICE_RPC_RESPONSE, mqttMsg);
197 205 if (msgId >= 0) {
198 206 ctx.writeAndFlush(createMqttPubAckMsg(msgId));
199 207 }
  208 + offlineService.online(deviceSessionCtx.getDevice(), true);
200 209 } else if (topicName.startsWith(DEVICE_RPC_REQUESTS_TOPIC)) {
201 210 msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_SERVER_RPC_REQUEST, mqttMsg);
202 211 if (msgId >= 0) {
203 212 ctx.writeAndFlush(createMqttPubAckMsg(msgId));
204 213 }
  214 + offlineService.online(deviceSessionCtx.getDevice(), true);
205 215 }
206 216 } catch (AdaptorException e) {
207 217 log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
... ... @@ -250,6 +260,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
250 260 }
251 261 }
252 262 ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
  263 + offlineService.online(deviceSessionCtx.getDevice(), false);
253 264 }
254 265
255 266 private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
... ... @@ -273,6 +284,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
273 284 }
274 285 }
275 286 ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId()));
  287 + offlineService.online(deviceSessionCtx.getDevice(), false);
276 288 }
277 289
278 290 private MqttMessage createUnSubAckMessage(int msgId) {
... ... @@ -304,6 +316,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
304 316 ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
305 317 connected = true;
306 318 checkGatewaySession();
  319 + offlineService.online(deviceSessionCtx.getDevice(), false);
307 320 }
308 321 }
309 322
... ... @@ -315,6 +328,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
315 328 ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
316 329 connected = true;
317 330 checkGatewaySession();
  331 + offlineService.online(deviceSessionCtx.getDevice(), false);
318 332 } else {
319 333 ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
320 334 ctx.close();
... ... @@ -365,6 +379,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
365 379 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
366 380 log.error("[{}] Unexpected Exception", sessionId, cause);
367 381 ctx.close();
  382 + if(deviceSessionCtx.getDevice() != null) {
  383 + offlineService.offline(deviceSessionCtx.getDevice());
  384 + }
368 385 }
369 386
370 387 private static MqttSubAckMessage createSubAckMessage(Integer msgId, List<Integer> grantedQoSList) {
... ... @@ -403,7 +420,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
403 420 if (infoNode != null) {
404 421 JsonNode gatewayNode = infoNode.get("gateway");
405 422 if (gatewayNode != null && gatewayNode.asBoolean()) {
406   - gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, relationService, deviceSessionCtx);
  423 + gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService,
  424 + relationService, deviceSessionCtx, offlineService);
407 425 }
408 426 }
409 427 }
... ... @@ -411,5 +429,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
411 429 @Override
412 430 public void operationComplete(Future<? super Void> future) throws Exception {
413 431 processor.process(SessionCloseMsg.onError(deviceSessionCtx.getSessionId()));
  432 + if(deviceSessionCtx.getDevice() != null) {
  433 + offlineService.offline(deviceSessionCtx.getDevice());
  434 + }
414 435 }
415 436 }
... ...
... ... @@ -24,6 +24,7 @@ import io.netty.handler.ssl.SslHandler;
24 24 import org.thingsboard.server.common.transport.SessionMsgProcessor;
25 25 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
26 26 import org.thingsboard.server.common.transport.quota.QuotaService;
  27 +import org.thingsboard.server.dao.device.DeviceOfflineService;
27 28 import org.thingsboard.server.dao.device.DeviceService;
28 29 import org.thingsboard.server.dao.relation.RelationService;
29 30 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
... ... @@ -42,10 +43,11 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha
42 43 private final MqttTransportAdaptor adaptor;
43 44 private final MqttSslHandlerProvider sslHandlerProvider;
44 45 private final QuotaService quotaService;
  46 + private final DeviceOfflineService offlineService;
45 47
46 48 public MqttTransportServerInitializer(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
47 49 MqttTransportAdaptor adaptor, MqttSslHandlerProvider sslHandlerProvider,
48   - QuotaService quotaService) {
  50 + QuotaService quotaService, DeviceOfflineService offlineService) {
49 51 this.processor = processor;
50 52 this.deviceService = deviceService;
51 53 this.authService = authService;
... ... @@ -53,6 +55,7 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha
53 55 this.adaptor = adaptor;
54 56 this.sslHandlerProvider = sslHandlerProvider;
55 57 this.quotaService = quotaService;
  58 + this.offlineService = offlineService;
56 59 }
57 60
58 61 @Override
... ... @@ -67,7 +70,7 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha
67 70 pipeline.addLast("encoder", MqttEncoder.INSTANCE);
68 71
69 72 MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, relationService,
70   - adaptor, sslHandler, quotaService);
  73 + adaptor, sslHandler, quotaService, offlineService);
71 74
72 75 pipeline.addLast(handler);
73 76 ch.closeFuture().addListener(handler);
... ...
... ... @@ -30,6 +30,7 @@ import org.springframework.stereotype.Service;
30 30 import org.thingsboard.server.common.transport.SessionMsgProcessor;
31 31 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
32 32 import org.thingsboard.server.common.transport.quota.QuotaService;
  33 +import org.thingsboard.server.dao.device.DeviceOfflineService;
33 34 import org.thingsboard.server.dao.device.DeviceService;
34 35 import org.thingsboard.server.dao.relation.RelationService;
35 36 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
... ... @@ -69,6 +70,9 @@ public class MqttTransportService {
69 70 @Autowired(required = false)
70 71 private QuotaService quotaService;
71 72
  73 + @Autowired(required = false)
  74 + private DeviceOfflineService offlineService;
  75 +
72 76 @Value("${mqtt.bind_address}")
73 77 private String host;
74 78 @Value("${mqtt.bind_port}")
... ... @@ -106,7 +110,7 @@ public class MqttTransportService {
106 110 b.group(bossGroup, workerGroup)
107 111 .channel(NioServerSocketChannel.class)
108 112 .childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, relationService,
109   - adaptor, sslHandlerProvider, quotaService));
  113 + adaptor, sslHandlerProvider, quotaService, offlineService));
110 114
111 115 serverChannel = b.bind(host, port).sync().channel();
112 116 log.info("Mqtt transport started!");
... ...
... ... @@ -36,6 +36,7 @@ import org.thingsboard.server.common.transport.SessionMsgProcessor;
36 36 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
37 37 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
38 38 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
  39 +import org.thingsboard.server.dao.device.DeviceOfflineService;
39 40 import org.thingsboard.server.dao.device.DeviceService;
40 41 import org.thingsboard.server.dao.relation.RelationService;
41 42 import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
... ... @@ -61,14 +62,17 @@ public class GatewaySessionCtx {
61 62 private final DeviceService deviceService;
62 63 private final DeviceAuthService authService;
63 64 private final RelationService relationService;
  65 + private final DeviceOfflineService offlineService;
64 66 private final Map<String, GatewayDeviceSessionCtx> devices;
65 67 private ChannelHandlerContext channel;
66 68
67   - public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx) {
  69 + public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService,
  70 + RelationService relationService, DeviceSessionCtx gatewaySessionCtx, DeviceOfflineService offlineService) {
68 71 this.processor = processor;
69 72 this.deviceService = deviceService;
70 73 this.authService = authService;
71 74 this.relationService = relationService;
  75 + this.offlineService = offlineService;
72 76 this.gateway = gatewaySessionCtx.getDevice();
73 77 this.gatewaySessionId = gatewaySessionCtx.getSessionId();
74 78 this.devices = new HashMap<>();
... ... @@ -98,6 +102,7 @@ public class GatewaySessionCtx {
98 102 log.debug("[{}] Added device [{}] to the gateway session", gatewaySessionId, deviceName);
99 103 processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg())));
100 104 processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg())));
  105 + offlineService.online(device, false);
101 106 }
102 107 }
103 108
... ... @@ -107,6 +112,7 @@ public class GatewaySessionCtx {
107 112 if (deviceSessionCtx != null) {
108 113 processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId()));
109 114 deviceSessionCtx.setClosed(true);
  115 + offlineService.offline(deviceSessionCtx.getDevice());
110 116 log.debug("[{}] Removed device [{}] from the gateway session", gatewaySessionId, deviceName);
111 117 } else {
112 118 log.debug("[{}] Device [{}] was already removed from the gateway session", gatewaySessionId, deviceName);
... ... @@ -117,6 +123,7 @@ public class GatewaySessionCtx {
117 123 public void onGatewayDisconnect() {
118 124 devices.forEach((k, v) -> {
119 125 processor.process(SessionCloseMsg.onDisconnect(v.getSessionId()));
  126 + offlineService.offline(v.getDevice());
120 127 });
121 128 }
122 129
... ... @@ -138,6 +145,7 @@ public class GatewaySessionCtx {
138 145 GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
139 146 processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
140 147 new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
  148 + offlineService.online(deviceSessionCtx.getDevice(), true);
141 149 }
142 150 } else {
143 151 throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
... ... @@ -154,6 +162,7 @@ public class GatewaySessionCtx {
154 162 GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
155 163 processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
156 164 new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new ToDeviceRpcResponseMsg(requestId, data))));
  165 + offlineService.online(deviceSessionCtx.getDevice(), true);
157 166 } else {
158 167 throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
159 168 }
... ... @@ -176,6 +185,7 @@ public class GatewaySessionCtx {
176 185 GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
177 186 processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
178 187 new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
  188 + offlineService.online(deviceSessionCtx.getDevice(), true);
179 189 }
180 190 } else {
181 191 throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
... ... @@ -210,6 +220,7 @@ public class GatewaySessionCtx {
210 220 processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
211 221 new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
212 222 ack(msg);
  223 + offlineService.online(deviceSessionCtx.getDevice(), false);
213 224 } else {
214 225 throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
215 226 }
... ...