導航:首頁 > 版本升級 > cdh5升級spark20

cdh5升級spark20

發布時間:2022-01-21 16:53:38

❶ 如何在CDH5上運行Spark應用

幾個基本概念: (1)job:包含多個task組成的並行計算,往往由action催生。 (2)stage:job的調度單位。 (3)task:被送到某個executor上的工作單元。 (4)taskSet:一組關聯的,相互之間沒有shuffle依賴關系的任務組成的任務集。 一個應用...

❷ 為什麼spark中saveastextfile保存的文件看不到,但是可以從相應目錄下讀取其中的數據/

重新說明來一下,之前我測試用源的是谷歌瀏覽器不是IE。在IE中file.FileName包含路徑名的,而谷歌不包含。所以我之前的回答中才會說文件名中不包含路徑名,我想你問題就是這個引起的。你可以在保存前加個判斷

❸ spark sql怎麼處理hive的null

前面已經有篇文章介紹如何編譯包含hive的spark-assembly.jar了,不清楚的可以翻看一下前面的文章。
cloudera manager裝好的spark,直接執行spark-shell進入命令行後,寫入如下語句:
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

你會發現沒法執行通過,因為cm裝的原生的spark是不支持spark hql的,我們需要手動進行一些調整:
第一步,將編譯好的包含hive的JAR包上傳到hdfs上配置的默認的spark的sharelib目錄:/user/spark/share/lib


第二步:在你要運行spark-shell腳本的節點上的/opt/cloudera/parcels/CDH-
5.3.0-1.cdh5.3.0.p0.30/lib/spark/lib/目錄下面,下載這個jar到這個目錄:hadoop fs -get
hdfs://n1:8020/user/spark/share/lib/spark-assembly-with-hive-maven.jar(具
體路徑替換成你自己的)。然後這個目錄下面原來會有個軟鏈接spark-assembly.jar指向的是spark-assembly-1.2.0-
cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar,我們把這個軟鏈接刪除掉重新創建一個同名的軟鏈接:ln -s
spark-assembly-with-hive-maven.jar
spark-assembly.jar,指向我們剛下載下來的那個JAR包,這個JAR包會在啟動spark-shell腳本時裝載到driver
program的classpath中去的,sparkContext也是在driver中創建出來的,所以需要將我們編譯的JAR包替換掉原來的
spark-assembly.jar包,這樣在啟動spark-shell的時候,包含hive的spark-assembly就被裝載到
classpath中去了。

❹ 從cdh 3可以直接升級到cdh5嗎

不行,得先升級到4

下面是升級方法
Step 1: 做下saveNamespace操作,停掉集群,並備份下 HDFS 的 Metadata
1.1 讓namenode進入safe mode狀態
$ bin/hadoop dfsadmin -safemode enter
1.2 執行saveNamespace操作

$ bin/hadoop dfsadmin -saveNamespace

1.3 stop 集群

1.4 備份 dfs.name.dir 下面的元數據

Step 2: 下載 CDH4,把CDH3的配置拷過來
注意CDH3配置文件是在conf目錄下面,CDH4的配置文件目錄已經改成了etc/hadoop目錄

Step 3: 升級 HDFS Metadata
3.1 進入CDH4目錄下執行:
sbin/hadoop-daemon.sh start namenode -upgrade -clusterid mycluster-test
說明mycluster-test是clusterid,可以指定,也可以不指定,如果不指定那麼系統會自動生成一個
3.2 查看日誌目錄下的namenode日誌,如果出現:
Upgrade of ${dfs.namenode.name.dir} is complete
說明元數據已經升級成功
3.3 啟動DataNodes:
在每一台datanode上面啟動datanode服務
sbin/hadoop-daemon.sh start datanode
datanode節點會自動升級
3.4 等待namenode退出安全模式,然後執行fsck
bin/hdfs fsck /
3.5 確認目錄健康,沒有block丟失後可以執行finalzeUpgrade及啟動secondarynamenode
bin/hdfs dfsadmin -finalizeUpgrade
#finalized後將不能rollback
sbin/hadoop-daemon.sh start secondarynamenode
#請清理掉dfs.namenode.checkpoint.dir目錄下老版本文件,否則會啟動失敗

