Commit c83d600bddcfa42b37e7793773bef0983ed17185

Authored by Igor Kulikov
2 parents 175a9c90 b49a7f57

Merge branch 'master' of github.com:thingsboard/thingsboard

@@ -20,7 +20,6 @@ import com.google.common.cache.CacheLoader; @@ -20,7 +20,6 @@ import com.google.common.cache.CacheLoader;
20 import com.google.common.cache.LoadingCache; 20 import com.google.common.cache.LoadingCache;
21 import com.google.common.util.concurrent.Futures; 21 import com.google.common.util.concurrent.Futures;
22 import com.google.common.util.concurrent.ListenableFuture; 22 import com.google.common.util.concurrent.ListenableFuture;
23 -import com.google.common.util.concurrent.MoreExecutors;  
24 import lombok.AllArgsConstructor; 23 import lombok.AllArgsConstructor;
25 import lombok.Data; 24 import lombok.Data;
26 import lombok.NoArgsConstructor; 25 import lombok.NoArgsConstructor;
@@ -78,7 +77,8 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA @@ -78,7 +77,8 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA
78 77
79 @Override 78 @Override
80 public void onMsg(TbContext ctx, TbMsg msg) { 79 public void onMsg(TbContext ctx, TbMsg msg) {
81 - withCallback(processEntityRelationAction(ctx, msg), 80 + String relationType = processPattern(msg, config.getRelationType());
  81 + withCallback(processEntityRelationAction(ctx, msg, relationType),
82 filterResult -> ctx.tellNext(filterResult.getMsg(), filterResult.isResult() ? SUCCESS : FAILURE), t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor()); 82 filterResult -> ctx.tellNext(filterResult.getMsg(), filterResult.isResult() ? SUCCESS : FAILURE), t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
83 } 83 }
84 84
@@ -86,13 +86,13 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA @@ -86,13 +86,13 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA
86 public void destroy() { 86 public void destroy() {
87 } 87 }
88 88
89 - protected ListenableFuture<RelationContainer> processEntityRelationAction(TbContext ctx, TbMsg msg) {  
90 - return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer), MoreExecutors.directExecutor()); 89 + protected ListenableFuture<RelationContainer> processEntityRelationAction(TbContext ctx, TbMsg msg, String relationType) {
  90 + return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer, relationType), ctx.getDbCallbackExecutor());
