Commit 0f0a25ce38db6a50eb4e064841fe1dad885a093a

Authored by Igor Kulikov
2 parents 96776635 8ebaa22a

Merge branch 'develop/2.5.4'

... ... @@ -28,6 +28,7 @@ import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg;
28 28 import org.thingsboard.server.actors.ActorSystemContext;
29 29 import org.thingsboard.server.actors.TbActorCtx;
30 30 import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
  31 +import org.thingsboard.server.common.data.DataConstants;
31 32 import org.thingsboard.server.common.data.Device;
32 33 import org.thingsboard.server.common.data.id.DeviceId;
33 34 import org.thingsboard.server.common.data.id.TenantId;
... ... @@ -79,8 +80,6 @@ import java.util.UUID;
79 80 import java.util.function.Consumer;
80 81 import java.util.stream.Collectors;
81 82
82   -import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE;
83   -import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE;
84 83
85 84 /**
86 85 * @author Andrew Shvayka
... ... @@ -279,17 +278,17 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
279 278 ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture;
280 279 ListenableFuture<List<AttributeKvEntry>> sharedAttributesFuture;
281 280 if (CollectionUtils.isEmpty(request.getClientAttributeNamesList()) && CollectionUtils.isEmpty(request.getSharedAttributeNamesList())) {
282   - clientAttributesFuture = findAllAttributesByScope(CLIENT_SCOPE);
283   - sharedAttributesFuture = findAllAttributesByScope(SHARED_SCOPE);
  281 + clientAttributesFuture = findAllAttributesByScope(DataConstants.CLIENT_SCOPE);
  282 + sharedAttributesFuture = findAllAttributesByScope(DataConstants.SHARED_SCOPE);
284 283 } else if (!CollectionUtils.isEmpty(request.getClientAttributeNamesList()) && !CollectionUtils.isEmpty(request.getSharedAttributeNamesList())) {
285   - clientAttributesFuture = findAttributesByScope(toSet(request.getClientAttributeNamesList()), CLIENT_SCOPE);
286   - sharedAttributesFuture = findAttributesByScope(toSet(request.getSharedAttributeNamesList()), SHARED_SCOPE);
  284 + clientAttributesFuture = findAttributesByScope(toSet(request.getClientAttributeNamesList()), DataConstants.CLIENT_SCOPE);
  285 + sharedAttributesFuture = findAttributesByScope(toSet(request.getSharedAttributeNamesList()), DataConstants.SHARED_SCOPE);
287 286 } else if (CollectionUtils.isEmpty(request.getClientAttributeNamesList()) && !CollectionUtils.isEmpty(request.getSharedAttributeNamesList())) {
288 287 clientAttributesFuture = Futures.immediateFuture(Collections.emptyList());
289   - sharedAttributesFuture = findAttributesByScope(toSet(request.getSharedAttributeNamesList()), SHARED_SCOPE);
  288 + sharedAttributesFuture = findAttributesByScope(toSet(request.getSharedAttributeNamesList()), DataConstants.SHARED_SCOPE);
290 289 } else {
291 290 sharedAttributesFuture = Futures.immediateFuture(Collections.emptyList());
292   - clientAttributesFuture = findAttributesByScope(toSet(request.getClientAttributeNamesList()), CLIENT_SCOPE);
  291 + clientAttributesFuture = findAttributesByScope(toSet(request.getClientAttributeNamesList()), DataConstants.CLIENT_SCOPE);
293 292 }
294 293 return Futures.allAsList(Arrays.asList(clientAttributesFuture, sharedAttributesFuture));
295 294 }
... ... @@ -316,7 +315,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
316 315 AttributeUpdateNotificationMsg.Builder notification = AttributeUpdateNotificationMsg.newBuilder();
317 316 if (msg.isDeleted()) {
318 317 List<String> sharedKeys = msg.getDeletedKeys().stream()
319   - .filter(key -> SHARED_SCOPE.equals(key.getScope()))
  318 + .filter(key -> DataConstants.SHARED_SCOPE.equals(key.getScope()))
320 319 .map(AttributeKey::getAttributeKey)
321 320 .collect(Collectors.toList());
322 321 if (!sharedKeys.isEmpty()) {
... ... @@ -324,7 +323,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
324 323 hasNotificationData = true;
325 324 }
326 325 } else {
327   - if (SHARED_SCOPE.equals(msg.getScope())) {
  326 + if (DataConstants.SHARED_SCOPE.equals(msg.getScope())) {
328 327 List<AttributeKvEntry> attributes = new ArrayList<>(msg.getValues());
329 328 if (attributes.size() > 0) {
330 329 List<TsKvProto> sharedUpdated = msg.getValues().stream().map(this::toTsKvProto)
... ... @@ -334,7 +333,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
334 333 hasNotificationData = true;
335 334 }
336 335 } else {
337   - log.debug("[{}] No public server side attributes changed!", deviceId);
  336 + log.debug("[{}] No public shared side attributes changed!", deviceId);
338 337 }
339 338 }
340 339 }
... ...
... ... @@ -158,7 +158,7 @@ public class DefaultTransportApiService implements TransportApiService {
158 158 return TransportApiResponseMsg.newBuilder()
159 159 .setGetOrCreateDeviceResponseMsg(GetOrCreateDeviceFromGatewayResponseMsg.newBuilder().setDeviceInfo(getDeviceInfoProto(device)).build()).build();
160 160 } catch (JsonProcessingException e) {
161   - log.warn("[{}] Failed to lookup device by gateway id and name", gatewayId, requestMsg.getDeviceName(), e);
  161 + log.warn("[{}][{}] Failed to lookup device by gateway id and name", gatewayId, requestMsg.getDeviceName(), e);
162 162 throw new RuntimeException(e);
163 163 } finally {
164 164 deviceCreationLock.unlock();
... ...
1   -/**
2   - * Copyright © 2016-2020 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.common.msg.kv;
17   -
18   -import java.io.Serializable;
19   -import java.util.List;
20   -
21   -import org.thingsboard.server.common.data.kv.AttributeKey;
22   -import org.thingsboard.server.common.data.kv.AttributeKvEntry;
23   -
24   -public interface AttributesKVMsg extends Serializable {
25   -
26   - List<AttributeKvEntry> getClientAttributes();
27   - List<AttributeKvEntry> getSharedAttributes();
28   - List<AttributeKey> getDeletedAttributes();
29   -}
1   -/**
2   - * Copyright © 2016-2020 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.common.msg.kv;
17   -
18   -import lombok.AccessLevel;
19   -import lombok.Data;
20   -import lombok.RequiredArgsConstructor;
21   -import org.thingsboard.server.common.data.kv.AttributeKey;
22   -import org.thingsboard.server.common.data.kv.AttributeKvEntry;
23   -
24   -import java.util.Collections;
25   -import java.util.List;
26   -
27   -@Data
28   -@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
29   -public class BasicAttributeKVMsg implements AttributesKVMsg {
30   -
31   - private static final long serialVersionUID = 1L;
32   -
33   - private final List<AttributeKvEntry> clientAttributes;
34   - private final List<AttributeKvEntry> sharedAttributes;
35   - private final List<AttributeKey> deletedAttributes;
36   -
37   - public static BasicAttributeKVMsg fromClient(List<AttributeKvEntry> attributes) {
38   - return new BasicAttributeKVMsg(attributes, Collections.emptyList(), Collections.emptyList());
39   - }
40   -
41   - public static BasicAttributeKVMsg fromShared(List<AttributeKvEntry> attributes) {
42   - return new BasicAttributeKVMsg(Collections.emptyList(), attributes, Collections.emptyList());
43   - }
44   -
45   - public static BasicAttributeKVMsg from(List<AttributeKvEntry> client, List<AttributeKvEntry> shared) {
46   - return new BasicAttributeKVMsg(client, shared, Collections.emptyList());
47   - }
48   -
49   - public static AttributesKVMsg fromDeleted(List<AttributeKey> shared) {
50   - return new BasicAttributeKVMsg(Collections.emptyList(), Collections.emptyList(), shared);
51   - }
52   -}
... ... @@ -23,16 +23,29 @@ import java.util.Collections;
23 23 import java.util.List;
24 24 import java.util.concurrent.BlockingQueue;
25 25 import java.util.concurrent.ConcurrentHashMap;
  26 +import java.util.concurrent.Executors;
26 27 import java.util.concurrent.LinkedBlockingQueue;
  28 +import java.util.concurrent.ScheduledExecutorService;
27 29 import java.util.concurrent.TimeUnit;
28 30
29 31 @Slf4j
30 32 public final class InMemoryStorage {
31 33 private static InMemoryStorage instance;
32 34 private final ConcurrentHashMap<String, BlockingQueue<TbQueueMsg>> storage;
  35 + private static ScheduledExecutorService statExecutor;
33 36
34 37 private InMemoryStorage() {
35 38 storage = new ConcurrentHashMap<>();
  39 + statExecutor = Executors.newSingleThreadScheduledExecutor();
  40 + statExecutor.scheduleAtFixedRate(this::printStats, 60, 60, TimeUnit.SECONDS);
  41 + }
  42 +
  43 + private void printStats() {
  44 + storage.forEach((topic, queue) -> {
  45 + if (queue.size() > 0) {
  46 + log.debug("Topic: [{}], Queue size: [{}]", topic, queue.size());
  47 + }
  48 + });
36 49 }
37 50
38 51 public static InMemoryStorage getInstance() {
... ... @@ -77,4 +90,9 @@ public final class InMemoryStorage {
77 90 storage.clear();
78 91 }
79 92
  93 + public void destroy() {
  94 + if (statExecutor != null) {
  95 + statExecutor.shutdownNow();
  96 + }
  97 + }
80 98 }
... ...
... ... @@ -53,6 +53,6 @@ public class InMemoryTbQueueProducer<T extends TbQueueMsg> implements TbQueuePro
53 53
54 54 @Override
55 55 public void stop() {
56   -
  56 + storage.destroy();
57 57 }
58 58 }
... ...
... ... @@ -127,7 +127,6 @@ message GetAttributeResponseMsg {
127 127 int32 requestId = 1;
128 128 repeated TsKvProto clientAttributeList = 2;
129 129 repeated TsKvProto sharedAttributeList = 3;
130   - repeated string deletedAttributeKeys = 4;
131 130 string error = 5;
132 131 }
133 132
... ...
... ... @@ -125,7 +125,7 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
125 125
126 126 @Override
127 127 public Response convertToPublish(CoapTransportResource.CoapSessionListener session, TransportProtos.GetAttributeResponseMsg msg) throws AdaptorException {
128   - if (msg.getClientAttributeListCount() == 0 && msg.getSharedAttributeListCount() == 0 && msg.getDeletedAttributeKeysCount() == 0) {
  128 + if (msg.getClientAttributeListCount() == 0 && msg.getSharedAttributeListCount() == 0) {
129 129 return new Response(CoAP.ResponseCode.NOT_FOUND);
130 130 } else {
131 131 Response response = new Response(CoAP.ResponseCode.CONTENT);
... ...
... ... @@ -35,7 +35,6 @@ import org.thingsboard.server.common.data.kv.JsonDataEntry;
35 35 import org.thingsboard.server.common.data.kv.KvEntry;
36 36 import org.thingsboard.server.common.data.kv.LongDataEntry;
37 37 import org.thingsboard.server.common.data.kv.StringDataEntry;
38   -import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
39 38 import org.thingsboard.server.gen.transport.TransportProtos;
40 39 import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
41 40 import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
... ... @@ -269,11 +268,6 @@ public class JsonConverter {
269 268 payload.getSharedAttributeListList().forEach(addToObjectFromProto(attrObject));
270 269 result.add("shared", attrObject);
271 270 }
272   - if (payload.getDeletedAttributeKeysCount() > 0) {
273   - JsonArray attrObject = new JsonArray();
274   - payload.getDeletedAttributeKeysList().forEach(attrObject::add);
275   - result.add("deleted", attrObject);
276   - }
277 271 return result;
278 272 }
279 273
... ... @@ -290,31 +284,6 @@ public class JsonConverter {
290 284 return result;
291 285 }
292 286
293   - public static JsonObject toJson(AttributesKVMsg payload, boolean asMap) {
294   - JsonObject result = new JsonObject();
295   - if (asMap) {
296   - if (!payload.getClientAttributes().isEmpty()) {
297   - JsonObject attrObject = new JsonObject();
298   - payload.getClientAttributes().forEach(addToObject(attrObject));
299   - result.add("client", attrObject);
300   - }
301   - if (!payload.getSharedAttributes().isEmpty()) {
302   - JsonObject attrObject = new JsonObject();
303   - payload.getSharedAttributes().forEach(addToObject(attrObject));
304   - result.add("shared", attrObject);
305   - }
306   - } else {
307   - payload.getClientAttributes().forEach(addToObject(result));
308   - payload.getSharedAttributes().forEach(addToObject(result));
309   - }
310   - if (!payload.getDeletedAttributes().isEmpty()) {
311   - JsonArray attrObject = new JsonArray();
312   - payload.getDeletedAttributes().forEach(addToObject(attrObject));
313   - result.add("deleted", attrObject);
314   - }
315   - return result;
316   - }
317   -
318 287 public static JsonObject getJsonObjectForGateway(String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg) {
319 288 JsonObject result = new JsonObject();
320 289 result.addProperty("id", responseMsg.getRequestId());
... ... @@ -370,10 +339,6 @@ public class JsonConverter {
370 339 }
371 340 }
372 341
373   - private static Consumer<AttributeKey> addToObject(JsonArray result) {
374   - return key -> result.add(key.getAttributeKey());
375   - }
376   -
377 342 private static Consumer<TsKvProto> addToObjectFromProto(JsonObject result) {
378 343 return de -> {
379 344 switch (de.getKv().getType()) {
... ...
... ... @@ -46,6 +46,7 @@ public class JpaHsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
46 46 entity.setDoubleValue(tsKvEntry.getDoubleValue().orElse(null));
47 47 entity.setLongValue(tsKvEntry.getLongValue().orElse(null));
48 48 entity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null));
  49 + entity.setJsonValue(tsKvEntry.getJsonValue().orElse(null));
49 50 log.trace("Saving entity: {}", entity);
50 51 return tsQueue.add(entity);
51 52 }
... ...
... ... @@ -41,8 +41,8 @@ public class HsqlLatestInsertTsRepository extends AbstractInsertRepository imple
41 41 "ON (ts_kv_latest.entity_id=T.entity_id " +
42 42 "AND ts_kv_latest.key=T.key) " +
43 43 "WHEN MATCHED THEN UPDATE SET ts_kv_latest.ts = T.ts, ts_kv_latest.bool_v = T.bool_v, ts_kv_latest.str_v = T.str_v, ts_kv_latest.long_v = T.long_v, ts_kv_latest.dbl_v = T.dbl_v, ts_kv_latest.json_v = T.json_v " +
44   - "WHEN NOT MATCHED THEN INSERT (entity_id, key, ts, bool_v, str_v, long_v, dbl_v) " +
45   - "VALUES (T.entity_id, T.key, T.ts, T.bool_v, T.str_v, T.long_v, T.dbl_v);";
  44 + "WHEN NOT MATCHED THEN INSERT (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) " +
  45 + "VALUES (T.entity_id, T.key, T.ts, T.bool_v, T.str_v, T.long_v, T.dbl_v, T.json_v);";
46 46
47 47 @Override
48 48 public void saveOrUpdate(List<TsKvLatestEntity> entities) {
... ...
... ... @@ -13,6 +13,7 @@ DROP TABLE IF EXISTS relation;
13 13 DROP TABLE IF EXISTS tb_user;
14 14 DROP TABLE IF EXISTS tenant;
15 15 DROP TABLE IF EXISTS ts_kv;
  16 +DROP TABLE IF EXISTS ts_kv_dictionary;
16 17 DROP TABLE IF EXISTS ts_kv_latest;
17 18 DROP TABLE IF EXISTS user_credentials;
18 19 DROP TABLE IF EXISTS widget_type;
... ...