Spark Structured Streaming框架(一)之大旨用法

   斯Parker Struntured Streaming是Spark2.一.0本子后新扩充的流计算引擎,本博将通过几篇博文详细介绍这一个框架。这篇是介绍SparkStructured Streaming的着力开发方法。以Spark自带的example实行测试和介绍,其为”StructuredNetworkWordcount.scala”文件。

一. 准备工作

1. Quick Example

  由于大家是在单机上开始展览测试,所以必要修单机运转模型,修改后的先后如下:

package org.apache.spark.examples.sql.streaming

 

import org.apache.spark.sql.SparkSession

 

/**

* Counts words in UTF8 encoded, ‘\n’ delimited text received from the network.

*

* Usage: StructuredNetworkWordCount <hostname> <port>

* <hostname> and <port> describe the TCP server that Structured Streaming

* would connect to receive data.

*

* To run this on your local machine, you need to first run a Netcat server

* `$ nc -lk 9999`

* and then run the example

* `$ bin/run-example sql.streaming.StructuredNetworkWordCount

* localhost 9999`

*/

object StructuredNetworkWordCount {

def main(args: Array[String]) {

if (args.length < 2) {

System.err.println("Usage: StructuredNetworkWordCount <hostname> <port>")

System.exit(1)

}

 

val host = args(0)

val port = args(1).toInt

 

val spark = SparkSession

.builder

.appName("StructuredNetworkWordCount")

.master("local[*]")

.getOrCreate()

 

import spark.implicits._

 

// Create DataFrame representing the stream of input lines from connection to host:port

val lines = spark.readStream

.format("socket")

.option("host", host)

.option("port", port)

.load()

 

// Split the lines into words

val words = lines.as[String].flatMap(_.split(" "))

 

// Generate running word count

val wordCounts = words.groupBy("value").count()

 

// Start running the query that prints the running counts to the console

val query = wordCounts.writeStream

.outputMode("complete")

.format("console")

.start()

 

query.awaitTermination()

}

}

 

1.一. 软件准备

2. 剖析

  对于上述所示的顺序,实行如下的解读和剖析:

      1、安装VMWare

2.一 数据输入

  在开立斯ParkerSessiion对象之后,必要设置数据源的连串,及数据源的配备。然后就会数据源中源源不断的接收数据,接收到的多少以DataFrame对象存在,该类型与斯ParkerSQL中定义类型壹样,内部由Dataset数组构成。

正如程序所示,设置输入源的档次为socket,并配备socket源的IP地址和端口号。接收到的多寡流存款和储蓄到lines对象中,其品种为DataFarme。

// Create DataFrame representing the stream of input lines from connection to host:port

val lines = spark.readStream

.format("socket")

.option("host", host)

.option("port", port)

.load()

 

      2、在VMWare上安装CentOS6.5

二.二 单词总结

  如下程序所示,首先将收受到的多少流lines转换为String类型的系列;接着每一群数量都是空格分隔为单独的单词;最终再对各种单词实行分组并总括次数。

// Split the lines into words

val words = lines.as[String].flatMap(_.split(" "))

 

// Generate running word count

val wordCounts = words.groupBy("value").count()

 

      三、安装XShell5,用来远程登录系统

2.3 数据输出

透过DataFrame对象的writeStream方法得到DataStreamWrite对象,DataStreamWrite类定义了1些数额输出的方法。Quick
example程序将数据输出到控制终端。注意唯有在调用start()方法后,才起先进行Streaming进程,start()方法会重回一个StreamingQuery对象,用户能够选拔该目的来治本Streaming进度。如上述顺序调用awaitTermination()方法阻塞接收全部数据。

 

     
4、通过rpm -qa | grep ssh 检查cent os 是还是不是安装了ssh server和ssh client ,然后采取ssh localhost测试一下子SSH是不是可用。

3. 异常

一经未有安装那么使用上边包车型客车一声令下安装:

三.一 拒绝连接

  当直接交给编写翻译后的架包时,即未有运维”nc –lk 999九”时,会油可是生图
11所示的错误。消除该特别,只需在付给(spark-submit)程序之前,先运转”nc”命令即可缓解,且无法利用”nc
–lk localhost 999玖”.

图片 1

图 11

yum install openssh-clients  

yum install openssh-server  

yum install openssh-clients

yum install openssh-server

3.2 NoSuchMethodError

  当通过mvn打包程序后,在命令行通过spark-submit提交架包,能够寻常执行,而透过IDEA执行时会出现图
1二所示的失实。

图片 2

图 12

  出现那么些尤其,是由于项目中凭借的Scala版本与斯Parker编译的本子不雷同,从而导致出现这种错误。图
一三和图 1四所示,Spark 二.十是由Scala
贰.十本子编写翻译而成的,而项目信赖的Scala版本是二.1一.八,从而导致出现了不当。

图片 3

图 13

 

图片 4

图 14

  化解该难题,只需在品种的pom.xml文件中内定与spark编写翻译的本子壹样,即可缓解该难点。如图
壹伍所示的施行结果。

