Cześć.
Mam problem z importem danych z pliku CSV za pomocą Springa, gdzie operacja jest podzielona na batchowy zapis po 5000 wierszy.
W przypadku, gdy w pliku 45001 wiersz jest błędny, oczekiwane jest, że żadne dane nie zostaną zapisane do bazy (transakcja "wszystko albo nic"). Jednak w obecnej implementacji, mimo że część danych jest błędna, poprawne batch'e (w tym przypadku 45000 wierszy) są zapisywane do bazy.
Problem polega na tym, że transakcja nie jest wycofywana w całości przy napotkaniu błędu, co skutkuje częściowym zapisaniem danych do bazy, zamiast pełnego rollbacku.
@Transactional
public synchronized void importCsvFile(Long importId, InputStream csvInputStream) {
CsvImportStatus status = csvImportStatusRepository.findById(importId)
.orElseThrow(() -> new InvalidImportIdException("Invalid import ID"));
status.setStartDateTime(LocalDateTime.now());
csvImportStatusRepository.save(status);
AtomicInteger processedRecords = new AtomicInteger();
List<String> personCsvLines = new ArrayList<>();
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> monitorProgress(status.getId(), processedRecords));
try (BufferedReader reader = new BufferedReader(new InputStreamReader(csvInputStream))) {
String line;
while ((line = reader.readLine()) != null) {
log.debug("Processing line: {}", line);
Person person = createPersonFromData(line.split(","));
log.debug("Created person: {}", person);
validatePerson(person);
log.debug("Validated person: {}", person);
String csvLine = convertPersonToCsvLine(person);
log.debug("Converted person to CSV line: {}", csvLine);
personCsvLines.add(csvLine);
processedRecords.incrementAndGet();
if (personCsvLines.size() >= BATCH_SIZE) {
String tempFilePath = "/tmp/persons.csv";
saveToTempFile(personCsvLines, tempFilePath);
log.info("Batch size reached. Saved to temp file: {}", tempFilePath);
importCsvUsingBatchInsert(tempFilePath);
personCsvLines.clear();
status.setProcessedRecords(processedRecords.get());
csvImportStatusRepository.save(status);
}
}
if (!personCsvLines.isEmpty()) {
String tempFilePath = "/tmp/persons.csv";
saveToTempFile(personCsvLines, tempFilePath);
log.info("Remaining lines saved to temp file: {}", tempFilePath);
importCsvUsingBatchInsert(tempFilePath);
}
status.setProcessedRecords(processedRecords.get());
status.setStatus("COMPLETED");
status.setEndDateTime(LocalDateTime.now());
csvImportStatusRepository.save(status);
} catch (Exception e) {
log.error("CSV import failed", e);
status.setStatus("FAILED");
status.setEndDateTime(LocalDateTime.now());
csvImportStatusRepository.save(status);
throw new CsvImportException("CSV import failed", e);
} finally {
executorService.shutdownNow();
}
}
protected void importCsvUsingBatchInsert(String filePath) throws IOException {
log.info("Starting batch insert from file: {}", filePath);
try (BufferedReader reader = new BufferedReader(new FileReader(filePath));
Connection connection = dataSource.getConnection()) {
connection.setAutoCommit(false);
String personInsertSQL = generateInsertSQL("person");
try (PreparedStatement personPreparedStatement = connection.prepareStatement(personInsertSQL, Statement.RETURN_GENERATED_KEYS);
PreparedStatement professionPreparedStatement = connection.prepareStatement(generateInsertSQL("employee_profession"));
PreparedStatement positionPreparedStatement = connection.prepareStatement(generateInsertSQL("employee_position"))) {
String line;
int count = 0;
while ((line = reader.readLine()) != null) {
log.debug("Processing line from temp file: {}", line);
Person person = createPersonFromData(line.split(","));
preparePersonInsertStatement(personPreparedStatement, person);
personPreparedStatement.executeUpdate();
log.debug("Person inserted: {}", person);
ResultSet generatedKeys = personPreparedStatement.getGeneratedKeys();
long personId = 0;
if (generatedKeys.next()) {
personId = generatedKeys.getLong(1);
log.debug("Generated person ID: {}", personId);
}
PersonFactory factory = personFactoryRegistry.getFactory(person.getClass().getSimpleName().toUpperCase());
factory.prepareAdditionalStatements(professionPreparedStatement, positionPreparedStatement, person, personId);
if (++count % BATCH_SIZE == 0) {
executeBatchAndCommit(professionPreparedStatement, positionPreparedStatement, connection);
log.info("Batch insert executed and committed.");
}
}
executeBatchAndCommit(professionPreparedStatement, positionPreparedStatement, connection);
log.info("Final batch insert executed and committed.");
} catch (SQLException e) {
log.error("Batch insert failed, rolling back transaction.", e);
connection.rollback();
throw new BatchInsertException("Batch insert failed", e);
}
} catch (SQLException e) {
log.error("Database connection failed during batch insert.", e);
throw new DatabaseConnectionException("Database connection failed", e);
}
}
Będę wdzięczny za pomoc :)