Commit 905273ea3dba010f01781f28b998c66fc95b2477

Authored by Volodymyr Babak
2 parents efaca8e7 eeec6bac

Merge remote-tracking branch 'upstream/develop/2.5.5' into develop/2.6-edge

Showing 30 changed files with 453 additions and 43 deletions
@@ -107,6 +107,7 @@ class DefaultTbContext implements TbContext { @@ -107,6 +107,7 @@ class DefaultTbContext implements TbContext {
107 if (nodeCtx.getSelf().isDebugMode()) { 107 if (nodeCtx.getSelf().isDebugMode()) {
108 relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th)); 108 relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th));
109 } 109 }
  110 + msg.getCallback().onProcessingEnd(nodeCtx.getSelf().getId());
110 nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null)); 111 nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null));
111 } 112 }
112 113
@@ -216,6 +217,7 @@ class DefaultTbContext implements TbContext { @@ -216,6 +217,7 @@ class DefaultTbContext implements TbContext {
216 if (nodeCtx.getSelf().isDebugMode()) { 217 if (nodeCtx.getSelf().isDebugMode()) {
217 mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "ACK", null); 218 mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "ACK", null);
218 } 219 }
  220 + tbMsg.getCallback().onProcessingEnd(nodeCtx.getSelf().getId());
