Commit 2accabf0b04db16734d5eeab02fd882abe3c608d

Authored by Andrew Shvayka
1 parent 9328bbc0

Tmp commit

Showing 18 changed files with 501 additions and 179 deletions
... ... @@ -195,9 +195,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
195 195 }
196 196
197 197 void processAttributesUpdate(ActorContext context, DeviceAttributesEventNotificationMsg msg) {
198   - //TODO: improve this procedure to fetch only changed attributes.
  198 + //TODO: improve this procedure to fetch only changed attributes and support attributes deletion
199 199 refreshAttributes();
200   - //TODO: support attributes deletion
201 200 Set<AttributeKey> keys = msg.getKeys();
202 201 if (attributeSubscriptions.size() > 0) {
203 202 ToDeviceMsg notification = null;
... ...
1 1 /**
2 2 * Copyright © 2016-2017 The Thingsboard Authors
3   - *
  3 + * <p>
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -56,6 +56,7 @@ import org.thingsboard.server.extensions.api.plugins.ws.PluginWebsocketSessionRe
56 56 import org.thingsboard.server.extensions.api.plugins.ws.msg.PluginWebsocketMsg;
57 57
58 58 import akka.actor.ActorRef;
  59 +import org.w3c.dom.Attr;
59 60
60 61 import javax.annotation.Nullable;
61 62
... ... @@ -91,49 +92,86 @@ public final class PluginProcessingContext implements PluginContext {
91 92 @Override
92 93 public void saveAttributes(DeviceId deviceId, String scope, List<AttributeKvEntry> attributes, PluginCallback<Void> callback) {
93 94 validate(deviceId);
94   - Set<AttributeKey> keys = new HashSet<>();
95   - for (AttributeKvEntry attribute : attributes) {
96   - keys.add(new AttributeKey(scope, attribute.getKey()));
97   - }
98 95
99 96 ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.attributesService.save(deviceId, scope, attributes);
100 97 Futures.addCallback(rsListFuture, getListCallback(callback, v -> {
101   - onDeviceAttributesChanged(deviceId, keys);
  98 + onDeviceAttributesChanged(deviceId, scope, attributes);
102 99 return null;
103 100 }), executor);
104 101 }
105 102
106 103 @Override
107   - public Optional<AttributeKvEntry> loadAttribute(DeviceId deviceId, String attributeType, String attributeKey) {
  104 + public void removeAttributes(DeviceId deviceId, String scope, List<String> keys, PluginCallback<Void> callback) {
108 105 validate(deviceId);
109   - AttributeKvEntry attribute = pluginCtx.attributesService.find(deviceId, attributeType, attributeKey);
110   - return Optional.ofNullable(attribute);
  106 + ListenableFuture<List<ResultSet>> future = pluginCtx.attributesService.removeAll(deviceId, scope, keys);
  107 + Futures.addCallback(future, getCallback(callback, v -> null), executor);
  108 + onDeviceAttributesDeleted(tenantId, deviceId, keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet()));
111 109 }
112 110
113 111 @Override
114   - public List<AttributeKvEntry> loadAttributes(DeviceId deviceId, String attributeType, List<String> attributeKeys) {
  112 + public void saveAttributesByDevice(TenantId tenantId, DeviceId deviceId, String scope, List<AttributeKvEntry> attributes, PluginCallback<Void> callback) {
115 113 validate(deviceId);
116   - List<AttributeKvEntry> result = new ArrayList<>(attributeKeys.size());
117   - for (String attributeKey : attributeKeys) {
118   - AttributeKvEntry attribute = pluginCtx.attributesService.find(deviceId, attributeType, attributeKey);
119   - if (attribute != null) {
120   - result.add(attribute);
121   - }
122   - }
123   - return result;
  114 +
  115 + ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.attributesService.save(deviceId, scope, attributes);
  116 + Futures.addCallback(rsListFuture, getListCallback(callback, v -> {
  117 + onDeviceAttributesChanged(tenantId, deviceId, scope, attributes);
  118 + return null;
  119 + }), executor);
  120 + }
  121 +
  122 + @Override
  123 + public void removeAttributesByDevice(TenantId tenantId, DeviceId deviceId, String scope, List<String> keys, PluginCallback<Void> callback) {
  124 + validate(deviceId);
  125 + ListenableFuture<List<ResultSet>> future = pluginCtx.attributesService.removeAll(deviceId, scope, keys);
  126 + Futures.addCallback(future, getCallback(callback, v -> null), executor);
  127 + onDeviceAttributesDeleted(tenantId, deviceId, keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet()));
  128 + }
  129 +
  130 + @Override
  131 + public void loadAttribute(DeviceId deviceId, String attributeType, String attributeKey, PluginCallback<Optional<AttributeKvEntry>> callback) {
  132 + validate(deviceId);
  133 + ListenableFuture<Optional<AttributeKvEntry>> future = pluginCtx.attributesService.find(deviceId, attributeType, attributeKey);
  134 + Futures.addCallback(future, getCallback(callback, v -> v), executor);
124 135 }
125 136
126 137 @Override
127   - public List<AttributeKvEntry> loadAttributes(DeviceId deviceId, String attributeType) {
  138 + public void loadAttributes(DeviceId deviceId, String attributeType, Collection<String> attributeKeys, PluginCallback<List<AttributeKvEntry>> callback) {
128 139 validate(deviceId);
129   - return pluginCtx.attributesService.findAll(deviceId, attributeType);
  140 + ListenableFuture<List<AttributeKvEntry>> future = pluginCtx.attributesService.find(deviceId, attributeType, attributeKeys);
  141 + Futures.addCallback(future, getCallback(callback, v -> v), executor);
130 142 }
131 143
132 144 @Override
133   - public void removeAttributes(DeviceId deviceId, String scope, List<String> keys) {
  145 + public void loadAttributes(DeviceId deviceId, String attributeType, PluginCallback<List<AttributeKvEntry>> callback) {
134 146 validate(deviceId);
135   - pluginCtx.attributesService.removeAll(deviceId, scope, keys);
136   - onDeviceAttributesDeleted(deviceId, keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet()));
  147 + ListenableFuture<List<AttributeKvEntry>> future = pluginCtx.attributesService.findAll(deviceId, attributeType);
  148 + Futures.addCallback(future, getCallback(callback, v -> v), executor);
  149 + }
  150 +
  151 + @Override
  152 + public void loadAttributes(DeviceId deviceId, Collection<String> attributeTypes, PluginCallback<List<AttributeKvEntry>> callback) {
  153 + validate(deviceId);
  154 + List<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<>();
  155 + attributeTypes.forEach(attributeType -> futures.add(pluginCtx.attributesService.findAll(deviceId, attributeType)));
  156 + convertFuturesAndAddCallback(callback, futures);
  157 + }
  158 +
  159 + @Override
  160 + public void loadAttributes(DeviceId deviceId, Collection<String> attributeTypes, Collection<String> attributeKeys, PluginCallback<List<AttributeKvEntry>> callback) {
  161 + validate(deviceId);
  162 + List<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<>();
  163 + attributeTypes.forEach(attributeType -> futures.add(pluginCtx.attributesService.find(deviceId, attributeType, attributeKeys)));
  164 + convertFuturesAndAddCallback(callback, futures);
  165 + }
  166 +
  167 + private void convertFuturesAndAddCallback(PluginCallback<List<AttributeKvEntry>> callback, List<ListenableFuture<List<AttributeKvEntry>>> futures) {
  168 + ListenableFuture<List<AttributeKvEntry>> future = Futures.transform(Futures.successfulAsList(futures),
  169 + (Function<? super List<List<AttributeKvEntry>>, ? extends List<AttributeKvEntry>>) input -> {
  170 + List<AttributeKvEntry> result = new ArrayList<>();
  171 + input.forEach(r -> result.addAll(r));
  172 + return result;
  173 + }, executor);
  174 + Futures.addCallback(future, getCallback(callback, v -> v), executor);
137 175 }
138 176
139 177 @Override
... ... @@ -205,18 +243,12 @@ public final class PluginProcessingContext implements PluginContext {
205 243 return securityCtx;
206 244 }
207 245
208   - private void onDeviceAttributesChanged(DeviceId deviceId, AttributeKey key) {
209   - onDeviceAttributesChanged(deviceId, Collections.singleton(key));
  246 + private void onDeviceAttributesDeleted(TenantId tenantId, DeviceId deviceId, Set<AttributeKey> keys) {
  247 + pluginCtx.toDeviceActor(DeviceAttributesEventNotificationMsg.onDelete(tenantId, deviceId, keys));
210 248 }
211 249
212   - private void onDeviceAttributesDeleted(DeviceId deviceId, Set<AttributeKey> keys) {
213   - Device device = pluginCtx.deviceService.findDeviceById(deviceId);
214   - pluginCtx.toDeviceActor(DeviceAttributesEventNotificationMsg.onDelete(device.getTenantId(), deviceId, keys));
215   - }
216   -
217   - private void onDeviceAttributesChanged(DeviceId deviceId, Set<AttributeKey> keys) {
218   - Device device = pluginCtx.deviceService.findDeviceById(deviceId);
219   - pluginCtx.toDeviceActor(DeviceAttributesEventNotificationMsg.onUpdate(device.getTenantId(), deviceId, keys));
  250 + private void onDeviceAttributesChanged(TenantId tenantId, DeviceId deviceId, String scope, List<AttributeKvEntry> values) {
  251 + pluginCtx.toDeviceActor(DeviceAttributesEventNotificationMsg.onUpdate(tenantId, deviceId, scope, values));
220 252 }
221 253
222 254 private <T> FutureCallback<List<ResultSet>> getListCallback(final PluginCallback<T> callback, Function<List<ResultSet>, T> transformer) {
... ... @@ -256,11 +288,12 @@ public final class PluginProcessingContext implements PluginContext {
256 288 }
257 289
258 290 // TODO: replace with our own exceptions
259   - private boolean validate(DeviceId deviceId) {
  291 + private boolean validate(DeviceId deviceId, PluginCallback<Device> callback) {
260 292 if (securityCtx.isPresent()) {
261   - PluginApiCallSecurityContext ctx = securityCtx.get();
  293 + final PluginApiCallSecurityContext ctx = securityCtx.get();
262 294 if (ctx.isTenantAdmin() || ctx.isCustomerUser()) {
263   - Device device = pluginCtx.deviceService.findDeviceById(deviceId);
  295 + ListenableFuture<Device> device = pluginCtx.deviceService.findDeviceById(deviceId);
  296 + Futures.addCallback(device, );
264 297 if (device == null) {
265 298 throw new IllegalStateException("Device not found!");
266 299 } else {
... ...
  1 +/**
  2 + * Copyright © 2016-2017 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao;
  17 +
  18 +import javax.annotation.PostConstruct;
  19 +import javax.annotation.PreDestroy;
  20 +import java.util.concurrent.ExecutorService;
  21 +import java.util.concurrent.Executors;
  22 +
  23 +/**
  24 + * Created by ashvayka on 21.02.17.
  25 + */
  26 +public abstract class AbstractAsyncDao extends AbstractDao {
  27 +
  28 + protected ExecutorService readResultsProcessingExecutor;
  29 +
  30 + @PostConstruct
  31 + public void startExecutor() {
  32 + readResultsProcessingExecutor = Executors.newCachedThreadPool();
  33 + }
  34 +
  35 + @PreDestroy
  36 + public void stopExecutor() {
  37 + if (readResultsProcessingExecutor != null) {
  38 + readResultsProcessingExecutor.shutdownNow();
  39 + }
  40 + }
  41 +
  42 +}
... ...
... ... @@ -15,23 +15,28 @@
15 15 */
16 16 package org.thingsboard.server.dao.attributes;
17 17
  18 +import com.datastax.driver.core.ResultSet;
18 19 import com.datastax.driver.core.ResultSetFuture;
  20 +import com.google.common.util.concurrent.ListenableFuture;
19 21 import org.thingsboard.server.common.data.id.EntityId;
20 22 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
21 23
  24 +import java.util.Collection;
22 25 import java.util.List;
23   -import java.util.UUID;
  26 +import java.util.Optional;
24 27
25 28 /**
26 29 * @author Andrew Shvayka
27 30 */
28 31 public interface AttributesDao {
29 32
30   - AttributeKvEntry find(EntityId entityId, String attributeType, String attributeKey);
  33 + ListenableFuture<Optional<AttributeKvEntry>> find(EntityId entityId, String attributeType, String attributeKey);
31 34
32   - List<AttributeKvEntry> findAll(EntityId entityId, String attributeType);
  35 + ListenableFuture<List<AttributeKvEntry>> find(EntityId entityId, String attributeType, Collection<String> attributeKey);
  36 +
  37 + ListenableFuture<List<AttributeKvEntry>> findAll(EntityId entityId, String attributeType);
33 38
34 39 ResultSetFuture save(EntityId entityId, String attributeType, AttributeKvEntry attribute);
35 40
36   - void removeAll(EntityId entityId, String scope, List<String> keys);
  41 + ListenableFuture<List<ResultSet>> removeAll(EntityId entityId, String scope, List<String> keys);
37 42 }
... ...
... ... @@ -23,18 +23,22 @@ import org.thingsboard.server.common.data.id.EntityId;
23 23 import org.thingsboard.server.common.data.id.UUIDBased;
24 24 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
25 25
  26 +import java.util.Collection;
26 27 import java.util.List;
  28 +import java.util.Optional;
27 29
28 30 /**
29 31 * @author Andrew Shvayka
30 32 */
31 33 public interface AttributesService {
32 34
33   - AttributeKvEntry find(EntityId entityId, String scope, String attributeKey);
  35 + ListenableFuture<Optional<AttributeKvEntry>> find(EntityId entityId, String scope, String attributeKey);
34 36
35   - List<AttributeKvEntry> findAll(EntityId entityId, String scope);
  37 + ListenableFuture<List<AttributeKvEntry>> find(EntityId entityId, String scope, Collection<String> attributeKeys);
  38 +
  39 + ListenableFuture<List<AttributeKvEntry>> findAll(EntityId entityId, String scope);
36 40
37 41 ListenableFuture<List<ResultSet>> save(EntityId entityId, String scope, List<AttributeKvEntry> attributes);
38 42
39   - void removeAll(EntityId entityId, String scope, List<String> attributeKeys);
  43 + ListenableFuture<List<ResultSet>> removeAll(EntityId entityId, String scope, List<String> attributeKeys);
40 44 }
... ...
... ... @@ -18,19 +18,24 @@ package org.thingsboard.server.dao.attributes;
18 18 import com.datastax.driver.core.*;
19 19 import com.datastax.driver.core.querybuilder.QueryBuilder;
20 20 import com.datastax.driver.core.querybuilder.Select;
  21 +import com.google.common.base.Function;
  22 +import com.google.common.util.concurrent.Futures;
  23 +import com.google.common.util.concurrent.ListenableFuture;
21 24 import lombok.extern.slf4j.Slf4j;
22 25 import org.springframework.stereotype.Component;
23 26 import org.thingsboard.server.common.data.id.EntityId;
24   -import org.thingsboard.server.common.data.kv.DataType;
25   -import org.thingsboard.server.dao.AbstractDao;
  27 +import org.thingsboard.server.dao.AbstractAsyncDao;
26 28 import org.thingsboard.server.dao.model.ModelConstants;
27   -import org.slf4j.Logger;
28   -import org.slf4j.LoggerFactory;
29 29 import org.thingsboard.server.common.data.kv.*;
30 30 import org.thingsboard.server.dao.timeseries.BaseTimeseriesDao;
31 31
  32 +import javax.annotation.PostConstruct;
  33 +import javax.annotation.PreDestroy;
32 34 import java.util.ArrayList;
  35 +import java.util.Collection;
33 36 import java.util.List;
  37 +import java.util.Optional;
  38 +import java.util.stream.Collectors;
34 39
35 40 import static org.thingsboard.server.dao.model.ModelConstants.*;
36 41 import static com.datastax.driver.core.querybuilder.QueryBuilder.*;
... ... @@ -40,29 +45,55 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.*;
40 45 */
41 46 @Component
42 47 @Slf4j
43   -public class BaseAttributesDao extends AbstractDao implements AttributesDao {
44   -
  48 +public class BaseAttributesDao extends AbstractAsyncDao implements AttributesDao {
  49 +
45 50 private PreparedStatement saveStmt;
46 51
  52 + @PostConstruct
  53 + public void init() {
  54 + super.startExecutor();
  55 + }
  56 +
  57 + @PreDestroy
  58 + public void stop() {
  59 + super.stopExecutor();
  60 + }
  61 +
47 62 @Override
48   - public AttributeKvEntry find(EntityId entityId, String attributeType, String attributeKey) {
  63 + public ListenableFuture<Optional<AttributeKvEntry>> find(EntityId entityId, String attributeType, String attributeKey) {
49 64 Select.Where select = select().from(ATTRIBUTES_KV_CF)
50 65 .where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType()))
51 66 .and(eq(ENTITY_ID_COLUMN, entityId.getId()))
52 67 .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType))
53 68 .and(eq(ATTRIBUTE_KEY_COLUMN, attributeKey));
54 69 log.trace("Generated query [{}] for entityId {} and key {}", select, entityId, attributeKey);
55   - return convertResultToAttributesKvEntry(attributeKey, executeRead(select).one());
  70 + return Futures.transform(executeAsyncRead(select), (Function<? super ResultSet, ? extends Optional<AttributeKvEntry>>) input ->
  71 + Optional.of(convertResultToAttributesKvEntry(attributeKey, input.one()))
  72 + , readResultsProcessingExecutor);
56 73 }
57 74
58 75 @Override
59   - public List<AttributeKvEntry> findAll(EntityId entityId, String attributeType) {
  76 + public ListenableFuture<List<AttributeKvEntry>> find(EntityId entityId, String attributeType, Collection<String> attributeKeys) {
  77 + List<ListenableFuture<Optional<AttributeKvEntry>>> entries = new ArrayList<>();
  78 + attributeKeys.forEach(attributeKey -> entries.add(find(entityId, attributeType, attributeKey)));
  79 + return Futures.transform(Futures.allAsList(entries), (Function<List<Optional<AttributeKvEntry>>, ? extends List<AttributeKvEntry>>) input -> {
  80 + List<AttributeKvEntry> result = new ArrayList<>();
  81 + input.stream().filter(opt -> opt.isPresent()).forEach(opt -> result.add(opt.get()));
  82 + return result;
  83 + }, readResultsProcessingExecutor);
  84 + }
  85 +
  86 +
  87 + @Override
  88 + public ListenableFuture<List<AttributeKvEntry>> findAll(EntityId entityId, String attributeType) {
60 89 Select.Where select = select().from(ATTRIBUTES_KV_CF)
61 90 .where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType()))
62 91 .and(eq(ENTITY_ID_COLUMN, entityId.getId()))
63 92 .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType));
64 93 log.trace("Generated query [{}] for entityId {} and attributeType {}", select, entityId, attributeType);
65   - return convertResultToAttributesKvEntryList(executeRead(select));
  94 + return Futures.transform(executeAsyncRead(select), (Function<? super ResultSet, ? extends List<AttributeKvEntry>>) input ->
  95 + convertResultToAttributesKvEntryList(input)
  96 + , readResultsProcessingExecutor);
