Commit da7955f97d71af4217f3ffb652b57e7effaa0fba

Authored by Andrew Shvayka
2 parents f49d480d 02c4d6b0

Merge branch 'develop/1.5' into develop/1.5-no-more-plugins

Showing 61 changed files with 1298 additions and 162 deletions
@@ -19,6 +19,7 @@ import org.springframework.boot.SpringApplication; @@ -19,6 +19,7 @@ import org.springframework.boot.SpringApplication;
19 import org.springframework.boot.SpringBootConfiguration; 19 import org.springframework.boot.SpringBootConfiguration;
20 import org.springframework.context.annotation.ComponentScan; 20 import org.springframework.context.annotation.ComponentScan;
21 import org.springframework.scheduling.annotation.EnableAsync; 21 import org.springframework.scheduling.annotation.EnableAsync;
  22 +import org.springframework.scheduling.annotation.EnableScheduling;
22 import springfox.documentation.swagger2.annotations.EnableSwagger2; 23 import springfox.documentation.swagger2.annotations.EnableSwagger2;
23 24
24 import java.util.Arrays; 25 import java.util.Arrays;
@@ -26,6 +27,7 @@ import java.util.Arrays; @@ -26,6 +27,7 @@ import java.util.Arrays;
26 @SpringBootConfiguration 27 @SpringBootConfiguration
27 @EnableAsync 28 @EnableAsync
28 @EnableSwagger2 29 @EnableSwagger2
  30 +@EnableScheduling
29 @ComponentScan({"org.thingsboard.server"}) 31 @ComponentScan({"org.thingsboard.server"})
30 public class ThingsboardServerApplication { 32 public class ThingsboardServerApplication {
31 33
@@ -28,9 +28,14 @@ import org.thingsboard.server.common.data.plugin.ComponentType; @@ -28,9 +28,14 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
28 import org.thingsboard.server.common.data.plugin.PluginMetaData; 28 import org.thingsboard.server.common.data.plugin.PluginMetaData;
29 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; 29 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
30 import org.thingsboard.server.common.msg.cluster.ServerAddress; 30 import org.thingsboard.server.common.msg.cluster.ServerAddress;
  31 +import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
  32 +import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
  33 +import org.thingsboard.server.common.msg.session.MsgType;
31 import org.thingsboard.server.extensions.api.plugins.Plugin; 34 import org.thingsboard.server.extensions.api.plugins.Plugin;
32 import org.thingsboard.server.extensions.api.plugins.PluginInitializationException; 35 import org.thingsboard.server.extensions.api.plugins.PluginInitializationException;
33 import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse; 36 import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
  37 +import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg;
  38 +import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
34 import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg; 39 import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg;
35 import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg; 40 import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg;
36 import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg; 41 import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
@@ -98,7 +103,20 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId> @@ -98,7 +103,20 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
98 103
99 public void onRuleToPluginMsg(RuleToPluginMsgWrapper msg) throws RuleException { 104 public void onRuleToPluginMsg(RuleToPluginMsgWrapper msg) throws RuleException {
100 if (state == ComponentLifecycleState.ACTIVE) { 105 if (state == ComponentLifecycleState.ACTIVE) {
101 - pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg()); 106 + try {
  107 + pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg());
  108 + } catch (Exception ex) {
  109 + logger.debug("[{}] Failed to process RuleToPlugin msg: [{}] [{}]", tenantId, msg.getMsg(), ex);
  110 + RuleToPluginMsg ruleMsg = msg.getMsg();
  111 + MsgType responceMsgType = MsgType.RULE_ENGINE_ERROR;
  112 + Integer requestId = 0;
  113 + if (ruleMsg.getPayload() instanceof FromDeviceRequestMsg) {
  114 + requestId = ((FromDeviceRequestMsg) ruleMsg.getPayload()).getRequestId();
  115 + }
  116 + trustedCtx.reply(
  117 + new ResponsePluginToRuleMsg(ruleMsg.getUid(), tenantId, msg.getRuleId(),
  118 + BasicStatusCodeResponse.onError(responceMsgType, requestId, ex)));
  119 + }
102 } else { 120 } else {
103 //TODO: reply with plugin suspended message 121 //TODO: reply with plugin suspended message
104 } 122 }
@@ -180,7 +180,7 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe @@ -180,7 +180,7 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
180 return scannedComponent; 180 return scannedComponent;
181 } 181 }
182 182
183 - private NodeDefinition prepareNodeDefinition(RuleNode nodeAnnotation) throws IOException { 183 + private NodeDefinition prepareNodeDefinition(RuleNode nodeAnnotation) throws Exception {
184 NodeDefinition nodeDefinition = new NodeDefinition(); 184 NodeDefinition nodeDefinition = new NodeDefinition();
185 nodeDefinition.setDetails(nodeAnnotation.nodeDetails()); 185 nodeDefinition.setDetails(nodeAnnotation.nodeDetails());
186 nodeDefinition.setDescription(nodeAnnotation.nodeDescription()); 186 nodeDefinition.setDescription(nodeAnnotation.nodeDescription());
@@ -188,9 +188,10 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe @@ -188,9 +188,10 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
188 nodeDefinition.setOutEnabled(nodeAnnotation.outEnabled()); 188 nodeDefinition.setOutEnabled(nodeAnnotation.outEnabled());
189 nodeDefinition.setRelationTypes(nodeAnnotation.relationTypes()); 189 nodeDefinition.setRelationTypes(nodeAnnotation.relationTypes());
190 nodeDefinition.setCustomRelations(nodeAnnotation.customRelations()); 190 nodeDefinition.setCustomRelations(nodeAnnotation.customRelations());
191 - String defaultConfigResourceName = nodeAnnotation.defaultConfigResource();  
192 - nodeDefinition.setDefaultConfiguration(mapper.readTree(  
193 - Resources.toString(Resources.getResource(defaultConfigResourceName), Charsets.UTF_8))); 191 + Class<? extends NodeConfiguration> configClazz = nodeAnnotation.configClazz();
  192 + NodeConfiguration config = configClazz.newInstance();
  193 + NodeConfiguration defaultConfiguration = config.defaultConfiguration();
  194 + nodeDefinition.setDefaultConfiguration(mapper.valueToTree(defaultConfiguration));
194 return nodeDefinition; 195 return nodeDefinition;
195 } 196 }
196 197
@@ -187,7 +187,7 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService { @@ -187,7 +187,7 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
187 187
188 @Override 188 @Override
189 public void loadSystemRules() throws Exception { 189 public void loadSystemRules() throws Exception {
190 - loadRules(Paths.get(dataDir, JSON_DIR, SYSTEM_DIR, RULES_DIR), null); 190 +// loadRules(Paths.get(dataDir, JSON_DIR, SYSTEM_DIR, RULES_DIR), null);
191 } 191 }
192 192
193 @Override 193 @Override
@@ -228,7 +228,7 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService { @@ -228,7 +228,7 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
228 "Raspberry Pi GPIO control sample application"); 228 "Raspberry Pi GPIO control sample application");
229 229
230 loadPlugins(Paths.get(dataDir, JSON_DIR, DEMO_DIR, PLUGINS_DIR), demoTenant.getId()); 230 loadPlugins(Paths.get(dataDir, JSON_DIR, DEMO_DIR, PLUGINS_DIR), demoTenant.getId());
231 - loadRules(Paths.get(dataDir, JSON_DIR, DEMO_DIR, RULES_DIR), demoTenant.getId()); 231 +// loadRules(Paths.get(dataDir, JSON_DIR, DEMO_DIR, RULES_DIR), demoTenant.getId());
232 loadDashboards(Paths.get(dataDir, JSON_DIR, DEMO_DIR, DASHBOARDS_DIR), demoTenant.getId(), null); 232 loadDashboards(Paths.get(dataDir, JSON_DIR, DEMO_DIR, DASHBOARDS_DIR), demoTenant.getId(), null);
233 } 233 }
234 234
@@ -181,6 +181,10 @@ cassandra: @@ -181,6 +181,10 @@ cassandra:
181 default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}" 181 default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
182 # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS 182 # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS
183 ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}" 183 ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
  184 + buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"
  185 + concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}"
  186 + permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
  187 + rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:30000}"
