Commit 1c82df42e2e0b353e47861163a9fa4a6c297b377

Authored by Andrew Shvayka
Committed by GitHub
2 parents 92c9bc0a 7a633ed6

Merge pull request #2735 from volodymyr-babak/bug/relation-concurrent-fix

Bug/relation concurrent fix
... ... @@ -78,7 +78,8 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA
78 78
79 79 @Override
80 80 public void onMsg(TbContext ctx, TbMsg msg) {
81   - withCallback(processEntityRelationAction(ctx, msg),
  81 + String relationType = processPattern(msg, config.getRelationType());
  82 + withCallback(processEntityRelationAction(ctx, msg, relationType),
82 83 filterResult -> ctx.tellNext(filterResult.getMsg(), filterResult.isResult() ? SUCCESS : FAILURE), t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
83 84 }
84 85
... ... @@ -86,13 +87,13 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA
86 87 public void destroy() {
87 88 }
88 89
89   - protected ListenableFuture<RelationContainer> processEntityRelationAction(TbContext ctx, TbMsg msg) {
90   - return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer), MoreExecutors.directExecutor());
  90 + protected ListenableFuture<RelationContainer> processEntityRelationAction(TbContext ctx, TbMsg msg, String relationType) {
  91 + return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer, relationType), MoreExecutors.directExecutor());
91 92 }
92 93
93 94 protected abstract boolean createEntityIfNotExists();
94 95
95   - protected abstract ListenableFuture<RelationContainer> doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entityContainer);
  96 + protected abstract ListenableFuture<RelationContainer> doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entityContainer, String relationType);