66 97 }
67 98
68 99 @Override
... ... @@ -93,20 +124,19 @@ public class BaseAttributesDao extends AbstractDao implements AttributesDao {
93 124 }
94 125
95 126 @Override
96   - public void removeAll(EntityId entityId, String attributeType, List<String> keys) {
97   - for (String key : keys) {
98   - delete(entityId, attributeType, key);
99   - }
  127 + public ListenableFuture<List<ResultSet>> removeAll(EntityId entityId, String attributeType, List<String> keys) {
  128 + List<ResultSetFuture> futures = keys.stream().map(key -> delete(entityId, attributeType, key)).collect(Collectors.toList());
  129 + return Futures.allAsList(futures);
100 130 }
101 131
102   - private void delete(EntityId entityId, String attributeType, String key) {
  132 + private ResultSetFuture delete(EntityId entityId, String attributeType, String key) {
103 133 Statement delete = QueryBuilder.delete().all().from(ModelConstants.ATTRIBUTES_KV_CF)
104 134 .where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType()))
105 135 .and(eq(ENTITY_ID_COLUMN, entityId.getId()))
106 136 .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType))
107 137 .and(eq(ATTRIBUTE_KEY_COLUMN, key));
108 138 log.debug("Remove request: {}", delete.toString());
109   - getSession().execute(delete);
  139 + return getSession().executeAsync(delete);
