In this document, we will walk through the steps, libraries, and code chunks needed to establish connectivity from R to Amazon S3 using AWS IAM roles, and from R to Amazon Redshift using JDBC drivers.
These steps also work when the S3 bucket and RStudio sit in two different AWS accounts; in that case, a cross-account role is needed to access the S3 buckets from RStudio.
Install the following packages and load them to start with.
install.packages("RJDBC") install.packages('devtools')
devtools::install_github("RcppCore/Rcpp")
devtools::install_github("rstats-db/DBI")
install.packages("aws.s3")
devtools::install_github("sicarul/redshiftTools")
install.packages("RPostgres")
install.packages("aws.signature")
install.packages("RAthena")
library(RJDBC)
library(aws.s3)
library(redshiftTools)
library(RPostgres)
library(aws.signature)
library(RAthena)
Loading these packages may print a few messages or warnings like the ones below; they can be safely ignored.

Registered S3 method overwritten by 'data.table':
  method           from
  print.data.table

Attaching package: ‘RAthena’

The following object is masked from ‘package:RJDBC’:

    dbGetTables
Let’s start by reading data from S3 and writing it back as CSV with some changes: first writing a new file, then appending new records to that same file.
To start with, let’s get temporary credentials to access the S3 bucket “my-test-bkt” using the role “ReadWrite-myrole” created for the R server.
s3_role <- assume_role(profile_name = NULL,
                       region_name = "us-east-1",
                       role_arn = "arn:aws:iam::<AWSAccountID>:role/ReadWrite-myrole",
                       role_session_name = sprintf("RAthena-session-%s", as.integer(Sys.time())),
                       duration_seconds = 3600L,
                       set_env = FALSE)
Let’s set the environment variables for this R session to establish the connection with S3, using the “s3_role” object created above.
Sys.setenv("AWS_ACCESS_KEY_ID" = s3_role$AccessKeyId,
"AWS_SECRET_ACCESS_KEY" = s3_role$SecretAccessKey,
"AWS_DEFAULT_REGION" = "us-east-1",
"AWS_SESSION_TOKEN" = s3_role$SessionToken )
Using the environment variables defined above, let’s read the file “Sales_Syed.csv” available inside the S3 bucket “my-test-bkt”.
obj <- get_object("s3://my-test-bkt/Sales_Syed.csv")
csvcharobj <- rawToChar(obj)
con <- textConnection(csvcharobj)
temp_sales <- read.csv(file = con)
nrow(temp_sales)
[1] 10
head(temp_sales)
|   | orderid <int> | ordernumber <int> | quantityordered <int> | priceeach <dbl> | orderlinenumber <int> | sales <int> | orderdate <fctr> | status <fctr> |
|---|---|---|---|---|---|---|---|---|
| 1 | 11 | 10001 | 12 | 65.7 | 101 | 100 | 2/4/2021 | Shipped |
| 2 | 12 | 10002 | 15 | 69.0 | 102 | 100 | 2/4/2021 | Shipped |
| 3 | 13 | 10030 | 19 | 6.7 | 103 | 100 | 2/4/2021 | Shipped |
| 4 | 14 | 10050 | 34 | 5.7 | 104 | 100 | 2/4/2021 | Shipped |
| 5 | 15 | 10065 | 56 | 34.7 | 105 | 100 | 2/4/2021 | Shipped |
| 6 | 16 | 10068 | 87 | 65.0 | 106 | 100 | 2/4/2021 | Shipped |
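As an aside, the get_object()/rawToChar()/textConnection() sequence above can be collapsed into a single call with aws.s3::s3read_using(), the read-side counterpart of the s3write_using() call used below. A minimal sketch against the same bucket and object:

# Download the object and parse it with read.csv in one step.
temp_sales <- s3read_using(FUN = read.csv,
                           bucket = "my-test-bkt",
                           object = "Sales_Syed.csv")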
Writing the data above as a new file in the same S3 bucket.
s3write_using(temp_sales, FUN = write.csv, row.names = FALSE,
bucket = "my-test-bkt",
object = "data_write_new_append_Role_R.csv")
# Reading back the file to verify the content & row count are same as above
obj <- get_object("s3://my-test-bkt/data_write_new_append_Role_R.csv")
csvcharobj <- rawToChar(obj)
con <- textConnection(csvcharobj)
new_data <- read.csv(file = con)
nrow(new_data)
[1] 10
head(new_data)
|   | orderid <int> | ordernumber <int> | quantityordered <int> | priceeach <dbl> | orderlinenumber <int> | sales <int> | orderdate <fctr> | status <fctr> |
|---|---|---|---|---|---|---|---|---|
| 1 | 11 | 10001 | 12 | 65.7 | 101 | 100 | 2/4/2021 | Shipped |
| 2 | 12 | 10002 | 15 | 69.0 | 102 | 100 | 2/4/2021 | Shipped |
| 3 | 13 | 10030 | 19 | 6.7 | 103 | 100 | 2/4/2021 | Shipped |
| 4 | 14 | 10050 | 34 | 5.7 | 104 | 100 | 2/4/2021 | Shipped |
| 5 | 15 | 10065 | 56 | 34.7 | 105 | 100 | 2/4/2021 | Shipped |
| 6 | 16 | 10068 | 87 | 65.0 | 106 | 100 | 2/4/2021 | Shipped |
S3 objects cannot be appended to in place; writing to the same key replaces the existing file. To append new data, we therefore read the full content into R, append the new rows to the data frame within the R session, and then write the result back to S3 under the same file name. This is a workaround that simulates an append on S3 files.
append_data = new_data
append_data$orderid = new_data$orderid + 100
full_data = rbind(new_data, append_data)
nrow(full_data)
[1] 20
s3write_using(full_data, FUN = write.csv, row.names = FALSE,
bucket = "my-test-bkt",
object = "data_write_new_append_Role_R.csv")
obj <- get_object("s3://my-test-bkt/data_write_new_append_Role_R.csv")
csvcharobj <- rawToChar(obj)
con <- textConnection(csvcharobj)
new_data <- read.csv(file = con)
nrow(new_data)  # Now has 20 records: the 10 original rows plus the 10 appended rows
[1] 20
RStudio to Redshift:
Let’s move on to establishing a connection from R to Amazon Redshift. The recommended way to connect to an Amazon Redshift database is through JDBC, and in RStudio the RJDBC package helps us establish a secure connection with Amazon Redshift.
If the Redshift JDBC driver is not already installed on your machine, download it by executing the code below.
# Download the Amazon Redshift JDBC driver
download.file('http://s3.amazonaws.com/redshift-downloads/drivers/RedshiftJDBC41-1.1.9.1009.jar',
              'RedshiftJDBC41-1.1.9.1009.jar')
Once the driver is downloaded, let’s create the JDBC connection.
# Connect to Amazon Redshift
driver <- JDBC("com.amazon.redshift.jdbc41.Driver",
               "RedshiftJDBC41-1.1.9.1009.jar", identifier.quote = "`")
url <- "<JDBCURL>:<PORT>/<DBNAME>?user=<USER>&password=<PW>"
conn <- dbConnect(driver, url)
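For reference, a filled-in connection URL typically has the shape shown below; the cluster endpoint, port, database name, and credentials here are placeholders, not real values.

# Example of what the JDBC URL looks like once the placeholders are filled in
# (hypothetical cluster endpoint and credentials).
url <- "jdbc:redshift://examplecluster.abc123xyz789.us-east-1.redshift.amazonaws.com:5439/dev?user=awsuser&password=my_password"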
Once the connection is established, let’s read one of the tables available in the “SYED_TEST” schema.
temp_sales_rs = dbGetQuery(conn, "SELECT * FROM SYED_TEST.TEMP_SALES")
nrow(temp_sales_rs)
[1] 135
head(temp_sales_rs)
|   | orderid <dbl> | ordernumber <dbl> | quantityordered <dbl> | priceeach <dbl> | orderlinenumber <dbl> | sales <dbl> | orderdate <chr> | status <chr> |
|---|---|---|---|---|---|---|---|---|
| 1 | 261 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 2 | 341 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 3 | 3001 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 4 | 251 | 10039 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 5 | 331 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 6 | 2901 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
Let’s try writing back to the Redshift table record by record using the JDBC connection.
dbSendUpdate(conn, "INSERT INTO TAB_TEST.TEMP_SALES values
(2500, 10039, 12, 65.70, 101, 100, '2021-02-04', 'Shipped', 'Pringles', 75),
(2600, 10059, 12, 65.70, 101, 100, '2021-02-04', 'Shipped', 'Pringles', 75),
(2700, 10059, 12, 65.70, 101, 100, '2021-02-04', 'Shipped', 'Pringles', 75),
(2800, 10059, 12, 65.70, 101, 100, '2021-02-04', 'Shipped', 'Pringles', 75),
(2900, 10059, 12, 65.70, 101, 100, '2021-02-04', 'Shipped', 'Pringles', 75),
(3000, 10059, 12, 65.70, 101, 100, '2021-02-04', 'Shipped', 'Pringles', 75),
(3100, 10059, 12, 65.70, 101, 100, '2021-02-04', 'Shipped', 'Pringles', 75),
(3200, 10059, 12, 65.70, 101, 100, '2021-02-04', 'Shipped', 'Pringles', 75),
(3300, 10059, 12, 65.70, 101, 100, '2021-02-04', 'Shipped', 'Pringles', 75),
(3400, 10059, 12, 65.70, 101, 100, '2021-02-04', 'Shipped', 'Pringles', 75);")
temp_sales_rs = dbGetQuery(conn, "SELECT * FROM TAB_TEST.TEMP_SALES")
nrow(temp_sales_rs)
[1] 145
dbGetQuery(conn, "SELECT * FROM TAB_TEST.TEMP_SALES WHERE ORDERID = 2500")
|   | orderid <dbl> | ordernumber <dbl> | quantityordered <dbl> | priceeach <dbl> | orderlinenumber <dbl> | sales <dbl> | orderdate <chr> | status <chr> |
|---|---|---|---|---|---|---|---|---|
| 1 | 2500 | 10039 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
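If the values to insert live in R variables rather than a hand-written statement, RJDBC's dbSendUpdate() can also take bind parameters, which avoids pasting values into the SQL string. A minimal sketch of that pattern, with illustrative values only:

# Insert one row through a prepared statement; the extra arguments are bound
# to the ? placeholders in order.
dbSendUpdate(conn,
             "INSERT INTO TAB_TEST.TEMP_SALES VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
             3500, 10059, 12, 65.70, 101, 100, "2021-02-04", "Shipped", "Pringles", 75)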
Since inserting record by record is not always feasible, we need a way to bulk load data into Redshift tables. Unfortunately, JDBC does not support bulk loading, and bulk loading over an ODBC connection is too slow to be recommended. The recommended approach for bulk loading data into Amazon Redshift from R is to stage the data as files in S3, load those files into the target table with Redshift's COPY command, and then clean up the temporary files.
All of these steps can be handled for us by the “redshiftTools” library together with “RPostgres”.
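For context, a hand-rolled version of those steps might look roughly like the sketch below: stage a data frame (here called df_to_load, a placeholder) in S3 with s3write_using(), run Redshift's COPY command over the JDBC connection, and remove the staged file. The object key and the IAM role ARN are hypothetical, and redshiftTools takes care of the equivalent work (plus staging tables and cleanup) for us.

# 1. Stage the data frame as a CSV object in S3.
s3write_using(df_to_load, FUN = write.csv, row.names = FALSE,
              bucket = "my-test-bkt",
              object = "staging/temp_sales_bulk.csv")

# 2. Bulk load the staged file into the target table with COPY.
#    The IAM role must allow Redshift to read from the bucket.
dbSendUpdate(conn, "COPY SYED_TEST.TEMP_SALES
                    FROM 's3://my-test-bkt/staging/temp_sales_bulk.csv'
                    IAM_ROLE 'arn:aws:iam::<AWSAccountID>:role/<RedshiftS3ReadRole>'
                    FORMAT AS CSV
                    IGNOREHEADER 1;")

# 3. Remove the staged object once the load succeeds.
delete_object(object = "staging/temp_sales_bulk.csv", bucket = "my-test-bkt")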
my_data = dbGetQuery(conn, "SELECT * FROM TAB_TEST.TEMP_SALES");
nrow(my_data)
[1] 145
head(my_data)
|   | orderid <dbl> | ordernumber <dbl> | quantityordered <dbl> | priceeach <dbl> | orderlinenumber <dbl> | sales <dbl> | orderdate <chr> | status <chr> |
|---|---|---|---|---|---|---|---|---|
| 1 | 1102 | 10001 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 2 | 301 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 3 | 2601 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 4 | 3401 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 5 | 102 | 10001 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 6 | 291 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
Now let’s modify the data (increment every orderid by 1) and replace the full contents of the Redshift table with rs_replace_table(), which stages the data frame in S3 and reloads the table.
my_data$orderid = my_data$orderid + 1
rs_replace_table(my_data, dbcon = conn, table_name = "SYED_TEST.TEMP_SALES",
                 bucket = "my-test-bkt")
Initiating Redshift table replacement for table TAB_TEST.TEMP_SALES
The provided data.frame has 145 rows and 10 columns
Getting number of slices from Redshift
2 slices detected, will split into 1 files
Uploading 1 files with prefix mmzekbwwoxpyrapnowtfocuxccpcllkcgevffaqnfqykxitgcy to bucket my-test-bkt
Upload to S3 complete!
Deleting target table for replacement
Insert new rows
Drop staging table
Committing changes
Deleting temporary files from S3 bucket
Deleting 1 files with prefix mmzekbwwoxpyrapnowtfocuxccpcllkcgevffaqnfqykxitgcy from bucket my-test-bkt
[1] TRUE
my_data = dbGetQuery(conn, "SELECT * FROM TAB_TEST.TEMP_SALES")
nrow(my_data)
[1] 145
head(my_data)
|   | orderid <dbl> | ordernumber <dbl> | quantityordered <dbl> | priceeach <dbl> | orderlinenumber <dbl> | sales <dbl> | orderdate <chr> | status <chr> |
|---|---|---|---|---|---|---|---|---|
| 1 | 3402 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 2 | 3302 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 3 | 2702 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 4 | 1114 | 10021 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 5 | 127 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 6 | 1135 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
There may also be situations where we want to upsert: insert only the genuinely new records, while incoming records that already exist in the table (identified using the keys) replace the existing rows instead of being duplicated. In that case, use rs_upsert_table() as shown in the following code.
my_data = dbGetQuery(conn, "SELECT * FROM SYED_TEST.TEMP_SALES")
nrow(my_data)
[1] 145
head(my_data)
|   | orderid <dbl> | ordernumber <dbl> | quantityordered <dbl> | priceeach <dbl> | orderlinenumber <dbl> | sales <dbl> | orderdate <chr> | status <chr> |
|---|---|---|---|---|---|---|---|---|
| 1 | 3402 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 2 | 3302 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 3 | 2702 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 4 | 1114 | 10021 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 5 | 127 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 6 | 1135 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
my_data$orderid = my_data$orderid+1;
rs_upsert_table(my_data, dbcon=conn, table_name = 'SYED_TEST.TEMP_SALES',
bucket="my-test-bkt", keys=c('orderid'))
Initiating Redshift table upsert for table SYED_TEST.TEMP_SALES
The provided data.frame has 145 rows and 10 columns
Getting number of slices from Redshift
2 slices detected, will split into 1 files
Uploading 1 files with prefix nblfsjauxvjdzqlywxtuahafexkjwfmrhuaflomxzmsxoupvht to bucket my-test-bkt
Upload to S3 complete!
Insert new rows
Drop staging table
Commiting
Deleting temporary files from S3 bucket
Deleting 1 files with prefix nblfsjauxvjdzqlywxtuahafexkjwfmrhuaflomxzmsxoupvht from bucket my-test-bkt
[1] TRUE
my_data = dbGetQuery(conn, "SELECT * FROM TAB_TEST.TEMP_SALES");
nrow(my_data)
[1] 168
head(my_data)
|   | orderid <dbl> | ordernumber <dbl> | quantityordered <dbl> | priceeach <dbl> | orderlinenumber <dbl> | sales <dbl> | orderdate <chr> | status <chr> |
|---|---|---|---|---|---|---|---|---|
| 1 | 302 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 2 | 292 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 3 | 2701 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 4 | 262 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 5 | 332 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
| 6 | 2801 | 10059 | 12 | 65.7 | 101 | 100 | 2021-02-04 | Shipped |
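Finally, once all the work is done, it is good practice to close the JDBC connection and clear the temporary credentials from the session environment. A short sketch:

# Close the Redshift connection and drop the temporary AWS credentials
# that were set earlier with Sys.setenv().
dbDisconnect(conn)
Sys.unsetenv(c("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN"))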
That’s it! This concludes RStudio connectivity with AWS S3 and Amazon Redshift, along with the necessary read and write operations.
Posted 1 March 2021