Commit 7e0d4df80858be80776bcf0f87bb149e8164218d

Authored by oleg
1 parent 3e25997b

Attribute subscription

@@ -33,6 +33,7 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionU @@ -33,6 +33,7 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionU
33 33
34 import java.util.*; 34 import java.util.*;
35 import java.util.function.Function; 35 import java.util.function.Function;
  36 +import java.util.function.Predicate;
36 37
37 /** 38 /**
38 * @author Andrew Shvayka 39 * @author Andrew Shvayka
@@ -174,9 +175,13 @@ public class SubscriptionManager { @@ -174,9 +175,13 @@ public class SubscriptionManager {
174 } 175 }
175 176
176 public void onLocalSubscriptionUpdate(PluginContext ctx, EntityId entityId, SubscriptionType type, Function<Subscription, List<TsKvEntry>> f) { 177 public void onLocalSubscriptionUpdate(PluginContext ctx, EntityId entityId, SubscriptionType type, Function<Subscription, List<TsKvEntry>> f) {
  178 + onLocalSubscriptionUpdate(ctx, entityId, s -> type == s.getType(), f);
  179 + }
  180 +
  181 + public void onLocalSubscriptionUpdate(PluginContext ctx, EntityId entityId, Predicate<Subscription> filter, Function<Subscription, List<TsKvEntry>> f) {
177 Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.get(entityId); 182 Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.get(entityId);
178 if (deviceSubscriptions != null) { 183 if (deviceSubscriptions != null) {
179 - deviceSubscriptions.stream().filter(s -> type == s.getType()).forEach(s -> { 184 + deviceSubscriptions.stream().filter(filter).forEach(s -> {
180 String sessionId = s.getWsSessionId(); 185 String sessionId = s.getWsSessionId();
181 List<TsKvEntry> subscriptionUpdate = f.apply(s); 186 List<TsKvEntry> subscriptionUpdate = f.apply(s);
182 if (!subscriptionUpdate.isEmpty()) { 187 if (!subscriptionUpdate.isEmpty()) {
@@ -206,7 +211,7 @@ public class SubscriptionManager { @@ -206,7 +211,7 @@ public class SubscriptionManager {
206 public void onAttributesUpdateFromServer(PluginContext ctx, EntityId entityId, String scope, List<AttributeKvEntry> attributes) { 211 public void onAttributesUpdateFromServer(PluginContext ctx, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
207 Optional<ServerAddress> serverAddress = ctx.resolve(entityId); 212 Optional<ServerAddress> serverAddress = ctx.resolve(entityId);
208 if (!serverAddress.isPresent()) { 213 if (!serverAddress.isPresent()) {
209 - onLocalSubscriptionUpdate(ctx, entityId, SubscriptionType.ATTRIBUTES, s -> { 214 + onLocalSubscriptionUpdate(ctx, entityId, s -> SubscriptionType.ATTRIBUTES == s.getType() && scope.equals(s.getScope()), s -> {
210 List<TsKvEntry> subscriptionUpdate = new ArrayList<TsKvEntry>(); 215 List<TsKvEntry> subscriptionUpdate = new ArrayList<TsKvEntry>();
211 for (AttributeKvEntry kv : attributes) { 216 for (AttributeKvEntry kv : attributes) {
212 if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) { 217 if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) {
@@ -114,7 +114,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { @@ -114,7 +114,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
114 } 114 }
115 Map<String, Long> statesMap = proto.getKeyStatesList().stream().collect(Collectors.toMap(SubscriptionKetStateProto::getKey, SubscriptionKetStateProto::getTs)); 115 Map<String, Long> statesMap = proto.getKeyStatesList().stream().collect(Collectors.toMap(SubscriptionKetStateProto::getKey, SubscriptionKetStateProto::getTs));
116 Subscription subscription = new Subscription( 116 Subscription subscription = new Subscription(
117 - new SubscriptionState(proto.getSessionId(), proto.getSubscriptionId(), EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()), SubscriptionType.valueOf(proto.getType()), proto.getAllKeys(), statesMap), 117 + new SubscriptionState(proto.getSessionId(), proto.getSubscriptionId(), EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()), SubscriptionType.valueOf(proto.getType()), proto.getAllKeys(), statesMap, proto.getScope()),
118 false, msg.getServerAddress()); 118 false, msg.getServerAddress());
119 subscriptionManager.addRemoteWsSubscription(ctx, msg.getServerAddress(), proto.getSessionId(), subscription); 119 subscriptionManager.addRemoteWsSubscription(ctx, msg.getServerAddress(), proto.getSessionId(), subscription);
120 } 120 }
@@ -127,6 +127,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { @@ -127,6 +127,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
127 builder.setEntityId(cmd.getEntityId().getId().toString()); 127 builder.setEntityId(cmd.getEntityId().getId().toString());
128 builder.setType(cmd.getType().name()); 128 builder.setType(cmd.getType().name());
129 builder.setAllKeys(cmd.isAllKeys()); 129 builder.setAllKeys(cmd.isAllKeys());
  130 + builder.setScope(cmd.getScope());
130 cmd.getKeyStates().entrySet().forEach(e -> builder.addKeyStates(SubscriptionKetStateProto.newBuilder().setKey(e.getKey()).setTs(e.getValue()).build())); 131 cmd.getKeyStates().entrySet().forEach(e -> builder.addKeyStates(SubscriptionKetStateProto.newBuilder().setKey(e.getKey()).setTs(e.getValue()).build()));
131 ctx.sendPluginRpcMsg(new RpcMsg(address, SUBSCRIPTION_CLAZZ, builder.build().toByteArray())); 132 ctx.sendPluginRpcMsg(new RpcMsg(address, SUBSCRIPTION_CLAZZ, builder.build().toByteArray()));
132 } 133 }
@@ -131,7 +131,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { @@ -131,7 +131,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
131 keys.forEach(key -> subState.put(key, 0L)); 131 keys.forEach(key -> subState.put(key, 0L));
132 attributesData.forEach(v -> subState.put(v.getKey(), v.getTs())); 132 attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
133 133
134 - SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, false, subState); 134 + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, false, subState, cmd.getScope());
135 subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub); 135 subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
136 } 136 }
137 137
@@ -168,7 +168,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { @@ -168,7 +168,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
168 Map<String, Long> subState = new HashMap<>(attributesData.size()); 168 Map<String, Long> subState = new HashMap<>(attributesData.size());
169 attributesData.forEach(v -> subState.put(v.getKey(), v.getTs())); 169 attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
170 170
171 - SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, true, subState); 171 + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, true, subState, cmd.getScope());
172 subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub); 172 subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
173 } 173 }
174 174
@@ -234,7 +234,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { @@ -234,7 +234,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
234 sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data)); 234 sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
235 Map<String, Long> subState = new HashMap<>(data.size()); 235 Map<String, Long> subState = new HashMap<>(data.size());
236 data.forEach(v -> subState.put(v.getKey(), v.getTs())); 236 data.forEach(v -> subState.put(v.getKey(), v.getTs()));
237 - SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, true, subState); 237 + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, true, subState, cmd.getScope());
238 subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub); 238 subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
239 } 239 }
240 240
@@ -262,7 +262,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { @@ -262,7 +262,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
262 Map<String, Long> subState = new HashMap<>(keys.size()); 262 Map<String, Long> subState = new HashMap<>(keys.size());
263 keys.forEach(key -> subState.put(key, startTs)); 263 keys.forEach(key -> subState.put(key, startTs));
264 data.forEach(v -> subState.put(v.getKey(), v.getTs())); 264 data.forEach(v -> subState.put(v.getKey(), v.getTs()));
265 - SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, false, subState); 265 + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, false, subState, cmd.getScope());
266 subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub); 266 subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
267 } 267 }
268 268
@@ -51,6 +51,10 @@ public class Subscription { @@ -51,6 +51,10 @@ public class Subscription {
51 return getSub().getType(); 51 return getSub().getType();
52 } 52 }
53 53
  54 + public String getScope() {
  55 + return getSub().getScope();
  56 + }
  57 +
54 public boolean isAllKeys() { 58 public boolean isAllKeys() {
55 return getSub().isAllKeys(); 59 return getSub().isAllKeys();
56 } 60 }
@@ -33,6 +33,7 @@ public class SubscriptionState { @@ -33,6 +33,7 @@ public class SubscriptionState {
33 @Getter private final SubscriptionType type; 33 @Getter private final SubscriptionType type;
34 @Getter private final boolean allKeys; 34 @Getter private final boolean allKeys;
35 @Getter private final Map<String, Long> keyStates; 35 @Getter private final Map<String, Long> keyStates;
  36 + @Getter private final String scope;
36 37
37 @Override 38 @Override
38 public boolean equals(Object o) { 39 public boolean equals(Object o) {
@@ -27,6 +27,7 @@ message SubscriptionProto { @@ -27,6 +27,7 @@ message SubscriptionProto {
27 string type = 5; 27 string type = 5;
28 bool allKeys = 6; 28 bool allKeys = 6;
29 repeated SubscriptionKetStateProto keyStates = 7; 29 repeated SubscriptionKetStateProto keyStates = 7;
  30 + string scope = 8;
30 } 31 }
31 32
32 message SubscriptionUpdateProto { 33 message SubscriptionUpdateProto {