SparkSQL and DataFrame

Introduction to SparkSQL

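Spark SQL is the Spark module for processing structured data. It provides a programming abstraction called the DataFrame and also acts as a distributed SQL query engine. It translates Spark SQL queries into RDD operations and submits them to the cluster for execution, which makes execution very fast.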

Features of SparkSQL

1. Easy integration
2. Unified data access
3. Hive compatibility
4. Standard data connectivity

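Introduction to DataFrames
Like an RDD, a DataFrame is a distributed data container. However, a DataFrame is closer to a two-dimensional table in a traditional database: besides the data itself, it also records the data's structure, that is, its schema. Like Hive, DataFrame also supports nested data types (struct, array, and map). In terms of API usability, the DataFrame API offers a set of high-level relational operations that are friendlier and have a lower barrier to entry than the functional RDD API. Because it resembles the DataFrames of R and Pandas, the Spark DataFrame carries over the familiar development experience of traditional single-machine data analysis.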

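Creating DataFrames

In Spark SQL, SQLContext is the entry point for creating DataFrames and executing SQL. The spark-1.5.2 shell already provides a built-in instance named sqlContext.
1. Create a local file with three columns (id, name, age) separated by spaces, then upload it to HDFS
hdfs dfs -put person.txt /
2. In the Spark shell, run the following command to read the data and split each line on the column delimiter
val lineRDD = sc.textFile("hdfs://node1:9000/person.txt").map(_.split(" "))
3. Define a case class (it serves as the table schema)
case class Person(id:Int, name:String, age:Int)
4. Map the RDD to the case class
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
5. Convert the RDD to a DataFrame
val personDF = personRDD.toDF
6. Work with the DataFrame
personDF.show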
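
Step 4 above calls toInt on every field, so a single malformed line would fail the whole job. As a minimal sketch in plain Scala (no Spark required), the parsing could be made tolerant of bad input. The PersonParser object and its parse method are hypothetical names introduced here, and splitting on any run of whitespace is an assumption that differs slightly from the single-space split used above.

```scala
import scala.util.Try

// Same shape as the Person case class defined in step 3.
case class Person(id: Int, name: String, age: Int)

object PersonParser {
  // Parses one whitespace-separated line such as "1 tom 5".
  // Returns None when the line does not have exactly three fields
  // or when id or age is not a valid integer.
  def parse(line: String): Option[Person] =
    line.trim.split("\\s+") match {
      case Array(id, name, age) =>
        Try(Person(id.toInt, name, age.toInt)).toOption
      case _ => None
    }
}
```

In the Spark shell, one possible use would be to pass this function to flatMap on the RDD returned by sc.textFile, so that lines that cannot be parsed are dropped instead of throwing.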

Common DataFrame Operations

1. // View the contents of the DataFrame
personDF.show
2. // View the contents of selected columns
personDF.select(personDF.col("name")).show
personDF.select(col("name"), col("age")).show
personDF.select("name").show
3. // Print the DataFrame's schema
personDF.printSchema
4. // Select id, name, and age, adding 1 to age
personDF.select(col("id"), col("name"), col("age") + 1).show
personDF.select(personDF("id"), personDF("name"), personDF("age") + 1).show
5. // Keep rows where age is greater than or equal to 18
personDF.filter(col("age") >= 18).show
6. // Group by age and count the people of each age
personDF.groupBy("age").count().show()
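
To make the semantics of the operations above concrete, here is a rough local analogue on an ordinary Scala collection. It is only an illustration: PersonRecord, LocalDataFrameOps, and the sample rows are hypothetical, and the real DataFrame versions run distributed across the cluster rather than in memory on one machine.

```scala
// Hypothetical local stand-in for one row of personDF.
case class PersonRecord(id: Int, name: String, age: Int)

object LocalDataFrameOps {
  // Illustrative sample data, not taken from person.txt.
  val people: Seq[PersonRecord] = Seq(
    PersonRecord(1, "tom", 5),
    PersonRecord(2, "jerry", 18),
    PersonRecord(3, "kitty", 18)
  )

  // Analogue of select(col("id"), col("name"), col("age") + 1)
  def withAgePlusOne(rows: Seq[PersonRecord]): Seq[(Int, String, Int)] =
    rows.map(p => (p.id, p.name, p.age + 1))

  // Analogue of filter(col("age") >= 18)
  def adults(rows: Seq[PersonRecord]): Seq[PersonRecord] =
    rows.filter(_.age >= 18)

  // Analogue of groupBy("age").count()
  def countByAge(rows: Seq[PersonRecord]): Map[Int, Int] =
    rows.groupBy(_.age).map { case (age, group) => age -> group.size }
}
```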

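Using SQL-Style Syntax
To use SQL-style syntax, first register the DataFrame as a table
personDF.registerTempTable("t_person")

1. // Query the two oldest people
sqlContext.sql("select * from t_person order by age desc limit 2").show
2. // Show the table's schema
sqlContext.sql("desc t_person").show

The string passed to sqlContext.sql() is written much like ordinary SQL, but note that this version of SparkSQL offers only limited support for subqueries.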

Running SparkSQL Statements from a Program
1. First, add the Spark SQL dependency to the Maven project's pom.xml

<dependency>  
    <groupId>org.apache.spark</groupId>  
    <artifactId>spark-sql_2.10</artifactId>  
    <version>1.5.2</version>  
</dependency>  

2. Approach 1: use a case class

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object InferringSchema {
  def main(args: Array[String]) {

    // Create a SparkConf and set the application name
    val conf = new SparkConf().setAppName("SQL-1")
    // SQLContext depends on SparkContext
    val sc = new SparkContext(conf)
    // Create the SQLContext
    val sqlContext = new SQLContext(sc)

    // Create an RDD from the given path
    val lineRDD = sc.textFile(args(0)).map(_.split(" "))

    // Map the RDD to the Person case class (defined at the bottom of this file)
    val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    // Import the implicit conversions; without them the RDD cannot be converted to a DataFrame
    import sqlContext.implicits._
    // Convert the RDD to a DataFrame
    val personDF = personRDD.toDF
    // Register the DataFrame as a table
    personDF.registerTempTable("t_person")
    // Run the SQL query
    val df = sqlContext.sql("select * from t_person order by age desc limit 2")
    // Save the result as JSON at the given location
    df.write.json(args(1))
    // Stop the SparkContext
    sc.stop()
  }
}
// The case class must be defined outside the object
case class Person(id: Int, name: String, age: Int)

Package the program as a jar, upload it to the Spark cluster, and submit the Spark job
/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-submit \
--class spark.sql.InferringSchema \
--master spark://node1:7077 \
/root/spark-mvn-1.0-SNAPSHOT.jar \
hdfs://node1:9000/person.txt \
hdfs://node1:9000/out

View the result
hdfs dfs -cat hdfs://node1:9000/out/part-r-*

3. Approach 2: specify the schema directly with StructType

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkContext, SparkConf}

object SpecifyingSchema {
  def main(args: Array[String]) {
    // Create a SparkConf and set the application name
    val conf = new SparkConf().setAppName("SQL-2")
    // SQLContext depends on SparkContext
    val sc = new SparkContext(conf)
    // Create the SQLContext
    val sqlContext = new SQLContext(sc)
    // Create an RDD from the given path
    val personRDD = sc.textFile(args(0)).map(_.split(" "))
    // Specify the schema of each field directly with StructType
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    // Map the RDD to an RDD of Row
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    // Apply the schema to the row RDD
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    // Register the DataFrame as a table
    personDataFrame.registerTempTable("t_person")
    // Run the SQL query
    val df = sqlContext.sql("select * from t_person order by age desc limit 4")
    // Save the result as JSON at the given location
    df.write.json(args(1))
    // Stop the SparkContext
    sc.stop()
  }
}

Loading Data from MySQL (Spark Shell)
1. Start the Spark shell; the MySQL JDBC driver jar must be specified
/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-shell \
--master spark://node1:7077 \
--jars /usr/local/spark-1.5.2-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \
--driver-class-path /usr/local/spark-1.5.2-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar

2. Load data from MySQL
val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:mysql://XXX:3306/bigdata", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "person", "user" -> "root", "password" -> "123456")).load()

3. Run a query
jdbcDF.show()

Writing Data to MySQL

import java.util.Properties
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

object JdbcRDD {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("MySQL-Demo")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Create an RDD by parallelizing a local collection
    val personRDD = sc.parallelize(Array("1 tom 5", "2 jerry 3", "3 kitty 6")).map(_.split(" "))
    // Specify the schema of each field directly with StructType
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    // Map the RDD to an RDD of Row
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    // Apply the schema to the row RDD
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    // Create a Properties object holding the database connection settings
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "123456")
    // Append the data to the database table
    personDataFrame.write.mode("append").jdbc("jdbc:mysql://192.168.10.1:3306/bigdata", "bigdata.person", prop)
    // Stop the SparkContext
    sc.stop()
  }
}

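Hive on Spark SQL

1. Install Hive, configure the metastore database, and add hive-site.xml (with the MySQL connection settings)
2. Copy hive-site.xml to the conf directory of the Spark cluster
3. Copy the MySQL driver jar (mysql.jar) to Spark's lib directory
4. Run: sqlContext.sql("select * from table1").show()
To write a result out, either chain
.write.mode("append")
.jdbc()
or use
.foreachPartition(it => {
  // 1. Open the connection
  // 2. Write each record to the storage layer
  //    (foreach rather than map: Iterator.map is lazy and would not run before the connection is closed)
  it.foreach(x => {
    // write x to the storage layer
  })
  // 3. Close the connection
})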
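
The per-partition pattern noted above (open one connection, write every record, then close) can be sketched in plain Scala without Spark. Everything here is illustrative: the Sink trait stands in for a real storage connection such as a JDBC connection, and PartitionWrite and writePartition are hypothetical names, not part of any Spark API.

```scala
object PartitionWrite {
  // Hypothetical stand-in for a storage connection (for example a JDBC connection).
  trait Sink[A] {
    def write(record: A): Unit
    def close(): Unit
  }

  // Opens one sink for the whole partition, writes every record eagerly,
  // and closes the sink even if a write throws.
  // Returns the number of records written.
  def writePartition[A](records: Iterator[A])(open: () => Sink[A]): Int = {
    val sink = open()
    try {
      var written = 0
      records.foreach { r =>
        sink.write(r)
        written += 1
      }
      written
    } finally {
      sink.close()
    }
  }
}
```

The point of the design is that the connection cost is paid once per partition rather than once per record, and that the iterator is fully consumed with foreach before the connection is closed. A function like this could be called from inside a foreachPartition closure, provided the connection is created inside the closure so it is opened on the executor.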

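I have been thinking about the design of a cache layer for a web application. After consulting a fair amount of material, I expect to end up using Memcached, a relatively mature and widely used distributed cache. The .NET platform already has relatively mature Memcached clients, such as BeITMemcached and EnyimMemcached. I read through their source code in my spare time, and analyzing and calling them yourself is not difficult. This post briefly describes a simple cache layer design built on Memcached. The sample code is based on EnyimMemcached, and most of what follows is code.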

I. Common Cache Interfaces

Looking at the cache classes in ASP.NET web caching, we can abstract roughly the following interface methods:

namespace DotNet.Common.EnyimCache
{
    /// <summary>
    /// Common memcached cache interface (read operations)
    /// </summary>
    public interface ICacheReaderService
    {

        /// <summary>
        /// Returns the object stored under the given key
        /// </summary>
        /// <param name="key"></param>
        /// <returns></returns>
        object Get(string key);

        /// <summary>
        /// Returns the object stored under the given key, cast to T
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="key"></param>
        /// <returns></returns>
        T Get<T>(string key);

        /// <summary>
        /// Checks whether the key exists
        /// </summary>
        /// <param name="key"></param>
        /// <returns></returns>
        bool isExists(string key);
    }

    /// <summary>
    /// Common memcached cache interface (write operations)
    /// </summary>
    public interface ICacheWriterService
    {
        /// <summary>
        /// Cache expiration interval (in minutes)
        /// </summary>
        int TimeOut { set; get; }

        /// <summary>
        /// Adds an object under the given key
        /// </summary>
        /// <param name="key"></param>
        /// <param name="obj"></param>
        void Add(string key, object obj);

        /// <summary>
        /// Adds an object of type T under the given key
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="key"></param>
        /// <param name="obj"></param>
        void Add<T>(string key, T obj);

        /// <summary>
        /// Removes the object stored under the given key
        /// </summary>
        /// <param name="key"></param>
        bool Remove(string key);

        /// <summary>
        /// Modifies the object stored under the given key
        /// </summary>
        /// <param name="key"></param>
        /// <returns></returns>
        bool Modify(string key, object destObj);

        /// <summary>
        /// Clears the cache
        /// </summary>
        /// <returns></returns>
        bool Release();
    }
}

 

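As the names suggest, these are simply the add, remove, modify, and query operations. In my experience with caches, a modify operation is usually unnecessary: if cached data really has to change, removing the entry and adding it again amounts to a modification.

You may also ask why two interfaces are defined here. The main reason is that read operations (queries) are used constantly, while write operations (add, remove, modify) are comparatively rare, so the design separates reads from writes.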
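
The read/write split described above can be illustrated independently of Memcached. The following is a minimal sketch in Scala, with an in-memory map standing in for the cache server. CacheReader, CacheWriter, and InMemoryCache are hypothetical names; the class is not thread-safe and ignores expiration. Its add fails when the key already exists and modify overwrites unconditionally, mirroring memcached's add and set commands.

```scala
import scala.collection.mutable

// Read side: the operations that are called most often.
trait CacheReader {
  def get[T](key: String): Option[T]
  def exists(key: String): Boolean
}

// Write side: add, remove, modify.
trait CacheWriter {
  def add(key: String, value: Any): Boolean
  def remove(key: String): Boolean
  def modify(key: String, value: Any): Boolean
}

// In-memory stand-in for a real cache client (assumption: single-threaded use).
class InMemoryCache extends CacheReader with CacheWriter {
  private val store = mutable.Map.empty[String, Any]

  // The cast is unchecked; callers must ask for the type they stored.
  def get[T](key: String): Option[T] = store.get(key).map(_.asInstanceOf[T])

  def exists(key: String): Boolean = store.contains(key)

  // Like memcached "add": refuses to overwrite an existing key.
  def add(key: String, value: Any): Boolean =
    if (store.contains(key)) false
    else { store(key) = value; true }

  def remove(key: String): Boolean = store.remove(key).isDefined

  // Like memcached "set": stores the value whether or not the key exists.
  def modify(key: String, value: Any): Boolean = { store(key) = value; true }
}
```

Code that only queries the cache can then depend on CacheReader alone, which keeps the much rarer write operations out of its reach.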

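II. Cache Service Implementation

Here we call the methods the Memcached client already wraps to implement the add, remove, modify, and query operations.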

using System;

namespace DotNet.Common.EnyimCache
{
    using Enyim.Caching.Memcached;

    public class CacheReaderService : BaseService, ICacheReaderService
    {

        public int TimeOut
        {
            get;
            set;
        }

        public CacheReaderService()
        {

        }

        public object Get(string key)
        {
            object obj = null;
            Client.TryGet(key, out obj);
            return obj;
        }

        public T Get<T>(string key)
        {
            object obj = Get(key);
            T result = default(T);
            if (obj != null)
            {
                result = (T)obj;
            }
            return result;
        }

        public bool isExists(string key)
        {
            object obj = Get(key);
            return obj != null;
        }
    }

    public class CacheWriterService : BaseService, ICacheWriterService
    {
        public int TimeOut
        {
            get;
            set;
        }

        public CacheWriterService()
        {

        }

        public CacheWriterService(int timeOut)
        {
            this.TimeOut = timeOut;
        }

        public void Add(string key, object obj)
        {
            if (TimeOut > 0)
            {
                Client.Store(StoreMode.Add, key, obj, DateTime.Now.AddMinutes(TimeOut));
            }
            else
            {
                Client.Store(StoreMode.Add, key, obj);
            }
        }

        public void Add<T>(string key, T obj)
        {
            if (TimeOut > 0)
            {
                Client.Store(StoreMode.Add, key, obj, DateTime.Now.AddMinutes(TimeOut));
            }
            else
            {
                Client.Store(StoreMode.Add, key, obj);
            }
        }

        public bool Remove(string key)
        {
            return Client.Remove(key);
        }

        public bool Modify(string key, object destObj)
        {
            return Client.Store(StoreMode.Set, key, destObj);
        }

        /// <summary>
        /// Clears the cache (TODO)
        /// </summary>
        /// <returns></returns>
        public bool Release()
        {
            throw new NotImplementedException();
        }
    }
}

 

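The base class initializes a MemcachedClient instance named Client, whose methods wrap quite a few functions. Reading the source shows that they all essentially send the relevant commands (run command) to the Memcached server and then parse the binary data that comes back. If you are familiar with the protocol memcached uses, this should be fairly easy to follow. The examples in this post use only a few of the methods the client provides.

Also note that in the concrete cache service implementation, CacheWriterService has two constructors; the one that takes a parameter explicitly sets the cache expiration time. In real applications this parameter usually comes from configuration, which is clearly more flexible.

Note: the interface has a Release function, originally intended to clear all cached data, but the client does not directly provide a corresponding function. If you know a good way to do this, please let me know.

III. A Simple Read/Write Test

Here is sample code for adding, removing, modifying, and querying strings, dates, a single class instance, and collections: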

            ICacheWriterService writer = CacheBuilder.GetWriterService();// writer, using memcached's default expiration time
            ICacheReaderService reader = CacheBuilder.GetReaderService();//reader

            #region String

            string strKey = "hello";

            bool isOK = writer.Remove(strKey); // remove
            Console.WriteLine("Removed key {0}:{1}", strKey, isOK);

            writer.Add(strKey, "hello world"); // add
            Console.WriteLine("Add key {0}, value:hello world", strKey);

            bool isExists = reader.isExists(strKey);// check existence
            Console.WriteLine("Key {0} exists:{1}", strKey, isExists);

            string result = reader.Get(strKey) as string;// query
            Console.WriteLine("Get key {0}:{1}", strKey, result);

            bool isModify = writer.Modify(strKey, "Hello Memcached!");// modify
            Console.WriteLine("Modify key {0}, value:Hello Memcached. The result is:{1}", strKey, isModify);

            result = reader.Get<string>(strKey);
            Console.WriteLine("Generic get key {0}:{1}", strKey, result);

            isOK = writer.Remove(strKey);
            Console.WriteLine("Removed key {0}:{1}", strKey, isOK);

            isExists = reader.isExists(strKey);
            Console.WriteLine("Key {0} exists:{1}", strKey, isExists);

            result = reader.Get(strKey) as string;
            Console.WriteLine("Get key {0}:{1}", strKey, result);

            result = reader.Get<string>(strKey);
            Console.WriteLine("Generic get key {0}:{1}", strKey, result);
            Console.WriteLine();
            Console.WriteLine("===========================================");
            Console.Read();

            #endregion

            #region DateTime

            DateTime dtNow = DateTime.Now;
            strKey = "datetime";
            isOK = writer.Remove(strKey); // remove
            Console.WriteLine("Removed key {0}:{1}", strKey, isOK);

            writer.Add(strKey, dtNow); // add
            Console.WriteLine("Add key {0}, value:{1}", strKey, dtNow);

            isExists = reader.isExists(strKey);// check existence
            Console.WriteLine("Key {0} exists:{1}", strKey, isExists);

            DateTime dt = (DateTime)reader.Get(strKey);// query
            Console.WriteLine("Get key {0}:{1}", strKey, dt);

            dt = reader.Get<DateTime>(strKey);
            Console.WriteLine("Generic get key {0}:{1}", strKey, dt);

            isOK = writer.Remove(strKey);
            Console.WriteLine("Removed key {0}:{1}", strKey, isOK);

            isExists = reader.isExists(strKey);
            Console.WriteLine("Key {0} exists:{1}", strKey, isExists);

            Console.WriteLine("Get key {0}:{1}", strKey, reader.Get(strKey));

            Console.WriteLine("Generic get key {0}:{1}", strKey, reader.Get<DateTime>(strKey));//default(datetime)
            Console.WriteLine();
            Console.WriteLine("===========================================");

            Console.Read();

            #endregion

            #region Class

            dtNow = DateTime.Now;
            Province province = new Province(13579, "江苏", dtNow, dtNow);

            strKey = string.Format("{0}_{1}", province.GetType().Name, province.Id);// province
            isOK = writer.Remove(strKey); // remove
            Console.WriteLine("Removed key {0}:{1}", strKey, isOK);

            writer.Add(strKey, province); // add
            Console.WriteLine("Add key {0}, value:{1}", strKey, province.ProvinceName);

            isExists = reader.isExists(strKey);// check existence
            Console.WriteLine("Key {0} exists:{1}", strKey, isExists);

            Province queryProvince = (Province)reader.Get(strKey);// query
            Console.WriteLine("Get key {0}:{1}", strKey, queryProvince.ProvinceName);

            queryProvince = reader.Get<Province>(strKey);
            Console.WriteLine("Generic get key {0}:{1}", strKey, queryProvince.ProvinceName);

            isOK = writer.Remove(strKey);
            Console.WriteLine("Removed key {0}:{1}", strKey, isOK);

            isExists = reader.isExists(strKey);
            Console.WriteLine("Key {0} exists:{1}", strKey, isExists);

            Console.WriteLine("Get key {0}:{1}", strKey, reader.Get(strKey));

            Console.WriteLine("Generic get key {0}:{1}", strKey, reader.Get<Province>(strKey));
            Console.WriteLine();
            Console.WriteLine("===========================================");

            Console.Read();

            #endregion

            #region Collection (list)

            dtNow = DateTime.Now;
            IList<City> listCities = new List<City>();
            City city = new City(135, province.Id, "南京", "210000", dtNow, dtNow);
            listCities.Add(city);
            city = new City(246, province.Id, "苏州", "215000", dtNow, dtNow);
            listCities.Add(city);

            strKey = string.Format("List_{0}_{1}_{2}", province.GetType().Name, province.Id, city.GetType().Name);// cities of the province
            isOK = writer.Remove(strKey); // remove
            Console.WriteLine("Removed key {0}:{1}", strKey, isOK);

            writer.Add(strKey, listCities); // add
            Console.WriteLine("Add key {0}, value:", strKey);
            foreach (var item in listCities)
            {
                Console.WriteLine("CityId:{0} CityName:{1}", item.Id, item.CityName);
            }

            isExists = reader.isExists(strKey);// check existence
            Console.WriteLine("Key {0} exists:{1}", strKey, isExists);

            IList<City> queryCities = reader.Get(strKey) as IList<City>;// query
            Console.WriteLine("Get key {0}:", strKey);
            foreach (var item in queryCities)
            {
                Console.WriteLine("CityId:{0} CityName:{1}", item.Id, item.CityName);
            }

            queryCities = reader.Get<IList<City>>(strKey);
            Console.WriteLine("Generic get key {0}:", strKey);
            foreach (var item in queryCities)
            {
                Console.WriteLine("CityId:{0} CityName:{1}", item.Id, item.CityName);
            }

            isOK = writer.Remove(strKey);
            Console.WriteLine("Removed key {0}:{1}", strKey, isOK);

            isExists = reader.isExists(strKey);
            Console.WriteLine("Key {0} exists:{1}", strKey, isExists);

            Console.WriteLine("Get key {0}:{1}", strKey, reader.Get(strKey));

            Console.WriteLine("Generic get key {0}:{1}", strKey, reader.Get<IList<City>>(strKey));
            Console.WriteLine();
            Console.WriteLine("===========================================");

            Console.Read();

            #endregion

            #region Collection (dictionary)

            dtNow = DateTime.Now;
            IDictionary<int, City> dictCities = new Dictionary<int, City>();
            city = new City(123, province.Id, "镇江", "212000", dtNow, dtNow);
            dictCities.Add(city.Id, city);
            city = new City(321, province.Id, "扬州", "225000", dtNow, dtNow);
            dictCities.Add(city.Id, city);

            strKey = string.Format("Dictionary_{0}_{1}_{2}", province.GetType().Name, province.Id, city.GetType().Name);// cities of the province
            isOK = writer.Remove(strKey); // remove
            Console.WriteLine("Removed key {0}:{1}", strKey, isOK);

            writer.Add(strKey, dictCities); // add
            Console.WriteLine("Add key {0}, value:", strKey);
            foreach (var item in dictCities)
            {
                Console.WriteLine("CityId:{0} CityName:{1}", item.Key, item.Value.CityName);
            }

            isExists = reader.isExists(strKey);// check existence
            Console.WriteLine("Key {0} exists:{1}", strKey, isExists);

            IDictionary<int, City> queryDictCities = reader.Get(strKey) as IDictionary<int, City>;// query
            Console.WriteLine("Get key {0}:", strKey);
            foreach (var item in queryDictCities)
            {
                Console.WriteLine("CityId:{0} CityName:{1}", item.Key, item.Value.CityName);
            }

            queryDictCities = reader.Get<IDictionary<int, City>>(strKey);
            Console.WriteLine("Generic get key {0}:", strKey);
            foreach (var item in queryDictCities)
            {
                Console.WriteLine("CityId:{0} CityName:{1}", item.Key, item.Value.CityName);
            }

            isOK = writer.Remove(strKey);
            Console.WriteLine("Removed key {0}:{1}", strKey, isOK);

            isExists = reader.isExists(strKey);
            Console.WriteLine("Key {0} exists:{1}", strKey, isExists);

            Console.WriteLine("Get key {0}:{1}", strKey, reader.Get(strKey));

            Console.WriteLine("Generic get key {0}:{1}", strKey, reader.Get<IDictionary<int, City>>(strKey));
            Console.WriteLine();
            Console.WriteLine("===========================================");

            Console.Read();

            #endregion

 

I won't paste all the code here; a sample is available for download at the end of the post.

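In my simple tests, common basic data types (strings, arrays, numbers, and dates) and collections (lists and dictionaries) all behaved well. DataTable and DataSet performed well too, but I don't really recommend caching these two heavyweight types directly.

In the example that sets the expiration time explicitly, the expiration was one minute, but memcached sometimes seemed to take longer than one minute to actually expire the entry; I suspect some internal delay in the system.

While adding cache entries in a loop 100,000 times on my local machine, system memory usage did grow sharply. Query performance, however, did not drop noticeably. That may be down to my single-machine test environment, so I don't consider the results convincing; after all, memcached's strength is its distributed cache implementation.

Some people find that guaranteeing unique keys in a cache system is also a real headache. With a shared cache framework, how do different projects and different developers make sure the keys their programs add are unique? One simple approach is to concatenate strings into a meaningful key, for example combining the project name, namespace, class name, and the database primary key. Of course, queries then have to build the key string in the same specific format. Personally I find this an effective approach.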
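
As a minimal sketch of the key-composition idea, the helper below joins the parts with an underscore, the same separator the test code uses for keys such as Province_13579. CacheKeys and build are hypothetical names, and the sketch is written in Scala rather than C#. The checks reflect memcached's documented key limits (at most 250 bytes, no whitespace or control characters).

```scala
object CacheKeys {
  // memcached rejects keys longer than 250 bytes.
  private val MaxKeyBytes = 250

  // Joins the given parts (for example project, namespace, class name, id)
  // into one key and validates it against memcached's key rules.
  def build(parts: Any*): String = {
    val key = parts.map(_.toString).mkString("_")
    require(key.nonEmpty, "cache key must not be empty")
    require(!key.exists(c => c.isWhitespace || c.isControl),
      "cache key must not contain whitespace or control characters")
    require(key.getBytes("UTF-8").length <= MaxKeyBytes,
      "cache key must be at most " + MaxKeyBytes + " bytes")
    key
  }
}
```

Building keys through a single function like this means the writer and the reader cannot drift apart on the format, which is the practical risk when each call site formats the string by hand.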

Demo download: SimpleCacheApp
