Commit 00e1ed3aa8004e34b69df35782939ec88266ad6d

Authored by Andrii Shvaika
1 parent 5029445b

More tests for Deduplication Executor

@@ -56,14 +56,14 @@ public class EventDeduplicationExecutor<P> { @@ -56,14 +56,14 @@ public class EventDeduplicationExecutor<P> {
56 log.info("[{}] Executing task: {}", name, params); 56 log.info("[{}] Executing task: {}", name, params);
57 function.accept(params); 57 function.accept(params);
58 } catch (Throwable e) { 58 } catch (Throwable e) {
59 - log.warn("Failed to process task with parameters: {}", params, e); 59 + log.warn("[{}] Failed to process task with parameters: {}", name, params, e);
60 throw e; 60 throw e;
61 } finally { 61 } finally {
62 unlockAndProcessIfAny(); 62 unlockAndProcessIfAny();
63 } 63 }
64 }); 64 });
65 } catch (Throwable e) { 65 } catch (Throwable e) {
66 - log.warn("Failed to submit task with parameters: {}", params, e); 66 + log.warn("[{}] Failed to submit task with parameters: {}", name, params, e);
67 unlockAndProcessIfAny(); 67 unlockAndProcessIfAny();
68 throw e; 68 throw e;
69 } 69 }
@@ -41,15 +41,24 @@ public class EventDeduplicationExecutorTest { @@ -41,15 +41,24 @@ public class EventDeduplicationExecutorTest {
41 periodicFlow(MoreExecutors.newDirectExecutorService()); 41 periodicFlow(MoreExecutors.newDirectExecutorService());
42 } 42 }
43 43
  44 + @Test
  45 + public void testExceptionFlowSameThread() throws InterruptedException {
  46 + exceptionFlow(MoreExecutors.newDirectExecutorService());
  47 + }
44 48
45 @Test 49 @Test
46 public void testSimpleFlowSingleThread() throws InterruptedException { 50 public void testSimpleFlowSingleThread() throws InterruptedException {
47 - simpleFlow(Executors.newFixedThreadPool(1)); 51 + simpleFlow(Executors.newSingleThreadExecutor());
48 } 52 }
49 53
50 @Test 54 @Test
51 public void testPeriodicFlowSingleThread() throws InterruptedException { 55 public void testPeriodicFlowSingleThread() throws InterruptedException {
52 - periodicFlow(Executors.newFixedThreadPool(1)); 56 + periodicFlow(Executors.newSingleThreadExecutor());
  57 + }
  58 +
  59 + @Test
  60 + public void testExceptionFlowSingleThread() throws InterruptedException {
  61 + exceptionFlow(Executors.newSingleThreadExecutor());
53 } 62 }
54 63
55 @Test 64 @Test
@@ -62,6 +71,11 @@ public class EventDeduplicationExecutorTest { @@ -62,6 +71,11 @@ public class EventDeduplicationExecutorTest {
62 periodicFlow(Executors.newFixedThreadPool(3)); 71 periodicFlow(Executors.newFixedThreadPool(3));
63 } 72 }
64 73
  74 + @Test
  75 + public void testExceptionFlowMultiThread() throws InterruptedException {
  76 + exceptionFlow(Executors.newFixedThreadPool(3));
  77 + }
  78 +
65 private void simpleFlow(ExecutorService executorService) throws InterruptedException { 79 private void simpleFlow(ExecutorService executorService) throws InterruptedException {
66 try { 80 try {
67 Consumer<String> function = Mockito.spy(StringConsumer.class); 81 Consumer<String> function = Mockito.spy(StringConsumer.class);
@@ -105,6 +119,28 @@ public class EventDeduplicationExecutorTest { @@ -105,6 +119,28 @@ public class EventDeduplicationExecutorTest {
105 } 119 }
106 } 120 }
107 121
  122 + private void exceptionFlow(ExecutorService executorService) throws InterruptedException {
  123 + try {
  124 + Consumer<String> function = Mockito.spy(StringConsumer.class);
  125 + EventDeduplicationExecutor<String> executor = new EventDeduplicationExecutor<>(EventDeduplicationExecutorTest.class.getSimpleName(), executorService, function);
  126 +
  127 + String params1 = "params1";
  128 + String params2 = "params2";
  129 + String params3 = "params3";
  130 +
  131 + Mockito.doThrow(new RuntimeException()).when(function).accept("params1");
  132 + executor.submit(params1);
  133 + executor.submit(params2);
  134 + Thread.sleep(500);
  135 + executor.submit(params3);
  136 + Thread.sleep(500);
  137 + Mockito.verify(function).accept(params2);
  138 + Mockito.verify(function).accept(params3);
  139 + } finally {
  140 + executorService.shutdownNow();
  141 + }
  142 + }
  143 +
108 public static class StringConsumer implements Consumer<String> { 144 public static class StringConsumer implements Consumer<String> {
109 @Override 145 @Override
110 public void accept(String s) { 146 public void accept(String s) {