图片 5

图 15

 

4. 参考文献

[1]. Structured
Streaming Programming
Guide.

 

     
 5、使用XShell远程登录服务器,接下去的操作就在XShell上通过命令行来实行了。虚拟机中的服务器能够过ifconfig这些命令来博取分配的ip地址(这一个地点恐怕随着虚拟机的重启会发生变化)。

 

1.2. 创建hadoop用户

   
当前虚拟机中尚无设置任何软件,那么大家的目标是搭建二个Hadoop单机格局,那么首先要求在集群中成立三个hadoop用户,用来运行Hadoop的进度,那样避免采纳root用户运转进度,这也是比较规范的服务器用户管理,所以上面先创设hadoop用户:

 

useradd -m hadoop -s /bin/bash   
passwd hadoop    #为hadoop用户设置密码

useradd -m hadoop -s /bin/bash

passwd hadoop    #为hadoop用户设置密码

 

接下去的装置进度中会涉及到root用户和hadoop用户的切换,请我们瞩目!!!

 

一.三. 布局SSH无密码访问

       
 在备选干活中大家早就设置配备了SSH客户端和服务器,集群和单节点形式都必要到SSH,Hadoop中namenode要求运转集群中的全体机器的Hadoop守护进程,而那一个进度要求通过SSH登录来贯彻。而Hadoop并不曾提供SSH输入密码的登录情势,因而为了确认保障能够万事大吉登录每台机械,必要将全部机器配置为namenode能够无密码登录它们。实际中布局战败也足以运作,可是每一次输入密码是尤其麻烦的,所以我们需求配置SSH的无密码访问(注意无密码访问是为hadoop用户配置的,故以下操作需求在hadoop用户下完毕):

 

   
 1. 先是大家须求进入/home/hadoop/.ssh目录下,就算这么些目录不设有,须求履行一下ssh localhost   那样就会转移这一个目录。 

     二. 推行以下命令,创立密钥并且将密钥加入授权:

cd ~/.ssh/                       # 若没有该目录,请先执行一次ssh localhost  

ssh-keygen -t dsa                 # 会有提示,都按回车就可以  

cat id_dsa.pub >> authorized_keys   # 加入授权  

chmod 600 ./authorized_keys       # 修改文件权限,如果不改,无法通过,原因好像是cent os的权限验证比较严格  

cd ~/.ssh/                       # 若没有该目录,请先执行一次ssh localhost

ssh-keygen -t dsa                 # 会有提示,都按回车就可以

cat id_dsa.pub >> authorized_keys   # 加入授权

chmod 600 ./authorized_keys       # 修改文件权限,如果不改,无法通过,原因好像是cent os的权限验证比较严格

 

   
 三. 切换来hdoop用户,接下去,输入ssh localhost测试一下无密码登录,直接enter就能够,无需密码

1.4. 安装JAVA环境

   将jdk安装包复制到/tmp目录,然后解压到/usr/java目录中

  tar zxvf jdk-8u131-linux-x64.tar.gz -C /usr/java

  解压完结后意识生成了jdk目录

[root@cluster02 tmp]# cd /usr/java/

[root@cluster02 java]# ls

jdk1.8.0_131

[root@cluster02 java]#

修改/etc/profile配置java路径

JAVA_HOME=/usr/java/jdk1.8.0_131

export JAVA_HOME

PATH=$JAVA_HOME/bin:$PATH

export PATH

CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export CLASSPATH

二. 单机形式安装

一) 通过xftp将地点hadoop安装包和mds文件发送到服务器/home/hadoop目录下,然后验证tar包的完整性;若文件不完整则那八个值1般差距非常的大,能够不难比较下前多少个字符跟后多少个字符是还是不是等于即可,如下图所示,即使四个值不壹致,请务必重新下载。

cat hadoop-2.6.0.tar.gz.mds | grep 'MD5'  

md5sum hadoop-2.6.0.tar.gz | tr "a-z" "A-Z"  

cat hadoop-2.6.0.tar.gz.mds | grep 'MD5'

md5sum hadoop-2.6.0.tar.gz | tr "a-z" "A-Z"

Hadoop下载地址:

http://archive.apache.org/dist/hadoop/core/

 

二) 使用root账号,进入/home/hadoop目录下,解压安装文件到/usr/local/hadoop下

tar -zxvf hadoop-2.6.0.tar.gz -C /usr/local  

tar -zxvf hadoop-2.6.0.tar.gz -C /usr/local

 

三) 修改权限

mv hadoop-2.6.0/ hadoop/   #更改文件夹名称  

chown -R hadoop:hadoop ./hadoop   #修改权限  

mv hadoop-2.6.0/ hadoop/   #更改文件夹名称

chown -R hadoop:hadoop ./hadoop   #修改权限

四) 验证单机形式是还是不是安装成功,进入/usr/local/hadoop/bin目录下,执行./hadoop -version

5) 执行三个小例子

Hadoop 私下认可形式为非分布式方式,无需举行别的计划即可运营。非分布式即单 Java 进度,方便开始展览调节和测试。

