Simplest ETL: Data Engineering

Task Overview Extract: Read data from a CSV file. Transform: Clean and process the data (e.g., filter rows, modify columns). Load: Insert the transformed data into a SQLite database. Code Example # Step 1: Import necessary libraries import pandas as pd # For data manipulation import sqlite3 # For interacting with SQLite databases # Step 2: Extract - Load data from a CSV file def extract_data(file_path): """ Reads data from a CSV file into a Pandas DataFrame. """ df = pd.read_csv(file_path) # Read the CSV file return df # Step 3: Transform - Clean and process the data def transform_data(df): """ Cleans and transforms the data. """ # Drop rows with missing values df = df.dropna() # Convert a column to uppercase (example transformation) df['name'] = df['name'].str.upper() # Filter rows where 'age' is greater than 18 df = df[df['age'] > 18] return df # Step 4: Load - Insert data into a SQLite database def load_data(df, db_path, table_name): """ Loads the transformed data into a SQLite database. """ # Connect to the SQLite database (creates it if it doesn't exist) conn = sqlite3.connect(db_path) # Insert the DataFrame into the database as a table df.to_sql(table_name, conn, if_exists='replace', index=False) # Close the database connection conn.close() # Step 5: Main function to orchestrate the ETL process def main(): # Define file paths input_file = 'data.csv' # Path to the input CSV file db_file = 'example.db' # Path to the SQLite database table_name = 'users' # Name of the table to create # Step 1: Extract print("Extracting data...") data = extract_data(input_file) # Step 2: Transform print("Transforming data...") transformed_data = transform_data(data) # Step 3: Load print("Loading data into the database...") load_data(transformed_data, db_file, table_name) print("ETL process completed successfully!") # Run the main function if __name__ == "__main__": main() Explanation of Each Line Step 1: Import Libraries import pandas as pd import sqlite3 pandas: A powerful library for data manipulation and analysis. It provides the DataFrame object, which is ideal for handling tabular data. sqlite3: A built-in Python library for interacting with SQLite databases. Step 2: Extract Data def extract_data(file_path): df = pd.read_csv(file_path) return df pd.read_csv(file_path): Reads the CSV file into a Pandas DataFrame. The file_path is the location of the CSV file. The function returns the DataFrame containing the raw data. Step 3: Transform Data def transform_data(df): df = df.dropna() df['name'] = df['name'].str.upper() df = df[df['age'] > 18] return df df.dropna(): Removes rows with missing values (NaN). df['name'].str.upper(): Converts the name column to uppercase. df[df['age'] > 18]: Filters rows where the age column is greater than 18. The function returns the cleaned and transformed DataFrame. Step 4: Load Data def load_data(df, db_path, table_name): conn = sqlite3.connect(db_path) df.to_sql(table_name, conn, if_exists='replace', index=False) conn.close() sqlite3.connect(db_path): Connects to the SQLite database. If the database doesn't exist, it creates one. df.to_sql(table_name, conn, if_exists='replace', index=False): Inserts the DataFrame into the database as a table. table_name: Name of the table to create. if_exists='replace': Replaces the table if it already exists. index=False: Prevents Pandas from writing row indices to the database. conn.close(): Closes the database connection. Step 5: Main Function def main(): input_file = 'data.csv' db_file = 'example.db' table_name = 'users' print("Extracting data...") data = extract_data(input_file) print("Transforming data...") transformed_data = transform_data(data) print("Loading data into the database...") load_data(transformed_data, db_file, table_name) print("ETL process completed successfully!") Orchestrates the ETL process by calling the extract_data, transform_data, and load_data functions. Prints progress messages to the console. Run the Script if __name__ == "__main__": main() Ensures the main() function runs only when the script is executed directly (not when imported as a module). Sample Input (data.csv) name,age,email Alice,25,alice@example.com Bob,17,bob@example.com Charlie,30,charlie@example.com Diana,,diana@example.com Output A SQLite database (example.db) is created with a table named users. The table contains the following data: NAME AGE EMAIL ALICE 25 alice@example.com CHARLIE 30 charlie@

Feb 5, 2025 - 03:10
 0
Simplest ETL: Data Engineering

Task Overview

  1. Extract: Read data from a CSV file.
  2. Transform: Clean and process the data (e.g., filter rows, modify columns).
  3. Load: Insert the transformed data into a SQLite database.

Code Example

# Step 1: Import necessary libraries
import pandas as pd  # For data manipulation
import sqlite3      # For interacting with SQLite databases

