理解 Spark 写入 API 的数据处理能力


理解 Spark 写入 API 的数据处理能力

文章插图
这张图解释了 Apache Spark DataFrame 写入 API 的流程 。它始于对写入数据的 API 调用,支持的格式包括 CSV、JSON 或 Parquet 。流程根据选择的保存模式(追加、覆盖、忽略或报错)而分岔 。每种模式执行必要的检查和操作,例如分区和数据写入处理 。流程以数据的最终写入或错误结束 , 取决于这些检查和操作的结果 。
Apache Spark 是一个开源的分布式计算系统 , 提供了强大的平台用于处理大规模数据 。写入 API 是 Spark 数据处理能力的基本组成部分,允许用户将数据从他们的 Spark 应用程序写入或输出到不同的数据源 。
一、理解 Spark 写入 API1.数据源Spark 支持将数据写入各种数据源,包括但不限于:
  • 分布式文件系统,如 HDFS
  • 云存储 , 如 AWS S3、Azure Blob Storage
  • 传统数据库(包括 SQL 和 NoSQL)
  • 大数据文件格式(Parquet、Avro、ORC)
2.DataFrameWriter写入 API 的核心类是 DataFrameWriter 。它提供配置和执行写入操作的功能 。通过在 DataFrame 或 Dataset 上调用 .write 方法获得 DataFrameWriter 。
3.写入模式指定 Spark 在写入数据时应如何处理现有数据的模式 。常见的模式包括:
  • Append:将新数据添加到现有数据中 。
  • overwrite:用新数据覆盖现有数据 。
  • ignore:如果数据已存在,则忽略写入操作 。
  • errorIfExists(默认):如果数据已存在,则抛出错误 。
4.格式规范可以使用 .format("formatType") 方法指定输出数据的格式,如 JSON、CSV、Parquet 等 。
5.分区为了实现有效的数据存储,可以使用 .partitionBy("column") 方法根据一个或多个列对输出数据进行分区 。
6.配置选项可以使用 .option("key", "value") 方法设置特定于数据源的各种选项,如压缩、CSV 文件的自定义分隔符等 。
7.保存数据【理解 Spark 写入 API 的数据处理能力】最后 , 使用 .save("path") 方法将 DataFrame 写入指定的路径 。其他方法如 .saveAsTable("tableName") 也可用于不同的写入场景 。
from pyspark.sql import SparkSessionfrom pyspark.sql import Rowimport os# 初始化 SparkSessionspark = SparkSession.builder.appName("DataFrameWriterSaveModesExample").getOrCreate()# 示例数据data = https://www.isolves.com/it/cxkf/bk/2023-12-13/[Row(name="Alice", age=25, country="USA"),Row(name="Bob", age=30, country="UK")]# 附加数据用于追加模式additional_data = [Row(name="Carlos", age=35, country="SpAIn"),Row(name="Daisy", age=40, country="Australia")]# 创建 DataFramesdf = spark.createDataFrame(data)additional_df = spark.createDataFrame(additional_data)# 定义输出路径output_path = "output/csv_save_modes"# 函数:列出目录中的文件def list_files_in_directory(path):files = os.listdir(path)return files# 显示初始 DataFrameprint("初始 DataFrame:")df.show()# 使用覆盖模式写入 CSV 格式df.write.csv(output_path, mode="overwrite", header=True)print("覆盖模式后的文件:", list_files_in_directory(output_path))# 显示附加 DataFrameprint("附加 DataFrame:")additional_df.show()# 使用追加模式写入 CSV 格式additional_df.write.csv(output_path, mode="append", header=True)print("追加模式后的文件:", list_files_in_directory(output_path))# 使用忽略模式写入 CSV 格式additional_df.write.csv(output_path, mode="ignore", header=True)print("忽略模式后的文件:", list_files_in_directory(output_path))# 使用 errorIfExists 模式写入 CSV 格式try:additional_df.write.csv(output_path, mode="errorIfExists", header=True)except Exception as e:print("errorIfExists 模式中发生错误:", e)# 停止 SparkSessionspark.stop()二、Spark 架构概述
理解 Spark 写入 API 的数据处理能力

文章插图
在 Apache Spark 中写入 DataFrame 遵循一种顺序流程 。Spark 基于用户 DataFrame 操作创建逻辑计划,优化为物理计划,并分成阶段 。系统按分区处理数据,对其进行日志记录以确保可靠性 , 并带有定义的分区和写入模式写入到本地存储 。Spark 的架构确保在计算集群中高效管理和扩展数据写入任务 。
从 Spark 内部架构的角度来看,Apache Spark 写入 API 涉及了解 Spark 如何在幕后管理数据处理、分发和写入操作 。让我们来详细了解:


推荐阅读