今昔大家能够推行例子来感受下 Hadoop 的运维。Hadoop 附带了增进的例子(运转./bin/hadoop jar ./share/hadoop/mapreduce/hadoop-mapreduce-examples-2.陆.0.jar 能够见到有着例子),包含 wordcount、terasort、join、grep 等。

在此我们挑选运维 grep 例子,大家将 input 文件夹中的全数文件作为输入,筛选个中符合正则表明式 dfs[a-z.]+ 的单词并总结出现的次数,最终输出结果到 output 文件夹中。

cd /usr/local/hadoop  

mkdir ./input  

cp ./etc/hadoop/*.xml ./input   # 将配置文件作为输入文件  

./bin/hadoop jar ./share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar grep ./input ./output 'dfs[a-z.]+'  

cat ./output/*          # 查看运行结果  

cd /usr/local/hadoop

mkdir ./input

cp ./etc/hadoop/*.xml ./input   # 将配置文件作为输入文件

./bin/hadoop jar ./share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar grep ./input ./output 'dfs[a-z.]+'

cat ./output/*          # 查看运行结果

 

⑥) 为了有利于Hadoop的操作,大家在为hadoop用户增加环境变量,让用户能够在其余地点执行hadoop命令,我们修改~/.bashrc文件配置,在最终添加以下命令,并且使生效(source ~/.bashrc):

export HADOOP_HOME=/usr/local/hadoop  

export HADOOP_INSTALL=$HADOOP_HOME  

export HADOOP_MAPRED_HOME=$HADOOP_HOME  

export HADOOP_COMMON_HOME=$HADOOP_HOME  

export HADOOP_HDFS_HOME=$HADOOP_HOME  

export YARN_HOME=$HADOOP_HOME  

export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native  

export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin  

 

3. 伪分布式安装

前提:已经设置到位单机情势。

 

伪分布式只必要在单机方式的底蕴上改三个布局文件同时格式化namenode即可。

 

vim /usr/local/hadoop/etc/hadoop/core-site.xml

<property>  

    <name>hadoop.tmp.dir</name>  

    <value>file:/usr/local/hadoop/tmp</value>  

</property>  

<property>  

    <name>fs.defaultFS</name>  

    <value>hdfs://host1:9000</value>  

</property>  

Hdfs-site.xml

<property>  

        <name>dfs.replication</name>  

        <value>1</value>  

    </property>  

    <property>  

        <name>dfs.namenode.name.dir</name>  

        <value>file:/usr/local/hadoop/tmp/dfs/name</value>  

    </property>  

    <property>  

        <name>dfs.datanode.data.dir</name>  

        <value>file:/usr/local/hadoop/tmp/dfs/data</value>  

    </property>  

 

(伪分布式不运维 YA纳瓦拉N 也得以,1般不会影响程序执行)

部分读者或然会猜疑,怎么运行 Hadoop 后,见不到书上所说的 JobTracker 和
TaskTracker,那是因为新版的 Hadoop 使用了新的 MapReduce 框架(MapReduce
V2,也称之为 YASportageN,Yet Another Resource Negotiator)。

YA讴歌MDXN 是从 MapReduce 中分离出来的,负责能源管理与任务调度。YA酷威N 运维于
MapReduce 之上,提供了高可用性、高增加性,YALX570N
的更加多介绍在此不进行,有趣味的可查看有关材质。

上述通过 ./sbin/start-dfs.sh 运转 Hadoop,仅仅是运维了 MapReduce
环境,大家能够运维 YA路虎极光N ,让 YA悍马H2N 来担负财富管理与职务调度。

先是修改配置文件 mapred-site.xml,这边须求先进行重命名:

  1. mv ./etc/hadoop/mapred-site.xml.template ./etc/hadoop/mapred-site.xml

接下来再拓展编辑,同样利用 gedit 编辑会比较便宜些 gedit
./etc/hadoop/mapred-site.xml :

  1. <property>
     <name>mapred.job.tracker</name>
       <value>http://hadoop01:9001</value>
       </property>
            <property>
                 <name>mapreduce.framework.name</name>
                 <value>yarn</value>
           </property>
    

     

随即修改配置文件 yarn-site.xml

  1. yarn.nodemanager.aux-services
    mapreduce_shuffle
     

各自修改/usr/local/hadoop/etc/hadoop下的hadoop-env.sh和yarn-env.sh:

export JAVA_HOME=/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.85.x86_64  

export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native  

export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib" 

 

格式化namenode:     

hdfs namenode -format 

中标的话,会看出 successfully formatted 的提示,且尾数第四行的提示如下,Exitting with status 0 表示成功,若为 Exitting with status 壹 则是失误。若出错,请密切检查从前步骤。 
       

start-all.sh   #启动hdfs  

start-all.sh   #启动hdfs 

jps

非得看到namenode和datanode都运行了才行!!!!

地面包车型地铁伪分布式环境搭建实现!!!

http://hostname:8088/cluster

http://hostname:50070/

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注