# Step 2: Extract - Load data from a CSV file
def extract_data(file_path):
    """
    Reads data from a CSV file into a Pandas DataFrame.
    """
    df = pd.read_csv(file_path)  # Read the CSV file
    return df

# Step 3: Transform - Clean and process the data
def transform_data(df):
    """
    Cleans and transforms the data.
    """
    # Drop rows with missing values
    df = df.dropna()

    # Convert a column to uppercase (example transformation)
    df['name'] = df['name'].str.upper()

    # Filter rows where 'age' is greater than 18
    df = df[df['age'] > 18]

    return df

# Step 4: Load - Insert data into a SQLite database
def load_data(df, db_path, table_name):
    """
    Loads the transformed data into a SQLite database.
    """
    # Connect to the SQLite database (creates it if it doesn't exist)
    conn = sqlite3.connect(db_path)

    # Insert the DataFrame into the database as a table
    df.to_sql(table_name, conn, if_exists='replace', index=False)

    # Close the database connection
    conn.close()

# Step 5: Main function to orchestrate the ETL process
def main():
    # Define file paths
    input_file = 'data.csv'       # Path to the input CSV file
    db_file = 'example.db'        # Path to the SQLite database
    table_name = 'users'          # Name of the table to create

    # Step 1: Extract
    print("Extracting data...")
    data = extract_data(input_file)

    # Step 2: Transform
    print("Transforming data...")
    transformed_data = transform_data(data)

    # Step 3: Load
    print("Loading data into the database...")
    load_data(transformed_data, db_file, table_name)

    print("ETL process completed successfully!")

# Run the main function
if __name__ == "__main__":
    main()

Explanation of Each Line

Step 1: Import Libraries

import pandas as pd
import sqlite3
  • pandas: A powerful library for data manipulation and analysis. It provides the DataFrame object, which is ideal for handling tabular data.
  • sqlite3: A built-in Python library for interacting with SQLite databases.

Step 2: Extract Data

def extract_data(file_path):
    df = pd.read_csv(file_path)
    return df
  • pd.read_csv(file_path): Reads the CSV file into a Pandas DataFrame. The file_path is the location of the CSV file.
  • The function returns the DataFrame containing the raw data.

Step 3: Transform Data

def transform_data(df):
    df = df.dropna()
    df['name'] = df['name'].str.upper()
    df = df[df['age'] > 18]
    return df
  • df.dropna(): Removes rows with missing values (NaN).
  • df['name'].str.upper(): Converts the name column to uppercase.
  • df[df['age'] > 18]: Filters rows where the age column is greater than 18.
  • The function returns the cleaned and transformed DataFrame.

Step 4: Load Data

def load_data(df, db_path, table_name):
    conn = sqlite3.connect(db_path)
    df.to_sql(table_name, conn, if_exists='replace', index=False)
    conn.close()
  • sqlite3.connect(db_path): Connects to the SQLite database. If the database doesn't exist, it creates one.
  • df.to_sql(table_name, conn, if_exists='replace', index=False): Inserts the DataFrame into the database as a table.
    • table_name: Name of the table to create.
    • if_exists='replace': Replaces the table if it already exists.
    • index=False: Prevents Pandas from writing row indices to the database.
  • conn.close(): Closes the database connection.

Step 5: Main Function

def main():
    input_file = 'data.csv'
    db_file = 'example.db'
    table_name = 'users'

    print("Extracting data...")
    data = extract_data(input_file)

    print("Transforming data...")
    transformed_data = transform_data(data)

    print("Loading data into the database...")
    load_data(transformed_data, db_file, table_name)

    print("ETL process completed successfully!")
  • Orchestrates the ETL process by calling the extract_data, transform_data, and load_data functions.
  • Prints progress messages to the console.

Run the Script

if __name__ == "__main__":
    main()
  • Ensures the main() function runs only when the script is executed directly (not when imported as a module).

Sample Input (data.csv)

name,age,email
Alice,25,alice@example.com
Bob,17,bob@example.com
Charlie,30,charlie@example.com
Diana,,diana@example.com

Output

  1. A SQLite database (example.db) is created with a table named users.
  2. The table contains the following data:
   NAME     AGE  EMAIL
   ALICE    25   alice@example.com
   CHARLIE  30   charlie@example.com

Key Concepts Learned

  1. ETL (Extract, Transform, Load): A fundamental process in data engineering.
  2. Pandas: A library for data manipulation.
  3. SQLite: A lightweight database for storing data.
  4. Data Cleaning: Handling missing values and filtering rows.
  5. Data Transformation: Modifying columns (e.g., converting to uppercase).