NY Taxi Data in Spark
Introduction
This notebook demonstrates how to work with data in Spark via the sparklyr package. The purpose of this exercise is to complete the Module 5 homework for the Data Engineering Zoomcamp.
Getting Started
First, we install the sparklyr package and load it in R. Let’s also make sure we have the dplyr, readr, fs, and lubridate packages too, which will help us:
install.packages(c("sparklyr", "dplyr", "readr", "fs", "lubridate"))
sparklyr::spark_install()
Now, let’s set an environment variable to point to where we’ve installed Java. Then we can connect to Spark and we’re ready to answer the homework questions:
Sys.setenv(JAVA_HOME = "/usr/local/Cellar/openjdk@17/17.0.9")
library(sparklyr)
library(dplyr)

sc <- spark_connect(
  master = "local",
  spark_home = "/usr/local/Cellar/apache-spark/3.5.0/libexec"
)
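Optionally, we can confirm the connection is live before moving on; a quick check using sparklyr’s own helper:
# Should return TRUE while the Spark connection is open
sparklyr::spark_connection_is_open(sc)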
Question 1
The first question just requires that we return the version of Spark that we’re using:
sparklyr::spark_version(sc)
[1] '3.5.0'
Question 2
Let’s read the October 2019 FHV data into a Spark Dataframe:
tbl_fhv <- spark_read_csv(
  sc,
  name = "spark_fhv2019",
  path = fs::dir_ls(here::here("data"), glob = "*.csv.gz"),
  overwrite = TRUE
)
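Before moving on, it can be worth a quick look at the schema Spark inferred from the CSV; this isn’t required for the homework, just a sanity check:
# Inspect the column names and types Spark inferred for the FHV data
sparklyr::sdf_schema(tbl_fhv)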
Next, we’ll repartition the Dataframe to 6 partitions and save it to parquet:
fhv_partitioned <- sdf_repartition(tbl_fhv, 6)

parquet_dir <- here::here("data/fhv/2019/10/")

sparklyr::spark_write_parquet(fhv_partitioned, parquet_dir, mode = "overwrite")
The answer to Question 2 is the average size of the Parquet files we just wrote. Here’s how we calculate that:
file_info <- file.info(fs::dir_ls(parquet_dir, glob = "*.snappy.parquet"))

mean(file_info$size)
[1] 6660810
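Since file.info() reports sizes in bytes, it can also be handy to express the average in megabytes (roughly 6.35 MB here):
# Convert the average size from bytes to megabytes (1 MB = 1024^2 bytes)
mean(file_info$size) / 1024^2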
Question 3
For this question we need to filter the data to return only those trips that started on 15 October.
First, let’s use Spark to read the Parquet files we previously saved:
tbl_fhv_partitioned <- spark_read_parquet(
  sc,
  name = "spark_fhv2019_partitioned",
  path = parquet_dir
)
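As an optional check that the round trip worked, we can count the rows in the partitioned table; sdf_nrow() triggers a count job in Spark:
# Total number of rows read back from the Parquet files
sparklyr::sdf_nrow(tbl_fhv_partitioned)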
Now we apply the filter using some SQL and count the number of rows returned:
library(DBI)
dbGetQuery(
  sc,
  "SELECT * FROM spark_fhv2019_partitioned WHERE date(from_utc_timestamp(to_timestamp(pickup_datetime), 'America/New_York')) = '2019-10-15'"
) |>
  nrow()
[1] 62944
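As an aside, the same filter could be written as a dplyr pipeline; sparklyr passes functions it doesn’t recognise (to_timestamp(), from_utc_timestamp(), to_date()) straight through to Spark SQL, so this sketch should yield the same count:
# dplyr version of the same filter; Spark evaluates the passed-through functions
tbl_fhv_partitioned |>
  dplyr::filter(
    to_date(from_utc_timestamp(to_timestamp(pickup_datetime), "America/New_York")) == "2019-10-15"
  ) |>
  dplyr::count() |>
  collect()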
Question 4
What is the length of the longest trip in the dataset in hours?
To determine this, we first need to calculate the duration of each trip in hours. Unfortunately, we can’t use lubridate functions when querying Spark, but we can do the calculation ourselves: unix_timestamp() converts the pickup and dropoff times to seconds since the Unix epoch, so taking their difference and dividing by 3,600 gives the trip length in hours:
fhv_hours_diff <- tbl_fhv_partitioned |>
  mutate(
    hours_difference = (
      unix_timestamp(dropOff_datetime) - unix_timestamp(pickup_datetime)
    ) / 3600
  )
To get the answer, we’ll sort our table by this newly-created column in descending order and return the first value:
fhv_hours_diff_arranged <- fhv_hours_diff |>
  dplyr::arrange(desc(hours_difference)) |>
  head(1) |>
  collect()

fhv_hours_diff_arranged$hours_difference
[1] 631152.5
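Alternatively, we could skip the sort and ask Spark for the maximum directly; a small sketch that should return the same value:
# Compute the maximum trip length in hours without sorting the whole table
fhv_hours_diff |>
  dplyr::summarise(max_hours = max(hours_difference, na.rm = TRUE)) |>
  collect()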
Question 6
Now let’s load the zone lookup data into a temp view in Spark:
zones <- spark_read_csv(
  sc,
  name = "spark_fhv_zones",
  path = here::here("data/fhv_zones.csv"),
  overwrite = TRUE
)
Next, let’s join the spark_fhv_zones table with the spark_fhv2019_partitioned table:
fhv_with_zones <- left_join(
  tbl_fhv_partitioned,
  zones,
  by = c("PUlocationID" = "LocationID")
)

fhv_with_zones
# Source: spark<?> [?? x 10]
   dispatching_base_num pickup_datetime     dropOff_datetime    PUlocationID
   <chr>                <dttm>              <dttm>                     <int>
 1 B02285               2019-10-17 14:00:00 2019-10-17 14:49:00          264
 2 B01437               2019-10-07 19:07:08 2019-10-07 19:11:53          264
 3 B02855               2019-10-24 14:10:02 2019-10-24 14:15:15          264
 4 B02677               2019-10-16 09:35:46 2019-10-16 09:42:37          264
 5 B01129               2019-10-14 13:08:00 2019-10-14 13:35:00          264
 6 B00381               2019-10-25 21:06:28 2019-10-25 21:51:41          264
 7 B02182               2019-10-20 12:29:05 2019-10-20 12:51:49          264
 8 B01626               2019-10-06 20:33:24 2019-10-06 20:39:04          264
 9 B00477               2019-10-11 19:18:14 2019-10-11 19:22:54          229
10 B01239               2019-10-02 13:29:43 2019-10-02 13:57:00          264
# ℹ more rows
# ℹ 6 more variables: DOlocationID <int>, SR_Flag <chr>,
#   Affiliated_base_number <chr>, Borough <chr>, Zone <chr>, service_zone <chr>
What is the name of the LEAST frequent pickup location Zone? To determine this, we’ll group by Zone, count the rows in each group, and sort the counts in ascending order so that the first row gives us the answer:
zone_counts <- fhv_with_zones |>
  dplyr::group_by(Zone) |>
  dplyr::summarise(count = n()) |>
  dplyr::arrange(count) |>
  head(1) |>
  collect()
From this we can see that the answer is Jamaica Bay.
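As a cross-check, the same answer can be reached with a single SQL query against the two temp views we registered (spark_fhv2019_partitioned and spark_fhv_zones); a sketch:
# Join the views, count pickups per Zone, and keep the least frequent one
dbGetQuery(
  sc,
  "SELECT z.Zone, COUNT(*) AS n
     FROM spark_fhv2019_partitioned AS t
     LEFT JOIN spark_fhv_zones AS z
       ON t.PUlocationID = z.LocationID
    GROUP BY z.Zone
    ORDER BY n ASC
    LIMIT 1"
)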
Finish
Let’s make sure we close the connection to Spark now that we’re done:
spark_disconnect(sc)