ETL pipeline using Mage to process and load Green Taxi trip data into a PostgreSQL database and Google Cloud Storage, with daily scheduled updates.
This project involves constructing an ETL (Extract, Transform, Load) pipeline to process Green Taxi trip data for the final quarter of 2020. The pipeline reads the data from an API, performs necessary transformations, and writes the data to a PostgreSQL database and Google Cloud Storage (GCS).
Task: Load data for October, November, and December 2020.
Method: Used pandas to read the monthly CSV files, looping over the three months and combining the results with "pd.concat".
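The loading step can be sketched roughly as follows. This is a minimal illustration, not the project's actual loader block: the function name "load_green_taxi" and the URL template are hypothetical, since the write-up does not say where the CSV files are hosted.

```python
import pandas as pd

# Hypothetical URL template: the source does not say where the CSV files live.
URL_TEMPLATE = "https://example.com/green_tripdata_{year}-{month:02d}.csv.gz"


def load_green_taxi(year=2020, months=(10, 11, 12), url_template=URL_TEMPLATE):
    """Read one CSV per month and stack them into a single DataFrame."""
    frames = []
    for month in months:
        path = url_template.format(year=year, month=month)
        # parse_dates keeps the pickup timestamp as datetime64 instead of text.
        frames.append(pd.read_csv(path, parse_dates=["lpep_pickup_datetime"]))
    # ignore_index renumbers the rows 0..n-1 across the combined months.
    return pd.concat(frames, ignore_index=True)
```

Collecting the frames in a list and calling "pd.concat" once avoids repeatedly copying a growing DataFrame inside the loop.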
Task: Clean and transform the data.
Transformations:
Removed rows where 'passenger_count' and 'trip_distance' are zero.
Created a new column 'lpep_pickup_date' by converting 'lpep_pickup_datetime' to a date.
Renamed columns from Camel Case to Snake Case.
Added assertions to validate data integrity.
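A minimal sketch of these transformations is shown below. It makes a few assumptions that the write-up does not confirm: the zero-value filter is read as dropping a row when either value is zero (swap "|" for "&" to drop only rows where both are zero), the helper names are hypothetical, and the specific assertions are examples only.

```python
import re

import pandas as pd


def camel_to_snake(name):
    """Convert names such as 'VendorID' or 'PULocationID' to snake_case."""
    # Underscore between a lowercase letter or digit and a following capital.
    name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)
    # Underscore before the last capital of an acronym that starts a new word.
    name = re.sub(r"(?<=[A-Z])(?=[A-Z][a-z])", "_", name)
    return name.lower()


def transform(df):
    # Assumption: a row is dropped when either value is zero.
    zero_rows = (df["passenger_count"] == 0) | (df["trip_distance"] == 0)
    out = df.loc[~zero_rows].copy()

    # Date-only column derived from the pickup timestamp.
    out["lpep_pickup_date"] = pd.to_datetime(out["lpep_pickup_datetime"]).dt.date

    out.columns = [camel_to_snake(col) for col in out.columns]

    # Example integrity checks (the actual assertions are not listed above).
    assert "vendor_id" in out.columns, "VendorID was not renamed"
    assert (out["passenger_count"] != 0).all(), "zero passenger_count remains"
    assert (out["trip_distance"] != 0).all(), "zero trip_distance remains"
    return out
```

Columns that are already in snake case, such as 'lpep_pickup_datetime', pass through the rename unchanged.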
Task: Write the transformed data to PostgreSQL and Google Cloud Storage.
PostgreSQL:
Wrote to a table named 'green_taxi' in the 'mage' schema.
Replaced the table if it existed.
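The write-up does not show the exporter code, so the sketch below uses pandas' "DataFrame.to_sql" as a stand-in to illustrate the replace-if-exists behaviour. The function name "write_green_taxi" is hypothetical, and "con" is assumed to be a SQLAlchemy engine or connection pointing at the PostgreSQL database.

```python
import pandas as pd


def write_green_taxi(df, con, schema="mage", table="green_taxi"):
    """Write df to <schema>.<table>, dropping and recreating an existing table."""
    df.to_sql(
        table,
        con=con,
        schema=schema,
        if_exists="replace",  # replace the table rather than append to it
        index=False,          # do not store the DataFrame index as a column
    )
```

With "if_exists" set to "replace", each run leaves the table holding only the rows from that run, so repeated daily runs do not accumulate duplicates.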
Google Cloud Storage:
Wrote data as Parquet files partitioned by 'lpep_pickup_date', using the 'pyarrow' library.
Task: Schedule the pipeline to run daily at 5 AM UTC.
Method: Used Mage AI to schedule the pipeline.
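The scheduling itself is configured in Mage and is not shown here. A daily 5 AM UTC schedule corresponds to the cron expression "0 5 * * *"; the sketch below is plain Python, not Mage's API, and only illustrates when the next run would fall. The function name "next_daily_run" is hypothetical.

```python
from datetime import datetime, time, timedelta, timezone


def next_daily_run(now, hour=5):
    """Return the next run time at the given UTC hour, strictly after `now`."""
    now = now.astimezone(timezone.utc)
    candidate = datetime.combine(now.date(), time(hour, tzinfo=timezone.utc))
    # If today's slot has already passed (or is right now), use tomorrow's.
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate
```

For example, a call at 06:00 UTC on 31 December 2020 yields 05:00 UTC on 1 January 2021.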
The completed ETL pipeline efficiently processed and transformed Green Taxi trip data, ensuring data integrity and consistency. The data was loaded into a PostgreSQL database for easy querying and analysis and stored in Google Cloud Storage in a space-efficient and query-friendly Parquet format. The pipeline was orchestrated and scheduled using Mage AI, ensuring regular and automated data processing.
Docker, Mage AI, Python
Google Cloud Storage, PostgreSQL, GitHub