|
@@ -125,7 +125,7 @@ class DefaultTbContext implements TbContext { |
|
@@ -125,7 +125,7 @@ class DefaultTbContext implements TbContext { |
125
|
|
125
|
|
126
|
@Override
|
126
|
@Override
|
127
|
public void enqueue(TbMsg tbMsg, String queueName, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
127
|
public void enqueue(TbMsg tbMsg, String queueName, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
128
|
- TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator());
|
128
|
+ TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName);
|
129
|
enqueue(tpi, tbMsg, onFailure, onSuccess);
|
129
|
enqueue(tpi, tbMsg, onFailure, onSuccess);
|
130
|
}
|
130
|
}
|
131
|
|
131
|
|
|
@@ -142,46 +142,54 @@ class DefaultTbContext implements TbContext { |
|
@@ -142,46 +142,54 @@ class DefaultTbContext implements TbContext { |
142
|
|
142
|
|
143
|
@Override
|
143
|
@Override
|
144
|
public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) {
|
144
|
public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) {
|
145
|
- TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator());
|
145
|
+ TopicPartitionInfo tpi = resolvePartition(tbMsg);
|
146
|
enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), failureMessage, null, null);
|
146
|
enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), failureMessage, null, null);
|
147
|
}
|
147
|
}
|
148
|
|
148
|
|
149
|
@Override
|
149
|
@Override
|
150
|
public void enqueueForTellNext(TbMsg tbMsg, String relationType) {
|
150
|
public void enqueueForTellNext(TbMsg tbMsg, String relationType) {
|
151
|
- TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator());
|
151
|
+ TopicPartitionInfo tpi = resolvePartition(tbMsg);
|
152
|
enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, null, null);
|
152
|
enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, null, null);
|
153
|
}
|
153
|
}
|
154
|
|
154
|
|
155
|
@Override
|
155
|
@Override
|
156
|
public void enqueueForTellNext(TbMsg tbMsg, Set<String> relationTypes) {
|
156
|
public void enqueueForTellNext(TbMsg tbMsg, Set<String> relationTypes) {
|
157
|
- TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator());
|
157
|
+ TopicPartitionInfo tpi = resolvePartition(tbMsg);
|
158
|
enqueueForTellNext(tpi, tbMsg, relationTypes, null, null, null);
|
158
|
enqueueForTellNext(tpi, tbMsg, relationTypes, null, null, null);
|
159
|
}
|
159
|
}
|
160
|
|
160
|
|
161
|
@Override
|
161
|
@Override
|
162
|
public void enqueueForTellNext(TbMsg tbMsg, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
162
|
public void enqueueForTellNext(TbMsg tbMsg, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
163
|
- TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator());
|
163
|
+ TopicPartitionInfo tpi = resolvePartition(tbMsg);
|
164
|
enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure);
|
164
|
enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure);
|
165
|
}
|
165
|
}
|
166
|
|
166
|
|
167
|
@Override
|
167
|
@Override
|
168
|
public void enqueueForTellNext(TbMsg tbMsg, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
168
|
public void enqueueForTellNext(TbMsg tbMsg, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
169
|
- TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator());
|
169
|
+ TopicPartitionInfo tpi = resolvePartition(tbMsg);
|
170
|
enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure);
|
170
|
enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure);
|
171
|
}
|
171
|
}
|
172
|
|
172
|
|
173
|
@Override
|
173
|
@Override
|
174
|
public void enqueueForTellNext(TbMsg tbMsg, String queueName, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
174
|
public void enqueueForTellNext(TbMsg tbMsg, String queueName, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
175
|
- TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator());
|
175
|
+ TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName);
|
176
|
enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure);
|
176
|
enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure);
|
177
|
}
|
177
|
}
|
178
|
|
178
|
|
179
|
@Override
|
179
|
@Override
|
180
|
public void enqueueForTellNext(TbMsg tbMsg, String queueName, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
180
|
public void enqueueForTellNext(TbMsg tbMsg, String queueName, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
181
|
- TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator());
|
181
|
+ TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName);
|
182
|
enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure);
|
182
|
enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure);
|
183
|
}
|
183
|
}
|
184
|
|
184
|
|
|
|
185
|
+ private TopicPartitionInfo resolvePartition(TbMsg tbMsg, String queueName) {
|
|
|
186
|
+ return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator());
|
|
|
187
|
+ }
|
|
|
188
|
+
|
|
|
189
|
+ private TopicPartitionInfo resolvePartition(TbMsg tbMsg) {
|
|
|
190
|
+ return resolvePartition(tbMsg, tbMsg.getQueueName());
|
|
|
191
|
+ }
|
|
|
192
|
+
|
185
|
private void enqueueForTellNext(TopicPartitionInfo tpi, TbMsg source, Set<String> relationTypes, String failureMessage, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
193
|
private void enqueueForTellNext(TopicPartitionInfo tpi, TbMsg source, Set<String> relationTypes, String failureMessage, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
186
|
RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId();
|
194
|
RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId();
|
187
|
RuleNodeId ruleNodeId = nodeCtx.getSelf().getId();
|
195
|
RuleNodeId ruleNodeId = nodeCtx.getSelf().getId();
|