Commit 15f442fb18af9c65492f5bd36744e35556e0186c

Authored by Andrew Shvayka
1 parent 3b058694

Ability to display and subscribe to server side attributes

@@ -29,6 +29,8 @@ public class DataConstants { @@ -29,6 +29,8 @@ public class DataConstants {
29 public static final String SERVER_SCOPE = "SERVER_SCOPE"; 29 public static final String SERVER_SCOPE = "SERVER_SCOPE";
30 public static final String SHARED_SCOPE = "SHARED_SCOPE"; 30 public static final String SHARED_SCOPE = "SHARED_SCOPE";
31 31
  32 + public static final String[] ALL_SCOPES = {CLIENT_SCOPE, SHARED_SCOPE, SERVER_SCOPE};
  33 +
32 public static final String ALARM = "ALARM"; 34 public static final String ALARM = "ALARM";
33 public static final String ERROR = "ERROR"; 35 public static final String ERROR = "ERROR";
34 public static final String LC_EVENT = "LC_EVENT"; 36 public static final String LC_EVENT = "LC_EVENT";
@@ -175,6 +175,23 @@ public class SubscriptionManager { @@ -175,6 +175,23 @@ public class SubscriptionManager {
175 } 175 }
176 } 176 }
177 177
  178 + public void onAttributesUpdateFromServer(PluginContext ctx, DeviceId deviceId, String scope, List<AttributeKvEntry> attributes) {
  179 + Optional<ServerAddress> serverAddress = ctx.resolve(deviceId);
  180 + if (!serverAddress.isPresent()) {
  181 + onLocalSubscriptionUpdate(ctx, deviceId, SubscriptionType.ATTRIBUTES, s -> {
  182 + List<TsKvEntry> subscriptionUpdate = new ArrayList<TsKvEntry>();
  183 + for (AttributeKvEntry kv : attributes) {
  184 + if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) {
  185 + subscriptionUpdate.add(new BasicTsKvEntry(kv.getLastUpdateTs(), kv));
  186 + }
  187 + }
  188 + return subscriptionUpdate;
  189 + });
  190 + } else {
  191 + rpcHandler.onAttributesUpdate(ctx, serverAddress.get(), deviceId, scope, attributes);
  192 + }
  193 + }
  194 +
