leaf_engine.io.api.crud_caller

Classes

LeafCRUDCaller

Makes calls to the analytics CRUD API.

Module Contents

class leaf_engine.io.api.crud_caller.LeafCRUDCaller(endpoint: str | None = None, key: str | None = None, **kwargs)

Bases: leaf_engine.io.api.base_caller.LeafBaseAPICaller

Makes calls to the analytics CRUD API.

Parameters:
  • endpoint (Optional[str]) –

  • key (Optional[str]) –

_check_responses(responses: List[requests.Response]) None

Checks responses for errors (response.status_code >= 400).

Parameters:

responses (List[requests.Response]) – List of responses to check.

Raises:

LeafAPIException – Raises if any response is not ok and prints the status code and response text.

Return type:

None

call_rpc(function_name: str, **kwargs) requests.Response

Calls an RPC function.

Parameters:
  • function_name (str) – Name of the function in the database to call as an RPC.

  • kwargs (Mapping[str, Any], optional) – A dictionary of arguments to pass to the RPC function.

Raises:

LeafAPIException – _description_

Return type:

requests.Response

check_existing_ids(uuids: List[str], table_name: str, company_id: int, batch_date: str, batch_size: int = 1000, verbose: int = 0) List[bool]

Receives a list of UUIDs and returns a list of uuid’s that already exist in the database.

Parameters:
  • uuids (List[str]) –

  • table_name (str) –

  • company_id (int) –

  • batch_date (str) –

  • batch_size (int) –

  • verbose (int) –

Return type:

List[bool]

count(path: str, **kwargs) int

Counts the number of records in a table.

Parameters:
  • path (str) – Analytics table to count.

  • kwargs – Can be used to pass additional query parameters, for example company_id=”eq.17852”. See PostgREST documentation for details.

Raises:

LeafAPIException – Raises if count value is not returned.

Returns:

Number of records

Return type:

Optional[int]

delete(path: str, **kwargs) requests.Response
Parameters:

path (str) –

Return type:

requests.Response

delete_batch(company_id: int, batch_date: str) None

Deletes all records associated with a batch for a given company.

This includes:
  • raw_shipment

  • normalized ETL tables (location, lane, shipment)

  • explorer outputs

  • network moves

  • batch and datasets

Parameters:
  • company_id (int) – Company ID to match batch.

  • batch_date (str) – Batch date to match batch.

Return type:

None

delete_network_moves(company_id: int, **kwargs) requests.Response

Deletes network moves for company_id.

Parameters:
  • company_id (int) – Delete moves for company.

  • kwargs – Can be used to pass additional filters on delete.

Return type:

requests.Response

get(path: str, headers: Mapping[str, str] | None = None, **kwargs) requests.Response

Makes a GET request to CRUD API.

Parameters:
  • path (str) – Analytics table to GET.

  • headers (Optional[Mapping[str, str]], optional) – Additional headers to pass to API. See PostgREST documentation for details.

  • kwargs – Can be used to pass additional query parameters. See PostgREST documentation for details.

Returns:

Response object.

Return type:

requests.Response

get_batch(company_id: int, batch_date: str, **kwargs) dict | None

Gets batch parameters for a given company and batch date.

Parameters:
  • company_id (int) – Company ID to get batch for.

  • batch_date (str) – Batch date to get batch for.

Returns:

Batch parameters if batch exists, else None.

Return type:

Optional[dict]

Raises:

LeafAPIException – Raises if multiple batches are found.

get_cluster_schema(company_id: int, name: str, record_types: list[str], **kwargs) dict

Gets cluster schema for a given company and name.

Parameters:
  • company_id (int) – Company ID to get cluster schema for.

  • name (str) – Name to get cluster schema for.

  • record_types (list[str]) –

Returns:

Cluster schema.

Return type:

dict

get_company_fuel_surcharge_schedules(company_id: int) pandas.DataFrame

Queries DB for all company fuel surcharge schedules.

Parameters:

company_id (int) – Company ID to get applicable FSCs for.

Return type:

pandas.DataFrame

get_company_network_lanes(company_id: int) pandas.DataFrame

Gets network lanes for a given company. Network lanes are used to populate the lane graph. Check analogous materialized view in the analytics schema analytics.mv_company_network_lanes for more details.

Note: the latest available batch date is used to get company network lanes.

Parameters:

company_id (int) – ID of the company to get network lanes for. Use -1 to get network lanes for all companies.

Raises:

LeafAPIException – If no network lanes are found for the given company.

Returns:

DataFrame of network lanes. Follows the same schema as in notebooks/08 - network_lanes.ipynb.

Return type:

pd.DataFrame

get_corridors(**kwargs)

Gets all corridors.

get_execution_shipments(**kwargs)

Gets all execution shipments for building the execution graph.

get_lanes(company_id: int) pandas.DataFrame

Gets all lanes.

Returns:

DataFrame of lanes

Return type:

pd.DataFrame

Parameters:

company_id (int) –

get_lighthouse_lanes(company_ids: List[int]) pandas.DataFrame

Gets lanes for lighthouse shippers. Used for Internal Type 2 Runs. See the materialized view in the migrations repository analytics.mv_lighthouse_lanes for more details.

Note: the latest available batch date is used to get lanes.

Parameters:

company_ids (List[int]) – List of company IDs to get lanes for.

Returns:

DataFrame of lanes.

Return type:

pd.DataFrame

get_locations(location_ids: Iterable[str], **kwargs) pandas.DataFrame

Gets locations given a list of location ids.

Parameters:
  • location_ids (Iterable[str]) – A list of location ids to retrieve data for.

  • kwargs – Can be used to pass additional parameters to leaf_engine.io.api.crud_caller.get().

Returns:

DataFrame of locations

Return type:

pd.DataFrame

get_many(path: str, headers: Mapping[str, str] | None = None, batch_size: int = -1, n_jobs: int = -1, verbose: int = 0, **kwargs) List[requests.Response]

Makes parallel GET requests to CRUD API.

See :method:`leaf_engine.io.api.crud_caller.get` for details.

Parameters:
  • path (str) –

  • headers (Optional[Mapping[str, str]]) –

  • batch_size (int) –

  • n_jobs (int) –

  • verbose (int) –

Return type:

List[requests.Response]

get_network_moves(company_id: int, record_types: list[str], **kwargs) pandas.DataFrame

Gets network moves for a given company ID.

Parameters:
  • company_id (int) – Company ID to filter on.

  • record_types (list[str]) – List of record types to filter on.

Returns:

DataFrame of network moves.

Return type:

pd.DataFrame

get_point_locations(company_id: int, record_types: list[str], **kwargs) pandas.DataFrame

Gets all point locations.

Parameters:
  • company_id (int) – Company ID to filter on.

  • record_types (list[str]) – List of record types to filter on.

Returns:

DataFrame of point locations

Return type:

pd.DataFrame

get_raw_shipments(company_id: int, batch_date: str, **kwargs) pandas.DataFrame

Gets raw shipment records from the analytics.raw_shipments table.

Parameters:
  • company_id (int) – Company ID to get raw shipments for.

  • batch_date (str) – Batch date to get raw shipments for. Must be in YYYY-MM-DD format.

Returns:

DataFrame of raw shipments

Return type:

pd.DataFrame

normalize_raw_lanes(company_id: int, batch_date: str) None

Updates the location, lane, and powerlane tables.

Normalizes raw_lane table rows for a given company and batch date. The location table is updated with point and cluster locations. The lane table is updated with point-to-point and power lanes.

Parameters:
  • company_id (int) – Company ID to update lanes for.

  • batch_date (str) – Batch date to update lanes for.

Return type:

None

normalize_raw_shipments(company_id: int, batch_date: str) None

Updates the location, lane, and shipment tables.

Normalizes raw_shipment table rows for a given company and batch date. The location table is updated with point and cluster locations. The lane table is updated with point-to-point and power lanes. The shipment table is updated with shipments.

Parameters:
  • company_id (int) – Company ID to update shipments for.

  • batch_date (str) – Batch date to update shipments for.

Return type:

None

post(path: str, data: Mapping[str, Any] | List[Mapping[str, Any]] | List[Dict[str, Any]], headers: Mapping[str, str] | None = None, return_columns: List[str] | None = None, **kwargs) requests.Response

Makes a POST request to CRUD API.

Parameters:
  • path (str) – Analytics table to write to.

  • data (Union[Mapping[str, Any], List[Mapping[str, Any]]]) – Record or records to write.

  • headers (Optional[Mapping[str, str]], optional) – Additional headers to pass to API. See PostgREST documentation for details.

  • return_columns (Optional[List[str]], optional) – List of columns to return in response, defaults to [“id”]. See PostgREST documentation for details.

Returns:

Response object.

Return type:

requests.Response

post_batch(batch_params: dict[str, Any]) requests.Response

Creates a batch record in the database from a dictionary.

Also creates associated records in the dataset, batch_dataset, and batch_equipment_class tables.

Parameters:

batch_params (Mapping[str, Any]) – Batch parameters to insert.

Return type:

requests.Response

post_carrier_churn(df: pandas.DataFrame, batch_size: int = 100, **kwargs) List[requests.Response]

Inserts carrier churn DataFrame into carrier_churn table.

Parameters:
Return type:

List[requests.Response]

post_cluster_schema(schema: dict, batch_date: str) dict

Creates a cluster schema record in the database from a dictionary.

Parameters:
  • schema (Mapping[str, Any]) – Cluster schema parameters to insert.

  • batch_date (str) –

Returns:

Cluster schema.

Return type:

dict

post_consolidated_flex(df: pandas.DataFrame, batch_size: int = 100, **kwargs) List[requests.Response]

Inserts consolidated flex DataFrame into consolidated_flex table.

Parameters:
Return type:

List[requests.Response]

post_lane_adapt_detail(records: List[Mapping[str, Any]], batch_size: int = 100, **kwargs)

Adds benefit, actions, observations to analytics.lane_adapt.

The analytics.lane_adapt table is used to populate the lane explorers. Updates existing records in place.

Parameters:
  • records (List[Mapping[str, Any]]) –

  • batch_size (int) –

post_lane_quick_ref(df: pandas.DataFrame, batch_size: int = 100, **kwargs) List[requests.Response]

Inserts lane quick reference DataFrame into lane_quick_ref table.

Parameters:
Return type:

List[requests.Response]

post_many(path: str, records: List[Mapping[str, Any]], headers: Mapping[str, str] | None = None, return_columns: List[str] | None = None, batch_size: int = -1, n_jobs: int = -1, verbose: int = 0, **kwargs) List[requests.Response]

Makes a parallel POST requests to CRUD API.

See :method:`leaf_engine.io.api.crud_caller.post` for details.

Parameters:
  • path (str) –

  • records (List[Mapping[str, Any]]) –

  • headers (Optional[Mapping[str, str]]) –

  • return_columns (Optional[List[str]]) –

  • batch_size (int) –

  • n_jobs (int) –

  • verbose (int) –

Return type:

List[requests.Response]

post_network_moves(df: pandas.DataFrame, batch_size: int = 100, **kwargs) List[requests.Response]

Inserts network moves DataFrame into network_move table.

Parameters:
  • df (pd.DataFrame) – Network moves pd.DataFrame with company_id and batch_date columns.

  • batch_size (int, optional) – Inserts per call. Defaults to 1000.

  • kwargs – Can be used to pass additional parameters to leaf_engine.io.api.crud_caller.post_many().

Returns:

List of responses.

Return type:

List[requests.Response]

post_observations_patterns(df: pandas.DataFrame, batch_size: int = 100, **kwargs) List[requests.Response]

Inserts observation patterns DataFrame into observation_pattern table.

Parameters:
Return type:

List[requests.Response]

post_raw_lanes(records: List[Mapping[str, Any]], batch_size: int = 100, **kwargs) List[requests.Response]

Inserts raw lanes into the raw_lane table.

Parameters:
  • records (List[Mapping[str, Any]]) – List of records (dicts) of raw lanes to insert.

  • batch_size (int) – Number of records to insert in each call.

Return type:

List[requests.Response]

post_raw_shipments(records: List[Mapping[str, Any]], batch_size: int = 100, **kwargs) List[requests.Response]

Inserts raw shipments into the raw_shipment table.

Parameters:
  • records (List[Mapping[str, Any]]) – List of records (dicts) of raw shipments to insert.

  • batch_size (int) – Number of records to insert in each call.

Return type:

List[requests.Response]

post_should_be_flex(df: pandas.DataFrame, batch_size: int = 100, **kwargs) List[requests.Response]

Inserts should be flex DataFrame into should_be_flex table.

Parameters:
Return type:

List[requests.Response]

refresh_mvs()

Calls the refresh_company_network_lanes and refresh_lighthouse_lanes functions to refresh the materialized views.

update_routing_tables(company_id: int, batch_date: str) None

Updates routing tables for a given company and batch date.

Parameters:
  • company_id (int) – Company ID to update routing for.

  • batch_date (str) – Batch date to update routing for.

Return type:

None

endpoint
key