【Spark】读取本地文件

最近有一个需求,spark任务读取本地csv文件,拼接成rk之后再去hbase取值进行后续处理。搞了好久都没能解决,记录一下解决思路如下:

1、写入临时文件

spark可以读取本地文件,但打成jar包之后不会自动读取jar包中的文件,需要写入临时文件再进行读取。于是操作如下:

//定义文件路径,从jar包中读取csv文件到inputStream
val inputStream = this.getClass.getResourceAsStream(s"/source/fileName.csv")

//创建临时文件存储csv文件
val tempFile = Files.createTempFile("temp-source",".csv")
val tempFilePath = tempFile.toAbsolutePath.toString

val outputStream = new FileOutputStream(tempFilePath)
val bufferedOutputStream = new BufferedOutputStream(outputStream)

//将inputStream中的数据写入到临时文件
try {
    val buffer = enw Array[Byte](1024)
    var bytesRead = -1
    while ({
        bytesRead = inputStream.read(buffer)
        var bytesRead = -1
    }) {
        bufferedOutputStream.write(buffer,0,bytesRead)
    }
} finally {
    bufferedOutputStream.close()
    outputStream.close()
}
println(s"Temp file created at: ${tempFilePath}")



//读取临时csv文件为DataFrame
val csvDF = spark.read.option("header","true")
            .csv("file:///${tempFilePath}")

2、临时文件上传至HDFS

按照上面的做法发包到集群上运行之后报错。猜测可能因为在集群上运行,driver端读取不到本地创建的临时文件数据。于是将临时文件上传至HDFS,再从hdfs中读取

//将临时文件上传至HDFS
val hdfsPath = new Path("hdfs-source-csv.csv")
FileSystem.get(spark.sparkContext.hadoopConfiguration).copyFromLocalFile(new Path(tempFilePath), hdfsPath)
println(s"File uploaded to HDFS at: ${hdfsPath.toString}")

//读取hdfs文件
val csvDF = spark.read.option("header","true")
            .csv(s"${hdfsPath}")

但是这么做还是失败了。推测是没有写入hdfs的权限。

那只好换个思路:

1、将csv文件转换成sql,写入PG临时表,再从PG读取

2、将csv文件转换成Map,再将Map转换成rdd,进行后续操作

3、摆烂,告诉领导这点工资我做不了,换人做吧

3、读取csv文件传换成Map

package scala.test

import org.apache.commons.lang3.StringUtils

import scala.collection.mutable.ArrayBuffer
import scala.util.parsing.combinator._

object CSVParser extends RegexParsers {
  override protected val whiteSpace = """[ \t]""".r

  def field: Parser[String] = quoted | nonQuoted

  def quoted: Parser[String] = "\"" ~> """[^"\n]*""".r <~ "\""

  def nonQuoted: Parser[String] = """[^,\n]*""".r

  def record: Parser[List[String]] = repsep(field, ",")

  def records: Parser[List[List[String]]] = repsep(record, "\n")

  def apply(input: String): ParseResult[List[List[String]]] = parseAll(records, input)
}

// 读csv转成map
object Main {

  def main(args: Array[String]): Unit = {
    val arrayBuffer: ArrayBuffer[List[String]] = new ArrayBuffer[List[String]]()
    val source = scala.io.Source.fromInputStream(this.getClass.getResourceAsStream("/真实停电汇总.csv"))

    source.getLines().foreach(line => {
      val result = CSVParser(line)
      result match {
        case CSVParser.Success(records, _) =>
          records.foreach { record =>
            arrayBuffer.append(record)
          }
        case CSVParser.Failure(msg, _) => println("Parsing failed: " + msg)
        case CSVParser.Error(msg, _) => println("Error: " + msg)
      }
    })

    val head = arrayBuffer.head
    arrayBuffer.tail.foreach(item => {
      if (StringUtils.isNoneEmpty(item.head)) println(head.zip(item).filter(x=>StringUtils.isNotEmpty(x._1)).toMap)
    })

  }
}

4、将Map转换成rdd

方法一:

var seq = Seq[Map[String, String]]()
arrayBuffer.tail.foreach(item => {
    //将多个map合并成一个seq
    if(StringUtils.isNoneEmpty(item.head)) seq :+= (head.zip(item).filter(x=> StringUtils.isNotEmpty(x._1)).toMap)
})

val rdd = spark.sparkContext.parallelize(seq).repartition(12)
rdd.map(item => {
    (item("id"), item("name"))
}).toDF("id","name")

方法二:

val head = arrayBuffer.head
val body = arrayBuffer.tail
val rdd = spark.sparkContext.parallelize(body).repartition(12).persist()
rdd.toDF("COL").select(selectCol($"COL", head) :_*)

