Commit 94db399367c62d3a3885c00c0564e45b4055f424

Authored by Igor Kulikov
1 parent 55127ae6

New system parameters: default cassandra ts key/val ttl; allow system mail service for rules.

@@ -235,6 +235,10 @@ public class ActorSystemContext { @@ -235,6 +235,10 @@ public class ActorSystemContext {
235 @Getter 235 @Getter
236 private boolean tenantComponentsInitEnabled; 236 private boolean tenantComponentsInitEnabled;
237 237
  238 + @Value("${actors.rule.allow_system_mail_service}")
  239 + @Getter
  240 + private boolean allowSystemMailService;
  241 +
238 @Getter 242 @Getter
239 @Setter 243 @Setter
240 private ActorSystem actorSystem; 244 private ActorSystem actorSystem;
@@ -209,7 +209,11 @@ class DefaultTbContext implements TbContext { @@ -209,7 +209,11 @@ class DefaultTbContext implements TbContext {
209 209
210 @Override 210 @Override
211 public MailService getMailService() { 211 public MailService getMailService() {
212 - return mainCtx.getMailService(); 212 + if (mainCtx.isAllowSystemMailService()) {
  213 + return mainCtx.getMailService();
  214 + } else {
  215 + throw new RuntimeException("Access to System Mail Service is forbidden!");
  216 + }
213 } 217 }
214 218
215 @Override 219 @Override
@@ -40,10 +40,10 @@ import java.util.concurrent.atomic.AtomicLong; @@ -40,10 +40,10 @@ import java.util.concurrent.atomic.AtomicLong;
40 @Slf4j 40 @Slf4j
41 public class DefaultMsgQueueService implements MsgQueueService { 41 public class DefaultMsgQueueService implements MsgQueueService {
42 42
43 - @Value("${rule.queue.max_size}") 43 + @Value("${actors.rule.queue.max_size}")
44 private long queueMaxSize; 44 private long queueMaxSize;
45 45
46 - @Value("${rule.queue.cleanup_period}") 46 + @Value("${actors.rule.queue.cleanup_period}")
47 private long queueCleanUpPeriod; 47 private long queueCleanUpPeriod;
48 48
49 @Autowired 49 @Autowired
@@ -203,6 +203,7 @@ cassandra: @@ -203,6 +203,7 @@ cassandra:
203 default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}" 203 default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
204 # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS 204 # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS
205 ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}" 205 ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
  206 + ts_key_value_ttl: "${TS_KV_TTL:0}"
206 buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}" 207 buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"
207 concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}" 208 concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}"
208 permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}" 209 permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
@@ -236,6 +237,8 @@ actors: @@ -236,6 +237,8 @@ actors:
236 js_thread_pool_size: "${ACTORS_RULE_JS_THREAD_POOL_SIZE:10}" 237 js_thread_pool_size: "${ACTORS_RULE_JS_THREAD_POOL_SIZE:10}"
237 # Specify thread pool size for mail sender executor service 238 # Specify thread pool size for mail sender executor service
238 mail_thread_pool_size: "${ACTORS_RULE_MAIL_THREAD_POOL_SIZE:10}" 239 mail_thread_pool_size: "${ACTORS_RULE_MAIL_THREAD_POOL_SIZE:10}"
  240 + # Whether to allow usage of system mail service for rules
  241 + allow_system_mail_service: "${ACTORS_RULE_ALLOW_SYSTEM_MAIL_SERVICE:true}"
239 # Specify thread pool size for external call service 242 # Specify thread pool size for external call service
240 external_call_thread_pool_size: "${ACTORS_RULE_EXTERNAL_CALL_THREAD_POOL_SIZE:10}" 243 external_call_thread_pool_size: "${ACTORS_RULE_EXTERNAL_CALL_THREAD_POOL_SIZE:10}"
241 js_sandbox: 244 js_sandbox:
@@ -253,6 +256,13 @@ actors: @@ -253,6 +256,13 @@ actors:
253 node: 256 node:
254 # Errors for particular actor are persisted once per specified amount of milliseconds 257 # Errors for particular actor are persisted once per specified amount of milliseconds
255 error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}" 258 error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}"
  259 + queue:
  260 + # Message queue type (memory or db)
  261 + type: "${ACTORS_RULE_QUEUE_TYPE:memory}"
  262 + # Message queue maximum size (per tenant)
  263 + max_size: "${ACTORS_RULE_QUEUE_MAX_SIZE:100}"
  264 + # Message queue cleanup period in seconds
  265 + cleanup_period: "${ACTORS_RULE_QUEUE_CLEANUP_PERIOD:3600}"
