apache-spark


Is there a better alternative for a right outer join in Spark DataFrame? And how to conditionally transform rows?


My apologies in advance for a long question. I'll try my best to explain with a minimal example. I have the following two Data Frames, df1 and df2:
df1:
+------+---------+-------+-------+-------+
| PID| CATEGORY| REGION| VOLUME| RESULT|
+------+---------+-------+-------+-------+
|123456| CAT-1| NORTH| 200| 1.345|
|123456| CAT-1| SOUTH| 125| 2.890|
|123456| CAT-2| NORTH| 75| 6.755|
|123456| CAT-2| SOUTH| 100| 0.432|
|123456| CAT-3| NORTH| 50| 5.111|
+------+---------+-------+-------+-------+
df2:
+---------+-------+-------+
| CATEGORY| REGION| avgCol|
+---------+-------+-------+
| CAT-1| NORTH| 0.575|
| CAT-1| SOUTH| 15.879|
| CAT-2| NORTH| 5.255|
+---------+-------+-------+
I want to join these two DFs to get a 3rd DF that looks like:
df3:
+------+---------+-------+-------+-------+-------+
| PID| CATEGORY| REGION| avgCol| VOLUME| RESULT|
+------+---------+-------+-------+-------+-------+
|123456| CAT-1| NORTH| 0.575| 200| 1.345|
|123456| CAT-1| SOUTH| 15.879| 125| 2.890|
|123456| CAT-2| NORTH| 5.255| 75| 6.755|
|123456| CAT-2| SOUTH| null| 100| 0.432|
|123456| CAT-3| NORTH| null| 50| 5.111|
+------+---------+-------+-------+-------+-------+
I used a right outer join for this. The code looks like:
DataFrame df3 = df2.join(df1, joinCols, "right_outer");
where joinCols is the collection of columns I want to join on (CATEGORY and REGION here).
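For completeness, a minimal sketch of how joinCols can be built and the join invoked from Java (the concrete column list is an assumption based on the example tables):
import java.util.Arrays;
import scala.collection.JavaConversions;
import scala.collection.Seq;

// Assumption: the join keys are the two columns shared by df1 and df2
Seq<String> joinCols = JavaConversions.asScalaBuffer(
        Arrays.asList("CATEGORY", "REGION")).toSeq();

// df1 is the right side, so "right_outer" keeps every df1 row and
// leaves avgCol null where df2 has no matching CATEGORY/REGION
DataFrame df3 = df2.join(df1, joinCols, "right_outer");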
Then, I want to transform df3 into a Data Frame in which RESULT is recomputed only for rows where avgCol is not null. The code that achieves this looks like:
DataFrame df4 = df3.withColumn("NEW_RESULT",
        when(col("avgCol").isNotNull(), getResult(col("avgCol")))
            .otherwise(col("RESULT")))
    .drop("RESULT")
    .withColumnRenamed("NEW_RESULT", "RESULT");
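As an aside, the same transform can apparently also be written by overwriting RESULT in place (assuming withColumn replaces an existing column of the same name), which avoids the drop/rename; getResult is the same helper as above:
DataFrame df4 = df3.withColumn("RESULT",
        when(col("avgCol").isNotNull(), getResult(col("avgCol")))
            .otherwise(col("RESULT")));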
So df4 looks like:
+------+---------+-------+-------+-------+-------+
| PID| CATEGORY| REGION| avgCol| VOLUME| RESULT|
+------+---------+-------+-------+-------+-------+
|123456| CAT-1| NORTH| 0.575| 200| 8.543|
|123456| CAT-1| SOUTH| 15.879| 125| 4.321|
|123456| CAT-2| NORTH| 5.255| 75| 7.012|
|123456| CAT-2| SOUTH| null| 100| 0.432|
|123456| CAT-3| NORTH| null| 50| 5.111|
+------+---------+-------+-------+-------+-------+
NOTE: I'm dropping and renaming the column back to RESULT because I want to use this table for my next iteration.
Now, this produces the expected results, but it is not performant. Especially when I enclose this in a loop and the number of join columns keeps increasing, the join and the subsequent transformation take a long time, and every iteration gets worse.
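To make the structure clearer, the loop looks roughly like this (computeStats, joinCols and numIterations are placeholders; the real code does more):
DataFrame current = df1;
for (int i = 0; i < numIterations; i++) {
    // group by + aggregation producing avgCol for this pass
    DataFrame stats = computeStats(current);
    current = stats.join(current, joinCols, "right_outer")
        .withColumn("RESULT",
            when(col("avgCol").isNotNull(), getResult(col("avgCol")))
                .otherwise(col("RESULT")))
        .drop("avgCol");
    // each pass builds on the previous DataFrame, so the plan keeps growing
}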
My question is - Is there a more efficient way to do both/one of these operations in Spark, preferably using Data Frames?
I read a few posts suggesting that I analyze the physical plan of the join to find ways to optimize it, but I couldn't gather much from that. I can't include the plan here because it is huge. It would be really helpful if someone could point me to specific things to look for in the physical plan.
I'm running this on a single Spark node (standalone) - version 1.6.1. My code is written in Java. The machine on which I'm running this has abundant processing power and memory. So, I'm pretty sure that's not an issue. Any help with this is much appreciated.
Edit: I'm reading the base data from a CSV file using spark-csv. df1 and df2 are the result of some simple group by + aggregations, which I believe are not the bottleneck.
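For context, the load and pre-aggregation look roughly like this (the path, column choice and avg aggregation are purely illustrative):
import static org.apache.spark.sql.functions.avg;

DataFrame raw = sqlContext.read()
    .format("com.databricks.spark.csv")   // spark-csv package
    .option("header", "true")
    .option("inferSchema", "true")
    .load("basedata.csv");

// e.g. df2 as an average per CATEGORY/REGION
DataFrame df2 = raw.groupBy("CATEGORY", "REGION")
    .agg(avg("RESULT").alias("avgCol"));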
