OmopSparkConnector provides a Spark-specific implementation of an OMOP CDM reference, as defined by the omopgenerics R package.

Installation

You can install the development version of OmopSparkConnector from GitHub with:

# install.packages("devtools")
devtools::install_github("oxford-pharmacoepi/OmopSparkConnector")

Creating a cdm reference using sparklyr

Let’s first load the R libraries.
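The later examples call functions such as tally(), show_query() and sdf_describe() without a package prefix. A minimal sketch of the setup, assuming those functions come from dplyr and sparklyr and that all three packages are already installed:

```r
# Assumption: these are the packages the unqualified calls below rely on.
library(dplyr)               # tally(), show_query(), glimpse()
library(sparklyr)            # spark_connect(), sdf_describe()
library(OmopSparkConnector)  # cdmFromSpark(), mockSparkCdm()
```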

To work with OmopSparkConnector, we first need to create a connection to our data using sparklyr. In the example below, we have a schema called “omop” that contains all the OMOP CDM tables, and another schema where we can write results during the course of a study. We also set a write prefix so that every table we write starts with it, which makes it easy to clean up afterwards and avoids name conflicts with other users.

con <- sparklyr::spark_connect(.....)
cdm <- cdmFromSpark(con,
  cdmSchema = "omop",
  writeSchema = "results",
  writePrefix = "study_1_"
)

For this introduction we’ll use a mock cdm: a small synthetic dataset held in a local Spark database.

cdm <- mockSparkCdm(path = file.path(tempdir(), "temp_spark"))
#> ! Validation has been turned off, this is not recommended as analytical
#>   packages assumed the cdm_reference object fulfills the cdm validation
#>   criteria.
#> ! Validation has been turned off, this is not recommended as analytical
#>   packages assumed the cdm_reference object fulfills the cdm validation
#>   criteria.

Cross platform support

With our cdm reference created, we now have a single object in R that represents our OMOP CDM data.

cdm
#> 
#> ── # OMOP CDM reference (sparklyr) of mock local spark ─────────────────────────
#> • omop tables: cdm_source, concept, concept_ancestor, concept_relationship,
#> concept_synonym, condition_occurrence, drug_strength, observation_period,
#> person, vocabulary
#> • cohort tables: -
#> • achilles tables: -
#> • other tables: -

This object contains references to each of our tables.

cdm$person |>
  dplyr::glimpse()
#> Rows: ??
#> Columns: 18
#> Database: spark_connection
#> $ person_id                   <int> 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
#> $ gender_concept_id           <int> 8507, 8532, 8532, 8507, 8507, 8507, 8507, …
#> $ year_of_birth               <int> 1960, 1969, 1976, 1981, 1987, 1953, 1999, …
#> $ month_of_birth              <int> 6, 3, 11, 3, 12, 10, 5, 12, 10, 4
#> $ day_of_birth                <int> 24, 7, 12, 2, 26, 31, 29, 10, 17, 25
#> $ race_concept_id             <int> NA, NA, NA, NA, NA, NA, NA, NA, NA, NA
#> $ ethnicity_concept_id        <int> NA, NA, NA, NA, NA, NA, NA, NA, NA, NA
#> $ birth_datetime              <dttm> 1970-01-01 01:00:00, 1970-01-01 01:00:00, …
#> $ location_id                 <int> NA, NA, NA, NA, NA, NA, NA, NA, NA, NA
#> $ provider_id                 <int> NA, NA, NA, NA, NA, NA, NA, NA, NA, NA
#> $ care_site_id                <int> NA, NA, NA, NA, NA, NA, NA, NA, NA, NA
#> $ person_source_value         <chr> NA, NA, NA, NA, NA, NA, NA, NA, NA, NA
#> $ gender_source_value         <chr> NA, NA, NA, NA, NA, NA, NA, NA, NA, NA
#> $ gender_source_concept_id    <int> NA, NA, NA, NA, NA, NA, NA, NA, NA, NA
#> $ race_source_value           <chr> NA, NA, NA, NA, NA, NA, NA, NA, NA, NA
#> $ race_source_concept_id      <int> NA, NA, NA, NA, NA, NA, NA, NA, NA, NA
#> $ ethnicity_source_value      <chr> NA, NA, NA, NA, NA, NA, NA, NA, NA, NA
#> $ ethnicity_source_concept_id <int> NA, NA, NA, NA, NA, NA, NA, NA, NA, NA

cdm$observation_period |>
  dplyr::glimpse()
#> Rows: ??
#> Columns: 5
#> Database: spark_connection
#> $ observation_period_id         <int> 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
#> $ person_id                     <int> 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
#> $ observation_period_start_date <date> 1990-05-26, 1980-12-06, 2004-02-22, 2017…
#> $ observation_period_end_date   <date> 2009-03-08, 1981-08-30, 2006-01-11, 2018…
#> $ period_type_concept_id        <int> NA, NA, NA, NA, NA, NA, NA, NA, NA, NA

With this we can use familiar dplyr code. For example, we can quickly get a count of the rows in our person table.

cdm$person |> 
  tally()
#> # Source:   SQL [?? x 1]
#> # Database: spark_connection
#>       n
#>   <dbl>
#> 1    10

Behind the scenes, the dbplyr R package is translating this to SQL.

cdm$person |> 
  tally() |> 
  show_query()
#> <SQL>
#> SELECT COUNT(*) AS `n`
#> FROM omop.person

We can also make use of various existing packages that work with a cdm reference using this approach. For example, we can extract a summary of missing data in our condition occurrence table using the OmopSketch package.

library(OmopSketch)
library(flextable)

missing_condition_data <- OmopSketch::summariseMissingData(cdm, "condition_occurrence")
tableMissingData(missing_condition_data, type = "flextable")

Native spark support

Packages such as OmopSketch provide cross-platform functionality on top of the cdm reference. Because OmopSparkConnector is built on top of the sparklyr package, we can also make use of native Spark queries. For example, we can compute summary statistics on one of our cdm tables using Spark functions.

cdm$person |>
  sdf_describe(cols = c(
    "gender_concept_id",
    "year_of_birth",
    "month_of_birth",
    "day_of_birth"
  ))
#> # Source:   table<`sparklyr_tmp_08565f48_368a_4fd3_996d_6ce84d0937eb`> [?? x 5]
#> # Database: spark_connection
#>   summary gender_concept_id  year_of_birth      month_of_birth     day_of_birth 
#>   <chr>   <chr>              <chr>              <chr>              <chr>        
#> 1 count   10                 10                 10                 10           
#> 2 mean    8519.5             1974.7             7.6                18.3         
#> 3 stddev  13.176156917368234 15.867857098199766 3.7475918193480524 10.089047967…
#> 4 min     8507               1953               3                  2            
#> 5 max     8532               1999               12                 31

With this we hope to achieve the best of both worlds. On the one hand, we can participate in network studies where the code has been written to work across database platforms. On the other, we can go beyond this approach and write bespoke code that makes use of Spark-specific functionality.

Disconnecting from your spark connection

When we have finished, we can disconnect from our Spark connection.
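The code for this step is not shown above. A minimal sketch, assuming the cdm reference follows the omopgenerics convention of closing its underlying connection through cdmDisconnect(); the commented alternative uses sparklyr directly and assumes `con` is the connection object created earlier with spark_connect():

```r
# Assumption: cdmDisconnect() from omopgenerics closes the connection
# held by the cdm reference.
omopgenerics::cdmDisconnect(cdm)

# Alternative, if you hold the sparklyr connection object yourself:
# sparklyr::spark_disconnect(con)
```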