We read data from files using
- DataFrameReader's load() function <- RECOMMENDED
- DataFrameReader's format-specific functions like csv(), parquet(), orc(), etc. <- NOT RECOMMENDED
Reading data
The flow of reading data from files is simple -
- You read data from files into a spark dataframe
- You set schema of the dataframe through any one of two ways -
- You tell spark to infer the schema during loading
- You explicitly define the data's schema. You can do it in one of two ways
- DDL string
- Programmatically - Through Spark API functions
Read data from files
2 ways - standard template, shortcut method. Prefer using the standard template.
General structure:
# create spark session
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[1]") \
    .appName("DataLoadDemo") \
    .getOrCreate()

# Perform load action
df = spark.read \
    .format("your_data_format") \
    .option("key", "value") \
    .schema(schemaObject) \
    .load("path/to/data")
# .schema() is optional - omit it if you want Spark to infer the schema
CSV
sales_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("data/sales/2022/01/*.csv")
# inferSchema is optional - prefer an explicit schema (see below)
Full list of options: CSV files documentation.
Parquet
sales_df = spark.read \
.format("parquet") \
.load("data/sales/2022/01/*.parquet")
Full list of options: Parquet files documentation.
There aren't many options to set when dealing with Parquet files.
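One option you may still need is mergeSchema, which asks Spark to reconcile differing Parquet schemas across the files being read - a sketch reusing the hypothetical path from above:
sales_df = spark.read \
    .format("parquet") \
    .option("mergeSchema", "true") \
    .load("data/sales/2022/01/*.parquet")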
JSON
sales_df = spark.read \
.format("json") \
.load("data/sales/2022/01/*.json")
Full list of options: JSON files documentation.
Delta
df = spark.read \
    .format("delta") \
    .load("data/sales/2022/01/")
Want to load specific partitions from a Delta table? Just apply a filter on the partition columns - Delta will prune partitions so only the matching ones are read:
df = spark.read \
    .format("delta") \
    .load("data/sales/") \
    .filter("year = '2021' and month = '01' and day in ('04','05','06')")
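Note that the delta format isn't bundled with plain Spark. A minimal sketch of a session set up for Delta, assuming the delta-spark pip package is installed (the extension and catalog class names come from Delta Lake's documentation, and the app name is made up):
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

builder = SparkSession.builder \
    .master("local[1]") \
    .appName("DeltaLoadDemo") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

# configure_spark_with_delta_pip adds the delta-spark jars to the session
spark = configure_spark_with_delta_pip(builder).getOrCreate()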
General knowledge you should have
The mode option
Text-based loaders like CSV and JSON have an option to set the mode of reading data. Example:
.option("mode", "DROPMALFORMED")
The default is PERMISSIVE. It decides what to do when Spark comes across a malformed row/file.
There are 3 modes:
- PERMISSIVE - Default mode. Sets the fields it cannot parse to null and places the malformed record in a string column called _corrupt_record (it uses the value set in the spark.sql.columnNameOfCorruptRecord configuration)
- DROPMALFORMED - Removes malformed records. It basically skips over malformed records and just loads the healthy ones
- FAILFAST - Raises an exception and terminates immediately
You can override the name of the corrupt record column used by PERMISSIVE
mode using another option. But it is not at all recommended.
...
.option("mode", "DROPMALFORMED")
.option("columnNameOfCorruptRecord","corrupt_record")
...
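As an illustration of PERMISSIVE mode, here is a sketch of inspecting malformed rows after a load - the schema and path are hypothetical. For CSV sources the corrupt record column has to be declared in the schema, and Spark disallows queries that reference only that column unless the DataFrame is cached first:
# _corrupt_record must be declared in the schema for CSV sources (hypothetical schema/path)
sales_schema_ddl = "CUSTOMER_ID INT, COST INT, COUNTRY STRING, _corrupt_record STRING"

sales_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("mode", "PERMISSIVE") \
    .schema(sales_schema_ddl) \
    .load("data/sales/2022/01/*.csv") \
    .cache()  # cache before filtering on the corrupt record column alone

sales_df.filter("_corrupt_record is not null").show(truncate=False)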
Format-specific methods
NOT RECOMMENDED
Instead of doing
.option("path", "data/sales/2022/01/")
.load()
You can also directly do
.csv("data/sales/2022/01/*.csv")
But it is not recommended. It is much better to follow the standard template everywhere.
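For reference, a sketch of the shortcut form - the format-specific methods also accept their options as keyword arguments (the path is the same hypothetical one used earlier):
sales_df = spark.read.csv(
    "data/sales/2022/01/*.csv",
    header=True,
    inferSchema=True,
)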
Find more details at pyspark.sql.DataFrameReader documentation
Set schema of loaded dataframe
It is recommended that you always explicitly define the schema instead of depending on the inferSchema
option. The only exception is data formats like Parquet and ORC, whose metadata already includes the data's schema.
The flow is:
- Create schema through a DDL string or through StructType
- Supply the schema to the DataFrameReader through the .schema() method
You can find all supported schema datatypes at - https://spark.apache.org/docs/latest/sql-ref-datatypes.html
Using SQL DDL string
Note - If you use the DATE type, you'll have to provide the date format through .option("dateFormat", "YOUR_FORMAT_STRING")
Example -
sales_schema_ddl = """CUSTOMER_ID INT, COST INT, SALE_DATE DATE, COUNTRY STRING"""
# EXPLICITLY SET SCHEMA via .schema(sales_schema_ddl)
sales_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .schema(sales_schema_ddl) \
    .option("mode", "FAILFAST") \
    .option("dateFormat", "M/d/y") \
    .load("data/sales/2022/01/*.csv")
Here is a list of datatypes you can use in the SQL string:
Data type | SQL name |
---|---|
BooleanType | BOOLEAN |
ByteType | BYTE, TINYINT |
ShortType | SHORT, SMALLINT |
IntegerType | INT, INTEGER |
LongType | LONG, BIGINT |
FloatType | FLOAT, REAL |
DoubleType | DOUBLE |
DateType | DATE |
TimestampType | TIMESTAMP, TIMESTAMP_LTZ |
TimestampNTZType | TIMESTAMP_NTZ |
StringType | STRING |
BinaryType | BINARY |
DecimalType | DECIMAL, DEC, NUMERIC |
YearMonthIntervalType | INTERVAL YEAR, INTERVAL YEAR TO MONTH, INTERVAL MONTH |
DayTimeIntervalType | INTERVAL DAY, INTERVAL DAY TO HOUR, INTERVAL DAY TO MINUTE, INTERVAL DAY TO SECOND, INTERVAL HOUR, INTERVAL HOUR TO MINUTE, INTERVAL HOUR TO SECOND, INTERVAL MINUTE, INTERVAL MINUTE TO SECOND, INTERVAL SECOND |
ArrayType | ARRAY<element_type> |
StructType | STRUCT<field1_name: field1_type, field2_name: field2_type, ...> Note: ':' is optional. |
MapType | MAP<key_type, value_type> |
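As a sketch, a made-up DDL string exercising a few of the types above (the column names and types are invented for illustration):
orders_schema_ddl = """
    ORDER_ID BIGINT,
    AMOUNT DECIMAL(10,2),
    TAGS ARRAY<STRING>,
    SHIPPING STRUCT<CITY: STRING, ZIP: STRING>,
    PLACED_AT TIMESTAMP
"""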
Programmatically
Create the schema structure using StructType(), which is made up of StructField() objects.
from pyspark.sql.types import StructType, StructField, IntegerType, DateType, StringType

sales_schema_struct = StructType([
    StructField("CUSTOMER_ID", IntegerType()),
    StructField("COST", IntegerType()),
    StructField("SALE_DATE", DateType()),
    StructField("COUNTRY", StringType()),
])
# EXPLICITLY SET SCHEMA via .schema(sales_schema_struct)
sales_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .schema(sales_schema_struct) \
    .option("mode", "FAILFAST") \
    .option("dateFormat", "M/d/y") \
    .load("data/sales/2022/01/*.csv")
Here is a list of the various data types you can use in StructField():
Data type | Value type in Python | API to access or create a data type |
---|---|---|
ByteType | int or long. Note: Numbers will be converted to 1-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -128 to 127. | ByteType() |
ShortType | int or long. Note: Numbers will be converted to 2-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -32768 to 32767. | ShortType() |
IntegerType | int or long | IntegerType() |
LongType | long. Note: Numbers will be converted to 8-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -9223372036854775808 to 9223372036854775807. Otherwise, please convert data to decimal.Decimal and use DecimalType. | LongType() |
FloatType | float. Note: Numbers will be converted to 4-byte single-precision floating point numbers at runtime. | FloatType() |
DoubleType | float | DoubleType() |
DecimalType | decimal.Decimal | DecimalType() |
StringType | string | StringType() |
BinaryType | bytearray | BinaryType() |
BooleanType | bool | BooleanType() |
TimestampType | datetime.datetime | TimestampType() |
TimestampNTZType | datetime.datetime | TimestampNTZType() |
DateType | datetime.date | DateType() |
DayTimeIntervalType | datetime.timedelta | DayTimeIntervalType() |
ArrayType | list, tuple, or array | ArrayType(elementType, [containsNull]) Note: The default value of containsNull is True. |
MapType | dict | MapType(keyType, valueType, [valueContainsNull]) Note: The default value of valueContainsNull is True. |
StructType | list or tuple | StructType(fields) Note: fields is a Seq of StructFields. Also, two fields with the same name are not allowed. |
StructField | The value type in Python of the data type of this field (for example, int for a StructField with the data type IntegerType) | StructField(name, dataType, [nullable]) Note: The default value of nullable is True. |
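And a sketch of a made-up schema exercising a few of the composite types above (names and nesting are invented for illustration):
from pyspark.sql.types import (
    StructType, StructField, LongType, DecimalType,
    ArrayType, MapType, StringType, TimestampType,
)

orders_schema_struct = StructType([
    StructField("ORDER_ID", LongType()),
    StructField("AMOUNT", DecimalType(10, 2)),
    StructField("TAGS", ArrayType(StringType())),
    StructField("ATTRIBUTES", MapType(StringType(), StringType())),
    StructField("SHIPPING", StructType([
        StructField("CITY", StringType()),
        StructField("ZIP", StringType()),
    ])),
    StructField("PLACED_AT", TimestampType()),
])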