导航:首页 > 版本升级 > cdh5升级spark20

cdh5升级spark20

发布时间:2022-01-21 16:53:38

❶ 如何在CDH5上运行Spark应用

几个基本概念: (1)job:包含多个task组成的并行计算,往往由action催生。 (2)stage:job的调度单位。 (3)task:被送到某个executor上的工作单元。 (4)taskSet:一组关联的,相互之间没有shuffle依赖关系的任务组成的任务集。 一个应用...

❷ 为什么spark中saveastextfile保存的文件看不到,但是可以从相应目录下读取其中的数据/

重新说明来一下,之前我测试用源的是谷歌浏览器不是IE。在IE中file.FileName包含路径名的,而谷歌不包含。所以我之前的回答中才会说文件名中不包含路径名,我想你问题就是这个引起的。你可以在保存前加个判断

❸ spark sql怎么处理hive的null

前面已经有篇文章介绍如何编译包含hive的spark-assembly.jar了,不清楚的可以翻看一下前面的文章。
cloudera manager装好的spark,直接执行spark-shell进入命令行后,写入如下语句:
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

你会发现没法执行通过,因为cm装的原生的spark是不支持spark hql的,我们需要手动进行一些调整:
第一步,将编译好的包含hive的JAR包上传到hdfs上配置的默认的spark的sharelib目录:/user/spark/share/lib


第二步:在你要运行spark-shell脚本的节点上的/opt/cloudera/parcels/CDH-
5.3.0-1.cdh5.3.0.p0.30/lib/spark/lib/目录下面,下载这个jar到这个目录:hadoop fs -get
hdfs://n1:8020/user/spark/share/lib/spark-assembly-with-hive-maven.jar(具
体路径替换成你自己的)。然后这个目录下面原来会有个软链接spark-assembly.jar指向的是spark-assembly-1.2.0-
cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar,我们把这个软链接删除掉重新创建一个同名的软链接:ln -s
spark-assembly-with-hive-maven.jar
spark-assembly.jar,指向我们刚下载下来的那个JAR包,这个JAR包会在启动spark-shell脚本时装载到driver
program的classpath中去的,sparkContext也是在driver中创建出来的,所以需要将我们编译的JAR包替换掉原来的
spark-assembly.jar包,这样在启动spark-shell的时候,包含hive的spark-assembly就被装载到
classpath中去了。

❹ 从cdh 3可以直接升级到cdh5吗

不行,得先升级到4

下面是升级方法
Step 1: 做下saveNamespace操作,停掉集群,并备份下 HDFS 的 Metadata
1.1 让namenode进入safe mode状态
$ bin/hadoop dfsadmin -safemode enter
1.2 执行saveNamespace操作

$ bin/hadoop dfsadmin -saveNamespace

1.3 stop 集群

1.4 备份 dfs.name.dir 下面的元数据

Step 2: 下载 CDH4,把CDH3的配置拷过来
注意CDH3配置文件是在conf目录下面,CDH4的配置文件目录已经改成了etc/hadoop目录

Step 3: 升级 HDFS Metadata
3.1 进入CDH4目录下执行:
sbin/hadoop-daemon.sh start namenode -upgrade -clusterid mycluster-test
说明mycluster-test是clusterid,可以指定,也可以不指定,如果不指定那么系统会自动生成一个
3.2 查看日志目录下的namenode日志,如果出现:
Upgrade of ${dfs.namenode.name.dir} is complete
说明元数据已经升级成功
3.3 启动DataNodes:
在每一台datanode上面启动datanode服务
sbin/hadoop-daemon.sh start datanode
datanode节点会自动升级
3.4 等待namenode退出安全模式,然后执行fsck
bin/hdfs fsck /
3.5 确认目录健康,没有block丢失后可以执行finalzeUpgrade及启动secondarynamenode
bin/hdfs dfsadmin -finalizeUpgrade
#finalized后将不能rollback
sbin/hadoop-daemon.sh start secondarynamenode
#请清理掉dfs.namenode.checkpoint.dir目录下老版本文件,否则会启动失败

