Commit 0e2dd427 authored by LAVENIER's avatar LAVENIER
Browse files

[enh] Add extraction job scheduling

[enh] Add ProcessingFrequencyEnum.HOURLY
[fix] Fix ExtractionProduct fetch error, on Oracle database
parent 3089771d
......@@ -30,6 +30,7 @@ import net.sumaris.core.extraction.exception.UnknownFormatException;
import net.sumaris.core.extraction.format.ProductFormatEnum;
import net.sumaris.core.extraction.service.AggregationService;
import net.sumaris.core.extraction.service.ExtractionProductService;
import net.sumaris.core.extraction.service.ExtractionServiceLocator;
import net.sumaris.core.extraction.vo.AggregationTypeVO;
import net.sumaris.core.model.referential.StatusEnum;
import net.sumaris.core.model.technical.extraction.IExtractionFormat;
......@@ -45,15 +46,15 @@ import java.util.stream.Collectors;
*
*/
@Slf4j
public class AgggregationAction {
public class AggregationAction {
/**
* <p>run.</p>
*/
public void run() {
ExtractionConfiguration config = ExtractionConfiguration.instance();
AggregationService aggregationService = ServiceLocator.instance().getService("aggregationService", AggregationService.class);
ExtractionProductService productService = ServiceLocator.instance().getService("extractionProductService", ExtractionProductService.class);
ExtractionProductService productService = ExtractionServiceLocator.extractionProductService();
AggregationService aggregationService = ExtractionServiceLocator.aggregationService();
String formatLabel = config.getExtractionCliOutputFormat();
AggregationTypeVO type = null;
......
......@@ -28,7 +28,10 @@ import lombok.extern.slf4j.Slf4j;
import net.sumaris.cli.action.ActionUtils;
import net.sumaris.core.extraction.config.ExtractionConfiguration;
import net.sumaris.core.extraction.exception.UnknownFormatException;
import net.sumaris.core.extraction.service.AggregationService;
import net.sumaris.core.extraction.service.ExtractionProductService;
import net.sumaris.core.extraction.service.ExtractionService;
import net.sumaris.core.extraction.service.ExtractionServiceLocator;
import net.sumaris.core.extraction.util.ExtractionFormats;
import net.sumaris.core.extraction.vo.ExtractionTypeVO;
import net.sumaris.core.model.technical.extraction.IExtractionFormat;
......@@ -52,7 +55,7 @@ public class ExtractionAction {
*/
public void run() {
ExtractionConfiguration config = ExtractionConfiguration.instance();
ExtractionService service = ServiceLocator.instance().getService("extractionService", ExtractionService.class);
ExtractionService service = ExtractionServiceLocator.extractionService();
String formatLabel = config.getExtractionCliOutputFormat();
IExtractionFormat format = null;
......
package net.sumaris.core.extraction.action;
/*
* #%L
* SIH-Adagio :: Shared
* $Id:$
* $HeadURL:$
* %%
* Copyright (C) 2012 - 2014 Ifremer
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import lombok.extern.slf4j.Slf4j;
import net.sumaris.core.extraction.config.ExtractionConfiguration;
import net.sumaris.core.model.technical.history.ProcessingFrequencyEnum;
import net.sumaris.server.scheduler.ExtractionJob;
/**
* <p>DatabaseChangeLogAction class.</p>
*
*/
@Slf4j
public class ExtractionProductUpdateAction {
/**
* <p>Update a product (execute extraction or aggregation).</p>
*/
public void run() throws InterruptedException {
// Create the job
ExtractionJob job = new ExtractionJob();
// Run it !
ProcessingFrequencyEnum frequency = ExtractionConfiguration.instance().getExtractionCliFrequency();
job.execute(frequency);
// Waiting 10s, to let DB drop tables (asynchronously)
Thread.sleep(10000);
}
}
......@@ -24,9 +24,9 @@ package net.sumaris.core.extraction.config;
* #L%
*/
import net.sumaris.core.extraction.action.AgggregationAction;
import net.sumaris.core.extraction.action.AggregationAction;
import net.sumaris.core.extraction.action.ExtractionAction;
import net.sumaris.core.extraction.action.ExtractionProductAction;
import net.sumaris.core.extraction.action.ExtractionProductUpdateAction;
import org.nuiton.config.ConfigActionDef;
/**
......@@ -37,8 +37,8 @@ import org.nuiton.config.ConfigActionDef;
public enum ExtractionConfigurationAction implements ConfigActionDef {
EXTRACTION(ExtractionAction.class.getName() + "#run", "Execute an extraction", "--extraction"),
AGGREGATION(AgggregationAction.class.getName() + "#run", "Execute an aggregation", "--aggregation"),
EXTRACTION_PRODUCT_UPDATE(ExtractionProductAction.class.getName() + "#update", "Update extraction products", "--extraction-product-update");
AGGREGATION(AggregationAction.class.getName() + "#run", "Execute an aggregation", "--aggregation"),
EXTRACTION_PRODUCT_UPDATE(ExtractionProductUpdateAction.class.getName() + "#run", "Update extraction products", "--extraction-product-update");
private final String action;
private final String description;
......
/*
* #%L
* SUMARiS
* %%
* Copyright (C) 2019 SUMARiS Consortium
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
package net.sumaris.core.extraction.service;
import net.sumaris.core.service.ServiceLocator;
public class ExtractionServiceLocator {
public static ExtractionProductService extractionProductService() {
return ServiceLocator.instance().getService("extractionProductService", ExtractionProductService.class);
}
public static AggregationService aggregationService() {
return ServiceLocator.instance().getService("aggregationService", AggregationService.class);
}
public static ExtractionService extractionService() {
return ServiceLocator.instance().getService("extractionService", ExtractionService.class);
}
}
......@@ -22,6 +22,7 @@
package net.sumaris.server.config;
import net.sumaris.core.model.technical.history.ProcessingFrequencyEnum;
import net.sumaris.server.http.ExtractionRestController;
import net.sumaris.server.http.ExtractionRestPaths;
import org.slf4j.Logger;
......@@ -31,6 +32,9 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.web.servlet.DispatcherServlet;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.PathMatchConfigurer;
......@@ -38,6 +42,8 @@ import org.springframework.web.servlet.config.annotation.ViewControllerRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import javax.servlet.Servlet;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@Configuration
@ConditionalOnClass({Servlet.class, DispatcherServlet.class})
......@@ -47,6 +53,7 @@ import javax.servlet.Servlet;
name = {"enabled"},
matchIfMissing = true
)
@EnableScheduling
public class ExtractionWebAutoConfiguration {
/**
* Logger.
......@@ -94,4 +101,24 @@ public class ExtractionWebAutoConfiguration {
}
};
}
@Bean
@ConditionalOnProperty(
prefix = "sumaris.extraction.scheduling",
name = {"enabled"},
matchIfMissing = true
)
public SchedulingConfigurer schedulingConfigurer() {
return new SchedulingConfigurer() {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setScheduler(extractionTaskExecutor());
}
};
}
@Bean
public Executor extractionTaskExecutor() {
return Executors.newScheduledThreadPool((ProcessingFrequencyEnum.values().length - 1) * 2);
}
}
package net.sumaris.core.extraction.action;
/*
* #%L
* SIH-Adagio :: Shared
* $Id:$
* $HeadURL:$
* SUMARiS
* %%
* Copyright (C) 2012 - 2014 Ifremer
* Copyright (C) 2019 SUMARiS Consortium
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
package net.sumaris.server.scheduler;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import net.sumaris.core.event.config.ConfigurationReadyEvent;
import net.sumaris.core.extraction.config.ExtractionConfiguration;
import net.sumaris.core.extraction.service.AggregationService;
import net.sumaris.core.extraction.service.ExtractionProductService;
import net.sumaris.core.extraction.service.ExtractionService;
import net.sumaris.core.extraction.service.ExtractionServiceLocator;
import net.sumaris.core.model.referential.StatusEnum;
import net.sumaris.core.model.technical.extraction.ExtractionProduct;
import net.sumaris.core.model.technical.history.ProcessingFrequency;
import net.sumaris.core.model.technical.history.ProcessingFrequencyEnum;
import net.sumaris.core.service.ServiceLocator;
import net.sumaris.core.vo.technical.extraction.ExtractionProductFetchOptions;
import net.sumaris.core.vo.technical.extraction.ExtractionProductFilterVO;
import net.sumaris.core.vo.technical.extraction.ExtractionProductVO;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/**
* <p>DatabaseChangeLogAction class.</p>
*
*/
@Component
@Slf4j
public class ExtractionProductAction {
public class ExtractionJob {
/**
* <p>Update a product (execute extraction or aggregation).</p>
*/
public void update() throws InterruptedException {
// Get beans
ExtractionConfiguration config = ExtractionConfiguration.instance();
ExtractionProductService productService = ServiceLocator.instance().getService("extractionProductService", ExtractionProductService.class);
ExtractionService extractionService = ServiceLocator.instance().getService("extractionService", ExtractionService.class);
AggregationService aggregationService = ServiceLocator.instance().getService("aggregationService", AggregationService.class);
private ExtractionConfiguration config;
private ExtractionProductService productService;
private AggregationService aggregationService;
private boolean ready = false;
// Get execution frequency
ProcessingFrequencyEnum frequency = config.getExtractionCliFrequency();
@Autowired
public ExtractionJob(ExtractionConfiguration config,
ExtractionProductService productService,
AggregationService aggregationService) {
super();
this.config = config;
this.productService = productService;
this.aggregationService = aggregationService;
}
public ExtractionJob() {
super();
this.config = ExtractionConfiguration.instance();
this.productService = ExtractionServiceLocator.extractionProductService();
this.aggregationService = ExtractionServiceLocator.aggregationService();
this.ready = true;
}
public void execute(@NonNull ProcessingFrequencyEnum frequency) {
if (!ready) return; // SKip
if (frequency == ProcessingFrequencyEnum.NEVER) {
log.error("Products with frequency '{}' cannot be updated!", frequency);
throw new IllegalArgumentException(String.format("Cannot update products with frequency '%s'", frequency));
}
long now = System.currentTimeMillis();
log.info("Updating products... {frequency: '{}'}", frequency);
log.info("Updating {} extractions...", frequency.name().toLowerCase());
// Get products to refresh
List<ExtractionProductVO> products = productService.findByFilter(ExtractionProductFilterVO.builder()
......@@ -76,23 +91,76 @@ public class ExtractionProductAction {
.searchJoin(ExtractionProduct.Fields.PROCESSING_FREQUENCY)
.searchAttribute(ProcessingFrequency.Fields.LABEL)
.searchText(frequency.getLabel())
.build(),
ExtractionProductFetchOptions.builder().build());
.build(),
ExtractionProductFetchOptions.MINIMAL);
if (CollectionUtils.isEmpty(products)) {
log.info("No product found.");
log.info("No {} extraction found.", frequency.name().toLowerCase());
return;
}
int successCount = 0;
int errorCount = 0;
for (ExtractionProductVO product: products) {
log.info("Updating product {{}}...", product.getLabel());
try {
log.debug("Updating extraction {id: {}, label: '{}'}...", product.getId(), product.getLabel());
aggregationService.updateProduct(product.getId());
successCount++;
}
catch(Throwable e) {
log.error("Error while updating extraction {id: {}, label: '{}'}: {}", product.getId(), product.getLabel(), e.getMessage(), e);
errorCount++;
aggregationService.updateProduct(product.getId());
Thread.sleep(10000); // Waiting 10s, to let DB drop tables (asynchronously)
// TODO: add to processing history
}
}
log.info("Updating products... {frequency: '{}'}", frequency);
log.info("Updating {} extractions [OK] in {}ms (success: {}, errors: {})",
frequency.name().toLowerCase(),
System.currentTimeMillis() - now,
successCount, errorCount);
}
/* -- protected functions -- */
@EventListener({ConfigurationReadyEvent.class})
protected void onConfigurationReady(ConfigurationReadyEvent event) {
if (!this.ready) {
// Load started
log.info("Started extraction jobs, with frequency {{}}",
Arrays.stream(ProcessingFrequencyEnum.values())
.filter(e -> e != ProcessingFrequencyEnum.MANUALLY && e != ProcessingFrequencyEnum.NEVER)
.map(Enum::name)
.collect(Collectors.joining(",")));
this.ready = true;
}
}
@Scheduled(cron = "${sumaris.extraction.scheduling.hourly.cron:0 0 * * * ?}")
@Async
protected void executeHourly(){
execute(ProcessingFrequencyEnum.HOURLY);
}
@Scheduled(cron = "${sumaris.extraction.scheduling.daily.cron:0 0 0 * * ?}")
@Async
protected void executeDaily(){
execute(ProcessingFrequencyEnum.DAILY);
}
@Scheduled(cron = "${sumaris.extraction.scheduling.daily.cron:0 0 0 2 * MON}")
@Async
protected void executeWeekly(){
execute(ProcessingFrequencyEnum.WEEKLY);
}
@Scheduled(cron = "${sumaris.extraction.scheduling.hourly.cron:0 0 0 1 * ?}")
@Async
protected void executeMonthly(){
execute(ProcessingFrequencyEnum.MONTHLY);
}
}
}
\ No newline at end of file
......@@ -138,6 +138,18 @@
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<!-- Database drivers -->
<dependency>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc8</artifactId>
<scope>provided</scope>
</dependency>
<!-- Unit test -->
<dependency>
<groupId>net.sumaris</groupId>
......
# Oracle configuration
spring.datasource.driver-class-name=oracle.jdbc.driver.OracleDriver
spring.datasource.hikari.connectionTestQuery=SELECT 1 FROM STATUS WHERE ID=1
spring.datasource.hikari.initializationFailTimeout=-1
spring.jpa.database-platform=org.hibernate.spatial.dialect.oracle.OracleSpatial10gDialect
spring.jpa.properties.hibernate.dialect=${spring.jpa.database-platform}
# Database connection
spring.datasource.platform=oracle
spring.datasource.username=<USERNAME>
spring.datasource.password=
spring.jpa.properties.hibernate.default_catalog=
spring.jpa.properties.hibernate.default_schema=${spring.datasource.username}
spring.datasource.url=jdbc:oracle:thin:@localhost:1521:ORCL
spring.jpa.hibernate.use-new-id-generator-mappings=false
sumaris.cache.directory=${user.home}/.sumaris/data/cache
# Configure search mode (e.g. 'FRENCH_AI', 'SPANISH_AI')
spring.datasource.hikari.connectionInitSql=BEGIN \
DBMS_SESSION.SET_NLS('NLS_SORT', 'FRENCH_AI'); \
DBMS_SESSION.SET_NLS('NLS_COMP', 'LINGUISTIC'); \
END;
# Mandatory for Oracle with sequences incrementing with 1
sumaris.persistence.sequence.increment=1
# Override enumerations (User Profile labels, etc.)
sumaris.enumeration.LocationLevel.COUNTRY.id=21
sumaris.enumeration.LocationLevel.HARBOUR.id=6
sumaris.enumeration.LocationLevel.AUCTION.id=7
\ No newline at end of file
......@@ -57,19 +57,15 @@ public interface ReferentialSpecifications<E extends IReferentialWithStatusEntit
default Specification<E> hasId(Integer id) {
if (id == null) return null;
BindableSpecification<E> specification = BindableSpecification.where((root, query, criteriaBuilder) -> {
query.distinct(true); // Set distinct here because inStatusIds is always used (usually ...)
return BindableSpecification.<E>where((root, query, criteriaBuilder) -> {
ParameterExpression<Integer> idParam = criteriaBuilder.parameter(Integer.class, ID_PARAMETER);
return criteriaBuilder.equal(root.get(IEntity.Fields.ID), idParam);
});
specification.addBind(ID_PARAMETER, id);
return specification;
}).addBind(ID_PARAMETER, id);
}
default Specification<E> inStatusIds(IReferentialFilter filter) {
Integer[] statusIds = filter.getStatusIds();
return BindableSpecification.<E>where((root, query, criteriaBuilder) -> {
query.distinct(true); // Set distinct here because inStatusIds is always used (usually ...)
ParameterExpression<Collection> statusParam = criteriaBuilder.parameter(Collection.class, STATUS_PARAMETER);
ParameterExpression<Boolean> statusSetParam = criteriaBuilder.parameter(Boolean.class, STATUS_SET_PARAMETER);
return criteriaBuilder.or(
......@@ -113,14 +109,12 @@ public interface ReferentialSpecifications<E extends IReferentialWithStatusEntit
// If empty: skip to avoid an unused join
if (ArrayUtils.isEmpty(levelLabels)) return null;
return ReferentialEntities.getLevelPropertyNameByClass(entityClass).map(levelPropertyName -> {
BindableSpecification<E> specification = BindableSpecification.where((root, query, criteriaBuilder) -> {
ParameterExpression<Collection> levelParam = criteriaBuilder.parameter(Collection.class, LEVEL_LABEL_PARAMETER);
return criteriaBuilder.in(root.join(levelPropertyName, JoinType.INNER).get(IItemReferentialEntity.Fields.LABEL)).value(levelParam);
});
specification.addBind(LEVEL_LABEL_PARAMETER, Arrays.asList(levelLabels));
return specification;
})
return ReferentialEntities.getLevelPropertyNameByClass(entityClass).map(levelPropertyName ->
BindableSpecification.<E>where((root, query, criteriaBuilder) -> {
ParameterExpression<Collection> levelParam = criteriaBuilder.parameter(Collection.class, LEVEL_LABEL_PARAMETER);
return criteriaBuilder.in(root.join(levelPropertyName, JoinType.INNER).get(IItemReferentialEntity.Fields.LABEL)).value(levelParam);
}).addBind(LEVEL_LABEL_PARAMETER, Arrays.asList(levelLabels))
)
.orElse(null);
}
......@@ -150,7 +144,7 @@ public interface ReferentialSpecifications<E extends IReferentialWithStatusEntit
));
return criteriaBuilder.or(
// all predicates
predicates.toArray(new Predicate[0])
predicates.toArray(new Predicate[predicates.size()])
);
}
// Search on label+name only
......@@ -169,14 +163,10 @@ public interface ReferentialSpecifications<E extends IReferentialWithStatusEntit
ParameterExpression<String> searchTextParam = criteriaBuilder.parameter(String.class, SEARCH_TEXT_PARAMETER);
// Avoid duplication, for 'one to many' join
query.distinct(true);
query.distinct(shouldQueryDistinct(joinProperty));
// Get the class join, using properties
String[] joinProperties = joinProperty.split("[./]");
Join<Object, Object> join = root.join(joinProperties[0], JoinType.INNER);
for(int i = 1; i < joinProperties.length; i++) {
join = join.join(joinProperties[i], JoinType.INNER);
}
Join<Object, Object> join = Daos.composeJoin(root, joinProperty, JoinType.INNER);
// Search on given attribute
if (StringUtils.isNotBlank(searchAttribute)) {
......@@ -215,4 +205,7 @@ public interface ReferentialSpecifications<E extends IReferentialWithStatusEntit
.addBind(EXCLUDED_IDS_PARAMETER, Arrays.asList(excludedIds));
}
default boolean shouldQueryDistinct(String joinProperty) {
return true;
}
}
......@@ -24,6 +24,7 @@ package net.sumaris.core.dao.technical.extraction;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import lombok.NonNull;
import net.sumaris.core.dao.administration.user.DepartmentRepository;
import net.sumaris.core.dao.administration.user.PersonRepository;
import net.sumaris.core.config.CacheConfiguration;
......@@ -89,19 +90,36 @@ public class ExtractionProductRepositoryImpl
.and(withRecorderDepartmentId(filter.getRecorderDepartmentId()));