Commit 6823b8969f478ef6628ef7841add7b57aee147a4

Authored by Andrew Shvayka
1 parent a1662d66

Refactoring of Telemetry Websockets

Showing 17 changed files with 1209 additions and 362 deletions
@@ -19,7 +19,7 @@ import java.util.Map; @@ -19,7 +19,7 @@ import java.util.Map;
19 19
20 import org.thingsboard.server.exception.ThingsboardErrorCode; 20 import org.thingsboard.server.exception.ThingsboardErrorCode;
21 import org.thingsboard.server.exception.ThingsboardException; 21 import org.thingsboard.server.exception.ThingsboardException;
22 -import org.thingsboard.server.controller.plugin.PluginWebSocketHandler; 22 +import org.thingsboard.server.controller.plugin.TbWebSocketHandler;
23 import org.thingsboard.server.service.security.model.SecurityUser; 23 import org.thingsboard.server.service.security.model.SecurityUser;
24 import org.springframework.context.annotation.Bean; 24 import org.springframework.context.annotation.Bean;
25 import org.springframework.context.annotation.Configuration; 25 import org.springframework.context.annotation.Configuration;
@@ -54,7 +54,7 @@ public class WebSocketConfiguration implements WebSocketConfigurer { @@ -54,7 +54,7 @@ public class WebSocketConfiguration implements WebSocketConfigurer {
54 54
55 @Override 55 @Override
56 public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { 56 public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
57 - registry.addHandler(pluginWsHandler(), WS_PLUGIN_MAPPING).setAllowedOrigins("*") 57 + registry.addHandler(wsHandler(), WS_PLUGIN_MAPPING).setAllowedOrigins("*")
58 .addInterceptors(new HttpSessionHandshakeInterceptor(), new HandshakeInterceptor() { 58 .addInterceptors(new HttpSessionHandshakeInterceptor(), new HandshakeInterceptor() {
59 59
60 @Override 60 @Override
@@ -82,8 +82,8 @@ public class WebSocketConfiguration implements WebSocketConfigurer { @@ -82,8 +82,8 @@ public class WebSocketConfiguration implements WebSocketConfigurer {
82 } 82 }
83 83
84 @Bean 84 @Bean
85 - public WebSocketHandler pluginWsHandler() {  
86 - return new PluginWebSocketHandler(); 85 + public WebSocketHandler wsHandler() {
  86 + return new TbWebSocketHandler();
87 } 87 }
88 88
89 protected SecurityUser getCurrentUser() throws ThingsboardException { 89 protected SecurityUser getCurrentUser() throws ThingsboardException {
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + * <p>
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 package org.thingsboard.server.controller; 16 package org.thingsboard.server.controller;
2 17
  18 +import com.fasterxml.jackson.databind.JsonNode;
3 import com.google.common.base.Function; 19 import com.google.common.base.Function;
4 import com.google.common.util.concurrent.FutureCallback; 20 import com.google.common.util.concurrent.FutureCallback;
5 import com.google.common.util.concurrent.Futures; 21 import com.google.common.util.concurrent.Futures;
6 import com.google.common.util.concurrent.ListenableFuture; 22 import com.google.common.util.concurrent.ListenableFuture;
  23 +import com.google.gson.JsonElement;
  24 +import com.google.gson.JsonParser;
7 import lombok.extern.slf4j.Slf4j; 25 import lombok.extern.slf4j.Slf4j;
8 import org.springframework.beans.factory.annotation.Autowired; 26 import org.springframework.beans.factory.annotation.Autowired;
9 import org.springframework.http.HttpStatus; 27 import org.springframework.http.HttpStatus;
@@ -11,47 +29,54 @@ import org.springframework.http.ResponseEntity; @@ -11,47 +29,54 @@ import org.springframework.http.ResponseEntity;
11 import org.springframework.security.access.prepost.PreAuthorize; 29 import org.springframework.security.access.prepost.PreAuthorize;
12 import org.springframework.util.StringUtils; 30 import org.springframework.util.StringUtils;
13 import org.springframework.web.bind.annotation.PathVariable; 31 import org.springframework.web.bind.annotation.PathVariable;
  32 +import org.springframework.web.bind.annotation.RequestBody;
14 import org.springframework.web.bind.annotation.RequestMapping; 33 import org.springframework.web.bind.annotation.RequestMapping;
15 import org.springframework.web.bind.annotation.RequestMethod; 34 import org.springframework.web.bind.annotation.RequestMethod;
16 import org.springframework.web.bind.annotation.RequestParam; 35 import org.springframework.web.bind.annotation.RequestParam;
17 -import org.springframework.web.bind.annotation.ResponseStatus; 36 +import org.springframework.web.bind.annotation.ResponseBody;
18 import org.springframework.web.bind.annotation.RestController; 37 import org.springframework.web.bind.annotation.RestController;
19 import org.springframework.web.context.request.async.DeferredResult; 38 import org.springframework.web.context.request.async.DeferredResult;
20 -import org.thingsboard.server.actors.plugin.ValidationResult;  
21 -import org.thingsboard.server.common.data.Customer;  
22 import org.thingsboard.server.common.data.DataConstants; 39 import org.thingsboard.server.common.data.DataConstants;
23 -import org.thingsboard.server.common.data.Device;  
24 -import org.thingsboard.server.common.data.Tenant;  
25 -import org.thingsboard.server.common.data.asset.Asset; 40 +import org.thingsboard.server.common.data.EntityType;
26 import org.thingsboard.server.common.data.audit.ActionType; 41 import org.thingsboard.server.common.data.audit.ActionType;
27 -import org.thingsboard.server.common.data.id.AssetId;  
28 -import org.thingsboard.server.common.data.id.CustomerId;  
29 -import org.thingsboard.server.common.data.id.DeviceId;  
30 import org.thingsboard.server.common.data.id.EntityId; 42 import org.thingsboard.server.common.data.id.EntityId;
31 import org.thingsboard.server.common.data.id.EntityIdFactory; 43 import org.thingsboard.server.common.data.id.EntityIdFactory;
32 -import org.thingsboard.server.common.data.id.RuleChainId;  
33 -import org.thingsboard.server.common.data.id.TenantId;  
34 import org.thingsboard.server.common.data.id.UUIDBased; 44 import org.thingsboard.server.common.data.id.UUIDBased;
  45 +import org.thingsboard.server.common.data.kv.Aggregation;
35 import org.thingsboard.server.common.data.kv.AttributeKvEntry; 46 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  47 +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
  48 +import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
  49 +import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
  50 +import org.thingsboard.server.common.data.kv.BooleanDataEntry;
  51 +import org.thingsboard.server.common.data.kv.DoubleDataEntry;
36 import org.thingsboard.server.common.data.kv.KvEntry; 52 import org.thingsboard.server.common.data.kv.KvEntry;
  53 +import org.thingsboard.server.common.data.kv.LongDataEntry;
  54 +import org.thingsboard.server.common.data.kv.StringDataEntry;
37 import org.thingsboard.server.common.data.kv.TsKvEntry; 55 import org.thingsboard.server.common.data.kv.TsKvEntry;
38 -import org.thingsboard.server.common.data.rule.RuleChain; 56 +import org.thingsboard.server.common.data.kv.TsKvQuery;
  57 +import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
  58 +import org.thingsboard.server.common.transport.adaptor.JsonConverter;
39 import org.thingsboard.server.dao.attributes.AttributesService; 59 import org.thingsboard.server.dao.attributes.AttributesService;
40 import org.thingsboard.server.dao.timeseries.TimeseriesService; 60 import org.thingsboard.server.dao.timeseries.TimeseriesService;
41 import org.thingsboard.server.exception.ThingsboardException; 61 import org.thingsboard.server.exception.ThingsboardException;
42 -import org.thingsboard.server.extensions.api.exception.ToErrorResponseEntity; 62 +import org.thingsboard.server.extensions.api.exception.InvalidParametersException;
  63 +import org.thingsboard.server.extensions.api.exception.UncheckedApiException;
43 import org.thingsboard.server.extensions.api.plugins.PluginConstants; 64 import org.thingsboard.server.extensions.api.plugins.PluginConstants;
44 import org.thingsboard.server.extensions.core.plugin.telemetry.AttributeData; 65 import org.thingsboard.server.extensions.core.plugin.telemetry.AttributeData;
  66 +import org.thingsboard.server.extensions.core.plugin.telemetry.TsData;
  67 +import org.thingsboard.server.service.security.AccessValidator;
45 import org.thingsboard.server.service.security.model.SecurityUser; 68 import org.thingsboard.server.service.security.model.SecurityUser;
  69 +import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
46 70
47 import javax.annotation.Nullable; 71 import javax.annotation.Nullable;
48 import javax.annotation.PreDestroy; 72 import javax.annotation.PreDestroy;
49 import java.util.ArrayList; 73 import java.util.ArrayList;
50 import java.util.Arrays; 74 import java.util.Arrays;
  75 +import java.util.LinkedHashMap;
51 import java.util.List; 76 import java.util.List;
  77 +import java.util.Map;
52 import java.util.concurrent.ExecutorService; 78 import java.util.concurrent.ExecutorService;
53 import java.util.concurrent.Executors; 79 import java.util.concurrent.Executors;
54 -import java.util.function.BiConsumer;  
55 import java.util.stream.Collectors; 80 import java.util.stream.Collectors;
56 81
57 /** 82 /**
@@ -62,9 +87,8 @@ import java.util.stream.Collectors; @@ -62,9 +87,8 @@ import java.util.stream.Collectors;
62 @Slf4j 87 @Slf4j
63 public class TelemetryController extends BaseController { 88 public class TelemetryController extends BaseController {
64 89
65 - public static final String CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION = "Customer user is not allowed to perform this operation!";  
66 - public static final String SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION = "System administrator is not allowed to perform this operation!";  
67 - public static final String DEVICE_WITH_REQUESTED_ID_NOT_FOUND = "Device with requested id wasn't found!"; 90 + @Autowired
  91 + private TelemetrySubscriptionService subscriptionService;
68 92
69 @Autowired 93 @Autowired
70 private AttributesService attributesService; 94 private AttributesService attributesService;
@@ -72,6 +96,9 @@ public class TelemetryController extends BaseController { @@ -72,6 +96,9 @@ public class TelemetryController extends BaseController {
72 @Autowired 96 @Autowired
73 private TimeseriesService tsService; 97 private TimeseriesService tsService;
74 98
  99 + @Autowired
  100 + private AccessValidator accessValidator;
  101 +
75 private ExecutorService executor; 102 private ExecutorService executor;
76 103
77 public void initExecutor() { 104 public void initExecutor() {
@@ -87,117 +114,277 @@ public class TelemetryController extends BaseController { @@ -87,117 +114,277 @@ public class TelemetryController extends BaseController {
87 114
88 @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") 115 @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
89 @RequestMapping(value = "/{entityType}/{entityId}/keys/ATTRIBUTES", method = RequestMethod.GET) 116 @RequestMapping(value = "/{entityType}/{entityId}/keys/ATTRIBUTES", method = RequestMethod.GET)
90 - @ResponseStatus(value = HttpStatus.OK) 117 + @ResponseBody
91 public DeferredResult<ResponseEntity> getAttributeKeys( 118 public DeferredResult<ResponseEntity> getAttributeKeys(
92 @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr) throws ThingsboardException { 119 @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr) throws ThingsboardException {
93 - return validateEntityAndCallback(entityType, entityIdStr,  
94 - this::getAttributeKeysCallback,  
95 - (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR)); 120 + return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr, this::getAttributeKeysCallback);
96 } 121 }
97 122
98 @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") 123 @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
99 @RequestMapping(value = "/{entityType}/{entityId}/keys/ATTRIBUTES/{scope}", method = RequestMethod.GET) 124 @RequestMapping(value = "/{entityType}/{entityId}/keys/ATTRIBUTES/{scope}", method = RequestMethod.GET)
100 - @ResponseStatus(value = HttpStatus.OK) 125 + @ResponseBody
101 public DeferredResult<ResponseEntity> getAttributeKeysByScope( 126 public DeferredResult<ResponseEntity> getAttributeKeysByScope(
102 @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr 127 @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr
103 , @PathVariable("scope") String scope) throws ThingsboardException { 128 , @PathVariable("scope") String scope) throws ThingsboardException {
104 - return validateEntityAndCallback(entityType, entityIdStr,  
105 - (result, entityId) -> getAttributeKeysCallback(result, entityId, scope),  
106 - (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR)); 129 + return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr,
  130 + (result, entityId) -> getAttributeKeysCallback(result, entityId, scope));
107 } 131 }
108 132
109 @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") 133 @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
110 @RequestMapping(value = "/{entityType}/{entityId}/values/ATTRIBUTES", method = RequestMethod.GET) 134 @RequestMapping(value = "/{entityType}/{entityId}/values/ATTRIBUTES", method = RequestMethod.GET)
111 - @ResponseStatus(value = HttpStatus.OK) 135 + @ResponseBody
112 public DeferredResult<ResponseEntity> getAttributes( 136 public DeferredResult<ResponseEntity> getAttributes(
113 @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr, 137 @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
114 @RequestParam(name = "keys", required = false) String keysStr) throws ThingsboardException { 138 @RequestParam(name = "keys", required = false) String keysStr) throws ThingsboardException {
115 SecurityUser user = getCurrentUser(); 139 SecurityUser user = getCurrentUser();
116 - return validateEntityAndCallback(entityType, entityIdStr,  
117 - (result, entityId) -> getAttributeValuesCallback(result, user, entityId, null, keysStr),  
118 - (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR)); 140 + return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr,
  141 + (result, entityId) -> getAttributeValuesCallback(result, user, entityId, null, keysStr));
119 } 142 }
120 143
121 @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") 144 @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
122 @RequestMapping(value = "/{entityType}/{entityId}/values/ATTRIBUTES/{scope}", method = RequestMethod.GET) 145 @RequestMapping(value = "/{entityType}/{entityId}/values/ATTRIBUTES/{scope}", method = RequestMethod.GET)
123 - @ResponseStatus(value = HttpStatus.OK) 146 + @ResponseBody
124 public DeferredResult<ResponseEntity> getAttributesByScope( 147 public DeferredResult<ResponseEntity> getAttributesByScope(
125 @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr, 148 @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
126 @PathVariable("scope") String scope, 149 @PathVariable("scope") String scope,
127 @RequestParam(name = "keys", required = false) String keysStr) throws ThingsboardException { 150 @RequestParam(name = "keys", required = false) String keysStr) throws ThingsboardException {
128 SecurityUser user = getCurrentUser(); 151 SecurityUser user = getCurrentUser();
129 - return validateEntityAndCallback(entityType, entityIdStr,  
130 - (result, entityId) -> getAttributeValuesCallback(result, user, entityId, scope, keysStr),  
131 - (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR)); 152 + return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr,
  153 + (result, entityId) -> getAttributeValuesCallback(result, user, entityId, scope, keysStr));
132 } 154 }
133 155
134 @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") 156 @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
135 @RequestMapping(value = "/{entityType}/{entityId}/keys/TIMESERIES", method = RequestMethod.GET) 157 @RequestMapping(value = "/{entityType}/{entityId}/keys/TIMESERIES", method = RequestMethod.GET)
136 - @ResponseStatus(value = HttpStatus.OK) 158 + @ResponseBody
137 public DeferredResult<ResponseEntity> getTimeseriesKeys( 159 public DeferredResult<ResponseEntity> getTimeseriesKeys(
138 @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr) throws ThingsboardException { 160 @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr) throws ThingsboardException {
139 - return validateEntityAndCallback(entityType, entityIdStr, 161 + return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr,
140 (result, entityId) -> { 162 (result, entityId) -> {
141 Futures.addCallback(tsService.findAllLatest(entityId), getTsKeysToResponseCallback(result)); 163 Futures.addCallback(tsService.findAllLatest(entityId), getTsKeysToResponseCallback(result));
142 - },  
143 - (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR)); 164 + });
144 } 165 }
145 166
146 @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") 167 @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
147 @RequestMapping(value = "/{entityType}/{entityId}/values/TIMESERIES", method = RequestMethod.GET) 168 @RequestMapping(value = "/{entityType}/{entityId}/values/TIMESERIES", method = RequestMethod.GET)
148 - @ResponseStatus(value = HttpStatus.OK) 169 + @ResponseBody
149 public DeferredResult<ResponseEntity> getLatestTimeseries( 170 public DeferredResult<ResponseEntity> getLatestTimeseries(
150 @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr, 171 @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
151 - @PathVariable("scope") String scope,  
152 @RequestParam(name = "keys", required = false) String keysStr) throws ThingsboardException { 172 @RequestParam(name = "keys", required = false) String keysStr) throws ThingsboardException {
153 SecurityUser user = getCurrentUser(); 173 SecurityUser user = getCurrentUser();
154 174
155 - return validateEntityAndCallback(entityType, entityIdStr,  
156 - (result, entityId) -> getAttributeValuesCallback(result, user, entityId, scope, keysStr),  
157 - (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR)); 175 + return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr,
  176 + (result, entityId) -> getLatestTimeseriesValuesCallback(result, user, entityId, keysStr));
158 } 177 }
159 178
160 179
161 @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") 180 @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
162 @RequestMapping(value = "/{entityType}/{entityId}/values/TIMESERIES", method = RequestMethod.GET) 181 @RequestMapping(value = "/{entityType}/{entityId}/values/TIMESERIES", method = RequestMethod.GET)
163 - @ResponseStatus(value = HttpStatus.OK)  
164 - public DeferredResult<ResponseEntity> getLatestTimeseries( 182 + @ResponseBody
  183 + public DeferredResult<ResponseEntity> getTimeseries(
165 @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr, 184 @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
166 - @PathVariable("scope") String scope,  
167 - @RequestParam(name = "keys", required = false) String keysStr) throws ThingsboardException {  
168 - SecurityUser user = getCurrentUser(); 185 + @RequestParam(name = "keys") String keys,
  186 + @RequestParam(name = "startTs") Long startTs,
  187 + @RequestParam(name = "endTs") Long endTs,
  188 + @RequestParam(name = "interval", defaultValue = "0") Long interval,
  189 + @RequestParam(name = "limit", defaultValue = "100") Integer limit,
  190 + @RequestParam(name = "agg", defaultValue = "NONE") String aggStr
  191 + ) throws ThingsboardException {
  192 + return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr,
  193 + (result, entityId) -> {
  194 + // If interval is 0, convert this to a NONE aggregation, which is probably what the user really wanted
  195 + Aggregation agg = interval == 0L ? Aggregation.valueOf(Aggregation.NONE.name()) : Aggregation.valueOf(aggStr);
  196 + List<TsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, interval, limit, agg))
  197 + .collect(Collectors.toList());
  198 +
  199 + Futures.addCallback(tsService.findAll(entityId, queries), getTsKvListCallback(result));
  200 + });
  201 + }
  202 +
  203 + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
  204 + @RequestMapping(value = "/{deviceId}/{scope}", method = RequestMethod.POST)
  205 + @ResponseBody
  206 + public DeferredResult<ResponseEntity> saveDeviceAttributes(@PathVariable("deviceId") String deviceIdStr, @PathVariable("scope") String scope,
  207 + @RequestBody JsonNode request) throws ThingsboardException {
  208 + EntityId entityId = EntityIdFactory.getByTypeAndUuid(EntityType.DEVICE, deviceIdStr);
  209 + return saveAttributes(entityId, scope, request);
  210 + }
  211 +
  212 + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
  213 + @RequestMapping(value = "/{entityType}/{entityId}/{scope}", method = RequestMethod.POST)
  214 + @ResponseBody
  215 + public DeferredResult<ResponseEntity> saveEntityAttributesV1(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
  216 + @PathVariable("scope") String scope,
  217 + @RequestBody JsonNode request) throws ThingsboardException {
  218 + EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
  219 + return saveAttributes(entityId, scope, request);
  220 + }
169 221
170 - return validateEntityAndCallback(entityType, entityIdStr,  
171 - (result, entityId) -> getAttributeValuesCallback(result, user, entityId, scope, keysStr),  
172 - (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR)); 222 + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
  223 + @RequestMapping(value = "/{entityType}/{entityId}/ATTRIBUTES/{scope}", method = RequestMethod.POST)
  224 + @ResponseBody
  225 + public DeferredResult<ResponseEntity> saveEntityAttributesV2(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
  226 + @PathVariable("scope") String scope,
  227 + @RequestBody JsonNode request) throws ThingsboardException {
  228 + EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
  229 + return saveAttributes(entityId, scope, request);
  230 + }
  231 +
  232 + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
  233 + @RequestMapping(value = "/{entityType}/{entityId}/TIMESERIES/{scope}", method = RequestMethod.POST)
  234 + @ResponseBody
  235 + public DeferredResult<ResponseEntity> saveEntityTelemetry(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
  236 + @PathVariable("scope") String scope,
  237 + @RequestBody String requestBody) throws ThingsboardException {
  238 + EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
  239 + return saveTelemetry(entityId, requestBody, 0L);
  240 + }
  241 +
  242 + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
  243 + @RequestMapping(value = "/{entityType}/{entityId}/TIMESERIES/{scope}/{ttl}", method = RequestMethod.POST)
  244 + @ResponseBody
  245 + public DeferredResult<ResponseEntity> saveEntityTelemetryWithTTL(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
  246 + @PathVariable("scope") String scope, @PathVariable("ttl") Long ttl,
  247 + @RequestBody String requestBody) throws ThingsboardException {
  248 + EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
  249 + return saveTelemetry(entityId, requestBody, ttl);
173 } 250 }
174 251
175 - private DeferredResult<ResponseEntity> validateEntityAndCallback(String entityType, String entityIdStr,  
176 - BiConsumer<DeferredResult<ResponseEntity>, EntityId> onSuccess, BiConsumer<DeferredResult<ResponseEntity>, Throwable> onFailure) throws ThingsboardException {  
177 - final DeferredResult<ResponseEntity> response = new DeferredResult<>(); 252 + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
  253 + @RequestMapping(value = "/{deviceId}/{scope}", method = RequestMethod.DELETE)
  254 + @ResponseBody
  255 + public DeferredResult<ResponseEntity> deleteEntityAttributes(@PathVariable("deviceId") String deviceIdStr,
  256 + @PathVariable("scope") String scope,
  257 + @RequestParam(name = "keys") String keysStr) throws ThingsboardException {
  258 + EntityId entityId = EntityIdFactory.getByTypeAndUuid(EntityType.DEVICE, deviceIdStr);
  259 + return deleteAttributes(entityId, scope, keysStr);
  260 + }
  261 +
  262 + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
  263 + @RequestMapping(value = "/{entityType}/{entityId}/{scope}", method = RequestMethod.DELETE)
  264 + @ResponseBody
  265 + public DeferredResult<ResponseEntity> deleteEntityAttributes(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
  266 + @PathVariable("scope") String scope,
  267 + @RequestParam(name = "keys") String keysStr) throws ThingsboardException {
178 EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr); 268 EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
  269 + return deleteAttributes(entityId, scope, keysStr);
  270 + }
  271 +
  272 + private DeferredResult<ResponseEntity> deleteAttributes(EntityId entityIdStr, String scope, String keysStr) throws ThingsboardException {
  273 + List<String> keys = toKeysList(keysStr);
  274 + if (keys.isEmpty()) {
  275 + return getImmediateDeferredResult("Empty keys: " + keysStr, HttpStatus.BAD_REQUEST);
  276 + }
  277 + SecurityUser user = getCurrentUser();
  278 + if (DataConstants.SERVER_SCOPE.equals(scope) ||
  279 + DataConstants.SHARED_SCOPE.equals(scope) ||
  280 + DataConstants.CLIENT_SCOPE.equals(scope)) {
  281 + return accessValidator.validateEntityAndCallback(getCurrentUser(), entityIdStr, (result, entityId) -> {
  282 + ListenableFuture<List<Void>> future = attributesService.removeAll(entityId, scope, keys);
  283 + Futures.addCallback(future, new FutureCallback<List<Void>>() {
  284 + @Override
  285 + public void onSuccess(@Nullable List<Void> tmp) {
  286 + logAttributesDeleted(user, entityId, scope, keys, null);
  287 + result.setResult(new ResponseEntity<>(HttpStatus.OK));
  288 + }
179 289
180 - validate(getCurrentUser(), entityId, new ValidationCallback(response,  
181 - new FutureCallback<DeferredResult<ResponseEntity>>() {  
182 @Override 290 @Override
183 - public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {  
184 - onSuccess.accept(response, entityId); 291 + public void onFailure(Throwable t) {
  292 + logAttributesDeleted(user, entityId, scope, keys, t);
  293 + result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
  294 + }
  295 + }, executor);
  296 + });
  297 + } else {
  298 + return getImmediateDeferredResult("Invalid attribute scope: " + scope, HttpStatus.BAD_REQUEST);
  299 + }
  300 + }
  301 +
  302 + private DeferredResult<ResponseEntity> saveAttributes(EntityId entityIdSrc, String scope, JsonNode json) throws ThingsboardException {
  303 + if (!DataConstants.SERVER_SCOPE.equals(scope) && !DataConstants.SHARED_SCOPE.equals(scope)) {
  304 + return getImmediateDeferredResult("Invalid scope: " + scope, HttpStatus.BAD_REQUEST);
  305 + }
  306 + if (json.isObject()) {
  307 + List<AttributeKvEntry> attributes = extractRequestAttributes(json);
  308 + if (attributes.isEmpty()) {
  309 + return getImmediateDeferredResult("No attributes data found in request body!", HttpStatus.BAD_REQUEST);
  310 + }
  311 + SecurityUser user = getCurrentUser();
  312 + return accessValidator.validateEntityAndCallback(getCurrentUser(), entityIdSrc, (result, entityId) -> {
  313 + ListenableFuture<List<Void>> future = attributesService.save(entityId, scope, attributes);
  314 + Futures.addCallback(future, new FutureCallback<List<Void>>() {
  315 + @Override
  316 + public void onSuccess(@Nullable List<Void> tmp) {
  317 + logAttributesUpdated(user, entityId, scope, attributes, null);
  318 + result.setResult(new ResponseEntity(HttpStatus.OK));
  319 + subscriptionService.onAttributesUpdateFromServer(entityId, scope, attributes);
185 } 320 }
186 321
187 @Override 322 @Override
188 public void onFailure(Throwable t) { 323 public void onFailure(Throwable t) {
189 - onFailure.accept(response, t); 324 + logAttributesUpdated(user, entityId, scope, attributes, t);
  325 + AccessValidator.handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR);
190 } 326 }
191 - })); 327 + });
  328 + result.setResult(new ResponseEntity(HttpStatus.OK));
  329 + });
  330 + } else {
  331 + return getImmediateDeferredResult("Request is not a JSON object", HttpStatus.BAD_REQUEST);
  332 + }
  333 + }
  334 +
  335 + private DeferredResult<ResponseEntity> saveTelemetry(EntityId entityIdSrc, String requestBody, long ttl) throws ThingsboardException {
  336 + TelemetryUploadRequest telemetryRequest;
  337 + JsonElement telemetryJson;
  338 + try {
  339 + telemetryJson = new JsonParser().parse(requestBody);
  340 + } catch (Exception e) {
  341 + return getImmediateDeferredResult("Unable to parse timeseries payload: Invalid JSON body!", HttpStatus.BAD_REQUEST);
  342 + }
  343 + try {
  344 + telemetryRequest = JsonConverter.convertToTelemetry(telemetryJson);
  345 + } catch (Exception e) {
  346 + return getImmediateDeferredResult("Unable to parse timeseries payload. Invalid JSON body: " + e.getMessage(), HttpStatus.BAD_REQUEST);
  347 + }
  348 + List<TsKvEntry> entries = new ArrayList<>();
  349 + for (Map.Entry<Long, List<KvEntry>> entry : telemetryRequest.getData().entrySet()) {
  350 + for (KvEntry kv : entry.getValue()) {
  351 + entries.add(new BasicTsKvEntry(entry.getKey(), kv));
  352 + }
  353 + }
  354 + if (entries.isEmpty()) {
  355 + return getImmediateDeferredResult("No timeseries data found in request body!", HttpStatus.BAD_REQUEST);
  356 + }
  357 + SecurityUser user = getCurrentUser();
  358 + return accessValidator.validateEntityAndCallback(getCurrentUser(), entityIdSrc, (result, entityId) -> {
  359 + ListenableFuture<List<Void>> future = tsService.save(entityId, entries, ttl);
  360 + Futures.addCallback(future, new FutureCallback<List<Void>>() {
  361 + @Override
  362 + public void onSuccess(@Nullable List<Void> tmp) {
  363 + result.setResult(new ResponseEntity(HttpStatus.OK));
  364 + subscriptionService.onTimeseriesUpdateFromServer(entityId, entries);
  365 + }
192 366
193 - return response; 367 + @Override
  368 + public void onFailure(Throwable t) {
  369 + AccessValidator.handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR);
  370 + }
  371 + });
  372 + result.setResult(new ResponseEntity(HttpStatus.OK));
  373 + });
194 } 374 }
195 375
196 - private void getAttributeValuesCallback(@Nullable DeferredResult<ResponseEntity> result, SecurityUser user, EntityId entityId, String scope, String keys) {  
197 - List<String> keyList = null;  
198 - if (!StringUtils.isEmpty(keys)) {  
199 - keyList = Arrays.asList(keys.split(",")); 376 + private void getLatestTimeseriesValuesCallback(@Nullable DeferredResult<ResponseEntity> result, SecurityUser user, EntityId entityId, String keys) {
  377 + ListenableFuture<List<TsKvEntry>> future;
  378 + if (StringUtils.isEmpty(keys)) {
  379 + future = tsService.findAllLatest(entityId);
  380 + } else {
  381 + future = tsService.findLatest(entityId, toKeysList(keys));
200 } 382 }
  383 + Futures.addCallback(future, getTsKvListCallback(result));
  384 + }
  385 +
  386 + private void getAttributeValuesCallback(@Nullable DeferredResult<ResponseEntity> result, SecurityUser user, EntityId entityId, String scope, String keys) {
  387 + List<String> keyList = toKeysList(keys);
201 FutureCallback<List<AttributeKvEntry>> callback = getAttributeValuesToResponseCallback(result, user, scope, entityId, keyList); 388 FutureCallback<List<AttributeKvEntry>> callback = getAttributeValuesToResponseCallback(result, user, scope, entityId, keyList);
202 if (!StringUtils.isEmpty(scope)) { 389 if (!StringUtils.isEmpty(scope)) {
203 if (keyList != null && !keyList.isEmpty()) { 390 if (keyList != null && !keyList.isEmpty()) {
@@ -247,7 +434,7 @@ public class TelemetryController extends BaseController { @@ -247,7 +434,7 @@ public class TelemetryController extends BaseController {
247 @Override 434 @Override
248 public void onFailure(Throwable e) { 435 public void onFailure(Throwable e) {
249 log.error("Failed to fetch attributes", e); 436 log.error("Failed to fetch attributes", e);
250 - handleError(e, response, HttpStatus.INTERNAL_SERVER_ERROR); 437 + AccessValidator.handleError(e, response, HttpStatus.INTERNAL_SERVER_ERROR);
251 } 438 }
252 }; 439 };
253 } 440 }
@@ -264,12 +451,13 @@ public class TelemetryController extends BaseController { @@ -264,12 +451,13 @@ public class TelemetryController extends BaseController {
264 @Override 451 @Override
265 public void onFailure(Throwable e) { 452 public void onFailure(Throwable e) {
266 log.error("Failed to fetch attributes", e); 453 log.error("Failed to fetch attributes", e);
267 - handleError(e, response, HttpStatus.INTERNAL_SERVER_ERROR); 454 + AccessValidator.handleError(e, response, HttpStatus.INTERNAL_SERVER_ERROR);
268 } 455 }
269 }; 456 };
270 } 457 }
271 458
272 - private FutureCallback<List<AttributeKvEntry>> getAttributeValuesToResponseCallback(final DeferredResult<ResponseEntity> response, final SecurityUser user, final String scope, 459 + private FutureCallback<List<AttributeKvEntry>> getAttributeValuesToResponseCallback(final DeferredResult<ResponseEntity> response,
  460 + final SecurityUser user, final String scope,
273 final EntityId entityId, final List<String> keyList) { 461 final EntityId entityId, final List<String> keyList) {
274 return new FutureCallback<List<AttributeKvEntry>>() { 462 return new FutureCallback<List<AttributeKvEntry>>() {
275 @Override 463 @Override
@@ -284,12 +472,32 @@ public class TelemetryController extends BaseController { @@ -284,12 +472,32 @@ public class TelemetryController extends BaseController {
284 public void onFailure(Throwable e) { 472 public void onFailure(Throwable e) {
285 log.error("Failed to fetch attributes", e); 473 log.error("Failed to fetch attributes", e);
286 logAttributesRead(user, entityId, scope, keyList, e); 474 logAttributesRead(user, entityId, scope, keyList, e);
287 - handleError(e, response, HttpStatus.INTERNAL_SERVER_ERROR); 475 + AccessValidator.handleError(e, response, HttpStatus.INTERNAL_SERVER_ERROR);
288 } 476 }
289 }; 477 };
290 } 478 }
291 479
292 - private void logAttributesRead(SecurityUser user, EntityId entityId, String scope, List<String> keys, Throwable e) { 480 + private FutureCallback<List<TsKvEntry>> getTsKvListCallback(final DeferredResult<ResponseEntity> response) {
  481 + return new FutureCallback<List<TsKvEntry>>() {
  482 + @Override
  483 + public void onSuccess(List<TsKvEntry> data) {
  484 + Map<String, List<TsData>> result = new LinkedHashMap<>();
  485 + for (TsKvEntry entry : data) {
  486 + result.computeIfAbsent(entry.getKey(), k -> new ArrayList<>())
  487 + .add(new TsData(entry.getTs(), entry.getValueAsString()));
  488 + }
  489 + response.setResult(new ResponseEntity<>(result, HttpStatus.OK));
  490 + }
  491 +
  492 + @Override
  493 + public void onFailure(Throwable e) {
  494 + log.error("Failed to fetch historical data", e);
  495 + AccessValidator.handleError(e, response, HttpStatus.INTERNAL_SERVER_ERROR);
  496 + }
  497 + };
  498 + }
  499 +
  500 + private void logAttributesDeleted(SecurityUser user, EntityId entityId, String scope, List<String> keys, Throwable e) {
293 auditLogService.logEntityAction( 501 auditLogService.logEntityAction(
294 user.getTenantId(), 502 user.getTenantId(),
295 user.getCustomerId(), 503 user.getCustomerId(),
@@ -297,163 +505,39 @@ public class TelemetryController extends BaseController { @@ -297,163 +505,39 @@ public class TelemetryController extends BaseController {
297 user.getName(), 505 user.getName(),
298 (UUIDBased & EntityId) entityId, 506 (UUIDBased & EntityId) entityId,
299 null, 507 null,
300 - ActionType.ATTRIBUTES_READ, 508 + ActionType.ATTRIBUTES_DELETED,
301 toException(e), 509 toException(e),
302 scope, 510 scope,
303 keys); 511 keys);
304 } 512 }
305 513
306 - private void handleError(Throwable e, final DeferredResult<ResponseEntity> response, HttpStatus defaultErrorStatus) {  
307 - ResponseEntity responseEntity;  
308 - if (e != null && e instanceof ToErrorResponseEntity) {  
309 - responseEntity = ((ToErrorResponseEntity) e).toErrorResponseEntity();  
310 - } else if (e != null && e instanceof IllegalArgumentException) {  
311 - responseEntity = new ResponseEntity<>(e.getMessage(), HttpStatus.BAD_REQUEST);  
312 - } else {  
313 - responseEntity = new ResponseEntity<>(defaultErrorStatus);  
314 - }  
315 - response.setResult(responseEntity);  
316 - }  
317 -  
318 - private void validate(SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {  
319 - switch (entityId.getEntityType()) {  
320 - case DEVICE:  
321 - validateDevice(currentUser, entityId, callback);  
322 - return;  
323 - case ASSET:  
324 - validateAsset(currentUser, entityId, callback);  
325 - return;  
326 - case RULE_CHAIN:  
327 - validateRuleChain(currentUser, entityId, callback);  
328 - return;  
329 - case CUSTOMER:  
330 - validateCustomer(currentUser, entityId, callback);  
331 - return;  
332 - case TENANT:  
333 - validateTenant(currentUser, entityId, callback);  
334 - return;  
335 - default:  
336 - //TODO: add support of other entities  
337 - throw new IllegalStateException("Not Implemented!");  
338 - }  
339 - }  
340 -  
341 - private void validateDevice(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {  
342 - if (currentUser.isSystemAdmin()) {  
343 - callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));  
344 - } else {  
345 - ListenableFuture<Device> deviceFuture = deviceService.findDeviceByIdAsync(new DeviceId(entityId.getId()));  
346 - Futures.addCallback(deviceFuture, getCallback(callback, device -> {  
347 - if (device == null) {  
348 - return ValidationResult.entityNotFound(DEVICE_WITH_REQUESTED_ID_NOT_FOUND);  
349 - } else {  
350 - if (!device.getTenantId().equals(currentUser.getTenantId())) {  
351 - return ValidationResult.accessDenied("Device doesn't belong to the current Tenant!");  
352 - } else if (currentUser.isCustomerUser() && !device.getCustomerId().equals(currentUser.getCustomerId())) {  
353 - return ValidationResult.accessDenied("Device doesn't belong to the current Customer!");  
354 - } else {  
355 - return ValidationResult.ok();  
356 - }  
357 - }  
358 - }));  
359 - }  
360 - }  
361 -  
362 - private void validateAsset(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {  
363 - if (currentUser.isSystemAdmin()) {  
364 - callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));  
365 - } else {  
366 - ListenableFuture<Asset> assetFuture = assetService.findAssetByIdAsync(new AssetId(entityId.getId()));  
367 - Futures.addCallback(assetFuture, getCallback(callback, asset -> {  
368 - if (asset == null) {  
369 - return ValidationResult.entityNotFound("Asset with requested id wasn't found!");  
370 - } else {  
371 - if (!asset.getTenantId().equals(currentUser.getTenantId())) {  
372 - return ValidationResult.accessDenied("Asset doesn't belong to the current Tenant!");  
373 - } else if (currentUser.isCustomerUser() && !asset.getCustomerId().equals(currentUser.getCustomerId())) {  
374 - return ValidationResult.accessDenied("Asset doesn't belong to the current Customer!");  
375 - } else {  
376 - return ValidationResult.ok();  
377 - }  
378 - }  
379 - }));  
380 - }  
381 - }  
382 -  
383 -  
384 - private void validateRuleChain(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {  
385 - if (currentUser.isCustomerUser()) {  
386 - callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));  
387 - } else {  
388 - ListenableFuture<RuleChain> ruleChainFuture = ruleChainService.findRuleChainByIdAsync(new RuleChainId(entityId.getId()));  
389 - Futures.addCallback(ruleChainFuture, getCallback(callback, ruleChain -> {  
390 - if (ruleChain == null) {  
391 - return ValidationResult.entityNotFound("Rule chain with requested id wasn't found!");  
392 - } else {  
393 - if (currentUser.isTenantAdmin() && !ruleChain.getTenantId().equals(currentUser.getTenantId())) {  
394 - return ValidationResult.accessDenied("Rule chain doesn't belong to the current Tenant!");  
395 - } else if (currentUser.isSystemAdmin() && !ruleChain.getTenantId().isNullUid()) {  
396 - return ValidationResult.accessDenied("Rule chain is not in system scope!");  
397 - } else {  
398 - return ValidationResult.ok();  
399 - }  
400 - }  
401 - }));  
402 - }  
403 - }  
404 -  
405 - private void validateCustomer(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {  
406 - if (currentUser.isSystemAdmin()) {  
407 - callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));  
408 - } else {  
409 - ListenableFuture<Customer> customerFuture = customerService.findCustomerByIdAsync(new CustomerId(entityId.getId()));  
410 - Futures.addCallback(customerFuture, getCallback(callback, customer -> {  
411 - if (customer == null) {  
412 - return ValidationResult.entityNotFound("Customer with requested id wasn't found!");  
413 - } else {  
414 - if (!customer.getTenantId().equals(currentUser.getTenantId())) {  
415 - return ValidationResult.accessDenied("Customer doesn't belong to the current Tenant!");  
416 - } else if (currentUser.isCustomerUser() && !customer.getId().equals(currentUser.getCustomerId())) {  
417 - return ValidationResult.accessDenied("Customer doesn't relate to the currently authorized customer user!");  
418 - } else {  
419 - return ValidationResult.ok();  
420 - }  
421 - }  
422 - }));  
423 - }  
424 - }  
425 -  
426 - private void validateTenant(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {  
427 - if (currentUser.isCustomerUser()) {  
428 - callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));  
429 - } else if (currentUser.isSystemAdmin()) {  
430 - callback.onSuccess(ValidationResult.ok());  
431 - } else {  
432 - ListenableFuture<Tenant> tenantFuture = tenantService.findTenantByIdAsync(new TenantId(entityId.getId()));  
433 - Futures.addCallback(tenantFuture, getCallback(callback, tenant -> {  
434 - if (tenant == null) {  
435 - return ValidationResult.entityNotFound("Tenant with requested id wasn't found!");  
436 - } else if (!tenant.getId().equals(currentUser.getTenantId())) {  
437 - return ValidationResult.accessDenied("Tenant doesn't relate to the currently authorized user!");  
438 - } else {  
439 - return ValidationResult.ok();  
440 - }  
441 - }));  
442 - } 514 + private void logAttributesUpdated(SecurityUser user, EntityId entityId, String scope, List<AttributeKvEntry> attributes, Throwable e) {
  515 + auditLogService.logEntityAction(
  516 + user.getTenantId(),
  517 + user.getCustomerId(),
  518 + user.getId(),
  519 + user.getName(),
  520 + (UUIDBased & EntityId) entityId,
  521 + null,
  522 + ActionType.ATTRIBUTES_UPDATED,
  523 + toException(e),
  524 + scope,
  525 + attributes);
443 } 526 }
444 527
445 - private <T> FutureCallback<T> getCallback(ValidationCallback callback, Function<T, ValidationResult> transformer) {  
446 - return new FutureCallback<T>() {  
447 - @Override  
448 - public void onSuccess(@Nullable T result) {  
449 - callback.onSuccess(transformer.apply(result));  
450 - }  
451 528
452 - @Override  
453 - public void onFailure(Throwable t) {  
454 - callback.onFailure(t);  
455 - }  
456 - }; 529 + private void logAttributesRead(SecurityUser user, EntityId entityId, String scope, List<String> keys, Throwable e) {
  530 + auditLogService.logEntityAction(
  531 + user.getTenantId(),
  532 + user.getCustomerId(),
  533 + user.getId(),
  534 + user.getName(),
  535 + (UUIDBased & EntityId) entityId,
  536 + null,
  537 + ActionType.ATTRIBUTES_READ,
  538 + toException(e),
  539 + scope,
  540 + keys);
457 } 541 }
458 542
459 private ListenableFuture<List<AttributeKvEntry>> mergeAllAttributesFutures(List<ListenableFuture<List<AttributeKvEntry>>> futures) { 543 private ListenableFuture<List<AttributeKvEntry>> mergeAllAttributesFutures(List<ListenableFuture<List<AttributeKvEntry>>> futures) {
@@ -467,4 +551,40 @@ public class TelemetryController extends BaseController { @@ -467,4 +551,40 @@ public class TelemetryController extends BaseController {
467 }, executor); 551 }, executor);
468 } 552 }
469 553
  554 + private List<String> toKeysList(String keys) {
  555 + List<String> keyList = null;
  556 + if (!StringUtils.isEmpty(keys)) {
  557 + keyList = Arrays.asList(keys.split(","));
  558 + }
  559 + return keyList;
  560 + }
  561 +
  562 + private DeferredResult<ResponseEntity> getImmediateDeferredResult(String message, HttpStatus status) {
  563 + DeferredResult<ResponseEntity> result = new DeferredResult<>();
  564 + result.setResult(new ResponseEntity<>(message, status));
  565 + return result;
  566 + }
  567 +
  568 + private List<AttributeKvEntry> extractRequestAttributes(JsonNode jsonNode) {
  569 + long ts = System.currentTimeMillis();
  570 + List<AttributeKvEntry> attributes = new ArrayList<>();
  571 + jsonNode.fields().forEachRemaining(entry -> {
  572 + String key = entry.getKey();
  573 + JsonNode value = entry.getValue();
  574 + if (entry.getValue().isTextual()) {
  575 + attributes.add(new BaseAttributeKvEntry(new StringDataEntry(key, value.textValue()), ts));
  576 + } else if (entry.getValue().isBoolean()) {
  577 + attributes.add(new BaseAttributeKvEntry(new BooleanDataEntry(key, value.booleanValue()), ts));
  578 + } else if (entry.getValue().isDouble()) {
  579 + attributes.add(new BaseAttributeKvEntry(new DoubleDataEntry(key, value.doubleValue()), ts));
  580 + } else if (entry.getValue().isNumber()) {
  581 + if (entry.getValue().isBigInteger()) {
  582 + throw new UncheckedApiException(new InvalidParametersException("Big integer values are not supported!"));
  583 + } else {
  584 + attributes.add(new BaseAttributeKvEntry(new LongDataEntry(key, value.longValue()), ts));
  585 + }
  586 + }
  587 + });
  588 + return attributes;
  589 + }
470 } 590 }
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * <p> 3 + *
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - * <p>  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * <p> 7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -48,59 +48,59 @@ import javax.servlet.http.HttpServletRequest; @@ -48,59 +48,59 @@ import javax.servlet.http.HttpServletRequest;
48 @Slf4j 48 @Slf4j
49 public class PluginApiController extends BaseController { 49 public class PluginApiController extends BaseController {
50 50
51 - @SuppressWarnings("rawtypes")  
52 - @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")  
53 - @RequestMapping(value = "/{pluginToken}/**")  
54 - @ResponseStatus(value = HttpStatus.OK)  
55 - public DeferredResult<ResponseEntity> processRequest(  
56 - @PathVariable("pluginToken") String pluginToken,  
57 - RequestEntity<byte[]> requestEntity,  
58 - HttpServletRequest request)  
59 - throws ThingsboardException {  
60 - log.debug("[{}] Going to process requst uri: {}", pluginToken, requestEntity.getUrl());  
61 - DeferredResult<ResponseEntity> result = new DeferredResult<ResponseEntity>();  
62 - PluginMetaData pluginMd = pluginService.findPluginByApiToken(pluginToken);  
63 - if (pluginMd == null) {  
64 - result.setErrorResult(new PluginNotFoundException("Plugin with token: " + pluginToken + " not found!"));  
65 - } else {  
66 - TenantId tenantId = getCurrentUser().getTenantId();  
67 - CustomerId customerId = getCurrentUser().getCustomerId();  
68 - if (validatePluginAccess(pluginMd, tenantId, customerId)) {  
69 - if(tenantId != null && ModelConstants.NULL_UUID.equals(tenantId.getId())){  
70 - tenantId = null;  
71 - }  
72 - UserId userId = getCurrentUser().getId();  
73 - String userName = getCurrentUser().getName();  
74 - PluginApiCallSecurityContext securityCtx = new PluginApiCallSecurityContext(pluginMd.getTenantId(), pluginMd.getId(),  
75 - tenantId, customerId, userId, userName);  
76 - actorService.process(new BasicPluginRestMsg(securityCtx, new RestRequest(requestEntity, request), result));  
77 - } else {  
78 - result.setResult(new ResponseEntity<>(HttpStatus.FORBIDDEN));  
79 - }  
80 -  
81 - }  
82 - return result;  
83 - }  
84 -  
85 - public static boolean validatePluginAccess(PluginMetaData pluginMd, TenantId tenantId, CustomerId customerId) {  
86 - boolean systemAdministrator = tenantId == null || ModelConstants.NULL_UUID.equals(tenantId.getId());  
87 - boolean tenantAdministrator = !systemAdministrator && (customerId == null || ModelConstants.NULL_UUID.equals(customerId.getId()));  
88 - boolean systemPlugin = ModelConstants.NULL_UUID.equals(pluginMd.getTenantId().getId());  
89 -  
90 - boolean validUser = false;  
91 - if (systemPlugin) {  
92 - if (pluginMd.isPublicAccess() || systemAdministrator) {  
93 - // All users can access public system plugins. Only system  
94 - // users can access private system plugins  
95 - validUser = true;  
96 - }  
97 - } else {  
98 - if ((pluginMd.isPublicAccess() || tenantAdministrator) && tenantId != null && tenantId.equals(pluginMd.getTenantId())) {  
99 - // All tenant users can access public tenant plugins. Only tenant  
100 - // administrator can access private tenant plugins  
101 - validUser = true;  
102 - }  
103 - }  
104 - return validUser;  
105 - } 51 +// @SuppressWarnings("rawtypes")
  52 +// @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
  53 +// @RequestMapping(value = "/{pluginToken}/**")
  54 +// @ResponseStatus(value = HttpStatus.OK)
  55 +// public DeferredResult<ResponseEntity> processRequest(
  56 +// @PathVariable("pluginToken") String pluginToken,
  57 +// RequestEntity<byte[]> requestEntity,
  58 +// HttpServletRequest request)
  59 +// throws ThingsboardException {
  60 +// log.debug("[{}] Going to process requst uri: {}", pluginToken, requestEntity.getUrl());
  61 +// DeferredResult<ResponseEntity> result = new DeferredResult<ResponseEntity>();
  62 +// PluginMetaData pluginMd = pluginService.findPluginByApiToken(pluginToken);
  63 +// if (pluginMd == null) {
  64 +// result.setErrorResult(new PluginNotFoundException("Plugin with token: " + pluginToken + " not found!"));
  65 +// } else {
  66 +// TenantId tenantId = getCurrentUser().getTenantId();
  67 +// CustomerId customerId = getCurrentUser().getCustomerId();
  68 +// if (validatePluginAccess(pluginMd, tenantId, customerId)) {
  69 +// if(tenantId != null && ModelConstants.NULL_UUID.equals(tenantId.getId())){
  70 +// tenantId = null;
  71 +// }
  72 +// UserId userId = getCurrentUser().getId();
  73 +// String userName = getCurrentUser().getName();
  74 +// PluginApiCallSecurityContext securityCtx = new PluginApiCallSecurityContext(pluginMd.getTenantId(), pluginMd.getId(),
  75 +// tenantId, customerId, userId, userName);
  76 +// actorService.process(new BasicPluginRestMsg(securityCtx, new RestRequest(requestEntity, request), result));
  77 +// } else {
  78 +// result.setResult(new ResponseEntity<>(HttpStatus.FORBIDDEN));
  79 +// }
  80 +//
  81 +// }
  82 +// return result;
  83 +// }
  84 +//
  85 +// public static boolean validatePluginAccess(PluginMetaData pluginMd, TenantId tenantId, CustomerId customerId) {
  86 +// boolean systemAdministrator = tenantId == null || ModelConstants.NULL_UUID.equals(tenantId.getId());
  87 +// boolean tenantAdministrator = !systemAdministrator && (customerId == null || ModelConstants.NULL_UUID.equals(customerId.getId()));
  88 +// boolean systemPlugin = ModelConstants.NULL_UUID.equals(pluginMd.getTenantId().getId());
  89 +//
  90 +// boolean validUser = false;
  91 +// if (systemPlugin) {
  92 +// if (pluginMd.isPublicAccess() || systemAdministrator) {
  93 +// // All users can access public system plugins. Only system
  94 +// // users can access private system plugins
  95 +// validUser = true;
  96 +// }
  97 +// } else {
  98 +// if ((pluginMd.isPublicAccess() || tenantAdministrator) && tenantId != null && tenantId.equals(pluginMd.getTenantId())) {
  99 +// // All tenant users can access public tenant plugins. Only tenant
  100 +// // administrator can access private tenant plugins
  101 +// validUser = true;
  102 +// }
  103 +// }
  104 +// return validUser;
  105 +// }
106 } 106 }
application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java renamed from application/src/main/java/org/thingsboard/server/controller/plugin/PluginWebSocketHandler.java
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * 3 + * <p>
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * 7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,57 +15,42 @@ @@ -15,57 +15,42 @@
15 */ 15 */
16 package org.thingsboard.server.controller.plugin; 16 package org.thingsboard.server.controller.plugin;
17 17
18 -import java.io.IOException;  
19 -import java.net.URI;  
20 -import java.security.InvalidParameterException;  
21 -import java.util.UUID;  
22 -import java.util.concurrent.ConcurrentHashMap;  
23 -import java.util.concurrent.ConcurrentMap;  
24 -  
25 import lombok.extern.slf4j.Slf4j; 18 import lombok.extern.slf4j.Slf4j;
26 import org.springframework.beans.factory.BeanCreationNotAllowedException; 19 import org.springframework.beans.factory.BeanCreationNotAllowedException;
27 -import org.springframework.context.annotation.Lazy;  
28 -import org.springframework.web.bind.annotation.RequestMapping;  
29 -import org.springframework.web.bind.annotation.RestController;  
30 -import org.thingsboard.server.actors.service.ActorService;  
31 -import org.thingsboard.server.common.data.id.UserId;  
32 -import org.thingsboard.server.config.WebSocketConfiguration;  
33 -import org.thingsboard.server.extensions.api.plugins.PluginConstants;  
34 -import org.thingsboard.server.service.security.model.SecurityUser;  
35 -import org.thingsboard.server.common.data.id.CustomerId;  
36 -import org.thingsboard.server.common.data.id.TenantId;  
37 -import org.thingsboard.server.common.data.plugin.PluginMetaData;  
38 -import org.thingsboard.server.dao.plugin.PluginService;  
39 -import org.thingsboard.server.extensions.api.plugins.PluginApiCallSecurityContext;  
40 -import org.thingsboard.server.extensions.api.plugins.ws.BasicPluginWebsocketSessionRef;  
41 -import org.thingsboard.server.extensions.api.plugins.ws.PluginWebsocketSessionRef;  
42 -import org.thingsboard.server.extensions.api.plugins.ws.SessionEvent;  
43 -import org.thingsboard.server.extensions.api.plugins.ws.msg.PluginWebsocketMsg;  
44 -import org.thingsboard.server.extensions.api.plugins.ws.msg.SessionEventPluginWebSocketMsg;  
45 -import org.thingsboard.server.extensions.api.plugins.ws.msg.TextPluginWebSocketMsg;  
46 -import org.slf4j.Logger;  
47 -import org.slf4j.LoggerFactory;  
48 import org.springframework.beans.factory.annotation.Autowired; 20 import org.springframework.beans.factory.annotation.Autowired;
  21 +import org.springframework.context.annotation.Lazy;
49 import org.springframework.stereotype.Service; 22 import org.springframework.stereotype.Service;
50 import org.springframework.web.socket.CloseStatus; 23 import org.springframework.web.socket.CloseStatus;
51 import org.springframework.web.socket.TextMessage; 24 import org.springframework.web.socket.TextMessage;
52 import org.springframework.web.socket.WebSocketSession; 25 import org.springframework.web.socket.WebSocketSession;
53 import org.springframework.web.socket.handler.TextWebSocketHandler; 26 import org.springframework.web.socket.handler.TextWebSocketHandler;
  27 +import org.thingsboard.server.config.WebSocketConfiguration;
  28 +import org.thingsboard.server.extensions.api.plugins.ws.PluginWebsocketSessionRef;
  29 +import org.thingsboard.server.extensions.api.plugins.ws.SessionEvent;
  30 +import org.thingsboard.server.extensions.api.plugins.ws.msg.PluginWebsocketMsg;
  31 +import org.thingsboard.server.extensions.api.plugins.ws.msg.TextPluginWebSocketMsg;
  32 +import org.thingsboard.server.service.security.model.SecurityUser;
  33 +import org.thingsboard.server.service.telemetry.TelemetryWebSocketMsgEndpoint;
  34 +import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
  35 +import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef;
  36 +
  37 +import java.io.IOException;
  38 +import java.net.URI;
  39 +import java.security.InvalidParameterException;
  40 +import java.util.UUID;
  41 +import java.util.concurrent.ConcurrentHashMap;
  42 +import java.util.concurrent.ConcurrentMap;
54 43
55 @Service 44 @Service
56 @Slf4j 45 @Slf4j
57 -public class PluginWebSocketHandler extends TextWebSocketHandler implements PluginWebSocketMsgEndpoint { 46 +public class TbWebSocketHandler extends TextWebSocketHandler implements PluginWebSocketMsgEndpoint, TelemetryWebSocketMsgEndpoint {
58 47
59 private static final ConcurrentMap<String, SessionMetaData> internalSessionMap = new ConcurrentHashMap<>(); 48 private static final ConcurrentMap<String, SessionMetaData> internalSessionMap = new ConcurrentHashMap<>();
60 private static final ConcurrentMap<String, String> externalSessionMap = new ConcurrentHashMap<>(); 49 private static final ConcurrentMap<String, String> externalSessionMap = new ConcurrentHashMap<>();
61 50
62 @Autowired 51 @Autowired
63 @Lazy 52 @Lazy
64 - private ActorService actorService;  
65 -  
66 - @Autowired  
67 - @Lazy  
68 - private PluginService pluginService; 53 + private TelemetryWebSocketService webSocketService;
69 54
70 @Override 55 @Override
71 public void handleTextMessage(WebSocketSession session, TextMessage message) { 56 public void handleTextMessage(WebSocketSession session, TextMessage message) {
@@ -73,7 +58,7 @@ public class PluginWebSocketHandler extends TextWebSocketHandler implements Plug @@ -73,7 +58,7 @@ public class PluginWebSocketHandler extends TextWebSocketHandler implements Plug
73 log.info("[{}] Processing {}", session.getId(), message); 58 log.info("[{}] Processing {}", session.getId(), message);
74 SessionMetaData sessionMd = internalSessionMap.get(session.getId()); 59 SessionMetaData sessionMd = internalSessionMap.get(session.getId());
75 if (sessionMd != null) { 60 if (sessionMd != null) {
76 - actorService.process(new TextPluginWebSocketMsg(sessionMd.sessionRef, message.getPayload())); 61 + webSocketService.handleWebSocketMsg(sessionMd.sessionRef, message.getPayload());
77 } else { 62 } else {
78 log.warn("[{}] Failed to find session", session.getId()); 63 log.warn("[{}] Failed to find session", session.getId());
79 session.close(CloseStatus.SERVER_ERROR.withReason("Session not found!")); 64 session.close(CloseStatus.SERVER_ERROR.withReason("Session not found!"));
@@ -88,11 +73,11 @@ public class PluginWebSocketHandler extends TextWebSocketHandler implements Plug @@ -88,11 +73,11 @@ public class PluginWebSocketHandler extends TextWebSocketHandler implements Plug
88 super.afterConnectionEstablished(session); 73 super.afterConnectionEstablished(session);
89 try { 74 try {
90 String internalSessionId = session.getId(); 75 String internalSessionId = session.getId();
91 - PluginWebsocketSessionRef sessionRef = toRef(session); 76 + TelemetryWebSocketSessionRef sessionRef = toRef(session);
92 String externalSessionId = sessionRef.getSessionId(); 77 String externalSessionId = sessionRef.getSessionId();
93 internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef)); 78 internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef));
94 externalSessionMap.put(externalSessionId, internalSessionId); 79 externalSessionMap.put(externalSessionId, internalSessionId);
95 - actorService.process(new SessionEventPluginWebSocketMsg(sessionRef, SessionEvent.onEstablished())); 80 + processInWebSocketService(sessionRef, SessionEvent.onEstablished());
96 log.info("[{}][{}] Session is started", externalSessionId, session.getId()); 81 log.info("[{}][{}] Session is started", externalSessionId, session.getId());
97 } catch (InvalidParameterException e) { 82 } catch (InvalidParameterException e) {
98 log.warn("[[{}] Failed to start session", session.getId(), e); 83 log.warn("[[{}] Failed to start session", session.getId(), e);
@@ -108,7 +93,7 @@ public class PluginWebSocketHandler extends TextWebSocketHandler implements Plug @@ -108,7 +93,7 @@ public class PluginWebSocketHandler extends TextWebSocketHandler implements Plug
108 super.handleTransportError(session, tError); 93 super.handleTransportError(session, tError);
109 SessionMetaData sessionMd = internalSessionMap.get(session.getId()); 94 SessionMetaData sessionMd = internalSessionMap.get(session.getId());
110 if (sessionMd != null) { 95 if (sessionMd != null) {
111 - processInActorService(new SessionEventPluginWebSocketMsg(sessionMd.sessionRef, SessionEvent.onError(tError))); 96 + processInWebSocketService(sessionMd.sessionRef, SessionEvent.onError(tError));
112 } else { 97 } else {
113 log.warn("[{}] Failed to find session", session.getId()); 98 log.warn("[{}] Failed to find session", session.getId());
114 } 99 }
@@ -121,20 +106,20 @@ public class PluginWebSocketHandler extends TextWebSocketHandler implements Plug @@ -121,20 +106,20 @@ public class PluginWebSocketHandler extends TextWebSocketHandler implements Plug
121 SessionMetaData sessionMd = internalSessionMap.remove(session.getId()); 106 SessionMetaData sessionMd = internalSessionMap.remove(session.getId());
122 if (sessionMd != null) { 107 if (sessionMd != null) {
123 externalSessionMap.remove(sessionMd.sessionRef.getSessionId()); 108 externalSessionMap.remove(sessionMd.sessionRef.getSessionId());
124 - processInActorService(new SessionEventPluginWebSocketMsg(sessionMd.sessionRef, SessionEvent.onClosed())); 109 + processInWebSocketService(sessionMd.sessionRef, SessionEvent.onClosed());
125 } 110 }
126 log.info("[{}] Session is closed", session.getId()); 111 log.info("[{}] Session is closed", session.getId());
127 } 112 }
128 113
129 - private void processInActorService(SessionEventPluginWebSocketMsg msg) { 114 + private void processInWebSocketService(TelemetryWebSocketSessionRef sessionRef, SessionEvent event) {
130 try { 115 try {
131 - actorService.process(msg); 116 + webSocketService.handleWebSocketSessionEvent(sessionRef, event);
132 } catch (BeanCreationNotAllowedException e) { 117 } catch (BeanCreationNotAllowedException e) {
133 - log.warn("[{}] Failed to close session due to possible shutdown state", msg.getSessionRef().getSessionId()); 118 + log.warn("[{}] Failed to close session due to possible shutdown state", sessionRef.getSessionId());
134 } 119 }
135 } 120 }
136 121
137 - private PluginWebsocketSessionRef toRef(WebSocketSession session) throws IOException { 122 + private TelemetryWebSocketSessionRef toRef(WebSocketSession session) throws IOException {
138 URI sessionUri = session.getUri(); 123 URI sessionUri = session.getUri();
139 String path = sessionUri.getPath(); 124 String path = sessionUri.getPath();
140 path = path.substring(WebSocketConfiguration.WS_PLUGIN_PREFIX.length()); 125 path = path.substring(WebSocketConfiguration.WS_PLUGIN_PREFIX.length());
@@ -142,33 +127,20 @@ public class PluginWebSocketHandler extends TextWebSocketHandler implements Plug @@ -142,33 +127,20 @@ public class PluginWebSocketHandler extends TextWebSocketHandler implements Plug
142 throw new IllegalArgumentException("URL should contain plugin token!"); 127 throw new IllegalArgumentException("URL should contain plugin token!");
143 } 128 }
144 String[] pathElements = path.split("/"); 129 String[] pathElements = path.split("/");
145 - String pluginToken = pathElements[0];  
146 - // TODO: cache  
147 - PluginMetaData pluginMd = pluginService.findPluginByApiToken(pluginToken);  
148 - if (pluginMd == null) { 130 + String serviceToken = pathElements[0];
  131 + if (!"telemetry".equalsIgnoreCase(serviceToken)) {
149 throw new InvalidParameterException("Can't find plugin with specified token!"); 132 throw new InvalidParameterException("Can't find plugin with specified token!");
150 } else { 133 } else {
151 SecurityUser currentUser = (SecurityUser) session.getAttributes().get(WebSocketConfiguration.WS_SECURITY_USER_ATTRIBUTE); 134 SecurityUser currentUser = (SecurityUser) session.getAttributes().get(WebSocketConfiguration.WS_SECURITY_USER_ATTRIBUTE);
152 - TenantId tenantId = currentUser.getTenantId();  
153 - CustomerId customerId = currentUser.getCustomerId();  
154 - if (PluginApiController.validatePluginAccess(pluginMd, tenantId, customerId)) {  
155 - UserId userId = currentUser.getId();  
156 - String userName = currentUser.getName();  
157 - PluginApiCallSecurityContext securityCtx = new PluginApiCallSecurityContext(pluginMd.getTenantId(), pluginMd.getId(), tenantId,  
158 - currentUser.getCustomerId(), userId, userName);  
159 - return new BasicPluginWebsocketSessionRef(UUID.randomUUID().toString(), securityCtx, session.getUri(), session.getAttributes(),  
160 - session.getLocalAddress(), session.getRemoteAddress());  
161 - } else {  
162 - throw new SecurityException("Current user is not allowed to use this plugin!");  
163 - } 135 + return new TelemetryWebSocketSessionRef(UUID.randomUUID().toString(), currentUser, session.getLocalAddress(), session.getRemoteAddress());
164 } 136 }
165 } 137 }
166 138
167 private static class SessionMetaData { 139 private static class SessionMetaData {
168 private final WebSocketSession session; 140 private final WebSocketSession session;
169 - private final PluginWebsocketSessionRef sessionRef; 141 + private final TelemetryWebSocketSessionRef sessionRef;
170 142
171 - public SessionMetaData(WebSocketSession session, PluginWebsocketSessionRef sessionRef) { 143 + public SessionMetaData(WebSocketSession session, TelemetryWebSocketSessionRef sessionRef) {
172 super(); 144 super();
173 this.session = session; 145 this.session = session;
174 this.sessionRef = sessionRef; 146 this.sessionRef = sessionRef;
@@ -176,6 +148,41 @@ public class PluginWebSocketHandler extends TextWebSocketHandler implements Plug @@ -176,6 +148,41 @@ public class PluginWebSocketHandler extends TextWebSocketHandler implements Plug
176 } 148 }
177 149
178 @Override 150 @Override
  151 + public void send(TelemetryWebSocketSessionRef sessionRef, String msg) throws IOException {
  152 + String externalId = sessionRef.getSessionId();
  153 + log.debug("[{}] Processing {}", externalId, msg);
  154 + String internalId = externalSessionMap.get(externalId);
  155 + if (internalId != null) {
  156 + SessionMetaData sessionMd = internalSessionMap.get(internalId);
  157 + if (sessionMd != null) {
  158 + sessionMd.session.sendMessage(new TextMessage(msg));
  159 + } else {
  160 + log.warn("[{}][{}] Failed to find session by internal id", externalId, internalId);
  161 + }
  162 + } else {
  163 + log.warn("[{}] Failed to find session by external id", externalId);
  164 + }
  165 + }
  166 +
  167 + @Override
  168 + public void close(TelemetryWebSocketSessionRef sessionRef) throws IOException {
  169 + String externalId = sessionRef.getSessionId();
  170 + log.debug("[{}] Processing close request", externalId);
  171 + String internalId = externalSessionMap.get(externalId);
  172 + if (internalId != null) {
  173 + SessionMetaData sessionMd = internalSessionMap.get(internalId);
  174 + if (sessionMd != null) {
  175 + sessionMd.session.close(CloseStatus.NORMAL);
  176 + } else {
  177 + log.warn("[{}][{}] Failed to find session by internal id", externalId, internalId);
  178 + }
  179 + } else {
  180 + log.warn("[{}] Failed to find session by external id", externalId);
  181 + }
  182 + }
  183 +
  184 + //TODO: remove
  185 + @Override
179 public void send(PluginWebsocketMsg<?> wsMsg) throws IOException { 186 public void send(PluginWebsocketMsg<?> wsMsg) throws IOException {
180 PluginWebsocketSessionRef sessionRef = wsMsg.getSessionRef(); 187 PluginWebsocketSessionRef sessionRef = wsMsg.getSessionRef();
181 String externalId = sessionRef.getSessionId(); 188 String externalId = sessionRef.getSessionId();
@@ -196,6 +203,7 @@ public class PluginWebSocketHandler extends TextWebSocketHandler implements Plug @@ -196,6 +203,7 @@ public class PluginWebSocketHandler extends TextWebSocketHandler implements Plug
196 } 203 }
197 } 204 }
198 205
  206 + //TODO: remove
199 @Override 207 @Override
200 public void close(PluginWebsocketSessionRef sessionRef) throws IOException { 208 public void close(PluginWebsocketSessionRef sessionRef) throws IOException {
201 String externalId = sessionRef.getSessionId(); 209 String externalId = sessionRef.getSessionId();
@@ -212,5 +220,4 @@ public class PluginWebSocketHandler extends TextWebSocketHandler implements Plug @@ -212,5 +220,4 @@ public class PluginWebSocketHandler extends TextWebSocketHandler implements Plug
212 log.warn("[{}] Failed to find session by external id", externalId); 220 log.warn("[{}] Failed to find session by external id", externalId);
213 } 221 }
214 } 222 }
215 -  
216 } 223 }
  1 +package org.thingsboard.server.service.security;
  2 +
  3 +import com.google.common.base.Function;
  4 +import com.google.common.util.concurrent.FutureCallback;
  5 +import com.google.common.util.concurrent.Futures;
  6 +import com.google.common.util.concurrent.ListenableFuture;
  7 +import org.springframework.beans.factory.annotation.Autowired;
  8 +import org.springframework.http.HttpStatus;
  9 +import org.springframework.http.ResponseEntity;
  10 +import org.springframework.stereotype.Component;
  11 +import org.springframework.web.context.request.async.DeferredResult;
  12 +import org.thingsboard.server.actors.plugin.ValidationResult;
  13 +import org.thingsboard.server.common.data.Customer;
  14 +import org.thingsboard.server.common.data.Device;
  15 +import org.thingsboard.server.common.data.Tenant;
  16 +import org.thingsboard.server.common.data.asset.Asset;
  17 +import org.thingsboard.server.common.data.id.AssetId;
  18 +import org.thingsboard.server.common.data.id.CustomerId;
  19 +import org.thingsboard.server.common.data.id.DeviceId;
  20 +import org.thingsboard.server.common.data.id.EntityId;
  21 +import org.thingsboard.server.common.data.id.EntityIdFactory;
  22 +import org.thingsboard.server.common.data.id.RuleChainId;
  23 +import org.thingsboard.server.common.data.id.TenantId;
  24 +import org.thingsboard.server.common.data.rule.RuleChain;
  25 +import org.thingsboard.server.controller.ValidationCallback;
  26 +import org.thingsboard.server.dao.alarm.AlarmService;
  27 +import org.thingsboard.server.dao.asset.AssetService;
  28 +import org.thingsboard.server.dao.customer.CustomerService;
  29 +import org.thingsboard.server.dao.device.DeviceService;
  30 +import org.thingsboard.server.dao.rule.RuleChainService;
  31 +import org.thingsboard.server.dao.tenant.TenantService;
  32 +import org.thingsboard.server.dao.user.UserService;
  33 +import org.thingsboard.server.exception.ThingsboardException;
  34 +import org.thingsboard.server.extensions.api.exception.ToErrorResponseEntity;
  35 +import org.thingsboard.server.service.security.model.SecurityUser;
  36 +
  37 +import javax.annotation.Nullable;
  38 +import java.util.function.BiConsumer;
  39 +
  40 +/**
  41 + * Created by ashvayka on 27.03.18.
  42 + */
  43 +@Component
  44 +public class AccessValidator {
  45 +
  46 + public static final String CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION = "Customer user is not allowed to perform this operation!";
  47 + public static final String SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION = "System administrator is not allowed to perform this operation!";
  48 + public static final String DEVICE_WITH_REQUESTED_ID_NOT_FOUND = "Device with requested id wasn't found!";
  49 +
  50 + @Autowired
  51 + protected TenantService tenantService;
  52 +
  53 + @Autowired
  54 + protected CustomerService customerService;
  55 +
  56 + @Autowired
  57 + protected UserService userService;
  58 +
  59 + @Autowired
  60 + protected DeviceService deviceService;
  61 +
  62 + @Autowired
  63 + protected AssetService assetService;
  64 +
  65 + @Autowired
  66 + protected AlarmService alarmService;
  67 +
  68 + @Autowired
  69 + protected RuleChainService ruleChainService;
  70 +
  71 + public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, String entityType, String entityIdStr,
  72 + BiConsumer<DeferredResult<ResponseEntity>, EntityId> onSuccess) throws ThingsboardException {
  73 + return validateEntityAndCallback(currentUser, entityType, entityIdStr, onSuccess, (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR));
  74 + }
  75 +
  76 + public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, String entityType, String entityIdStr,
  77 + BiConsumer<DeferredResult<ResponseEntity>, EntityId> onSuccess,
  78 + BiConsumer<DeferredResult<ResponseEntity>, Throwable> onFailure) throws ThingsboardException {
  79 + return validateEntityAndCallback(currentUser, EntityIdFactory.getByTypeAndId(entityType, entityIdStr),
  80 + onSuccess, onFailure);
  81 + }
  82 +
  83 + public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, EntityId entityId,
  84 + BiConsumer<DeferredResult<ResponseEntity>, EntityId> onSuccess) throws ThingsboardException {
  85 + return validateEntityAndCallback(currentUser, entityId, onSuccess, (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR));
  86 + }
  87 +
  88 + public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, EntityId entityId,
  89 + BiConsumer<DeferredResult<ResponseEntity>, EntityId> onSuccess,
  90 + BiConsumer<DeferredResult<ResponseEntity>, Throwable> onFailure) throws ThingsboardException {
  91 +
  92 + final DeferredResult<ResponseEntity> response = new DeferredResult<>();
  93 +
  94 + validate(currentUser, entityId, new ValidationCallback(response,
  95 + new FutureCallback<DeferredResult<ResponseEntity>>() {
  96 + @Override
  97 + public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
  98 + onSuccess.accept(response, entityId);
  99 + }
  100 +
  101 + @Override
  102 + public void onFailure(Throwable t) {
  103 + onFailure.accept(response, t);
  104 + }
  105 + }));
  106 +
  107 + return response;
  108 + }
  109 +
  110 + public void validate(SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
  111 + switch (entityId.getEntityType()) {
  112 + case DEVICE:
  113 + validateDevice(currentUser, entityId, callback);
  114 + return;
  115 + case ASSET:
  116 + validateAsset(currentUser, entityId, callback);
  117 + return;
  118 + case RULE_CHAIN:
  119 + validateRuleChain(currentUser, entityId, callback);
  120 + return;
  121 + case CUSTOMER:
  122 + validateCustomer(currentUser, entityId, callback);
  123 + return;
  124 + case TENANT:
  125 + validateTenant(currentUser, entityId, callback);
  126 + return;
  127 + default:
  128 + //TODO: add support of other entities
  129 + throw new IllegalStateException("Not Implemented!");
  130 + }
  131 + }
  132 +
  133 + private void validateDevice(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
  134 + if (currentUser.isSystemAdmin()) {
  135 + callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
  136 + } else {
  137 + ListenableFuture<Device> deviceFuture = deviceService.findDeviceByIdAsync(new DeviceId(entityId.getId()));
  138 + Futures.addCallback(deviceFuture, getCallback(callback, device -> {
  139 + if (device == null) {
  140 + return ValidationResult.entityNotFound(DEVICE_WITH_REQUESTED_ID_NOT_FOUND);
  141 + } else {
  142 + if (!device.getTenantId().equals(currentUser.getTenantId())) {
  143 + return ValidationResult.accessDenied("Device doesn't belong to the current Tenant!");
  144 + } else if (currentUser.isCustomerUser() && !device.getCustomerId().equals(currentUser.getCustomerId())) {
  145 + return ValidationResult.accessDenied("Device doesn't belong to the current Customer!");
  146 + } else {
  147 + return ValidationResult.ok();
  148 + }
  149 + }
  150 + }));
  151 + }
  152 + }
  153 +
  154 + private void validateAsset(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
  155 + if (currentUser.isSystemAdmin()) {
  156 + callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
  157 + } else {
  158 + ListenableFuture<Asset> assetFuture = assetService.findAssetByIdAsync(new AssetId(entityId.getId()));
  159 + Futures.addCallback(assetFuture, getCallback(callback, asset -> {
  160 + if (asset == null) {
  161 + return ValidationResult.entityNotFound("Asset with requested id wasn't found!");
  162 + } else {
  163 + if (!asset.getTenantId().equals(currentUser.getTenantId())) {
  164 + return ValidationResult.accessDenied("Asset doesn't belong to the current Tenant!");
  165 + } else if (currentUser.isCustomerUser() && !asset.getCustomerId().equals(currentUser.getCustomerId())) {
  166 + return ValidationResult.accessDenied("Asset doesn't belong to the current Customer!");
  167 + } else {
  168 + return ValidationResult.ok();
  169 + }
  170 + }
  171 + }));
  172 + }
  173 + }
  174 +
  175 +
  176 + private void validateRuleChain(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
  177 + if (currentUser.isCustomerUser()) {
  178 + callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
  179 + } else {
  180 + ListenableFuture<RuleChain> ruleChainFuture = ruleChainService.findRuleChainByIdAsync(new RuleChainId(entityId.getId()));
  181 + Futures.addCallback(ruleChainFuture, getCallback(callback, ruleChain -> {
  182 + if (ruleChain == null) {
  183 + return ValidationResult.entityNotFound("Rule chain with requested id wasn't found!");
  184 + } else {
  185 + if (currentUser.isTenantAdmin() && !ruleChain.getTenantId().equals(currentUser.getTenantId())) {
  186 + return ValidationResult.accessDenied("Rule chain doesn't belong to the current Tenant!");
  187 + } else if (currentUser.isSystemAdmin() && !ruleChain.getTenantId().isNullUid()) {
  188 + return ValidationResult.accessDenied("Rule chain is not in system scope!");
  189 + } else {
  190 + return ValidationResult.ok();
  191 + }
  192 + }
  193 + }));
  194 + }
  195 + }
  196 +
  197 + private void validateCustomer(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
  198 + if (currentUser.isSystemAdmin()) {
  199 + callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
  200 + } else {
  201 + ListenableFuture<Customer> customerFuture = customerService.findCustomerByIdAsync(new CustomerId(entityId.getId()));
  202 + Futures.addCallback(customerFuture, getCallback(callback, customer -> {
  203 + if (customer == null) {
  204 + return ValidationResult.entityNotFound("Customer with requested id wasn't found!");
  205 + } else {
  206 + if (!customer.getTenantId().equals(currentUser.getTenantId())) {
  207 + return ValidationResult.accessDenied("Customer doesn't belong to the current Tenant!");
  208 + } else if (currentUser.isCustomerUser() && !customer.getId().equals(currentUser.getCustomerId())) {
  209 + return ValidationResult.accessDenied("Customer doesn't relate to the currently authorized customer user!");
  210 + } else {
  211 + return ValidationResult.ok();
  212 + }
  213 + }
  214 + }));
  215 + }
  216 + }
  217 +
  218 + private void validateTenant(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) {
  219 + if (currentUser.isCustomerUser()) {
  220 + callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
  221 + } else if (currentUser.isSystemAdmin()) {
  222 + callback.onSuccess(ValidationResult.ok());
  223 + } else {
  224 + ListenableFuture<Tenant> tenantFuture = tenantService.findTenantByIdAsync(new TenantId(entityId.getId()));
  225 + Futures.addCallback(tenantFuture, getCallback(callback, tenant -> {
  226 + if (tenant == null) {
  227 + return ValidationResult.entityNotFound("Tenant with requested id wasn't found!");
  228 + } else if (!tenant.getId().equals(currentUser.getTenantId())) {
  229 + return ValidationResult.accessDenied("Tenant doesn't relate to the currently authorized user!");
  230 + } else {
  231 + return ValidationResult.ok();
  232 + }
  233 + }));
  234 + }
  235 + }
  236 +
  237 + private <T> FutureCallback<T> getCallback(ValidationCallback callback, Function<T, ValidationResult> transformer) {
  238 + return new FutureCallback<T>() {
  239 + @Override
  240 + public void onSuccess(@Nullable T result) {
  241 + callback.onSuccess(transformer.apply(result));
  242 + }
  243 +
  244 + @Override
  245 + public void onFailure(Throwable t) {
  246 + callback.onFailure(t);
  247 + }
  248 + };
  249 + }
  250 +
  251 + public static void handleError(Throwable e, final DeferredResult<ResponseEntity> response, HttpStatus defaultErrorStatus) {
  252 + ResponseEntity responseEntity;
  253 + if (e != null && e instanceof ToErrorResponseEntity) {
  254 + responseEntity = ((ToErrorResponseEntity) e).toErrorResponseEntity();
  255 + } else if (e != null && e instanceof IllegalArgumentException) {
  256 + responseEntity = new ResponseEntity<>(e.getMessage(), HttpStatus.BAD_REQUEST);
  257 + } else {
  258 + responseEntity = new ResponseEntity<>(defaultErrorStatus);
  259 + }
  260 + response.setResult(responseEntity);
  261 + }
  262 +}
  1 +package org.thingsboard.server.service.telemetry;
  2 +
  3 +import lombok.extern.slf4j.Slf4j;
  4 +import org.springframework.beans.factory.annotation.Autowired;
  5 +import org.springframework.stereotype.Service;
  6 +import org.thingsboard.server.common.data.id.EntityId;
  7 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  8 +import org.thingsboard.server.common.data.kv.TsKvEntry;
  9 +import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
  10 +
  11 +import java.util.HashMap;
  12 +import java.util.List;
  13 +import java.util.Map;
  14 +import java.util.Set;
  15 +
  16 +/**
  17 + * Created by ashvayka on 27.03.18.
  18 + */
  19 +@Service
  20 +@Slf4j
  21 +public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptionService {
  22 +
  23 + @Autowired
  24 + private TelemetryWebSocketService wsService;
  25 +
  26 +
  27 + private final Map<EntityId, Set<Subscription>> subscriptionsByEntityId = new HashMap<>();
  28 +
  29 + private final Map<String, Map<Integer, Subscription>> subscriptionsByWsSessionId = new HashMap<>();
  30 +
  31 +
  32 +
  33 + @Override
  34 + public void onAttributesUpdateFromServer(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
  35 +
  36 + }
  37 +
  38 + @Override
  39 + public void onTimeseriesUpdateFromServer(EntityId entityId, List<TsKvEntry> entries) {
  40 +
  41 + }
  42 +}
  1 +package org.thingsboard.server.service.telemetry;
  2 +
  3 +import com.fasterxml.jackson.core.JsonProcessingException;
  4 +import com.fasterxml.jackson.databind.ObjectMapper;
  5 +import com.google.common.util.concurrent.FutureCallback;
  6 +import lombok.extern.slf4j.Slf4j;
  7 +import org.springframework.beans.factory.annotation.Autowired;
  8 +import org.springframework.stereotype.Service;
  9 +import org.springframework.util.StringUtils;
  10 +import org.thingsboard.server.common.data.DataConstants;
  11 +import org.thingsboard.server.common.data.id.EntityId;
  12 +import org.thingsboard.server.common.data.id.EntityIdFactory;
  13 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  14 +import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
  15 +import org.thingsboard.server.common.data.kv.TsKvEntry;
  16 +import org.thingsboard.server.dao.attributes.AttributesService;
  17 +import org.thingsboard.server.dao.timeseries.TimeseriesService;
  18 +import org.thingsboard.server.extensions.api.exception.UnauthorizedException;
  19 +import org.thingsboard.server.extensions.api.plugins.PluginCallback;
  20 +import org.thingsboard.server.extensions.api.plugins.PluginContext;
  21 +import org.thingsboard.server.extensions.api.plugins.ws.PluginWebsocketSessionRef;
  22 +import org.thingsboard.server.extensions.api.plugins.ws.SessionEvent;
  23 +import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.AttributesSubscriptionCmd;
  24 +import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.SubscriptionCmd;
  25 +import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmd;
  26 +import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmdsWrapper;
  27 +import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionErrorCode;
  28 +import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
  29 +import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
  30 +import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
  31 +
  32 +import java.io.IOException;
  33 +import java.util.ArrayList;
  34 +import java.util.Arrays;
  35 +import java.util.Collections;
  36 +import java.util.HashMap;
  37 +import java.util.HashSet;
  38 +import java.util.List;
  39 +import java.util.Map;
  40 +import java.util.Optional;
  41 +import java.util.Set;
  42 +import java.util.concurrent.ConcurrentHashMap;
  43 +import java.util.concurrent.ConcurrentMap;
  44 +import java.util.stream.Collectors;
  45 +
  46 +/**
  47 + * Created by ashvayka on 27.03.18.
  48 + */
  49 +@Service
  50 +@Slf4j
  51 +public class DefaultTelemetryWebSocketService implements TelemetryWebSocketService {
  52 +
  53 + private static final int UNKNOWN_SUBSCRIPTION_ID = 0;
  54 + private static final String PROCESSING_MSG = "[{}] Processing: {}";
  55 + private static final ObjectMapper jsonMapper = new ObjectMapper();
  56 + private static final String FAILED_TO_FETCH_DATA = "Failed to fetch data!";
  57 + private static final String FAILED_TO_FETCH_ATTRIBUTES = "Failed to fetch attributes!";
  58 + private static final String SESSION_META_DATA_NOT_FOUND = "Session meta-data not found!";
  59 +
  60 + private final ConcurrentMap<String, WsSessionMetaData> wsSessionsMap = new ConcurrentHashMap<>();
  61 +
  62 + @Autowired
  63 + private TelemetrySubscriptionService subscriptionManager;
  64 +
  65 + @Autowired
  66 + private TelemetryWebSocketMsgEndpoint msgEndpoint;
  67 +
  68 + @Autowired
  69 + private AttributesService attributesService;
  70 +
  71 + @Autowired
  72 + private TimeseriesService tsService;
  73 +
  74 + @Override
  75 + public void handleWebSocketSessionEvent(TelemetryWebSocketSessionRef sessionRef, SessionEvent event) {
  76 + String sessionId = sessionRef.getSessionId();
  77 + log.debug(PROCESSING_MSG, sessionId, event);
  78 + switch (event.getEventType()) {
  79 + case ESTABLISHED:
  80 + wsSessionsMap.put(sessionId, new WsSessionMetaData(sessionRef));
  81 + break;
  82 + case ERROR:
  83 + log.debug("[{}] Unknown websocket session error: {}. ", sessionId, event.getError().orElse(null));
  84 + break;
  85 + case CLOSED:
  86 + wsSessionsMap.remove(sessionId);
  87 + subscriptionManager.cleanupLocalWsSessionSubscriptions(sessionRef, sessionId);
  88 + break;
  89 + }
  90 + }
  91 +
  92 + @Override
  93 + public void handleWebSocketMsg(TelemetryWebSocketSessionRef sessionRef, String msg) {
  94 + if (log.isTraceEnabled()) {
  95 + log.trace("[{}] Processing: {}", sessionRef.getSessionId(), msg);
  96 + }
  97 +
  98 + try {
  99 + TelemetryPluginCmdsWrapper cmdsWrapper = jsonMapper.readValue(msg, TelemetryPluginCmdsWrapper.class);
  100 + if (cmdsWrapper != null) {
  101 + if (cmdsWrapper.getAttrSubCmds() != null) {
  102 + cmdsWrapper.getAttrSubCmds().forEach(cmd -> handleWsAttributesSubscriptionCmd(sessionRef, cmd));
  103 + }
  104 + if (cmdsWrapper.getTsSubCmds() != null) {
  105 + cmdsWrapper.getTsSubCmds().forEach(cmd -> handleWsTimeseriesSubscriptionCmd(sessionRef, cmd));
  106 + }
  107 + if (cmdsWrapper.getHistoryCmds() != null) {
  108 + cmdsWrapper.getHistoryCmds().forEach(cmd -> handleWsHistoryCmd(sessionRef, cmd));
  109 + }
  110 + }
  111 + } catch (IOException e) {
  112 + log.warn("Failed to decode subscription cmd: {}", e.getMessage(), e);
  113 + SubscriptionUpdate update = new SubscriptionUpdate(UNKNOWN_SUBSCRIPTION_ID, SubscriptionErrorCode.INTERNAL_ERROR, SESSION_META_DATA_NOT_FOUND);
  114 + sendWsMsg(sessionRef, update);
  115 + }
  116 + }
  117 +
  118 + private void handleWsAttributesSubscriptionCmd(TelemetryWebSocketSessionRef sessionRef, AttributesSubscriptionCmd cmd) {
  119 + String sessionId = sessionRef.getSessionId();
  120 + log.debug("[{}] Processing: {}", sessionId, cmd);
  121 +
  122 + if (validateSessionMetadata(sessionRef, cmd, sessionId)) {
  123 + if (cmd.isUnsubscribe()) {
  124 + unsubscribe(sessionRef, cmd, sessionId);
  125 + } else if (validateSubscriptionCmd(sessionRef, cmd)) {
  126 + EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId());
  127 + log.debug("[{}] fetching latest attributes ({}) values for device: {}", sessionId, cmd.getKeys(), entityId);
  128 + Optional<Set<String>> keysOptional = getKeys(cmd);
  129 + if (keysOptional.isPresent()) {
  130 + List<String> keys = new ArrayList<>(keysOptional.get());
  131 + handleWsAttributesSubscriptionByKeys(sessionRef, cmd, sessionId, entityId, keys);
  132 + } else {
  133 + handleWsAttributesSubscription(sessionRef, cmd, sessionId, entityId);
  134 + }
  135 + }
  136 + }
  137 + }
  138 +
  139 + private void handleWsAttributesSubscriptionByKeys(TelemetryWebSocketSessionRef sessionRef,
  140 + AttributesSubscriptionCmd cmd, String sessionId, EntityId entityId,
  141 + List<String> keys) {
  142 + FutureCallback<List<AttributeKvEntry>> callback = new FutureCallback<List<AttributeKvEntry>>() {
  143 + @Override
  144 + public void onSuccess(List<AttributeKvEntry> data) {
  145 + List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
  146 + sendWsMsg(sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
  147 +
  148 + Map<String, Long> subState = new HashMap<>(keys.size());
  149 + keys.forEach(key -> subState.put(key, 0L));
  150 + attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
  151 +
  152 + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, false, subState, cmd.getScope());
  153 + subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub);
  154 + }
  155 +
  156 + @Override
  157 + public void onFailure(Throwable e) {
  158 + log.error(FAILED_TO_FETCH_ATTRIBUTES, e);
  159 + SubscriptionUpdate update;
  160 + if (UnauthorizedException.class.isInstance(e)) {
  161 + update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
  162 + SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
  163 + } else {
  164 + update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
  165 + FAILED_TO_FETCH_ATTRIBUTES);
  166 + }
  167 + sendWsMsg(sessionRef, update);
  168 + }
  169 + };
  170 +
  171 + if (StringUtils.isEmpty(cmd.getScope())) {
  172 + //ValidationCallback?
  173 + ctx.loadAttributes(entityId, Arrays.asList(DataConstants.allScopes()), keys, callback);
  174 + } else {
  175 + ctx.loadAttributes(entityId, cmd.getScope(), keys, callback);
  176 + }
  177 + }
  178 +
  179 + private void handleWsAttributesSubscription(PluginContext ctx, PluginWebsocketSessionRef sessionRef,
  180 + AttributesSubscriptionCmd cmd, String sessionId, EntityId entityId) {
  181 + PluginCallback<List<AttributeKvEntry>> callback = new PluginCallback<List<AttributeKvEntry>>() {
  182 + @Override
  183 + public void onSuccess(PluginContext ctx, List<AttributeKvEntry> data) {
  184 + List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
  185 + sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
  186 +
  187 + Map<String, Long> subState = new HashMap<>(attributesData.size());
  188 + attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
  189 +
  190 + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, true, subState, cmd.getScope());
  191 + subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
  192 + }
  193 +
  194 + @Override
  195 + public void onFailure(PluginContext ctx, Exception e) {
  196 + log.error(FAILED_TO_FETCH_ATTRIBUTES, e);
  197 + SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
  198 + FAILED_TO_FETCH_ATTRIBUTES);
  199 + sendWsMsg(ctx, sessionRef, update);
  200 + }
  201 + };
  202 +
  203 + if (StringUtils.isEmpty(cmd.getScope())) {
  204 + ctx.loadAttributes(entityId, Arrays.asList(DataConstants.allScopes()), callback);
  205 + } else {
  206 + ctx.loadAttributes(entityId, cmd.getScope(), callback);
  207 + }
  208 + }
  209 +
  210 + private void unsubscribe(TelemetryWebSocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) {
  211 + if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) {
  212 + subscriptionManager.cleanupLocalWsSessionSubscriptions(sessionRef, sessionId);
  213 + } else {
  214 + subscriptionManager.removeSubscription(sessionId, cmd.getCmdId());
  215 + }
  216 + }
  217 +
  218 + private boolean validateSubscriptionCmd(TelemetryWebSocketSessionRef sessionRef, SubscriptionCmd cmd) {
  219 + if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) {
  220 + SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
  221 + "Device id is empty!");
  222 + sendWsMsg(sessionRef, update);
  223 + return false;
  224 + }
  225 + return true;
  226 + }
  227 +
  228 + private boolean validateSessionMetadata(TelemetryWebSocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) {
  229 + WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId);
  230 + if (sessionMD == null) {
  231 + log.warn("[{}] Session meta data not found. ", sessionId);
  232 + SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
  233 + SESSION_META_DATA_NOT_FOUND);
  234 + sendWsMsg(sessionRef, update);
  235 + return false;
  236 + } else {
  237 + return true;
  238 + }
  239 + }
  240 +
  241 + private void sendWsMsg(TelemetryWebSocketSessionRef sessionRef, SubscriptionUpdate update) {
  242 + try {
  243 + msgEndpoint.send(sessionRef, jsonMapper.writeValueAsString(update));
  244 + } catch (JsonProcessingException e) {
  245 + log.warn("[{}] Failed to encode reply: {}", sessionRef.getSessionId(), update, e);
  246 + } catch (IOException e) {
  247 + log.warn("[{}] Failed to send reply: {}", sessionRef.getSessionId(), update, e);
  248 + }
  249 + }
  250 +
  251 + private static Optional<Set<String>> getKeys(TelemetryPluginCmd cmd) {
  252 + if (!StringUtils.isEmpty(cmd.getKeys())) {
  253 + Set<String> keys = new HashSet<>();
  254 + Collections.addAll(keys, cmd.getKeys().split(","));
  255 + return Optional.of(keys);
  256 + } else {
  257 + return Optional.empty();
  258 + }
  259 + }
  260 +
  261 +}
  1 +package org.thingsboard.server.service.telemetry;
  2 +
  3 +import org.thingsboard.server.common.data.id.EntityId;
  4 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  5 +import org.thingsboard.server.common.data.kv.TsKvEntry;
  6 +import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
  7 +
  8 +import java.util.List;
  9 +
  10 +/**
  11 + * Created by ashvayka on 27.03.18.
  12 + */
  13 +public interface TelemetrySubscriptionService {
  14 +
  15 + void onAttributesUpdateFromServer(EntityId entityId, String scope, List<AttributeKvEntry> attributes);
  16 +
  17 + void onTimeseriesUpdateFromServer(EntityId entityId, List<TsKvEntry> entries);
  18 +
  19 + void cleanupLocalWsSessionSubscriptions(TelemetryWebSocketSessionRef sessionRef, String sessionId);
  20 +
  21 + void removeSubscription(String sessionId, int cmdId);
  22 +
  23 + void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub);
  24 +}
  1 +package org.thingsboard.server.service.telemetry;
  2 +
  3 +import java.io.IOException;
  4 +
  5 +/**
  6 + * Created by ashvayka on 27.03.18.
  7 + */
  8 +public interface TelemetryWebSocketMsgEndpoint {
  9 +
  10 + void send(TelemetryWebSocketSessionRef sessionRef, String msg) throws IOException;
  11 +
  12 + void close(TelemetryWebSocketSessionRef sessionRef) throws IOException;
  13 +
  14 +}
  1 +package org.thingsboard.server.service.telemetry;
  2 +
  3 +import org.thingsboard.server.extensions.api.plugins.ws.SessionEvent;
  4 +
  5 +/**
  6 + * Created by ashvayka on 27.03.18.
  7 + */
  8 +public interface TelemetryWebSocketService {
  9 +
  10 + void handleWebSocketSessionEvent(TelemetryWebSocketSessionRef sessionRef, SessionEvent sessionEvent);
  11 +
  12 + void handleWebSocketMsg(TelemetryWebSocketSessionRef sessionRef, String msg);
  13 +}
  1 +package org.thingsboard.server.service.telemetry;
  2 +
  3 +import lombok.Getter;
  4 +import org.thingsboard.server.service.security.model.SecurityUser;
  5 +
  6 +import java.net.InetSocketAddress;
  7 +import java.util.Objects;
  8 +
  9 +/**
  10 + * Created by ashvayka on 27.03.18.
  11 + */
  12 +public class TelemetryWebSocketSessionRef {
  13 +
  14 + private static final long serialVersionUID = 1L;
  15 +
  16 + @Getter
  17 + private final String sessionId;
  18 + @Getter
  19 + private final SecurityUser securityCtx;
  20 + @Getter
  21 + private final InetSocketAddress localAddress;
  22 + @Getter
  23 + private final InetSocketAddress remoteAddress;
  24 +
  25 + public TelemetryWebSocketSessionRef(String sessionId, SecurityUser securityCtx, InetSocketAddress localAddress, InetSocketAddress remoteAddress) {
  26 + this.sessionId = sessionId;
  27 + this.securityCtx = securityCtx;
  28 + this.localAddress = localAddress;
  29 + this.remoteAddress = remoteAddress;
  30 + }
  31 +
  32 + @Override
  33 + public boolean equals(Object o) {
  34 + if (this == o) return true;
  35 + if (o == null || getClass() != o.getClass()) return false;
  36 + TelemetryWebSocketSessionRef that = (TelemetryWebSocketSessionRef) o;
  37 + return Objects.equals(sessionId, that.sessionId);
  38 + }
  39 +
  40 + @Override
  41 + public int hashCode() {
  42 + return Objects.hash(sessionId);
  43 + }
  44 +
  45 + @Override
  46 + public String toString() {
  47 + return "TelemetryWebSocketSessionRef{" +
  48 + "sessionId='" + sessionId + '\'' +
  49 + ", localAddress=" + localAddress +
  50 + ", remoteAddress=" + remoteAddress +
  51 + '}';
  52 + }
  53 +}
  1 +package org.thingsboard.server.service.telemetry;
  2 +
  3 +import lombok.Data;
  4 +import lombok.Getter;
  5 +import org.thingsboard.server.service.security.model.SecurityUser;
  6 +
  7 +import java.net.InetSocketAddress;
  8 +import java.util.Objects;
  9 +
  10 +/**
  11 + * Created by ashvayka on 27.03.18.
  12 + */
  13 +@Data
  14 +public class TelemetryWebSocketTextMsg {
  15 +
  16 + private final TelemetryWebSocketSessionRef sessionRef;
  17 + private final String payload;
  18 +
  19 +}
  1 +package org.thingsboard.server.service.telemetry;
  2 +
  3 +import org.thingsboard.server.extensions.api.plugins.ws.PluginWebsocketSessionRef;
  4 +
  5 +/**
  6 + * Created by ashvayka on 27.03.18.
  7 + */
  8 +public class WsSessionMetaData {
  9 + private TelemetryWebSocketSessionRef sessionRef;
  10 + private long lastActivityTime;
  11 +
  12 + public WsSessionMetaData(TelemetryWebSocketSessionRef sessionRef) {
  13 + super();
  14 + this.sessionRef = sessionRef;
  15 + this.lastActivityTime = System.currentTimeMillis();
  16 + }
  17 +
  18 + public TelemetryWebSocketSessionRef getSessionRef() {
  19 + return sessionRef;
  20 + }
  21 +
  22 + public void setSessionRef(TelemetryWebSocketSessionRef sessionRef) {
  23 + this.sessionRef = sessionRef;
  24 + }
  25 +
  26 + public long getLastActivityTime() {
  27 + return lastActivityTime;
  28 + }
  29 +
  30 + public void setLastActivityTime(long lastActivityTime) {
  31 + this.lastActivityTime = lastActivityTime;
  32 + }
  33 +
  34 + @Override
  35 + public String toString() {
  36 + return "WsSessionMetaData [sessionRef=" + sessionRef + ", lastActivityTime=" + lastActivityTime + "]";
  37 + }
  38 +}