回滚操作:
若在升级过程中出了问题,想回滚到cdh3版本,一定不能执行bin/hdfs dfsadmin -finalizeUpgrade。在执行finalizeUpgrade之前都可以回滚

在cdh3 版本下面执行
(1)回滚Namenode,在namenode机器上面执行
bin/hadoop-daemon.sh start namenode -rollback
(2)回滚DataNode,在namenode机器上面执行
bin/hadoop-daemons.sh start datanode -rollback

也可以手工操作,把数据move回来,然后正常启动
(1)回滚Namenode的数据
remove dfs.name.dir/current目录,mv dfs.name.dir/previous dfs.name.dir/current目录
这样子就恢复回namenode的元数据了
(2)回滚DataNode的数据
remove dfs.data.dir/current目录,mv dfs.data.dir/previous dfs.data.dir/current目录
这样子就恢复回datanode的数据了
操作完后就可以重新启动cdh3版本了

总的来说,升级是:mv current previous,创建current,读旧的元数据,写成新版本到current里面,而DataNode节点上面的block数据通过hardlink来放到current目录下面
回滚是:rm current,mv previous current

❺ 如何在CDH 5上运行Spark应用程序

创建 maven 工程
使用下面命令创建一个普通的 maven 工程:
bash
$ mvn archetype:generate -DgroupId=com.cloudera.sparkwordcount -DartifactId=sparkwordcount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
将 sparkwordcount 目录重命名为simplesparkapp,然后,在 simplesparkapp 目录下添加 scala 源文件目录:
bash
$ mkdir -p sparkwordcount/src/main/scala/com/cloudera/sparkwordcount
修改 pom.xml 添加 scala 和 spark 依赖:
xml
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.2.0-cdh5.3.0</version>
</dependency>
</dependencies>
添加编译 scala 的插件:
xml
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
添加 scala 编译插件需要的仓库:
xml
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
另外,添加 cdh hadoop 的仓库:
xml
<repositories>
<repository>
<id>scala-tools.org</id>
<name>Scala-tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
<repository>
<id>maven-hadoop</id>
<name>Hadoop Releases</name>
<url>https://repository.cloudera.com/content/repositories/releases/</url>
</repository>
<repository>
<id>cloudera-repos</id>
<name>Cloudera Repos</name>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
最后,完整的 pom.xml 文件见: https://github.com/javachen/simplesparkapp/blob/master/pom.xml 。
运行下面命令检查工程是否能够成功编译:
bash
mvn package
编写示例代码
以 WordCount 为例,该程序需要完成以下逻辑:
读一个输入文件
统计每个单词出现次数
过滤少于一定次数的单词
对剩下的单词统计每个字母出现次数
在 MapRece 中,上面的逻辑需要两个 MapRece 任务,而在 Spark 中,只需要一个简单的任务,并且代码量会少 90%。
编写 Scala 程序 如下:
scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SparkWordCount {
def main(args: Array[String]) {
val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))
val threshold = args(1).toInt
// split each document into words
val tokenized = sc.textFile(args(0)).flatMap(_.split(" "))
// count the occurrence of each word
val wordCounts = tokenized.map((_, 1)).receByKey(_ + _)
// filter out words with less than threshold occurrences
val filtered = wordCounts.filter(_._2 >= threshold)
// count characters
val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).receByKey(_ + _)
System.out.println(charCounts.collect().mkString(", "))
charCounts.saveAsTextFile("world-count-result")
}
}
Spark 使用懒执行的策略,意味着只有当 动作 执行的时候, 转换 才会运行。上面例子中的 动作 操作是 collect 和 saveAsTextFile ,前者是将数据推送给客户端,后者是将数据保存到 HDFS。
作为对比, Java 版的程序 如下:
java
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.SparkConf;
import scala.Tuple2;
public class JavaWordCount {
public static void main(String[] args) {
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"));
final int threshold = Integer.parseInt(args[1]);
// split each document into words
JavaRDD tokenized = sc.textFile(args[0]).flatMap(
new FlatMapFunction() {
public Iterable call(String s) {
return Arrays.asList(s.split(" "));
}
}
);
// count the occurrence of each word
JavaPairRDD counts = tokenized.mapToPair(
new PairFunction() {
public Tuple2 call(String s) {
return new Tuple2(s, 1);
}
}
).receByKey(
new Function2() {
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
}
);
另外, Python 版的程序 如下:
python
import sys
from pyspark import SparkContext
file="inputfile.txt"
count=2
if __name__ == "__main__":
sc = SparkContext(appName="PythonWordCount")
lines = sc.textFile(file, 1)
counts = lines.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
.receByKey(lambda a, b: a + b) \
.filter(lambda (a, b) : b >= count) \
.flatMap(lambda (a, b): list(a)) \
.map(lambda x: (x, 1)) \
.receByKey(lambda a, b: a + b)
print ",".join(str(t) for t in counts.collect())
sc.stop()
编译
运行下面命令生成 jar:
bash
$ mvn package
运行成功之后,会在 target 目录生成 sparkwordcount-0.0.1-SNAPSHOT.jar 文件。
运行
因为项目依赖的 spark 版本是 1.2.0-cdh5.3.0 ,所以下面的命令只能在 CDH 5.3 集群上运行。
首先,将测试文件 inputfile.txt 上传到 HDFS 上;
bash
$ wget https://github.com/javachen/simplesparkapp/blob/master/data/inputfile.txt
$ hadoop fs -put inputfile.txt
其次,将 sparkwordcount-0.0.1-SNAPSHOT.jar 上传到集群中的一个节点;然后,使用 spark-submit 脚本运行 Scala 版的程序:
bash
$ spark-submit --class com.cloudera.sparkwordcount.SparkWordCount --master local sparkwordcount-0.0.1-SNAPSHOT.jar inputfile.txt 2
或者,运行 Java 版本的程序:
bash
$ spark-submit --class com.cloudera.sparkwordcount.JavaWordCount --master local sparkwordcount-0.0.1-SNAPSHOT.jar inputfile.txt 2
对于 Python 版的程序,运行脚本为:
bash
$ spark-submit --master local PythonWordCount.py
如果,你的集群部署的是 standalone 模式,则你可以替换 master 参数的值为 spark://<master host>:<master port> ,也可以以 Yarn 的模式运行。

❻ 如何使用Spark SQL 的JDBC server

运行环境
集群环境:CDH5.3.0

具体JAR版本如下:
spark版本:1.2.0-cdh5.3.0
hive版本:0.13.1-cdh5.3.0
hadoop版本:2.5.0-cdh5.3.0
启动 JDBC server
cd /etc/spark/conf
ln -s /etc/hive/conf/hive-site.xml hive-site.xml
cd /opt/cloudera/parcels/CDH/lib/spark/
chmod- -R 777 logs/
cd /opt/cloudera/parcels/CDH/lib/spark/sbin
./start-thriftserver.sh --master yarn --hiveconf hive.server2.thrift.port=10008

Connecting to the JDBC server with Beeline
cd /opt/cloudera/parcels/CDH/lib/spark/bin
beeline -u jdbc:hive2://hadoop04:10000

[root@hadoop04 bin]# beeline -u jdbc:hive2://hadoop04:10000
scan complete in 2ms
Connecting to jdbc:hive2://hadoop04:10000
Connected to: Spark SQL (version 1.2.0)

❼ hadoop 2.6.0-cdh5.7.0要求centos哪个版本

①从低于cdh5.4.0升级到cdh5.4.0或更高版本,需要进行hdfs元数据升级;
②从低于cdh5.2.0版本升版级需要做如下升权级:升级hdfs元数据
升级Sentry database
升级hive数据库
升级sqoop2数据库
③另外还要确保如下升级:升级Oozie数据库和共享数据库
如果向hdfs上传了spark集合jar文件,要上传文件的最新版本!

❽ 如何在spark-shell命令行执行spark hql

前面已经有篇文章介绍如何编译包含hive的spark-assembly.jar了,不清楚的可以翻看一下前面的文章。
cloudera manager装好的spark,直接执行spark-shell进入命令行后,写入如下语句:
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

你会发现没法执行通过,因为cm装的原生的spark是不支持spark hql的,我们需要手动进行一些调整:
第一步,将编译好的包含hive的JAR包上传到hdfs上配置的默认的spark的sharelib目录:/user/spark/share/lib

第二步:在你要运行spark-shell脚本的节点上的/opt/cloudera/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/spark/lib/目录下面,下载这个jar到这个目录:hadoop fs -get hdfs://n1:8020/user/spark/share/lib/spark-assembly-with-hive-maven.jar(具体路径替换成你自己的)。然后这个目录下面原来会有个软链接spark-assembly.jar指向的是spark-assembly-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar,我们把这个软链接删除掉重新创建一个同名的软链接:ln -s spark-assembly-with-hive-maven.jar spark-assembly.jar,指向我们刚下载下来的那个JAR包,这个JAR包会在启动spark-shell脚本时装载到driver program的classpath中去的,sparkContext也是在driver中创建出来的,所以需要将我们编译的JAR包替换掉原来的spark-assembly.jar包,这样在启动spark-shell的时候,包含hive的spark-assembly就被装载到classpath中去了。
第三步:在/opt/cloudera/parcels/CDH/lib/spark/conf/目录下面创建一个hive-site.xml。/opt/cloudera/parcels/CDH/lib/spark/conf目录是默认的spark的配置目录,当然你可以修改默认配置目录的位置。hive-site.xml内容如下:

<?xml version="1.0" encoding="UTF-8"?>

<!--Autogenerated by Cloudera Manager-->
<configuration>
<property>
<name>hive.metastore.local</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://n1:9083</value>
</property>
<property>
<name>hive.metastore.client.socket.timeout</name>
<value>300</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
</configuration>

这个应该大家都懂的,总要让spark找到hive的元数据在哪吧,于是就有了上面一些配置。

第四步:修改/opt/cloudera/parcels/CDH/lib/spark/conf/spark-defaults.conf,添加一个属性:spark.yarn.jar=hdfs://n1:8020/user/spark/share/lib/spark-assembly-with-hive-maven.jar。这个是让每个executor下载到本地然后装载到自己的classpath下面去的,主要是用在yarn-cluster模式。local模式由于driver和executor是同一个进程所以没关系。
以上完事之后,运行spark-shell,再输入:
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

应该就没问题了。我们再执行一个语句验证一下是不是连接的我们指定的hive元数据库:
hiveContext.sql("show tables").take(10) //取前十个表看看

最后要重点说明一下这里的第二步第三步和第四步,如果是yarn-cluster模式的话,应该替换掉集群所有节点的spark-assembly.jar集群所有节点的spark conf目录都需要添加hive-site.xml,每个节点spark-defaults.conf都需要添加spark.yarn.jar=hdfs://n1:8020/user/spark/share/lib/spark-assembly-with-hive-maven.jar。可以写个shell脚本来替换,不然手动一个一个节点去替换也是蛮累的。

❾ 如何构建第一个Spark项目代码

操作系统
Window7/Mac
IDE
IntelliJ IDEA Community Edition 14.1.6
下载地址
JDK 1.8.0_65
下载地址
Scala 2.11.7
下载地址
其它环境
Spark:1.4.1
下载地址
Hadoop Yarn:Hadoop 2.5.0-cdh5.3.2
IDE项目创建
新建一个项目
New Project

使用Maven模型创建一个Scala项目

填写自己的GroupId、ArtifactId,Version不需要修改,Maven会根据GroupId生成相应的目录结构,GroupId的取值一般为a.b.c 结构,ArtifactId为项目名称。之后点击next,填写完项目名称和目录,点击finish就可以让maven帮你创建Scala项目

项目创建完成后,目录结构如下

4.为项目添加JDK以及Scala SDK
点击File->Project Structure,在SDKS和Global Libraries中为项目配置环境。

至此整个项目结构、项目环境都搭建好了
编写主函数
主函数的编写在 projectName/src/main/scala/…/下完成,如果按照上述步骤完成代码搭建,将在目录最后发现
MyRouteBuild
MyRouteMain

这两个文件为模块文件,删除MyRouteBuild,重命名MyRouteMain为DirectKafkaWordCount。这里,我使用Spark Streaming官方提供的一个代码为实例代码,代码如下
package org.apache.spark.examples.streaming

import kafka.serializer.StringDecoder

import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

object DirectKafkaWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("...")
System.exit(1)
}
//StreamingExamples.setStreamingLogLevels()

val Array(brokers, topics) = args

val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))

// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)

// Get the lines, split them into words, count the words and print
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).receByKey(_ + _)
wordCounts.print()

// Start the computation
ssc.start()
ssc.awaitTermination()
}
}

将代码最上面的package org.apache.spark.examples.streaming,替换为DirectKafkaWordCount里的package部分即可。并覆盖DirectKafkaWordCount文件。
至此Spark处理代码已经编写完成。
修改pom.xml,为项目打包做准备
pom.xml中编写了整个项目的依赖关系,这个项目中我们需要导入一些Spark Streaming相关的包。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.4.1</version>
</dependency>

<!-- scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.4</version>
</dependency>

除此之外,如果需要把相关依赖打包到最终JAR包中,需要在pom.xml的bulid标签中写入以下配置:
<plugins>
<!-- Plugin to create a single jar that includes all dependencies -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>

<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>

pom.xml文件修改完成后,即可开始maven打包,操作如图:

点击右侧弹出窗口的Execute Maven Goal,在command line中输入clean package

Spark作业提交
在项目projectname/target目录下即可找到两个jar包,其中一个仅包含Scala代码,另一个包含所有依赖的包。
将jar包导到Spark服务器,运行Spark作业,运行操作如下
../bin/spark-submit –master yarn-client –jars ../lib/kafka_2.10-0.8.2.1.jar –class huochen.spark.example.DirectKafkaWordCount sparkExample-1.0-SNAPSHOT-jar-with-dependencies.jar kafka-broker topic
利用spark-submit把任务提交到Yarn集群,即可看到运行结果。

❿ hadoop cdh5 安装是哪个版本整合了spark

5.0就整合了spark,不过spark版本比较低,是1.0吧,spark可以脱离cdh单独部署

阅读全文

与cdh5升级spark20相关的资料

热点内容
word反向选择 浏览:234
win10如何查看自己的ip地址 浏览:292
韩国爱情推理片电影OK 浏览:118
韩国电影危机的小岛 浏览:810
pdf文件图标黑色 浏览:500
苹果6s为啥没有3Dtouch 浏览:997
电影夫妻上小岛旅游却另有目的的电影叫什么名字 浏览:216
我唾弃你的坟墓同款电影 浏览:142
成奎安经典电影坤叔 浏览:290
电影图片小说在线 浏览:688
数控编程中怎么调用子程序 浏览:522
电影院英玉 浏览:170
主人公叫叶天明柳韶的小说 浏览:447
苹果开机白屏带文件夹问号 浏览:341
很黄很肉的古文小说 浏览:989
cosplay.pk互动微电影 浏览:785
centos65yumjava 浏览:998
十部顶级雇佣兵小说 浏览:644
男主角叫杨伟的一个微电影 浏览:242
韩国推理片免费电影 浏览:200

友情链接