def selectCol(col: Column, head: List[String]): ArrayBuffer[Column] = {
    val arrayBuffer = new ArrayBuffer[Column]()
    for (i <- head.indices) {
        arrayBuffer.append(col(i).as(head(i)))
    }
    arrayBuffer
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/583937.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【架构】后端项目如何分层及分层领域模型简化

文章目录 一. 如何分层1. 阿里规范2. 具体案例分析 二. 分层领域模型的转换1. 阿里规范2. 模型种类简化分析 三. 小结 本文描述后端项目中如何进行分层&#xff0c;以及分层领域模型简化 一. 如何分层 1. 阿里规范 阿里的编码规范中约束分层逻辑如下: 开放接口层&#xff1a…

Apache Seata基于改良版雪花算法的分布式UUID生成器分析1

title: Seata基于改良版雪花算法的分布式UUID生成器分析 author: selfishlover keywords: [Seata, snowflake, UUID] date: 2021/05/08 本文来自 Apache Seata官方文档&#xff0c;欢迎访问官网&#xff0c;查看更多深度文章。 Seata基于改良版雪花算法的分布式UUID生成器分析…

NLP(10)--TFIDF优劣势及其应用Demo

前言 仅记录学习过程&#xff0c;有问题欢迎讨论 TF*IDF&#xff1a; 优势&#xff1a; 可解释性好 可以清晰地看到关键词 即使预测结果出错&#xff0c;也很容易找到原因 计算速度快 分词本身占耗时最多&#xff0c;其余为简单统计计算 对标注数据依赖小 可以使用无标注语…

请编写函数fun,该函数的功能是:将放在字符串数组中的M个字符串(每串的长度不超过N),按顺序合并组成一个新的字符串。

本文收录于专栏:算法之翼 https://blog.csdn.net/weixin_52908342/category_10943144.html 订阅后本专栏全部文章可见。 本文含有题目的题干、解题思路、解题思路、解题代码、代码解析。本文分别包含C语言、C++、Java、Python四种语言的解法完整代码和详细的解析。 题干 请编…

React Router 路由配置数组配组持久化

在一些特定场景下,你可能需要将路由配置数组进行持久化,例如从后端动态加载路由配置或根据用户权限动态生成路由配置。这时,持久化路由配置数组就很有用,可以避免每次应用启动时重新获取或计算路由配置。 持久化路由配置数组的步骤如下: 定义路由配置数组 首先,你需要定义一…

[华为OD]C卷 找座位,在一个大型体育场内举办了一场大型活动,由于疫情防控的需要 100

题目&#xff1a; 在一个大型体育场内举办了一场大型活动&#xff0c;由于疫情防控的需要&#xff0c;要求每位观众的必须间隔至 少一个空位才允许落座。现在给出一排观众座位分布图Q,座位中存在已落座的观众&#xff0c;请计 算出&#xff0c;在不移动现有观众座位的情况…

从不同性别、年龄入手,发过的主题还能发!| NHANES数据库周报(4.24)

零基础NHANES挖掘培训班,欢迎咨询&#xff01; 课程 | 零基础两天掌握NHANES公共数据库挖掘技巧&#xff0c;发表SCI论文 美国国家健康和营养检查调查&#xff08;NHANES&#xff09;是一项旨在评估美国成人和儿童健康和营养状况的研究计划。该调查的独特之处在于它结合了访谈和…

Spring6 当中 获取 Bean 的四种方式

1. Spring6 当中 获取 Bean 的四种方式 文章目录 1. Spring6 当中 获取 Bean 的四种方式每博一文案1.1 第一种方式&#xff1a;通过构造方法获取 Bean1.2 第二种方式&#xff1a;通过简单工厂模式获取 Bean1.3 第三种方式&#xff1a;通过 factory-bean 属性获取 Bean1.4 第四种…

LT6911C HDMI 1.4 至 2 端口 MIPI DSI/CSI 龙迅方案

1. 描述LT6911C 是一款高性能 HDMI1.4 至 MIPIDSI/CSI/LVDS 芯片&#xff0c;适用于 VR/智能手机 / 显示应用。对于 MIPIDSI / CSI 输出&#xff0c;LT6911C 具有可配置的单端口或双端口 MIPIDSI/CSI&#xff0c;具有 1 个高速时钟通道和 1~4 个高速数据通道&#xff0c;工作速…

NFTScan | 04.22~04.28 NFT 市场热点汇总

欢迎来到由 NFT 基础设施 NFTScan 出品的 NFT 生态热点事件每周汇总。 周期&#xff1a;2024.04.22~ 2024.04.28 NFT Hot News 01/ ApeCoin DAO 发起「由 APE 代币支持的 NFT Launchpad」提案投票 4 月 22 日&#xff0c;ApeCoin DAO 社区发起「由 APE 代币支持的 NFT Launch…

JAVA基础——集合框架(List与Set)

数据结构 什么是数据结构 数据结构就是用来装数据以及数据与之间关系的一种集合。如何把相关联的数据存储到计算机&#xff0c;为后续的分析提供有效的数据源&#xff0c;是数据结构产生的由来。数据结构就是计算机存储、组织数据的方式。好的数据结构&#xff0c;让我们做起事…

Deckset for Mac激活版:MD文档转幻灯片软件

Deckset for Mac是一款专为Mac用户打造的Markdown文档转幻灯片软件。它凭借简洁直观的界面和强大的功能&#xff0c;成为许多用户的心头好。 Deckset for Mac激活版下载 Deckset支持Markdown语法&#xff0c;让用户在编辑文档时无需分心于复杂的格式设置&#xff0c;只需专注于…

分布式与一致性协议之Raft算法(二)

Raft算法 什么是任期 我们知道&#xff0c;议会选举中的领导者是有任期的&#xff0c;当领导者任命到期后&#xff0c;需要重新再次选举。Raft算法中的领导者也是有任期&#xff0c;每个任期由单调递增的数字(任期编号)标识。比如&#xff0c;节点A的任期编号是1。任期编号会…

Spark-机器学习(8)分类学习之随机森林

在之前的文章中&#xff0c;我们学习了分类学习之支持向量机决策树支持向量机&#xff0c;并带来简单案例&#xff0c;学习用法。想了解的朋友可以查看这篇文章。同时&#xff0c;希望我的文章能帮助到你&#xff0c;如果觉得我的文章写的不错&#xff0c;请留下你宝贵的点赞&a…

【全开源】Java上门老人护理老人上门服务类型系统小程序APP源码

功能&#xff1a; 服务分类与选择&#xff1a;系统提供详细的老人护理服务分类&#xff0c;包括日常照护、康复训练、医疗护理等&#xff0c;用户可以根据老人的需求选择合适的服务项目。预约与订单管理&#xff1a;用户可以通过系统预约护理服务&#xff0c;并查看订单详情&a…

mybatis工程需要的pom.xml,以及@Data 、@BeforeEach、@AfterEach 的使用,简化mybatis

对 “mybatis - XxxMapper.java接口中方法的参数 和 返回值类型&#xff0c;怎样在 XxxMapper.xml 中配置的问题” 这篇文章做一下优化 这个pom.xml文件&#xff0c;就是上面说的这篇文章的父工程的pom.xml&#xff0c;即&#xff1a;下面这个pom.xml 是可以拿来就用的 <?…

Python爬虫(入门版)

1、爬虫是什么 简单的来说&#xff1a;就是用程序获取网络上数据。 2、爬虫的原理 如果要获取网络上数据&#xff0c;我们要给爬虫一个网址&#xff08;程序中通常叫URL&#xff09;&#xff0c;爬虫发送一个HTTP请求给目标网页的服务器&#xff0c;服务器返回数据给客户端&am…

帕累托森林李朝政博士受聘「天工开物开源基金会」专家顾问

导语&#xff1a; 开源铸造了当前最前沿的科技引擎。开源驱动了软件生态&#xff0c;也以指数级速度驱动硬件生态。 3月中旬&#xff0c;天工开物开源基金会授予李朝政博士专家顾问&#xff0c;表彰他积极推动参与中国智能软件生态的建设&#xff0c;期待一起共筑未来新生态。…

稳扎稳打 部署丝滑 开源即时通讯(IM)项目OpenIM源码部署流程(linux windows mac)

背景 OpenIM包含多个关键组件&#xff0c;每个都是系统功能必不可少的一部分。具体来说&#xff0c;MongoDB 用于持久化存储&#xff1b;Redis 用作缓存&#xff1b;Kafka 用于消息队列&#xff1b;Zookeeper 用于服务发现&#xff1b;Minio 用于对象存储。这些组件的众多可能会…

C# Web控件与数据感应之 ListControl 类

目录 关于数据感应 ListControl 类类型控件 范例运行环境 数据感应通用方法 设计 实现 调用示例 数据源 调用 小结 关于数据感应 数据感应也即数据捆绑&#xff0c;是一种动态的&#xff0c;Web控件与数据源之间的交互&#xff0c;诸如 System.Web.UI.WebControls 里…
最新文章