Commit 4b43cd75f74aa32185871b6d45392cceec94849b

Authored by Andrii Shvaika
2 parents c3efbd40 ef33c687

Merge branch 'master' into develop/3.3

Showing 59 changed files with 973 additions and 159 deletions
... ... @@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.rule.RuleChain;
35 35 import org.thingsboard.server.common.data.rule.RuleNode;
36 36 import org.thingsboard.server.common.msg.TbMsg;
37 37 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
  38 +import org.thingsboard.server.common.msg.plugin.RuleNodeUpdatedMsg;
38 39 import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
39 40 import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
40 41 import org.thingsboard.server.common.msg.queue.RuleEngineException;
... ... @@ -131,7 +132,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
131 132 } else {
132 133 log.trace("[{}][{}] Updating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
133 134 existing.setSelf(ruleNode);
134   - existing.getSelfActor().tellWithHighPriority(new ComponentLifecycleMsg(tenantId, existing.getSelf().getId(), ComponentLifecycleEvent.UPDATED));
  135 + existing.getSelfActor().tellWithHighPriority(new RuleNodeUpdatedMsg(tenantId, existing.getSelf().getId()));
135 136 }
136 137 }
137 138
... ...
... ... @@ -26,7 +26,6 @@ import org.thingsboard.server.actors.service.ContextBasedCreator;
26 26 import org.thingsboard.server.common.data.id.RuleChainId;
27 27 import org.thingsboard.server.common.data.id.RuleNodeId;
28 28 import org.thingsboard.server.common.data.id.TenantId;
29   -import org.thingsboard.server.common.data.rule.RuleChain;
30 29 import org.thingsboard.server.common.msg.TbActorMsg;
31 30 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
32 31 import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
... ... @@ -54,6 +53,7 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
54 53 protected boolean doProcess(TbActorMsg msg) {
55 54 switch (msg.getMsgType()) {
56 55 case COMPONENT_LIFE_CYCLE_MSG:
  56 + case RULE_NODE_UPDATED_MSG:
57 57 onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
58 58 break;
59 59 case RULE_CHAIN_TO_RULE_MSG:
... ...
... ... @@ -20,14 +20,13 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration;
20 20 import org.thingsboard.server.actors.ActorSystemContext;
21 21 import org.thingsboard.server.actors.TbActorCtx;
22 22 import org.thingsboard.server.actors.TbActorRef;
  23 +import org.thingsboard.server.actors.TbRuleNodeUpdateException;
23 24 import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
24 25 import org.thingsboard.server.common.data.ApiUsageRecordKey;
25   -import org.thingsboard.server.common.data.TenantProfile;
26 26 import org.thingsboard.server.common.data.id.RuleNodeId;
27 27 import org.thingsboard.server.common.data.id.TenantId;
28 28 import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
29 29 import org.thingsboard.server.common.data.rule.RuleNode;
30   -import org.thingsboard.server.common.data.tenant.profile.TenantProfileConfiguration;
31 30 import org.thingsboard.server.common.msg.TbMsg;
32 31 import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
33 32 import org.thingsboard.server.common.msg.queue.RuleNodeException;
... ... @@ -78,7 +77,11 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
78 77 if (tbNode != null) {
79 78 tbNode.destroy();
80 79 }
81   - start(context);
  80 + try {
  81 + start(context);
  82 + } catch (Exception e) {
  83 + throw new TbRuleNodeUpdateException("Failed to update rule node", e);
  84 + }
82 85 }
83 86 }
84 87
... ...
... ... @@ -20,6 +20,7 @@ import org.thingsboard.server.actors.ActorSystemContext;
20 20 import org.thingsboard.server.actors.TbActor;
21 21 import org.thingsboard.server.actors.TbActorCtx;
22 22 import org.thingsboard.server.actors.TbActorException;
  23 +import org.thingsboard.server.actors.TbRuleNodeUpdateException;
23 24 import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
24 25 import org.thingsboard.server.actors.stats.StatsPersistMsg;
25 26 import org.thingsboard.server.common.data.id.EntityId;
... ... @@ -123,6 +124,9 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
123 124 } catch (Exception e) {
124 125 logAndPersist("onLifecycleMsg", e, true);
125 126 logLifecycleEvent(msg.getEvent(), e);
  127 + if (e instanceof TbRuleNodeUpdateException) {
  128 + throw (TbRuleNodeUpdateException) e;
  129 + }
126 130 }
127 131 }
128 132
... ...
... ... @@ -34,6 +34,7 @@ import org.thingsboard.server.actors.app.AppInitMsg;
34 34 import org.thingsboard.server.actors.stats.StatsActor;
35 35 import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
36 36 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
  37 +import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
