Commit c0c71284050aa44fb7c569033019c356b2b76bc1

Authored by vparomskiy
2 parents 76e852bf 02e2c14f

Merge branch 'develop/2.0' of https://github.com/thingsboard/thingsboard into develop/2.0

Showing 39 changed files with 165 additions and 995 deletions
application/src/main/data/upgrade/2.0.0/schema_update.cql renamed from application/src/main/data/upgrade/1.5.0/schema_update.cql
application/src/main/data/upgrade/2.0.0/schema_update.sql renamed from application/src/main/data/upgrade/1.5.0/schema_update.sql
@@ -235,6 +235,10 @@ public class ActorSystemContext { @@ -235,6 +235,10 @@ public class ActorSystemContext {
235 @Getter 235 @Getter
236 private boolean tenantComponentsInitEnabled; 236 private boolean tenantComponentsInitEnabled;
237 237
  238 + @Value("${actors.rule.allow_system_mail_service}")
  239 + @Getter
  240 + private boolean allowSystemMailService;
  241 +
238 @Getter 242 @Getter
239 @Setter 243 @Setter
240 private ActorSystem actorSystem; 244 private ActorSystem actorSystem;
@@ -209,7 +209,11 @@ class DefaultTbContext implements TbContext { @@ -209,7 +209,11 @@ class DefaultTbContext implements TbContext {
209 209
210 @Override 210 @Override
211 public MailService getMailService() { 211 public MailService getMailService() {
212 - return mainCtx.getMailService(); 212 + if (mainCtx.isAllowSystemMailService()) {
  213 + return mainCtx.getMailService();
  214 + } else {
  215 + throw new RuntimeException("Access to System Mail Service is forbidden!");
  216 + }
213 } 217 }
214 218
215 @Override 219 @Override
@@ -183,23 +183,35 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -183,23 +183,35 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
183 183
184 void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) { 184 void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
185 checkActive(); 185 checkActive();
186 - putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> pushMsgToNode(firstNode, msg, "")); 186 + if (firstNode != null) {
  187 + putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> pushMsgToNode(firstNode, msg, ""));
  188 + }
187 } 189 }
188 190
189 void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg envelope) { 191 void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg envelope) {
190 checkActive(); 192 checkActive();
191 - putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> {  
192 - pushMsgToNode(firstNode, msg, "");  
193 - envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(msg.getId()), self);  
194 - }); 193 + if (firstNode != null) {
  194 + putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> {
  195 + pushMsgToNode(firstNode, msg, "");
  196 + envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(msg.getId()), self);
  197 + });
  198 + }