256 statistics: 266 statistics:
257 # Enable/disable actor statistics 267 # Enable/disable actor statistics
258 enabled: "${ACTORS_STATISTICS_ENABLED:true}" 268 enabled: "${ACTORS_STATISTICS_ENABLED:true}"
@@ -333,16 +343,6 @@ spring: @@ -333,16 +343,6 @@ spring:
333 username: "${SPRING_DATASOURCE_USERNAME:sa}" 343 username: "${SPRING_DATASOURCE_USERNAME:sa}"
334 password: "${SPRING_DATASOURCE_PASSWORD:}" 344 password: "${SPRING_DATASOURCE_PASSWORD:}"
335 345
336 -rule:  
337 - queue:  
338 - #Message queue type (memory or db)  
339 - type: "${RULE_QUEUE_TYPE:memory}"  
340 - #Message queue maximum size (per tenant)  
341 - max_size: "${RULE_QUEUE_MAX_SIZE:100}"  
342 - #Message queue cleanup period in seconds  
343 - cleanup_period: "${RULE_QUEUE_CLEANUP_PERIOD:3600}"  
344 -  
345 -  
346 # PostgreSQL DAO Configuration 346 # PostgreSQL DAO Configuration
347 #spring: 347 #spring:
348 # data: 348 # data:
@@ -36,7 +36,7 @@ import java.util.List; @@ -36,7 +36,7 @@ import java.util.List;
36 import java.util.UUID; 36 import java.util.UUID;
37 37
38 @Component 38 @Component
39 -@ConditionalOnProperty(prefix = "rule.queue", value = "type", havingValue = "db") 39 +@ConditionalOnProperty(prefix = "actors.rule.queue", value = "type", havingValue = "db")
40 @Slf4j 40 @Slf4j
41 @NoSqlDao 41 @NoSqlDao
42 public class CassandraMsgQueue implements MsgQueue { 42 public class CassandraMsgQueue implements MsgQueue {
@@ -40,7 +40,7 @@ import java.util.concurrent.Executors; @@ -40,7 +40,7 @@ import java.util.concurrent.Executors;
40 * Created by ashvayka on 27.04.18. 40 * Created by ashvayka on 27.04.18.
41 */ 41 */
42 @Component 42 @Component
43 -@ConditionalOnProperty(prefix = "rule.queue", value = "type", havingValue = "memory", matchIfMissing = true) 43 +@ConditionalOnProperty(prefix = "actors.rule.queue", value = "type", havingValue = "memory", matchIfMissing = true)
44 @Slf4j 44 @Slf4j
45 public class InMemoryMsgQueue implements MsgQueue { 45 public class InMemoryMsgQueue implements MsgQueue {
46 46
@@ -82,6 +82,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -82,6 +82,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
82 @Value("${cassandra.query.ts_key_value_partitioning}") 82 @Value("${cassandra.query.ts_key_value_partitioning}")
83 private String partitioning; 83 private String partitioning;
84 84
  85 + @Value("${cassandra.query.ts_key_value_ttl}")
  86 + private long systemTtl;
  87 +
85 private TsPartitionDate tsFormat; 88 private TsPartitionDate tsFormat;
86 89
87 private PreparedStatement partitionInsertStmt; 90 private PreparedStatement partitionInsertStmt;
@@ -287,6 +290,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -287,6 +290,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
287 290
288 @Override 291 @Override
289 public ListenableFuture<Void> save(EntityId entityId, TsKvEntry tsKvEntry, long ttl) { 292 public ListenableFuture<Void> save(EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
  293 + ttl = computeTtl(ttl);
290 long partition = toPartitionTs(tsKvEntry.getTs()); 294 long partition = toPartitionTs(tsKvEntry.getTs());
291 DataType type = tsKvEntry.getDataType(); 295 DataType type = tsKvEntry.getDataType();
292 BoundStatement stmt = (ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind(); 296 BoundStatement stmt = (ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind();
@@ -304,6 +308,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -304,6 +308,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
304 308
305 @Override 309 @Override
306 public ListenableFuture<Void> savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl) { 310 public ListenableFuture<Void> savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl) {
  311 + ttl = computeTtl(ttl);
307 long partition = toPartitionTs(tsKvEntryTs); 312 long partition = toPartitionTs(tsKvEntryTs);
308 log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key); 313 log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key);
309 BoundStatement stmt = (ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt()).bind(); 314 BoundStatement stmt = (ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt()).bind();
@@ -317,6 +322,17 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -317,6 +322,17 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
317 return getFuture(executeAsyncWrite(stmt), rs -> null); 322 return getFuture(executeAsyncWrite(stmt), rs -> null);
318 } 323 }
319 324
  325 + private long computeTtl(long ttl) {
  326 + if (systemTtl > 0) {
  327 + if (ttl == 0) {
  328 + ttl = systemTtl;
  329 + } else {
  330 + ttl = Math.min(systemTtl, ttl);
  331 + }
  332 + }
  333 + return ttl;
  334 + }
  335 +
320 @Override 336 @Override
321 public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) { 337 public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
322 BoundStatement stmt = getLatestStmt().bind() 338 BoundStatement stmt = getLatestStmt().bind()
@@ -46,6 +46,8 @@ cassandra.query.default_fetch_size=2000 @@ -46,6 +46,8 @@ cassandra.query.default_fetch_size=2000
46 46
47 cassandra.query.ts_key_value_partitioning=HOURS 47 cassandra.query.ts_key_value_partitioning=HOURS
48 48
  49 +cassandra.query.ts_key_value_ttl=0
  50 +
49 cassandra.query.max_limit_per_request=1000 51 cassandra.query.max_limit_per_request=1000
50 cassandra.query.buffer_size=100000 52 cassandra.query.buffer_size=100000
51 cassandra.query.concurrent_limit=1000 53 cassandra.query.concurrent_limit=1000