110 140 }
111 141
112 142 private PreparedStatement getSaveStmt() {
... ... @@ -150,5 +180,4 @@ public class BaseAttributesDao extends AbstractDao implements AttributesDao {
150 180 }
151 181 return entries;
152 182 }
153   -
154 183 }
... ...
... ... @@ -27,7 +27,9 @@ import org.springframework.beans.factory.annotation.Autowired;
27 27 import org.springframework.stereotype.Service;
28 28 import org.thingsboard.server.dao.service.Validator;
29 29
  30 +import java.util.Collection;
30 31 import java.util.List;
  32 +import java.util.Optional;
31 33
32 34 /**
33 35 * @author Andrew Shvayka
... ... @@ -39,14 +41,21 @@ public class BaseAttributesService implements AttributesService {
39 41 private AttributesDao attributesDao;
40 42
41 43 @Override
42   - public AttributeKvEntry find(EntityId entityId, String scope, String attributeKey) {
  44 + public ListenableFuture<Optional<AttributeKvEntry>> find(EntityId entityId, String scope, String attributeKey) {
43 45 validate(entityId, scope);
44 46 Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey);
45 47 return attributesDao.find(entityId, scope, attributeKey);
46 48 }
47 49
48 50 @Override
49   - public List<AttributeKvEntry> findAll(EntityId entityId, String scope) {
  51 + public ListenableFuture<List<AttributeKvEntry>> find(EntityId entityId, String scope, Collection<String> attributeKeys) {
  52 + validate(entityId, scope);
  53 + attributeKeys.forEach(attributeKey -> Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey));
  54 + return attributesDao.find(entityId, scope, attributeKeys);
  55 + }
  56 +
  57 + @Override
  58 + public ListenableFuture<List<AttributeKvEntry>> findAll(EntityId entityId, String scope) {
50 59 validate(entityId, scope);
51 60 return attributesDao.findAll(entityId, scope);
52 61 }
... ... @@ -56,16 +65,16 @@ public class BaseAttributesService implements AttributesService {
56 65 validate(entityId, scope);
57 66 attributes.forEach(attribute -> validate(attribute));
58 67 List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(attributes.size());
59   - for(AttributeKvEntry attribute : attributes) {
  68 + for (AttributeKvEntry attribute : attributes) {
60 69 futures.add(attributesDao.save(entityId, scope, attribute));
61 70 }
62 71 return Futures.allAsList(futures);
63 72 }
64 73
65 74 @Override
66   - public void removeAll(EntityId entityId, String scope, List<String> keys) {
  75 + public ListenableFuture<List<ResultSet>> removeAll(EntityId entityId, String scope, List<String> keys) {
67 76 validate(entityId, scope);
68   - attributesDao.removeAll(entityId, scope, keys);
  77 + return attributesDao.removeAll(entityId, scope, keys);
69 78 }
70 79
71 80 private static void validate(EntityId id, String scope) {
... ...
... ... @@ -15,6 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.dao.device;
17 17
  18 +import com.google.common.util.concurrent.ListenableFuture;
18 19 import org.thingsboard.server.common.data.Device;
19 20 import org.thingsboard.server.common.data.id.CustomerId;
20 21 import org.thingsboard.server.common.data.id.DeviceId;
... ... @@ -28,6 +29,8 @@ public interface DeviceService {
28 29
29 30 Device findDeviceById(DeviceId deviceId);
30 31
  32 + ListenableFuture<Device> findDeviceByIdAsync(DeviceId deviceId);
  33 +
31 34 Optional<Device> findDeviceByTenantIdAndName(TenantId tenantId, String name);
32 35
33 36 Device saveDevice(Device device);
... ...
... ... @@ -28,6 +28,7 @@ import org.springframework.beans.factory.annotation.Value;
28 28 import org.springframework.stereotype.Component;
29 29 import org.thingsboard.server.common.data.kv.*;
30 30 import org.thingsboard.server.common.data.kv.DataType;
  31 +import org.thingsboard.server.dao.AbstractAsyncDao;
31 32 import org.thingsboard.server.dao.AbstractDao;
32 33 import org.thingsboard.server.dao.model.ModelConstants;
33 34
... ... @@ -50,7 +51,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
50 51 */
51 52 @Component
52 53 @Slf4j
53   -public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
  54 +public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao {
54 55
55 56 @Value("${cassandra.query.min_aggregation_step_ms}")
56 57 private int minAggregationStepMs;
... ... @@ -60,8 +61,6 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
60 61
61 62 private TsPartitionDate tsFormat;
62 63
63   - private ExecutorService readResultsProcessingExecutor;
64   -
65 64 private PreparedStatement partitionInsertStmt;
66 65 private PreparedStatement[] latestInsertStmts;
67 66 private PreparedStatement[] saveStmts;
... ... @@ -71,8 +70,8 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
71 70
72 71 @PostConstruct
73 72 public void init() {
  73 + super.startExecutor();
74 74 getFetchStmt(Aggregation.NONE);
75   - readResultsProcessingExecutor = Executors.newCachedThreadPool();
76 75 Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);
77 76 if (partition.isPresent()) {
78 77 tsFormat = partition.get();
... ... @@ -84,9 +83,7 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
84 83
85 84 @PreDestroy
86 85 public void stop() {
87   - if (readResultsProcessingExecutor != null) {
88   - readResultsProcessingExecutor.shutdownNow();
89   - }
  86 + super.stopExecutor();
90 87 }
91 88
92 89 @Override
... ...
... ... @@ -32,6 +32,7 @@ import org.springframework.beans.factory.annotation.Autowired;
32 32
33 33 import java.util.Collections;
34 34 import java.util.List;
  35 +import java.util.Optional;
35 36
36 37 import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE;
37 38 import static org.thingsboard.server.common.data.DataConstants.DEVICE;
... ... @@ -54,8 +55,9 @@ public class BaseAttributesServiceTest extends AbstractServiceTest {
54 55 KvEntry attrValue = new StringDataEntry("attribute1", "value1");
55 56 AttributeKvEntry attr = new BaseAttributeKvEntry(attrValue, 42L);
56 57 attributesService.save(deviceId, DataConstants.CLIENT_SCOPE, Collections.singletonList(attr)).get();
57   - AttributeKvEntry saved = attributesService.find(deviceId, DataConstants.CLIENT_SCOPE, attr.getKey());
58   - Assert.assertEquals(attr, saved);
  58 + Optional<AttributeKvEntry> saved = attributesService.find(deviceId, DataConstants.CLIENT_SCOPE, attr.getKey()).get();
  59 + Assert.assertTrue(saved.isPresent());
  60 + Assert.assertEquals(attr, saved.get());
59 61 }
60 62
61 63 @Test
... ... @@ -65,15 +67,17 @@ public class BaseAttributesServiceTest extends AbstractServiceTest {
65 67 AttributeKvEntry attrOld = new BaseAttributeKvEntry(attrOldValue, 42L);
66 68
67 69 attributesService.save(deviceId, DataConstants.CLIENT_SCOPE, Collections.singletonList(attrOld)).get();
68   - AttributeKvEntry saved = attributesService.find(deviceId, DataConstants.CLIENT_SCOPE, attrOld.getKey());
69   - Assert.assertEquals(attrOld, saved);
  70 + Optional<AttributeKvEntry> saved = attributesService.find(deviceId, DataConstants.CLIENT_SCOPE, attrOld.getKey()).get();
  71 +
  72 + Assert.assertTrue(saved.isPresent());
  73 + Assert.assertEquals(attrOld, saved.get());
70 74
71 75 KvEntry attrNewValue = new StringDataEntry("attribute1", "value2");
72 76 AttributeKvEntry attrNew = new BaseAttributeKvEntry(attrNewValue, 73L);
73 77 attributesService.save(deviceId, DataConstants.CLIENT_SCOPE, Collections.singletonList(attrNew)).get();
74 78
75   - saved = attributesService.find(deviceId, DataConstants.CLIENT_SCOPE, attrOld.getKey());
76   - Assert.assertEquals(attrNew, saved);
  79 + saved = attributesService.find(deviceId, DataConstants.CLIENT_SCOPE, attrOld.getKey()).get();
  80 + Assert.assertEquals(attrNew, saved.get());
77 81 }
78 82
79 83 @Test
... ... @@ -91,7 +95,7 @@ public class BaseAttributesServiceTest extends AbstractServiceTest {
91 95 attributesService.save(deviceId, DataConstants.CLIENT_SCOPE, Collections.singletonList(attrANew)).get();
92 96 attributesService.save(deviceId, DataConstants.CLIENT_SCOPE, Collections.singletonList(attrBNew)).get();
93 97
94   - List<AttributeKvEntry> saved = attributesService.findAll(deviceId, DataConstants.CLIENT_SCOPE);
  98 + List<AttributeKvEntry> saved = attributesService.findAll(deviceId, DataConstants.CLIENT_SCOPE).get();
95 99
96 100 Assert.assertNotNull(saved);
97 101 Assert.assertEquals(2, saved.size());
... ...
... ... @@ -114,8 +114,8 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
114 114 entries.add(save(deviceId, 45000, 500));
115 115 entries.add(save(deviceId, 55000, 600));
116 116
117   - List<TsKvEntry> list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
118   - 60000, 3, Aggregation.NONE)).get();
  117 + List<TsKvEntry> list = tsService.findAll(DataConstants.DEVICE, deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
  118 + 60000, 3, Aggregation.NONE))).get();