195 } 199 }
196 200
197 void onRuleChainToRuleChainMsg(RuleChainToRuleChainMsg envelope) { 201 void onRuleChainToRuleChainMsg(RuleChainToRuleChainMsg envelope) {
198 checkActive(); 202 checkActive();
199 if (envelope.isEnqueue()) { 203 if (envelope.isEnqueue()) {
200 - putToQueue(enrichWithRuleChainId(envelope.getMsg()), msg -> pushMsgToNode(firstNode, msg, envelope.getFromRelationType())); 204 + if (firstNode != null) {
  205 + putToQueue(enrichWithRuleChainId(envelope.getMsg()), msg -> pushMsgToNode(firstNode, msg, envelope.getFromRelationType()));
  206 + }
201 } else { 207 } else {
202 - pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType()); 208 + if (firstNode != null) {
  209 + pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType());
  210 + } else {
  211 + TbMsg msg = envelope.getMsg();
  212 + EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
  213 + queue.ack(tenantId, envelope.getMsg(), ackId.getId(), msg.getClusterPartition());
  214 + }
203 } 215 }
204 } 216 }
205 217
@@ -82,7 +82,7 @@ public class ThingsboardInstallService { @@ -82,7 +82,7 @@ public class ThingsboardInstallService {
82 databaseUpgradeService.upgradeDatabase("1.3.1"); 82 databaseUpgradeService.upgradeDatabase("1.3.1");
83 83
84 case "1.4.0": 84 case "1.4.0":
85 - log.info("Upgrading ThingsBoard from version 1.4.0 to 1.5.0 ..."); 85 + log.info("Upgrading ThingsBoard from version 1.4.0 to 2.0.0 ...");
86 86
87 databaseUpgradeService.upgradeDatabase("1.4.0"); 87 databaseUpgradeService.upgradeDatabase("1.4.0");
88 88
@@ -198,7 +198,7 @@ public class CassandraDatabaseUpgradeService implements DatabaseUpgradeService { @@ -198,7 +198,7 @@ public class CassandraDatabaseUpgradeService implements DatabaseUpgradeService {
198 case "1.4.0": 198 case "1.4.0":
199 199
200 log.info("Updating schema ..."); 200 log.info("Updating schema ...");
201 - schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "1.5.0", SCHEMA_UPDATE_CQL); 201 + schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.0.0", SCHEMA_UPDATE_CQL);
202 loadCql(schemaUpdateFile); 202 loadCql(schemaUpdateFile);
203 log.info("Schema updated."); 203 log.info("Schema updated.");
204 204
@@ -47,7 +47,7 @@ public class DefaultDataUpdateService implements DataUpdateService { @@ -47,7 +47,7 @@ public class DefaultDataUpdateService implements DataUpdateService {
47 public void updateData(String fromVersion) throws Exception { 47 public void updateData(String fromVersion) throws Exception {
48 switch (fromVersion) { 48 switch (fromVersion) {
49 case "1.4.0": 49 case "1.4.0":
50 - log.info("Updating data from version 1.4.0 to 1.5.0 ..."); 50 + log.info("Updating data from version 1.4.0 to 2.0.0 ...");
51 tenantsDefaultRuleChainUpdater.updateEntities(null); 51 tenantsDefaultRuleChainUpdater.updateEntities(null);
52 break; 52 break;
53 default: 53 default:
@@ -104,7 +104,7 @@ public class SqlDatabaseUpgradeService implements DatabaseUpgradeService { @@ -104,7 +104,7 @@ public class SqlDatabaseUpgradeService implements DatabaseUpgradeService {
104 case "1.4.0": 104 case "1.4.0":
105 try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { 105 try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
106 log.info("Updating schema ..."); 106 log.info("Updating schema ...");
107 - schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "1.5.0", SCHEMA_UPDATE_SQL); 107 + schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.0.0", SCHEMA_UPDATE_SQL);
108 String sql = new String(Files.readAllBytes(schemaUpdateFile), Charset.forName("UTF-8")); 108 String sql = new String(Files.readAllBytes(schemaUpdateFile), Charset.forName("UTF-8"));
109 conn.createStatement().execute(sql); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script 109 conn.createStatement().execute(sql); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script
110 log.info("Schema updated."); 110 log.info("Schema updated.");
@@ -40,10 +40,10 @@ import java.util.concurrent.atomic.AtomicLong; @@ -40,10 +40,10 @@ import java.util.concurrent.atomic.AtomicLong;
40 @Slf4j 40 @Slf4j
41 public class DefaultMsgQueueService implements MsgQueueService { 41 public class DefaultMsgQueueService implements MsgQueueService {
42 42
43 - @Value("${rule.queue.max_size}") 43 + @Value("${actors.rule.queue.max_size}")
44 private long queueMaxSize; 44 private long queueMaxSize;
45 45
46 - @Value("${rule.queue.cleanup_period}") 46 + @Value("${actors.rule.queue.cleanup_period}")
47 private long queueCleanUpPeriod; 47 private long queueCleanUpPeriod;
48 48
49 @Autowired 49 @Autowired
@@ -203,6 +203,7 @@ cassandra: @@ -203,6 +203,7 @@ cassandra:
203 default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}" 203 default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
204 # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS 204 # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS
205 ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}" 205 ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
  206 + ts_key_value_ttl: "${TS_KV_TTL:0}"
206 buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}" 207 buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"
207 concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}" 208 concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}"
208 permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}" 209 permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
@@ -236,6 +237,8 @@ actors: @@ -236,6 +237,8 @@ actors:
236 js_thread_pool_size: "${ACTORS_RULE_JS_THREAD_POOL_SIZE:10}" 237 js_thread_pool_size: "${ACTORS_RULE_JS_THREAD_POOL_SIZE:10}"
237 # Specify thread pool size for mail sender executor service 238 # Specify thread pool size for mail sender executor service
238 mail_thread_pool_size: "${ACTORS_RULE_MAIL_THREAD_POOL_SIZE:10}" 239 mail_thread_pool_size: "${ACTORS_RULE_MAIL_THREAD_POOL_SIZE:10}"
  240 + # Whether to allow usage of system mail service for rules
  241 + allow_system_mail_service: "${ACTORS_RULE_ALLOW_SYSTEM_MAIL_SERVICE:true}"
239 # Specify thread pool size for external call service 242 # Specify thread pool size for external call service
240 external_call_thread_pool_size: "${ACTORS_RULE_EXTERNAL_CALL_THREAD_POOL_SIZE:10}" 243 external_call_thread_pool_size: "${ACTORS_RULE_EXTERNAL_CALL_THREAD_POOL_SIZE:10}"
241 js_sandbox: 244 js_sandbox:
@@ -253,6 +256,13 @@ actors: @@ -253,6 +256,13 @@ actors:
253 node: 256 node:
254 # Errors for particular actor are persisted once per specified amount of milliseconds 257 # Errors for particular actor are persisted once per specified amount of milliseconds
255 error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}" 258 error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}"
  259 + queue:
  260 + # Message queue type
  261 + type: "${ACTORS_RULE_QUEUE_TYPE:memory}"
  262 + # Message queue maximum size (per tenant)
  263 + max_size: "${ACTORS_RULE_QUEUE_MAX_SIZE:100}"
  264 + # Message queue cleanup period in seconds
  265 + cleanup_period: "${ACTORS_RULE_QUEUE_CLEANUP_PERIOD:3600}"
256 statistics: 266 statistics:
257 # Enable/disable actor statistics 267 # Enable/disable actor statistics
258 enabled: "${ACTORS_STATISTICS_ENABLED:true}" 268 enabled: "${ACTORS_STATISTICS_ENABLED:true}"
@@ -333,16 +343,6 @@ spring: @@ -333,16 +343,6 @@ spring:
333 username: "${SPRING_DATASOURCE_USERNAME:sa}" 343 username: "${SPRING_DATASOURCE_USERNAME:sa}"
334 password: "${SPRING_DATASOURCE_PASSWORD:}" 344 password: "${SPRING_DATASOURCE_PASSWORD:}"
335 345
336 -rule:  
337 - queue:  
338 - #Message queue type (memory or db)  
339 - type: "${RULE_QUEUE_TYPE:memory}"  
340 - #Message queue maximum size (per tenant)  
341 - max_size: "${RULE_QUEUE_MAX_SIZE:100}"  
342 - #Message queue cleanup period in seconds  
343 - cleanup_period: "${RULE_QUEUE_CLEANUP_PERIOD:3600}"  
344 -  
345 -  
346 # PostgreSQL DAO Configuration 346 # PostgreSQL DAO Configuration
347 #spring: 347 #spring:
348 # data: 348 # data:
@@ -32,6 +32,7 @@ import org.thingsboard.server.dao.rule.RuleChainService; @@ -32,6 +32,7 @@ import org.thingsboard.server.dao.rule.RuleChainService;
32 import org.thingsboard.server.service.queue.MsgQueueService; 32 import org.thingsboard.server.service.queue.MsgQueueService;
33 33
34 import java.io.IOException; 34 import java.io.IOException;
  35 +import java.util.function.Predicate;
35 36
36 /** 37 /**
37 * Created by ashvayka on 20.03.18. 38 * Created by ashvayka on 20.03.18.
@@ -75,4 +76,9 @@ public class AbstractRuleEngineControllerTest extends AbstractControllerTest { @@ -75,4 +76,9 @@ public class AbstractRuleEngineControllerTest extends AbstractControllerTest {
75 throw new RuntimeException(e); 76 throw new RuntimeException(e);
76 } 77 }
77 } 78 }
  79 +
  80 + protected Predicate<Event> filterByCustomEvent() {
  81 + return event -> event.getBody().get("msgType").textValue().equals("CUSTOM");
  82 + }
  83 +
78 } 84 }
@@ -47,6 +47,8 @@ import java.io.IOException; @@ -47,6 +47,8 @@ import java.io.IOException;
47 import java.util.Arrays; 47 import java.util.Arrays;
48 import java.util.Collections; 48 import java.util.Collections;
49 import java.util.List; 49 import java.util.List;
  50 +import java.util.function.Predicate;
  51 +import java.util.stream.Collectors;
50 52
51 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; 53 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
52 54
@@ -157,15 +159,15 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule @@ -157,15 +159,15 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
157 159
158 Thread.sleep(3000); 160 Thread.sleep(3000);
159 161
160 - TimePageData<Event> events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000); 162 + TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
  163 + List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
  164 + Assert.assertEquals(2, events.size());
161 165
162 - Assert.assertEquals(2, events.getData().size());  
163 -  
164 - Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get(); 166 + Event inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
165 Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId()); 167 Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
166 Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText()); 168 Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
167 169
168 - Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get(); 170 + Event outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
169 Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId()); 171 Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
170 Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText()); 172 Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
171 173
@@ -174,15 +176,16 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule @@ -174,15 +176,16 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
174 RuleChain finalRuleChain = ruleChain; 176 RuleChain finalRuleChain = ruleChain;
175 RuleNode lastRuleNode = metaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get(); 177 RuleNode lastRuleNode = metaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get();
176 178
177 - events = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000); 179 + eventsPage = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
  180 + events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
178 181
179 - Assert.assertEquals(2, events.getData().size()); 182 + Assert.assertEquals(2, events.size());
180 183
181 - inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get(); 184 + inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
182 Assert.assertEquals(lastRuleNode.getId(), inEvent.getEntityId()); 185 Assert.assertEquals(lastRuleNode.getId(), inEvent.getEntityId());
183 Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText()); 186 Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
184 187
185 - outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get(); 188 + outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
186 Assert.assertEquals(lastRuleNode.getId(), outEvent.getEntityId()); 189 Assert.assertEquals(lastRuleNode.getId(), outEvent.getEntityId());
187 Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText()); 190 Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
188 191
@@ -274,15 +277,16 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule @@ -274,15 +277,16 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
274 277
275 Thread.sleep(3000); 278 Thread.sleep(3000);
276 279
277 - TimePageData<Event> events = getDebugEvents(savedTenant.getId(), rootRuleChain.getFirstRuleNodeId(), 1000); 280 + TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), rootRuleChain.getFirstRuleNodeId(), 1000);
  281 + List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
278 282
279 - Assert.assertEquals(2, events.getData().size()); 283 + Assert.assertEquals(2, events.size());
280 284
281 - Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get(); 285 + Event inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
282 Assert.assertEquals(rootRuleChain.getFirstRuleNodeId(), inEvent.getEntityId()); 286 Assert.assertEquals(rootRuleChain.getFirstRuleNodeId(), inEvent.getEntityId());
283 Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText()); 287 Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
284 288
285 - Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get(); 289 + Event outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
286 Assert.assertEquals(rootRuleChain.getFirstRuleNodeId(), outEvent.getEntityId()); 290 Assert.assertEquals(rootRuleChain.getFirstRuleNodeId(), outEvent.getEntityId());
287 Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText()); 291 Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
288 292
@@ -291,15 +295,17 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule @@ -291,15 +295,17 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
291 RuleChain finalRuleChain = rootRuleChain; 295 RuleChain finalRuleChain = rootRuleChain;
292 RuleNode lastRuleNode = secondaryMetaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get(); 296 RuleNode lastRuleNode = secondaryMetaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get();
293 297
294 - events = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000); 298 + eventsPage = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
  299 + events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
  300 +
295 301
296 - Assert.assertEquals(2, events.getData().size()); 302 + Assert.assertEquals(2, events.size());
297 303
298 - inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get(); 304 + inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
299 Assert.assertEquals(lastRuleNode.getId(), inEvent.getEntityId()); 305 Assert.assertEquals(lastRuleNode.getId(), inEvent.getEntityId());
300 Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText()); 306 Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
301 307
302 - outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get(); 308 + outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
303 Assert.assertEquals(lastRuleNode.getId(), outEvent.getEntityId()); 309 Assert.assertEquals(lastRuleNode.getId(), outEvent.getEntityId());
304 Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText()); 310 Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
305 311
@@ -45,6 +45,8 @@ import org.thingsboard.server.dao.attributes.AttributesService; @@ -45,6 +45,8 @@ import org.thingsboard.server.dao.attributes.AttributesService;
45 45
46 import java.io.IOException; 46 import java.io.IOException;
47 import java.util.Collections; 47 import java.util.Collections;
  48 +import java.util.List;
  49 +import java.util.stream.Collectors;
48 50
49 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; 51 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
50 52
@@ -144,15 +146,16 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac @@ -144,15 +146,16 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
144 146
145 Thread.sleep(3000); 147 Thread.sleep(3000);
146 148
147 - TimePageData<Event> events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000); 149 + TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
  150 + List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
148 151
149 - Assert.assertEquals(2, events.getData().size()); 152 + Assert.assertEquals(2, events.size());
150 153
151 - Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get(); 154 + Event inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
152 Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId()); 155 Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
153 Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText()); 156 Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
154 157
155 - Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get(); 158 + Event outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
156 Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId()); 159 Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
157 Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText()); 160 Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
158 161
@@ -212,15 +215,16 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac @@ -212,15 +215,16 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
212 215
213 Thread.sleep(3000); 216 Thread.sleep(3000);
214 217
215 - TimePageData<Event> events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000); 218 + TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
  219 + List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
216 220
217 - Assert.assertEquals(2, events.getData().size()); 221 + Assert.assertEquals(2, events.size());
218 222
219 - Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get(); 223 + Event inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
220 Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId()); 224 Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
221 Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText()); 225 Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
222 226
223 - Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get(); 227 + Event outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
224 Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId()); 228 Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
225 Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText()); 229 Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
226 230
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db;  
17 -  
18 -import lombok.Data;  
19 -import lombok.EqualsAndHashCode;  
20 -  
21 -import java.util.UUID;  
22 -  
23 -@Data  
24 -@EqualsAndHashCode  
25 -public class MsgAck {  
26 -  
27 - private final UUID msgId;  
28 - private final UUID nodeId;  
29 - private final long clusteredPartition;  
30 - private final long tsPartition;  
31 -  
32 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db;  
17 -  
18 -import org.springframework.stereotype.Component;  
19 -import org.thingsboard.server.common.msg.TbMsg;  
20 -import org.thingsboard.server.dao.queue.db.MsgAck;  
21 -  
22 -import java.util.Collection;  
23 -import java.util.List;  
24 -import java.util.Set;  
25 -import java.util.UUID;  
26 -import java.util.stream.Collectors;  
27 -  
28 -@Component  
29 -public class UnprocessedMsgFilter {  
30 -  
31 - public Collection<TbMsg> filter(List<TbMsg> msgs, List<MsgAck> acks) {  
32 - Set<UUID> processedIds = acks.stream().map(MsgAck::getMsgId).collect(Collectors.toSet());  
33 - return msgs.stream().filter(i -> !processedIds.contains(i.getId())).collect(Collectors.toList());  
34 - }  
35 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.nosql;  
17 -  
18 -import com.datastax.driver.core.utils.UUIDs;  
19 -import com.google.common.collect.Lists;  
20 -import com.google.common.util.concurrent.Futures;  
21 -import com.google.common.util.concurrent.ListenableFuture;  
22 -import lombok.extern.slf4j.Slf4j;  
23 -import org.springframework.beans.factory.annotation.Autowired;  
24 -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;  
25 -import org.springframework.stereotype.Component;  
26 -import org.thingsboard.server.common.data.id.TenantId;  
27 -import org.thingsboard.server.common.msg.TbMsg;  
28 -import org.thingsboard.server.dao.queue.MsgQueue;  
29 -import org.thingsboard.server.dao.queue.db.MsgAck;  
30 -import org.thingsboard.server.dao.queue.db.UnprocessedMsgFilter;  
31 -import org.thingsboard.server.dao.queue.db.repository.AckRepository;  
32 -import org.thingsboard.server.dao.queue.db.repository.MsgRepository;  
33 -import org.thingsboard.server.dao.util.NoSqlDao;  
34 -  
35 -import java.util.List;  
36 -import java.util.UUID;  
37 -  
38 -@Component  
39 -@ConditionalOnProperty(prefix = "rule.queue", value = "type", havingValue = "db")  
40 -@Slf4j  
41 -@NoSqlDao  
42 -public class CassandraMsgQueue implements MsgQueue {  
43 -  
44 - @Autowired  
45 - private MsgRepository msgRepository;  
46 - @Autowired  
47 - private AckRepository ackRepository;  
48 - @Autowired  
49 - private UnprocessedMsgFilter unprocessedMsgFilter;  
50 - @Autowired  
51 - private QueuePartitioner queuePartitioner;  
52 -  
53 - @Override  
54 - public ListenableFuture<Void> put(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition) {  
55 - long msgTime = getMsgTime(msg);  
56 - long tsPartition = queuePartitioner.getPartition(msgTime);  
57 - return msgRepository.save(msg, nodeId, clusterPartition, tsPartition, msgTime);  
58 - }  
59 -  
60 - @Override  
61 - public ListenableFuture<Void> ack(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition) {  
62 - long tsPartition = queuePartitioner.getPartition(getMsgTime(msg));  
63 - MsgAck ack = new MsgAck(msg.getId(), nodeId, clusterPartition, tsPartition);  
64 - return ackRepository.ack(ack);  
65 - }  
66 -  
67 - @Override  
68 - public Iterable<TbMsg> findUnprocessed(TenantId tenantId, UUID nodeId, long clusterPartition) {  
69 - List<TbMsg> unprocessedMsgs = Lists.newArrayList();  
70 - for (Long tsPartition : queuePartitioner.findUnprocessedPartitions(nodeId, clusterPartition)) {  
71 - List<TbMsg> msgs = msgRepository.findMsgs(nodeId, clusterPartition, tsPartition);  
72 - List<MsgAck> acks = ackRepository.findAcks(nodeId, clusterPartition, tsPartition);  
73 - unprocessedMsgs.addAll(unprocessedMsgFilter.filter(msgs, acks));  
74 - }  
75 - return unprocessedMsgs;  
76 - }  
77 -  
78 - @Override  
79 - public ListenableFuture<Void> cleanUp(TenantId tenantId) {  
80 - return Futures.immediateFuture(null);  
81 - }  
82 -  
83 - private long getMsgTime(TbMsg msg) {  
84 - return UUIDs.unixTimestamp(msg.getId());  
85 - }  
86 -  
87 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.nosql;  
17 -  
18 -import com.google.common.collect.Lists;  
19 -import lombok.extern.slf4j.Slf4j;  
20 -import org.springframework.beans.factory.annotation.Value;  
21 -import org.springframework.stereotype.Component;  
22 -import org.thingsboard.server.dao.queue.db.repository.ProcessedPartitionRepository;  
23 -import org.thingsboard.server.dao.timeseries.TsPartitionDate;  
24 -import org.thingsboard.server.dao.util.NoSqlDao;  
25 -  
26 -import java.time.Clock;  
27 -import java.time.Instant;  
28 -import java.time.LocalDateTime;  
29 -import java.time.ZoneOffset;  
30 -import java.util.List;  
31 -import java.util.Optional;  
32 -import java.util.UUID;  
33 -import java.util.concurrent.TimeUnit;  
34 -  
35 -@Component  
36 -@Slf4j  
37 -@NoSqlDao  
38 -public class QueuePartitioner {  
39 -  
40 - private final TsPartitionDate tsFormat;  
41 - private ProcessedPartitionRepository processedPartitionRepository;  
42 - private Clock clock = Clock.systemUTC();  
43 -  
44 - public QueuePartitioner(@Value("${cassandra.queue.partitioning}") String partitioning,  
45 - ProcessedPartitionRepository processedPartitionRepository) {  
46 - this.processedPartitionRepository = processedPartitionRepository;  
47 - Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);  
48 - if (partition.isPresent()) {  
49 - tsFormat = partition.get();  
50 - } else {  
51 - log.warn("Incorrect configuration of partitioning {}", partitioning);  
52 - throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!");  
53 - }  
54 - }  
55 -  
56 - public long getPartition(long ts) {  
57 - //TODO: use TsPartitionDate.truncateTo?  
58 - LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);  
59 - return tsFormat.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli();  
60 - }  
61 -  
62 - public List<Long> findUnprocessedPartitions(UUID nodeId, long clusteredHash) {  
63 - Optional<Long> lastPartitionOption = processedPartitionRepository.findLastProcessedPartition(nodeId, clusteredHash);  
64 - long lastPartition = lastPartitionOption.orElse(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7));  
65 - List<Long> unprocessedPartitions = Lists.newArrayList();  
66 -  
67 - LocalDateTime current = LocalDateTime.ofInstant(Instant.ofEpochMilli(lastPartition), ZoneOffset.UTC);  
68 - LocalDateTime end = LocalDateTime.ofInstant(Instant.now(clock), ZoneOffset.UTC)  
69 - .plus(1L, tsFormat.getTruncateUnit());  
70 -  
71 - while (current.isBefore(end)) {  
72 - current = current.plus(1L, tsFormat.getTruncateUnit());  
73 - unprocessedPartitions.add(tsFormat.truncatedTo(current).toInstant(ZoneOffset.UTC).toEpochMilli());  
74 - }  
75 -  
76 - return unprocessedPartitions;  
77 - }  
78 -  
79 - public void setClock(Clock clock) {  
80 - this.clock = clock;  
81 - }  
82 -  
83 - public void checkProcessedPartitions() {  
84 - //todo-vp: we need to implement this  
85 - }  
86 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.nosql.repository;  
17 -  
18 -import com.datastax.driver.core.BoundStatement;  
19 -import com.datastax.driver.core.PreparedStatement;  
20 -import com.datastax.driver.core.ResultSet;  
21 -import com.datastax.driver.core.ResultSetFuture;  
22 -import com.datastax.driver.core.Row;  
23 -import com.google.common.base.Function;  
24 -import com.google.common.util.concurrent.Futures;  
25 -import com.google.common.util.concurrent.ListenableFuture;  
26 -import org.springframework.beans.factory.annotation.Value;  
27 -import org.springframework.stereotype.Component;  
28 -import org.thingsboard.server.dao.nosql.CassandraAbstractDao;  
29 -import org.thingsboard.server.dao.queue.db.MsgAck;  
30 -import org.thingsboard.server.dao.queue.db.repository.AckRepository;  
31 -import org.thingsboard.server.dao.util.NoSqlDao;  
32 -  
33 -import java.util.ArrayList;  
34 -import java.util.List;  
35 -import java.util.UUID;  
36 -  
37 -@Component  
38 -@NoSqlDao  
39 -public class CassandraAckRepository extends CassandraAbstractDao implements AckRepository {  
40 -  
41 - @Value("${cassandra.queue.ack.ttl}")  
42 - private int ackQueueTtl;  
43 -  
44 - @Override  
45 - public ListenableFuture<Void> ack(MsgAck msgAck) {  
46 - String insert = "INSERT INTO msg_ack_queue (node_id, cluster_partition, ts_partition, msg_id) VALUES (?, ?, ?, ?) USING TTL ?";  
47 - PreparedStatement statement = prepare(insert);  
48 - BoundStatement boundStatement = statement.bind(msgAck.getNodeId(), msgAck.getClusteredPartition(),  
49 - msgAck.getTsPartition(), msgAck.getMsgId(), ackQueueTtl);  
50 - ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);  
51 - return Futures.transform(resultSetFuture, (Function<ResultSet, Void>) input -> null);  
52 - }  
53 -  
54 - @Override  
55 - public List<MsgAck> findAcks(UUID nodeId, long clusterPartition, long tsPartition) {  
56 - String select = "SELECT msg_id FROM msg_ack_queue WHERE " +  
57 - "node_id = ? AND cluster_partition = ? AND ts_partition = ?";  
58 - PreparedStatement statement = prepare(select);  
59 - BoundStatement boundStatement = statement.bind(nodeId, clusterPartition, tsPartition);  
60 - ResultSet rows = executeRead(boundStatement);  
61 - List<MsgAck> msgs = new ArrayList<>();  
62 - for (Row row : rows) {  
63 - msgs.add(new MsgAck(row.getUUID("msg_id"), nodeId, clusterPartition, tsPartition));  
64 - }  
65 - return msgs;  
66 - }  
67 -  
68 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.nosql.repository;  
17 -  
18 -import com.datastax.driver.core.BoundStatement;  
19 -import com.datastax.driver.core.PreparedStatement;  
20 -import com.datastax.driver.core.ResultSet;  
21 -import com.datastax.driver.core.ResultSetFuture;  
22 -import com.datastax.driver.core.Row;  
23 -import com.google.common.base.Function;  
24 -import com.google.common.util.concurrent.Futures;  
25 -import com.google.common.util.concurrent.ListenableFuture;  
26 -import org.springframework.beans.factory.annotation.Value;  
27 -import org.springframework.stereotype.Component;  
28 -import org.thingsboard.server.common.msg.TbMsg;  
29 -import org.thingsboard.server.dao.nosql.CassandraAbstractDao;  
30 -import org.thingsboard.server.dao.queue.db.repository.MsgRepository;  
31 -import org.thingsboard.server.dao.util.NoSqlDao;  
32 -  
33 -import java.util.ArrayList;  
34 -import java.util.List;  
35 -import java.util.UUID;  
36 -  
37 -@Component  
38 -@NoSqlDao  
39 -public class CassandraMsgRepository extends CassandraAbstractDao implements MsgRepository {  
40 -  
41 - @Value("${cassandra.queue.msg.ttl}")  
42 - private int msqQueueTtl;  
43 -  
44 - @Override  
45 - public ListenableFuture<Void> save(TbMsg msg, UUID nodeId, long clusterPartition, long tsPartition, long msgTs) {  
46 - String insert = "INSERT INTO msg_queue (node_id, cluster_partition, ts_partition, ts, msg) VALUES (?, ?, ?, ?, ?) USING TTL ?";  
47 - PreparedStatement statement = prepare(insert);  
48 - BoundStatement boundStatement = statement.bind(nodeId, clusterPartition, tsPartition, msgTs, TbMsg.toBytes(msg), msqQueueTtl);  
49 - ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);  
50 - return Futures.transform(resultSetFuture, (Function<ResultSet, Void>) input -> null);  
51 - }  
52 -  
53 - @Override  
54 - public List<TbMsg> findMsgs(UUID nodeId, long clusterPartition, long tsPartition) {  
55 - String select = "SELECT node_id, cluster_partition, ts_partition, ts, msg FROM msg_queue WHERE " +  
56 - "node_id = ? AND cluster_partition = ? AND ts_partition = ?";  
57 - PreparedStatement statement = prepare(select);  
58 - BoundStatement boundStatement = statement.bind(nodeId, clusterPartition, tsPartition);  
59 - ResultSet rows = executeRead(boundStatement);  
60 - List<TbMsg> msgs = new ArrayList<>();  
61 - for (Row row : rows) {  
62 - msgs.add(TbMsg.fromBytes(row.getBytes("msg")));  
63 - }  
64 - return msgs;  
65 - }  
66 -  
67 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.nosql.repository;  
17 -  
18 -import com.datastax.driver.core.BoundStatement;  
19 -import com.datastax.driver.core.PreparedStatement;  
20 -import com.datastax.driver.core.ResultSet;  
21 -import com.datastax.driver.core.ResultSetFuture;  
22 -import com.datastax.driver.core.Row;  
23 -import com.google.common.base.Function;  
24 -import com.google.common.util.concurrent.Futures;  
25 -import com.google.common.util.concurrent.ListenableFuture;  
26 -import org.springframework.beans.factory.annotation.Value;  
27 -import org.springframework.stereotype.Component;  
28 -import org.thingsboard.server.dao.nosql.CassandraAbstractDao;  
29 -import org.thingsboard.server.dao.queue.db.repository.ProcessedPartitionRepository;  
30 -import org.thingsboard.server.dao.util.NoSqlDao;  
31 -  
32 -import java.util.Optional;  
33 -import java.util.UUID;  
34 -  
35 -@Component  
36 -@NoSqlDao  
37 -public class CassandraProcessedPartitionRepository extends CassandraAbstractDao implements ProcessedPartitionRepository {  
38 -  
39 - @Value("${cassandra.queue.partitions.ttl}")  
40 - private int partitionsTtl;  
41 -  
42 - @Override  
43 - public ListenableFuture<Void> partitionProcessed(UUID nodeId, long clusterPartition, long tsPartition) {  
44 - String insert = "INSERT INTO processed_msg_partitions (node_id, cluster_partition, ts_partition) VALUES (?, ?, ?) USING TTL ?";  
45 - PreparedStatement prepared = prepare(insert);  
46 - BoundStatement boundStatement = prepared.bind(nodeId, clusterPartition, tsPartition, partitionsTtl);  
47 - ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement);  
48 - return Futures.transform(resultSetFuture, (Function<ResultSet, Void>) input -> null);  
49 - }  
50 -  
51 - @Override  
52 - public Optional<Long> findLastProcessedPartition(UUID nodeId, long clusteredHash) {  
53 - String select = "SELECT ts_partition FROM processed_msg_partitions WHERE " +  
54 - "node_id = ? AND cluster_partition = ?";  
55 - PreparedStatement prepared = prepare(select);  
56 - BoundStatement boundStatement = prepared.bind(nodeId, clusteredHash);  
57 - Row row = executeRead(boundStatement).one();  
58 - if (row == null) {  
59 - return Optional.empty();  
60 - }  
61 -  
62 - return Optional.of(row.getLong("ts_partition"));  
63 - }  
64 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.repository;  
17 -  
18 -import com.google.common.util.concurrent.ListenableFuture;  
19 -import org.thingsboard.server.dao.queue.db.MsgAck;  
20 -  
21 -import java.util.List;  
22 -import java.util.UUID;  
23 -  
24 -public interface AckRepository {  
25 -  
26 - ListenableFuture<Void> ack(MsgAck msgAck);  
27 -  
28 - List<MsgAck> findAcks(UUID nodeId, long clusterPartition, long tsPartition);  
29 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.repository;  
17 -  
18 -import com.google.common.util.concurrent.ListenableFuture;  
19 -import org.thingsboard.server.common.msg.TbMsg;  
20 -  
21 -import java.util.List;  
22 -import java.util.UUID;  
23 -  
24 -public interface MsgRepository {  
25 -  
26 - ListenableFuture<Void> save(TbMsg msg, UUID nodeId, long clusterPartition, long tsPartition, long msgTs);  
27 -  
28 - List<TbMsg> findMsgs(UUID nodeId, long clusterPartition, long tsPartition);  
29 -  
30 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.repository;  
17 -  
18 -import com.google.common.util.concurrent.ListenableFuture;  
19 -  
20 -import java.util.Optional;  
21 -import java.util.UUID;  
22 -  
23 -public interface ProcessedPartitionRepository {  
24 -  
25 - ListenableFuture<Void> partitionProcessed(UUID nodeId, long clusteredHash, long partition);  
26 -  
27 - Optional<Long> findLastProcessedPartition(UUID nodeId, long clusteredHash);  
28 -  
29 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.sql;  
17 -  
18 -//@todo-vp: implement  
19 -public class SqlMsgQueue {  
20 -}  
@@ -40,7 +40,7 @@ import java.util.concurrent.Executors; @@ -40,7 +40,7 @@ import java.util.concurrent.Executors;
40 * Created by ashvayka on 27.04.18. 40 * Created by ashvayka on 27.04.18.
41 */ 41 */
42 @Component 42 @Component
43 -@ConditionalOnProperty(prefix = "rule.queue", value = "type", havingValue = "memory", matchIfMissing = true) 43 +@ConditionalOnProperty(prefix = "actors.rule.queue", value = "type", havingValue = "memory", matchIfMissing = true)
44 @Slf4j 44 @Slf4j
45 public class InMemoryMsgQueue implements MsgQueue { 45 public class InMemoryMsgQueue implements MsgQueue {
46 46
@@ -82,6 +82,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -82,6 +82,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
82 @Value("${cassandra.query.ts_key_value_partitioning}") 82 @Value("${cassandra.query.ts_key_value_partitioning}")
83 private String partitioning; 83 private String partitioning;
84 84
  85 + @Value("${cassandra.query.ts_key_value_ttl}")
  86 + private long systemTtl;
  87 +
85 private TsPartitionDate tsFormat; 88 private TsPartitionDate tsFormat;
86 89
87 private PreparedStatement partitionInsertStmt; 90 private PreparedStatement partitionInsertStmt;
@@ -287,6 +290,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -287,6 +290,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
287 290
288 @Override 291 @Override
289 public ListenableFuture<Void> save(EntityId entityId, TsKvEntry tsKvEntry, long ttl) { 292 public ListenableFuture<Void> save(EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
  293 + ttl = computeTtl(ttl);
290 long partition = toPartitionTs(tsKvEntry.getTs()); 294 long partition = toPartitionTs(tsKvEntry.getTs());
291 DataType type = tsKvEntry.getDataType(); 295 DataType type = tsKvEntry.getDataType();
292 BoundStatement stmt = (ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind(); 296 BoundStatement stmt = (ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind();
@@ -304,6 +308,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -304,6 +308,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
304 308
305 @Override 309 @Override
306 public ListenableFuture<Void> savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl) { 310 public ListenableFuture<Void> savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl) {
  311 + ttl = computeTtl(ttl);
307 long partition = toPartitionTs(tsKvEntryTs); 312 long partition = toPartitionTs(tsKvEntryTs);
308 log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key); 313 log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key);
309 BoundStatement stmt = (ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt()).bind(); 314 BoundStatement stmt = (ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt()).bind();
@@ -317,6 +322,17 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -317,6 +322,17 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
317 return getFuture(executeAsyncWrite(stmt), rs -> null); 322 return getFuture(executeAsyncWrite(stmt), rs -> null);
318 } 323 }
319 324
  325 + private long computeTtl(long ttl) {
  326 + if (systemTtl > 0) {
  327 + if (ttl == 0) {
  328 + ttl = systemTtl;
  329 + } else {
  330 + ttl = Math.min(systemTtl, ttl);
  331 + }
  332 + }
  333 + return ttl;
  334 + }
  335 +
320 @Override 336 @Override
321 public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) { 337 public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
322 BoundStatement stmt = getLatestStmt().bind() 338 BoundStatement stmt = getLatestStmt().bind()
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.nosql;  
17 -  
18 -  
19 -import org.junit.Before;  
20 -import org.junit.Test;  
21 -import org.junit.runner.RunWith;  
22 -import org.mockito.Mock;  
23 -import org.mockito.runners.MockitoJUnitRunner;  
24 -import org.thingsboard.server.dao.queue.db.repository.ProcessedPartitionRepository;  
25 -  
26 -import java.time.Clock;  
27 -import java.time.Instant;  
28 -import java.time.ZoneOffset;  
29 -import java.time.temporal.ChronoUnit;  
30 -import java.util.List;  
31 -import java.util.Optional;  
32 -import java.util.UUID;  
33 -  
34 -import static org.junit.Assert.assertEquals;  
35 -import static org.mockito.Mockito.when;  
36 -  
37 -@RunWith(MockitoJUnitRunner.class)  
38 -public class QueuePartitionerTest {  
39 -  
40 - private QueuePartitioner queuePartitioner;  
41 -  
42 - @Mock  
43 - private ProcessedPartitionRepository partitionRepo;  
44 -  
45 - private Instant startInstant;  
46 - private Instant endInstant;  
47 -  
48 - @Before  
49 - public void init() {  
50 - queuePartitioner = new QueuePartitioner("MINUTES", partitionRepo);  
51 - startInstant = Instant.now();  
52 - endInstant = startInstant.plus(2, ChronoUnit.MINUTES);  
53 - queuePartitioner.setClock(Clock.fixed(endInstant, ZoneOffset.UTC));  
54 - }  
55 -  
56 - @Test  
57 - public void partitionCalculated() {  
58 - long time = 1519390191425L;  
59 - long partition = queuePartitioner.getPartition(time);  
60 - assertEquals(1519390140000L, partition);  
61 - }  
62 -  
63 - @Test  
64 - public void unprocessedPartitionsReturned() {  
65 - UUID nodeId = UUID.randomUUID();  
66 - long clusteredHash = 101L;  
67 - when(partitionRepo.findLastProcessedPartition(nodeId, clusteredHash)).thenReturn(Optional.of(startInstant.toEpochMilli()));  
68 - List<Long> actual = queuePartitioner.findUnprocessedPartitions(nodeId, clusteredHash);  
69 - assertEquals(3, actual.size());  
70 - }  
71 -  
72 - @Test  
73 - public void defaultShiftUsedIfNoPartitionWasProcessed() {  
74 - UUID nodeId = UUID.randomUUID();  
75 - long clusteredHash = 101L;  
76 - when(partitionRepo.findLastProcessedPartition(nodeId, clusteredHash)).thenReturn(Optional.empty());  
77 - List<Long> actual = queuePartitioner.findUnprocessedPartitions(nodeId, clusteredHash);  
78 - assertEquals(10083, actual.size());  
79 - }  
80 -  
81 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.nosql;  
17 -  
18 -import com.google.common.collect.Lists;  
19 -import org.junit.Test;  
20 -import org.thingsboard.server.common.msg.TbMsg;  
21 -import org.thingsboard.server.dao.queue.db.MsgAck;  
22 -import org.thingsboard.server.dao.queue.db.UnprocessedMsgFilter;  
23 -  
24 -import java.util.Collection;  
25 -import java.util.List;  
26 -import java.util.UUID;  
27 -  
28 -import static org.junit.Assert.assertEquals;  
29 -  
30 -public class UnprocessedMsgFilterTest {  
31 -  
32 - private UnprocessedMsgFilter msgFilter = new UnprocessedMsgFilter();  
33 -  
34 - @Test  
35 - public void acknowledgedMsgsAreFilteredOut() {  
36 - UUID id1 = UUID.randomUUID();  
37 - UUID id2 = UUID.randomUUID();  
38 - TbMsg msg1 = new TbMsg(id1, "T", null, null, null, null, null, null, 0L);  
39 - TbMsg msg2 = new TbMsg(id2, "T", null, null, null, null, null, null, 0L);  
40 - List<TbMsg> msgs = Lists.newArrayList(msg1, msg2);  
41 - List<MsgAck> acks = Lists.newArrayList(new MsgAck(id2, UUID.randomUUID(), 1L, 1L));  
42 - Collection<TbMsg> actual = msgFilter.filter(msgs, acks);  
43 - assertEquals(1, actual.size());  
44 - assertEquals(msg1, actual.iterator().next());  
45 - }  
46 -  
47 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.nosql.repository;  
17 -  
18 -import com.datastax.driver.core.utils.UUIDs;  
19 -import com.google.common.collect.Lists;  
20 -import com.google.common.util.concurrent.ListenableFuture;  
21 -import org.junit.Test;  
22 -import org.springframework.beans.factory.annotation.Autowired;  
23 -import org.springframework.test.util.ReflectionTestUtils;  
24 -import org.thingsboard.server.dao.service.AbstractServiceTest;  
25 -import org.thingsboard.server.dao.service.DaoNoSqlTest;  
26 -import org.thingsboard.server.dao.queue.db.MsgAck;  
27 -  
28 -import java.util.List;  
29 -import java.util.UUID;  
30 -import java.util.concurrent.ExecutionException;  
31 -import java.util.concurrent.TimeUnit;  
32 -  
33 -import static org.junit.Assert.assertEquals;  
34 -import static org.junit.Assert.assertTrue;  
35 -  
36 -@DaoNoSqlTest  
37 -public class CassandraAckRepositoryTest extends AbstractServiceTest {  
38 -  
39 - @Autowired  
40 - private CassandraAckRepository ackRepository;  
41 -  
42 - @Test  
43 - public void acksInPartitionCouldBeFound() {  
44 - UUID nodeId = UUID.fromString("055eee50-1883-11e8-b380-65b5d5335ba9");  
45 -  
46 - List<MsgAck> extectedAcks = Lists.newArrayList(  
47 - new MsgAck(UUID.fromString("bebaeb60-1888-11e8-bf21-65b5d5335ba9"), nodeId, 101L, 300L),  
48 - new MsgAck(UUID.fromString("12baeb60-1888-11e8-bf21-65b5d5335ba9"), nodeId, 101L, 300L)  
49 - );  
50 -  
51 - List<MsgAck> actualAcks = ackRepository.findAcks(nodeId, 101L, 300L);  
52 - assertEquals(extectedAcks, actualAcks);  
53 - }  
54 -  
55 - @Test  
56 - public void ackCanBeSavedAndRead() throws ExecutionException, InterruptedException {  
57 - UUID msgId = UUIDs.timeBased();  
58 - UUID nodeId = UUIDs.timeBased();  
59 - MsgAck ack = new MsgAck(msgId, nodeId, 10L, 20L);  
60 - ListenableFuture<Void> future = ackRepository.ack(ack);  
61 - future.get();  
62 - List<MsgAck> actualAcks = ackRepository.findAcks(nodeId, 10L, 20L);  
63 - assertEquals(1, actualAcks.size());  
64 - assertEquals(ack, actualAcks.get(0));  
65 - }  
66 -  
67 - @Test  
68 - public void expiredAcksAreNotReturned() throws ExecutionException, InterruptedException {  
69 - ReflectionTestUtils.setField(ackRepository, "ackQueueTtl", 1);  
70 - UUID msgId = UUIDs.timeBased();  
71 - UUID nodeId = UUIDs.timeBased();  
72 - MsgAck ack = new MsgAck(msgId, nodeId, 30L, 40L);  
73 - ListenableFuture<Void> future = ackRepository.ack(ack);  
74 - future.get();  
75 - List<MsgAck> actualAcks = ackRepository.findAcks(nodeId, 30L, 40L);  
76 - assertEquals(1, actualAcks.size());  
77 - TimeUnit.SECONDS.sleep(2);  
78 - assertTrue(ackRepository.findAcks(nodeId, 30L, 40L).isEmpty());  
79 - }  
80 -  
81 -  
82 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.nosql.repository;  
17 -  
18 -//import static org.junit.jupiter.api.Assertions.*;  
19 -  
20 -import com.datastax.driver.core.utils.UUIDs;  
21 -import com.google.common.util.concurrent.ListenableFuture;  
22 -import org.junit.Test;  
23 -import org.springframework.beans.factory.annotation.Autowired;  
24 -import org.springframework.test.util.ReflectionTestUtils;  
25 -import org.thingsboard.server.common.data.id.DeviceId;  
26 -import org.thingsboard.server.common.data.id.RuleChainId;  
27 -import org.thingsboard.server.common.data.id.RuleNodeId;  
28 -import org.thingsboard.server.common.msg.TbMsg;  
29 -import org.thingsboard.server.common.msg.TbMsgDataType;  
30 -import org.thingsboard.server.common.msg.TbMsgMetaData;  
31 -import org.thingsboard.server.dao.service.AbstractServiceTest;  
32 -import org.thingsboard.server.dao.service.DaoNoSqlTest;  
33 -  
34 -import java.util.List;  
35 -import java.util.UUID;  
36 -import java.util.concurrent.ExecutionException;  
37 -import java.util.concurrent.TimeUnit;  
38 -  
39 -import static org.junit.Assert.assertEquals;  
40 -import static org.junit.Assert.assertTrue;  
41 -  
42 -@DaoNoSqlTest  
43 -public class CassandraMsgRepositoryTest extends AbstractServiceTest {  
44 -  
45 - @Autowired  
46 - private CassandraMsgRepository msgRepository;  
47 -  
48 - @Test  
49 - public void msgCanBeSavedAndRead() throws ExecutionException, InterruptedException {  
50 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, "0000",  
51 - new RuleChainId(UUIDs.timeBased()), new RuleNodeId(UUIDs.timeBased()), 0L);  
52 - UUID nodeId = UUIDs.timeBased();  
53 - ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 1L, 1L, 1L);  
54 - future.get();  
55 - List<TbMsg> msgs = msgRepository.findMsgs(nodeId, 1L, 1L);  
56 - assertEquals(1, msgs.size());  
57 - }  
58 -  
59 - @Test  
60 - public void expiredMsgsAreNotReturned() throws ExecutionException, InterruptedException {  
61 - ReflectionTestUtils.setField(msgRepository, "msqQueueTtl", 1);  
62 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, "0000",  
63 - new RuleChainId(UUIDs.timeBased()), new RuleNodeId(UUIDs.timeBased()), 0L);  
64 - UUID nodeId = UUIDs.timeBased();  
65 - ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 2L, 2L, 2L);  
66 - future.get();  
67 - TimeUnit.SECONDS.sleep(2);  
68 - assertTrue(msgRepository.findMsgs(nodeId, 2L, 2L).isEmpty());  
69 - }  
70 -  
71 - @Test  
72 - public void protoBufConverterWorkAsExpected() throws ExecutionException, InterruptedException {  
73 - TbMsgMetaData metaData = new TbMsgMetaData();  
74 - metaData.putValue("key", "value");  
75 - String dataStr = "someContent";  
76 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), metaData, TbMsgDataType.JSON, dataStr,  
77 - new RuleChainId(UUIDs.timeBased()), new RuleNodeId(UUIDs.timeBased()), 0L);  
78 - UUID nodeId = UUIDs.timeBased();  
79 - ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 1L, 1L, 1L);  
80 - future.get();  
81 - List<TbMsg> msgs = msgRepository.findMsgs(nodeId, 1L, 1L);  
82 - assertEquals(1, msgs.size());  
83 - assertEquals(msg, msgs.get(0));  
84 - }  
85 -  
86 -  
87 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.queue.db.nosql.repository;  
17 -  
18 -import com.datastax.driver.core.utils.UUIDs;  
19 -import com.google.common.util.concurrent.Futures;  
20 -import com.google.common.util.concurrent.ListenableFuture;  
21 -import org.junit.Test;  
22 -import org.springframework.beans.factory.annotation.Autowired;  
23 -import org.springframework.test.util.ReflectionTestUtils;  
24 -import org.thingsboard.server.dao.service.AbstractServiceTest;  
25 -import org.thingsboard.server.dao.service.DaoNoSqlTest;  
26 -  
27 -import java.util.List;  
28 -import java.util.Optional;  
29 -import java.util.UUID;  
30 -import java.util.concurrent.ExecutionException;  
31 -import java.util.concurrent.TimeUnit;  
32 -  
33 -import static org.junit.Assert.assertEquals;  
34 -import static org.junit.Assert.assertFalse;  
35 -import static org.junit.Assert.assertTrue;  
36 -  
37 -@DaoNoSqlTest  
38 -public class CassandraProcessedPartitionRepositoryTest extends AbstractServiceTest {  
39 -  
40 - @Autowired  
41 - private CassandraProcessedPartitionRepository partitionRepository;  
42 -  
43 - @Test  
44 - public void lastProcessedPartitionCouldBeFound() {  
45 - UUID nodeId = UUID.fromString("055eee50-1883-11e8-b380-65b5d5335ba9");  
46 - Optional<Long> lastProcessedPartition = partitionRepository.findLastProcessedPartition(nodeId, 101L);  
47 - assertTrue(lastProcessedPartition.isPresent());  
48 - assertEquals((Long) 777L, lastProcessedPartition.get());  
49 - }  
50 -  
51 - @Test  
52 - public void highestProcessedPartitionReturned() throws ExecutionException, InterruptedException {  
53 - UUID nodeId = UUIDs.timeBased();  
54 - ListenableFuture<Void> future1 = partitionRepository.partitionProcessed(nodeId, 303L, 100L);  
55 - ListenableFuture<Void> future2 = partitionRepository.partitionProcessed(nodeId, 303L, 200L);  
56 - ListenableFuture<Void> future3 = partitionRepository.partitionProcessed(nodeId, 303L, 10L);  
57 - ListenableFuture<List<Void>> allFutures = Futures.allAsList(future1, future2, future3);  
58 - allFutures.get();  
59 - Optional<Long> actual = partitionRepository.findLastProcessedPartition(nodeId, 303L);  
60 - assertTrue(actual.isPresent());  
61 - assertEquals((Long) 200L, actual.get());  
62 - }  
63 -  
64 - @Test  
65 - public void expiredPartitionsAreNotReturned() throws ExecutionException, InterruptedException {  
66 - ReflectionTestUtils.setField(partitionRepository, "partitionsTtl", 1);  
67 - UUID nodeId = UUIDs.timeBased();  
68 - ListenableFuture<Void> future = partitionRepository.partitionProcessed(nodeId, 404L, 10L);  
69 - future.get();  
70 - Optional<Long> actual = partitionRepository.findLastProcessedPartition(nodeId, 404L);  
71 - assertEquals((Long) 10L, actual.get());  
72 - TimeUnit.SECONDS.sleep(2);  
73 - assertFalse(partitionRepository.findLastProcessedPartition(nodeId, 404L).isPresent());  
74 - }  
75 -  
76 - @Test  
77 - public void ifNoPartitionsWereProcessedEmptyResultReturned() {  
78 - UUID nodeId = UUIDs.timeBased();  
79 - Optional<Long> actual = partitionRepository.findLastProcessedPartition(nodeId, 505L);  
80 - assertFalse(actual.isPresent());  
81 - }  
82 -  
83 -}  
@@ -46,6 +46,8 @@ cassandra.query.default_fetch_size=2000 @@ -46,6 +46,8 @@ cassandra.query.default_fetch_size=2000
46 46
47 cassandra.query.ts_key_value_partitioning=HOURS 47 cassandra.query.ts_key_value_partitioning=HOURS
48 48
  49 +cassandra.query.ts_key_value_ttl=0
  50 +
49 cassandra.query.max_limit_per_request=1000 51 cassandra.query.max_limit_per_request=1000
50 cassandra.query.buffer_size=100000 52 cassandra.query.buffer_size=100000
51 cassandra.query.concurrent_limit=1000 53 cassandra.query.concurrent_limit=1000
@@ -18,6 +18,11 @@ @@ -18,6 +18,11 @@
18 18
19 dpkg -i /thingsboard.deb 19 dpkg -i /thingsboard.deb
20 20
  21 +# Copying env variables into conf files
  22 +printenv | awk -F "=" '{print "export " $1 "='\''" $2 "'\''"}' >> /usr/share/thingsboard/conf/thingsboard.conf
  23 +
  24 +cat /usr/share/thingsboard/conf/thingsboard.conf
  25 +
21 if [ "$DATABASE_TYPE" == "cassandra" ]; then 26 if [ "$DATABASE_TYPE" == "cassandra" ]; then
22 until nmap $CASSANDRA_HOST -p $CASSANDRA_PORT | grep "$CASSANDRA_PORT/tcp open\|filtered" 27 until nmap $CASSANDRA_HOST -p $CASSANDRA_PORT | grep "$CASSANDRA_PORT/tcp open\|filtered"
23 do 28 do
@@ -46,12 +51,6 @@ if [ "$ADD_SCHEMA_AND_SYSTEM_DATA" == "true" ]; then @@ -46,12 +51,6 @@ if [ "$ADD_SCHEMA_AND_SYSTEM_DATA" == "true" ]; then
46 fi 51 fi
47 fi 52 fi
48 53
49 -  
50 -# Copying env variables into conf files  
51 -printenv | awk -F "=" '{print "export " $1 "='\''" $2 "'\''"}' >> /usr/share/thingsboard/conf/thingsboard.conf  
52 -  
53 -cat /usr/share/thingsboard/conf/thingsboard.conf  
54 -  
55 echo "Starting 'Thingsboard' service..." 54 echo "Starting 'Thingsboard' service..."
56 service thingsboard start 55 service thingsboard start
57 56
@@ -15,7 +15,6 @@ @@ -15,7 +15,6 @@
15 */ 15 */
16 package org.thingsboard.rule.engine.metadata; 16 package org.thingsboard.rule.engine.metadata;
17 17
18 -import com.google.common.base.Function;  
19 import com.google.common.util.concurrent.Futures; 18 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture; 19 import com.google.common.util.concurrent.ListenableFuture;
21 import org.apache.commons.collections.CollectionUtils; 20 import org.apache.commons.collections.CollectionUtils;
@@ -33,9 +32,11 @@ import java.util.List; @@ -33,9 +32,11 @@ import java.util.List;
33 import static org.thingsboard.rule.engine.DonAsynchron.withCallback; 32 import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
34 import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE; 33 import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
35 import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; 34 import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
36 -import static org.thingsboard.server.common.data.DataConstants.*; 35 +import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE;
  36 +import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
  37 +import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE;
37 38
38 -public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeConfiguration, T extends EntityId> implements TbNode { 39 +public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeConfiguration, T extends EntityId> implements TbNode {
39 40
40 protected C config; 41 protected C config;
41 42
@@ -59,7 +60,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC @@ -59,7 +60,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
59 } 60 }
60 61
61 private void safePutAttributes(TbContext ctx, TbMsg msg, T entityId) { 62 private void safePutAttributes(TbContext ctx, TbMsg msg, T entityId) {
62 - if(entityId == null || entityId.isNullUid()) { 63 + if (entityId == null || entityId.isNullUid()) {
63 ctx.tellNext(msg, FAILURE); 64 ctx.tellNext(msg, FAILURE);
64 return; 65 return;
65 } 66 }
@@ -78,7 +79,13 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC @@ -78,7 +79,13 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
78 } 79 }
79 ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(entityId, scope, keys); 80 ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(entityId, scope, keys);
80 return Futures.transform(latest, l -> { 81 return Futures.transform(latest, l -> {
81 - l.forEach(r -> msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString())); 82 + l.forEach(r -> {
  83 + if (r.getValue() != null) {
  84 + msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString());
  85 + } else {
  86 + throw new RuntimeException("[" + scope + "][" + r.getKey() + "] attribute value is not present in the DB!");
  87 + }
  88 + });
82 return null; 89 return null;
83 }); 90 });
84 } 91 }
@@ -89,7 +96,13 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC @@ -89,7 +96,13 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
89 } 96 }
90 ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(entityId, keys); 97 ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(entityId, keys);
91 return Futures.transform(latest, l -> { 98 return Futures.transform(latest, l -> {
92 - l.forEach(r -> msg.getMetaData().putValue(r.getKey(), r.getValueAsString())); 99 + l.forEach(r -> {
  100 + if (r.getValue() != null) {
  101 + msg.getMetaData().putValue(r.getKey(), r.getValueAsString());
  102 + } else {
  103 + throw new RuntimeException("[" + r.getKey() + "] telemetry value is not present in the DB!");
  104 + }
  105 + });
93 return null; 106 return null;
94 }); 107 });
95 } 108 }
@@ -69,10 +69,8 @@ public class TbMsgTimeseriesNode implements TbNode { @@ -69,10 +69,8 @@ public class TbMsgTimeseriesNode implements TbNode {
69 try { 69 try {
70 ts = Long.parseLong(tsStr); 70 ts = Long.parseLong(tsStr);
71 } catch (NumberFormatException e) {} 71 } catch (NumberFormatException e) {}
72 - }  
73 - if (ts == -1) {  
74 - ctx.tellFailure(msg, new IllegalArgumentException("Msg metadata doesn't contain valid ts value: " + msg.getMetaData()));  
75 - return; 72 + } else {
  73 + ts = System.currentTimeMillis();
76 } 74 }
77 String src = msg.getData(); 75 String src = msg.getData();
78 TelemetryUploadRequest telemetryUploadRequest = JsonConverter.convertToTelemetry(new JsonParser().parse(src), ts); 76 TelemetryUploadRequest telemetryUploadRequest = JsonConverter.convertToTelemetry(new JsonParser().parse(src), ts);
@@ -119,8 +119,8 @@ public class MqttTransportService { @@ -119,8 +119,8 @@ public class MqttTransportService {
119 try { 119 try {
120 serverChannel.close().sync(); 120 serverChannel.close().sync();
121 } finally { 121 } finally {
122 - bossGroup.shutdownGracefully();  
123 workerGroup.shutdownGracefully(); 122 workerGroup.shutdownGracefully();
  123 + bossGroup.shutdownGracefully();
124 } 124 }
125 log.info("MQTT transport stopped!"); 125 log.info("MQTT transport stopped!");
126 } 126 }
@@ -280,6 +280,38 @@ export default function addLocaleChinese(locales) { @@ -280,6 +280,38 @@ export default function addLocaleChinese(locales) {
280 "selected-attributes": "{ count, select, 1 {1 属性} other {# 属性} } 被选中", 280 "selected-attributes": "{ count, select, 1 {1 属性} other {# 属性} } 被选中",
281 "selected-telemetry": "{ count, select, 1 {1 遥测} other {# 遥测} } 被选中" 281 "selected-telemetry": "{ count, select, 1 {1 遥测} other {# 遥测} } 被选中"
282 }, 282 },
  283 + "audit-log": {
  284 + "audit": "审计",
  285 + "audit-logs": "审计日志",
  286 + "timestamp": "时间戳",
  287 + "entity-type": "实体类型",
  288 + "entity-name": "实体名称",
  289 + "user": "用户",
  290 + "type": "类型",
  291 + "status": "状态",
  292 + "details": "详情",
  293 + "type-added": "添加",
  294 + "type-deleted": "删除",
  295 + "type-updated": "更新",
  296 + "type-attributes-updated": "更新属性",
  297 + "type-attributes-deleted": "删除属性",
  298 + "type-rpc-call": "RPC调用",
  299 + "type-credentials-updated": "更新凭证",
  300 + "type-assigned-to-customer": "分配给客户",
  301 + "type-unassigned-from-customer": "未分配给客户",
  302 + "type-activated": "激活",
  303 + "type-suspended": "暂停",
  304 + "type-credentials-read": "读取凭证",
  305 + "type-attributes-read": "读取属性",
  306 + "status-success": "成功",
  307 + "status-failure": "失败",
  308 + "audit-log-details": "审计日志详情",
  309 + "no-audit-logs-prompt": "找不到日志",
  310 + "action-data": "活动数据",
  311 + "failure-details": "失败详情",
  312 + "search": "查找审计日志",
  313 + "clear-search": "清空查找"
  314 + },
