UTF-8 decoding error while connecting a Flume Twitter stream to Spark in Python


I am having trouble passing the Twitter data collected by a Flume agent to Spark Streaming. When I use Flume on its own, I can download the tweets without problems, but when I connect it to Spark I get the following error. I suspect the problem is the default UTF-8 decoding in FlumeUtils.createStream(). How can I change it, and what should I change it to?
Error in the pyspark terminal:
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/streaming/flume.py", line 107, in func
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/streaming/flume.py", line 36, in utf8_decoder
    return s.decode('utf-8')
  File "/usr/lib/python2.7/encodings/utf_8.py", line 16, in decode
    return codecs.utf_8_decode(input, errors, True)
UnicodeDecodeError: 'utf8' codec can't decode byte 0xe4 in position 17: invalid continuation byte
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
17/01/01 15:36:41 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
PySpark code:
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils
ss = SparkSession.builder \
    .master("local[2]") \
    .appName("Stream_Analysis") \
    .config("spark.sql.crossJoin.enabled", "true") \
    .getOrCreate()
sc = ss.sparkContext
strm = StreamingContext(sc, 5)
flume = FlumeUtils.createStream(strm,"localhost", 9999)
flume.pprint()
strm.start()
strm.awaitTermination()
Command used to submit the PySpark script:
spark-submit --jars ~/project/spark-streaming-flume-assembly_2.11-2.0.2.jar ~/project/news_stream_flume/news_stream_analysis.py localhost 9999
Flume configuration:
# Name the components on this agent
FlumeAgent.sources = Twitter
FlumeAgent.sinks = spark
FlumeAgent.channels = MemChannel
# Twitter source
FlumeAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
FlumeAgent.sources.Twitter.consumerKey = x
FlumeAgent.sources.Twitter.consumerSecret = y
FlumeAgent.sources.Twitter.accessToken = z
FlumeAgent.sources.Twitter.accessTokenSecret = xx
FlumeAgent.sources.Twitter.keywords = flume, spark
FlumeAgent.sinks.spark.type = avro
FlumeAgent.sinks.spark.hostname = localhost
FlumeAgent.sinks.spark.port = 9999
FlumeAgent.sinks.spark.batch-size = 1
# Use a channel which buffers events in memory
FlumeAgent.channels.MemChannel.type = memory
FlumeAgent.channels.MemChannel.capacity = 10000
FlumeAgent.channels.MemChannel.transactionCapacity = 100
# Bind the source and sink to the channel
FlumeAgent.sources.Twitter.channels = MemChannel
FlumeAgent.sinks.spark.channel = MemChannel
Command used to run the Flume agent:
flume-ng agent --name FlumeAgent --conf-file /home/hduser/project/flume_config_2src_spark_avro -f /usr/lib/flume-ng/conf/flume-conf.properties -Dflume.root.logger=DEBUG,console
FlumeUtils.createStream takes a bodyDecoder argument, a function used to decode each event body into a string. The default implementation just checks for None and decodes the body as UTF-8:
def utf8_decoder(s):
    """ Decode the unicode as UTF-8 """
    if s is None:
        return None
    return s.decode('utf-8')
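The failure can be reproduced without Spark. The sketch below copies the default utf8_decoder logic and feeds it a made-up body (the bytes are invented for illustration, not taken from the real Flume event) that has 0xe4 at index 17 followed by an ASCII byte. Because 0xe4 begins a three-byte UTF-8 sequence and the next byte is not a continuation byte, decoding stops with the same "invalid continuation byte" error seen in the question.

```python
def utf8_decoder(s):
    """Same logic as the PySpark default shown above."""
    if s is None:
        return None
    return s.decode('utf-8')


# Hypothetical event body: 17 ASCII bytes, then 0xe4, then '!' (0x21).
body = b'0123456789abcdefg\xe4!'

try:
    utf8_decoder(body)
except UnicodeDecodeError as err:
    # 0xe4 must be followed by a byte in 0x80-0xbf; 0x21 is not, so the
    # decoder reports the offending byte's index and the reason.
    print('%d %s' % (err.start, err.reason))  # prints: 17 invalid continuation byte
```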
In Python 2.x you should be able to replace it with your own function that uses the desired encoding, or skip decoding entirely by passing the identity function (lambda x: x).
Python 3.x may require some additional steps (JVM-side mapping with _.getBytes) to work around the String -> unicode mapping performed by Pyrolite.
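A minimal sketch of both kinds of replacement, assuming the PySpark 2.x signature in which bodyDecoder is a keyword argument of FlumeUtils.createStream. The names lenient_decoder, raw_decoder and looks_like_avro_container are hypothetical. Falling back to Latin-1 is an assumption about the source encoding: Latin-1 maps every byte to a character, so it never raises, but it will mis-render text that is really in some other encoding. Also note that the Flume documentation describes TwitterSource as converting tweets to Avro format, so the event body may be binary Avro rather than text. If so, keeping the raw bytes and parsing them as Avro is likely more appropriate than any text decoding. Checking for the Avro object container magic bytes ("Obj" followed by 0x01) is one way to find out.

```python
# Sketch of custom bodyDecoder functions for FlumeUtils.createStream.
# All function names here are hypothetical; only bodyDecoder comes from PySpark.

AVRO_MAGIC = b'Obj\x01'  # Avro object container files start with these 4 bytes


def lenient_decoder(s):
    """Try UTF-8 first; fall back to Latin-1 (an assumed source encoding)."""
    if s is None:
        return None
    try:
        return s.decode('utf-8')
    except UnicodeDecodeError:
        # Latin-1 maps every byte 0x00-0xff to a code point, so this cannot fail.
        return s.decode('latin-1')


def raw_decoder(s):
    """Identity decoder, equivalent to lambda x: x: keep the body as raw bytes."""
    return s


def looks_like_avro_container(body):
    """Return True if the body starts with the Avro container magic bytes."""
    return body is not None and body[:4] == AVRO_MAGIC


# Usage inside the question's script (needs Spark and the Flume assembly jar):
#
# flume = FlumeUtils.createStream(strm, "localhost", 9999,
#                                 bodyDecoder=lenient_decoder)
#
# or, to inspect the raw bytes (each element is a (headers, body) pair):
#
# flume = FlumeUtils.createStream(strm, "localhost", 9999,
#                                 bodyDecoder=raw_decoder)
# flume.map(lambda event: looks_like_avro_container(event[1])).pprint()
```

If the check prints True for every event, the bodies are Avro containers, and changing the text encoding alone will not produce readable tweets.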
