NY Taxi Data in Spark

Author

Lee Durbin

Introduction

This notebook demonstrates how to work with data in Spark via the sparklyr package. The purpose of this exercise is to complete the Module 5 homework for the Data Engineering Zoomcamp.

Getting Started

First, we install the sparklyr package, along with the dplyr, readr, fs, lubridate, and here packages, which will help us along the way. We'll also use sparklyr to install a local copy of Spark:

install.packages(c("sparklyr", "dplyr", "readr", "fs", "lubridate", "here"))

sparklyr::spark_install()
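
This notebook assumes the October 2019 FHV trip data is already sitting in the data/ folder as a gzipped CSV. If you still need to fetch it, a minimal sketch of that step might look like the following; the release URL and file name are assumptions based on the DataTalksClub data mirror, so adjust them to wherever you source the file.

# Assumed location of the FHV October 2019 extract (DataTalksClub mirror);
# change the URL and file name if you source the data elsewhere
fhv_url <- "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz"

fs::dir_create(here::here("data"))

download.file(
  fhv_url,
  destfile = here::here("data", "fhv_tripdata_2019-10.csv.gz"),
  mode = "wb"
)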

Now, let’s set an environment variable to point to where we’ve installed Java. Then we can connect to Spark and we’re ready to answer the homework questions:

Sys.setenv(JAVA_HOME = "/usr/local/Cellar/openjdk@17/17.0.9")

library(sparklyr)
library(dplyr) # for the unprefixed dplyr verbs (mutate, left_join, collect) used below

sc <- spark_connect(
  master = "local",
  spark_home = "/usr/local/Cellar/apache-spark/3.5.0/libexec"
  )
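
As a quick sanity check, we can ask sparklyr whether the connection is open before moving on; spark_connection_is_open() should return TRUE if the session started cleanly:

spark_connection_is_open(sc)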

Question 1

The first question just requires that we return the version of Spark that we’re using:

sparklyr::spark_version(sc) 
[1] '3.5.0'

Question 2

Let’s read the October 2019 FHV data into a Spark DataFrame:

tbl_fhv <- spark_read_csv(
  sc,
  name = "spark_fhv2019",
  path = fs::dir_ls(here::here("data"), glob = "*.csv.gz"),
  overwrite = TRUE
)
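
If we want a quick check that the CSV loaded as expected, sdf_nrow() counts the rows on the Spark side without collecting anything into R:

# row count computed by Spark; nothing is pulled back into the R session
sdf_nrow(tbl_fhv)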

Next, we’ll repartition the DataFrame into 6 partitions and save it as Parquet:

fhv_partitioned <- sdf_repartition(tbl_fhv, 6)

parquet_dir <- here::here("data/fhv/2019/10/")

sparklyr::spark_write_parquet(fhv_partitioned, parquet_dir, mode = "overwrite")
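
Before measuring file sizes, we can confirm the repartitioning took effect with sdf_num_partitions():

# should report 6, matching the repartition above
sdf_num_partitions(fhv_partitioned)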

The answer for Question 2 is the average size of the Parquet files we just wrote. Here’s how we get that:

file_info <- file.info(fs::dir_ls(parquet_dir, glob = "*.snappy.parquet"))

mean(file_info$size) 
[1] 6660810
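
That figure is in bytes; dividing by 1024^2 expresses it in megabytes, which puts the average file size at roughly 6 MB:

# average size in megabytes (about 6.35 here)
mean(file_info$size) / 1024^2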

Question 3

For this question we need to filter the data to return only the trips that started on 15 October 2019.

First, let’s use Spark to read the Parquet files we previously saved:

tbl_fhv_partitioned <- spark_read_parquet(
  sc,
  name = "spark_fhv2019_partitioned",
  path = parquet_dir)

Now we apply the filter using some SQL and count the number of rows returned:

library(DBI)

dbGetQuery(
  sc,
  "SELECT * FROM spark_fhv2019_partitioned WHERE date(from_utc_timestamp(to_timestamp(pickup_datetime), 'America/New_York')) = '2019-10-15'"
  ) |> 
  nrow()
[1] 62944
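
The same filter can also be expressed as a dplyr pipeline; here’s a sketch of that approach, which relies on dbplyr passing the Spark SQL timestamp functions through to Spark untranslated:

# equivalent dplyr version: the timestamp functions aren't dplyr functions,
# so dbplyr forwards them verbatim to Spark SQL
tbl_fhv_partitioned |> 
  dplyr::filter(
    to_date(from_utc_timestamp(to_timestamp(pickup_datetime), "America/New_York")) == "2019-10-15"
  ) |> 
  dplyr::count() |> 
  collect()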

Question 4

What is the length of the longest trip in the dataset in hours?

To determine this, we first need to calculate the duration of each trip in hours. Unfortunately we can’t use lubridate functions when querying Spark, but we can work out the difference ourselves by calling Spark’s unix_timestamp() on the pickup and dropoff times, subtracting one from the other, and dividing by 3,600:

fhv_hours_diff <- tbl_fhv_partitioned |> 
  mutate(
    hours_difference = (
      unix_timestamp(dropOff_datetime) - unix_timestamp(pickup_datetime)
      ) / 3600
    )

To get the answer, we’ll sort the table by this newly created column in descending order and return the first value:

fhv_hours_diff_arranged <- fhv_hours_diff |> 
  dplyr::arrange(desc(hours_difference)) |> 
  head(1) |> 
  collect()

fhv_hours_diff_arranged$hours_difference
[1] 631152.5
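
Alternatively, we could ask Spark for the maximum directly instead of sorting the whole table; a sketch using summarise() would be:

# let Spark compute the maximum trip duration without a full sort
fhv_hours_diff |> 
  dplyr::summarise(max_hours = max(hours_difference, na.rm = TRUE)) |> 
  collect()

Either way, 631,152.5 hours is roughly 72 years, which points to some badly recorded dropOff_datetime values in the raw data rather than a genuine trip.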

Question 6

Now let’s load the zone lookup data into a temp view in Spark:

zones <- spark_read_csv(
  sc,
  name = "spark_fhv_zones",
  path = here::here("data/fhv_zones.csv"),
  overwrite = TRUE
)

Next, let’s join the spark_fhv_zones table with the spark_fhv2019_partitioned table:

fhv_with_zones <- left_join(
  tbl_fhv_partitioned,
  zones,
  by = c("PUlocationID" = "LocationID"))

fhv_with_zones
# Source: spark<?> [?? x 10]
   dispatching_base_num pickup_datetime     dropOff_datetime    PUlocationID
   <chr>                <dttm>              <dttm>                     <int>
 1 B02285               2019-10-17 14:00:00 2019-10-17 14:49:00          264
 2 B01437               2019-10-07 19:07:08 2019-10-07 19:11:53          264
 3 B02855               2019-10-24 14:10:02 2019-10-24 14:15:15          264
 4 B02677               2019-10-16 09:35:46 2019-10-16 09:42:37          264
 5 B01129               2019-10-14 13:08:00 2019-10-14 13:35:00          264
 6 B00381               2019-10-25 21:06:28 2019-10-25 21:51:41          264
 7 B02182               2019-10-20 12:29:05 2019-10-20 12:51:49          264
 8 B01626               2019-10-06 20:33:24 2019-10-06 20:39:04          264
 9 B00477               2019-10-11 19:18:14 2019-10-11 19:22:54          229
10 B01239               2019-10-02 13:29:43 2019-10-02 13:57:00          264
# ℹ more rows
# ℹ 6 more variables: DOlocationID <int>, SR_Flag <chr>,
#   Affiliated_base_number <chr>, Borough <chr>, Zone <chr>, service_zone <chr>

What is the name of the LEAST frequent pickup location Zone? To determine this, we’ll group by Zone and count the rows:

zone_counts <- fhv_with_zones |> 
  dplyr::group_by(Zone) |> 
  dplyr::summarise(count = n()) |> 
  dplyr::arrange(count) |> 
  head(1) |> 
  collect()

From this we can see that the answer is Jamaica Bay.

Finish

Let’s make sure we close the connection to Spark now that we’re done:

spark_disconnect(sc)