ETL Overview

Data flow diagram

The overall ETL process is illustrated below. Broadly, a spreadsheet or CSV file is read from Google Drive (or a local drive) into a pandas.DataFrame and pushed through a series of pipelines to produce a shipments.csv file, which:

  • is transformed and loaded into the Analytics DB;

  • is written to local cache or Analytics (DEV) S3.

digraph G {

 graph [fontsize=10 fontname="Verdana" compound=true];
 node [shape=record fontsize=10 fontname="Verdana" margin=0.3];

 data[label="External Shipper Data\n(spreadsheet(s) on Google Drive \n OR local CSV/Excel file)" shape=box, color=black];
 db[label="Analytics DB\n(via Analytics Service API)" shape=box, color=black];
 s3[label="Analytics S3 / Local cache\n(via Prefect)" shape=box, color=black];

 data -> read;
 load -> db;
 output -> s3;

 subgraph cluster_A {
     subgraph cluster_B {
         read -> map;
          graph[style=filled bgcolor=lightgray label="for each\nexternal dataset" fontsize=10 margin=20];
     }

     map -> concat;
     concat -> filter;
     filter -> validate;
     validate -> geocode;
     geocode -> pcmiler;
     pcmiler -> fuel;
     fuel -> flag;
     flag -> cluster;
     cluster -> uuid;
     uuid -> output;
     output -> load;
     graph[style=dotted];
     label="leaf_engine.etl";
 }
 }

Pipeline functions

Shipper data is pushed through the functions below, in sequence. Most functions take as input a pandas.DataFrame (each row representing a shipment record) and return the same pandas.DataFrame with additional columns.
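
The DataFrame-in/DataFrame-out convention means the stages can be chained with pandas' built-in pipe(). A minimal sketch of that pattern (the stage bodies here are illustrative placeholders, not the real pipeline code):

```python
import pandas as pd

# Illustrative placeholder stages: each takes a DataFrame and returns
# the same DataFrame with extra columns (or fewer rows), mirroring the
# pipeline convention described above.
def map_stage(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(shipment_id=df.index.astype(str))

def filter_stage(df: pd.DataFrame) -> pd.DataFrame:
    return df[df["weight_lbs"] > 0]

df = pd.DataFrame({"weight_lbs": [1200, 0, 840]})
result = df.pipe(map_stage).pipe(filter_stage)
```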

  1. leaf_engine.read_dataset()

    Reads shipper data as specified in the run parameters, dispatching to a reader function based on the file type (CSV or Excel).
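
    A file-type dispatch of this kind might look like the following sketch (pandas readers only; the actual reader functions in leaf_engine may differ):

```python
from pathlib import Path
import pandas as pd

# Hypothetical dispatch table: pick a pandas reader by file extension.
_READERS = {
    ".csv": pd.read_csv,
    ".xls": pd.read_excel,
    ".xlsx": pd.read_excel,
}

def read_dataset(path: str) -> pd.DataFrame:
    suffix = Path(path).suffix.lower()
    try:
        reader = _READERS[suffix]
    except KeyError:
        raise ValueError(f"Unsupported file type: {suffix}")
    return reader(path)
```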

  2. leaf_engine.map_pipeline()

    Maps shipper data to the Leaf internal schema using the mapping file specified in the run parameters. Each mapping file provides shipper-specific code for, and pushes data through, the following functions:

    • _map_shipment_id

    • _map_period

    • _map_origin_details

    • _map_destination_details

    • _map_equipment_type

    • _map_equipment_class

    • _map_mode

    • _map_hazardous

    • _map_protection

    • _map_weight

    • _map_stops

    • _map_shipments

    • _map_scac

    • _map_move_type

    • _map_load_type

    • _map_exclude_shipment

    • _map_company_miles

    • _map_company_transit_time

    • _map_carrier_name

    • _map_carrier_type

    • _map_delayed

    • _map_spends

    • _drop_duplicates

    • leaf_engine.etl.map._normalize_address_details() - Address normalization is applied as the last step of the map pipeline and is not shipper-specific.
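
    Each _map_* function is shipper-specific, but the general shape is a small DataFrame-in/DataFrame-out transform. A hypothetical _map_weight, assuming a shipper file with a "Weight (kg)" column (the column names and conversion here are invented for illustration):

```python
import pandas as pd

KG_TO_LBS = 2.20462

def _map_weight(df: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical: normalize a shipper's metric weight column to an
    # internal-schema weight column in pounds.
    df["weight_lbs"] = df["Weight (kg)"] * KG_TO_LBS
    return df

df = _map_weight(pd.DataFrame({"Weight (kg)": [100.0]}))
```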

  3. leaf_engine.concat_pipeline()

    Concatenates mapped datasets (if multiple datasets are used) and applies post-concatenation deduplication logic.
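
    In pandas terms, this step amounts to something like the sketch below (the dedup key is illustrative; the real post-concatenation logic may differ):

```python
import pandas as pd

def concat_pipeline(datasets: list[pd.DataFrame]) -> pd.DataFrame:
    # Stack the mapped datasets, then drop rows sharing a shipment_id
    # (hypothetical dedup key), keeping the first occurrence.
    combined = pd.concat(datasets, ignore_index=True)
    return combined.drop_duplicates(subset="shipment_id", keep="first")

a = pd.DataFrame({"shipment_id": ["S1", "S2"], "weight_lbs": [100, 200]})
b = pd.DataFrame({"shipment_id": ["S2", "S3"], "weight_lbs": [200, 300]})
combined = concat_pipeline([a, b])
```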

  4. leaf_engine.filter_pipeline()

  5. leaf_engine.etl.validate.validate_pipeline()

    Applies pandera validation. See leaf_engine.domain.columns for column-specific checks.

  6. leaf_engine.geocode_pipeline()

    Geocodes origin and destination details (city, country, state, zip). See leaf_engine.etl.geocode.geocode._geocode_location() for details.

  7. leaf_engine.pcmiler_pipeline()

    Makes PCMiler calls to determine distance (in miles and hours) and path geometry between origin and destination.

  8. leaf_engine.fuel_pipeline()

    Calculates fuel-specific columns; see leaf_engine.domain.columns.LeafSpendColumn.

  9. leaf_engine.flag_pipeline()

    Flags anomalous weight and spend rows.
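
    One common way to flag anomalous rows is a z-score threshold; a sketch of that idea (the actual flagging rules in flag_pipeline may differ):

```python
import pandas as pd

def flag_outliers(df: pd.DataFrame, col: str, z: float = 3.0) -> pd.DataFrame:
    # Flag rows whose value lies more than `z` standard deviations
    # from the column mean. Illustrative only.
    mean, std = df[col].mean(), df[col].std()
    df[f"{col}_flagged"] = (df[col] - mean).abs() > z * std
    return df

df = pd.DataFrame({"weight_lbs": [100, 110, 95, 105, 10_000]})
df = flag_outliers(df, "weight_lbs", z=1.5)
```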

  10. leaf_engine.cluster_pipeline()

    Applies clustering logic. See leaf_engine.etl.cluster.dbscan_kmeans.DBSCAN_KMeans for the algorithm and leaf_engine.etl.cluster.cluster_schema for the cluster schema.

  11. leaf_engine.uuid_pipeline()

    Sets UUIDs for point locations, point-to-point (PTP) lanes, power lanes, and shipments.
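
    Deterministic IDs for lanes and locations can be derived with uuid5, so that re-runs over the same data produce the same UUIDs; a sketch under that assumption (the namespace and key format here are invented):

```python
import uuid

import pandas as pd

# Hypothetical namespace; the real pipeline's namespace and key format
# may differ.
LANE_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "leaf.example")

def ptp_lane_uuid(origin: str, destination: str) -> str:
    # The same origin/destination pair always yields the same UUID.
    return str(uuid.uuid5(LANE_NAMESPACE, f"{origin}->{destination}"))

df = pd.DataFrame({"origin": ["Chicago, IL"], "destination": ["Dallas, TX"]})
df["ptp_lane_uuid"] = [
    ptp_lane_uuid(o, d) for o, d in zip(df["origin"], df["destination"])
]
```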

  12. leaf_engine.etl.output.output_pipeline()

    Reorders columns and logs summary statistics about the data. The output of this pipeline is written to shipments.csv, either locally or on S3 (via Prefect).
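
    The reorder-and-log step can be sketched as follows (the canonical column order and logged stats here are illustrative):

```python
import logging

import pandas as pd

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("output_pipeline")

# Hypothetical canonical column order for the output file.
COLUMN_ORDER = ["shipment_id", "origin", "destination", "weight_lbs"]

def output_pipeline(df: pd.DataFrame) -> pd.DataFrame:
    # Put known columns first, keep any extras at the end.
    ordered = [c for c in COLUMN_ORDER if c in df.columns]
    extras = [c for c in df.columns if c not in ordered]
    df = df[ordered + extras]
    log.info("%d shipments, %d columns", len(df), df.shape[1])
    return df

df = output_pipeline(
    pd.DataFrame({"weight_lbs": [100], "shipment_id": ["S1"], "spend": [42.0]})
)
```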

  13. leaf_engine.etl.load.load_pipeline()

    Applies transformations from leaf_engine.etl.transform to the shipment data, then makes Analytics Service API calls through leaf_engine.io.api.caller.LeafAPICaller to load it into the Analytics DB.