Commit 670a218e6da7643f8a15801b0be5d9828ead7ba1

Authored by Andrii Shvaika
1 parent b25b05c7

Added ts to TbMsg

@@ -40,6 +40,7 @@ import java.util.UUID; @@ -40,6 +40,7 @@ import java.util.UUID;
40 public final class TbMsg implements Serializable { 40 public final class TbMsg implements Serializable {
41 41
42 private final UUID id; 42 private final UUID id;
  43 + private final long ts;
43 private final String type; 44 private final String type;
44 private final EntityId originator; 45 private final EntityId originator;
45 private final TbMsgMetaData metaData; 46 private final TbMsgMetaData metaData;
@@ -51,38 +52,43 @@ public final class TbMsg implements Serializable { @@ -51,38 +52,43 @@ public final class TbMsg implements Serializable {
51 transient private final TbMsgCallback callback; 52 transient private final TbMsgCallback callback;
52 53
53 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) { 54 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) {
54 - return new TbMsg(UUID.randomUUID(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, TbMsgCallback.EMPTY); 55 + return new TbMsg(UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, TbMsgCallback.EMPTY);
55 } 56 }
56 57
57 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { 58 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
58 - return new TbMsg(UUID.randomUUID(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, ruleChainId, ruleNodeId, TbMsgCallback.EMPTY); 59 + return new TbMsg(UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, ruleChainId, ruleNodeId, TbMsgCallback.EMPTY);
59 } 60 }
60 61
61 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data) { 62 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data) {
62 - return new TbMsg(UUID.randomUUID(), type, originator, metaData.copy(), dataType, data, null, null, TbMsgCallback.EMPTY); 63 + return new TbMsg(UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), dataType, data, null, null, TbMsgCallback.EMPTY);
63 } 64 }
64 65
65 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { 66 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
66 - return new TbMsg(UUID.randomUUID(), type, originator, metaData.copy(), dataType, data, ruleChainId, ruleNodeId, TbMsgCallback.EMPTY); 67 + return new TbMsg(UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), dataType, data, ruleChainId, ruleNodeId, TbMsgCallback.EMPTY);
67 } 68 }
68 69
69 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data, TbMsgCallback callback) { 70 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data, TbMsgCallback callback) {
70 - return new TbMsg(UUID.randomUUID(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, callback); 71 + return new TbMsg(UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, callback);
71 } 72 }
72 73
73 public static TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) { 74 public static TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) {
74 - return new TbMsg(origMsg.getId(), type, originator, metaData.copy(), origMsg.getDataType(), 75 + return new TbMsg(origMsg.getId(), origMsg.getTs(), type, originator, metaData.copy(), origMsg.getDataType(),
75 data, origMsg.getRuleChainId(), origMsg.getRuleNodeId(), origMsg.getCallback()); 76 data, origMsg.getRuleChainId(), origMsg.getRuleNodeId(), origMsg.getCallback());
76 } 77 }
77 78
78 public static TbMsg newMsg(TbMsg tbMsg, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { 79 public static TbMsg newMsg(TbMsg tbMsg, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
79 - return new TbMsg(UUID.randomUUID(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(), 80 + return new TbMsg(UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(),
80 tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, TbMsgCallback.EMPTY); 81 tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, TbMsgCallback.EMPTY);
81 } 82 }
82 83
83 - private TbMsg(UUID id, String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data, 84 + private TbMsg(UUID id, long ts, String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
84 RuleChainId ruleChainId, RuleNodeId ruleNodeId, TbMsgCallback callback) { 85 RuleChainId ruleChainId, RuleNodeId ruleNodeId, TbMsgCallback callback) {
85 this.id = id; 86 this.id = id;
  87 + if (ts > 0) {
  88 + this.ts = ts;
  89 + } else {
  90 + this.ts = System.currentTimeMillis();
  91 + }
86 this.type = type; 92 this.type = type;
87 this.originator = originator; 93 this.originator = originator;
88 this.metaData = metaData; 94 this.metaData = metaData;
@@ -105,6 +111,7 @@ public final class TbMsg implements Serializable { @@ -105,6 +111,7 @@ public final class TbMsg implements Serializable {
105 public static byte[] toByteArray(TbMsg msg) { 111 public static byte[] toByteArray(TbMsg msg) {
106 MsgProtos.TbMsgProto.Builder builder = MsgProtos.TbMsgProto.newBuilder(); 112 MsgProtos.TbMsgProto.Builder builder = MsgProtos.TbMsgProto.newBuilder();
107 builder.setId(msg.getId().toString()); 113 builder.setId(msg.getId().toString());
  114 + builder.setTs(msg.getTs());
108 builder.setType(msg.getType()); 115 builder.setType(msg.getType());
109 builder.setEntityType(msg.getOriginator().getEntityType().name()); 116 builder.setEntityType(msg.getOriginator().getEntityType().name());
110 builder.setEntityIdMSB(msg.getOriginator().getId().getMostSignificantBits()); 117 builder.setEntityIdMSB(msg.getOriginator().getId().getMostSignificantBits());
@@ -124,7 +131,6 @@ public final class TbMsg implements Serializable { @@ -124,7 +131,6 @@ public final class TbMsg implements Serializable {
124 builder.setMetaData(MsgProtos.TbMsgMetaDataProto.newBuilder().putAllData(msg.getMetaData().getData()).build()); 131 builder.setMetaData(MsgProtos.TbMsgMetaDataProto.newBuilder().putAllData(msg.getMetaData().getData()).build());
125 } 132 }
126 133
127 -  
128 builder.setDataType(msg.getDataType().ordinal()); 134 builder.setDataType(msg.getDataType().ordinal());
129 builder.setData(msg.getData()); 135 builder.setData(msg.getData());
130 return builder.build().toByteArray(); 136 return builder.build().toByteArray();
@@ -144,18 +150,18 @@ public final class TbMsg implements Serializable { @@ -144,18 +150,18 @@ public final class TbMsg implements Serializable {
144 ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB())); 150 ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB()));
145 } 151 }
146 TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()]; 152 TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
147 - return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, callback); 153 + return new TbMsg(UUID.fromString(proto.getId()), proto.getTs(), proto.getType(), entityId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, callback);
148 } catch (InvalidProtocolBufferException e) { 154 } catch (InvalidProtocolBufferException e) {
149 throw new IllegalStateException("Could not parse protobuf for TbMsg", e); 155 throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
150 } 156 }
151 } 157 }
152 158
153 public TbMsg copyWithRuleChainId(RuleChainId ruleChainId) { 159 public TbMsg copyWithRuleChainId(RuleChainId ruleChainId) {
154 - return new TbMsg(this.id, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, null, callback); 160 + return new TbMsg(this.id, this.ts, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, null, callback);
155 } 161 }
156 162
157 public TbMsg copyWithRuleNodeId(RuleChainId ruleChainId, RuleNodeId ruleNodeId) { 163 public TbMsg copyWithRuleNodeId(RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
158 - return new TbMsg(this.id, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, callback); 164 + return new TbMsg(this.id, this.ts, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, callback);
159 } 165 }
160 166
161 public TbMsgCallback getCallback() { 167 public TbMsgCallback getCallback() {
@@ -44,4 +44,5 @@ message TbMsgProto { @@ -44,4 +44,5 @@ message TbMsgProto {
44 int32 dataType = 13; 44 int32 dataType = 13;
45 string data = 14; 45 string data = 14;
46 46
  47 + int64 ts = 15;
47 } 48 }
@@ -70,7 +70,7 @@ public class TbMsgTimeseriesNode implements TbNode { @@ -70,7 +70,7 @@ public class TbMsgTimeseriesNode implements TbNode {
70 } catch (NumberFormatException e) { 70 } catch (NumberFormatException e) {
71 } 71 }
72 } else { 72 } else {
73 - ts = System.currentTimeMillis(); 73 + ts = msg.getTs();
74 } 74 }
75 String src = msg.getData(); 75 String src = msg.getData();
76 Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToTelemetry(new JsonParser().parse(src), ts); 76 Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToTelemetry(new JsonParser().parse(src), ts);