Commit 5e2667d3dec644d48e4fb45084b0fbf939b8fe2a

Authored by Andrew Shvayka
1 parent b16a02d9

Implementation of Ts Websocket Service

@@ -61,12 +61,17 @@ import org.thingsboard.server.service.cluster.discovery.DiscoveryService; @@ -61,12 +61,17 @@ import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
61 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService; 61 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
62 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService; 62 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
63 import org.thingsboard.server.service.component.ComponentDiscoveryService; 63 import org.thingsboard.server.service.component.ComponentDiscoveryService;
  64 +import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
64 65
  66 +import javax.annotation.PostConstruct;
  67 +import javax.annotation.PreDestroy;
65 import java.io.IOException; 68 import java.io.IOException;
66 import java.io.PrintWriter; 69 import java.io.PrintWriter;
67 import java.io.StringWriter; 70 import java.io.StringWriter;
68 import java.nio.charset.StandardCharsets; 71 import java.nio.charset.StandardCharsets;
69 import java.util.Optional; 72 import java.util.Optional;
  73 +import java.util.concurrent.ExecutorService;
  74 +import java.util.concurrent.Executors;
70 75
71 @Slf4j 76 @Slf4j
72 @Component 77 @Component
@@ -158,6 +163,10 @@ public class ActorSystemContext { @@ -158,6 +163,10 @@ public class ActorSystemContext {
158 163
159 @Autowired 164 @Autowired
160 @Getter 165 @Getter
  166 + private TelemetrySubscriptionService tsSubService;
  167 +
  168 + @Autowired
  169 + @Getter
161 @Setter 170 @Setter
162 private PluginWebSocketMsgEndpoint wsMsgEndpoint; 171 private PluginWebSocketMsgEndpoint wsMsgEndpoint;
163 172
@@ -224,6 +233,21 @@ public class ActorSystemContext { @@ -224,6 +233,21 @@ public class ActorSystemContext {
224 @Getter 233 @Getter
225 private final Config config; 234 private final Config config;
226 235
  236 + @Getter
  237 + private ExecutorService tsCallBackExecutor;
  238 +
  239 + @PostConstruct
  240 + public void initExecutor() {
  241 + tsCallBackExecutor = Executors.newSingleThreadExecutor();
  242 + }
  243 +
  244 + @PreDestroy
  245 + public void shutdownExecutor() {
  246 + if (tsCallBackExecutor != null) {
  247 + tsCallBackExecutor.shutdownNow();
  248 + }
  249 + }
  250 +
227 public ActorSystemContext() { 251 public ActorSystemContext() {
228 config = ConfigFactory.parseResources(AKKA_CONF_FILE_NAME).withFallback(ConfigFactory.load()); 252 config = ConfigFactory.parseResources(AKKA_CONF_FILE_NAME).withFallback(ConfigFactory.load());
229 } 253 }
@@ -345,7 +369,7 @@ public class ActorSystemContext { @@ -345,7 +369,7 @@ public class ActorSystemContext {
345 return Exception.class.isInstance(error) ? (Exception) error : new Exception(error); 369 return Exception.class.isInstance(error) ? (Exception) error : new Exception(error);
346 } 370 }
347 371
348 - public ListeningExecutor getExecutor() { 372 + public ListeningExecutor getJsExecutor() {
349 //TODO: take thread count from yml. 373 //TODO: take thread count from yml.
350 return new JsExecutorService(1); 374 return new JsExecutorService(1);
351 } 375 }
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * 3 + * <p>
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * 7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,12 +15,18 @@ @@ -15,12 +15,18 @@
15 */ 15 */
16 package org.thingsboard.server.actors.ruleChain; 16 package org.thingsboard.server.actors.ruleChain;
17 17
18 -import akka.actor.ActorContext;  
19 import akka.actor.ActorRef; 18 import akka.actor.ActorRef;
  19 +import com.google.common.base.Function;
  20 +import com.google.common.util.concurrent.FutureCallback;
  21 +import com.google.common.util.concurrent.Futures;
  22 +import com.google.common.util.concurrent.ListenableFuture;
20 import org.thingsboard.rule.engine.api.ListeningExecutor; 23 import org.thingsboard.rule.engine.api.ListeningExecutor;
21 import org.thingsboard.rule.engine.api.TbContext; 24 import org.thingsboard.rule.engine.api.TbContext;
22 import org.thingsboard.server.actors.ActorSystemContext; 25 import org.thingsboard.server.actors.ActorSystemContext;
  26 +import org.thingsboard.server.common.data.id.EntityId;
23 import org.thingsboard.server.common.data.id.RuleNodeId; 27 import org.thingsboard.server.common.data.id.RuleNodeId;
  28 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  29 +import org.thingsboard.server.common.data.kv.TsKvEntry;
24 import org.thingsboard.server.common.msg.TbMsg; 30 import org.thingsboard.server.common.msg.TbMsg;
25 import org.thingsboard.server.common.msg.cluster.ServerAddress; 31 import org.thingsboard.server.common.msg.cluster.ServerAddress;
26 import org.thingsboard.server.dao.alarm.AlarmService; 32 import org.thingsboard.server.dao.alarm.AlarmService;
@@ -35,6 +41,8 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService; @@ -35,6 +41,8 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
35 import org.thingsboard.server.dao.user.UserService; 41 import org.thingsboard.server.dao.user.UserService;
36 import scala.concurrent.duration.Duration; 42 import scala.concurrent.duration.Duration;
37 43
  44 +import javax.annotation.Nullable;
  45 +import java.util.List;
38 import java.util.Set; 46 import java.util.Set;
39 import java.util.concurrent.TimeUnit; 47 import java.util.concurrent.TimeUnit;
40 48
@@ -43,6 +51,7 @@ import java.util.concurrent.TimeUnit; @@ -43,6 +51,7 @@ import java.util.concurrent.TimeUnit;
43 */ 51 */
44 class DefaultTbContext implements TbContext { 52 class DefaultTbContext implements TbContext {
45 53
  54 + private static final Function<? super List<Void>, ? extends Void> LIST_VOID_FUNCTION = v -> null;
46 private final ActorSystemContext mainCtx; 55 private final ActorSystemContext mainCtx;
47 private final RuleNodeCtx nodeCtx; 56 private final RuleNodeCtx nodeCtx;
48 57
@@ -113,8 +122,35 @@ class DefaultTbContext implements TbContext { @@ -113,8 +122,35 @@ class DefaultTbContext implements TbContext {
113 } 122 }
114 123
115 @Override 124 @Override
  125 + public void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback) {
  126 + saveAndNotify(entityId, ts, 0L, callback);
  127 + }
  128 +
  129 + @Override
  130 + public void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback) {
  131 + ListenableFuture<List<Void>> saveFuture = mainCtx.getTsService().save(entityId, ts, ttl);
  132 + Futures.addCallback(saveFuture, new FutureCallback<List<Void>>() {
  133 + @Override
  134 + public void onSuccess(@Nullable List<Void> result) {
  135 + mainCtx.getTsSubService().onLocalTimeseriesUpdate(entityId, ts);
  136 + callback.onSuccess(null);
  137 + }
  138 +
  139 + @Override
  140 + public void onFailure(Throwable t) {
  141 + callback.onFailure(t);
  142 + }
  143 + }, mainCtx.getTsCallBackExecutor());
  144 + }
  145 +
  146 + @Override
  147 + public void saveAndNotify(EntityId entityId, String scope, Set<AttributeKvEntry> attributes, FutureCallback<Void> callback) {
  148 +
  149 + }
  150 +
  151 + @Override
116 public ListeningExecutor getJsExecutor() { 152 public ListeningExecutor getJsExecutor() {
117 - return mainCtx.getExecutor(); 153 + return mainCtx.getJsExecutor();
118 } 154 }
119 155
120 @Override 156 @Override
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.controller;
  17 +
  18 +import com.google.common.util.concurrent.FutureCallback;
  19 +import org.springframework.http.ResponseEntity;
  20 +import org.springframework.web.context.request.async.DeferredResult;
  21 +import org.thingsboard.server.service.security.ValidationCallback;
  22 +
  23 +/**
  24 + * Created by ashvayka on 21.02.17.
  25 + */
  26 +public class HttpValidationCallback extends ValidationCallback<DeferredResult<ResponseEntity>> {
  27 +
  28 + public HttpValidationCallback(DeferredResult<ResponseEntity> response, FutureCallback<DeferredResult<ResponseEntity>> action) {
  29 + super(response, action);
  30 + }
  31 +
  32 +}
@@ -69,6 +69,7 @@ import org.thingsboard.server.service.security.model.SecurityUser; @@ -69,6 +69,7 @@ import org.thingsboard.server.service.security.model.SecurityUser;
69 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; 69 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
70 70
71 import javax.annotation.Nullable; 71 import javax.annotation.Nullable;
  72 +import javax.annotation.PostConstruct;
72 import javax.annotation.PreDestroy; 73 import javax.annotation.PreDestroy;
73 import java.util.ArrayList; 74 import java.util.ArrayList;
74 import java.util.Arrays; 75 import java.util.Arrays;
@@ -101,6 +102,7 @@ public class TelemetryController extends BaseController { @@ -101,6 +102,7 @@ public class TelemetryController extends BaseController {
101 102
102 private ExecutorService executor; 103 private ExecutorService executor;
103 104
  105 + @PostConstruct
104 public void initExecutor() { 106 public void initExecutor() {
105 executor = Executors.newSingleThreadExecutor(); 107 executor = Executors.newSingleThreadExecutor();
106 } 108 }
@@ -22,7 +22,7 @@ import org.thingsboard.server.common.data.id.EntityIdFactory; @@ -22,7 +22,7 @@ import org.thingsboard.server.common.data.id.EntityIdFactory;
22 import org.thingsboard.server.common.data.id.RuleChainId; 22 import org.thingsboard.server.common.data.id.RuleChainId;
23 import org.thingsboard.server.common.data.id.TenantId; 23 import org.thingsboard.server.common.data.id.TenantId;
24 import org.thingsboard.server.common.data.rule.RuleChain; 24 import org.thingsboard.server.common.data.rule.RuleChain;
25 -import org.thingsboard.server.controller.ValidationCallback; 25 +import org.thingsboard.server.controller.HttpValidationCallback;
26 import org.thingsboard.server.dao.alarm.AlarmService; 26 import org.thingsboard.server.dao.alarm.AlarmService;
27 import org.thingsboard.server.dao.asset.AssetService; 27 import org.thingsboard.server.dao.asset.AssetService;
28 import org.thingsboard.server.dao.customer.CustomerService; 28 import org.thingsboard.server.dao.customer.CustomerService;
@@ -91,7 +91,7 @@ public class AccessValidator { @@ -91,7 +91,7 @@ public class AccessValidator {
91 91
92 final DeferredResult<ResponseEntity> response = new DeferredResult<>(); 92 final DeferredResult<ResponseEntity> response = new DeferredResult<>();
93 93
94 - validate(currentUser, entityId, new ValidationCallback(response, 94 + validate(currentUser, entityId, new HttpValidationCallback(response,
95 new FutureCallback<DeferredResult<ResponseEntity>>() { 95 new FutureCallback<DeferredResult<ResponseEntity>>() {
96 @Override 96 @Override
97 public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) { 97 public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
@@ -107,7 +107,7 @@ public class AccessValidator { @@ -107,7 +107,7 @@ public class AccessValidator {
107 return response; 107 return response;
108 } 108 }
109 109
110 - public void validate(SecurityUser currentUser, EntityId entityId, ValidationCallback callback) { 110 + public <T> void validate(SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
111 switch (entityId.getEntityType()) { 111 switch (entityId.getEntityType()) {
112 case DEVICE: 112 case DEVICE:
113 validateDevice(currentUser, entityId, callback); 113 validateDevice(currentUser, entityId, callback);
@@ -130,7 +130,7 @@ public class AccessValidator { @@ -130,7 +130,7 @@ public class AccessValidator {
130 } 130 }
131 } 131 }
132 132
133 - private void validateDevice(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) { 133 + private void validateDevice(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
134 if (currentUser.isSystemAdmin()) { 134 if (currentUser.isSystemAdmin()) {
135 callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION)); 135 callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
136 } else { 136 } else {
@@ -151,7 +151,7 @@ public class AccessValidator { @@ -151,7 +151,7 @@ public class AccessValidator {
151 } 151 }
152 } 152 }
153 153
154 - private void validateAsset(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) { 154 + private <T> void validateAsset(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
155 if (currentUser.isSystemAdmin()) { 155 if (currentUser.isSystemAdmin()) {
156 callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION)); 156 callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
157 } else { 157 } else {
@@ -173,7 +173,7 @@ public class AccessValidator { @@ -173,7 +173,7 @@ public class AccessValidator {
173 } 173 }
174 174
175 175
176 - private void validateRuleChain(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) { 176 + private <T> void validateRuleChain(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
177 if (currentUser.isCustomerUser()) { 177 if (currentUser.isCustomerUser()) {
178 callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION)); 178 callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
179 } else { 179 } else {
@@ -194,7 +194,7 @@ public class AccessValidator { @@ -194,7 +194,7 @@ public class AccessValidator {
194 } 194 }
195 } 195 }
196 196
197 - private void validateCustomer(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) { 197 + private <T> void validateCustomer(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
198 if (currentUser.isSystemAdmin()) { 198 if (currentUser.isSystemAdmin()) {
199 callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION)); 199 callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
200 } else { 200 } else {
@@ -215,7 +215,7 @@ public class AccessValidator { @@ -215,7 +215,7 @@ public class AccessValidator {
215 } 215 }
216 } 216 }
217 217
218 - private void validateTenant(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) { 218 + private <T> void validateTenant(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
219 if (currentUser.isCustomerUser()) { 219 if (currentUser.isCustomerUser()) {
220 callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION)); 220 callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
221 } else if (currentUser.isSystemAdmin()) { 221 } else if (currentUser.isSystemAdmin()) {
@@ -234,7 +234,7 @@ public class AccessValidator { @@ -234,7 +234,7 @@ public class AccessValidator {
234 } 234 }
235 } 235 }
236 236
237 - private <T> FutureCallback<T> getCallback(ValidationCallback callback, Function<T, ValidationResult> transformer) { 237 + private <T> FutureCallback<T> getCallback(FutureCallback<ValidationResult> callback, Function<T, ValidationResult> transformer) {
238 return new FutureCallback<T>() { 238 return new FutureCallback<T>() {
239 @Override 239 @Override
240 public void onSuccess(@Nullable T result) { 240 public void onSuccess(@Nullable T result) {
application/src/main/java/org/thingsboard/server/service/security/ValidationCallback.java renamed from application/src/main/java/org/thingsboard/server/controller/ValidationCallback.java
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.controller; 1 +package org.thingsboard.server.service.security;
17 2
18 import com.google.common.util.concurrent.FutureCallback; 3 import com.google.common.util.concurrent.FutureCallback;
19 -import org.springframework.http.ResponseEntity;  
20 -import org.springframework.web.context.request.async.DeferredResult;  
21 import org.thingsboard.server.actors.plugin.ValidationResult; 4 import org.thingsboard.server.actors.plugin.ValidationResult;
22 import org.thingsboard.server.actors.plugin.ValidationResultCode; 5 import org.thingsboard.server.actors.plugin.ValidationResultCode;
23 import org.thingsboard.server.extensions.api.exception.AccessDeniedException; 6 import org.thingsboard.server.extensions.api.exception.AccessDeniedException;
@@ -26,14 +9,14 @@ import org.thingsboard.server.extensions.api.exception.InternalErrorException; @@ -26,14 +9,14 @@ import org.thingsboard.server.extensions.api.exception.InternalErrorException;
26 import org.thingsboard.server.extensions.api.exception.UnauthorizedException; 9 import org.thingsboard.server.extensions.api.exception.UnauthorizedException;
27 10
28 /** 11 /**
29 - * Created by ashvayka on 21.02.17. 12 + * Created by ashvayka on 31.03.18.
30 */ 13 */
31 -public class ValidationCallback implements FutureCallback<ValidationResult> { 14 +public class ValidationCallback<T> implements FutureCallback<ValidationResult> {
32 15
33 - private final DeferredResult<ResponseEntity> response;  
34 - private final FutureCallback<DeferredResult<ResponseEntity>> action; 16 + private final T response;
  17 + private final FutureCallback<T> action;
35 18
36 - public ValidationCallback(DeferredResult<ResponseEntity> response, FutureCallback<DeferredResult<ResponseEntity>> action) { 19 + public ValidationCallback(T response, FutureCallback<T> action) {
37 this.response = response; 20 this.response = response;
38 this.action = action; 21 this.action = action;
39 } 22 }
@@ -5,8 +5,10 @@ import org.springframework.beans.factory.annotation.Autowired; @@ -5,8 +5,10 @@ import org.springframework.beans.factory.annotation.Autowired;
5 import org.springframework.stereotype.Service; 5 import org.springframework.stereotype.Service;
6 import org.thingsboard.server.common.data.id.EntityId; 6 import org.thingsboard.server.common.data.id.EntityId;
7 import org.thingsboard.server.common.data.kv.AttributeKvEntry; 7 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  8 +import org.thingsboard.server.common.data.kv.KvEntry;
8 import org.thingsboard.server.common.data.kv.TsKvEntry; 9 import org.thingsboard.server.common.data.kv.TsKvEntry;
9 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription; 10 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
  11 +import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
10 12
11 import java.util.HashMap; 13 import java.util.HashMap;
12 import java.util.List; 14 import java.util.List;
@@ -23,13 +25,10 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio @@ -23,13 +25,10 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
23 @Autowired 25 @Autowired
24 private TelemetryWebSocketService wsService; 26 private TelemetryWebSocketService wsService;
25 27
26 -  
27 private final Map<EntityId, Set<Subscription>> subscriptionsByEntityId = new HashMap<>(); 28 private final Map<EntityId, Set<Subscription>> subscriptionsByEntityId = new HashMap<>();
28 29
29 private final Map<String, Map<Integer, Subscription>> subscriptionsByWsSessionId = new HashMap<>(); 30 private final Map<String, Map<Integer, Subscription>> subscriptionsByWsSessionId = new HashMap<>();
30 31
31 -  
32 -  
33 @Override 32 @Override
34 public void onAttributesUpdateFromServer(EntityId entityId, String scope, List<AttributeKvEntry> attributes) { 33 public void onAttributesUpdateFromServer(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
35 34
@@ -39,4 +38,29 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio @@ -39,4 +38,29 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
39 public void onTimeseriesUpdateFromServer(EntityId entityId, List<TsKvEntry> entries) { 38 public void onTimeseriesUpdateFromServer(EntityId entityId, List<TsKvEntry> entries) {
40 39
41 } 40 }
  41 +
  42 + @Override
  43 + public void cleanupLocalWsSessionSubscriptions(TelemetryWebSocketSessionRef sessionRef, String sessionId) {
  44 +
  45 + }
  46 +
  47 + @Override
  48 + public void removeSubscription(String sessionId, int cmdId) {
  49 +
  50 + }
  51 +
  52 + @Override
  53 + public void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub) {
  54 +
  55 + }
  56 +
  57 + @Override
  58 + public void onLocalTimeseriesUpdate(EntityId entityId, Map<Long, List<KvEntry>> ts) {
  59 +
  60 + }
  61 +
  62 + @Override
  63 + public void onLocalAttributesUpdate(EntityId entityId, String scope, Set<AttributeKvEntry> attributes) {
  64 +
  65 + }
42 } 66 }
@@ -2,36 +2,46 @@ package org.thingsboard.server.service.telemetry; @@ -2,36 +2,46 @@ package org.thingsboard.server.service.telemetry;
2 2
3 import com.fasterxml.jackson.core.JsonProcessingException; 3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper; 4 import com.fasterxml.jackson.databind.ObjectMapper;
  5 +import com.google.common.base.Function;
5 import com.google.common.util.concurrent.FutureCallback; 6 import com.google.common.util.concurrent.FutureCallback;
  7 +import com.google.common.util.concurrent.Futures;
  8 +import com.google.common.util.concurrent.ListenableFuture;
  9 +import com.hazelcast.util.function.Consumer;
6 import lombok.extern.slf4j.Slf4j; 10 import lombok.extern.slf4j.Slf4j;
7 import org.springframework.beans.factory.annotation.Autowired; 11 import org.springframework.beans.factory.annotation.Autowired;
8 import org.springframework.stereotype.Service; 12 import org.springframework.stereotype.Service;
9 import org.springframework.util.StringUtils; 13 import org.springframework.util.StringUtils;
  14 +import org.thingsboard.server.actors.plugin.ValidationResult;
10 import org.thingsboard.server.common.data.DataConstants; 15 import org.thingsboard.server.common.data.DataConstants;
11 import org.thingsboard.server.common.data.id.EntityId; 16 import org.thingsboard.server.common.data.id.EntityId;
12 import org.thingsboard.server.common.data.id.EntityIdFactory; 17 import org.thingsboard.server.common.data.id.EntityIdFactory;
  18 +import org.thingsboard.server.common.data.kv.Aggregation;
13 import org.thingsboard.server.common.data.kv.AttributeKvEntry; 19 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  20 +import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
14 import org.thingsboard.server.common.data.kv.BasicTsKvEntry; 21 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
15 import org.thingsboard.server.common.data.kv.TsKvEntry; 22 import org.thingsboard.server.common.data.kv.TsKvEntry;
  23 +import org.thingsboard.server.common.data.kv.TsKvQuery;
16 import org.thingsboard.server.dao.attributes.AttributesService; 24 import org.thingsboard.server.dao.attributes.AttributesService;
17 import org.thingsboard.server.dao.timeseries.TimeseriesService; 25 import org.thingsboard.server.dao.timeseries.TimeseriesService;
18 import org.thingsboard.server.extensions.api.exception.UnauthorizedException; 26 import org.thingsboard.server.extensions.api.exception.UnauthorizedException;
19 -import org.thingsboard.server.extensions.api.plugins.PluginCallback;  
20 -import org.thingsboard.server.extensions.api.plugins.PluginContext;  
21 -import org.thingsboard.server.extensions.api.plugins.ws.PluginWebsocketSessionRef;  
22 import org.thingsboard.server.extensions.api.plugins.ws.SessionEvent; 27 import org.thingsboard.server.extensions.api.plugins.ws.SessionEvent;
23 import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.AttributesSubscriptionCmd; 28 import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.AttributesSubscriptionCmd;
  29 +import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.GetHistoryCmd;
24 import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.SubscriptionCmd; 30 import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.SubscriptionCmd;
25 import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmd; 31 import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmd;
26 import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmdsWrapper; 32 import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmdsWrapper;
  33 +import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TimeseriesSubscriptionCmd;
27 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionErrorCode; 34 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionErrorCode;
28 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState; 35 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
29 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType; 36 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
30 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate; 37 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
  38 +import org.thingsboard.server.service.security.AccessValidator;
31 39
  40 +import javax.annotation.Nullable;
  41 +import javax.annotation.PostConstruct;
  42 +import javax.annotation.PreDestroy;
32 import java.io.IOException; 43 import java.io.IOException;
33 import java.util.ArrayList; 44 import java.util.ArrayList;
34 -import java.util.Arrays;  
35 import java.util.Collections; 45 import java.util.Collections;
36 import java.util.HashMap; 46 import java.util.HashMap;
37 import java.util.HashSet; 47 import java.util.HashSet;
@@ -41,6 +51,8 @@ import java.util.Optional; @@ -41,6 +51,8 @@ import java.util.Optional;
41 import java.util.Set; 51 import java.util.Set;
42 import java.util.concurrent.ConcurrentHashMap; 52 import java.util.concurrent.ConcurrentHashMap;
43 import java.util.concurrent.ConcurrentMap; 53 import java.util.concurrent.ConcurrentMap;
  54 +import java.util.concurrent.ExecutorService;
  55 +import java.util.concurrent.Executors;
44 import java.util.stream.Collectors; 56 import java.util.stream.Collectors;
45 57
46 /** 58 /**
@@ -50,6 +62,8 @@ import java.util.stream.Collectors; @@ -50,6 +62,8 @@ import java.util.stream.Collectors;
50 @Slf4j 62 @Slf4j
51 public class DefaultTelemetryWebSocketService implements TelemetryWebSocketService { 63 public class DefaultTelemetryWebSocketService implements TelemetryWebSocketService {
52 64
  65 + public static final int DEFAULT_LIMIT = 100;
  66 + public static final Aggregation DEFAULT_AGGREGATION = Aggregation.NONE;
53 private static final int UNKNOWN_SUBSCRIPTION_ID = 0; 67 private static final int UNKNOWN_SUBSCRIPTION_ID = 0;
54 private static final String PROCESSING_MSG = "[{}] Processing: {}"; 68 private static final String PROCESSING_MSG = "[{}] Processing: {}";
55 private static final ObjectMapper jsonMapper = new ObjectMapper(); 69 private static final ObjectMapper jsonMapper = new ObjectMapper();
@@ -66,11 +80,28 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -66,11 +80,28 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
66 private TelemetryWebSocketMsgEndpoint msgEndpoint; 80 private TelemetryWebSocketMsgEndpoint msgEndpoint;
67 81
68 @Autowired 82 @Autowired
  83 + private AccessValidator accessValidator;
  84 +
  85 + @Autowired
69 private AttributesService attributesService; 86 private AttributesService attributesService;
70 87
71 @Autowired 88 @Autowired
72 private TimeseriesService tsService; 89 private TimeseriesService tsService;
73 90
  91 + private ExecutorService executor;
  92 +
  93 + @PostConstruct
  94 + public void initExecutor() {
  95 + executor = Executors.newSingleThreadExecutor();
  96 + }
  97 +
  98 + @PreDestroy
  99 + public void shutdownExecutor() {
  100 + if (executor != null) {
  101 + executor.shutdownNow();
  102 + }
  103 + }
  104 +
74 @Override 105 @Override
75 public void handleWebSocketSessionEvent(TelemetryWebSocketSessionRef sessionRef, SessionEvent event) { 106 public void handleWebSocketSessionEvent(TelemetryWebSocketSessionRef sessionRef, SessionEvent event) {
76 String sessionId = sessionRef.getSessionId(); 107 String sessionId = sessionRef.getSessionId();
@@ -169,44 +200,190 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -169,44 +200,190 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
169 }; 200 };
170 201
171 if (StringUtils.isEmpty(cmd.getScope())) { 202 if (StringUtils.isEmpty(cmd.getScope())) {
172 - //ValidationCallback?  
173 - ctx.loadAttributes(entityId, Arrays.asList(DataConstants.allScopes()), keys, callback); 203 + accessValidator.validate(sessionRef.getSecurityCtx(), entityId, getAttributesFetchCallback(entityId, keys, callback));
174 } else { 204 } else {
175 - ctx.loadAttributes(entityId, cmd.getScope(), keys, callback); 205 + accessValidator.validate(sessionRef.getSecurityCtx(), entityId, getAttributesFetchCallback(entityId, cmd.getScope(), keys, callback));
  206 + }
  207 + }
  208 +
  209 + private void handleWsHistoryCmd(TelemetryWebSocketSessionRef sessionRef, GetHistoryCmd cmd) {
  210 + String sessionId = sessionRef.getSessionId();
  211 + WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId);
  212 + if (sessionMD == null) {
  213 + log.warn("[{}] Session meta data not found. ", sessionId);
  214 + SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
  215 + SESSION_META_DATA_NOT_FOUND);
  216 + sendWsMsg(sessionRef, update);
  217 + return;
  218 + }
  219 + if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty() || cmd.getEntityType() == null || cmd.getEntityType().isEmpty()) {
  220 + SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
  221 + "Device id is empty!");
  222 + sendWsMsg(sessionRef, update);
  223 + return;
  224 + }
  225 + if (cmd.getKeys() == null || cmd.getKeys().isEmpty()) {
  226 + SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
  227 + "Keys are empty!");
  228 + sendWsMsg(sessionRef, update);
  229 + return;
176 } 230 }
  231 + EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId());
  232 + List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
  233 + List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg())))
  234 + .collect(Collectors.toList());
  235 +
  236 + FutureCallback<List<TsKvEntry>> callback = new FutureCallback<List<TsKvEntry>>() {
  237 + @Override
  238 + public void onSuccess(List<TsKvEntry> data) {
  239 + sendWsMsg(sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
  240 + }
  241 +
  242 + @Override
  243 + public void onFailure(Throwable e) {
  244 + SubscriptionUpdate update;
  245 + if (UnauthorizedException.class.isInstance(e)) {
  246 + update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
  247 + SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
  248 + } else {
  249 + update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
  250 + FAILED_TO_FETCH_DATA);
  251 + }
  252 + sendWsMsg(sessionRef, update);
  253 + }
  254 + };
  255 + accessValidator.validate(sessionRef.getSecurityCtx(), entityId,
  256 + on(r -> Futures.addCallback(tsService.findAll(entityId, queries), callback, executor), callback::onFailure));
177 } 257 }
178 258
179 - private void handleWsAttributesSubscription(PluginContext ctx, PluginWebsocketSessionRef sessionRef, 259 + private void handleWsAttributesSubscription(TelemetryWebSocketSessionRef sessionRef,
180 AttributesSubscriptionCmd cmd, String sessionId, EntityId entityId) { 260 AttributesSubscriptionCmd cmd, String sessionId, EntityId entityId) {
181 - PluginCallback<List<AttributeKvEntry>> callback = new PluginCallback<List<AttributeKvEntry>>() { 261 + FutureCallback<List<AttributeKvEntry>> callback = new FutureCallback<List<AttributeKvEntry>>() {
182 @Override 262 @Override
183 - public void onSuccess(PluginContext ctx, List<AttributeKvEntry> data) { 263 + public void onSuccess(List<AttributeKvEntry> data) {
184 List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList()); 264 List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
185 - sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData)); 265 + sendWsMsg(sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
186 266
187 Map<String, Long> subState = new HashMap<>(attributesData.size()); 267 Map<String, Long> subState = new HashMap<>(attributesData.size());
188 attributesData.forEach(v -> subState.put(v.getKey(), v.getTs())); 268 attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
189 269
190 SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, true, subState, cmd.getScope()); 270 SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, true, subState, cmd.getScope());
191 - subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub); 271 + subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
192 } 272 }
193 273
194 @Override 274 @Override
195 - public void onFailure(PluginContext ctx, Exception e) { 275 + public void onFailure(Throwable e) {
196 log.error(FAILED_TO_FETCH_ATTRIBUTES, e); 276 log.error(FAILED_TO_FETCH_ATTRIBUTES, e);
197 SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, 277 SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
198 FAILED_TO_FETCH_ATTRIBUTES); 278 FAILED_TO_FETCH_ATTRIBUTES);
199 - sendWsMsg(ctx, sessionRef, update); 279 + sendWsMsg(sessionRef, update);
200 } 280 }
201 }; 281 };
202 282
  283 +
203 if (StringUtils.isEmpty(cmd.getScope())) { 284 if (StringUtils.isEmpty(cmd.getScope())) {
204 - ctx.loadAttributes(entityId, Arrays.asList(DataConstants.allScopes()), callback); 285 + accessValidator.validate(sessionRef.getSecurityCtx(), entityId, getAttributesFetchCallback(entityId, callback));
  286 + } else {
  287 + accessValidator.validate(sessionRef.getSecurityCtx(), entityId, getAttributesFetchCallback(entityId, cmd.getScope(), callback));
  288 + }
  289 + }
  290 +
  291 + private void handleWsTimeseriesSubscriptionCmd(TelemetryWebSocketSessionRef sessionRef, TimeseriesSubscriptionCmd cmd) {
  292 + String sessionId = sessionRef.getSessionId();
  293 + log.debug("[{}] Processing: {}", sessionId, cmd);
  294 +
  295 + if (validateSessionMetadata(sessionRef, cmd, sessionId)) {
  296 + if (cmd.isUnsubscribe()) {
  297 + unsubscribe(sessionRef, cmd, sessionId);
  298 + } else if (validateSubscriptionCmd(sessionRef, cmd)) {
  299 + EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId());
  300 + Optional<Set<String>> keysOptional = getKeys(cmd);
  301 +
  302 + if (keysOptional.isPresent()) {
  303 + handleWsTimeseriesSubscriptionByKeys(sessionRef, cmd, sessionId, entityId);
  304 + } else {
  305 + handleWsTimeseriesSubscription(sessionRef, cmd, sessionId, entityId);
  306 + }
  307 + }
  308 + }
  309 + }
  310 +
  311 + private void handleWsTimeseriesSubscriptionByKeys(TelemetryWebSocketSessionRef sessionRef,
  312 + TimeseriesSubscriptionCmd cmd, String sessionId, EntityId entityId) {
  313 + long startTs;
  314 + if (cmd.getTimeWindow() > 0) {
  315 + List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
  316 + log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), entityId);
  317 + startTs = cmd.getStartTs();
  318 + long endTs = cmd.getStartTs() + cmd.getTimeWindow();
  319 + List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getInterval(),
  320 + getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))).collect(Collectors.toList());
  321 +
  322 + final FutureCallback<List<TsKvEntry>> callback = getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys);
  323 + accessValidator.validate(sessionRef.getSecurityCtx(), entityId,
  324 + on(r -> Futures.addCallback(tsService.findAll(entityId, queries), callback, executor), callback::onFailure));
205 } else { 325 } else {
206 - ctx.loadAttributes(entityId, cmd.getScope(), callback); 326 + List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
  327 + startTs = System.currentTimeMillis();
  328 + log.debug("[{}] fetching latest timeseries data for keys: ({}) for device : {}", sessionId, cmd.getKeys(), entityId);
  329 + final FutureCallback<List<TsKvEntry>> callback = getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys);
  330 + accessValidator.validate(sessionRef.getSecurityCtx(), entityId,
  331 + on(r -> Futures.addCallback(tsService.findLatest(entityId, keys), callback, executor), callback::onFailure));
207 } 332 }
208 } 333 }
209 334
  335 + private void handleWsTimeseriesSubscription(TelemetryWebSocketSessionRef sessionRef,
  336 + TimeseriesSubscriptionCmd cmd, String sessionId, EntityId entityId) {
  337 + FutureCallback<List<TsKvEntry>> callback = new FutureCallback<List<TsKvEntry>>() {
  338 + @Override
  339 + public void onSuccess(List<TsKvEntry> data) {
  340 + sendWsMsg(sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
  341 + Map<String, Long> subState = new HashMap<>(data.size());
  342 + data.forEach(v -> subState.put(v.getKey(), v.getTs()));
  343 + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, true, subState, cmd.getScope());
  344 + subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
  345 + }
  346 +
  347 + @Override
  348 + public void onFailure(Throwable e) {
  349 + SubscriptionUpdate update;
  350 + if (UnauthorizedException.class.isInstance(e)) {
  351 + update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
  352 + SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
  353 + } else {
  354 + update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
  355 + FAILED_TO_FETCH_DATA);
  356 + }
  357 + sendWsMsg(sessionRef, update);
  358 + }
  359 + };
  360 + accessValidator.validate(sessionRef.getSecurityCtx(), entityId,
  361 + on(r -> Futures.addCallback(tsService.findAllLatest(entityId), callback, executor), callback::onFailure));
  362 + }
  363 +
  364 + private FutureCallback<List<TsKvEntry>> getSubscriptionCallback(final TelemetryWebSocketSessionRef sessionRef, final TimeseriesSubscriptionCmd cmd, final String sessionId, final EntityId entityId, final long startTs, final List<String> keys) {
  365 + return new FutureCallback<List<TsKvEntry>>() {
  366 + @Override
  367 + public void onSuccess(List<TsKvEntry> data) {
  368 + sendWsMsg(sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
  369 +
  370 + Map<String, Long> subState = new HashMap<>(keys.size());
  371 + keys.forEach(key -> subState.put(key, startTs));
  372 + data.forEach(v -> subState.put(v.getKey(), v.getTs()));
  373 + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, false, subState, cmd.getScope());
  374 + subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
  375 + }
  376 +
  377 + @Override
  378 + public void onFailure(Throwable e) {
  379 + log.error(FAILED_TO_FETCH_DATA, e);
  380 + SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
  381 + FAILED_TO_FETCH_DATA);
  382 + sendWsMsg(sessionRef, update);
  383 + }
  384 + };
  385 + }
  386 +
210 private void unsubscribe(TelemetryWebSocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) { 387 private void unsubscribe(TelemetryWebSocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) {
211 if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) { 388 if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) {
212 subscriptionManager.cleanupLocalWsSessionSubscriptions(sessionRef, sessionId); 389 subscriptionManager.cleanupLocalWsSessionSubscriptions(sessionRef, sessionId);
@@ -258,4 +435,105 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @@ -258,4 +435,105 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
258 } 435 }
259 } 436 }
260 437
  438 + private ListenableFuture<List<AttributeKvEntry>> mergeAllAttributesFutures(List<ListenableFuture<List<AttributeKvEntry>>> futures) {
  439 + return Futures.transform(Futures.successfulAsList(futures),
  440 + (Function<? super List<List<AttributeKvEntry>>, ? extends List<AttributeKvEntry>>) input -> {
  441 + List<AttributeKvEntry> tmp = new ArrayList<>();
  442 + if (input != null) {
  443 + input.forEach(tmp::addAll);
  444 + }
  445 + return tmp;
  446 + }, executor);
  447 + }
  448 +
  449 + private <T> FutureCallback<ValidationResult> getAttributesFetchCallback(final EntityId entityId, final List<String> keys, final FutureCallback<List<AttributeKvEntry>> callback) {
  450 + return new FutureCallback<ValidationResult>() {
  451 + @Override
  452 + public void onSuccess(@Nullable ValidationResult result) {
  453 + List<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<>();
  454 + for (String scope : DataConstants.allScopes()) {
  455 + futures.add(attributesService.find(entityId, scope, keys));
  456 + }
  457 +
  458 + ListenableFuture<List<AttributeKvEntry>> future = mergeAllAttributesFutures(futures);
  459 + Futures.addCallback(future, callback);
  460 + }
  461 +
  462 + @Override
  463 + public void onFailure(Throwable t) {
  464 + callback.onFailure(t);
  465 + }
  466 + };
  467 + }
  468 +
  469 + private <T> FutureCallback<ValidationResult> getAttributesFetchCallback(final EntityId entityId, final String scope, final List<String> keys, final FutureCallback<List<AttributeKvEntry>> callback) {
  470 + return new FutureCallback<ValidationResult>() {
  471 + @Override
  472 + public void onSuccess(@Nullable ValidationResult result) {
  473 + Futures.addCallback(attributesService.find(entityId, scope, keys), callback);
  474 + }
  475 +
  476 + @Override
  477 + public void onFailure(Throwable t) {
  478 + callback.onFailure(t);
  479 + }
  480 + };
  481 + }
  482 +
  483 + private <T> FutureCallback<ValidationResult> getAttributesFetchCallback(final EntityId entityId, final FutureCallback<List<AttributeKvEntry>> callback) {
  484 + return new FutureCallback<ValidationResult>() {
  485 + @Override
  486 + public void onSuccess(@Nullable ValidationResult result) {
  487 + List<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<>();
  488 + for (String scope : DataConstants.allScopes()) {
  489 + futures.add(attributesService.findAll(entityId, scope));
  490 + }
  491 +
  492 + ListenableFuture<List<AttributeKvEntry>> future = mergeAllAttributesFutures(futures);
  493 + Futures.addCallback(future, callback);
  494 + }
  495 +
  496 + @Override
  497 + public void onFailure(Throwable t) {
  498 + callback.onFailure(t);
  499 + }
  500 + };
  501 + }
  502 +
  503 + private <T> FutureCallback<ValidationResult> getAttributesFetchCallback(final EntityId entityId, final String scope, final FutureCallback<List<AttributeKvEntry>> callback) {
  504 + return new FutureCallback<ValidationResult>() {
  505 + @Override
  506 + public void onSuccess(@Nullable ValidationResult result) {
  507 + Futures.addCallback(attributesService.findAll(entityId, scope), callback);
  508 + }
  509 +
  510 + @Override
  511 + public void onFailure(Throwable t) {
  512 + callback.onFailure(t);
  513 + }
  514 + };
  515 + }
  516 +
  517 + private FutureCallback<ValidationResult> on(Consumer<ValidationResult> success, Consumer<Throwable> failure) {
  518 + return new FutureCallback<ValidationResult>() {
  519 + @Override
  520 + public void onSuccess(@Nullable ValidationResult result) {
  521 + success.accept(result);
  522 + }
  523 +
  524 + @Override
  525 + public void onFailure(Throwable t) {
  526 + failure.accept(t);
  527 + }
  528 + };
  529 + }
  530 +
  531 +
  532 + private static Aggregation getAggregation(String agg) {
  533 + return StringUtils.isEmpty(agg) ? DEFAULT_AGGREGATION : Aggregation.valueOf(agg);
  534 + }
  535 +
  536 + private int getLimit(int limit) {
  537 + return limit == 0 ? DEFAULT_LIMIT : limit;
  538 + }
261 } 539 }
@@ -2,10 +2,13 @@ package org.thingsboard.server.service.telemetry; @@ -2,10 +2,13 @@ package org.thingsboard.server.service.telemetry;
2 2
3 import org.thingsboard.server.common.data.id.EntityId; 3 import org.thingsboard.server.common.data.id.EntityId;
4 import org.thingsboard.server.common.data.kv.AttributeKvEntry; 4 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  5 +import org.thingsboard.server.common.data.kv.KvEntry;
5 import org.thingsboard.server.common.data.kv.TsKvEntry; 6 import org.thingsboard.server.common.data.kv.TsKvEntry;
6 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState; 7 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
7 8
8 import java.util.List; 9 import java.util.List;
  10 +import java.util.Map;
  11 +import java.util.Set;
9 12
10 /** 13 /**
11 * Created by ashvayka on 27.03.18. 14 * Created by ashvayka on 27.03.18.
@@ -21,4 +24,8 @@ public interface TelemetrySubscriptionService { @@ -21,4 +24,8 @@ public interface TelemetrySubscriptionService {
21 void removeSubscription(String sessionId, int cmdId); 24 void removeSubscription(String sessionId, int cmdId);
22 25
23 void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub); 26 void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub);
  27 +
  28 + void onLocalTimeseriesUpdate(EntityId entityId, List<TsKvEntry> ts);
  29 +
  30 + void onLocalAttributesUpdate(EntityId entityId, String scope, Set<AttributeKvEntry> attributes);
24 } 31 }
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * 3 + * <p>
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * 7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,7 +15,12 @@ @@ -15,7 +15,12 @@
15 */ 15 */
16 package org.thingsboard.rule.engine.api; 16 package org.thingsboard.rule.engine.api;
17 17
  18 +import com.google.common.util.concurrent.FutureCallback;
  19 +import org.thingsboard.server.common.data.id.EntityId;
18 import org.thingsboard.server.common.data.id.RuleNodeId; 20 import org.thingsboard.server.common.data.id.RuleNodeId;
  21 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  22 +import org.thingsboard.server.common.data.kv.KvEntry;
  23 +import org.thingsboard.server.common.data.kv.TsKvEntry;
19 import org.thingsboard.server.common.msg.TbMsg; 24 import org.thingsboard.server.common.msg.TbMsg;
20 import org.thingsboard.server.common.msg.cluster.ServerAddress; 25 import org.thingsboard.server.common.msg.cluster.ServerAddress;
21 import org.thingsboard.server.dao.alarm.AlarmService; 26 import org.thingsboard.server.dao.alarm.AlarmService;
@@ -30,6 +35,8 @@ import org.thingsboard.server.dao.rule.RuleService; @@ -30,6 +35,8 @@ import org.thingsboard.server.dao.rule.RuleService;
30 import org.thingsboard.server.dao.timeseries.TimeseriesService; 35 import org.thingsboard.server.dao.timeseries.TimeseriesService;
31 import org.thingsboard.server.dao.user.UserService; 36 import org.thingsboard.server.dao.user.UserService;
32 37
  38 +import java.util.List;
  39 +import java.util.Map;
33 import java.util.Set; 40 import java.util.Set;
34 import java.util.UUID; 41 import java.util.UUID;
35 42
@@ -56,6 +63,12 @@ public interface TbContext { @@ -56,6 +63,12 @@ public interface TbContext {
56 63
57 void tellError(TbMsg msg, Throwable th); 64 void tellError(TbMsg msg, Throwable th);
58 65
  66 + void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
  67 +
  68 + void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback);
  69 +
  70 + void saveAndNotify(EntityId entityId, String scope, Set<AttributeKvEntry> attributes, FutureCallback<Void> callback);
  71 +
59 RuleNodeId getSelfId(); 72 RuleNodeId getSelfId();
60 73
61 AttributesService getAttributesService(); 74 AttributesService getAttributesService();