Commit e494145d0e4c3e8cc138752987815a7ce0adc607

Authored by Andrew Shvayka
Committed by GitHub
2 parents 3b058694 ccd4f4b0

Merge pull request #23 from thingsboard/feature/attributes

Feature/attributes
Showing 18 changed files with 199 additions and 63 deletions
@@ -29,6 +29,8 @@ public class DataConstants { @@ -29,6 +29,8 @@ public class DataConstants {
29 public static final String SERVER_SCOPE = "SERVER_SCOPE"; 29 public static final String SERVER_SCOPE = "SERVER_SCOPE";
30 public static final String SHARED_SCOPE = "SHARED_SCOPE"; 30 public static final String SHARED_SCOPE = "SHARED_SCOPE";
31 31
  32 + public static final String[] ALL_SCOPES = {CLIENT_SCOPE, SHARED_SCOPE, SERVER_SCOPE};
  33 +
32 public static final String ALARM = "ALARM"; 34 public static final String ALARM = "ALARM";
33 public static final String ERROR = "ERROR"; 35 public static final String ERROR = "ERROR";
34 public static final String LC_EVENT = "LC_EVENT"; 36 public static final String LC_EVENT = "LC_EVENT";
@@ -18,6 +18,8 @@ package org.thingsboard.server.common.msg.core; @@ -18,6 +18,8 @@ package org.thingsboard.server.common.msg.core;
18 import lombok.ToString; 18 import lombok.ToString;
19 import org.thingsboard.server.common.msg.session.MsgType; 19 import org.thingsboard.server.common.msg.session.MsgType;
20 20
  21 +import java.util.Collections;
  22 +import java.util.Optional;
21 import java.util.Set; 23 import java.util.Set;
22 24
23 @ToString 25 @ToString
@@ -28,6 +30,10 @@ public class BasicGetAttributesRequest extends BasicRequest implements GetAttrib @@ -28,6 +30,10 @@ public class BasicGetAttributesRequest extends BasicRequest implements GetAttrib
28 private final Set<String> clientKeys; 30 private final Set<String> clientKeys;
29 private final Set<String> sharedKeys; 31 private final Set<String> sharedKeys;
30 32
  33 + public BasicGetAttributesRequest(Integer requestId) {
  34 + this(requestId, Collections.emptySet(), Collections.emptySet());
  35 + }
  36 +
31 public BasicGetAttributesRequest(Integer requestId, Set<String> clientKeys, Set<String> sharedKeys) { 37 public BasicGetAttributesRequest(Integer requestId, Set<String> clientKeys, Set<String> sharedKeys) {
32 super(requestId); 38 super(requestId);
33 this.clientKeys = clientKeys; 39 this.clientKeys = clientKeys;
@@ -40,13 +46,13 @@ public class BasicGetAttributesRequest extends BasicRequest implements GetAttrib @@ -40,13 +46,13 @@ public class BasicGetAttributesRequest extends BasicRequest implements GetAttrib
40 } 46 }
41 47
42 @Override 48 @Override
43 - public Set<String> getClientAttributeNames() {  
44 - return clientKeys; 49 + public Optional<Set<String>> getClientAttributeNames() {
  50 + return Optional.of(clientKeys);
45 } 51 }
46 52
47 @Override 53 @Override
48 - public Set<String> getSharedAttributeNames() {  
49 - return sharedKeys; 54 + public Optional<Set<String>> getSharedAttributeNames() {
  55 + return Optional.ofNullable(sharedKeys);
50 } 56 }
51 57
52 } 58 }
@@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
15 */ 15 */
16 package org.thingsboard.server.common.msg.core; 16 package org.thingsboard.server.common.msg.core;
17 17
  18 +import java.util.Optional;
18 import java.util.Set; 19 import java.util.Set;
19 20
20 import org.thingsboard.server.common.msg.session.FromDeviceMsg; 21 import org.thingsboard.server.common.msg.session.FromDeviceMsg;
@@ -22,7 +23,7 @@ import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg; @@ -22,7 +23,7 @@ import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
22 23
23 public interface GetAttributesRequest extends FromDeviceRequestMsg { 24 public interface GetAttributesRequest extends FromDeviceRequestMsg {
24 25
25 - Set<String> getClientAttributeNames();  
26 - Set<String> getSharedAttributeNames(); 26 + Optional<Set<String>> getClientAttributeNames();
  27 + Optional<Set<String>> getSharedAttributeNames();
27 28
28 } 29 }
@@ -175,6 +175,23 @@ public class SubscriptionManager { @@ -175,6 +175,23 @@ public class SubscriptionManager {
175 } 175 }
176 } 176 }
177 177
  178 + public void onAttributesUpdateFromServer(PluginContext ctx, DeviceId deviceId, String scope, List<AttributeKvEntry> attributes) {
  179 + Optional<ServerAddress> serverAddress = ctx.resolve(deviceId);
  180 + if (!serverAddress.isPresent()) {
  181 + onLocalSubscriptionUpdate(ctx, deviceId, SubscriptionType.ATTRIBUTES, s -> {
  182 + List<TsKvEntry> subscriptionUpdate = new ArrayList<TsKvEntry>();
  183 + for (AttributeKvEntry kv : attributes) {
  184 + if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) {
  185 + subscriptionUpdate.add(new BasicTsKvEntry(kv.getLastUpdateTs(), kv));
  186 + }
  187 + }
  188 + return subscriptionUpdate;
  189 + });
  190 + } else {
  191 + rpcHandler.onAttributesUpdate(ctx, serverAddress.get(), deviceId, scope, attributes);
  192 + }
  193 + }
  194 +