91 } 91 }
92 92
93 protected abstract boolean createEntityIfNotExists(); 93 protected abstract boolean createEntityIfNotExists();
94 94
95 - protected abstract ListenableFuture<RelationContainer> doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entityContainer); 95 + protected abstract ListenableFuture<RelationContainer> doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entityContainer, String relationType);
96 96
97 protected abstract C loadEntityNodeActionConfig(TbNodeConfiguration configuration) throws TbNodeException; 97 protected abstract C loadEntityNodeActionConfig(TbNodeConfiguration configuration) throws TbNodeException;
98 98
@@ -120,11 +120,11 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA @@ -120,11 +120,11 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA
120 if (EntitySearchDirection.FROM.name().equals(this.config.getDirection())) { 120 if (EntitySearchDirection.FROM.name().equals(this.config.getDirection())) {
121 searchDirectionIds.setFromId(EntityIdFactory.getByTypeAndId(entityContainer.getEntityType().name(), entityContainer.getEntityId().toString())); 121 searchDirectionIds.setFromId(EntityIdFactory.getByTypeAndId(entityContainer.getEntityType().name(), entityContainer.getEntityId().toString()));
122 searchDirectionIds.setToId(msg.getOriginator()); 122 searchDirectionIds.setToId(msg.getOriginator());
123 - searchDirectionIds.setOrignatorDirectionFrom(false); 123 + searchDirectionIds.setOriginatorDirectionFrom(false);
124 } else { 124 } else {
125 searchDirectionIds.setToId(EntityIdFactory.getByTypeAndId(entityContainer.getEntityType().name(), entityContainer.getEntityId().toString())); 125 searchDirectionIds.setToId(EntityIdFactory.getByTypeAndId(entityContainer.getEntityType().name(), entityContainer.getEntityId().toString()));
126 searchDirectionIds.setFromId(msg.getOriginator()); 126 searchDirectionIds.setFromId(msg.getOriginator());
127 - searchDirectionIds.setOrignatorDirectionFrom(true); 127 + searchDirectionIds.setOriginatorDirectionFrom(true);
128 } 128 }
129 return searchDirectionIds; 129 return searchDirectionIds;
130 } 130 }
@@ -153,7 +153,7 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA @@ -153,7 +153,7 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA
153 protected static class SearchDirectionIds { 153 protected static class SearchDirectionIds {
154 private EntityId fromId; 154 private EntityId fromId;
155 private EntityId toId; 155 private EntityId toId;
156 - private boolean orignatorDirectionFrom; 156 + private boolean originatorDirectionFrom;
157 } 157 }
158 158
159 private static class EntityCacheLoader extends CacheLoader<EntityKey, EntityContainer> { 159 private static class EntityCacheLoader extends CacheLoader<EntityKey, EntityContainer> {
@@ -17,7 +17,6 @@ package org.thingsboard.rule.engine.action; @@ -17,7 +17,6 @@ package org.thingsboard.rule.engine.action;
17 17
18 import com.google.common.util.concurrent.Futures; 18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture; 19 import com.google.common.util.concurrent.ListenableFuture;
20 -import com.google.common.util.concurrent.MoreExecutors;  
21 import lombok.extern.slf4j.Slf4j; 20 import lombok.extern.slf4j.Slf4j;
22 import org.thingsboard.rule.engine.api.RuleNode; 21 import org.thingsboard.rule.engine.api.RuleNode;
23 import org.thingsboard.rule.engine.api.TbContext; 22 import org.thingsboard.rule.engine.api.TbContext;
@@ -57,8 +56,6 @@ import java.util.List; @@ -57,8 +56,6 @@ import java.util.List;
57 ) 56 )
58 public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateRelationNodeConfiguration> { 57 public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateRelationNodeConfiguration> {
59 58
60 - private String relationType;  
61 -  
62 @Override 59 @Override
63 protected TbCreateRelationNodeConfiguration loadEntityNodeActionConfig(TbNodeConfiguration configuration) throws TbNodeException { 60 protected TbCreateRelationNodeConfiguration loadEntityNodeActionConfig(TbNodeConfiguration configuration) throws TbNodeException {
64 return TbNodeUtils.convert(configuration, TbCreateRelationNodeConfiguration.class); 61 return TbNodeUtils.convert(configuration, TbCreateRelationNodeConfiguration.class);
@@ -70,8 +67,8 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR @@ -70,8 +67,8 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR
70 } 67 }
71 68
72 @Override 69 @Override
73 - protected ListenableFuture<RelationContainer> doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entity) {  
74 - ListenableFuture<Boolean> future = createIfAbsent(ctx, msg, entity); 70 + protected ListenableFuture<RelationContainer> doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entity, String relationType) {
  71 + ListenableFuture<Boolean> future = createIfAbsent(ctx, msg, entity, relationType);
75 return Futures.transform(future, result -> { 72 return Futures.transform(future, result -> {
76 RelationContainer container = new RelationContainer(); 73 RelationContainer container = new RelationContainer();
77 if (result && config.isChangeOriginatorToRelatedEntity()) { 74 if (result && config.isChangeOriginatorToRelatedEntity()) {
@@ -82,16 +79,15 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR @@ -82,16 +79,15 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR
82 } 79 }
83 container.setResult(result); 80 container.setResult(result);
84 return container; 81 return container;
85 - }, MoreExecutors.directExecutor()); 82 + }, ctx.getDbCallbackExecutor());
86 } 83 }
87 84
88 - private ListenableFuture<Boolean> createIfAbsent(TbContext ctx, TbMsg msg, EntityContainer entityContainer) {  
89 - relationType = processPattern(msg, config.getRelationType()); 85 + private ListenableFuture<Boolean> createIfAbsent(TbContext ctx, TbMsg msg, EntityContainer entityContainer, String relationType) {
90 SearchDirectionIds sdId = processSingleSearchDirection(msg, entityContainer); 86 SearchDirectionIds sdId = processSingleSearchDirection(msg, entityContainer);
91 ListenableFuture<Boolean> checkRelationFuture = Futures.transformAsync(ctx.getRelationService().checkRelation(ctx.getTenantId(), sdId.getFromId(), sdId.getToId(), relationType, RelationTypeGroup.COMMON), result -> { 87 ListenableFuture<Boolean> checkRelationFuture = Futures.transformAsync(ctx.getRelationService().checkRelation(ctx.getTenantId(), sdId.getFromId(), sdId.getToId(), relationType, RelationTypeGroup.COMMON), result -> {
92 if (!result) { 88 if (!result) {
93 if (config.isRemoveCurrentRelations()) { 89 if (config.isRemoveCurrentRelations()) {
94 - return processDeleteRelations(ctx, processFindRelations(ctx, msg, sdId)); 90 + return processDeleteRelations(ctx, processFindRelations(ctx, msg, sdId, relationType));
95 } 91 }
96 return Futures.immediateFuture(false); 92 return Futures.immediateFuture(false);
97 } 93 }
@@ -100,14 +96,14 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR @@ -100,14 +96,14 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR
100 96
101 return Futures.transformAsync(checkRelationFuture, result -> { 97 return Futures.transformAsync(checkRelationFuture, result -> {
102 if (!result) { 98 if (!result) {
103 - return processCreateRelation(ctx, entityContainer, sdId); 99 + return processCreateRelation(ctx, entityContainer, sdId, relationType);
104 } 100 }
105 return Futures.immediateFuture(true); 101 return Futures.immediateFuture(true);
106 }, ctx.getDbCallbackExecutor()); 102 }, ctx.getDbCallbackExecutor());
107 } 103 }
108 104
109 - private ListenableFuture<List<EntityRelation>> processFindRelations(TbContext ctx, TbMsg msg, SearchDirectionIds sdId) {  
110 - if (sdId.isOrignatorDirectionFrom()) { 105 + private ListenableFuture<List<EntityRelation>> processFindRelations(TbContext ctx, TbMsg msg, SearchDirectionIds sdId, String relationType) {
  106 + if (sdId.isOriginatorDirectionFrom()) {
111 return ctx.getRelationService().findByFromAndTypeAsync(ctx.getTenantId(), msg.getOriginator(), relationType, RelationTypeGroup.COMMON); 107 return ctx.getRelationService().findByFromAndTypeAsync(ctx.getTenantId(), msg.getOriginator(), relationType, RelationTypeGroup.COMMON);
112 } else { 108 } else {
113 return ctx.getRelationService().findByToAndTypeAsync(ctx.getTenantId(), msg.getOriginator(), relationType, RelationTypeGroup.COMMON); 109 return ctx.getRelationService().findByToAndTypeAsync(ctx.getTenantId(), msg.getOriginator(), relationType, RelationTypeGroup.COMMON);
@@ -121,91 +117,91 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR @@ -121,91 +117,91 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR
121 for (EntityRelation relation : entityRelations) { 117 for (EntityRelation relation : entityRelations) {
122 list.add(ctx.getRelationService().deleteRelationAsync(ctx.getTenantId(), relation)); 118 list.add(ctx.getRelationService().deleteRelationAsync(ctx.getTenantId(), relation));
123 } 119 }
124 - return Futures.transform(Futures.allAsList(list), result -> false, MoreExecutors.directExecutor()); 120 + return Futures.transform(Futures.allAsList(list), result -> false, ctx.getDbCallbackExecutor());
125 } 121 }
126 return Futures.immediateFuture(false); 122 return Futures.immediateFuture(false);
127 }, ctx.getDbCallbackExecutor()); 123 }, ctx.getDbCallbackExecutor());
128 } 124 }
129 125
130 - private ListenableFuture<Boolean> processCreateRelation(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId) { 126 + private ListenableFuture<Boolean> processCreateRelation(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId, String relationType) {
131 switch (entityContainer.getEntityType()) { 127 switch (entityContainer.getEntityType()) {
132 case ASSET: 128 case ASSET:
133 - return processAsset(ctx, entityContainer, sdId); 129 + return processAsset(ctx, entityContainer, sdId, relationType);
134 case DEVICE: 130 case DEVICE:
135 - return processDevice(ctx, entityContainer, sdId); 131 + return processDevice(ctx, entityContainer, sdId, relationType);
136 case CUSTOMER: 132 case CUSTOMER:
137 - return processCustomer(ctx, entityContainer, sdId); 133 + return processCustomer(ctx, entityContainer, sdId, relationType);
138 case DASHBOARD: 134 case DASHBOARD:
139 - return processDashboard(ctx, entityContainer, sdId); 135 + return processDashboard(ctx, entityContainer, sdId, relationType);
140 case ENTITY_VIEW: 136 case ENTITY_VIEW:
141 - return processView(ctx, entityContainer, sdId); 137 + return processView(ctx, entityContainer, sdId, relationType);
142 case TENANT: 138 case TENANT:
143 - return processTenant(ctx, entityContainer, sdId); 139 + return processTenant(ctx, entityContainer, sdId, relationType);
144 } 140 }
145 return Futures.immediateFuture(true); 141 return Futures.immediateFuture(true);
146 } 142 }
147 143
148 - private ListenableFuture<Boolean> processView(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId) { 144 + private ListenableFuture<Boolean> processView(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId, String relationType) {
149 return Futures.transformAsync(ctx.getEntityViewService().findEntityViewByIdAsync(ctx.getTenantId(), new EntityViewId(entityContainer.getEntityId().getId())), entityView -> { 145 return Futures.transformAsync(ctx.getEntityViewService().findEntityViewByIdAsync(ctx.getTenantId(), new EntityViewId(entityContainer.getEntityId().getId())), entityView -> {
150 if (entityView != null) { 146 if (entityView != null) {
151 - return processSave(ctx, sdId); 147 + return processSave(ctx, sdId, relationType);
152 } else { 148 } else {
153 return Futures.immediateFuture(true); 149 return Futures.immediateFuture(true);
154 } 150 }
155 }, ctx.getDbCallbackExecutor()); 151 }, ctx.getDbCallbackExecutor());
156 } 152 }
157 153
158 - private ListenableFuture<Boolean> processDevice(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId) { 154 + private ListenableFuture<Boolean> processDevice(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId, String relationType) {
159 return Futures.transformAsync(ctx.getDeviceService().findDeviceByIdAsync(ctx.getTenantId(), new DeviceId(entityContainer.getEntityId().getId())), device -> { 155 return Futures.transformAsync(ctx.getDeviceService().findDeviceByIdAsync(ctx.getTenantId(), new DeviceId(entityContainer.getEntityId().getId())), device -> {
160 if (device != null) { 156 if (device != null) {
161 - return processSave(ctx, sdId); 157 + return processSave(ctx, sdId, relationType);
162 } else { 158 } else {
163 return Futures.immediateFuture(true); 159 return Futures.immediateFuture(true);
164 } 160 }
165 - }, MoreExecutors.directExecutor()); 161 + }, ctx.getDbCallbackExecutor());
166 } 162 }
167 163
168 - private ListenableFuture<Boolean> processAsset(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId) { 164 + private ListenableFuture<Boolean> processAsset(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId, String relationType) {
169 return Futures.transformAsync(ctx.getAssetService().findAssetByIdAsync(ctx.getTenantId(), new AssetId(entityContainer.getEntityId().getId())), asset -> { 165 return Futures.transformAsync(ctx.getAssetService().findAssetByIdAsync(ctx.getTenantId(), new AssetId(entityContainer.getEntityId().getId())), asset -> {
170 if (asset != null) { 166 if (asset != null) {
171 - return processSave(ctx, sdId); 167 + return processSave(ctx, sdId, relationType);
172 } else { 168 } else {
173 return Futures.immediateFuture(true); 169 return Futures.immediateFuture(true);
174 } 170 }
175 }, ctx.getDbCallbackExecutor()); 171 }, ctx.getDbCallbackExecutor());
176 } 172 }
177 173
178 - private ListenableFuture<Boolean> processCustomer(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId) { 174 + private ListenableFuture<Boolean> processCustomer(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId, String relationType) {
179 return Futures.transformAsync(ctx.getCustomerService().findCustomerByIdAsync(ctx.getTenantId(), new CustomerId(entityContainer.getEntityId().getId())), customer -> { 175 return Futures.transformAsync(ctx.getCustomerService().findCustomerByIdAsync(ctx.getTenantId(), new CustomerId(entityContainer.getEntityId().getId())), customer -> {
180 if (customer != null) { 176 if (customer != null) {
181 - return processSave(ctx, sdId); 177 + return processSave(ctx, sdId, relationType);
182 } else { 178 } else {
183 return Futures.immediateFuture(true); 179 return Futures.immediateFuture(true);
184 } 180 }
185 }, ctx.getDbCallbackExecutor()); 181 }, ctx.getDbCallbackExecutor());
186 } 182 }
187 183
188 - private ListenableFuture<Boolean> processDashboard(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId) { 184 + private ListenableFuture<Boolean> processDashboard(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId, String relationType) {
189 return Futures.transformAsync(ctx.getDashboardService().findDashboardByIdAsync(ctx.getTenantId(), new DashboardId(entityContainer.getEntityId().getId())), dashboard -> { 185 return Futures.transformAsync(ctx.getDashboardService().findDashboardByIdAsync(ctx.getTenantId(), new DashboardId(entityContainer.getEntityId().getId())), dashboard -> {
190 if (dashboard != null) { 186 if (dashboard != null) {
191 - return processSave(ctx, sdId); 187 + return processSave(ctx, sdId, relationType);
192 } else { 188 } else {
193 return Futures.immediateFuture(true); 189 return Futures.immediateFuture(true);
194 } 190 }
195 }, ctx.getDbCallbackExecutor()); 191 }, ctx.getDbCallbackExecutor());
196 } 192 }
197 193
198 - private ListenableFuture<Boolean> processTenant(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId) { 194 + private ListenableFuture<Boolean> processTenant(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId, String relationType) {
199 return Futures.transformAsync(ctx.getTenantService().findTenantByIdAsync(ctx.getTenantId(), new TenantId(entityContainer.getEntityId().getId())), tenant -> { 195 return Futures.transformAsync(ctx.getTenantService().findTenantByIdAsync(ctx.getTenantId(), new TenantId(entityContainer.getEntityId().getId())), tenant -> {
200 if (tenant != null) { 196 if (tenant != null) {
201 - return processSave(ctx, sdId); 197 + return processSave(ctx, sdId, relationType);
202 } else { 198 } else {
203 return Futures.immediateFuture(true); 199 return Futures.immediateFuture(true);
204 } 200 }
205 }, ctx.getDbCallbackExecutor()); 201 }, ctx.getDbCallbackExecutor());
206 } 202 }
207 203
208 - private ListenableFuture<Boolean> processSave(TbContext ctx, SearchDirectionIds sdId) { 204 + private ListenableFuture<Boolean> processSave(TbContext ctx, SearchDirectionIds sdId, String relationType) {
209 return ctx.getRelationService().saveRelationAsync(ctx.getTenantId(), new EntityRelation(sdId.getFromId(), sdId.getToId(), relationType, RelationTypeGroup.COMMON)); 205 return ctx.getRelationService().saveRelationAsync(ctx.getTenantId(), new EntityRelation(sdId.getFromId(), sdId.getToId(), relationType, RelationTypeGroup.COMMON));
210 } 206 }
211 207
@@ -17,7 +17,6 @@ package org.thingsboard.rule.engine.action; @@ -17,7 +17,6 @@ package org.thingsboard.rule.engine.action;
17 17
18 import com.google.common.util.concurrent.Futures; 18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture; 19 import com.google.common.util.concurrent.ListenableFuture;
20 -import com.google.common.util.concurrent.MoreExecutors;  
21 import lombok.extern.slf4j.Slf4j; 20 import lombok.extern.slf4j.Slf4j;
22 import org.thingsboard.rule.engine.api.RuleNode; 21 import org.thingsboard.rule.engine.api.RuleNode;
23 import org.thingsboard.rule.engine.api.TbContext; 22 import org.thingsboard.rule.engine.api.TbContext;
@@ -48,8 +47,6 @@ import java.util.List; @@ -48,8 +47,6 @@ import java.util.List;
48 ) 47 )
49 public class TbDeleteRelationNode extends TbAbstractRelationActionNode<TbDeleteRelationNodeConfiguration> { 48 public class TbDeleteRelationNode extends TbAbstractRelationActionNode<TbDeleteRelationNodeConfiguration> {
50 49
51 - private String relationType;  
52 -  
53 @Override 50 @Override
54 protected TbDeleteRelationNodeConfiguration loadEntityNodeActionConfig(TbNodeConfiguration configuration) throws TbNodeException { 51 protected TbDeleteRelationNodeConfiguration loadEntityNodeActionConfig(TbNodeConfiguration configuration) throws TbNodeException {
55 return TbNodeUtils.convert(configuration, TbDeleteRelationNodeConfiguration.class); 52 return TbNodeUtils.convert(configuration, TbDeleteRelationNodeConfiguration.class);
@@ -61,21 +58,20 @@ public class TbDeleteRelationNode extends TbAbstractRelationActionNode<TbDeleteR @@ -61,21 +58,20 @@ public class TbDeleteRelationNode extends TbAbstractRelationActionNode<TbDeleteR
61 } 58 }
62 59
63 @Override 60 @Override
64 - protected ListenableFuture<RelationContainer> processEntityRelationAction(TbContext ctx, TbMsg msg) {  
65 - return getRelationContainerListenableFuture(ctx, msg); 61 + protected ListenableFuture<RelationContainer> processEntityRelationAction(TbContext ctx, TbMsg msg, String relationType) {
  62 + return getRelationContainerListenableFuture(ctx, msg, relationType);
66 } 63 }
67 64
68 @Override 65 @Override
69 - protected ListenableFuture<RelationContainer> doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entityContainer) {  
70 - return Futures.transform(processSingle(ctx, msg, entityContainer), result -> new RelationContainer(msg, result), MoreExecutors.directExecutor()); 66 + protected ListenableFuture<RelationContainer> doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entityContainer, String relationType) {
  67 + return Futures.transform(processSingle(ctx, msg, entityContainer, relationType), result -> new RelationContainer(msg, result), ctx.getDbCallbackExecutor());
71 } 68 }
72 69
73 - private ListenableFuture<RelationContainer> getRelationContainerListenableFuture(TbContext ctx, TbMsg msg) {  
74 - relationType = processPattern(msg, config.getRelationType()); 70 + private ListenableFuture<RelationContainer> getRelationContainerListenableFuture(TbContext ctx, TbMsg msg, String relationType) {
75 if (config.isDeleteForSingleEntity()) { 71 if (config.isDeleteForSingleEntity()) {
76 - return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer), MoreExecutors.directExecutor()); 72 + return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer, relationType), ctx.getDbCallbackExecutor());
77 } else { 73 } else {
78 - return Futures.transform(processList(ctx, msg), result -> new RelationContainer(msg, result), MoreExecutors.directExecutor()); 74 + return Futures.transform(processList(ctx, msg), result -> new RelationContainer(msg, result), ctx.getDbCallbackExecutor());
79 } 75 }
80 } 76 }
81 77
@@ -95,23 +91,23 @@ public class TbDeleteRelationNode extends TbAbstractRelationActionNode<TbDeleteR @@ -95,23 +91,23 @@ public class TbDeleteRelationNode extends TbAbstractRelationActionNode<TbDeleteR
95 } 91 }
96 } 92 }
97 return Futures.immediateFuture(true); 93 return Futures.immediateFuture(true);
98 - }, MoreExecutors.directExecutor()); 94 + }, ctx.getDbCallbackExecutor());
99 } 95 }
100 - }, MoreExecutors.directExecutor()); 96 + }, ctx.getDbCallbackExecutor());
101 } 97 }
102 98
103 - private ListenableFuture<Boolean> processSingle(TbContext ctx, TbMsg msg, EntityContainer entityContainer) { 99 + private ListenableFuture<Boolean> processSingle(TbContext ctx, TbMsg msg, EntityContainer entityContainer, String relationType) {
104 SearchDirectionIds sdId = processSingleSearchDirection(msg, entityContainer); 100 SearchDirectionIds sdId = processSingleSearchDirection(msg, entityContainer);
105 return Futures.transformAsync(ctx.getRelationService().checkRelation(ctx.getTenantId(), sdId.getFromId(), sdId.getToId(), relationType, RelationTypeGroup.COMMON), 101 return Futures.transformAsync(ctx.getRelationService().checkRelation(ctx.getTenantId(), sdId.getFromId(), sdId.getToId(), relationType, RelationTypeGroup.COMMON),
106 result -> { 102 result -> {
107 if (result) { 103 if (result) {
108 - return processSingleDeleteRelation(ctx, sdId); 104 + return processSingleDeleteRelation(ctx, sdId, relationType);
109 } 105 }
110 return Futures.immediateFuture(true); 106 return Futures.immediateFuture(true);
111 - }, MoreExecutors.directExecutor()); 107 + }, ctx.getDbCallbackExecutor());
112 } 108 }
113 109
114 - private ListenableFuture<Boolean> processSingleDeleteRelation(TbContext ctx, SearchDirectionIds sdId) { 110 + private ListenableFuture<Boolean> processSingleDeleteRelation(TbContext ctx, SearchDirectionIds sdId, String relationType) {
115 return ctx.getRelationService().deleteRelationAsync(ctx.getTenantId(), sdId.getFromId(), sdId.getToId(), relationType, RelationTypeGroup.COMMON); 111 return ctx.getRelationService().deleteRelationAsync(ctx.getTenantId(), sdId.getFromId(), sdId.getToId(), relationType, RelationTypeGroup.COMMON);
116 } 112 }
117 113