Commit b49a7f573b7ed8ec0efc536a9a4538596f85506c

Authored by Andrew Shvayka
Committed by GitHub
2 parents 1c82df42 d67ab523

Merge pull request #2736 from volodymyr-babak/bug/relation-concurrent-fix

Changed direct executors to db executors
@@ -20,7 +20,6 @@ import com.google.common.cache.CacheLoader; @@ -20,7 +20,6 @@ import com.google.common.cache.CacheLoader;
20 import com.google.common.cache.LoadingCache; 20 import com.google.common.cache.LoadingCache;
21 import com.google.common.util.concurrent.Futures; 21 import com.google.common.util.concurrent.Futures;
22 import com.google.common.util.concurrent.ListenableFuture; 22 import com.google.common.util.concurrent.ListenableFuture;
23 -import com.google.common.util.concurrent.MoreExecutors;  
24 import lombok.AllArgsConstructor; 23 import lombok.AllArgsConstructor;
25 import lombok.Data; 24 import lombok.Data;
26 import lombok.NoArgsConstructor; 25 import lombok.NoArgsConstructor;
@@ -88,7 +87,7 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA @@ -88,7 +87,7 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA
88 } 87 }
89 88
90 protected ListenableFuture<RelationContainer> processEntityRelationAction(TbContext ctx, TbMsg msg, String relationType) { 89 protected ListenableFuture<RelationContainer> processEntityRelationAction(TbContext ctx, TbMsg msg, String relationType) {
91 - return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer, relationType), MoreExecutors.directExecutor()); 90 + return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer, relationType), ctx.getDbCallbackExecutor());
92 } 91 }
93 92
94 protected abstract boolean createEntityIfNotExists(); 93 protected abstract boolean createEntityIfNotExists();
@@ -17,7 +17,6 @@ package org.thingsboard.rule.engine.action; @@ -17,7 +17,6 @@ package org.thingsboard.rule.engine.action;
17 17
18 import com.google.common.util.concurrent.Futures; 18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture; 19 import com.google.common.util.concurrent.ListenableFuture;
20 -import com.google.common.util.concurrent.MoreExecutors;  
21 import lombok.extern.slf4j.Slf4j; 20 import lombok.extern.slf4j.Slf4j;
22 import org.thingsboard.rule.engine.api.RuleNode; 21 import org.thingsboard.rule.engine.api.RuleNode;
23 import org.thingsboard.rule.engine.api.TbContext; 22 import org.thingsboard.rule.engine.api.TbContext;
@@ -80,7 +79,7 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR @@ -80,7 +79,7 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR
80 } 79 }
81 container.setResult(result); 80 container.setResult(result);
82 return container; 81 return container;
83 - }, MoreExecutors.directExecutor()); 82 + }, ctx.getDbCallbackExecutor());
84 } 83 }
85 84
86 private ListenableFuture<Boolean> createIfAbsent(TbContext ctx, TbMsg msg, EntityContainer entityContainer, String relationType) { 85 private ListenableFuture<Boolean> createIfAbsent(TbContext ctx, TbMsg msg, EntityContainer entityContainer, String relationType) {
@@ -118,7 +117,7 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR @@ -118,7 +117,7 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR
118 for (EntityRelation relation : entityRelations) { 117 for (EntityRelation relation : entityRelations) {
119 list.add(ctx.getRelationService().deleteRelationAsync(ctx.getTenantId(), relation)); 118 list.add(ctx.getRelationService().deleteRelationAsync(ctx.getTenantId(), relation));
120 } 119 }
121 - return Futures.transform(Futures.allAsList(list), result -> false, MoreExecutors.directExecutor()); 120 + return Futures.transform(Futures.allAsList(list), result -> false, ctx.getDbCallbackExecutor());
122 } 121 }
123 return Futures.immediateFuture(false); 122 return Futures.immediateFuture(false);
124 }, ctx.getDbCallbackExecutor()); 123 }, ctx.getDbCallbackExecutor());
@@ -159,7 +158,7 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR @@ -159,7 +158,7 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR
159 } else { 158 } else {
160 return Futures.immediateFuture(true); 159 return Futures.immediateFuture(true);
161 } 160 }
162 - }, MoreExecutors.directExecutor()); 161 + }, ctx.getDbCallbackExecutor());
163 } 162 }
164 163
165 private ListenableFuture<Boolean> processAsset(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId, String relationType) { 164 private ListenableFuture<Boolean> processAsset(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId, String relationType) {
@@ -17,7 +17,6 @@ package org.thingsboard.rule.engine.action; @@ -17,7 +17,6 @@ package org.thingsboard.rule.engine.action;
17 17
18 import com.google.common.util.concurrent.Futures; 18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture; 19 import com.google.common.util.concurrent.ListenableFuture;
20 -import com.google.common.util.concurrent.MoreExecutors;  
21 import lombok.extern.slf4j.Slf4j; 20 import lombok.extern.slf4j.Slf4j;
22 import org.thingsboard.rule.engine.api.RuleNode; 21 import org.thingsboard.rule.engine.api.RuleNode;
23 import org.thingsboard.rule.engine.api.TbContext; 22 import org.thingsboard.rule.engine.api.TbContext;
@@ -65,14 +64,14 @@ public class TbDeleteRelationNode extends TbAbstractRelationActionNode<TbDeleteR @@ -65,14 +64,14 @@ public class TbDeleteRelationNode extends TbAbstractRelationActionNode<TbDeleteR
65 64
66 @Override 65 @Override
67 protected ListenableFuture<RelationContainer> doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entityContainer, String relationType) { 66 protected ListenableFuture<RelationContainer> doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entityContainer, String relationType) {
68 - return Futures.transform(processSingle(ctx, msg, entityContainer, relationType), result -> new RelationContainer(msg, result), MoreExecutors.directExecutor()); 67 + return Futures.transform(processSingle(ctx, msg, entityContainer, relationType), result -> new RelationContainer(msg, result), ctx.getDbCallbackExecutor());
69 } 68 }
70 69
71 private ListenableFuture<RelationContainer> getRelationContainerListenableFuture(TbContext ctx, TbMsg msg, String relationType) { 70 private ListenableFuture<RelationContainer> getRelationContainerListenableFuture(TbContext ctx, TbMsg msg, String relationType) {
72 if (config.isDeleteForSingleEntity()) { 71 if (config.isDeleteForSingleEntity()) {
73 - return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer, relationType), MoreExecutors.directExecutor()); 72 + return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer, relationType), ctx.getDbCallbackExecutor());
74 } else { 73 } else {
75 - return Futures.transform(processList(ctx, msg), result -> new RelationContainer(msg, result), MoreExecutors.directExecutor()); 74 + return Futures.transform(processList(ctx, msg), result -> new RelationContainer(msg, result), ctx.getDbCallbackExecutor());
76 } 75 }
77 } 76 }
78 77
@@ -92,9 +91,9 @@ public class TbDeleteRelationNode extends TbAbstractRelationActionNode<TbDeleteR @@ -92,9 +91,9 @@ public class TbDeleteRelationNode extends TbAbstractRelationActionNode<TbDeleteR
92 } 91 }
93 } 92 }
94 return Futures.immediateFuture(true); 93 return Futures.immediateFuture(true);
95 - }, MoreExecutors.directExecutor()); 94 + }, ctx.getDbCallbackExecutor());
96 } 95 }
97 - }, MoreExecutors.directExecutor()); 96 + }, ctx.getDbCallbackExecutor());
98 } 97 }
99 98
100 private ListenableFuture<Boolean> processSingle(TbContext ctx, TbMsg msg, EntityContainer entityContainer, String relationType) { 99 private ListenableFuture<Boolean> processSingle(TbContext ctx, TbMsg msg, EntityContainer entityContainer, String relationType) {
@@ -105,7 +104,7 @@ public class TbDeleteRelationNode extends TbAbstractRelationActionNode<TbDeleteR @@ -105,7 +104,7 @@ public class TbDeleteRelationNode extends TbAbstractRelationActionNode<TbDeleteR
105 return processSingleDeleteRelation(ctx, sdId, relationType); 104 return processSingleDeleteRelation(ctx, sdId, relationType);
106 } 105 }
107 return Futures.immediateFuture(true); 106 return Futures.immediateFuture(true);
108 - }, MoreExecutors.directExecutor()); 107 + }, ctx.getDbCallbackExecutor());
109 } 108 }
110 109
111 private ListenableFuture<Boolean> processSingleDeleteRelation(TbContext ctx, SearchDirectionIds sdId, String relationType) { 110 private ListenableFuture<Boolean> processSingleDeleteRelation(TbContext ctx, SearchDirectionIds sdId, String relationType) {