Commit 4182a70d5fcf8f5530a3de39a303218fa2c3d855

Authored by Volodymyr Babak
1 parent a2a89033

Fixes for subscription

@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -68,6 +68,7 @@ import javax.annotation.PostConstruct; @@ -68,6 +68,7 @@ import javax.annotation.PostConstruct;
68 import javax.annotation.PreDestroy; 68 import javax.annotation.PreDestroy;
69 import java.util.ArrayList; 69 import java.util.ArrayList;
70 import java.util.Collections; 70 import java.util.Collections;
  71 +import java.util.HashMap;
71 import java.util.HashSet; 72 import java.util.HashSet;
72 import java.util.Iterator; 73 import java.util.Iterator;
73 import java.util.List; 74 import java.util.List;
@@ -140,24 +141,64 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio @@ -140,24 +141,64 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
140 141
141 @Override 142 @Override
142 public void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub) { 143 public void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub) {
  144 + long startTime = 0L;
  145 + long endTime = 0L;
143 if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) { 146 if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
144 EntityView entityView = entityViewService.findEntityViewById(new EntityViewId(entityId.getId())); 147 EntityView entityView = entityViewService.findEntityViewById(new EntityViewId(entityId.getId()));
145 entityId = entityView.getEntityId(); 148 entityId = entityView.getEntityId();
  149 + startTime = entityView.getStartTimeMs();
  150 + endTime = entityView.getEndTimeMs();
  151 + sub = getUpdatedSubscriptionState(entityId, sub, entityView);
146 } 152 }
147 Optional<ServerAddress> server = routingService.resolveById(entityId); 153 Optional<ServerAddress> server = routingService.resolveById(entityId);
148 Subscription subscription; 154 Subscription subscription;
149 if (server.isPresent()) { 155 if (server.isPresent()) {
150 ServerAddress address = server.get(); 156 ServerAddress address = server.get();
151 log.trace("[{}] Forwarding subscription [{}] for [{}] entity [{}] to [{}]", sessionId, sub.getSubscriptionId(), entityId.getEntityType().name(), entityId, address); 157 log.trace("[{}] Forwarding subscription [{}] for [{}] entity [{}] to [{}]", sessionId, sub.getSubscriptionId(), entityId.getEntityType().name(), entityId, address);
152 - subscription = new Subscription(sub, true, address); 158 + subscription = new Subscription(sub, true, address, startTime, endTime);
153 tellNewSubscription(address, sessionId, subscription); 159 tellNewSubscription(address, sessionId, subscription);
154 } else { 160 } else {
155 log.trace("[{}] Registering local subscription [{}] for [{}] entity [{}]", sessionId, sub.getSubscriptionId(), entityId.getEntityType().name(), entityId); 161 log.trace("[{}] Registering local subscription [{}] for [{}] entity [{}]", sessionId, sub.getSubscriptionId(), entityId.getEntityType().name(), entityId);
156 - subscription = new Subscription(sub, true); 162 + subscription = new Subscription(sub, true, null, startTime, endTime);
157 } 163 }
158 registerSubscription(sessionId, entityId, subscription); 164 registerSubscription(sessionId, entityId, subscription);
159 } 165 }
160 166
  167 + private SubscriptionState getUpdatedSubscriptionState(EntityId entityId, SubscriptionState sub, EntityView entityView) {
  168 + boolean allKeys;
  169 + Map<String, Long> keyStates;
  170 + if (sub.getType().equals(TelemetryFeature.TIMESERIES) && !entityView.getKeys().getTimeseries().isEmpty()) {
  171 + allKeys = false;
  172 + keyStates = sub.getKeyStates().entrySet()
  173 + .stream().filter(entry -> entityView.getKeys().getTimeseries().contains(entry.getKey()))
  174 + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  175 + } else if (sub.getType().equals(TelemetryFeature.ATTRIBUTES)) {
  176 + if (sub.getScope().equals(DataConstants.CLIENT_SCOPE) && !entityView.getKeys().getAttributes().getCs().isEmpty()) {
  177 + allKeys = false;
  178 + keyStates = filterMap(sub, entityView.getKeys().getAttributes().getCs());
  179 + } else if (sub.getScope().equals(DataConstants.SERVER_SCOPE) && !entityView.getKeys().getAttributes().getSs().isEmpty()) {
  180 + allKeys = false;
  181 + keyStates = filterMap(sub, entityView.getKeys().getAttributes().getSs());
  182 + } else if (sub.getScope().equals(DataConstants.SERVER_SCOPE) && !entityView.getKeys().getAttributes().getSh().isEmpty()) {
  183 + allKeys = false;
  184 + keyStates = filterMap(sub, entityView.getKeys().getAttributes().getSh());
  185 + } else {
  186 + allKeys = sub.isAllKeys();
  187 + keyStates = sub.getKeyStates();
  188 + }
  189 + } else {
  190 + allKeys = sub.isAllKeys();
  191 + keyStates = sub.getKeyStates();
  192 + }
  193 + return new SubscriptionState(sub.getWsSessionId(), sub.getSubscriptionId(), entityId, sub.getType(), allKeys, keyStates, sub.getScope());
  194 + }
  195 +
  196 + private Map<String, Long> filterMap(SubscriptionState sub, List<String> allowedKeys) {
  197 + return sub.getKeyStates().entrySet()
  198 + .stream().filter(entry -> allowedKeys.contains(entry.getKey()))
  199 + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  200 + }
  201 +