178 private void updateSubscriptionState(String sessionId, Subscription subState, SubscriptionUpdate update) { 195 private void updateSubscriptionState(String sessionId, Subscription subState, SubscriptionUpdate update) {
179 log.trace("[{}] updating subscription state {} using onUpdate {}", sessionId, subState, update); 196 log.trace("[{}] updating subscription state {} using onUpdate {}", sessionId, subState, update);
180 update.getLatestValues().entrySet().forEach(e -> subState.setKeyState(e.getKey(), e.getValue())); 197 update.getLatestValues().entrySet().forEach(e -> subState.setKeyState(e.getKey(), e.getValue()));
@@ -43,7 +43,7 @@ public class TelemetryStoragePlugin extends AbstractPlugin<EmptyComponentConfigu @@ -43,7 +43,7 @@ public class TelemetryStoragePlugin extends AbstractPlugin<EmptyComponentConfigu
43 43
44 public TelemetryStoragePlugin() { 44 public TelemetryStoragePlugin() {
45 this.subscriptionManager = new SubscriptionManager(); 45 this.subscriptionManager = new SubscriptionManager();
46 - this.restMsgHandler = new TelemetryRestMsgHandler(); 46 + this.restMsgHandler = new TelemetryRestMsgHandler(subscriptionManager);
47 this.ruleMsgHandler = new TelemetryRuleMsgHandler(subscriptionManager); 47 this.ruleMsgHandler = new TelemetryRuleMsgHandler(subscriptionManager);
48 this.websocketMsgHandler = new TelemetryWebsocketMsgHandler(subscriptionManager); 48 this.websocketMsgHandler = new TelemetryWebsocketMsgHandler(subscriptionManager);
49 this.rpcMsgHandler = new TelemetryRpcMsgHandler(subscriptionManager); 49 this.rpcMsgHandler = new TelemetryRpcMsgHandler(subscriptionManager);
@@ -24,10 +24,6 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionT @@ -24,10 +24,6 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionT
24 @NoArgsConstructor 24 @NoArgsConstructor
25 public class AttributesSubscriptionCmd extends SubscriptionCmd { 25 public class AttributesSubscriptionCmd extends SubscriptionCmd {
26 26
27 - public AttributesSubscriptionCmd(int cmdId, String deviceId, String keys, boolean unsubscribe) {  
28 - super(cmdId, deviceId, keys, unsubscribe);  
29 - }  
30 -  
31 @Override 27 @Override
32 public SubscriptionType getType() { 28 public SubscriptionType getType() {
33 return SubscriptionType.ATTRIBUTES; 29 return SubscriptionType.ATTRIBUTES;
@@ -26,6 +26,7 @@ public abstract class SubscriptionCmd implements TelemetryPluginCmd { @@ -26,6 +26,7 @@ public abstract class SubscriptionCmd implements TelemetryPluginCmd {
26 private int cmdId; 26 private int cmdId;
27 private String deviceId; 27 private String deviceId;
28 private String keys; 28 private String keys;
  29 + private String scope;
29 private boolean unsubscribe; 30 private boolean unsubscribe;
30 31
31 public abstract SubscriptionType getType(); 32 public abstract SubscriptionType getType();
@@ -62,6 +63,14 @@ public abstract class SubscriptionCmd implements TelemetryPluginCmd { @@ -62,6 +63,14 @@ public abstract class SubscriptionCmd implements TelemetryPluginCmd {
62 this.unsubscribe = unsubscribe; 63 this.unsubscribe = unsubscribe;
63 } 64 }
64 65
  66 + public String getScope() {
  67 + return scope;
  68 + }
  69 +
  70 + public void setKeys(String keys) {
  71 + this.keys = keys;
  72 + }
  73 +
65 @Override 74 @Override
66 public String toString() { 75 public String toString() {
67 return "SubscriptionCmd [deviceId=" + deviceId + ", tags=" + keys + ", unsubscribe=" + unsubscribe + "]"; 76 return "SubscriptionCmd [deviceId=" + deviceId + ", tags=" + keys + ", unsubscribe=" + unsubscribe + "]";
@@ -26,11 +26,6 @@ public class TimeseriesSubscriptionCmd extends SubscriptionCmd { @@ -26,11 +26,6 @@ public class TimeseriesSubscriptionCmd extends SubscriptionCmd {
26 26
27 private long timeWindow; 27 private long timeWindow;
28 28
29 - public TimeseriesSubscriptionCmd(int cmdId, String deviceId, String keys, boolean unsubscribe, long timeWindow) {  
30 - super(cmdId, deviceId, keys, unsubscribe);  
31 - this.timeWindow = timeWindow;  
32 - }  
33 -  
34 public long getTimeWindow() { 29 public long getTimeWindow() {
35 return timeWindow; 30 return timeWindow;
36 } 31 }
@@ -29,6 +29,7 @@ import org.thingsboard.server.extensions.api.plugins.handlers.DefaultRestMsgHand @@ -29,6 +29,7 @@ import org.thingsboard.server.extensions.api.plugins.handlers.DefaultRestMsgHand
29 import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg; 29 import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg;
30 import org.thingsboard.server.extensions.api.plugins.rest.RestRequest; 30 import org.thingsboard.server.extensions.api.plugins.rest.RestRequest;
31 import org.thingsboard.server.extensions.core.plugin.telemetry.AttributeData; 31 import org.thingsboard.server.extensions.core.plugin.telemetry.AttributeData;
  32 +import org.thingsboard.server.extensions.core.plugin.telemetry.SubscriptionManager;
32 import org.thingsboard.server.extensions.core.plugin.telemetry.TsData; 33 import org.thingsboard.server.extensions.core.plugin.telemetry.TsData;
33 34
34 import javax.servlet.ServletException; 35 import javax.servlet.ServletException;
@@ -39,6 +40,12 @@ import java.util.stream.Collectors; @@ -39,6 +40,12 @@ import java.util.stream.Collectors;
39 @Slf4j 40 @Slf4j
40 public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { 41 public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
41 42
  43 + private final SubscriptionManager subscriptionManager;
  44 +
  45 + public TelemetryRestMsgHandler(SubscriptionManager subscriptionManager) {
  46 + this.subscriptionManager = subscriptionManager;
  47 + }
  48 +
42 @Override 49 @Override
43 public void handleHttpGetRequest(PluginContext ctx, PluginRestMsg msg) throws ServletException { 50 public void handleHttpGetRequest(PluginContext ctx, PluginRestMsg msg) throws ServletException {
44 RestRequest request = msg.getRequest(); 51 RestRequest request = msg.getRequest();
@@ -74,9 +81,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { @@ -74,9 +81,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
74 if (!StringUtils.isEmpty(scope)) { 81 if (!StringUtils.isEmpty(scope)) {
75 attributes = ctx.loadAttributes(deviceId, scope); 82 attributes = ctx.loadAttributes(deviceId, scope);
76 } else { 83 } else {
77 - attributes = ctx.loadAttributes(deviceId, DataConstants.CLIENT_SCOPE);  
78 - attributes.addAll(ctx.loadAttributes(deviceId, DataConstants.SERVER_SCOPE));  
79 - attributes.addAll(ctx.loadAttributes(deviceId, DataConstants.SHARED_SCOPE)); 84 + attributes = new ArrayList<>();
  85 + Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> attributes.addAll(ctx.loadAttributes(deviceId, s)));
80 } 86 }
81 List<String> keys = attributes.stream().map(attrKv -> attrKv.getKey()).collect(Collectors.toList()); 87 List<String> keys = attributes.stream().map(attrKv -> attrKv.getKey()).collect(Collectors.toList());
82 msg.getResponseHolder().setResult(new ResponseEntity<>(keys, HttpStatus.OK)); 88 msg.getResponseHolder().setResult(new ResponseEntity<>(keys, HttpStatus.OK));
@@ -99,9 +105,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { @@ -99,9 +105,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
99 if (!StringUtils.isEmpty(scope)) { 105 if (!StringUtils.isEmpty(scope)) {
100 attributes = getAttributeKvEntries(ctx, scope, deviceId, keys); 106 attributes = getAttributeKvEntries(ctx, scope, deviceId, keys);
101 } else { 107 } else {
102 - attributes = getAttributeKvEntries(ctx, DataConstants.CLIENT_SCOPE, deviceId, keys);  
103 - attributes.addAll(getAttributeKvEntries(ctx, DataConstants.SHARED_SCOPE, deviceId, keys));  
104 - attributes.addAll(getAttributeKvEntries(ctx, DataConstants.SERVER_SCOPE, deviceId, keys)); 108 + attributes = new ArrayList<>();
  109 + Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> attributes.addAll(getAttributeKvEntries(ctx, s, deviceId, keys)));
105 } 110 }
106 List<AttributeData> values = attributes.stream().map(attribute -> new AttributeData(attribute.getLastUpdateTs(), 111 List<AttributeData> values = attributes.stream().map(attribute -> new AttributeData(attribute.getLastUpdateTs(),
107 attribute.getKey(), attribute.getValue())).collect(Collectors.toList()); 112 attribute.getKey(), attribute.getValue())).collect(Collectors.toList());
@@ -145,6 +150,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { @@ -145,6 +150,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
145 @Override 150 @Override
146 public void onSuccess(PluginContext ctx, Void value) { 151 public void onSuccess(PluginContext ctx, Void value) {
147 msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK)); 152 msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK));
  153 + subscriptionManager.onAttributesUpdateFromServer(ctx, deviceId, scope, attributes);
148 } 154 }
149 155
150 @Override 156 @Override
@@ -172,7 +178,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { @@ -172,7 +178,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
172 DeviceId deviceId = DeviceId.fromString(pathParams[0]); 178 DeviceId deviceId = DeviceId.fromString(pathParams[0]);
173 String scope = pathParams[1]; 179 String scope = pathParams[1];
174 if (DataConstants.SERVER_SCOPE.equals(scope) || 180 if (DataConstants.SERVER_SCOPE.equals(scope) ||
175 - DataConstants.SHARED_SCOPE.equals(scope)) { 181 + DataConstants.SHARED_SCOPE.equals(scope) ||
  182 + DataConstants.CLIENT_SCOPE.equals(scope)) {
176 String keysParam = request.getParameter("keys"); 183 String keysParam = request.getParameter("keys");
177 if (!StringUtils.isEmpty(keysParam)) { 184 if (!StringUtils.isEmpty(keysParam)) {
178 String[] keys = keysParam.split(","); 185 String[] keys = keysParam.split(",");
@@ -19,6 +19,7 @@ import com.google.protobuf.InvalidProtocolBufferException; @@ -19,6 +19,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
19 import lombok.RequiredArgsConstructor; 19 import lombok.RequiredArgsConstructor;
20 import lombok.extern.slf4j.Slf4j; 20 import lombok.extern.slf4j.Slf4j;
21 import org.thingsboard.server.common.data.id.DeviceId; 21 import org.thingsboard.server.common.data.id.DeviceId;
  22 +import org.thingsboard.server.common.data.kv.*;
22 import org.thingsboard.server.common.msg.cluster.ServerAddress; 23 import org.thingsboard.server.common.msg.cluster.ServerAddress;
23 import org.thingsboard.server.extensions.api.plugins.PluginContext; 24 import org.thingsboard.server.extensions.api.plugins.PluginContext;
24 import org.thingsboard.server.extensions.api.plugins.handlers.RpcMsgHandler; 25 import org.thingsboard.server.extensions.api.plugins.handlers.RpcMsgHandler;
@@ -42,9 +43,10 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { @@ -42,9 +43,10 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
42 private final SubscriptionManager subscriptionManager; 43 private final SubscriptionManager subscriptionManager;
43 44
44 private static final int SUBSCRIPTION_CLAZZ = 1; 45 private static final int SUBSCRIPTION_CLAZZ = 1;
45 - private static final int SUBSCRIPTION_UPDATE_CLAZZ = 2;  
46 - private static final int SESSION_CLOSE_CLAZZ = 3;  
47 - private static final int SUBSCRIPTION_CLOSE_CLAZZ = 4; 46 + private static final int ATTRIBUTES_UPDATE_CLAZZ = 2;
  47 + private static final int SUBSCRIPTION_UPDATE_CLAZZ = 3;
  48 + private static final int SESSION_CLOSE_CLAZZ = 4;
  49 + private static final int SUBSCRIPTION_CLOSE_CLAZZ = 5;
48 50
49 @Override 51 @Override
50 public void process(PluginContext ctx, RpcMsg msg) { 52 public void process(PluginContext ctx, RpcMsg msg) {
@@ -55,6 +57,9 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { @@ -55,6 +57,9 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
55 case SUBSCRIPTION_UPDATE_CLAZZ: 57 case SUBSCRIPTION_UPDATE_CLAZZ:
56 processRemoteSubscriptionUpdate(ctx, msg); 58 processRemoteSubscriptionUpdate(ctx, msg);
57 break; 59 break;
  60 + case ATTRIBUTES_UPDATE_CLAZZ:
  61 + processAttributeUpdate(ctx, msg);
  62 + break;
58 case SESSION_CLOSE_CLAZZ: 63 case SESSION_CLOSE_CLAZZ:
59 processSessionClose(ctx, msg); 64 processSessionClose(ctx, msg);
60 break; 65 break;
@@ -76,6 +81,17 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { @@ -76,6 +81,17 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
76 subscriptionManager.onRemoteSubscriptionUpdate(ctx, proto.getSessionId(), convert(proto)); 81 subscriptionManager.onRemoteSubscriptionUpdate(ctx, proto.getSessionId(), convert(proto));
77 } 82 }
78 83
  84 + private void processAttributeUpdate(PluginContext ctx, RpcMsg msg) {
  85 + AttributeUpdateProto proto;
  86 + try {
  87 + proto = AttributeUpdateProto.parseFrom(msg.getMsgData());
  88 + } catch (InvalidProtocolBufferException e) {
  89 + throw new RuntimeException(e);
  90 + }
  91 + subscriptionManager.onAttributesUpdateFromServer(ctx, DeviceId.fromString(proto.getDeviceId()), proto.getScope(),
  92 + proto.getDataList().stream().map(this::toAttribute).collect(Collectors.toList()));
  93 + }
  94 +
79 private void processSubscriptionCmd(PluginContext ctx, RpcMsg msg) { 95 private void processSubscriptionCmd(PluginContext ctx, RpcMsg msg) {
80 SubscriptionProto proto; 96 SubscriptionProto proto;
81 try { 97 try {
@@ -167,11 +183,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { @@ -167,11 +183,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
167 } else { 183 } else {
168 Map<String, List<Object>> data = new TreeMap<>(); 184 Map<String, List<Object>> data = new TreeMap<>();
169 proto.getDataList().forEach(v -> { 185 proto.getDataList().forEach(v -> {
170 - List<Object> values = data.get(v.getKey());  
171 - if (values == null) {  
172 - values = new ArrayList<>();  
173 - data.put(v.getKey(), values);  
174 - } 186 + List<Object> values = data.computeIfAbsent(v.getKey(), k -> new ArrayList<>());
175 for (int i = 0; i < v.getTsCount(); i++) { 187 for (int i = 0; i < v.getTsCount(); i++) {
176 Object[] value = new Object[2]; 188 Object[] value = new Object[2];
177 value[0] = v.getTs(i); 189 value[0] = v.getTs(i);
@@ -182,4 +194,59 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { @@ -182,4 +194,59 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
182 return new SubscriptionUpdate(proto.getSubscriptionId(), data); 194 return new SubscriptionUpdate(proto.getSubscriptionId(), data);
183 } 195 }
184 } 196 }
  197 +
  198 + public void onAttributesUpdate(PluginContext ctx, ServerAddress address, DeviceId deviceId, String scope, List<AttributeKvEntry> attributes) {
  199 + ctx.sendPluginRpcMsg(new RpcMsg(address, ATTRIBUTES_UPDATE_CLAZZ, getAttributesUpdateProto(deviceId, scope, attributes).toByteArray()));
  200 + }
  201 +
  202 + private AttributeUpdateProto getAttributesUpdateProto(DeviceId deviceId, String scope, List<AttributeKvEntry> attributes) {
  203 + AttributeUpdateProto.Builder builder = AttributeUpdateProto.newBuilder();
  204 + builder.setDeviceId(deviceId.toString());
  205 + builder.setScope(scope);
  206 + attributes.forEach(
  207 + attr -> {
  208 + AttributeUpdateValueListProto.Builder dataBuilder = AttributeUpdateValueListProto.newBuilder();
  209 + dataBuilder.setKey(attr.getKey());
  210 + dataBuilder.setTs(attr.getLastUpdateTs());
  211 + dataBuilder.setValueType(attr.getDataType().ordinal());
  212 + switch (attr.getDataType()) {
  213 + case BOOLEAN:
  214 + dataBuilder.setBoolValue(attr.getBooleanValue().get());
  215 + break;
  216 + case LONG:
  217 + dataBuilder.setLongValue(attr.getLongValue().get());
  218 + break;
  219 + case DOUBLE:
  220 + dataBuilder.setDoubleValue(attr.getDoubleValue().get());
  221 + break;
  222 + case STRING:
  223 + dataBuilder.setStrValue(attr.getStrValue().get());
  224 + break;
  225 + }
  226 + builder.addData(dataBuilder.build());
  227 + }
  228 + );
  229 + return builder.build();
  230 + }
  231 +
  232 + private AttributeKvEntry toAttribute(AttributeUpdateValueListProto proto) {
  233 + KvEntry entry = null;
  234 + DataType type = DataType.values()[proto.getValueType()];
  235 + switch (type) {
  236 + case BOOLEAN:
  237 + entry = new BooleanDataEntry(proto.getKey(), proto.getBoolValue());
  238 + break;
  239 + case LONG:
  240 + entry = new LongDataEntry(proto.getKey(), proto.getLongValue());
  241 + break;
  242 + case DOUBLE:
  243 + entry = new DoubleDataEntry(proto.getKey(), proto.getDoubleValue());
  244 + break;
  245 + case STRING:
  246 + entry = new StringDataEntry(proto.getKey(), proto.getStrValue());
  247 + break;
  248 + }
  249 + return new BaseAttributeKvEntry(entry, proto.getTs());
  250 + }
  251 +
185 } 252 }
@@ -58,10 +58,14 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler { @@ -58,10 +58,14 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
58 ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, response)); 58 ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, response));
59 } 59 }
60 60
61 - private List<AttributeKvEntry> getAttributeKvEntries(PluginContext ctx, DeviceId deviceId, String scope, Set<String> names) { 61 + private List<AttributeKvEntry> getAttributeKvEntries(PluginContext ctx, DeviceId deviceId, String scope, Optional<Set<String>> names) {
62 List<AttributeKvEntry> attributes; 62 List<AttributeKvEntry> attributes;
63 - if (!names.isEmpty()) {  
64 - attributes = ctx.loadAttributes(deviceId, scope, new ArrayList<>(names)); 63 + if (names.isPresent()) {
  64 + if (!names.get().isEmpty()) {
  65 + attributes = ctx.loadAttributes(deviceId, scope, new ArrayList<>(names.get()));
  66 + } else {
  67 + attributes = ctx.loadAttributes(deviceId, scope);
  68 + }
65 } else { 69 } else {
66 attributes = Collections.emptyList(); 70 attributes = Collections.emptyList();
67 } 71 }
@@ -104,7 +104,13 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { @@ -104,7 +104,13 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
104 SubscriptionState sub; 104 SubscriptionState sub;
105 if (keysOptional.isPresent()) { 105 if (keysOptional.isPresent()) {
106 List<String> keys = new ArrayList<>(keysOptional.get()); 106 List<String> keys = new ArrayList<>(keysOptional.get());
107 - List<AttributeKvEntry> data = ctx.loadAttributes(deviceId, DataConstants.CLIENT_SCOPE, keys); 107 + List<AttributeKvEntry> data = new ArrayList<>();
  108 + if (StringUtils.isEmpty(cmd.getScope())) {
  109 + Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> data.addAll(ctx.loadAttributes(deviceId, s, keys)));
  110 + } else {
  111 + data.addAll(ctx.loadAttributes(deviceId, cmd.getScope(), keys));
  112 + }
  113 +
108 List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList()); 114 List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
109 sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData)); 115 sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
110 116
@@ -114,7 +120,12 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { @@ -114,7 +120,12 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
114 120
115 sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, false, subState); 121 sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, false, subState);
116 } else { 122 } else {
117 - List<AttributeKvEntry> data = ctx.loadAttributes(deviceId, DataConstants.CLIENT_SCOPE); 123 + List<AttributeKvEntry> data = new ArrayList<>();
  124 + if (StringUtils.isEmpty(cmd.getScope())) {
  125 + Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> data.addAll(ctx.loadAttributes(deviceId, s)));
  126 + } else {
  127 + data.addAll(ctx.loadAttributes(deviceId, cmd.getScope()));
  128 + }
118 List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList()); 129 List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
119 sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData)); 130 sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
120 131
@@ -36,6 +36,12 @@ message SubscriptionUpdateProto { @@ -36,6 +36,12 @@ message SubscriptionUpdateProto {
36 repeated SubscriptionUpdateValueListProto data = 5; 36 repeated SubscriptionUpdateValueListProto data = 5;
37 } 37 }
38 38
  39 +message AttributeUpdateProto {
  40 + string deviceId = 1;
  41 + string scope = 2;
  42 + repeated AttributeUpdateValueListProto data = 3;
  43 +}
  44 +
39 message SessionCloseProto { 45 message SessionCloseProto {
40 string sessionId = 1; 46 string sessionId = 1;
41 } 47 }
@@ -54,4 +60,14 @@ message SubscriptionUpdateValueListProto { @@ -54,4 +60,14 @@ message SubscriptionUpdateValueListProto {
54 string key = 1; 60 string key = 1;
55 repeated int64 ts = 2; 61 repeated int64 ts = 2;
56 repeated string value = 3; 62 repeated string value = 3;
  63 +}
  64 +
  65 +message AttributeUpdateValueListProto {
  66 + string key = 1;
  67 + int64 ts = 2;
  68 + int32 valueType = 3;
  69 + string strValue = 4;
  70 + int64 longValue = 5;
  71 + double doubleValue = 6;
  72 + bool boolValue = 7;
57 } 73 }
@@ -167,17 +167,13 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor { @@ -167,17 +167,13 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
167 167
168 private FromDeviceMsg convertToGetAttributesRequest(SessionContext ctx, Request inbound) throws AdaptorException { 168 private FromDeviceMsg convertToGetAttributesRequest(SessionContext ctx, Request inbound) throws AdaptorException {
169 List<String> queryElements = inbound.getOptions().getUriQuery(); 169 List<String> queryElements = inbound.getOptions().getUriQuery();
170 - if (queryElements == null || queryElements.size() == 0) {  
171 - log.warn("[{}] Query is empty!", ctx.getSessionId());  
172 - throw new AdaptorException(new IllegalArgumentException("Query is empty!"));  
173 - }  
174 -  
175 - Set<String> clientKeys = toKeys(ctx, queryElements, "clientKeys");  
176 - Set<String> sharedKeys = toKeys(ctx, queryElements, "sharedKeys");  
177 - if (clientKeys.isEmpty() && sharedKeys.isEmpty()) {  
178 - throw new AdaptorException("No clientKeys and serverKeys parameters!"); 170 + if (queryElements != null || queryElements.size() > 0) {
  171 + Set<String> clientKeys = toKeys(ctx, queryElements, "clientKeys");
  172 + Set<String> sharedKeys = toKeys(ctx, queryElements, "sharedKeys");
  173 + return new BasicGetAttributesRequest(0, clientKeys, sharedKeys);
  174 + } else {
  175 + return new BasicGetAttributesRequest(0);
179 } 176 }
180 - return new BasicGetAttributesRequest(0, clientKeys, sharedKeys);  
181 } 177 }
182 178
183 private Set<String> toKeys(SessionContext ctx, List<String> queryElements, String attributeName) throws AdaptorException { 179 private Set<String> toKeys(SessionContext ctx, List<String> queryElements, String attributeName) throws AdaptorException {
@@ -191,7 +187,7 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor { @@ -191,7 +187,7 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
191 if (!StringUtils.isEmpty(keys)) { 187 if (!StringUtils.isEmpty(keys)) {
192 return new HashSet<>(Arrays.asList(keys.split(","))); 188 return new HashSet<>(Arrays.asList(keys.split(",")));
193 } else { 189 } else {
194 - return Collections.emptySet(); 190 + return null;
195 } 191 }
196 } 192 }
197 193
@@ -182,7 +182,7 @@ public class CoapServerTest { @@ -182,7 +182,7 @@ public class CoapServerTest {
182 public void testNoKeysAttributesGetRequest() { 182 public void testNoKeysAttributesGetRequest() {
183 CoapClient client = new CoapClient(getBaseTestUrl() + DEVICE1_TOKEN + "/" + FeatureType.ATTRIBUTES.name().toLowerCase() + "?data=key1,key2"); 183 CoapClient client = new CoapClient(getBaseTestUrl() + DEVICE1_TOKEN + "/" + FeatureType.ATTRIBUTES.name().toLowerCase() + "?data=key1,key2");
184 CoapResponse response = client.setTimeout(6000).get(); 184 CoapResponse response = client.setTimeout(6000).get();
185 - Assert.assertEquals(ResponseCode.BAD_REQUEST, response.getCode()); 185 + Assert.assertEquals(ResponseCode.CONTENT, response.getCode());
186 } 186 }
187 187
188 @Test 188 @Test
@@ -38,6 +38,7 @@ import org.thingsboard.server.common.transport.auth.DeviceAuthService; @@ -38,6 +38,7 @@ import org.thingsboard.server.common.transport.auth.DeviceAuthService;
38 import org.thingsboard.server.transport.http.session.HttpSessionCtx; 38 import org.thingsboard.server.transport.http.session.HttpSessionCtx;
39 39
40 import java.util.Arrays; 40 import java.util.Arrays;
  41 +import java.util.Collections;
41 import java.util.HashSet; 42 import java.util.HashSet;
42 import java.util.Set; 43 import java.util.Set;
43 44
@@ -60,20 +61,22 @@ public class DeviceApiController { @@ -60,20 +61,22 @@ public class DeviceApiController {
60 61
61 @RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.GET, produces = "application/json") 62 @RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.GET, produces = "application/json")
62 public DeferredResult<ResponseEntity> getDeviceAttributes(@PathVariable("deviceToken") String deviceToken, 63 public DeferredResult<ResponseEntity> getDeviceAttributes(@PathVariable("deviceToken") String deviceToken,
63 - @RequestParam(value = "clientKeys", required = false) String clientKeys,  
64 - @RequestParam(value = "sharedKeys", required = false) String sharedKeys) { 64 + @RequestParam(value = "clientKeys", required = false, defaultValue = "") String clientKeys,
  65 + @RequestParam(value = "sharedKeys", required = false, defaultValue = "") String sharedKeys) {
65 DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>(); 66 DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
66 - if (StringUtils.isEmpty(clientKeys) && StringUtils.isEmpty(sharedKeys)) {  
67 - responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));  
68 - } else {  
69 - HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);  
70 - if (ctx.login(new DeviceTokenCredentials(deviceToken))) {  
71 - Set<String> clientKeySet = new HashSet<>(Arrays.asList(clientKeys.split(",")));  
72 - Set<String> sharedKeySet = new HashSet<>(Arrays.asList(clientKeys.split(",")));  
73 - process(ctx, new BasicGetAttributesRequest(0, clientKeySet, sharedKeySet)); 67 + HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
  68 + if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
  69 + GetAttributesRequest request;
  70 + if (StringUtils.isEmpty(clientKeys) && StringUtils.isEmpty(sharedKeys)) {
  71 + request = new BasicGetAttributesRequest(0);
74 } else { 72 } else {
75 - responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); 73 + Set<String> clientKeySet = !StringUtils.isEmpty(clientKeys) ? new HashSet<>(Arrays.asList(clientKeys.split(","))) : null;
  74 + Set<String> sharedKeySet = !StringUtils.isEmpty(sharedKeys) ? new HashSet<>(Arrays.asList(sharedKeys.split(","))) : null;
  75 + request = new BasicGetAttributesRequest(0, clientKeySet, sharedKeySet);
76 } 76 }
  77 + process(ctx, request);
  78 + } else {
  79 + responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
77 } 80 }
78 81
79 return responseWriter; 82 return responseWriter;
@@ -162,8 +162,13 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @@ -162,8 +162,13 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
162 Integer requestId = Integer.valueOf(topicName.substring(MqttTransportHandler.ATTRIBUTES_REQUEST_TOPIC_PREFIX.length())); 162 Integer requestId = Integer.valueOf(topicName.substring(MqttTransportHandler.ATTRIBUTES_REQUEST_TOPIC_PREFIX.length()));
163 String payload = inbound.payload().toString(UTF8); 163 String payload = inbound.payload().toString(UTF8);
164 JsonElement requestBody = new JsonParser().parse(payload); 164 JsonElement requestBody = new JsonParser().parse(payload);
165 - return new BasicGetAttributesRequest(requestId,  
166 - toStringSet(requestBody, "clientKeys"), toStringSet(requestBody, "sharedKeys")); 165 + Set<String> clientKeys = toStringSet(requestBody, "clientKeys");
  166 + Set<String> sharedKeys = toStringSet(requestBody, "sharedKeys");
  167 + if (clientKeys == null && sharedKeys == null) {
  168 + return new BasicGetAttributesRequest(requestId);
  169 + } else {
  170 + return new BasicGetAttributesRequest(requestId, clientKeys, sharedKeys);
  171 + }
167 } catch (RuntimeException e) { 172 } catch (RuntimeException e) {
168 log.warn("Failed to decode get attributes request", e); 173 log.warn("Failed to decode get attributes request", e);
169 throw new AdaptorException(e); 174 throw new AdaptorException(e);
@@ -189,7 +194,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @@ -189,7 +194,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
189 if (element != null) { 194 if (element != null) {
190 return new HashSet<>(Arrays.asList(element.getAsString().split(","))); 195 return new HashSet<>(Arrays.asList(element.getAsString().split(",")));
191 } else { 196 } else {
192 - return Collections.emptySet(); 197 + return null;
193 } 198 }
194 } 199 }
195 200
@@ -293,7 +293,8 @@ function DeviceService($http, $q, $filter, telemetryWebsocketService, types) { @@ -293,7 +293,8 @@ function DeviceService($http, $q, $filter, telemetryWebsocketService, types) {
293 var deviceAttributesSubscription = deviceAttributesSubscriptionMap[subscriptionId]; 293 var deviceAttributesSubscription = deviceAttributesSubscriptionMap[subscriptionId];
294 if (!deviceAttributesSubscription) { 294 if (!deviceAttributesSubscription) {
295 var subscriptionCommand = { 295 var subscriptionCommand = {
296 - deviceId: deviceId 296 + deviceId: deviceId,
  297 + scope: attributeScope
297 }; 298 };
298 299
299 var type = attributeScope === types.latestTelemetry.value ? 300 var type = attributeScope === types.latestTelemetry.value ?