Commit 058b382d authored by LAVENIER's avatar LAVENIER
Browse files

Merge branch 'release/1.8.5'

parents aca0afa8 91e3bfcc
......@@ -3,7 +3,7 @@
<groupId>net.sumaris</groupId>
<artifactId>sumaris-pod</artifactId>
<version>1.8.4</version>
<version>1.8.5</version>
<packaging>pom</packaging>
<name>SUMARiS</name>
<description>SUMARiS :: Maven parent</description>
......
......@@ -5,7 +5,7 @@
<parent>
<groupId>net.sumaris</groupId>
<artifactId>sumaris-pod</artifactId>
<version>1.8.4</version>
<version>1.8.5</version>
</parent>
<artifactId>sumaris-core-extraction</artifactId>
......
......@@ -76,6 +76,10 @@ public class ExtractionConfiguration {
}
}
public String getJdbcURL() {
return delegate.getJdbcURL();
}
public boolean enableExtractionProduct() {
return getApplicationConfig().getOptionAsBoolean(ExtractionConfigurationOption.EXTRACTION_PRODUCT_ENABLE.getKey());
}
......@@ -92,4 +96,16 @@ public class ExtractionConfiguration {
return delegate.getApplicationConfig();
}
/**
* Extraction query timeout, in millisecond
* @return
*/
public int getExtractionQueryTimeout() {
return getApplicationConfig().getOptionAsInt(ExtractionConfigurationOption.EXTRACTION_QUERY_TIMEOUT.getKey());
}
public String getCsvSeparator() {
return delegate.getCsvSeparator();
}
}
......@@ -72,6 +72,13 @@ public enum ExtractionConfigurationOption implements ConfigOptionDef {
n("sumaris.config.option.extraction.product.enable.description"),
Boolean.FALSE.toString(),
Boolean.class,
false),
EXTRACTION_QUERY_TIMEOUT(
"sumaris.extraction.query.timeout",
n("sumaris.config.option.extraction.query.timeout.description"),
String.valueOf(5 * 60 * 1000), // 5min
Integer.class,
false)
;
......
......@@ -26,13 +26,13 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import net.sumaris.core.config.SumarisConfiguration;
import net.sumaris.core.dao.technical.Daos;
import net.sumaris.core.dao.technical.DatabaseType;
import net.sumaris.core.dao.technical.hibernate.HibernateDaoSupport;
import net.sumaris.core.dao.technical.schema.SumarisDatabaseMetadata;
import net.sumaris.core.dao.technical.schema.SumarisTableMetadata;
import net.sumaris.core.exception.SumarisTechnicalException;
import net.sumaris.core.extraction.config.ExtractionConfiguration;
import net.sumaris.core.extraction.dao.technical.schema.SumarisTableMetadatas;
import net.sumaris.core.extraction.vo.ExtractionContextVO;
import net.sumaris.core.extraction.vo.ExtractionFilterVO;
......@@ -41,9 +41,12 @@ import net.sumaris.core.service.referential.ReferentialService;
import net.sumaris.core.util.StringUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.hibernate.dialect.Dialect;
import org.hibernate.query.internal.NativeQueryImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.dao.DataRetrievalFailureException;
import org.springframework.orm.hibernate5.SessionHolder;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import javax.annotation.PostConstruct;
import javax.persistence.Query;
......@@ -62,7 +65,7 @@ public abstract class ExtractionBaseDaoImpl extends HibernateDaoSupport {
protected static final String XSL_ORACLE_FILENAME = "xmlQuery/queryOracle.xsl";
@Autowired
protected SumarisConfiguration configuration;
protected ExtractionConfiguration configuration;
@Autowired
protected ReferentialService referentialService;
......@@ -73,15 +76,17 @@ public abstract class ExtractionBaseDaoImpl extends HibernateDaoSupport {
@Autowired
protected SumarisDatabaseMetadata databaseMetadata;
protected DatabaseType databaseType = null;
protected String dropTableQuery;
protected int hibernateQueryTimeout;
@PostConstruct
public void init() {
this.databaseType = Daos.getDatabaseType(configuration.getJdbcURL());
this.dropTableQuery = getDialect().getDropTableString("%s");
this.hibernateQueryTimeout = Math.max(1, Math.round(configuration.getExtractionQueryTimeout() / 1000));
}
protected Dialect getDialect() {
......@@ -103,19 +108,19 @@ public abstract class ExtractionBaseDaoImpl extends HibernateDaoSupport {
@SuppressWarnings("unchecked")
protected <R> List<R> query(String query, Class<R> jdbcClass) {
Query nativeQuery = getEntityManager().createNativeQuery(query);
Query nativeQuery = createNativeQuery(query);
Stream<R> resultStream = (Stream<R>) nativeQuery.getResultStream().map(jdbcClass::cast);
return resultStream.collect(Collectors.toList());
}
protected <R> List<R> query(String query, Function<Object[], R> rowMapper) {
Query nativeQuery = getEntityManager().createNativeQuery(query);
Query nativeQuery = createNativeQuery(query);
Stream<Object[]> resultStream = (Stream<Object[]>) nativeQuery.getResultStream();
return resultStream.map(rowMapper).collect(Collectors.toList());
}
protected <R> List<R> query(String query, Function<Object[], R> rowMapper, int offset, int size) {
Query nativeQuery = getEntityManager().createNativeQuery(query)
Query nativeQuery = createNativeQuery(query)
.setFirstResult(offset)
.setMaxResults(size);
Stream<Object[]> resultStream = (Stream<Object[]>) nativeQuery.getResultStream();
......@@ -125,7 +130,7 @@ public abstract class ExtractionBaseDaoImpl extends HibernateDaoSupport {
protected int queryUpdate(String query) {
if (log.isDebugEnabled()) log.debug("execute update: " + query);
Query nativeQuery = getEntityManager().createNativeQuery(query);
Query nativeQuery = createNativeQuery(query);
return nativeQuery.executeUpdate();
}
......@@ -154,7 +159,7 @@ public abstract class ExtractionBaseDaoImpl extends HibernateDaoSupport {
protected long queryCount(String query) {
if (log.isDebugEnabled()) log.debug("aggregate: " + query);
Query nativeQuery = getEntityManager().createNativeQuery(query);
Query nativeQuery = createNativeQuery(query);
Object result = nativeQuery.getSingleResult();
if (result == null)
throw new DataRetrievalFailureException(String.format("query count result is null.\nquery: %s", query));
......@@ -237,4 +242,19 @@ public abstract class ExtractionBaseDaoImpl extends HibernateDaoSupport {
queryUpdate(sql);
});
}
/**
* Create a native query, with the timeout for extraction (should b longer than the default timeout)
* @param sql
* @return
*/
protected Query createNativeQuery(String sql) {
Query query = getEntityManager().createNativeQuery(sql);
// Set the query timeout (in seconds)
query.unwrap(org.hibernate.query.Query.class)
.setTimeout(this.hibernateQueryTimeout);
return query;
}
}
......@@ -62,7 +62,7 @@ public interface AggregationService {
* @param productId
*/
@Transactional
void updateProduct(int productId);
AggregationTypeVO updateProduct(int productId);
/**
* Do an aggregate
......@@ -113,11 +113,11 @@ public interface AggregationService {
@Nullable String sort,
@Nullable SortDirection direction);
@Transactional
@Transactional(timeout = 10000000)
AggregationTypeVO save(AggregationTypeVO type, @Nullable ExtractionFilterVO filter);
@Async
@Transactional(timeout = -1, propagation = Propagation.REQUIRES_NEW)
@Transactional(timeout = 10000000, propagation = Propagation.REQUIRES_NEW)
CompletableFuture<AggregationTypeVO> asyncSave(AggregationTypeVO type, @Nullable ExtractionFilterVO filter);
@Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW)
......
......@@ -31,6 +31,7 @@ import com.google.common.collect.Maps;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import net.sumaris.core.dao.technical.SortDirection;
import net.sumaris.core.exception.DataNotFoundException;
import net.sumaris.core.exception.SumarisTechnicalException;
import net.sumaris.core.extraction.config.ExtractionCacheConfiguration;
import net.sumaris.core.extraction.dao.AggregationDao;
......@@ -324,21 +325,14 @@ public class AggregationServiceImpl implements AggregationService {
}
@Override
public void updateProduct(int productId) {
public AggregationTypeVO updateProduct(int productId) {
ExtractionProductVO target = productService.findById(productId, ExtractionProductFetchOptions.FOR_UPDATE).orElse(null);
Collection<String> existingTablesToDrop = Lists.newArrayList(target.getTableNames());
ExtractionProductVO target = productService.findById(productId, ExtractionProductFetchOptions.FOR_UPDATE)
.orElseThrow(() -> new DataNotFoundException(String.format("Unknown product {id: %s}", productId)));
Collection<String> tablesToDrop = Lists.newArrayList(target.getTableNames());
// Read filter
ExtractionFilterVO filter = null;
if (StringUtils.isNotBlank(target.getFilter())) {
try {
filter = objectMapper.readValue(target.getFilter(), ExtractionFilterVO.class);
}
catch(Exception e) {
throw new SumarisTechnicalException("Unparseable filter string: " + e.getMessage(), e);
}
}
ExtractionFilterVO filter = readFilter(target.getFilter());
String rawFormat = target.getRawFormatLabel();
if (rawFormat.startsWith(AggSpecification.FORMAT_PREFIX)) {
......@@ -360,7 +354,10 @@ public class AggregationServiceImpl implements AggregationService {
productService.save(target);
// Drop old tables
dropTables(existingTablesToDrop);
dropTables(tablesToDrop);
// Transform to type
return toAggregationType(target);
}
@Override
......@@ -374,7 +371,7 @@ public class AggregationServiceImpl implements AggregationService {
Preconditions.checkNotNull(source);
Preconditions.checkNotNull(source.getLabel());
Preconditions.checkNotNull(source.getName());
Collection<String> existingTablesToDrop = Lists.newArrayList();
Collection<String> tablesToDrop = Lists.newArrayList();
// Load the product
ExtractionProductVO target = null;
......@@ -394,7 +391,8 @@ public class AggregationServiceImpl implements AggregationService {
String previousLabel = target.getLabel();
Preconditions.checkArgument(previousLabel.equalsIgnoreCase(source.getLabel()), "Cannot change a product label");
filter = filter != null ? filter : readFilterString(target.getFilter());
// If not given in arguments, parse the product's filter string
filter = filter != null ? filter : readFilter(target.getFilter());
}
......@@ -413,19 +411,22 @@ public class AggregationServiceImpl implements AggregationService {
// Should clean existing table
if (!isNew) {
existingTablesToDrop.addAll(target.getTableNames());
tablesToDrop.addAll(Beans.getList(target.getTableNames()));
}
// Prepare a executable type (with label=format)
AggregationTypeVO executableType = new AggregationTypeVO();
executableType.setLabel(source.getRawFormatLabel());
executableType.setCategory(source.getCategory());
// Execute the aggregation
AggregationContextVO context = aggregate(executableType, filter, null);
{
// Prepare a executable type (with label=format)
AggregationTypeVO executableType = new AggregationTypeVO();
executableType.setLabel(source.getRawFormatLabel());
executableType.setCategory(source.getCategory());
// Execute the aggregation
AggregationContextVO context = aggregate(executableType, filter, null);
// Update product tables, using the aggregation result
toProductVO(context, target);
// Update product, using the aggregation result
toProductVO(context, target);
}
// Copy some properties from the given type
target.setName(source.getName());
......@@ -464,7 +465,7 @@ public class AggregationServiceImpl implements AggregationService {
target = productService.save(target);
// Drop old tables
dropTables(existingTablesToDrop);
dropTables(tablesToDrop);
// Transform back to type
return toAggregationType(target);
......@@ -688,8 +689,9 @@ public class AggregationServiceImpl implements AggregationService {
}
}
protected ExtractionFilterVO readFilterString(String json) {
protected ExtractionFilterVO readFilter(String json) {
if (StringUtils.isBlank(json)) return null;
try {
return objectMapper.readValue(json, ExtractionFilterVO.class);
}
......
......@@ -22,15 +22,17 @@ package net.sumaris.core.extraction.service;
* #L%
*/
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import net.sumaris.core.dao.schema.DatabaseSchemaDao;
import net.sumaris.core.dao.technical.SortDirection;
import net.sumaris.core.dao.technical.extraction.ExtractionProductRepository;
import net.sumaris.core.dao.technical.model.IEntity;
import net.sumaris.core.dao.technical.schema.SumarisDatabaseMetadata;
import net.sumaris.core.dao.technical.schema.SumarisTableMetadata;
......@@ -62,6 +64,7 @@ import net.sumaris.core.model.referential.location.Location;
import net.sumaris.core.model.referential.location.LocationLevelEnum;
import net.sumaris.core.model.technical.extraction.ExtractionCategoryEnum;
import net.sumaris.core.model.technical.extraction.IExtractionFormat;
import net.sumaris.core.model.technical.history.ProcessingFrequencyEnum;
import net.sumaris.core.service.referential.LocationService;
import net.sumaris.core.service.referential.ReferentialService;
import net.sumaris.core.util.*;
......@@ -103,6 +106,9 @@ public class ExtractionServiceImpl implements ExtractionService {
@Autowired
protected ExtractionConfiguration configuration;
@Autowired
private ObjectMapper objectMapper;
@Autowired
protected DataSource dataSource;
......@@ -113,7 +119,7 @@ public class ExtractionServiceImpl implements ExtractionService {
protected ExtractionStrategyDao extractionStrategyDao;
@Autowired
protected ExtractionProductRepository extractionProductRepository;
protected ExtractionProductService productService;
@Autowired
protected ExtractionTableDao extractionTableDao;
......@@ -178,21 +184,26 @@ public class ExtractionServiceImpl implements ExtractionService {
@Override
public List<ExtractionTypeVO> getLiveExtractionTypes() {
MutableInt id = new MutableInt(-1);
return Arrays.stream(LiveFormatEnum.values())
.map(format -> {
ExtractionTypeVO type = new ExtractionTypeVO();
type.setId(id.getValue());
type.setLabel(format.getLabel().toLowerCase());
type.setCategory(ExtractionCategoryEnum.LIVE);
type.setSheetNames(format.getSheetNames());
type.setStatusId(StatusEnum.TEMPORARY.getId()); // = not public by default
type.setVersion(format.getVersion());
type.setLiveFormat(format);
id.decrement();
return type;
})
.collect(Collectors.toList());
// Sort by label
.sorted(Comparator.comparing(LiveFormatEnum::getLabel))
.map(format -> {
ExtractionTypeVO type = new ExtractionTypeVO();
// Generate a negative an unique id
type.setId(-1 * format.hashCode());
type.setLabel(format.getLabel().toLowerCase());
type.setCategory(ExtractionCategoryEnum.LIVE);
type.setSheetNames(format.getSheetNames());
type.setStatusId(StatusEnum.TEMPORARY.getId()); // = not public by default
type.setVersion(format.getVersion());
type.setLiveFormat(format);
return type;
})
.collect(Collectors.toList());
}
@Override
......@@ -234,7 +245,7 @@ public class ExtractionServiceImpl implements ExtractionService {
switch (type.getCategory()) {
case PRODUCT:
ExtractionProductVO product = extractionProductRepository.getByLabel(type.getLabel(),
ExtractionProductVO product = productService.getByLabel(type.getLabel(),
ExtractionProductFetchOptions.TABLES_AND_COLUMNS);
Set<String> hiddenColumns = Beans.getStream(product.getTables())
.map(ExtractionTableVO::getColumns)
......@@ -300,7 +311,7 @@ public class ExtractionServiceImpl implements ExtractionService {
switch (type.getCategory()) {
case PRODUCT:
ExtractionProductVO product = extractionProductRepository.getByLabel(type.getLabel(),
ExtractionProductVO product = productService.getByLabel(type.getLabel(),
ExtractionProductFetchOptions.builder()
.withRecorderDepartment(false)
.withRecorderPerson(false)
......@@ -367,35 +378,86 @@ public class ExtractionServiceImpl implements ExtractionService {
}
@Override
public ExtractionTypeVO save(ExtractionTypeVO type, ExtractionFilterVO filter) {
Preconditions.checkNotNull(type);
public ExtractionTypeVO save(ExtractionTypeVO source, ExtractionFilterVO filter) {
Preconditions.checkNotNull(source);
Preconditions.checkNotNull(source.getLabel(), "Missing 'type.label'");
Preconditions.checkNotNull(source.getName(), "Missing 'type.name'");
Preconditions.checkArgument(source.getId() == null || source.getId() >= 0); // Negative ID not allowed
Collection<String> tablesToDrop = Lists.newArrayList();
// Load the product
ExtractionProductVO target = extractionProductRepository.findByLabel(
type.getLabel(), ExtractionProductFetchOptions.builder().withTables(false).build()
).orElse(null);
ExtractionProductVO target = null;
if (source.getId() != null) {
target = productService.findById(source.getId(), ExtractionProductFetchOptions.FOR_UPDATE).orElse(null);
}
if (target == null) {
boolean isNew = target == null;
if (isNew) {
target = new ExtractionProductVO();
target.setLabel(type.getLabel());
target.setRecorderDepartment(type.getRecorderDepartment());
target.setLabel(source.getLabel());
target.setRecorderDepartment(source.getRecorderDepartment());
target.setRecorderPerson(source.getRecorderPerson());
}
else {
// Check label was not changed
String previousLabel = target.getLabel();
Preconditions.checkArgument(previousLabel.equalsIgnoreCase(source.getLabel()), "Cannot change a product label");
// Execute the aggregation
ExtractionContextVO context;
{
ExtractionTypeVO cleanType = new ExtractionTypeVO();
cleanType.setLabel(type.getRawFormatLabel());
cleanType.setCategory(type.getCategory());
context = execute(cleanType, filter);
// If not given in arguments, parse the product's filter string
filter = filter != null ? filter : readFilter(target.getFilter());
}
toProductVO(context, target);
// Set the status
target.setStatusId(type.getStatusId());
// Check if need aggregate (ig new or if filter changed)
ProcessingFrequencyEnum frequency = source.getProcessingFrequencyId() != null
? ProcessingFrequencyEnum.valueOf(source.getProcessingFrequencyId())
: ProcessingFrequencyEnum.MANUALLY;
String filterAsString = writeFilterAsString(filter);
boolean needExecution = (isNew || !Objects.equals(target.getFilter(), filterAsString))
&& (frequency == ProcessingFrequencyEnum.MANUALLY);
// Execute the extraction
if (needExecution) {
// Should clean existing table
if (!isNew) {
tablesToDrop.addAll(Beans.getList(target.getTableNames()));
}
// Prepare a executable type (with label=format)
ExtractionTypeVO executableType = new ExtractionTypeVO();
executableType.setLabel(source.getRawFormatLabel());
executableType.setCategory(source.getCategory());
// Run the execution
ExtractionContextVO context = execute(executableType, filter);
// Update product, using the extraction result
toProductVO(context, target);
}
// Copy some properties from the given type
{
target.setName(source.getName());
target.setDescription(source.getDescription());
target.setIsSpatial(false);
// Default status
Integer statusId = source.getStatusId() != null ? source.getStatusId() : StatusEnum.TEMPORARY.getId();
target.setStatusId(statusId);
// Frequency
Integer frequencyId = source.getProcessingFrequencyId() != null ? source.getProcessingFrequencyId()
: ProcessingFrequencyEnum.MANUALLY.getId();
target.setProcessingFrequencyId(frequencyId);
}
// Save the product
target = extractionProductRepository.save(target);
target = productService.save(target);
// Drop old tables
dropTables(tablesToDrop);
// Transform back to type
return toExtractionTypeVO(target);
......@@ -506,7 +568,7 @@ public class ExtractionServiceImpl implements ExtractionService {
Preconditions.checkNotNull(filter);
return ListUtils.emptyIfNull(
extractionProductRepository.findAll(filter, ExtractionProductFetchOptions.builder()
productService.findByFilter(filter, ExtractionProductFetchOptions.builder()
.withRecorderDepartment(true)
.withTables(true)
.build()))
......@@ -753,4 +815,28 @@ public class ExtractionServiceImpl implements ExtractionService {
if (dao == null) throw new SumarisTechnicalException("Unknown extraction format (no targeted dao): " + format);
return dao;
}
protected ExtractionFilterVO readFilter(String json) {
if (StringUtils.isBlank(json)) return null;
try {
return objectMapper.readValue(json, ExtractionFilterVO.class);
}
catch(JsonProcessingException e) {
throw new SumarisTechnicalException(e);
}
}
protected String writeFilterAsString(ExtractionFilterVO filter) {
if (filter == null) return null;
try {
return objectMapper.writeValueAsString(filter);
}
catch(JsonProcessingException e) {
throw new SumarisTechnicalException(e);
}
}
protected void dropTables(Collection<String> tableNames) {
Beans.getStream(tableNames).forEach(extractionTableDao::dropTable);
}
}
......@@ -42,11 +42,7 @@ import java.util.List;
@ToString(callSuper = true)
public class AggregationTypeVO extends ExtractionTypeVO {