Commit fc79356aae182d4da62a1754d48bdc8f253ac0d0

Authored by Andrew Shvayka
2 parents f8929de5 56248a23

Merge branch 'develop/1.5' into develop/1.5-refactoring

Showing 22 changed files with 750 additions and 33 deletions
... ... @@ -23,7 +23,7 @@ import org.thingsboard.server.common.data.id.TenantId;
23 23
24 24 import com.fasterxml.jackson.databind.JsonNode;
25 25
26   -public class Customer extends ContactBased<CustomerId> implements HasName {
  26 +public class Customer extends ContactBased<CustomerId> implements HasName, HasTenantId {
27 27
28 28 private static final long serialVersionUID = -1599722990298929275L;
29 29
... ...
... ... @@ -23,7 +23,7 @@ import org.thingsboard.server.common.data.id.TenantId;
23 23 import com.fasterxml.jackson.databind.JsonNode;
24 24
25 25 @EqualsAndHashCode(callSuper = true)
26   -public class Device extends SearchTextBasedWithAdditionalInfo<DeviceId> implements HasName {
  26 +public class Device extends SearchTextBasedWithAdditionalInfo<DeviceId> implements HasName, HasTenantId, HasCustomerId {
27 27
28 28 private static final long serialVersionUID = 2807343040519543363L;
29 29
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.data;
  17 +
  18 +import org.thingsboard.server.common.data.id.CustomerId;
  19 +
  20 +public interface HasCustomerId {
  21 +
  22 + CustomerId getCustomerId();
  23 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.data;
  17 +
  18 +import org.thingsboard.server.common.data.id.TenantId;
  19 +
  20 +public interface HasTenantId {
  21 +
  22 + TenantId getTenantId();
  23 +}
... ...
... ... @@ -25,7 +25,7 @@ import org.thingsboard.server.common.data.security.Authority;
25 25 import com.fasterxml.jackson.databind.JsonNode;
26 26
27 27 @EqualsAndHashCode(callSuper = true)
28   -public class User extends SearchTextBasedWithAdditionalInfo<UserId> implements HasName {
  28 +public class User extends SearchTextBasedWithAdditionalInfo<UserId> implements HasName, HasTenantId, HasCustomerId {
29 29
30 30 private static final long serialVersionUID = 8250339805336035966L;
31 31
... ...
... ... @@ -22,6 +22,7 @@ import lombok.Builder;
22 22 import lombok.Data;
23 23 import org.thingsboard.server.common.data.BaseData;
24 24 import org.thingsboard.server.common.data.HasName;
  25 +import org.thingsboard.server.common.data.HasTenantId;
25 26 import org.thingsboard.server.common.data.id.EntityId;
26 27 import org.thingsboard.server.common.data.id.TenantId;
27 28
... ... @@ -31,7 +32,7 @@ import org.thingsboard.server.common.data.id.TenantId;
31 32 @Data
32 33 @Builder
33 34 @AllArgsConstructor
34   -public class Alarm extends BaseData<AlarmId> implements HasName {
  35 +public class Alarm extends BaseData<AlarmId> implements HasName, HasTenantId {
35 36
36 37 private TenantId tenantId;
37 38 private String type;
... ...
... ... @@ -17,16 +17,13 @@ package org.thingsboard.server.common.data.asset;
17 17
18 18 import com.fasterxml.jackson.databind.JsonNode;
19 19 import lombok.EqualsAndHashCode;
20   -import org.thingsboard.server.common.data.HasAdditionalInfo;
21   -import org.thingsboard.server.common.data.HasName;
22   -import org.thingsboard.server.common.data.SearchTextBased;
23   -import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo;
  20 +import org.thingsboard.server.common.data.*;
24 21 import org.thingsboard.server.common.data.id.AssetId;
25 22 import org.thingsboard.server.common.data.id.CustomerId;
26 23 import org.thingsboard.server.common.data.id.TenantId;
27 24
28 25 @EqualsAndHashCode(callSuper = true)
29   -public class Asset extends SearchTextBasedWithAdditionalInfo<AssetId> implements HasName {
  26 +public class Asset extends SearchTextBasedWithAdditionalInfo<AssetId> implements HasName, HasTenantId, HasCustomerId {
30 27
31 28 private static final long serialVersionUID = 2807343040519543363L;
32 29
... ...
... ... @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
21 21 import lombok.EqualsAndHashCode;
22 22 import lombok.extern.slf4j.Slf4j;
23 23 import org.thingsboard.server.common.data.HasName;
  24 +import org.thingsboard.server.common.data.HasTenantId;
24 25 import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo;
25 26 import org.thingsboard.server.common.data.id.PluginId;
26 27 import org.thingsboard.server.common.data.id.TenantId;
... ... @@ -32,7 +33,7 @@ import java.io.IOException;
32 33
33 34 @EqualsAndHashCode(callSuper = true)
34 35 @Slf4j
35   -public class PluginMetaData extends SearchTextBasedWithAdditionalInfo<PluginId> implements HasName {
  36 +public class PluginMetaData extends SearchTextBasedWithAdditionalInfo<PluginId> implements HasName, HasTenantId {
36 37
37 38 private static final long serialVersionUID = 1L;
38 39
... ...
... ... @@ -21,6 +21,7 @@ import lombok.Data;
21 21 import lombok.EqualsAndHashCode;
22 22 import lombok.extern.slf4j.Slf4j;
23 23 import org.thingsboard.server.common.data.HasName;
  24 +import org.thingsboard.server.common.data.HasTenantId;
24 25 import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo;
25 26 import org.thingsboard.server.common.data.id.RuleChainId;
26 27 import org.thingsboard.server.common.data.id.RuleNodeId;
... ... @@ -29,7 +30,7 @@ import org.thingsboard.server.common.data.id.TenantId;
29 30 @Data
30 31 @EqualsAndHashCode(callSuper = true)
31 32 @Slf4j
32   -public class RuleChain extends SearchTextBasedWithAdditionalInfo<RuleChainId> implements HasName {
  33 +public class RuleChain extends SearchTextBasedWithAdditionalInfo<RuleChainId> implements HasName, HasTenantId {
33 34
34 35 private static final long serialVersionUID = -5656679015121935465L;
35 36
... ...
... ... @@ -23,6 +23,7 @@ import lombok.Data;
23 23 import lombok.EqualsAndHashCode;
24 24 import lombok.extern.slf4j.Slf4j;
25 25 import org.thingsboard.server.common.data.HasName;
  26 +import org.thingsboard.server.common.data.HasTenantId;
26 27 import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo;
27 28 import org.thingsboard.server.common.data.id.RuleId;
28 29 import org.thingsboard.server.common.data.id.TenantId;
... ... @@ -31,7 +32,7 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
31 32 @Data
32 33 @EqualsAndHashCode(callSuper = true)
33 34 @Slf4j
34   -public class RuleMetaData extends SearchTextBasedWithAdditionalInfo<RuleId> implements HasName {
  35 +public class RuleMetaData extends SearchTextBasedWithAdditionalInfo<RuleId> implements HasName, HasTenantId {
35 36
36 37 private static final long serialVersionUID = -5656679015122935465L;
37 38
... ...
... ... @@ -61,13 +61,14 @@ public class HostRequestIntervalRegistry {
61 61 }
62 62
63 63 public long tick(String clientHostId) {
  64 + IntervalCount intervalCount = hostCounts.computeIfAbsent(clientHostId, s -> new IntervalCount(intervalDurationMs));
  65 + long currentCount = intervalCount.resetIfExpiredAndTick();
64 66 if (whiteList.contains(clientHostId)) {
65 67 return 0;
66 68 } else if (blackList.contains(clientHostId)) {
67 69 return Long.MAX_VALUE;
68 70 }
69   - IntervalCount intervalCount = hostCounts.computeIfAbsent(clientHostId, s -> new IntervalCount(intervalDurationMs));
70   - return intervalCount.resetIfExpiredAndTick();
  71 + return currentCount;
71 72 }
72 73
73 74 public void clean() {
... ...
... ... @@ -17,7 +17,17 @@ package org.thingsboard.rule.engine.api;
17 17
18 18 import org.thingsboard.server.common.msg.TbMsg;
19 19 import org.thingsboard.server.common.msg.cluster.ServerAddress;
  20 +import org.thingsboard.server.dao.alarm.AlarmService;
  21 +import org.thingsboard.server.dao.asset.AssetService;
20 22 import org.thingsboard.server.dao.attributes.AttributesService;
  23 +import org.thingsboard.server.dao.customer.CustomerService;
  24 +import org.thingsboard.server.dao.device.DeviceService;
  25 +import org.thingsboard.server.dao.plugin.PluginService;
  26 +import org.thingsboard.server.dao.relation.RelationService;
  27 +import org.thingsboard.server.dao.rule.RuleChainService;
  28 +import org.thingsboard.server.dao.rule.RuleService;
  29 +import org.thingsboard.server.dao.timeseries.TimeseriesService;
  30 +import org.thingsboard.server.dao.user.UserService;
21 31
22 32 import java.util.UUID;
23 33
... ... @@ -40,6 +50,28 @@ public interface TbContext {
40 50
41 51 void ack(TbMsg msg);
42 52
  53 + void tellError(TbMsg msg, Throwable th);
  54 +
43 55 AttributesService getAttributesService();
44 56
  57 + CustomerService getCustomerService();
  58 +
  59 + UserService getUserService();
  60 +
  61 + RuleService getRuleService();
  62 +
  63 + PluginService getPluginService();
  64 +
  65 + AssetService getAssetService();
  66 +
  67 + DeviceService getDeviceService();
  68 +
  69 + AlarmService getAlarmService();
  70 +
  71 + RuleChainService getRuleChainService();
  72 +
  73 + TimeseriesService getTimeseriesService();
  74 +
  75 + RelationService getRelationService();
  76 +
45 77 }
... ...
... ... @@ -22,6 +22,10 @@ import com.fasterxml.jackson.core.JsonProcessingException;
22 22 */
23 23 public class TbNodeException extends Exception {
24 24
  25 + public TbNodeException(String message) {
  26 + super(message);
  27 + }
  28 +
25 29 public TbNodeException(Exception e) {
26 30 super(e);
27 31 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine;
  17 +
  18 +import com.google.common.util.concurrent.FutureCallback;
  19 +import com.google.common.util.concurrent.Futures;
  20 +import com.google.common.util.concurrent.ListenableFuture;
  21 +
  22 +import javax.annotation.Nullable;
  23 +import java.util.function.Consumer;
  24 +
  25 +public class DonAsynchron {
  26 +
  27 + public static <T> void withCallback(ListenableFuture<T> future, Consumer<T> onSuccess, Consumer<Throwable> onFailure) {
  28 + Futures.addCallback(future, new FutureCallback<T>() {
  29 + @Override
  30 + public void onSuccess(@Nullable T result) {
  31 + try {
  32 + onSuccess.accept(result);
  33 + } catch (Throwable th) {
  34 + onFailure(th);
  35 + }
  36 +
  37 + }
  38 +
  39 + @Override
  40 + public void onFailure(Throwable t) {
  41 + onFailure.accept(t);
  42 + }
  43 + });
  44 + }
  45 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.metadata;
  17 +
  18 +import com.google.common.base.Function;
  19 +import com.google.common.util.concurrent.Futures;
  20 +import com.google.common.util.concurrent.ListenableFuture;
  21 +import org.thingsboard.rule.engine.TbNodeUtils;
  22 +import org.thingsboard.rule.engine.api.*;
  23 +import org.thingsboard.server.common.data.id.EntityId;
  24 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  25 +import org.thingsboard.server.common.data.kv.KvEntry;
  26 +import org.thingsboard.server.common.data.kv.TsKvEntry;
  27 +import org.thingsboard.server.common.msg.TbMsg;
  28 +
  29 +import java.util.List;
  30 +import java.util.stream.Collectors;
  31 +
  32 +import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
  33 +import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
  34 +
  35 +public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode {
  36 +
  37 + private TbGetEntityAttrNodeConfiguration config;
  38 +
  39 + @Override
  40 + public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
  41 + this.config = TbNodeUtils.convert(configuration, TbGetEntityAttrNodeConfiguration.class);
  42 + }
  43 +
  44 + @Override
  45 + public void onMsg(TbContext ctx, TbMsg msg) {
  46 + try {
  47 + withCallback(
  48 + findEntityAsync(ctx, msg.getOriginator()),
  49 + entityId -> withCallback(
  50 + config.isTelemetry() ? getLatestTelemetry(ctx, entityId) : getAttributesAsync(ctx, entityId),
  51 + attributes -> putAttributesAndTell(ctx, msg, attributes),
  52 + t -> ctx.tellError(msg, t)
  53 + ),
  54 + t -> ctx.tellError(msg, t));
  55 + } catch (Throwable th) {
  56 + ctx.tellError(msg, th);
  57 + }
  58 + }
  59 +
  60 + private ListenableFuture<List<KvEntry>> getAttributesAsync(TbContext ctx, EntityId entityId) {
  61 + ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(entityId, SERVER_SCOPE, config.getAttrMapping().keySet());
  62 + return Futures.transform(latest, (Function<? super List<AttributeKvEntry>, ? extends List<KvEntry>>) l ->
  63 + l.stream().map(i -> (KvEntry) i).collect(Collectors.toList()));
  64 + }
  65 +
  66 + private ListenableFuture<List<KvEntry>> getLatestTelemetry(TbContext ctx, EntityId entityId) {
  67 + ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(entityId, config.getAttrMapping().keySet());
  68 + return Futures.transform(latest, (Function<? super List<TsKvEntry>, ? extends List<KvEntry>>) l ->
  69 + l.stream().map(i -> (KvEntry) i).collect(Collectors.toList()));
  70 + }
  71 +
  72 +
  73 + private void putAttributesAndTell(TbContext ctx, TbMsg msg, List<KvEntry> attributes) {
  74 + attributes.forEach(r -> {
  75 + String attrName = config.getAttrMapping().get(r.getKey());
  76 + msg.getMetaData().putValue(attrName, r.getValueAsString());
  77 + });
  78 + ctx.tellNext(msg);
  79 + }
  80 +
  81 + @Override
  82 + public void destroy() {
  83 +
  84 + }
  85 +
  86 + protected abstract ListenableFuture<T> findEntityAsync(TbContext ctx, EntityId originator);
  87 +
  88 +}
... ...
... ... @@ -15,23 +15,27 @@
15 15 */
16 16 package org.thingsboard.rule.engine.metadata;
17 17
  18 +import com.google.common.util.concurrent.AsyncFunction;
  19 +import com.google.common.util.concurrent.Futures;
  20 +import com.google.common.util.concurrent.ListenableFuture;
18 21 import lombok.extern.slf4j.Slf4j;
19 22 import org.thingsboard.rule.engine.TbNodeUtils;
20 23 import org.thingsboard.rule.engine.api.*;
21   -import org.thingsboard.server.common.data.DataConstants;
22 24 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
23 25 import org.thingsboard.server.common.msg.TbMsg;
24   -import org.thingsboard.server.dao.attributes.AttributesService;
25 26
26 27 import java.util.List;
27 28
  29 +import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
  30 +import static org.thingsboard.server.common.data.DataConstants.*;
  31 +
28 32 /**
29 33 * Created by ashvayka on 19.01.18.
30 34 */
31 35 @Slf4j
32 36 public class TbGetAttributesNode implements TbNode {
33 37
34   - TbGetAttributesNodeConfiguration config;
  38 + private TbGetAttributesNodeConfiguration config;
35 39
36 40 @Override
37 41 public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
... ... @@ -40,26 +44,25 @@ public class TbGetAttributesNode implements TbNode {
40 44
41 45 @Override
42 46 public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
43   - try {
44   - //TODO: refactor this to work async and fetch attributes from cache.
45   - AttributesService service = ctx.getAttributesService();
46   - fetchAttributes(msg, service, config.getClientAttributeNames(), DataConstants.CLIENT_SCOPE, "cs.");
47   - fetchAttributes(msg, service, config.getServerAttributeNames(), DataConstants.SERVER_SCOPE, "ss.");
48   - fetchAttributes(msg, service, config.getSharedAttributeNames(), DataConstants.SHARED_SCOPE, "shared.");
49   - ctx.tellNext(msg);
50   - } catch (Exception e) {
51   - log.warn("[{}][{}] Failed to fetch attributes", msg.getOriginator(), msg.getId(), e);
52   - throw new TbNodeException(e);
53   - }
  47 + ListenableFuture<List<Void>> future = Futures.allAsList(
  48 + putAttrAsync(ctx, msg, CLIENT_SCOPE, config.getClientAttributeNames(), "cs."),
  49 + putAttrAsync(ctx, msg, SHARED_SCOPE, config.getSharedAttributeNames(), "shared."),
  50 + putAttrAsync(ctx, msg, SERVER_SCOPE, config.getServerAttributeNames(), "ss."));
  51 +
  52 + withCallback(future, i -> ctx.tellNext(msg), t -> ctx.tellError(msg, t));
54 53 }
55 54
56   - private void fetchAttributes(TbMsg msg, AttributesService service, List<String> attributeNames, String scope, String prefix) throws InterruptedException, java.util.concurrent.ExecutionException {
57   - if (attributeNames != null && attributeNames.isEmpty()) {
58   - List<AttributeKvEntry> attributes = service.find(msg.getOriginator(), scope, attributeNames).get();
59   - attributes.forEach(attr -> msg.getMetaData().putValue(prefix + attr.getKey(), attr.getValueAsString()));
60   - }
  55 + private ListenableFuture<Void> putAttributesAsync(TbMsg msg, List<AttributeKvEntry> attributes, String prefix) {
  56 + attributes.forEach(r -> msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString()));
  57 + return Futures.immediateFuture(null);
61 58 }
62 59
  60 + private ListenableFuture<Void> putAttrAsync(TbContext ctx, TbMsg msg, String scope, List<String> attributes, String prefix) {
  61 + return Futures.transform(ctx.getAttributesService().find(msg.getOriginator(), scope, attributes),
  62 + (AsyncFunction<List<AttributeKvEntry>, Void>) i -> putAttributesAsync(msg, i, prefix));
  63 + }
  64 +
  65 +
63 66 @Override
64 67 public void destroy() {
65 68
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.metadata;
  17 +
  18 +import com.google.common.util.concurrent.AsyncFunction;
  19 +import com.google.common.util.concurrent.Futures;
  20 +import com.google.common.util.concurrent.ListenableFuture;
  21 +import org.thingsboard.rule.engine.api.TbContext;
  22 +import org.thingsboard.rule.engine.api.TbNodeException;
  23 +import org.thingsboard.server.common.data.HasCustomerId;
  24 +import org.thingsboard.server.common.data.id.*;
  25 +
  26 +public class TbGetCustomerAttributeNode extends TbEntityGetAttrNode<CustomerId> {
  27 +
  28 + @Override
  29 + protected ListenableFuture<CustomerId> findEntityAsync(TbContext ctx, EntityId originator) {
  30 +
  31 + switch (originator.getEntityType()) {
  32 + case CUSTOMER:
  33 + return Futures.immediateFuture((CustomerId) originator);
  34 + case USER:
  35 + return getCustomerAsync(ctx.getUserService().findUserByIdAsync((UserId) originator));
  36 + case ASSET:
  37 + return getCustomerAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) originator));
  38 + case DEVICE:
  39 + return getCustomerAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) originator));
  40 + default:
  41 + return Futures.immediateFailedFuture(new TbNodeException("Unexpected originator EntityType " + originator));
  42 + }
  43 + }
  44 +
  45 + private <T extends HasCustomerId> ListenableFuture<CustomerId> getCustomerAsync(ListenableFuture<T> future) {
  46 + return Futures.transform(future, (AsyncFunction<HasCustomerId, CustomerId>) in -> {
  47 + return in != null ? Futures.immediateFuture(in.getCustomerId())
  48 + : Futures.immediateFailedFuture(new IllegalStateException("Customer not found"));});
  49 + }
  50 +
  51 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.metadata;
  17 +
  18 +import lombok.Data;
  19 +
  20 +import java.util.Map;
  21 +import java.util.Optional;
  22 +
  23 +@Data
  24 +public class TbGetEntityAttrNodeConfiguration {
  25 +
  26 + private Map<String, String> attrMapping;
  27 + private boolean isTelemetry = false;
  28 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.metadata;
  17 +
  18 +import lombok.Data;
  19 +import org.thingsboard.server.common.data.relation.EntitySearchDirection;
  20 +
  21 +@Data
  22 +public class TbGetRelatedAttrNodeConfiguration {
  23 +
  24 + private String relationType;
  25 + private EntitySearchDirection direction;
  26 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.metadata;
  17 +
  18 +import com.google.common.util.concurrent.AsyncFunction;
  19 +import com.google.common.util.concurrent.Futures;
  20 +import com.google.common.util.concurrent.ListenableFuture;
  21 +import org.apache.commons.collections.CollectionUtils;
  22 +import org.thingsboard.rule.engine.TbNodeUtils;
  23 +import org.thingsboard.rule.engine.api.TbContext;
  24 +import org.thingsboard.rule.engine.api.TbNodeConfiguration;
  25 +import org.thingsboard.rule.engine.api.TbNodeException;
  26 +import org.thingsboard.rule.engine.api.TbNodeState;
  27 +import org.thingsboard.server.common.data.id.EntityId;
  28 +import org.thingsboard.server.common.data.relation.EntityRelation;
  29 +import org.thingsboard.server.common.data.relation.EntitySearchDirection;
  30 +import org.thingsboard.server.dao.relation.RelationService;
  31 +
  32 +import java.util.List;
  33 +
  34 +import static org.thingsboard.server.common.data.relation.RelationTypeGroup.COMMON;
  35 +
  36 +public class TbGetRelatedAttributeNode extends TbEntityGetAttrNode<EntityId> {
  37 +
  38 + private TbGetRelatedAttrNodeConfiguration config;
  39 +
  40 + @Override
  41 + public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
  42 + this.config = TbNodeUtils.convert(configuration, TbGetRelatedAttrNodeConfiguration.class);
  43 + }
  44 +
  45 + @Override
  46 + protected ListenableFuture<EntityId> findEntityAsync(TbContext ctx, EntityId originator) {
  47 + RelationService relationService = ctx.getRelationService();
  48 + if (config.getDirection() == EntitySearchDirection.FROM) {
  49 + ListenableFuture<List<EntityRelation>> asyncRelation = relationService.findByFromAndTypeAsync(originator, config.getRelationType(), COMMON);
  50 + return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
  51 + r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo())
  52 + : Futures.immediateFailedFuture(new IllegalStateException("Relation not found")));
  53 + } else if (config.getDirection() == EntitySearchDirection.TO) {
  54 + ListenableFuture<List<EntityRelation>> asyncRelation = relationService.findByToAndTypeAsync(originator, config.getRelationType(), COMMON);
  55 + return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
  56 + r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom())
  57 + : Futures.immediateFailedFuture(new IllegalStateException("Relation not found")));
  58 + }
  59 +
  60 + return Futures.immediateFailedFuture(new IllegalStateException("Unknown direction"));
  61 + }
  62 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.metadata;
  17 +
  18 +import com.google.common.util.concurrent.AsyncFunction;
  19 +import com.google.common.util.concurrent.Futures;
  20 +import com.google.common.util.concurrent.ListenableFuture;
  21 +import lombok.extern.slf4j.Slf4j;
  22 +import org.thingsboard.rule.engine.api.TbContext;
  23 +import org.thingsboard.rule.engine.api.TbNodeException;
  24 +import org.thingsboard.server.common.data.HasTenantId;
  25 +import org.thingsboard.server.common.data.alarm.AlarmId;
  26 +import org.thingsboard.server.common.data.id.*;
  27 +
  28 +@Slf4j
  29 +public class TbGetTenantAttributeNode extends TbEntityGetAttrNode<TenantId> {
  30 +
  31 + @Override
  32 + protected ListenableFuture<TenantId> findEntityAsync(TbContext ctx, EntityId originator) {
  33 +
  34 + switch (originator.getEntityType()) {
  35 + case TENANT:
  36 + return Futures.immediateFuture((TenantId) originator);
  37 + case CUSTOMER:
  38 + return getTenantAsync(ctx.getCustomerService().findCustomerByIdAsync((CustomerId) originator));
  39 + case USER:
  40 + return getTenantAsync(ctx.getUserService().findUserByIdAsync((UserId) originator));
  41 + case RULE:
  42 + return getTenantAsync(ctx.getRuleService().findRuleByIdAsync((RuleId) originator));
  43 + case PLUGIN:
  44 + return getTenantAsync(ctx.getPluginService().findPluginByIdAsync((PluginId) originator));
  45 + case ASSET:
  46 + return getTenantAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) originator));
  47 + case DEVICE:
  48 + return getTenantAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) originator));
  49 + case ALARM:
  50 + return getTenantAsync(ctx.getAlarmService().findAlarmByIdAsync((AlarmId) originator));
  51 + case RULE_CHAIN:
  52 + return getTenantAsync(ctx.getRuleChainService().findRuleChainByIdAsync((RuleChainId) originator));
  53 + default:
  54 + return Futures.immediateFailedFuture(new TbNodeException("Unexpected originator EntityType " + originator));
  55 + }
  56 + }
  57 +
  58 + private <T extends HasTenantId> ListenableFuture<TenantId> getTenantAsync(ListenableFuture<T> future) {
  59 + return Futures.transform(future, (AsyncFunction<HasTenantId, TenantId>) in -> {
  60 + return in != null ? Futures.immediateFuture(in.getTenantId())
  61 + : Futures.immediateFailedFuture(new IllegalStateException("Tenant not found"));});
  62 + }
  63 +
  64 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.metadata;
  17 +
  18 +import com.datastax.driver.core.utils.UUIDs;
  19 +import com.fasterxml.jackson.databind.ObjectMapper;
  20 +import com.google.common.collect.Lists;
  21 +import com.google.common.util.concurrent.Futures;
  22 +import org.junit.Before;
  23 +import org.junit.Test;
  24 +import org.junit.runner.RunWith;
  25 +import org.mockito.ArgumentCaptor;
  26 +import org.mockito.Mock;
  27 +import org.mockito.runners.MockitoJUnitRunner;
  28 +import org.thingsboard.rule.engine.api.TbContext;
  29 +import org.thingsboard.rule.engine.api.TbNodeConfiguration;
  30 +import org.thingsboard.rule.engine.api.TbNodeException;
  31 +import org.thingsboard.server.common.data.Device;
  32 +import org.thingsboard.server.common.data.User;
  33 +import org.thingsboard.server.common.data.asset.Asset;
  34 +import org.thingsboard.server.common.data.id.AssetId;
  35 +import org.thingsboard.server.common.data.id.CustomerId;
  36 +import org.thingsboard.server.common.data.id.DeviceId;
  37 +import org.thingsboard.server.common.data.id.UserId;
  38 +import org.thingsboard.server.common.data.kv.*;
  39 +import org.thingsboard.server.common.msg.TbMsg;
  40 +import org.thingsboard.server.common.msg.TbMsgMetaData;
  41 +import org.thingsboard.server.dao.asset.AssetService;
  42 +import org.thingsboard.server.dao.attributes.AttributesService;
  43 +import org.thingsboard.server.dao.device.DeviceService;
  44 +import org.thingsboard.server.dao.timeseries.TimeseriesService;
  45 +import org.thingsboard.server.dao.user.UserService;
  46 +
  47 +import java.util.Collections;
  48 +import java.util.HashMap;
  49 +import java.util.List;
  50 +import java.util.Map;
  51 +
  52 +import static org.junit.Assert.assertEquals;
  53 +import static org.junit.Assert.assertTrue;
  54 +import static org.mockito.Matchers.same;
  55 +import static org.mockito.Mockito.verify;
  56 +import static org.mockito.Mockito.when;
  57 +import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
  58 +
  59 +@RunWith(MockitoJUnitRunner.class)
  60 +public class TbGetCustomerAttributeNodeTest {
  61 +
  62 + private TbGetCustomerAttributeNode node;
  63 +
  64 + @Mock
  65 + private TbContext ctx;
  66 +
  67 + @Mock
  68 + private AttributesService attributesService;
  69 + @Mock
  70 + private TimeseriesService timeseriesService;
  71 + @Mock
  72 + private UserService userService;
  73 + @Mock
  74 + private AssetService assetService;
  75 + @Mock
  76 + private DeviceService deviceService;
  77 +
  78 + private TbMsg msg;
  79 +
  80 + @Before
  81 + public void init() throws TbNodeException {
  82 + TbGetEntityAttrNodeConfiguration config = new TbGetEntityAttrNodeConfiguration();
  83 + Map<String, String> attrMapping = new HashMap<>();
  84 + attrMapping.putIfAbsent("temperature", "tempo");
  85 + config.setAttrMapping(attrMapping);
  86 + config.setTelemetry(false);
  87 + ObjectMapper mapper = new ObjectMapper();
  88 + TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
  89 + nodeConfiguration.setData(mapper.valueToTree(config));
  90 +
  91 + node = new TbGetCustomerAttributeNode();
  92 + node.init(nodeConfiguration, null);
  93 + }
  94 +
  95 + @Test
  96 + public void errorThrownIfCannotLoadAttributes() {
  97 + UserId userId = new UserId(UUIDs.timeBased());
  98 + CustomerId customerId = new CustomerId(UUIDs.timeBased());
  99 + User user = new User();
  100 + user.setCustomerId(customerId);
  101 +
  102 + msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), new byte[4]);
  103 +
  104 + when(ctx.getUserService()).thenReturn(userService);
  105 + when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(user));
  106 +
  107 + when(ctx.getAttributesService()).thenReturn(attributesService);
  108 + when(attributesService.find(customerId, SERVER_SCOPE, Collections.singleton("temperature")))
  109 + .thenThrow(new IllegalStateException("something wrong"));
  110 +
  111 + node.onMsg(ctx, msg);
  112 + final ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
  113 + verify(ctx).tellError(same(msg), captor.capture());
  114 +
  115 + Throwable value = captor.getValue();
  116 + assertEquals("something wrong", value.getMessage());
  117 + assertTrue(msg.getMetaData().getData().isEmpty());
  118 + }
  119 +
  120 + @Test
  121 + public void errorThrownIfCannotLoadAttributesAsync() {
  122 + UserId userId = new UserId(UUIDs.timeBased());
  123 + CustomerId customerId = new CustomerId(UUIDs.timeBased());
  124 + User user = new User();
  125 + user.setCustomerId(customerId);
  126 +
  127 + msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), new byte[4]);
  128 +
  129 + when(ctx.getUserService()).thenReturn(userService);
  130 + when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(user));
  131 +
  132 + when(ctx.getAttributesService()).thenReturn(attributesService);
  133 + when(attributesService.find(customerId, SERVER_SCOPE, Collections.singleton("temperature")))
  134 + .thenReturn(Futures.immediateFailedFuture(new IllegalStateException("something wrong")));
  135 +
  136 + node.onMsg(ctx, msg);
  137 + final ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
  138 + verify(ctx).tellError(same(msg), captor.capture());
  139 +
  140 + Throwable value = captor.getValue();
  141 + assertEquals("something wrong", value.getMessage());
  142 + assertTrue(msg.getMetaData().getData().isEmpty());
  143 + }
  144 +
  145 + @Test
  146 + public void errorThrownIfCustomerCannotBeFound() {
  147 + UserId userId = new UserId(UUIDs.timeBased());
  148 + CustomerId customerId = new CustomerId(UUIDs.timeBased());
  149 + User user = new User();
  150 + user.setCustomerId(customerId);
  151 +
  152 + msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), new byte[4]);
  153 +
  154 + when(ctx.getUserService()).thenReturn(userService);
  155 + when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(null));
  156 +
  157 + node.onMsg(ctx, msg);
  158 + final ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
  159 + verify(ctx).tellError(same(msg), captor.capture());
  160 +
  161 + Throwable value = captor.getValue();
  162 + assertEquals(IllegalStateException.class, value.getClass());
  163 + assertEquals("Customer not found", value.getMessage());
  164 + assertTrue(msg.getMetaData().getData().isEmpty());
  165 + }
  166 +
  167 + @Test
  168 + public void customerAttributeAddedInMetadata() {
  169 + CustomerId customerId = new CustomerId(UUIDs.timeBased());
  170 + msg = new TbMsg(UUIDs.timeBased(), "CUSTOMER", customerId, new TbMsgMetaData(), new byte[4]);
  171 + entityAttributeFetched(customerId);
  172 + }
  173 +
  174 + @Test
  175 + public void usersCustomerAttributesFetched() {
  176 + UserId userId = new UserId(UUIDs.timeBased());
  177 + CustomerId customerId = new CustomerId(UUIDs.timeBased());
  178 + User user = new User();
  179 + user.setCustomerId(customerId);
  180 +
  181 + msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), new byte[4]);
  182 +
  183 + when(ctx.getUserService()).thenReturn(userService);
  184 + when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(user));
  185 +
  186 + entityAttributeFetched(customerId);
  187 + }
  188 +
  189 + @Test
  190 + public void assetsCustomerAttributesFetched() {
  191 + AssetId assetId = new AssetId(UUIDs.timeBased());
  192 + CustomerId customerId = new CustomerId(UUIDs.timeBased());
  193 + Asset asset = new Asset();
  194 + asset.setCustomerId(customerId);
  195 +
  196 + msg = new TbMsg(UUIDs.timeBased(), "USER", assetId, new TbMsgMetaData(), new byte[4]);
  197 +
  198 + when(ctx.getAssetService()).thenReturn(assetService);
  199 + when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
  200 +
  201 + entityAttributeFetched(customerId);
  202 + }
  203 +
  204 + @Test
  205 + public void deviceCustomerAttributesFetched() {
  206 + DeviceId deviceId = new DeviceId(UUIDs.timeBased());
  207 + CustomerId customerId = new CustomerId(UUIDs.timeBased());
  208 + Device device = new Device();
  209 + device.setCustomerId(customerId);
  210 +
  211 + msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), new byte[4]);
  212 +
  213 + when(ctx.getDeviceService()).thenReturn(deviceService);
  214 + when(deviceService.findDeviceByIdAsync(deviceId)).thenReturn(Futures.immediateFuture(device));
  215 +
  216 + entityAttributeFetched(customerId);
  217 + }
  218 +
  219 + @Test
  220 + public void deviceCustomerTelemetryFetched() throws TbNodeException {
  221 + TbGetEntityAttrNodeConfiguration config = new TbGetEntityAttrNodeConfiguration();
  222 + Map<String, String> attrMapping = new HashMap<>();
  223 + attrMapping.putIfAbsent("temperature", "tempo");
  224 + config.setAttrMapping(attrMapping);
  225 + config.setTelemetry(true);
  226 + ObjectMapper mapper = new ObjectMapper();
  227 + TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
  228 + nodeConfiguration.setData(mapper.valueToTree(config));
  229 +
  230 + node = new TbGetCustomerAttributeNode();
  231 + node.init(nodeConfiguration, null);
  232 +
  233 +
  234 + DeviceId deviceId = new DeviceId(UUIDs.timeBased());
  235 + CustomerId customerId = new CustomerId(UUIDs.timeBased());
  236 + Device device = new Device();
  237 + device.setCustomerId(customerId);
  238 +
  239 + msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), new byte[4]);
  240 +
  241 + when(ctx.getDeviceService()).thenReturn(deviceService);
  242 + when(deviceService.findDeviceByIdAsync(deviceId)).thenReturn(Futures.immediateFuture(device));
  243 +
  244 + List<TsKvEntry> timeseries = Lists.newArrayList(new BasicTsKvEntry(1L, new StringDataEntry("temperature", "highest")));
  245 +
  246 + when(ctx.getTimeseriesService()).thenReturn(timeseriesService);
  247 + when(timeseriesService.findLatest(customerId, Collections.singleton("temperature")))
  248 + .thenReturn(Futures.immediateFuture(timeseries));
  249 +
  250 + node.onMsg(ctx, msg);
  251 + verify(ctx).tellNext(msg);
  252 + assertEquals(msg.getMetaData().getValue("tempo"), "highest");
  253 + }
  254 +
  255 + private void entityAttributeFetched(CustomerId customerId) {
  256 + List<AttributeKvEntry> attributes = Lists.newArrayList(new BaseAttributeKvEntry(new StringDataEntry("temperature", "high"), 1L));
  257 +
  258 + when(ctx.getAttributesService()).thenReturn(attributesService);
  259 + when(attributesService.find(customerId, SERVER_SCOPE, Collections.singleton("temperature")))
  260 + .thenReturn(Futures.immediateFuture(attributes));
  261 +
  262 + node.onMsg(ctx, msg);
  263 + verify(ctx).tellNext(msg);
  264 + assertEquals(msg.getMetaData().getValue("tempo"), "high");
  265 + }
  266 +}
\ No newline at end of file
... ...