283 "confirm-on-exit": { 315 "confirm-on-exit": {
284 "message": "您有未保存的更改。确定要离开此页吗?", 316 "message": "您有未保存的更改。确定要离开此页吗?",
285 "html-message": "您有未保存的更改。<br/> 确定要离开此页面吗?", 317 "html-message": "您有未保存的更改。<br/> 确定要离开此页面吗?",
@@ -20,6 +20,7 @@ import 'brace/mode/javascript'; @@ -20,6 +20,7 @@ import 'brace/mode/javascript';
20 import 'brace/mode/html'; 20 import 'brace/mode/html';
21 import 'brace/mode/css'; 21 import 'brace/mode/css';
22 import 'brace/mode/json'; 22 import 'brace/mode/json';
  23 +import 'ace-builds/src-min-noconflict/ace';
23 import 'ace-builds/src-min-noconflict/snippets/javascript'; 24 import 'ace-builds/src-min-noconflict/snippets/javascript';
24 import 'ace-builds/src-min-noconflict/snippets/text'; 25 import 'ace-builds/src-min-noconflict/snippets/text';
25 import 'ace-builds/src-min-noconflict/snippets/html'; 26 import 'ace-builds/src-min-noconflict/snippets/html';
@@ -662,4 +663,4 @@ export default function WidgetEditorController(widgetService, userService, types @@ -662,4 +663,4 @@ export default function WidgetEditorController(widgetService, userService, types
662 663
663 } 664 }
664 665
665 -/* eslint-enable angular/angularelement */  
  666 +/* eslint-enable angular/angularelement */