Commit 2a58bd318a4bd9b7ca40852cb786ab7625a8748e

Authored by Andrew Shvayka
1 parent e4f2c8df

Docker improvements and Telemetry Service in cluster mode

Showing 38 changed files with 143 additions and 167 deletions
1   -FROM cassandra
2   -
3   -WORKDIR /opt/cassandra
4   -
5   -COPY dao/src/main/resources/cassandra/schema.cql /opt/cassandra
6   -
7   -COPY entrypoint-with-db-init.sh /opt/cassandra/entrypoint-with-db-init.sh
8   -
9   -RUN chmod +x /opt/cassandra/entrypoint-with-db-init.sh
10   -
11   -ENTRYPOINT ["/opt/cassandra/entrypoint-with-db-init.sh"]
12   -
13   -CMD ["cassandra", "-f"]
... ... @@ -15,7 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.actors.plugin;
17 17
18   -import com.hazelcast.util.function.Consumer;
  18 +import java.util.function.Consumer;
19 19 import org.thingsboard.server.extensions.api.exception.AccessDeniedException;
20 20 import org.thingsboard.server.extensions.api.exception.EntityNotFoundException;
21 21 import org.thingsboard.server.extensions.api.exception.InternalErrorException;
... ...
... ... @@ -17,7 +17,6 @@ package org.thingsboard.server.actors.rpc;
17 17
18 18 import akka.actor.ActorRef;
19 19 import lombok.extern.slf4j.Slf4j;
20   -import org.thingsboard.server.actors.ActorSystemContext;
21 20 import org.thingsboard.server.actors.service.ActorService;
22 21 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
23 22 import org.thingsboard.server.service.cluster.rpc.GrpcSession;
... ... @@ -58,7 +57,7 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
58 57 log.trace("{} Service [{}] received session actor msg {}", getType(session),
59 58 session.getRemoteServer(),
60 59 clusterMessage);
61   - service.onRecievedMsg(clusterMessage);
  60 + service.onReceivedMsg(session.getRemoteServer(), clusterMessage);
