
In this document, we will discuss the steps, the necessary libraries, and the code chunks needed to establish connectivity from R to S3 using AWS roles, and between R and Redshift using JDBC drivers.

These steps work even when S3 and RStudio are in two different AWS accounts; in that case, we need a cross-account role to access the S3 buckets from RStudio.

Install the following packages and load them to start with.

install.packages("RJDBC")
install.packages("devtools")
devtools::install_github("RcppCore/Rcpp")
devtools::install_github("rstats-db/DBI")
install.packages("aws.s3")
devtools::install_github("sicarul/redshiftTools")
install.packages("RPostgres")
install.packages("aws.signature")
install.packages("RAthena")
library(RJDBC)
library(aws.s3)
library(redshiftTools)
library(RPostgres)
library(aws.signature)
library(RAthena)

Registered S3 method overwritten by 'data.table':
  method           from
  print.data.table

Attaching package: ‘RAthena’

The following object is masked from ‘package:RJDBC’:

    dbGetTables

You might see some warnings at this point; they can safely be ignored.

Let’s start by reading data from S3 and writing it back as a CSV with some changes: first writing a new file, then appending new records to that same file.

To start with, let’s get temporary credentials to access the S3 bucket “my-test-bkt” by assuming the role “ReadWrite-myrole” created for the R server.

s3_role <- assume_role(
  profile_name = NULL,
  region_name = "us-east-1",
  role_arn = "arn:aws:iam::<AWSAccountID>:role/ReadWrite-myrole",
  role_session_name = sprintf("RAthena-session-%s", as.integer(Sys.time())),
  duration_seconds = 3600L,
  set_env = FALSE
)

Let’s set the environment variables for this R session to establish the connection with S3, using the “s3_role” object created above.

Sys.setenv("AWS_ACCESS_KEY_ID" = s3_role$AccessKeyId,
           "AWS_SECRET_ACCESS_KEY" = s3_role$SecretAccessKey,
           "AWS_DEFAULT_REGION" = "us-east-1",
           "AWS_SESSION_TOKEN" = s3_role$SessionToken)

Using the environment variables defined above, let’s read the file “Sales_Syed.csv” available inside the S3 bucket “my-test-bkt”.
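
Before reading, it is worth a quick check that the assumed-role credentials can actually see the bucket. A minimal sketch using aws.s3 (assuming the role grants s3:ListBucket on this bucket):

# Quick check that the assumed-role credentials can access the bucket
bucket_exists("my-test-bkt")          # should return TRUE
get_bucket("my-test-bkt", max = 3)    # list up to 3 objects in the bucket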

obj <- get_object("s3://my-test-bkt/Sales_Syed.csv") 
csvcharobj <- rawToChar(obj)
con <- textConnection(csvcharobj)
temp_sales <- read.csv(file = con)
nrow(temp_sales)
[1] 10
head(temp_sales)
  orderid ordernumber quantityordered priceeach orderlinenumber sales orderdate  status
    <int>       <int>           <int>     <dbl>           <int> <int>    <fctr>  <fctr>
1      11       10001              12      65.7             101   100  2/4/2021 Shipped
2      12       10002              15      69.0             102   100  2/4/2021 Shipped
3      13       10030              19       6.7             103   100  2/4/2021 Shipped
4      14       10050              34       5.7             104   100  2/4/2021 Shipped
5      15       10065              56      34.7             105   100  2/4/2021 Shipped
6      16       10068              87      65.0             106   100  2/4/2021 Shipped
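
As a side note, the get_object()/rawToChar()/textConnection() pattern above can be collapsed into a single call with aws.s3’s s3read_using(); a minimal equivalent sketch for the same bucket and object:

# One-step equivalent of the read above
temp_sales <- s3read_using(FUN = read.csv,
                           bucket = "my-test-bkt",
                           object = "Sales_Syed.csv")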

Writing the data above as a new file in the same S3 bucket.

s3write_using(temp_sales, FUN = write.csv, row.names = FALSE,
              bucket = "my-test-bkt",
              object = "data_write_new_append_Role_R.csv")

# Read the file back to verify the content and row count match the original
obj <- get_object("s3://my-test-bkt/data_write_new_append_Role_R.csv")
csvcharobj <- rawToChar(obj)
con <- textConnection(csvcharobj)
new_data <- read.csv(file = con)
nrow(new_data)
[1] 10
head(new_data)
  orderid ordernumber quantityordered priceeach orderlinenumber sales orderdate  status
    <int>       <int>           <int>     <dbl>           <int> <int>    <fctr>  <fctr>
1      11       10001              12      65.7             101   100  2/4/2021 Shipped
2      12       10002              15      69.0             102   100  2/4/2021 Shipped
3      13       10030              19       6.7             103   100  2/4/2021 Shipped
4      14       10050              34       5.7             104   100  2/4/2021 Shipped
5      15       10065              56      34.7             105   100  2/4/2021 Shipped
6      16       10068              87      65.0             106   100  2/4/2021 Shipped

To append new records to the file above (by default, writing to an existing S3 object replaces it rather than appending), we read the entire content into an R data frame, append the new rows within the R session, and then write the combined data back to S3 under the same file name. It is a workaround that simulates an append on S3 files.

append_data = new_data
append_data$orderid = new_data$orderid + 100
full_data = rbind(new_data, append_data)
nrow(full_data)
[1] 20
s3write_using(full_data, FUN = write.csv, row.names = FALSE,
              bucket = "my-test-bkt",
              object = "data_write_new_append_Role_R.csv")
obj <- get_object("s3://my-test-bkt/data_write_new_append_Role_R.csv")
csvcharobj <- rawToChar(obj)
con <- textConnection(csvcharobj)
new_data <- read.csv(file = con)
nrow(new_data) # 20 records now: the 10 original records plus the 10 appended ones
[1] 20

RStudio to Redshift:

Let’s move on to establishing a connection from R to Amazon Redshift. The recommended way to connect to an Amazon Redshift database is through JDBC, and in RStudio the RJDBC package helps us establish a secure connection with Amazon Redshift.

If the Redshift JDBC driver is not already installed on your machine, download it by executing the code below.

# Download the Amazon Redshift JDBC driver
download.file('http://s3.amazonaws.com/redshift-downloads/drivers/RedshiftJDBC41-1.1.9.1009.jar',
              'RedshiftJDBC41-1.1.9.1009.jar')

Once the driver is downloaded, let’s create the jdbc connection channel.

# Connect to Amazon Redshift
driver <- JDBC("com.amazon.redshift.jdbc41.Driver", "RedshiftJDBC41-1.1.9.1009.jar", identifier.quote = "`")
url <- "<JDBCURL>:<PORT>/<DBNAME>?user=<USER>&password=<PW>"
conn <- dbConnect(driver, url)
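
Optionally, run a quick sanity check (assuming the placeholders in the URL above have been filled in) to confirm the channel works before querying real tables:

# Simple round trip over the new JDBC connection
dbGetQuery(conn, "SELECT current_date AS today")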

Once the connection is established, let’s read one of the tables available in the “SYED_TEST” schema.

temp_sales_rs = dbGetQuery(conn, "SELECT * FROM SYED_TEST.TEMP_SALES") 
nrow(temp_sales_rs)
[1] 135
head(temp_sales_rs)
  orderid ordernumber quantityordered priceeach orderlinenumber sales  orderdate  status
    <dbl>       <dbl>           <dbl>     <dbl>           <dbl> <dbl>      <chr>   <chr>
1     261       10059              12      65.7             101   100 2021-02-04 Shipped
2     341       10059              12      65.7             101   100 2021-02-04 Shipped
3    3001       10059              12      65.7             101   100 2021-02-04 Shipped
4     251       10039              12      65.7             101   100 2021-02-04 Shipped
5     331       10059              12      65.7             101   100 2021-02-04 Shipped
6    2901       10059              12      65.7             101   100 2021-02-04 Shipped

Let’s try writing back to the Redshift table record by record over the JDBC connection.

dbSendUpdate(conn, "INSERT INTO TAB_TEST.TEMP_SALES values            
(2500, 10039, 12, 65.70, 101, 100, '2021-02-04', 'Shipped', 'Pringles', 75),
(2600, 10059, 12, 65.70, 101, 100, '2021-02-04', 'Shipped', 'Pringles', 75),
(2700, 10059, 12, 65.70, 101, 100, '2021-02-04', 'Shipped', 'Pringles', 75),
(2800, 10059, 12, 65.70, 101, 100, '2021-02-04', 'Shipped', 'Pringles', 75),
(2900, 10059, 12, 65.70, 101, 100, '2021-02-04', 'Shipped', 'Pringles', 75),
(3000, 10059, 12, 65.70, 101, 100, '2021-02-04', 'Shipped', 'Pringles', 75),
(3100, 10059, 12, 65.70, 101, 100, '2021-02-04', 'Shipped', 'Pringles', 75),
(3200, 10059, 12, 65.70, 101, 100, '2021-02-04', 'Shipped', 'Pringles', 75),
(3300, 10059, 12, 65.70, 101, 100, '2021-02-04', 'Shipped', 'Pringles', 75),
(3400, 10059, 12, 65.70, 101, 100, '2021-02-04', 'Shipped', 'Pringles', 75);"
)
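
If you prefer not to paste values directly into the SQL string, RJDBC’s dbSendUpdate() can also take bind parameters; a minimal sketch for a single row (the ten placeholders are assumed to match the column order of TEMP_SALES used above):

# Hypothetical single-row insert using bind parameters instead of literal values
dbSendUpdate(conn,
             "INSERT INTO TAB_TEST.TEMP_SALES VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
             3500, 10059, 12, 65.70, 101, 100, '2021-02-04', 'Shipped', 'Pringles', 75)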

temp_sales_rs = dbGetQuery(conn, "SELECT * FROM TAB_TEST.TEMP_SALES")
nrow(temp_sales_rs)
[1] 145
dbGetQuery(conn, "SELECT * FROM TAB_TEST.TEMP_SALES WHERE ORDERID = 2500")
  orderid ordernumber quantityordered priceeach orderlinenumber sales  orderdate  status
    <dbl>       <dbl>           <dbl>     <dbl>           <dbl> <dbl>      <chr>   <chr>
1    2500       10039              12      65.7             101   100 2021-02-04 Shipped

Since record-by-record inserts are not always practical, we need a way to bulk load data into Redshift tables. Unfortunately, JDBC does not support bulk loading, and bulk loading over an ODBC connection is too slow to be recommended. The recommended approach for bulk loading data into Amazon Redshift from R is as follows.

  1. Split the dataframe into multiple smaller chunks and write small temporary files to S3.
  2. Copy all the smaller files from S3 into Redshift using the COPY command.
  3. Delete all the temporary files from S3.

All of the above steps can be achieved through the “redshiftTools” library along with “RPostgres”.
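
The examples below reuse the JDBC connection created earlier; redshiftTools can also work with a native RPostgres connection. A minimal sketch with placeholder connection details:

# Optional: a native RPostgres connection to Redshift (replace the placeholders)
pg_conn <- dbConnect(RPostgres::Postgres(),
                     host     = "<REDSHIFT-ENDPOINT>",
                     port     = 5439,
                     dbname   = "<DBNAME>",
                     user     = "<USER>",
                     password = "<PW>",
                     sslmode  = "require")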

my_data = dbGetQuery(conn, "SELECT * FROM TAB_TEST.TEMP_SALES"); 
nrow(my_data)
[1] 145
head(my_data)
  orderid ordernumber quantityordered priceeach orderlinenumber sales  orderdate  status
    <dbl>       <dbl>           <dbl>     <dbl>           <dbl> <dbl>      <chr>   <chr>
1    1102       10001              12      65.7             101   100 2021-02-04 Shipped
2     301       10059              12      65.7             101   100 2021-02-04 Shipped
3    2601       10059              12      65.7             101   100 2021-02-04 Shipped
4    3401       10059              12      65.7             101   100 2021-02-04 Shipped
5     102       10001              12      65.7             101   100 2021-02-04 Shipped
6     291       10059              12      65.7             101   100 2021-02-04 Shipped
my_data$orderid = my_data$orderid + 1
rs_replace_table(my_data, dbcon = conn, table_name = "TAB_TEST.TEMP_SALES",
                 bucket = "my-test-bkt")
Initiating Redshift table replacement for table TAB_TEST.TEMP_SALES
The provided data.frame has 145 rows and 10 columns
Getting number of slices from Redshift
2 slices detected, will split into 1 files
Uploading 1 files with prefix mmzekbwwoxpyrapnowtfocuxccpcllkcgevffaqnfqykxitgcy to bucket my-test-bkt
Upload to S3 complete!
Deleting target table for replacement
Insert new rows
Drop staging table
Committing changes
Deleting temporary files from S3 bucket
Deleting 1 files with prefix mmzekbwwoxpyrapnowtfocuxccpcllkcgevffaqnfqykxitgcy from bucket my-test-bkt
[1] TRUE
my_data = dbGetQuery(conn, "SELECT * FROM TAB_TEST.TEMP_SALES"); nrow(my_data)
[1] 145
head(my_data)
  orderid ordernumber quantityordered priceeach orderlinenumber sales  orderdate  status
    <dbl>       <dbl>           <dbl>     <dbl>           <dbl> <dbl>      <chr>   <chr>
1    3402       10059              12      65.7             101   100 2021-02-04 Shipped
2    3302       10059              12      65.7             101   100 2021-02-04 Shipped
3    2702       10059              12      65.7             101   100 2021-02-04 Shipped
4    1114       10021              12      65.7             101   100 2021-02-04 Shipped
5     127       10059              12      65.7             101   100 2021-02-04 Shipped
6    1135       10059              12      65.7             101   100 2021-02-04 Shipped

There could be situations where we only want to insert the new records, while rows whose keys already exist in the table are replaced by the incoming versions rather than duplicated. In that case, use the following code.

my_data = dbGetQuery(conn, "SELECT * FROM SYED_TEST.TEMP_SALES"); nrow(my_data)
[1] 145
head(my_data)
  orderid ordernumber quantityordered priceeach orderlinenumber sales  orderdate  status
    <dbl>       <dbl>           <dbl>     <dbl>           <dbl> <dbl>      <chr>   <chr>
1    3402       10059              12      65.7             101   100 2021-02-04 Shipped
2    3302       10059              12      65.7             101   100 2021-02-04 Shipped
3    2702       10059              12      65.7             101   100 2021-02-04 Shipped
4    1114       10021              12      65.7             101   100 2021-02-04 Shipped
5     127       10059              12      65.7             101   100 2021-02-04 Shipped
6    1135       10059              12      65.7             101   100 2021-02-04 Shipped
my_data$orderid = my_data$orderid + 1
rs_upsert_table(my_data, dbcon = conn, table_name = 'SYED_TEST.TEMP_SALES',
                bucket = "my-test-bkt", keys = c('orderid'))
Initiating Redshift table upsert for table SYED_TEST.TEMP_SALES
The provided data.frame has 145 rows and 10 columns
Getting number of slices from Redshift
2 slices detected, will split into 1 files
Uploading 1 files with prefix nblfsjauxvjdzqlywxtuahafexkjwfmrhuaflomxzmsxoupvht to bucket my-test-bkt
Upload to S3 complete!
Insert new rows
Drop staging table
Commiting
Deleting temporary files from S3 bucket
Deleting 1 files with prefix nblfsjauxvjdzqlywxtuahafexkjwfmrhuaflomxzmsxoupvht from bucket my-test-bkt
[1] TRUE
my_data = dbGetQuery(conn, "SELECT * FROM SYED_TEST.TEMP_SALES")
nrow(my_data)
[1] 168
head(my_data)
  orderid ordernumber quantityordered priceeach orderlinenumber sales  orderdate  status
    <dbl>       <dbl>           <dbl>     <dbl>           <dbl> <dbl>      <chr>   <chr>
1     302       10059              12      65.7             101   100 2021-02-04 Shipped
2     292       10059              12      65.7             101   100 2021-02-04 Shipped
3    2701       10059              12      65.7             101   100 2021-02-04 Shipped
4     262       10059              12      65.7             101   100 2021-02-04 Shipped
5     332       10059              12      65.7             101   100 2021-02-04 Shipped
6    2801       10059              12      65.7             101   100 2021-02-04 Shipped
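
Finally, once all the operations are done, it is good practice to release the JDBC connection:

# Close the Redshift connection when finished
dbDisconnect(conn)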

That’s it! This concludes the walkthrough of RStudio connectivity with AWS S3 and Amazon Redshift, along with the necessary read and write operations.
