Como evitar erro durante execução paralela {furrr} usando o SQLite?

Olá comunidade

Estou tentando várias alternativas para evitar que uma função (que grava dados no SQLite) pare a sua execução ao ser executada em paralelo com o pacote {furrr}.

Ao realizar escritas simultâneas no SQLite, para um dos processos é lançado o “Erro: banco de dados bloqueado”.

Para tentar contornar esse problema, utilizei a função purrr::insistently . “Aparentemente”, acho que consegui que o código (ver exemplo abaixo) não parasse no “Erro: banco de dados bloqueado”.
Contudo, curiosamente, embora os prints de sucesso de cada loop apareçam no console, nem todos os dados são gravados no SQLite.

No reprex abaixo, embora os prints de 1 a 30 apareçam, há loops que não foram registrados no SQLite. Ainda não sei o porquê desse fracasso.

Alguma sugestão para enfrentar este problema?

Exemplo de reprex abaixo:

``` r

library(DBI)
library(RSQLite)
library(furrr)
#> Warning: package 'furrr' was built under R version 4.0.3
#> Carregando pacotes exigidos: future
library(future)
library(magrittr)
#> Warning: package 'magrittr' was built under R version 4.0.3
library(purrr)
#> 
#> Attaching package: 'purrr'
#> The following object is masked from 'package:magrittr':
#> 
#>     set_names


# Loop identifiers: one write job per value, 1 through 30.
n_teste <- seq_len(30L)


#' Open a connection to the SQLite test database
#'
#' Connects to the file `bd_test.db` (relative to the working directory).
#' The connection attempt itself is retried up to 10 times, 0.3 seconds
#' apart, through `purrr::insistently()`.
#'
#' A busy timeout is then registered on the connection so that statements
#' issued while another process holds the database lock wait for the lock
#' instead of failing right away with "database is locked".
#'
#' @return A DBI connection object. The caller is responsible for closing it
#'   with `DBI::dbDisconnect()`.
connect_sgbd <- function() {
  tentativas <- purrr::rate_delay(pause = 0.3, max_times = 10)

  insist_sqlite <- purrr::insistently(
    DBI::dbConnect,
    tentativas,
    quiet = FALSE
  )

  conexao <- insist_sqlite(
    RSQLite::SQLite(),
    dbname = file.path("bd_test.db")
  )

  # Wait up to 10 s (value is in milliseconds) for a competing writer to
  # release its lock before SQLite reports the database as locked.
  RSQLite::sqliteSetBusyHandler(conexao, 10000L)

  conexao
}


#' Append one tagged copy of `iris` to the SQLite table `tb_teste_iris`
#'
#' Adds a column `n_loop` holding `n_teste` to `iris`, opens a connection via
#' `connect_sgbd()`, and appends the rows with `DBI::dbWriteTable()`. The
#' write is retried up to 10 times, 0.3 seconds apart, through
#' `purrr::insistently()`; if every attempt fails the error propagates.
#'
#' @param n_teste Loop identifier stored in the `n_loop` column and printed
#'   once the write call has returned.
#' @return `n_teste`, invisibly (the value of `print()`).
teste_write_sqlite <- function(n_teste) {
  tb_teste_iris <- iris %>%
    dplyr::mutate(n_loop = n_teste)

  tentativas <- purrr::rate_delay(pause = 0.3, max_times = 10)

  insist_write_sqlite <- purrr::insistently(
    DBI::dbWriteTable,
    tentativas,
    quiet = FALSE
  )

  conn <- connect_sgbd()
  # Close the connection on every exit path, including when the retried
  # write finally gives up and throws; otherwise the connection would leak.
  on.exit(DBI::dbDisconnect(conn), add = TRUE)

  insist_write_sqlite(
    conn,
    "tb_teste_iris",
    tb_teste_iris,
    append = TRUE
  )

  print(n_teste)
}



# Evaluate futures in 5 parallel R sessions.
future::plan("multisession", workers = 5)

# Run one write job per element of n_teste. seed = TRUE declares that the
# futures may use random numbers and gives them parallel-safe RNG streams,
# which avoids the "UNRELIABLE VALUE ... without specifying argument 'seed'"
# warning.
furrr::future_walk(
  n_teste,
  teste_write_sqlite,
  .options = furrr::furrr_options(seed = TRUE),
  .progress = TRUE
)
#> [1] 1
#> [1] 2
#> [1] 3
#> [1] 4
#> [1] 5
#> [1] 6
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Warning: UNRELIABLE VALUE: Future ('<none>') unexpectedly generated random
#> numbers without specifying argument 'seed'. There is a risk that those random
#> numbers are not statistically sound and the overall results might be invalid.
#> To fix this, specify 'seed=TRUE'. This ensures that proper, parallel-safe random
#> numbers are produced via the L'Ecuyer-CMRG method. To disable this check, use
#> 'seed=NULL', or set option 'future.rng.onMisuse' to "ignore".
#> [1] 7
#> [1] 8
#> [1] 9
#> [1] 10
#> [1] 11
#> [1] 12
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Warning: Closing open result set, pending rows
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Warning: Closing open result set, pending rows
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Warning: Closing open result set, pending rows
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Warning: Closing open result set, pending rows
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Warning: Closing open result set, pending rows
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Warning: Closing open result set, pending rows
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Warning: Closing open result set, pending rows
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Warning: UNRELIABLE VALUE: Future ('<none>') unexpectedly generated random
#> numbers without specifying argument 'seed'. There is a risk that those random
#> numbers are not statistically sound and the overall results might be invalid.
#> To fix this, specify 'seed=TRUE'. This ensures that proper, parallel-safe random
#> numbers are produced via the L'Ecuyer-CMRG method. To disable this check, use
#> 'seed=NULL', or set option 'future.rng.onMisuse' to "ignore".
#> [1] 13
#> [1] 14
#> [1] 15
#> [1] 16
#> [1] 17
#> [1] 18
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Warning: Closing open result set, pending rows
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Warning: Closing open result set, pending rows
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Warning: UNRELIABLE VALUE: Future ('<none>') unexpectedly generated random
#> numbers without specifying argument 'seed'. There is a risk that those random
#> numbers are not statistically sound and the overall results might be invalid.
#> To fix this, specify 'seed=TRUE'. This ensures that proper, parallel-safe random
#> numbers are produced via the L'Ecuyer-CMRG method. To disable this check, use
#> 'seed=NULL', or set option 'future.rng.onMisuse' to "ignore".
#> [1] 19
#> [1] 20
#> [1] 21
#> [1] 22
#> [1] 23
#> [1] 24
#> Warning: Couldn't set synchronous mode: database is locked
#> Use `synchronous` = NULL to turn off this warning.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Warning: Couldn't set synchronous mode: database is locked
#> Use `synchronous` = NULL to turn off this warning.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Warning: Closing open result set, pending rows
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Warning: UNRELIABLE VALUE: Future ('<none>') unexpectedly generated random
#> numbers without specifying argument 'seed'. There is a risk that those random
#> numbers are not statistically sound and the overall results might be invalid.
#> To fix this, specify 'seed=TRUE'. This ensures that proper, parallel-safe random
#> numbers are produced via the L'Ecuyer-CMRG method. To disable this check, use
#> 'seed=NULL', or set option 'future.rng.onMisuse' to "ignore".
#> [1] 25
#> [1] 26
#> [1] 27
#> [1] 28
#> [1] 29
#> [1] 30
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Warning: Closing open result set, pending rows
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Warning: Closing open result set, pending rows
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Warning: Closing open result set, pending rows
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Warning: Closing open result set, pending rows
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Warning: Closing open result set, pending rows
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Error: database is locked
#> Retrying in 0.3 seconds.
#> Warning: UNRELIABLE VALUE: Future ('<none>') unexpectedly generated random
#> numbers without specifying argument 'seed'. There is a risk that those random
#> numbers are not statistically sound and the overall results might be invalid.
#> To fix this, specify 'seed=TRUE'. This ensures that proper, parallel-safe random
#> numbers are produced via the L'Ecuyer-CMRG method. To disable this check, use
#> 'seed=NULL', or set option 'future.rng.onMisuse' to "ignore".
```