161 @Override 202 @Override
162 public void cleanupLocalWsSessionSubscriptions(TelemetryWebSocketSessionRef sessionRef, String sessionId) { 203 public void cleanupLocalWsSessionSubscriptions(TelemetryWebSocketSessionRef sessionRef, String sessionId) {
163 cleanupLocalWsSessionSubscriptions(sessionId); 204 cleanupLocalWsSessionSubscriptions(sessionId);
@@ -426,7 +467,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio @@ -426,7 +467,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
426 onLocalSubUpdate(entityId, s -> TelemetryFeature.ATTRIBUTES == s.getType() && (StringUtils.isEmpty(s.getScope()) || scope.equals(s.getScope())), s -> { 467 onLocalSubUpdate(entityId, s -> TelemetryFeature.ATTRIBUTES == s.getType() && (StringUtils.isEmpty(s.getScope()) || scope.equals(s.getScope())), s -> {
427 List<TsKvEntry> subscriptionUpdate = null; 468 List<TsKvEntry> subscriptionUpdate = null;
428 for (AttributeKvEntry kv : attributes) { 469 for (AttributeKvEntry kv : attributes) {
429 - if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) { 470 + if (isInTimeRange(s, kv.getLastUpdateTs()) && (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey()))) {
430 if (subscriptionUpdate == null) { 471 if (subscriptionUpdate == null) {
431 subscriptionUpdate = new ArrayList<>(); 472 subscriptionUpdate = new ArrayList<>();
432 } 473 }
@@ -441,7 +482,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio @@ -441,7 +482,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
441 onLocalSubUpdate(entityId, s -> TelemetryFeature.TIMESERIES == s.getType(), s -> { 482 onLocalSubUpdate(entityId, s -> TelemetryFeature.TIMESERIES == s.getType(), s -> {
442 List<TsKvEntry> subscriptionUpdate = null; 483 List<TsKvEntry> subscriptionUpdate = null;
443 for (TsKvEntry kv : ts) { 484 for (TsKvEntry kv : ts) {
444 - if (s.isAllKeys() || s.getKeyStates().containsKey((kv.getKey()))) { 485 + if (isInTimeRange(s, kv.getTs()) && (s.isAllKeys() || s.getKeyStates().containsKey((kv.getKey())))) {
445 if (subscriptionUpdate == null) { 486 if (subscriptionUpdate == null) {
446 subscriptionUpdate = new ArrayList<>(); 487 subscriptionUpdate = new ArrayList<>();
447 } 488 }
@@ -452,6 +493,11 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio @@ -452,6 +493,11 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
452 }); 493 });
453 } 494 }
454 495
  496 + private boolean isInTimeRange(Subscription subscription, long kvTime) {
  497 + return (subscription.getStartTime() == 0 || subscription.getStartTime() <= kvTime)
  498 + && (subscription.getEndTime() == 0 || subscription.getEndTime() >= kvTime);
  499 + }
  500 +
455 private void onLocalSubUpdate(EntityId entityId, Predicate<Subscription> filter, Function<Subscription, List<TsKvEntry>> f) { 501 private void onLocalSubUpdate(EntityId entityId, Predicate<Subscription> filter, Function<Subscription, List<TsKvEntry>> f) {
456 Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.get(entityId); 502 Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.get(entityId);
457 if (deviceSubscriptions != null) { 503 if (deviceSubscriptions != null) {
@@ -33,10 +33,6 @@ public class Subscription { @@ -33,10 +33,6 @@ public class Subscription {
33 private long startTime; 33 private long startTime;
34 private long endTime; 34 private long endTime;
35 35
36 - public Subscription(SubscriptionState sub, boolean local) {  
37 - this(sub, local, null);  
38 - }  
39 -  
40 public Subscription(SubscriptionState sub, boolean local, ServerAddress server) { 36 public Subscription(SubscriptionState sub, boolean local, ServerAddress server) {
41 this(sub, local, server, 0L, 0L); 37 this(sub, local, server, 0L, 0L);
42 } 38 }