219 tbMsg.getCallback().onSuccess(); 221 tbMsg.getCallback().onSuccess();
220 } 222 }
221 223
@@ -252,26 +254,26 @@ class DefaultTbContext implements TbContext { @@ -252,26 +254,26 @@ class DefaultTbContext implements TbContext {
252 } 254 }
253 255
254 public TbMsg customerCreatedMsg(Customer customer, RuleNodeId ruleNodeId) { 256 public TbMsg customerCreatedMsg(Customer customer, RuleNodeId ruleNodeId) {
255 - return entityCreatedMsg(customer, customer.getId(), ruleNodeId); 257 + return entityActionMsg(customer, customer.getId(), ruleNodeId, DataConstants.ENTITY_CREATED);
256 } 258 }
257 259
258 public TbMsg deviceCreatedMsg(Device device, RuleNodeId ruleNodeId) { 260 public TbMsg deviceCreatedMsg(Device device, RuleNodeId ruleNodeId) {
259 - return entityCreatedMsg(device, device.getId(), ruleNodeId); 261 + return entityActionMsg(device, device.getId(), ruleNodeId, DataConstants.ENTITY_CREATED);
260 } 262 }
261 263
262 public TbMsg assetCreatedMsg(Asset asset, RuleNodeId ruleNodeId) { 264 public TbMsg assetCreatedMsg(Asset asset, RuleNodeId ruleNodeId) {
263 - return entityCreatedMsg(asset, asset.getId(), ruleNodeId); 265 + return entityActionMsg(asset, asset.getId(), ruleNodeId, DataConstants.ENTITY_CREATED);
264 } 266 }
265 267
266 - public TbMsg alarmCreatedMsg(Alarm alarm, RuleNodeId ruleNodeId) {  
267 - return entityCreatedMsg(alarm, alarm.getId(), ruleNodeId); 268 + public TbMsg alarmActionMsg(Alarm alarm, RuleNodeId ruleNodeId, String action) {
  269 + return entityActionMsg(alarm, alarm.getId(), ruleNodeId, action);
268 } 270 }
269 271
270 - public <E, I extends EntityId> TbMsg entityCreatedMsg(E entity, I id, RuleNodeId ruleNodeId) { 272 + public <E, I extends EntityId> TbMsg entityActionMsg(E entity, I id, RuleNodeId ruleNodeId, String action) {
271 try { 273 try {
272 - return TbMsg.newMsg(DataConstants.ENTITY_CREATED, id, getActionMetaData(ruleNodeId), mapper.writeValueAsString(mapper.valueToTree(entity))); 274 + return TbMsg.newMsg(action, id, getActionMetaData(ruleNodeId), mapper.writeValueAsString(mapper.valueToTree(entity)));
273 } catch (JsonProcessingException | IllegalArgumentException e) { 275 } catch (JsonProcessingException | IllegalArgumentException e) {
274 - throw new RuntimeException("Failed to process " + id.getEntityType().name().toLowerCase() + " created msg: " + e); 276 + throw new RuntimeException("Failed to process " + id.getEntityType().name().toLowerCase() + " " + action + " msg: " + e);
275 } 277 }
276 } 278 }
277 279
@@ -103,7 +103,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod @@ -103,7 +103,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
103 } 103 }
104 104
105 void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception { 105 void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception {
106 - msg.getMsg().getCallback().visit(info); 106 + msg.getMsg().getCallback().onProcessingStart(info);
107 checkActive(msg.getMsg()); 107 checkActive(msg.getMsg());
108 if (ruleNode.isDebugMode()) { 108 if (ruleNode.isDebugMode()) {
109 systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), msg.getFromRelationType()); 109 systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), msg.getFromRelationType());
@@ -132,6 +132,7 @@ public class AlarmController extends BaseController { @@ -132,6 +132,7 @@ public class AlarmController extends BaseController {
132 long ackTs = System.currentTimeMillis(); 132 long ackTs = System.currentTimeMillis();
133 alarmService.ackAlarm(getCurrentUser().getTenantId(), alarmId, ackTs).get(); 133 alarmService.ackAlarm(getCurrentUser().getTenantId(), alarmId, ackTs).get();
134 alarm.setAckTs(ackTs); 134 alarm.setAckTs(ackTs);
  135 + alarm.setStatus(alarm.getStatus().isCleared() ? AlarmStatus.CLEARED_ACK : AlarmStatus.ACTIVE_ACK);
135 logEntityAction(alarmId, alarm, getCurrentUser().getCustomerId(), ActionType.ALARM_ACK, null); 136 logEntityAction(alarmId, alarm, getCurrentUser().getCustomerId(), ActionType.ALARM_ACK, null);
136 137
137 sendNotificationMsgToEdgeService(getTenantId(), alarmId, EdgeEventActionType.ALARM_ACK); 138 sendNotificationMsgToEdgeService(getTenantId(), alarmId, EdgeEventActionType.ALARM_ACK);
@@ -151,6 +152,7 @@ public class AlarmController extends BaseController { @@ -151,6 +152,7 @@ public class AlarmController extends BaseController {
151 long clearTs = System.currentTimeMillis(); 152 long clearTs = System.currentTimeMillis();
152 alarmService.clearAlarm(getCurrentUser().getTenantId(), alarmId, null, clearTs).get(); 153 alarmService.clearAlarm(getCurrentUser().getTenantId(), alarmId, null, clearTs).get();
153 alarm.setClearTs(clearTs); 154 alarm.setClearTs(clearTs);
  155 + alarm.setStatus(alarm.getStatus().isAck() ? AlarmStatus.CLEARED_ACK : AlarmStatus.CLEARED_UNACK);
154 logEntityAction(alarmId, alarm, getCurrentUser().getCustomerId(), ActionType.ALARM_CLEAR, null); 156 logEntityAction(alarmId, alarm, getCurrentUser().getCustomerId(), ActionType.ALARM_CLEAR, null);
155 157
156 sendNotificationMsgToEdgeService(getTenantId(), alarmId, EdgeEventActionType.ALARM_CLEAR); 158 sendNotificationMsgToEdgeService(getTenantId(), alarmId, EdgeEventActionType.ALARM_CLEAR);
@@ -176,7 +176,8 @@ public class AuthController extends BaseController { @@ -176,7 +176,8 @@ public class AuthController extends BaseController {
176 try { 176 try {
177 String email = resetPasswordByEmailRequest.get("email").asText(); 177 String email = resetPasswordByEmailRequest.get("email").asText();
178 UserCredentials userCredentials = userService.requestPasswordReset(TenantId.SYS_TENANT_ID, email); 178 UserCredentials userCredentials = userService.requestPasswordReset(TenantId.SYS_TENANT_ID, email);
179 - String baseUrl = MiscUtils.constructBaseUrl(request); 179 + User user = userService.findUserById(TenantId.SYS_TENANT_ID, userCredentials.getUserId());
  180 + String baseUrl = systemSecurityService.getBaseUrl(user.getTenantId(), user.getCustomerId(), request);
180 String resetUrl = String.format("%s/api/noauth/resetPassword?resetToken=%s", baseUrl, 181 String resetUrl = String.format("%s/api/noauth/resetPassword?resetToken=%s", baseUrl,
181 userCredentials.getResetToken()); 182 userCredentials.getResetToken());
182 183
@@ -224,7 +225,7 @@ public class AuthController extends BaseController { @@ -224,7 +225,7 @@ public class AuthController extends BaseController {
224 User user = userService.findUserById(TenantId.SYS_TENANT_ID, credentials.getUserId()); 225 User user = userService.findUserById(TenantId.SYS_TENANT_ID, credentials.getUserId());
225 UserPrincipal principal = new UserPrincipal(UserPrincipal.Type.USER_NAME, user.getEmail()); 226 UserPrincipal principal = new UserPrincipal(UserPrincipal.Type.USER_NAME, user.getEmail());
226 SecurityUser securityUser = new SecurityUser(user, credentials.isEnabled(), principal); 227 SecurityUser securityUser = new SecurityUser(user, credentials.isEnabled(), principal);
227 - String baseUrl = MiscUtils.constructBaseUrl(request); 228 + String baseUrl = systemSecurityService.getBaseUrl(user.getTenantId(), user.getCustomerId(), request);
228 String loginUrl = String.format("%s/login", baseUrl); 229 String loginUrl = String.format("%s/login", baseUrl);
229 String email = user.getEmail(); 230 String email = user.getEmail();
230 231
@@ -273,7 +274,7 @@ public class AuthController extends BaseController { @@ -273,7 +274,7 @@ public class AuthController extends BaseController {
273 User user = userService.findUserById(TenantId.SYS_TENANT_ID, userCredentials.getUserId()); 274 User user = userService.findUserById(TenantId.SYS_TENANT_ID, userCredentials.getUserId());
274 UserPrincipal principal = new UserPrincipal(UserPrincipal.Type.USER_NAME, user.getEmail()); 275 UserPrincipal principal = new UserPrincipal(UserPrincipal.Type.USER_NAME, user.getEmail());
275 SecurityUser securityUser = new SecurityUser(user, userCredentials.isEnabled(), principal); 276 SecurityUser securityUser = new SecurityUser(user, userCredentials.isEnabled(), principal);
276 - String baseUrl = MiscUtils.constructBaseUrl(request); 277 + String baseUrl = systemSecurityService.getBaseUrl(user.getTenantId(), user.getCustomerId(), request);
277 String loginUrl = String.format("%s/login", baseUrl); 278 String loginUrl = String.format("%s/login", baseUrl);
278 String email = user.getEmail(); 279 String email = user.getEmail();
279 mailService.sendPasswordWasResetEmail(loginUrl, email); 280 mailService.sendPasswordWasResetEmail(loginUrl, email);
@@ -53,6 +53,7 @@ import org.thingsboard.server.service.security.model.token.JwtToken; @@ -53,6 +53,7 @@ import org.thingsboard.server.service.security.model.token.JwtToken;
53 import org.thingsboard.server.service.security.model.token.JwtTokenFactory; 53 import org.thingsboard.server.service.security.model.token.JwtTokenFactory;
54 import org.thingsboard.server.service.security.permission.Operation; 54 import org.thingsboard.server.service.security.permission.Operation;
55 import org.thingsboard.server.service.security.permission.Resource; 55 import org.thingsboard.server.service.security.permission.Resource;
  56 +import org.thingsboard.server.service.security.system.SystemSecurityService;
56 import org.thingsboard.server.utils.MiscUtils; 57 import org.thingsboard.server.utils.MiscUtils;
57 58
58 import javax.servlet.http.HttpServletRequest; 59 import javax.servlet.http.HttpServletRequest;
@@ -79,6 +80,9 @@ public class UserController extends BaseController { @@ -79,6 +80,9 @@ public class UserController extends BaseController {
79 @Autowired 80 @Autowired
80 private RefreshTokenRepository refreshTokenRepository; 81 private RefreshTokenRepository refreshTokenRepository;
81 82
  83 + @Autowired
  84 + private SystemSecurityService systemSecurityService;
  85 +
82 86
83 @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") 87 @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
84 @RequestMapping(value = "/user/{userId}", method = RequestMethod.GET) 88 @RequestMapping(value = "/user/{userId}", method = RequestMethod.GET)
@@ -146,7 +150,7 @@ public class UserController extends BaseController { @@ -146,7 +150,7 @@ public class UserController extends BaseController {
146 if (sendEmail) { 150 if (sendEmail) {
147 SecurityUser authUser = getCurrentUser(); 151 SecurityUser authUser = getCurrentUser();
148 UserCredentials userCredentials = userService.findUserCredentialsByUserId(authUser.getTenantId(), savedUser.getId()); 152 UserCredentials userCredentials = userService.findUserCredentialsByUserId(authUser.getTenantId(), savedUser.getId());
149 - String baseUrl = MiscUtils.constructBaseUrl(request); 153 + String baseUrl = systemSecurityService.getBaseUrl(getTenantId(), getCurrentUser().getCustomerId(), request);
150 String activateUrl = String.format(ACTIVATE_URL_PATTERN, baseUrl, 154 String activateUrl = String.format(ACTIVATE_URL_PATTERN, baseUrl,
151 userCredentials.getActivateToken()); 155 userCredentials.getActivateToken());
152 String email = savedUser.getEmail(); 156 String email = savedUser.getEmail();
@@ -189,7 +193,7 @@ public class UserController extends BaseController { @@ -189,7 +193,7 @@ public class UserController extends BaseController {
189 193
190 UserCredentials userCredentials = userService.findUserCredentialsByUserId(getCurrentUser().getTenantId(), user.getId()); 194 UserCredentials userCredentials = userService.findUserCredentialsByUserId(getCurrentUser().getTenantId(), user.getId());
191 if (!userCredentials.isEnabled()) { 195 if (!userCredentials.isEnabled()) {
192 - String baseUrl = MiscUtils.constructBaseUrl(request); 196 + String baseUrl = systemSecurityService.getBaseUrl(getTenantId(), getCurrentUser().getCustomerId(), request);
193 String activateUrl = String.format(ACTIVATE_URL_PATTERN, baseUrl, 197 String activateUrl = String.format(ACTIVATE_URL_PATTERN, baseUrl,
194 userCredentials.getActivateToken()); 198 userCredentials.getActivateToken());
195 mailService.sendActivationEmail(activateUrl, email); 199 mailService.sendActivationEmail(activateUrl, email);
@@ -214,7 +218,7 @@ public class UserController extends BaseController { @@ -214,7 +218,7 @@ public class UserController extends BaseController {
214 SecurityUser authUser = getCurrentUser(); 218 SecurityUser authUser = getCurrentUser();
215 UserCredentials userCredentials = userService.findUserCredentialsByUserId(authUser.getTenantId(), user.getId()); 219 UserCredentials userCredentials = userService.findUserCredentialsByUserId(authUser.getTenantId(), user.getId());
216 if (!userCredentials.isEnabled()) { 220 if (!userCredentials.isEnabled()) {
217 - String baseUrl = MiscUtils.constructBaseUrl(request); 221 + String baseUrl = systemSecurityService.getBaseUrl(getTenantId(), getCurrentUser().getCustomerId(), request);
218 String activateUrl = String.format(ACTIVATE_URL_PATTERN, baseUrl, 222 String activateUrl = String.format(ACTIVATE_URL_PATTERN, baseUrl,
219 userCredentials.getActivateToken()); 223 userCredentials.getActivateToken());
220 return activateUrl; 224 return activateUrl;
@@ -116,6 +116,7 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService { @@ -116,6 +116,7 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
116 generalSettings.setKey("general"); 116 generalSettings.setKey("general");
117 ObjectNode node = objectMapper.createObjectNode(); 117 ObjectNode node = objectMapper.createObjectNode();
118 node.put("baseUrl", "http://localhost:8080"); 118 node.put("baseUrl", "http://localhost:8080");
  119 + node.put("prohibitDifferentUrl", true);
119 generalSettings.setJsonValue(node); 120 generalSettings.setJsonValue(node);
120 adminSettingsService.saveAdminSettings(TenantId.SYS_TENANT_ID, generalSettings); 121 adminSettingsService.saveAdminSettings(TenantId.SYS_TENANT_ID, generalSettings);
121 122
@@ -165,7 +165,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< @@ -165,7 +165,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
165 submitStrategy.init(msgs); 165 submitStrategy.init(msgs);
166 166
167 while (!stopped) { 167 while (!stopped) {
168 - TbMsgPackProcessingContext ctx = new TbMsgPackProcessingContext(submitStrategy); 168 + TbMsgPackProcessingContext ctx = new TbMsgPackProcessingContext(configuration.getName(), submitStrategy);
169 submitStrategy.submitAttempt((id, msg) -> submitExecutor.submit(() -> { 169 submitStrategy.submitAttempt((id, msg) -> submitExecutor.submit(() -> {
170 log.trace("[{}] Creating callback for message: {}", id, msg.getValue()); 170 log.trace("[{}] Creating callback for message: {}", id, msg.getValue());
171 ToRuleEngineMsg toRuleEngineMsg = msg.getValue(); 171 ToRuleEngineMsg toRuleEngineMsg = msg.getValue();
@@ -194,6 +194,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< @@ -194,6 +194,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
194 if (!ctx.getFailedMap().isEmpty()) { 194 if (!ctx.getFailedMap().isEmpty()) {
195 printFirstOrAll(configuration, ctx, ctx.getFailedMap(), "Failed"); 195 printFirstOrAll(configuration, ctx, ctx.getFailedMap(), "Failed");
196 } 196 }
  197 + ctx.printProfilerStats();
  198 +
197 TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result); 199 TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result);
198 if (statsEnabled) { 200 if (statsEnabled) {
199 stats.log(result, decision.isCommit()); 201 stats.log(result, decision.isCommit());
@@ -49,8 +49,14 @@ public class TbMsgPackCallback implements TbMsgCallback { @@ -49,8 +49,14 @@ public class TbMsgPackCallback implements TbMsgCallback {
49 } 49 }
50 50
51 @Override 51 @Override
52 - public void visit(RuleNodeInfo ruleNodeInfo) {  
53 - log.trace("[{}] ON PROCESS: {}", id, ruleNodeInfo);  
54 - ctx.visit(id, ruleNodeInfo); 52 + public void onProcessingStart(RuleNodeInfo ruleNodeInfo) {
  53 + log.trace("[{}] ON PROCESSING START: {}", id, ruleNodeInfo);
  54 + ctx.onProcessingStart(id, ruleNodeInfo);
  55 + }
  56 +
  57 + @Override
  58 + public void onProcessingEnd(RuleNodeId ruleNodeId) {
  59 + log.trace("[{}] ON PROCESSING END: {}", id, ruleNodeId);
  60 + ctx.onProcessingEnd(id, ruleNodeId);
55 } 61 }
56 } 62 }
@@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
16 package org.thingsboard.server.service.queue; 16 package org.thingsboard.server.service.queue;
17 17
18 import lombok.Getter; 18 import lombok.Getter;
  19 +import lombok.extern.slf4j.Slf4j;
19 import org.thingsboard.server.common.data.id.RuleNodeId; 20 import org.thingsboard.server.common.data.id.RuleNodeId;
20 import org.thingsboard.server.common.data.id.TenantId; 21 import org.thingsboard.server.common.data.id.TenantId;
21 import org.thingsboard.server.common.msg.queue.RuleEngineException; 22 import org.thingsboard.server.common.msg.queue.RuleEngineException;
@@ -24,6 +25,8 @@ import org.thingsboard.server.gen.transport.TransportProtos; @@ -24,6 +25,8 @@ import org.thingsboard.server.gen.transport.TransportProtos;
24 import org.thingsboard.server.queue.common.TbProtoQueueMsg; 25 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
25 import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy; 26 import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy;
26 27
  28 +import java.util.Comparator;
  29 +import java.util.Map;
27 import java.util.UUID; 30 import java.util.UUID;
28 import java.util.concurrent.ConcurrentHashMap; 31 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ConcurrentMap; 32 import java.util.concurrent.ConcurrentMap;
@@ -31,9 +34,13 @@ import java.util.concurrent.CountDownLatch; @@ -31,9 +34,13 @@ import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.TimeUnit; 34 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicInteger; 35 import java.util.concurrent.atomic.AtomicInteger;
33 36
  37 +@Slf4j
34 public class TbMsgPackProcessingContext { 38 public class TbMsgPackProcessingContext {
35 39
  40 + private final String queueName;
36 private final TbRuleEngineSubmitStrategy submitStrategy; 41 private final TbRuleEngineSubmitStrategy submitStrategy;
  42 + @Getter
  43 + private final boolean profilerEnabled;
37 private final AtomicInteger pendingCount; 44 private final AtomicInteger pendingCount;
38 private final CountDownLatch processingTimeoutLatch = new CountDownLatch(1); 45 private final CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
39 @Getter 46 @Getter
@@ -47,14 +54,20 @@ public class TbMsgPackProcessingContext { @@ -47,14 +54,20 @@ public class TbMsgPackProcessingContext {
47 54
48 private final ConcurrentMap<UUID, RuleNodeInfo> lastRuleNodeMap = new ConcurrentHashMap<>(); 55 private final ConcurrentMap<UUID, RuleNodeInfo> lastRuleNodeMap = new ConcurrentHashMap<>();
49 56
50 - public TbMsgPackProcessingContext(TbRuleEngineSubmitStrategy submitStrategy) { 57 + public TbMsgPackProcessingContext(String queueName, TbRuleEngineSubmitStrategy submitStrategy) {
  58 + this.queueName = queueName;
51 this.submitStrategy = submitStrategy; 59 this.submitStrategy = submitStrategy;
  60 + this.profilerEnabled = log.isDebugEnabled();
52 this.pendingMap = submitStrategy.getPendingMap(); 61 this.pendingMap = submitStrategy.getPendingMap();
53 this.pendingCount = new AtomicInteger(pendingMap.size()); 62 this.pendingCount = new AtomicInteger(pendingMap.size());
54 } 63 }
55 64
56 public boolean await(long packProcessingTimeout, TimeUnit milliseconds) throws InterruptedException { 65 public boolean await(long packProcessingTimeout, TimeUnit milliseconds) throws InterruptedException {
57 - return processingTimeoutLatch.await(packProcessingTimeout, milliseconds); 66 + boolean success = processingTimeoutLatch.await(packProcessingTimeout, milliseconds);
  67 + if (!success && profilerEnabled) {
  68 + msgProfilerMap.values().forEach(this::onTimeout);
  69 + }
  70 + return success;
58 } 71 }
59 72
60 public void onSuccess(UUID id) { 73 public void onSuccess(UUID id) {
@@ -85,12 +98,53 @@ public class TbMsgPackProcessingContext { @@ -85,12 +98,53 @@ public class TbMsgPackProcessingContext {
85 } 98 }
86 } 99 }
87 100
88 - public void visit(UUID id, RuleNodeInfo ruleNodeInfo) { 101 + private final ConcurrentHashMap<UUID, TbMsgProfilerInfo> msgProfilerMap = new ConcurrentHashMap<>();
  102 + private final ConcurrentHashMap<UUID, TbRuleNodeProfilerInfo> ruleNodeProfilerMap = new ConcurrentHashMap<>();
  103 +
  104 + public void onProcessingStart(UUID id, RuleNodeInfo ruleNodeInfo) {
89 lastRuleNodeMap.put(id, ruleNodeInfo); 105 lastRuleNodeMap.put(id, ruleNodeInfo);
  106 + if (profilerEnabled) {
  107 + msgProfilerMap.computeIfAbsent(id, TbMsgProfilerInfo::new).onStart(ruleNodeInfo.getRuleNodeId());
  108 + ruleNodeProfilerMap.putIfAbsent(ruleNodeInfo.getRuleNodeId().getId(), new TbRuleNodeProfilerInfo(ruleNodeInfo));
  109 + }
  110 + }
  111 +
  112 + public void onProcessingEnd(UUID id, RuleNodeId ruleNodeId) {
  113 + if (profilerEnabled) {
  114 + long processingTime = msgProfilerMap.computeIfAbsent(id, TbMsgProfilerInfo::new).onEnd(ruleNodeId);
  115 + if (processingTime > 0) {
  116 + ruleNodeProfilerMap.computeIfAbsent(ruleNodeId.getId(), TbRuleNodeProfilerInfo::new).record(processingTime);
  117 + }
  118 + }
  119 + }
  120 +
  121 + public void onTimeout(TbMsgProfilerInfo profilerInfo) {
  122 + Map.Entry<UUID, Long> ruleNodeInfo = profilerInfo.onTimeout();
  123 + if (ruleNodeInfo != null) {
  124 + ruleNodeProfilerMap.computeIfAbsent(ruleNodeInfo.getKey(), TbRuleNodeProfilerInfo::new).record(ruleNodeInfo.getValue());
  125 + }
90 } 126 }
91 127
92 public RuleNodeInfo getLastVisitedRuleNode(UUID id) { 128 public RuleNodeInfo getLastVisitedRuleNode(UUID id) {
93 return lastRuleNodeMap.get(id); 129 return lastRuleNodeMap.get(id);
94 } 130 }
95 131
  132 + public void printProfilerStats() {
  133 + if (profilerEnabled) {
  134 + log.debug("Top Rule Nodes by max execution time:");
  135 + ruleNodeProfilerMap.values().stream()
  136 + .sorted(Comparator.comparingLong(TbRuleNodeProfilerInfo::getMaxExecutionTime).reversed()).limit(5)
  137 + .forEach(info -> log.debug("[{}][{}] max execution time: {}. {}", queueName, info.getRuleNodeId(), info.getMaxExecutionTime(), info.getLabel()));
  138 +
  139 + log.info("Top Rule Nodes by avg execution time:");
  140 + ruleNodeProfilerMap.values().stream()
  141 + .sorted(Comparator.comparingDouble(TbRuleNodeProfilerInfo::getAvgExecutionTime).reversed()).limit(5)
  142 + .forEach(info -> log.info("[{}][{}] avg execution time: {}. {}", queueName, info.getRuleNodeId(), info.getAvgExecutionTime(), info.getLabel()));
  143 +
  144 + log.info("Top Rule Nodes by execution count:");
  145 + ruleNodeProfilerMap.values().stream()
  146 + .sorted(Comparator.comparingInt(TbRuleNodeProfilerInfo::getExecutionCount).reversed()).limit(5)
  147 + .forEach(info -> log.info("[{}][{}] execution count: {}. {}", queueName, info.getRuleNodeId(), info.getExecutionCount(), info.getLabel()));
  148 + }
  149 + }
96 } 150 }
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.queue;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.thingsboard.server.common.data.id.RuleNodeId;
  20 +import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
  21 +
  22 +import java.util.AbstractMap;
  23 +import java.util.Map;
  24 +import java.util.UUID;
  25 +import java.util.concurrent.atomic.AtomicLong;
  26 +import java.util.concurrent.locks.Lock;
  27 +import java.util.concurrent.locks.ReentrantLock;
  28 +
  29 +@Slf4j
  30 +public class TbMsgProfilerInfo {
  31 + private final UUID msgId;
  32 + private AtomicLong totalProcessingTime = new AtomicLong();
  33 + private Lock stateLock = new ReentrantLock();
  34 + private RuleNodeId currentRuleNodeId;
  35 + private long stateChangeTime;
  36 +
  37 + public TbMsgProfilerInfo(UUID msgId) {
  38 + this.msgId = msgId;
  39 + }
  40 +
  41 + public void onStart(RuleNodeId ruleNodeId) {
  42 + long currentTime = System.currentTimeMillis();
  43 + stateLock.lock();
  44 + try {
  45 + currentRuleNodeId = ruleNodeId;
  46 + stateChangeTime = currentTime;
  47 + } finally {
  48 + stateLock.unlock();
  49 + }
  50 + }
  51 +
  52 + public long onEnd(RuleNodeId ruleNodeId) {
  53 + long currentTime = System.currentTimeMillis();
  54 + stateLock.lock();
  55 + try {
  56 + if (ruleNodeId.equals(currentRuleNodeId)) {
  57 + long processingTime = currentTime - stateChangeTime;
  58 + stateChangeTime = currentTime;
  59 + totalProcessingTime.addAndGet(processingTime);
  60 + currentRuleNodeId = null;
  61 + return processingTime;
  62 + } else {
  63 + log.trace("[{}] Invalid sequence of rule node processing detected. Expected [{}] but was [{}]", msgId, currentRuleNodeId, ruleNodeId);
  64 + return 0;
  65 + }
  66 + } finally {
  67 + stateLock.unlock();
  68 + }
  69 + }
  70 +
  71 + public Map.Entry<UUID, Long> onTimeout() {
  72 + long currentTime = System.currentTimeMillis();
  73 + stateLock.lock();
  74 + try {
  75 + if (currentRuleNodeId != null && stateChangeTime > 0) {
  76 + long timeoutTime = currentTime - stateChangeTime;
  77 + totalProcessingTime.addAndGet(timeoutTime);
  78 + return new AbstractMap.SimpleEntry<>(currentRuleNodeId.getId(), timeoutTime);
  79 + }
  80 + } finally {
  81 + stateLock.unlock();
  82 + }
  83 + return null;
  84 + }
  85 +}
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.queue;
  17 +
  18 +import lombok.Getter;
  19 +import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
  20 +
  21 +import java.util.UUID;
  22 +import java.util.concurrent.atomic.AtomicInteger;
  23 +import java.util.concurrent.atomic.AtomicLong;
  24 +
  25 +public class TbRuleNodeProfilerInfo {
  26 + @Getter
  27 + private final UUID ruleNodeId;
  28 + @Getter
  29 + private final String label;
  30 + private AtomicInteger executionCount = new AtomicInteger(0);
  31 + private AtomicLong executionTime = new AtomicLong(0);
  32 + private AtomicLong maxExecutionTime = new AtomicLong(0);
  33 +
  34 + public TbRuleNodeProfilerInfo(RuleNodeInfo ruleNodeInfo) {
  35 + this.ruleNodeId = ruleNodeInfo.getRuleNodeId().getId();
  36 + this.label = ruleNodeInfo.toString();
  37 + }
  38 +
  39 + public TbRuleNodeProfilerInfo(UUID ruleNodeId) {
  40 + this.ruleNodeId = ruleNodeId;
  41 + this.label = "";
  42 + }
  43 +
  44 + public void record(long processingTime) {
  45 + executionCount.incrementAndGet();
  46 + executionTime.addAndGet(processingTime);
  47 + while (true) {
  48 + long value = maxExecutionTime.get();
  49 + if (value >= processingTime) {
  50 + break;
  51 + }
  52 + if (maxExecutionTime.compareAndSet(value, processingTime)) {
  53 + break;
  54 + }
  55 + }
  56 + }
  57 +
  58 + int getExecutionCount() {
  59 + return executionCount.get();
  60 + }
  61 +
  62 + long getMaxExecutionTime() {
  63 + return maxExecutionTime.get();
  64 + }
  65 +
  66 + double getAvgExecutionTime() {
  67 + double executionCnt = (double) executionCount.get();
  68 + if (executionCnt > 0) {
  69 + return executionTime.get() / executionCnt;
  70 + } else {
  71 + return 0.0;
  72 + }
  73 + }
  74 +
  75 +}
@@ -68,18 +68,20 @@ public class BatchTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS @@ -68,18 +68,20 @@ public class BatchTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS
68 int listSize = orderedMsgList.size(); 68 int listSize = orderedMsgList.size();
69 int startIdx = Math.min(packIdx.get() * batchSize, listSize); 69 int startIdx = Math.min(packIdx.get() * batchSize, listSize);
70 int endIdx = Math.min(startIdx + batchSize, listSize); 70 int endIdx = Math.min(startIdx + batchSize, listSize);
  71 + Map<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> tmpPack;
71 synchronized (pendingPack) { 72 synchronized (pendingPack) {
72 pendingPack.clear(); 73 pendingPack.clear();
73 for (int i = startIdx; i < endIdx; i++) { 74 for (int i = startIdx; i < endIdx; i++) {
74 IdMsgPair pair = orderedMsgList.get(i); 75 IdMsgPair pair = orderedMsgList.get(i);
75 pendingPack.put(pair.uuid, pair.msg); 76 pendingPack.put(pair.uuid, pair.msg);
76 } 77 }
  78 + tmpPack = new LinkedHashMap<>(pendingPack);
77 } 79 }
78 int submitSize = pendingPack.size(); 80 int submitSize = pendingPack.size();
79 if (log.isDebugEnabled() && submitSize > 0) { 81 if (log.isDebugEnabled() && submitSize > 0) {
80 log.debug("[{}] submitting [{}] messages to rule engine", queueName, submitSize); 82 log.debug("[{}] submitting [{}] messages to rule engine", queueName, submitSize);
81 } 83 }
82 - pendingPack.forEach(msgConsumer); 84 + tmpPack.forEach(msgConsumer);
83 } 85 }
84 86
85 } 87 }
@@ -30,7 +30,7 @@ import java.nio.charset.StandardCharsets; @@ -30,7 +30,7 @@ import java.nio.charset.StandardCharsets;
30 30
31 @Component(value = "oauth2AuthenticationFailureHandler") 31 @Component(value = "oauth2AuthenticationFailureHandler")
32 @ConditionalOnProperty(prefix = "security.oauth2", value = "enabled", havingValue = "true") 32 @ConditionalOnProperty(prefix = "security.oauth2", value = "enabled", havingValue = "true")
33 -public class Oauth2AuthenticationFailureHandler extends SimpleUrlAuthenticationFailureHandler { 33 +public class Oauth2AuthenticationFailureHandler extends SimpleUrlAuthenticationFailureHandler {
34 34
35 @Override 35 @Override
36 public void onAuthenticationFailure(HttpServletRequest request, 36 public void onAuthenticationFailure(HttpServletRequest request,
@@ -59,7 +59,6 @@ public class Oauth2AuthenticationSuccessHandler extends SimpleUrlAuthenticationS @@ -59,7 +59,6 @@ public class Oauth2AuthenticationSuccessHandler extends SimpleUrlAuthenticationS
59 public void onAuthenticationSuccess(HttpServletRequest request, 59 public void onAuthenticationSuccess(HttpServletRequest request,
60 HttpServletResponse response, 60 HttpServletResponse response,
61 Authentication authentication) throws IOException { 61 Authentication authentication) throws IOException {
62 -  
63 String baseUrl = MiscUtils.constructBaseUrl(request); 62 String baseUrl = MiscUtils.constructBaseUrl(request);
64 try { 63 try {
65 OAuth2AuthenticationToken token = (OAuth2AuthenticationToken) authentication; 64 OAuth2AuthenticationToken token = (OAuth2AuthenticationToken) authentication;
@@ -40,17 +40,20 @@ import org.thingsboard.rule.engine.api.MailService; @@ -40,17 +40,20 @@ import org.thingsboard.rule.engine.api.MailService;
40 import org.thingsboard.server.common.data.AdminSettings; 40 import org.thingsboard.server.common.data.AdminSettings;
41 import org.thingsboard.server.common.data.User; 41 import org.thingsboard.server.common.data.User;
42 import org.thingsboard.server.common.data.exception.ThingsboardException; 42 import org.thingsboard.server.common.data.exception.ThingsboardException;
  43 +import org.thingsboard.server.common.data.id.CustomerId;
43 import org.thingsboard.server.common.data.id.TenantId; 44 import org.thingsboard.server.common.data.id.TenantId;
44 import org.thingsboard.server.common.data.security.UserCredentials; 45 import org.thingsboard.server.common.data.security.UserCredentials;
  46 +import org.thingsboard.server.common.data.security.model.SecuritySettings;
  47 +import org.thingsboard.server.common.data.security.model.UserPasswordPolicy;
45 import org.thingsboard.server.dao.exception.DataValidationException; 48 import org.thingsboard.server.dao.exception.DataValidationException;
46 import org.thingsboard.server.dao.settings.AdminSettingsService; 49 import org.thingsboard.server.dao.settings.AdminSettingsService;
47 import org.thingsboard.server.dao.user.UserService; 50 import org.thingsboard.server.dao.user.UserService;
48 import org.thingsboard.server.dao.user.UserServiceImpl; 51 import org.thingsboard.server.dao.user.UserServiceImpl;
49 import org.thingsboard.server.service.security.exception.UserPasswordExpiredException; 52 import org.thingsboard.server.service.security.exception.UserPasswordExpiredException;
50 -import org.thingsboard.server.common.data.security.model.SecuritySettings;  
51 -import org.thingsboard.server.common.data.security.model.UserPasswordPolicy; 53 +import org.thingsboard.server.utils.MiscUtils;
52 54
53 import javax.annotation.Resource; 55 import javax.annotation.Resource;
  56 +import javax.servlet.http.HttpServletRequest;
54 import java.util.ArrayList; 57 import java.util.ArrayList;
55 import java.util.List; 58 import java.util.List;
56 import java.util.Map; 59 import java.util.Map;
@@ -146,7 +149,7 @@ public class DefaultSystemSecurityService implements SystemSecurityService { @@ -146,7 +149,7 @@ public class DefaultSystemSecurityService implements SystemSecurityService {
146 if (isPositiveInteger(securitySettings.getPasswordPolicy().getPasswordExpirationPeriodDays())) { 149 if (isPositiveInteger(securitySettings.getPasswordPolicy().getPasswordExpirationPeriodDays())) {
147 if ((userCredentials.getCreatedTime() 150 if ((userCredentials.getCreatedTime()
148 + TimeUnit.DAYS.toMillis(securitySettings.getPasswordPolicy().getPasswordExpirationPeriodDays())) 151 + TimeUnit.DAYS.toMillis(securitySettings.getPasswordPolicy().getPasswordExpirationPeriodDays()))
149 - < System.currentTimeMillis()) { 152 + < System.currentTimeMillis()) {
150 userCredentials = userService.requestExpiredPasswordReset(tenantId, userCredentials.getId()); 153 userCredentials = userService.requestExpiredPasswordReset(tenantId, userCredentials.getId());
151 throw new UserPasswordExpiredException("User password expired!", userCredentials.getResetToken()); 154 throw new UserPasswordExpiredException("User password expired!", userCredentials.getResetToken());
152 } 155 }
@@ -197,6 +200,21 @@ public class DefaultSystemSecurityService implements SystemSecurityService { @@ -197,6 +200,21 @@ public class DefaultSystemSecurityService implements SystemSecurityService {
197 } 200 }
198 } 201 }
199 202
  203 + @Override
  204 + public String getBaseUrl(TenantId tenantId, CustomerId customerId, HttpServletRequest httpServletRequest) {
  205 + String baseUrl;
  206 + AdminSettings generalSettings = adminSettingsService.findAdminSettingsByKey(TenantId.SYS_TENANT_ID, "general");
  207 +
  208 + JsonNode prohibitDifferentUrl = generalSettings.getJsonValue().get("prohibitDifferentUrl");
  209 +
  210 + if (prohibitDifferentUrl != null && prohibitDifferentUrl.asBoolean()) {
  211 + baseUrl = generalSettings.getJsonValue().get("baseUrl").asText();
  212 + } else {
  213 + baseUrl = MiscUtils.constructBaseUrl(httpServletRequest);
  214 + }
  215 + return baseUrl;
  216 + }
  217 +
200 private static boolean isPositiveInteger(Integer val) { 218 private static boolean isPositiveInteger(Integer val) {
201 return val != null && val.intValue() > 0; 219 return val != null && val.intValue() > 0;
202 } 220 }
@@ -16,11 +16,14 @@ @@ -16,11 +16,14 @@
16 package org.thingsboard.server.service.security.system; 16 package org.thingsboard.server.service.security.system;
17 17
18 import org.springframework.security.core.AuthenticationException; 18 import org.springframework.security.core.AuthenticationException;
  19 +import org.thingsboard.server.common.data.id.CustomerId;
19 import org.thingsboard.server.common.data.id.TenantId; 20 import org.thingsboard.server.common.data.id.TenantId;
20 import org.thingsboard.server.common.data.security.UserCredentials; 21 import org.thingsboard.server.common.data.security.UserCredentials;
21 import org.thingsboard.server.dao.exception.DataValidationException; 22 import org.thingsboard.server.dao.exception.DataValidationException;
22 import org.thingsboard.server.common.data.security.model.SecuritySettings; 23 import org.thingsboard.server.common.data.security.model.SecuritySettings;
23 24
  25 +import javax.servlet.http.HttpServletRequest;
  26 +
24 public interface SystemSecurityService { 27 public interface SystemSecurityService {
25 28
26 SecuritySettings getSecuritySettings(TenantId tenantId); 29 SecuritySettings getSecuritySettings(TenantId tenantId);
@@ -31,4 +34,6 @@ public interface SystemSecurityService { @@ -31,4 +34,6 @@ public interface SystemSecurityService {
31 34
32 void validatePassword(TenantId tenantId, String password, UserCredentials userCredentials) throws DataValidationException; 35 void validatePassword(TenantId tenantId, String password, UserCredentials userCredentials) throws DataValidationException;
33 36
  37 + String getBaseUrl(TenantId tenantId, CustomerId customerId, HttpServletRequest httpServletRequest);
  38 +
34 } 39 }
@@ -175,7 +175,6 @@ public abstract class AbstractControllerTest { @@ -175,7 +175,6 @@ public abstract class AbstractControllerTest {
175 .apply(springSecurity()).build(); 175 .apply(springSecurity()).build();
176 } 176 }
177 loginSysAdmin(); 177 loginSysAdmin();
178 -  
179 Tenant tenant = new Tenant(); 178 Tenant tenant = new Tenant();
180 tenant.setTitle(TEST_TENANT_NAME); 179 tenant.setTitle(TEST_TENANT_NAME);
181 Tenant savedTenant = doPost("/api/tenant", tenant, Tenant.class); 180 Tenant savedTenant = doPost("/api/tenant", tenant, Tenant.class);
@@ -51,7 +51,7 @@ public class TbMsgPackProcessingContextTest { @@ -51,7 +51,7 @@ public class TbMsgPackProcessingContextTest {
51 messages.put(UUID.randomUUID(), new TbProtoQueueMsg<>(UUID.randomUUID(), null)); 51 messages.put(UUID.randomUUID(), new TbProtoQueueMsg<>(UUID.randomUUID(), null));
52 } 52 }
53 when(strategyMock.getPendingMap()).thenReturn(messages); 53 when(strategyMock.getPendingMap()).thenReturn(messages);
54 - TbMsgPackProcessingContext context = new TbMsgPackProcessingContext(strategyMock); 54 + TbMsgPackProcessingContext context = new TbMsgPackProcessingContext("Main", strategyMock);
55 for (UUID uuid : messages.keySet()) { 55 for (UUID uuid : messages.keySet()) {
56 for (int i = 0; i < parallelCount; i++) { 56 for (int i = 0; i < parallelCount; i++) {
57 executorService.submit(() -> context.onSuccess(uuid)); 57 executorService.submit(() -> context.onSuccess(uuid));
@@ -15,12 +15,16 @@ @@ -15,12 +15,16 @@
15 */ 15 */
16 package org.thingsboard.server.common.msg.queue; 16 package org.thingsboard.server.common.msg.queue;
17 17
  18 +import lombok.Getter;
18 import org.thingsboard.server.common.data.id.RuleNodeId; 19 import org.thingsboard.server.common.data.id.RuleNodeId;
19 20
20 public class RuleNodeInfo { 21 public class RuleNodeInfo {
21 private final String label; 22 private final String label;
  23 + @Getter
  24 + private final RuleNodeId ruleNodeId;
22 25
23 public RuleNodeInfo(RuleNodeId id, String ruleChainName, String ruleNodeName) { 26 public RuleNodeInfo(RuleNodeId id, String ruleChainName, String ruleNodeName) {
  27 + this.ruleNodeId = id;
24 this.label = "[RuleChain: " + ruleChainName + "|RuleNode: " + ruleNodeName + "(" + id + ")]"; 28 this.label = "[RuleChain: " + ruleChainName + "|RuleNode: " + ruleNodeName + "(" + id + ")]";
25 } 29 }
26 30
@@ -15,6 +15,8 @@ @@ -15,6 +15,8 @@
15 */ 15 */
16 package org.thingsboard.server.common.msg.queue; 16 package org.thingsboard.server.common.msg.queue;
17 17
  18 +import org.thingsboard.server.common.data.id.RuleNodeId;
  19 +
18 public interface TbMsgCallback { 20 public interface TbMsgCallback {
19 21
20 TbMsgCallback EMPTY = new TbMsgCallback() { 22 TbMsgCallback EMPTY = new TbMsgCallback() {
@@ -34,7 +36,11 @@ public interface TbMsgCallback { @@ -34,7 +36,11 @@ public interface TbMsgCallback {
34 36
35 void onFailure(RuleEngineException e); 37 void onFailure(RuleEngineException e);
36 38
37 - default void visit(RuleNodeInfo ruleNodeInfo) { 39 + default void onProcessingStart(RuleNodeInfo ruleNodeInfo) {
  40 + }
  41 +
  42 + default void onProcessingEnd(RuleNodeId ruleNodeId) {
38 } 43 }
39 44
  45 +
40 } 46 }
@@ -87,8 +87,9 @@ public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueuePr @@ -87,8 +87,9 @@ public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
87 sendMsgRequest.withQueueUrl(getQueueUrl(tpi.getFullTopicName())); 87 sendMsgRequest.withQueueUrl(getQueueUrl(tpi.getFullTopicName()));
88 sendMsgRequest.withMessageBody(gson.toJson(new DefaultTbQueueMsg(msg))); 88 sendMsgRequest.withMessageBody(gson.toJson(new DefaultTbQueueMsg(msg)));
89 89
90 - sendMsgRequest.withMessageGroupId(tpi.getTopic());  
91 - sendMsgRequest.withMessageDeduplicationId(UUID.randomUUID().toString()); 90 + String sqsMsgId = UUID.randomUUID().toString();
  91 + sendMsgRequest.withMessageGroupId(sqsMsgId);
  92 + sendMsgRequest.withMessageDeduplicationId(sqsMsgId);
92 93
93 ListenableFuture<SendMessageResult> future = producerExecutor.submit(() -> sqsClient.sendMessage(sendMsgRequest)); 94 ListenableFuture<SendMessageResult> future = producerExecutor.submit(() -> sqsClient.sendMessage(sendMsgRequest));
94 95
@@ -20,6 +20,7 @@ import com.google.common.util.concurrent.Futures; @@ -20,6 +20,7 @@ import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture; 20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.MoreExecutors; 21 import com.google.common.util.concurrent.MoreExecutors;
22 import lombok.extern.slf4j.Slf4j; 22 import lombok.extern.slf4j.Slf4j;
  23 +import org.apache.commons.collections.CollectionUtils;
23 import org.apache.commons.lang3.StringUtils; 24 import org.apache.commons.lang3.StringUtils;
24 import org.springframework.beans.factory.annotation.Autowired; 25 import org.springframework.beans.factory.annotation.Autowired;
25 import org.springframework.stereotype.Service; 26 import org.springframework.stereotype.Service;
@@ -54,8 +55,10 @@ import org.thingsboard.server.dao.tenant.TenantDao; @@ -54,8 +55,10 @@ import org.thingsboard.server.dao.tenant.TenantDao;
54 import javax.annotation.Nullable; 55 import javax.annotation.Nullable;
55 import java.util.ArrayList; 56 import java.util.ArrayList;
56 import java.util.HashMap; 57 import java.util.HashMap;
  58 +import java.util.HashSet;
57 import java.util.List; 59 import java.util.List;
58 import java.util.Map; 60 import java.util.Map;
  61 +import java.util.Set;
59 import java.util.concurrent.ExecutionException; 62 import java.util.concurrent.ExecutionException;
60 63
61 import static org.thingsboard.server.dao.service.Validator.validateId; 64 import static org.thingsboard.server.dao.service.Validator.validateId;
@@ -135,6 +138,10 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC @@ -135,6 +138,10 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
135 return null; 138 return null;
136 } 139 }
137 140
  141 + if (CollectionUtils.isNotEmpty(ruleChainMetaData.getConnections())) {
  142 + validateCircles(ruleChainMetaData.getConnections());
  143 + }
  144 +
138 List<RuleNode> nodes = ruleChainMetaData.getNodes(); 145 List<RuleNode> nodes = ruleChainMetaData.getNodes();
139 List<RuleNode> toAddOrUpdate = new ArrayList<>(); 146 List<RuleNode> toAddOrUpdate = new ArrayList<>();
140 List<RuleNode> toDelete = new ArrayList<>(); 147 List<RuleNode> toDelete = new ArrayList<>();
@@ -217,6 +224,31 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC @@ -217,6 +224,31 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
217 return loadRuleChainMetaData(tenantId, ruleChainMetaData.getRuleChainId()); 224 return loadRuleChainMetaData(tenantId, ruleChainMetaData.getRuleChainId());
218 } 225 }
219 226
  227 + private void validateCircles(List<NodeConnectionInfo> connectionInfos) {
  228 + Map<Integer, Set<Integer>> connectionsMap = new HashMap<>();
  229 + for (NodeConnectionInfo nodeConnection : connectionInfos) {
  230 + if (nodeConnection.getFromIndex() == nodeConnection.getToIndex()) {
  231 + throw new DataValidationException("Can't create the relation to yourself.");
  232 + }
  233 + connectionsMap
  234 + .computeIfAbsent(nodeConnection.getFromIndex(), from -> new HashSet<>())
  235 + .add(nodeConnection.getToIndex());
  236 + }
  237 + connectionsMap.keySet().forEach(key -> validateCircles(key, connectionsMap.get(key), connectionsMap));
  238 + }
  239 +
  240 + private void validateCircles(int from, Set<Integer> toList, Map<Integer, Set<Integer>> connectionsMap) {
  241 + if (toList == null) {
  242 + return;
  243 + }
  244 + for (Integer to : toList) {
  245 + if (from == to) {
  246 + throw new DataValidationException("Can't create circling relations in rule chain.");
  247 + }
  248 + validateCircles(from, connectionsMap.get(to), connectionsMap);
  249 + }
  250 + }
  251 +
220 @Override 252 @Override
221 public RuleChainMetaData loadRuleChainMetaData(TenantId tenantId, RuleChainId ruleChainId) { 253 public RuleChainMetaData loadRuleChainMetaData(TenantId tenantId, RuleChainId ruleChainId) {
222 Validator.validateId(ruleChainId, "Incorrect rule chain id."); 254 Validator.validateId(ruleChainId, "Incorrect rule chain id.");
@@ -299,7 +331,6 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC @@ -299,7 +331,6 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
299 } 331 }
300 } 332 }
301 333
302 -  
303 @Override 334 @Override
304 public List<RuleNode> getRuleChainNodes(TenantId tenantId, RuleChainId ruleChainId) { 335 public List<RuleNode> getRuleChainNodes(TenantId tenantId, RuleChainId ruleChainId) {
305 Validator.validateId(ruleChainId, "Incorrect rule chain id for search request."); 336 Validator.validateId(ruleChainId, "Incorrect rule chain id for search request.");
@@ -319,6 +319,16 @@ public abstract class BaseRuleChainServiceTest extends AbstractServiceTest { @@ -319,6 +319,16 @@ public abstract class BaseRuleChainServiceTest extends AbstractServiceTest {
319 ruleChainService.deleteRuleChainById(tenantId, savedRuleChainMetaData.getRuleChainId()); 319 ruleChainService.deleteRuleChainById(tenantId, savedRuleChainMetaData.getRuleChainId());
320 } 320 }
321 321
  322 + @Test(expected = DataValidationException.class)
  323 + public void testUpdateRuleChainMetaDataWithCirclingRelation() throws Exception {
  324 + ruleChainService.saveRuleChainMetaData(tenantId, createRuleChainMetadataWithCirclingRelation());
  325 + }
  326 +
  327 + @Test(expected = DataValidationException.class)
  328 + public void testUpdateRuleChainMetaDataWithCirclingRelation2() throws Exception {
  329 + ruleChainService.saveRuleChainMetaData(tenantId, createRuleChainMetadataWithCirclingRelation2());
  330 + }
  331 +
322 @Test 332 @Test
323 public void testGetDefaultEdgeRuleChains() throws Exception { 333 public void testGetDefaultEdgeRuleChains() throws Exception {
324 RuleChainId ruleChainId = saveRuleChainAndSetDefaultEdge("Default Edge Rule Chain 1"); 334 RuleChainId ruleChainId = saveRuleChainAndSetDefaultEdge("Default Edge Rule Chain 1");
@@ -397,5 +407,85 @@ public abstract class BaseRuleChainServiceTest extends AbstractServiceTest { @@ -397,5 +407,85 @@ public abstract class BaseRuleChainServiceTest extends AbstractServiceTest {
397 return ruleChainService.saveRuleChainMetaData(tenantId, ruleChainMetaData); 407 return ruleChainService.saveRuleChainMetaData(tenantId, ruleChainMetaData);
398 } 408 }
399 409
  410 + private RuleChainMetaData createRuleChainMetadataWithCirclingRelation() throws Exception {
  411 + RuleChain ruleChain = new RuleChain();
  412 + ruleChain.setName("My RuleChain");
  413 + ruleChain.setTenantId(tenantId);
  414 + RuleChain savedRuleChain = ruleChainService.saveRuleChain(ruleChain);
  415 +
  416 + RuleChainMetaData ruleChainMetaData = new RuleChainMetaData();
  417 + ruleChainMetaData.setRuleChainId(savedRuleChain.getId());
  418 +
  419 + ObjectMapper mapper = new ObjectMapper();
  420 +
  421 + RuleNode ruleNode1 = new RuleNode();
  422 + ruleNode1.setName("name1");
  423 + ruleNode1.setType("type1");
  424 + ruleNode1.setConfiguration(mapper.readTree("\"key1\": \"val1\""));
  425 +
  426 + RuleNode ruleNode2 = new RuleNode();
  427 + ruleNode2.setName("name2");
  428 + ruleNode2.setType("type2");
  429 + ruleNode2.setConfiguration(mapper.readTree("\"key2\": \"val2\""));
  430 +
  431 + RuleNode ruleNode3 = new RuleNode();
  432 + ruleNode3.setName("name3");
  433 + ruleNode3.setType("type3");
  434 + ruleNode3.setConfiguration(mapper.readTree("\"key3\": \"val3\""));
  435 +
  436 + List<RuleNode> ruleNodes = new ArrayList<>();
  437 + ruleNodes.add(ruleNode1);
  438 + ruleNodes.add(ruleNode2);
  439 + ruleNodes.add(ruleNode3);
  440 + ruleChainMetaData.setFirstNodeIndex(0);
  441 + ruleChainMetaData.setNodes(ruleNodes);
  442 +
  443 + ruleChainMetaData.addConnectionInfo(0,1,"success");
  444 + ruleChainMetaData.addConnectionInfo(0,2,"fail");
  445 + ruleChainMetaData.addConnectionInfo(1,2,"success");
  446 + ruleChainMetaData.addConnectionInfo(2,2,"success");
  447 +
  448 + return ruleChainMetaData;
  449 + }
  450 +
  451 + private RuleChainMetaData createRuleChainMetadataWithCirclingRelation2() throws Exception {
  452 + RuleChain ruleChain = new RuleChain();
  453 + ruleChain.setName("My RuleChain");
  454 + ruleChain.setTenantId(tenantId);
  455 + RuleChain savedRuleChain = ruleChainService.saveRuleChain(ruleChain);
  456 +
  457 + RuleChainMetaData ruleChainMetaData = new RuleChainMetaData();
  458 + ruleChainMetaData.setRuleChainId(savedRuleChain.getId());
  459 +
  460 + ObjectMapper mapper = new ObjectMapper();
  461 +
  462 + RuleNode ruleNode1 = new RuleNode();
  463 + ruleNode1.setName("name1");
  464 + ruleNode1.setType("type1");
  465 + ruleNode1.setConfiguration(mapper.readTree("\"key1\": \"val1\""));
  466 +
  467 + RuleNode ruleNode2 = new RuleNode();
  468 + ruleNode2.setName("name2");
  469 + ruleNode2.setType("type2");
  470 + ruleNode2.setConfiguration(mapper.readTree("\"key2\": \"val2\""));
  471 +
  472 + RuleNode ruleNode3 = new RuleNode();
  473 + ruleNode3.setName("name3");
  474 + ruleNode3.setType("type3");
  475 + ruleNode3.setConfiguration(mapper.readTree("\"key3\": \"val3\""));
  476 +
  477 + List<RuleNode> ruleNodes = new ArrayList<>();
  478 + ruleNodes.add(ruleNode1);
  479 + ruleNodes.add(ruleNode2);
  480 + ruleNodes.add(ruleNode3);
  481 + ruleChainMetaData.setFirstNodeIndex(0);
  482 + ruleChainMetaData.setNodes(ruleNodes);
  483 +
  484 + ruleChainMetaData.addConnectionInfo(0,1,"success");
  485 + ruleChainMetaData.addConnectionInfo(0,2,"fail");
  486 + ruleChainMetaData.addConnectionInfo(1,2,"success");
  487 + ruleChainMetaData.addConnectionInfo(2,0,"success");
400 488
  489 + return ruleChainMetaData;
  490 + }
401 } 491 }
1 TB_QUEUE_TYPE=kafka 1 TB_QUEUE_TYPE=kafka
2 TB_KAFKA_SERVERS=kafka:9092 2 TB_KAFKA_SERVERS=kafka:9092
3 -TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES=retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100  
@@ -52,11 +52,13 @@ function AwsSqsProducer() { @@ -52,11 +52,13 @@ function AwsSqsProducer() {
52 queueUrls.set(responseTopic, responseQueueUrl); 52 queueUrls.set(responseTopic, responseQueueUrl);
53 } 53 }
54 54
  55 + let msgId = uuid();
  56 +
55 let params = { 57 let params = {
56 MessageBody: msgBody, 58 MessageBody: msgBody,
57 QueueUrl: responseQueueUrl, 59 QueueUrl: responseQueueUrl,
58 - MessageGroupId: 'js_eval',  
59 - MessageDeduplicationId: uuid() 60 + MessageGroupId: msgId,
  61 + MessageDeduplicationId: msgId
60 }; 62 };
61 63
62 return new Promise((resolve, reject) => { 64 return new Promise((resolve, reject) => {
@@ -143,7 +143,7 @@ public interface TbContext { @@ -143,7 +143,7 @@ public interface TbContext {
143 TbMsg assetCreatedMsg(Asset asset, RuleNodeId ruleNodeId); 143 TbMsg assetCreatedMsg(Asset asset, RuleNodeId ruleNodeId);
144 144
145 // TODO: Does this changes the message? 145 // TODO: Does this changes the message?
146 - TbMsg alarmCreatedMsg(Alarm alarm, RuleNodeId ruleNodeId); 146 + TbMsg alarmActionMsg(Alarm alarm, RuleNodeId ruleNodeId, String action);
147 147
148 /* 148 /*
149 * 149 *
@@ -25,6 +25,7 @@ import org.thingsboard.rule.engine.api.TbContext; @@ -25,6 +25,7 @@ import org.thingsboard.rule.engine.api.TbContext;
25 import org.thingsboard.rule.engine.api.TbNode; 25 import org.thingsboard.rule.engine.api.TbNode;
26 import org.thingsboard.rule.engine.api.TbNodeConfiguration; 26 import org.thingsboard.rule.engine.api.TbNodeConfiguration;
27 import org.thingsboard.rule.engine.api.TbNodeException; 27 import org.thingsboard.rule.engine.api.TbNodeException;
  28 +import org.thingsboard.server.common.data.DataConstants;
28 import org.thingsboard.server.common.data.alarm.Alarm; 29 import org.thingsboard.server.common.data.alarm.Alarm;
29 import org.thingsboard.server.common.msg.TbMsg; 30 import org.thingsboard.server.common.msg.TbMsg;
30 import org.thingsboard.server.common.msg.TbMsgMetaData; 31 import org.thingsboard.server.common.msg.TbMsgMetaData;
@@ -61,13 +62,11 @@ public abstract class TbAbstractAlarmNode<C extends TbAbstractAlarmNodeConfigura @@ -61,13 +62,11 @@ public abstract class TbAbstractAlarmNode<C extends TbAbstractAlarmNodeConfigura
61 if (alarmResult.alarm == null) { 62 if (alarmResult.alarm == null) {
62 ctx.tellNext(msg, "False"); 63 ctx.tellNext(msg, "False");
63 } else if (alarmResult.isCreated) { 64 } else if (alarmResult.isCreated) {
64 - ctx.enqueue(ctx.alarmCreatedMsg(alarmResult.alarm, ctx.getSelfId()),  
65 - () -> ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Created"),  
66 - throwable -> ctx.tellFailure(toAlarmMsg(ctx, alarmResult, msg), throwable)); 65 + tellNext(ctx, msg, alarmResult, DataConstants.ENTITY_CREATED, "Created");
67 } else if (alarmResult.isUpdated) { 66 } else if (alarmResult.isUpdated) {
68 - ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Updated"); 67 + tellNext(ctx, msg, alarmResult, DataConstants.ENTITY_UPDATED, "Updated");
69 } else if (alarmResult.isCleared) { 68 } else if (alarmResult.isCleared) {
70 - ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Cleared"); 69 + tellNext(ctx, msg, alarmResult, DataConstants.ALARM_CLEAR, "Cleared");
71 } else { 70 } else {
72 ctx.tellSuccess(msg); 71 ctx.tellSuccess(msg);
73 } 72 }
@@ -126,4 +125,10 @@ public abstract class TbAbstractAlarmNode<C extends TbAbstractAlarmNodeConfigura @@ -126,4 +125,10 @@ public abstract class TbAbstractAlarmNode<C extends TbAbstractAlarmNodeConfigura
126 this.alarm = alarm; 125 this.alarm = alarm;
127 } 126 }
128 } 127 }
  128 +
  129 + private void tellNext(TbContext ctx, TbMsg msg, AlarmResult alarmResult, String alarmAction, String alarmResultMsgType) {
  130 + ctx.enqueue(ctx.alarmActionMsg(alarmResult.alarm, ctx.getSelfId(), alarmAction),
  131 + () -> ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), alarmResultMsgType),
  132 + throwable -> ctx.tellFailure(toAlarmMsg(ctx, alarmResult, msg), throwable));
  133 + }
129 } 134 }
@@ -34,6 +34,7 @@ import org.thingsboard.rule.engine.api.ScriptEngine; @@ -34,6 +34,7 @@ import org.thingsboard.rule.engine.api.ScriptEngine;
34 import org.thingsboard.rule.engine.api.TbContext; 34 import org.thingsboard.rule.engine.api.TbContext;
35 import org.thingsboard.rule.engine.api.TbNodeConfiguration; 35 import org.thingsboard.rule.engine.api.TbNodeConfiguration;
36 import org.thingsboard.rule.engine.api.TbNodeException; 36 import org.thingsboard.rule.engine.api.TbNodeException;
  37 +import org.thingsboard.server.common.data.DataConstants;
37 import org.thingsboard.server.common.data.alarm.Alarm; 38 import org.thingsboard.server.common.data.alarm.Alarm;
38 import org.thingsboard.server.common.data.id.AlarmId; 39 import org.thingsboard.server.common.data.id.AlarmId;
39 import org.thingsboard.server.common.data.id.DeviceId; 40 import org.thingsboard.server.common.data.id.DeviceId;
@@ -249,6 +250,8 @@ public class TbAlarmNodeTest { @@ -249,6 +250,8 @@ public class TbAlarmNodeTest {
249 250
250 node.onMsg(ctx, msg); 251 node.onMsg(ctx, msg);
251 252
  253 + verify(ctx).enqueue(any(), successCaptor.capture(), failureCaptor.capture());
  254 + successCaptor.getValue().run();
252 verify(ctx).tellNext(any(), eq("Updated")); 255 verify(ctx).tellNext(any(), eq("Updated"));
253 256
254 ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class); 257 ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
@@ -297,6 +300,8 @@ public class TbAlarmNodeTest { @@ -297,6 +300,8 @@ public class TbAlarmNodeTest {
297 300
298 node.onMsg(ctx, msg); 301 node.onMsg(ctx, msg);
299 302
  303 + verify(ctx).enqueue(any(), successCaptor.capture(), failureCaptor.capture());
  304 + successCaptor.getValue().run();
300 verify(ctx).tellNext(any(), eq("Cleared")); 305 verify(ctx).tellNext(any(), eq("Cleared"));
301 306
302 ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class); 307 ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
@@ -345,6 +350,8 @@ public class TbAlarmNodeTest { @@ -345,6 +350,8 @@ public class TbAlarmNodeTest {
345 350
346 node.onMsg(ctx, msg); 351 node.onMsg(ctx, msg);
347 352
  353 + verify(ctx).enqueue(any(), successCaptor.capture(), failureCaptor.capture());
  354 + successCaptor.getValue().run();
348 verify(ctx).tellNext(any(), eq("Cleared")); 355 verify(ctx).tellNext(any(), eq("Cleared"));
349 356
350 ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class); 357 ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
@@ -34,6 +34,14 @@ @@ -34,6 +34,14 @@
34 <div translate ng-message="required">admin.base-url-required</div> 34 <div translate ng-message="required">admin.base-url-required</div>
35 </div> 35 </div>
36 </md-input-container> 36 </md-input-container>
  37 + <md-checkbox class="md-block"
  38 + aria-label="{{ 'admin.prohibit-different-url' | translate }}"
  39 + ng-model="vm.settings.jsonValue.prohibitDifferentUrl">
  40 + {{ 'admin.prohibit-different-url' | translate }}
  41 + </md-checkbox>
  42 + <div translate class="tb-hint">
  43 + admin.prohibit-different-url-hint
  44 + </div>
37 <div layout="row" layout-align="end center" width="100%" layout-wrap> 45 <div layout="row" layout-align="end center" width="100%" layout-wrap>
38 <md-button ng-disabled="$root.loading || vm.settingsForm.$invalid || !vm.settingsForm.$dirty" type="submit" class="md-raised md-primary">{{'action.save' | translate}}</md-button> 46 <md-button ng-disabled="$root.loading || vm.settingsForm.$invalid || !vm.settingsForm.$dirty" type="submit" class="md-raised md-primary">{{'action.save' | translate}}</md-button>
39 </div> 47 </div>
@@ -74,6 +74,8 @@ @@ -74,6 +74,8 @@
74 "test-mail-sent": "Test mail was successfully sent!", 74 "test-mail-sent": "Test mail was successfully sent!",
75 "base-url": "Base URL", 75 "base-url": "Base URL",
76 "base-url-required": "Base URL is required.", 76 "base-url-required": "Base URL is required.",
  77 + "prohibit-different-url": "Prohibit to use hostname from the client request headers",
  78 + "prohibit-different-url-hint": "This setting should be enabled for production environments. May cause security issues when disabled",
77 "mail-from": "Mail From", 79 "mail-from": "Mail From",
78 "mail-from-required": "Mail From is required.", 80 "mail-from-required": "Mail From is required.",
79 "smtp-protocol": "SMTP protocol", 81 "smtp-protocol": "SMTP protocol",