<sup>Created on 2020-12-20 by the [reprex package](https://reprex.tidyverse.org) (v0.3.0)</sup>

obrigado, desde já

Exemplo de outra execução do código acima.

Veja o banco de dados SQLite aqui: bd_test.zip

Conforme observado no comentário acima, embora print 1 a 30 apareça no console, os dados da íris não foram gravados no SQLite nos loops 2, 3, 5, 9, 16, 19, 26 (ver coluna n_loop na bd do SQLite do link acima).

[1] 1
[1] 2
[1] 3
[1] 4
[1] 5
[1] 6
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
[1] 7
[1] 8
[1] 9
[1] 10
[1] 11
[1] 12
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
[1] 13
[1] 14
[1] 15
[1] 16
[1] 17
[1] 18
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
[1] 19
[1] 20
[1] 21
[1] 22
[1] 23
[1] 24
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
[1] 25
[1] 26
[1] 27
[1] 28
[1] 29
[1] 30
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
Error: database is locked
Retrying in 0.3 seconds.
There were 25 warnings (use warnings() to see them)
>

George,

Salvo engano, você não pode tentar fazer escritas paralelas desta forma. O aviso de que a base está bloqueada indica que já há um processo fazendo escritas nela.

Para saber mais, veja os requisitos ACID: https://en.wikipedia.org/wiki/ACID

Olá, @clente

Obrigado pela resposta.

Veja se meu raciocínio está errado.

No caso, eu sei que o SQLite não permite escritas simultâneas (concorrentes). Por isso, a minha ideia seria criar uma etapa de verificação no ato da escrita: se uma sessão R (em paralelo) retornasse um erro, já que outra sessão R poderia estar escrevendo no banco, a sessão que recebeu o erro tentaria novamente a cada 0.3 segundos, até 10 vezes. A expectativa é que, em alguma dessas 10 tentativas, a sessão R que recebeu o erro conseguisse escrever no SQLite, já que o choque de escrita entre as sessões R é algo esporádico.

Isso me parece equivalente a escrever sequencialmente.

Exatamente! Só que em sessões R diferentes. Contudo, eu preciso de algum mecanismo de verificação para que uma determinada sessão R tente várias vezes escrever no SQLite e não pare com um erro.
Estou tentando outras variações utilizando purrr::insistently e purrr::safely. Contudo, o resultado ainda é estranho, pois ele indica que as escritas foram bem sucedidas, mas elas não aparecem no banco de dados. No caso abaixo, era para eu conseguir 4500 linhas. Mas consigo 4050 ou 4200 . Sempre há algum problema num determinado loop, embora ele indique TRUE para a escrita.

library(DBI)
library(RSQLite)
library(furrr)
library(future)
library(magrittr)
library(purrr)


# Loop identifiers: one write job per value, 1 through 30.
n_teste <- seq_len(30L)

#' Open a connection to the SQLite test database
#'
#' Connects to the file `bd_test.db` (relative to the working directory) and
#' registers a busy timeout on the connection, so that statements issued
#' while another process holds the database lock wait for the lock instead
#' of failing right away with "database is locked".
#'
#' @return A DBI connection object. The caller is responsible for closing it
#'   with `DBI::dbDisconnect()`.
connect_sgbd <- function() {
  conexao <- DBI::dbConnect(
    RSQLite::SQLite(),
    dbname = file.path("bd_test.db")
  )

  # Wait up to 10 s (value is in milliseconds) for a competing writer to
  # release its lock before SQLite reports the database as locked.
  RSQLite::sqliteSetBusyHandler(conexao, 10000L)

  conexao
}

#' Append one tagged copy of `iris` to the SQLite table `tb_teste_iris`
#'
#' Adds a column `n_loop` holding `n_teste` to `iris`, opens a connection via
#' `connect_sgbd()`, and appends the rows with `DBI::dbWriteTable()`. The
#' write is retried up to 10 times with jittered exponential backoff
#' (`purrr::rate_backoff()`), and the retrying function is wrapped in
#' `purrr::safely()` so that a final failure is reported instead of thrown.
#'
#' @param n_teste Loop identifier stored in the `n_loop` column and included
#'   in the printed status line.
#' @return The printed status string, invisibly: `"<n_teste> - <result>"` on
#'   success, or `"<n_teste> - Failed to write"` when every attempt failed.
teste_write_sqlite <- function(n_teste) {
  tb_teste_iris <- iris %>%
    dplyr::mutate(n_loop = n_teste)

  tentativas <- purrr::rate_backoff(
    pause_base = 1,
    pause_cap = 30,
    pause_min = 1,
    max_times = 10,
    jitter = TRUE
  )

  insist_write_sqlite <- purrr::insistently(
    DBI::dbWriteTable,
    tentativas,
    quiet = FALSE
  )

  safely_write_sqlite <- purrr::safely(insist_write_sqlite)

  conn <- connect_sgbd()
  # Single cleanup point: the connection is closed on every exit path.
  on.exit(DBI::dbDisconnect(conn), add = TRUE)

  safely_write_sqlite_teste <- safely_write_sqlite(
    conn,
    "tb_teste_iris",
    tb_teste_iris,
    append = TRUE
  )

  # safely() returns list(result, error); a non-NULL error means all retries
  # were exhausted.
  if (!is.null(safely_write_sqlite_teste$error)) {
    return(print(paste(n_teste, "-", "Failed to write")))
  }

  print(paste(n_teste, "-", safely_write_sqlite_teste$result))
}



# Evaluate futures in 5 parallel R sessions.
future::plan("multisession", workers = 5)

# Run one write job per element of n_teste. The retry schedule uses
# jitter = TRUE, i.e. random pauses, so seed = TRUE is passed to declare RNG
# use and give the futures parallel-safe random number streams.
furrr::future_walk(
  n_teste,
  teste_write_sqlite,
  .options = furrr::furrr_options(seed = TRUE),
  .progress = TRUE
)