119 119 assertEquals(3, list.size());
120 120 assertEquals(55000, list.get(0).getTs());
121 121 assertEquals(java.util.Optional.of(600L), list.get(0).getLongValue());
... ... @@ -126,8 +126,8 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
126 126 assertEquals(35000, list.get(2).getTs());
127 127 assertEquals(java.util.Optional.of(400L), list.get(2).getLongValue());
128 128
129   - list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
130   - 60000, 3, Aggregation.AVG)).get();
  129 + list = tsService.findAll(DataConstants.DEVICE, deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
  130 + 60000, 3, Aggregation.AVG))).get();
131 131 assertEquals(3, list.size());
132 132 assertEquals(10000, list.get(0).getTs());
133 133 assertEquals(java.util.Optional.of(150L), list.get(0).getLongValue());
... ... @@ -138,8 +138,8 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
138 138 assertEquals(50000, list.get(2).getTs());
139 139 assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue());
140 140
141   - list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
142   - 60000, 3, Aggregation.SUM)).get();
  141 + list = tsService.findAll(DataConstants.DEVICE, deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
  142 + 60000, 3, Aggregation.SUM))).get();
143 143
144 144 assertEquals(3, list.size());
145 145 assertEquals(10000, list.get(0).getTs());
... ... @@ -151,8 +151,8 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
151 151 assertEquals(50000, list.get(2).getTs());
152 152 assertEquals(java.util.Optional.of(1100L), list.get(2).getLongValue());
153 153
154   - list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
155   - 60000, 3, Aggregation.MIN)).get();
  154 + list = tsService.findAll(DataConstants.DEVICE, deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
  155 + 60000, 3, Aggregation.MIN))).get();