178 private void updateSubscriptionState(String sessionId, Subscription subState, SubscriptionUpdate update) { 195 private void updateSubscriptionState(String sessionId, Subscription subState, SubscriptionUpdate update) {
179 log.trace("[{}] updating subscription state {} using onUpdate {}", sessionId, subState, update); 196 log.trace("[{}] updating subscription state {} using onUpdate {}", sessionId, subState, update);
180 update.getLatestValues().entrySet().forEach(e -> subState.setKeyState(e.getKey(), e.getValue())); 197 update.getLatestValues().entrySet().forEach(e -> subState.setKeyState(e.getKey(), e.getValue()));
@@ -43,7 +43,7 @@ public class TelemetryStoragePlugin extends AbstractPlugin<EmptyComponentConfigu @@ -43,7 +43,7 @@ public class TelemetryStoragePlugin extends AbstractPlugin<EmptyComponentConfigu
43 43
44 public TelemetryStoragePlugin() { 44 public TelemetryStoragePlugin() {
45 this.subscriptionManager = new SubscriptionManager(); 45 this.subscriptionManager = new SubscriptionManager();
46 - this.restMsgHandler = new TelemetryRestMsgHandler(); 46 + this.restMsgHandler = new TelemetryRestMsgHandler(subscriptionManager);
47 this.ruleMsgHandler = new TelemetryRuleMsgHandler(subscriptionManager); 47 this.ruleMsgHandler = new TelemetryRuleMsgHandler(subscriptionManager);
48 this.websocketMsgHandler = new TelemetryWebsocketMsgHandler(subscriptionManager); 48 this.websocketMsgHandler = new TelemetryWebsocketMsgHandler(subscriptionManager);
49 this.rpcMsgHandler = new TelemetryRpcMsgHandler(subscriptionManager); 49 this.rpcMsgHandler = new TelemetryRpcMsgHandler(subscriptionManager);
@@ -29,6 +29,7 @@ import org.thingsboard.server.extensions.api.plugins.handlers.DefaultRestMsgHand @@ -29,6 +29,7 @@ import org.thingsboard.server.extensions.api.plugins.handlers.DefaultRestMsgHand
29 import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg; 29 import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg;
30 import org.thingsboard.server.extensions.api.plugins.rest.RestRequest; 30 import org.thingsboard.server.extensions.api.plugins.rest.RestRequest;
31 import org.thingsboard.server.extensions.core.plugin.telemetry.AttributeData; 31 import org.thingsboard.server.extensions.core.plugin.telemetry.AttributeData;
  32 +import org.thingsboard.server.extensions.core.plugin.telemetry.SubscriptionManager;
32 import org.thingsboard.server.extensions.core.plugin.telemetry.TsData; 33 import org.thingsboard.server.extensions.core.plugin.telemetry.TsData;
33 34
34 import javax.servlet.ServletException; 35 import javax.servlet.ServletException;
@@ -39,6 +40,12 @@ import java.util.stream.Collectors; @@ -39,6 +40,12 @@ import java.util.stream.Collectors;
39 @Slf4j 40 @Slf4j
40 public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { 41 public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
41 42
  43 + private final SubscriptionManager subscriptionManager;
  44 +
  45 + public TelemetryRestMsgHandler(SubscriptionManager subscriptionManager) {
  46 + this.subscriptionManager = subscriptionManager;
  47 + }
  48 +
42 @Override 49 @Override
43 public void handleHttpGetRequest(PluginContext ctx, PluginRestMsg msg) throws ServletException { 50 public void handleHttpGetRequest(PluginContext ctx, PluginRestMsg msg) throws ServletException {
44 RestRequest request = msg.getRequest(); 51 RestRequest request = msg.getRequest();
@@ -74,9 +81,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { @@ -74,9 +81,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
74 if (!StringUtils.isEmpty(scope)) { 81 if (!StringUtils.isEmpty(scope)) {
75 attributes = ctx.loadAttributes(deviceId, scope); 82 attributes = ctx.loadAttributes(deviceId, scope);
76 } else { 83 } else {
77 - attributes = ctx.loadAttributes(deviceId, DataConstants.CLIENT_SCOPE);  
78 - attributes.addAll(ctx.loadAttributes(deviceId, DataConstants.SERVER_SCOPE));  
79 - attributes.addAll(ctx.loadAttributes(deviceId, DataConstants.SHARED_SCOPE)); 84 + attributes = new ArrayList<>();
  85 + Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> attributes.addAll(ctx.loadAttributes(deviceId, s)));
80 } 86 }
81 List<String> keys = attributes.stream().map(attrKv -> attrKv.getKey()).collect(Collectors.toList()); 87 List<String> keys = attributes.stream().map(attrKv -> attrKv.getKey()).collect(Collectors.toList());
82 msg.getResponseHolder().setResult(new ResponseEntity<>(keys, HttpStatus.OK)); 88 msg.getResponseHolder().setResult(new ResponseEntity<>(keys, HttpStatus.OK));
@@ -99,9 +105,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { @@ -99,9 +105,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
99 if (!StringUtils.isEmpty(scope)) { 105 if (!StringUtils.isEmpty(scope)) {
100 attributes = getAttributeKvEntries(ctx, scope, deviceId, keys); 106 attributes = getAttributeKvEntries(ctx, scope, deviceId, keys);
101 } else { 107 } else {
102 - attributes = getAttributeKvEntries(ctx, DataConstants.CLIENT_SCOPE, deviceId, keys);  
103 - attributes.addAll(getAttributeKvEntries(ctx, DataConstants.SHARED_SCOPE, deviceId, keys));  
104 - attributes.addAll(getAttributeKvEntries(ctx, DataConstants.SERVER_SCOPE, deviceId, keys)); 108 + attributes = new ArrayList<>();
  109 + Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> attributes.addAll(getAttributeKvEntries(ctx, s, deviceId, keys)));
105 } 110 }
106 List<AttributeData> values = attributes.stream().map(attribute -> new AttributeData(attribute.getLastUpdateTs(), 111 List<AttributeData> values = attributes.stream().map(attribute -> new AttributeData(attribute.getLastUpdateTs(),
107 attribute.getKey(), attribute.getValue())).collect(Collectors.toList()); 112 attribute.getKey(), attribute.getValue())).collect(Collectors.toList());
@@ -145,6 +150,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { @@ -145,6 +150,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
145 @Override 150 @Override
146 public void onSuccess(PluginContext ctx, Void value) { 151 public void onSuccess(PluginContext ctx, Void value) {
147 msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK)); 152 msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK));
  153 + subscriptionManager.onAttributesUpdateFromServer(ctx, deviceId, scope, attributes);
148 } 154 }
149 155
150 @Override 156 @Override
@@ -172,7 +178,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { @@ -172,7 +178,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
172 DeviceId deviceId = DeviceId.fromString(pathParams[0]); 178 DeviceId deviceId = DeviceId.fromString(pathParams[0]);
173 String scope = pathParams[1]; 179 String scope = pathParams[1];
174 if (DataConstants.SERVER_SCOPE.equals(scope) || 180 if (DataConstants.SERVER_SCOPE.equals(scope) ||
175 - DataConstants.SHARED_SCOPE.equals(scope)) { 181 + DataConstants.SHARED_SCOPE.equals(scope) ||
  182 + DataConstants.CLIENT_SCOPE.equals(scope)) {
176 String keysParam = request.getParameter("keys"); 183 String keysParam = request.getParameter("keys");
177 if (!StringUtils.isEmpty(keysParam)) { 184 if (!StringUtils.isEmpty(keysParam)) {
178 String[] keys = keysParam.split(","); 185 String[] keys = keysParam.split(",");
@@ -19,6 +19,7 @@ import com.google.protobuf.InvalidProtocolBufferException; @@ -19,6 +19,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
19 import lombok.RequiredArgsConstructor; 19 import lombok.RequiredArgsConstructor;
20 import lombok.extern.slf4j.Slf4j; 20 import lombok.extern.slf4j.Slf4j;
21 import org.thingsboard.server.common.data.id.DeviceId; 21 import org.thingsboard.server.common.data.id.DeviceId;
  22 +import org.thingsboard.server.common.data.kv.*;
22 import org.thingsboard.server.common.msg.cluster.ServerAddress; 23 import org.thingsboard.server.common.msg.cluster.ServerAddress;
23 import org.thingsboard.server.extensions.api.plugins.PluginContext; 24 import org.thingsboard.server.extensions.api.plugins.PluginContext;
24 import org.thingsboard.server.extensions.api.plugins.handlers.RpcMsgHandler; 25 import org.thingsboard.server.extensions.api.plugins.handlers.RpcMsgHandler;
@@ -42,9 +43,10 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { @@ -42,9 +43,10 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
42 private final SubscriptionManager subscriptionManager; 43 private final SubscriptionManager subscriptionManager;
43 44
44 private static final int SUBSCRIPTION_CLAZZ = 1; 45 private static final int SUBSCRIPTION_CLAZZ = 1;
45 - private static final int SUBSCRIPTION_UPDATE_CLAZZ = 2;  
46 - private static final int SESSION_CLOSE_CLAZZ = 3;  
47 - private static final int SUBSCRIPTION_CLOSE_CLAZZ = 4; 46 + private static final int ATTRIBUTES_UPDATE_CLAZZ = 2;
  47 + private static final int SUBSCRIPTION_UPDATE_CLAZZ = 3;
  48 + private static final int SESSION_CLOSE_CLAZZ = 4;
  49 + private static final int SUBSCRIPTION_CLOSE_CLAZZ = 5;
48 50
49 @Override 51 @Override
50 public void process(PluginContext ctx, RpcMsg msg) { 52 public void process(PluginContext ctx, RpcMsg msg) {
@@ -55,6 +57,9 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { @@ -55,6 +57,9 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
55 case SUBSCRIPTION_UPDATE_CLAZZ: 57 case SUBSCRIPTION_UPDATE_CLAZZ:
56 processRemoteSubscriptionUpdate(ctx, msg); 58 processRemoteSubscriptionUpdate(ctx, msg);
57 break; 59 break;
  60 + case ATTRIBUTES_UPDATE_CLAZZ:
  61 + processAttributeUpdate(ctx, msg);
  62 + break;
58 case SESSION_CLOSE_CLAZZ: 63 case SESSION_CLOSE_CLAZZ:
59 processSessionClose(ctx, msg); 64 processSessionClose(ctx, msg);
60 break; 65 break;
@@ -76,6 +81,17 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { @@ -76,6 +81,17 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
76 subscriptionManager.onRemoteSubscriptionUpdate(ctx, proto.getSessionId(), convert(proto)); 81 subscriptionManager.onRemoteSubscriptionUpdate(ctx, proto.getSessionId(), convert(proto));
77 } 82 }
78 83
  84 + private void processAttributeUpdate(PluginContext ctx, RpcMsg msg) {
  85 + AttributeUpdateProto proto;
  86 + try {
  87 + proto = AttributeUpdateProto.parseFrom(msg.getMsgData());
  88 + } catch (InvalidProtocolBufferException e) {
  89 + throw new RuntimeException(e);
  90 + }
  91 + subscriptionManager.onAttributesUpdateFromServer(ctx, DeviceId.fromString(proto.getDeviceId()), proto.getScope(),
  92 + proto.getDataList().stream().map(this::toAttribute).collect(Collectors.toList()));
  93 + }
  94 +
79 private void processSubscriptionCmd(PluginContext ctx, RpcMsg msg) { 95 private void processSubscriptionCmd(PluginContext ctx, RpcMsg msg) {
80 SubscriptionProto proto; 96 SubscriptionProto proto;
81 try { 97 try {
@@ -167,11 +183,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { @@ -167,11 +183,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
167 } else { 183 } else {
168 Map<String, List<Object>> data = new TreeMap<>(); 184 Map<String, List<Object>> data = new TreeMap<>();
169 proto.getDataList().forEach(v -> { 185 proto.getDataList().forEach(v -> {
170 - List<Object> values = data.get(v.getKey());  
171 - if (values == null) {  
172 - values = new ArrayList<>();  
173 - data.put(v.getKey(), values);  
174 - } 186 + List<Object> values = data.computeIfAbsent(v.getKey(), k -> new ArrayList<>());
175 for (int i = 0; i < v.getTsCount(); i++) { 187 for (int i = 0; i < v.getTsCount(); i++) {
176 Object[] value = new Object[2]; 188 Object[] value = new Object[2];
177 value[0] = v.getTs(i); 189 value[0] = v.getTs(i);
@@ -182,4 +194,59 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { @@ -182,4 +194,59 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
182 return new SubscriptionUpdate(proto.getSubscriptionId(), data); 194 return new SubscriptionUpdate(proto.getSubscriptionId(), data);
183 } 195 }
184 } 196 }
  197 +
  198 + public void onAttributesUpdate(PluginContext ctx, ServerAddress address, DeviceId deviceId, String scope, List<AttributeKvEntry> attributes) {
  199 + ctx.sendPluginRpcMsg(new RpcMsg(address, ATTRIBUTES_UPDATE_CLAZZ, getAttributesUpdateProto(deviceId, scope, attributes).toByteArray()));
  200 + }
  201 +
  202 + private AttributeUpdateProto getAttributesUpdateProto(DeviceId deviceId, String scope, List<AttributeKvEntry> attributes) {
  203 + AttributeUpdateProto.Builder builder = AttributeUpdateProto.newBuilder();
  204 + builder.setDeviceId(deviceId.toString());
  205 + builder.setScope(scope);
  206 + attributes.forEach(
  207 + attr -> {
  208 + AttributeUpdateValueListProto.Builder dataBuilder = AttributeUpdateValueListProto.newBuilder();
  209 + dataBuilder.setKey(attr.getKey());
  210 + dataBuilder.setTs(attr.getLastUpdateTs());
  211 + dataBuilder.setValueType(attr.getDataType().ordinal());
  212 + switch (attr.getDataType()) {
  213 + case BOOLEAN:
  214 + dataBuilder.setBoolValue(attr.getBooleanValue().get());
  215 + break;
  216 + case LONG:
  217 + dataBuilder.setLongValue(attr.getLongValue().get());
  218 + break;
  219 + case DOUBLE:
  220 + dataBuilder.setDoubleValue(attr.getDoubleValue().get());
  221 + break;
  222 + case STRING:
  223 + dataBuilder.setStrValue(attr.getStrValue().get());
  224 + break;
  225 + }
  226 + builder.addData(dataBuilder.build());
  227 + }
  228 + );
  229 + return builder.build();
  230 + }
  231 +
  232 + private AttributeKvEntry toAttribute(AttributeUpdateValueListProto proto) {
  233 + KvEntry entry = null;
  234 + DataType type = DataType.values()[proto.getValueType()];
  235 + switch (type) {
  236 + case BOOLEAN:
  237 + entry = new BooleanDataEntry(proto.getKey(), proto.getBoolValue());
  238 + break;
  239 + case LONG:
  240 + entry = new LongDataEntry(proto.getKey(), proto.getLongValue());
  241 + break;
  242 + case DOUBLE:
  243 + entry = new DoubleDataEntry(proto.getKey(), proto.getDoubleValue());
  244 + break;
  245 + case STRING:
  246 + entry = new StringDataEntry(proto.getKey(), proto.getStrValue());
  247 + break;
  248 + }
  249 + return new BaseAttributeKvEntry(entry, proto.getTs());
  250 + }
  251 +
185 } 252 }
@@ -104,7 +104,8 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { @@ -104,7 +104,8 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
104 SubscriptionState sub; 104 SubscriptionState sub;
105 if (keysOptional.isPresent()) { 105 if (keysOptional.isPresent()) {
106 List<String> keys = new ArrayList<>(keysOptional.get()); 106 List<String> keys = new ArrayList<>(keysOptional.get());
107 - List<AttributeKvEntry> data = ctx.loadAttributes(deviceId, DataConstants.CLIENT_SCOPE, keys); 107 + List<AttributeKvEntry> data = new ArrayList<>();
  108 + Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> data.addAll(ctx.loadAttributes(deviceId, s, keys)));
108 List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList()); 109 List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
109 sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData)); 110 sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
110 111
@@ -114,7 +115,8 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { @@ -114,7 +115,8 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
114 115
115 sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, false, subState); 116 sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, false, subState);
116 } else { 117 } else {
117 - List<AttributeKvEntry> data = ctx.loadAttributes(deviceId, DataConstants.CLIENT_SCOPE); 118 + List<AttributeKvEntry> data = new ArrayList<>();
  119 + Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> data.addAll(ctx.loadAttributes(deviceId, s)));
118 List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList()); 120 List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
119 sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData)); 121 sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
120 122
@@ -36,6 +36,12 @@ message SubscriptionUpdateProto { @@ -36,6 +36,12 @@ message SubscriptionUpdateProto {
36 repeated SubscriptionUpdateValueListProto data = 5; 36 repeated SubscriptionUpdateValueListProto data = 5;
37 } 37 }
38 38
  39 +message AttributeUpdateProto {
  40 + string deviceId = 1;
  41 + string scope = 2;
  42 + repeated AttributeUpdateValueListProto data = 3;
  43 +}
  44 +
39 message SessionCloseProto { 45 message SessionCloseProto {
40 string sessionId = 1; 46 string sessionId = 1;
41 } 47 }
@@ -54,4 +60,14 @@ message SubscriptionUpdateValueListProto { @@ -54,4 +60,14 @@ message SubscriptionUpdateValueListProto {
54 string key = 1; 60 string key = 1;
55 repeated int64 ts = 2; 61 repeated int64 ts = 2;
56 repeated string value = 3; 62 repeated string value = 3;
  63 +}
  64 +
  65 +message AttributeUpdateValueListProto {
  66 + string key = 1;
  67 + int64 ts = 2;
  68 + int32 valueType = 3;
  69 + string strValue = 4;
  70 + int64 longValue = 5;
  71 + double doubleValue = 6;
  72 + bool boolValue = 7;
57 } 73 }