@@ -33,6 +33,10 @@ public class EntityIdFactory { @@ -33,6 +33,10 @@ public class EntityIdFactory {
33 return getByTypeAndUuid(EntityType.valueOf(type), uuid); 33 return getByTypeAndUuid(EntityType.valueOf(type), uuid);
34 } 34 }
35 35
  36 + public static EntityId getByTypeAndUuid(EntityType type, String uuid) {
  37 + return getByTypeAndUuid(type, UUID.fromString(uuid));
  38 + }
  39 +
36 public static EntityId getByTypeAndUuid(EntityType type, UUID uuid) { 40 public static EntityId getByTypeAndUuid(EntityType type, UUID uuid) {
37 switch (type) { 41 switch (type) {
38 case TENANT: 42 case TENANT:
@@ -344,9 +344,7 @@ public class SubscriptionManager { @@ -344,9 +344,7 @@ public class SubscriptionManager {
344 } 344 }
345 345
346 private void checkSubsciptionsPrevAddress(Set<Subscription> subscriptions) { 346 private void checkSubsciptionsPrevAddress(Set<Subscription> subscriptions) {
347 - Iterator<Subscription> subscriptionIterator = subscriptions.iterator();  
348 - while (subscriptionIterator.hasNext()) {  
349 - Subscription s = subscriptionIterator.next(); 347 + for (Subscription s : subscriptions) {
350 if (s.isLocal()) { 348 if (s.isLocal()) {
351 if (s.getServer() != null) { 349 if (s.getServer() != null) {
352 log.trace("[{}] Local subscription is no longer handled on remote server address [{}]", s.getWsSessionId(), s.getServer()); 350 log.trace("[{}] Local subscription is no longer handled on remote server address [{}]", s.getWsSessionId(), s.getServer());
@@ -243,27 +243,19 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { @@ -243,27 +243,19 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
243 switch (attr.getDataType()) { 243 switch (attr.getDataType()) {
244 case BOOLEAN: 244 case BOOLEAN:
245 Optional<Boolean> booleanValue = attr.getBooleanValue(); 245 Optional<Boolean> booleanValue = attr.getBooleanValue();
246 - if (booleanValue.isPresent()) {  
247 - dataBuilder.setBoolValue(booleanValue.get());  
248 - } 246 + booleanValue.ifPresent(dataBuilder::setBoolValue);
249 break; 247 break;
250 case LONG: 248 case LONG:
251 Optional<Long> longValue = attr.getLongValue(); 249 Optional<Long> longValue = attr.getLongValue();
252 - if (longValue.isPresent()) {  
253 - dataBuilder.setLongValue(longValue.get());  
254 - } 250 + longValue.ifPresent(dataBuilder::setLongValue);
255 break; 251 break;
256 case DOUBLE: 252 case DOUBLE:
257 Optional<Double> doubleValue = attr.getDoubleValue(); 253 Optional<Double> doubleValue = attr.getDoubleValue();
258 - if (doubleValue.isPresent()) {  
259 - dataBuilder.setDoubleValue(doubleValue.get());  
260 - } 254 + doubleValue.ifPresent(dataBuilder::setDoubleValue);
261 break; 255 break;
262 case STRING: 256 case STRING:
263 Optional<String> stringValue = attr.getStrValue(); 257 Optional<String> stringValue = attr.getStrValue();
264 - if (stringValue.isPresent()) {  
265 - dataBuilder.setStrValue(stringValue.get());  
266 - } 258 + stringValue.ifPresent(dataBuilder::setStrValue);
267 break; 259 break;
268 } 260 }
269 return dataBuilder; 261 return dataBuilder;