Hadoop in Python Best Practices - Memo

Purpose

  • Learn HDFS & MapReduce & BigTable (the Hadoop architecture)
  • Improve understanding of Python data structures & syntax

Spark SQL and DataFrames

Introduction to Spark SQL

Spark SQL is the Spark module for processing structured data. It provides a programming abstraction called DataFrame and acts as a distributed SQL query engine. Spark SQL queries are translated into RDD operations and then submitted to the cluster for execution, which makes them very efficient.

Features of Spark SQL

1. Easy to integrate
2. Unified data access
3. Hive compatibility
4. Standard data connectivity

Introduction to DataFrames
Like an RDD, a DataFrame is a distributed data container. A DataFrame, however, is closer to a two-dimensional table in a traditional database: besides the data itself, it also records the structure of the data, i.e. its schema. Like Hive, DataFrames also support nested data types (struct, array and map). From an API-usability point of view, the DataFrame API offers a set of high-level relational operations that are friendlier and have a lower barrier to entry than the functional RDD API. Because it resembles the DataFrames of R and Pandas, the Spark DataFrame carries over the development experience of traditional single-machine data analysis.
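
Since this memo is Python-oriented, here is a minimal PySpark sketch of the same idea (assuming a Spark 1.5-era installation running in local mode; the sample rows and the app name are made up for illustration):

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext("local", "dataframe-intro")  # hypothetical local app name
sqlContext = SQLContext(sc)

# A DataFrame carries a schema (column names and types) alongside the data.
people = sqlContext.createDataFrame([
    Row(id=1, name="tom", age=5),
    Row(id=2, name="jerry", age=3),
])
people.printSchema()  # integers become LongType, strings StringType
people.show()

sc.stop()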

Creating DataFrames

In Spark SQL, the SQLContext is the entry point for creating DataFrames and executing SQL; in spark-1.5.2 a sqlContext is already provided in the shell.
1. Create a local file with three columns, id, name and age, separated by spaces, then upload it to HDFS
hdfs dfs -put person.txt /
2. In the spark shell, run the command below to read the data and split each line on the column separator
val lineRDD = sc.textFile("hdfs://node1:9000/person.txt").map(_.split(" "))
3. Define a case class (this acts as the table schema)
case class Person(id:Int, name:String, age:Int)
4. Associate the RDD with the case class
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
5. Convert the RDD into a DataFrame
val personDF = personRDD.toDF
6. Work with the DataFrame
personDF.show
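
For reference, the same flow can be done from the PySpark shell (a sketch, assuming the same person.txt on HDFS at node1:9000 and the sc / sqlContext objects the shell provides):

from pyspark.sql import Row

# Read the file and split each line on the column separator.
lineRDD = sc.textFile("hdfs://node1:9000/person.txt").map(lambda l: l.split(" "))

# Map each line to a Row; this plays the role of the Scala case class.
personRDD = lineRDD.map(lambda x: Row(id=int(x[0]), name=x[1], age=int(x[2])))

# Build the DataFrame and inspect it.
personDF = sqlContext.createDataFrame(personRDD)
personDF.show()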

Common DataFrame operations

1. // View the contents of the DataFrame
personDF.show
2. // View selected columns of the DataFrame (col requires import org.apache.spark.sql.functions._)
personDF.select(personDF.col("name")).show
personDF.select(col("name"), col("age")).show
personDF.select("name").show
3. // Print the schema of the DataFrame
personDF.printSchema
4. // Select all name and age values, with age increased by 1
personDF.select(col("id"), col("name"), col("age") + 1).show
personDF.select(personDF("id"), personDF("name"), personDF("age") + 1).show
5. // Filter rows whose age is greater than or equal to 18
personDF.filter(col("age") >= 18).show
6. // Group by age and count the number of people of each age
personDF.groupBy(“age”).count().show()

Using the SQL syntax
If you want to use SQL-style syntax, you need to register the DataFrame as a table
personDF.registerTempTable("t_person")

1. // Query the two oldest people
sqlContext.sql("select * from t_person order by age desc limit 2").show
2. // Show the schema information of the table
sqlContext.sql("desc t_person").show

The SQL passed to sqlContext.sql() is written much like ordinary SQL, but note that Spark SQL (in this version) does not support subqueries.
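
The SQL-style usage looks essentially the same from PySpark (a sketch, reusing the personDF built above):

# Register the DataFrame as a temporary table, then query it with SQL.
personDF.registerTempTable("t_person")
sqlContext.sql("select * from t_person order by age desc limit 2").show()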

Writing a program that executes Spark SQL statements
1. First add the Spark SQL dependency to the pom.xml of the Maven project

<dependency>  
    <groupId>org.apache.spark</groupId>  
    <artifactId>spark-sql_2.10</artifactId>  
    <version>1.5.2</version>  
</dependency>  

2. Approach 1: infer the schema with a case class

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object InferringSchema {
  def main(args: Array[String]) {

    // Create the SparkConf and set the application name
    val conf = new SparkConf().setAppName("SQL-1")
    // SQLContext depends on a SparkContext
    val sc = new SparkContext(conf)
    // Create the SQLContext
    val sqlContext = new SQLContext(sc)

    // Create an RDD from the given input path
    val lineRDD = sc.textFile(args(0)).map(_.split(" "))

    // Associate the RDD with the case class (defined below)
    val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    // Import the implicit conversions; without them the RDD cannot be converted to a DataFrame
    import sqlContext.implicits._
    // Convert the RDD into a DataFrame
    val personDF = personRDD.toDF
    // Register the table
    personDF.registerTempTable("t_person")
    // Run the SQL
    val df = sqlContext.sql("select * from t_person order by age desc limit 2")
    // Store the result as JSON at the given output path
    df.write.json(args(1))
    // Stop the SparkContext
    sc.stop()
  }
}
// The case class must be defined outside the object, at the top level
case class Person(id: Int, name: String, age: Int)

Package the program into a jar, upload it to the Spark cluster, and submit the Spark job
/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-submit \
  --class spark.sql.InferringSchema \
  --master spark://node1:7077 \
  /root/spark-mvn-1.0-SNAPSHOT.jar \
  hdfs://node1:9000/person.txt \
  hdfs://node1:9000/out

Check the output
hdfs dfs -cat hdfs://node1:9000/out/part-r-*

3. Approach 2: specify the schema directly with StructType

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkContext, SparkConf}

object SpecifyingSchema {
  def main(args: Array[String]) {
    // Create the SparkConf and set the application name
    val conf = new SparkConf().setAppName("SQL-2")
    // SQLContext depends on a SparkContext
    val sc = new SparkContext(conf)
    // Create the SQLContext
    val sqlContext = new SQLContext(sc)
    // Create an RDD from the given input path
    val personRDD = sc.textFile(args(0)).map(_.split(" "))
    // Specify the schema of each field directly with StructType
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    // Map the RDD to an RDD of Rows
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    // Apply the schema to the row RDD
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    // Register the table
    personDataFrame.registerTempTable("t_person")
    // Execute the SQL
    val df = sqlContext.sql("select * from t_person order by age desc limit 4")
    // Store the result as JSON at the given output path
    df.write.json(args(1))
    // Stop the SparkContext
    sc.stop()
  }
}
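
The explicit-schema approach exists in PySpark as well; a minimal sketch, assuming the same space-separated person.txt and the shell's sc / sqlContext:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Declare the schema field by field, mirroring the Scala StructType above.
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

rowRDD = sc.textFile("hdfs://node1:9000/person.txt") \
           .map(lambda l: l.split(" ")) \
           .map(lambda p: (int(p[0]), p[1].strip(), int(p[2])))

# Apply the schema to the RDD of tuples.
personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
personDataFrame.registerTempTable("t_person")
sqlContext.sql("select * from t_person order by age desc limit 4").show()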

Loading data from MySQL (Spark shell approach)
1. Start the Spark shell; the MySQL connector driver jar must be specified
/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-shell \
  --master spark://node1:7077 \
  --jars /usr/local/spark-1.5.2-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \
  --driver-class-path /usr/local/spark-1.5.2-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar

2. Load the data from MySQL
val jdbcDF = sqlContext.read.format("jdbc").options(Map(
  "url" -> "jdbc:mysql://XXX:3306/bigdata",
  "driver" -> "com.mysql.jdbc.Driver",
  "dbtable" -> "person",
  "user" -> "root",
  "password" -> "123456")).load()

3. Run a query
jdbcDF.show()
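
The JDBC read looks much the same from the PySpark shell (a sketch; the host, database, table and credentials below are the same placeholders as above, and the MySQL connector jar still has to be passed with --jars / --driver-class-path):

jdbcDF = sqlContext.read.format("jdbc").options(
    url="jdbc:mysql://XXX:3306/bigdata",
    driver="com.mysql.jdbc.Driver",
    dbtable="person",
    user="root",
    password="123456",
).load()

jdbcDF.show()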

Writing data into MySQL

import java.util.Properties
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

object JdbcRDD {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("MySQL-Demo")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Create an RDD by parallelizing a local collection
    val personRDD = sc.parallelize(Array("1 tom 5", "2 jerry 3", "3 kitty 6")).map(_.split(" "))
    // Specify the schema of each field directly with StructType
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    // Map the RDD to an RDD of Rows
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    // Apply the schema to the row RDD
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    // Create a Properties object with the database credentials
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "123456")
    // Append the data to the database table
    personDataFrame.write.mode("append").jdbc("jdbc:mysql://192.168.10.1:3306/bigdata", "bigdata.person", prop)
    // Stop the SparkContext
    sc.stop()
  }
}
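
For completeness, a PySpark counterpart of the write path (a sketch only; the connection URL and credentials are the same placeholders as in the Scala program, and the sample rows are made up):

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext(appName="MySQL-Demo-py")  # hypothetical app name
sqlContext = SQLContext(sc)

personDF = sqlContext.createDataFrame([
    Row(id=1, name="tom", age=5),
    Row(id=2, name="jerry", age=3),
    Row(id=3, name="kitty", age=6),
])

# Append the DataFrame to the MySQL table (placeholder URL and credentials).
personDF.write.mode("append").jdbc(
    "jdbc:mysql://192.168.10.1:3306/bigdata", "bigdata.person",
    properties={"user": "root", "password": "123456"})

sc.stop()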

Hive on Spark SQL

1. Install Hive, switch the metastore database, and add hive-site.xml (with the MySQL connection)
2. Copy hive-site.xml into the conf directory of the Spark cluster
3. Copy the MySQL connector jar into Spark's lib directory
4. Run: sqlContext.sql("select * from table1").show()
   The result can then either be saved with
   .write.mode("append").jdbc(...)
   or written out manually with
   .foreachPartition(it => {
     // 1. initialize the connection
     // 2. write each record of the partition to the storage layer
     it.foreach(x => { /* write x */ })
     // 3. close the connection
   })
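
A concrete version of the foreachPartition pattern, sketched in PySpark; open_connection and save_row are hypothetical helpers standing in for whatever storage layer is used:

def write_partition(rows):
    # 1. Initialize one connection per partition (open_connection is a placeholder).
    conn = open_connection()
    try:
        # 2. Write every record of this partition through the same connection.
        for row in rows:
            save_row(conn, row)  # placeholder write call
    finally:
        # 3. Close the connection once the partition has been processed.
        conn.close()

sqlContext.sql("select * from table1").foreachPartition(write_partition)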

Approach

  • Read the MapReduce code implemented in Python
  • Set up a pseudo-distributed single-node cluster on a VPS
  • Set up a distributed multi-node cluster on a VPS

Scope

This memo does not cover operating-system level internals; it mainly records the logic and the pitfalls encountered.

How to change the Ubuntu root password after logging in over SSH

Change the VPS password:

passwd root


Ubuntu 10.04

This Ubuntu release is too old; the package sources need to be pointed at the old-releases repository before sudo apt-get update will work.

sudo sed -i 's/archive.ubuntu/old-releases.ubuntu/' /etc/apt/sources.list

Reference: the Linux sed command explained

Reference: http://mirrors.tencentyun.com/ubuntu
Reference: How to install software or upgrade from an old unsupported release?

By the same logic, if the default image uses some other mirror and that service is down, it also needs to be changed (taking Tencent Cloud as an example).

sudo sed -i 's/mirrors.tencentyun/old-releases.ubuntu/' /etc/apt/sources.list

Reference: Ubuntu 14.04 - installing software when the Tencent Cloud mirror is unavailable

SSH server configuration (passive access - host)

  • Config file: /etc/ssh/sshd_config
  • For server security, BandwagonHost assigns a random SSH port (not the default 22)
  • Because server-side and client-side variables (e.g. LANG) conflict, the locale variables need to be commented out

# AcceptEnv LANG LC_*

  • Takes effect after a reboot

SSH client configuration (active access - guest)

  • Config file: /etc/ssh/ssh_config
  • Set the default port

host *
Port <port number>

  • Takes effect after a reboot

sudo apt-get install sun-java6-jdk
The package has been removed; apt-get update needs to be run again (after adjusting the sources).

Hadoop 1.0.3

The sample uses Hadoop 1.0.3 and the original article is fairly old, so to keep the environment identical, here is a backup download link:

wget https://archive.apache.org/dist/hadoop/core/hadoop-1.0.3/hadoop-1.0.3.tar.gz

Debugging a .sh script

bash -x <path to the .sh file>

Initializing Hadoop requires granting ownership to hduser.
The chown command is very useful here.
mkdir -p is used to create the directories.

When starting your single-node cluster, you may hit a puzzling error showing that Java failed to initialize the virtual machine:

localhost: Could not create the Java virtual machine.
localhost: Error occurred during initialization of VM
localhost: Could not reserve enough space for object heap
localhost: Could not create the Java virtual machine

The solutions Google returns are all about the Java heap settings or a 32/64-bit mismatch.
Switching between 32-bit and 64-bit systems and setting java -Xmx64m had no effect.
Given my particular setup (a 128 MB VPS), checking the Hadoop configuration /usr/local/hadoop/conf/hadoop-env.sh and setting export HADOOP_HEAPSIZE=4 worked (64 or 128 did not, because the VM cannot allocate that much).
The default is 1 GB.

Keep the root password and the hduser password the same.
sudo adduser hduser sudo adds hduser to the sudo group.

wget http://www.gutenberg.org/etext/20417
wget http://www.gutenberg.org/etext/5000
wget http://www.gutenberg.org/etext/4300

VPS: BandwagonHost (128 MB)
For network security, the SSH port is restricted.
The VPS system environment is Ubuntu x86_64 (Ubuntu x86).

Follow-up

  • Learn and compare the Spark & MapReduce paradigms
  • YARN
  • Thrift

#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None  # the current word read from the input

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)  # convert the string to an int
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:  # accumulate only consecutive occurrences of the same word
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word
# end of the loop
# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)

#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)
    # groupby groups multiple word-count pairs by word,
    # and creates an iterator that returns consecutive keys and their group:
    #   current_word - string containing a word (the key)
    #   group - iterator yielding all ["<current_word>", "<count>"] items
    # itertools.groupby(iterator, keyfunc) groups consecutive values: https://docs.python.org/dev/library/itertools.html#itertools.groupby
    for current_word, group in groupby(data, itemgetter(0)):  # operator.itemgetter(0) picks the word as the grouping key (http://blog.csdn.net/dongtingzhizi/article/details/12068205)
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass

if __name__ == "__main__":
    main()

#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:  # outer loop: each input line
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()  # tokenize the line into words
    # increase counters
    for word in words:  # inner (nested) loop: each word
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)

#!/usr/bin/env python
"""A more advanced Mapper, using Python iterators and generators."""

import sys

def read_input(file):
    for line in file:  # each input line
        # split the line into words
        yield line.split()  # tokenize the line into words

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    for words in data:  # each line, already split into words
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        for word in words:  # each word
            print '%s%s%d' % (word, separator, 1)

if __name__ == "__main__":
    main()
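
The mappers and reducers above follow the Hadoop Streaming contract: read lines from stdin, write tab-separated key/value pairs to stdout, and rely on Hadoop sorting the map output by key before the reduce phase. A small Python 2 sketch for testing that contract locally (it assumes the scripts are saved as mapper.py and reducer.py in the current directory, and that input.txt is any plain-text file, e.g. one of the Gutenberg downloads):

#!/usr/bin/env python
"""Simulate Hadoop Streaming locally: map -> sort by key -> reduce."""

import subprocess

# Run the mapper over the input file.
with open("input.txt") as infile:
    mapped = subprocess.check_output(["python", "mapper.py"], stdin=infile)

# Hadoop sorts map output by key before it reaches the reducer; emulate that.
shuffled = "\n".join(sorted(mapped.splitlines())) + "\n"

# Feed the sorted pairs into the reducer and print the first few counts.
reducer = subprocess.Popen(["python", "reducer.py"],
                           stdin=subprocess.PIPE, stdout=subprocess.PIPE)
counts, _ = reducer.communicate(shuffled)
print(counts[:500])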