156 156
157 157 assertEquals(3, list.size());
158 158 assertEquals(10000, list.get(0).getTs());
... ... @@ -164,8 +164,8 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
164 164 assertEquals(50000, list.get(2).getTs());
165 165 assertEquals(java.util.Optional.of(500L), list.get(2).getLongValue());
166 166
167   - list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
168   - 60000, 3, Aggregation.MAX)).get();
  167 + list = tsService.findAll(DataConstants.DEVICE, deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
  168 + 60000, 3, Aggregation.MAX))).get();
169 169
170 170 assertEquals(3, list.size());
171 171 assertEquals(10000, list.get(0).getTs());
... ... @@ -177,8 +177,8 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
177 177 assertEquals(50000, list.get(2).getTs());
178 178 assertEquals(java.util.Optional.of(600L), list.get(2).getLongValue());
179 179
180   - list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
181   - 60000, 3, Aggregation.COUNT)).get();
  180 + list = tsService.findAll(DataConstants.DEVICE, deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
  181 + 60000, 3, Aggregation.COUNT))).get();
182 182
183 183 assertEquals(3, list.size());
184 184 assertEquals(10000, list.get(0).getTs());
... ...
... ... @@ -2,7 +2,7 @@ cassandra.cluster_name=Thingsboard Cluster
2 2
3 3 cassandra.keyspace_name=thingsboard
4 4
5   -cassandra.url=127.0.0.1:9042
  5 +cassandra.url=127.0.0.1:9142
6 6
7 7 cassandra.ssl=false
8 8
... ...
... ... @@ -94,13 +94,21 @@ public interface PluginContext {
94 94
95 95 void saveAttributes(DeviceId deviceId, String attributeType, List<AttributeKvEntry> attributes, PluginCallback<Void> callback);
96 96
97   - Optional<AttributeKvEntry> loadAttribute(DeviceId deviceId, String attributeType, String attributeKey);
  97 + void removeAttributes(DeviceId deviceId, String scope, List<String> attributeKeys, PluginCallback<Void> callback);
98 98
99   - List<AttributeKvEntry> loadAttributes(DeviceId deviceId, String attributeType, List<String> attributeKeys);
  99 + void saveAttributesByDevice(TenantId tenantId, DeviceId deviceId, String attributeType, List<AttributeKvEntry> attributes, PluginCallback<Void> callback);
100 100
101   - List<AttributeKvEntry> loadAttributes(DeviceId deviceId, String attributeType);
  101 + void removeAttributesByDevice(TenantId tenantId, DeviceId deviceId, String scope, List<String> attributeKeys, PluginCallback<Void> callback);
102 102
103   - void removeAttributes(DeviceId deviceId, String scope, List<String> attributeKeys);
  103 + void loadAttribute(DeviceId deviceId, String attributeType, String attributeKey, PluginCallback<Optional<AttributeKvEntry>> callback);
  104 +
  105 + void loadAttributes(DeviceId deviceId, String attributeType, Collection<String> attributeKeys, PluginCallback<List<AttributeKvEntry>> callback);
  106 +
  107 + void loadAttributes(DeviceId deviceId, String attributeType, PluginCallback<List<AttributeKvEntry>> callback);
  108 +
  109 + void loadAttributes(DeviceId deviceId, Collection<String> attributeTypes, PluginCallback<List<AttributeKvEntry>> callback);
  110 +
  111 + void loadAttributes(DeviceId deviceId, Collection<String> attributeTypes, Collection<String> attributeKeys, PluginCallback<List<AttributeKvEntry>> callback);
104 112
105 113 void getCustomerDevices(TenantId tenantId, CustomerId customerId, int limit, PluginCallback<List<Device>> callback);
106 114 }
... ...
... ... @@ -15,12 +15,14 @@
15 15 */
16 16 package org.thingsboard.server.extensions.core.plugin.telemetry;
17 17
  18 +import com.sun.javafx.collections.MappingChange;
18 19 import lombok.Setter;
19 20 import lombok.extern.slf4j.Slf4j;
20 21 import org.thingsboard.server.common.data.DataConstants;
21 22 import org.thingsboard.server.common.data.id.DeviceId;
22 23 import org.thingsboard.server.common.data.kv.*;
23 24 import org.thingsboard.server.common.msg.cluster.ServerAddress;
  25 +import org.thingsboard.server.extensions.api.plugins.PluginCallback;