184 188
185 queue: 189 queue:
186 msg.ttl: 604800 # 7 days 190 msg.ttl: 604800 # 7 days
@@ -16,16 +16,16 @@ @@ -16,16 +16,16 @@
16 package org.thingsboard.server.common.msg.core; 16 package org.thingsboard.server.common.msg.core;
17 17
18 import lombok.Data; 18 import lombok.Data;
19 -import org.thingsboard.server.common.msg.session.FromDeviceMsg; 19 +import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
20 import org.thingsboard.server.common.msg.session.MsgType; 20 import org.thingsboard.server.common.msg.session.MsgType;
21 21
22 /** 22 /**
23 * @author Andrew Shvayka 23 * @author Andrew Shvayka
24 */ 24 */
25 @Data 25 @Data
26 -public class ToServerRpcRequestMsg implements FromDeviceMsg { 26 +public class ToServerRpcRequestMsg implements FromDeviceRequestMsg {
27 27
28 - private final int requestId; 28 + private final Integer requestId;
29 private final String method; 29 private final String method;
30 private final String params; 30 private final String params;
31 31
@@ -148,7 +148,7 @@ public class CassandraAssetDao extends CassandraAbstractSearchTextDao<AssetEntit @@ -148,7 +148,7 @@ public class CassandraAssetDao extends CassandraAbstractSearchTextDao<AssetEntit
148 query.and(eq(ENTITY_SUBTYPE_TENANT_ID_PROPERTY, tenantId)); 148 query.and(eq(ENTITY_SUBTYPE_TENANT_ID_PROPERTY, tenantId));
149 query.and(eq(ENTITY_SUBTYPE_ENTITY_TYPE_PROPERTY, EntityType.ASSET)); 149 query.and(eq(ENTITY_SUBTYPE_ENTITY_TYPE_PROPERTY, EntityType.ASSET));
150 query.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); 150 query.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
151 - ResultSetFuture resultSetFuture = getSession().executeAsync(query); 151 + ResultSetFuture resultSetFuture = executeAsyncRead(query);
152 return Futures.transform(resultSetFuture, new Function<ResultSet, List<EntitySubtype>>() { 152 return Futures.transform(resultSetFuture, new Function<ResultSet, List<EntitySubtype>>() {
153 @Nullable 153 @Nullable
154 @Override 154 @Override
@@ -147,12 +147,12 @@ public class CassandraBaseAttributesDao extends CassandraAbstractAsyncDao implem @@ -147,12 +147,12 @@ public class CassandraBaseAttributesDao extends CassandraAbstractAsyncDao implem
147 .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType)) 147 .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType))
148 .and(eq(ATTRIBUTE_KEY_COLUMN, key)); 148 .and(eq(ATTRIBUTE_KEY_COLUMN, key));
149 log.debug("Remove request: {}", delete.toString()); 149 log.debug("Remove request: {}", delete.toString());
150 - return getFuture(getSession().executeAsync(delete), rs -> null); 150 + return getFuture(executeAsyncWrite(delete), rs -> null);
151 } 151 }
152 152
153 private PreparedStatement getSaveStmt() { 153 private PreparedStatement getSaveStmt() {
154 if (saveStmt == null) { 154 if (saveStmt == null) {
155 - saveStmt = getSession().prepare("INSERT INTO " + ModelConstants.ATTRIBUTES_KV_CF + 155 + saveStmt = prepare("INSERT INTO " + ModelConstants.ATTRIBUTES_KV_CF +
156 "(" + ENTITY_TYPE_COLUMN + 156 "(" + ENTITY_TYPE_COLUMN +
157 "," + ENTITY_ID_COLUMN + 157 "," + ENTITY_ID_COLUMN +
158 "," + ATTRIBUTE_TYPE_COLUMN + 158 "," + ATTRIBUTE_TYPE_COLUMN +
@@ -244,12 +244,12 @@ public class CassandraAuditLogDao extends CassandraAbstractSearchTimeDao<AuditLo @@ -244,12 +244,12 @@ public class CassandraAuditLogDao extends CassandraAbstractSearchTimeDao<AuditLo
244 values.add("?"); 244 values.add("?");
245 } 245 }
246 String statementString = INSERT_INTO + cfName + " (" + String.join(",", columnsList) + ") VALUES (" + values.toString() + ")"; 246 String statementString = INSERT_INTO + cfName + " (" + String.join(",", columnsList) + ") VALUES (" + values.toString() + ")";
247 - return getSession().prepare(statementString); 247 + return prepare(statementString);
248 } 248 }
249 249
250 private PreparedStatement getPartitionInsertStmt() { 250 private PreparedStatement getPartitionInsertStmt() {
251 if (partitionInsertStmt == null) { 251 if (partitionInsertStmt == null) {
252 - partitionInsertStmt = getSession().prepare(INSERT_INTO + ModelConstants.AUDIT_LOG_BY_TENANT_ID_PARTITIONS_CF + 252 + partitionInsertStmt = prepare(INSERT_INTO + ModelConstants.AUDIT_LOG_BY_TENANT_ID_PARTITIONS_CF +
253 "(" + ModelConstants.AUDIT_LOG_TENANT_ID_PROPERTY + 253 "(" + ModelConstants.AUDIT_LOG_TENANT_ID_PROPERTY +
254 "," + ModelConstants.AUDIT_LOG_PARTITION_PROPERTY + ")" + 254 "," + ModelConstants.AUDIT_LOG_PARTITION_PROPERTY + ")" +
255 " VALUES(?, ?)"); 255 " VALUES(?, ?)");
@@ -343,7 +343,7 @@ public class CassandraAuditLogDao extends CassandraAbstractSearchTimeDao<AuditLo @@ -343,7 +343,7 @@ public class CassandraAuditLogDao extends CassandraAbstractSearchTimeDao<AuditLo
343 .where(eq(ModelConstants.AUDIT_LOG_TENANT_ID_PROPERTY, tenantId)); 343 .where(eq(ModelConstants.AUDIT_LOG_TENANT_ID_PROPERTY, tenantId));
344 select.and(QueryBuilder.gte(ModelConstants.PARTITION_COLUMN, minPartition)); 344 select.and(QueryBuilder.gte(ModelConstants.PARTITION_COLUMN, minPartition));
345 select.and(QueryBuilder.lte(ModelConstants.PARTITION_COLUMN, maxPartition)); 345 select.and(QueryBuilder.lte(ModelConstants.PARTITION_COLUMN, maxPartition));
346 - return getSession().execute(select); 346 + return executeRead(select);
347 } 347 }
348 348
349 } 349 }
@@ -130,7 +130,7 @@ public class CassandraBaseComponentDescriptorDao extends CassandraAbstractSearch @@ -130,7 +130,7 @@ public class CassandraBaseComponentDescriptorDao extends CassandraAbstractSearch
130 public boolean removeById(UUID key) { 130 public boolean removeById(UUID key) {
131 Statement delete = QueryBuilder.delete().all().from(ModelConstants.COMPONENT_DESCRIPTOR_BY_ID).where(eq(ModelConstants.ID_PROPERTY, key)); 131 Statement delete = QueryBuilder.delete().all().from(ModelConstants.COMPONENT_DESCRIPTOR_BY_ID).where(eq(ModelConstants.ID_PROPERTY, key));
132 log.debug("Remove request: {}", delete.toString()); 132 log.debug("Remove request: {}", delete.toString());
133 - return getSession().execute(delete).wasApplied(); 133 + return executeWrite(delete).wasApplied();
134 } 134 }
135 135
136 @Override 136 @Override
@@ -145,7 +145,7 @@ public class CassandraBaseComponentDescriptorDao extends CassandraAbstractSearch @@ -145,7 +145,7 @@ public class CassandraBaseComponentDescriptorDao extends CassandraAbstractSearch
145 log.debug("Delete plugin meta-data entity by id [{}]", clazz); 145 log.debug("Delete plugin meta-data entity by id [{}]", clazz);
146 Statement delete = QueryBuilder.delete().all().from(getColumnFamilyName()).where(eq(ModelConstants.COMPONENT_DESCRIPTOR_CLASS_PROPERTY, clazz)); 146 Statement delete = QueryBuilder.delete().all().from(getColumnFamilyName()).where(eq(ModelConstants.COMPONENT_DESCRIPTOR_CLASS_PROPERTY, clazz));
147 log.debug("Remove request: {}", delete.toString()); 147 log.debug("Remove request: {}", delete.toString());
148 - ResultSet resultSet = getSession().execute(delete); 148 + ResultSet resultSet = executeWrite(delete);
149 log.debug("Delete result: [{}]", resultSet.wasApplied()); 149 log.debug("Delete result: [{}]", resultSet.wasApplied());
150 } 150 }
151 151
@@ -148,7 +148,7 @@ public class CassandraDeviceDao extends CassandraAbstractSearchTextDao<DeviceEnt @@ -148,7 +148,7 @@ public class CassandraDeviceDao extends CassandraAbstractSearchTextDao<DeviceEnt
148 query.and(eq(ENTITY_SUBTYPE_TENANT_ID_PROPERTY, tenantId)); 148 query.and(eq(ENTITY_SUBTYPE_TENANT_ID_PROPERTY, tenantId));
149 query.and(eq(ENTITY_SUBTYPE_ENTITY_TYPE_PROPERTY, EntityType.DEVICE)); 149 query.and(eq(ENTITY_SUBTYPE_ENTITY_TYPE_PROPERTY, EntityType.DEVICE));
150 query.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); 150 query.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
151 - ResultSetFuture resultSetFuture = getSession().executeAsync(query); 151 + ResultSetFuture resultSetFuture = executeAsyncRead(query);
152 return Futures.transform(resultSetFuture, new Function<ResultSet, List<EntitySubtype>>() { 152 return Futures.transform(resultSetFuture, new Function<ResultSet, List<EntitySubtype>>() {
153 @Nullable 153 @Nullable
154 @Override 154 @Override
@@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j; @@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
21 import org.springframework.beans.factory.annotation.Autowired; 21 import org.springframework.beans.factory.annotation.Autowired;
22 import org.thingsboard.server.dao.cassandra.CassandraCluster; 22 import org.thingsboard.server.dao.cassandra.CassandraCluster;
23 import org.thingsboard.server.dao.model.type.*; 23 import org.thingsboard.server.dao.model.type.*;
  24 +import org.thingsboard.server.dao.util.BufferedRateLimiter;
24 25
25 import java.util.concurrent.ConcurrentHashMap; 26 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap; 27 import java.util.concurrent.ConcurrentMap;
@@ -33,16 +34,15 @@ public abstract class CassandraAbstractDao { @@ -33,16 +34,15 @@ public abstract class CassandraAbstractDao {
33 34
34 private ConcurrentMap<String, PreparedStatement> preparedStatementMap = new ConcurrentHashMap<>(); 35 private ConcurrentMap<String, PreparedStatement> preparedStatementMap = new ConcurrentHashMap<>();
35 36
36 - protected PreparedStatement prepare(String query) {  
37 - return preparedStatementMap.computeIfAbsent(query, i -> getSession().prepare(i));  
38 - } 37 + @Autowired
  38 + private BufferedRateLimiter rateLimiter;
39 39
40 private Session session; 40 private Session session;
41 41
42 private ConsistencyLevel defaultReadLevel; 42 private ConsistencyLevel defaultReadLevel;
43 private ConsistencyLevel defaultWriteLevel; 43 private ConsistencyLevel defaultWriteLevel;
44 44
45 - protected Session getSession() { 45 + private Session getSession() {
46 if (session == null) { 46 if (session == null) {
47 session = cluster.getSession(); 47 session = cluster.getSession();
48 defaultReadLevel = cluster.getDefaultReadConsistencyLevel(); 48 defaultReadLevel = cluster.getDefaultReadConsistencyLevel();
@@ -59,6 +59,10 @@ public abstract class CassandraAbstractDao { @@ -59,6 +59,10 @@ public abstract class CassandraAbstractDao {
59 return session; 59 return session;
60 } 60 }
61 61
  62 + protected PreparedStatement prepare(String query) {
  63 + return preparedStatementMap.computeIfAbsent(query, i -> getSession().prepare(i));
  64 + }
  65 +
62 private void registerCodecIfNotFound(CodecRegistry registry, TypeCodec<?> codec) { 66 private void registerCodecIfNotFound(CodecRegistry registry, TypeCodec<?> codec) {
63 try { 67 try {
64 registry.codecFor(codec.getCqlType(), codec.getJavaType()); 68 registry.codecFor(codec.getCqlType(), codec.getJavaType());
@@ -85,10 +89,7 @@ public abstract class CassandraAbstractDao { @@ -85,10 +89,7 @@ public abstract class CassandraAbstractDao {
85 89
86 private ResultSet execute(Statement statement, ConsistencyLevel level) { 90 private ResultSet execute(Statement statement, ConsistencyLevel level) {
87 log.debug("Execute cassandra statement {}", statement); 91 log.debug("Execute cassandra statement {}", statement);
88 - if (statement.getConsistencyLevel() == null) {  
89 - statement.setConsistencyLevel(level);  
90 - }  
91 - return getSession().execute(statement); 92 + return executeAsync(statement, level).getUninterruptibly();
92 } 93 }
93 94
94 private ResultSetFuture executeAsync(Statement statement, ConsistencyLevel level) { 95 private ResultSetFuture executeAsync(Statement statement, ConsistencyLevel level) {
@@ -96,6 +97,6 @@ public abstract class CassandraAbstractDao { @@ -96,6 +97,6 @@ public abstract class CassandraAbstractDao {
96 if (statement.getConsistencyLevel() == null) { 97 if (statement.getConsistencyLevel() == null) {
97 statement.setConsistencyLevel(level); 98 statement.setConsistencyLevel(level);
98 } 99 }
99 - return getSession().executeAsync(statement); 100 + return new RateLimitedResultSetFuture(getSession(), rateLimiter, statement);
100 } 101 }
101 } 102 }
@@ -63,7 +63,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte @@ -63,7 +63,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
63 List<E> list = Collections.emptyList(); 63 List<E> list = Collections.emptyList();
64 if (statement != null) { 64 if (statement != null) {
65 statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); 65 statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
66 - ResultSet resultSet = getSession().execute(statement); 66 + ResultSet resultSet = executeRead(statement);
67 Result<E> result = getMapper().map(resultSet); 67 Result<E> result = getMapper().map(resultSet);
68 if (result != null) { 68 if (result != null) {
69 list = result.all(); 69 list = result.all();
@@ -75,7 +75,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte @@ -75,7 +75,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
75 protected ListenableFuture<List<D>> findListByStatementAsync(Statement statement) { 75 protected ListenableFuture<List<D>> findListByStatementAsync(Statement statement) {
76 if (statement != null) { 76 if (statement != null) {
77 statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); 77 statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
78 - ResultSetFuture resultSetFuture = getSession().executeAsync(statement); 78 + ResultSetFuture resultSetFuture = executeAsyncRead(statement);
79 return Futures.transform(resultSetFuture, new Function<ResultSet, List<D>>() { 79 return Futures.transform(resultSetFuture, new Function<ResultSet, List<D>>() {
80 @Nullable 80 @Nullable
81 @Override 81 @Override
@@ -97,7 +97,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte @@ -97,7 +97,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
97 E object = null; 97 E object = null;
98 if (statement != null) { 98 if (statement != null) {
99 statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); 99 statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
100 - ResultSet resultSet = getSession().execute(statement); 100 + ResultSet resultSet = executeRead(statement);
101 Result<E> result = getMapper().map(resultSet); 101 Result<E> result = getMapper().map(resultSet);
102 if (result != null) { 102 if (result != null) {
103 object = result.one(); 103 object = result.one();
@@ -109,7 +109,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte @@ -109,7 +109,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
109 protected ListenableFuture<D> findOneByStatementAsync(Statement statement) { 109 protected ListenableFuture<D> findOneByStatementAsync(Statement statement) {
110 if (statement != null) { 110 if (statement != null) {
111 statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); 111 statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
112 - ResultSetFuture resultSetFuture = getSession().executeAsync(statement); 112 + ResultSetFuture resultSetFuture = executeAsyncRead(statement);
113 return Futures.transform(resultSetFuture, new Function<ResultSet, D>() { 113 return Futures.transform(resultSetFuture, new Function<ResultSet, D>() {
114 @Nullable 114 @Nullable
115 @Override 115 @Override
@@ -184,7 +184,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte @@ -184,7 +184,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
184 public boolean removeById(UUID key) { 184 public boolean removeById(UUID key) {
185 Statement delete = QueryBuilder.delete().all().from(getColumnFamilyName()).where(eq(ModelConstants.ID_PROPERTY, key)); 185 Statement delete = QueryBuilder.delete().all().from(getColumnFamilyName()).where(eq(ModelConstants.ID_PROPERTY, key));
186 log.debug("Remove request: {}", delete.toString()); 186 log.debug("Remove request: {}", delete.toString());
187 - return getSession().execute(delete).wasApplied(); 187 + return executeWrite(delete).wasApplied();
188 } 188 }
189 189
190 @Override 190 @Override
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.nosql;
  17 +
  18 +import com.datastax.driver.core.ResultSet;
  19 +import com.datastax.driver.core.ResultSetFuture;
  20 +import com.datastax.driver.core.Session;
  21 +import com.datastax.driver.core.Statement;
  22 +import com.google.common.base.Function;
  23 +import com.google.common.util.concurrent.FutureCallback;
  24 +import com.google.common.util.concurrent.Futures;
  25 +import com.google.common.util.concurrent.ListenableFuture;
  26 +import com.google.common.util.concurrent.Uninterruptibles;
  27 +import org.thingsboard.server.dao.util.AsyncRateLimiter;
  28 +
  29 +import javax.annotation.Nullable;
  30 +import java.util.concurrent.*;
  31 +
  32 +public class RateLimitedResultSetFuture implements ResultSetFuture {
  33 +
  34 + private final ListenableFuture<ResultSetFuture> originalFuture;
  35 + private final ListenableFuture<Void> rateLimitFuture;
  36 +
  37 + public RateLimitedResultSetFuture(Session session, AsyncRateLimiter rateLimiter, Statement statement) {
  38 + this.rateLimitFuture = rateLimiter.acquireAsync();
  39 + this.originalFuture = Futures.transform(rateLimitFuture,
  40 + (Function<Void, ResultSetFuture>) i -> executeAsyncWithRelease(rateLimiter, session, statement));
  41 + }
  42 +
  43 + @Override
  44 + public ResultSet getUninterruptibly() {
  45 + return safeGet().getUninterruptibly();
  46 + }
  47 +
  48 + @Override
  49 + public ResultSet getUninterruptibly(long timeout, TimeUnit unit) throws TimeoutException {
  50 + long rateLimitStart = System.nanoTime();
  51 + ResultSetFuture resultSetFuture = null;
  52 + try {
  53 + resultSetFuture = originalFuture.get(timeout, unit);
  54 + } catch (InterruptedException | ExecutionException e) {
  55 + throw new IllegalStateException(e);
  56 + }
  57 + long rateLimitDurationNano = System.nanoTime() - rateLimitStart;
  58 + long innerTimeoutNano = unit.toNanos(timeout) - rateLimitDurationNano;
  59 + if (innerTimeoutNano > 0) {
  60 + return resultSetFuture.getUninterruptibly(innerTimeoutNano, TimeUnit.NANOSECONDS);
  61 + }
  62 + throw new TimeoutException("Timeout waiting for task.");
  63 + }
  64 +
  65 + @Override
  66 + public boolean cancel(boolean mayInterruptIfRunning) {
  67 + if (originalFuture.isDone()) {
  68 + return safeGet().cancel(mayInterruptIfRunning);
  69 + } else {
  70 + return originalFuture.cancel(mayInterruptIfRunning);
  71 + }
  72 + }
  73 +
  74 + @Override
  75 + public boolean isCancelled() {
  76 + if (originalFuture.isDone()) {
  77 + return safeGet().isCancelled();
  78 + }
  79 +
  80 + return originalFuture.isCancelled();
  81 + }
  82 +
  83 + @Override
  84 + public boolean isDone() {
  85 + return originalFuture.isDone() && safeGet().isDone();
  86 + }
  87 +
  88 + @Override
  89 + public ResultSet get() throws InterruptedException, ExecutionException {
  90 + return safeGet().get();
  91 + }
  92 +
  93 + @Override
  94 + public ResultSet get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
  95 + long rateLimitStart = System.nanoTime();
  96 + ResultSetFuture resultSetFuture = originalFuture.get(timeout, unit);
  97 + long rateLimitDurationNano = System.nanoTime() - rateLimitStart;
  98 + long innerTimeoutNano = unit.toNanos(timeout) - rateLimitDurationNano;
  99 + if (innerTimeoutNano > 0) {
  100 + return resultSetFuture.get(innerTimeoutNano, TimeUnit.NANOSECONDS);
  101 + }
  102 + throw new TimeoutException("Timeout waiting for task.");
  103 + }
  104 +
  105 + @Override
  106 + public void addListener(Runnable listener, Executor executor) {
  107 + originalFuture.addListener(() -> {
  108 + try {
  109 + ResultSetFuture resultSetFuture = Uninterruptibles.getUninterruptibly(originalFuture);
  110 + resultSetFuture.addListener(listener, executor);
  111 + } catch (CancellationException e) {
  112 + cancel(false);
  113 + return;
  114 + } catch (ExecutionException e) {
  115 + Futures.immediateFailedFuture(e).addListener(listener, executor);
  116 + }
  117 + }, executor);
  118 + }
  119 +
  120 + private ResultSetFuture safeGet() {
  121 + try {
  122 + return originalFuture.get();
  123 + } catch (InterruptedException | ExecutionException e) {
  124 + throw new IllegalStateException(e);
  125 + }
  126 + }
  127 +
  128 + private ResultSetFuture executeAsyncWithRelease(AsyncRateLimiter rateLimiter, Session session, Statement statement) {
  129 + try {
  130 + ResultSetFuture resultSetFuture = session.executeAsync(statement);
  131 + Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
  132 + @Override
  133 + public void onSuccess(@Nullable ResultSet result) {
  134 + rateLimiter.release();
  135 + }
  136 +
  137 + @Override
  138 + public void onFailure(Throwable t) {
  139 + rateLimiter.release();
  140 + }
  141 + });
  142 + return resultSetFuture;
  143 + } catch (RuntimeException re) {
  144 + rateLimiter.release();
  145 + throw re;
  146 + }
  147 + }
  148 +}
@@ -242,7 +242,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati @@ -242,7 +242,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
242 242
243 private PreparedStatement getSaveStmt() { 243 private PreparedStatement getSaveStmt() {
244 if (saveStmt == null) { 244 if (saveStmt == null) {
245 - saveStmt = getSession().prepare("INSERT INTO " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " + 245 + saveStmt = prepare("INSERT INTO " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
246 "(" + ModelConstants.RELATION_FROM_ID_PROPERTY + 246 "(" + ModelConstants.RELATION_FROM_ID_PROPERTY +
247 "," + ModelConstants.RELATION_FROM_TYPE_PROPERTY + 247 "," + ModelConstants.RELATION_FROM_TYPE_PROPERTY +
248 "," + ModelConstants.RELATION_TO_ID_PROPERTY + 248 "," + ModelConstants.RELATION_TO_ID_PROPERTY +
@@ -257,7 +257,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati @@ -257,7 +257,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
257 257
258 private PreparedStatement getDeleteStmt() { 258 private PreparedStatement getDeleteStmt() {
259 if (deleteStmt == null) { 259 if (deleteStmt == null) {
260 - deleteStmt = getSession().prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + 260 + deleteStmt = prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME +
261 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + " = ?" + 261 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + " = ?" +
262 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + " = ?" + 262 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + " = ?" +
263 AND + ModelConstants.RELATION_TO_ID_PROPERTY + " = ?" + 263 AND + ModelConstants.RELATION_TO_ID_PROPERTY + " = ?" +
@@ -270,7 +270,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati @@ -270,7 +270,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
270 270
271 private PreparedStatement getDeleteAllByEntityStmt() { 271 private PreparedStatement getDeleteAllByEntityStmt() {
272 if (deleteAllByEntityStmt == null) { 272 if (deleteAllByEntityStmt == null) {
273 - deleteAllByEntityStmt = getSession().prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + 273 + deleteAllByEntityStmt = prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME +
274 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + " = ?" + 274 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + " = ?" +
275 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + " = ?"); 275 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + " = ?");
276 } 276 }
@@ -279,7 +279,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati @@ -279,7 +279,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
279 279
280 private PreparedStatement getFindAllByFromStmt() { 280 private PreparedStatement getFindAllByFromStmt() {
281 if (findAllByFromStmt == null) { 281 if (findAllByFromStmt == null) {
282 - findAllByFromStmt = getSession().prepare(SELECT_COLUMNS + " " + 282 + findAllByFromStmt = prepare(SELECT_COLUMNS + " " +
283 FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " + 283 FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
284 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM + 284 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM +
285 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM + 285 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM +
@@ -290,7 +290,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati @@ -290,7 +290,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
290 290
291 private PreparedStatement getFindAllByFromAndTypeStmt() { 291 private PreparedStatement getFindAllByFromAndTypeStmt() {
292 if (findAllByFromAndTypeStmt == null) { 292 if (findAllByFromAndTypeStmt == null) {
293 - findAllByFromAndTypeStmt = getSession().prepare(SELECT_COLUMNS + " " + 293 + findAllByFromAndTypeStmt = prepare(SELECT_COLUMNS + " " +
294 FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " + 294 FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
295 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM + 295 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM +
296 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM + 296 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM +
@@ -303,7 +303,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati @@ -303,7 +303,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
303 303
304 private PreparedStatement getFindAllByToStmt() { 304 private PreparedStatement getFindAllByToStmt() {
305 if (findAllByToStmt == null) { 305 if (findAllByToStmt == null) {
306 - findAllByToStmt = getSession().prepare(SELECT_COLUMNS + " " + 306 + findAllByToStmt = prepare(SELECT_COLUMNS + " " +
307 FROM + ModelConstants.RELATION_REVERSE_VIEW_NAME + " " + 307 FROM + ModelConstants.RELATION_REVERSE_VIEW_NAME + " " +
308 WHERE + ModelConstants.RELATION_TO_ID_PROPERTY + EQUAL_TO_PARAM + 308 WHERE + ModelConstants.RELATION_TO_ID_PROPERTY + EQUAL_TO_PARAM +
309 AND + ModelConstants.RELATION_TO_TYPE_PROPERTY + EQUAL_TO_PARAM + 309 AND + ModelConstants.RELATION_TO_TYPE_PROPERTY + EQUAL_TO_PARAM +
@@ -314,7 +314,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati @@ -314,7 +314,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
314 314
315 private PreparedStatement getFindAllByToAndTypeStmt() { 315 private PreparedStatement getFindAllByToAndTypeStmt() {
316 if (findAllByToAndTypeStmt == null) { 316 if (findAllByToAndTypeStmt == null) {
317 - findAllByToAndTypeStmt = getSession().prepare(SELECT_COLUMNS + " " + 317 + findAllByToAndTypeStmt = prepare(SELECT_COLUMNS + " " +
318 FROM + ModelConstants.RELATION_REVERSE_VIEW_NAME + " " + 318 FROM + ModelConstants.RELATION_REVERSE_VIEW_NAME + " " +
319 WHERE + ModelConstants.RELATION_TO_ID_PROPERTY + EQUAL_TO_PARAM + 319 WHERE + ModelConstants.RELATION_TO_ID_PROPERTY + EQUAL_TO_PARAM +
320 AND + ModelConstants.RELATION_TO_TYPE_PROPERTY + EQUAL_TO_PARAM + 320 AND + ModelConstants.RELATION_TO_TYPE_PROPERTY + EQUAL_TO_PARAM +
@@ -327,7 +327,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati @@ -327,7 +327,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
327 327
328 private PreparedStatement getCheckRelationStmt() { 328 private PreparedStatement getCheckRelationStmt() {
329 if (checkRelationStmt == null) { 329 if (checkRelationStmt == null) {
330 - checkRelationStmt = getSession().prepare(SELECT_COLUMNS + " " + 330 + checkRelationStmt = prepare(SELECT_COLUMNS + " " +
331 FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " + 331 FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
332 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM + 332 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM +
333 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM + 333 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM +
@@ -82,8 +82,9 @@ public class BaseRelationService implements RelationService { @@ -82,8 +82,9 @@ public class BaseRelationService implements RelationService {
82 } 82 }
83 83
84 @Caching(evict = { 84 @Caching(evict = {
85 - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"), 85 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"),
86 @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"), 86 @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"),
  87 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
87 @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"), 88 @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"),
88 @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}") 89 @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}")
89 }) 90 })
@@ -95,8 +96,9 @@ public class BaseRelationService implements RelationService { @@ -95,8 +96,9 @@ public class BaseRelationService implements RelationService {
95 } 96 }
96 97
97 @Caching(evict = { 98 @Caching(evict = {
98 - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"), 99 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"),
99 @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"), 100 @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"),
  101 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
100 @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"), 102 @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"),
101 @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}") 103 @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}")
102 }) 104 })
@@ -108,11 +110,11 @@ public class BaseRelationService implements RelationService { @@ -108,11 +110,11 @@ public class BaseRelationService implements RelationService {
108 } 110 }
109 111
110 @Caching(evict = { 112 @Caching(evict = {
111 - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"), 113 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"),
112 @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"), 114 @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"),
  115 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
113 @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"), 116 @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"),
114 - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}"),  
115 - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}") 117 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}")
116 }) 118 })
117 @Override 119 @Override
118 public boolean deleteRelation(EntityRelation relation) { 120 public boolean deleteRelation(EntityRelation relation) {
@@ -122,11 +124,11 @@ public class BaseRelationService implements RelationService { @@ -122,11 +124,11 @@ public class BaseRelationService implements RelationService {
122 } 124 }
123 125
124 @Caching(evict = { 126 @Caching(evict = {
125 - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.from"),  
126 - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type}"),  
127 - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.to"),  
128 - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type}"),  
129 - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}") 127 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"),
  128 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"),
  129 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
  130 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"),
  131 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}")
130 }) 132 })
131 @Override 133 @Override
132 public ListenableFuture<Boolean> deleteRelationAsync(EntityRelation relation) { 134 public ListenableFuture<Boolean> deleteRelationAsync(EntityRelation relation) {
@@ -136,11 +138,11 @@ public class BaseRelationService implements RelationService { @@ -136,11 +138,11 @@ public class BaseRelationService implements RelationService {
136 } 138 }
137 139
138 @Caching(evict = { 140 @Caching(evict = {
139 - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#from"),  
140 - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType}"),  
141 - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#to"),  
142 - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType}"),  
143 - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType}") 141 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType, #typeGroup}"),
  142 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup}"),
  143 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #typeGroup}"),
  144 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #typeGroup}"),
  145 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType, #typeGroup}")
144 }) 146 })
145 @Override 147 @Override
146 public boolean deleteRelation(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) { 148 public boolean deleteRelation(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
@@ -150,11 +152,11 @@ public class BaseRelationService implements RelationService { @@ -150,11 +152,11 @@ public class BaseRelationService implements RelationService {
150 } 152 }
151 153
152 @Caching(evict = { 154 @Caching(evict = {
153 - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#from"),  
154 - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType}"),  
155 - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#to"),  
156 - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType}"),  
157 - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType}") 155 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType, #typeGroup}"),
  156 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup}"),
  157 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #typeGroup}"),
  158 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #typeGroup}"),
  159 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType, #typeGroup}")
158 }) 160 })
159 @Override 161 @Override
160 public ListenableFuture<Boolean> deleteRelationAsync(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) { 162 public ListenableFuture<Boolean> deleteRelationAsync(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
@@ -73,7 +73,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -73,7 +73,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
73 73
74 private PreparedStatement partitionInsertStmt; 74 private PreparedStatement partitionInsertStmt;
75 private PreparedStatement partitionInsertTtlStmt; 75 private PreparedStatement partitionInsertTtlStmt;
76 - private PreparedStatement[] latestInsertStmts; 76 + private PreparedStatement latestInsertStmt;
77 private PreparedStatement[] saveStmts; 77 private PreparedStatement[] saveStmts;
78 private PreparedStatement[] saveTtlStmts; 78 private PreparedStatement[] saveTtlStmts;
79 private PreparedStatement[] fetchStmts; 79 private PreparedStatement[] fetchStmts;
@@ -306,13 +306,15 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -306,13 +306,15 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
306 306
307 @Override 307 @Override
308 public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) { 308 public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
309 - DataType type = tsKvEntry.getDataType();  
310 - BoundStatement stmt = getLatestStmt(type).bind() 309 + BoundStatement stmt = getLatestStmt().bind()
311 .setString(0, entityId.getEntityType().name()) 310 .setString(0, entityId.getEntityType().name())
312 .setUUID(1, entityId.getId()) 311 .setUUID(1, entityId.getId())
313 .setString(2, tsKvEntry.getKey()) 312 .setString(2, tsKvEntry.getKey())
314 - .setLong(3, tsKvEntry.getTs());  
315 - addValue(tsKvEntry, stmt, 4); 313 + .setLong(3, tsKvEntry.getTs())
  314 + .set(4, tsKvEntry.getBooleanValue().orElse(null), Boolean.class)
  315 + .set(5, tsKvEntry.getStrValue().orElse(null), String.class)
  316 + .set(6, tsKvEntry.getLongValue().orElse(null), Long.class)
  317 + .set(7, tsKvEntry.getDoubleValue().orElse(null), Double.class);
316 return getFuture(executeAsyncWrite(stmt), rs -> null); 318 return getFuture(executeAsyncWrite(stmt), rs -> null);
317 } 319 }
318 320
@@ -381,7 +383,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -381,7 +383,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
381 if (saveStmts == null) { 383 if (saveStmts == null) {
382 saveStmts = new PreparedStatement[DataType.values().length]; 384 saveStmts = new PreparedStatement[DataType.values().length];
383 for (DataType type : DataType.values()) { 385 for (DataType type : DataType.values()) {
384 - saveStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_CF + 386 + saveStmts[type.ordinal()] = prepare(INSERT_INTO + ModelConstants.TS_KV_CF +
385 "(" + ModelConstants.ENTITY_TYPE_COLUMN + 387 "(" + ModelConstants.ENTITY_TYPE_COLUMN +
386 "," + ModelConstants.ENTITY_ID_COLUMN + 388 "," + ModelConstants.ENTITY_ID_COLUMN +
387 "," + ModelConstants.KEY_COLUMN + 389 "," + ModelConstants.KEY_COLUMN +
@@ -398,7 +400,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -398,7 +400,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
398 if (saveTtlStmts == null) { 400 if (saveTtlStmts == null) {
399 saveTtlStmts = new PreparedStatement[DataType.values().length]; 401 saveTtlStmts = new PreparedStatement[DataType.values().length];
400 for (DataType type : DataType.values()) { 402 for (DataType type : DataType.values()) {
401 - saveTtlStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_CF + 403 + saveTtlStmts[type.ordinal()] = prepare(INSERT_INTO + ModelConstants.TS_KV_CF +
402 "(" + ModelConstants.ENTITY_TYPE_COLUMN + 404 "(" + ModelConstants.ENTITY_TYPE_COLUMN +
403 "," + ModelConstants.ENTITY_ID_COLUMN + 405 "," + ModelConstants.ENTITY_ID_COLUMN +
404 "," + ModelConstants.KEY_COLUMN + 406 "," + ModelConstants.KEY_COLUMN +
@@ -420,7 +422,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -420,7 +422,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
420 } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) { 422 } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) {
421 fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()]; 423 fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()];
422 } else { 424 } else {
423 - fetchStmts[type.ordinal()] = getSession().prepare(SELECT_PREFIX + 425 + fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX +
424 String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF 426 String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
425 + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM 427 + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
426 + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM 428 + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
@@ -435,26 +437,26 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -435,26 +437,26 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
435 return fetchStmts[aggType.ordinal()]; 437 return fetchStmts[aggType.ordinal()];
436 } 438 }
437 439
438 - private PreparedStatement getLatestStmt(DataType dataType) {  
439 - if (latestInsertStmts == null) {  
440 - latestInsertStmts = new PreparedStatement[DataType.values().length];  
441 - for (DataType type : DataType.values()) {  
442 - latestInsertStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF +  
443 - "(" + ModelConstants.ENTITY_TYPE_COLUMN +  
444 - "," + ModelConstants.ENTITY_ID_COLUMN +  
445 - "," + ModelConstants.KEY_COLUMN +  
446 - "," + ModelConstants.TS_COLUMN +  
447 - "," + getColumnName(type) + ")" +  
448 - " VALUES(?, ?, ?, ?, ?)");  
449 - } 440 + private PreparedStatement getLatestStmt() {
  441 + if (latestInsertStmt == null) {
  442 + latestInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF +
  443 + "(" + ModelConstants.ENTITY_TYPE_COLUMN +
  444 + "," + ModelConstants.ENTITY_ID_COLUMN +
  445 + "," + ModelConstants.KEY_COLUMN +
  446 + "," + ModelConstants.TS_COLUMN +
  447 + "," + ModelConstants.BOOLEAN_VALUE_COLUMN +
  448 + "," + ModelConstants.STRING_VALUE_COLUMN +
  449 + "," + ModelConstants.LONG_VALUE_COLUMN +
  450 + "," + ModelConstants.DOUBLE_VALUE_COLUMN + ")" +
  451 + " VALUES(?, ?, ?, ?, ?, ?, ?, ?)");
450 } 452 }
451 - return latestInsertStmts[dataType.ordinal()]; 453 + return latestInsertStmt;
452 } 454 }
453 455
454 456
455 private PreparedStatement getPartitionInsertStmt() { 457 private PreparedStatement getPartitionInsertStmt() {
456 if (partitionInsertStmt == null) { 458 if (partitionInsertStmt == null) {
457 - partitionInsertStmt = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF + 459 + partitionInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF +
458 "(" + ModelConstants.ENTITY_TYPE_COLUMN + 460 "(" + ModelConstants.ENTITY_TYPE_COLUMN +
459 "," + ModelConstants.ENTITY_ID_COLUMN + 461 "," + ModelConstants.ENTITY_ID_COLUMN +
460 "," + ModelConstants.PARTITION_COLUMN + 462 "," + ModelConstants.PARTITION_COLUMN +
@@ -466,7 +468,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -466,7 +468,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
466 468
467 private PreparedStatement getPartitionInsertTtlStmt() { 469 private PreparedStatement getPartitionInsertTtlStmt() {
468 if (partitionInsertTtlStmt == null) { 470 if (partitionInsertTtlStmt == null) {
469 - partitionInsertTtlStmt = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF + 471 + partitionInsertTtlStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF +
470 "(" + ModelConstants.ENTITY_TYPE_COLUMN + 472 "(" + ModelConstants.ENTITY_TYPE_COLUMN +
471 "," + ModelConstants.ENTITY_ID_COLUMN + 473 "," + ModelConstants.ENTITY_ID_COLUMN +
472 "," + ModelConstants.PARTITION_COLUMN + 474 "," + ModelConstants.PARTITION_COLUMN +
@@ -479,7 +481,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -479,7 +481,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
479 481
480 private PreparedStatement getFindLatestStmt() { 482 private PreparedStatement getFindLatestStmt() {
481 if (findLatestStmt == null) { 483 if (findLatestStmt == null) {
482 - findLatestStmt = getSession().prepare(SELECT_PREFIX + 484 + findLatestStmt = prepare(SELECT_PREFIX +
483 ModelConstants.KEY_COLUMN + "," + 485 ModelConstants.KEY_COLUMN + "," +
484 ModelConstants.TS_COLUMN + "," + 486 ModelConstants.TS_COLUMN + "," +
485 ModelConstants.STRING_VALUE_COLUMN + "," + 487 ModelConstants.STRING_VALUE_COLUMN + "," +
@@ -496,7 +498,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -496,7 +498,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
496 498
497 private PreparedStatement getFindAllLatestStmt() { 499 private PreparedStatement getFindAllLatestStmt() {
498 if (findAllLatestStmt == null) { 500 if (findAllLatestStmt == null) {
499 - findAllLatestStmt = getSession().prepare(SELECT_PREFIX + 501 + findAllLatestStmt = prepare(SELECT_PREFIX +
500 ModelConstants.KEY_COLUMN + "," + 502 ModelConstants.KEY_COLUMN + "," +
501 ModelConstants.TS_COLUMN + "," + 503 ModelConstants.TS_COLUMN + "," +
502 ModelConstants.STRING_VALUE_COLUMN + "," + 504 ModelConstants.STRING_VALUE_COLUMN + "," +
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.util;
  17 +
  18 +import com.google.common.util.concurrent.ListenableFuture;
  19 +
  20 +public interface AsyncRateLimiter {
  21 +
  22 + ListenableFuture<Void> acquireAsync();
  23 +
  24 + void release();
  25 +}
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.util;
  17 +
  18 +import com.google.common.util.concurrent.Futures;
  19 +import com.google.common.util.concurrent.ListenableFuture;
  20 +import com.google.common.util.concurrent.ListeningExecutorService;
  21 +import com.google.common.util.concurrent.MoreExecutors;
  22 +import lombok.extern.slf4j.Slf4j;
  23 +import org.springframework.beans.factory.annotation.Value;
  24 +import org.springframework.scheduling.annotation.Scheduled;
  25 +import org.springframework.stereotype.Component;
  26 +
  27 +import java.util.concurrent.*;
  28 +import java.util.concurrent.atomic.AtomicInteger;
  29 +
  30 +@Component
  31 +@Slf4j
  32 +@NoSqlDao
  33 +public class BufferedRateLimiter implements AsyncRateLimiter {
  34 +
  35 + private final ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
  36 +
  37 + private final int permitsLimit;
  38 + private final int maxPermitWaitTime;
  39 + private final AtomicInteger permits;
  40 + private final BlockingQueue<LockedFuture> queue;
  41 +
  42 + private final AtomicInteger maxQueueSize = new AtomicInteger();
  43 + private final AtomicInteger maxGrantedPermissions = new AtomicInteger();
  44 +
  45 + public BufferedRateLimiter(@Value("${cassandra.query.buffer_size}") int queueLimit,
  46 + @Value("${cassandra.query.concurrent_limit}") int permitsLimit,
  47 + @Value("${cassandra.query.permit_max_wait_time}") int maxPermitWaitTime) {
  48 + this.permitsLimit = permitsLimit;
  49 + this.maxPermitWaitTime = maxPermitWaitTime;
  50 + this.permits = new AtomicInteger();
  51 + this.queue = new LinkedBlockingQueue<>(queueLimit);
  52 + }
  53 +
  54 + @Override
  55 + public ListenableFuture<Void> acquireAsync() {
  56 + if (queue.isEmpty()) {
  57 + if (permits.incrementAndGet() <= permitsLimit) {
  58 + if (permits.get() > maxGrantedPermissions.get()) {
  59 + maxGrantedPermissions.set(permits.get());
  60 + }
  61 + return Futures.immediateFuture(null);
  62 + }
  63 + permits.decrementAndGet();
  64 + }
  65 +
  66 + return putInQueue();
  67 + }
  68 +
  69 + @Override
  70 + public void release() {
  71 + permits.decrementAndGet();
  72 + reprocessQueue();
  73 + }
  74 +
  75 + private void reprocessQueue() {
  76 + while (permits.get() < permitsLimit) {
  77 + if (permits.incrementAndGet() <= permitsLimit) {
  78 + if (permits.get() > maxGrantedPermissions.get()) {
  79 + maxGrantedPermissions.set(permits.get());
  80 + }
  81 + LockedFuture lockedFuture = queue.poll();
  82 + if (lockedFuture != null) {
  83 + lockedFuture.latch.countDown();
  84 + } else {
  85 + permits.decrementAndGet();
  86 + break;
  87 + }
  88 + } else {
  89 + permits.decrementAndGet();
  90 + }
  91 + }
  92 + }
  93 +
  94 + private LockedFuture createLockedFuture() {
  95 + CountDownLatch latch = new CountDownLatch(1);
  96 + ListenableFuture<Void> future = pool.submit(() -> {
  97 + latch.await();
  98 + return null;
  99 + });
  100 + return new LockedFuture(latch, future, System.currentTimeMillis());
  101 + }
  102 +
  103 + private ListenableFuture<Void> putInQueue() {
  104 +
  105 + int size = queue.size();
  106 + if (size > maxQueueSize.get()) {
  107 + maxQueueSize.set(size);
  108 + }
  109 +
  110 + if (queue.remainingCapacity() > 0) {
  111 + try {
  112 + LockedFuture lockedFuture = createLockedFuture();
  113 + if (!queue.offer(lockedFuture, 1, TimeUnit.SECONDS)) {
  114 + lockedFuture.cancelFuture();
  115 + return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject"));
  116 + }
  117 + if(permits.get() < permitsLimit) {
  118 + reprocessQueue();
  119 + }
  120 + return lockedFuture.future;
  121 + } catch (InterruptedException e) {
  122 + return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Task interrupted. Reject"));
  123 + }
  124 + }
  125 + return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject"));
  126 + }
  127 +
  128 + @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
  129 + public void printStats() {
  130 + int expiredCount = 0;
  131 + for (LockedFuture lockedFuture : queue) {
  132 + if (lockedFuture.isExpired()) {
  133 + lockedFuture.cancelFuture();
  134 + expiredCount++;
  135 + }
  136 + }
  137 + log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}] current granted [{}]", maxQueueSize.getAndSet(0),
  138 + maxGrantedPermissions.getAndSet(0), expiredCount, permits.get());
  139 + }
  140 +
  141 + private class LockedFuture {
  142 + final CountDownLatch latch;
  143 + final ListenableFuture<Void> future;
  144 + final long createTime;
  145 +
  146 + public LockedFuture(CountDownLatch latch, ListenableFuture<Void> future, long createTime) {
  147 + this.latch = latch;
  148 + this.future = future;
  149 + this.createTime = createTime;
  150 + }
  151 +
  152 + void cancelFuture() {
  153 + future.cancel(false);
  154 + latch.countDown();
  155 + }
  156 +
  157 + boolean isExpired() {
  158 + return (System.currentTimeMillis() - createTime) > maxPermitWaitTime;
  159 + }
  160 +
  161 + }
  162 +
  163 +
  164 +}
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.nosql;
  17 +
  18 +import com.datastax.driver.core.*;
  19 +import com.datastax.driver.core.exceptions.UnsupportedFeatureException;
  20 +import com.google.common.util.concurrent.Futures;
  21 +import com.google.common.util.concurrent.ListenableFuture;
  22 +import org.junit.Test;
  23 +import org.junit.runner.RunWith;
  24 +import org.mockito.Mock;
  25 +import org.mockito.Mockito;
  26 +import org.mockito.runners.MockitoJUnitRunner;
  27 +import org.mockito.stubbing.Answer;
  28 +import org.thingsboard.server.dao.util.AsyncRateLimiter;
  29 +
  30 +import java.util.concurrent.ExecutionException;
  31 +import java.util.concurrent.TimeoutException;
  32 +
  33 +import static org.junit.Assert.*;
  34 +import static org.mockito.Mockito.*;
  35 +
  36 +@RunWith(MockitoJUnitRunner.class)
  37 +public class RateLimitedResultSetFutureTest {
  38 +
  39 + private RateLimitedResultSetFuture resultSetFuture;
  40 +
  41 + @Mock
  42 + private AsyncRateLimiter rateLimiter;
  43 + @Mock
  44 + private Session session;
  45 + @Mock
  46 + private Statement statement;
  47 + @Mock
  48 + private ResultSetFuture realFuture;
  49 + @Mock
  50 + private ResultSet rows;
  51 + @Mock
  52 + private Row row;
  53 +
  54 + @Test
  55 + public void doNotReleasePermissionIfRateLimitFutureFailed() throws InterruptedException {
  56 + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFailedFuture(new IllegalArgumentException()));
  57 + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
  58 + Thread.sleep(1000L);
  59 + verify(rateLimiter).acquireAsync();
  60 + try {
  61 + assertTrue(resultSetFuture.isDone());
  62 + fail();
  63 + } catch (Exception e) {
  64 + assertTrue(e instanceof IllegalStateException);
  65 + Throwable actualCause = e.getCause();
  66 + assertTrue(actualCause instanceof ExecutionException);
  67 + }
  68 + verifyNoMoreInteractions(session, rateLimiter, statement);
  69 +
  70 + }
  71 +
  72 + @Test
  73 + public void getUninterruptiblyDelegateToCassandra() throws InterruptedException, ExecutionException {
  74 + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
  75 + when(session.executeAsync(statement)).thenReturn(realFuture);
  76 + Mockito.doAnswer((Answer<Void>) invocation -> {
  77 + Object[] args = invocation.getArguments();
  78 + Runnable task = (Runnable) args[0];
  79 + task.run();
  80 + return null;
  81 + }).when(realFuture).addListener(Mockito.any(), Mockito.any());
  82 +
  83 + when(realFuture.getUninterruptibly()).thenReturn(rows);
  84 +
  85 + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
  86 + ResultSet actual = resultSetFuture.getUninterruptibly();
  87 + assertSame(rows, actual);
  88 + verify(rateLimiter, times(1)).acquireAsync();
  89 + verify(rateLimiter, times(1)).release();
  90 + }
  91 +
  92 + @Test
  93 + public void addListenerAllowsFutureTransformation() throws InterruptedException, ExecutionException {
  94 + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
  95 + when(session.executeAsync(statement)).thenReturn(realFuture);
  96 + Mockito.doAnswer((Answer<Void>) invocation -> {
  97 + Object[] args = invocation.getArguments();
  98 + Runnable task = (Runnable) args[0];
  99 + task.run();
  100 + return null;
  101 + }).when(realFuture).addListener(Mockito.any(), Mockito.any());
  102 +
  103 + when(realFuture.get()).thenReturn(rows);
  104 + when(rows.one()).thenReturn(row);
  105 +
  106 + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
  107 +
  108 + ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
  109 + Row actualRow = transform.get();
  110 +
  111 + assertSame(row, actualRow);
  112 + verify(rateLimiter, times(1)).acquireAsync();
  113 + verify(rateLimiter, times(1)).release();
  114 + }
  115 +
  116 + @Test
  117 + public void immidiateCassandraExceptionReturnsPermit() throws InterruptedException, ExecutionException {
  118 + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
  119 + when(session.executeAsync(statement)).thenThrow(new UnsupportedFeatureException(ProtocolVersion.V3, "hjg"));
  120 + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
  121 + ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
  122 + try {
  123 + transform.get();
  124 + fail();
  125 + } catch (Exception e) {
  126 + assertTrue(e instanceof ExecutionException);
  127 + }
  128 + verify(rateLimiter, times(1)).acquireAsync();
  129 + verify(rateLimiter, times(1)).release();
  130 + }
  131 +
  132 + @Test
  133 + public void queryTimeoutReturnsPermit() throws InterruptedException, ExecutionException {
  134 + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
  135 + when(session.executeAsync(statement)).thenReturn(realFuture);
  136 + Mockito.doAnswer((Answer<Void>) invocation -> {
  137 + Object[] args = invocation.getArguments();
  138 + Runnable task = (Runnable) args[0];
  139 + task.run();
  140 + return null;
  141 + }).when(realFuture).addListener(Mockito.any(), Mockito.any());
  142 +
  143 + when(realFuture.get()).thenThrow(new ExecutionException("Fail", new TimeoutException("timeout")));
  144 + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
  145 + ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
  146 + try {
  147 + transform.get();
  148 + fail();
  149 + } catch (Exception e) {
  150 + assertTrue(e instanceof ExecutionException);
  151 + }
  152 + verify(rateLimiter, times(1)).acquireAsync();
  153 + verify(rateLimiter, times(1)).release();
  154 + }
  155 +
  156 +}
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.util;
  17 +
  18 +import com.google.common.util.concurrent.*;
  19 +import org.junit.Test;
  20 +
  21 +import javax.annotation.Nullable;
  22 +import java.util.concurrent.ExecutionException;
  23 +import java.util.concurrent.Executors;
  24 +import java.util.concurrent.TimeUnit;
  25 +import java.util.concurrent.atomic.AtomicInteger;
  26 +
  27 +import static org.junit.Assert.*;
  28 +
  29 +
  30 +public class BufferedRateLimiterTest {
  31 +
  32 + @Test
  33 + public void finishedFutureReturnedIfPermitsAreGranted() {
  34 + BufferedRateLimiter limiter = new BufferedRateLimiter(10, 10, 100);
  35 + ListenableFuture<Void> actual = limiter.acquireAsync();
  36 + assertTrue(actual.isDone());
  37 + }
  38 +
  39 + @Test
  40 + public void notFinishedFutureReturnedIfPermitsAreNotGranted() {
  41 + BufferedRateLimiter limiter = new BufferedRateLimiter(10, 1, 100);
  42 + ListenableFuture<Void> actual1 = limiter.acquireAsync();
  43 + ListenableFuture<Void> actual2 = limiter.acquireAsync();
  44 + assertTrue(actual1.isDone());
  45 + assertFalse(actual2.isDone());
  46 + }
  47 +
  48 + @Test
  49 + public void failedFutureReturnedIfQueueIsfull() {
  50 + BufferedRateLimiter limiter = new BufferedRateLimiter(1, 1, 100);
  51 + ListenableFuture<Void> actual1 = limiter.acquireAsync();
  52 + ListenableFuture<Void> actual2 = limiter.acquireAsync();
  53 + ListenableFuture<Void> actual3 = limiter.acquireAsync();
  54 +
  55 + assertTrue(actual1.isDone());
  56 + assertFalse(actual2.isDone());
  57 + assertTrue(actual3.isDone());
  58 + try {
  59 + actual3.get();
  60 + fail();
  61 + } catch (Exception e) {
  62 + assertTrue(e instanceof ExecutionException);
  63 + Throwable actualCause = e.getCause();
  64 + assertTrue(actualCause instanceof IllegalStateException);
  65 + assertEquals("Rate Limit Buffer is full. Reject", actualCause.getMessage());
  66 + }
  67 + }
  68 +
  69 + @Test
  70 + public void releasedPermitTriggerTasksFromQueue() throws InterruptedException {
  71 + BufferedRateLimiter limiter = new BufferedRateLimiter(10, 2, 100);
  72 + ListenableFuture<Void> actual1 = limiter.acquireAsync();
  73 + ListenableFuture<Void> actual2 = limiter.acquireAsync();
  74 + ListenableFuture<Void> actual3 = limiter.acquireAsync();
  75 + ListenableFuture<Void> actual4 = limiter.acquireAsync();
  76 + assertTrue(actual1.isDone());
  77 + assertTrue(actual2.isDone());
  78 + assertFalse(actual3.isDone());
  79 + assertFalse(actual4.isDone());
  80 + limiter.release();
  81 + TimeUnit.MILLISECONDS.sleep(100L);
  82 + assertTrue(actual3.isDone());
  83 + assertFalse(actual4.isDone());
  84 + limiter.release();
  85 + TimeUnit.MILLISECONDS.sleep(100L);
  86 + assertTrue(actual4.isDone());
  87 + }
  88 +
  89 + @Test
  90 + public void permitsReleasedInConcurrentMode() throws InterruptedException {
  91 + BufferedRateLimiter limiter = new BufferedRateLimiter(10, 2, 100);
  92 + AtomicInteger actualReleased = new AtomicInteger();
  93 + AtomicInteger actualRejected = new AtomicInteger();
  94 + ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));
  95 + for (int i = 0; i < 100; i++) {
  96 + ListenableFuture<ListenableFuture<Void>> submit = pool.submit(limiter::acquireAsync);
  97 + Futures.addCallback(submit, new FutureCallback<ListenableFuture<Void>>() {
  98 + @Override
  99 + public void onSuccess(@Nullable ListenableFuture<Void> result) {
  100 + Futures.addCallback(result, new FutureCallback<Void>() {
  101 + @Override
  102 + public void onSuccess(@Nullable Void result) {
  103 + try {
  104 + TimeUnit.MILLISECONDS.sleep(100);
  105 + } catch (InterruptedException e) {
  106 + e.printStackTrace();
  107 + }
  108 + limiter.release();
  109 + actualReleased.incrementAndGet();
  110 + }
  111 +
  112 + @Override
  113 + public void onFailure(Throwable t) {
  114 + actualRejected.incrementAndGet();
  115 + }
  116 + });
  117 + }
  118 +
  119 + @Override
  120 + public void onFailure(Throwable t) {
  121 + }
  122 + });
  123 + }
  124 +
  125 + TimeUnit.SECONDS.sleep(2);
  126 + assertTrue("Unexpected released count " + actualReleased.get(),
  127 + actualReleased.get() > 10 && actualReleased.get() < 20);
  128 + assertTrue("Unexpected rejected count " + actualRejected.get(),
  129 + actualRejected.get() > 80 && actualRejected.get() < 90);
  130 +
  131 + }
  132 +
  133 +
  134 +}
@@ -47,3 +47,8 @@ cassandra.query.default_fetch_size=2000 @@ -47,3 +47,8 @@ cassandra.query.default_fetch_size=2000
47 cassandra.query.ts_key_value_partitioning=HOURS 47 cassandra.query.ts_key_value_partitioning=HOURS
48 48
49 cassandra.query.max_limit_per_request=1000 49 cassandra.query.max_limit_per_request=1000
  50 +cassandra.query.buffer_size=100000
  51 +cassandra.query.concurrent_limit=1000
  52 +cassandra.query.permit_max_wait_time=20000
  53 +cassandra.query.rate_limit_print_interval_ms=30000
  54 +
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.api;
  17 +
  18 +public interface NodeConfiguration {
  19 +
  20 + NodeConfiguration defaultConfiguration();
  21 +
  22 +}
@@ -35,15 +35,16 @@ public @interface RuleNode { @@ -35,15 +35,16 @@ public @interface RuleNode {
35 35
36 String nodeDetails(); 36 String nodeDetails();
37 37
  38 + Class<? extends NodeConfiguration> configClazz();
  39 +
38 boolean inEnabled() default true; 40 boolean inEnabled() default true;
39 41
40 boolean outEnabled() default true; 42 boolean outEnabled() default true;
41 43
42 ComponentScope scope() default ComponentScope.TENANT; 44 ComponentScope scope() default ComponentScope.TENANT;
43 45
44 - String defaultConfigResource() default "EmptyNodeConfig.json";  
45 -  
46 String[] relationTypes() default {"Success", "Failure"}; 46 String[] relationTypes() default {"Success", "Failure"};
47 47
48 boolean customRelations() default false; 48 boolean customRelations() default false;
  49 +
49 } 50 }
@@ -30,6 +30,7 @@ import static org.thingsboard.rule.engine.DonAsynchron.withCallback; @@ -30,6 +30,7 @@ import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
30 @RuleNode( 30 @RuleNode(
31 type = ComponentType.FILTER, 31 type = ComponentType.FILTER,
32 name = "script", relationTypes = {"True", "False", "Failure"}, 32 name = "script", relationTypes = {"True", "False", "Failure"},
  33 + configClazz = TbJsFilterNodeConfiguration.class,
33 nodeDescription = "Filter incoming messages using JS script", 34 nodeDescription = "Filter incoming messages using JS script",
34 nodeDetails = "Evaluate incoming Message with configured JS condition. " + 35 nodeDetails = "Evaluate incoming Message with configured JS condition. " +
35 "If <b>True</b> - send Message via <b>True</b> chain, otherwise <b>False</b> chain is used." + 36 "If <b>True</b> - send Message via <b>True</b> chain, otherwise <b>False</b> chain is used." +
@@ -16,9 +16,17 @@ @@ -16,9 +16,17 @@
16 package org.thingsboard.rule.engine.filter; 16 package org.thingsboard.rule.engine.filter;
17 17
18 import lombok.Data; 18 import lombok.Data;
  19 +import org.thingsboard.rule.engine.api.NodeConfiguration;
19 20
20 @Data 21 @Data
21 -public class TbJsFilterNodeConfiguration { 22 +public class TbJsFilterNodeConfiguration implements NodeConfiguration {
22 23
23 private String jsScript; 24 private String jsScript;
  25 +
  26 + @Override
  27 + public TbJsFilterNodeConfiguration defaultConfiguration() {
  28 + TbJsFilterNodeConfiguration configuration = new TbJsFilterNodeConfiguration();
  29 + configuration.setJsScript("msg.passed < 15 && msg.name === 'Vit' && meta.temp == 10 && msg.bigObj.prop == 42;");
  30 + return configuration;
  31 + }
24 } 32 }
@@ -31,6 +31,7 @@ import static org.thingsboard.rule.engine.DonAsynchron.withCallback; @@ -31,6 +31,7 @@ import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
31 @RuleNode( 31 @RuleNode(
32 type = ComponentType.FILTER, 32 type = ComponentType.FILTER,
33 name = "switch", customRelations = true, 33 name = "switch", customRelations = true,
  34 + configClazz = TbJsSwitchNodeConfiguration.class,
34 nodeDescription = "Route incoming Message to one or multiple output chains", 35 nodeDescription = "Route incoming Message to one or multiple output chains",
35 nodeDetails = "Node executes configured JS script. Script should return array of next Chain names where Message should be routed. " + 36 nodeDetails = "Node executes configured JS script. Script should return array of next Chain names where Message should be routed. " +
36 "If Array is empty - message not routed to next Node. " + 37 "If Array is empty - message not routed to next Node. " +
@@ -15,14 +15,29 @@ @@ -15,14 +15,29 @@
15 */ 15 */
16 package org.thingsboard.rule.engine.filter; 16 package org.thingsboard.rule.engine.filter;
17 17
  18 +import com.google.common.collect.Sets;
18 import lombok.Data; 19 import lombok.Data;
  20 +import org.thingsboard.rule.engine.api.NodeConfiguration;
19 21
20 import java.util.Set; 22 import java.util.Set;
21 23
22 @Data 24 @Data
23 -public class TbJsSwitchNodeConfiguration { 25 +public class TbJsSwitchNodeConfiguration implements NodeConfiguration {
24 26
25 private String jsScript; 27 private String jsScript;
26 private Set<String> allowedRelations; 28 private Set<String> allowedRelations;
27 private boolean routeToAllWithNoCheck; 29 private boolean routeToAllWithNoCheck;
  30 +
  31 + @Override
  32 + public TbJsSwitchNodeConfiguration defaultConfiguration() {
  33 + TbJsSwitchNodeConfiguration configuration = new TbJsSwitchNodeConfiguration();
  34 + configuration.setJsScript("function nextRelation(meta, msg) {\n" +
  35 + " return ['one','nine'];" +
  36 + "};\n" +
  37 + "\n" +
  38 + "nextRelation(meta, msg);");
  39 + configuration.setAllowedRelations(Sets.newHashSet("one", "two"));
  40 + configuration.setRouteToAllWithNoCheck(false);
  41 + return configuration;
  42 + }
28 } 43 }
@@ -28,6 +28,7 @@ import org.thingsboard.server.common.msg.TbMsg; @@ -28,6 +28,7 @@ import org.thingsboard.server.common.msg.TbMsg;
28 @RuleNode( 28 @RuleNode(
29 type = ComponentType.FILTER, 29 type = ComponentType.FILTER,
30 name = "message type", 30 name = "message type",
  31 + configClazz = TbMsgTypeFilterNodeConfiguration.class,
31 nodeDescription = "Filter incoming messages by Message Type", 32 nodeDescription = "Filter incoming messages by Message Type",
32 nodeDetails = "Evaluate incoming Message with configured JS condition. " + 33 nodeDetails = "Evaluate incoming Message with configured JS condition. " +
33 "If incoming MessageType is expected - send Message via <b>Success</b> chain, otherwise <b>Failure</b> chain is used.") 34 "If incoming MessageType is expected - send Message via <b>Success</b> chain, otherwise <b>Failure</b> chain is used.")
@@ -16,15 +16,24 @@ @@ -16,15 +16,24 @@
16 package org.thingsboard.rule.engine.filter; 16 package org.thingsboard.rule.engine.filter;
17 17
18 import lombok.Data; 18 import lombok.Data;
  19 +import org.thingsboard.rule.engine.api.NodeConfiguration;
19 20
  21 +import java.util.Arrays;
  22 +import java.util.Collections;
20 import java.util.List; 23 import java.util.List;
21 24
22 /** 25 /**
23 * Created by ashvayka on 19.01.18. 26 * Created by ashvayka on 19.01.18.
24 */ 27 */
25 @Data 28 @Data
26 -public class TbMsgTypeFilterNodeConfiguration { 29 +public class TbMsgTypeFilterNodeConfiguration implements NodeConfiguration {
27 30
28 private List<String> messageTypes; 31 private List<String> messageTypes;
29 32
  33 + @Override
  34 + public TbMsgTypeFilterNodeConfiguration defaultConfiguration() {
  35 + TbMsgTypeFilterNodeConfiguration configuration = new TbMsgTypeFilterNodeConfiguration();
  36 + configuration.setMessageTypes(Arrays.asList("GET_ATTRIBUTES","POST_ATTRIBUTES","POST_TELEMETRY","RPC_REQUEST"));
  37 + return configuration;
  38 + }
30 } 39 }
@@ -38,6 +38,7 @@ import static org.thingsboard.server.common.data.DataConstants.*; @@ -38,6 +38,7 @@ import static org.thingsboard.server.common.data.DataConstants.*;
38 @Slf4j 38 @Slf4j
39 @RuleNode(type = ComponentType.ENRICHMENT, 39 @RuleNode(type = ComponentType.ENRICHMENT,
40 name = "originator attributes", 40 name = "originator attributes",
  41 + configClazz = TbGetAttributesNodeConfiguration.class,
41 nodeDescription = "Add Message Originator Attributes or Latest Telemetry into Message Metadata", 42 nodeDescription = "Add Message Originator Attributes or Latest Telemetry into Message Metadata",
42 nodeDetails = "If Attributes enrichment configured, <b>CLIENT/SHARED/SERVER</b> attributes are added into Message metadata " + 43 nodeDetails = "If Attributes enrichment configured, <b>CLIENT/SHARED/SERVER</b> attributes are added into Message metadata " +
43 "with specific prefix: <i>cs/shared/ss</i>. To access those attributes in other nodes this template can be used " + 44 "with specific prefix: <i>cs/shared/ss</i>. To access those attributes in other nodes this template can be used " +
@@ -16,14 +16,16 @@ @@ -16,14 +16,16 @@
16 package org.thingsboard.rule.engine.metadata; 16 package org.thingsboard.rule.engine.metadata;
17 17
18 import lombok.Data; 18 import lombok.Data;
  19 +import org.thingsboard.rule.engine.api.NodeConfiguration;
19 20
  21 +import java.util.Collections;
20 import java.util.List; 22 import java.util.List;
21 23
22 /** 24 /**
23 * Created by ashvayka on 19.01.18. 25 * Created by ashvayka on 19.01.18.
24 */ 26 */
25 @Data 27 @Data
26 -public class TbGetAttributesNodeConfiguration { 28 +public class TbGetAttributesNodeConfiguration implements NodeConfiguration {
27 29
28 private List<String> clientAttributeNames; 30 private List<String> clientAttributeNames;
29 private List<String> sharedAttributeNames; 31 private List<String> sharedAttributeNames;
@@ -31,4 +33,13 @@ public class TbGetAttributesNodeConfiguration { @@ -31,4 +33,13 @@ public class TbGetAttributesNodeConfiguration {
31 33
32 private List<String> latestTsKeyNames; 34 private List<String> latestTsKeyNames;
33 35
  36 + @Override
  37 + public TbGetAttributesNodeConfiguration defaultConfiguration() {
  38 + TbGetAttributesNodeConfiguration configuration = new TbGetAttributesNodeConfiguration();
  39 + configuration.setClientAttributeNames(Collections.emptyList());
  40 + configuration.setSharedAttributeNames(Collections.emptyList());
  41 + configuration.setServerAttributeNames(Collections.emptyList());
  42 + configuration.setLatestTsKeyNames(Collections.emptyList());
  43 + return configuration;
  44 + }
34 } 45 }
@@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.plugin.ComponentType; @@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
26 @RuleNode( 26 @RuleNode(
27 type = ComponentType.ENRICHMENT, 27 type = ComponentType.ENRICHMENT,
28 name="customer attributes", 28 name="customer attributes",
  29 + configClazz = TbGetEntityAttrNodeConfiguration.class,
29 nodeDescription = "Add Originators Customer Attributes or Latest Telemetry into Message Metadata", 30 nodeDescription = "Add Originators Customer Attributes or Latest Telemetry into Message Metadata",
30 nodeDetails = "If Attributes enrichment configured, server scope attributes are added into Message metadata. " + 31 nodeDetails = "If Attributes enrichment configured, server scope attributes are added into Message metadata. " +
31 "To access those attributes in other nodes this template can be used " + 32 "To access those attributes in other nodes this template can be used " +
@@ -16,13 +16,25 @@ @@ -16,13 +16,25 @@
16 package org.thingsboard.rule.engine.metadata; 16 package org.thingsboard.rule.engine.metadata;
17 17
18 import lombok.Data; 18 import lombok.Data;
  19 +import org.thingsboard.rule.engine.api.NodeConfiguration;
19 20
  21 +import java.util.HashMap;
20 import java.util.Map; 22 import java.util.Map;
21 import java.util.Optional; 23 import java.util.Optional;
22 24
23 @Data 25 @Data
24 -public class TbGetEntityAttrNodeConfiguration { 26 +public class TbGetEntityAttrNodeConfiguration implements NodeConfiguration {
25 27
26 private Map<String, String> attrMapping; 28 private Map<String, String> attrMapping;
27 private boolean isTelemetry = false; 29 private boolean isTelemetry = false;
  30 +
  31 + @Override
  32 + public TbGetEntityAttrNodeConfiguration defaultConfiguration() {
  33 + TbGetEntityAttrNodeConfiguration configuration = new TbGetEntityAttrNodeConfiguration();
  34 + Map<String, String> attrMapping = new HashMap<>();
  35 + attrMapping.putIfAbsent("temperature", "tempo");
  36 + configuration.setAttrMapping(attrMapping);
  37 + configuration.setTelemetry(true);
  38 + return configuration;
  39 + }
28 } 40 }
@@ -16,11 +16,28 @@ @@ -16,11 +16,28 @@
16 package org.thingsboard.rule.engine.metadata; 16 package org.thingsboard.rule.engine.metadata;
17 17
18 import lombok.Data; 18 import lombok.Data;
  19 +import org.thingsboard.rule.engine.api.NodeConfiguration;
  20 +import org.thingsboard.server.common.data.relation.EntityRelation;
19 import org.thingsboard.server.common.data.relation.EntitySearchDirection; 21 import org.thingsboard.server.common.data.relation.EntitySearchDirection;
20 22
  23 +import java.util.HashMap;
  24 +import java.util.Map;
  25 +
21 @Data 26 @Data
22 -public class TbGetRelatedAttrNodeConfiguration extends TbGetEntityAttrNodeConfiguration { 27 +public class TbGetRelatedAttrNodeConfiguration extends TbGetEntityAttrNodeConfiguration {
23 28
24 private String relationType; 29 private String relationType;
25 private EntitySearchDirection direction; 30 private EntitySearchDirection direction;
  31 +
  32 + @Override
  33 + public TbGetRelatedAttrNodeConfiguration defaultConfiguration() {
  34 + TbGetRelatedAttrNodeConfiguration configuration = new TbGetRelatedAttrNodeConfiguration();
  35 + Map<String, String> attrMapping = new HashMap<>();
  36 + attrMapping.putIfAbsent("temperature", "tempo");
  37 + configuration.setAttrMapping(attrMapping);
  38 + configuration.setTelemetry(true);
  39 + configuration.setRelationType(EntityRelation.CONTAINS_TYPE);
  40 + configuration.setDirection(EntitySearchDirection.FROM);
  41 + return configuration;
  42 + }
26 } 43 }
@@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.plugin.ComponentType; @@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
26 @RuleNode( 26 @RuleNode(
27 type = ComponentType.ENRICHMENT, 27 type = ComponentType.ENRICHMENT,
28 name="related attributes", 28 name="related attributes",
  29 + configClazz = TbGetRelatedAttrNodeConfiguration.class,
29 nodeDescription = "Add Originators Related Entity Attributes or Latest Telemetry into Message Metadata", 30 nodeDescription = "Add Originators Related Entity Attributes or Latest Telemetry into Message Metadata",
30 nodeDetails = "Related Entity found using configured relation direction and Relation Type. " + 31 nodeDetails = "Related Entity found using configured relation direction and Relation Type. " +
31 "If multiple Related Entities are found, only first Entity is used for attributes enrichment, other entities are discarded. " + 32 "If multiple Related Entities are found, only first Entity is used for attributes enrichment, other entities are discarded. " +
@@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.plugin.ComponentType; @@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
28 @RuleNode( 28 @RuleNode(
29 type = ComponentType.ENRICHMENT, 29 type = ComponentType.ENRICHMENT,
30 name="tenant attributes", 30 name="tenant attributes",
  31 + configClazz = TbGetEntityAttrNodeConfiguration.class,
31 nodeDescription = "Add Originators Tenant Attributes or Latest Telemetry into Message Metadata", 32 nodeDescription = "Add Originators Tenant Attributes or Latest Telemetry into Message Metadata",
32 nodeDetails = "If Attributes enrichment configured, server scope attributes are added into Message metadata. " + 33 nodeDetails = "If Attributes enrichment configured, server scope attributes are added into Message metadata. " +
33 "To access those attributes in other nodes this template can be used " + 34 "To access those attributes in other nodes this template can be used " +
@@ -36,6 +36,7 @@ import java.util.HashSet; @@ -36,6 +36,7 @@ import java.util.HashSet;
36 @RuleNode( 36 @RuleNode(
37 type = ComponentType.TRANSFORMATION, 37 type = ComponentType.TRANSFORMATION,
38 name="change originator", 38 name="change originator",
  39 + configClazz = TbChangeOriginatorNodeConfiguration.class,
39 nodeDescription = "Change Message Originator To Tenant/Customer/Related Entity", 40 nodeDescription = "Change Message Originator To Tenant/Customer/Related Entity",
40 nodeDetails = "Related Entity found using configured relation direction and Relation Type. " + 41 nodeDetails = "Related Entity found using configured relation direction and Relation Type. " +
41 "If multiple Related Entities are found, only first Entity is used as new Originator, other entities are discarded. ") 42 "If multiple Related Entities are found, only first Entity is used as new Originator, other entities are discarded. ")
@@ -16,12 +16,24 @@ @@ -16,12 +16,24 @@
16 package org.thingsboard.rule.engine.transform; 16 package org.thingsboard.rule.engine.transform;
17 17
18 import lombok.Data; 18 import lombok.Data;
  19 +import org.thingsboard.rule.engine.api.NodeConfiguration;
  20 +import org.thingsboard.server.common.data.relation.EntityRelation;
19 import org.thingsboard.server.common.data.relation.EntitySearchDirection; 21 import org.thingsboard.server.common.data.relation.EntitySearchDirection;
20 22
21 @Data 23 @Data
22 -public class TbChangeOriginatorNodeConfiguration extends TbTransformNodeConfiguration{ 24 +public class TbChangeOriginatorNodeConfiguration extends TbTransformNodeConfiguration implements NodeConfiguration {
23 25
24 private String originatorSource; 26 private String originatorSource;
25 private EntitySearchDirection direction; 27 private EntitySearchDirection direction;
26 private String relationType; 28 private String relationType;
  29 +
  30 + @Override
  31 + public TbChangeOriginatorNodeConfiguration defaultConfiguration() {
  32 + TbChangeOriginatorNodeConfiguration configuration = new TbChangeOriginatorNodeConfiguration();
  33 + configuration.setOriginatorSource(TbChangeOriginatorNode.CUSTOMER_SOURCE);
  34 + configuration.setDirection(EntitySearchDirection.FROM);
  35 + configuration.setRelationType(EntityRelation.CONTAINS_TYPE);
  36 + configuration.setStartNewChain(false);
  37 + return configuration;
  38 + }
27 } 39 }
@@ -27,6 +27,7 @@ import javax.script.Bindings; @@ -27,6 +27,7 @@ import javax.script.Bindings;
27 @RuleNode( 27 @RuleNode(
28 type = ComponentType.TRANSFORMATION, 28 type = ComponentType.TRANSFORMATION,
29 name = "script", 29 name = "script",
  30 + configClazz = TbTransformMsgNodeConfiguration.class,
30 nodeDescription = "Change Message payload and Metadata using JavaScript", 31 nodeDescription = "Change Message payload and Metadata using JavaScript",
31 nodeDetails = "JavaScript function recieve 2 input parameters that can be changed inside.<br/> " + 32 nodeDetails = "JavaScript function recieve 2 input parameters that can be changed inside.<br/> " +
32 "<code>meta</code> - is a Message metadata.<br/>" + 33 "<code>meta</code> - is a Message metadata.<br/>" +
@@ -16,9 +16,18 @@ @@ -16,9 +16,18 @@
16 package org.thingsboard.rule.engine.transform; 16 package org.thingsboard.rule.engine.transform;
17 17
18 import lombok.Data; 18 import lombok.Data;
  19 +import org.thingsboard.rule.engine.api.NodeConfiguration;
19 20
20 @Data 21 @Data
21 -public class TbTransformMsgNodeConfiguration extends TbTransformNodeConfiguration { 22 +public class TbTransformMsgNodeConfiguration extends TbTransformNodeConfiguration implements NodeConfiguration {
22 23
23 private String jsScript; 24 private String jsScript;
  25 +
  26 + @Override
  27 + public TbTransformMsgNodeConfiguration defaultConfiguration() {
  28 + TbTransformMsgNodeConfiguration configuration = new TbTransformMsgNodeConfiguration();
  29 + configuration.setStartNewChain(false);
  30 + configuration.setJsScript("msg.passed = msg.passed * meta.temp; msg.bigObj.newProp = 'Ukraine' ");
  31 + return configuration;
  32 + }
24 } 33 }
@@ -153,16 +153,21 @@ function RuleChainService($http, $q, $filter, types, componentDescriptorService) @@ -153,16 +153,21 @@ function RuleChainService($http, $q, $filter, types, componentDescriptorService)
153 return deferred.promise; 153 return deferred.promise;
154 } 154 }
155 155
156 - function getRuleNodeSupportedLinks(nodeType) { //eslint-disable-line  
157 - //TODO:  
158 - var deferred = $q.defer();  
159 - var linkLabels = [  
160 - { name: 'Success', custom: false },  
161 - { name: 'Fail', custom: false },  
162 - { name: 'Custom', custom: true },  
163 - ];  
164 - deferred.resolve(linkLabels);  
165 - return deferred.promise; 156 + function getRuleNodeSupportedLinks(component) {
  157 + var relationTypes = component.configurationDescriptor.nodeDefinition.relationTypes;
  158 + var customRelations = component.configurationDescriptor.nodeDefinition.customRelations;
  159 + var linkLabels = [];
  160 + for (var i=0;i<relationTypes.length;i++) {
  161 + linkLabels.push({
  162 + name: relationTypes[i], custom: false
  163 + });
  164 + }
  165 + if (customRelations) {
  166 + linkLabels.push(
  167 + { name: 'Custom', custom: true }
  168 + );
  169 + }
  170 + return linkLabels;
166 } 171 }
167 172
168 function getRuleNodeComponents() { 173 function getRuleNodeComponents() {
  1 +/*
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +export default function fixAceEditor(aceEditor) {
  18 + aceEditor.$blockScrolling = Infinity;
  19 + aceEditor.on("showGutterTooltip", function (tooltip) {
  20 + if (!tooltip.isAttachedToBody) {
  21 + document.body.appendChild(tooltip.$element); //eslint-disable-line
  22 + tooltip.isAttachedToBody = true;
  23 + onElementRemoved(tooltip.$parentNode, () => {
  24 + if (tooltip.$element.parentNode != null) {
  25 + tooltip.$element.parentNode.removeChild(tooltip.$element);
  26 + }
  27 + });
  28 + }
  29 + });
  30 +}
  31 +
  32 +function onElementRemoved(element, callback) {
  33 + if (!document.body.contains(element)) { //eslint-disable-line
  34 + callback();
  35 + } else {
  36 + var observer;
  37 + observer = new MutationObserver(function(mutations) { //eslint-disable-line
  38 + if (!document.body.contains(element)) { //eslint-disable-line
  39 + callback();
  40 + observer.disconnect();
  41 + }
  42 + });
  43 + observer.observe(document.body, {childList: true}); //eslint-disable-line
  44 + }
  45 +}
@@ -22,6 +22,8 @@ import thingsboardToast from '../services/toast'; @@ -22,6 +22,8 @@ import thingsboardToast from '../services/toast';
22 import thingsboardUtils from '../common/utils.service'; 22 import thingsboardUtils from '../common/utils.service';
23 import thingsboardExpandFullscreen from './expand-fullscreen.directive'; 23 import thingsboardExpandFullscreen from './expand-fullscreen.directive';
24 24
  25 +import fixAceEditor from './ace-editor-fix';
  26 +
25 /* eslint-disable import/no-unresolved, import/default */ 27 /* eslint-disable import/no-unresolved, import/default */
26 28
27 import jsFuncTemplate from './js-func.tpl.html'; 29 import jsFuncTemplate from './js-func.tpl.html';
@@ -83,6 +85,7 @@ function JsFunc($compile, $templateCache, toast, utils, $translate) { @@ -83,6 +85,7 @@ function JsFunc($compile, $templateCache, toast, utils, $translate) {
83 scope.js_editor.session.on("change", function () { 85 scope.js_editor.session.on("change", function () {
84 scope.cleanupJsErrors(); 86 scope.cleanupJsErrors();
85 }); 87 });
  88 + fixAceEditor(_ace);
86 } 89 }
87 }; 90 };
88 91
  1 +/*
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +import './json-object-edit.scss';
  17 +
  18 +import 'brace/ext/language_tools';
  19 +import 'brace/mode/json';
  20 +import 'ace-builds/src-min-noconflict/snippets/json';
  21 +
  22 +import fixAceEditor from './ace-editor-fix';
  23 +
  24 +/* eslint-disable import/no-unresolved, import/default */
  25 +
  26 +import jsonObjectEditTemplate from './json-object-edit.tpl.html';
  27 +
  28 +/* eslint-enable import/no-unresolved, import/default */
  29 +
  30 +export default angular.module('thingsboard.directives.jsonObjectEdit', [])
  31 + .directive('tbJsonObjectEdit', JsonObjectEdit)
  32 + .name;
  33 +
  34 +/*@ngInject*/
  35 +function JsonObjectEdit($compile, $templateCache, $document, toast, utils) {
  36 +
  37 + var linker = function (scope, element, attrs, ngModelCtrl) {
  38 + var template = $templateCache.get(jsonObjectEditTemplate);
  39 + element.html(template);
  40 +
  41 + scope.label = attrs.label;
  42 +
  43 + scope.objectValid = true;
  44 + scope.validationError = '';
  45 +
  46 + scope.json_editor;
  47 +
  48 + scope.onFullscreenChanged = function () {
  49 + updateEditorSize();
  50 + };
  51 +
  52 + function updateEditorSize() {
  53 + if (scope.json_editor) {
  54 + scope.json_editor.resize();
  55 + scope.json_editor.renderer.updateFull();
  56 + }
  57 + }
  58 +
  59 + scope.jsonEditorOptions = {
  60 + useWrapMode: true,
  61 + mode: 'json',
  62 + advanced: {
  63 + enableSnippets: true,
  64 + enableBasicAutocompletion: true,
  65 + enableLiveAutocompletion: true
  66 + },
  67 + onLoad: function (_ace) {
  68 + scope.json_editor = _ace;
  69 + scope.json_editor.session.on("change", function () {
  70 + scope.cleanupJsonErrors();
  71 + });
  72 + fixAceEditor(_ace);
  73 + }
  74 + };
  75 +
  76 + scope.cleanupJsonErrors = function () {
  77 + toast.hide();
  78 + };
  79 +
  80 + scope.updateValidity = function () {
  81 + ngModelCtrl.$setValidity('objectValid', scope.objectValid);
  82 + };
  83 +
  84 + scope.$watch('contentBody', function (newVal, prevVal) {
  85 + if (!angular.equals(newVal, prevVal)) {
  86 + var object = scope.validate();
  87 + ngModelCtrl.$setViewValue(object);
  88 + scope.updateValidity();
  89 + }
  90 + });
  91 +
  92 + ngModelCtrl.$render = function () {
  93 + var object = ngModelCtrl.$viewValue;
  94 + var content = '';
  95 + try {
  96 + if (object) {
  97 + content = angular.toJson(object, true);
  98 + }
  99 + } catch (e) {
  100 + //
  101 + }
  102 + scope.contentBody = content;
  103 + };
  104 +
  105 + scope.showError = function (error) {
  106 + var toastParent = angular.element('#tb-json-panel', element);
  107 + toast.showError(error, toastParent, 'bottom left');
  108 + };
  109 +
  110 + scope.validate = function () {
  111 + if (!scope.contentBody || !scope.contentBody.length) {
  112 + if (scope.required) {
  113 + scope.validationError = 'Json object is required.';
  114 + scope.objectValid = false;
  115 + } else {
  116 + scope.validationError = '';
  117 + scope.objectValid = true;
  118 + }
  119 + return null;
  120 + } else {
  121 + try {
  122 + var object = angular.fromJson(scope.contentBody);
  123 + scope.validationError = '';
  124 + scope.objectValid = true;
  125 + return object;
  126 + } catch (e) {
  127 + var details = utils.parseException(e);
  128 + var errorInfo = 'Error:';
  129 + if (details.name) {
  130 + errorInfo += ' ' + details.name + ':';
  131 + }
  132 + if (details.message) {
  133 + errorInfo += ' ' + details.message;
  134 + }
  135 + scope.validationError = errorInfo;
  136 + scope.objectValid = false;
  137 + return null;
  138 + }
  139 + }
  140 + };
  141 +
  142 + scope.$on('form-submit', function () {
  143 + if (!scope.readonly) {
  144 + scope.cleanupJsonErrors();
  145 + if (!scope.objectValid) {
  146 + scope.showError(scope.validationError);
  147 + }
  148 + }
  149 + });
  150 +
  151 + scope.$on('update-ace-editor-size', function () {
  152 + updateEditorSize();
  153 + });
  154 +
  155 + $compile(element.contents())(scope);
  156 + }
  157 +
  158 + return {
  159 + restrict: "E",
  160 + require: "^ngModel",
  161 + scope: {
  162 + required:'=ngRequired',
  163 + readonly:'=ngReadonly',
  164 + fillHeight:'=?'
  165 + },
  166 + link: linker
  167 + };
  168 +}
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +tb-json-object-edit {
  17 + position: relative;
  18 + .fill-height {
  19 + height: 100%;
  20 + }
  21 +}
  22 +
  23 +.tb-json-object-panel {
  24 + margin-left: 15px;
  25 + border: 1px solid #C0C0C0;
  26 + height: 100%;
  27 + #tb-json-input {
  28 + min-width: 200px;
  29 + width: 100%;
  30 + height: 100%;
  31 + &:not(.fill-height) {
  32 + min-height: 200px;
  33 + }
  34 + }
  35 +}
  1 +<!--
  2 +
  3 + Copyright © 2016-2018 The Thingsboard Authors
  4 +
  5 + Licensed under the Apache License, Version 2.0 (the "License");
  6 + you may not use this file except in compliance with the License.
  7 + You may obtain a copy of the License at
  8 +
  9 + http://www.apache.org/licenses/LICENSE-2.0
  10 +
  11 + Unless required by applicable law or agreed to in writing, software
  12 + distributed under the License is distributed on an "AS IS" BASIS,
  13 + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 + See the License for the specific language governing permissions and
  15 + limitations under the License.
  16 +
  17 +-->
  18 +<div style="background: #fff;" ng-class="{'fill-height': fillHeight}" tb-expand-fullscreen fullscreen-zindex="100" expand-button-id="expand-button" on-fullscreen-changed="onFullscreenChanged()" layout="column">
  19 + <div layout="row" layout-align="start center">
  20 + <label class="tb-title no-padding"
  21 + ng-class="{'tb-required': required,
  22 + 'tb-readonly': readonly,
  23 + 'tb-error': !objectValid}">{{ label }}</label>
  24 + <span flex></span>
  25 + <md-button id="expand-button" aria-label="Fullscreen" class="md-icon-button tb-md-32 tb-fullscreen-button-style"></md-button>
  26 + </div>
  27 + <div flex id="tb-json-panel" class="tb-json-object-panel" layout="column">
  28 + <div flex id="tb-json-input" ng-class="{'fill-height': fillHeight}"
  29 + ng-readonly="readonly"
  30 + ui-ace="jsonEditorOptions"
  31 + ng-model="contentBody">
  32 + </div>
  33 + </div>
  34 +</div>
@@ -23,6 +23,8 @@ import FlatButton from 'material-ui/FlatButton'; @@ -23,6 +23,8 @@ import FlatButton from 'material-ui/FlatButton';
23 import 'brace/ext/language_tools'; 23 import 'brace/ext/language_tools';
24 import 'brace/theme/github'; 24 import 'brace/theme/github';
25 25
  26 +import fixAceEditor from './../ace-editor-fix';
  27 +
26 class ThingsboardAceEditor extends React.Component { 28 class ThingsboardAceEditor extends React.Component {
27 29
28 constructor(props) { 30 constructor(props) {
@@ -31,6 +33,7 @@ class ThingsboardAceEditor extends React.Component { @@ -31,6 +33,7 @@ class ThingsboardAceEditor extends React.Component {
31 this.onBlur = this.onBlur.bind(this); 33 this.onBlur = this.onBlur.bind(this);
32 this.onFocus = this.onFocus.bind(this); 34 this.onFocus = this.onFocus.bind(this);
33 this.onTidy = this.onTidy.bind(this); 35 this.onTidy = this.onTidy.bind(this);
  36 + this.onLoad = this.onLoad.bind(this);
34 var value = props.value ? props.value + '' : ''; 37 var value = props.value ? props.value + '' : '';
35 this.state = { 38 this.state = {
36 value: value, 39 value: value,
@@ -72,6 +75,10 @@ class ThingsboardAceEditor extends React.Component { @@ -72,6 +75,10 @@ class ThingsboardAceEditor extends React.Component {
72 } 75 }
73 } 76 }
74 77
  78 + onLoad(editor) {
  79 + fixAceEditor(editor);
  80 + }
  81 +
75 render() { 82 render() {
76 83
77 const styles = reactCSS({ 84 const styles = reactCSS({
@@ -117,6 +124,7 @@ class ThingsboardAceEditor extends React.Component { @@ -117,6 +124,7 @@ class ThingsboardAceEditor extends React.Component {
117 onChange={this.onValueChanged} 124 onChange={this.onValueChanged}
118 onFocus={this.onFocus} 125 onFocus={this.onFocus}
119 onBlur={this.onBlur} 126 onBlur={this.onBlur}
  127 + onLoad={this.onLoad}
120 name={this.props.form.title} 128 name={this.props.form.title}
121 value={this.state.value} 129 value={this.state.value}
122 readOnly={this.props.form.readonly} 130 readOnly={this.props.form.readonly}
@@ -23,6 +23,8 @@ import thingsboardJsonForm from '../json-form.directive'; @@ -23,6 +23,8 @@ import thingsboardJsonForm from '../json-form.directive';
23 import thingsboardManageWidgetActions from './action/manage-widget-actions.directive'; 23 import thingsboardManageWidgetActions from './action/manage-widget-actions.directive';
24 import 'angular-ui-ace'; 24 import 'angular-ui-ace';
25 25
  26 +import fixAceEditor from './../ace-editor-fix';
  27 +
26 import './widget-config.scss'; 28 import './widget-config.scss';
27 29
28 /* eslint-disable import/no-unresolved, import/default */ 30 /* eslint-disable import/no-unresolved, import/default */
@@ -72,6 +74,9 @@ function WidgetConfig($compile, $templateCache, $rootScope, $translate, $timeout @@ -72,6 +74,9 @@ function WidgetConfig($compile, $templateCache, $rootScope, $translate, $timeout
72 enableSnippets: true, 74 enableSnippets: true,
73 enableBasicAutocompletion: true, 75 enableBasicAutocompletion: true,
74 enableLiveAutocompletion: true 76 enableLiveAutocompletion: true
  77 + },
  78 + onLoad: function (_ace) {
  79 + fixAceEditor(_ace);
75 } 80 }
76 }; 81 };
77 82
@@ -128,8 +128,8 @@ export default function ExtensionFormOpcDirective($compile, $templateCache, $tra @@ -128,8 +128,8 @@ export default function ExtensionFormOpcDirective($compile, $templateCache, $tra
128 let addedFile = event.target.result; 128 let addedFile = event.target.result;
129 129
130 if (addedFile && addedFile.length > 0) { 130 if (addedFile && addedFile.length > 0) {
131 - model[options.fileName] = $file.name;  
132 - model[options.file] = addedFile.replace(/^data.*base64,/, ""); 131 + model[options.location] = $file.name;
  132 + model[options.fileContent] = addedFile.replace(/^data.*base64,/, "");
133 133
134 } 134 }
135 } 135 }
@@ -142,8 +142,8 @@ export default function ExtensionFormOpcDirective($compile, $templateCache, $tra @@ -142,8 +142,8 @@ export default function ExtensionFormOpcDirective($compile, $templateCache, $tra
142 scope.clearFile = function(model, options) { 142 scope.clearFile = function(model, options) {
143 scope.theForm.$setDirty(); 143 scope.theForm.$setDirty();
144 144
145 - model[options.fileName] = null;  
146 - model[options.file] = null; 145 + model[options.location] = null;
  146 + model[options.fileContent] = null;
147 147
148 }; 148 };
149 149
@@ -212,8 +212,8 @@ @@ -212,8 +212,8 @@
212 </md-input-container> 212 </md-input-container>
213 213
214 <section class="dropdown-section"> 214 <section class="dropdown-section">
215 - <div class="tb-container" ng-class="{'ng-invalid':!server.keystore.file}">  
216 - <span ng-init='fieldsToFill = {"fileName":"fileName", "file":"file"}'></span> 215 + <div class="tb-container" ng-class="{'ng-invalid':!server.keystore.fileContent}">
  216 + <span ng-init='fieldsToFill = {"location":"location", "fileContent":"fileContent"}'></span>
217 <label class="tb-label" translate>extension.opc-keystore-location</label> 217 <label class="tb-label" translate>extension.opc-keystore-location</label>
218 <div flow-init="{singleFile:true}" flow-file-added='fileAdded($file, server.keystore, fieldsToFill)' class="tb-file-select-container"> 218 <div flow-init="{singleFile:true}" flow-file-added='fileAdded($file, server.keystore, fieldsToFill)' class="tb-file-select-container">
219 <div class="tb-file-clear-container"> 219 <div class="tb-file-clear-container">
@@ -231,14 +231,14 @@ @@ -231,14 +231,14 @@
231 class="file-input" 231 class="file-input"
232 flow-btn id="dropFileKeystore_{{serverIndex}}" 232 flow-btn id="dropFileKeystore_{{serverIndex}}"
233 name="keystoreFile" 233 name="keystoreFile"
234 - ng-model="server.keystore.file" 234 + ng-model="server.keystore.fileContent"
235 > 235 >
236 </div> 236 </div>
237 </div> 237 </div>
238 </div> 238 </div>
239 <div class="dropdown-messages"> 239 <div class="dropdown-messages">
240 - <div ng-if="!server.keystore[fieldsToFill.fileName]" class="tb-error-message" translate>extension.no-file</div>  
241 - <div ng-if="server.keystore[fieldsToFill.fileName]">{{server.keystore[fieldsToFill.fileName]}}</div> 240 + <div ng-if="!server.keystore[fieldsToFill.location]" class="tb-error-message" translate>extension.no-file</div>
  241 + <div ng-if="server.keystore[fieldsToFill.location]">{{server.keystore[fieldsToFill.location]}}</div>
242 </div> 242 </div>
243 </section> 243 </section>
244 244
@@ -29,6 +29,7 @@ import thingsboardNoAnimate from '../components/no-animate.directive'; @@ -29,6 +29,7 @@ import thingsboardNoAnimate from '../components/no-animate.directive';
29 import thingsboardOnFinishRender from '../components/finish-render.directive'; 29 import thingsboardOnFinishRender from '../components/finish-render.directive';
30 import thingsboardSideMenu from '../components/side-menu.directive'; 30 import thingsboardSideMenu from '../components/side-menu.directive';
31 import thingsboardDashboardAutocomplete from '../components/dashboard-autocomplete.directive'; 31 import thingsboardDashboardAutocomplete from '../components/dashboard-autocomplete.directive';
  32 +import thingsboardJsonObjectEdit from '../components/json-object-edit.directive';
32 33
33 import thingsboardUserMenu from './user-menu.directive'; 34 import thingsboardUserMenu from './user-menu.directive';
34 35
@@ -90,7 +91,8 @@ export default angular.module('thingsboard.home', [ @@ -90,7 +91,8 @@ export default angular.module('thingsboard.home', [
90 thingsboardNoAnimate, 91 thingsboardNoAnimate,
91 thingsboardOnFinishRender, 92 thingsboardOnFinishRender,
92 thingsboardSideMenu, 93 thingsboardSideMenu,
93 - thingsboardDashboardAutocomplete 94 + thingsboardDashboardAutocomplete,
  95 + thingsboardJsonObjectEdit
94 ]) 96 ])
95 .config(HomeRoutes) 97 .config(HomeRoutes)
96 .controller('HomeController', HomeController) 98 .controller('HomeController', HomeController)
@@ -1179,6 +1179,7 @@ export default angular.module('thingsboard.locale', []) @@ -1179,6 +1179,7 @@ export default angular.module('thingsboard.locale', [])
1179 "delete": "Delete rule node", 1179 "delete": "Delete rule node",
1180 "rulenode-details": "Rule node details", 1180 "rulenode-details": "Rule node details",
1181 "debug-mode": "Debug mode", 1181 "debug-mode": "Debug mode",
  1182 + "configuration": "Configuration",
1182 "link-details": "Rule node link details", 1183 "link-details": "Rule node link details",
1183 "add-link": "Add link", 1184 "add-link": "Add link",
1184 "link-label": "Link label", 1185 "link-label": "Link label",
@@ -151,6 +151,9 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil, @@ -151,6 +151,9 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil,
151 }, 151 },
152 'mouseLeave': function () { 152 'mouseLeave': function () {
153 destroyTooltips(); 153 destroyTooltips();
  154 + },
  155 + 'mouseDown': function () {
  156 + destroyTooltips();
154 } 157 }
155 } 158 }
156 }; 159 };
@@ -226,16 +229,12 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil, @@ -226,16 +229,12 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil,
226 edgeDoubleClick: function (event, edge) { 229 edgeDoubleClick: function (event, edge) {
227 var sourceNode = vm.modelservice.nodes.getNodeByConnectorId(edge.source); 230 var sourceNode = vm.modelservice.nodes.getNodeByConnectorId(edge.source);
228 if (sourceNode.component.type != types.ruleNodeType.INPUT.value) { 231 if (sourceNode.component.type != types.ruleNodeType.INPUT.value) {
229 - ruleChainService.getRuleNodeSupportedLinks(sourceNode.component.clazz).then(  
230 - (labels) => {  
231 - vm.isEditingRuleNode = false;  
232 - vm.editingRuleNode = null;  
233 - vm.editingRuleNodeLinkLabels = labels;  
234 - vm.isEditingRuleNodeLink = true;  
235 - vm.editingRuleNodeLinkIndex = vm.ruleChainModel.edges.indexOf(edge);  
236 - vm.editingRuleNodeLink = angular.copy(edge);  
237 - }  
238 - ); 232 + vm.isEditingRuleNode = false;
  233 + vm.editingRuleNode = null;
  234 + vm.editingRuleNodeLinkLabels = ruleChainService.getRuleNodeSupportedLinks(sourceNode.component);
  235 + vm.isEditingRuleNodeLink = true;
  236 + vm.editingRuleNodeLinkIndex = vm.ruleChainModel.edges.indexOf(edge);
  237 + vm.editingRuleNodeLink = angular.copy(edge);
239 } 238 }
240 }, 239 },
241 nodeCallbacks: { 240 nodeCallbacks: {
@@ -267,16 +266,10 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil, @@ -267,16 +266,10 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil,
267 deferred.resolve(edge); 266 deferred.resolve(edge);
268 } 267 }
269 } else { 268 } else {
270 - ruleChainService.getRuleNodeSupportedLinks(sourceNode.component.clazz).then(  
271 - (labels) => {  
272 - addRuleNodeLink(event, edge, labels).then(  
273 - (link) => {  
274 - deferred.resolve(link);  
275 - },  
276 - () => {  
277 - deferred.reject();  
278 - }  
279 - ); 269 + var labels = ruleChainService.getRuleNodeSupportedLinks(sourceNode.component);
  270 + addRuleNodeLink(event, edge, labels).then(
  271 + (link) => {
  272 + deferred.resolve(link);
280 }, 273 },
281 () => { 274 () => {
282 deferred.reject(); 275 deferred.reject();
@@ -309,24 +302,19 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil, @@ -309,24 +302,19 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil,
309 y: 10+50*model.nodes.length, 302 y: 10+50*model.nodes.length,
310 connectors: [] 303 connectors: []
311 }; 304 };
312 - if (componentType == types.ruleNodeType.RULE_CHAIN.value) {  
313 - node.connectors.push(  
314 - {  
315 - type: flowchartConstants.leftConnectorType,  
316 - id: model.nodes.length  
317 - }  
318 - );  
319 - } else { 305 + if (ruleNodeComponent.configurationDescriptor.nodeDefinition.inEnabled) {
320 node.connectors.push( 306 node.connectors.push(
321 { 307 {
322 type: flowchartConstants.leftConnectorType, 308 type: flowchartConstants.leftConnectorType,
323 - id: model.nodes.length*2 309 + id: model.nodes.length * 2
324 } 310 }
325 ); 311 );
  312 + }
  313 + if (ruleNodeComponent.configurationDescriptor.nodeDefinition.outEnabled) {
326 node.connectors.push( 314 node.connectors.push(
327 { 315 {
328 type: flowchartConstants.rightConnectorType, 316 type: flowchartConstants.rightConnectorType,
329 - id: model.nodes.length*2+1 317 + id: model.nodes.length * 2 + 1
330 } 318 }
331 ); 319 );
332 } 320 }
@@ -398,17 +386,24 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil, @@ -398,17 +386,24 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil,
398 name: ruleNode.name, 386 name: ruleNode.name,
399 nodeClass: vm.types.ruleNodeType[component.type].nodeClass, 387 nodeClass: vm.types.ruleNodeType[component.type].nodeClass,
400 icon: vm.types.ruleNodeType[component.type].icon, 388 icon: vm.types.ruleNodeType[component.type].icon,
401 - connectors: [ 389 + connectors: []
  390 + };
  391 + if (component.configurationDescriptor.nodeDefinition.inEnabled) {
  392 + node.connectors.push(
402 { 393 {
403 type: flowchartConstants.leftConnectorType, 394 type: flowchartConstants.leftConnectorType,
404 id: vm.nextConnectorID++ 395 id: vm.nextConnectorID++
405 - }, 396 + }
  397 + );
  398 + }
  399 + if (component.configurationDescriptor.nodeDefinition.outEnabled) {
  400 + node.connectors.push(
406 { 401 {
407 type: flowchartConstants.rightConnectorType, 402 type: flowchartConstants.rightConnectorType,
408 id: vm.nextConnectorID++ 403 id: vm.nextConnectorID++
409 } 404 }
410 - ]  
411 - }; 405 + );
  406 + }
412 nodes.push(node); 407 nodes.push(node);
413 vm.ruleChainModel.nodes.push(node); 408 vm.ruleChainModel.nodes.push(node);
414 } 409 }
@@ -590,6 +585,9 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil, @@ -590,6 +585,9 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil,
590 } 585 }
591 586
592 function addRuleNode($event, ruleNode) { 587 function addRuleNode($event, ruleNode) {
  588 +
  589 + ruleNode.configuration = angular.copy(ruleNode.component.configurationDescriptor.nodeDefinition.defaultConfiguration);
  590 +
593 $mdDialog.show({ 591 $mdDialog.show({
594 controller: 'AddRuleNodeController', 592 controller: 'AddRuleNodeController',
595 controllerAs: 'vm', 593 controllerAs: 'vm',
@@ -601,13 +599,15 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil, @@ -601,13 +599,15 @@ export function RuleChainController($stateParams, $scope, $compile, $q, $mdUtil,
601 }).then(function (ruleNode) { 599 }).then(function (ruleNode) {
602 ruleNode.id = vm.nextNodeID++; 600 ruleNode.id = vm.nextNodeID++;
603 ruleNode.connectors = []; 601 ruleNode.connectors = [];
604 - ruleNode.connectors.push(  
605 - {  
606 - id: vm.nextConnectorID++,  
607 - type: flowchartConstants.leftConnectorType  
608 - }  
609 - );  
610 - if (ruleNode.component.type != types.ruleNodeType.RULE_CHAIN.value) { 602 + if (ruleNode.component.configurationDescriptor.nodeDefinition.inEnabled) {
  603 + ruleNode.connectors.push(
  604 + {
  605 + id: vm.nextConnectorID++,
  606 + type: flowchartConstants.leftConnectorType
  607 + }
  608 + );
  609 + }
  610 + if (ruleNode.component.configurationDescriptor.nodeDefinition.outEnabled) {
611 ruleNode.connectors.push( 611 ruleNode.connectors.push(
612 { 612 {
613 id: vm.nextConnectorID++, 613 id: vm.nextConnectorID++,
@@ -38,6 +38,11 @@ @@ -38,6 +38,11 @@
38 ng-model="ruleNode.debugMode">{{ 'rulenode.debug-mode' | translate }} 38 ng-model="ruleNode.debugMode">{{ 'rulenode.debug-mode' | translate }}
39 </md-checkbox> 39 </md-checkbox>
40 </md-input-container> 40 </md-input-container>
  41 + <tb-json-object-edit class="tb-rule-node-configuration-json" ng-model="ruleNode.configuration"
  42 + label="{{ 'rulenode.configuration' | translate }}"
  43 + ng-required="true"
  44 + fill-height="true">
  45 + </tb-json-object-edit>
41 <md-input-container class="md-block"> 46 <md-input-container class="md-block">
42 <label translate>rulenode.description</label> 47 <label translate>rulenode.description</label>
43 <textarea ng-model="ruleNode.additionalInfo.description" rows="2"></textarea> 48 <textarea ng-model="ruleNode.additionalInfo.description" rows="2"></textarea>
@@ -14,6 +14,8 @@ @@ -14,6 +14,8 @@
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 16
  17 +import './rulenode.scss';
  18 +
17 /* eslint-disable import/no-unresolved, import/default */ 19 /* eslint-disable import/no-unresolved, import/default */
18 20
19 import ruleNodeFieldsetTemplate from './rulenode-fieldset.tpl.html'; 21 import ruleNodeFieldsetTemplate from './rulenode-fieldset.tpl.html';
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +.tb-rulenode {
  18 + tb-json-object-edit.tb-rule-node-configuration-json {
  19 + height: 300px;
  20 + display: block;
  21 + }
  22 +}
@@ -19,7 +19,7 @@ @@ -19,7 +19,7 @@
19 id="{{node.id}}" 19 id="{{node.id}}"
20 ng-attr-style="position: absolute; top: {{ node.y }}px; left: {{ node.x }}px;" 20 ng-attr-style="position: absolute; top: {{ node.y }}px; left: {{ node.x }}px;"
21 ng-dblclick="callbacks.doubleClick($event, node)" 21 ng-dblclick="callbacks.doubleClick($event, node)"
22 - ng-mouseover="callbacks.mouseOver($event, node)" 22 + ng-mousedown="callbacks.mouseDown($event, node)"
23 ng-mouseenter="callbacks.mouseEnter($event, node)" 23 ng-mouseenter="callbacks.mouseEnter($event, node)"
24 ng-mouseleave="callbacks.mouseLeave($event, node)"> 24 ng-mouseleave="callbacks.mouseLeave($event, node)">
25 <div class="tb-rule-node {{node.nodeClass}}"> 25 <div class="tb-rule-node {{node.nodeClass}}">
@@ -203,6 +203,12 @@ md-sidenav { @@ -203,6 +203,12 @@ md-sidenav {
203 * THINGSBOARD SPECIFIC 203 * THINGSBOARD SPECIFIC
204 ***********************/ 204 ***********************/
205 205
  206 +$swift-ease-out-duration: 0.4s !default;
  207 +$swift-ease-out-timing-function: cubic-bezier(0.25, 0.8, 0.25, 1) !default;
  208 +
  209 +$input-label-float-offset: 6px !default;
  210 +$input-label-float-scale: 0.75 !default;
  211 +
206 label { 212 label {
207 &.tb-title { 213 &.tb-title {
208 pointer-events: none; 214 pointer-events: none;
@@ -213,6 +219,18 @@ label { @@ -213,6 +219,18 @@ label {
213 &.no-padding { 219 &.no-padding {
214 padding-bottom: 0px; 220 padding-bottom: 0px;
215 } 221 }
  222 + &.tb-required:after {
  223 + content: ' *';
  224 + font-size: 13px;
  225 + vertical-align: top;
  226 + color: rgba(0,0,0,0.54);
  227 + }
  228 + &.tb-error {
  229 + color: rgb(221,44,0);
  230 + &.tb-required:after {
  231 + color: rgb(221,44,0);
  232 + }
  233 + }
216 } 234 }
217 } 235 }
218 236