Commit 5029445b8c6018151073ff000799d13ba7e92db7

Authored by Andrii Shvaika
1 parent 61723da3

Deduplication of the partition change events

... ... @@ -59,6 +59,7 @@ import org.thingsboard.server.queue.discovery.PartitionService;
59 59 import org.thingsboard.server.queue.util.TbCoreComponent;
60 60 import org.thingsboard.server.service.queue.TbClusterService;
61 61 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
  62 +import org.thingsboard.server.utils.EventDeduplicationExecutor;
62 63
63 64 import javax.annotation.Nullable;
64 65 import javax.annotation.PostConstruct;
... ... @@ -126,13 +127,13 @@ public class DefaultDeviceStateService implements DeviceStateService {
126 127 @Getter
127 128 private int initFetchPackSize;
128 129
129   - private volatile boolean clusterUpdatePending = false;
130   -
131 130 private ListeningScheduledExecutorService queueExecutor;
132 131 private final ConcurrentMap<TopicPartitionInfo, Set<DeviceId>> partitionedDevices = new ConcurrentHashMap<>();
133 132 private final ConcurrentMap<DeviceId, DeviceStateData> deviceStates = new ConcurrentHashMap<>();
134 133 private final ConcurrentMap<DeviceId, Long> deviceLastReportedActivity = new ConcurrentHashMap<>();
135 134 private final ConcurrentMap<DeviceId, Long> deviceLastSavedActivity = new ConcurrentHashMap<>();
  135 + private volatile EventDeduplicationExecutor<Set<TopicPartitionInfo>> deduplicationExecutor;
  136 +
136 137
137 138 public DefaultDeviceStateService(TenantService tenantService, DeviceService deviceService,
138 139 AttributesService attributesService, TimeseriesService tsService,
... ... @@ -155,6 +156,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
155 156 // Should be always single threaded due to absence of locks.
156 157 queueExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("device-state")));
157 158 queueExecutor.scheduleAtFixedRate(this::updateState, new Random().nextInt(defaultStateCheckIntervalInSec), defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
  159 + deduplicationExecutor = new EventDeduplicationExecutor<>(DefaultDeviceStateService.class.getSimpleName(), queueExecutor, this::initStateFromDB);
158 160 }
159 161
160 162 @PreDestroy
... ... @@ -292,25 +294,14 @@ public class DefaultDeviceStateService implements DeviceStateService {
292 294 }
293 295 }
294 296
295   - volatile Set<TopicPartitionInfo> pendingPartitions;
296   -
297 297 @Override
298 298 public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
299 299 if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceType())) {
300   - synchronized (this) {
301   - pendingPartitions = partitionChangeEvent.getPartitions();
302   - if (!clusterUpdatePending) {
303   - clusterUpdatePending = true;
304   - queueExecutor.submit(() -> {
305   - clusterUpdatePending = false;
306   - initStateFromDB();
307   - });
308   - }
309   - }
  300 + deduplicationExecutor.submit(partitionChangeEvent.getPartitions());
310 301 }
311 302 }
312 303
313   - private void initStateFromDB() {
  304 + private void initStateFromDB(Set<TopicPartitionInfo> pendingPartitions) {
314 305 try {
315 306 log.info("CURRENT PARTITIONS: {}", partitionedDevices.keySet());
316 307 log.info("NEW PARTITIONS: {}", pendingPartitions);
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.utils;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +
  20 +import java.util.concurrent.Executor;
  21 +import java.util.concurrent.ExecutorService;
  22 +import java.util.function.Consumer;
  23 +
  24 +/**
  25 + * This class deduplicate executions of the specified function.
  26 + * Useful in cluster mode, when you get event about partition change multiple times.
  27 + * Assuming that the function execution is expensive, we should execute it immediately when first time event occurs and
  28 + * later, once the processing of first event is done, process last pending task.
  29 + *
  30 + * @param <P> parameters of the function
  31 + */
  32 +@Slf4j
  33 +public class EventDeduplicationExecutor<P> {
  34 + private final String name;
  35 + private final ExecutorService executor;
  36 + private final Consumer<P> function;
  37 + private P pendingTask;
  38 + private boolean busy;
  39 +
  40 + public EventDeduplicationExecutor(String name, ExecutorService executor, Consumer<P> function) {
  41 + this.name = name;
  42 + this.executor = executor;
  43 + this.function = function;
  44 + }
  45 +
  46 + public void submit(P params) {
  47 + log.info("[{}] Going to submit: {}", name, params);
  48 + synchronized (EventDeduplicationExecutor.this) {
  49 + if (!busy) {
  50 + busy = true;
  51 + pendingTask = null;
  52 + try {
  53 + log.info("[{}] Submitting task: {}", name, params);
  54 + executor.submit(() -> {
  55 + try {
  56 + log.info("[{}] Executing task: {}", name, params);
  57 + function.accept(params);
  58 + } catch (Throwable e) {
  59 + log.warn("Failed to process task with parameters: {}", params, e);
  60 + throw e;
  61 + } finally {
  62 + unlockAndProcessIfAny();
  63 + }
  64 + });
  65 + } catch (Throwable e) {
  66 + log.warn("Failed to submit task with parameters: {}", params, e);
  67 + unlockAndProcessIfAny();
  68 + throw e;
  69 + }
  70 + } else {
  71 + log.info("[{}] Task is already in progress. {} pending task: {}", name, pendingTask == null ? "adding" : "updating", params);
  72 + pendingTask = params;
  73 + }
  74 + }
  75 + }
  76 +
  77 + private void unlockAndProcessIfAny() {
  78 + synchronized (EventDeduplicationExecutor.this) {
  79 + busy = false;
  80 + if (pendingTask != null) {
  81 + submit(pendingTask);
  82 + }
  83 + }
  84 + }
  85 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.util;
  17 +
  18 +import com.google.common.util.concurrent.MoreExecutors;
  19 +import lombok.extern.slf4j.Slf4j;
  20 +import org.junit.Test;
  21 +import org.junit.runner.RunWith;
  22 +import org.mockito.Mockito;
  23 +import org.mockito.runners.MockitoJUnitRunner;
  24 +import org.thingsboard.server.utils.EventDeduplicationExecutor;
  25 +
  26 +import java.util.concurrent.ExecutorService;
  27 +import java.util.concurrent.Executors;
  28 +import java.util.function.Consumer;
  29 +
  30 +@Slf4j
  31 +@RunWith(MockitoJUnitRunner.class)
  32 +public class EventDeduplicationExecutorTest {
  33 +
  34 + @Test
  35 + public void testSimpleFlowSameThread() throws InterruptedException {
  36 + simpleFlow(MoreExecutors.newDirectExecutorService());
  37 + }
  38 +
  39 + @Test
  40 + public void testPeriodicFlowSameThread() throws InterruptedException {
  41 + periodicFlow(MoreExecutors.newDirectExecutorService());
  42 + }
  43 +
  44 +
  45 + @Test
  46 + public void testSimpleFlowSingleThread() throws InterruptedException {
  47 + simpleFlow(Executors.newFixedThreadPool(1));
  48 + }
  49 +
  50 + @Test
  51 + public void testPeriodicFlowSingleThread() throws InterruptedException {
  52 + periodicFlow(Executors.newFixedThreadPool(1));
  53 + }
  54 +
  55 + @Test
  56 + public void testSimpleFlowMultiThread() throws InterruptedException {
  57 + simpleFlow(Executors.newFixedThreadPool(3));
  58 + }
  59 +
  60 + @Test
  61 + public void testPeriodicFlowMultiThread() throws InterruptedException {
  62 + periodicFlow(Executors.newFixedThreadPool(3));
  63 + }
  64 +
  65 + private void simpleFlow(ExecutorService executorService) throws InterruptedException {
  66 + try {
  67 + Consumer<String> function = Mockito.spy(StringConsumer.class);
  68 + EventDeduplicationExecutor<String> executor = new EventDeduplicationExecutor<>(EventDeduplicationExecutorTest.class.getSimpleName(), executorService, function);
  69 +
  70 + String params1 = "params1";
  71 + String params2 = "params2";
  72 + String params3 = "params3";
  73 +
  74 + executor.submit(params1);
  75 + executor.submit(params2);
  76 + executor.submit(params3);
  77 + Thread.sleep(500);
  78 + Mockito.verify(function).accept(params1);
  79 + Mockito.verify(function).accept(params3);
  80 + } finally {
  81 + executorService.shutdownNow();
  82 + }
  83 + }
  84 +
  85 + private void periodicFlow(ExecutorService executorService) throws InterruptedException {
  86 + try {
  87 + Consumer<String> function = Mockito.spy(StringConsumer.class);
  88 + EventDeduplicationExecutor<String> executor = new EventDeduplicationExecutor<>(EventDeduplicationExecutorTest.class.getSimpleName(), executorService, function);
  89 +
  90 + String params1 = "params1";
  91 + String params2 = "params2";
  92 + String params3 = "params3";
  93 +
  94 + executor.submit(params1);
  95 + Thread.sleep(500);
  96 + executor.submit(params2);
  97 + Thread.sleep(500);
  98 + executor.submit(params3);
  99 + Thread.sleep(500);
  100 + Mockito.verify(function).accept(params1);
  101 + Mockito.verify(function).accept(params2);
  102 + Mockito.verify(function).accept(params3);
  103 + } finally {
  104 + executorService.shutdownNow();
  105 + }
  106 + }
  107 +
  108 + public static class StringConsumer implements Consumer<String> {
  109 + @Override
  110 + public void accept(String s) {
  111 + try {
  112 + Thread.sleep(100);
  113 + } catch (InterruptedException e) {
  114 + throw new RuntimeException(e);
  115 + }
  116 + }
  117 + }
  118 +
  119 +}
... ...