24 26 import org.thingsboard.server.extensions.api.plugins.PluginContext;
25 27 import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryRpcMsgHandler;
26 28 import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryWebsocketMsgHandler;
... ... @@ -66,28 +68,49 @@ public class SubscriptionManager {
66 68 DeviceId deviceId = subscription.getDeviceId();
67 69 log.trace("[{}] Registering remote subscription [{}] for device [{}] to [{}]", sessionId, subscription.getSubscriptionId(), deviceId, address);
68 70 registerSubscription(sessionId, deviceId, subscription);
69   - List<TsKvEntry> missedUpdates = new ArrayList<>();
70 71 if (subscription.getType() == SubscriptionType.ATTRIBUTES) {
71   - subscription.getKeyStates().entrySet().forEach(e -> {
72   - Optional<AttributeKvEntry> latestOpt = ctx.loadAttribute(deviceId, DataConstants.CLIENT_SCOPE, e.getKey());
73   - if (latestOpt.isPresent()) {
74   - AttributeKvEntry latestEntry = latestOpt.get();
75   - if (latestEntry.getLastUpdateTs() > e.getValue()) {
76   - missedUpdates.add(new BasicTsKvEntry(latestEntry.getLastUpdateTs(), latestEntry));
77   - }
  72 + final Map<String, Long> keyStates = subscription.getKeyStates();
  73 + ctx.loadAttributes(deviceId, DataConstants.CLIENT_SCOPE, keyStates.keySet(), new PluginCallback<List<AttributeKvEntry>>() {
  74 + @Override
  75 + public void onSuccess(PluginContext ctx, List<AttributeKvEntry> values) {
  76 + List<TsKvEntry> missedUpdates = new ArrayList<>();
  77 + values.forEach(latestEntry -> {
  78 + if (latestEntry.getLastUpdateTs() > keyStates.get(latestEntry.getKey())) {
  79 + missedUpdates.add(new BasicTsKvEntry(latestEntry.getLastUpdateTs(), latestEntry));
78 80 }
  81 + });
  82 + if (!missedUpdates.isEmpty()) {
  83 + rpcHandler.onSubscriptionUpdate(ctx, address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
79 84 }
80   - );
  85 + }
  86 +
  87 + @Override
  88 + public void onFailure(PluginContext ctx, Exception e) {
  89 + log.error("Failed to fetch missed updates.", e);
  90 + }
  91 + });
81 92 } else if (subscription.getType() == SubscriptionType.TIMESERIES) {
82 93 long curTs = System.currentTimeMillis();
  94 + List<TsKvQuery> queries = new ArrayList<>();
83 95 subscription.getKeyStates().entrySet().forEach(e -> {
84   - TsKvQuery query = new BaseTsKvQuery(e.getKey(), e.getValue() + 1L, curTs);
85   - missedUpdates.addAll(ctx.loadTimeseries(deviceId, query));
  96 + queries.add(new BaseTsKvQuery(e.getKey(), e.getValue() + 1L, curTs));
  97 + });
  98 +
  99 + ctx.loadTimeseries(deviceId, queries, new PluginCallback<List<TsKvEntry>>() {
  100 + @Override
  101 + public void onSuccess(PluginContext ctx, List<TsKvEntry> missedUpdates) {
  102 + if (!missedUpdates.isEmpty()) {
  103 + rpcHandler.onSubscriptionUpdate(ctx, address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
  104 + }
  105 + }
  106 +
  107 + @Override
  108 + public void onFailure(PluginContext ctx, Exception e) {
  109 + log.error("Failed to fetch missed updates.", e);
  110 + }
86 111 });
87 112 }
88   - if (!missedUpdates.isEmpty()) {
89   - rpcHandler.onSubscriptionUpdate(ctx, address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
90   - }
  113 +
91 114 }
92 115
93 116 private void registerSubscription(String sessionId, DeviceId deviceId, Subscription subscription) {
... ...
  1 +/**
  2 + * Copyright © 2016-2017 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.extensions.core.plugin.telemetry.handlers;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.thingsboard.server.extensions.api.plugins.PluginCallback;
  20 +import org.thingsboard.server.extensions.api.plugins.PluginContext;
  21 +
  22 +/**
  23 + * Created by ashvayka on 21.02.17.
  24 + */
  25 +@Slf4j
  26 +public abstract class BiPluginCallBack<V1, V2> {
  27 +
  28 + private V1 v1;
  29 + private V2 v2;
  30 +
  31 + public PluginCallback<V1> getV1Callback() {
  32 + return new PluginCallback<V1>() {
  33 + @Override
  34 + public void onSuccess(PluginContext ctx, V1 value) {
  35 + synchronized (BiPluginCallBack.this) {
  36 + BiPluginCallBack.this.v1 = value;
  37 + if (v2 != null) {
  38 + BiPluginCallBack.this.onSuccess(ctx, v1, v2);
  39 + }
  40 + }
  41 + }
  42 +
  43 + @Override
  44 + public void onFailure(PluginContext ctx, Exception e) {
  45 + BiPluginCallBack.this.onFailure(ctx, e);
  46 + }
  47 + };
  48 + }
  49 +
  50 + public PluginCallback<V2> getV2Callback() {
  51 + return new PluginCallback<V2>() {
  52 + @Override
  53 + public void onSuccess(PluginContext ctx, V2 value) {
  54 + synchronized (BiPluginCallBack.this) {
  55 + BiPluginCallBack.this.v2 = value;
  56 + if (v1 != null) {
  57 + BiPluginCallBack.this.onSuccess(ctx, v1, v2);
  58 + }
  59 + }
  60 +
  61 + }
  62 +
  63 + @Override
  64 + public void onFailure(PluginContext ctx, Exception e) {
  65 + BiPluginCallBack.this.onFailure(ctx, e);
  66 + }
  67 + };
  68 + }
  69 +
  70 + abstract public void onSuccess(PluginContext ctx, V1 v1, V2 v2);
  71 +
  72 + abstract public void onFailure(PluginContext ctx, Exception e);
  73 +
  74 +}
... ...
... ... @@ -77,41 +77,58 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
77 77 }
78 78 });
79 79 } else if (entity.equals("attributes")) {
80   - List<AttributeKvEntry> attributes;
  80 + PluginCallback<List<AttributeKvEntry>> callback = getAttributeKeysPluginCallback(msg);
81 81 if (!StringUtils.isEmpty(scope)) {
82   - attributes = ctx.loadAttributes(deviceId, scope);
  82 + ctx.loadAttributes(deviceId, scope, callback);
83 83 } else {
84   - attributes = new ArrayList<>();
85   - Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> attributes.addAll(ctx.loadAttributes(deviceId, s)));
  84 + ctx.loadAttributes(deviceId, Arrays.asList(DataConstants.ALL_SCOPES), callback);
86 85 }
87   - List<String> keys = attributes.stream().map(attrKv -> attrKv.getKey()).collect(Collectors.toList());
88   - msg.getResponseHolder().setResult(new ResponseEntity<>(keys, HttpStatus.OK));
89 86 }
90 87 } else if (method.equals("values")) {
91 88 if ("timeseries".equals(entity)) {
92   - String keys = request.getParameter("keys");
  89 + String keysStr = request.getParameter("keys");
93 90 Optional<Long> startTs = request.getLongParamValue("startTs");
94 91 Optional<Long> endTs = request.getLongParamValue("endTs");
95 92 Optional<Integer> limit = request.getIntParamValue("limit");
96   - Map<String, List<TsData>> data = new LinkedHashMap<>();
97   - for (String key : keys.split(",")) {
98   - //TODO: refactoring
99   -// List<TsKvEntry> entries = ctx.loadTimeseries(deviceId, new BaseTsKvQuery(key, startTs, endTs, limit));
100   -// data.put(key, entries.stream().map(v -> new TsData(v.getTs(), v.getValueAsString())).collect(Collectors.toList()));
101   - }
102   - msg.getResponseHolder().setResult(new ResponseEntity<>(data, HttpStatus.OK));
  93 + Aggregation agg = Aggregation.valueOf(request.getParameter("agg", Aggregation.NONE.name()));
  94 +
  95 + List<String> keys = Arrays.asList(keysStr.split(","));
  96 + List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), limit.get(), agg)).collect(Collectors.toList());
  97 + ctx.loadTimeseries(deviceId, queries, new PluginCallback<List<TsKvEntry>>() {
  98 + @Override
  99 + public void onSuccess(PluginContext ctx, List<TsKvEntry> data) {
  100 + Map<String, List<TsData>> result = new LinkedHashMap<>();
  101 + for (TsKvEntry entry : data) {
  102 + result.put(entry.getKey(), data.stream().map(v -> new TsData(v.getTs(), v.getValueAsString())).collect(Collectors.toList()));
  103 + }
  104 + msg.getResponseHolder().setResult(new ResponseEntity<>(data, HttpStatus.OK));
  105 + }
  106 +
  107 + @Override
  108 + public void onFailure(PluginContext ctx, Exception e) {
  109 + log.error("Failed to fetch historical data", e);
  110 + msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
  111 + }
  112 + });
