Loading data from files



We read data from files using one of two approaches:

  1. DataFrameReader's load() function <- RECOMMENDED
  2. DataFrameReader's format-specific functions like csv(), parquet(), orc() etc. <- NOT RECOMMENDED

Reading data

The flow of reading data from files is simple -

  1. You read data from files into a spark dataframe
  2. You set the schema of the dataframe in one of two ways -
    1. You tell Spark to infer the schema during loading
    2. You explicitly define the data's schema, in one of two ways -
      1. DDL string
      2. Programmatically - Through Spark API functions

Read data from files

There are 2 ways - the standard template and the shortcut method. Prefer the standard template.


General structure:

# create spark session
from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .master("local[1]") \
        .appName("DataLoadDemo") \
        .getOrCreate()

# Perform load action (the .schema() call is optional)
df = spark.read \
    .format("your_data_format") \
    .option("key", "value") \
    .schema(schemaObject) \
    .load("path/to/data")

CSV


# inferSchema is an optional option - prefer an explicit schema (see below)
sales_df = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("data/sales/2022/01/*.csv")

List of options - CSV files documentation - options
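
Beyond header and inferSchema, a few other options come up often. A hedged sketch - the delimiter, null marker and date format below are illustrative, not properties of the sample data:

sales_df = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("sep", ";") \
        .option("nullValue", "NA") \
        .option("dateFormat", "M/d/y") \
        .option("mode", "FAILFAST") \
        .load("data/sales/2022/01/*.csv")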

Parquet

sales_df = spark.read \
        .format("parquet") \
        .load("data/sales/2022/01/*.parquet")

List of options - Parquet files documentation - options

There aren't many options to set when dealing with parquet files, since the schema and encoding live in the file's own metadata.
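
One option that does come up is mergeSchema, for when the files in a directory were written with slightly different schemas over time. A minimal sketch (the wildcard path is illustrative):

sales_df = spark.read \
        .format("parquet") \
        .option("mergeSchema", "true") \
        .load("data/sales/2022/*/*.parquet")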

JSON

sales_df = spark.read \
    .format("json") \
    .load("data/sales/2022/01/*.json")

List of options - JSON files documentation - options
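
By default Spark expects JSON Lines, i.e. one complete JSON object per line. If each file holds a single pretty-printed JSON document or an array of records, set the multiLine option. A minimal sketch (path is illustrative):

sales_df = spark.read \
    .format("json") \
    .option("multiLine", "true") \
    .load("data/sales/2022/01/*.json")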

Delta

df = spark.read \
    .format("delta") \
    .load("data/sales/2022/01/")

Want to load specific partitions from a delta table? Just apply a filter - Spark pushes it down and reads only the matching partitions (partition pruning).

df = spark.read.format("delta").load('data/sales/').filter("year = '2021' and month = '01' and day in ('04','05','06')")

General knowledge you should have

mode option

Every loader has an option to set the mode of reading data. Example:

.option("mode", "DROPMALFORMED")

The default is PERMISSIVE. The mode decides what Spark does when it comes across a malformed record or file.

There are 3 modes:

  1. PERMISSIVE - Default mode. Sets the malformed fields to null and keeps the raw record in a string column called _corrupt_record (the column name comes from the spark.sql.columnNameOfCorruptRecord configuration)
  2. DROPMALFORMED - Drops malformed records. It basically skips over malformed records and just loads the healthy ones
  3. FAILFAST - Raises an exception and terminates immediately

You can override the name of the corrupt record column used by PERMISSIVE mode using another option. But it is not at all recommended.

...
.option("mode", "DROPMALFORMED")
.option("columnNameOfCorruptRecord","corrupt_record")
...
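
To actually inspect the bad rows in PERMISSIVE mode, the corrupt-record column has to be part of the schema you supply. A minimal sketch, reusing the sales CSV layout from the schema examples further down (the cache() call works around Spark's restriction on queries that reference only the internal corrupt-record column):

sales_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("mode", "PERMISSIVE") \
    .option("dateFormat", "M/d/y") \
    .schema("CUSTOMER_ID INT, COST INT, SALE_DATE DATE, COUNTRY STRING, _corrupt_record STRING") \
    .load("data/sales/2022/01/*.csv")

# cache first, then keep only the rows Spark could not parse
sales_df.cache()
sales_df.filter("_corrupt_record IS NOT NULL").show(truncate=False)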

Format specific methods

NOT RECOMMENDED

Instead of doing

.option("path", "data/sales/2022/01/")
.load()

You can also directly do

.csv("data/sales/2022/01/*.csv")

But it is not recommended. It is much better to follow the standard template everywhere.

Find more details at pyspark.sql.DataFrameReader documentation

Set schema of loaded dataframe

It is recommended that you always explicitly define the schema instead of depending on the inferSchema option. The only exception is self-describing formats like Parquet and ORC, whose file metadata already includes the data's schema.

The flow is:

  1. Create schema through DDL string or through StructType
  2. Supply schema to the DataFrameReader through the .schema() method

You can find all supported schema datatypes at - https://spark.apache.org/docs/latest/sql-ref-datatypes.html

Using SQL DDL string

Note - If you use the DATE type and your dates are not in the default yyyy-MM-dd format, you'll have to provide the format through .option("dateFormat","YOUR_FORMAT_STRING"). Example -

sales_schema_ddl = """CUSTOMER_ID INT, COST INT, SALE_DATE DATE, COUNTRY STRING"""

# schema is explicitly set through .schema()
sales_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .schema(sales_schema_ddl) \
    .option("mode", "FAILFAST") \
    .option("dateFormat", "M/d/y") \
    .load("data/sales/2022/01/*.csv")

Here is a list of datatypes you can use in the SQL string:

Data type               SQL name
BooleanType             BOOLEAN
ByteType                BYTE, TINYINT
ShortType               SHORT, SMALLINT
IntegerType             INT, INTEGER
LongType                LONG, BIGINT
FloatType               FLOAT, REAL
DoubleType              DOUBLE
DateType                DATE
TimestampType           TIMESTAMP, TIMESTAMP_LTZ
TimestampNTZType        TIMESTAMP_NTZ
StringType              STRING
BinaryType              BINARY
DecimalType             DECIMAL, DEC, NUMERIC
YearMonthIntervalType   INTERVAL YEAR, INTERVAL YEAR TO MONTH, INTERVAL MONTH
DayTimeIntervalType     INTERVAL DAY, INTERVAL DAY TO HOUR, INTERVAL DAY TO MINUTE, INTERVAL DAY TO SECOND, INTERVAL HOUR, INTERVAL HOUR TO MINUTE, INTERVAL HOUR TO SECOND, INTERVAL MINUTE, INTERVAL MINUTE TO SECOND, INTERVAL SECOND
ArrayType               ARRAY<element_type>
StructType              STRUCT<field_name: field_type, ...> (the ':' is optional)
MapType                 MAP<key_type, value_type>
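
As a hedged illustration (the column names and path below are made up), here is how some of these SQL names look inside a DDL schema string, including the nested types:

orders_schema_ddl = """
    ORDER_ID BIGINT,
    AMOUNT DECIMAL(10,2),
    TAGS ARRAY<STRING>,
    ATTRIBUTES MAP<STRING, STRING>,
    SHIPPING STRUCT<STREET: STRING, CITY: STRING, ZIP: INT>
"""

orders_df = spark.read \
    .format("json") \
    .schema(orders_schema_ddl) \
    .load("data/orders/*.json")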

Programmatically

Create the schema structure using StructType(), which is made up of StructField() entries.

from pyspark.sql.types import StructType, StructField, IntegerType, DateType, StringType

sales_schema_struct = StructType([
    StructField("CUSTOMER_ID", IntegerType()),
    StructField("COST", IntegerType()),
    StructField("SALE_DATE", DateType()),
    StructField("COUNTRY", StringType()),
])

# schema is explicitly set through .schema()
sales_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .schema(sales_schema_struct) \
    .option("mode", "FAILFAST") \
    .option("dateFormat", "M/d/y") \
    .load("data/sales/2022/01/*.csv")

Here is a list of the various data types you can use in StructField():

Data type (Python API) - Value type in Python

ByteType() - int or long. Note: numbers are converted to 1-byte signed integers at runtime, so keep values within -128 to 127.
ShortType() - int or long. Note: numbers are converted to 2-byte signed integers at runtime, so keep values within -32768 to 32767.
IntegerType() - int or long
LongType() - long. Note: numbers are converted to 8-byte signed integers at runtime, so keep values within -9223372036854775808 to 9223372036854775807. Otherwise, convert the data to decimal.Decimal and use DecimalType.
FloatType() - float. Note: numbers are converted to 4-byte single-precision floating point numbers at runtime.
DoubleType() - float
DecimalType() - decimal.Decimal
StringType() - string
BinaryType() - bytearray
BooleanType() - bool
TimestampType() - datetime.datetime
TimestampNTZType() - datetime.datetime
DateType() - datetime.date
DayTimeIntervalType() - datetime.timedelta
ArrayType(elementType, [containsNull]) - list, tuple, or array. Note: the default value of containsNull is True.
MapType(keyType, valueType, [valueContainsNull]) - dict. Note: the default value of valueContainsNull is True.
StructType(fields) - list or tuple. Note: fields is a list of StructFields; two fields with the same name are not allowed.
StructField(name, dataType, [nullable]) - the value type in Python of the data type of this field (for example, int for a StructField with the data type IntegerType). Note: the default value of nullable is True.
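
And the same kind of nested schema built programmatically - again with made-up column names - to show ArrayType, MapType, DecimalType and a nested StructType in use:

from pyspark.sql.types import (StructType, StructField, LongType, DecimalType,
                               ArrayType, MapType, StringType, IntegerType)

orders_schema_struct = StructType([
    StructField("ORDER_ID", LongType(), nullable=False),
    StructField("AMOUNT", DecimalType(10, 2)),
    StructField("TAGS", ArrayType(StringType())),                    # list of strings
    StructField("ATTRIBUTES", MapType(StringType(), StringType())),  # dict of string -> string
    StructField("SHIPPING", StructType([                             # nested struct
        StructField("STREET", StringType()),
        StructField("CITY", StringType()),
        StructField("ZIP", IntegerType()),
    ])),
])

orders_df = spark.read \
    .format("json") \
    .schema(orders_schema_struct) \
    .load("data/orders/*.json")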
