如何在Scala中读取Hadoop集群上的gz压缩文件

如何在Scala中读取Hadoop集群上的gz压缩文件,第1张

(1)一个从文件创建的Scala对象,或(2)一个并行切片(分布在各个节点之间),或(3)从其他RDD转换得来,或(4)改变已有RDD的持久性,如请求将已有RDD缓存在内存中。Spark应用称为driver,实现单个节点或一组节点上的操作。

读文件返回的是 BufferedSource

这个类有方法:

def

getLines(): Iterator[String]

可以返回按行输入的迭代器,对这个迭代器循环时可以跳过不需要的行(在for中写逻辑)

或者一定数量的行

感觉需要注意的是,如果文件太大了,要避免用那些需要将所有文件内容有读入的函数,例如统计总行数

spark进入txt文件的命令

1、首先启动spark-shell进入Spark-shell模式:(进入spark目录下后 输入命令 bin/spark-shell启动spark-shell模式)

2、加载text文件(spark创建sc,可以加载本地文件和HDFS文件创建RDD)

val textFile = sc.textFile("file:///home/hadoop/test1.txt") #注意file:后是三个“/”

注意:加载HDFS文件和本地文件都是使用textFile,区别是添加前缀(hdfs://和file://)进行标识。

3、获取RDD文件textFile所有项(文本文件即总共行数)的计数(还有很多其他的RDD操作,自行百度)

textFile.count() #统计结果显示 1 行

二、在 spark-shell 中读取 HDFS 系统文件“/home/hadoop/test.csv(也可以是txt文件)”(如果该文件不存在, 请先创建),然后,统计出文件的行数:

方法一:

1、加载text文件(spark创建sc,可以加载本地文件和HDFS文件创建RDD)

val textFile = sc.textFile("hdfs:///home/hadoop/test.csv") #注意hdfs:后是三个“/”

注意:加载HDFS文件和本地文件都是使用textFile,区别是添加前缀(hdfs://和file://)进行标识。

2、获取RDD文件textFile所有项的计数

textFile.count() #统计结果显示 1 行

方法二:(Spark shell 默认是读取 HDFS 中的文件,需要先上传文件到 HDFS 中,否则会有“org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/hadoop/README.md”的错误。)

1、省去方法一中第一步的命令(1)中的“hdfs://”,其他部分相同,命令如下:

三、编写独立应用程序,读取 HDFS 系统文件“/user/hadoop/test.txt”(如果该文件不存在, 请先创建),然后,统计出文件的行数;通过 sbt 工具将整个应用程序编译打包成 JAR 包, 并将生成的 JAR 包通过 spark-submit 提交到 Spark 中运行命令:

1、首先输入:quit 命令退出spark-shell模式:

2、在终端中执行如下命令创建一个文件夹 sparkapp3 作为应用程序根目录:

cd ~ # 进入用户主文件夹

mkdir ./sparkapp3 # 创建应用程序根目录

mkdir -p ./sparkapp3/src/main/scala # 创建所需的文件夹结构

3、在 ./sparkapp3/src/main/scala 下建立一个名为 SimpleApp.scala 的文件(vim ./sparkapp3/src/main/scala/SimpleApp.scala),添加代码如下:

/* SimpleApp.scala */

import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

import org.apache.spark.SparkConf

object SimpleApp {

def main(args: Array[String]) {

val logFile = "hdfs://localhost:9000/home/hadoop/test.csv"

val conf = new SparkConf().setAppName("Simple Application")

val sc = new SparkContext(conf)

val logData = sc.textFile(logFile, 2)

val num = logData.count()

println("这个文件有 %d 行!".format(num))

}

}

4、该程序依赖 Spark API,因此我们需要通过 sbt 进行编译打包。 ./sparkapp3 中新建文件 simple.sbt(vim ./sparkapp3/simple.sbt),添加内容如下,声明该独立应用程序的信息以及与 Spark 的依赖关系:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.12.10"

libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.0-preview2"


欢迎分享,转载请注明来源:夏雨云

原文地址:https://www.xiayuyun.com/zonghe/532299.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-06-24
下一篇2023-06-24

发表评论

登录后才能评论

评论列表(0条)

    保存