103 113 } else if ("attributes".equals(entity)) {
104 114 String keys = request.getParameter("keys", "");
105   - List<AttributeKvEntry> attributes;
  115 +
  116 + PluginCallback<List<AttributeKvEntry>> callback = getAttributeValuesPluginCallback(msg);
106 117 if (!StringUtils.isEmpty(scope)) {
107   - attributes = getAttributeKvEntries(ctx, scope, deviceId, keys);
  118 + if (!StringUtils.isEmpty(keys)) {
  119 + List<String> keyList = Arrays.asList(keys.split(","));
  120 + ctx.loadAttributes(deviceId, scope, keyList, callback);
  121 + } else {
  122 + ctx.loadAttributes(deviceId, scope, callback);
  123 + }
108 124 } else {
109   - attributes = new ArrayList<>();
110   - Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> attributes.addAll(getAttributeKvEntries(ctx, s, deviceId, keys)));
  125 + if (!StringUtils.isEmpty(keys)) {
  126 + List<String> keyList = Arrays.asList(keys.split(","));
  127 + ctx.loadAttributes(deviceId, Arrays.asList(DataConstants.ALL_SCOPES), keyList, callback);
  128 + } else {
  129 + ctx.loadAttributes(deviceId, Arrays.asList(DataConstants.ALL_SCOPES), callback);
  130 + }
111 131 }
112   - List<AttributeData> values = attributes.stream().map(attribute -> new AttributeData(attribute.getLastUpdateTs(),
113   - attribute.getKey(), attribute.getValue())).collect(Collectors.toList());
114   - msg.getResponseHolder().setResult(new ResponseEntity<>(values, HttpStatus.OK));
115 132 }
116 133 }
117 134 } else {
... ... @@ -156,6 +173,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
156 173
157 174 @Override
158 175 public void onFailure(PluginContext ctx, Exception e) {
  176 + log.error("Failed to save attributes", e);
159 177 msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
160 178 }
161 179 });
... ... @@ -184,8 +202,18 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
184 202 String keysParam = request.getParameter("keys");
185 203 if (!StringUtils.isEmpty(keysParam)) {
186 204 String[] keys = keysParam.split(",");
187   - ctx.removeAttributes(deviceId, scope, Arrays.asList(keys));
188   - msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK));
  205 + ctx.removeAttributes(deviceId, scope, Arrays.asList(keys), new PluginCallback<Void>() {
  206 + @Override
  207 + public void onSuccess(PluginContext ctx, Void value) {
  208 + msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK));
  209 + }
  210 +
  211 + @Override
  212 + public void onFailure(PluginContext ctx, Exception e) {
  213 + log.error("Failed to remove attributes", e);
  214 + msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
  215 + }
  216 + });
189 217 return;
190 218 }
191 219 }
... ... @@ -196,14 +224,37 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
196 224 msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
197 225 }
198 226
199   - private List<AttributeKvEntry> getAttributeKvEntries(PluginContext ctx, String scope, DeviceId deviceId, String keysParam) {
200   - List<AttributeKvEntry> attributes;
201   - if (!StringUtils.isEmpty(keysParam)) {
202   - String[] keys = keysParam.split(",");
203   - attributes = ctx.loadAttributes(deviceId, scope, Arrays.asList(keys));
204   - } else {
205   - attributes = ctx.loadAttributes(deviceId, scope);
206   - }
207   - return attributes;
  227 +
  228 + private PluginCallback<List<AttributeKvEntry>> getAttributeKeysPluginCallback(final PluginRestMsg msg) {
  229 + return new PluginCallback<List<AttributeKvEntry>>() {
  230 + @Override
  231 + public void onSuccess(PluginContext ctx, List<AttributeKvEntry> attributes) {
  232 + List<String> keys = attributes.stream().map(attrKv -> attrKv.getKey()).collect(Collectors.toList());
  233 + msg.getResponseHolder().setResult(new ResponseEntity<>(keys, HttpStatus.OK));
  234 + }
  235 +
  236 + @Override
  237 + public void onFailure(PluginContext ctx, Exception e) {
  238 + log.error("Failed to fetch attributes", e);
  239 + msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
  240 + }
  241 + };
  242 + }
  243 +
  244 + private PluginCallback<List<AttributeKvEntry>> getAttributeValuesPluginCallback(final PluginRestMsg msg) {
  245 + return new PluginCallback<List<AttributeKvEntry>>() {
  246 + @Override
  247 + public void onSuccess(PluginContext ctx, List<AttributeKvEntry> attributes) {
  248 + List<AttributeData> values = attributes.stream().map(attribute -> new AttributeData(attribute.getLastUpdateTs(),
  249 + attribute.getKey(), attribute.getValue())).collect(Collectors.toList());
  250 + msg.getResponseHolder().setResult(new ResponseEntity<>(values, HttpStatus.OK));
  251 + }
  252 +
  253 + @Override
  254 + public void onFailure(PluginContext ctx, Exception e) {
  255 + log.error("Failed to fetch attributes", e);
  256 + msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
  257 + }
  258 + };
208 259 }
209 260 }
... ...
... ... @@ -15,6 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.extensions.core.plugin.telemetry.handlers;
17 17
  18 +import lombok.extern.slf4j.Slf4j;
18 19 import org.thingsboard.server.common.data.DataConstants;
19 20 import org.thingsboard.server.common.data.id.DeviceId;
20 21 import org.thingsboard.server.common.data.id.RuleId;
... ... @@ -38,6 +39,7 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionT
38 39 import java.util.*;
39 40 import java.util.stream.Collectors;
40 41
  42 +@Slf4j
