Problem z transakcyjnoscią przy imporcie batchowym

Problem z transakcyjnoscią przy imporcie batchowym
XW
  • Rejestracja: dni
  • Ostatnio: dni
  • Postów: 19
0

Cześć.

Mam problem z importem danych z pliku CSV za pomocą Springa, gdzie operacja jest podzielona na batchowy zapis po 5000 wierszy.

W przypadku, gdy w pliku 45001 wiersz jest błędny, oczekiwane jest, że żadne dane nie zostaną zapisane do bazy (transakcja "wszystko albo nic"). Jednak w obecnej implementacji, mimo że część danych jest błędna, poprawne batch'e (w tym przypadku 45000 wierszy) są zapisywane do bazy.

Problem polega na tym, że transakcja nie jest wycofywana w całości przy napotkaniu błędu, co skutkuje częściowym zapisaniem danych do bazy, zamiast pełnego rollbacku.

Kopiuj
@Transactional
    public synchronized void importCsvFile(Long importId, InputStream csvInputStream) {

        CsvImportStatus status = csvImportStatusRepository.findById(importId)
                .orElseThrow(() -> new InvalidImportIdException("Invalid import ID"));

        status.setStartDateTime(LocalDateTime.now());
        csvImportStatusRepository.save(status);

        AtomicInteger processedRecords = new AtomicInteger();
        List<String> personCsvLines = new ArrayList<>();

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(() -> monitorProgress(status.getId(), processedRecords));
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(csvInputStream))) {
            String line;
            while ((line = reader.readLine()) != null) {
                log.debug("Processing line: {}", line);
                Person person = createPersonFromData(line.split(","));
                log.debug("Created person: {}", person);

                validatePerson(person);
                log.debug("Validated person: {}", person);

                String csvLine = convertPersonToCsvLine(person);
                log.debug("Converted person to CSV line: {}", csvLine);
                personCsvLines.add(csvLine);

                processedRecords.incrementAndGet();

                if (personCsvLines.size() >= BATCH_SIZE) {
                    String tempFilePath = "/tmp/persons.csv";
                    saveToTempFile(personCsvLines, tempFilePath);
                    log.info("Batch size reached. Saved to temp file: {}", tempFilePath);

                    importCsvUsingBatchInsert(tempFilePath);

                    personCsvLines.clear();

                    status.setProcessedRecords(processedRecords.get());
                    csvImportStatusRepository.save(status);
                }
            }

            if (!personCsvLines.isEmpty()) {
                String tempFilePath = "/tmp/persons.csv";
                saveToTempFile(personCsvLines, tempFilePath);
                log.info("Remaining lines saved to temp file: {}", tempFilePath);
                importCsvUsingBatchInsert(tempFilePath);
            }

            status.setProcessedRecords(processedRecords.get());
            status.setStatus("COMPLETED");
            status.setEndDateTime(LocalDateTime.now());
            csvImportStatusRepository.save(status);

        } catch (Exception e) {
            log.error("CSV import failed", e);
            status.setStatus("FAILED");
            status.setEndDateTime(LocalDateTime.now());
            csvImportStatusRepository.save(status);
            throw new CsvImportException("CSV import failed", e);
        } finally {
            executorService.shutdownNow();
        }
    }

Kopiuj
protected void importCsvUsingBatchInsert(String filePath) throws IOException {
        log.info("Starting batch insert from file: {}", filePath);
        try (BufferedReader reader = new BufferedReader(new FileReader(filePath));
             Connection connection = dataSource.getConnection()) {

            connection.setAutoCommit(false);
            String personInsertSQL = generateInsertSQL("person");

            try (PreparedStatement personPreparedStatement = connection.prepareStatement(personInsertSQL, Statement.RETURN_GENERATED_KEYS);
                 PreparedStatement professionPreparedStatement = connection.prepareStatement(generateInsertSQL("employee_profession"));
                 PreparedStatement positionPreparedStatement = connection.prepareStatement(generateInsertSQL("employee_position"))) {

                String line;
                int count = 0;

                while ((line = reader.readLine()) != null) {
                    log.debug("Processing line from temp file: {}", line);
                    Person person = createPersonFromData(line.split(","));
                    preparePersonInsertStatement(personPreparedStatement, person);
                    personPreparedStatement.executeUpdate();
                    log.debug("Person inserted: {}", person);

                    ResultSet generatedKeys = personPreparedStatement.getGeneratedKeys();
                    long personId = 0;
                    if (generatedKeys.next()) {
                        personId = generatedKeys.getLong(1);
                        log.debug("Generated person ID: {}", personId);
                    }

                    PersonFactory factory = personFactoryRegistry.getFactory(person.getClass().getSimpleName().toUpperCase());
                    factory.prepareAdditionalStatements(professionPreparedStatement, positionPreparedStatement, person, personId);

                    if (++count % BATCH_SIZE == 0) {
                        executeBatchAndCommit(professionPreparedStatement, positionPreparedStatement, connection);
                        log.info("Batch insert executed and committed.");
                    }
                }

                executeBatchAndCommit(professionPreparedStatement, positionPreparedStatement, connection);
                log.info("Final batch insert executed and committed.");
            } catch (SQLException e) {
                log.error("Batch insert failed, rolling back transaction.", e);
                connection.rollback();
                throw new BatchInsertException("Batch insert failed", e);
            }

        } catch (SQLException e) {
            log.error("Database connection failed during batch insert.", e);
            throw new DatabaseConnectionException("Database connection failed", e);
        }
    }