37 38
38 39 import javax.annotation.PostConstruct;
39 40 import javax.annotation.PreDestroy;
... ... @@ -43,7 +44,7 @@ import java.util.concurrent.ScheduledExecutorService;
43 44
44 45 @Service
45 46 @Slf4j
46   -public class DefaultActorService implements ActorService {
  47 +public class DefaultActorService extends TbApplicationEventListener<PartitionChangeEvent> implements ActorService {
47 48
48 49 public static final String APP_DISPATCHER_NAME = "app-dispatcher";
49 50 public static final String TENANT_DISPATCHER_NAME = "tenant-dispatcher";
... ... @@ -120,10 +121,10 @@ public class DefaultActorService implements ActorService {
120 121 appActor.tellWithHighPriority(new AppInitMsg());
121 122 }
122 123
123   - @EventListener(PartitionChangeEvent.class)
124   - public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
  124 + @Override
  125 + protected void onTbApplicationEvent(PartitionChangeEvent event) {
125 126 log.info("Received partition change event.");
126   - this.appActor.tellWithHighPriority(new PartitionChangeMsg(partitionChangeEvent.getServiceQueueKey(), partitionChangeEvent.getPartitions()));
  127 + this.appActor.tellWithHighPriority(new PartitionChangeMsg(event.getServiceQueueKey(), event.getPartitions()));
127 128 }
128 129
129 130 @PreDestroy
... ...
... ... @@ -27,7 +27,22 @@ import org.springframework.beans.factory.annotation.Value;
27 27 import org.springframework.security.core.Authentication;
28 28 import org.springframework.security.core.context.SecurityContextHolder;
29 29 import org.springframework.web.bind.annotation.ExceptionHandler;
30   -import org.thingsboard.server.common.data.*;
  30 +import org.thingsboard.server.common.data.Customer;
  31 +import org.thingsboard.server.common.data.Dashboard;
  32 +import org.thingsboard.server.common.data.DashboardInfo;
  33 +import org.thingsboard.server.common.data.DataConstants;
  34 +import org.thingsboard.server.common.data.Device;
  35 +import org.thingsboard.server.common.data.DeviceInfo;
  36 +import org.thingsboard.server.common.data.DeviceProfile;
  37 +import org.thingsboard.server.common.data.EntityType;
  38 +import org.thingsboard.server.common.data.EntityView;
  39 +import org.thingsboard.server.common.data.EntityViewInfo;
  40 +import org.thingsboard.server.common.data.HasName;
  41 +import org.thingsboard.server.common.data.HasTenantId;
  42 +import org.thingsboard.server.common.data.Tenant;
  43 +import org.thingsboard.server.common.data.TenantInfo;
  44 +import org.thingsboard.server.common.data.TenantProfile;
  45 +import org.thingsboard.server.common.data.User;
31 46 import org.thingsboard.server.common.data.alarm.Alarm;
32 47 import org.thingsboard.server.common.data.alarm.AlarmInfo;
33 48 import org.thingsboard.server.common.data.asset.Asset;
... ... @@ -84,6 +99,7 @@ import org.thingsboard.server.dao.oauth2.OAuth2ConfigTemplateService;
84 99 import org.thingsboard.server.dao.oauth2.OAuth2Service;
85 100 import org.thingsboard.server.dao.relation.RelationService;
86 101 import org.thingsboard.server.dao.rule.RuleChainService;
  102 +import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
87 103 import org.thingsboard.server.dao.tenant.TenantProfileService;
88 104 import org.thingsboard.server.dao.tenant.TenantService;
89 105 import org.thingsboard.server.dao.user.UserService;
... ... @@ -96,7 +112,6 @@ import org.thingsboard.server.queue.util.TbCoreComponent;
96 112 import org.thingsboard.server.service.component.ComponentDiscoveryService;
97 113 import org.thingsboard.server.service.lwm2m.LwM2MModelsRepository;
98 114 import org.thingsboard.server.service.profile.TbDeviceProfileCache;
99   -import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
100 115 import org.thingsboard.server.service.queue.TbClusterService;
101 116 import org.thingsboard.server.service.security.model.SecurityUser;
102 117 import org.thingsboard.server.service.security.permission.AccessControlService;
... ... @@ -124,6 +139,9 @@ public abstract class BaseController {
124 139 public static final String INCORRECT_TENANT_ID = "Incorrect tenantId ";
125 140 public static final String YOU_DON_T_HAVE_PERMISSION_TO_PERFORM_THIS_OPERATION = "You don't have permission to perform this operation!";
126 141
  142 + protected static final String DEFAULT_DASHBOARD = "defaultDashboardId";
  143 + protected static final String HOME_DASHBOARD = "homeDashboardId";
  144 +
127 145 private static final ObjectMapper json = new ObjectMapper();
128 146
129 147 @Autowired
... ... @@ -766,7 +784,7 @@ public abstract class BaseController {
766 784 String scope = extractParameter(String.class, 0, additionalInfo);
767 785 @SuppressWarnings("unchecked")
768 786 List<AttributeKvEntry> attributes = extractParameter(List.class, 1, additionalInfo);
769   - metaData.putValue("scope", scope);
  787 + metaData.putValue(DataConstants.SCOPE, scope);
770 788 if (attributes != null) {
771 789 for (AttributeKvEntry attr : attributes) {
772 790 addKvEntry(entityNode, attr);
... ... @@ -776,7 +794,7 @@ public abstract class BaseController {
776 794 String scope = extractParameter(String.class, 0, additionalInfo);
777 795 @SuppressWarnings("unchecked")
778 796 List<String> keys = extractParameter(List.class, 1, additionalInfo);
779   - metaData.putValue("scope", scope);
  797 + metaData.putValue(DataConstants.SCOPE, scope);
780 798 ArrayNode attrsArrayNode = entityNode.putArray("attributes");
781 799 if (keys != null) {
782 800 keys.forEach(attrsArrayNode::add);
... ... @@ -862,4 +880,14 @@ public abstract class BaseController {
862 880 }
863 881 }
864 882 }
  883 +
  884 + protected void processDashboardIdFromAdditionalInfo(ObjectNode additionalInfo, String requiredFields) throws ThingsboardException {
  885 + String dashboardId = additionalInfo.has(requiredFields) ? additionalInfo.get(requiredFields).asText() : null;
  886 + if(dashboardId != null && !dashboardId.equals("null")) {
  887 + if(dashboardService.findDashboardById(getTenantId(), new DashboardId(UUID.fromString(dashboardId))) == null) {
  888 + additionalInfo.remove(requiredFields);
  889 + }
  890 + }
  891 + }
  892 +
865 893 }
... ...
... ... @@ -55,7 +55,11 @@ public class CustomerController extends BaseController {
55 55 checkParameter(CUSTOMER_ID, strCustomerId);
56 56 try {
57 57 CustomerId customerId = new CustomerId(toUUID(strCustomerId));
58   - return checkCustomerId(customerId, Operation.READ);
  58 + Customer customer = checkCustomerId(customerId, Operation.READ);
  59 + if(!customer.getAdditionalInfo().isNull()) {
  60 + processDashboardIdFromAdditionalInfo((ObjectNode) customer.getAdditionalInfo(), HOME_DASHBOARD);
  61 + }
  62 + return customer;
59 63 } catch (Exception e) {
60 64 throw handleException(e);
61 65 }
... ...
... ... @@ -15,6 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.controller;
17 17
  18 +import com.fasterxml.jackson.databind.node.ObjectNode;
18 19 import lombok.extern.slf4j.Slf4j;
19 20 import org.springframework.beans.factory.annotation.Autowired;
20 21 import org.springframework.http.HttpStatus;
... ... @@ -59,7 +60,11 @@ public class TenantController extends BaseController {
59 60 checkParameter("tenantId", strTenantId);
60 61 try {
61 62 TenantId tenantId = new TenantId(toUUID(strTenantId));
62   - return checkTenantId(tenantId, Operation.READ);
  63 + Tenant tenant = checkTenantId(tenantId, Operation.READ);
  64 + if(!tenant.getAdditionalInfo().isNull()) {
  65 + processDashboardIdFromAdditionalInfo((ObjectNode) tenant.getAdditionalInfo(), HOME_DASHBOARD);
  66 + }
  67 + return tenant;
63 68 } catch (Exception e) {
64 69 throw handleException(e);
65 70 }
... ...
... ... @@ -53,7 +53,6 @@ import org.thingsboard.server.service.security.model.token.JwtTokenFactory;
53 53 import org.thingsboard.server.service.security.permission.Operation;
54 54 import org.thingsboard.server.service.security.permission.Resource;
55 55 import org.thingsboard.server.service.security.system.SystemSecurityService;
56   -import org.thingsboard.server.utils.MiscUtils;
57 56
58 57 import javax.servlet.http.HttpServletRequest;
59 58
... ... @@ -90,7 +89,12 @@ public class UserController extends BaseController {
90 89 checkParameter(USER_ID, strUserId);
91 90 try {
92 91 UserId userId = new UserId(toUUID(strUserId));
93   - return checkUserId(userId, Operation.READ);
  92 + User user = checkUserId(userId, Operation.READ);
  93 + if(!user.getAdditionalInfo().isNull()) {
  94 + processDashboardIdFromAdditionalInfo((ObjectNode) user.getAdditionalInfo(), DEFAULT_DASHBOARD);
  95 + processDashboardIdFromAdditionalInfo((ObjectNode) user.getAdditionalInfo(), HOME_DASHBOARD);
  96 + }
  97 + return user;
94 98 } catch (Exception e) {
95 99 throw handleException(e);
96 100 }
... ... @@ -329,4 +333,5 @@ public class UserController extends BaseController {
329 333 throw handleException(e);
330 334 }
331 335 }
  336 +
332 337 }
... ...
... ... @@ -54,6 +54,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto;
54 54 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
55 55 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
56 56 import org.thingsboard.server.queue.discovery.PartitionService;
  57 +import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
57 58 import org.thingsboard.server.queue.scheduler.SchedulerComponent;
58 59 import org.thingsboard.server.service.queue.TbClusterService;
59 60 import org.thingsboard.server.service.telemetry.InternalTelemetryService;
... ... @@ -78,7 +79,7 @@ import java.util.stream.Collectors;
78 79
79 80 @Slf4j
80 81 @Service
81   -public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
  82 +public class DefaultTbApiUsageStateService extends TbApplicationEventListener<PartitionChangeEvent> implements TbApiUsageStateService {
82 83
83 84 public static final String HOURLY = "Hourly";
84 85 public static final FutureCallback<Integer> VOID_CALLBACK = new FutureCallback<Integer>() {
... ... @@ -188,7 +189,7 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
188 189 }
189 190
190 191 @Override
191   - public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
  192 + protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
192 193 if (partitionChangeEvent.getServiceType().equals(ServiceType.TB_CORE)) {
193 194 myTenantStates.entrySet().removeIf(entry -> !partitionService.resolve(ServiceType.TB_CORE, entry.getKey(), entry.getKey()).isMyPartition());
194 195 otherTenantStates.entrySet().removeIf(entry -> partitionService.resolve(ServiceType.TB_CORE, entry.getKey(), entry.getKey()).isMyPartition());
... ...
... ... @@ -151,12 +151,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
151 151 }
152 152
153 153 @Override
154   - public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
155   - if (partitionChangeEvent.getServiceType().equals(getServiceType())) {
156   - log.info("Subscribing to partitions: {}", partitionChangeEvent.getPartitions());
157   - this.mainConsumer.subscribe(partitionChangeEvent.getPartitions());
  154 + protected void onTbApplicationEvent(PartitionChangeEvent event) {
  155 + if (event.getServiceType().equals(getServiceType())) {
  156 + log.info("Subscribing to partitions: {}", event.getPartitions());
  157 + this.mainConsumer.subscribe(event.getPartitions());
158 158 this.usageStatsConsumer.subscribe(
159   - partitionChangeEvent
  159 + event
160 160 .getPartitions()
161 161 .stream()
162 162 .map(tpi -> tpi.newByTopic(usageStatsConsumer.getTopic()))
... ...
... ... @@ -140,11 +140,11 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
140 140 }
141 141
142 142 @Override
143   - public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
144   - if (partitionChangeEvent.getServiceType().equals(getServiceType())) {
145   - ServiceQueue serviceQueue = partitionChangeEvent.getServiceQueueKey().getServiceQueue();
146   - log.info("[{}] Subscribing to partitions: {}", serviceQueue.getQueue(), partitionChangeEvent.getPartitions());
147   - consumers.get(serviceQueue.getQueue()).subscribe(partitionChangeEvent.getPartitions());
  143 + protected void onTbApplicationEvent(PartitionChangeEvent event) {
  144 + if (event.getServiceType().equals(getServiceType())) {
  145 + ServiceQueue serviceQueue = event.getServiceQueueKey().getServiceQueue();
  146 + log.info("[{}] Subscribing to partitions: {}", serviceQueue.getQueue(), event.getPartitions());
  147 + consumers.get(serviceQueue.getQueue()).subscribe(event.getPartitions());
148 148 }
149 149 }
150 150
... ...
... ... @@ -36,6 +36,7 @@ import org.thingsboard.server.queue.TbQueueConsumer;
36 36 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
37 37 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
38 38 import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
  39 +import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
39 40 import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
40 41 import org.thingsboard.server.service.profile.TbDeviceProfileCache;
41 42 import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
... ... @@ -56,7 +57,7 @@ import java.util.function.Function;
56 57 import java.util.stream.Collectors;
57 58
58 59 @Slf4j
59   -public abstract class AbstractConsumerService<N extends com.google.protobuf.GeneratedMessageV3> implements ApplicationListener<PartitionChangeEvent> {
  60 +public abstract class AbstractConsumerService<N extends com.google.protobuf.GeneratedMessageV3> extends TbApplicationEventListener<PartitionChangeEvent> {
60 61
61 62 protected volatile ExecutorService consumersExecutor;
62 63 protected volatile ExecutorService notificationsConsumerExecutor;
... ...
... ... @@ -56,6 +56,7 @@ import org.thingsboard.server.dao.util.mapping.JacksonUtil;
56 56 import org.thingsboard.server.gen.transport.TransportProtos;
57 57 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
58 58 import org.thingsboard.server.queue.discovery.PartitionService;
  59 +import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
59 60 import org.thingsboard.server.queue.util.TbCoreComponent;
60 61 import org.thingsboard.server.service.queue.TbClusterService;
61 62 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
... ... @@ -90,7 +91,7 @@ import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
90 91 @Service
91 92 @TbCoreComponent
92 93 @Slf4j
93   -public class DefaultDeviceStateService implements DeviceStateService {
  94 +public class DefaultDeviceStateService extends TbApplicationEventListener<PartitionChangeEvent> implements DeviceStateService {
94 95
95 96 public static final String ACTIVITY_STATE = "active";
96 97 public static final String LAST_CONNECT_TIME = "lastConnectTime";
... ... @@ -206,7 +207,6 @@ public class DefaultDeviceStateService implements DeviceStateService {
206 207 if (!state.isActive()) {
207 208 state.setActive(true);
208 209 save(deviceId, ACTIVITY_STATE, state.isActive());
209   - stateData.getMetaData().putValue("scope", SERVER_SCOPE);
210 210 pushRuleEngineMessage(stateData, ACTIVITY_EVENT);
211 211 }
212 212 }
... ... @@ -295,7 +295,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
295 295 }
296 296
297 297 @Override
298   - public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
  298 + protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
299 299 if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceType())) {
300 300 deduplicationExecutor.submit(partitionChangeEvent.getPartitions());
301 301 }
... ... @@ -447,7 +447,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
447 447 }
448 448
449 449 private <T extends KvEntry> Function<List<T>, DeviceStateData> extractDeviceStateData(Device device) {
450   - return new Function<List<T>, DeviceStateData>() {
  450 + return new Function<>() {
451 451 @Nullable
452 452 @Override
453 453 public DeviceStateData apply(@Nullable List<T> data) {
... ... @@ -503,7 +503,11 @@ public class DefaultDeviceStateService implements DeviceStateService {
503 503 } else {
504 504 data = JacksonUtil.toString(state);
505 505 }
506   - TbMsg tbMsg = TbMsg.newMsg(msgType, stateData.getDeviceId(), stateData.getMetaData().copy(), TbMsgDataType.JSON, data);
  506 + TbMsgMetaData md = stateData.getMetaData().copy();
  507 + if(!persistToTelemetry){
  508 + md.putValue(DataConstants.SCOPE, SERVER_SCOPE);
  509 + }
  510 + TbMsg tbMsg = TbMsg.newMsg(msgType, stateData.getDeviceId(), md, TbMsgDataType.JSON, data);
507 511 clusterService.pushMsgToRuleEngine(stateData.getTenantId(), stateData.getDeviceId(), tbMsg, null);
508 512 } catch (Exception e) {
509 513 log.warn("[{}] Failed to push inactivity alarm: {}", stateData.getDeviceId(), state, e);
... ...
... ... @@ -48,6 +48,7 @@ import org.thingsboard.server.queue.TbQueueProducer;
48 48 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
49 49 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
50 50 import org.thingsboard.server.queue.discovery.PartitionService;
  51 +import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
51 52 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
52 53 import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
53 54 import org.thingsboard.server.queue.util.TbCoreComponent;
... ... @@ -76,7 +77,7 @@ import java.util.function.Predicate;
76 77 @Slf4j
77 78 @TbCoreComponent
78 79 @Service
79   -public class DefaultSubscriptionManagerService implements SubscriptionManagerService {
  80 +public class DefaultSubscriptionManagerService extends TbApplicationEventListener<PartitionChangeEvent> implements SubscriptionManagerService {
80 81
81 82 @Autowired
82 83 private AttributesService attrService;
... ... @@ -178,7 +179,7 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
178 179 }
179 180
180 181 @Override
181   - public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
  182 + protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
182 183 if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceType())) {
183 184 Set<TopicPartitionInfo> removedPartitions = new HashSet<>(currentPartitions);
184 185 removedPartitions.removeAll(partitionChangeEvent.getPartitions());
... ...
... ... @@ -476,7 +476,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
476 476 public void cancelAllSessionSubscriptions(String sessionId) {
477 477 Map<Integer, TbAbstractDataSubCtx> sessionSubs = subscriptionsBySessionId.remove(sessionId);
478 478 if (sessionSubs != null) {
479   - sessionSubs.values().stream().filter(sub -> sub instanceof TbEntityDataSubCtx).map(sub -> (TbEntityDataSubCtx) sub).forEach(this::cleanupAndCancel);
  479 + sessionSubs.values().forEach(this::cleanupAndCancel);
480 480 }
481 481 }
482 482
... ...
... ... @@ -28,6 +28,7 @@ import org.thingsboard.server.queue.discovery.PartitionService;
28 28 import org.thingsboard.server.common.msg.queue.ServiceType;
29 29 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
30 30 import org.thingsboard.server.common.msg.queue.TbCallback;
  31 +import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
31 32 import org.thingsboard.server.queue.util.TbCoreComponent;
32 33 import org.thingsboard.server.service.queue.TbClusterService;
33 34 import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate;
... ... @@ -62,6 +63,34 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
62 63 private SubscriptionManagerService subscriptionManagerService;
63 64
64 65 private ExecutorService subscriptionUpdateExecutor;
  66 +
  67 + private TbApplicationEventListener<PartitionChangeEvent> partitionChangeListener = new TbApplicationEventListener<>() {
  68 + @Override
  69 + protected void onTbApplicationEvent(PartitionChangeEvent event) {
  70 + if (ServiceType.TB_CORE.equals(event.getServiceType())) {
  71 + currentPartitions.clear();
  72 + currentPartitions.addAll(event.getPartitions());
  73 + }
  74 + }
  75 + };
  76 +
  77 + private TbApplicationEventListener<ClusterTopologyChangeEvent> clusterTopologyChangeListener = new TbApplicationEventListener<>() {
  78 + @Override
  79 + protected void onTbApplicationEvent(ClusterTopologyChangeEvent event) {
  80 + if (event.getServiceQueueKeys().stream().anyMatch(key -> ServiceType.TB_CORE.equals(key.getServiceType()))) {
  81 + /*
  82 + * If the cluster topology has changed, we need to push all current subscriptions to SubscriptionManagerService again.
  83 + * Otherwise, the SubscriptionManagerService may "forget" those subscriptions in case of restart.
  84 + * Although this is resource consuming operation, it is cheaper than sending ping/pong commands periodically
  85 + * It is also cheaper then caching the subscriptions by entity id and then lookup of those caches every time we have new telemetry in SubscriptionManagerService.
  86 + * Even if we cache locally the list of active subscriptions by entity id, it is still time consuming operation to get them from cache
  87 + * Since number of subscriptions is usually much less then number of devices that are pushing data.
  88 + */
  89 + subscriptionsBySessionId.values().forEach(map -> map.values()
  90 + .forEach(sub -> pushSubscriptionToManagerService(sub, true)));
  91 + }
  92 + }
  93 + };
65 94
66 95 @PostConstruct
67 96 public void initExecutor() {
... ... @@ -77,28 +106,14 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
77 106
78 107 @Override
79 108 @EventListener(PartitionChangeEvent.class)
80   - public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
81   - if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceType())) {
82   - currentPartitions.clear();
83   - currentPartitions.addAll(partitionChangeEvent.getPartitions());
84   - }
  109 + public void onApplicationEvent(PartitionChangeEvent event) {
  110 + partitionChangeListener.onApplicationEvent(event);
85 111 }
86 112
87 113 @Override
88 114 @EventListener(ClusterTopologyChangeEvent.class)
89 115 public void onApplicationEvent(ClusterTopologyChangeEvent event) {
90   - if (event.getServiceQueueKeys().stream().anyMatch(key -> ServiceType.TB_CORE.equals(key.getServiceType()))) {
91   - /*
92   - * If the cluster topology has changed, we need to push all current subscriptions to SubscriptionManagerService again.
93   - * Otherwise, the SubscriptionManagerService may "forget" those subscriptions in case of restart.
94   - * Although this is resource consuming operation, it is cheaper than sending ping/pong commands periodically
95   - * It is also cheaper then caching the subscriptions by entity id and then lookup of those caches every time we have new telemetry in SubscriptionManagerService.
96   - * Even if we cache locally the list of active subscriptions by entity id, it is still time consuming operation to get them from cache
97   - * Since number of subscriptions is usually much less then number of devices that are pushing data.
98   - */
99   - subscriptionsBySessionId.values().forEach(map -> map.values()
100   - .forEach(sub -> pushSubscriptionToManagerService(sub, true)));
101   - }
  116 + clusterTopologyChangeListener.onApplicationEvent(event);
102 117 }
103 118
104 119 //TODO 3.1: replace null callbacks with callbacks from websocket service.
... ...
... ... @@ -41,6 +41,7 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
41 41 import org.thingsboard.server.gen.transport.TransportProtos;
42 42 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
43 43 import org.thingsboard.server.queue.discovery.PartitionService;
  44 +import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
44 45 import org.thingsboard.server.service.queue.TbClusterService;
45 46 import org.thingsboard.server.service.subscription.SubscriptionManagerService;
46 47 import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
... ... @@ -61,7 +62,7 @@ import java.util.function.Consumer;
61 62 * Created by ashvayka on 27.03.18.
62 63 */
63 64 @Slf4j
64   -public abstract class AbstractSubscriptionService implements ApplicationListener<PartitionChangeEvent> {
  65 +public abstract class AbstractSubscriptionService extends TbApplicationEventListener<PartitionChangeEvent>{
65 66
66 67 protected final Set<TopicPartitionInfo> currentPartitions = ConcurrentHashMap.newKeySet();
67 68
... ... @@ -97,8 +98,7 @@ public abstract class AbstractSubscriptionService implements ApplicationListener
97 98 }
98 99
99 100 @Override
100   - @EventListener(PartitionChangeEvent.class)
101   - public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
  101 + protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
102 102 if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceType())) {
103 103 currentPartitions.clear();
104 104 currentPartitions.addAll(partitionChangeEvent.getPartitions());
... ...
... ... @@ -118,6 +118,15 @@ security:
118 118 githubMapper:
119 119 emailUrl: "${SECURITY_OAUTH2_GITHUB_MAPPER_EMAIL_URL_KEY:https://api.github.com/user/emails}"
120 120
  121 +# Usage statistics parameters
  122 +usage:
  123 + stats:
  124 + report:
  125 + enabled: "${USAGE_STATS_REPORT_ENABLED:true}"
  126 + interval: "${USAGE_STATS_REPORT_INTERVAL:10}"
  127 + check:
  128 + cycle: "${USAGE_STATS_CHECK_CYCLE:60000}"
  129 +
121 130 # Dashboard parameters
122 131 dashboard:
123 132 # Maximum allowed datapoints fetched by widgets
... ... @@ -675,6 +684,10 @@ queue:
675 684 transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
676 685 notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
677 686 js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100}"
  687 + consumer-stats:
  688 + enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
  689 + print-interval-ms: "${TB_QUEUE_KAFKA_CONSUMER_STATS_MIN_PRINT_INTERVAL_MS:60000}"
  690 + kafka-response-timeout-ms: "${TB_QUEUE_KAFKA_CONSUMER_STATS_RESPONSE_TIMEOUT_MS:1000}"
678 691 aws_sqs:
679 692 use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
680 693 access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
... ...
... ... @@ -17,6 +17,7 @@ package org.thingsboard.server.actors;
17 17
18 18 import lombok.Data;
19 19 import lombok.extern.slf4j.Slf4j;
  20 +import org.thingsboard.server.common.msg.MsgType;
20 21 import org.thingsboard.server.common.msg.TbActorMsg;
21 22 import org.thingsboard.server.common.msg.TbActorStopReason;
22 23
... ... @@ -73,7 +74,7 @@ public final class TbActorMailbox implements TbActorCtx {
73 74 if (strategy.isStop() || (settings.getMaxActorInitAttempts() > 0 && attemptIdx > settings.getMaxActorInitAttempts())) {
74 75 log.info("[{}] Failed to init actor, attempt {}, going to stop attempts.", selfId, attempt, t);
75 76 stopReason = TbActorStopReason.INIT_FAILED;
76   - system.stop(selfId);
  77 + destroy();
77 78 } else if (strategy.getRetryDelay() > 0) {
78 79 log.info("[{}] Failed to init actor, attempt {}, going to retry in attempts in {}ms", selfId, attempt, strategy.getRetryDelay());
79 80 log.debug("[{}] Error", selfId, t);
... ... @@ -95,7 +96,19 @@ public final class TbActorMailbox implements TbActorCtx {
95 96 }
96 97 tryProcessQueue(true);
97 98 } else {
98   - msg.onTbActorStopped(stopReason);
  99 + if (highPriority && msg.getMsgType().equals(MsgType.RULE_NODE_UPDATED_MSG)) {
  100 + synchronized (this) {
  101 + if (stopReason == TbActorStopReason.INIT_FAILED) {
  102 + destroyInProgress.set(false);
  103 + stopReason = null;
  104 + initActor();
  105 + } else {
  106 + msg.onTbActorStopped(stopReason);
  107 + }
  108 + }
  109 + } else {
  110 + msg.onTbActorStopped(stopReason);
  111 + }
99 112 }
100 113 }
101 114
... ... @@ -126,6 +139,9 @@ public final class TbActorMailbox implements TbActorCtx {
126 139 try {
127 140 log.debug("[{}] Going to process message: {}", selfId, msg);
128 141 actor.process(msg);
  142 + } catch (TbRuleNodeUpdateException updateException){
  143 + stopReason = TbActorStopReason.INIT_FAILED;
  144 + destroy();
129 145 } catch (Throwable t) {
130 146 log.debug("[{}] Failed to process message: {}", selfId, msg, t);
131 147 ProcessFailureStrategy strategy = actor.onProcessFailure(t);
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.actors;
  17 +
  18 +public class TbRuleNodeUpdateException extends RuntimeException {
  19 +
  20 + private static final long serialVersionUID = 8209771144711980882L;
  21 +
  22 + public TbRuleNodeUpdateException(String message, Throwable cause) {
  23 + super(message, cause);
  24 + }
  25 +}
  26 +
... ...
... ... @@ -24,6 +24,7 @@ public class DataConstants {
24 24 public static final String CUSTOMER = "CUSTOMER";
25 25 public static final String DEVICE = "DEVICE";
26 26
  27 + public static final String SCOPE = "scope";
27 28 public static final String CLIENT_SCOPE = "CLIENT_SCOPE";
28 29 public static final String SERVER_SCOPE = "SERVER_SCOPE";
29 30 public static final String SHARED_SCOPE = "SHARED_SCOPE";
... ...
... ... @@ -17,17 +17,23 @@ package org.thingsboard.server.common.data.query;
17 17
18 18 import com.fasterxml.jackson.annotation.JsonIgnore;
19 19 import lombok.Data;
20   -import lombok.Getter;
  20 +import lombok.RequiredArgsConstructor;
21 21
22 22 @Data
  23 +@RequiredArgsConstructor
23 24 public class DynamicValue<T> {
24 25
25 26 @JsonIgnore
26 27 private T resolvedValue;
27 28
28   - @Getter
29 29 private final DynamicValueSourceType sourceType;
30   - @Getter
31 30 private final String sourceAttribute;
  31 + private final boolean inherit;
  32 +
  33 + public DynamicValue(DynamicValueSourceType sourceType, String sourceAttribute) {
  34 + this.sourceAttribute = sourceAttribute;
  35 + this.sourceType = sourceType;
  36 + this.inherit = false;
  37 + }
32 38
33 39 }
... ...
... ... @@ -40,6 +40,11 @@ public enum MsgType {
40 40 COMPONENT_LIFE_CYCLE_MSG,
41 41
42 42 /**
  43 + * Special message to indicate rule node update request
  44 + */
  45 + RULE_NODE_UPDATED_MSG,
  46 +
  47 + /**
43 48 * Misc messages consumed from the Queue and forwarded to Rule Engine Actor.
44 49 *
45 50 * See {@link QueueToRuleEngineMsg}
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.msg.plugin;
  17 +
  18 +import lombok.ToString;
  19 +import org.thingsboard.server.common.data.id.EntityId;
  20 +import org.thingsboard.server.common.data.id.TenantId;
  21 +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
  22 +import org.thingsboard.server.common.msg.MsgType;
  23 +
  24 +import java.util.Optional;
  25 +
  26 +/**
  27 + * @author Andrew Shvayka
  28 + */
  29 +@ToString
  30 +public class RuleNodeUpdatedMsg extends ComponentLifecycleMsg {
  31 +
  32 + public RuleNodeUpdatedMsg(TenantId tenantId, EntityId entityId) {
  33 + super(tenantId, entityId, ComponentLifecycleEvent.UPDATED);
  34 + }
  35 +
  36 + @Override
  37 + public MsgType getMsgType() {
  38 + return MsgType.RULE_NODE_UPDATED_MSG;
  39 + }
  40 +}
\ No newline at end of file
... ...
... ... @@ -22,7 +22,9 @@ import org.thingsboard.server.common.msg.queue.ServiceQueueKey;
22 22 import java.util.Set;
23 23
24 24
25   -public class ClusterTopologyChangeEvent extends ApplicationEvent {
  25 +public class ClusterTopologyChangeEvent extends TbApplicationEvent {
  26 +
  27 + private static final long serialVersionUID = -2441739930040282254L;
26 28
27 29 @Getter
28 30 private final Set<ServiceQueueKey> serviceQueueKeys;
... ...
... ... @@ -126,7 +126,7 @@ public class HashPartitionService implements PartitionService {
126 126 }
127 127
128 128 @Override
129   - public void recalculatePartitions(ServiceInfo currentService, List<ServiceInfo> otherServices) {
  129 + public synchronized void recalculatePartitions(ServiceInfo currentService, List<ServiceInfo> otherServices) {
130 130 logServiceInfo(currentService);
131 131 otherServices.forEach(this::logServiceInfo);
132 132 Map<ServiceQueueKey, List<ServiceInfo>> queueServicesMap = new HashMap<>();
... ... @@ -134,7 +134,7 @@ public class HashPartitionService implements PartitionService {
134 134 for (ServiceInfo other : otherServices) {
135 135 addNode(queueServicesMap, other);
136 136 }
137   - queueServicesMap.values().forEach(list -> list.sort((a, b) -> a.getServiceId().compareTo(b.getServiceId())));
  137 + queueServicesMap.values().forEach(list -> list.sort(Comparator.comparing(ServiceInfo::getServiceId)));
138 138
139 139 ConcurrentMap<ServiceQueueKey, List<Integer>> oldPartitions = myPartitions;
140 140 TenantId myIsolatedOrSystemTenantId = getSystemOrIsolatedTenantId(currentService);
... ...
... ... @@ -24,7 +24,9 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
24 24 import java.util.Set;
25 25
26 26
27   -public class PartitionChangeEvent extends ApplicationEvent {
  27 +public class PartitionChangeEvent extends TbApplicationEvent {
  28 +
  29 + private static final long serialVersionUID = -8731788167026510559L;
28 30
29 31 @Getter
30 32 private final ServiceQueueKey serviceQueueKey;
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.discovery;
  17 +
  18 +import lombok.Getter;
  19 +import org.springframework.context.ApplicationEvent;
  20 +
  21 +import java.util.concurrent.atomic.AtomicInteger;
  22 +
  23 +public class TbApplicationEvent extends ApplicationEvent {
  24 +
  25 + private static final long serialVersionUID = 3884264064887765146L;
  26 +
  27 + private static final AtomicInteger sequence = new AtomicInteger();
  28 +
  29 + @Getter
  30 + private final int sequenceNumber;
  31 +
  32 + public TbApplicationEvent(Object source) {
  33 + super(source);
  34 + sequenceNumber = sequence.incrementAndGet();
  35 + }
  36 +
  37 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.discovery;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.springframework.context.ApplicationListener;
  20 +
  21 +import java.util.concurrent.locks.Lock;
  22 +import java.util.concurrent.locks.ReentrantLock;
  23 +
  24 +@Slf4j
  25 +public abstract class TbApplicationEventListener<T extends TbApplicationEvent> implements ApplicationListener<T> {
  26 +
  27 + private int lastProcessedSequenceNumber = Integer.MIN_VALUE;
  28 + private final Lock seqNumberLock = new ReentrantLock();
  29 +
  30 + @Override
  31 + public void onApplicationEvent(T event) {
  32 + boolean validUpdate = false;
  33 + seqNumberLock.lock();
  34 + try {
  35 + if (event.getSequenceNumber() > lastProcessedSequenceNumber) {
  36 + validUpdate = true;
  37 + lastProcessedSequenceNumber = event.getSequenceNumber();
  38 + }
  39 + } finally {
  40 + seqNumberLock.unlock();
  41 + }
  42 + if (validUpdate) {
  43 + onTbApplicationEvent(event);
  44 + } else {
  45 + log.info("Application event ignored due to invalid sequence number ({} > {}). Event: {}", lastProcessedSequenceNumber, event.getSequenceNumber(), event);
  46 + }
  47 + }
  48 +
  49 + protected abstract void onTbApplicationEvent(T event);
  50 +
  51 +
  52 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.kafka;
  17 +
  18 +import lombok.AllArgsConstructor;
  19 +import lombok.Getter;
  20 +import lombok.NoArgsConstructor;
  21 +import org.springframework.beans.factory.annotation.Value;
  22 +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  23 +import org.springframework.stereotype.Component;
  24 +
  25 +@Component
  26 +@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka")
  27 +@Getter
  28 +@AllArgsConstructor
  29 +@NoArgsConstructor
  30 +public class TbKafkaConsumerStatisticConfig {
  31 + @Value("${queue.kafka.consumer-stats.enabled:true}")
  32 + private Boolean enabled;
  33 + @Value("${queue.kafka.consumer-stats.print-interval-ms:60000}")
  34 + private Long printIntervalMs;
  35 + @Value("${queue.kafka.consumer-stats.kafka-response-timeout-ms:1000}")
  36 + private Long kafkaResponseTimeoutMs;
  37 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.kafka;
  17 +
  18 +import lombok.Builder;
  19 +import lombok.Data;
  20 +import lombok.RequiredArgsConstructor;
  21 +import lombok.extern.slf4j.Slf4j;
  22 +import org.apache.kafka.clients.admin.AdminClient;
  23 +import org.apache.kafka.clients.consumer.Consumer;
  24 +import org.apache.kafka.clients.consumer.ConsumerConfig;
  25 +import org.apache.kafka.clients.consumer.KafkaConsumer;
  26 +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
  27 +import org.apache.kafka.common.TopicPartition;
  28 +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  29 +import org.springframework.stereotype.Component;
  30 +import org.springframework.util.StringUtils;
  31 +import org.thingsboard.common.util.ThingsBoardThreadFactory;
  32 +import org.thingsboard.server.common.data.id.TenantId;
  33 +import org.thingsboard.server.common.msg.queue.ServiceType;
  34 +import org.thingsboard.server.queue.discovery.PartitionService;
  35 +
  36 +import javax.annotation.PostConstruct;
  37 +import javax.annotation.PreDestroy;
  38 +import java.time.Duration;
  39 +import java.util.ArrayList;
  40 +import java.util.List;
  41 +import java.util.Map;
  42 +import java.util.Properties;
  43 +import java.util.Set;
  44 +import java.util.concurrent.ConcurrentHashMap;
  45 +import java.util.concurrent.Executors;
  46 +import java.util.concurrent.ScheduledExecutorService;
  47 +import java.util.concurrent.TimeUnit;
  48 +
  49 +@Slf4j
  50 +@Component
  51 +@RequiredArgsConstructor
  52 +@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka")
  53 +public class TbKafkaConsumerStatsService {
  54 + private final Set<String> monitoredGroups = ConcurrentHashMap.newKeySet();
  55 +
  56 + private final TbKafkaSettings kafkaSettings;
  57 + private final TbKafkaConsumerStatisticConfig statsConfig;
  58 + private final PartitionService partitionService;
  59 +
  60 + private AdminClient adminClient;
  61 + private Consumer<String, byte[]> consumer;
  62 + private ScheduledExecutorService statsPrintScheduler;
  63 +
  64 + @PostConstruct
  65 + public void init() {
  66 + if (!statsConfig.getEnabled()) {
  67 + return;
  68 + }
  69 + this.adminClient = AdminClient.create(kafkaSettings.toAdminProps());
  70 + this.statsPrintScheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("kafka-consumer-stats"));
  71 +
  72 + Properties consumerProps = kafkaSettings.toConsumerProps();
  73 + consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-stats-loader-client");
  74 + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-stats-loader-client-group");
  75 + this.consumer = new KafkaConsumer<>(consumerProps);
  76 +
  77 + startLogScheduling();
  78 + }
  79 +
  80 + private void startLogScheduling() {
  81 + Duration timeoutDuration = Duration.ofMillis(statsConfig.getKafkaResponseTimeoutMs());
  82 + statsPrintScheduler.scheduleWithFixedDelay(() -> {
  83 + if (!isStatsPrintRequired()) {
  84 + return;
  85 + }
  86 + for (String groupId : monitoredGroups) {
  87 + try {
  88 + Map<TopicPartition, OffsetAndMetadata> groupOffsets = adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata()
  89 + .get(statsConfig.getKafkaResponseTimeoutMs(), TimeUnit.MILLISECONDS);
  90 + Map<TopicPartition, Long> endOffsets = consumer.endOffsets(groupOffsets.keySet(), timeoutDuration);
  91 +
  92 + List<GroupTopicStats> lagTopicsStats = getTopicsStatsWithLag(groupOffsets, endOffsets);
  93 + if (!lagTopicsStats.isEmpty()) {
  94 + StringBuilder builder = new StringBuilder();
  95 + for (int i = 0; i < lagTopicsStats.size(); i++) {
  96 + builder.append(lagTopicsStats.get(i).toString());
  97 + if (i != lagTopicsStats.size() - 1) {
  98 + builder.append(", ");
  99 + }
  100 + }
  101 + log.info("[{}] Topic partitions with lag: [{}].", groupId, builder.toString());
  102 + }
  103 + } catch (Exception e) {
  104 + log.warn("[{}] Failed to get consumer group stats. Reason - {}.", groupId, e.getMessage());
  105 + log.trace("Detailed error: ", e);
  106 + }
  107 + }
  108 +
  109 + }, statsConfig.getPrintIntervalMs(), statsConfig.getPrintIntervalMs(), TimeUnit.MILLISECONDS);
  110 + }
  111 +
  112 + private boolean isStatsPrintRequired() {
  113 + boolean isMyRuleEnginePartition = partitionService.resolve(ServiceType.TB_RULE_ENGINE, TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID).isMyPartition();
  114 + boolean isMyCorePartition = partitionService.resolve(ServiceType.TB_CORE, TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID).isMyPartition();
  115 + return log.isInfoEnabled() && (isMyRuleEnginePartition || isMyCorePartition);
  116 + }
  117 +
  118 + private List<GroupTopicStats> getTopicsStatsWithLag(Map<TopicPartition, OffsetAndMetadata> groupOffsets, Map<TopicPartition, Long> endOffsets) {
  119 + List<GroupTopicStats> consumerGroupStats = new ArrayList<>();
  120 + for (TopicPartition topicPartition : groupOffsets.keySet()) {
  121 + long endOffset = endOffsets.get(topicPartition);
  122 + long committedOffset = groupOffsets.get(topicPartition).offset();
  123 + long lag = endOffset - committedOffset;
  124 + if (lag != 0) {
  125 + GroupTopicStats groupTopicStats = GroupTopicStats.builder()
  126 + .topic(topicPartition.topic())
  127 + .partition(topicPartition.partition())
  128 + .committedOffset(committedOffset)
  129 + .endOffset(endOffset)
  130 + .lag(lag)
  131 + .build();
  132 + consumerGroupStats.add(groupTopicStats);
  133 + }
  134 + }
  135 + return consumerGroupStats;
  136 + }
  137 +
  138 + public void registerClientGroup(String groupId) {
  139 + if (statsConfig.getEnabled() && !StringUtils.isEmpty(groupId)) {
  140 + monitoredGroups.add(groupId);
  141 + }
  142 + }
  143 +
  144 + public void unregisterClientGroup(String groupId) {
  145 + if (statsConfig.getEnabled() && !StringUtils.isEmpty(groupId)) {
  146 + monitoredGroups.remove(groupId);
  147 + }
  148 + }
  149 +
  150 + @PreDestroy
  151 + public void destroy() {
  152 + if (statsPrintScheduler != null) {
  153 + statsPrintScheduler.shutdownNow();
  154 + }
  155 + if (adminClient != null) {
  156 + adminClient.close();
  157 + }
  158 + if (consumer != null) {
  159 + consumer.close();
  160 + }
  161 + }
  162 +
  163 +
  164 + @Builder
  165 + @Data
  166 + private static class GroupTopicStats {
  167 + private String topic;
  168 + private int partition;
  169 + private long committedOffset;
  170 + private long endOffset;
  171 + private long lag;
  172 +
  173 + @Override
  174 + public String toString() {
  175 + return "[" +
  176 + "topic=[" + topic + ']' +
  177 + ", partition=[" + partition + "]" +
  178 + ", committedOffset=[" + committedOffset + "]" +
  179 + ", endOffset=[" + endOffset + "]" +
  180 + ", lag=[" + lag + "]" +
  181 + "]";
  182 + }
  183 + }
  184 +}
... ...
... ... @@ -42,10 +42,13 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
42 42 private final KafkaConsumer<String, byte[]> consumer;
43 43 private final TbKafkaDecoder<T> decoder;
44 44
  45 + private final TbKafkaConsumerStatsService statsService;
  46 + private final String groupId;
  47 +
45 48 @Builder
46 49 private TbKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder,
47 50 String clientId, String groupId, String topic,
48   - TbQueueAdmin admin) {
  51 + TbQueueAdmin admin, TbKafkaConsumerStatsService statsService) {
49 52 super(topic);
50 53 Properties props = settings.toConsumerProps();
51 54 props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
... ... @@ -53,6 +56,13 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
53 56 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
54 57 }
55 58
  59 + this.statsService = statsService;
  60 + this.groupId = groupId;
  61 +
  62 + if (statsService != null) {
  63 + statsService.registerClientGroup(groupId);
  64 + }
  65 +
56 66 this.admin = admin;
57 67 this.consumer = new KafkaConsumer<>(props);
58 68 this.decoder = decoder;
... ... @@ -96,6 +106,8 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
96 106 consumer.unsubscribe();
97 107 consumer.close();
98 108 }
  109 + if (statsService != null) {
  110 + statsService.unregisterClientGroup(groupId);
  111 + }
99 112 }
100   -
101 113 }
... ...
... ... @@ -39,6 +39,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
39 39 import org.thingsboard.server.queue.discovery.PartitionService;
40 40 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
41 41 import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
  42 +import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService;
42 43 import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
43 44 import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
44 45 import org.thingsboard.server.queue.kafka.TbKafkaSettings;
... ... @@ -65,6 +66,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
65 66 private final TbQueueTransportApiSettings transportApiSettings;
66 67 private final TbQueueTransportNotificationSettings transportNotificationSettings;
67 68 private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
  69 + private final TbKafkaConsumerStatsService consumerStatsService;
68 70
69 71 private final TbQueueAdmin coreAdmin;
70 72 private final TbQueueAdmin ruleEngineAdmin;
... ... @@ -79,6 +81,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
79 81 TbQueueTransportApiSettings transportApiSettings,
80 82 TbQueueTransportNotificationSettings transportNotificationSettings,
81 83 TbQueueRemoteJsInvokeSettings jsInvokeSettings,
  84 + TbKafkaConsumerStatsService consumerStatsService,
82 85 TbKafkaTopicConfigs kafkaTopicConfigs) {
83 86 this.partitionService = partitionService;
84 87 this.kafkaSettings = kafkaSettings;
... ... @@ -88,6 +91,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
88 91 this.transportApiSettings = transportApiSettings;
89 92 this.transportNotificationSettings = transportNotificationSettings;
90 93 this.jsInvokeSettings = jsInvokeSettings;
  94 + this.consumerStatsService = consumerStatsService;
91 95
92 96 this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
93 97 this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
... ... @@ -156,6 +160,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
156 160 consumerBuilder.groupId("re-" + queueName + "-consumer");
157 161 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
158 162 consumerBuilder.admin(ruleEngineAdmin);
  163 + consumerBuilder.statsService(consumerStatsService);
159 164 return consumerBuilder.build();
160 165 }
161 166
... ... @@ -168,6 +173,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
168 173 consumerBuilder.groupId("monolith-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId());
169 174 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
170 175 consumerBuilder.admin(notificationAdmin);
  176 + consumerBuilder.statsService(consumerStatsService);
171 177 return consumerBuilder.build();
172 178 }
173 179
... ... @@ -180,6 +186,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
180 186 consumerBuilder.groupId("monolith-core-consumer");
181 187 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
182 188 consumerBuilder.admin(coreAdmin);
  189 + consumerBuilder.statsService(consumerStatsService);
183 190 return consumerBuilder.build();
184 191 }
185 192
... ... @@ -192,6 +199,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
192 199 consumerBuilder.groupId("monolith-core-notifications-consumer-" + serviceInfoProvider.getServiceId());
193 200 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
194 201 consumerBuilder.admin(notificationAdmin);
  202 + consumerBuilder.statsService(consumerStatsService);
195 203 return consumerBuilder.build();
196 204 }
197 205
... ... @@ -204,6 +212,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
204 212 consumerBuilder.groupId("monolith-transport-api-consumer");
205 213 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
206 214 consumerBuilder.admin(transportApiAdmin);
  215 + consumerBuilder.statsService(consumerStatsService);
207 216 return consumerBuilder.build();
208 217 }
209 218
... ... @@ -237,6 +246,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
237 246 return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders());
238 247 }
239 248 );
  249 + responseBuilder.statsService(consumerStatsService);
240 250 responseBuilder.admin(jsExecutorAdmin);
241 251
242 252 DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
... ... @@ -259,6 +269,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
259 269 consumerBuilder.groupId("monolith-us-consumer");
260 270 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
261 271 consumerBuilder.admin(coreAdmin);
  272 + consumerBuilder.statsService(consumerStatsService);
262 273 return consumerBuilder.build();
263 274 }
264 275
... ...
... ... @@ -39,6 +39,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
39 39 import org.thingsboard.server.queue.discovery.PartitionService;
40 40 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
41 41 import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
  42 +import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService;
42 43 import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
43 44 import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
44 45 import org.thingsboard.server.queue.kafka.TbKafkaSettings;
... ... @@ -62,6 +63,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
62 63 private final TbQueueRuleEngineSettings ruleEngineSettings;
63 64 private final TbQueueTransportApiSettings transportApiSettings;
64 65 private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
  66 + private final TbKafkaConsumerStatsService consumerStatsService;
65 67
66 68 private final TbQueueAdmin coreAdmin;
67 69 private final TbQueueAdmin ruleEngineAdmin;
... ... @@ -75,6 +77,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
75 77 TbQueueRuleEngineSettings ruleEngineSettings,
76 78 TbQueueTransportApiSettings transportApiSettings,
77 79 TbQueueRemoteJsInvokeSettings jsInvokeSettings,
  80 + TbKafkaConsumerStatsService consumerStatsService,
78 81 TbKafkaTopicConfigs kafkaTopicConfigs) {
79 82 this.partitionService = partitionService;
80 83 this.kafkaSettings = kafkaSettings;
... ... @@ -83,6 +86,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
83 86 this.ruleEngineSettings = ruleEngineSettings;
84 87 this.transportApiSettings = transportApiSettings;
85 88 this.jsInvokeSettings = jsInvokeSettings;
  89 + this.consumerStatsService = consumerStatsService;
86 90
87 91 this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
88 92 this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
... ... @@ -150,6 +154,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
150 154 consumerBuilder.groupId("tb-core-node");
151 155 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
152 156 consumerBuilder.admin(coreAdmin);
  157 + consumerBuilder.statsService(consumerStatsService);
153 158 return consumerBuilder.build();
154 159 }
155 160
... ... @@ -162,6 +167,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
162 167 consumerBuilder.groupId("tb-core-notifications-node-" + serviceInfoProvider.getServiceId());
163 168 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
164 169 consumerBuilder.admin(notificationAdmin);
  170 + consumerBuilder.statsService(consumerStatsService);
165 171 return consumerBuilder.build();
166 172 }
167 173
... ... @@ -174,6 +180,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
174 180 consumerBuilder.groupId("tb-core-transport-api-consumer");
175 181 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
176 182 consumerBuilder.admin(transportApiAdmin);
  183 + consumerBuilder.statsService(consumerStatsService);
177 184 return consumerBuilder.build();
178 185 }
179 186
... ... @@ -208,6 +215,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
208 215 }
209 216 );
210 217 responseBuilder.admin(jsExecutorAdmin);
  218 + responseBuilder.statsService(consumerStatsService);
211 219
212 220 DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
213 221 <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
... ... @@ -229,6 +237,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
229 237 consumerBuilder.groupId("tb-core-us-consumer");
230 238 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
231 239 consumerBuilder.admin(coreAdmin);
  240 + consumerBuilder.statsService(consumerStatsService);
232 241 return consumerBuilder.build();
233 242 }
234 243
... ...
... ... @@ -37,6 +37,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
37 37 import org.thingsboard.server.queue.discovery.PartitionService;
38 38 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
39 39 import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
  40 +import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService;
40 41 import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
41 42 import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
42 43 import org.thingsboard.server.queue.kafka.TbKafkaSettings;
... ... @@ -59,6 +60,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
59 60 private final TbQueueCoreSettings coreSettings;
60 61 private final TbQueueRuleEngineSettings ruleEngineSettings;
61 62 private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
  63 + private final TbKafkaConsumerStatsService consumerStatsService;
62 64
63 65 private final TbQueueAdmin coreAdmin;
64 66 private final TbQueueAdmin ruleEngineAdmin;
... ... @@ -70,6 +72,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
70 72 TbQueueCoreSettings coreSettings,
71 73 TbQueueRuleEngineSettings ruleEngineSettings,
72 74 TbQueueRemoteJsInvokeSettings jsInvokeSettings,
  75 + TbKafkaConsumerStatsService consumerStatsService,
73 76 TbKafkaTopicConfigs kafkaTopicConfigs) {
74 77 this.partitionService = partitionService;
75 78 this.kafkaSettings = kafkaSettings;
... ... @@ -77,6 +80,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
77 80 this.coreSettings = coreSettings;
78 81 this.ruleEngineSettings = ruleEngineSettings;
79 82 this.jsInvokeSettings = jsInvokeSettings;
  83 + this.consumerStatsService = consumerStatsService;
80 84
81 85 this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
82 86 this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
... ... @@ -145,6 +149,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
145 149 consumerBuilder.groupId("re-" + queueName + "-consumer");
146 150 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
147 151 consumerBuilder.admin(ruleEngineAdmin);
  152 + consumerBuilder.statsService(consumerStatsService);
148 153 return consumerBuilder.build();
149 154 }
150 155
... ... @@ -157,6 +162,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
157 162 consumerBuilder.groupId("tb-rule-engine-notifications-node-" + serviceInfoProvider.getServiceId());
158 163 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
159 164 consumerBuilder.admin(notificationAdmin);
  165 + consumerBuilder.statsService(consumerStatsService);
160 166 return consumerBuilder.build();
161 167 }
162 168
... ... @@ -181,6 +187,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
181 187 }
182 188 );
183 189 responseBuilder.admin(jsExecutorAdmin);
  190 + responseBuilder.statsService(consumerStatsService);
184 191
185 192 DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
186 193 <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
... ...
... ... @@ -32,6 +32,7 @@ import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
32 32 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
33 33 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
34 34 import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
  35 +import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService;
35 36 import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
36 37 import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
37 38 import org.thingsboard.server.queue.kafka.TbKafkaSettings;
... ... @@ -54,6 +55,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
54 55 private final TbQueueRuleEngineSettings ruleEngineSettings;
55 56 private final TbQueueTransportApiSettings transportApiSettings;
56 57 private final TbQueueTransportNotificationSettings transportNotificationSettings;
  58 + private final TbKafkaConsumerStatsService consumerStatsService;
57 59
58 60 private final TbQueueAdmin coreAdmin;
59 61 private final TbQueueAdmin ruleEngineAdmin;
... ... @@ -66,6 +68,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
66 68 TbQueueRuleEngineSettings ruleEngineSettings,
67 69 TbQueueTransportApiSettings transportApiSettings,
68 70 TbQueueTransportNotificationSettings transportNotificationSettings,
  71 + TbKafkaConsumerStatsService consumerStatsService,
69 72 TbKafkaTopicConfigs kafkaTopicConfigs) {
70 73 this.kafkaSettings = kafkaSettings;
71 74 this.serviceInfoProvider = serviceInfoProvider;
... ... @@ -73,6 +76,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
73 76 this.ruleEngineSettings = ruleEngineSettings;
74 77 this.transportApiSettings = transportApiSettings;
75 78 this.transportNotificationSettings = transportNotificationSettings;
  79 + this.consumerStatsService = consumerStatsService;
76 80
77 81 this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
78 82 this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
... ... @@ -95,6 +99,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
95 99 responseBuilder.groupId("transport-node-" + serviceInfoProvider.getServiceId());
96 100 responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
97 101 responseBuilder.admin(transportApiAdmin);
  102 + responseBuilder.statsService(consumerStatsService);
98 103
99 104 DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
100 105 <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder();
... ... @@ -136,6 +141,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
136 141 responseBuilder.groupId("transport-node-" + serviceInfoProvider.getServiceId());
137 142 responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders()));
138 143 responseBuilder.admin(notificationAdmin);
  144 + responseBuilder.statsService(consumerStatsService);
139 145 return responseBuilder.build();
140 146 }
141 147
... ...
... ... @@ -51,6 +51,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateBasicMqttCre
51 51 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
52 52 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
53 53
  54 +import java.math.BigDecimal;
54 55 import java.util.ArrayList;
55 56 import java.util.HashMap;
56 57 import java.util.HashSet;
... ... @@ -244,12 +245,25 @@ public class JsonConverter {
244 245 }
245 246
246 247 private static void parseNumericValue(List<KvEntry> result, Entry<String, JsonElement> valueEntry, JsonPrimitive value) {
247   - if (value.getAsString().contains(".")) {
248   - result.add(new DoubleDataEntry(valueEntry.getKey(), value.getAsDouble()));
  248 + String valueAsString = value.getAsString();
  249 + String key = valueEntry.getKey();
  250 + if (valueAsString.contains("e") || valueAsString.contains("E")) {
  251 + var bd = new BigDecimal(valueAsString);
  252 + if (bd.stripTrailingZeros().scale() <= 0) {
  253 + try {
  254 + result.add(new LongDataEntry(key, bd.longValueExact()));
  255 + } catch (ArithmeticException e) {
  256 + result.add(new DoubleDataEntry(key, bd.doubleValue()));
  257 + }
  258 + } else {
  259 + result.add(new DoubleDataEntry(key, bd.doubleValue()));
  260 + }
  261 + } else if (valueAsString.contains(".")) {
  262 + result.add(new DoubleDataEntry(key, value.getAsDouble()));
249 263 } else {
250 264 try {
251 265 long longValue = Long.parseLong(value.getAsString());
252   - result.add(new LongDataEntry(valueEntry.getKey(), longValue));
  266 + result.add(new LongDataEntry(key, longValue));
253 267 } catch (NumberFormatException e) {
254 268 throw new JsonSyntaxException("Big integer values are not supported!");
255 269 }
... ... @@ -284,7 +298,8 @@ public class JsonConverter {
284 298 return result;
285 299 }
286 300
287   - public static JsonObject getJsonObjectForGateway(String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg) {
  301 + public static JsonObject getJsonObjectForGateway(String deviceName, TransportProtos.GetAttributeResponseMsg
  302 + responseMsg) {
288 303 JsonObject result = new JsonObject();
289 304 result.addProperty("id", responseMsg.getRequestId());
290 305 result.addProperty(DEVICE_PROPERTY, deviceName);
... ... @@ -297,7 +312,8 @@ public class JsonConverter {
297 312 return result;
298 313 }
299 314
300   - public static JsonObject getJsonObjectForGateway(String deviceName, AttributeUpdateNotificationMsg notificationMsg) {
  315 + public static JsonObject getJsonObjectForGateway(String deviceName, AttributeUpdateNotificationMsg
  316 + notificationMsg) {
301 317 JsonObject result = new JsonObject();
302 318 result.addProperty(DEVICE_PROPERTY, deviceName);
303 319 result.add("data", toJson(notificationMsg));
... ... @@ -447,7 +463,8 @@ public class JsonConverter {
447 463 return result;
448 464 }
449 465
450   - public static JsonElement toGatewayJson(String deviceName, TransportProtos.ProvisionDeviceResponseMsg responseRequest) {
  466 + public static JsonElement toGatewayJson(String deviceName, TransportProtos.ProvisionDeviceResponseMsg
  467 + responseRequest) {
451 468 JsonObject result = new JsonObject();
452 469 result.addProperty(DEVICE_PROPERTY, deviceName);
453 470 result.add("data", JsonConverter.toJson(responseRequest));
... ... @@ -497,15 +514,18 @@ public class JsonConverter {
497 514 return result;
498 515 }
499 516
500   - public static Map<Long, List<KvEntry>> convertToTelemetry(JsonElement jsonElement, long systemTs) throws JsonSyntaxException {
  517 + public static Map<Long, List<KvEntry>> convertToTelemetry(JsonElement jsonElement, long systemTs) throws
  518 + JsonSyntaxException {
501 519 return convertToTelemetry(jsonElement, systemTs, false);
502 520 }
503 521
504   - public static Map<Long, List<KvEntry>> convertToSortedTelemetry(JsonElement jsonElement, long systemTs) throws JsonSyntaxException {
  522 + public static Map<Long, List<KvEntry>> convertToSortedTelemetry(JsonElement jsonElement, long systemTs) throws
  523 + JsonSyntaxException {
505 524 return convertToTelemetry(jsonElement, systemTs, true);
506 525 }
507 526
508   - public static Map<Long, List<KvEntry>> convertToTelemetry(JsonElement jsonElement, long systemTs, boolean sorted) throws JsonSyntaxException {
  527 + public static Map<Long, List<KvEntry>> convertToTelemetry(JsonElement jsonElement, long systemTs, boolean sorted) throws
  528 + JsonSyntaxException {
509 529 Map<Long, List<KvEntry>> result = sorted ? new TreeMap<>() : new HashMap<>();
510 530 convertToTelemetry(jsonElement, systemTs, result, null);
511 531 return result;
... ... @@ -575,7 +595,8 @@ public class JsonConverter {
575 595 .build();
576 596 }
577 597
578   - private static TransportProtos.ProvisionDeviceCredentialsMsg buildProvisionDeviceCredentialsMsg(String provisionKey, String provisionSecret) {
  598 + private static TransportProtos.ProvisionDeviceCredentialsMsg buildProvisionDeviceCredentialsMsg(String
  599 + provisionKey, String provisionSecret) {
579 600 return TransportProtos.ProvisionDeviceCredentialsMsg.newBuilder()
580 601 .setProvisionDeviceKey(provisionKey)
581 602 .setProvisionDeviceSecret(provisionSecret)
... ...
... ... @@ -784,8 +784,8 @@ public class DefaultTransportService implements TransportService {
784 784 wrappedCallback);
785 785 }
786 786
787   - protected void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) {
788   - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tbMsg.getOriginator());
  787 + private void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) {
  788 + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, tbMsg.getOriginator());
789 789 if (log.isTraceEnabled()) {
790 790 log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, tbMsg.getOriginator(), tpi.getFullTopicName(), tbMsg);
791 791 }
... ... @@ -797,7 +797,7 @@ public class DefaultTransportService implements TransportService {
797 797 ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), wrappedCallback);
798 798 }
799 799
800   - protected void sendToRuleEngine(TenantId tenantId, DeviceId deviceId, TransportProtos.SessionInfoProto sessionInfo, JsonObject json,
  800 + private void sendToRuleEngine(TenantId tenantId, DeviceId deviceId, TransportProtos.SessionInfoProto sessionInfo, JsonObject json,
801 801 TbMsgMetaData metaData, SessionMsgType sessionMsgType, TbQueueCallback callback) {
802 802 DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(sessionInfo.getDeviceProfileIdMSB(), sessionInfo.getDeviceProfileIdLSB()));
803 803 DeviceProfile deviceProfile = deviceProfileCache.get(deviceProfileId);
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +import com.google.gson.JsonParser;
  18 +import org.junit.Assert;
  19 +import org.junit.Test;
  20 +import org.junit.runner.RunWith;
  21 +import org.mockito.junit.MockitoJUnitRunner;
  22 +import org.thingsboard.server.common.transport.adaptor.JsonConverter;
  23 +
  24 +@RunWith(MockitoJUnitRunner.class)
  25 +public class JsonConverterTest {
  26 +
  27 + private static final JsonParser JSON_PARSER = new JsonParser();
  28 +
  29 + @Test
  30 + public void testParseBigDecimalAsLong() {
  31 + var result = JsonConverter.convertToTelemetry(JSON_PARSER.parse("{\"meterReadingDelta\": 1E+1}"), 0L);
  32 + Assert.assertEquals(10L, result.get(0L).get(0).getLongValue().get().longValue());
  33 + }
  34 +
  35 + @Test
  36 + public void testParseBigDecimalAsDouble() {
  37 + var result = JsonConverter.convertToTelemetry(JSON_PARSER.parse("{\"meterReadingDelta\": 101E-1}"), 0L);
  38 + Assert.assertEquals(10.1, result.get(0L).get(0).getDoubleValue().get(), 0.0);
  39 + }
  40 +
  41 + @Test
  42 + public void testParseAsDouble() {
  43 + var result = JsonConverter.convertToTelemetry(JSON_PARSER.parse("{\"meterReadingDelta\": 1.1}"), 0L);
  44 + Assert.assertEquals(1.1, result.get(0L).get(0).getDoubleValue().get(), 0.0);
  45 + }
  46 +
  47 + @Test
  48 + public void testParseAsLong() {
  49 + var result = JsonConverter.convertToTelemetry(JSON_PARSER.parse("{\"meterReadingDelta\": 11}"), 0L);
  50 + Assert.assertEquals(11L, result.get(0L).get(0).getLongValue().get().longValue());
  51 + }
  52 +
  53 +}
... ...
... ... @@ -76,7 +76,7 @@ public class TbCopyAttributesToEntityViewNode implements TbNode {
76 76 if (!msg.getMetaData().getData().isEmpty()) {
77 77 long now = System.currentTimeMillis();
78 78 String scope = msg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()) ?
79   - DataConstants.CLIENT_SCOPE : msg.getMetaData().getValue("scope");
  79 + DataConstants.CLIENT_SCOPE : msg.getMetaData().getValue(DataConstants.SCOPE);
80 80
81 81 ListenableFuture<List<EntityView>> entityViewsFuture =
82 82 ctx.getEntityViewService().findEntityViewsByTenantIdAndEntityIdAsync(ctx.getTenantId(), msg.getOriginator());
... ...
... ... @@ -104,9 +104,10 @@ public class TbMsgGeneratorNode implements TbNode {
104 104 }
105 105 },
106 106 t -> {
107   - if (initialized) {
  107 + if (initialized && (config.getMsgCount() == TbMsgGeneratorNodeConfiguration.UNLIMITED_MSG_COUNT || currentMsgCount < config.getMsgCount())) {
108 108 ctx.tellFailure(msg, t);
109 109 scheduleTickMsg(ctx);
  110 + currentMsgCount++;
110 111 }
111 112 });
112 113 }
... ...
... ... @@ -33,6 +33,7 @@ import org.thingsboard.server.common.data.id.EntityId;
33 33 import org.thingsboard.server.common.data.kv.TsKvEntry;
34 34 import org.thingsboard.server.common.data.plugin.ComponentType;
35 35 import org.thingsboard.server.common.msg.TbMsg;
  36 +import org.thingsboard.server.common.msg.session.SessionMsgType;
36 37 import org.thingsboard.server.dao.timeseries.TimeseriesService;
37 38 import org.thingsboard.server.dao.util.mapping.JacksonUtil;
38 39
... ... @@ -44,11 +45,13 @@ import java.util.concurrent.ConcurrentHashMap;
44 45
45 46 @Slf4j
46 47 @RuleNode(type = ComponentType.ENRICHMENT,
47   - name = "calculate delta",
  48 + name = "calculate delta", relationTypes = {"Success", "Failure", "Other"},
48 49 configClazz = CalculateDeltaNodeConfiguration.class,
49 50 nodeDescription = "Calculates and adds 'delta' value into message based on the incoming and previous value",
50 51 nodeDetails = "Calculates delta and period based on the previous time-series reading and current data. " +
51   - "Delta calculation is done in scope of the message originator, e.g. device, asset or customer.",
  52 + "Delta calculation is done in scope of the message originator, e.g. device, asset or customer. " +
  53 + "If there is input key, the output relation will be 'Success' unless delta is negative and corresponding configuration parameter is set. " +
  54 + "If there is no input value key in the incoming message, the output relation will be 'Other'.",
52 55 uiResources = {"static/rulenode/rulenode-core-config.js"},
53 56 configDirective = "tbEnrichmentNodeCalculateDeltaConfig")
54 57 public class CalculateDeltaNode implements TbNode {
... ... @@ -72,43 +75,50 @@ public class CalculateDeltaNode implements TbNode {
72 75
73 76 @Override
74 77 public void onMsg(TbContext ctx, TbMsg msg) {
75   - JsonNode json = JacksonUtil.toJsonNode(msg.getData());
76   - String inputKey = config.getInputValueKey();
77   - if (json.has(inputKey)) {
78   - DonAsynchron.withCallback(getLastValue(msg.getOriginator()),
79   - previousData -> {
80   - double currentValue = json.get(inputKey).asDouble();
81   - long currentTs = TbMsgTimeseriesNode.getTs(msg);
82   -
83   - if (useCache) {
84   - cache.put(msg.getOriginator(), new ValueWithTs(currentTs, currentValue));
85   - }
86   -
87   - BigDecimal delta = BigDecimal.valueOf(previousData != null ? currentValue - previousData.value : 0.0);
88   -
89   - if (config.isTellFailureIfDeltaIsNegative() && delta.doubleValue() < 0) {
90   - ctx.tellNext(msg, TbRelationTypes.FAILURE);
91   - return;
92   - }
93   -
94   - if (config.getRound() != null) {
95   - delta = delta.setScale(config.getRound(), RoundingMode.HALF_UP);
96   - }
97   -
98   - ObjectNode result = (ObjectNode) json;
99   - result.put(config.getOutputValueKey(), delta);
100   -
101   - if (config.isAddPeriodBetweenMsgs()) {
102   - long period = previousData != null ? currentTs - previousData.ts : 0;
103   - result.put(config.getPeriodValueKey(), period);
104   - }
105   - ctx.tellSuccess(TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), JacksonUtil.toString(result)));
106   - },
107   - t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
108   - } else if (config.isTellFailureIfInputValueKeyIsAbsent()) {
109   - ctx.tellNext(msg, TbRelationTypes.FAILURE);
  78 + if (msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) {
  79 + JsonNode json = JacksonUtil.toJsonNode(msg.getData());
  80 + String inputKey = config.getInputValueKey();
  81 + if (json.has(inputKey)) {
  82 + DonAsynchron.withCallback(getLastValue(msg.getOriginator()),
  83 + previousData -> {
  84 + double currentValue = json.get(inputKey).asDouble();
  85 + long currentTs = TbMsgTimeseriesNode.getTs(msg);
  86 +
  87 + if (useCache) {
  88 + cache.put(msg.getOriginator(), new ValueWithTs(currentTs, currentValue));
  89 + }
  90 +
  91 + BigDecimal delta = BigDecimal.valueOf(previousData != null ? currentValue - previousData.value : 0.0);
  92 +
  93 + if (config.isTellFailureIfDeltaIsNegative() && delta.doubleValue() < 0) {
  94 + ctx.tellNext(msg, TbRelationTypes.FAILURE);
  95 + return;
  96 + }
  97 +
  98 +
  99 + if (config.getRound() != null) {
  100 + delta = delta.setScale(config.getRound(), RoundingMode.HALF_UP);
  101 + }
  102 +
  103 + ObjectNode result = (ObjectNode) json;
  104 + if (delta.stripTrailingZeros().scale() > 0) {
  105 + result.put(config.getOutputValueKey(), delta.doubleValue());
  106 + } else {
  107 + result.put(config.getOutputValueKey(), delta.longValueExact());
  108 + }
  109 +
  110 + if (config.isAddPeriodBetweenMsgs()) {
  111 + long period = previousData != null ? currentTs - previousData.ts : 0;
  112 + result.put(config.getPeriodValueKey(), period);
  113 + }
  114 + ctx.tellSuccess(TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), JacksonUtil.toString(result)));
  115 + },
  116 + t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
  117 + } else {
  118 + ctx.tellNext(msg, "Other");
  119 + }
110 120 } else {
111   - ctx.tellSuccess(msg);
  121 + ctx.tellNext(msg, "Other");
112 122 }
113 123 }
114 124
... ...
... ... @@ -15,10 +15,12 @@
15 15 */
16 16 package org.thingsboard.rule.engine.metadata;
17 17
  18 +import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
18 19 import lombok.Data;
19 20 import org.thingsboard.rule.engine.api.NodeConfiguration;
20 21
21 22 @Data
  23 +@JsonIgnoreProperties(ignoreUnknown = true)
22 24 public class CalculateDeltaNodeConfiguration implements NodeConfiguration<CalculateDeltaNodeConfiguration> {
23 25 private String inputValueKey;
24 26 private String outputValueKey;
... ... @@ -26,7 +28,6 @@ public class CalculateDeltaNodeConfiguration implements NodeConfiguration<Calcul
26 28 private boolean addPeriodBetweenMsgs;
27 29 private String periodValueKey;
28 30 private Integer round;
29   - private boolean tellFailureIfInputValueKeyIsAbsent;
30 31 private boolean tellFailureIfDeltaIsNegative;
31 32
32 33 @Override
... ... @@ -37,7 +38,6 @@ public class CalculateDeltaNodeConfiguration implements NodeConfiguration<Calcul
37 38 configuration.setUseCache(true);
38 39 configuration.setAddPeriodBetweenMsgs(false);
39 40 configuration.setPeriodValueKey("periodInMs");
40   - configuration.setTellFailureIfInputValueKeyIsAbsent(true);
41 41 configuration.setTellFailureIfDeltaIsNegative(true);
42 42 return configuration;
43 43 }
... ...
... ... @@ -388,12 +388,6 @@ class AlarmRuleState {
388 388 EntityKeyValue ekv = null;
389 389 if (value.getDynamicValue() != null) {
390 390 switch (value.getDynamicValue().getSourceType()) {
391   - case CURRENT_TENANT:
392   - ekv = dynamicPredicateValueCtx.getTenantValue(value.getDynamicValue().getSourceAttribute());
393   - break;
394   - case CURRENT_CUSTOMER:
395   - ekv = dynamicPredicateValueCtx.getCustomerValue(value.getDynamicValue().getSourceAttribute());
396   - break;
397 391 case CURRENT_DEVICE:
398 392 ekv = data.getValue(new EntityKey(EntityKeyType.ATTRIBUTE, value.getDynamicValue().getSourceAttribute()));
399 393 if (ekv == null) {
... ... @@ -405,6 +399,16 @@ class AlarmRuleState {
405 399 }
406 400 }
407 401 }
  402 + if(ekv != null || !value.getDynamicValue().isInherit()) {
  403 + break;
  404 + }
  405 + case CURRENT_CUSTOMER:
  406 + ekv = dynamicPredicateValueCtx.getCustomerValue(value.getDynamicValue().getSourceAttribute());
  407 + if(ekv != null || !value.getDynamicValue().isInherit()) {
  408 + break;
  409 + }
  410 + case CURRENT_TENANT:
  411 + ekv = dynamicPredicateValueCtx.getTenantValue(value.getDynamicValue().getSourceAttribute());
408 412 }
409 413 }
410 414 return ekv;
... ...
... ... @@ -138,6 +138,8 @@ class DeviceState {
138 138 stateChanged = processTelemetry(ctx, msg);
139 139 } else if (msg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name())) {
140 140 stateChanged = processAttributesUpdateRequest(ctx, msg);
  141 + } else if (msg.getType().equals(DataConstants.ACTIVITY_EVENT) || msg.getType().equals(DataConstants.INACTIVITY_EVENT)) {
  142 + stateChanged = processDeviceActivityEvent(ctx, msg);
141 143 } else if (msg.getType().equals(DataConstants.ATTRIBUTES_UPDATED)) {
142 144 stateChanged = processAttributesUpdateNotification(ctx, msg);
143 145 } else if (msg.getType().equals(DataConstants.ATTRIBUTES_DELETED)) {
... ... @@ -158,6 +160,15 @@ class DeviceState {
158 160 }
159 161 }
160 162
  163 + private boolean processDeviceActivityEvent(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
  164 + String scope = msg.getMetaData().getValue(DataConstants.SCOPE);
  165 + if (StringUtils.isEmpty(scope)) {
  166 + return processTelemetry(ctx, msg);
  167 + } else {
  168 + return processAttributes(ctx, msg, scope);
  169 + }
  170 + }
  171 +
161 172 private boolean processAlarmClearNotification(TbContext ctx, TbMsg msg) {
162 173 boolean stateChanged = false;
163 174 Alarm alarmNf = JacksonUtil.fromString(msg.getData(), Alarm.class);
... ... @@ -181,19 +192,18 @@ class DeviceState {
181 192 }
182 193
183 194 private boolean processAttributesUpdateNotification(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
184   - Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(msg.getData()));
185   - String scope = msg.getMetaData().getValue("scope");
  195 + String scope = msg.getMetaData().getValue(DataConstants.SCOPE);
186 196 if (StringUtils.isEmpty(scope)) {
187 197 scope = DataConstants.CLIENT_SCOPE;
188 198 }
189   - return processAttributesUpdate(ctx, msg, attributes, scope);
  199 + return processAttributes(ctx, msg, scope);
190 200 }
191 201
192 202 private boolean processAttributesDeleteNotification(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
193 203 boolean stateChanged = false;
194 204 List<String> keys = new ArrayList<>();
195 205 new JsonParser().parse(msg.getData()).getAsJsonObject().get("attributes").getAsJsonArray().forEach(e -> keys.add(e.getAsString()));
196   - String scope = msg.getMetaData().getValue("scope");
  206 + String scope = msg.getMetaData().getValue(DataConstants.SCOPE);
197 207 if (StringUtils.isEmpty(scope)) {
198 208 scope = DataConstants.CLIENT_SCOPE;
199 209 }
... ... @@ -211,12 +221,12 @@ class DeviceState {
211 221 }
212 222
213 223 protected boolean processAttributesUpdateRequest(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
214   - Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(msg.getData()));
215   - return processAttributesUpdate(ctx, msg, attributes, DataConstants.CLIENT_SCOPE);
  224 + return processAttributes(ctx, msg, DataConstants.CLIENT_SCOPE);
216 225 }
217 226
218   - private boolean processAttributesUpdate(TbContext ctx, TbMsg msg, Set<AttributeKvEntry> attributes, String scope) throws ExecutionException, InterruptedException {
  227 + private boolean processAttributes(TbContext ctx, TbMsg msg, String scope) throws ExecutionException, InterruptedException {
219 228 boolean stateChanged = false;
  229 + Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(msg.getData()));
220 230 if (!attributes.isEmpty()) {
221 231 SnapshotUpdate update = merge(latestValues, attributes, scope);
222 232 for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
... ...
... ... @@ -134,7 +134,8 @@ public class TbDeviceProfileNode implements TbNode {
134 134 if (deviceState != null) {
135 135 deviceState.process(ctx, msg);
136 136 } else {
137   - ctx.tellFailure(msg, new IllegalStateException("Device profile for device [" + deviceId + "] not found!"));
  137 + log.info("Device was not found! Most probably device [" + deviceId + "] has been removed from the database. Acknowledging msg.");
  138 + ctx.ack(msg);
138 139 }
139 140 }
140 141 } else {
... ...
... ... @@ -434,11 +434,152 @@ public class TbDeviceProfileNodeTest {
434 434 verify(ctx, Mockito.never()).tellFailure(Mockito.any(), Mockito.any());
435 435 }
436 436
437   - private void init() throws TbNodeException {
  437 + @Test
  438 + public void testTenantInheritModeForDynamicValues() throws Exception {
  439 + init();
  440 +
  441 + DeviceProfile deviceProfile = new DeviceProfile();
  442 + DeviceProfileData deviceProfileData = new DeviceProfileData();
  443 +
  444 + AttributeKvCompositeKey compositeKey = new AttributeKvCompositeKey(
  445 + EntityType.TENANT, deviceId.getId(), "SERVER_SCOPE", "tenantAttribute"
  446 + );
  447 +
  448 + AttributeKvEntity attributeKvEntity = new AttributeKvEntity();
  449 + attributeKvEntity.setId(compositeKey);
  450 + attributeKvEntity.setLongValue(100L);
  451 + attributeKvEntity.setLastUpdateTs(0L);
  452 +
  453 + AttributeKvEntry entry = attributeKvEntity.toData();
  454 + ListenableFuture<List<AttributeKvEntry>> listListenableFutureWithLess =
  455 + Futures.immediateFuture(Collections.singletonList(entry));
  456 +
  457 + KeyFilter lowTempFilter = new KeyFilter();
  458 + lowTempFilter.setKey(new EntityKey(EntityKeyType.TIME_SERIES, "temperature"));
  459 + lowTempFilter.setValueType(EntityKeyValueType.NUMERIC);
  460 + NumericFilterPredicate lowTempPredicate = new NumericFilterPredicate();
  461 + lowTempPredicate.setOperation(NumericFilterPredicate.NumericOperation.GREATER);
  462 + lowTempPredicate.setValue(
  463 + new FilterPredicateValue<>(
  464 + 0.0,
  465 + null,
  466 + new DynamicValue<>(DynamicValueSourceType.CURRENT_DEVICE, "tenantAttribute", true))
  467 + );
  468 + lowTempFilter.setPredicate(lowTempPredicate);
  469 + AlarmCondition alarmCondition = new AlarmCondition();
  470 + alarmCondition.setCondition(Collections.singletonList(lowTempFilter));
  471 + AlarmRule alarmRule = new AlarmRule();
  472 + alarmRule.setCondition(alarmCondition);
  473 + DeviceProfileAlarm dpa = new DeviceProfileAlarm();
  474 + dpa.setId("lesstempID");
  475 + dpa.setAlarmType("lessTemperatureAlarm");
  476 + dpa.setCreateRules(new TreeMap<>(Collections.singletonMap(AlarmSeverity.CRITICAL, alarmRule)));
  477 +
  478 + deviceProfileData.setAlarms(Collections.singletonList(dpa));
  479 + deviceProfile.setProfileData(deviceProfileData);
438 480
439   - UUID uuid = new UUID(6041557255264276971L, -9019477126543226049L);
440   - System.out.println(uuid);
  481 + Mockito.when(cache.get(tenantId, deviceId)).thenReturn(deviceProfile);
  482 + Mockito.when(timeseriesService.findLatest(tenantId, deviceId, Collections.singleton("temperature")))
  483 + .thenReturn(Futures.immediateFuture(Collections.emptyList()));
  484 + Mockito.when(alarmService.findLatestByOriginatorAndType(tenantId, deviceId, "lessTemperatureAlarm"))
  485 + .thenReturn(Futures.immediateFuture(null));
  486 + Mockito.when(alarmService.createOrUpdateAlarm(Mockito.any()))
  487 + .thenAnswer(AdditionalAnswers.returnsFirstArg());
  488 + Mockito.when(ctx.getAttributesService()).thenReturn(attributesService);
  489 + Mockito.when(attributesService.find(eq(tenantId), eq(deviceId), Mockito.anyString(), Mockito.anySet()))
  490 + .thenReturn(listListenableFutureWithLess);
  491 +
  492 + TbMsg theMsg = TbMsg.newMsg("ALARM", deviceId, new TbMsgMetaData(), "");
  493 + Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyString()))
  494 + .thenReturn(theMsg);
  495 +
  496 + ObjectNode data = mapper.createObjectNode();
  497 + data.put("temperature", 150L);
  498 + TbMsg msg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, new TbMsgMetaData(),
  499 + TbMsgDataType.JSON, mapper.writeValueAsString(data), null, null);
  500 +
  501 + node.onMsg(ctx, msg);
  502 + verify(ctx).tellSuccess(msg);
  503 + verify(ctx).tellNext(theMsg, "Alarm Created");
  504 + verify(ctx, Mockito.never()).tellFailure(Mockito.any(), Mockito.any());
  505 +
  506 + }
  507 +
  508 + @Test
  509 + public void testCustomerInheritModeForDynamicValues() throws Exception {
  510 + init();
441 511
  512 + DeviceProfile deviceProfile = new DeviceProfile();
  513 + DeviceProfileData deviceProfileData = new DeviceProfileData();
  514 +
  515 + AttributeKvCompositeKey compositeKey = new AttributeKvCompositeKey(
  516 + EntityType.TENANT, deviceId.getId(), "SERVER_SCOPE", "customerAttribute"
  517 + );
  518 +
  519 + AttributeKvEntity attributeKvEntity = new AttributeKvEntity();
  520 + attributeKvEntity.setId(compositeKey);
  521 + attributeKvEntity.setLongValue(100L);
  522 + attributeKvEntity.setLastUpdateTs(0L);
  523 +
  524 + AttributeKvEntry entry = attributeKvEntity.toData();
  525 + ListenableFuture<List<AttributeKvEntry>> listListenableFutureWithLess =
  526 + Futures.immediateFuture(Collections.singletonList(entry));
  527 + ListenableFuture<Optional<AttributeKvEntry>> optionalListenableFutureWithLess =
  528 + Futures.immediateFuture(Optional.of(entry));
  529 +
  530 + KeyFilter lowTempFilter = new KeyFilter();
  531 + lowTempFilter.setKey(new EntityKey(EntityKeyType.TIME_SERIES, "temperature"));
  532 + lowTempFilter.setValueType(EntityKeyValueType.NUMERIC);
  533 + NumericFilterPredicate lowTempPredicate = new NumericFilterPredicate();
  534 + lowTempPredicate.setOperation(NumericFilterPredicate.NumericOperation.GREATER);
  535 + lowTempPredicate.setValue(
  536 + new FilterPredicateValue<>(
  537 + 0.0,
  538 + null,
  539 + new DynamicValue<>(DynamicValueSourceType.CURRENT_CUSTOMER, "customerAttribute", true))
  540 + );
  541 + lowTempFilter.setPredicate(lowTempPredicate);
  542 + AlarmCondition alarmCondition = new AlarmCondition();
  543 + alarmCondition.setCondition(Collections.singletonList(lowTempFilter));
  544 + AlarmRule alarmRule = new AlarmRule();
  545 + alarmRule.setCondition(alarmCondition);
  546 + DeviceProfileAlarm dpa = new DeviceProfileAlarm();
  547 + dpa.setId("lesstempID");
  548 + dpa.setAlarmType("lessTemperatureAlarm");
  549 + dpa.setCreateRules(new TreeMap<>(Collections.singletonMap(AlarmSeverity.CRITICAL, alarmRule)));
  550 +
  551 + deviceProfileData.setAlarms(Collections.singletonList(dpa));
  552 + deviceProfile.setProfileData(deviceProfileData);
  553 +
  554 + Mockito.when(cache.get(tenantId, deviceId)).thenReturn(deviceProfile);
  555 + Mockito.when(timeseriesService.findLatest(tenantId, deviceId, Collections.singleton("temperature")))
  556 + .thenReturn(Futures.immediateFuture(Collections.emptyList()));
  557 + Mockito.when(alarmService.findLatestByOriginatorAndType(tenantId, deviceId, "lessTemperatureAlarm"))
  558 + .thenReturn(Futures.immediateFuture(null));
  559 + Mockito.when(alarmService.createOrUpdateAlarm(Mockito.any()))
  560 + .thenAnswer(AdditionalAnswers.returnsFirstArg());
  561 + Mockito.when(ctx.getAttributesService()).thenReturn(attributesService);
  562 + Mockito.when(attributesService.find(eq(tenantId), eq(deviceId), Mockito.anyString(), Mockito.anySet()))
  563 + .thenReturn(listListenableFutureWithLess);
  564 + Mockito.when(attributesService.find(eq(tenantId), eq(tenantId), eq(DataConstants.SERVER_SCOPE), Mockito.anyString()))
  565 + .thenReturn(optionalListenableFutureWithLess);
  566 +
  567 + TbMsg theMsg = TbMsg.newMsg("ALARM", deviceId, new TbMsgMetaData(), "");
  568 + Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyString()))
  569 + .thenReturn(theMsg);
  570 +
  571 + ObjectNode data = mapper.createObjectNode();
  572 + data.put("temperature", 150L);
  573 + TbMsg msg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, new TbMsgMetaData(),
  574 + TbMsgDataType.JSON, mapper.writeValueAsString(data), null, null);
  575 +
  576 + node.onMsg(ctx, msg);
  577 + verify(ctx).tellSuccess(msg);
  578 + verify(ctx).tellNext(theMsg, "Alarm Created");
  579 + verify(ctx, Mockito.never()).tellFailure(Mockito.any(), Mockito.any());
  580 + }
  581 +
  582 + private void init() throws TbNodeException {
442 583 Mockito.when(ctx.getTenantId()).thenReturn(tenantId);
443 584 Mockito.when(ctx.getDeviceProfileCache()).thenReturn(cache);
444 585 Mockito.when(ctx.getTimeseriesService()).thenReturn(timeseriesService);
... ...
... ... @@ -43,6 +43,7 @@ import {
43 43 AlarmDetailsDialogComponent,
44 44 AlarmDetailsDialogData
45 45 } from '@home/components/alarm/alarm-details-dialog.component';
  46 +import { DAY, historyInterval } from '@shared/models/time/time.models';
46 47
47 48 export class AlarmTableConfig extends EntityTableConfig<AlarmInfo, TimePageLink> {
48 49
... ... @@ -59,6 +60,7 @@ export class AlarmTableConfig extends EntityTableConfig<AlarmInfo, TimePageLink>
59 60 this.loadDataOnInit = false;
60 61 this.tableTitle = '';
61 62 this.useTimePageLink = true;
  63 + this.defaultTimewindowInterval = historyInterval(DAY * 30);
62 64 this.detailsPanelEnabled = false;
63 65 this.selectionEnabled = false;
64 66 this.searchEnabled = true;
... ...
... ... @@ -55,11 +55,11 @@ import { EntityTypeTranslation } from '@shared/models/entity-type.models';
55 55 import { DialogService } from '@core/services/dialog.service';
56 56 import { AddEntityDialogComponent } from './add-entity-dialog.component';
57 57 import { AddEntityDialogData, EntityAction } from '@home/models/entity/entity-component.models';
58   -import { DAY, historyInterval, HistoryWindowType, Timewindow } from '@shared/models/time/time.models';
  58 +import { HistoryWindowType, Timewindow } from '@shared/models/time/time.models';
59 59 import { DomSanitizer, SafeHtml } from '@angular/platform-browser';
60 60 import { TbAnchorComponent } from '@shared/components/tb-anchor.component';
61 61 import { isDefined, isUndefined } from '@core/utils';
62   -import { HasUUID } from '../../../../shared/models/id/has-uuid';
  62 +import { HasUUID } from '@shared/models/id/has-uuid';
63 63
64 64 @Component({
65 65 selector: 'tb-entities-table',
... ... @@ -202,7 +202,7 @@ export class EntitiesTableComponent extends PageComponent implements AfterViewIn
202 202 this.pageSizeOptions = [this.defaultPageSize, this.defaultPageSize * 2, this.defaultPageSize * 3];
203 203
204 204 if (this.entitiesTableConfig.useTimePageLink) {
205   - this.timewindow = historyInterval(DAY);
  205 + this.timewindow = this.entitiesTableConfig.defaultTimewindowInterval;
206 206 const currentTime = Date.now();
207 207 this.pageLink = new TimePageLink(10, 0, null, sortOrder,
208 208 currentTime - this.timewindow.history.timewindowMs, currentTime);
... ... @@ -446,7 +446,7 @@ export class EntitiesTableComponent extends PageComponent implements AfterViewIn
446 446 resetSortAndFilter(update: boolean = true, preserveTimewindow: boolean = false) {
447 447 this.pageLink.textSearch = null;
448 448 if (this.entitiesTableConfig.useTimePageLink && !preserveTimewindow) {
449   - this.timewindow = historyInterval(DAY);
  449 + this.timewindow = this.entitiesTableConfig.defaultTimewindowInterval;
450 450 }
451 451 if (this.displayPagination) {
452 452 this.paginator.pageIndex = 0;
... ...
... ... @@ -16,7 +16,7 @@
16 16
17 17 -->
18 18 <div fxFlex fxLayout="row" fxLayoutAlign="start start" fxLayoutGap="8px" [formGroup]="booleanFilterPredicateFormGroup">
19   - <mat-form-field floatLabel="always" hideRequiredMarker fxFlex="40" class="mat-block">
  19 + <mat-form-field floatLabel="always" hideRequiredMarker fxFlex="30" class="mat-block">
20 20 <mat-label></mat-label>
21 21 <mat-select required formControlName="operation" placeholder="{{'filter.operation.operation' | translate}}">
22 22 <mat-option *ngFor="let operation of booleanOperations" [value]="operation">
... ... @@ -25,7 +25,7 @@
25 25 </mat-select>
26 26 </mat-form-field>
27 27 <tb-filter-predicate-value [allowUserDynamicSource]="allowUserDynamicSource"
28   - fxFlex="60"
  28 + fxFlex="70"
29 29 [valueType]="valueTypeEnum.BOOLEAN"
30 30 formControlName="value">
31 31 </tb-filter-predicate-value>
... ...
... ... @@ -26,12 +26,12 @@
26 26 <span fxFlex="8"></span>
27 27 <div fxLayout="row" fxLayoutAlign="start center" fxLayoutGap="8px" fxFlex="92">
28 28 <div fxFlex fxLayout="row" fxLayoutGap="8px">
29   - <div fxFlex="40" fxLayout="row" fxLayoutAlign="start center" fxLayoutGap="8px">
  29 + <div fxFlex="30" fxLayout="row" fxLayoutAlign="start center" fxLayoutGap="8px">
30 30 <label fxFlex translate class="tb-title no-padding">filter.operation.operation</label>
31 31 <label *ngIf="valueType === valueTypeEnum.STRING"
32 32 translate class="tb-title no-padding" style="min-width: 70px;">filter.ignore-case</label>
33 33 </div>
34   - <label fxFlex="60" translate class="tb-title no-padding">filter.value</label>
  34 + <label fxFlex="70" translate class="tb-title no-padding">filter.value</label>
35 35 </div>
36 36 <label *ngIf="displayUserParameters"
37 37 translate class="tb-title no-padding" style="width: 60px;">filter.user-parameters</label>
... ...
... ... @@ -47,7 +47,7 @@
47 47 </div>
48 48 <div fxFlex fxLayout="column" [fxShow]="dynamicMode">
49 49 <div formGroupName="dynamicValue" fxLayout="row" fxLayoutAlign="start center" fxLayoutGap="8px">
50   - <div fxFlex fxLayout="column">
  50 + <div fxFlex="35" fxLayout="column">
51 51 <mat-form-field floatLabel="always" hideRequiredMarker class="mat-block">
52 52 <mat-label></mat-label>
53 53 <mat-select formControlName="sourceType" placeholder="{{'filter.dynamic-source-type' | translate}}">
... ... @@ -68,6 +68,14 @@
68 68 </mat-form-field>
69 69 <div class="tb-hint" translate>filter.source-attribute</div>
70 70 </div>
  71 + <div *ngIf="!allow && inheritMode"
  72 + fxLayout="column"
  73 + style="padding-top: 6px">
  74 + <mat-checkbox formControlName="inherit">
  75 + {{ 'filter.inherit-owner' | translate}}
  76 + </mat-checkbox>
  77 + <div class="tb-hint" translate>filter.source-attribute-not-set</div>
  78 + </div>
71 79 </div>
72 80 </div>
73 81 <button mat-icon-button
... ...
... ... @@ -44,6 +44,10 @@ import {
44 44 })
45 45 export class FilterPredicateValueComponent implements ControlValueAccessor, OnInit {
46 46
  47 + private readonly inheritModeForSources: DynamicValueSourceType[] = [
  48 + DynamicValueSourceType.CURRENT_CUSTOMER,
  49 + DynamicValueSourceType.CURRENT_DEVICE];
  50 +
47 51 @Input() disabled: boolean;
48 52
49 53 @Input()
... ... @@ -72,6 +76,8 @@ export class FilterPredicateValueComponent implements ControlValueAccessor, OnIn
72 76
73 77 dynamicMode = false;
74 78
  79 + inheritMode = false;
  80 +
75 81 allow = true;
76 82
77 83 private propagateChange = null;
... ... @@ -105,7 +111,8 @@ export class FilterPredicateValueComponent implements ControlValueAccessor, OnIn
105 111 dynamicValue: this.fb.group(
106 112 {
107 113 sourceType: [null],
108   - sourceAttribute: [null]
  114 + sourceAttribute: [null],
  115 + inherit: [false]
109 116 }
110 117 )
111 118 });
... ... @@ -114,6 +121,7 @@ export class FilterPredicateValueComponent implements ControlValueAccessor, OnIn
114 121 if (!sourceType) {
115 122 this.filterPredicateValueFormGroup.get('dynamicValue').get('sourceAttribute').patchValue(null, {emitEvent: false});
116 123 }
  124 + this.updateShowInheritMode(sourceType);
117 125 }
118 126 );
119 127 this.filterPredicateValueFormGroup.valueChanges.subscribe(() => {
... ... @@ -139,10 +147,13 @@ export class FilterPredicateValueComponent implements ControlValueAccessor, OnIn
139 147
140 148 writeValue(predicateValue: FilterPredicateValue<string | number | boolean>): void {
141 149 this.filterPredicateValueFormGroup.get('defaultValue').patchValue(predicateValue.defaultValue, {emitEvent: false});
142   - this.filterPredicateValueFormGroup.get('dynamicValue').get('sourceType').patchValue(predicateValue.dynamicValue ?
  150 + this.filterPredicateValueFormGroup.get('dynamicValue.sourceType').patchValue(predicateValue.dynamicValue ?
143 151 predicateValue.dynamicValue.sourceType : null, {emitEvent: false});
144   - this.filterPredicateValueFormGroup.get('dynamicValue').get('sourceAttribute').patchValue(predicateValue.dynamicValue ?
  152 + this.filterPredicateValueFormGroup.get('dynamicValue.sourceAttribute').patchValue(predicateValue.dynamicValue ?
145 153 predicateValue.dynamicValue.sourceAttribute : null, {emitEvent: false});
  154 + this.filterPredicateValueFormGroup.get('dynamicValue.inherit').patchValue(predicateValue.dynamicValue ?
  155 + predicateValue.dynamicValue.inherit : false, {emitEvent: false});
  156 + this.updateShowInheritMode(predicateValue?.dynamicValue?.sourceType);
146 157 }
147 158
148 159 private updateModel() {
... ... @@ -158,4 +169,12 @@ export class FilterPredicateValueComponent implements ControlValueAccessor, OnIn
158 169 this.propagateChange(predicateValue);
159 170 }
160 171
  172 + private updateShowInheritMode(sourceType: DynamicValueSourceType) {
  173 + if (this.inheritModeForSources.includes(sourceType)) {
  174 + this.inheritMode = true;
  175 + } else {
  176 + this.filterPredicateValueFormGroup.get('dynamicValue.inherit').patchValue(false, {emitEvent: false});
  177 + this.inheritMode = false;
  178 + }
  179 + }
161 180 }
... ...
... ... @@ -16,7 +16,7 @@
16 16
17 17 -->
18 18 <div fxFlex fxLayout="row" fxLayoutAlign="start start" fxLayoutGap="8px" [formGroup]="numericFilterPredicateFormGroup">
19   - <mat-form-field floatLabel="always" hideRequiredMarker fxFlex="40" class="mat-block">
  19 + <mat-form-field floatLabel="always" hideRequiredMarker fxFlex="30" class="mat-block">
20 20 <mat-label></mat-label>
21 21 <mat-select required formControlName="operation" placeholder="{{'filter.operation.operation' | translate}}">
22 22 <mat-option *ngFor="let operation of numericOperations" [value]="operation">
... ... @@ -25,7 +25,7 @@
25 25 </mat-select>
26 26 </mat-form-field>
27 27 <tb-filter-predicate-value [allowUserDynamicSource]="allowUserDynamicSource"
28   - fxFlex="60"
  28 + fxFlex="70"
29 29 [valueType]="valueType"
30 30 formControlName="value">
31 31 </tb-filter-predicate-value>
... ...
... ... @@ -16,7 +16,7 @@
16 16
17 17 -->
18 18 <div fxFlex fxLayout="row" fxLayoutAlign="start start" fxLayoutGap="8px" [formGroup]="stringFilterPredicateFormGroup">
19   - <div fxFlex="40" fxLayout="row" fxLayoutAlign="start center" fxLayoutGap="8px">
  19 + <div fxFlex="30" fxLayout="row" fxLayoutAlign="start center" fxLayoutGap="8px">
20 20 <mat-form-field floatLabel="always" hideRequiredMarker fxFlex class="mat-block">
21 21 <mat-label></mat-label>
22 22 <mat-select required formControlName="operation" placeholder="{{'filter.operation.operation' | translate}}">
... ... @@ -29,7 +29,7 @@
29 29 </mat-checkbox>
30 30 </div>
31 31 <tb-filter-predicate-value [allowUserDynamicSource]="allowUserDynamicSource"
32   - fxFlex="60"
  32 + fxFlex="70"
33 33 [valueType]="valueTypeEnum.STRING"
34 34 formControlName="value">
35 35 </tb-filter-predicate-value>
... ...
... ... @@ -30,6 +30,7 @@ import { EntitiesTableComponent } from '@home/components/entity/entities-table.c
30 30 import { EntityTableHeaderComponent } from '@home/components/entity/entity-table-header.component';
31 31 import { ActivatedRoute } from '@angular/router';
32 32 import { EntityTabsComponent } from '../../components/entity/entity-tabs.component';
  33 +import { DAY, historyInterval } from '@shared/models/time/time.models';
33 34
34 35 export type EntityBooleanFunction<T extends BaseData<HasId>> = (entity: T) => boolean;
35 36 export type EntityStringFunction<T extends BaseData<HasId>> = (entity: T) => string;
... ... @@ -135,6 +136,7 @@ export class EntityTableConfig<T extends BaseData<HasId>, P extends PageLink = P
135 136 onLoadAction: (route: ActivatedRoute) => void = null;
136 137 table: EntitiesTableComponent = null;
137 138 useTimePageLink = false;
  139 + defaultTimewindowInterval = historyInterval(DAY);
138 140 entityType: EntityType = null;
139 141 tableTitle = '';
140 142 selectionEnabled = true;
... ... @@ -162,7 +164,7 @@ export class EntityTableConfig<T extends BaseData<HasId>, P extends PageLink = P
162 164 dataSource: (dataLoadedFunction: (col?: number, row?: number) => void)
163 165 => EntitiesDataSource<L> = (dataLoadedFunction: (col?: number, row?: number) => void) => {
164 166 return new EntitiesDataSource(this.entitiesFetchFunction, this.entitySelectionEnabled, dataLoadedFunction);
165   - };
  167 + }
166 168 detailsReadonly: EntityBooleanFunction<T> = () => false;
167 169 entitySelectionEnabled: EntityBooleanFunction<L> = () => true;
168 170 deleteEnabled: EntityBooleanFunction<T | L> = () => true;
... ...
... ... @@ -285,6 +285,7 @@ export const dynamicValueSourceTypeTranslationMap = new Map<DynamicValueSourceTy
285 285 export interface DynamicValue<T> {
286 286 sourceType: DynamicValueSourceType;
287 287 sourceAttribute: string;
  288 + inherit?: boolean;
288 289 }
289 290
290 291 export interface FilterPredicateValue<T> {
... ...
... ... @@ -1667,7 +1667,9 @@
1667 1667 "no-dynamic-value": "No dynamic value",
1668 1668 "source-attribute": "Source attribute",
1669 1669 "switch-to-dynamic-value": "Switch to dynamic value",
1670   - "switch-to-default-value": "Switch to default value"
  1670 + "switch-to-default-value": "Switch to default value",
  1671 + "inherit-owner": "Inherit from owner",
  1672 + "source-attribute-not-set": "If source attribute isn't set"
1671 1673 },
1672 1674 "fullscreen": {
1673 1675 "expand": "Expand to fullscreen",
... ...