41 43 public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
42 44 private final SubscriptionManager subscriptionManager;
43 45
... ... @@ -49,27 +51,36 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
49 51 public void handleGetAttributesRequest(PluginContext ctx, TenantId tenantId, RuleId ruleId, GetAttributesRequestRuleToPluginMsg msg) {
50 52 GetAttributesRequest request = msg.getPayload();
51 53
52   - List<AttributeKvEntry> clientAttributes = getAttributeKvEntries(ctx, msg.getDeviceId(), DataConstants.CLIENT_SCOPE, request.getClientAttributeNames());
53   - List<AttributeKvEntry> sharedAttributes = getAttributeKvEntries(ctx, msg.getDeviceId(), DataConstants.SHARED_SCOPE, request.getSharedAttributeNames());
  54 + BiPluginCallBack<List<AttributeKvEntry>, List<AttributeKvEntry>> callback = new BiPluginCallBack<List<AttributeKvEntry>, List<AttributeKvEntry>>() {
54 55
55   - BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(),
56   - request.getRequestId(), BasicAttributeKVMsg.from(clientAttributes, sharedAttributes));
  56 + @Override
  57 + public void onSuccess(PluginContext ctx, List<AttributeKvEntry> clientAttributes, List<AttributeKvEntry> sharedAttributes) {
  58 + BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(),
  59 + request.getRequestId(), BasicAttributeKVMsg.from(clientAttributes, sharedAttributes));
  60 + ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, response));
  61 + }
  62 +
  63 + @Override
  64 + public void onFailure(PluginContext ctx, Exception e) {
  65 + log.error("Failed to process get attributes request", e);
  66 + ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, BasicStatusCodeResponse.onError(request.getMsgType(), request.getRequestId(), e)));
  67 + }
  68 + };
57 69
58   - ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, response));
  70 + getAttributeKvEntries(ctx, msg.getDeviceId(), DataConstants.CLIENT_SCOPE, request.getClientAttributeNames(), callback.getV1Callback());
  71 + getAttributeKvEntries(ctx, msg.getDeviceId(), DataConstants.SHARED_SCOPE, request.getSharedAttributeNames(), callback.getV2Callback());
59 72 }
60 73
61   - private List<AttributeKvEntry> getAttributeKvEntries(PluginContext ctx, DeviceId deviceId, String scope, Optional<Set<String>> names) {
62   - List<AttributeKvEntry> attributes;
  74 + private void getAttributeKvEntries(PluginContext ctx, DeviceId deviceId, String scope, Optional<Set<String>> names, PluginCallback<List<AttributeKvEntry>> callback) {
63 75 if (names.isPresent()) {
64 76 if (!names.get().isEmpty()) {
65   - attributes = ctx.loadAttributes(deviceId, scope, new ArrayList<>(names.get()));
  77 + ctx.loadAttributes(deviceId, scope, new ArrayList<>(names.get()), callback);
66 78 } else {
67   - attributes = ctx.loadAttributes(deviceId, scope);
  79 + ctx.loadAttributes(deviceId, scope, callback);
68 80 }
69 81 } else {
70   - attributes = Collections.emptyList();
  82 + callback.onSuccess(ctx, Collections.emptyList());
71 83 }
72   - return attributes;
73 84 }
74 85
75 86 @Override
... ... @@ -100,6 +111,7 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
100 111
101 112 @Override
102 113 public void onFailure(PluginContext ctx, Exception e) {
  114 + log.error("Failed to process telemetry upload request", e);
103 115 ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, BasicStatusCodeResponse.onError(request.getMsgType(), request.getRequestId(), e)));
104 116 }
105 117 });
... ... @@ -127,6 +139,7 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
127 139
128 140 @Override
129 141 public void onFailure(PluginContext ctx, Exception e) {
  142 + log.error("Failed to process attributes update request", e);
130 143 ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, BasicStatusCodeResponse.onError(request.getMsgType(), request.getRequestId(), e)));
131 144 }
132 145 });
... ...
... ... @@ -104,37 +104,64 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
104 104 SubscriptionState sub;
105 105 if (keysOptional.isPresent()) {
106 106 List<String> keys = new ArrayList<>(keysOptional.get());
107   - List<AttributeKvEntry> data = new ArrayList<>();
  107 +
  108 + PluginCallback<List<AttributeKvEntry>> callback = new PluginCallback<List<AttributeKvEntry>>() {
  109 + @Override
  110 + public void onSuccess(PluginContext ctx, List<AttributeKvEntry> data) {
  111 + List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
  112 + sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
  113 +
  114 + Map<String, Long> subState = new HashMap<>(keys.size());
  115 + keys.forEach(key -> subState.put(key, 0L));
  116 + attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
  117 +
  118 + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, false, subState);
  119 + subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
  120 + }
  121 +
  122 + @Override
  123 + public void onFailure(PluginContext ctx, Exception e) {
  124 + log.error("Failed to fetch attributes!", e);
  125 + SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
  126 + "Failed to fetch attributes!");
  127 + sendWsMsg(ctx, sessionRef, update);
  128 + }
  129 + };
  130 +
108 131 if (StringUtils.isEmpty(cmd.getScope())) {
109   - Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> data.addAll(ctx.loadAttributes(deviceId, s, keys)));
  132 + ctx.loadAttributes(deviceId, Arrays.asList(DataConstants.ALL_SCOPES), keys, callback);
110 133 } else {
111   - data.addAll(ctx.loadAttributes(deviceId, cmd.getScope(), keys));
  134 + ctx.loadAttributes(deviceId, cmd.getScope(), keys, callback);
112 135 }
  136 + } else {
  137 + PluginCallback<List<AttributeKvEntry>> callback = new PluginCallback<List<AttributeKvEntry>>() {
  138 + @Override
  139 + public void onSuccess(PluginContext ctx, List<AttributeKvEntry> data) {
  140 + List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
  141 + sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
113 142
114   - List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
115   - sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
  143 + Map<String, Long> subState = new HashMap<>(attributesData.size());
  144 + attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
116 145
117   - Map<String, Long> subState = new HashMap<>(keys.size());
118   - keys.forEach(key -> subState.put(key, 0L));
119   - attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
  146 + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, true, subState);
  147 + subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
  148 + }
  149 +
  150 + @Override
  151 + public void onFailure(PluginContext ctx, Exception e) {
  152 + log.error("Failed to fetch attributes!", e);
  153 + SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
  154 + "Failed to fetch attributes!");
  155 + sendWsMsg(ctx, sessionRef, update);
  156 + }
  157 + };
120 158
121   - sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, false, subState);
122   - } else {
123   - List<AttributeKvEntry> data = new ArrayList<>();
124 159 if (StringUtils.isEmpty(cmd.getScope())) {
125   - Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> data.addAll(ctx.loadAttributes(deviceId, s)));
  160 + ctx.loadAttributes(deviceId, Arrays.asList(DataConstants.ALL_SCOPES), callback);
126 161 } else {
127   - data.addAll(ctx.loadAttributes(deviceId, cmd.getScope()));
  162 + ctx.loadAttributes(deviceId, cmd.getScope(), callback);
128 163 }
129   - List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
130   - sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
131   -
132   - Map<String, Long> subState = new HashMap<>(attributesData.size());
133   - attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
134   -
135   - sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, true, subState);
136 164 }
137   - subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
138 165 }
139 166 }
140 167 }
... ... @@ -205,6 +232,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
205 232
206 233 @Override
207 234 public void onFailure(PluginContext ctx, Exception e) {
  235 + log.error("Failed to fetch data!", e);
208 236 SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
209 237 "Failed to fetch data!");
210 238 sendWsMsg(ctx, sessionRef, update);
... ...