Is there a better alternative for a right outer join in Spark DataFrame? And how to conditionally transform rows?


My apologies in advance for the long question; I'll try my best to explain with a minimal example. I have the following two DataFrames, df1 and df2.
df1:
+------+---------+-------+-------+-------+
| PID| CATEGORY| REGION| VOLUME| RESULT|
+------+---------+-------+-------+-------+
|123456| CAT-1| NORTH| 200| 1.345|
|123456| CAT-1| SOUTH| 125| 2.890|
|123456| CAT-2| NORTH| 75| 6.755|
|123456| CAT-2| SOUTH| 100| 0.432|
|123456| CAT-3| NORTH| 50| 5.111|
+------+---------+-------+-------+-------+
df2:
+---------+-------+-------+
| CATEGORY| REGION| avgCol|
+---------+-------+-------+
| CAT-1| NORTH| 0.575|
| CAT-1| SOUTH| 15.879|
| CAT-2| NORTH| 5.255|
+---------+-------+-------+
I want to join these two DataFrames to get a third DataFrame that looks like
df3:
+------+---------+-------+-------+-------+-------+
| PID| CATEGORY| REGION| avgCol| VOLUME| RESULT|
+------+---------+-------+-------+-------+-------+
|123456| CAT-1| NORTH| 0.575| 200| 1.345|
|123456| CAT-1| SOUTH| 15.879| 125| 2.890|
|123456| CAT-2| NORTH| 5.255| 75| 6.755|
|123456| CAT-2| SOUTH| null| 100| 0.432|
|123456| CAT-3| NORTH| null| 50| 5.111|
+------+---------+-------+-------+-------+-------+
I used a right outer join for this. The code looks like
DataFrame df3 = df1.join(df2, joinCols, "right_outer");
where joinCols is the collection of columns I want to join on.
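For completeness, here is a minimal sketch of how joinCols can be built in Java for Spark 1.6 (the column names are just the ones from the example above, and the Scala Seq conversion shown is one common way to do it, not necessarily my exact code):

import java.util.Arrays;
import java.util.List;
import scala.collection.JavaConversions;
import scala.collection.Seq;

// Example join columns taken from the tables above (illustrative only).
List<String> joinColNames = Arrays.asList("CATEGORY", "REGION");
Seq<String> joinCols = JavaConversions.asScalaBuffer(joinColNames).toSeq();

DataFrame df3 = df1.join(df2, joinCols, "right_outer");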
Then, I want to transform df3 into a DataFrame in which RESULT is newly computed only for rows where avgCol is not null (rows where avgCol is null keep their original RESULT). The code that achieves this looks like
DataFrame df4 = df3
    .withColumn("NEW_RESULT",
        when(col("avgCol").isNotNull(), getResult(col("avgCol"))).otherwise(col("RESULT")))
    .drop("RESULT")
    .withColumnRenamed("NEW_RESULT", "RESULT");
So df4 looks like
+------+---------+-------+-------+-------+-------+
| PID| CATEGORY| REGION| avgCol| VOLUME| RESULT|
+------+---------+-------+-------+-------+-------+
|123456| CAT-1| NORTH| 0.575| 200| 8.543|
|123456| CAT-1| SOUTH| 15.879| 125| 4.321|
|123456| CAT-2| NORTH| 5.255| 75| 7.012|
|123456| CAT-2| SOUTH| null| 100| 0.432|
|123456| CAT-3| NORTH| null| 50| 5.111|
+------+---------+-------+-------+-------+-------+
NOTE: I'm dropping the old RESULT column and renaming NEW_RESULT back to RESULT because I want to use this table in my next iteration.
Now, this produces the expected results, but it is not performant. In particular, when I enclose this in a loop and the number of join columns keeps increasing, the join and the subsequent operations take a long time, and it gets worse with every iteration.
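To make the iteration concrete, the loop has roughly this shape (a simplified sketch, not my exact code; computeAverages and numIterations are placeholders for whatever produces the per-iteration aggregate like df2):

// Rough sketch of the iterative pattern (hypothetical helper names).
DataFrame current = df1;
for (int i = 0; i < numIterations; i++) {
    // Something like df2: a group-by + aggregation computed for this iteration.
    DataFrame avgDf = computeAverages(current, i);

    current = current
        .join(avgDf, joinCols, "right_outer")
        .withColumn("NEW_RESULT",
            when(col("avgCol").isNotNull(), getResult(col("avgCol"))).otherwise(col("RESULT")))
        .drop("RESULT")
        .withColumnRenamed("NEW_RESULT", "RESULT");
    // Each pass builds on the previous DataFrame, so the query plan keeps growing.
}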
My question is: is there a more efficient way to do one or both of these operations in Spark, preferably using DataFrames?
I read a few posts suggesting analyzing the physical plan of the join operation to find ways to optimize it, but I couldn't gather much from that. I'm unable to include the plan here because it is huge. It would be really helpful if anyone could point me to specific things to look for in the physical plan.
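(For reference, the plan can be printed with explain; the extended form includes the logical, optimized, and physical plans:)

// Prints the parsed/analyzed/optimized logical plans and the physical plan to stdout.
df3.explain(true);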
I'm running this on a single Spark node (standalone) - version 1.6.1. My code is written in Java. The machine on which I'm running this has abundant processing power and memory. So, I'm pretty sure that's not an issue. Any help with this is much appreciated.
Edit: I'm reading the base data off a CSV file using spark-csv. df1 and df2 are the result of some simple group-by + aggregations, which I believe are not the bottleneck.
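Roughly, the loading and aggregation look like this (a sketch: the path is a placeholder and avg("RESULT") is just an illustrative aggregate, not necessarily the real one):

// Read the base CSV with the spark-csv package (Spark 1.6 style); the path is a placeholder.
DataFrame base = sqlContext.read()
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/path/to/base.csv");

// df1 and df2 are simple group-by + aggregations on top of it, along these lines:
DataFrame df2 = base
    .groupBy("CATEGORY", "REGION")
    .agg(avg("RESULT").alias("avgCol"));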