96 97
97 98 protected abstract C loadEntityNodeActionConfig(TbNodeConfiguration configuration) throws TbNodeException;
98 99
... ... @@ -120,11 +121,11 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA
120 121 if (EntitySearchDirection.FROM.name().equals(this.config.getDirection())) {
121 122 searchDirectionIds.setFromId(EntityIdFactory.getByTypeAndId(entityContainer.getEntityType().name(), entityContainer.getEntityId().toString()));
122 123 searchDirectionIds.setToId(msg.getOriginator());
123   - searchDirectionIds.setOrignatorDirectionFrom(false);
  124 + searchDirectionIds.setOriginatorDirectionFrom(false);
124 125 } else {
125 126 searchDirectionIds.setToId(EntityIdFactory.getByTypeAndId(entityContainer.getEntityType().name(), entityContainer.getEntityId().toString()));
126 127 searchDirectionIds.setFromId(msg.getOriginator());
127   - searchDirectionIds.setOrignatorDirectionFrom(true);
  128 + searchDirectionIds.setOriginatorDirectionFrom(true);
128 129 }
129 130 return searchDirectionIds;
130 131 }
... ... @@ -153,7 +154,7 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA
153 154 protected static class SearchDirectionIds {
154 155 private EntityId fromId;
155 156 private EntityId toId;
156   - private boolean orignatorDirectionFrom;
  157 + private boolean originatorDirectionFrom;
157 158 }
158 159
159 160 private static class EntityCacheLoader extends CacheLoader<EntityKey, EntityContainer> {
... ...
... ... @@ -57,8 +57,6 @@ import java.util.List;
57 57 )
58 58 public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateRelationNodeConfiguration> {
59 59
60   - private String relationType;
61   -
62 60 @Override
63 61 protected TbCreateRelationNodeConfiguration loadEntityNodeActionConfig(TbNodeConfiguration configuration) throws TbNodeException {
64 62 return TbNodeUtils.convert(configuration, TbCreateRelationNodeConfiguration.class);
... ... @@ -70,8 +68,8 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR
70 68 }
71 69
72 70 @Override
73   - protected ListenableFuture<RelationContainer> doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entity) {
74   - ListenableFuture<Boolean> future = createIfAbsent(ctx, msg, entity);
  71 + protected ListenableFuture<RelationContainer> doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entity, String relationType) {
  72 + ListenableFuture<Boolean> future = createIfAbsent(ctx, msg, entity, relationType);
75 73 return Futures.transform(future, result -> {
76 74 RelationContainer container = new RelationContainer();
77 75 if (result && config.isChangeOriginatorToRelatedEntity()) {
... ... @@ -85,13 +83,12 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR
85 83 }, MoreExecutors.directExecutor());
86 84 }
87 85
88   - private ListenableFuture<Boolean> createIfAbsent(TbContext ctx, TbMsg msg, EntityContainer entityContainer) {
89   - relationType = processPattern(msg, config.getRelationType());
  86 + private ListenableFuture<Boolean> createIfAbsent(TbContext ctx, TbMsg msg, EntityContainer entityContainer, String relationType) {
90 87 SearchDirectionIds sdId = processSingleSearchDirection(msg, entityContainer);
91 88 ListenableFuture<Boolean> checkRelationFuture = Futures.transformAsync(ctx.getRelationService().checkRelation(ctx.getTenantId(), sdId.getFromId(), sdId.getToId(), relationType, RelationTypeGroup.COMMON), result -> {
92 89 if (!result) {
93 90 if (config.isRemoveCurrentRelations()) {
94   - return processDeleteRelations(ctx, processFindRelations(ctx, msg, sdId));
  91 + return processDeleteRelations(ctx, processFindRelations(ctx, msg, sdId, relationType));
95 92 }
96 93 return Futures.immediateFuture(false);
97 94 }
... ... @@ -100,14 +97,14 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR
100 97
101 98 return Futures.transformAsync(checkRelationFuture, result -> {
102 99 if (!result) {
103   - return processCreateRelation(ctx, entityContainer, sdId);
  100 + return processCreateRelation(ctx, entityContainer, sdId, relationType);
104 101 }
105 102 return Futures.immediateFuture(true);
106 103 }, ctx.getDbCallbackExecutor());
107 104 }
108 105
109   - private ListenableFuture<List<EntityRelation>> processFindRelations(TbContext ctx, TbMsg msg, SearchDirectionIds sdId) {
110   - if (sdId.isOrignatorDirectionFrom()) {
  106 + private ListenableFuture<List<EntityRelation>> processFindRelations(TbContext ctx, TbMsg msg, SearchDirectionIds sdId, String relationType) {
  107 + if (sdId.isOriginatorDirectionFrom()) {
111 108 return ctx.getRelationService().findByFromAndTypeAsync(ctx.getTenantId(), msg.getOriginator(), relationType, RelationTypeGroup.COMMON);
112 109 } else {
113 110 return ctx.getRelationService().findByToAndTypeAsync(ctx.getTenantId(), msg.getOriginator(), relationType, RelationTypeGroup.COMMON);
... ... @@ -127,85 +124,85 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR
127 124 }, ctx.getDbCallbackExecutor());
128 125 }
129 126
130   - private ListenableFuture<Boolean> processCreateRelation(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId) {
  127 + private ListenableFuture<Boolean> processCreateRelation(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId, String relationType) {
131 128 switch (entityContainer.getEntityType()) {
132 129 case ASSET:
133   - return processAsset(ctx, entityContainer, sdId);
  130 + return processAsset(ctx, entityContainer, sdId, relationType);
134 131 case DEVICE:
135   - return processDevice(ctx, entityContainer, sdId);
  132 + return processDevice(ctx, entityContainer, sdId, relationType);
136 133 case CUSTOMER:
137   - return processCustomer(ctx, entityContainer, sdId);
  134 + return processCustomer(ctx, entityContainer, sdId, relationType);
138 135 case DASHBOARD:
139   - return processDashboard(ctx, entityContainer, sdId);
  136 + return processDashboard(ctx, entityContainer, sdId, relationType);
140 137 case ENTITY_VIEW:
141   - return processView(ctx, entityContainer, sdId);
  138 + return processView(ctx, entityContainer, sdId, relationType);
142 139 case TENANT:
143   - return processTenant(ctx, entityContainer, sdId);
  140 + return processTenant(ctx, entityContainer, sdId, relationType);
144 141 }
145 142 return Futures.immediateFuture(true);
146 143 }
147 144
148   - private ListenableFuture<Boolean> processView(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId) {
  145 + private ListenableFuture<Boolean> processView(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId, String relationType) {
149 146 return Futures.transformAsync(ctx.getEntityViewService().findEntityViewByIdAsync(ctx.getTenantId(), new EntityViewId(entityContainer.getEntityId().getId())), entityView -> {
150 147 if (entityView != null) {
151   - return processSave(ctx, sdId);
  148 + return processSave(ctx, sdId, relationType);
152 149 } else {
153 150 return Futures.immediateFuture(true);
154 151 }
155 152 }, ctx.getDbCallbackExecutor());
156 153 }
157 154
158   - private ListenableFuture<Boolean> processDevice(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId) {
  155 + private ListenableFuture<Boolean> processDevice(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId, String relationType) {
159 156 return Futures.transformAsync(ctx.getDeviceService().findDeviceByIdAsync(ctx.getTenantId(), new DeviceId(entityContainer.getEntityId().getId())), device -> {
160 157 if (device != null) {
161   - return processSave(ctx, sdId);
  158 + return processSave(ctx, sdId, relationType);
162 159 } else {
163 160 return Futures.immediateFuture(true);
164 161 }
165 162 }, MoreExecutors.directExecutor());
166 163 }
167 164
168   - private ListenableFuture<Boolean> processAsset(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId) {
  165 + private ListenableFuture<Boolean> processAsset(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId, String relationType) {
169 166 return Futures.transformAsync(ctx.getAssetService().findAssetByIdAsync(ctx.getTenantId(), new AssetId(entityContainer.getEntityId().getId())), asset -> {
170 167 if (asset != null) {
171   - return processSave(ctx, sdId);
  168 + return processSave(ctx, sdId, relationType);
172 169 } else {
173 170 return Futures.immediateFuture(true);
174 171 }
175 172 }, ctx.getDbCallbackExecutor());
176 173 }
177 174
178   - private ListenableFuture<Boolean> processCustomer(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId) {
  175 + private ListenableFuture<Boolean> processCustomer(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId, String relationType) {
179 176 return Futures.transformAsync(ctx.getCustomerService().findCustomerByIdAsync(ctx.getTenantId(), new CustomerId(entityContainer.getEntityId().getId())), customer -> {
180 177 if (customer != null) {
181   - return processSave(ctx, sdId);
  178 + return processSave(ctx, sdId, relationType);
182 179 } else {
183 180 return Futures.immediateFuture(true);
184 181 }
185 182 }, ctx.getDbCallbackExecutor());
186 183 }
187 184
188   - private ListenableFuture<Boolean> processDashboard(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId) {
  185 + private ListenableFuture<Boolean> processDashboard(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId, String relationType) {
189 186 return Futures.transformAsync(ctx.getDashboardService().findDashboardByIdAsync(ctx.getTenantId(), new DashboardId(entityContainer.getEntityId().getId())), dashboard -> {
190 187 if (dashboard != null) {
191   - return processSave(ctx, sdId);
  188 + return processSave(ctx, sdId, relationType);
192 189 } else {
193 190 return Futures.immediateFuture(true);
194 191 }
195 192 }, ctx.getDbCallbackExecutor());
196 193 }
197 194
198   - private ListenableFuture<Boolean> processTenant(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId) {
  195 + private ListenableFuture<Boolean> processTenant(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId, String relationType) {
199 196 return Futures.transformAsync(ctx.getTenantService().findTenantByIdAsync(ctx.getTenantId(), new TenantId(entityContainer.getEntityId().getId())), tenant -> {
200 197 if (tenant != null) {
201   - return processSave(ctx, sdId);
  198 + return processSave(ctx, sdId, relationType);
202 199 } else {
203 200 return Futures.immediateFuture(true);
204 201 }
205 202 }, ctx.getDbCallbackExecutor());
206 203 }
207 204
208   - private ListenableFuture<Boolean> processSave(TbContext ctx, SearchDirectionIds sdId) {
  205 + private ListenableFuture<Boolean> processSave(TbContext ctx, SearchDirectionIds sdId, String relationType) {
209 206 return ctx.getRelationService().saveRelationAsync(ctx.getTenantId(), new EntityRelation(sdId.getFromId(), sdId.getToId(), relationType, RelationTypeGroup.COMMON));
210 207 }
211 208
... ...
... ... @@ -48,8 +48,6 @@ import java.util.List;
48 48 )
49 49 public class TbDeleteRelationNode extends TbAbstractRelationActionNode<TbDeleteRelationNodeConfiguration> {
50 50
51   - private String relationType;
52   -
53 51 @Override
54 52 protected TbDeleteRelationNodeConfiguration loadEntityNodeActionConfig(TbNodeConfiguration configuration) throws TbNodeException {
55 53 return TbNodeUtils.convert(configuration, TbDeleteRelationNodeConfiguration.class);
... ... @@ -61,19 +59,18 @@ public class TbDeleteRelationNode extends TbAbstractRelationActionNode<TbDeleteR
61 59 }
62 60
63 61 @Override
64   - protected ListenableFuture<RelationContainer> processEntityRelationAction(TbContext ctx, TbMsg msg) {
65   - return getRelationContainerListenableFuture(ctx, msg);
  62 + protected ListenableFuture<RelationContainer> processEntityRelationAction(TbContext ctx, TbMsg msg, String relationType) {
  63 + return getRelationContainerListenableFuture(ctx, msg, relationType);
66 64 }
67 65
68 66 @Override
69   - protected ListenableFuture<RelationContainer> doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entityContainer) {
70   - return Futures.transform(processSingle(ctx, msg, entityContainer), result -> new RelationContainer(msg, result), MoreExecutors.directExecutor());
  67 + protected ListenableFuture<RelationContainer> doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entityContainer, String relationType) {
  68 + return Futures.transform(processSingle(ctx, msg, entityContainer, relationType), result -> new RelationContainer(msg, result), MoreExecutors.directExecutor());
71 69 }
72 70
73   - private ListenableFuture<RelationContainer> getRelationContainerListenableFuture(TbContext ctx, TbMsg msg) {
74   - relationType = processPattern(msg, config.getRelationType());
  71 + private ListenableFuture<RelationContainer> getRelationContainerListenableFuture(TbContext ctx, TbMsg msg, String relationType) {
75 72 if (config.isDeleteForSingleEntity()) {
76   - return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer), MoreExecutors.directExecutor());
  73 + return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer, relationType), MoreExecutors.directExecutor());
77 74 } else {
78 75 return Futures.transform(processList(ctx, msg), result -> new RelationContainer(msg, result), MoreExecutors.directExecutor());
79 76 }
... ... @@ -100,18 +97,18 @@ public class TbDeleteRelationNode extends TbAbstractRelationActionNode<TbDeleteR
100 97 }, MoreExecutors.directExecutor());
101 98 }
102 99
103   - private ListenableFuture<Boolean> processSingle(TbContext ctx, TbMsg msg, EntityContainer entityContainer) {
  100 + private ListenableFuture<Boolean> processSingle(TbContext ctx, TbMsg msg, EntityContainer entityContainer, String relationType) {
104 101 SearchDirectionIds sdId = processSingleSearchDirection(msg, entityContainer);
105 102 return Futures.transformAsync(ctx.getRelationService().checkRelation(ctx.getTenantId(), sdId.getFromId(), sdId.getToId(), relationType, RelationTypeGroup.COMMON),
106 103 result -> {
107 104 if (result) {
108   - return processSingleDeleteRelation(ctx, sdId);
  105 + return processSingleDeleteRelation(ctx, sdId, relationType);
109 106 }
110 107 return Futures.immediateFuture(true);
111 108 }, MoreExecutors.directExecutor());
112 109 }
113 110
114   - private ListenableFuture<Boolean> processSingleDeleteRelation(TbContext ctx, SearchDirectionIds sdId) {
  111 + private ListenableFuture<Boolean> processSingleDeleteRelation(TbContext ctx, SearchDirectionIds sdId, String relationType) {
115 112 return ctx.getRelationService().deleteRelationAsync(ctx.getTenantId(), sdId.getFromId(), sdId.getToId(), relationType, RelationTypeGroup.COMMON);
116 113 }
117 114
... ...