Commit 164dbd68b714a970247fc8aaebb1ec47dc4a8ba4

Authored by Igor Kulikov
2 parents 7b1a39fb 94039f94

Merge branch 'develop/1.5' of github.com:thingsboard/thingsboard into develop/1.5

Showing 22 changed files with 751 additions and 90 deletions
... ... @@ -19,6 +19,7 @@ import org.springframework.boot.SpringApplication;
19 19 import org.springframework.boot.SpringBootConfiguration;
20 20 import org.springframework.context.annotation.ComponentScan;
21 21 import org.springframework.scheduling.annotation.EnableAsync;
  22 +import org.springframework.scheduling.annotation.EnableScheduling;
22 23 import springfox.documentation.swagger2.annotations.EnableSwagger2;
23 24
24 25 import java.util.Arrays;
... ... @@ -26,6 +27,7 @@ import java.util.Arrays;
26 27 @SpringBootConfiguration
27 28 @EnableAsync
28 29 @EnableSwagger2
  30 +@EnableScheduling
29 31 @ComponentScan({"org.thingsboard.server"})
30 32 public class ThingsboardServerApplication {
31 33
... ...
... ... @@ -28,9 +28,14 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
28 28 import org.thingsboard.server.common.data.plugin.PluginMetaData;
29 29 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
30 30 import org.thingsboard.server.common.msg.cluster.ServerAddress;
  31 +import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
  32 +import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
  33 +import org.thingsboard.server.common.msg.session.MsgType;
31 34 import org.thingsboard.server.extensions.api.plugins.Plugin;
32 35 import org.thingsboard.server.extensions.api.plugins.PluginInitializationException;
33 36 import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
  37 +import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg;
  38 +import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
34 39 import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg;
35 40 import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg;
36 41 import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
... ... @@ -98,7 +103,20 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
98 103
99 104 public void onRuleToPluginMsg(RuleToPluginMsgWrapper msg) throws RuleException {
100 105 if (state == ComponentLifecycleState.ACTIVE) {
101   - pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg());
  106 + try {
  107 + pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg());
  108 + } catch (Exception ex) {
  109 + logger.debug("[{}] Failed to process RuleToPlugin msg: [{}] [{}]", tenantId, msg.getMsg(), ex);
  110 + RuleToPluginMsg ruleMsg = msg.getMsg();
  111 + MsgType responceMsgType = MsgType.RULE_ENGINE_ERROR;
  112 + Integer requestId = 0;
  113 + if (ruleMsg.getPayload() instanceof FromDeviceRequestMsg) {
  114 + requestId = ((FromDeviceRequestMsg) ruleMsg.getPayload()).getRequestId();
  115 + }
  116 + trustedCtx.reply(
  117 + new ResponsePluginToRuleMsg(ruleMsg.getUid(), tenantId, msg.getRuleId(),
  118 + BasicStatusCodeResponse.onError(responceMsgType, requestId, ex)));
  119 + }
102 120 } else {
103 121 //TODO: reply with plugin suspended message
104 122 }
... ...
... ... @@ -181,6 +181,10 @@ cassandra:
181 181 default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
182 182 # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS
183 183 ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
  184 + buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"
  185 + concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}"
  186 + permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
  187 + rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:30000}"
184 188
185 189 queue:
186 190 msg.ttl: 604800 # 7 days
... ... @@ -234,7 +238,7 @@ caffeine:
234 238 specs:
235 239 relations:
236 240 timeToLiveInMinutes: 1440
237   - maxSize: 0
  241 + maxSize: 100000
238 242 deviceCredentials:
239 243 timeToLiveInMinutes: 1440
240 244 maxSize: 100000
... ...
... ... @@ -16,16 +16,16 @@
16 16 package org.thingsboard.server.common.msg.core;
17 17
18 18 import lombok.Data;
19   -import org.thingsboard.server.common.msg.session.FromDeviceMsg;
  19 +import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
