Commit 8cd83cff authored by LAVENIER's avatar LAVENIER
Browse files

[fix] Fix aggregation async clean (need new transaction)

parent 29922944
......@@ -22,14 +22,33 @@ package net.sumaris.core.extraction.dao;
* #L%
*/
import net.sumaris.core.dao.technical.SortDirection;
import net.sumaris.core.extraction.format.ProductFormatEnum;
import net.sumaris.core.extraction.vo.*;
import net.sumaris.core.vo.technical.extraction.AggregationStrataVO;
import net.sumaris.core.vo.technical.extraction.ExtractionProductVO;
/**
* @author Benoit Lavenier <benoit.lavenier@e-is.pro>
*/
public interface AggregationDao {
public interface AggregationDao<
C extends AggregationContextVO,
F extends ExtractionFilterVO,
S extends AggregationStrataVO> {
String TABLE_NAME_PREFIX = "AGG_";
ProductFormatEnum getFormat();
<R extends C> R aggregate(ExtractionProductVO source,
F filter,
S strata);
AggregationResultVO getAggBySpace(String tableName, F filter, S strata, int offset, int size, String sortAttribute, SortDirection sortDirection);
AggregationTechResultVO getAggByTech(String tableName, F filter, S strata, String sortAttribute, SortDirection direction);
MinMaxVO getAggMinMaxByTech(String tableName, F filter, S strata);
void clean(C context);
}
......@@ -22,13 +22,16 @@ package net.sumaris.core.extraction.dao.technical;
* #L%
*/
import com.google.common.base.Joiner;
import lombok.extern.slf4j.Slf4j;
import net.sumaris.core.util.Dates;
import org.springframework.jdbc.datasource.DataSourceUtils;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Date;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.stream.Collectors;
import java.util.stream.Stream;
......@@ -37,11 +40,16 @@ import java.util.stream.Stream;
*
* @author Benoit Lavenier <benoit.lavenier@e-is.pro>
*/
@Slf4j
public class Daos extends net.sumaris.core.dao.technical.Daos {
private final static String SQL_TO_DATE = "TO_DATE('%s', '%s')";
protected Daos() {
// Helper class
}
/**
* Concat single quoted strings with ',' character, without parenthesis
*
......@@ -111,4 +119,20 @@ public class Daos extends net.sumaris.core.dao.technical.Daos {
Dates.formatDate(date, "yyyy-MM-dd HH:mm:ss"),
"YYYY-MM-DD HH24:MI:SS");
}
public static void commitIfHsqldb(DataSource dataSource) {
Connection conn = DataSourceUtils.getConnection(dataSource);
try {
if (net.sumaris.core.extraction.dao.technical.Daos.isHsqlDatabase(conn) && DataSourceUtils.isConnectionTransactional(conn, dataSource)) {
try {
conn.commit();
} catch (SQLException e) {
log.warn("Cannot execute intermediate commit: " + e.getMessage(), e);
}
}
} finally {
DataSourceUtils.releaseConnection(conn, dataSource);
}
}
}
......@@ -27,12 +27,18 @@ import net.sumaris.core.extraction.dao.AggregationDao;
import net.sumaris.core.extraction.dao.ExtractionDao;
import net.sumaris.core.extraction.vo.AggregationResultVO;
import net.sumaris.core.extraction.vo.AggregationTechResultVO;
import net.sumaris.core.extraction.vo.ExtractionFilterVO;
import net.sumaris.core.extraction.vo.MinMaxVO;
import net.sumaris.core.extraction.vo.trip.AggregationTripContextVO;
import net.sumaris.core.vo.technical.extraction.AggregationStrataVO;
import net.sumaris.core.vo.technical.extraction.ExtractionProductVO;
/**
* @author Benoit Lavenier <benoit.lavenier@e-is.pro>
*/
public interface AggregationTripDao extends AggregationDao {
public interface AggregationTripDao<
C extends AggregationTripContextVO,
F extends ExtractionFilterVO,
S extends AggregationStrataVO> extends AggregationDao<C, F, S> {
}
......@@ -23,6 +23,7 @@ package net.sumaris.core.extraction.dao.trip.cost;
*/
import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
import net.sumaris.core.extraction.dao.technical.XMLQuery;
import net.sumaris.core.extraction.dao.trip.rdb.AggregationRdbTripDaoImpl;
import net.sumaris.core.extraction.format.LiveFormatEnum;
......@@ -44,11 +45,13 @@ import org.springframework.stereotype.Repository;
*/
@Repository("aggregationCostDao")
@Lazy
public class AggregationCostDaoImpl<C extends AggregationRdbTripContextVO, F extends ExtractionFilterVO, S extends AggregationStrataVO>
extends AggregationRdbTripDaoImpl<C, F, S>
implements AggSurvivalTestSpecification {
private static final Logger log = LoggerFactory.getLogger(AggregationCostDaoImpl.class);
@Slf4j
public class AggregationCostDaoImpl<
C extends AggregationRdbTripContextVO,
F extends ExtractionFilterVO,
S extends AggregationStrataVO>
extends AggregationRdbTripDaoImpl<C, F, S>
implements AggSurvivalTestSpecification {
@Override
public ProductFormatEnum getFormat() {
......
......@@ -24,12 +24,12 @@ package net.sumaris.core.extraction.dao.trip.rdb;
import net.sumaris.core.dao.technical.SortDirection;
import net.sumaris.core.extraction.dao.trip.AggregationTripDao;
import net.sumaris.core.extraction.vo.*;
import net.sumaris.core.extraction.vo.AggregationResultVO;
import net.sumaris.core.extraction.vo.AggregationTechResultVO;
import net.sumaris.core.extraction.vo.ExtractionFilterVO;
import net.sumaris.core.extraction.vo.MinMaxVO;
import net.sumaris.core.extraction.vo.trip.rdb.AggregationRdbTripContextVO;
import net.sumaris.core.vo.technical.extraction.AggregationStrataVO;
import net.sumaris.core.vo.technical.extraction.ExtractionProductVO;
import java.util.Map;
/**
* @author Benoit Lavenier <benoit.lavenier@e-is.pro>
......@@ -38,16 +38,6 @@ public interface AggregationRdbTripDao<
C extends AggregationRdbTripContextVO,
F extends ExtractionFilterVO,
S extends AggregationStrataVO>
extends AggregationTripDao {
<R extends C> R aggregate(ExtractionProductVO source, F filter, S strata);
AggregationResultVO getAggBySpace(String tableName, F filter, S strata, int offset, int size, String sortAttribute, SortDirection sortDirection);
AggregationTechResultVO getAggByTech(String tableName, F filter, S strata, String sortAttribute, SortDirection direction);
MinMaxVO getAggMinMaxByTech(String tableName, F filter, S strata);
void clean(C context);
extends AggregationTripDao<C, F, S> {
}
......@@ -259,12 +259,12 @@ public class AggregationRdbTripDaoImpl<
/* -- protected methods -- */
protected <C extends AggregationRdbTripContextVO> C createNewContext() {
protected <R extends AggregationRdbTripContextVO> R createNewContext() {
Class<? extends AggregationRdbTripContextVO> contextClass = getContextClass();
Preconditions.checkNotNull(contextClass);
try {
return (C) contextClass.newInstance();
return (R) contextClass.newInstance();
} catch (Exception e) {
throw new SumarisTechnicalException("Could not create an instance of context class " + contextClass.getName());
}
......
......@@ -163,7 +163,7 @@ public class ExtractionRdbTripDaoImpl<C extends ExtractionRdbTripContextVO, F ex
return context;
}
catch (PersistenceException e) {
// If error,clean created tables first, then rethrow the exception
// If error, clean created tables first, then rethrow the exception
clean(context);
throw e;
}
......
......@@ -23,11 +23,11 @@ package net.sumaris.core.extraction.service;
*/
import net.sumaris.core.dao.technical.SortDirection;
import net.sumaris.core.model.technical.extraction.IExtractionFormat;
import net.sumaris.core.extraction.vo.*;
import net.sumaris.core.extraction.vo.filter.AggregationTypeFilterVO;
import net.sumaris.core.vo.technical.extraction.ExtractionProductFetchOptions;
import net.sumaris.core.model.technical.extraction.IExtractionFormat;
import net.sumaris.core.vo.technical.extraction.AggregationStrataVO;
import net.sumaris.core.vo.technical.extraction.ExtractionProductFetchOptions;
import org.springframework.scheduling.annotation.Async;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
......@@ -72,9 +72,9 @@ public interface AggregationService {
* @param strata
*/
@Transactional
AggregationContextVO execute(AggregationTypeVO type,
@Nullable ExtractionFilterVO filter,
@Nullable AggregationStrataVO strata);
AggregationContextVO aggregate(AggregationTypeVO type,
@Nullable ExtractionFilterVO filter,
@Nullable AggregationStrataVO strata);
@Transactional(readOnly = true)
AggregationResultVO getAggBySpace(AggregationTypeVO type,
......@@ -120,10 +120,10 @@ public interface AggregationService {
@Transactional(timeout = -1, propagation = Propagation.REQUIRES_NEW)
CompletableFuture<AggregationTypeVO> asyncSave(AggregationTypeVO type, @Nullable ExtractionFilterVO filter);
@Transactional(isolation = Isolation.READ_UNCOMMITTED, propagation = Propagation.REQUIRES_NEW)
@Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW)
void clean(AggregationContextVO context);
@Transactional(propagation = Propagation.SUPPORTS)
@Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW)
@Async
CompletableFuture<Boolean> asyncClean(AggregationContextVO context);
}
......@@ -27,14 +27,16 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import net.sumaris.core.dao.technical.SortDirection;
import net.sumaris.core.exception.SumarisTechnicalException;
import net.sumaris.core.extraction.config.ExtractionCacheConfiguration;
import net.sumaris.core.extraction.dao.AggregationDao;
import net.sumaris.core.extraction.dao.technical.Daos;
import net.sumaris.core.extraction.dao.technical.table.ExtractionTableColumnOrder;
import net.sumaris.core.extraction.dao.technical.table.ExtractionTableDao;
import net.sumaris.core.extraction.dao.trip.rdb.AggregationRdbTripDao;
import net.sumaris.core.extraction.format.LiveFormatEnum;
import net.sumaris.core.extraction.format.ProductFormatEnum;
import net.sumaris.core.extraction.specification.data.trip.AggSpecification;
......@@ -42,7 +44,6 @@ import net.sumaris.core.extraction.util.ExtractionFormats;
import net.sumaris.core.extraction.util.ExtractionProducts;
import net.sumaris.core.extraction.vo.*;
import net.sumaris.core.extraction.vo.filter.AggregationTypeFilterVO;
import net.sumaris.core.extraction.vo.trip.rdb.AggregationRdbTripContextVO;
import net.sumaris.core.model.referential.StatusEnum;
import net.sumaris.core.model.technical.extraction.ExtractionCategoryEnum;
import net.sumaris.core.model.technical.extraction.IExtractionFormat;
......@@ -54,6 +55,7 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.nuiton.i18n.I18n;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
......@@ -63,7 +65,8 @@ import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Service;
import javax.annotation.Nullable;
import javax.annotation.Resource;
import javax.annotation.PostConstruct;
import javax.sql.DataSource;
import java.io.File;
import java.util.*;
import java.util.concurrent.CompletableFuture;
......@@ -76,21 +79,15 @@ import java.util.stream.Collectors;
@Slf4j
public class AggregationServiceImpl implements AggregationService {
@Autowired
protected DataSource dataSource;
@Autowired
private ExtractionService extractionService;
@Autowired
private ExtractionProductService productService;
@Resource(name = "aggregationRdbTripDao")
private AggregationRdbTripDao aggregationRdbTripDao;
@Resource(name = "aggregationSurvivalTestDao")
private AggregationRdbTripDao aggregationSurvivalTestDao;
@Resource(name = "aggregationCostDao")
private AggregationRdbTripDao aggregationCostDao;
@Autowired
private ExtractionTableDao extractionTableDao;
......@@ -103,6 +100,27 @@ public class AggregationServiceImpl implements AggregationService {
@Autowired
private ApplicationContext applicationContext;
private Map<IExtractionFormat, AggregationDao<?,?,?>> daosByFormat = Maps.newHashMap();
@PostConstruct
protected void registerDaos() {
// Register all extraction daos
applicationContext.getBeansOfType(AggregationDao.class).values()
.forEach(dao -> {
IExtractionFormat format = dao.getFormat();
// Check if unique, by format
if (daosByFormat.containsKey(format)) {
throw new BeanInitializationException(
String.format("Too many ExtractionDao for the same format %s: [%s, %s]",
daosByFormat.get(dao.getFormat()).getClass().getSimpleName(),
dao.getClass().getSimpleName()));
}
// Register the dao
daosByFormat.put(format, dao);
});
}
@Override
public List<AggregationTypeVO> findTypesByFilter(AggregationTypeFilterVO filter, ExtractionProductFetchOptions fetchOptions) {
......@@ -130,7 +148,7 @@ public class AggregationServiceImpl implements AggregationService {
}
@Override
public AggregationContextVO execute(AggregationTypeVO type, ExtractionFilterVO filter, AggregationStrataVO strata) {
public AggregationContextVO aggregate(AggregationTypeVO type, ExtractionFilterVO filter, AggregationStrataVO strata) {
type = getTypeByFormat(type);
ExtractionProductVO source;
......@@ -139,7 +157,7 @@ public class AggregationServiceImpl implements AggregationService {
// Get the product VO
source = productService.getByLabel(type.getLabel(), ExtractionProductFetchOptions.TABLES_AND_STRATUM);
// Execute, from product
return aggregate(source, filter, strata);
return aggregateDao(source, filter, strata);
case LIVE:
// First execute the raw extraction
......@@ -154,7 +172,9 @@ public class AggregationServiceImpl implements AggregationService {
}
// Execute, from product
return aggregate(source, aggregationFilter, strata);
AggregationContextVO result = aggregateDao(source, aggregationFilter, strata);
return result;
} finally {
// Clean intermediate tables
extractionService.asyncClean(rawExtractionContext);
......@@ -183,7 +203,7 @@ public class AggregationServiceImpl implements AggregationService {
@Nullable ExtractionFilterVO filter,
@Nullable AggregationStrataVO strata,
int offset, int size, String sort, SortDirection direction) {
filter = filter != null ? filter : new ExtractionFilterVO();
filter = ExtractionFilterVO.nullToEmpty(filter);
strata = strata != null ? strata : (context.getStrata() != null ? context.getStrata() : new AggregationStrataVO());
String sheetName = strata.getSheetName() != null ? strata.getSheetName() : filter.getSheetName();
......@@ -198,15 +218,7 @@ public class AggregationServiceImpl implements AggregationService {
// Read the data
ProductFormatEnum format = ExtractionFormats.getProductFormat(context);
switch (format) {
case AGG_RDB:
case AGG_COST:
case AGG_SURVIVAL_TEST:
return aggregationRdbTripDao.getAggBySpace(tableName, filter, strata, offset, size, sort, direction);
default:
throw new SumarisTechnicalException(String.format("Unable to read data on type '%s': not implemented", context.getLabel()));
}
return getDao(format).getAggBySpace(tableName, filter, strata, offset, size, sort, direction);
}
@Override
......@@ -216,7 +228,7 @@ public class AggregationServiceImpl implements AggregationService {
String sort,
SortDirection direction) {
Preconditions.checkNotNull(type);
filter = filter != null ? filter : new ExtractionFilterVO();
filter = ExtractionFilterVO.nullToEmpty(filter);
ExtractionProductVO product = productService.getByLabel(type.getLabel(),
ExtractionProductFetchOptions.TABLES);
......@@ -233,7 +245,8 @@ public class AggregationServiceImpl implements AggregationService {
String tableName = StringUtils.isNotBlank(sheetName) ? context.getTableNameBySheetName(sheetName) : null;
return aggregationRdbTripDao.getAggByTech(tableName, filter, strata, sort, direction);
ProductFormatEnum format = ExtractionFormats.getProductFormat(context);
return getDao(format).getAggByTech(tableName, filter, strata, sort, direction);
}
@Override
......@@ -256,14 +269,15 @@ public class AggregationServiceImpl implements AggregationService {
String tableName = StringUtils.isNotBlank(sheetName) ? context.getTableNameBySheetName(sheetName) : null;
return aggregationRdbTripDao.getAggMinMaxByTech(tableName, filter, strata);
ProductFormatEnum format = ExtractionFormats.getProductFormat(context);
return getDao(format).getAggMinMaxByTech(tableName, filter, strata);
}
@Override
public AggregationResultVO executeAndRead(AggregationTypeVO type, ExtractionFilterVO filter, AggregationStrataVO strata,
int offset, int size, String sort, SortDirection direction) {
// Execute the aggregation
AggregationContextVO context = execute(type, filter, strata);
AggregationContextVO context = aggregate(type, filter, strata);
// Prepare the read filter
ExtractionFilterVO readFilter = null;
......@@ -284,8 +298,11 @@ public class AggregationServiceImpl implements AggregationService {
@Override
public File executeAndDump(AggregationTypeVO type, @Nullable ExtractionFilterVO filter, @Nullable AggregationStrataVO strata) {
// Execute the aggregation
AggregationContextVO context = execute(type, filter, strata);
AggregationContextVO context = aggregate(type, filter, strata);
Daos.commitIfHsqldb(dataSource);
try {
// Dump to files
return extractionService.dumpTablesToFile(context, null /*already apply*/);
}
finally {
......@@ -333,7 +350,7 @@ public class AggregationServiceImpl implements AggregationService {
executableType.setCategory(ExtractionCategoryEnum.LIVE);
// Execute the aggregation
AggregationContextVO context = execute(executableType, filter, null);
AggregationContextVO context = aggregate(executableType, filter, null);
// Update product tables, using the aggregation result
toProductVO(context, target);
......@@ -404,7 +421,7 @@ public class AggregationServiceImpl implements AggregationService {
executableType.setCategory(source.getCategory());
// Execute the aggregation
AggregationContextVO context = execute(executableType, filter, null);
AggregationContextVO context = aggregate(executableType, filter, null);
// Update product tables, using the aggregation result
toProductVO(context, target);
......@@ -502,29 +519,16 @@ public class AggregationServiceImpl implements AggregationService {
return result;
}
public AggregationContextVO aggregate(ExtractionProductVO source,
ExtractionFilterVO filter,
AggregationStrataVO strata) {
public AggregationContextVO aggregateDao(ExtractionProductVO source,
ExtractionFilterVO filter,
AggregationStrataVO strata) {
Preconditions.checkNotNull(source);
Preconditions.checkNotNull(source.getLabel());
LiveFormatEnum format = ExtractionFormats.getLiveFormat(source);
switch (format) {
case RDB:
return aggregationRdbTripDao.aggregate(source, filter, strata);
case COST:
return aggregationCostDao.aggregate(source, filter, strata);
ProductFormatEnum format = ProductFormatEnum.valueOf(AggSpecification.FORMAT_PREFIX + source.getLabel(), null);
case SURVIVAL_TEST:
return aggregationSurvivalTestDao.aggregate(source, filter, strata);
case FREE1: // TODO
case FREE2: // TODO
default:
throw new SumarisTechnicalException(String.format("Data aggregation on type '%s' is not implemented!", format.name()));
}
// Aggregate (create agg tables)
return getDao(format).aggregate(source, filter, strata);
}
@Override
......@@ -694,12 +698,12 @@ public class AggregationServiceImpl implements AggregationService {
protected void clean(AggregationContextVO context, boolean async) {
if (context == null) return;
if (async) {
getSelfBean().asyncClean(context);
if (async && taskExecutor != null) {
taskExecutor.execute(() -> self().clean(context));
}
else if (context instanceof AggregationRdbTripContextVO) {
else {
log.info("Cleaning aggregation #{}-{}", context.getLabel(), context.getId());
aggregationRdbTripDao.clean((AggregationRdbTripContextVO) context);
getDao(context.getFormat()).clean(context);
}
}
......@@ -707,8 +711,14 @@ public class AggregationServiceImpl implements AggregationService {
* Get self bean, to be able to use new transation
* @return
*/
protected AggregationService getSelfBean() {
protected AggregationService self() {
return applicationContext.getBean("aggregationService", AggregationService.class);
}
protected AggregationDao getDao(IExtractionFormat format) {
AggregationDao dao = daosByFormat.get(format);
if (dao == null) throw new SumarisTechnicalException("Unknown aggregation format (no targeted dao): " + format);
return dao;
}
}
......@@ -28,7 +28,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import net.sumaris.core.config.SumarisConfiguration;
import net.sumaris.core.dao.schema.DatabaseSchemaDao;
import net.sumaris.core.dao.technical.SortDirection;
import net.sumaris.core.dao.technical.extraction.ExtractionProductRepository;
......@@ -55,11 +54,9 @@ import net.sumaris.core.extraction.specification.data.trip.RdbSpecification;
import net.sumaris.core.extraction.util.ExtractionFormats;
import net.sumaris.core.extraction.util.ExtractionProducts;
import net.sumaris.core.extraction.vo.*;
import net.sumaris.core.extraction.vo.administration.ExtractionStrategyContextVO;
import net.sumaris.core.extraction.vo.administration.ExtractionStrategyFilterVO;
import net.sumaris.core.extraction.vo.filter.ExtractionTypeFilterVO;
import net.sumaris.core.extraction.vo.trip.ExtractionTripFilterVO;
import net.sumaris.core.extraction.vo.trip.rdb.ExtractionRdbTripContextVO;
import net.sumaris.core.model.referential.StatusEnum;
import net.sumaris.core.model.referential.location.Location;
import net.sumaris.core.model.referential.location.LocationLevelEnum;
......@@ -83,7 +80,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.EventListener;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jdbc.datasource.DataSourceUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Nonnull;
......@@ -93,8 +89,6 @@ import javax.annotation.Resource;
import javax.sql.DataSource;
import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
......@@ -152,19 +146,20 @@ public class ExtractionServiceImpl implements ExtractionService {
@PostConstruct
protected void loadExtractionDaos() {
protected void registerDaos() {
// Register all extraction daos
applicationContext.getBeansOfType(ExtractionDao.class).values()
.forEach(dao -> {
IExtractionFormat format = dao.getFormat();
// Check if unique, by format