Commit 6d8cb480e9fed0527613ae3bf41bd933ab2adbb1

Authored by Andrew Shvayka
1 parent 5e2667d3

Fixed compilation issues

Showing 26 changed files with 532 additions and 164 deletions
... ... @@ -233,21 +233,6 @@ public class ActorSystemContext {
233 233 @Getter
234 234 private final Config config;
235 235
236   - @Getter
237   - private ExecutorService tsCallBackExecutor;
238   -
239   - @PostConstruct
240   - public void initExecutor() {
241   - tsCallBackExecutor = Executors.newSingleThreadExecutor();
242   - }
243   -
244   - @PreDestroy
245   - public void shutdownExecutor() {
246   - if (tsCallBackExecutor != null) {
247   - tsCallBackExecutor.shutdownNow();
248   - }
249   - }
250   -
251 236 public ActorSystemContext() {
252 237 config = ConfigFactory.parseResources(AKKA_CONF_FILE_NAME).withFallback(ConfigFactory.load());
253 238 }
... ...
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - * <p>
  3 + *
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - * <p>
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - * <p>
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -17,16 +17,10 @@ package org.thingsboard.server.actors.ruleChain;
17 17
18 18 import akka.actor.ActorRef;
19 19 import com.google.common.base.Function;
20   -import com.google.common.util.concurrent.FutureCallback;
21   -import com.google.common.util.concurrent.Futures;
22   -import com.google.common.util.concurrent.ListenableFuture;
23 20 import org.thingsboard.rule.engine.api.ListeningExecutor;
24 21 import org.thingsboard.rule.engine.api.TbContext;
25 22 import org.thingsboard.server.actors.ActorSystemContext;
26   -import org.thingsboard.server.common.data.id.EntityId;
27 23 import org.thingsboard.server.common.data.id.RuleNodeId;
28   -import org.thingsboard.server.common.data.kv.AttributeKvEntry;
29   -import org.thingsboard.server.common.data.kv.TsKvEntry;
30 24 import org.thingsboard.server.common.msg.TbMsg;
31 25 import org.thingsboard.server.common.msg.cluster.ServerAddress;
32 26 import org.thingsboard.server.dao.alarm.AlarmService;
... ... @@ -41,7 +35,6 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
41 35 import org.thingsboard.server.dao.user.UserService;
42 36 import scala.concurrent.duration.Duration;
43 37
44   -import javax.annotation.Nullable;
45 38 import java.util.List;
46 39 import java.util.Set;
47 40 import java.util.concurrent.TimeUnit;
... ... @@ -122,33 +115,6 @@ class DefaultTbContext implements TbContext {
122 115 }
123 116
124 117 @Override
125   - public void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback) {
126   - saveAndNotify(entityId, ts, 0L, callback);
127   - }
128   -
129   - @Override
130   - public void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback) {
131   - ListenableFuture<List<Void>> saveFuture = mainCtx.getTsService().save(entityId, ts, ttl);
132   - Futures.addCallback(saveFuture, new FutureCallback<List<Void>>() {
133   - @Override
134   - public void onSuccess(@Nullable List<Void> result) {
135   - mainCtx.getTsSubService().onLocalTimeseriesUpdate(entityId, ts);
136   - callback.onSuccess(null);
137   - }
138   -
139   - @Override
140   - public void onFailure(Throwable t) {
141   - callback.onFailure(t);
142   - }
143   - }, mainCtx.getTsCallBackExecutor());
144   - }
145   -
146   - @Override
147   - public void saveAndNotify(EntityId entityId, String scope, Set<AttributeKvEntry> attributes, FutureCallback<Void> callback) {
148   -
149   - }
150   -
151   - @Override
152 118 public ListeningExecutor getJsExecutor() {
153 119 return mainCtx.getJsExecutor();
154 120 }
... ...
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - * <p>
  3 + *
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - * <p>
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - * <p>
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -89,15 +89,15 @@ import java.util.stream.Collectors;
89 89 public class TelemetryController extends BaseController {
90 90
91 91 @Autowired
92   - private TelemetrySubscriptionService subscriptionService;
93   -
94   - @Autowired
95 92 private AttributesService attributesService;
96 93
97 94 @Autowired
98 95 private TimeseriesService tsService;
99 96
100 97 @Autowired
  98 + private TelemetrySubscriptionService tsSubService;
  99 +
  100 + @Autowired
101 101 private AccessValidator accessValidator;
102 102
103 103 private ExecutorService executor;
... ... @@ -312,13 +312,11 @@ public class TelemetryController extends BaseController {
312 312 }
313 313 SecurityUser user = getCurrentUser();
314 314 return accessValidator.validateEntityAndCallback(getCurrentUser(), entityIdSrc, (result, entityId) -> {
315   - ListenableFuture<List<Void>> future = attributesService.save(entityId, scope, attributes);
316   - Futures.addCallback(future, new FutureCallback<List<Void>>() {
  315 + tsSubService.saveAndNotify(entityId, scope, attributes, new FutureCallback<Void>() {
317 316 @Override
318   - public void onSuccess(@Nullable List<Void> tmp) {
  317 + public void onSuccess(@Nullable Void tmp) {
319 318 logAttributesUpdated(user, entityId, scope, attributes, null);
320 319 result.setResult(new ResponseEntity(HttpStatus.OK));
321   - subscriptionService.onAttributesUpdateFromServer(entityId, scope, attributes);
322 320 }
323 321
324 322 @Override
... ... @@ -327,7 +325,6 @@ public class TelemetryController extends BaseController {
327 325 AccessValidator.handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR);
328 326 }
329 327 });
330   - result.setResult(new ResponseEntity(HttpStatus.OK));
331 328 });
332 329 } else {
333 330 return getImmediateDeferredResult("Request is not a JSON object", HttpStatus.BAD_REQUEST);
... ... @@ -358,12 +355,10 @@ public class TelemetryController extends BaseController {
358 355 }
359 356 SecurityUser user = getCurrentUser();
360 357 return accessValidator.validateEntityAndCallback(getCurrentUser(), entityIdSrc, (result, entityId) -> {
361   - ListenableFuture<List<Void>> future = tsService.save(entityId, entries, ttl);
362   - Futures.addCallback(future, new FutureCallback<List<Void>>() {
  358 + tsSubService.saveAndNotify(entityId, entries, ttl, new FutureCallback<Void>() {
363 359 @Override
364   - public void onSuccess(@Nullable List<Void> tmp) {
  360 + public void onSuccess(@Nullable Void tmp) {
365 361 result.setResult(new ResponseEntity(HttpStatus.OK));
366   - subscriptionService.onTimeseriesUpdateFromServer(entityId, entries);
367 362 }
368 363
369 364 @Override
... ... @@ -371,7 +366,6 @@ public class TelemetryController extends BaseController {
371 366 AccessValidator.handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR);
372 367 }
373 368 });
374   - result.setResult(new ResponseEntity(HttpStatus.OK));
375 369 });
376 370 }
377 371
... ...
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - * <p>
  3 + *
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - * <p>
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - * <p>
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 16 package org.thingsboard.server.service.security;
2 17
3 18 import com.google.common.base.Function;
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 16 package org.thingsboard.server.service.security;
2 17
3 18 import com.google.common.util.concurrent.FutureCallback;
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 16 package org.thingsboard.server.service.telemetry;
2 17
  18 +import com.google.common.util.concurrent.FutureCallback;
  19 +import com.google.common.util.concurrent.Futures;
  20 +import com.google.common.util.concurrent.ListenableFuture;
3 21 import lombok.extern.slf4j.Slf4j;
4 22 import org.springframework.beans.factory.annotation.Autowired;
5 23 import org.springframework.stereotype.Service;
  24 +import org.springframework.util.StringUtils;
6 25 import org.thingsboard.server.common.data.id.EntityId;
7 26 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
8   -import org.thingsboard.server.common.data.kv.KvEntry;
  27 +import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
9 28 import org.thingsboard.server.common.data.kv.TsKvEntry;
  29 +import org.thingsboard.server.common.msg.cluster.ServerAddress;
  30 +import org.thingsboard.server.dao.attributes.AttributesService;
  31 +import org.thingsboard.server.dao.timeseries.TimeseriesService;
  32 +import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
10 33 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
11 34 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
  35 +import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
  36 +import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
12 37
  38 +import javax.annotation.Nullable;
  39 +import javax.annotation.PostConstruct;
  40 +import javax.annotation.PreDestroy;
  41 +import java.util.ArrayList;
13 42 import java.util.HashMap;
  43 +import java.util.HashSet;
14 44 import java.util.List;
15 45 import java.util.Map;
  46 +import java.util.Optional;
16 47 import java.util.Set;
  48 +import java.util.concurrent.ExecutorService;
  49 +import java.util.concurrent.Executors;
  50 +import java.util.function.Consumer;
  51 +import java.util.function.Function;
  52 +import java.util.function.Predicate;
17 53
18 54 /**
19 55 * Created by ashvayka on 27.03.18.
... ... @@ -25,42 +61,275 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
25 61 @Autowired
26 62 private TelemetryWebSocketService wsService;
27 63
  64 + @Autowired
  65 + private AttributesService attrService;
  66 +
  67 + @Autowired
  68 + private TimeseriesService tsService;
  69 +
  70 + @Autowired
  71 + private ClusterRoutingService routingService;
  72 +
  73 + private ExecutorService tsCallBackExecutor;
  74 + private ExecutorService wsCallBackExecutor;
  75 +
  76 + @PostConstruct
  77 + public void initExecutor() {
  78 + tsCallBackExecutor = Executors.newSingleThreadExecutor();
  79 + wsCallBackExecutor = Executors.newSingleThreadExecutor();
  80 + }
  81 +
  82 + @PreDestroy
  83 + public void shutdownExecutor() {
  84 + if (tsCallBackExecutor != null) {
  85 + tsCallBackExecutor.shutdownNow();
  86 + }
  87 + if (wsCallBackExecutor != null) {
  88 + wsCallBackExecutor.shutdownNow();
  89 + }
  90 + }
  91 +
28 92 private final Map<EntityId, Set<Subscription>> subscriptionsByEntityId = new HashMap<>();
29 93
30 94 private final Map<String, Map<Integer, Subscription>> subscriptionsByWsSessionId = new HashMap<>();
31 95
32 96 @Override
33   - public void onAttributesUpdateFromServer(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
34   -
  97 + public void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub) {
  98 + Optional<ServerAddress> server = routingService.resolveById(entityId);
  99 + Subscription subscription;
  100 + if (server.isPresent()) {
  101 + ServerAddress address = server.get();
  102 + log.trace("[{}] Forwarding subscription [{}] for device [{}] to [{}]", sessionId, sub.getSubscriptionId(), entityId, address);
  103 + subscription = new Subscription(sub, true, address);
  104 +// rpcHandler.onNewSubscription(ctx, address, sessionId, subscription);
  105 + } else {
  106 + log.trace("[{}] Registering local subscription [{}] for device [{}]", sessionId, sub.getSubscriptionId(), entityId);
  107 + subscription = new Subscription(sub, true);
  108 + }
  109 + registerSubscription(sessionId, entityId, subscription);
35 110 }
36 111
37 112 @Override
38   - public void onTimeseriesUpdateFromServer(EntityId entityId, List<TsKvEntry> entries) {
  113 + public void cleanupLocalWsSessionSubscriptions(TelemetryWebSocketSessionRef sessionRef, String sessionId) {
  114 + cleanupLocalWsSessionSubscriptions(sessionId);
  115 + }
39 116
  117 + @Override
  118 + public void removeSubscription(String sessionId, int subscriptionId) {
  119 + log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId);
  120 + Map<Integer, Subscription> sessionSubscriptions = subscriptionsByWsSessionId.get(sessionId);
  121 + if (sessionSubscriptions != null) {
  122 + Subscription subscription = sessionSubscriptions.remove(subscriptionId);
  123 + if (subscription != null) {
  124 + processSubscriptionRemoval(sessionId, sessionSubscriptions, subscription);
  125 + } else {
  126 + log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId);
  127 + }
  128 + } else {
  129 + log.debug("[{}] No session subscriptions found!", sessionId);
  130 + }
40 131 }
41 132
42 133 @Override
43   - public void cleanupLocalWsSessionSubscriptions(TelemetryWebSocketSessionRef sessionRef, String sessionId) {
  134 + public void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback) {
  135 + saveAndNotify(entityId, ts, 0L, callback);
  136 + }
44 137
  138 + @Override
  139 + public void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback) {
  140 + ListenableFuture<List<Void>> saveFuture = tsService.save(entityId, ts, ttl);
  141 + addMainCallback(saveFuture, callback);
  142 + addWsCallback(saveFuture, success -> onTimeseriesUpdate(entityId, ts));
45 143 }
46 144
47 145 @Override
48   - public void removeSubscription(String sessionId, int cmdId) {
  146 + public void saveAndNotify(EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback) {
  147 + ListenableFuture<List<Void>> saveFuture = attrService.save(entityId, scope, attributes);
  148 + addMainCallback(saveFuture, callback);
  149 + addWsCallback(saveFuture, success -> onAttributesUpdate(entityId, scope, attributes));
  150 + }
49 151
  152 + private void onAttributesUpdate(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
  153 + Optional<ServerAddress> serverAddress = routingService.resolveById(entityId);
  154 + if (!serverAddress.isPresent()) {
  155 + onLocalAttributesUpdate(entityId, scope, attributes);
  156 + } else {
  157 +// rpcHandler.onAttributesUpdate(ctx, serverAddress.get(), entityId, entries);
  158 + }
50 159 }
51 160
52   - @Override
53   - public void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub) {
  161 + private void onTimeseriesUpdate(EntityId entityId, List<TsKvEntry> ts) {
  162 + Optional<ServerAddress> serverAddress = routingService.resolveById(entityId);
  163 + if (!serverAddress.isPresent()) {
  164 + onLocalTimeseriesUpdate(entityId, ts);
  165 + } else {
  166 +// rpcHandler.onTimeseriesUpdate(ctx, serverAddress.get(), entityId, entries);
  167 + }
  168 + }
54 169
  170 + private void onLocalAttributesUpdate(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
  171 + onLocalSubUpdate(entityId, s -> TelemetryFeature.ATTRIBUTES == s.getType() && (StringUtils.isEmpty(s.getScope()) || scope.equals(s.getScope())), s -> {
  172 + List<TsKvEntry> subscriptionUpdate = null;
  173 + for (AttributeKvEntry kv : attributes) {
  174 + if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) {
  175 + if (subscriptionUpdate == null) {
  176 + subscriptionUpdate = new ArrayList<>();
  177 + }
  178 + subscriptionUpdate.add(new BasicTsKvEntry(kv.getLastUpdateTs(), kv));
  179 + }
  180 + }
  181 + return subscriptionUpdate;
  182 + });
55 183 }
56 184
57   - @Override
58   - public void onLocalTimeseriesUpdate(EntityId entityId, Map<Long, List<KvEntry>> ts) {
  185 + private void onLocalTimeseriesUpdate(EntityId entityId, List<TsKvEntry> ts) {
  186 + onLocalSubUpdate(entityId, s -> TelemetryFeature.TIMESERIES == s.getType(), s -> {
  187 + List<TsKvEntry> subscriptionUpdate = null;
  188 + for (TsKvEntry kv : ts) {
  189 + if (s.isAllKeys() || s.getKeyStates().containsKey((kv.getKey()))) {
  190 + if (subscriptionUpdate == null) {
  191 + subscriptionUpdate = new ArrayList<>();
  192 + }
  193 + subscriptionUpdate.add(kv);
  194 + }
  195 + }
  196 + return subscriptionUpdate;
  197 + });
  198 + }
59 199
  200 + private void onLocalSubUpdate(EntityId entityId, Predicate<Subscription> filter, Function<Subscription, List<TsKvEntry>> f) {
  201 + Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.get(entityId);
  202 + if (deviceSubscriptions != null) {
  203 + deviceSubscriptions.stream().filter(filter).forEach(s -> {
  204 + String sessionId = s.getWsSessionId();
  205 + List<TsKvEntry> subscriptionUpdate = f.apply(s);
  206 + if (subscriptionUpdate == null || !subscriptionUpdate.isEmpty()) {
  207 + SubscriptionUpdate update = new SubscriptionUpdate(s.getSubscriptionId(), subscriptionUpdate);
  208 + if (s.isLocal()) {
  209 + updateSubscriptionState(sessionId, s, update);
  210 + wsService.sendWsMsg(sessionId, update);
  211 + } else {
  212 + //TODO: ashvayka
  213 +// rpcHandler.onSubscriptionUpdate(ctx, s.getServer(), sessionId, update);
  214 + }
  215 + }
  216 + });
  217 + } else {
  218 + log.debug("[{}] No device subscriptions to process!", entityId);
  219 + }
60 220 }
61 221
62   - @Override
63   - public void onLocalAttributesUpdate(EntityId entityId, String scope, Set<AttributeKvEntry> attributes) {
  222 + private void updateSubscriptionState(String sessionId, Subscription subState, SubscriptionUpdate update) {
  223 + log.trace("[{}] updating subscription state {} using onUpdate {}", sessionId, subState, update);
  224 + update.getLatestValues().entrySet().forEach(e -> subState.setKeyState(e.getKey(), e.getValue()));
  225 + }
  226 +
  227 + private void registerSubscription(String sessionId, EntityId entityId, Subscription subscription) {
  228 + Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.computeIfAbsent(entityId, k -> new HashSet<>());
  229 + deviceSubscriptions.add(subscription);
  230 + Map<Integer, Subscription> sessionSubscriptions = subscriptionsByWsSessionId.computeIfAbsent(sessionId, k -> new HashMap<>());
  231 + sessionSubscriptions.put(subscription.getSubscriptionId(), subscription);
  232 + }
  233 +
  234 + public void cleanupLocalWsSessionSubscriptions(String sessionId) {
  235 + cleanupWsSessionSubscriptions(sessionId, true);
  236 + }
  237 +
  238 + public void cleanupRemoteWsSessionSubscriptions(String sessionId) {
  239 + cleanupWsSessionSubscriptions(sessionId, false);
  240 + }
  241 +
  242 + private void cleanupWsSessionSubscriptions(String sessionId, boolean localSession) {
  243 + log.debug("[{}] Removing all subscriptions for particular session.", sessionId);
  244 + Map<Integer, Subscription> sessionSubscriptions = subscriptionsByWsSessionId.get(sessionId);
  245 + if (sessionSubscriptions != null) {
  246 + int sessionSubscriptionSize = sessionSubscriptions.size();
  247 +
  248 + for (Subscription subscription : sessionSubscriptions.values()) {
  249 + EntityId entityId = subscription.getEntityId();
  250 + Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.get(entityId);
  251 + deviceSubscriptions.remove(subscription);
  252 + if (deviceSubscriptions.isEmpty()) {
  253 + subscriptionsByEntityId.remove(entityId);
  254 + }
  255 + }
  256 + subscriptionsByWsSessionId.remove(sessionId);
  257 + log.debug("[{}] Removed {} subscriptions for particular session.", sessionId, sessionSubscriptionSize);
  258 +
  259 + if (localSession) {
  260 + notifyWsSubscriptionClosed(sessionId, sessionSubscriptions);
  261 + }
  262 + } else {
  263 + log.debug("[{}] No subscriptions found!", sessionId);
  264 + }
  265 + }
  266 +
  267 + private void notifyWsSubscriptionClosed(String sessionId, Map<Integer, Subscription> sessionSubscriptions) {
  268 + Set<ServerAddress> affectedServers = new HashSet<>();
  269 + for (Subscription subscription : sessionSubscriptions.values()) {
  270 + if (subscription.getServer() != null) {
  271 + affectedServers.add(subscription.getServer());
  272 + }
  273 + }
  274 + for (ServerAddress address : affectedServers) {
  275 + log.debug("[{}] Going to onSubscriptionUpdate [{}] server about session close event", sessionId, address);
  276 +// rpcHandler.onSessionClose(ctx, address, sessionId);
  277 + }
  278 + }
  279 +
  280 + private void processSubscriptionRemoval(String sessionId, Map<Integer, Subscription> sessionSubscriptions, Subscription subscription) {
  281 + EntityId entityId = subscription.getEntityId();
  282 + if (subscription.isLocal() && subscription.getServer() != null) {
  283 +// rpcHandler.onSubscriptionClose(ctx, subscription.getServer(), sessionId, subscription.getSubscriptionId());
  284 + }
  285 + if (sessionSubscriptions.isEmpty()) {
  286 + log.debug("[{}] Removed last subscription for particular session.", sessionId);
  287 + subscriptionsByWsSessionId.remove(sessionId);
  288 + } else {
  289 + log.debug("[{}] Removed session subscription.", sessionId);
  290 + }
  291 + Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.get(entityId);
  292 + if (deviceSubscriptions != null) {
  293 + boolean result = deviceSubscriptions.remove(subscription);
  294 + if (result) {
  295 + if (deviceSubscriptions.size() == 0) {
  296 + log.debug("[{}] Removed last subscription for particular device.", sessionId);
  297 + subscriptionsByEntityId.remove(entityId);
  298 + } else {
  299 + log.debug("[{}] Removed device subscription.", sessionId);
  300 + }
  301 + } else {
  302 + log.debug("[{}] Subscription not found!", sessionId);
  303 + }
  304 + } else {
  305 + log.debug("[{}] No device subscriptions found!", sessionId);
  306 + }
  307 + }
  308 +
  309 + private void addMainCallback(ListenableFuture<List<Void>> saveFuture, final FutureCallback<Void> callback) {
  310 + Futures.addCallback(saveFuture, new FutureCallback<List<Void>>() {
  311 + @Override
  312 + public void onSuccess(@Nullable List<Void> result) {
  313 + callback.onSuccess(null);
  314 + }
  315 +
  316 + @Override
  317 + public void onFailure(Throwable t) {
  318 + callback.onFailure(t);
  319 + }
  320 + }, tsCallBackExecutor);
  321 + }
  322 +
  323 + private void addWsCallback(ListenableFuture<List<Void>> saveFuture, Consumer<Void> callback) {
  324 + Futures.addCallback(saveFuture, new FutureCallback<List<Void>>() {
  325 + @Override
  326 + public void onSuccess(@Nullable List<Void> result) {
  327 + callback.accept(null);
  328 + }
64 329
  330 + @Override
  331 + public void onFailure(Throwable t) {
  332 + }
  333 + }, wsCallBackExecutor);
65 334 }
66 335 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 16 package org.thingsboard.server.service.telemetry;
2 17
3 18 import com.fasterxml.jackson.core.JsonProcessingException;
... ... @@ -24,6 +39,7 @@ import org.thingsboard.server.common.data.kv.TsKvQuery;
24 39 import org.thingsboard.server.dao.attributes.AttributesService;
25 40 import org.thingsboard.server.dao.timeseries.TimeseriesService;
26 41 import org.thingsboard.server.extensions.api.exception.UnauthorizedException;
  42 +import org.thingsboard.server.extensions.api.plugins.PluginContext;
27 43 import org.thingsboard.server.extensions.api.plugins.ws.SessionEvent;
28 44 import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.AttributesSubscriptionCmd;
29 45 import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.GetHistoryCmd;
... ... @@ -31,9 +47,9 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.SubscriptionC
31 47 import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmd;
32 48 import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmdsWrapper;
33 49 import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TimeseriesSubscriptionCmd;
  50 +import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
34 51 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionErrorCode;
35 52 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
36   -import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
37 53 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
38 54 import org.thingsboard.server.service.security.AccessValidator;
39 55
... ... @@ -146,6 +162,14 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
146 162 }
147 163 }
148 164
  165 + @Override
  166 + public void sendWsMsg(String sessionId, SubscriptionUpdate update) {
  167 + WsSessionMetaData md = wsSessionsMap.get(sessionId);
  168 + if (md != null) {
  169 + sendWsMsg(md.getSessionRef(), update);
  170 + }
  171 + }
  172 +
149 173 private void handleWsAttributesSubscriptionCmd(TelemetryWebSocketSessionRef sessionRef, AttributesSubscriptionCmd cmd) {
150 174 String sessionId = sessionRef.getSessionId();
151 175 log.debug("[{}] Processing: {}", sessionId, cmd);
... ... @@ -180,7 +204,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
180 204 keys.forEach(key -> subState.put(key, 0L));
181 205 attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
182 206
183   - SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, false, subState, cmd.getScope());
  207 + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, TelemetryFeature.ATTRIBUTES, false, subState, cmd.getScope());
184 208 subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
185 209 }
186 210
... ... @@ -267,7 +291,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
267 291 Map<String, Long> subState = new HashMap<>(attributesData.size());
268 292 attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
269 293
270   - SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, true, subState, cmd.getScope());
  294 + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, TelemetryFeature.ATTRIBUTES, true, subState, cmd.getScope());
271 295 subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
272 296 }
273 297
... ... @@ -340,7 +364,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
340 364 sendWsMsg(sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
341 365 Map<String, Long> subState = new HashMap<>(data.size());
342 366 data.forEach(v -> subState.put(v.getKey(), v.getTs()));
343   - SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, true, subState, cmd.getScope());
  367 + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, TelemetryFeature.TIMESERIES, true, subState, cmd.getScope());
344 368 subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
345 369 }
346 370
... ... @@ -370,7 +394,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
370 394 Map<String, Long> subState = new HashMap<>(keys.size());
371 395 keys.forEach(key -> subState.put(key, startTs));
372 396 data.forEach(v -> subState.put(v.getKey(), v.getTs()));
373   - SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, false, subState, cmd.getScope());
  397 + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, TelemetryFeature.TIMESERIES, false, subState, cmd.getScope());
374 398 subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
375 399 }
376 400
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 16 package org.thingsboard.server.service.telemetry;
2 17
  18 +import com.google.common.util.concurrent.FutureCallback;
3 19 import org.thingsboard.server.common.data.id.EntityId;
4 20 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
5 21 import org.thingsboard.server.common.data.kv.KvEntry;
... ... @@ -15,17 +31,15 @@ import java.util.Set;
15 31 */
16 32 public interface TelemetrySubscriptionService {
17 33
18   - void onAttributesUpdateFromServer(EntityId entityId, String scope, List<AttributeKvEntry> attributes);
19   -
20   - void onTimeseriesUpdateFromServer(EntityId entityId, List<TsKvEntry> entries);
  34 + void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub);
21 35
22 36 void cleanupLocalWsSessionSubscriptions(TelemetryWebSocketSessionRef sessionRef, String sessionId);
23 37
24 38 void removeSubscription(String sessionId, int cmdId);
25 39
26   - void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub);
  40 + void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
27 41
28   - void onLocalTimeseriesUpdate(EntityId entityId, List<TsKvEntry> ts);
  42 + void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback);
29 43
30   - void onLocalAttributesUpdate(EntityId entityId, String scope, Set<AttributeKvEntry> attributes);
  44 + void saveAndNotify(EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback);
31 45 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 16 package org.thingsboard.server.service.telemetry;
2 17
3 18 import java.io.IOException;
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 16 package org.thingsboard.server.service.telemetry;
2 17
3 18 import org.thingsboard.server.extensions.api.plugins.ws.SessionEvent;
  19 +import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
4 20
5 21 /**
6 22 * Created by ashvayka on 27.03.18.
... ... @@ -10,4 +26,6 @@ public interface TelemetryWebSocketService {
10 26 void handleWebSocketSessionEvent(TelemetryWebSocketSessionRef sessionRef, SessionEvent sessionEvent);
11 27
12 28 void handleWebSocketMsg(TelemetryWebSocketSessionRef sessionRef, String msg);
  29 +
  30 + void sendWsMsg(String sessionId, SubscriptionUpdate update);
13 31 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 16 package org.thingsboard.server.service.telemetry;
2 17
3 18 import lombok.Getter;
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 16 package org.thingsboard.server.service.telemetry;
2 17
3 18 import lombok.Data;
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 16 package org.thingsboard.server.service.telemetry;
2 17
3 18 import org.thingsboard.server.extensions.api.plugins.ws.PluginWebsocketSessionRef;
... ...
... ... @@ -19,20 +19,30 @@ import lombok.Setter;
19 19 import lombok.extern.slf4j.Slf4j;
20 20 import org.springframework.util.StringUtils;
21 21 import org.thingsboard.server.common.data.DataConstants;
22   -import org.thingsboard.server.common.data.id.DeviceId;
23 22 import org.thingsboard.server.common.data.id.EntityId;
24   -import org.thingsboard.server.common.data.kv.*;
  23 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  24 +import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
  25 +import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
  26 +import org.thingsboard.server.common.data.kv.TsKvEntry;
  27 +import org.thingsboard.server.common.data.kv.TsKvQuery;
25 28 import org.thingsboard.server.common.msg.cluster.ServerAddress;
26 29 import org.thingsboard.server.extensions.api.plugins.PluginCallback;
27 30 import org.thingsboard.server.extensions.api.plugins.PluginContext;
  31 +import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
28 32 import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryRpcMsgHandler;
29 33 import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryWebsocketMsgHandler;
30 34 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
31 35 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
32   -import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
33 36 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
34 37
35   -import java.util.*;
  38 +import java.util.ArrayList;
  39 +import java.util.HashMap;
  40 +import java.util.HashSet;
  41 +import java.util.Iterator;
  42 +import java.util.List;
  43 +import java.util.Map;
  44 +import java.util.Optional;
  45 +import java.util.Set;
36 46 import java.util.function.Function;
37 47 import java.util.function.Predicate;
38 48
... ... @@ -70,7 +80,7 @@ public class SubscriptionManager {
70 80 EntityId entityId = subscription.getEntityId();
71 81 log.trace("[{}] Registering remote subscription [{}] for device [{}] to [{}]", sessionId, subscription.getSubscriptionId(), entityId, address);
72 82 registerSubscription(sessionId, entityId, subscription);
73   - if (subscription.getType() == SubscriptionType.ATTRIBUTES) {
  83 + if (subscription.getType() == TelemetryFeature.ATTRIBUTES) {
74 84 final Map<String, Long> keyStates = subscription.getKeyStates();
75 85 ctx.loadAttributes(entityId, DataConstants.CLIENT_SCOPE, keyStates.keySet(), new PluginCallback<List<AttributeKvEntry>>() {
76 86 @Override
... ... @@ -91,7 +101,7 @@ public class SubscriptionManager {
91 101 log.error("Failed to fetch missed updates.", e);
92 102 }
93 103 });
94   - } else if (subscription.getType() == SubscriptionType.TIMESERIES) {
  104 + } else if (subscription.getType() == TelemetryFeature.TIMESERIES) {
95 105 long curTs = System.currentTimeMillis();
96 106 List<TsKvQuery> queries = new ArrayList<>();
97 107 subscription.getKeyStates().entrySet().forEach(e -> {
... ... @@ -175,7 +185,7 @@ public class SubscriptionManager {
175 185 }
176 186 }
177 187
178   - public void onLocalSubscriptionUpdate(PluginContext ctx, EntityId entityId, SubscriptionType type, Function<Subscription, List<TsKvEntry>> f) {
  188 + public void onLocalSubscriptionUpdate(PluginContext ctx, EntityId entityId, TelemetryFeature type, Function<Subscription, List<TsKvEntry>> f) {
179 189 onLocalSubscriptionUpdate(ctx, entityId, s -> type == s.getType(), f);
180 190 }
181 191
... ... @@ -212,7 +222,7 @@ public class SubscriptionManager {
212 222 public void onAttributesUpdateFromServer(PluginContext ctx, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
213 223 Optional<ServerAddress> serverAddress = ctx.resolve(entityId);
214 224 if (!serverAddress.isPresent()) {
215   - onLocalSubscriptionUpdate(ctx, entityId, s -> SubscriptionType.ATTRIBUTES == s.getType() && (StringUtils.isEmpty(s.getScope()) || scope.equals(s.getScope())), s -> {
  225 + onLocalSubscriptionUpdate(ctx, entityId, s -> TelemetryFeature.ATTRIBUTES == s.getType() && (StringUtils.isEmpty(s.getScope()) || scope.equals(s.getScope())), s -> {
216 226 List<TsKvEntry> subscriptionUpdate = new ArrayList<TsKvEntry>();
217 227 for (AttributeKvEntry kv : attributes) {
218 228 if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) {
... ... @@ -229,7 +239,7 @@ public class SubscriptionManager {
229 239 public void onTimeseriesUpdateFromServer(PluginContext ctx, EntityId entityId, List<TsKvEntry> entries) {
230 240 Optional<ServerAddress> serverAddress = ctx.resolve(entityId);
231 241 if (!serverAddress.isPresent()) {
232   - onLocalSubscriptionUpdate(ctx, entityId, SubscriptionType.TIMESERIES, s -> {
  242 + onLocalSubscriptionUpdate(ctx, entityId, TelemetryFeature.TIMESERIES, s -> {
233 243 List<TsKvEntry> subscriptionUpdate = new ArrayList<TsKvEntry>();
234 244 for (TsKvEntry kv : entries) {
235 245 if (s.isAllKeys() || s.getKeyStates().containsKey((kv.getKey()))) {
... ...
... ... @@ -16,7 +16,7 @@
16 16 package org.thingsboard.server.extensions.core.plugin.telemetry.cmd;
17 17
18 18 import lombok.NoArgsConstructor;
19   -import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
  19 +import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
20 20
21 21 /**
22 22 * @author Andrew Shvayka
... ... @@ -25,8 +25,8 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionT
25 25 public class AttributesSubscriptionCmd extends SubscriptionCmd {
26 26
27 27 @Override
28   - public SubscriptionType getType() {
29   - return SubscriptionType.ATTRIBUTES;
  28 + public TelemetryFeature getType() {
  29 + return TelemetryFeature.ATTRIBUTES;
30 30 }
31 31
32 32 }
... ...
... ... @@ -18,7 +18,7 @@ package org.thingsboard.server.extensions.core.plugin.telemetry.cmd;
18 18 import lombok.AllArgsConstructor;
19 19 import lombok.Data;
20 20 import lombok.NoArgsConstructor;
21   -import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
  21 +import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
22 22
23 23 @NoArgsConstructor
24 24 @AllArgsConstructor
... ... @@ -32,7 +32,7 @@ public abstract class SubscriptionCmd implements TelemetryPluginCmd {
32 32 private String scope;
33 33 private boolean unsubscribe;
34 34
35   - public abstract SubscriptionType getType();
  35 + public abstract TelemetryFeature getType();
36 36
37 37 @Override
38 38 public String toString() {
... ...
... ... @@ -18,7 +18,7 @@ package org.thingsboard.server.extensions.core.plugin.telemetry.cmd;
18 18 import lombok.AllArgsConstructor;
19 19 import lombok.Data;
20 20 import lombok.NoArgsConstructor;
21   -import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
  21 +import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
22 22
23 23 /**
24 24 * @author Andrew Shvayka
... ... @@ -35,7 +35,7 @@ public class TimeseriesSubscriptionCmd extends SubscriptionCmd {
35 35 private String agg;
36 36
37 37 @Override
38   - public SubscriptionType getType() {
39   - return SubscriptionType.TIMESERIES;
  38 + public TelemetryFeature getType() {
  39 + return TelemetryFeature.TIMESERIES;
40 40 }
41 41 }
... ...
... ... @@ -114,7 +114,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
114 114 }
115 115 Map<String, Long> statesMap = proto.getKeyStatesList().stream().collect(Collectors.toMap(SubscriptionKetStateProto::getKey, SubscriptionKetStateProto::getTs));
116 116 Subscription subscription = new Subscription(
117   - new SubscriptionState(proto.getSessionId(), proto.getSubscriptionId(), EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()), SubscriptionType.valueOf(proto.getType()), proto.getAllKeys(), statesMap, proto.getScope()),
  117 + new SubscriptionState(proto.getSessionId(), proto.getSubscriptionId(), EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()), TelemetryFeature.valueOf(proto.getType()), proto.getAllKeys(), statesMap, proto.getScope()),
118 118 false, msg.getServerAddress());
119 119 subscriptionManager.addRemoteWsSubscription(ctx, msg.getServerAddress(), proto.getSessionId(), subscription);
120 120 }
... ...
... ... @@ -24,7 +24,11 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
24 24 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
25 25 import org.thingsboard.server.common.data.kv.KvEntry;
26 26 import org.thingsboard.server.common.data.kv.TsKvEntry;
27   -import org.thingsboard.server.common.msg.core.*;
  27 +import org.thingsboard.server.common.msg.core.BasicGetAttributesResponse;
  28 +import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
  29 +import org.thingsboard.server.common.msg.core.GetAttributesRequest;
  30 +import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
  31 +import org.thingsboard.server.common.msg.core.UpdateAttributesRequest;
28 32 import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
29 33 import org.thingsboard.server.extensions.api.plugins.PluginCallback;
30 34 import org.thingsboard.server.extensions.api.plugins.PluginContext;
... ... @@ -35,9 +39,13 @@ import org.thingsboard.server.extensions.api.plugins.msg.TelemetryUploadRequestR
35 39 import org.thingsboard.server.extensions.api.plugins.msg.UpdateAttributesRequestRuleToPluginMsg;
36 40 import org.thingsboard.server.extensions.core.plugin.telemetry.SubscriptionManager;
37 41 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
38   -import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
39 42
40   -import java.util.*;
  43 +import java.util.ArrayList;
  44 +import java.util.Collections;
  45 +import java.util.List;
  46 +import java.util.Map;
  47 +import java.util.Optional;
  48 +import java.util.Set;
41 49 import java.util.stream.Collectors;
42 50
43 51 @Slf4j
... ... @@ -97,7 +105,7 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
97 105 @Override
98 106 public void onSuccess(PluginContext ctx, Void data) {
99 107 ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, BasicStatusCodeResponse.onSuccess(request.getMsgType(), request.getRequestId())));
100   - subscriptionManager.onLocalSubscriptionUpdate(ctx, msg.getDeviceId(), SubscriptionType.TIMESERIES, s ->
  108 + subscriptionManager.onLocalSubscriptionUpdate(ctx, msg.getDeviceId(), TelemetryFeature.TIMESERIES, s ->
101 109 prepareSubscriptionUpdate(request, s)
102 110 );
103 111 }
... ... @@ -131,7 +139,7 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
131 139 public void onSuccess(PluginContext ctx, Void value) {
132 140 ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, BasicStatusCodeResponse.onSuccess(request.getMsgType(), request.getRequestId())));
133 141
134   - subscriptionManager.onLocalSubscriptionUpdate(ctx, msg.getDeviceId(), SubscriptionType.ATTRIBUTES, s -> {
  142 + subscriptionManager.onLocalSubscriptionUpdate(ctx, msg.getDeviceId(), TelemetryFeature.ATTRIBUTES, s -> {
135 143 List<TsKvEntry> subscriptionUpdate = new ArrayList<>();
136 144 for (AttributeKvEntry kv : request.getAttributes()) {
137 145 if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) {
... ...
... ... @@ -21,7 +21,12 @@ import org.springframework.util.StringUtils;
21 21 import org.thingsboard.server.common.data.DataConstants;
22 22 import org.thingsboard.server.common.data.id.EntityId;
23 23 import org.thingsboard.server.common.data.id.EntityIdFactory;
24   -import org.thingsboard.server.common.data.kv.*;
  24 +import org.thingsboard.server.common.data.kv.Aggregation;
  25 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  26 +import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
  27 +import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
  28 +import org.thingsboard.server.common.data.kv.TsKvEntry;
  29 +import org.thingsboard.server.common.data.kv.TsKvQuery;
25 30 import org.thingsboard.server.extensions.api.exception.UnauthorizedException;
26 31 import org.thingsboard.server.extensions.api.plugins.PluginCallback;
27 32 import org.thingsboard.server.extensions.api.plugins.PluginContext;
... ... @@ -32,14 +37,26 @@ import org.thingsboard.server.extensions.api.plugins.ws.msg.BinaryPluginWebSocke
32 37 import org.thingsboard.server.extensions.api.plugins.ws.msg.PluginWebsocketMsg;
33 38 import org.thingsboard.server.extensions.api.plugins.ws.msg.TextPluginWebSocketMsg;
34 39 import org.thingsboard.server.extensions.core.plugin.telemetry.SubscriptionManager;
35   -import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.*;
  40 +import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.AttributesSubscriptionCmd;
  41 +import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.GetHistoryCmd;
  42 +import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.SubscriptionCmd;
  43 +import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmd;
  44 +import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmdsWrapper;
  45 +import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TimeseriesSubscriptionCmd;
36 46 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionErrorCode;
37 47 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
38   -import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
39 48 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
40 49
41 50 import java.io.IOException;
42   -import java.util.*;
  51 +import java.util.ArrayList;
  52 +import java.util.Arrays;
  53 +import java.util.Collections;
  54 +import java.util.HashMap;
  55 +import java.util.HashSet;
  56 +import java.util.List;
  57 +import java.util.Map;
  58 +import java.util.Optional;
  59 +import java.util.Set;
43 60 import java.util.stream.Collectors;
44 61
45 62 /**
... ... @@ -131,7 +148,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
131 148 keys.forEach(key -> subState.put(key, 0L));
132 149 attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
133 150
134   - SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, false, subState, cmd.getScope());
  151 + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, TelemetryFeature.ATTRIBUTES, false, subState, cmd.getScope());
135 152 subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
136 153 }
137 154
... ... @@ -168,7 +185,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
168 185 Map<String, Long> subState = new HashMap<>(attributesData.size());
169 186 attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
170 187
171   - SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, true, subState, cmd.getScope());
  188 + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, TelemetryFeature.ATTRIBUTES, true, subState, cmd.getScope());
172 189 subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
173 190 }
174 191
... ... @@ -234,7 +251,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
234 251 sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
235 252 Map<String, Long> subState = new HashMap<>(data.size());
236 253 data.forEach(v -> subState.put(v.getKey(), v.getTs()));
237   - SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, true, subState, cmd.getScope());
  254 + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, TelemetryFeature.TIMESERIES, true, subState, cmd.getScope());
238 255 subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
239 256 }
240 257
... ... @@ -262,7 +279,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
262 279 Map<String, Long> subState = new HashMap<>(keys.size());
263 280 keys.forEach(key -> subState.put(key, startTs));
264 281 data.forEach(v -> subState.put(v.getKey(), v.getTs()));
265   - SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, false, subState, cmd.getScope());
  282 + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, TelemetryFeature.TIMESERIES, false, subState, cmd.getScope());
266 283 subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
267 284 }
268 285
... ...
... ... @@ -20,6 +20,7 @@ import lombok.Data;
20 20 import org.thingsboard.server.common.data.id.DeviceId;
21 21 import org.thingsboard.server.common.data.id.EntityId;
22 22 import org.thingsboard.server.common.msg.cluster.ServerAddress;
  23 +import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
23 24
24 25 import java.util.Map;
25 26
... ... @@ -47,7 +48,7 @@ public class Subscription {
47 48 return getSub().getEntityId();
48 49 }
49 50
50   - public SubscriptionType getType() {
  51 + public TelemetryFeature getType() {
51 52 return getSub().getType();
52 53 }
53 54
... ...
... ... @@ -18,6 +18,7 @@ package org.thingsboard.server.extensions.core.plugin.telemetry.sub;
18 18 import lombok.AllArgsConstructor;
19 19 import lombok.Getter;
20 20 import org.thingsboard.server.common.data.id.EntityId;
  21 +import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
21 22
22 23 import java.util.Map;
23 24
... ... @@ -30,7 +31,7 @@ public class SubscriptionState {
30 31 @Getter private final String wsSessionId;
31 32 @Getter private final int subscriptionId;
32 33 @Getter private final EntityId entityId;
33   - @Getter private final SubscriptionType type;
  34 + @Getter private final TelemetryFeature type;
34 35 @Getter private final boolean allKeys;
35 36 @Getter private final Map<String, Long> keyStates;
36 37 @Getter private final String scope;
... ...
1   -/**
2   - * Copyright © 2016-2018 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.extensions.core.plugin.telemetry.sub;
17   -
18   -/**
19   - * @author Andrew Shvayka
20   - */
21   -public enum SubscriptionType {
22   - ATTRIBUTES, TIMESERIES
23   -}
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - * <p>
  3 + *
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - * <p>
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - * <p>
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -63,12 +63,6 @@ public interface TbContext {
63 63
64 64 void tellError(TbMsg msg, Throwable th);
65 65
66   - void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
67   -
68   - void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback);
69   -
70   - void saveAndNotify(EntityId entityId, String scope, Set<AttributeKvEntry> attributes, FutureCallback<Void> callback);
71   -
72 66 RuleNodeId getSelfId();
73 67
74 68 AttributesService getAttributesService();
... ...
... ... @@ -89,7 +89,7 @@ public class TbTransformMsgNodeTest {
89 89
90 90 @Test
91 91 public void payloadCanBeUpdated() throws TbNodeException {
92   - initWithScript("return msg.passed = msg.passed * metadata.temp; msg.bigObj.newProp = 'Ukraine' ");
  92 + initWithScript("msg.passed = msg.passed * metadata.temp; return msg.bigObj.newProp = 'Ukraine' ");
93 93 TbMsgMetaData metaData = new TbMsgMetaData();
94 94 metaData.putValue("temp", "7");
95 95 metaData.putValue("humidity", "99");
... ...