Commit 6ccb9d6036b81cee83e77d9acdfb732409cf4f34
1 parent
f4bec22a
Send array of KV entry grouped by timestamp to rule chain
Showing
1 changed file
with
20 additions
and
9 deletions
@@ -108,9 +108,11 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; | @@ -108,9 +108,11 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; | ||
108 | import javax.mail.MessagingException; | 108 | import javax.mail.MessagingException; |
109 | import javax.servlet.http.HttpServletResponse; | 109 | import javax.servlet.http.HttpServletResponse; |
110 | import java.util.List; | 110 | import java.util.List; |
111 | +import java.util.Map; | ||
111 | import java.util.Optional; | 112 | import java.util.Optional; |
112 | import java.util.Set; | 113 | import java.util.Set; |
113 | import java.util.UUID; | 114 | import java.util.UUID; |
115 | +import java.util.stream.Collectors; | ||
114 | 116 | ||
115 | import static org.thingsboard.server.dao.service.Validator.validateId; | 117 | import static org.thingsboard.server.dao.service.Validator.validateId; |
116 | 118 | ||
@@ -770,15 +772,8 @@ public abstract class BaseController { | @@ -770,15 +772,8 @@ public abstract class BaseController { | ||
770 | keys.forEach(attrsArrayNode::add); | 772 | keys.forEach(attrsArrayNode::add); |
771 | } | 773 | } |
772 | } else if (actionType == ActionType.TIMESERIES_UPDATED) { | 774 | } else if (actionType == ActionType.TIMESERIES_UPDATED) { |
773 | - List<TsKvEntry> telemetry = extractParameter(List.class, 0, additionalInfo); | ||
774 | - if (telemetry != null && !telemetry.isEmpty()) { | ||
775 | - ObjectNode values = json.createObjectNode(); | ||
776 | - for (TsKvEntry tsKvEntry : telemetry) { | ||
777 | - addKvEntry(values, tsKvEntry); | ||
778 | - } | ||
779 | - entityNode.put("ts", telemetry.get(0).getTs()); | ||
780 | - entityNode.set("values", values); | ||
781 | - } | 775 | + List<TsKvEntry> timeseries = extractParameter(List.class, 0, additionalInfo); |
776 | + addTimeseries(entityNode, timeseries); | ||
782 | } | 777 | } |
783 | } | 778 | } |
784 | TbMsg tbMsg = TbMsg.newMsg(msgType, entityId, metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode)); | 779 | TbMsg tbMsg = TbMsg.newMsg(msgType, entityId, metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode)); |
@@ -831,4 +826,20 @@ public abstract class BaseController { | @@ -831,4 +826,20 @@ public abstract class BaseController { | ||
831 | return null; | 826 | return null; |
832 | } | 827 | } |
833 | 828 | ||
829 | + private void addTimeseries(ObjectNode entityNode, List<TsKvEntry> timeseries) throws Exception { | ||
830 | + if (timeseries != null && !timeseries.isEmpty()) { | ||
831 | + ArrayNode result = entityNode.putArray("timeseries"); | ||
832 | + Map<Long, List<TsKvEntry>> groupedTelemetry = timeseries.stream() | ||
833 | + .collect(Collectors.groupingBy(TsKvEntry::getTs)); | ||
834 | + for (Map.Entry<Long, List<TsKvEntry>> entry : groupedTelemetry.entrySet()) { | ||
835 | + ObjectNode element = json.createObjectNode(); | ||
836 | + element.put("ts", entry.getKey()); | ||
837 | + ObjectNode values = element.putObject("values"); | ||
838 | + for (TsKvEntry tsKvEntry : entry.getValue()) { | ||
839 | + addKvEntry(values, tsKvEntry); | ||
840 | + } | ||
841 | + result.add(element); | ||
842 | + } | ||
843 | + } | ||
844 | + } | ||
834 | } | 845 | } |