Commit a4ae4162a385ec7770ec14395682c14372f93536

Authored by Andrew Shvayka
Committed by GitHub
2 parents 92da11aa 9b18b2a8

Merge pull request #10 from ytalashko/master

Remove redundant conversions into streams
Showing 26 changed files with 67 additions and 75 deletions
... ... @@ -121,7 +121,7 @@ public class AppActor extends ContextAwareActor {
121 121
122 122 private void broadcast(Object msg) {
123 123 pluginManager.broadcast(msg);
124   - tenantActors.values().stream().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
  124 + tenantActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
125 125 }
126 126
127 127 private void onToRuleMsg(ToRuleActorMsg msg) {
... ...
... ... @@ -16,7 +16,6 @@
16 16 package org.thingsboard.server.actors.rule;
17 17
18 18 import java.util.ArrayList;
19   -import java.util.Collections;
20 19 import java.util.List;
21 20 import java.util.Set;
22 21
... ... @@ -26,7 +25,7 @@ public class SimpleRuleActorChain implements RuleActorChain {
26 25
27 26 public SimpleRuleActorChain(Set<RuleActorMetaData> ruleSet) {
28 27 rules = new ArrayList<>(ruleSet);
29   - Collections.sort(rules, RuleActorMetaData.RULE_ACTOR_MD_COMPARATOR);
  28 + rules.sort(RuleActorMetaData.RULE_ACTOR_MD_COMPARATOR);
30 29 }
31 30
32 31 public int size() {
... ...
... ... @@ -111,7 +111,7 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
111 111 Optional<ServerAddress> newTargetServer = systemContext.getRoutingService().resolve(getDeviceId());
112 112 if (!newTargetServer.equals(currentTargetServer)) {
113 113 currentTargetServer = newTargetServer;
114   - pendingMap.values().stream().forEach(v -> {
  114 + pendingMap.values().forEach(v -> {
115 115 forwardToAppActor(context, v, currentTargetServer);
116 116 if (currentTargetServer.isPresent()) {
117 117 logger.debug("[{}] Forwarded msg to new server: {}", sessionId, currentTargetServer.get());
... ...
... ... @@ -66,7 +66,7 @@ public class SessionManagerActor extends ContextAwareActor {
66 66 }
67 67
68 68 private void broadcast(Object msg) {
69   - sessionActors.values().stream().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
  69 + sessionActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
70 70 }
71 71
72 72 private void onSessionTimeout(SessionTimeoutMsg msg) {
... ...
... ... @@ -19,7 +19,6 @@ import akka.actor.ActorContext;
19 19 import akka.actor.ActorRef;
20 20 import akka.actor.Scheduler;
21 21 import akka.event.LoggingAdapter;
22   -import com.fasterxml.jackson.core.JsonProcessingException;
23 22 import com.fasterxml.jackson.databind.JsonNode;
24 23 import com.fasterxml.jackson.databind.ObjectMapper;
25 24 import lombok.AllArgsConstructor;
... ...
... ... @@ -15,9 +15,9 @@
15 15 */
16 16 package org.thingsboard.server.actors.shared.plugin;
17 17
18   -import java.util.HashMap;
19   -import java.util.Map;
20   -
  18 +import akka.actor.ActorContext;
  19 +import akka.actor.ActorRef;
  20 +import akka.actor.Props;
21 21 import lombok.extern.slf4j.Slf4j;
22 22 import org.thingsboard.server.actors.ActorSystemContext;
23 23 import org.thingsboard.server.actors.plugin.PluginActor;
... ... @@ -29,12 +29,9 @@ import org.thingsboard.server.common.data.page.PageDataIterable;
29 29 import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
30 30 import org.thingsboard.server.common.data.plugin.PluginMetaData;
31 31 import org.thingsboard.server.dao.plugin.PluginService;
32   -import org.slf4j.Logger;
33   -import org.slf4j.LoggerFactory;
34 32
35   -import akka.actor.ActorContext;
36   -import akka.actor.ActorRef;
37   -import akka.actor.Props;
  33 +import java.util.HashMap;
  34 +import java.util.Map;
38 35
39 36 @Slf4j
40 37 public abstract class PluginManager {
... ... @@ -64,17 +61,13 @@ public abstract class PluginManager {
64 61 abstract TenantId getTenantId();
65 62
66 63 public ActorRef getOrCreatePluginActor(ActorContext context, PluginId pluginId) {
67   - ActorRef pluginActor = pluginActors.get(pluginId);
68   - if (pluginActor == null) {
69   - pluginActor = context.actorOf(Props.create(new PluginActor.ActorCreator(systemContext, getTenantId(), pluginId))
70   - .withDispatcher(DefaultActorService.PLUGIN_DISPATCHER_NAME), pluginId.toString());
71   - pluginActors.put(pluginId, pluginActor);
72   - }
73   - return pluginActor;
  64 + return pluginActors.computeIfAbsent(pluginId, pId ->
  65 + context.actorOf(Props.create(new PluginActor.ActorCreator(systemContext, getTenantId(), pId))
  66 + .withDispatcher(DefaultActorService.PLUGIN_DISPATCHER_NAME), pId.toString()));
74 67 }
75 68
76 69 public void broadcast(Object msg) {
77   - pluginActors.values().stream().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
  70 + pluginActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
78 71 }
79 72
80 73 public void remove(PluginId id) {
... ...
... ... @@ -20,7 +20,6 @@ import org.thingsboard.server.common.data.id.TenantId;
20 20 import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
21 21 import org.thingsboard.server.common.data.plugin.PluginMetaData;
22 22 import org.thingsboard.server.dao.plugin.BasePluginService;
23   -import org.thingsboard.server.dao.plugin.PluginService;
24 23
25 24 public class SystemPluginManager extends PluginManager {
26 25
... ... @@ -30,7 +29,7 @@ public class SystemPluginManager extends PluginManager {
30 29
31 30 @Override
32 31 FetchFunction<PluginMetaData> getFetchPluginsFunction() {
33   - return link -> pluginService.findSystemPlugins(link);
  32 + return pluginService::findSystemPlugins;
34 33 }
35 34
36 35 @Override
... ...
... ... @@ -18,8 +18,7 @@ package org.thingsboard.server.actors.shared.rule;
18 18 import akka.actor.ActorContext;
19 19 import akka.actor.ActorRef;
20 20 import akka.actor.Props;
21   -import org.slf4j.Logger;
22   -import org.slf4j.LoggerFactory;
  21 +import lombok.extern.slf4j.Slf4j;
23 22 import org.thingsboard.server.actors.ActorSystemContext;
24 23 import org.thingsboard.server.actors.rule.RuleActor;
25 24 import org.thingsboard.server.actors.rule.RuleActorChain;
... ... @@ -38,10 +37,9 @@ import org.thingsboard.server.dao.rule.RuleService;
38 37
39 38 import java.util.*;
40 39
  40 +@Slf4j
41 41 public abstract class RuleManager {
42 42
43   - protected static final Logger logger = LoggerFactory.getLogger(RuleManager.class);
44   -
45 43 protected final ActorSystemContext systemContext;
46 44 protected final RuleService ruleService;
47 45 protected final Map<RuleId, ActorRef> ruleActors;
... ... @@ -63,11 +61,11 @@ public abstract class RuleManager {
63 61 ruleMap = new HashMap<>();
64 62
65 63 for (RuleMetaData rule : ruleIterator) {
66   - logger.debug("[{}] Creating rule actor {}", rule.getId(), rule);
  64 + log.debug("[{}] Creating rule actor {}", rule.getId(), rule);
67 65 ActorRef ref = getOrCreateRuleActor(context, rule.getId());
68 66 RuleActorMetaData actorMd = RuleActorMetaData.systemRule(rule.getId(), rule.getWeight(), ref);
69 67 ruleMap.put(rule, actorMd);
70   - logger.debug("[{}] Rule actor created.", rule.getId());
  68 + log.debug("[{}] Rule actor created.", rule.getId());
71 69 }
72 70
73 71 refreshRuleChain();
... ... @@ -79,8 +77,11 @@ public abstract class RuleManager {
79 77 rule = systemContext.getRuleService().findRuleById(ruleId);
80 78 }
81 79 if (rule == null) {
82   - rule = ruleMap.keySet().stream().filter(r -> r.getId().equals(ruleId)).findFirst().orElse(null);
83   - rule.setState(ComponentLifecycleState.SUSPENDED);
  80 + rule = ruleMap.keySet().stream()
  81 + .filter(r -> r.getId().equals(ruleId))
  82 + .peek(r -> r.setState(ComponentLifecycleState.SUSPENDED))
  83 + .findFirst()
  84 + .orElse(null);
84 85 }
85 86 if (rule != null) {
86 87 RuleActorMetaData actorMd = ruleMap.get(rule);
... ... @@ -92,7 +93,7 @@ public abstract class RuleManager {
92 93 refreshRuleChain();
93 94 return Optional.of(actorMd.getActorRef());
94 95 } else {
95   - logger.warn("[{}] Can't process unknown rule!", rule.getId());
  96 + log.warn("[{}] Can't process unknown rule!", ruleId);
96 97 return Optional.empty();
97 98 }
98 99 }
... ... @@ -100,13 +101,9 @@ public abstract class RuleManager {
100 101 abstract FetchFunction<RuleMetaData> getFetchRulesFunction();
101 102
102 103 public ActorRef getOrCreateRuleActor(ActorContext context, RuleId ruleId) {
103   - ActorRef ruleActor = ruleActors.get(ruleId);
104   - if (ruleActor == null) {
105   - ruleActor = context.actorOf(Props.create(new RuleActor.ActorCreator(systemContext, tenantId, ruleId))
106   - .withDispatcher(DefaultActorService.RULE_DISPATCHER_NAME), ruleId.toString());
107   - ruleActors.put(ruleId, ruleActor);
108   - }
109   - return ruleActor;
  104 + return ruleActors.computeIfAbsent(ruleId, rId ->
  105 + context.actorOf(Props.create(new RuleActor.ActorCreator(systemContext, tenantId, rId))
  106 + .withDispatcher(DefaultActorService.RULE_DISPATCHER_NAME), rId.toString()));
110 107 }
111 108
112 109 public RuleActorChain getRuleChain() {
... ...
... ... @@ -29,7 +29,7 @@ public class SystemRuleManager extends RuleManager {
29 29
30 30 @Override
31 31 FetchFunction<RuleMetaData> getFetchRulesFunction() {
32   - return link -> ruleService.findSystemRules(link);
  32 + return ruleService::findSystemRules;
33 33 }
34 34
35 35 }
... ...
... ... @@ -100,7 +100,7 @@ public class TenantActor extends ContextAwareActor {
100 100
101 101 private void broadcast(Object msg) {
102 102 pluginManager.broadcast(msg);
103   - deviceActors.values().stream().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
  103 + deviceActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
104 104 }
105 105
106 106 private void onToDeviceActorMsg(ToDeviceActorMsg msg) {
... ...
... ... @@ -166,7 +166,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
166 166 @Override
167 167 public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
168 168 publishCurrentServer();
169   - getOtherServers().stream().forEach(
  169 + getOtherServers().forEach(
170 170 server -> log.info("Found active server: [{}:{}]", server.getHost(), server.getPort())
171 171 );
172 172 }
... ... @@ -194,13 +194,13 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
194 194 log.info("Processing [{}] event for [{}:{}]", pathChildrenCacheEvent.getType(), instance.getHost(), instance.getPort());
195 195 switch (pathChildrenCacheEvent.getType()) {
196 196 case CHILD_ADDED:
197   - listeners.stream().forEach(listener -> listener.onServerAdded(instance));
  197 + listeners.forEach(listener -> listener.onServerAdded(instance));
198 198 break;
199 199 case CHILD_UPDATED:
200   - listeners.stream().forEach(listener -> listener.onServerUpdated(instance));
  200 + listeners.forEach(listener -> listener.onServerUpdated(instance));
201 201 break;
202 202 case CHILD_REMOVED:
203   - listeners.stream().forEach(listener -> listener.onServerRemoved(instance));
  203 + listeners.forEach(listener -> listener.onServerRemoved(instance));
204 204 break;
205 205 }
206 206 }
... ...
... ... @@ -135,7 +135,7 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService, D
135 135
136 136 private void logCircle() {
137 137 log.trace("Consistent Hash Circle Start");
138   - circle.entrySet().stream().forEach((e) -> log.debug("{} -> {}", e.getKey(), e.getValue().getServerAddress()));
  138 + circle.entrySet().forEach((e) -> log.debug("{} -> {}", e.getKey(), e.getValue().getServerAddress()));
139 139 log.trace("Consistent Hash Circle End");
140 140 }
141 141
... ...
... ... @@ -31,7 +31,6 @@ import org.thingsboard.server.dao.component.ComponentDescriptorService;
31 31 import org.thingsboard.server.extensions.api.component.*;
32 32
33 33 import javax.annotation.PostConstruct;
34   -import java.io.IOException;
35 34 import java.lang.annotation.Annotation;
36 35 import java.util.*;
37 36 import java.util.stream.Collectors;
... ... @@ -72,7 +71,7 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
72 71 }
73 72
74 73 private void registerComponents(Collection<ComponentDescriptor> comps) {
75   - comps.stream().forEach(c -> components.put(c.getClazz(), c));
  74 + comps.forEach(c -> components.put(c.getClazz(), c));
76 75 }
77 76
78 77 private List<ComponentDescriptor> persist(Set<BeanDefinition> filterDefs, ComponentType type) {
... ... @@ -119,7 +118,7 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
119 118 throw new RuntimeException("Plugin " + def.getBeanClassName() + "action " + actionClazz.getName() + " has wrong component type!");
120 119 }
121 120 }
122   - scannedComponent.setActions(Arrays.asList(pluginAnnotation.actions()).stream().map(action -> action.getName()).collect(Collectors.joining(",")));
  121 + scannedComponent.setActions(Arrays.stream(pluginAnnotation.actions()).map(action -> action.getName()).collect(Collectors.joining(",")));
123 122 break;
124 123 default:
125 124 throw new RuntimeException(type + " is not supported yet!");
... ...
... ... @@ -20,9 +20,9 @@ import org.springframework.security.core.authority.SimpleGrantedAuthority;
20 20 import org.thingsboard.server.common.data.User;
21 21 import org.thingsboard.server.common.data.id.UserId;
22 22
23   -import java.util.Arrays;
24 23 import java.util.Collection;
25 24 import java.util.stream.Collectors;
  25 +import java.util.stream.Stream;
26 26
27 27 public class SecurityUser extends User {
28 28
... ... @@ -46,7 +46,7 @@ public class SecurityUser extends User {
46 46
47 47 public Collection<? extends GrantedAuthority> getAuthorities() {
48 48 if (authorities == null) {
49   - authorities = Arrays.asList(SecurityUser.this.getAuthority()).stream()
  49 + authorities = Stream.of(SecurityUser.this.getAuthority())
50 50 .map(authority -> new SimpleGrantedAuthority(authority.name()))
51 51 .collect(Collectors.toList());
52 52 }
... ...
... ... @@ -129,8 +129,10 @@ public abstract class AbstractControllerTest {
129 129 @Autowired
130 130 void setConverters(HttpMessageConverter<?>[] converters) {
131 131
132   - this.mappingJackson2HttpMessageConverter = Arrays.asList(converters).stream().filter(
133   - hmc -> hmc instanceof MappingJackson2HttpMessageConverter).findAny().get();
  132 + this.mappingJackson2HttpMessageConverter = Arrays.stream(converters)
  133 + .filter(hmc -> hmc instanceof MappingJackson2HttpMessageConverter)
  134 + .findAny()
  135 + .get();
134 136
135 137 Assert.assertNotNull("the JSON message converter must not be null",
136 138 this.mappingJackson2HttpMessageConverter);
... ...
... ... @@ -61,8 +61,10 @@ public class AbstractFeatureIntegrationTest {
61 61 @Autowired
62 62 void setConverters(HttpMessageConverter<?>[] converters) {
63 63
64   - this.mappingJackson2HttpMessageConverter = Arrays.asList(converters).stream().filter(
65   - hmc -> hmc instanceof MappingJackson2HttpMessageConverter).findAny().get();
  64 + this.mappingJackson2HttpMessageConverter = Arrays.stream(converters)
  65 + .filter(hmc -> hmc instanceof MappingJackson2HttpMessageConverter)
  66 + .findAny()
  67 + .get();
66 68
67 69 assertNotNull("the JSON message converter must not be null",
68 70 this.mappingJackson2HttpMessageConverter);
... ...
... ... @@ -140,7 +140,7 @@ public class BaseAttributesDao extends AbstractDao implements AttributesDao {
140 140 List<Row> rows = resultSet.all();
141 141 List<AttributeKvEntry> entries = new ArrayList<>(rows.size());
142 142 if (!rows.isEmpty()) {
143   - rows.stream().forEach(row -> {
  143 + rows.forEach(row -> {
144 144 String key = row.getString(ModelConstants.ATTRIBUTE_KEY_COLUMN);
145 145 AttributeKvEntry kvEntry = convertResultToAttributesKvEntry(key, row);
146 146 if (kvEntry != null) {
... ...
... ... @@ -143,7 +143,7 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
143 143 public List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {
144 144 List<TsKvEntry> entries = new ArrayList<>(rows.size());
145 145 if (!rows.isEmpty()) {
146   - rows.stream().forEach(row -> {
  146 + rows.forEach(row -> {
147 147 TsKvEntry kvEntry = convertResultToTsKvEntry(row);
148 148 if (kvEntry != null) {
149 149 entries.add(kvEntry);
... ...
... ... @@ -91,7 +91,7 @@ public class DefaultWebsocketMsgHandler implements WebsocketMsgHandler {
91 91 }
92 92
93 93 public void clear(PluginContext ctx) {
94   - wsSessionsMap.values().stream().forEach(v -> {
  94 + wsSessionsMap.values().forEach(v -> {
95 95 try {
96 96 ctx.close(v.getSessionRef());
97 97 } catch (IOException e) {
... ...
... ... @@ -40,7 +40,9 @@ public class MethodNameFilter extends SimpleRuleLifecycleComponent implements Ru
40 40
41 41 @Override
42 42 public void init(MethodNameFilterConfiguration configuration) {
43   - methods = Arrays.asList(configuration.getMethodNames()).stream().map(m -> m.getName()).collect(Collectors.toSet());
  43 + methods = Arrays.stream(configuration.getMethodNames())
  44 + .map(m -> m.getName())
  45 + .collect(Collectors.toSet());
44 46 }
45 47
46 48 @Override
... ...
... ... @@ -39,7 +39,7 @@ public class MsgTypeFilter extends SimpleRuleLifecycleComponent implements RuleF
39 39
40 40 @Override
41 41 public void init(MsgTypeFilterConfiguration configuration) {
42   - msgTypes = Arrays.asList(configuration.getMessageTypes()).stream().map(type -> {
  42 + msgTypes = Arrays.stream(configuration.getMessageTypes()).map(type -> {
43 43 switch (type) {
44 44 case "GET_ATTRIBUTES":
45 45 return MsgType.GET_ATTRIBUTES_REQUEST;
... ...
... ... @@ -75,7 +75,7 @@ public class MailPlugin extends AbstractPlugin<MailPluginConfiguration> implemen
75 75 if (configuration.getOtherProperties() != null) {
76 76 Properties mailProperties = new Properties();
77 77 configuration.getOtherProperties()
78   - .stream().forEach(p -> mailProperties.put(p.getKey(), p.getValue()));
  78 + .forEach(p -> mailProperties.put(p.getKey(), p.getValue()));
79 79 mail.setJavaMailProperties(mailProperties);
80 80 }
81 81 mailSender = mail;
... ...
... ... @@ -68,7 +68,7 @@ public class SubscriptionManager {
68 68 registerSubscription(sessionId, deviceId, subscription);
69 69 List<TsKvEntry> missedUpdates = new ArrayList<>();
70 70 if (subscription.getType() == SubscriptionType.ATTRIBUTES) {
71   - subscription.getKeyStates().entrySet().stream().forEach(e -> {
  71 + subscription.getKeyStates().entrySet().forEach(e -> {
72 72 Optional<AttributeKvEntry> latestOpt = ctx.loadAttribute(deviceId, DataConstants.CLIENT_SCOPE, e.getKey());
73 73 if (latestOpt.isPresent()) {
74 74 AttributeKvEntry latestEntry = latestOpt.get();
... ...
... ... @@ -97,7 +97,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
97 97 builder.setDeviceId(cmd.getDeviceId().toString());
98 98 builder.setType(cmd.getType().name());
99 99 builder.setAllKeys(cmd.isAllKeys());
100   - cmd.getKeyStates().entrySet().stream().forEach(e -> builder.addKeyStates(SubscriptionKetStateProto.newBuilder().setKey(e.getKey()).setTs(e.getValue()).build()));
  100 + cmd.getKeyStates().entrySet().forEach(e -> builder.addKeyStates(SubscriptionKetStateProto.newBuilder().setKey(e.getKey()).setTs(e.getValue()).build()));
101 101 ctx.sendPluginRpcMsg(new RpcMsg(address, SUBSCRIPTION_CLAZZ, builder.build().toByteArray()));
102 102 }
103 103
... ... @@ -144,7 +144,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
144 144 if (update.getErrorMsg() != null) {
145 145 builder.setErrorMsg(update.getErrorMsg());
146 146 }
147   - update.getData().entrySet().stream().forEach(
  147 + update.getData().entrySet().forEach(
148 148 e -> {
149 149 SubscriptionUpdateValueListProto.Builder dataBuilder = SubscriptionUpdateValueListProto.newBuilder();
150 150
... ... @@ -166,7 +166,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
166 166 return new SubscriptionUpdate(proto.getSubscriptionId(), SubscriptionErrorCode.forCode(proto.getErrorCode()), proto.getErrorMsg());
167 167 } else {
168 168 Map<String, List<Object>> data = new TreeMap<>();
169   - proto.getDataList().stream().forEach(v -> {
  169 + proto.getDataList().forEach(v -> {
170 170 List<Object> values = data.get(v.getKey());
171 171 if (values == null) {
172 172 values = new ArrayList<>();
... ...
... ... @@ -109,8 +109,8 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
109 109 sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
110 110
111 111 Map<String, Long> subState = new HashMap<>(keys.size());
112   - keys.stream().forEach(key -> subState.put(key, 0L));
113   - attributesData.stream().forEach(v -> subState.put(v.getKey(), v.getTs()));
  112 + keys.forEach(key -> subState.put(key, 0L));
  113 + attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
114 114
115 115 sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, false, subState);
116 116 } else {
... ... @@ -119,7 +119,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
119 119 sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
120 120
121 121 Map<String, Long> subState = new HashMap<>(attributesData.size());
122   - attributesData.stream().forEach(v -> subState.put(v.getKey(), v.getTs()));
  122 + attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
123 123
124 124 sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, true, subState);
125 125 }
... ... @@ -154,8 +154,8 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
154 154 sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
155 155
156 156 Map<String, Long> subState = new HashMap<>(keys.size());
157   - keys.stream().forEach(key -> subState.put(key, startTs));
158   - data.stream().forEach(v -> subState.put(v.getKey(), v.getTs()));
  157 + keys.forEach(key -> subState.put(key, startTs));
  158 + data.forEach(v -> subState.put(v.getKey(), v.getTs()));
159 159 SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.TIMESERIES, false, subState);
160 160 subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
161 161 } else {
... ... @@ -168,8 +168,8 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
168 168 sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
169 169
170 170 Map<String, Long> subState = new HashMap<>(keys.size());
171   - keys.stream().forEach(key -> subState.put(key, startTs));
172   - data.stream().forEach(v -> subState.put(v.getKey(), v.getTs()));
  171 + keys.forEach(key -> subState.put(key, startTs));
  172 + data.forEach(v -> subState.put(v.getKey(), v.getTs()));
173 173 SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.TIMESERIES, false, subState);
174 174 subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
175 175 }
... ... @@ -188,7 +188,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
188 188 public void onSuccess(PluginContext ctx, List<TsKvEntry> data) {
189 189 sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
190 190 Map<String, Long> subState = new HashMap<>(data.size());
191   - data.stream().forEach(v -> subState.put(v.getKey(), v.getTs()));
  191 + data.forEach(v -> subState.put(v.getKey(), v.getTs()));
192 192 SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.TIMESERIES, true, subState);
193 193 subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
194 194 }
... ...
... ... @@ -47,7 +47,7 @@ public class KafkaPlugin extends AbstractPlugin<KafkaPluginConfiguration> {
47 47 properties.put("buffer.memory", configuration.getBufferMemory());
48 48 if (configuration.getOtherProperties() != null) {
49 49 configuration.getOtherProperties()
50   - .stream().forEach(p -> properties.put(p.getKey(), p.getValue()));
  50 + .forEach(p -> properties.put(p.getKey(), p.getValue()));
51 51 }
52 52 init();
53 53 }
... ...