Commit 79fe0912 authored by LAVENIER's avatar LAVENIER
Browse files

[fix] Program: do not fetch properties as eager, but as lazy

[fix] Websocket subscription: manage many subscriptions (close all when session closed)
parent 9242e8a5
......@@ -48,24 +48,24 @@ public interface ProgramSpecifications {
String UPDATE_DATE_GREATER_THAN_PARAM = "updateDateGreaterThan";
default Specification<Program> hasProperty(String propertyLabel) {
BindableSpecification<Program> specification = BindableSpecification.where((root, query, criteriaBuilder) -> {
if (propertyLabel == null) return null;
return BindableSpecification.where((root, query, criteriaBuilder) -> {
ParameterExpression<String> param = criteriaBuilder.parameter(String.class, PROPERTY_LABEL_PARAM);
return criteriaBuilder.or(
criteriaBuilder.isNull(param),
criteriaBuilder.equal(root.join(Program.Fields.PROPERTIES, JoinType.LEFT).get(ProgramProperty.Fields.LABEL), param)
);
});
specification.addBind(PROPERTY_LABEL_PARAM, propertyLabel);
return specification;
})
.addBind(PROPERTY_LABEL_PARAM, propertyLabel);
}
default Specification<Program> newerThan(Date updateDate) {
BindableSpecification<Program> specification = BindableSpecification.where((root, query, criteriaBuilder) -> {
if (updateDate == null) return null;
return BindableSpecification.where((root, query, criteriaBuilder) -> {
ParameterExpression<Date> updateDateParam = criteriaBuilder.parameter(Date.class, UPDATE_DATE_GREATER_THAN_PARAM);
return criteriaBuilder.greaterThan(root.get(Program.Fields.UPDATE_DATE), updateDateParam);
});
specification.addBind(UPDATE_DATE_GREATER_THAN_PARAM, updateDate);
return specification;
})
.addBind(UPDATE_DATE_GREATER_THAN_PARAM, updateDate);
}
Optional<ProgramVO> findIfNewerByLabel(String label, Date updateDate, ProgramFetchOptions fetchOptions);
......
......@@ -79,7 +79,7 @@ public class Program implements IItemReferentialEntity {
@Cascade(org.hibernate.annotations.CascadeType.DELETE)
private List<Strategy> strategies = new ArrayList<>();
@OneToMany(fetch = FetchType.EAGER, targetEntity = ProgramProperty.class, mappedBy = ProgramProperty.Fields.PROGRAM)
@OneToMany(fetch = FetchType.LAZY, targetEntity = ProgramProperty.class, mappedBy = ProgramProperty.Fields.PROGRAM)
@Cascade(org.hibernate.annotations.CascadeType.DELETE)
private List<ProgramProperty> properties = new ArrayList<>();
......
......@@ -24,6 +24,8 @@ package net.sumaris.server.http.graphql;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import graphql.ExecutionInput;
import graphql.ExecutionResult;
import graphql.GraphQL;
......@@ -56,17 +58,16 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public class SubscriptionWebSocketHandler extends TextWebSocketHandler {
private final AtomicReference<Subscription> subscriptionRef = new AtomicReference<>();
private final boolean debug;
private List<WebSocketSession> sessions = new CopyOnWriteArrayList();
private Map<String, List<Subscription>> subscriptionsBySessionId = Maps.newConcurrentMap();
private GraphQL graphQL;
@Autowired
......@@ -92,7 +93,9 @@ public class SubscriptionWebSocketHandler extends TextWebSocketHandler {
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
sessions.remove(session);
if (subscriptionRef.get() != null) subscriptionRef.get().cancel();
// Closing session's subscriptions
closeSubscriptions(session.getId());
}
@Override
......@@ -113,7 +116,8 @@ public class SubscriptionWebSocketHandler extends TextWebSocketHandler {
handleInitConnection(session, request);
}
else if ("stop".equals(type)) {
if (subscriptionRef.get() != null) subscriptionRef.get().cancel();
// Closing session's subscriptions
closeSubscriptions(session.getId());
}
else if ("start".equals(type)) {
handleStartConnection(session, request);
......@@ -173,6 +177,7 @@ public class SubscriptionWebSocketHandler extends TextWebSocketHandler {
return;
}
final String sessionId = session.getId();
String query = Objects.toString(payload.get("query"));
ExecutionResult executionResult = graphQL.execute(ExecutionInput.newExecutionInput()
.query(query)
......@@ -196,8 +201,7 @@ public class SubscriptionWebSocketHandler extends TextWebSocketHandler {
stream.subscribe(new Subscriber<ExecutionResult>() {
@Override
public void onSubscribe(Subscription subscription) {
subscriptionRef.set(subscription);
if (subscriptionRef.get() != null) subscriptionRef.get().request(1);
addSubscription(sessionId, subscription);
}
@Override
......@@ -207,8 +211,7 @@ public class SubscriptionWebSocketHandler extends TextWebSocketHandler {
"type", "data",
"payload", GraphQLHelper.processExecutionResult(result))
);
if (subscriptionRef.get() != null) subscriptionRef.get().request(1);
requestSubscription(sessionId, 1);
}
@Override
......@@ -256,4 +259,26 @@ public class SubscriptionWebSocketHandler extends TextWebSocketHandler {
protected String getUnauthorizedErrorString() {
return GraphQLHelper.toJsonErrorString(ErrorCodes.UNAUTHORIZED, "Authentication required");
}
protected void closeSubscriptions(String sessionId) {
// Closing session's subscriptions
List<Subscription> subscriptions = subscriptionsBySessionId.get(sessionId);
if (subscriptions != null) {
subscriptions.forEach(Subscription::cancel);
}
}
protected void addSubscription(String sessionId, Subscription subscription) {
// Closing session's subscriptions
List<Subscription> subscriptions = subscriptionsBySessionId.computeIfAbsent(sessionId, k -> Lists.newCopyOnWriteArrayList());
subscriptions.add(subscription);
}
protected void requestSubscription(String sessionId, int l) {
// Closing session's subscriptions
List<Subscription> subscriptions = subscriptionsBySessionId.get(sessionId);
if (subscriptions != null) {
subscriptions.forEach(s -> s.request(l));
}
}
}
......@@ -76,7 +76,7 @@ public class AuthenticationFilter extends AbstractAuthenticationProcessingFilter
// When not ready, always auth as anonymous
if (!this.ready) {
getAuthenticationManager().authenticate(new UsernamePasswordAuthenticationToken(AnonymousUserDetails.TOKEN, AnonymousUserDetails.TOKEN));
return null;
}
String authorization = request.getHeader(AUTHORIZATION);
......
......@@ -207,13 +207,15 @@ public class ChangesPublisherServiceImpl implements ChangesPublisherService {
// Create stop event, after a too long delay (to be sure old publisher are closed)
Observable stop = Observable.just(Boolean.TRUE).delay(1, TimeUnit.HOURS);
stop.subscribe(o -> log.debug(String.format("Closing publisher on %s #%s: max time reached. (total publishers: %s)", entityClass.getSimpleName(), id, publisherCount.get() - 1)));
stop.subscribe(o -> log.debug("Closing publisher on {} #{}: max time reached. (total publishers: {})", entityClass.getSimpleName(), id, publisherCount.get() - 1));
Observable<V> observable = Observable
.interval(intervalInSecond, TimeUnit.SECONDS)
.takeUntil(stop)
.observeOn(Schedulers.io())
.flatMap(n -> {
log.debug("Refreshing {} #{}", entityClass.getSimpleName(), id);
// Try to find a newer bean
V newerVOOrNull = self.getIfNewer(entityClass, targetClass, id, lastUpdateDate.getTime());
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment