# ETL Overview

## Data flow diagram
The overall ETL process is illustrated below. Broadly, a spreadsheet or CSV file is read from Google Drive (or a local drive) as a `pandas.DataFrame` and pushed through a series of pipelines to produce a `shipments.csv` file, which:

- is transformed and loaded into the Analytics DB;
- is written to a local cache or to Analytics (DEV) S3.
![ETL data flow](_images/graphviz-05a414621090b8d1dd35c004bee18d65a5911788.png)

Graphviz source for the diagram above:

```dot
digraph G {
    graph [fontsize=10 fontname="Verdana" compound=true];
    node [shape=record fontsize=10 fontname="Verdana" margin=0.3];
    data[label="External Shipper Data\n(spreadsheet(s) on Google Drive\nOR local CSV/Excel file)" shape=box, color=black];
    db[label="Analytics DB\n(via Analytics Service API)" shape=box, color=black];
    s3[label="Analytics S3 / Local cache\n(via Prefect)" shape=box, color=black];
    data -> read;
    load -> db;
    output -> s3;
    subgraph cluster_A {
        subgraph cluster_B {
            read -> map;
            graph[style=filled bgcolor=lightgray label="for each\nexternal dataset" fontsize="10pt" margin=20];
        }
        map -> concat;
        concat -> filter;
        filter -> validate;
        validate -> geocode;
        geocode -> pcmiler;
        pcmiler -> fuel;
        fuel -> flag;
        flag -> cluster;
        cluster -> uuid;
        uuid -> output;
        output -> load;
        graph[style=dotted];
        label="leaf_engine.etl";
    }
}
```
## Pipeline functions
Shipper data is pushed through the functions below, in sequence. Most functions take a `pandas.DataFrame` as input (each row representing a shipment record) and return the same `pandas.DataFrame` with additional columns.
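Because every step takes and returns a `pandas.DataFrame`, the pipelines compose naturally with `pandas.DataFrame.pipe`. A minimal sketch of the convention (the function and column names below are illustrative, not the actual `leaf_engine` pipelines):

```python
import pandas as pd

def add_total_spend(df: pd.DataFrame) -> pd.DataFrame:
    # Illustrative pipeline step: return the same frame with one extra column.
    return df.assign(total_spend=df["linehaul_spend"] + df["fuel_spend"])

def flag_heavy(df: pd.DataFrame, threshold: float = 40_000) -> pd.DataFrame:
    # Extra parameters are forwarded by .pipe().
    return df.assign(heavy=df["weight"] > threshold)

shipments = pd.DataFrame(
    {
        "weight": [12_000, 45_000],
        "linehaul_spend": [900.0, 1_500.0],
        "fuel_spend": [100.0, 250.0],
    }
)
shipments = shipments.pipe(add_total_spend).pipe(flag_heavy)
```

Chaining with `.pipe()` keeps each step independently testable while reading top-to-bottom like the diagram above.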
- Reads shipper data as specified in the run parameters with one of the following functions (depending on file type):
- Maps shipper data to the Leaf internal schema using the mapping file specified in the run parameters. Mapping files include custom code for (and push data through) the following functions:
  `_map_shipment_id`, `_map_period`, `_map_origin_details`, `_map_destination_details`, `_map_equipment_type`, `_map_equipment_class`, `_map_mode`, `_map_hazardous`, `_map_protection`, `_map_weight`, `_map_stops`, `_map_shipments`, `_map_scac`, `_map_move_type`, `_map_load_type`, `_map_exclude_shipment`, `_map_company_miles`, `_map_company_transit_time`, `_map_carrier_name`, `_map_carrier_type`, `_map_delayed`, `_map_spends`, `drop_duplicates`.
  Finally, `leaf_engine.etl.map._normalize_address_details()` applies address normalization as the last step in the map pipeline; it is not custom to the shipper.
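Each `_map_*` function follows the same pattern: derive one internal-schema column from the shipper's raw columns. A hypothetical sketch (the raw column name and cleaning logic are invented for illustration; the real mapping code is shipper-specific):

```python
import pandas as pd

def _map_weight(df: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical mapping step: take the shipper's raw weight column,
    # coerce it to numeric, and store it under the internal schema name.
    return df.assign(weight=pd.to_numeric(df["Gross Wt (lbs)"], errors="coerce"))

raw = pd.DataFrame({"Gross Wt (lbs)": ["12000", "n/a"]})
mapped = _map_weight(raw)
```

Unparseable values become `NaN` rather than raising, so a later validation step can flag them.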
- Concatenates the mapped datasets (if multiple datasets are used) and applies post-concatenation deduplication logic.
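Assuming each mapped dataset already shares the internal schema, the concatenation step amounts to something like the following (the deduplication key is illustrative; the real logic lives in the pipeline):

```python
import pandas as pd

ds_a = pd.DataFrame({"shipment_id": ["A1", "A2"], "weight": [100, 200]})
ds_b = pd.DataFrame({"shipment_id": ["A2", "B1"], "weight": [200, 300]})

# Stack the mapped datasets, then drop duplicate shipment keys,
# keeping the first occurrence.
combined = (
    pd.concat([ds_a, ds_b], ignore_index=True)
    .drop_duplicates(subset="shipment_id", keep="first")
    .reset_index(drop=True)
)
```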
- `leaf_engine.etl.validate.validate_pipeline()` applies `pandera` validation. See `leaf_engine.domain.columns` for column-specific checks.
- `leaf_engine.geocode_pipeline()` geocodes origin and destination details (city, country, state, zip). See `leaf_engine.etl.geocode.geocode._geocode_location()` for details.
- `leaf_engine.pcmiler_pipeline()` makes PCMiler calls to determine the distance (in miles and hours) and path geometry between origin and destination.
- Calculates fuel-specific columns; see `leaf_engine.domain.columns.LeafSpendColumn`.
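The actual fuel formulas are defined by `leaf_engine.domain.columns.LeafSpendColumn`; as a purely hypothetical sketch, a fuel column derived from the PCMiler distance might look like:

```python
import pandas as pd

def fuel_pipeline(df: pd.DataFrame, surcharge_per_mile: float = 0.5) -> pd.DataFrame:
    # Hypothetical calculation (invented for illustration): fuel spend as a
    # flat per-mile surcharge applied to the PCMiler distance.
    return df.assign(fuel_spend=df["miles"] * surcharge_per_mile)

df = fuel_pipeline(pd.DataFrame({"miles": [100.0, 400.0]}))
```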
- Flags anomalous weight and spend rows.
- `leaf_engine.cluster_pipeline()` applies clustering logic. See `leaf_engine.etl.cluster.dbscan_kmeans.DBSCAN_KMeans` for the algorithm and `leaf_engine.etl.cluster.cluster_schema` for the cluster schema.
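The production algorithm is the custom `DBSCAN_KMeans` hybrid; for intuition only, plain DBSCAN over location coordinates behaves like this (sketch using scikit-learn, not the `leaf_engine` implementation):

```python
import numpy as np
from sklearn.cluster import DBSCAN

# Two tight groups of points plus one far-away outlier (lat, lon pairs).
coords = np.array(
    [[30.0, -97.0], [30.01, -97.01], [41.0, -87.0], [41.02, -87.01], [10.0, 10.0]]
)

# DBSCAN labels dense groups 0, 1, ... and marks noise points as -1.
labels = DBSCAN(eps=0.1, min_samples=2).fit_predict(coords)
```

Density-based clustering needs no preset cluster count, which suits shipment origins that naturally concentrate around facilities.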
- Sets UUIDs for point locations, point-to-point (PTP) lanes, power lanes, and shipments.
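Stable IDs for entities like lanes can be sketched with `uuid.uuid5`, which hashes a name into a deterministic UUID; the namespace and key format below are illustrative, not `leaf_engine`'s actual scheme:

```python
import uuid

# Hypothetical fixed namespace: reusing it makes the derived IDs reproducible
# across ETL runs.
LANE_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "lanes.example")

def ptp_lane_uuid(origin: str, destination: str) -> uuid.UUID:
    # The same origin/destination pair always yields the same UUID;
    # the reverse direction yields a different one.
    return uuid.uuid5(LANE_NAMESPACE, f"{origin}->{destination}")

a = ptp_lane_uuid("Austin, TX", "Reno, NV")
b = ptp_lane_uuid("Austin, TX", "Reno, NV")
c = ptp_lane_uuid("Reno, NV", "Austin, TX")
```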
- `leaf_engine.etl.output.output_pipeline()` reorders columns and logs summary stats about the data. The output of this pipeline is written to `shipments.csv`, either locally or on S3 (via Prefect).
- `leaf_engine.etl.load.load_pipeline()` applies transformations from `leaf_engine.etl.transform` to the shipment data and makes Analytics Service API calls through `leaf_engine.io.api.caller.LeafAPICaller` to load it into the Analytics DB.
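The output step's reorder-and-write behavior can be sketched as follows; the column order and summary stat are illustrative stand-ins for the real pipeline:

```python
import io
import pandas as pd

# Illustrative canonical column order for shipments.csv.
COLUMN_ORDER = ["shipment_id", "origin_city", "destination_city", "weight"]

def output_pipeline(df: pd.DataFrame) -> pd.DataFrame:
    # Reorder to the canonical column order and log a simple summary stat.
    df = df[COLUMN_ORDER]
    print(f"{len(df)} shipments, total weight {df['weight'].sum():,.0f}")
    return df

df = pd.DataFrame(
    {
        "weight": [100, 200],
        "shipment_id": ["A", "B"],
        "origin_city": ["Austin", "Reno"],
        "destination_city": ["Reno", "Austin"],
    }
)
buf = io.StringIO()  # in production this would be shipments.csv, local or S3
output_pipeline(df).to_csv(buf, index=False)
```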