20 20 import org.thingsboard.server.common.msg.session.MsgType;
21 21
22 22 /**
23 23 * @author Andrew Shvayka
24 24 */
25 25 @Data
26   -public class ToServerRpcRequestMsg implements FromDeviceMsg {
  26 +public class ToServerRpcRequestMsg implements FromDeviceRequestMsg {
27 27
28   - private final int requestId;
  28 + private final Integer requestId;
29 29 private final String method;
30 30 private final String params;
31 31
... ...
... ... @@ -148,7 +148,7 @@ public class CassandraAssetDao extends CassandraAbstractSearchTextDao<AssetEntit
148 148 query.and(eq(ENTITY_SUBTYPE_TENANT_ID_PROPERTY, tenantId));
149 149 query.and(eq(ENTITY_SUBTYPE_ENTITY_TYPE_PROPERTY, EntityType.ASSET));
150 150 query.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
151   - ResultSetFuture resultSetFuture = getSession().executeAsync(query);
  151 + ResultSetFuture resultSetFuture = executeAsyncRead(query);
152 152 return Futures.transform(resultSetFuture, new Function<ResultSet, List<EntitySubtype>>() {
153 153 @Nullable
154 154 @Override
... ...
... ... @@ -147,12 +147,12 @@ public class CassandraBaseAttributesDao extends CassandraAbstractAsyncDao implem
147 147 .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType))
148 148 .and(eq(ATTRIBUTE_KEY_COLUMN, key));
149 149 log.debug("Remove request: {}", delete.toString());
150   - return getFuture(getSession().executeAsync(delete), rs -> null);
  150 + return getFuture(executeAsyncWrite(delete), rs -> null);
151 151 }
152 152
153 153 private PreparedStatement getSaveStmt() {
154 154 if (saveStmt == null) {
155   - saveStmt = getSession().prepare("INSERT INTO " + ModelConstants.ATTRIBUTES_KV_CF +
  155 + saveStmt = prepare("INSERT INTO " + ModelConstants.ATTRIBUTES_KV_CF +
156 156 "(" + ENTITY_TYPE_COLUMN +
157 157 "," + ENTITY_ID_COLUMN +
158 158 "," + ATTRIBUTE_TYPE_COLUMN +
... ...
... ... @@ -244,12 +244,12 @@ public class CassandraAuditLogDao extends CassandraAbstractSearchTimeDao<AuditLo
244 244 values.add("?");
245 245 }
246 246 String statementString = INSERT_INTO + cfName + " (" + String.join(",", columnsList) + ") VALUES (" + values.toString() + ")";
247   - return getSession().prepare(statementString);
  247 + return prepare(statementString);
248 248 }
249 249
250 250 private PreparedStatement getPartitionInsertStmt() {
251 251 if (partitionInsertStmt == null) {
252   - partitionInsertStmt = getSession().prepare(INSERT_INTO + ModelConstants.AUDIT_LOG_BY_TENANT_ID_PARTITIONS_CF +
  252 + partitionInsertStmt = prepare(INSERT_INTO + ModelConstants.AUDIT_LOG_BY_TENANT_ID_PARTITIONS_CF +
253 253 "(" + ModelConstants.AUDIT_LOG_TENANT_ID_PROPERTY +
254 254 "," + ModelConstants.AUDIT_LOG_PARTITION_PROPERTY + ")" +
255 255 " VALUES(?, ?)");
... ... @@ -343,7 +343,7 @@ public class CassandraAuditLogDao extends CassandraAbstractSearchTimeDao<AuditLo
343 343 .where(eq(ModelConstants.AUDIT_LOG_TENANT_ID_PROPERTY, tenantId));
344 344 select.and(QueryBuilder.gte(ModelConstants.PARTITION_COLUMN, minPartition));
345 345 select.and(QueryBuilder.lte(ModelConstants.PARTITION_COLUMN, maxPartition));
346   - return getSession().execute(select);
  346 + return executeRead(select);
347 347 }
348 348
349 349 }
... ...
... ... @@ -130,7 +130,7 @@ public class CassandraBaseComponentDescriptorDao extends CassandraAbstractSearch
130 130 public boolean removeById(UUID key) {
131 131 Statement delete = QueryBuilder.delete().all().from(ModelConstants.COMPONENT_DESCRIPTOR_BY_ID).where(eq(ModelConstants.ID_PROPERTY, key));
132 132 log.debug("Remove request: {}", delete.toString());
133   - return getSession().execute(delete).wasApplied();
  133 + return executeWrite(delete).wasApplied();
134 134 }
135 135
136 136 @Override
... ... @@ -145,7 +145,7 @@ public class CassandraBaseComponentDescriptorDao extends CassandraAbstractSearch
145 145 log.debug("Delete plugin meta-data entity by id [{}]", clazz);
146 146 Statement delete = QueryBuilder.delete().all().from(getColumnFamilyName()).where(eq(ModelConstants.COMPONENT_DESCRIPTOR_CLASS_PROPERTY, clazz));
147 147 log.debug("Remove request: {}", delete.toString());
148   - ResultSet resultSet = getSession().execute(delete);
  148 + ResultSet resultSet = executeWrite(delete);
149 149 log.debug("Delete result: [{}]", resultSet.wasApplied());
150 150 }
151 151
... ...
... ... @@ -148,7 +148,7 @@ public class CassandraDeviceDao extends CassandraAbstractSearchTextDao<DeviceEnt
148 148 query.and(eq(ENTITY_SUBTYPE_TENANT_ID_PROPERTY, tenantId));
149 149 query.and(eq(ENTITY_SUBTYPE_ENTITY_TYPE_PROPERTY, EntityType.DEVICE));
150 150 query.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
151   - ResultSetFuture resultSetFuture = getSession().executeAsync(query);
  151 + ResultSetFuture resultSetFuture = executeAsyncRead(query);
152 152 return Futures.transform(resultSetFuture, new Function<ResultSet, List<EntitySubtype>>() {
153 153 @Nullable
154 154 @Override
... ...
... ... @@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
21 21 import org.springframework.beans.factory.annotation.Autowired;
22 22 import org.thingsboard.server.dao.cassandra.CassandraCluster;
23 23 import org.thingsboard.server.dao.model.type.*;
  24 +import org.thingsboard.server.dao.util.BufferedRateLimiter;
24 25
25 26 import java.util.concurrent.ConcurrentHashMap;
26 27 import java.util.concurrent.ConcurrentMap;
... ... @@ -33,16 +34,15 @@ public abstract class CassandraAbstractDao {
33 34
34 35 private ConcurrentMap<String, PreparedStatement> preparedStatementMap = new ConcurrentHashMap<>();
35 36
36   - protected PreparedStatement prepare(String query) {
37   - return preparedStatementMap.computeIfAbsent(query, i -> getSession().prepare(i));
38   - }
  37 + @Autowired
  38 + private BufferedRateLimiter rateLimiter;
39 39
40 40 private Session session;
41 41
42 42 private ConsistencyLevel defaultReadLevel;
43 43 private ConsistencyLevel defaultWriteLevel;
44 44
45   - protected Session getSession() {
  45 + private Session getSession() {
46 46 if (session == null) {
47 47 session = cluster.getSession();
48 48 defaultReadLevel = cluster.getDefaultReadConsistencyLevel();
... ... @@ -59,6 +59,10 @@ public abstract class CassandraAbstractDao {
59 59 return session;
60 60 }
61 61
  62 + protected PreparedStatement prepare(String query) {
  63 + return preparedStatementMap.computeIfAbsent(query, i -> getSession().prepare(i));
  64 + }
  65 +
62 66 private void registerCodecIfNotFound(CodecRegistry registry, TypeCodec<?> codec) {
63 67 try {
64 68 registry.codecFor(codec.getCqlType(), codec.getJavaType());
... ... @@ -85,10 +89,7 @@ public abstract class CassandraAbstractDao {
85 89
86 90 private ResultSet execute(Statement statement, ConsistencyLevel level) {
87 91 log.debug("Execute cassandra statement {}", statement);
88   - if (statement.getConsistencyLevel() == null) {
89   - statement.setConsistencyLevel(level);
90   - }
91   - return getSession().execute(statement);
  92 + return executeAsync(statement, level).getUninterruptibly();
92 93 }
93 94
94 95 private ResultSetFuture executeAsync(Statement statement, ConsistencyLevel level) {
... ... @@ -96,6 +97,6 @@ public abstract class CassandraAbstractDao {
96 97 if (statement.getConsistencyLevel() == null) {
97 98 statement.setConsistencyLevel(level);
98 99 }
99   - return getSession().executeAsync(statement);
  100 + return new RateLimitedResultSetFuture(getSession(), rateLimiter, statement);
100 101 }
101 102 }
\ No newline at end of file
... ...
... ... @@ -63,7 +63,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
63 63 List<E> list = Collections.emptyList();
64 64 if (statement != null) {
65 65 statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
66   - ResultSet resultSet = getSession().execute(statement);
  66 + ResultSet resultSet = executeRead(statement);
67 67 Result<E> result = getMapper().map(resultSet);
68 68 if (result != null) {
69 69 list = result.all();
... ... @@ -75,7 +75,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
75 75 protected ListenableFuture<List<D>> findListByStatementAsync(Statement statement) {
76 76 if (statement != null) {
77 77 statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
78   - ResultSetFuture resultSetFuture = getSession().executeAsync(statement);
  78 + ResultSetFuture resultSetFuture = executeAsyncRead(statement);
79 79 return Futures.transform(resultSetFuture, new Function<ResultSet, List<D>>() {
80 80 @Nullable
81 81 @Override
... ... @@ -97,7 +97,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
97 97 E object = null;
98 98 if (statement != null) {
99 99 statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
100   - ResultSet resultSet = getSession().execute(statement);
  100 + ResultSet resultSet = executeRead(statement);
101 101 Result<E> result = getMapper().map(resultSet);
102 102 if (result != null) {
103 103 object = result.one();
... ... @@ -109,7 +109,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
109 109 protected ListenableFuture<D> findOneByStatementAsync(Statement statement) {
110 110 if (statement != null) {
111 111 statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
112   - ResultSetFuture resultSetFuture = getSession().executeAsync(statement);
  112 + ResultSetFuture resultSetFuture = executeAsyncRead(statement);
113 113 return Futures.transform(resultSetFuture, new Function<ResultSet, D>() {
114 114 @Nullable
115 115 @Override
... ... @@ -184,7 +184,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
184 184 public boolean removeById(UUID key) {
185 185 Statement delete = QueryBuilder.delete().all().from(getColumnFamilyName()).where(eq(ModelConstants.ID_PROPERTY, key));
186 186 log.debug("Remove request: {}", delete.toString());
187   - return getSession().execute(delete).wasApplied();
  187 + return executeWrite(delete).wasApplied();
188 188 }
189 189
190 190 @Override
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.nosql;
  17 +
  18 +import com.datastax.driver.core.ResultSet;
  19 +import com.datastax.driver.core.ResultSetFuture;
  20 +import com.datastax.driver.core.Session;
  21 +import com.datastax.driver.core.Statement;
  22 +import com.google.common.base.Function;
  23 +import com.google.common.util.concurrent.FutureCallback;
  24 +import com.google.common.util.concurrent.Futures;
  25 +import com.google.common.util.concurrent.ListenableFuture;
  26 +import com.google.common.util.concurrent.Uninterruptibles;
  27 +import org.thingsboard.server.dao.util.AsyncRateLimiter;
  28 +
  29 +import javax.annotation.Nullable;
  30 +import java.util.concurrent.*;
  31 +
  32 +public class RateLimitedResultSetFuture implements ResultSetFuture {
  33 +
  34 + private final ListenableFuture<ResultSetFuture> originalFuture;
  35 + private final ListenableFuture<Void> rateLimitFuture;
  36 +
  37 + public RateLimitedResultSetFuture(Session session, AsyncRateLimiter rateLimiter, Statement statement) {
  38 + this.rateLimitFuture = rateLimiter.acquireAsync();
  39 + this.originalFuture = Futures.transform(rateLimitFuture,
  40 + (Function<Void, ResultSetFuture>) i -> executeAsyncWithRelease(rateLimiter, session, statement));
  41 + }
  42 +
  43 + @Override
  44 + public ResultSet getUninterruptibly() {
  45 + return safeGet().getUninterruptibly();
  46 + }
  47 +
  48 + @Override
  49 + public ResultSet getUninterruptibly(long timeout, TimeUnit unit) throws TimeoutException {
  50 + long rateLimitStart = System.nanoTime();
  51 + ResultSetFuture resultSetFuture = null;
  52 + try {
  53 + resultSetFuture = originalFuture.get(timeout, unit);
  54 + } catch (InterruptedException | ExecutionException e) {
  55 + throw new IllegalStateException(e);
  56 + }
  57 + long rateLimitDurationNano = System.nanoTime() - rateLimitStart;
  58 + long innerTimeoutNano = unit.toNanos(timeout) - rateLimitDurationNano;
  59 + if (innerTimeoutNano > 0) {
  60 + return resultSetFuture.getUninterruptibly(innerTimeoutNano, TimeUnit.NANOSECONDS);
  61 + }
  62 + throw new TimeoutException("Timeout waiting for task.");
  63 + }
  64 +
  65 + @Override
  66 + public boolean cancel(boolean mayInterruptIfRunning) {
  67 + if (originalFuture.isDone()) {
  68 + return safeGet().cancel(mayInterruptIfRunning);
  69 + } else {
  70 + return originalFuture.cancel(mayInterruptIfRunning);
  71 + }
  72 + }
  73 +
  74 + @Override
  75 + public boolean isCancelled() {
  76 + if (originalFuture.isDone()) {
  77 + return safeGet().isCancelled();
  78 + }
  79 +
  80 + return originalFuture.isCancelled();
  81 + }
  82 +
  83 + @Override
  84 + public boolean isDone() {
  85 + return originalFuture.isDone() && safeGet().isDone();
  86 + }
  87 +
  88 + @Override
  89 + public ResultSet get() throws InterruptedException, ExecutionException {
  90 + return safeGet().get();
  91 + }
  92 +
  93 + @Override
  94 + public ResultSet get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
  95 + long rateLimitStart = System.nanoTime();
  96 + ResultSetFuture resultSetFuture = originalFuture.get(timeout, unit);
  97 + long rateLimitDurationNano = System.nanoTime() - rateLimitStart;
  98 + long innerTimeoutNano = unit.toNanos(timeout) - rateLimitDurationNano;
  99 + if (innerTimeoutNano > 0) {
  100 + return resultSetFuture.get(innerTimeoutNano, TimeUnit.NANOSECONDS);
  101 + }
  102 + throw new TimeoutException("Timeout waiting for task.");
  103 + }
  104 +
  105 + @Override
  106 + public void addListener(Runnable listener, Executor executor) {
  107 + originalFuture.addListener(() -> {
  108 + try {
  109 + ResultSetFuture resultSetFuture = Uninterruptibles.getUninterruptibly(originalFuture);
  110 + resultSetFuture.addListener(listener, executor);
  111 + } catch (CancellationException e) {
  112 + cancel(false);
  113 + return;
  114 + } catch (ExecutionException e) {
  115 + Futures.immediateFailedFuture(e).addListener(listener, executor);
  116 + }
  117 + }, executor);
  118 + }
  119 +
  120 + private ResultSetFuture safeGet() {
  121 + try {
  122 + return originalFuture.get();
  123 + } catch (InterruptedException | ExecutionException e) {
  124 + throw new IllegalStateException(e);
  125 + }
  126 + }
  127 +
  128 + private ResultSetFuture executeAsyncWithRelease(AsyncRateLimiter rateLimiter, Session session, Statement statement) {
  129 + try {
  130 + ResultSetFuture resultSetFuture = session.executeAsync(statement);
  131 + Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
  132 + @Override
  133 + public void onSuccess(@Nullable ResultSet result) {
  134 + rateLimiter.release();
  135 + }
  136 +
  137 + @Override
  138 + public void onFailure(Throwable t) {
  139 + rateLimiter.release();
  140 + }
  141 + });
  142 + return resultSetFuture;
  143 + } catch (RuntimeException re) {
  144 + rateLimiter.release();
  145 + throw re;
  146 + }
  147 + }
  148 +}
... ...
... ... @@ -242,7 +242,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
242 242
243 243 private PreparedStatement getSaveStmt() {
244 244 if (saveStmt == null) {
245   - saveStmt = getSession().prepare("INSERT INTO " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
  245 + saveStmt = prepare("INSERT INTO " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
246 246 "(" + ModelConstants.RELATION_FROM_ID_PROPERTY +
247 247 "," + ModelConstants.RELATION_FROM_TYPE_PROPERTY +
248 248 "," + ModelConstants.RELATION_TO_ID_PROPERTY +
... ... @@ -257,7 +257,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
257 257
258 258 private PreparedStatement getDeleteStmt() {
259 259 if (deleteStmt == null) {
260   - deleteStmt = getSession().prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME +
  260 + deleteStmt = prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME +
261 261 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + " = ?" +
262 262 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + " = ?" +
263 263 AND + ModelConstants.RELATION_TO_ID_PROPERTY + " = ?" +
... ... @@ -270,7 +270,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
270 270
271 271 private PreparedStatement getDeleteAllByEntityStmt() {
272 272 if (deleteAllByEntityStmt == null) {
273   - deleteAllByEntityStmt = getSession().prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME +
  273 + deleteAllByEntityStmt = prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME +
274 274 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + " = ?" +
275 275 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + " = ?");
276 276 }
... ... @@ -279,7 +279,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
279 279
280 280 private PreparedStatement getFindAllByFromStmt() {
281 281 if (findAllByFromStmt == null) {
282   - findAllByFromStmt = getSession().prepare(SELECT_COLUMNS + " " +
  282 + findAllByFromStmt = prepare(SELECT_COLUMNS + " " +
283 283 FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
284 284 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM +
285 285 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM +
... ... @@ -290,7 +290,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
290 290
291 291 private PreparedStatement getFindAllByFromAndTypeStmt() {
292 292 if (findAllByFromAndTypeStmt == null) {
293   - findAllByFromAndTypeStmt = getSession().prepare(SELECT_COLUMNS + " " +
  293 + findAllByFromAndTypeStmt = prepare(SELECT_COLUMNS + " " +
294 294 FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
295 295 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM +
296 296 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM +
... ... @@ -303,7 +303,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
303 303
304 304 private PreparedStatement getFindAllByToStmt() {
305 305 if (findAllByToStmt == null) {
306   - findAllByToStmt = getSession().prepare(SELECT_COLUMNS + " " +
  306 + findAllByToStmt = prepare(SELECT_COLUMNS + " " +
307 307 FROM + ModelConstants.RELATION_REVERSE_VIEW_NAME + " " +
308 308 WHERE + ModelConstants.RELATION_TO_ID_PROPERTY + EQUAL_TO_PARAM +
309 309 AND + ModelConstants.RELATION_TO_TYPE_PROPERTY + EQUAL_TO_PARAM +
... ... @@ -314,7 +314,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
314 314
315 315 private PreparedStatement getFindAllByToAndTypeStmt() {
316 316 if (findAllByToAndTypeStmt == null) {
317   - findAllByToAndTypeStmt = getSession().prepare(SELECT_COLUMNS + " " +
  317 + findAllByToAndTypeStmt = prepare(SELECT_COLUMNS + " " +
318 318 FROM + ModelConstants.RELATION_REVERSE_VIEW_NAME + " " +
319 319 WHERE + ModelConstants.RELATION_TO_ID_PROPERTY + EQUAL_TO_PARAM +
320 320 AND + ModelConstants.RELATION_TO_TYPE_PROPERTY + EQUAL_TO_PARAM +
... ... @@ -327,7 +327,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
327 327
328 328 private PreparedStatement getCheckRelationStmt() {
329 329 if (checkRelationStmt == null) {
330   - checkRelationStmt = getSession().prepare(SELECT_COLUMNS + " " +
  330 + checkRelationStmt = prepare(SELECT_COLUMNS + " " +
331 331 FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
332 332 WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM +
333 333 AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM +
... ...
... ... @@ -82,8 +82,9 @@ public class BaseRelationService implements RelationService {
82 82 }
83 83
84 84 @Caching(evict = {
85   - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
  85 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"),
86 86 @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"),
  87 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
87 88 @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"),
88 89 @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}")
89 90 })
... ... @@ -95,8 +96,9 @@ public class BaseRelationService implements RelationService {
95 96 }
96 97
97 98 @Caching(evict = {
98   - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
  99 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"),
99 100 @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"),
  101 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
100 102 @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"),
101 103 @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}")
102 104 })
... ... @@ -108,11 +110,11 @@ public class BaseRelationService implements RelationService {
108 110 }
109 111
110 112 @Caching(evict = {
111   - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
  113 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"),
112 114 @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"),
  115 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
113 116 @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"),
114   - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}"),
115   - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}")
  117 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}")
116 118 })
117 119 @Override
118 120 public boolean deleteRelation(EntityRelation relation) {
... ... @@ -122,11 +124,11 @@ public class BaseRelationService implements RelationService {
122 124 }
123 125
124 126 @Caching(evict = {
125   - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.from"),
126   - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type}"),
127   - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.to"),
128   - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type}"),
129   - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}")
  127 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"),
  128 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"),
  129 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"),
  130 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"),
  131 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}")
130 132 })
131 133 @Override
132 134 public ListenableFuture<Boolean> deleteRelationAsync(EntityRelation relation) {
... ... @@ -136,11 +138,11 @@ public class BaseRelationService implements RelationService {
136 138 }
137 139
138 140 @Caching(evict = {
139   - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#from"),
140   - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType}"),
141   - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#to"),
142   - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType}"),
143   - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType}")
  141 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType, #typeGroup}"),
  142 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup}"),
  143 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #typeGroup}"),
  144 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #typeGroup}"),
  145 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType, #typeGroup}")
144 146 })
145 147 @Override
146 148 public boolean deleteRelation(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
... ... @@ -150,11 +152,11 @@ public class BaseRelationService implements RelationService {
150 152 }
151 153
152 154 @Caching(evict = {
153   - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#from"),
154   - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType}"),
155   - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#to"),
156   - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType}"),
157   - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType}")
  155 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType, #typeGroup}"),
  156 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup}"),
  157 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #typeGroup}"),
  158 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #typeGroup}"),
  159 + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType, #typeGroup}")
158 160 })
159 161 @Override
160 162 public ListenableFuture<Boolean> deleteRelationAsync(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
... ...
... ... @@ -73,7 +73,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
73 73
74 74 private PreparedStatement partitionInsertStmt;
75 75 private PreparedStatement partitionInsertTtlStmt;
76   - private PreparedStatement[] latestInsertStmts;
  76 + private PreparedStatement latestInsertStmt;
77 77 private PreparedStatement[] saveStmts;
78 78 private PreparedStatement[] saveTtlStmts;
79 79 private PreparedStatement[] fetchStmts;
... ... @@ -306,13 +306,15 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
306 306
307 307 @Override
308 308 public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
309   - DataType type = tsKvEntry.getDataType();
310   - BoundStatement stmt = getLatestStmt(type).bind()
  309 + BoundStatement stmt = getLatestStmt().bind()
311 310 .setString(0, entityId.getEntityType().name())
312 311 .setUUID(1, entityId.getId())
313 312 .setString(2, tsKvEntry.getKey())
314   - .setLong(3, tsKvEntry.getTs());
315   - addValue(tsKvEntry, stmt, 4);
  313 + .setLong(3, tsKvEntry.getTs())
  314 + .set(4, tsKvEntry.getBooleanValue().orElse(null), Boolean.class)
  315 + .set(5, tsKvEntry.getStrValue().orElse(null), String.class)
  316 + .set(6, tsKvEntry.getLongValue().orElse(null), Long.class)
  317 + .set(7, tsKvEntry.getDoubleValue().orElse(null), Double.class);
316 318 return getFuture(executeAsyncWrite(stmt), rs -> null);
317 319 }
318 320
... ... @@ -381,7 +383,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
381 383 if (saveStmts == null) {
382 384 saveStmts = new PreparedStatement[DataType.values().length];
383 385 for (DataType type : DataType.values()) {
384   - saveStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_CF +
  386 + saveStmts[type.ordinal()] = prepare(INSERT_INTO + ModelConstants.TS_KV_CF +
385 387 "(" + ModelConstants.ENTITY_TYPE_COLUMN +
386 388 "," + ModelConstants.ENTITY_ID_COLUMN +
387 389 "," + ModelConstants.KEY_COLUMN +
... ... @@ -398,7 +400,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
398 400 if (saveTtlStmts == null) {
399 401 saveTtlStmts = new PreparedStatement[DataType.values().length];
400 402 for (DataType type : DataType.values()) {
401   - saveTtlStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_CF +
  403 + saveTtlStmts[type.ordinal()] = prepare(INSERT_INTO + ModelConstants.TS_KV_CF +
402 404 "(" + ModelConstants.ENTITY_TYPE_COLUMN +
403 405 "," + ModelConstants.ENTITY_ID_COLUMN +
404 406 "," + ModelConstants.KEY_COLUMN +
... ... @@ -420,7 +422,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
420 422 } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) {
421 423 fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()];
422 424 } else {
423   - fetchStmts[type.ordinal()] = getSession().prepare(SELECT_PREFIX +
  425 + fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX +
424 426 String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
425 427 + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
426 428 + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
... ... @@ -435,26 +437,26 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
435 437 return fetchStmts[aggType.ordinal()];
436 438 }
437 439
438   - private PreparedStatement getLatestStmt(DataType dataType) {
439   - if (latestInsertStmts == null) {
440   - latestInsertStmts = new PreparedStatement[DataType.values().length];
441   - for (DataType type : DataType.values()) {
442   - latestInsertStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF +
443   - "(" + ModelConstants.ENTITY_TYPE_COLUMN +
444   - "," + ModelConstants.ENTITY_ID_COLUMN +
445   - "," + ModelConstants.KEY_COLUMN +
446   - "," + ModelConstants.TS_COLUMN +
447   - "," + getColumnName(type) + ")" +
448   - " VALUES(?, ?, ?, ?, ?)");
449   - }
  440 + private PreparedStatement getLatestStmt() {
  441 + if (latestInsertStmt == null) {
  442 + latestInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF +
  443 + "(" + ModelConstants.ENTITY_TYPE_COLUMN +
  444 + "," + ModelConstants.ENTITY_ID_COLUMN +
  445 + "," + ModelConstants.KEY_COLUMN +
  446 + "," + ModelConstants.TS_COLUMN +
  447 + "," + ModelConstants.BOOLEAN_VALUE_COLUMN +
  448 + "," + ModelConstants.STRING_VALUE_COLUMN +
  449 + "," + ModelConstants.LONG_VALUE_COLUMN +
  450 + "," + ModelConstants.DOUBLE_VALUE_COLUMN + ")" +
  451 + " VALUES(?, ?, ?, ?, ?, ?, ?, ?)");
450 452 }
451   - return latestInsertStmts[dataType.ordinal()];
  453 + return latestInsertStmt;
452 454 }
453 455
454 456
455 457 private PreparedStatement getPartitionInsertStmt() {
456 458 if (partitionInsertStmt == null) {
457   - partitionInsertStmt = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF +
  459 + partitionInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF +
458 460 "(" + ModelConstants.ENTITY_TYPE_COLUMN +
459 461 "," + ModelConstants.ENTITY_ID_COLUMN +
460 462 "," + ModelConstants.PARTITION_COLUMN +
... ... @@ -466,7 +468,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
466 468
467 469 private PreparedStatement getPartitionInsertTtlStmt() {
468 470 if (partitionInsertTtlStmt == null) {
469   - partitionInsertTtlStmt = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF +
  471 + partitionInsertTtlStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF +
470 472 "(" + ModelConstants.ENTITY_TYPE_COLUMN +
471 473 "," + ModelConstants.ENTITY_ID_COLUMN +
472 474 "," + ModelConstants.PARTITION_COLUMN +
... ... @@ -479,7 +481,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
479 481
480 482 private PreparedStatement getFindLatestStmt() {
481 483 if (findLatestStmt == null) {
482   - findLatestStmt = getSession().prepare(SELECT_PREFIX +
  484 + findLatestStmt = prepare(SELECT_PREFIX +
483 485 ModelConstants.KEY_COLUMN + "," +
484 486 ModelConstants.TS_COLUMN + "," +
485 487 ModelConstants.STRING_VALUE_COLUMN + "," +
... ... @@ -496,7 +498,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
496 498
497 499 private PreparedStatement getFindAllLatestStmt() {
498 500 if (findAllLatestStmt == null) {
499   - findAllLatestStmt = getSession().prepare(SELECT_PREFIX +
  501 + findAllLatestStmt = prepare(SELECT_PREFIX +
500 502 ModelConstants.KEY_COLUMN + "," +
501 503 ModelConstants.TS_COLUMN + "," +
502 504 ModelConstants.STRING_VALUE_COLUMN + "," +
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.util;
  17 +
  18 +import com.google.common.util.concurrent.ListenableFuture;
  19 +
  20 +public interface AsyncRateLimiter {
  21 +
  22 + ListenableFuture<Void> acquireAsync();
  23 +
  24 + void release();
  25 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.util;
  17 +
  18 +import com.google.common.util.concurrent.Futures;
  19 +import com.google.common.util.concurrent.ListenableFuture;
  20 +import com.google.common.util.concurrent.ListeningExecutorService;
  21 +import com.google.common.util.concurrent.MoreExecutors;
  22 +import lombok.extern.slf4j.Slf4j;
  23 +import org.springframework.beans.factory.annotation.Value;
  24 +import org.springframework.scheduling.annotation.Scheduled;
  25 +import org.springframework.stereotype.Component;
  26 +
  27 +import java.util.concurrent.*;
  28 +import java.util.concurrent.atomic.AtomicInteger;
  29 +
  30 +@Component
  31 +@Slf4j
  32 +@NoSqlDao
  33 +public class BufferedRateLimiter implements AsyncRateLimiter {
  34 +
  35 + private final ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
  36 +
  37 + private final int permitsLimit;
  38 + private final int maxPermitWaitTime;
  39 + private final AtomicInteger permits;
  40 + private final BlockingQueue<LockedFuture> queue;
  41 +
  42 + private final AtomicInteger maxQueueSize = new AtomicInteger();
  43 + private final AtomicInteger maxGrantedPermissions = new AtomicInteger();
  44 +
  45 + public BufferedRateLimiter(@Value("${cassandra.query.buffer_size}") int queueLimit,
  46 + @Value("${cassandra.query.concurrent_limit}") int permitsLimit,
  47 + @Value("${cassandra.query.permit_max_wait_time}") int maxPermitWaitTime) {
  48 + this.permitsLimit = permitsLimit;
  49 + this.maxPermitWaitTime = maxPermitWaitTime;
  50 + this.permits = new AtomicInteger();
  51 + this.queue = new LinkedBlockingQueue<>(queueLimit);
  52 + }
  53 +
  54 + @Override
  55 + public ListenableFuture<Void> acquireAsync() {
  56 + if (queue.isEmpty()) {
  57 + if (permits.incrementAndGet() <= permitsLimit) {
  58 + if (permits.get() > maxGrantedPermissions.get()) {
  59 + maxGrantedPermissions.set(permits.get());
  60 + }
  61 + return Futures.immediateFuture(null);
  62 + }
  63 + permits.decrementAndGet();
  64 + }
  65 +
  66 + return putInQueue();
  67 + }
  68 +
  69 + @Override
  70 + public void release() {
  71 + permits.decrementAndGet();
  72 + reprocessQueue();
  73 + }
  74 +
  75 + private void reprocessQueue() {
  76 + while (permits.get() < permitsLimit) {
  77 + if (permits.incrementAndGet() <= permitsLimit) {
  78 + if (permits.get() > maxGrantedPermissions.get()) {
  79 + maxGrantedPermissions.set(permits.get());
  80 + }
  81 + LockedFuture lockedFuture = queue.poll();
  82 + if (lockedFuture != null) {
  83 + lockedFuture.latch.countDown();
  84 + } else {
  85 + permits.decrementAndGet();
  86 + break;
  87 + }
  88 + } else {
  89 + permits.decrementAndGet();
  90 + }
  91 + }
  92 + }
  93 +
  94 + private LockedFuture createLockedFuture() {
  95 + CountDownLatch latch = new CountDownLatch(1);
  96 + ListenableFuture<Void> future = pool.submit(() -> {
  97 + latch.await();
  98 + return null;
  99 + });
  100 + return new LockedFuture(latch, future, System.currentTimeMillis());
  101 + }
  102 +
  103 + private ListenableFuture<Void> putInQueue() {
  104 +
  105 + int size = queue.size();
  106 + if (size > maxQueueSize.get()) {
  107 + maxQueueSize.set(size);
  108 + }
  109 +
  110 + if (queue.remainingCapacity() > 0) {
  111 + try {
  112 + LockedFuture lockedFuture = createLockedFuture();
  113 + if (!queue.offer(lockedFuture, 1, TimeUnit.SECONDS)) {
  114 + lockedFuture.cancelFuture();
  115 + return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject"));
  116 + }
  117 + if(permits.get() < permitsLimit) {
  118 + reprocessQueue();
  119 + }
  120 + return lockedFuture.future;
  121 + } catch (InterruptedException e) {
  122 + return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Task interrupted. Reject"));
  123 + }
  124 + }
  125 + return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject"));
  126 + }
  127 +
  128 + @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
  129 + public void printStats() {
  130 + int expiredCount = 0;
  131 + for (LockedFuture lockedFuture : queue) {
  132 + if (lockedFuture.isExpired()) {
  133 + lockedFuture.cancelFuture();
  134 + expiredCount++;
  135 + }
  136 + }
  137 + log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}] current granted [{}]", maxQueueSize.getAndSet(0),
  138 + maxGrantedPermissions.getAndSet(0), expiredCount, permits.get());
  139 + }
  140 +
  141 + private class LockedFuture {
  142 + final CountDownLatch latch;
  143 + final ListenableFuture<Void> future;
  144 + final long createTime;
  145 +
  146 + public LockedFuture(CountDownLatch latch, ListenableFuture<Void> future, long createTime) {
  147 + this.latch = latch;
  148 + this.future = future;
  149 + this.createTime = createTime;
  150 + }
  151 +
  152 + void cancelFuture() {
  153 + future.cancel(false);
  154 + latch.countDown();
  155 + }
  156 +
  157 + boolean isExpired() {
  158 + return (System.currentTimeMillis() - createTime) > maxPermitWaitTime;
  159 + }
  160 +
  161 + }
  162 +
  163 +
  164 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.nosql;
  17 +
  18 +import com.datastax.driver.core.*;
  19 +import com.datastax.driver.core.exceptions.UnsupportedFeatureException;
  20 +import com.google.common.util.concurrent.Futures;
  21 +import com.google.common.util.concurrent.ListenableFuture;
  22 +import org.junit.Test;
  23 +import org.junit.runner.RunWith;
  24 +import org.mockito.Mock;
  25 +import org.mockito.Mockito;
  26 +import org.mockito.runners.MockitoJUnitRunner;
  27 +import org.mockito.stubbing.Answer;
  28 +import org.thingsboard.server.dao.util.AsyncRateLimiter;
  29 +
  30 +import java.util.concurrent.ExecutionException;
  31 +import java.util.concurrent.TimeoutException;
  32 +
  33 +import static org.junit.Assert.*;
  34 +import static org.mockito.Mockito.*;
  35 +
  36 +@RunWith(MockitoJUnitRunner.class)
  37 +public class RateLimitedResultSetFutureTest {
  38 +
  39 + private RateLimitedResultSetFuture resultSetFuture;
  40 +
  41 + @Mock
  42 + private AsyncRateLimiter rateLimiter;
  43 + @Mock
  44 + private Session session;
  45 + @Mock
  46 + private Statement statement;
  47 + @Mock
  48 + private ResultSetFuture realFuture;
  49 + @Mock
  50 + private ResultSet rows;
  51 + @Mock
  52 + private Row row;
  53 +
  54 + @Test
  55 + public void doNotReleasePermissionIfRateLimitFutureFailed() throws InterruptedException {
  56 + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFailedFuture(new IllegalArgumentException()));
  57 + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
  58 + Thread.sleep(1000L);
  59 + verify(rateLimiter).acquireAsync();
  60 + try {
  61 + assertTrue(resultSetFuture.isDone());
  62 + fail();
  63 + } catch (Exception e) {
  64 + assertTrue(e instanceof IllegalStateException);
  65 + Throwable actualCause = e.getCause();
  66 + assertTrue(actualCause instanceof ExecutionException);
  67 + }
  68 + verifyNoMoreInteractions(session, rateLimiter, statement);
  69 +
  70 + }
  71 +
  72 + @Test
  73 + public void getUninterruptiblyDelegateToCassandra() throws InterruptedException, ExecutionException {
  74 + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
  75 + when(session.executeAsync(statement)).thenReturn(realFuture);
  76 + Mockito.doAnswer((Answer<Void>) invocation -> {
  77 + Object[] args = invocation.getArguments();
  78 + Runnable task = (Runnable) args[0];
  79 + task.run();
  80 + return null;
  81 + }).when(realFuture).addListener(Mockito.any(), Mockito.any());
  82 +
  83 + when(realFuture.getUninterruptibly()).thenReturn(rows);
  84 +
  85 + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
  86 + ResultSet actual = resultSetFuture.getUninterruptibly();
  87 + assertSame(rows, actual);
  88 + verify(rateLimiter, times(1)).acquireAsync();
  89 + verify(rateLimiter, times(1)).release();
  90 + }
  91 +
  92 + @Test
  93 + public void addListenerAllowsFutureTransformation() throws InterruptedException, ExecutionException {
  94 + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
  95 + when(session.executeAsync(statement)).thenReturn(realFuture);
  96 + Mockito.doAnswer((Answer<Void>) invocation -> {
  97 + Object[] args = invocation.getArguments();
  98 + Runnable task = (Runnable) args[0];
  99 + task.run();
  100 + return null;
  101 + }).when(realFuture).addListener(Mockito.any(), Mockito.any());
  102 +
  103 + when(realFuture.get()).thenReturn(rows);
  104 + when(rows.one()).thenReturn(row);
  105 +
  106 + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
  107 +
  108 + ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
  109 + Row actualRow = transform.get();
  110 +
  111 + assertSame(row, actualRow);
  112 + verify(rateLimiter, times(1)).acquireAsync();
  113 + verify(rateLimiter, times(1)).release();
  114 + }
  115 +
  116 + @Test
  117 + public void immidiateCassandraExceptionReturnsPermit() throws InterruptedException, ExecutionException {
  118 + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
  119 + when(session.executeAsync(statement)).thenThrow(new UnsupportedFeatureException(ProtocolVersion.V3, "hjg"));
  120 + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
  121 + ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
  122 + try {
  123 + transform.get();
  124 + fail();
  125 + } catch (Exception e) {
  126 + assertTrue(e instanceof ExecutionException);
  127 + }
  128 + verify(rateLimiter, times(1)).acquireAsync();
  129 + verify(rateLimiter, times(1)).release();
  130 + }
  131 +
  132 + @Test
  133 + public void queryTimeoutReturnsPermit() throws InterruptedException, ExecutionException {
  134 + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
  135 + when(session.executeAsync(statement)).thenReturn(realFuture);
  136 + Mockito.doAnswer((Answer<Void>) invocation -> {
  137 + Object[] args = invocation.getArguments();
  138 + Runnable task = (Runnable) args[0];
  139 + task.run();
  140 + return null;
  141 + }).when(realFuture).addListener(Mockito.any(), Mockito.any());
  142 +
  143 + when(realFuture.get()).thenThrow(new ExecutionException("Fail", new TimeoutException("timeout")));
  144 + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
  145 + ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
  146 + try {
  147 + transform.get();
  148 + fail();
  149 + } catch (Exception e) {
  150 + assertTrue(e instanceof ExecutionException);
  151 + }
  152 + verify(rateLimiter, times(1)).acquireAsync();
  153 + verify(rateLimiter, times(1)).release();
  154 + }
  155 +
  156 +}
\ No newline at end of file
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.util;
  17 +
  18 +import com.google.common.util.concurrent.*;
  19 +import org.junit.Test;
  20 +
  21 +import javax.annotation.Nullable;
  22 +import java.util.concurrent.ExecutionException;
  23 +import java.util.concurrent.Executors;
  24 +import java.util.concurrent.TimeUnit;
  25 +import java.util.concurrent.atomic.AtomicInteger;
  26 +
  27 +import static org.junit.Assert.*;
  28 +
  29 +
  30 +public class BufferedRateLimiterTest {
  31 +
  32 + @Test
  33 + public void finishedFutureReturnedIfPermitsAreGranted() {
  34 + BufferedRateLimiter limiter = new BufferedRateLimiter(10, 10, 100);
  35 + ListenableFuture<Void> actual = limiter.acquireAsync();
  36 + assertTrue(actual.isDone());
  37 + }
  38 +
  39 + @Test
  40 + public void notFinishedFutureReturnedIfPermitsAreNotGranted() {
  41 + BufferedRateLimiter limiter = new BufferedRateLimiter(10, 1, 100);
  42 + ListenableFuture<Void> actual1 = limiter.acquireAsync();
  43 + ListenableFuture<Void> actual2 = limiter.acquireAsync();
  44 + assertTrue(actual1.isDone());
  45 + assertFalse(actual2.isDone());
  46 + }
  47 +
  48 + @Test
  49 + public void failedFutureReturnedIfQueueIsfull() {
  50 + BufferedRateLimiter limiter = new BufferedRateLimiter(1, 1, 100);
  51 + ListenableFuture<Void> actual1 = limiter.acquireAsync();
  52 + ListenableFuture<Void> actual2 = limiter.acquireAsync();
  53 + ListenableFuture<Void> actual3 = limiter.acquireAsync();
  54 +
  55 + assertTrue(actual1.isDone());
  56 + assertFalse(actual2.isDone());
  57 + assertTrue(actual3.isDone());
  58 + try {
  59 + actual3.get();
  60 + fail();
  61 + } catch (Exception e) {
  62 + assertTrue(e instanceof ExecutionException);
  63 + Throwable actualCause = e.getCause();
  64 + assertTrue(actualCause instanceof IllegalStateException);
  65 + assertEquals("Rate Limit Buffer is full. Reject", actualCause.getMessage());
  66 + }
  67 + }
  68 +
  69 + @Test
  70 + public void releasedPermitTriggerTasksFromQueue() throws InterruptedException {
  71 + BufferedRateLimiter limiter = new BufferedRateLimiter(10, 2, 100);
  72 + ListenableFuture<Void> actual1 = limiter.acquireAsync();
  73 + ListenableFuture<Void> actual2 = limiter.acquireAsync();
  74 + ListenableFuture<Void> actual3 = limiter.acquireAsync();
  75 + ListenableFuture<Void> actual4 = limiter.acquireAsync();
  76 + assertTrue(actual1.isDone());
  77 + assertTrue(actual2.isDone());
  78 + assertFalse(actual3.isDone());
  79 + assertFalse(actual4.isDone());
  80 + limiter.release();
  81 + TimeUnit.MILLISECONDS.sleep(100L);
  82 + assertTrue(actual3.isDone());
  83 + assertFalse(actual4.isDone());
  84 + limiter.release();
  85 + TimeUnit.MILLISECONDS.sleep(100L);
  86 + assertTrue(actual4.isDone());
  87 + }
  88 +
  89 + @Test
  90 + public void permitsReleasedInConcurrentMode() throws InterruptedException {
  91 + BufferedRateLimiter limiter = new BufferedRateLimiter(10, 2, 100);
  92 + AtomicInteger actualReleased = new AtomicInteger();
  93 + AtomicInteger actualRejected = new AtomicInteger();
  94 + ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));
  95 + for (int i = 0; i < 100; i++) {
  96 + ListenableFuture<ListenableFuture<Void>> submit = pool.submit(limiter::acquireAsync);
  97 + Futures.addCallback(submit, new FutureCallback<ListenableFuture<Void>>() {
  98 + @Override
  99 + public void onSuccess(@Nullable ListenableFuture<Void> result) {
  100 + Futures.addCallback(result, new FutureCallback<Void>() {
  101 + @Override
  102 + public void onSuccess(@Nullable Void result) {
  103 + try {
  104 + TimeUnit.MILLISECONDS.sleep(100);
  105 + } catch (InterruptedException e) {
  106 + e.printStackTrace();
  107 + }
  108 + limiter.release();
  109 + actualReleased.incrementAndGet();
  110 + }
  111 +
  112 + @Override
  113 + public void onFailure(Throwable t) {
  114 + actualRejected.incrementAndGet();
  115 + }
  116 + });
  117 + }
  118 +
  119 + @Override
  120 + public void onFailure(Throwable t) {
  121 + }
  122 + });
  123 + }
  124 +
  125 + TimeUnit.SECONDS.sleep(2);
  126 + assertTrue("Unexpected released count " + actualReleased.get(),
  127 + actualReleased.get() > 10 && actualReleased.get() < 20);
  128 + assertTrue("Unexpected rejected count " + actualRejected.get(),
  129 + actualRejected.get() > 80 && actualRejected.get() < 90);
  130 +
  131 + }
  132 +
  133 +
  134 +}
\ No newline at end of file
... ...
... ... @@ -47,3 +47,8 @@ cassandra.query.default_fetch_size=2000
47 47 cassandra.query.ts_key_value_partitioning=HOURS
48 48
49 49 cassandra.query.max_limit_per_request=1000
  50 +cassandra.query.buffer_size=100000
  51 +cassandra.query.concurrent_limit=1000
  52 +cassandra.query.permit_max_wait_time=20000
  53 +cassandra.query.rate_limit_print_interval_ms=30000
  54 +
... ...
... ... @@ -128,8 +128,8 @@ export default function ExtensionFormOpcDirective($compile, $templateCache, $tra
128 128 let addedFile = event.target.result;
129 129
130 130 if (addedFile && addedFile.length > 0) {
131   - model[options.fileName] = $file.name;
132   - model[options.file] = addedFile.replace(/^data.*base64,/, "");
  131 + model[options.location] = $file.name;
  132 + model[options.fileContent] = addedFile.replace(/^data.*base64,/, "");
133 133
134 134 }
135 135 }
... ... @@ -142,8 +142,8 @@ export default function ExtensionFormOpcDirective($compile, $templateCache, $tra
142 142 scope.clearFile = function(model, options) {
143 143 scope.theForm.$setDirty();
144 144
145   - model[options.fileName] = null;
146   - model[options.file] = null;
  145 + model[options.location] = null;
  146 + model[options.fileContent] = null;
147 147
148 148 };
149 149
... ...
... ... @@ -212,8 +212,8 @@
212 212 </md-input-container>
213 213
214 214 <section class="dropdown-section">
215   - <div class="tb-container" ng-class="{'ng-invalid':!server.keystore.file}">
216   - <span ng-init='fieldsToFill = {"fileName":"fileName", "file":"file"}'></span>
  215 + <div class="tb-container" ng-class="{'ng-invalid':!server.keystore.fileContent}">
  216 + <span ng-init='fieldsToFill = {"location":"location", "fileContent":"fileContent"}'></span>
217 217 <label class="tb-label" translate>extension.opc-keystore-location</label>
218 218 <div flow-init="{singleFile:true}" flow-file-added='fileAdded($file, server.keystore, fieldsToFill)' class="tb-file-select-container">
219 219 <div class="tb-file-clear-container">
... ... @@ -231,14 +231,14 @@
231 231 class="file-input"
232 232 flow-btn id="dropFileKeystore_{{serverIndex}}"
233 233 name="keystoreFile"
234   - ng-model="server.keystore.file"
  234 + ng-model="server.keystore.fileContent"
235 235 >
236 236 </div>
237 237 </div>
238 238 </div>
239 239 <div class="dropdown-messages">
240   - <div ng-if="!server.keystore[fieldsToFill.fileName]" class="tb-error-message" translate>extension.no-file</div>
241   - <div ng-if="server.keystore[fieldsToFill.fileName]">{{server.keystore[fieldsToFill.fileName]}}</div>
  240 + <div ng-if="!server.keystore[fieldsToFill.location]" class="tb-error-message" translate>extension.no-file</div>
  241 + <div ng-if="server.keystore[fieldsToFill.location]">{{server.keystore[fieldsToFill.location]}}</div>
242 242 </div>
243 243 </section>
244 244
... ...