回滾操作:
若在升級過程中出了問題,想回滾到cdh3版本,一定不能執行bin/hdfs dfsadmin -finalizeUpgrade。在執行finalizeUpgrade之前都可以回滾

在cdh3 版本下面執行
(1)回滾Namenode,在namenode機器上面執行
bin/hadoop-daemon.sh start namenode -rollback
(2)回滾DataNode,在namenode機器上面執行
bin/hadoop-daemons.sh start datanode -rollback

也可以手工操作,把數據move回來,然後正常啟動
(1)回滾Namenode的數據
remove dfs.name.dir/current目錄,mv dfs.name.dir/previous dfs.name.dir/current目錄
這樣子就恢復回namenode的元數據了
(2)回滾DataNode的數據
remove dfs.data.dir/current目錄,mv dfs.data.dir/previous dfs.data.dir/current目錄
這樣子就恢復回datanode的數據了
操作完後就可以重新啟動cdh3版本了

總的來說,升級是:mv current previous,創建current,讀舊的元數據,寫成新版本到current裡面,而DataNode節點上面的block數據通過hardlink來放到current目錄下面
回滾是:rm current,mv previous current

❺ 如何在CDH 5上運行Spark應用程序

創建 maven 工程
使用下面命令創建一個普通的 maven 工程:
bash
$ mvn archetype:generate -DgroupId=com.cloudera.sparkwordcount -DartifactId=sparkwordcount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
將 sparkwordcount 目錄重命名為simplesparkapp,然後,在 simplesparkapp 目錄下添加 scala 源文件目錄:
bash
$ mkdir -p sparkwordcount/src/main/scala/com/cloudera/sparkwordcount
修改 pom.xml 添加 scala 和 spark 依賴:
xml
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.2.0-cdh5.3.0</version>
</dependency>
</dependencies>
添加編譯 scala 的插件:
xml
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
添加 scala 編譯插件需要的倉庫:
xml
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
另外,添加 cdh hadoop 的倉庫:
xml
<repositories>
<repository>
<id>scala-tools.org</id>
<name>Scala-tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
<repository>
<id>maven-hadoop</id>
<name>Hadoop Releases</name>
<url>https://repository.cloudera.com/content/repositories/releases/</url>
</repository>
<repository>
<id>cloudera-repos</id>
<name>Cloudera Repos</name>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
最後,完整的 pom.xml 文件見: https://github.com/javachen/simplesparkapp/blob/master/pom.xml 。
運行下面命令檢查工程是否能夠成功編譯:
bash
mvn package
編寫示例代碼
以 WordCount 為例,該程序需要完成以下邏輯:
讀一個輸入文件
統計每個單詞出現次數
過濾少於一定次數的單詞
對剩下的單詞統計每個字母出現次數
在 MapRece 中,上面的邏輯需要兩個 MapRece 任務,而在 Spark 中,只需要一個簡單的任務,並且代碼量會少 90%。
編寫 Scala 程序 如下:
scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SparkWordCount {
def main(args: Array[String]) {
val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))
val threshold = args(1).toInt
// split each document into words
val tokenized = sc.textFile(args(0)).flatMap(_.split(" "))
// count the occurrence of each word
val wordCounts = tokenized.map((_, 1)).receByKey(_ + _)
// filter out words with less than threshold occurrences
val filtered = wordCounts.filter(_._2 >= threshold)
// count characters
val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).receByKey(_ + _)
System.out.println(charCounts.collect().mkString(", "))
charCounts.saveAsTextFile("world-count-result")
}
}
Spark 使用懶執行的策略,意味著只有當 動作 執行的時候, 轉換 才會運行。上面例子中的 動作 操作是 collect 和 saveAsTextFile ,前者是將數據推送給客戶端,後者是將數據保存到 HDFS。
作為對比, Java 版的程序 如下:
java
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.SparkConf;
import scala.Tuple2;
public class JavaWordCount {
public static void main(String[] args) {
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"));
final int threshold = Integer.parseInt(args[1]);
// split each document into words
JavaRDD tokenized = sc.textFile(args[0]).flatMap(
new FlatMapFunction() {
public Iterable call(String s) {
return Arrays.asList(s.split(" "));
}
}
);
// count the occurrence of each word
JavaPairRDD counts = tokenized.mapToPair(
new PairFunction() {
public Tuple2 call(String s) {
return new Tuple2(s, 1);
}
}
).receByKey(
new Function2() {
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
}
);
另外, Python 版的程序 如下:
python
import sys
from pyspark import SparkContext
file="inputfile.txt"
count=2
if __name__ == "__main__":
sc = SparkContext(appName="PythonWordCount")
lines = sc.textFile(file, 1)
counts = lines.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
.receByKey(lambda a, b: a + b) \
.filter(lambda (a, b) : b >= count) \
.flatMap(lambda (a, b): list(a)) \
.map(lambda x: (x, 1)) \
.receByKey(lambda a, b: a + b)
print ",".join(str(t) for t in counts.collect())
sc.stop()
編譯
運行下面命令生成 jar:
bash
$ mvn package
運行成功之後,會在 target 目錄生成 sparkwordcount-0.0.1-SNAPSHOT.jar 文件。
運行
因為項目依賴的 spark 版本是 1.2.0-cdh5.3.0 ,所以下面的命令只能在 CDH 5.3 集群上運行。
首先,將測試文件 inputfile.txt 上傳到 HDFS 上;
bash
$ wget https://github.com/javachen/simplesparkapp/blob/master/data/inputfile.txt
$ hadoop fs -put inputfile.txt
其次,將 sparkwordcount-0.0.1-SNAPSHOT.jar 上傳到集群中的一個節點;然後,使用 spark-submit 腳本運行 Scala 版的程序:
bash
$ spark-submit --class com.cloudera.sparkwordcount.SparkWordCount --master local sparkwordcount-0.0.1-SNAPSHOT.jar inputfile.txt 2
或者,運行 Java 版本的程序:
bash
$ spark-submit --class com.cloudera.sparkwordcount.JavaWordCount --master local sparkwordcount-0.0.1-SNAPSHOT.jar inputfile.txt 2
對於 Python 版的程序,運行腳本為:
bash
$ spark-submit --master local PythonWordCount.py
如果,你的集群部署的是 standalone 模式,則你可以替換 master 參數的值為 spark://<master host>:<master port> ,也可以以 Yarn 的模式運行。

❻ 如何使用Spark SQL 的JDBC server

運行環境
集群環境:CDH5.3.0

具體JAR版本如下:
spark版本:1.2.0-cdh5.3.0
hive版本:0.13.1-cdh5.3.0
hadoop版本:2.5.0-cdh5.3.0
啟動 JDBC server
cd /etc/spark/conf
ln -s /etc/hive/conf/hive-site.xml hive-site.xml
cd /opt/cloudera/parcels/CDH/lib/spark/
chmod- -R 777 logs/
cd /opt/cloudera/parcels/CDH/lib/spark/sbin
./start-thriftserver.sh --master yarn --hiveconf hive.server2.thrift.port=10008

Connecting to the JDBC server with Beeline
cd /opt/cloudera/parcels/CDH/lib/spark/bin
beeline -u jdbc:hive2://hadoop04:10000

[root@hadoop04 bin]# beeline -u jdbc:hive2://hadoop04:10000
scan complete in 2ms
Connecting to jdbc:hive2://hadoop04:10000
Connected to: Spark SQL (version 1.2.0)

❼ hadoop 2.6.0-cdh5.7.0要求centos哪個版本

①從低於cdh5.4.0升級到cdh5.4.0或更高版本,需要進行hdfs元數據升級;
②從低於cdh5.2.0版本升版級需要做如下升權級:升級hdfs元數據
升級Sentry database
升級hive資料庫
升級sqoop2資料庫
③另外還要確保如下升級:升級Oozie資料庫和共享資料庫
如果向hdfs上傳了spark集合jar文件,要上傳文件的最新版本!

❽ 如何在spark-shell命令行執行spark hql

前面已經有篇文章介紹如何編譯包含hive的spark-assembly.jar了,不清楚的可以翻看一下前面的文章。
cloudera manager裝好的spark,直接執行spark-shell進入命令行後,寫入如下語句:
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

你會發現沒法執行通過,因為cm裝的原生的spark是不支持spark hql的,我們需要手動進行一些調整:
第一步,將編譯好的包含hive的JAR包上傳到hdfs上配置的默認的spark的sharelib目錄:/user/spark/share/lib

第二步:在你要運行spark-shell腳本的節點上的/opt/cloudera/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/spark/lib/目錄下面,下載這個jar到這個目錄:hadoop fs -get hdfs://n1:8020/user/spark/share/lib/spark-assembly-with-hive-maven.jar(具體路徑替換成你自己的)。然後這個目錄下面原來會有個軟鏈接spark-assembly.jar指向的是spark-assembly-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar,我們把這個軟鏈接刪除掉重新創建一個同名的軟鏈接:ln -s spark-assembly-with-hive-maven.jar spark-assembly.jar,指向我們剛下載下來的那個JAR包,這個JAR包會在啟動spark-shell腳本時裝載到driver program的classpath中去的,sparkContext也是在driver中創建出來的,所以需要將我們編譯的JAR包替換掉原來的spark-assembly.jar包,這樣在啟動spark-shell的時候,包含hive的spark-assembly就被裝載到classpath中去了。
第三步:在/opt/cloudera/parcels/CDH/lib/spark/conf/目錄下面創建一個hive-site.xml。/opt/cloudera/parcels/CDH/lib/spark/conf目錄是默認的spark的配置目錄,當然你可以修改默認配置目錄的位置。hive-site.xml內容如下:

<?xml version="1.0" encoding="UTF-8"?>

<!--Autogenerated by Cloudera Manager-->
<configuration>
<property>
<name>hive.metastore.local</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://n1:9083</value>
</property>
<property>
<name>hive.metastore.client.socket.timeout</name>
<value>300</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
</configuration>

這個應該大家都懂的,總要讓spark找到hive的元數據在哪吧,於是就有了上面一些配置。

第四步:修改/opt/cloudera/parcels/CDH/lib/spark/conf/spark-defaults.conf,添加一個屬性:spark.yarn.jar=hdfs://n1:8020/user/spark/share/lib/spark-assembly-with-hive-maven.jar。這個是讓每個executor下載到本地然後裝載到自己的classpath下面去的,主要是用在yarn-cluster模式。local模式由於driver和executor是同一個進程所以沒關系。
以上完事之後,運行spark-shell,再輸入:
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

應該就沒問題了。我們再執行一個語句驗證一下是不是連接的我們指定的hive元資料庫:
hiveContext.sql("show tables").take(10) //取前十個表看看

最後要重點說明一下這里的第二步第三步和第四步,如果是yarn-cluster模式的話,應該替換掉集群所有節點的spark-assembly.jar集群所有節點的spark conf目錄都需要添加hive-site.xml,每個節點spark-defaults.conf都需要添加spark.yarn.jar=hdfs://n1:8020/user/spark/share/lib/spark-assembly-with-hive-maven.jar。可以寫個shell腳本來替換,不然手動一個一個節點去替換也是蠻累的。

❾ 如何構建第一個Spark項目代碼

操作系統
Window7/Mac
IDE
IntelliJ IDEA Community Edition 14.1.6
下載地址
JDK 1.8.0_65
下載地址
Scala 2.11.7
下載地址
其它環境
Spark:1.4.1
下載地址
Hadoop Yarn:Hadoop 2.5.0-cdh5.3.2
IDE項目創建
新建一個項目
New Project

使用Maven模型創建一個Scala項目

填寫自己的GroupId、ArtifactId,Version不需要修改,Maven會根據GroupId生成相應的目錄結構,GroupId的取值一般為a.b.c 結構,ArtifactId為項目名稱。之後點擊next,填寫完項目名稱和目錄,點擊finish就可以讓maven幫你創建Scala項目

項目創建完成後,目錄結構如下

4.為項目添加JDK以及Scala SDK
點擊File->Project Structure,在SDKS和Global Libraries中為項目配置環境。

至此整個項目結構、項目環境都搭建好了
編寫主函數
主函數的編寫在 projectName/src/main/scala/…/下完成,如果按照上述步驟完成代碼搭建,將在目錄最後發現
MyRouteBuild
MyRouteMain

這兩個文件為模塊文件,刪除MyRouteBuild,重命名MyRouteMain為DirectKafkaWordCount。這里,我使用Spark Streaming官方提供的一個代碼為實例代碼,代碼如下
package org.apache.spark.examples.streaming

import kafka.serializer.StringDecoder

import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

object DirectKafkaWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("...")
System.exit(1)
}
//StreamingExamples.setStreamingLogLevels()

val Array(brokers, topics) = args

val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))

// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)

// Get the lines, split them into words, count the words and print
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).receByKey(_ + _)
wordCounts.print()

// Start the computation
ssc.start()
ssc.awaitTermination()
}
}

將代碼最上面的package org.apache.spark.examples.streaming,替換為DirectKafkaWordCount里的package部分即可。並覆蓋DirectKafkaWordCount文件。
至此Spark處理代碼已經編寫完成。
修改pom.xml,為項目打包做准備
pom.xml中編寫了整個項目的依賴關系,這個項目中我們需要導入一些Spark Streaming相關的包。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.4.1</version>
</dependency>

<!-- scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.4</version>
</dependency>

除此之外,如果需要把相關依賴打包到最終JAR包中,需要在pom.xml的bulid標簽中寫入以下配置:
<plugins>
<!-- Plugin to create a single jar that includes all dependencies -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>

<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>

pom.xml文件修改完成後,即可開始maven打包,操作如圖:

點擊右側彈出窗口的Execute Maven Goal,在command line中輸入clean package

Spark作業提交
在項目projectname/target目錄下即可找到兩個jar包,其中一個僅包含Scala代碼,另一個包含所有依賴的包。
將jar包導到Spark伺服器,運行Spark作業,運行操作如下
../bin/spark-submit –master yarn-client –jars ../lib/kafka_2.10-0.8.2.1.jar –class huochen.spark.example.DirectKafkaWordCount sparkExample-1.0-SNAPSHOT-jar-with-dependencies.jar kafka-broker topic
利用spark-submit把任務提交到Yarn集群,即可看到運行結果。

