Commit 23938a3b authored by LAVENIER's avatar LAVENIER
Browse files

[fix] PersonRepository: fix nullpointerexception when appName has no...

[fix] PersonRepository: fix nullpointerexception when appName has no corresponding Software in the DB
[enh] Extraction: use @Async for asynchronous operation (clean, aggregate)
[enh] Extraction/Aggregation: better log message (display filter criteria)
parent 34368f2f
......@@ -127,6 +127,20 @@ public class AggregationRdbTripDaoImpl<
context.setSpeciesLengthMapTableName(String.format(HL_MAP_TABLE_NAME_PATTERN, context.getId()));
context.setLandingTableName(String.format(CL_TABLE_NAME_PATTERN, context.getId()));
if (log.isInfoEnabled()) {
StringBuilder filterInfo = new StringBuilder();
String filterStr = (filter != null) ? Beans.getStream(filter.getCriteria())
.map(ExtractionFilterCriterionVO::toString)
.collect(Collectors.joining("\n - ")) : null;
if (StringUtils.isNotBlank(filterStr)) {
filterInfo.append("with filter:\n - ").append(filterStr);
} else {
filterInfo.append("(without filter)");
}
log.info(String.format("Starting aggregation #%s-%s... %s", context.getLabel(), context.getId(), filterInfo.toString()));
}
// Expected sheet name
String sheetName = filter != null && filter.isPreview() ? filter.getSheetName() : null;
......
......@@ -119,13 +119,14 @@ public class ExtractionRdbTripDaoImpl<C extends ExtractionRdbTripContextVO, F ex
if (log.isInfoEnabled()) {
StringBuilder filterInfo = new StringBuilder();
if (filter != null) {
filterInfo.append("with filter:").append(tripFilter.toString("\n - "));
String filterStr = filter != null ? tripFilter.toString("\n - ") : null;
if (StringUtils.isNotBlank(filterStr)) {
filterInfo.append("with filter:").append(filterStr);
}
else {
filterInfo.append("(without filter)");
}
log.info(String.format("Starting extraction %s-%s (raw data / trips)... %s", context.getLabel(), context.getId(), filterInfo.toString()));
log.info(String.format("Starting extraction #%s-%s (raw data / trips)... %s", context.getLabel(), context.getId(), filterInfo.toString()));
}
// Fill context table names
......
......@@ -27,6 +27,7 @@ import net.sumaris.core.model.technical.extraction.IExtractionFormat;
import net.sumaris.core.extraction.vo.*;
import net.sumaris.core.extraction.vo.filter.AggregationTypeFilterVO;
import net.sumaris.core.vo.technical.extraction.ExtractionProductFetchOptions;
import org.springframework.scheduling.annotation.Async;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
......@@ -36,6 +37,7 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
* Create aggregation tables, from a data extraction.
......@@ -105,9 +107,14 @@ public interface AggregationService {
@Transactional
AggregationTypeVO save(AggregationTypeVO type, @Nullable ExtractionFilterVO filter);
@Async
@Transactional(timeout = -1, propagation = Propagation.REQUIRES_NEW)
CompletableFuture<AggregationTypeVO> asyncSave(AggregationTypeVO type, @Nullable ExtractionFilterVO filter);
@Transactional(isolation = Isolation.READ_UNCOMMITTED, propagation = Propagation.REQUIRES_NEW)
void clean(AggregationContextVO context);
@Transactional(propagation = Propagation.SUPPORTS)
void asyncClean(AggregationContextVO context);
@Async
CompletableFuture<Boolean> asyncClean(AggregationContextVO context);
}
......@@ -38,6 +38,7 @@ import net.sumaris.core.extraction.dao.trip.rdb.AggregationRdbTripDao;
import net.sumaris.core.extraction.dao.trip.survivalTest.AggregationSurvivalTestDao;
import net.sumaris.core.extraction.format.ProductFormatEnum;
import net.sumaris.core.extraction.util.ExtractionFormats;
import net.sumaris.core.extraction.vo.trip.rdb.ExtractionRdbTripContextVO;
import net.sumaris.core.model.technical.extraction.IExtractionFormat;
import net.sumaris.core.extraction.format.LiveFormatEnum;
import net.sumaris.core.extraction.vo.*;
......@@ -60,6 +61,7 @@ import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.CachePut;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.cache.annotation.Caching;
import org.springframework.context.ApplicationContext;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Service;
......@@ -67,6 +69,7 @@ import javax.annotation.Nullable;
import javax.annotation.Resource;
import java.io.File;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
......@@ -99,10 +102,10 @@ public class AggregationServiceImpl implements AggregationService {
protected TaskExecutor taskExecutor = null;
@Autowired
private AggregationService self;
private ObjectMapper objectMapper;
@Autowired
private ObjectMapper objectMapper;
private ApplicationContext applicationContext;
@Override
public List<AggregationTypeVO> findTypesByFilter(AggregationTypeFilterVO filter, ExtractionProductFetchOptions fetchOptions) {
......@@ -278,7 +281,7 @@ public class AggregationServiceImpl implements AggregationService {
return getAggBySpace(context, readFilter, strata, offset, size, sort, direction);
} finally {
// Clean created tables
asyncClean(context);
clean(context, true);
}
}
......@@ -291,10 +294,15 @@ public class AggregationServiceImpl implements AggregationService {
}
finally {
// Delete aggregation tables, after dump
asyncClean(context);
clean(context, true);
}
}
@Override
public CompletableFuture<AggregationTypeVO> asyncSave(AggregationTypeVO type, @Nullable ExtractionFilterVO filter) {
return CompletableFuture.completedFuture(save(type, filter));
}
@Override
@Caching(
evict = {
......@@ -468,34 +476,25 @@ public class AggregationServiceImpl implements AggregationService {
}
@Override
public void asyncClean(AggregationContextVO context) {
if (taskExecutor == null) {
public CompletableFuture<Boolean> asyncClean(AggregationContextVO context) {
try {
clean(context);
} else {
taskExecutor.execute(() -> {
try {
Thread.sleep(2000); // Wait 2 s, to to sure the table is not used anymore
// Call self, to be sure to have a transaction
self.clean(context);
} catch (Exception e) {
log.warn("Error while cleaning extraction tables", e);
}
});
return CompletableFuture.completedFuture(Boolean.TRUE);
} catch (Exception e) {
log.warn(String.format("Error while cleaning aggregation #%s: %s", context.getId(), e.getMessage()), e);
return CompletableFuture.completedFuture(Boolean.FALSE);
}
}
@Override
public void clean(AggregationContextVO context) {
if (context == null) return;
if (context instanceof AggregationRdbTripContextVO) {
aggregationRdbTripDao.clean((AggregationRdbTripContextVO) context);
}
clean(context, false);
}
/* -- protected methods -- */
protected AggregationTypeVO toAggregationType(ExtractionProductVO source) {
AggregationTypeVO target = new AggregationTypeVO();
......@@ -639,4 +638,25 @@ public class AggregationServiceImpl implements AggregationService {
throw new SumarisTechnicalException(e);
}
}
protected void clean(AggregationContextVO context, boolean async) {
if (context == null) return;
if (async) {
getSelfBean().asyncClean(context);
}
else if (context instanceof AggregationRdbTripContextVO) {
log.info("Cleaning aggregation #{}-{}", context.getLabel(), context.getId());
aggregationRdbTripDao.clean((AggregationRdbTripContextVO) context);
}
}
/**
* Get self bean, to be able to use new transation
* @return
*/
protected AggregationService getSelfBean() {
return applicationContext.getBean("aggregationService", AggregationService.class);
}
}
......@@ -23,20 +23,27 @@ package net.sumaris.core.extraction.service;
*/
import net.sumaris.core.dao.technical.SortDirection;
import net.sumaris.core.event.entity.EntityDeleteEvent;
import net.sumaris.core.event.entity.EntityInsertEvent;
import net.sumaris.core.event.entity.EntityUpdateEvent;
import net.sumaris.core.model.technical.extraction.IExtractionFormat;
import net.sumaris.core.extraction.format.LiveFormatEnum;
import net.sumaris.core.extraction.vo.*;
import net.sumaris.core.extraction.vo.filter.ExtractionTypeFilterVO;
import net.sumaris.core.extraction.vo.trip.ExtractionTripFilterVO;
import net.sumaris.core.vo.technical.extraction.ExtractionProductVO;
import org.springframework.scheduling.annotation.Async;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
* @author peck7 on 17/12/2018.
......@@ -78,8 +85,9 @@ public interface ExtractionService {
@Transactional(isolation = Isolation.READ_UNCOMMITTED, propagation = Propagation.REQUIRES_NEW)
void clean(ExtractionContextVO context);
@Transactional(propagation = Propagation.SUPPORTS)
void asyncClean(ExtractionContextVO context);
@Transactional(isolation = Isolation.READ_UNCOMMITTED, propagation = Propagation.REQUIRES_NEW)
@Async
CompletableFuture<Boolean> asyncClean(ExtractionContextVO context);
@Transactional(readOnly = true, propagation = Propagation.SUPPORTS)
ExtractionProductVO toProductVO(ExtractionContextVO context);
......
......@@ -39,6 +39,7 @@ import net.sumaris.core.event.config.ConfigurationUpdatedEvent;
import net.sumaris.core.exception.DataNotFoundException;
import net.sumaris.core.exception.SumarisTechnicalException;
import net.sumaris.core.extraction.dao.technical.Daos;
import net.sumaris.core.extraction.dao.technical.XMLQuery;
import net.sumaris.core.extraction.dao.technical.csv.ExtractionCsvDao;
import net.sumaris.core.extraction.dao.technical.schema.SumarisTableMetadatas;
import net.sumaris.core.extraction.dao.technical.table.ExtractionTableColumnOrder;
......@@ -77,9 +78,11 @@ import org.apache.commons.lang3.mutable.MutableInt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.EventListener;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jdbc.datasource.DataSourceUtils;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.annotation.Nonnull;
......@@ -91,6 +94,7 @@ import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
......@@ -144,7 +148,7 @@ public class ExtractionServiceImpl implements ExtractionService {
protected TaskExecutor taskExecutor = null;
@Autowired
private ExtractionService self;
protected ApplicationContext applicationContext;
@Autowired
protected DatabaseSchemaDao databaseSchemaDao;
......@@ -315,10 +319,7 @@ public class ExtractionServiceImpl implements ExtractionService {
@Override
public void clean(ExtractionContextVO context) {
if (context == null) return;
if (context instanceof ExtractionRdbTripContextVO) {
extractionRdbTripDao.clean((ExtractionRdbTripContextVO) context);
}
clean(context, false);
}
@Override
......@@ -441,11 +442,22 @@ public class ExtractionServiceImpl implements ExtractionService {
}
// Remove created tables
asyncClean(context);
clean(context, true);
return outputFile;
}
@Override
public CompletableFuture<Boolean> asyncClean(ExtractionContextVO context) {
try {
clean(context);
return CompletableFuture.completedFuture(Boolean.TRUE);
} catch (Exception e) {
log.warn(String.format("Error while cleaning extraction #%s: %s", context.getId(), e.getMessage()), e);
return CompletableFuture.completedFuture(Boolean.FALSE);
}
}
/* -- protected -- */
protected List<ExtractionTypeVO> getAllTypes() {
......@@ -511,7 +523,7 @@ public class ExtractionServiceImpl implements ExtractionService {
return read(context, filter, offset, size, sort, direction);
} finally {
// Clean created tables
asyncClean(context);
clean(context, true);
}
}
......@@ -617,23 +629,6 @@ public class ExtractionServiceImpl implements ExtractionService {
}
public void asyncClean(ExtractionContextVO context) {
if (taskExecutor == null) {
clean(context);
} else {
taskExecutor.execute(() -> {
try {
Thread.sleep(2000); // Wait 2 s, to to sure the table is not used anymore
// Call self, to be sure to have a transaction
self.clean(context);
} catch (Exception e) {
log.warn("Error while cleaning extraction tables", e);
}
});
}
}
protected boolean initRectangleLocations() {
try {
// Insert missing rectangles
......@@ -746,4 +741,23 @@ public class ExtractionServiceImpl implements ExtractionService {
DataSourceUtils.releaseConnection(conn, dataSource);
}
}
protected void clean(ExtractionContextVO context, boolean async) {
if (context == null) return;
if (async && taskExecutor != null) {
taskExecutor.execute(() -> getSelfBean().clean(context));
}
else if (context instanceof ExtractionRdbTripContextVO) {
log.info("Cleaning extraction #{}-{}", context.getLabel(), context.getId());
extractionRdbTripDao.clean((ExtractionRdbTripContextVO) context);
}
}
/**
* Get self bean, to be able to use new transation
* @return
*/
protected ExtractionService getSelfBean() {
return applicationContext.getBean("extractionService", ExtractionService.class);
}
}
......@@ -22,6 +22,7 @@ package net.sumaris.core.extraction.vo;
* #L%
*/
import com.google.common.base.Joiner;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
......@@ -38,4 +39,18 @@ public class ExtractionFilterCriterionVO {
private String operator;
private String value;
private String[] values;
public String toString() {
StringBuilder sb = new StringBuilder();
if (this.getSheetName() != null) sb.append("Sheet: ").append(this.getSheetName());
if (this.getName() != null && this.getOperator() != null){
sb.append(", ").append(this.getName()).append(this.getOperator());
}
else {
sb.append(", ").append("Value: ");
}
if (this.getValue() != null) sb.append(this.getValue());
if (this.getValues() != null) sb.append('[').append(Joiner.on(',').join(this.getValues())).append(']');
return sb.toString();
}
}
......@@ -49,12 +49,14 @@ import net.sumaris.server.geojson.ExtractionGeoJsonConverter;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@Service
@Transactional
......@@ -76,6 +78,7 @@ public class AggregationGraphQLService {
/* -- aggregation service -- */
@GraphQLQuery(name = "aggregationType", description = "Get one aggregation type")
@Transactional(readOnly = true)
public AggregationTypeVO getAggregationType(@GraphQLArgument(name = "id") int id,
@GraphQLEnvironment() Set<String> fields) {
securityService.checkReadAccess(id);
......@@ -83,6 +86,7 @@ public class AggregationGraphQLService {
}
@GraphQLQuery(name = "aggregationTypes", description = "Get all available aggregation types")
@Transactional(readOnly = true)
public List<AggregationTypeVO> getAllAggregationTypes(@GraphQLArgument(name = "filter") AggregationTypeFilterVO filter,
@GraphQLEnvironment() Set<String> fields) {
filter = fillFilterDefaults(filter);
......@@ -90,6 +94,7 @@ public class AggregationGraphQLService {
}
@GraphQLQuery(name = "aggregationRows", description = "Read an aggregation")
@Transactional(readOnly = true)
public AggregationResultVO getAggregationRows(@GraphQLArgument(name = "type") AggregationTypeVO type,
@GraphQLArgument(name = "filter") ExtractionFilterVO filter,
@GraphQLArgument(name = "strata") AggregationStrataVO strata,
......@@ -105,6 +110,7 @@ public class AggregationGraphQLService {
}
@GraphQLQuery(name = "aggregationColumns", description = "Read columns from aggregation")
@Transactional(readOnly = true)
public List<ExtractionTableColumnVO> getAggregationColumns(@GraphQLArgument(name = "type") AggregationTypeVO type,
@GraphQLArgument(name = "sheet") String sheetName,
@GraphQLEnvironment() Set<String> fields) {
......@@ -124,6 +130,7 @@ public class AggregationGraphQLService {
@GraphQLQuery(name = "aggregationGeoJson", description = "Execute an aggregation and return as GeoJson")
@Transactional(readOnly = true)
public Object getGeoJsonAggregation(@GraphQLArgument(name = "type") AggregationTypeVO format,
@GraphQLArgument(name = "filter") ExtractionFilterVO filter,
@GraphQLArgument(name = "strata") AggregationStrataVO strata,
......@@ -173,6 +180,7 @@ public class AggregationGraphQLService {
}
@GraphQLQuery(name = "aggregationTech", description = "Execute an aggregation and return as GeoJson")
@Transactional(readOnly = true)
public AggregationTechResultVO getAggregationByTech(@GraphQLArgument(name = "type") AggregationTypeVO format,
@GraphQLArgument(name = "filter") ExtractionFilterVO filter,
@GraphQLArgument(name = "strata") AggregationStrataVO strata,
......@@ -190,6 +198,7 @@ public class AggregationGraphQLService {
@GraphQLQuery(name = "aggregationTechMinMax", description = "Execute an aggregation and return as GeoJson")
@Transactional(readOnly = true)
public MinMaxVO getAggregationByTech(@GraphQLArgument(name = "type") AggregationTypeVO format,
@GraphQLArgument(name = "filter") ExtractionFilterVO filter,
@GraphQLArgument(name = "strata") AggregationStrataVO strata) {
......@@ -204,9 +213,10 @@ public class AggregationGraphQLService {
}
@GraphQLMutation(name = "saveAggregation", description = "Create or update a data aggregation")
@Async
public AggregationTypeVO saveAggregation(@GraphQLArgument(name = "type") AggregationTypeVO type,
@GraphQLArgument(name = "filter") ExtractionFilterVO filter
) {
) throws ExecutionException, InterruptedException {
boolean isNew = type.getId() == null;
if (isNew) {
securityService.checkWriteAccess();
......@@ -215,7 +225,7 @@ public class AggregationGraphQLService {
securityService.checkWriteAccess(type.getId());
}
return aggregationService.save(type, filter);
return aggregationService.asyncSave(type, filter).get();
}
@GraphQLMutation(name = "deleteAggregations", description = "Delete some aggregations")
......
......@@ -25,6 +25,9 @@ package net.sumaris.rdf.service.store;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import net.sumaris.core.event.config.ConfigurationEvent;
import net.sumaris.core.event.config.ConfigurationReadyEvent;
import net.sumaris.core.event.config.ConfigurationUpdatedEvent;
import net.sumaris.core.exception.SumarisTechnicalException;
import net.sumaris.core.service.crypto.CryptoService;
import net.sumaris.core.util.file.FileContentReplacer;
......@@ -57,6 +60,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.event.EventListener;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;
import org.tdwg.rs.DWC;
......@@ -123,13 +127,16 @@ public class RdfDatasetServiceImpl implements RdfDatasetService {
@PostConstruct
public void init() {
// Register external loaders
registerNameModel(mnhnTaxonLoader,10000L); // FIXME MNHN endpoint has problem when offset >= 10000
registerNameModel(mnhnTaxonLoader, 10000L); // FIXME MNHN endpoint has problem when offset >= 10000
registerNameModel(sandreTaxonLoader, -1L);
registerNameModel(sandreDepartmentLoader, -1L);
// Init the query dataset
this.dataset = createDataset();
}
@EventListener({ConfigurationReadyEvent.class})
protected void onConfigurationReady(ConfigurationEvent event) {
// If auto import, load dataset
if (config.isRdfImportEnable()) {
if (!containsAllTypes(this.dataset, W3NS.Org.Organization, DWC.Voc.TaxonName)) {
......
......@@ -24,6 +24,7 @@ package net.sumaris.core.dao.administration.user;
import com.google.common.base.Preconditions;
import lombok.NonNull;
import net.sumaris.core.config.SumarisConfiguration;
import net.sumaris.core.dao.cache.CacheNames;
import net.sumaris.core.dao.referential.ReferentialDao;
import net.sumaris.core.dao.technical.Daos;
......@@ -32,13 +33,14 @@ import net.sumaris.core.dao.technical.SoftwareDao;
import net.sumaris.core.dao.technical.SortDirection;
import net.sumaris.core.dao.technical.jpa.BindableSpecification;
import net.sumaris.core.dao.technical.jpa.SumarisJpaRepositoryImpl;
import net.sumaris.core.event.config.ConfigurationEvent;
import net.sumaris.core.event.config.ConfigurationReadyEvent;
import net.sumaris.core.event.config.ConfigurationUpdatedEvent;
import net.sumaris.core.event.entity.EntityDeleteEvent;
import net.sumaris.core.event.entity.EntityInsertEvent;
import net.sumaris.core.event.entity.EntityUpdateEvent;
import net.sumaris.core.model.administration.user.Department;
import net.sumaris.core.model.administration.user.Person;
import net.sumaris.core.model.data.Operation;
import net.sumaris.core.model.data.Trip;
import net.sumaris.core.model.referential.Status;
import net.sumaris.core.model.referential.UserProfile;
import net.sumaris.core.util.crypto.MD5Util;
......@@ -46,6 +48,8 @@ import net.sumaris.core.vo.administration.user.DepartmentVO;
import net.sumaris.core.vo.administration.user.PersonVO;
import net.sumaris.core.vo.filter.PersonFilterVO;
import net.sumaris.core.vo.referential.ReferentialVO;
import net.sumaris.core.vo.technical.SoftwareVO;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
......@@ -56,6 +60,7 @@ import org.springframework.cache.annotation.CachePut;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.cache.annotation.Caching;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.domain.Specification;
......@@ -63,7 +68,6 @@ import javax.persistence.EntityManager;
import javax.persistence.TypedQuery;
import java.sql.Timestamp;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
......@@ -93,10 +97,18 @@ public class PersonRepositoryImpl
@Autowired
private ApplicationEventPublisher publisher;