Spark and RDDs

Hi.

I'm writing a Spark program but can't quite get the load balancing right. Running on a single node works fine, but when I try to run in cluster mode it doesn't work: it ends up in local mode every time.

The command I start it with:

~/spark-2.0.0-bin-hadoop2.7/bin/spark-submit --master spark://master:6066 --deploy-mode cluster --supervise --driver-cores 4 --num-executors 6 --executor-cores 6 --driver-memory 490m --executor-memory 480m --class somepackage.Driver FirstSparkExample-1.0-SNAPSHOT.jar hdfs://master:8020/bikeshuge.txt hdfs://master:8020/fromspark158 60000

The worker nodes show up correctly in the admin UI, but for some reason only one worker gets all the work.

The submission made to the REST endpoint also looks fine, as far as I can tell:

Running Spark using the REST application submission protocol.
16/08/11 15:39:21 INFO rest.RestSubmissionClient: Submitting a request to launch an application in spark://master:6066.
16/08/11 15:39:24 INFO rest.RestSubmissionClient: Submission successfully created as driver-20160811153923-0003. Polling submission state...
16/08/11 15:39:24 INFO rest.RestSubmissionClient: Submitting a request for the status of submission driver-20160811153923-0003 in spark://master:6066.
16/08/11 15:39:24 INFO rest.RestSubmissionClient: State of driver driver-20160811153923-0003 is now RUNNING.
16/08/11 15:39:24 INFO rest.RestSubmissionClient: Driver is running on worker worker-20160811152006-192.168.0.3-41943 at 192.168.0.3:41943.
16/08/11 15:39:24 INFO rest.RestSubmissionClient: Server responded with CreateSubmissionResponse:
{
"action" : "CreateSubmissionResponse",
"message" : "Driver successfully submitted as driver-20160811153923-0003",
"serverSparkVersion" : "2.0.0",
"submissionId" : "driver-20160811153923-0003",
"success" : true
}

The log file:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/08/11 15:24:24 INFO SecurityManager: Changing view acls to: hduser
16/08/11 15:24:24 INFO SecurityManager: Changing modify acls to: hduser
16/08/11 15:24:24 INFO SecurityManager: Changing view acls groups to:
16/08/11 15:24:24 INFO SecurityManager: Changing modify acls groups to:
16/08/11 15:24:24 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hduser); groups with view permissions: Set(); users with modify permissions: Set(hduser); groups with modify permissions: Set()
16/08/11 15:24:26 INFO Utils: Successfully started service 'Driver' on port 45714.
16/08/11 15:24:26 INFO WorkerWatcher: Connecting to worker spark://Worker@192.168.0.3:41943
16/08/11 15:24:26 INFO TransportClientFactory: Successfully created connection to /192.168.0.3:41943 after 256 ms (0 ms spent in bootstraps)
16/08/11 15:24:26 INFO WorkerWatcher: Successfully connected to spark://Worker@192.168.0.3:41943
16/08/11 15:24:26 INFO SparkContext: Running Spark version 2.0.0
16/08/11 15:24:27 INFO SecurityManager: Changing view acls to: hduser
16/08/11 15:24:27 INFO SecurityManager: Changing modify acls to: hduser
16/08/11 15:24:27 INFO SecurityManager: Changing view acls groups to:
16/08/11 15:24:27 INFO SecurityManager: Changing modify acls groups to:
16/08/11 15:24:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hduser); groups with view permissions: Set(); users with modify permissions: Set(hduser); groups with modify permissions: Set()
16/08/11 15:24:27 INFO Utils: Successfully started service 'sparkDriver' on port 43118.
16/08/11 15:24:27 INFO SparkEnv: Registering MapOutputTracker
16/08/11 15:24:27 INFO SparkEnv: Registering BlockManagerMaster
16/08/11 15:24:27 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-fc28a12f-9b56-45c3-91de-c78e4944dcce
16/08/11 15:24:27 INFO MemoryStore: MemoryStore started with capacity 104.3 MB
16/08/11 15:24:27 INFO SparkEnv: Registering OutputCommitCoordinator
16/08/11 15:24:28 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/08/11 15:24:28 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.0.3:4040
16/08/11 15:24:28 INFO SparkContext: Added JAR file:/opt/hadoop-2.7.1/share/hadoop/mapreduce/sources/FirstSparkExample-1.0-SNAPSHOT.jar at spark://192.168.0.3:43118/jars/FirstSparkExample-1.0-SNAPSHOT... with timestamp 1470929068860
16/08/11 15:24:29 INFO Executor: Starting executor ID driver on host localhost
16/08/11 15:24:29 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 41368.
16/08/11 15:24:29 INFO NettyBlockTransferService: Server created on 192.168.0.3:41368
16/08/11 15:24:29 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.3, 41368)
16/08/11 15:24:29 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.3:41368 with 104.3 MB RAM, BlockManagerId(driver, 192.168.0.3, 41368)
16/08/11 15:24:29 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.3, 41368)
16/08/11 15:24:30 INFO EventLoggingListener: Logging events to file:/tmp/spark-events/local-1470929069046
16/08/11 15:24:31 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
16/08/11 15:24:32 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 208.3 KB, free 104.0 MB)
16/08/11 15:24:33 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 22.6 KB, free 104.0 MB)
16/08/11 15:24:33 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.0.3:41368 (size: 22.6 KB, free: 104.2 MB)
16/08/11 15:24:33 INFO SparkContext: Created broadcast 0 from textFile at Driver.java:50
16/08/11 15:24:35 INFO FileInputFormat: Total input paths to process : 1
16/08/11 15:24:36 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
16/08/11 15:24:36 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
16/08/11 15:24:36 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
16/08/11 15:24:36 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
16/08/11 15:24:36 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
16/08/11 15:24:36 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
16/08/11 15:24:36 INFO SparkContext: Starting job: saveAsTextFile at Driver.java:100
16/08/11 15:24:37 INFO DAGScheduler: Registering RDD 4 (mapToPair at Driver.java:77)
16/08/11 15:24:37 INFO DAGScheduler: Got job 0 (saveAsTextFile at Driver.java:100) with 176 output partitions
16/08/11 15:24:37 INFO DAGScheduler: Final stage: ResultStage 1 (saveAsTextFile at Driver.java:100)
16/08/11 15:24:37 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
16/08/11 15:24:37 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
16/08/11 15:24:37 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[4] at mapToPair at Driver.java:77), which has no missing parents
16/08/11 15:24:37 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 5.4 KB, free 104.0 MB)
16/08/11 15:24:37 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.1 KB, free 104.0 MB)
16/08/11 15:24:37 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.0.3:41368 (size: 3.1 KB, free: 104.2 MB)
16/08/11 15:24:37 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1012
16/08/11 15:24:37 INFO DAGScheduler: Submitting 176 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[4] at mapToPair at Driver.java:77)
16/08/11 15:24:37 INFO TaskSchedulerImpl: Adding task set 0.0 with 176 tasks
16/08/11 15:24:37 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0, ANY, 5441 bytes)
16/08/11 15:24:37 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/08/11 15:24:37 INFO Executor: Fetching spark://192.168.0.3:43118/jars/FirstSparkExample-1.0-SNAPSHOT... with timestamp 1470929068860
16/08/11 15:24:37 INFO TransportClientFactory: Successfully created connection to /192.168.0.3:43118 after 4 ms (0 ms spent in bootstraps)
16/08/11 15:24:37 INFO Utils: Fetching spark://192.168.0.3:43118/jars/FirstSparkExample-1.0-SNAPSHOT... to /tmp/spark-7300266a-43c3-4b2f-a9d3-d7c249c2a292/userFiles-1016dd1c-6803-4256-bf37-33ab4c4558ef/fetchFileTemp3498829233791480649.tmp
16/08/11 15:24:38 INFO Executor: Adding file:/tmp/spark-7300266a-43c3-4b2f-a9d3-d7c249c2a292/userFiles-1016dd1c-6803-4256-bf37-33ab4c4558ef/FirstSparkExample-1.0-SNAPSHOT.jar to class loader
16/08/11 15:24:38 INFO HadoopRDD: Input split: hdfs://master:8020/bikeshuge.txt:0+5242880
16/08/11 15:24:41 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1751 bytes result sent to driver
16/08/11 15:24:41 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1, ANY, 5441 bytes)
16/08/11 15:24:41 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
16/08/11 15:24:41 INFO HadoopRDD: Input split: hdfs://master:8020/bikeshuge.txt:5242880+5242880
16/08/11 15:24:41 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 4181 ms on localhost (1/176)
16/08/11 15:24:44 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1751 bytes result sent to driver
16/08/11 15:24:44 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, partition 2, ANY, 5441 bytes)
16/08/11 15:24:44 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
16/08/11 15:24:44 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 2556 ms on localhost (2/176)
16/08/11 15:24:44 INFO HadoopRDD: Input split: hdfs://master:8020/bikeshuge.txt:10485760+5242880
16/08/11 15:24:46 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 1751 bytes result sent to driver
16/08/11 15:24:46 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, localhost, partition 3, ANY, 5441 bytes)
16/08/11 15:24:46 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
16/08/11 15:24:46 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 2461 ms on localhost (3/176)
16/08/11 15:24:46 INFO HadoopRDD: Input split: hdfs://master:8020/bikeshuge.txt:15728640+5242880
16/08/11 15:24:49 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 1751 bytes result sent to driver
16/08/11 15:24:49 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4, localhost, partition 4, ANY, 5441 bytes)
16/08/11 15:24:49 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)

and then it continues in the same vein for another few thousand lines.

Maybe it would be worth running it on YARN instead?

Anyone have any ideas?

Found the problem, in case anyone else runs into something similar: the SparkContext I created was configured for local mode, not cluster mode. It was simply a bug in the code.
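
For anyone curious what that looks like, here is a minimal sketch of the mistake and the fix. It assumes a Driver.java roughly like mine; the class layout, the commented-out buggy line, and the omission of the mapToPair step from the log are illustrative, not my exact code:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class Driver {
    public static void main(String[] args) {
        // BUG: hard-coding the master in code silently overrides the
        // --master flag given to spark-submit, so everything runs locally:
        //   SparkConf conf = new SparkConf()
        //           .setAppName("FirstSparkExample")
        //           .setMaster("local[*]");   // forces local mode

        // FIX: leave the master unset and let spark-submit supply it:
        SparkConf conf = new SparkConf().setAppName("FirstSparkExample");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // args[0] = input path, args[1] = output path (see the submit
        // command above); args[2] (the 60000) is omitted from this sketch
        sc.textFile(args[0])
          .saveAsTextFile(args[1]);

        sc.stop();
    }
}

The telltale sign in the log was "Starting executor ID driver on host localhost", together with every task being scheduled on localhost.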

Now it works as it should.

No speed records exactly: it took about 5 minutes to chew through a 2600 MB file on 3 nodes, and about 8 minutes on 1 node.
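
(That works out to a speedup of only 8/5 ≈ 1.6x going from 1 to 3 nodes, i.e. roughly 50% parallel efficiency, so there is presumably still some overhead or skew left to hunt down.)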