❿ hadoop cdh5 安裝是哪個版本整合了spark

5.0就整合了spark,不過spark版本比較低,是1.0吧,spark可以脫離cdh單獨部署

閱讀全文

與cdh5升級spark20相關的資料

熱點內容
頸子上長睾丸的電影 瀏覽:453
尺度大les影片 瀏覽:430
主角血親全收的小說 瀏覽:957
槍火粵語電影百度雲 瀏覽:42
周星馳的全部電影粵語 瀏覽:423
歐姆龍plc編程線驅動程序 瀏覽:46
重生紅軍反圍剿的小說 瀏覽:142
主角獲得外星戰艦認主 瀏覽:401
免費能搜索的在線看片 瀏覽:584
韓劇電影在線觀看國語 瀏覽:772
win10系統去廣告嗎 瀏覽:900
無法打開物理文件 瀏覽:487
jar啟用指定配置文件 瀏覽:994
蘋果手機用什麼app拍美顏照片 瀏覽:595
蘇州網路公關公司有哪些比較好的 瀏覽:26
大香蕉第一區 瀏覽:312
韓國電影 下女 百度雲 瀏覽:111
乳電影 瀏覽:312
大數據選址軟體哪個好用 瀏覽:174
男主是蛇女主懷了蛇蛋 瀏覽:47

友情鏈接