62 61 }
63 62
64 63 @Override
... ...
... ... @@ -89,9 +89,9 @@ public class RpcManagerActor extends ContextAwareActor {
89 89 }
90 90
91 91 private void onMsg(ClusterAPIProtos.ClusterMessage msg) {
92   - if (msg.hasServerAdresss()) {
93   - ServerAddress address = new ServerAddress(msg.getServerAdresss().getHost(),
94   - msg.getServerAdresss().getPort());
  92 + if (msg.hasServerAddress()) {
  93 + ServerAddress address = new ServerAddress(msg.getServerAddress().getHost(),
  94 + msg.getServerAddress().getPort());
95 95 SessionActorInfo session = sessionActors.get(address);
96 96 if (session != null) {
97 97 log.debug("{} Forwarding msg to session actor", address);
... ... @@ -102,7 +102,7 @@ public class RpcManagerActor extends ContextAwareActor {
102 102 if (queue == null) {
103 103 queue = new LinkedList<>();
104 104 pendingMsgs.put(new ServerAddress(
105   - msg.getServerAdresss().getHost(), msg.getServerAdresss().getPort()), queue);
  105 + msg.getServerAddress().getHost(), msg.getServerAddress().getPort()), queue);
106 106 }
107 107 queue.add(msg);
108 108 }
... ...
... ... @@ -78,7 +78,7 @@ public class RpcSessionActor extends ContextAwareActor {
78 78 private void initSession(RpcSessionCreateRequestMsg msg) {
79 79 log.info("[{}] Initializing session", context().self());
80 80 ServerAddress remoteServer = msg.getRemoteAddress();
81   - listener = new BasicRpcSessionListener(systemContext, context().parent(), context().self());
  81 + listener = new BasicRpcSessionListener(systemContext.getActorService(), context().parent(), context().self());
82 82 if (msg.getRemoteAddress() == null) {
83 83 // Server session
84 84 session = new GrpcSession(listener);
... ... @@ -119,7 +119,7 @@ public class RpcSessionActor extends ContextAwareActor {
119 119
120 120 private ClusterAPIProtos.ClusterMessage toConnectMsg() {
121 121 ServerAddress instance = systemContext.getDiscoveryService().getCurrentServer().getServerAddress();
122   - return ClusterAPIProtos.ClusterMessage.newBuilder().setMessageType(CONNECT_RPC_MESSAGE).setServerAdresss(
  122 + return ClusterAPIProtos.ClusterMessage.newBuilder().setMessageType(CONNECT_RPC_MESSAGE).setServerAddress(
123 123 ClusterAPIProtos.ServerAddress.newBuilder().setHost(instance.getHost())
124 124 .setPort(instance.getPort()).build()).build();
125 125 }
... ...
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - * <p>
  3 + *
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - * <p>
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - * <p>
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -56,8 +56,6 @@ import javax.annotation.PostConstruct;
56 56 import javax.annotation.PreDestroy;
57 57
58 58 import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_ACTOR_MESSAGE;
59   -import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_ACTOR_MESSAGE_VALUE;
60   -import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_NETWORK_SERVER_DATA_MESSAGE;
61 59
62 60 @Service
63 61 @Slf4j
... ... @@ -211,8 +209,8 @@ public class DefaultActorService implements ActorService {
211 209 }
212 210
213 211 @Override
214   - public void onRecievedMsg(ClusterAPIProtos.ClusterMessage msg) {
215   - ServerAddress serverAddress = new ServerAddress(msg.getServerAddress().getHost(), msg.getServerAddress().getPort());
  212 + public void onReceivedMsg(ServerAddress source, ClusterAPIProtos.ClusterMessage msg) {
  213 + ServerAddress serverAddress = new ServerAddress(source.getHost(), source.getPort());
216 214 switch (msg.getMessageType()) {
217 215 case CLUSTER_ACTOR_MESSAGE:
218 216 java.util.Optional<TbActorMsg> decodedMsg = actorContext.getEncodingService()
... ...
... ... @@ -87,7 +87,7 @@ abstract class AbstractSessionActorMsgProcessor extends AbstractContextAwareMsgP
87 87 return address;
88 88 }
89 89
90   - protected Optional<ServerAddress> forwardToAppActorIfAdressChanged(ActorContext ctx, DeviceToDeviceActorMsg toForward, Optional<ServerAddress> oldAddress) {
  90 + protected Optional<ServerAddress> forwardToAppActorIfAddressChanged(ActorContext ctx, DeviceToDeviceActorMsg toForward, Optional<ServerAddress> oldAddress) {
91 91
92 92 Optional<ServerAddress> newAddress = systemContext.getRoutingService().resolveById(toForward.getDeviceId());
93 93 if (!newAddress.equals(oldAddress)) {
... ...
... ... @@ -73,7 +73,7 @@ class SyncMsgProcessor extends AbstractSessionActorMsgProcessor {
73 73 @Override
74 74 public void processClusterEvent(ActorContext context, ClusterEventMsg msg) {
75 75 if (pendingResponse) {
76   - Optional<ServerAddress> newTargetServer = forwardToAppActorIfAdressChanged(context, pendingMsg, currentTargetServer);
  76 + Optional<ServerAddress> newTargetServer = forwardToAppActorIfAddressChanged(context, pendingMsg, currentTargetServer);
77 77 if (logger.isDebugEnabled()) {
78 78 if (!newTargetServer.equals(currentTargetServer)) {
79 79 if (newTargetServer.isPresent()) {
... ...
... ... @@ -32,9 +32,11 @@ import org.springframework.beans.factory.annotation.Value;
32 32 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
33 33 import org.springframework.boot.context.event.ApplicationReadyEvent;
34 34 import org.springframework.context.ApplicationListener;
  35 +import org.springframework.context.annotation.Lazy;
35 36 import org.springframework.stereotype.Service;
36 37 import org.springframework.util.Assert;
37 38 import org.thingsboard.server.common.msg.cluster.ServerAddress;
  39 +import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
38 40 import org.thingsboard.server.utils.MiscUtils;
39 41
40 42 import javax.annotation.PostConstruct;
... ... @@ -68,6 +70,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
68 70 @Autowired
69 71 private ServerInstanceService serverInstance;
70 72
  73 + @Autowired
  74 + @Lazy
  75 + private TelemetrySubscriptionService tsSubService;
  76 +
71 77 private final List<DiscoveryServiceListener> listeners = new CopyOnWriteArrayList<>();
72 78
73 79 private CuratorFramework client;
... ... @@ -196,12 +202,14 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
196 202 log.info("Processing [{}] event for [{}:{}]", pathChildrenCacheEvent.getType(), instance.getHost(), instance.getPort());
197 203 switch (pathChildrenCacheEvent.getType()) {
198 204 case CHILD_ADDED:
  205 + tsSubService.onClusterUpdate();
199 206 listeners.forEach(listener -> listener.onServerAdded(instance));
200 207 break;
201 208 case CHILD_UPDATED:
202 209 listeners.forEach(listener -> listener.onServerUpdated(instance));
203 210 break;
204 211 case CHILD_REMOVED:
  212 + tsSubService.onClusterUpdate();
205 213 listeners.forEach(listener -> listener.onServerRemoved(instance));
206 214 break;
207 215 default:
... ...
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - * <p>
  3 + *
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - * <p>
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - * <p>
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ...
... ... @@ -61,7 +61,7 @@ public final class GrpcSession implements Closeable {
61 61 public void onNext(ClusterAPIProtos.ClusterMessage clusterMessage) {
62 62 if (!connected && clusterMessage.getMessageType() == ClusterAPIProtos.MessageType.CONNECT_RPC_MESSAGE) {
63 63 connected = true;
64   - ServerAddress rpcAddress = new ServerAddress(clusterMessage.getServerAdresss().getHost(), clusterMessage.getServerAdresss().getPort());
  64 + ServerAddress rpcAddress = new ServerAddress(clusterMessage.getServerAddress().getHost(), clusterMessage.getServerAddress().getPort());
65 65 remoteServer = new ServerAddress(rpcAddress.getHost(), rpcAddress.getPort());
66 66 listener.onConnected(GrpcSession.this);
67 67 }
... ...
... ... @@ -17,6 +17,7 @@ package org.thingsboard.server.service.cluster.rpc;
17 17
18 18 import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
19 19 import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
  20 +import org.thingsboard.server.common.msg.cluster.ServerAddress;
20 21 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
21 22
22 23 /**
... ... @@ -24,7 +25,7 @@ import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
24 25 */
25 26
26 27 public interface RpcMsgListener {
27   - void onRecievedMsg(ClusterAPIProtos.ClusterMessage msg);
  28 + void onReceivedMsg(ServerAddress remoteServer, ClusterAPIProtos.ClusterMessage msg);
28 29 void onSendMsg(ClusterAPIProtos.ClusterMessage msg);
29 30 void onRpcSessionCreateRequestMsg(RpcSessionCreateRequestMsg msg);
30 31 void onBroadcastMsg(RpcBroadcastMsg msg);
... ...
... ... @@ -25,7 +25,7 @@ import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
25 25
26 26 import java.util.Optional;
27 27
28   -import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_NETWORK_SERVER_DATA_MESSAGE;
  28 +import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_ACTOR_MESSAGE;
29 29
30 30
31 31 @Slf4j
... ... @@ -55,12 +55,12 @@ public class ProtoWithJavaSerializationDecodingEncodingService implements DataDe
55 55 TbActorMsg msg) {
56 56 return ClusterAPIProtos.ClusterMessage
57 57 .newBuilder()
58   - .setServerAdresss(ClusterAPIProtos.ServerAddress
  58 + .setServerAddress(ClusterAPIProtos.ServerAddress
59 59 .newBuilder()
60 60 .setHost(serverAddress.getHost())
61 61 .setPort(serverAddress.getPort())
62 62 .build())
63   - .setMessageType(CLUSTER_NETWORK_SERVER_DATA_MESSAGE)
  63 + .setMessageType(CLUSTER_ACTOR_MESSAGE)
64 64 .setPayload(ByteString.copyFrom(encode(msg))).build();
65 65
66 66 }
... ...
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - * <p>
  3 + *
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - * <p>
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - * <p>
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ...
... ... @@ -21,7 +21,7 @@ import com.google.common.base.Function;
21 21 import com.google.common.util.concurrent.FutureCallback;
22 22 import com.google.common.util.concurrent.Futures;
23 23 import com.google.common.util.concurrent.ListenableFuture;
24   -import com.hazelcast.util.function.Consumer;
  24 +import java.util.function.Consumer;
25 25 import lombok.extern.slf4j.Slf4j;
26 26 import org.springframework.beans.factory.annotation.Autowired;
27 27 import org.springframework.stereotype.Service;
... ...
... ... @@ -107,7 +107,7 @@ mqtt:
107 107 # CoAP server parameters
108 108 coap:
109 109 # Enable/disable coap transport protocol.
110   - enabled: "${COAP_ENABLED:true}"
  110 + enabled: "${COAP_ENABLED:false}"
111 111 bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}"
112 112 bind_port: "${COAP_BIND_PORT:5683}"
113 113 adaptor: "${COAP_ADAPTOR_NAME:JsonCoapAdaptor}"
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 16 package org.thingsboard.server.mqtt;
2 17
3 18 import org.junit.rules.TestRule;
... ...
  1 +#
  2 +# Copyright © 2016-2018 The Thingsboard Authors
  3 +#
  4 +# Licensed under the Apache License, Version 2.0 (the "License");
  5 +# you may not use this file except in compliance with the License.
  6 +# You may obtain a copy of the License at
  7 +#
  8 +# http://www.apache.org/licenses/LICENSE-2.0
  9 +#
  10 +# Unless required by applicable law or agreed to in writing, software
  11 +# distributed under the License is distributed on an "AS IS" BASIS,
  12 +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 +# See the License for the specific language governing permissions and
  14 +# limitations under the License.
  15 +#
  16 +
1 17 version: '3.3'
2 18 services:
3 19 zookeeper:
... ... @@ -7,20 +23,15 @@ services:
7 23 ports:
8 24 - "2181:2181"
9 25
10   - cassandra-tb:
11   - build:
12   - context: .
13   - dockerfile: Dockerfile.cassandra
14   - image: cassandra
  26 + cassandra:
  27 + image: cassandra:3.11.2
15 28 networks:
16 29 - core
17 30 ports:
18 31 - "7199:7199"
19 32 - "9160:9160"
20 33 - "9042:9042"
21   - volumes:
22   - - /cassandra:/var/lib/cassandra
23   - - ./db-schema:/docker-entrypoint-initdb.d/
  34 +
24 35 redis:
25 36 image: redis:4.0
26 37 networks:
... ...
... ... @@ -153,22 +153,10 @@
153 153 <artifactId>curator-x-discovery</artifactId>
154 154 </dependency>
155 155 <dependency>
156   - <groupId>com.hazelcast</groupId>
157   - <artifactId>hazelcast-zookeeper</artifactId>
158   - </dependency>
159   - <dependency>
160   - <groupId>com.hazelcast</groupId>
161   - <artifactId>hazelcast</artifactId>
162   - </dependency>
163   - <dependency>
164 156 <groupId>com.github.ben-manes.caffeine</groupId>
165 157 <artifactId>caffeine</artifactId>
166 158 </dependency>
167 159 <dependency>
168   - <groupId>com.hazelcast</groupId>
169   - <artifactId>hazelcast-spring</artifactId>
170   - </dependency>
171   - <dependency>
172 160 <groupId>org.springframework.boot</groupId>
173 161 <artifactId>spring-boot-autoconfigure</artifactId>
174 162 </dependency>
... ...
... ... @@ -222,15 +222,14 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
222 222 public ListenableFuture<AlarmInfo> findAlarmInfoByIdAsync(AlarmId alarmId) {
223 223 log.trace("Executing findAlarmInfoByIdAsync [{}]", alarmId);
224 224 validateId(alarmId, "Incorrect alarmId " + alarmId);
225   - return Futures.transform(alarmDao.findAlarmByIdAsync(alarmId.getId()),
226   - (AsyncFunction<Alarm, AlarmInfo>) alarm1 -> {
227   - AlarmInfo alarmInfo = new AlarmInfo(alarm1);
  225 + return Futures.transformAsync(alarmDao.findAlarmByIdAsync(alarmId.getId()),
  226 + a -> {
  227 + AlarmInfo alarmInfo = new AlarmInfo(a);
228 228 return Futures.transform(
229   - entityService.fetchEntityNameAsync(alarmInfo.getOriginator()), (Function<String, AlarmInfo>)
230   - originatorName -> {
231   - alarmInfo.setOriginatorName(originatorName);
232   - return alarmInfo;
233   - }
  229 + entityService.fetchEntityNameAsync(alarmInfo.getOriginator()), originatorName -> {
  230 + alarmInfo.setOriginatorName(originatorName);
  231 + return alarmInfo;
  232 + }
234 233 );
235 234 });
236 235 }
... ... @@ -239,18 +238,17 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
239 238 public ListenableFuture<TimePageData<AlarmInfo>> findAlarms(AlarmQuery query) {
240 239 ListenableFuture<List<AlarmInfo>> alarms = alarmDao.findAlarms(query);
241 240 if (query.getFetchOriginator() != null && query.getFetchOriginator().booleanValue()) {
242   - alarms = Futures.transform(alarms, (AsyncFunction<List<AlarmInfo>, List<AlarmInfo>>) input -> {
  241 + alarms = Futures.transformAsync(alarms, input -> {
243 242 List<ListenableFuture<AlarmInfo>> alarmFutures = new ArrayList<>(input.size());
244 243 for (AlarmInfo alarmInfo : input) {
245 244 alarmFutures.add(Futures.transform(
246   - entityService.fetchEntityNameAsync(alarmInfo.getOriginator()), (Function<String, AlarmInfo>)
247   - originatorName -> {
248   - if (originatorName == null) {
249   - originatorName = "Deleted";
250   - }
251   - alarmInfo.setOriginatorName(originatorName);
252   - return alarmInfo;
253   - }
  245 + entityService.fetchEntityNameAsync(alarmInfo.getOriginator()), originatorName -> {
  246 + if (originatorName == null) {
  247 + originatorName = "Deleted";
  248 + }
  249 + alarmInfo.setOriginatorName(originatorName);
  250 + return alarmInfo;
  251 + }
254 252 ));
255 253 }
256 254 return Futures.successfulAsList(alarmFutures);
... ...
... ... @@ -102,12 +102,12 @@ public class CassandraAlarmDao extends CassandraAbstractModelDao<AlarmEntity, Al
102 102 }
103 103 String relationType = BaseAlarmService.ALARM_RELATION_PREFIX + searchStatusName;
104 104 ListenableFuture<List<EntityRelation>> relations = relationDao.findRelations(affectedEntity, relationType, RelationTypeGroup.ALARM, EntityType.ALARM, query.getPageLink());
105   - return Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<AlarmInfo>>) input -> {
  105 + return Futures.transformAsync(relations, input -> {
106 106 List<ListenableFuture<AlarmInfo>> alarmFutures = new ArrayList<>(input.size());
107 107 for (EntityRelation relation : input) {
108 108 alarmFutures.add(Futures.transform(
109 109 findAlarmByIdAsync(relation.getTo().getId()),
110   - (Function<Alarm, AlarmInfo>) AlarmInfo::new));
  110 + AlarmInfo::new));
111 111 }
112 112 return Futures.successfulAsList(alarmFutures);
113 113 });
... ...
... ... @@ -194,10 +194,10 @@ public class BaseAssetService extends AbstractEntityService implements AssetServ
194 194 @Override
195 195 public ListenableFuture<List<Asset>> findAssetsByQuery(AssetSearchQuery query) {
196 196 ListenableFuture<List<EntityRelation>> relations = relationService.findByQuery(query.toEntitySearchQuery());
197   - ListenableFuture<List<Asset>> assets = Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<Asset>>) relations1 -> {
  197 + ListenableFuture<List<Asset>> assets = Futures.transformAsync(relations, r -> {
198 198 EntitySearchDirection direction = query.toEntitySearchQuery().getParameters().getDirection();
199 199 List<ListenableFuture<Asset>> futures = new ArrayList<>();
200   - for (EntityRelation relation : relations1) {
  200 + for (EntityRelation relation : r) {
201 201 EntityId entityId = direction == EntitySearchDirection.FROM ? relation.getTo() : relation.getFrom();
202 202 if (entityId.getEntityType() == EntityType.ASSET) {
203 203 futures.add(findAssetByIdAsync(new AssetId(entityId.getId())));
... ...
... ... @@ -18,8 +18,12 @@ package org.thingsboard.server.dao.cassandra;
18 18
19 19 import com.datastax.driver.core.*;
20 20 import com.datastax.driver.core.ProtocolOptions.Compression;
  21 +import com.datastax.driver.mapping.DefaultPropertyMapper;
21 22 import com.datastax.driver.mapping.Mapper;
  23 +import com.datastax.driver.mapping.MappingConfiguration;
22 24 import com.datastax.driver.mapping.MappingManager;
  25 +import com.datastax.driver.mapping.PropertyAccessStrategy;
  26 +import com.datastax.driver.mapping.PropertyMapper;
23 27 import lombok.extern.slf4j.Slf4j;
24 28 import org.apache.commons.lang3.StringUtils;
25 29 import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -145,7 +149,10 @@ public abstract class AbstractCassandraCluster {
145 149 } else {
146 150 session = cluster.connect();
147 151 }
148   - mappingManager = new MappingManager(session);
  152 + DefaultPropertyMapper propertyMapper = new DefaultPropertyMapper();
  153 + propertyMapper.setPropertyAccessStrategy(PropertyAccessStrategy.FIELDS);
  154 + MappingConfiguration configuration = MappingConfiguration.builder().withPropertyMapper(propertyMapper).build();
  155 + mappingManager = new MappingManager(session, configuration);
149 156 break;
150 157 } catch (Exception e) {
151 158 log.warn("Failed to initialize cassandra cluster due to {}. Will retry in {} ms", e.getMessage(), initRetryInterval);
... ...
... ... @@ -77,7 +77,7 @@ public class CassandraDashboardInfoDao extends CassandraAbstractSearchTextDao<Da
77 77
78 78 ListenableFuture<List<EntityRelation>> relations = relationDao.findRelations(new CustomerId(customerId), EntityRelation.CONTAINS_TYPE, RelationTypeGroup.DASHBOARD, EntityType.DASHBOARD, pageLink);
79 79
80   - return Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<DashboardInfo>>) input -> {
  80 + return Futures.transformAsync(relations, input -> {
81 81 List<ListenableFuture<DashboardInfo>> dashboardFutures = new ArrayList<>(input.size());
82 82 for (EntityRelation relation : input) {
83 83 dashboardFutures.add(findByIdAsync(relation.getTo().getId()));
... ...
... ... @@ -227,10 +227,10 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe
227 227 @Override
228 228 public ListenableFuture<List<Device>> findDevicesByQuery(DeviceSearchQuery query) {
229 229 ListenableFuture<List<EntityRelation>> relations = relationService.findByQuery(query.toEntitySearchQuery());
230   - ListenableFuture<List<Device>> devices = Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<Device>>) relations1 -> {
  230 + ListenableFuture<List<Device>> devices = Futures.transformAsync(relations, r -> {
231 231 EntitySearchDirection direction = query.toEntitySearchQuery().getParameters().getDirection();
232 232 List<ListenableFuture<Device>> futures = new ArrayList<>();
233   - for (EntityRelation relation : relations1) {
  233 + for (EntityRelation relation : r) {
234 234 EntityId entityId = direction == EntitySearchDirection.FROM ? relation.getTo() : relation.getFrom();
235 235 if (entityId.getEntityType() == EntityType.DEVICE) {
236 236 futures.add(findDeviceByIdAsync(new DeviceId(entityId.getId())));
... ...
... ... @@ -36,14 +36,14 @@ public class RateLimitedResultSetFuture implements ResultSetFuture {
36 36 private final ListenableFuture<Void> rateLimitFuture;
37 37
38 38 public RateLimitedResultSetFuture(Session session, AsyncRateLimiter rateLimiter, Statement statement) {
39   - this.rateLimitFuture = Futures.withFallback(rateLimiter.acquireAsync(), t -> {
  39 + this.rateLimitFuture = Futures.catchingAsync(rateLimiter.acquireAsync(), Throwable.class, t -> {
40 40 if (!(t instanceof BufferLimitException)) {
41 41 rateLimiter.release();
42 42 }
43 43 return Futures.immediateFailedFuture(t);
44 44 });
45 45 this.originalFuture = Futures.transform(rateLimitFuture,
46   - (Function<Void, ResultSetFuture>) i -> executeAsyncWithRelease(rateLimiter, session, statement));
  46 + i -> executeAsyncWithRelease(rateLimiter, session, statement));
47 47
48 48 }
49 49
... ...
... ... @@ -227,8 +227,8 @@ public class BaseRelationService implements RelationService {
227 227 inboundRelationsListTo.add(relationDao.findAllByTo(entity, typeGroup));
228 228 }
229 229 ListenableFuture<List<List<EntityRelation>>> inboundRelationsTo = Futures.allAsList(inboundRelationsListTo);
230   - ListenableFuture<List<Boolean>> inboundDeletions = Futures.transform(inboundRelationsTo,
231   - (AsyncFunction<List<List<EntityRelation>>, List<Boolean>>) relations -> {
  230 + ListenableFuture<List<Boolean>> inboundDeletions = Futures.transformAsync(inboundRelationsTo,
  231 + relations -> {
232 232 List<ListenableFuture<Boolean>> results = getListenableFutures(relations, cache, true);
233 233 return Futures.allAsList(results);
234 234 });
... ... @@ -240,7 +240,7 @@ public class BaseRelationService implements RelationService {
240 240 inboundRelationsListFrom.add(relationDao.findAllByTo(entity, typeGroup));
241 241 }
242 242 ListenableFuture<List<List<EntityRelation>>> inboundRelationsFrom = Futures.allAsList(inboundRelationsListFrom);
243   - Futures.transform(inboundRelationsFrom, (AsyncFunction<List<List<EntityRelation>>, List<Boolean>>) relations -> {
  243 + Futures.transformAsync(inboundRelationsFrom, relations -> {
244 244 List<ListenableFuture<Boolean>> results = getListenableFutures(relations, cache, false);
245 245 return Futures.allAsList(results);
246 246 });
... ... @@ -252,7 +252,7 @@ public class BaseRelationService implements RelationService {
252 252 private List<ListenableFuture<Boolean>> getListenableFutures(List<List<EntityRelation>> relations, Cache cache, boolean isRemove) {
253 253 List<ListenableFuture<Boolean>> results = new ArrayList<>();
254 254 for (List<EntityRelation> relationList : relations) {
255   - relationList.stream().forEach(relation -> {
  255 + relationList.forEach(relation -> {
256 256 checkFromDeleteAsync(cache, results, relation, isRemove);
257 257 });
258 258 }
... ... @@ -325,17 +325,16 @@ public class BaseRelationService implements RelationService {
325 325 validate(from);
326 326 validateTypeGroup(typeGroup);
327 327 ListenableFuture<List<EntityRelation>> relations = relationDao.findAllByFrom(from, typeGroup);
328   - ListenableFuture<List<EntityRelationInfo>> relationsInfo = Futures.transform(relations,
329   - (AsyncFunction<List<EntityRelation>, List<EntityRelationInfo>>) relations1 -> {
  328 + return Futures.transformAsync(relations,
  329 + relations1 -> {
330 330 List<ListenableFuture<EntityRelationInfo>> futures = new ArrayList<>();
331   - relations1.stream().forEach(relation ->
  331 + relations1.forEach(relation ->
332 332 futures.add(fetchRelationInfoAsync(relation,
333   - relation2 -> relation2.getTo(),
334   - (EntityRelationInfo relationInfo, String entityName) -> relationInfo.setToName(entityName)))
  333 + EntityRelation::getTo,
  334 + EntityRelationInfo::setToName))
335 335 );
336 336 return Futures.successfulAsList(futures);
337 337 });
338   - return relationsInfo;
339 338 }
340 339
341 340 @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup}")
... ... @@ -381,8 +380,8 @@ public class BaseRelationService implements RelationService {
381 380 validate(to);
382 381 validateTypeGroup(typeGroup);
383 382 ListenableFuture<List<EntityRelation>> relations = relationDao.findAllByTo(to, typeGroup);
384   - ListenableFuture<List<EntityRelationInfo>> relationsInfo = Futures.transform(relations,
385   - (AsyncFunction<List<EntityRelation>, List<EntityRelationInfo>>) relations1 -> {
  383 + return Futures.transformAsync(relations,
  384 + relations1 -> {
386 385 List<ListenableFuture<EntityRelationInfo>> futures = new ArrayList<>();
387 386 relations1.stream().forEach(relation ->
388 387 futures.add(fetchRelationInfoAsync(relation,
... ... @@ -391,7 +390,6 @@ public class BaseRelationService implements RelationService {
391 390 );
392 391 return Futures.successfulAsList(futures);
393 392 });
394   - return relationsInfo;
395 393 }
396 394
397 395 private ListenableFuture<EntityRelationInfo> fetchRelationInfoAsync(EntityRelation relation,
... ... @@ -463,8 +461,8 @@ public class BaseRelationService implements RelationService {
463 461 log.trace("Executing findInfoByQuery [{}]", query);
464 462 ListenableFuture<List<EntityRelation>> relations = findByQuery(query);
465 463 EntitySearchDirection direction = query.getParameters().getDirection();
466   - ListenableFuture<List<EntityRelationInfo>> relationsInfo = Futures.transform(relations,
467   - (AsyncFunction<List<EntityRelation>, List<EntityRelationInfo>>) relations1 -> {
  464 + return Futures.transformAsync(relations,
  465 + relations1 -> {
468 466 List<ListenableFuture<EntityRelationInfo>> futures = new ArrayList<>();
469 467 relations1.stream().forEach(relation ->
470 468 futures.add(fetchRelationInfoAsync(relation,
... ... @@ -479,7 +477,6 @@ public class BaseRelationService implements RelationService {
479 477 );
480 478 return Futures.successfulAsList(futures);
481 479 });
482   - return relationsInfo;
483 480 }
484 481
485 482 protected void validate(EntityRelation relation) {
... ...
... ... @@ -102,12 +102,12 @@ public class JpaAlarmDao extends JpaAbstractDao<AlarmEntity, Alarm> implements A
102 102 }
103 103 String relationType = BaseAlarmService.ALARM_RELATION_PREFIX + searchStatusName;
104 104 ListenableFuture<List<EntityRelation>> relations = relationDao.findRelations(affectedEntity, relationType, RelationTypeGroup.ALARM, EntityType.ALARM, query.getPageLink());
105   - return Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<AlarmInfo>>) input -> {
  105 + return Futures.transformAsync(relations, input -> {
106 106 List<ListenableFuture<AlarmInfo>> alarmFutures = new ArrayList<>(input.size());
107 107 for (EntityRelation relation : input) {
108 108 alarmFutures.add(Futures.transform(
109 109 findAlarmByIdAsync(relation.getTo().getId()),
110   - (Function<Alarm, AlarmInfo>) AlarmInfo::new));
  110 + AlarmInfo::new));
111 111 }
112 112 return Futures.successfulAsList(alarmFutures);
113 113 });
... ...
... ... @@ -86,7 +86,7 @@ public class JpaDashboardInfoDao extends JpaAbstractSearchTextDao<DashboardInfoE
86 86
87 87 ListenableFuture<List<EntityRelation>> relations = relationDao.findRelations(new CustomerId(customerId), EntityRelation.CONTAINS_TYPE, RelationTypeGroup.DASHBOARD, EntityType.DASHBOARD, pageLink);
88 88
89   - return Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<DashboardInfo>>) input -> {
  89 + return Futures.transformAsync(relations, input -> {
90 90 List<ListenableFuture<DashboardInfo>> dashboardFutures = new ArrayList<>(input.size());
91 91 for (EntityRelation relation : input) {
92 92 dashboardFutures.add(findByIdAsync(relation.getTo().getId()));
... ...
... ... @@ -217,7 +217,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
217 217
218 218 ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
219 219
220   - ListenableFuture<List<ResultSet>> aggregationChunks = Futures.transform(partitionsListFuture,
  220 + ListenableFuture<List<ResultSet>> aggregationChunks = Futures.transformAsync(partitionsListFuture,
221 221 getFetchChunksAsyncFunction(entityId, key, aggregation, startTs, endTs), readResultsProcessingExecutor);
222 222
223 223 return Futures.transform(aggregationChunks, new AggregatePartitionsFunction(aggregation, key, ts), readResultsProcessingExecutor);
... ...
1   -#!/bin/bash
2   -
3   -if [[ $1 = 'cassandra' ]]; then
4   -
5   - until cqlsh -f/opt/cassandra/schema.cql; do
6   - echo "cqlsh: Cassandra is unavailable - retrying"
7   - sleep 2
8   - done &
9   -
10   -fi
11   -
12   -exec /docker-entrypoint.sh "$@"
... ... @@ -41,10 +41,10 @@
41 41 <logback.version>1.2.3</logback.version>
42 42 <mockito.version>1.9.5</mockito.version>
43 43 <rat.version>0.10</rat.version>
44   - <cassandra.version>3.0.7</cassandra.version>
  44 + <cassandra.version>3.5.0</cassandra.version>
45 45 <cassandra-unit.version>3.0.0.1</cassandra-unit.version>
46 46 <takari-cpsuite.version>1.2.7</takari-cpsuite.version>
47   - <guava.version>18.0</guava.version>
  47 + <guava.version>20.0</guava.version>
48 48 <caffeine.version>2.6.1</caffeine.version>
49 49 <commons-lang3.version>3.4</commons-lang3.version>
50 50 <commons-validator.version>1.5.0</commons-validator.version>
... ... @@ -61,15 +61,13 @@
61 61 <mail.version>1.4.3</mail.version>
62 62 <curator.version>2.11.0</curator.version>
63 63 <protobuf.version>3.0.2</protobuf.version>
64   - <grpc.version>1.0.0</grpc.version>
  64 + <grpc.version>1.12.0</grpc.version>
65 65 <lombok.version>1.16.18</lombok.version>
66 66 <paho.client.version>1.1.0</paho.client.version>
67 67 <netty.version>4.1.22.Final</netty.version>
68 68 <os-maven-plugin.version>1.5.0</os-maven-plugin.version>
69 69 <rabbitmq.version>3.6.5</rabbitmq.version>
70 70 <kafka.version>0.9.0.0</kafka.version>
71   - <hazelcast.version>3.6.6</hazelcast.version>
72   - <hazelcast-zookeeper.version>3.6.1</hazelcast-zookeeper.version>
73 71 <surfire.version>2.19.1</surfire.version>
74 72 <jar-plugin.version>3.0.2</jar-plugin.version>
75 73 <springfox-swagger.version>2.6.1</springfox-swagger.version>
... ... @@ -761,26 +759,11 @@
761 759 <version>${paho.client.version}</version>
762 760 </dependency>
763 761 <dependency>
764   - <groupId>com.hazelcast</groupId>
765   - <artifactId>hazelcast-spring</artifactId>
766   - <version>${hazelcast.version}</version>
767   - </dependency>
768   - <dependency>
769 762 <groupId>org.apache.curator</groupId>
770 763 <artifactId>curator-x-discovery</artifactId>
771 764 <version>${curator.version}</version>
772 765 </dependency>
773 766 <dependency>
774   - <groupId>com.hazelcast</groupId>
775   - <artifactId>hazelcast-zookeeper</artifactId>
776   - <version>${hazelcast-zookeeper.version}</version>
777   - </dependency>
778   - <dependency>
779   - <groupId>com.hazelcast</groupId>
780   - <artifactId>hazelcast</artifactId>
781   - <version>${hazelcast.version}</version>
782   - </dependency>
783   - <dependency>
784 767 <groupId>io.springfox</groupId>
785 768 <artifactId>springfox-swagger-ui</artifactId>
786 769 <version>${springfox-swagger.version}</version>
... ...
... ... @@ -56,7 +56,7 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode<TbClearAlarmNodeConfig
56 56 @Override
57 57 protected ListenableFuture<AlarmResult> processAlarm(TbContext ctx, TbMsg msg) {
58 58 ListenableFuture<Alarm> latest = ctx.getAlarmService().findLatestByOriginatorAndType(ctx.getTenantId(), msg.getOriginator(), config.getAlarmType());
59   - return Futures.transform(latest, (AsyncFunction<Alarm, AlarmResult>) a -> {
  59 + return Futures.transformAsync(latest, a -> {
60 60 if (a != null && !a.getStatus().isCleared()) {
61 61 return clearAlarm(ctx, msg, a);
62 62 }
... ... @@ -66,9 +66,9 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode<TbClearAlarmNodeConfig
66 66
67 67 private ListenableFuture<AlarmResult> clearAlarm(TbContext ctx, TbMsg msg, Alarm alarm) {
68 68 ListenableFuture<JsonNode> asyncDetails = buildAlarmDetails(ctx, msg, alarm.getDetails());
69   - return Futures.transform(asyncDetails, (AsyncFunction<JsonNode, AlarmResult>) details -> {
  69 + return Futures.transformAsync(asyncDetails, details -> {
70 70 ListenableFuture<Boolean> clearFuture = ctx.getAlarmService().clearAlarm(alarm.getId(), details, System.currentTimeMillis());
71   - return Futures.transform(clearFuture, (AsyncFunction<Boolean, AlarmResult>) cleared -> {
  71 + return Futures.transformAsync(clearFuture, cleared -> {
72 72 if (cleared && details != null) {
73 73 alarm.setDetails(details);
74 74 }
... ...
... ... @@ -58,7 +58,7 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
58 58 @Override
59 59 protected ListenableFuture<AlarmResult> processAlarm(TbContext ctx, TbMsg msg) {
60 60 ListenableFuture<Alarm> latest = ctx.getAlarmService().findLatestByOriginatorAndType(ctx.getTenantId(), msg.getOriginator(), config.getAlarmType());
61   - return Futures.transform(latest, (AsyncFunction<Alarm, AlarmResult>) a -> {
  61 + return Futures.transformAsync(latest, a -> {
62 62 if (a == null || a.getStatus().isCleared()) {
63 63 return createNewAlarm(ctx, msg);
64 64 } else {
... ... @@ -70,10 +70,10 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
70 70
71 71 private ListenableFuture<AlarmResult> createNewAlarm(TbContext ctx, TbMsg msg) {
72 72 ListenableFuture<Alarm> asyncAlarm = Futures.transform(buildAlarmDetails(ctx, msg, null),
73   - (Function<JsonNode, Alarm>) details -> buildAlarm(msg, details, ctx.getTenantId()));
  73 + details -> buildAlarm(msg, details, ctx.getTenantId()));
74 74 ListenableFuture<Alarm> asyncCreated = Futures.transform(asyncAlarm,
75   - (Function<Alarm, Alarm>) alarm -> ctx.getAlarmService().createOrUpdateAlarm(alarm), ctx.getDbCallbackExecutor());
76   - return Futures.transform(asyncCreated, (Function<Alarm, AlarmResult>) alarm -> new AlarmResult(true, false, false, alarm));
  75 + alarm -> ctx.getAlarmService().createOrUpdateAlarm(alarm), ctx.getDbCallbackExecutor());
  76 + return Futures.transform(asyncCreated, alarm -> new AlarmResult(true, false, false, alarm));
77 77 }
78 78
79 79 private ListenableFuture<AlarmResult> updateAlarm(TbContext ctx, TbMsg msg, Alarm alarm) {
... ... @@ -85,7 +85,7 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
85 85 return ctx.getAlarmService().createOrUpdateAlarm(alarm);
86 86 }, ctx.getDbCallbackExecutor());
87 87
88   - return Futures.transform(asyncUpdated, (Function<Alarm, AlarmResult>) a -> new AlarmResult(false, true, false, a));
  88 + return Futures.transform(asyncUpdated, a -> new AlarmResult(false, true, false, a));
89 89 }
90 90
91 91 private Alarm buildAlarm(TbMsg msg, JsonNode details, TenantId tenantId) {
... ...
... ... @@ -43,8 +43,7 @@ public class EntitiesCustomerIdAsyncLoader {
43 43 }
44 44
45 45 private static <T extends HasCustomerId> ListenableFuture<CustomerId> getCustomerAsync(ListenableFuture<T> future) {
46   - return Futures.transform(future, (AsyncFunction<HasCustomerId, CustomerId>) in -> {
47   - return in != null ? Futures.immediateFuture(in.getCustomerId())
48   - : Futures.immediateFuture(null);});
  46 + return Futures.transformAsync(future, in -> in != null ? Futures.immediateFuture(in.getCustomerId())
  47 + : Futures.immediateFuture(null));
49 48 }
50 49 }
... ...
... ... @@ -39,9 +39,8 @@ public class EntitiesRelatedDeviceIdAsyncLoader {
39 39
40 40 ListenableFuture<List<Device>> asyncDevices = deviceService.findDevicesByQuery(query);
41 41
42   - return Futures.transform(asyncDevices, (AsyncFunction<List<Device>, DeviceId>)
43   - d -> CollectionUtils.isNotEmpty(d) ? Futures.immediateFuture(d.get(0).getId())
44   - : Futures.immediateFuture(null));
  42 + return Futures.transformAsync(asyncDevices, d -> CollectionUtils.isNotEmpty(d) ? Futures.immediateFuture(d.get(0).getId())
  43 + : Futures.immediateFuture(null));
45 44 }
46 45
47 46 private static DeviceSearchQuery buildQuery(EntityId originator, DeviceRelationsQuery deviceRelationsQuery) {
... ...
... ... @@ -38,13 +38,11 @@ public class EntitiesRelatedEntityIdAsyncLoader {
38 38 EntityRelationsQuery query = buildQuery(originator, relationsQuery);
39 39 ListenableFuture<List<EntityRelation>> asyncRelation = relationService.findByQuery(query);
40 40 if (relationsQuery.getDirection() == EntitySearchDirection.FROM) {
41   - return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
42   - r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo())
43   - : Futures.immediateFuture(null));
  41 + return Futures.transformAsync(asyncRelation, r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo())
  42 + : Futures.immediateFuture(null));
44 43 } else if (relationsQuery.getDirection() == EntitySearchDirection.TO) {
45   - return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
46   - r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom())
47   - : Futures.immediateFuture(null));
  44 + return Futures.transformAsync(asyncRelation, r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom())
  45 + : Futures.immediateFuture(null));
48 46 }
49 47 return Futures.immediateFailedFuture(new IllegalStateException("Unknown direction"));
50 48 }
... ...
... ... @@ -51,7 +51,7 @@ public class EntitiesTenantIdAsyncLoader {
51 51 }
52 52
53 53 private static <T extends HasTenantId> ListenableFuture<TenantId> getTenantAsync(ListenableFuture<T> future) {
54   - return Futures.transform(future, (AsyncFunction<HasTenantId, TenantId>) in -> {
  54 + return Futures.transformAsync(future, in -> {
55 55 return in != null ? Futures.immediateFuture(in.getTenantId())
56 56 : Futures.immediateFuture(null);});
57 57 }
... ...