SuperAI

PySpark in Databricks Using CSV Files and Other Formats
Jan 7
Scenario Series: Working with CSV Files and Other Formats
Here's a detailed guide to reading CSV files and other formats (such as text and Excel files) with PySpark in Databricks, covering the most common scenarios:
1. Read a CSV with Header and Predefined Schema
Scenario: You want more control over the schema, so instead of inferring it automatically, you define the schema yourself.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define the schema manually
schema = StructType([
    StructField("CustomerID", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Country", StringType(), True)
])
df = spark.read.option("header", "true").schema(schema).csv("dbfs:/mnt/customer_data.csv")
df.show()
df.printSchema()
Explanation: By defining a StructType schema, you have explicit control over the data types for each column. This ensures consistency across data loads.
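If you prefer a more compact form, the schema can also be supplied as a DDL-style string instead of a StructType. A minimal sketch of the same read, assuming the same four columns as above:
# Equivalent read with the schema expressed as a DDL string
df = spark.read.option("header", "true") \
    .schema("CustomerID INT, Name STRING, Age INT, Country STRING") \
    .csv("dbfs:/mnt/customer_data.csv")
df.printSchema()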
2. Read CSV with a Different Delimiter and Multiple CSV Files Together
Scenario: You have sales data partitioned across multiple CSV files, and they use a semicolon (;) as the delimiter.
df = spark.read.option("header", "true").option("delimiter", ";").csv("dbfs:/mnt/sales_data_part*.csv")
df.show()
Explanation: You can specify custom delimiters (like ; or |) using the delimiter option. The wildcard (*) is used to read multiple files at once.
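If the files don't share a common wildcard pattern, csv() also accepts an explicit list of paths. A minimal sketch, with placeholder file names:
# Read several specific CSV files in one call by passing a list of paths
paths = ["dbfs:/mnt/sales_data_jan.csv", "dbfs:/mnt/sales_data_feb.csv"]
df = spark.read.option("header", "true").option("delimiter", ";").csv(paths)
df.show()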
3. Handling Malformed Data While Reading CSV
Scenario: Some rows in your CSV may have more or fewer columns than expected, and you need to handle these malformed rows.
df = spark.read.option("header", "true").option("mode", "PERMISSIVE").csv("dbfs:/mnt/sales_data.csv")
df.show()
Explanation: The mode option controls how malformed data is handled (a short sketch of the other two modes follows this list):
- PERMISSIVE (default): Corrupt records are kept, and their raw text is placed in a separate column (named _corrupt_record by default).
- DROPMALFORMED: Discards rows that don't match the schema.
- FAILFAST: Throws an error and stops reading as soon as any malformed data is encountered.
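For comparison, here is a minimal sketch of the other two modes, using the same hypothetical file as above:
# Silently drop rows that do not match the expected structure
df_clean = spark.read.option("header", "true") \
    .option("mode", "DROPMALFORMED") \
    .csv("dbfs:/mnt/sales_data.csv")

# Fail immediately on the first malformed row encountered
df_strict = spark.read.option("header", "true") \
    .option("mode", "FAILFAST") \
    .csv("dbfs:/mnt/sales_data.csv")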
4. Capturing Corrupt Records with _corrupt_record
Scenario: You want to capture malformed or corrupt rows in a separate column for further investigation.
df = spark.read.option("header", "true").option("mode", "PERMISSIVE").option("columnNameOfCorruptRecord", "_corrupt_record").csv("dbfs:/mnt/sales_data.csv")
df.select("_corrupt_record").show(truncate=False)
Explanation: The columnNameOfCorruptRecord option allows you to specify where corrupt rows should be stored. You can inspect this column to identify data issues.
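One caveat worth noting: for CSV sources, Spark only populates the corrupt-record column if that column is declared in an explicitly supplied schema, and recent Spark versions don't allow querying only that internal column from an uncached DataFrame. A minimal sketch under those assumptions (the OrderID and Amount fields are illustrative):
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Declare the corrupt-record column in the user-defined schema so Spark can fill it
schema = StructType([
    StructField("OrderID", IntegerType(), True),
    StructField("Amount", IntegerType(), True),
    StructField("_corrupt_record", StringType(), True)
])

df = spark.read.option("header", "true") \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .schema(schema) \
    .csv("dbfs:/mnt/sales_data.csv")

# Cache before filtering on the internal corrupt-record column
df.cache()
df.filter(df["_corrupt_record"].isNotNull()).show(truncate=False)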
5. Reading a Text File
Scenario: You have log data or a document saved as a .txt file, and you need to read it into a DataFrame.
df = spark.read.text("dbfs:/mnt/log_data.txt")
df.show(truncate=False)
Explanation: Reading a text file treats each line as a row, and the entire line is placed in a single column called value. This is useful for log processing or analysing unstructured text data.
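Once the lines are loaded, you typically parse the single value column into structured columns. A minimal sketch, continuing from the df above and assuming each log line is a space-separated timestamp, level, and message:
from pyspark.sql.functions import split

# Split each line into at most three parts (log format assumed for illustration)
parts = split(df["value"], " ", 3)
logs = df.select(
    parts.getItem(0).alias("timestamp"),
    parts.getItem(1).alias("level"),
    parts.getItem(2).alias("message")
)
logs.show(truncate=False)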
6. Reading an Excel (XLSX) File
Scenario: You have customer data saved in an Excel file, and you need to read it into Databricks.
Step 1: Install the necessary library in Databricks:
1. In the Databricks workspace, open your cluster's Libraries tab and click "Install New".
2. Choose Maven as the library source, enter the coordinate com.crealytics:spark-excel_2.12:0.13.5, and install it on the cluster.
Step 2: Use the following code to read the Excel file:
df = spark.read.format("com.crealytics.spark.excel") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("dbfs:/mnt/customer_data.xlsx")
df.show()
Explanation: The spark-excel library allows you to read .xlsx files in Databricks. You can specify whether the file has a header and whether to infer the schema.
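If the workbook contains multiple sheets, spark-excel can be pointed at a specific sheet or cell range through its dataAddress option. A minimal sketch, where the sheet name "Customers" is an assumption:
# Read a specific sheet starting at cell A1 (sheet name assumed for illustration)
df = spark.read.format("com.crealytics.spark.excel") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("dataAddress", "'Customers'!A1") \
    .load("dbfs:/mnt/customer_data.xlsx")
df.show()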
Conclusion:
With these options, you can handle a wide variety of CSV files and other formats, such as text and Excel, in Databricks. Whether you need to deal with malformed data, read multiple files at once, or handle unusual delimiters, PySpark in Databricks provides robust tools for efficient data processing.
Reference: Based on a LinkedIn post shared by Shivakiran Kotur.