Będę wdzięczny za pomoc :)

abrakadaber
  • Rejestracja: dni
  • Ostatnio: dni
  • Postów: 6610
4

Przecież co każde 5000 insertów masz wołane COMMIT więc czego oczekujesz? Rollback działa na dane od ostatniego start transaction

KE
  • Rejestracja: dni
  • Ostatnio: dni
  • Postów: 758
0

Nie wiemy co robi executeBatchAndCommit ale jeśli to co mówi, to jak wyżej.

RequiredNickname
  • Rejestracja: dni
  • Ostatnio: dni
  • Postów: 646
0

W metodzie importCsvUsingBatchInsert deklarujesz checked exception w postaci IOException:

Kopiuj
protected void importCsvUsingBatchInsert(String filePath) throws IOException 

a domyślnie Spring nie robi rollbacka gdy wystąpi checked exception:

Checked exceptions that are thrown from a transactional method do not result in a rollback in the default configuration.

https://docs.spring.io/spring-framework/reference/data-access/transaction/declarative/rolling-back.html

Naturalnie można to zmienić za pomocą np:

Kopiuj
@Transactional(rollbackFor = IOException.class)
    public synchronized void importCsvFile(Long importId, InputStream csvInputStream) {
XW
  • Rejestracja: dni
  • Ostatnio: dni
  • Postów: 19
0
abrakadaber napisał(a):

Przecież co każde 5000 insertów masz wołane COMMIT więc czego oczekujesz? Rollback działa na dane od ostatniego start transaction

Jak więc sugerowałbyś rozwiązać ten problem?

Planuję dodawać dane w milionach, więc podzielenie tego na inserty po 5000, pozwoliło na większą wydajność.
W jaki sposób mogę uzyskać rollback na całej transakcji - a nie na ostatniej jej partii?

RequiredNickname napisał(a):

W metodzie importCsvUsingBatchInsert deklarujesz checked exception w postaci IOException:

Kopiuj
protected void importCsvUsingBatchInsert(String filePath) throws IOException 

a domyślnie Spring nie robi rollbacka gdy wystąpi checked exception:

Checked exceptions that are thrown from a transactional method do not result in a rollback in the default configuration.

https://docs.spring.io/spring-framework/reference/data-access/transaction/declarative/rolling-back.html

Naturalnie można to zmienić za pomocą np:

Kopiuj
@Transactional(rollbackFor = IOException.class)
    public synchronized void importCsvFile(Long importId, InputStream csvInputStream) {

Próbowałem - cofa wówczas jedynie ostatni insert (5000 rekordów). Zależy mi, aby w przypadku faila cofnęło całą transakcję od A do Z.

Dzięki za odpowiedzi.

abrakadaber
  • Rejestracja: dni
  • Ostatnio: dni
  • Postów: 6610
2
xwns napisał(a):
abrakadaber napisał(a):

Przecież co każde 5000 insertów masz wołane COMMIT więc czego oczekujesz? Rollback działa na dane od ostatniego start transaction

Jak więc sugerowałbyś rozwiązać ten problem?

Planuję dodawać dane w milionach, więc podzielenie tego na inserty po 5000, pozwoliło na większą wydajność.
W jaki sposób mogę uzyskać rollback na całej transakcji - a nie na ostatniej jej partii?

rollback ZAWSZE dotyczy CAŁEJ transakcji (tak samo jak commit). Zrozum, że ty tutaj nie masz JEDNEJ transakcji tylko (liczba wierszy / 5000 + 1) ODDZIELNYCH transakcji. Kod działa dokładnie tak jak został napisany - cofa OSTATNIĄ transakcję jeśli wystąpi błąd.

Co do sedna problemu - to nie jest batch import - to jest najprostszy import przy pomocy insertów podzielonych na paczki po 5000 insertów. Także masz rację, że taki zabieg (commit co 5000 insertów) znacząco przyśpiesza taki import bo dane dla rollbacku muszą obejmować co najwyżej 5000 ostatnich insertów.

Wiele baz ma coś takiego jak "prawdziwy" bach insert, np. postgresql ma coś takiego jak copy do szybkiego ładowania dużej ilości danych. Być może rozwiązaniem będzie szybkie załadowanie całości danych do tabeli tymczasowej i potem przerzucenie ich do docelowych tabel po wcześniejszym sprawdzeniu poprawności danych.

RequiredNickname
  • Rejestracja: dni
  • Ostatnio: dni
  • Postów: 646
0

Być może masz tam N transakcji jak pisał @abrakadaber

XW
  • Rejestracja: dni
  • Ostatnio: dni
  • Postów: 19
0
abrakadaber napisał(a):
xwns napisał(a):
abrakadaber napisał(a):

Przecież co każde 5000 insertów masz wołane COMMIT więc czego oczekujesz? Rollback działa na dane od ostatniego start transaction

Jak więc sugerowałbyś rozwiązać ten problem?

Planuję dodawać dane w milionach, więc podzielenie tego na inserty po 5000, pozwoliło na większą wydajność.
W jaki sposób mogę uzyskać rollback na całej transakcji - a nie na ostatniej jej partii?

rollback ZAWSZE dotyczy CAŁEJ transakcji (tak samo jak commit). Zrozum, że ty tutaj nie masz JEDNEJ transakcji tylko (liczba wierszy / 5000 + 1) ODDZIELNYCH transakcji. Kod działa dokładnie tak jak został napisany - cofa OSTATNIĄ transakcję jeśli wystąpi błąd.

Co do sedna problemu - to nie jest batch import - to jest najprostszy import przy pomocy insertów podzielonych na paczki po 5000 insertów. Także masz rację, że taki zabieg (commit co 5000 insertów) znacząco przyśpiesza taki import bo dane dla rollbacku muszą obejmować co najwyżej 5000 ostatnich insertów.

Wiele baz ma coś takiego jak "prawdziwy" bach insert, np. postgresql ma coś takiego jak copy do szybkiego ładowania dużej ilości danych. Być może rozwiązaniem będzie szybkie załadowanie całości danych do tabeli tymczasowej i potem przerzucenie ich do docelowych tabel po wcześniejszym sprawdzeniu poprawności danych.

Okej dziękuję za obszerną odpowiedź.

Lepszym pomysłem będzie przejście na COPY czy zliczanie ilości commitów i próba cofnięcia o tyle commitów?
Zależy mi na zachowaniu wydajności oraz pełnej transakcyjności.

RequiredNickname napisał(a):

Być może masz tam N transakcji jak pisał @abrakadaber

Tak - dokładnie tak się dzieje. Transakcji jest tak jak, któryś z Panów napisał ILOŚĆ REKORDÓW/5000, i rollback cofa jedynie tą transakcje, która nie przeszła z powodu walidacji czy innych błędów.

abrakadaber
  • Rejestracja: dni
  • Ostatnio: dni
  • Postów: 6610
1

nie da się cofnąć N transakcji (albo ja o czymś nie wiem) - transakcja zatwierdzona = transakcja zakończona i tyle

jarekr000000
  • Rejestracja: dni
  • Ostatnio: dni
  • Lokalizacja: U krasnoludów - pod górą
  • Postów: 4712
0

Zasadniczo to jest coś co przypomina "subtransakcje" i niektóre bazy to wspierają:
https://learn.microsoft.com/en-us/sql/connect/jdbc/using-savepoints?view=sql-server-ver16

YA
  • Rejestracja: dni
  • Ostatnio: dni
  • Postów: 2384
1
jarekr000000 napisał(a):

Zasadniczo to jest coś co przypomina "subtransakcje" i niektóre bazy to wspierają:
https://learn.microsoft.com/en-us/sql/connect/jdbc/using-savepoints?view=sql-server-ver16

Ten mechanizm pozwala na wstawienie "markerów" w ramach pojedynczej transakcji (=savepointy nie działają pomiędzy transakcjami).
Nie nadaje się do opisanego case'a, bo kolega op commituje co bulk.

@xwns

Zależy mi na zachowaniu wydajności oraz pełnej transakcyjności.

IMO sprzeczne wymagania i nie da się. Jak masz jedną transakcję -> będzie wolno. Masz >1 transakcja -> całość nie jest transakcyjna.

Możesz mieć jakiś kompromis, czyli w miarę szybko i finalnie poprawnie. Do tego jednak powinieneś zbudować mały workflow (logikę biznesową dla procesu ładowania danych), która np. usunie ładowane dane w przypadku wywalenia się batcha_nr_K.

KE
  • Rejestracja: dni
  • Ostatnio: dni
  • Postów: 758
0

Baj de łej

W przypadku, gdy w pliku 45001 wiersz jest błędny, oczekiwane jest, że żadne dane nie zostaną zapisane do bazy (transakcja "wszystko albo nic")

Nie prościej po prostu zwalidować te wiersze najpierw, a potem, jak wiemy że na pewno są wszystkie poprawne, ładować je do bazy? Jeśli nie masz zależności pomiędzy nimi, to może tak się da zrobić i problem zupełnie znika.

Wiem że tam masz InputStream więc możliwe że nie możesz zaczytać 2x, wtedy trzeba coś wykombinować.

Piszesz że

Zależy mi na zachowaniu wydajności

Ale nie wiadomo, o co chodzi - ładujesz tam bardziej megabajt czy terabajt? Ma się skończyć w sekundę czy godzinę?

XW
  • Rejestracja: dni
  • Ostatnio: dni
  • Postów: 19
0
kelog napisał(a):

Baj de łej

W przypadku, gdy w pliku 45001 wiersz jest błędny, oczekiwane jest, że żadne dane nie zostaną zapisane do bazy (transakcja "wszystko albo nic")

Nie prościej po prostu zwalidować te wiersze najpierw, a potem, jak wiemy że na pewno są wszystkie poprawne, ładować je do bazy? Jeśli nie masz zależności pomiędzy nimi, to może tak się da zrobić i problem zupełnie znika.

Wiem że tam masz InputStream więc możliwe że nie możesz zaczytać 2x, wtedy trzeba coś wykombinować.

Piszesz że

Zależy mi na zachowaniu wydajności

Ale nie wiadomo, o co chodzi - ładujesz tam bardziej megabajt czy terabajt? Ma się skończyć w sekundę czy godzinę?

yarel napisał(a):
jarekr000000 napisał(a):

Zasadniczo to jest coś co przypomina "subtransakcje" i niektóre bazy to wspierają:
https://learn.microsoft.com/en-us/sql/connect/jdbc/using-savepoints?view=sql-server-ver16

Ten mechanizm pozwala na wstawienie "markerów" w ramach pojedynczej transakcji (=savepointy nie działają pomiędzy transakcjami).
Nie nadaje się do opisanego case'a, bo kolega op commituje co bulk.

@xwns

Zależy mi na zachowaniu wydajności oraz pełnej transakcyjności.

IMO sprzeczne wymagania i nie da się. Jak masz jedną transakcję -> będzie wolno. Masz >1 transakcja -> całość nie jest transakcyjna.

Możesz mieć jakiś kompromis, czyli w miarę szybko i finalnie poprawnie. Do tego jednak powinieneś zbudować mały workflow (logikę biznesową dla procesu ładowania danych), która np. usunie ładowane dane w przypadku wywalenia się batcha_nr_K.

Dzięki Panowie za podpowiedzi.

Zmieniłem formę przyjmowania danych na COPY zamiast insert w batchach po 5000.
Dzięki temu wszystko dzieje się w obrębie 1 transakcji. W razie, gdy ktoryś z milionowych rekordów złapie walidacja cała transakcja się cofa. Obawiałem się trochę o możliwość podglądu ilości przetworzonych danych, ale również udało mi się to zrobić.

Z ciekawszych rzeczy to wydajność przy insercie - około 4500 rekordów na sekundę, przy COPY jest to około 10 x więcej.

Dzięki jeszcze raz z odpowiedzi.

Zarejestruj się i dołącz do największej społeczności programistów w Polsce.

Otrzymaj wsparcie, dziel się wiedzą i rozwijaj swoje umiejętności z najlepszymi.