Commit a2dceb93a69a2d69ce971e8fccd9b4345e6ba458

Authored by Igor Kulikov
2 parents 0e6c6af2 c0c71284

Merge branch 'develop/2.0' of github.com:thingsboard/thingsboard into develop/2.0

@@ -62,7 +62,7 @@ public abstract class TbAbstractAlarmNode<C extends TbAbstractAlarmNodeConfigura @@ -62,7 +62,7 @@ public abstract class TbAbstractAlarmNode<C extends TbAbstractAlarmNodeConfigura
62 ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Cleared"); 62 ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Cleared");
63 } 63 }
64 }, 64 },
65 - t -> ctx.tellFailure(msg, t)); 65 + t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
66 } 66 }
67 67
68 protected abstract ListenableFuture<AlarmResult> processAlarm(TbContext ctx, TbMsg msg); 68 protected abstract ListenableFuture<AlarmResult> processAlarm(TbContext ctx, TbMsg msg);
@@ -70,7 +70,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC @@ -70,7 +70,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
70 putAttrAsync(ctx, entityId, msg, SHARED_SCOPE, config.getSharedAttributeNames(), "shared_"), 70 putAttrAsync(ctx, entityId, msg, SHARED_SCOPE, config.getSharedAttributeNames(), "shared_"),
71 putAttrAsync(ctx, entityId, msg, SERVER_SCOPE, config.getServerAttributeNames(), "ss_") 71 putAttrAsync(ctx, entityId, msg, SERVER_SCOPE, config.getServerAttributeNames(), "ss_")
72 ); 72 );
73 - withCallback(allFutures, i -> ctx.tellNext(msg, SUCCESS), t -> ctx.tellFailure(msg, t)); 73 + withCallback(allFutures, i -> ctx.tellNext(msg, SUCCESS), t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
74 } 74 }
75 75
76 private ListenableFuture<Void> putAttrAsync(TbContext ctx, EntityId entityId, TbMsg msg, String scope, List<String> keys, String prefix) { 76 private ListenableFuture<Void> putAttrAsync(TbContext ctx, EntityId entityId, TbMsg msg, String scope, List<String> keys, String prefix) {
@@ -54,7 +54,7 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode @@ -54,7 +54,7 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
54 withCallback( 54 withCallback(
55 findEntityAsync(ctx, msg.getOriginator()), 55 findEntityAsync(ctx, msg.getOriginator()),
56 entityId -> safeGetAttributes(ctx, msg, entityId), 56 entityId -> safeGetAttributes(ctx, msg, entityId),
57 - t -> ctx.tellFailure(msg, t)); 57 + t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
58 } catch (Throwable th) { 58 } catch (Throwable th) {
59 ctx.tellFailure(msg, th); 59 ctx.tellFailure(msg, th);
60 } 60 }
@@ -68,7 +68,7 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode @@ -68,7 +68,7 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
68 68
69 withCallback(config.isTelemetry() ? getLatestTelemetry(ctx, entityId) : getAttributesAsync(ctx, entityId), 69 withCallback(config.isTelemetry() ? getLatestTelemetry(ctx, entityId) : getAttributesAsync(ctx, entityId),
70 attributes -> putAttributesAndTell(ctx, msg, attributes), 70 attributes -> putAttributesAndTell(ctx, msg, attributes),
71 - t -> ctx.tellFailure(msg, t)); 71 + t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
72 } 72 }
73 73
74 private ListenableFuture<List<KvEntry>> getAttributesAsync(TbContext ctx, EntityId entityId) { 74 private ListenableFuture<List<KvEntry>> getAttributesAsync(TbContext ctx, EntityId entityId) {
@@ -69,7 +69,7 @@ public class TbChangeOriginatorNode extends TbAbstractTransformNode { @@ -69,7 +69,7 @@ public class TbChangeOriginatorNode extends TbAbstractTransformNode {
69 return null; 69 return null;
70 } 70 }
71 return ctx.transformMsg(msg, msg.getType(), n, msg.getMetaData(), msg.getData()); 71 return ctx.transformMsg(msg, msg.getType(), n, msg.getMetaData(), msg.getData());
72 - }); 72 + }, ctx.getDbCallbackExecutor());
73 } 73 }
74 74
75 private ListenableFuture<? extends EntityId> getNewOriginator(TbContext ctx, EntityId original) { 75 private ListenableFuture<? extends EntityId> getNewOriginator(TbContext ctx, EntityId original) {