Commit 6c1074a8b04767c652940aba3b3298f075727b89

Authored by Andrii Shvaika
1 parent 411c9dab

Fix for race condition in the partition change events

... ... @@ -34,6 +34,7 @@ import org.thingsboard.server.actors.app.AppInitMsg;
34 34 import org.thingsboard.server.actors.stats.StatsActor;
35 35 import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
36 36 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
  37 +import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
37 38
38 39 import javax.annotation.PostConstruct;
39 40 import javax.annotation.PreDestroy;
... ... @@ -43,7 +44,7 @@ import java.util.concurrent.ScheduledExecutorService;
43 44
44 45 @Service
45 46 @Slf4j
46   -public class DefaultActorService implements ActorService {
  47 +public class DefaultActorService extends TbApplicationEventListener<PartitionChangeEvent> implements ActorService {
47 48
48 49 public static final String APP_DISPATCHER_NAME = "app-dispatcher";
49 50 public static final String TENANT_DISPATCHER_NAME = "tenant-dispatcher";
... ... @@ -120,10 +121,10 @@ public class DefaultActorService implements ActorService {
120 121 appActor.tellWithHighPriority(new AppInitMsg());
121 122 }
122 123
123   - @EventListener(PartitionChangeEvent.class)
124   - public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
  124 + @Override
  125 + protected void onTbApplicationEvent(PartitionChangeEvent event) {
125 126 log.info("Received partition change event.");
126   - this.appActor.tellWithHighPriority(new PartitionChangeMsg(partitionChangeEvent.getServiceQueueKey(), partitionChangeEvent.getPartitions()));
  127 + this.appActor.tellWithHighPriority(new PartitionChangeMsg(event.getServiceQueueKey(), event.getPartitions()));
127 128 }
128 129
129 130 @PreDestroy
... ...
... ... @@ -54,6 +54,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto;
54 54 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
55 55 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
56 56 import org.thingsboard.server.queue.discovery.PartitionService;
  57 +import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
57 58 import org.thingsboard.server.queue.scheduler.SchedulerComponent;
58 59 import org.thingsboard.server.service.queue.TbClusterService;
59 60 import org.thingsboard.server.service.telemetry.InternalTelemetryService;
... ... @@ -78,7 +79,7 @@ import java.util.stream.Collectors;
78 79
79 80 @Slf4j
80 81 @Service
81   -public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
  82 +public class DefaultTbApiUsageStateService extends TbApplicationEventListener<PartitionChangeEvent> implements TbApiUsageStateService {
82 83
83 84 public static final String HOURLY = "Hourly";
84 85 public static final FutureCallback<Integer> VOID_CALLBACK = new FutureCallback<Integer>() {
... ... @@ -188,7 +189,7 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
188 189 }
189 190
190 191 @Override
191   - public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
  192 + protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
192 193 if (partitionChangeEvent.getServiceType().equals(ServiceType.TB_CORE)) {
193 194 myTenantStates.entrySet().removeIf(entry -> !partitionService.resolve(ServiceType.TB_CORE, entry.getKey(), entry.getKey()).isMyPartition());
194 195 otherTenantStates.entrySet().removeIf(entry -> partitionService.resolve(ServiceType.TB_CORE, entry.getKey(), entry.getKey()).isMyPartition());
... ...
... ... @@ -151,12 +151,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
151 151 }
152 152
153 153 @Override
154   - public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
155   - if (partitionChangeEvent.getServiceType().equals(getServiceType())) {
156   - log.info("Subscribing to partitions: {}", partitionChangeEvent.getPartitions());
157   - this.mainConsumer.subscribe(partitionChangeEvent.getPartitions());
  154 + protected void onTbApplicationEvent(PartitionChangeEvent event) {
  155 + if (event.getServiceType().equals(getServiceType())) {
  156 + log.info("Subscribing to partitions: {}", event.getPartitions());
  157 + this.mainConsumer.subscribe(event.getPartitions());
158 158 this.usageStatsConsumer.subscribe(
159   - partitionChangeEvent
  159 + event
160 160 .getPartitions()
161 161 .stream()
162 162 .map(tpi -> tpi.newByTopic(usageStatsConsumer.getTopic()))
... ...
... ... @@ -140,11 +140,11 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
140 140 }
141 141
142 142 @Override
143   - public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
144   - if (partitionChangeEvent.getServiceType().equals(getServiceType())) {
145   - ServiceQueue serviceQueue = partitionChangeEvent.getServiceQueueKey().getServiceQueue();
146   - log.info("[{}] Subscribing to partitions: {}", serviceQueue.getQueue(), partitionChangeEvent.getPartitions());
147   - consumers.get(serviceQueue.getQueue()).subscribe(partitionChangeEvent.getPartitions());
  143 + protected void onTbApplicationEvent(PartitionChangeEvent event) {
  144 + if (event.getServiceType().equals(getServiceType())) {
  145 + ServiceQueue serviceQueue = event.getServiceQueueKey().getServiceQueue();
  146 + log.info("[{}] Subscribing to partitions: {}", serviceQueue.getQueue(), event.getPartitions());
  147 + consumers.get(serviceQueue.getQueue()).subscribe(event.getPartitions());
148 148 }
149 149 }
150 150
... ...
... ... @@ -36,6 +36,7 @@ import org.thingsboard.server.queue.TbQueueConsumer;
36 36 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
37 37 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
38 38 import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
  39 +import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
39 40 import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
40 41 import org.thingsboard.server.service.profile.TbDeviceProfileCache;
41 42 import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
... ... @@ -56,7 +57,7 @@ import java.util.function.Function;
56 57 import java.util.stream.Collectors;
57 58
58 59 @Slf4j
59   -public abstract class AbstractConsumerService<N extends com.google.protobuf.GeneratedMessageV3> implements ApplicationListener<PartitionChangeEvent> {
  60 +public abstract class AbstractConsumerService<N extends com.google.protobuf.GeneratedMessageV3> extends TbApplicationEventListener<PartitionChangeEvent> {
60 61
61 62 protected volatile ExecutorService consumersExecutor;
62 63 protected volatile ExecutorService notificationsConsumerExecutor;
... ...
... ... @@ -56,6 +56,7 @@ import org.thingsboard.server.dao.util.mapping.JacksonUtil;
56 56 import org.thingsboard.server.gen.transport.TransportProtos;
57 57 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
58 58 import org.thingsboard.server.queue.discovery.PartitionService;
  59 +import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
59 60 import org.thingsboard.server.queue.util.TbCoreComponent;
60 61 import org.thingsboard.server.service.queue.TbClusterService;
61 62 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
... ... @@ -90,7 +91,7 @@ import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
90 91 @Service
91 92 @TbCoreComponent
92 93 @Slf4j
93   -public class DefaultDeviceStateService implements DeviceStateService {
  94 +public class DefaultDeviceStateService extends TbApplicationEventListener<PartitionChangeEvent> implements DeviceStateService {
94 95
95 96 public static final String ACTIVITY_STATE = "active";
96 97 public static final String LAST_CONNECT_TIME = "lastConnectTime";
... ... @@ -294,7 +295,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
294 295 }
295 296
296 297 @Override
297   - public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
  298 + protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
298 299 if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceType())) {
299 300 deduplicationExecutor.submit(partitionChangeEvent.getPartitions());
300 301 }
... ...
... ... @@ -48,6 +48,7 @@ import org.thingsboard.server.queue.TbQueueProducer;
48 48 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
49 49 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
50 50 import org.thingsboard.server.queue.discovery.PartitionService;
  51 +import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
51 52 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
52 53 import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
53 54 import org.thingsboard.server.queue.util.TbCoreComponent;
... ... @@ -76,7 +77,7 @@ import java.util.function.Predicate;
76 77 @Slf4j
77 78 @TbCoreComponent
78 79 @Service
79   -public class DefaultSubscriptionManagerService implements SubscriptionManagerService {
  80 +public class DefaultSubscriptionManagerService extends TbApplicationEventListener<PartitionChangeEvent> implements SubscriptionManagerService {
80 81
81 82 @Autowired
82 83 private AttributesService attrService;
... ... @@ -178,7 +179,7 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
178 179 }
179 180
180 181 @Override
181   - public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
  182 + protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
182 183 if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceType())) {
183 184 Set<TopicPartitionInfo> removedPartitions = new HashSet<>(currentPartitions);
184 185 removedPartitions.removeAll(partitionChangeEvent.getPartitions());
... ...
... ... @@ -28,6 +28,7 @@ import org.thingsboard.server.queue.discovery.PartitionService;
28 28 import org.thingsboard.server.common.msg.queue.ServiceType;
29 29 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
30 30 import org.thingsboard.server.common.msg.queue.TbCallback;
  31 +import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
31 32 import org.thingsboard.server.queue.util.TbCoreComponent;
32 33 import org.thingsboard.server.service.queue.TbClusterService;
33 34 import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate;
... ... @@ -62,6 +63,34 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
62 63 private SubscriptionManagerService subscriptionManagerService;
63 64
64 65 private ExecutorService subscriptionUpdateExecutor;
  66 +
  67 + private TbApplicationEventListener<PartitionChangeEvent> partitionChangeListener = new TbApplicationEventListener<>() {
  68 + @Override
  69 + protected void onTbApplicationEvent(PartitionChangeEvent event) {
  70 + if (ServiceType.TB_CORE.equals(event.getServiceType())) {
  71 + currentPartitions.clear();
  72 + currentPartitions.addAll(event.getPartitions());
  73 + }
  74 + }
  75 + };
  76 +
  77 + private TbApplicationEventListener<ClusterTopologyChangeEvent> clusterTopologyChangeListener = new TbApplicationEventListener<>() {
  78 + @Override
  79 + protected void onTbApplicationEvent(ClusterTopologyChangeEvent event) {
  80 + if (event.getServiceQueueKeys().stream().anyMatch(key -> ServiceType.TB_CORE.equals(key.getServiceType()))) {
  81 + /*
  82 + * If the cluster topology has changed, we need to push all current subscriptions to SubscriptionManagerService again.
  83 + * Otherwise, the SubscriptionManagerService may "forget" those subscriptions in case of restart.
  84 + * Although this is resource consuming operation, it is cheaper than sending ping/pong commands periodically
  85 + * It is also cheaper then caching the subscriptions by entity id and then lookup of those caches every time we have new telemetry in SubscriptionManagerService.
  86 + * Even if we cache locally the list of active subscriptions by entity id, it is still time consuming operation to get them from cache
  87 + * Since number of subscriptions is usually much less then number of devices that are pushing data.
  88 + */
  89 + subscriptionsBySessionId.values().forEach(map -> map.values()
  90 + .forEach(sub -> pushSubscriptionToManagerService(sub, true)));
  91 + }
  92 + }
  93 + };
65 94
66 95 @PostConstruct
67 96 public void initExecutor() {
... ... @@ -77,28 +106,14 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
77 106
78 107 @Override
79 108 @EventListener(PartitionChangeEvent.class)
80   - public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
81   - if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceType())) {
82   - currentPartitions.clear();
83   - currentPartitions.addAll(partitionChangeEvent.getPartitions());
84   - }
  109 + public void onApplicationEvent(PartitionChangeEvent event) {
  110 + partitionChangeListener.onApplicationEvent(event);
85 111 }
86 112
87 113 @Override
88 114 @EventListener(ClusterTopologyChangeEvent.class)
89 115 public void onApplicationEvent(ClusterTopologyChangeEvent event) {
90   - if (event.getServiceQueueKeys().stream().anyMatch(key -> ServiceType.TB_CORE.equals(key.getServiceType()))) {
91   - /*
92   - * If the cluster topology has changed, we need to push all current subscriptions to SubscriptionManagerService again.
93   - * Otherwise, the SubscriptionManagerService may "forget" those subscriptions in case of restart.
94   - * Although this is resource consuming operation, it is cheaper than sending ping/pong commands periodically
95   - * It is also cheaper then caching the subscriptions by entity id and then lookup of those caches every time we have new telemetry in SubscriptionManagerService.
96   - * Even if we cache locally the list of active subscriptions by entity id, it is still time consuming operation to get them from cache
97   - * Since number of subscriptions is usually much less then number of devices that are pushing data.
98   - */
99   - subscriptionsBySessionId.values().forEach(map -> map.values()
100   - .forEach(sub -> pushSubscriptionToManagerService(sub, true)));
101   - }
  116 + clusterTopologyChangeListener.onApplicationEvent(event);
102 117 }
103 118
104 119 //TODO 3.1: replace null callbacks with callbacks from websocket service.
... ...
... ... @@ -41,6 +41,7 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
41 41 import org.thingsboard.server.gen.transport.TransportProtos;
42 42 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
43 43 import org.thingsboard.server.queue.discovery.PartitionService;
  44 +import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
44 45 import org.thingsboard.server.service.queue.TbClusterService;
45 46 import org.thingsboard.server.service.subscription.SubscriptionManagerService;
46 47 import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
... ... @@ -61,7 +62,7 @@ import java.util.function.Consumer;
61 62 * Created by ashvayka on 27.03.18.
62 63 */
63 64 @Slf4j
64   -public abstract class AbstractSubscriptionService implements ApplicationListener<PartitionChangeEvent> {
  65 +public abstract class AbstractSubscriptionService extends TbApplicationEventListener<PartitionChangeEvent>{
65 66
66 67 protected final Set<TopicPartitionInfo> currentPartitions = ConcurrentHashMap.newKeySet();
67 68
... ... @@ -97,8 +98,7 @@ public abstract class AbstractSubscriptionService implements ApplicationListener
97 98 }
98 99
99 100 @Override
100   - @EventListener(PartitionChangeEvent.class)
101   - public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
  101 + protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
102 102 if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceType())) {
103 103 currentPartitions.clear();
104 104 currentPartitions.addAll(partitionChangeEvent.getPartitions());
... ...
... ... @@ -22,7 +22,9 @@ import org.thingsboard.server.common.msg.queue.ServiceQueueKey;
22 22 import java.util.Set;
23 23
24 24
25   -public class ClusterTopologyChangeEvent extends ApplicationEvent {
  25 +public class ClusterTopologyChangeEvent extends TbApplicationEvent {
  26 +
  27 + private static final long serialVersionUID = -2441739930040282254L;
26 28
27 29 @Getter
28 30 private final Set<ServiceQueueKey> serviceQueueKeys;
... ...
... ... @@ -126,7 +126,7 @@ public class HashPartitionService implements PartitionService {
126 126 }
127 127
128 128 @Override
129   - public void recalculatePartitions(ServiceInfo currentService, List<ServiceInfo> otherServices) {
  129 + public synchronized void recalculatePartitions(ServiceInfo currentService, List<ServiceInfo> otherServices) {
130 130 logServiceInfo(currentService);
131 131 otherServices.forEach(this::logServiceInfo);
132 132 Map<ServiceQueueKey, List<ServiceInfo>> queueServicesMap = new HashMap<>();
... ... @@ -134,7 +134,7 @@ public class HashPartitionService implements PartitionService {
134 134 for (ServiceInfo other : otherServices) {
135 135 addNode(queueServicesMap, other);
136 136 }
137   - queueServicesMap.values().forEach(list -> list.sort((a, b) -> a.getServiceId().compareTo(b.getServiceId())));
  137 + queueServicesMap.values().forEach(list -> list.sort(Comparator.comparing(ServiceInfo::getServiceId)));
138 138
139 139 ConcurrentMap<ServiceQueueKey, List<Integer>> oldPartitions = myPartitions;
140 140 TenantId myIsolatedOrSystemTenantId = getSystemOrIsolatedTenantId(currentService);
... ...
... ... @@ -24,7 +24,9 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
24 24 import java.util.Set;
25 25
26 26
27   -public class PartitionChangeEvent extends ApplicationEvent {
  27 +public class PartitionChangeEvent extends TbApplicationEvent {
  28 +
  29 + private static final long serialVersionUID = -8731788167026510559L;
28 30
29 31 @Getter
30 32 private final ServiceQueueKey serviceQueueKey;
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.discovery;
  17 +
  18 +import lombok.Getter;
  19 +import org.springframework.context.ApplicationEvent;
  20 +
  21 +import java.util.concurrent.atomic.AtomicInteger;
  22 +
  23 +public class TbApplicationEvent extends ApplicationEvent {
  24 +
  25 + private static final long serialVersionUID = 3884264064887765146L;
  26 +
  27 + private static final AtomicInteger sequence = new AtomicInteger();
  28 +
  29 + @Getter
  30 + private final int sequenceNumber;
  31 +
  32 + public TbApplicationEvent(Object source) {
  33 + super(source);
  34 + sequenceNumber = sequence.incrementAndGet();
  35 + }
  36 +
  37 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.discovery;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.springframework.context.ApplicationListener;
  20 +
  21 +import java.util.concurrent.locks.Lock;
  22 +import java.util.concurrent.locks.ReentrantLock;
  23 +
  24 +@Slf4j
  25 +public abstract class TbApplicationEventListener<T extends TbApplicationEvent> implements ApplicationListener<T> {
  26 +
  27 + private int lastProcessedSequenceNumber = Integer.MIN_VALUE;
  28 + private final Lock seqNumberLock = new ReentrantLock();
  29 +
  30 + @Override
  31 + public void onApplicationEvent(T event) {
  32 + boolean validUpdate = false;
  33 + seqNumberLock.lock();
  34 + try {
  35 + if (event.getSequenceNumber() > lastProcessedSequenceNumber) {
  36 + validUpdate = true;
  37 + lastProcessedSequenceNumber = event.getSequenceNumber();
  38 + }
  39 + } finally {
  40 + seqNumberLock.unlock();
  41 + }
  42 + if (validUpdate) {
  43 + onTbApplicationEvent(event);
  44 + } else {
  45 + log.info("Application event ignored due to invalid sequence number ({} > {}). Event: {}", lastProcessedSequenceNumber, event.getSequenceNumber(), event);
  46 + }
  47 + }
  48 +
  49 + protected abstract void onTbApplicationEvent(T event);
  50 +
  51 +
  52 +}
... ...