ETL Overview ------------- Data flow diagram ================= The overall ETL process is illustrated below. Broadly, a spreadsheet or CSV file is read from Google Drive (or local drive) as a :class:`pandas.DataFrame` and pushed through a series of pipelines to produce a `shipments.csv` file which: * is transformed and loaded into the Analytics DB; * is written to local cache or Analytics (DEV) S3. .. graphviz:: digraph G { graph [fontsize=10 fontname="Verdana" compound=true]; node [shape=record fontsize=10 fontname="Verdana" margin=0.3]; data[label="External Shipper Data\n(spreadsheet(s) on Google Drive \n OR local CSV/Excel file)" shape=box, color=black]; db[label="Analytics DB\n(via Analytics Service API)" shape=box, color=black]; s3[label="Analytics S3 / Local cache\n(via Prefect)" shape=box, color=black]; data -> read; load -> db; output -> s3; subgraph cluster_A { subgraph cluster_B { read -> map; graph[style=filled bgcolor=lightgray label="for each\nexternal dataset" fontsize="10pt" margin=20]; } map -> concat; concat -> filter; filter -> validate; validate -> geocode; geocode -> pcmiler; pcmiler -> fuel; fuel -> flag; flag -> cluster; cluster -> uuid; uuid -> output; output -> load; graph[style=dotted]; label="leaf_engine.etl"; } } | Pipeline functions ================== Shipper data is pushed through the functions below, in sequence. Most functions take as input a :class:`pandas.DataFrame` (each row representing a shipment record) and return the same :class:`pandas.DataFrame` with additional columns. #. :func:`leaf_engine.read_dataset` Reads shipper data as specified in run parameters with one the following functions (depending on file type): * :func:`leaf_engine.io.read_drive` * :func:`leaf_engine.io.read_csv` * :func:`leaf_engine.io.read_excel` #. :func:`leaf_engine.map_pipeline` Maps shipper data to Leaf internal schema using the mapping file specified in run parameters. Mapping files include custom code for (and push data through) the following functions: * :code:`_map_shipment_id` * :code:`_map_shipment_id` * :code:`_map_period` * :code:`_map_origin_details` * :code:`_map_destination_details` * :code:`_map_equipment_type` * :code:`_map_equipment_class` * :code:`_map_mode` * :code:`_map_hazardous` * :code:`_map_protection` * :code:`_map_weight` * :code:`_map_stops` * :code:`_map_shipments` * :code:`_map_scac` * :code:`_map_move_type` * :code:`_map_load_type` * :code:`_map_exclude_shipment` * :code:`_map_company_miles` * :code:`_map_company_transit_time` * :code:`_map_carrier_name` * :code:`_map_carrier_type` * :code:`_map_delayed` * :code:`_map_spends` * :code:`_drop_duplicates` * :func:`leaf_engine.etl.map._normalize_address_details` - Address normalization is applied as a last step in the map pipeline and is not custom to shipper. #. :func:`leaf_engine.concat_pipeline` Concatenates mapped datasets (if multiple datasets are used) and applies post-concatenation deduplication logic. #. :func:`leaf_engine.filter_pipeline` Filters data using run parameters: * :func:`leaf_engine.etl.filter.filter_date` * :func:`leaf_engine.etl.filter.filter_multi_stop` * :func:`leaf_engine.etl.filter.filter_mode` * :func:`leaf_engine.etl.filter.filter_states` * :func:`leaf_engine.etl.filter.filter_countries` #. :func:`leaf_engine.etl.validate.validate_pipeline` Applies :code:`pandera` validation. See :mod:`leaf_engine.domain.columns` for column-specific checks. #. :func:`leaf_engine.geocode_pipeline` Geocodes origin and destination details (city, country, state, zip). See :func:`leaf_engine.etl.geocode.geocode._geocode_location` for details. #. :func:`leaf_engine.pcmiler_pipeline` Makes PCMiler calls to determine distance (in miles and hours) and path geometry between origin and destination. #. :func:`leaf_engine.fuel_pipeline` Calculates fuel-specific columns: :class:`leaf_engine.domain.columns.LeafSpendColumn`. #. :func:`leaf_engine.flag_pipeline` Flags anomalous weight and spend rows. #. :func:`leaf_engine.cluster_pipeline` Applies clustering logic. See :class:`leaf_engine.etl.cluster.dbscan_kmeans.DBSCAN_KMeans` for algorithm, :mod:`leaf_engine.etl.cluster.cluster_schema` for cluster schema. #. :func:`leaf_engine.uuid_pipeline` Sets UUIDs for point locations, point-to-point (PTP) lanes, power lanes, and shipments. #. :func:`leaf_engine.etl.output.output_pipeline` Reorders columns and logs summary stats about data. The output of this pipeline is written to :code:`shipments.csv` either locally or on S3 (via Prefect). #. :func:`leaf_engine.etl.load.load_pipeline` Applies transformations from :mod:`leaf_engine.etl.transform` on shipment data and makes Analytics Service API calls through :class:`leaf_engine.io.api.caller.LeafAPICaller` to load shipment data into the Analytics DB.