|簡體中文

比思論壇

 找回密碼
 按這成為會員
搜索



查看: 1026|回復: 1
打印 上一主題 下一主題

大数据开发之sparkSQL的使用分享

[複製鏈接]

53

主題

0

好友

436

積分

中學生

Rank: 3Rank: 3

  • TA的每日心情
    擦汗
    2023-11-18 23:06
  • 簽到天數: 821 天

    [LV.10]以壇為家III

    推廣值
    0
    貢獻值
    0
    金錢
    352
    威望
    436
    主題
    53
    樓主
    發表於 2021-12-24 13:35:11
    Spark之前使用RDD操作大数据,非常方便,但是也有各种问题,例如RDD每次读取的都是字符串,以及语法比较比较麻烦。针对这种情况,spark在新版本中升级RDD为DataFrame和DataSet,并使用SQL的方式去操作数据。DataFrame,RDD的升级版,分布式的数据集,并且以列的方式组合的,类似于二维表格式,除数据外保存数据结构信息
    DataSet,DataFrame扩展,最新的数据抽象,相对于DataFrame,DataSet会记录字段的数据类型,并进行严格错误检查

    三者的关系是RDD进化 >>> DataFrame ,DataFrame进化 >>> DataSet。

    当然这里也不是说RDD不用了,而是把RDD转为底层处理,所以同学们还是需要先理解什么是RDD

    常用方法

    DataSet,DataFrame 的使用是依赖于SparkSession的,所以我们需要先创建SparkSession

    val spark = new SparkContext(

    new SparkConf().setMaster("local")

    .setAppName("taobao")

    read.csv("路径")

    val data = spark.read.csv("data/A.csv")

    val data = spark.read

    .option("header","true") // 设置读取首行,这里的声明用于把数据首行作为列名【关注尚硅谷,轻松学IT】

    .csv("data/A.csv")

    show(num)

    显示顶部num行数据

    map(func)

    操作和RDD中类似,不同的是需要隐式转换,在代码前加上

    import spark.implicits._

    data.map(

    x => { // raw 可以通过下标获得到对应中,不需要切分

    (x.getString(0),x.getString(1).toInt)

    }

    toDF("列名"...)

    转换成DataFrame类型,并设置列名

    select(col: String, cols: String*): DataFrame

    查询指定列并返回数据

    val r2 = data.select("名称",

    "人均价格")

    r2.show(10)

    selectExpr(exprs: String*): DataFrame

    执行原生的SQL中函数

    data.selectExpr("count(name)")

    data.selectExpr("avg(age)")

    rdd

    把DataFrame和DataSet转换成RDD类型

    printSchema()

    查看表结构

    root

    -- age: long (nullable = true)

    -- name: string (nullable = true)

    filter(Str)

    执行过滤

    filter("age>10")

    filter("age>10 and name='张三'")

    sum("列名") \ avg("列名") \ max("列名") \ min("列名") \ count()

    针对列进行求和

    平均值

    最大值

    最小值

    数量

    data.groupBy("age").sum("age").show()

    data.groupBy("age").avg("age").show()

    orderBy("列名")

    排序

    val r2 = data.map(

    x => {

    (x.getString(0),x.getString(1).toInt)

    }

    ).toDF("名称","评论数")

    .orderBy("评论数")

    r2.show(10)

    这里注意如果需要降序需要

    val r2 = data.map(

    x => {

    (x.getString(0),x.getString(1).toInt)

    }

    ).toDF("名称","评论数")

    .orderBy(desc("评论数"))

    r2.show(10)

    注意导入import org.apache.spark.sql.functions.desc

    除了上述的方法外,Spark还支持直接使用SQL的方式操作数据,方法如下

    createOrReplaceTempView(str)

    创建临时表,注意使用SQL的时候需要根据当前数据创建临时表,这样才可以在SQL里面使用

    data.createOrReplaceTempView("user")

    sql(str)

    在当前数据集上执行SQL语句

    val result = spark.sql("select name from user")

    result.show()

    val count = spark.sql("select count(*) from user")

    count.show()

    spark.udf.register(fName,func)

    自定义一个函数,用于SQL中处理

    spark.udf.register("f1",(x:String) => (x+"a"))

    val result = spark.sql("select f1(name) from user")

    result.show()

    原创作者:浩哥

    頭像被屏蔽

    8

    主題

    0

    好友

    2801

    積分

    禁止發言

  • TA的每日心情
    慵懶
    2023-7-23 17:55
  • 簽到天數: 1435 天

    [LV.10]以壇為家III

    推廣值
    0
    貢獻值
    0
    金錢
    2906
    威望
    2801
    主題
    8
    沙發
    發表於 2022-1-6 00:19:40
    提示: 作者被禁止或刪除 內容自動屏蔽
    重要聲明:本論壇是以即時上載留言的方式運作,比思論壇對所有留言的真實性、完整性及立場等,不負任何法律責任。而一切留言之言論只代表留言者個人意見,並非本網站之立場,讀者及用戶不應信賴內容,並應自行判斷內容之真實性。於有關情形下,讀者及用戶應尋求專業意見(如涉及醫療、法律或投資等問題)。 由於本論壇受到「即時上載留言」運作方式所規限,故不能完全監察所有留言,若讀者及用戶發現有留言出現問題,請聯絡我們比思論壇有權刪除任何留言及拒絕任何人士上載留言 (刪除前或不會作事先警告及通知 ),同時亦有不刪除留言的權利,如有任何爭議,管理員擁有最終的詮釋權。用戶切勿撰寫粗言穢語、誹謗、渲染色情暴力或人身攻擊的言論,敬請自律。本網站保留一切法律權利。

    手機版| 廣告聯繫

    GMT+8, 2024-5-23 20:37 , Processed in 0.029665 second(s), 20 queries , Gzip On, Memcache On.

    Powered by Discuz! X2.5

    © 2001-2012 Comsenz Inc.

    回頂部