# Cleaning the raw Data

The purpose of this notebook is to take the original PostgreSQL database and clean the data in it. The output is a set of tables in a new PostgreSQL schema that hold the cleaned data.

Cleaning occurs at several levels:
- textual data is unified (e.g., spelling, accents, ...)
- duplicate rows/records are merged together
- numeric columns are checked for plausibility
- foreign key relationships are strictly enforced

The structure of the data can be viewed at the [ORM layer](https://github.com/webartifex/urban-meal-delivery/tree/develop/src/urban_meal_delivery/db) in the package.

In [1]:
!umd --version

[32murban-meal-delivery[0m, version [34m0.2.0[0m


### Imports

In [2]:
from urban_meal_delivery import config, db

In [3]:
import collections
import datetime
import hashlib

import pandas as pd
import pytz as tz
import numpy as np
import sqlalchemy as sa

### Settings & Globals

In [4]:
%load_ext lab_black

In [5]:
pd.set_option("display.max_columns", 999)
pd.set_option("display.max_rows", 999)

In [6]:
_engine = db.make_engine()
connection = _engine.connect()

### New Database Schema

As a result of this notebook, a new PostgreSQL schema called `"clean"` is created holding the tables with the cleaned data.

In [7]:
config.CLEAN_SCHEMA

'clean'

All tables with the original data are stored in the default PostgreSQL schema called `"public"`.

In [8]:
config.ORIGINAL_SCHEMA

'public'

Use `alembic` to run the very first database migration script that creates the new tables.

In [9]:
%cd -q ..
!alembic upgrade f11cd76d2f45
%cd -q notebooks

INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO  [alembic.runtime.migration] Will assume transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> f11cd76d2f45, Create the database from scratch.


## Cities

While the original database consists of data concerning the UDP's operations in five cities in France, we only look at `"Bordeaux"`, `"Lyon"`, and `"Paris"` in this research project, as the amount of data for `"Lille"` and `"Nantes"` is simply not a lot due to the very short time horizons the UDP had been operating there.

### Raw Data

The following `target_cities` data were manually obtained from Google Maps and mapped to the `"database_id"`s of the cities in the original database where the UDP was operating in.

In [10]:
target_cities = {
    "Bordeaux": {
        "database_id": 4,
        "google_maps_data": {
            "center_latitude": 44.837789,
            "center_longitude": -0.57918,
            "northeast_latitude": 44.91670389999999,
            "northeast_longitude": -0.5333089999999999,
            "southwest_latitude": 44.810752,
            "southwest_longitude": -0.638973,
            "initial_zoom": 13,
        },
    },
    "Lyon": {
        "database_id": 1,
        "google_maps_data": {
            "center_latitude": 45.764043,
            "center_longitude": 4.835659,
            "northeast_latitude": 45.808425,
            "northeast_longitude": 4.898393,
            "southwest_latitude": 45.707486,
            "southwest_longitude": 4.7718489,
            "initial_zoom": 13,
        },
    },
    "Paris": {
        "database_id": 2,
        "google_maps_data": {
            "center_latitude": 48.856614,
            "center_longitude": 2.3522219,
            "northeast_latitude": 48.9021449,
            "northeast_longitude": 2.4699208,
            "southwest_latitude": 48.815573,
            "southwest_longitude": 2.225193,
            "initial_zoom": 12,
        },
    },
}

In [11]:
city_ids = tuple(city["database_id"] for city in target_cities.values())

`cities` below holds the cleaned city related data from the original database. They come with KML data (i.e., area) associated with a city, which is kept.

In [12]:
cities = pd.read_sql_query(
    f"""
    SELECT
        cities.id,
        cities.name,
        geo_areas.kml
    FROM
        {config.ORIGINAL_SCHEMA}.cities
    LEFT OUTER JOIN
        {config.ORIGINAL_SCHEMA}.geo_areas ON cities.geo_area_id = geo_areas.id
    WHERE
        cities.id IN %(city_ids)s
    ORDER BY
        cities.id
    """,
    con=connection,
    index_col="id",
    params={"city_ids": city_ids},
)

Merge in the data from Google Maps.

In [13]:
for city in target_cities.values():
    for col, val in city["google_maps_data"].items():
        cities.loc[city["database_id"], col] = val

Cast the columns' types explicitly.

In [14]:
cities = cities.astype(
    {
        "name": "string",
        "kml": "string",
        "center_latitude": float,
        "center_longitude": float,
        "northeast_latitude": float,
        "northeast_longitude": float,
        "southwest_latitude": float,
        "southwest_longitude": float,
        "initial_zoom": int,
    }
)

### Clean Data

In [15]:
cities.head()

Unnamed: 0_level_0,name,kml,center_latitude,center_longitude,northeast_latitude,northeast_longitude,southwest_latitude,southwest_longitude,initial_zoom
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
1,Lyon,<?xml version='1.0' encoding='UTF-8'?><kml xml...,45.764043,4.835659,45.808425,4.898393,45.707486,4.771849,13
2,Paris,<?xml version='1.0' encoding='UTF-8'?><kml xml...,48.856614,2.352222,48.902145,2.469921,48.815573,2.225193,12
4,Bordeaux,<?xml version='1.0' encoding='UTF-8'?><kml xml...,44.837789,-0.57918,44.916704,-0.533309,44.810752,-0.638973,13


In [16]:
cities.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 3 entries, 1 to 4
Data columns (total 9 columns):
 #   Column               Non-Null Count  Dtype  
---  ------               --------------  -----  
 0   name                 3 non-null      string 
 1   kml                  3 non-null      string 
 2   center_latitude      3 non-null      float64
 3   center_longitude     3 non-null      float64
 4   northeast_latitude   3 non-null      float64
 5   northeast_longitude  3 non-null      float64
 6   southwest_latitude   3 non-null      float64
 7   southwest_longitude  3 non-null      float64
 8   initial_zoom         3 non-null      int64  
dtypes: float64(6), int64(1), string(2)
memory usage: 320.0 bytes


As this notebook was developed iteratively, we validate that the cleaned data stays unchanged using SHA256 checksums of the cleaned DataFrames and other `assert`s.

In [17]:
assert (
    hashlib.sha256(cities.to_json().encode()).hexdigest()
    == "800689a6ba5b6d03f583f258e058eca0b12e6df8e34c98bfe7aec246ed688c92"
)

## Addresses

### Raw Data

Only load addresses with orders in the target cities, excluding the cut-off day.

In [18]:
addresses = pd.read_sql_query(
    f"""
    SELECT
        id,
        created_at,
        place_id,
        latitude,
        longitude,
        city_id,
        city_name AS city,
        zip,
        street_address AS street,
        floor,
        special_instructions
    FROM
        {config.ORIGINAL_SCHEMA}.addresses
    WHERE
        city_id IN %(city_ids)s
        AND
        id IN (
            SELECT DISTINCT address_id
            FROM (
                SELECT DISTINCT
                    pickup_address_id AS address_id
                FROM
                    {config.ORIGINAL_SCHEMA}.orders
                WHERE
                    created_at < '{config.CUTOFF_DAY}'
                UNION
                SELECT DISTINCT
                    dropoff_address_id AS address_id
                FROM
                    {config.ORIGINAL_SCHEMA}.orders
                WHERE
                    created_at < '{config.CUTOFF_DAY}'
            ) AS orders
        )
    ORDER BY
        id
    """,
    con=connection,
    index_col="id",
    params={"city_ids": city_ids},
    parse_dates=["created_at"],
)

All columns are `"strings"`, even `zip`.

In [19]:
addresses = addresses.astype(
    {
        "place_id": "string",
        "city": "string",
        "zip": "string",
        "street": "string",
        "floor": "string",
        "special_instructions": "string",
    }
)

In [20]:
addresses.head()

Unnamed: 0_level_0,created_at,place_id,latitude,longitude,city_id,city,zip,street,floor,special_instructions
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
2,2016-02-22 09:42:10.204171,ChIJSfxJmlXq9EcRX2ChkiPW9J8,45.763149,4.83266,1,Lyon,69002,31 rue Mercière,,
3,2016-02-22 09:42:10.351150,ChIJwwMvNPnq9EcRY7Qu-Tw2HL8,45.767227,4.83575,1,Lyon,69001,Rue de la République 2,,
4,2016-02-22 09:43:57.077475,ChIJr1RhGN7B9EcRv6XSHmmN6a8,45.743725,4.873138,1,Lyon,69008,123 avenue des frères lumières,,
5,2016-02-22 09:43:57.220446,ChIJEwQm9H7q9EcRTymMxQ71z84,45.759369,4.864087,1,Lyon,69003,Avenue Georges Pompidou 17,,Appel au 06 46 12 20 27
6,2016-02-22 10:06:08.762590,ChIJbQz7p6vr9EcRr9L2cH5942I,45.761181,4.826371,1,Lyon,69005,8 bis Place Saint Jean,,


In [21]:
addresses.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 663082 entries, 2 to 691914
Data columns (total 10 columns):
 #   Column                Non-Null Count   Dtype         
---  ------                --------------   -----         
 0   created_at            663082 non-null  datetime64[ns]
 1   place_id              662953 non-null  string        
 2   latitude              663082 non-null  float64       
 3   longitude             663082 non-null  float64       
 4   city_id               663082 non-null  int64         
 5   city                  663082 non-null  string        
 6   zip                   663082 non-null  string        
 7   street                663082 non-null  string        
 8   floor                 337703 non-null  string        
 9   special_instructions  585310 non-null  string        
dtypes: datetime64[ns](1), float64(2), int64(1), string(6)
memory usage: 55.6 MB


In [22]:
assert len(addresses) == 663_082

### Adjust Time Zones

Create a helper function that strips out the microseconds from datetime columns and converts their time zones from UTC to Europe/Paris.

In [23]:
def clean_datetime(col):
    """Strip Microseconds and convert timezone to Europe/Paris."""
    return (
        col.dt.tz_localize(tz.utc)
        .dt.tz_convert(tz.timezone("Europe/Paris"))
        .dt.tz_localize(None)
        .map(
            lambda x: datetime.datetime(
                x.year, x.month, x.day, x.hour, x.minute, x.second
            )
            if x is not pd.NaT
            else x
        )
    )

In [24]:
addresses["created_at"] = clean_datetime(addresses["created_at"])

### Clean Place IDs

A tiny number of addresses has `latitude` / `longitude` pairs as `place_id`s.

In [25]:
addresses["place_id"] = (
    addresses["place_id"].str.replace(r"^[\d\.,-]+$", "", regex=True).str.strip()
)

Discard addresses without a `place_id` by Google Maps. If even Google does not know where these addresses are geo-located, we do not even try.

In [26]:
msk = addresses["place_id"].isnull() | (addresses["place_id"] == "")
addresses = addresses[~msk]

assert msk.sum() == 139

### Clean City Names

Some customers entered too much data into the `city` part of the address. Unify this column by only keeping the city's name.

In [27]:
addresses["city"].unique()

<StringArray>
[                     'Lyon',   'Lyon 2 E Arrondissement',
              'Villeurbanne',   'Lyon 7 E Arrondissement',
   'Lyon 6 E Arrondissement',   'Lyon 8 E Arrondissement',
   'Lyon 9 E Arrondissement',                     'Paris',
          'Levallois-Perret',                'Courbevoie',
                   'Puteaux',         'Neuilly Sur Seine',
      'Boulogne Billancourt',          'Levallois Perret',
                  'Malakoff',               'Saint-Mandé',
 'Paris 14 E Arrondissement',                 'Montrouge',
       'Issy-les-moulineaux', 'Paris 10 E Arrondissement',
       'Issy Les Moulineaux',                  'Nanterre',
 'Paris 12 E Arrondissement',       'La Garenne-colombes',
                 'Vincennes',                 'Montreuil',
        'Asnières-sur-Seine', 'Paris 15 E Arrondissement',
                'Saint-Ouen',  'Paris 9 E Arrondissement',
                    'Pantin', 'Paris 17 E Arrondissement',
 'Paris 1 Er Arrondissement',             

In [28]:
addresses["city"] = (
    addresses["city"]
    .str.replace(r"(E Arrondissement|Er Arrondissement)", "", regex=True)
    .str.replace(r"(\d)", "", regex=True)
    # Get rid off accents.
    .str.normalize("NFKD")
    .str.encode("ascii", errors="ignore")
    .str.decode("utf8")
    .astype("string")
    # Unify hyphens.
    .str.strip()
    .str.replace("-", " ")
    .str.title()
    .str.replace(" ", "-")
)

Sub-urban city names surrounding the three big cities in this research project are kept.

In [29]:
addresses["city"].value_counts()

Paris                   362252
Lyon                    199872
Bordeaux                 64379
Villeurbanne             15555
Levallois-Perret          4620
Courbevoie                3510
Puteaux                   2323
Issy-Les-Moulineaux       1219
Malakoff                  1214
Montrouge                 1164
Pantin                    1119
Saint-Mande               1043
Boulogne-Billancourt       986
Montreuil                  909
Neuilly-Sur-Seine          835
Vincennes                  826
Ivry-Sur-Seine             438
Nanterre                   361
La-Garenne-Colombes        213
Asnieres-Sur-Seine          62
Saint-Ouen                  35
Saint-Denis                  4
Testas                       3
Talence                      1
Name: city, dtype: Int64

### Clean Zip Codes

In [30]:
addresses["zip"].unique()

<StringArray>
[      '69002',       '69001',       '69008',       '69003',       '69005',
       '69007',       '69004',       '69009',       '69006',       '69100',
 ...
       '92053',       '92806',      '69003 ', '75015 PARIS',          '69',
       '33077',       '33092',       '33139',      '75010z',       '92040']
Length: 167, dtype: string

In [31]:
addresses["zip"] = (
    addresses["zip"]
    .str.replace(r".*(\d{5}).*", r"\1", regex=True)
    .str.replace(r"\D+", "", regex=True)
    .replace("", "NaN")
    .astype(float)
)

Zip codes with less than 5 digits are invalid. Paris has zip codes with 75xxx (with 92xxx, 93xxx, and 94xxx being suburbs), Lyon 69xxx, and Bordeaux 33xxx (cf., [source](https://en.wikipedia.org/wiki/Postal_codes_in_France)). Keep only valid zip codes in target cities.

In [32]:
invalid = addresses["zip"].notnull() & (addresses["zip"] < 10000)

assert invalid.sum() == 9

not_in_target_cities = (
    addresses["zip"].notnull()
    & ~invalid
    & ~(
        (33000 <= addresses["zip"]) & (addresses["zip"] < 34000)
        | (69000 <= addresses["zip"]) & (addresses["zip"] < 70000)
        | (75000 <= addresses["zip"]) & (addresses["zip"] < 76000)
        | (92000 <= addresses["zip"]) & (addresses["zip"] < 95000)
    )
)

assert not_in_target_cities.sum() == 10

addresses.loc[invalid | not_in_target_cities, "zip"] = np.NaN

In [33]:
addresses["zip"].unique()

array([69002., 69001., 69008., 69003., 69005., 69007., 69004., 69009.,
       69006., 69100., 69200., 69300., 69350.,    nan, 75011., 69370.,
       75009., 75010., 75001., 75008., 75017., 75003., 75002., 75018.,
       92300., 75007., 75012., 75006., 75116., 75015., 92400., 92800.,
       92200., 92100., 75020., 75019., 75004., 75013., 75016., 75005.,
       75014., 92130., 92092., 92932., 92000., 92150., 92270., 92190.,
       92110., 92170., 93400., 92600., 94250., 69160., 92120., 92250.,
       92078., 92310., 92240., 75543., 92064., 92210., 75270., 75000.,
       92057., 93100., 92140., 92081., 75741., 69326., 69130., 69500.,
       94160., 94300., 93170., 94110., 94220., 93260., 69000., 93500.,
       75045., 94120., 94200., 92220., 93000., 92974., 92063., 69429.,
       33000., 33800., 33200., 33400., 33300., 33130., 33110., 33100.,
       33270., 33520., 92935., 33067., 33150., 93507., 33700., 69628.,
       33080., 33076., 92700., 93310., 92042., 92058., 92930., 69120.,
      

Discard addresses with missing zip codes because they are hard to geo-code.

In [34]:
msk = addresses["zip"].isnull()
addresses = addresses[~msk]

assert msk.sum() == 21

In [35]:
addresses = addresses.astype({"zip": int})

### Clean Street Names

Remove extra whitespace, HTML encodings, and accents.

In [36]:
addresses["street"] = (
    addresses["street"]
    .str.replace("\s+", " ", regex=True)
    .str.replace("&#39;", "'")
    # Get rid off accents.
    .str.normalize("NFKD")
    .str.encode("ascii", errors="ignore")
    .str.decode("utf8")
    .astype("string")
    .str.strip()
    .str.title()
)

There are no addresses without a `street` name.

In [37]:
assert not addresses["street"].isnull().any()

### Parse Floor Numbers

Make `floor` an integer column.

In [38]:
addresses["floor"].unique()

<StringArray>
[                                 <NA>,                                    '',
                                   '3',                                  '19',
                          '2ème étage',                                   '1',
                                 '1er',            '2eme etage face escalier',
                                   '2',                        '2 eme droite',
 ...
 ' Premiere Bati  interphone 16  2eme',                 'Le Cargo - 4e étage',
                       'Rdc appart 11',             'Etage 3, appartement 11',
                    '5e étage gauche ',          '2eme etage a gauche bat b ',
              '4ème étage, chambre 41',                          'Cinquiéme ',
       'Montez jusqu&#39;à la grille ',                '5ème étage à droite ']
Length: 11990, dtype: string

Parse out floors from the `floor` text column.

In [39]:
addresses["floor"] = (
    addresses["floor"]
    # Get rid of accents and lower case everything.
    .str.normalize("NFKD")
    .str.encode("ascii", errors="ignore")
    .str.decode("utf8")
    .astype("string")
    .str.casefold()
    # Replace common text that messes up the matching.
    .str.replace(".", "")
    .str.replace(":", "")
    .str.replace(";", "")
    .str.replace("&#39;", "'")
    .str.replace("36b25", "")
    .str.replace("n°", "")
    .str.replace("#", "")
    .str.replace("face l'assanceur", "")
    .str.replace("\(drt\)", "")
    .str.replace("floor", "")
    .str.replace("et demi", "")
    .str.replace("et droite", "")
    .str.replace("droite", "")
    .str.replace("droit", "")
    .str.replace("a gauche", "")
    .str.replace("e gauche", "")
    .str.replace("gauche", "")
    .str.replace("entrez", "")
    .str.replace("serez", "")
    .str.replace("dussol", "")
    .str.replace("soler", "")
    .str.replace("sonner", "")
    .str.replace("code", "")
    .str.replace("perez", "")
    .str.replace("-", "")
    .str.replace("\s+", " ", regex=True)
    .str.strip()
    # Abbreviations.
    .str.replace(
        r"^.*?((\d+)\s?(er|ere|em|eme|ele|ieme|bis|(e|g|st|nd|rd|th|z)($|,|\s+))).*",
        r"\2",
        regex=True,
    )
    # French written out.
    .str.replace(r".*(rdc|rez|sol|ground).*", "0", regex=True)
    .str.replace(r".*(premiere|premier).*", "1", regex=True)
    .str.replace(r".*(deuxieme).*", "2", regex=True)
    .str.replace(r".*(troisieme).*", "3", regex=True)
    .str.replace(r".*(quatrieme).*", "4", regex=True)
    .str.replace(r".*(cinquieme).*", "5", regex=True)
    .str.replace(r".*(sixieme).*", "6", regex=True)
    .str.replace(r".*(septieme).*", "7", regex=True)
    .str.replace(r".*(huitieme).*", "8", regex=True)
    .str.replace(r".*(neuvieme).*", "9", regex=True)
    .str.replace(r".*(dixieme).*", "10", regex=True)
    .str.replace(r"^.*?((etage|etg) (\d+))($|\D+.*)", r"\3", regex=True)
    .str.replace(r"^.*?((\d+)(etage| etage|etg| etg)).*", r"\2", regex=True)
    # Remove apartment info to not confuse it with floor
    .str.replace(
        r"(.*)(ap|apt|app|appt|appart|appartment|appartement|chambre|room)\s*\w?\d+(.*)",
        r"\1 \3",
        regex=True,
    )
    .str.replace(r"(.*)(code|digicode)\s*\w?\d+(.*)", r"\1 \3", regex=True)
    # Take number at start.
    .str.replace(r"^(\d+)(,|\s+).*", r"\1", regex=True)
    # Ignore anything with non-numeric symbols entirely.
    .str.replace(r".*\D+.*", "", regex=True)
    .str.replace("^$", "NaN")
    .fillna("NaN")
    .astype(float)
)

If the `floor` column is empty, parse out floor info from the `special_instructions` column that must have been used before the `floor` column was introduced (slightly different parsing logic than above).

In [40]:
addresses["special_instructions"] = (
    addresses["special_instructions"]
    # Get rid of accents and lower case everything.
    .str.normalize("NFKD")
    .str.encode("ascii", errors="ignore")
    .str.decode("utf8")
    .astype("string")
    .str.casefold()
    # Replace common text that messes up the matching.
    .str.replace(".", "")
    .str.replace(":", "")
    .str.replace(";", "")
    .str.replace("&#39;", "'")
    .str.replace("36b25", "")
    .str.replace("n°", "")
    .str.replace("#", "")
    .str.replace("face l'assanceur", "")
    .str.replace("\(drt\)", "")
    .str.replace("floor", "")
    .str.replace("et demi", "")
    .str.replace("et droite", "")
    .str.replace("droite", "")
    .str.replace("droit", "")
    .str.replace("a gauche", "")
    .str.replace("e gauche", "")
    .str.replace("gauche", "")
    .str.replace("entrez", "")
    .str.replace("serez", "")
    .str.replace("dussol", "")
    .str.replace("soler", "")
    .str.replace("sonner", "")
    .str.replace("code", "")
    .str.replace("perez", "")
    .str.replace("-", "")
    .str.replace("\s+", " ", regex=True)
    .str.strip()
    # Abbreviations.
    .str.replace(
        r"^.*?((\d+)\s?(er|ere|em|eme|ele|ieme|bis|(e|g|st|nd|rd|th|z)($|,|\s+))).*",
        r"\2",
        regex=True,
    )
    # French written out.
    .str.replace(r".*(rdc|rez|sol|ground).*", "0", regex=True)
    .str.replace(r".*(premiere|premier).*", "1", regex=True)
    .str.replace(r".*(deuxieme).*", "2", regex=True)
    .str.replace(r".*(troisieme).*", "3", regex=True)
    .str.replace(r".*(quatrieme).*", "4", regex=True)
    .str.replace(r".*(cinquieme).*", "5", regex=True)
    .str.replace(r".*(sixieme).*", "6", regex=True)
    .str.replace(r".*(septieme).*", "7", regex=True)
    .str.replace(r".*(huitieme).*", "8", regex=True)
    .str.replace(r".*(neuvieme).*", "9", regex=True)
    .str.replace(r".*(dixieme).*", "10", regex=True)
    .str.replace(r"^.*?((etage|etg) (\d+))($|\D+.*)", r"\3", regex=True)
    .str.replace(r"^.*?((\d+)(etage| etage|etg| etg)).*", r"\2", regex=True)
    # Remove apartment info to not confuse it with floor.
    .str.replace(
        r"(.*)(ap|apt|app|appt|appart|appartment|appartement|chambre|room)\s*\w?\d+(.*)",
        r"\1 \3",
        regex=True,
    )
    .str.replace(r"(.*)(code|digicode)\s*\w?\d+(.*)", r"\1 \3", regex=True)
    # Ignore anything with non-numeric symbols entirely.
    .str.replace(r".*\D+.*", "", regex=True)
    .str.replace("^$", "NaN")
    .fillna("NaN")
    .astype(float)
)

Fill in `floor` from `special_instructions` and cast the type.

In [41]:
msk = addresses["floor"].isnull() & addresses["special_instructions"].notnull()
addresses.loc[msk, "floor"] = addresses.loc[msk, "special_instructions"].values

In [42]:
del addresses["special_instructions"]

In [43]:
addresses = addresses.astype({"floor": "Int64"})

Only keep the realisic numbers.

In [44]:
addresses.loc[addresses["floor"].notnull() & (addresses["floor"] > 40), "floor"] = pd.NA

Most addresses have no floor number given.

In [45]:
assert len(addresses.loc[addresses["floor"].isnull(), "floor"]) == 307_973

Most `floor`s are near the ground floor, which is plausible.

In [46]:
addresses["floor"].value_counts().sort_index()

0     20977
1     69765
2     62439
3     59011
4     52226
5     42930
6     28396
7      8853
8      3349
9      1631
10     1384
11      764
12      657
13      448
14      322
15      206
16      202
17      109
18      135
19      105
20       77
21       51
22       99
23       39
24       85
25       76
26       56
27       30
28       49
29       51
30       33
31       55
32       22
33       11
34       37
35       55
36       19
37        8
38       32
39      136
40       19
Name: floor, dtype: Int64

### Deduplicate

The number of addresses (ca. 663,000) is inflated, probably due to some sort of automated re-entering.

In [47]:
assert len(addresses) == 662_922

First, merge all addresses with the same `place_id`, `latitude` / `longitude`, `city`, `zip`, `street`, *and* `floor` into one entry, namely its first occurrence.

In [48]:
addresses["floor"] = addresses["floor"].fillna(999)  # dummy -> No grouping with NaN's

In [49]:
by = ["place_id", "latitude", "longitude", "city_id", "city", "zip", "street", "floor"]

addresses = (
    addresses.reset_index()
    .set_index(by)
    .merge(
        (
            addresses.reset_index()
            .groupby(by)[["id"]]
            .min()
            .rename(columns={"id": "merged_on_id"})
        ),
        left_index=True,
        right_index=True,
    )
    .reset_index()
    .astype({"place_id": "string", "city": "string", "street": "string"})
)

Keep a dictionary `address_merger` to map the ID's that are merged away to the ones that are kept.

In [50]:
address_merger = collections.defaultdict(lambda: np.NaN)
address_merger.update(
    {
        id_: merged_on_id
        for _, id_, merged_on_id in addresses[["id", "merged_on_id"]].itertuples()
    }
)

In [51]:
addresses = (
    addresses[addresses["id"] == addresses["merged_on_id"]]
    .set_index("id")
    .sort_index()[
        [
            "created_at",
            "place_id",
            "latitude",
            "longitude",
            "city_id",
            "city",
            "zip",
            "street",
            "floor",
        ]
    ]
)

In [52]:
addresses["floor"] = addresses["floor"].replace(999, pd.NA).astype("Int64")

Only about 178,000 addresses remain!

In [53]:
assert len(addresses) == 178_101

Second, many addresses are still redundant as they are referring to *different* `floor`s in the *same* house or their `street` name is written differently. 

We create a `primary_id` column that holds the ID of the first occurrence of an address independent of the exact spelling of the `street` name and the `floor` number.

That column is created via grouping the remaining addresses twice, once with their GPS location, and second by a simplified version of `street`. The latter accounts for slightly different `latitude` / `longitude` pairs of the same location, potentially due to an update in the Google Maps database.

In [54]:
by = ["place_id", "latitude", "longitude"]

addresses = (
    addresses.reset_index()
    .set_index(by)
    .merge(
        (
            addresses.reset_index()
            .groupby(by)[["id"]]
            .min()
            .rename(columns={"id": "unified1_id"})
        ),
        left_index=True,
        right_index=True,
    )
    .reset_index()
    .set_index("id")
    .sort_index()
    .astype({"place_id": "string"})[
        [
            "unified1_id",
            "created_at",
            "place_id",
            "latitude",
            "longitude",
            "city_id",
            "city",
            "zip",
            "street",
            "floor",
        ]
    ]
)

In [55]:
addresses["street_simple"] = (
    addresses["street"]
    .str.replace("Avenue", "Ave")
    .str.replace("Place", "Pl")
    .str.replace(".", "")
    .str.replace("-", "")
    .str.replace(" ", "")
    .str.lower()
)

In [56]:
by = ["city_id", "street_simple"]

addresses = (
    addresses.reset_index()
    .set_index(by)
    .merge(
        (
            addresses.reset_index()
            .groupby(by)[["id"]]
            .min()
            .rename(columns={"id": "unified2_id"})
        ),
        left_index=True,
        right_index=True,
    )
    .reset_index()
    .set_index("id")
    .sort_index()[
        [
            "unified1_id",
            "unified2_id",
            "created_at",
            "place_id",
            "latitude",
            "longitude",
            "city_id",
            "city",
            "zip",
            "street",
            "floor",
        ]
    ]
)

So, an address may be a duplicate of *two* different earlier addresses and we choose the earliest one.

In [57]:
addresses["primary_id"] = addresses[["unified1_id", "unified2_id"]].min(axis=1)

del addresses["unified1_id"]
del addresses["unified2_id"]

addresses = addresses[
    [
        "primary_id",
        "created_at",
        "place_id",
        "latitude",
        "longitude",
        "city_id",
        "city",
        "zip",
        "street",
        "floor",
    ]
]

A tricky issue is that an address could be identified as a duplicate of an earlier one that itself is a duplicate of an even earlier one. The following loop does the trick and maps each address to its earlierst version.

In [58]:
_address_unifier = {
    id_: unified_id
    for _, id_, unified_id in addresses.reset_index()[["id", "primary_id"]].itertuples()
}

while True:
    if (addresses["primary_id"] != addresses["primary_id"].map(_address_unifier)).any():
        addresses["primary_id"] = addresses["primary_id"].map(_address_unifier)
    else:
        break

Only about 87,000 of the remaining 178,000 addresses are unique locations disregarding `floor`s and different spellings of the `street` name.

In [59]:
_addresses = addresses.reset_index()
msk = _addresses["id"] == _addresses["primary_id"]
del _addresses

assert msk.sum() == 87_287

To not overwrite a Python built-in in the ORM layer.

In [60]:
addresses = addresses.rename(columns={"zip": "zip_code"})

### Clean Data

In [61]:
addresses.head()

Unnamed: 0_level_0,primary_id,created_at,place_id,latitude,longitude,city_id,city,zip_code,street,floor
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
2,2,2016-02-22 10:42:10,ChIJSfxJmlXq9EcRX2ChkiPW9J8,45.763149,4.83266,1,Lyon,69002,31 Rue Merciere,
3,3,2016-02-22 10:42:10,ChIJwwMvNPnq9EcRY7Qu-Tw2HL8,45.767227,4.83575,1,Lyon,69001,Rue De La Republique 2,
4,4,2016-02-22 10:43:57,ChIJr1RhGN7B9EcRv6XSHmmN6a8,45.743725,4.873138,1,Lyon,69008,123 Avenue Des Freres Lumieres,
5,5,2016-02-22 10:43:57,ChIJEwQm9H7q9EcRTymMxQ71z84,45.759369,4.864087,1,Lyon,69003,Avenue Georges Pompidou 17,
6,6,2016-02-22 11:06:08,ChIJbQz7p6vr9EcRr9L2cH5942I,45.761181,4.826371,1,Lyon,69005,8 Bis Place Saint Jean,


In [62]:
addresses.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 178101 entries, 2 to 691914
Data columns (total 10 columns):
 #   Column      Non-Null Count   Dtype         
---  ------      --------------   -----         
 0   primary_id  178101 non-null  int64         
 1   created_at  178101 non-null  datetime64[ns]
 2   place_id    178101 non-null  string        
 3   latitude    178101 non-null  float64       
 4   longitude   178101 non-null  float64       
 5   city_id     178101 non-null  int64         
 6   city        178101 non-null  string        
 7   zip_code    178101 non-null  int64         
 8   street      178101 non-null  string        
 9   floor       100540 non-null  Int64         
dtypes: Int64(1), datetime64[ns](1), float64(2), int64(3), string(3)
memory usage: 15.1 MB


In [63]:
assert (
    hashlib.sha256(addresses.to_json().encode()).hexdigest()
    == "4f9f3b63a9b2472bf07207d0e06f4901619066121d6bb5fd3ad4ebf21b590410"
)

## Restaurants

### Raw Data

Load restaurants associated with *all* addresses in the target cities. Further below, *all* restaurants are shown to have a clean address.

In [64]:
restaurants = pd.read_sql_query(
    f"""
    SELECT
        id,
        created_at,
        name,
        address_id,
        estimated_prep_duration
    FROM
        {config.ORIGINAL_SCHEMA}.businesses
    WHERE
        address_id IN (
            SELECT id FROM {config.ORIGINAL_SCHEMA}.addresses WHERE city_id IN %(city_ids)s
        )
        AND
        created_at < '{config.CUTOFF_DAY}'
    ORDER BY
        id
    """,
    con=connection,
    index_col="id",
    params={"city_ids": city_ids},
    parse_dates=["created_at"],
)

In [65]:
restaurants["name"] = restaurants["name"].astype("string")

In [66]:
restaurants.head()

Unnamed: 0_level_0,created_at,name,address_id,estimated_prep_duration
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
1,2016-02-22 09:42:10.228854,King Marcel Mercière,2,1200
2,2016-02-22 09:43:57.103750,Trotekala,4,1200
3,2016-02-22 10:06:08.796042,Soul Food & Jazz Café,6,1200
4,2016-02-22 10:16:11.478981,Gourmix Bellecour,8,900
5,2016-02-22 10:36:09.150481,Yabio Hôtel de Ville,10,1200


In [67]:
restaurants.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 1654 entries, 1 to 1787
Data columns (total 4 columns):
 #   Column                   Non-Null Count  Dtype         
---  ------                   --------------  -----         
 0   created_at               1654 non-null   datetime64[ns]
 1   name                     1654 non-null   string        
 2   address_id               1654 non-null   int64         
 3   estimated_prep_duration  1654 non-null   int64         
dtypes: datetime64[ns](1), int64(2), string(1)
memory usage: 64.6 KB


In [68]:
assert len(restaurants) == 1_654

### Adjust Time Zone

In [69]:
restaurants["created_at"] = clean_datetime(restaurants["created_at"])

### Simplify Names

In [70]:
restaurants["name"] = (
    restaurants["name"]
    .str.replace("\s+", " ", regex=True)
    .str.replace("&#39;", "'")
    # Get rid off accents.
    .str.normalize("NFKD")
    .str.encode("ascii", errors="ignore")
    .str.decode("utf8")
    .astype("string")
    .str.title()
    # To find duplicates further below.
    .str.replace(" & ", " And ")
    .str.replace("The ", "")
    .str.replace("Pasta Pizza ", "")
    .str.replace(" - Bar A Taboule", "")
    .str.replace("- Cuisine Mediterraneenne", "")
    .str.replace(" - Petit-Dejeuner", "")
    .str.replace("Lyon", "")
    .str.replace("La Burgeria Saint Mande", "La Fromagette Saint Mande")
    .str.replace("Mansou'", "Mansouria")
    .str.strip()
)

### Use Merged Addresses

In [71]:
restaurants["address_id"] = restaurants["address_id"].map(address_merger)

In [72]:
assert not restaurants["address_id"].isnull().any()

### Deduplicate

Restaurants with the same name at the same (unified) address are merged.

In [73]:
restaurants = restaurants.merge(
    addresses["primary_id"], left_on="address_id", right_index=True
)
restaurants = restaurants.rename(columns={"primary_id": "primary_address_id"})

In [74]:
by = ["name", "primary_address_id"]

restaurants = (
    restaurants.reset_index()
    .set_index(by)
    .merge(
        (
            restaurants.reset_index()
            .groupby(by)[["id"]]
            .min()
            .rename(columns={"id": "merged_on_id"})
        ),
        left_index=True,
        right_index=True,
    )
    .reset_index()
    .astype({"name": "string"})
)

Keep a dictionary to map the ID's that are merged away to the ones that are kept.

In [75]:
restaurants_merger = collections.defaultdict(lambda: np.NaN)
restaurants_merger.update(
    {
        id_: merged_on_id
        for _, id_, merged_on_id in restaurants[["id", "merged_on_id"]].itertuples()
    }
)

In [76]:
restaurants = (
    restaurants[restaurants["id"] == restaurants["merged_on_id"]]
    .set_index("id")
    .sort_index()[["created_at", "name", "address_id", "estimated_prep_duration"]]
)

In [77]:
assert len(restaurants) == 1_644

### Clean Data

In [78]:
restaurants.head()

Unnamed: 0_level_0,created_at,name,address_id,estimated_prep_duration
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
1,2016-02-22 10:42:10,King Marcel Merciere,2,1200
2,2016-02-22 10:43:57,Trotekala,4,1200
3,2016-02-22 11:06:08,Soul Food And Jazz Cafe,6,1200
4,2016-02-22 11:16:11,Gourmix Bellecour,8,900
5,2016-02-22 11:36:09,Yabio Hotel De Ville,10,1200


In [79]:
restaurants.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 1644 entries, 1 to 1787
Data columns (total 4 columns):
 #   Column                   Non-Null Count  Dtype         
---  ------                   --------------  -----         
 0   created_at               1644 non-null   datetime64[ns]
 1   name                     1644 non-null   string        
 2   address_id               1644 non-null   int64         
 3   estimated_prep_duration  1644 non-null   int64         
dtypes: datetime64[ns](1), int64(2), string(1)
memory usage: 64.2 KB


In [80]:
assert (
    hashlib.sha256(restaurants.to_json().encode()).hexdigest()
    == "8eb852690c027e2fcc0d9cf988391741b1cd028e35c494766f8c21d4ea1722b7"
)

## Couriers

### Raw Data

Only load couriers that worked in one of the target cities (i.e., had an order) and include the vehicle information.

In [81]:
couriers = pd.read_sql_query(
    f"""
    SELECT
        couriers.id,
        couriers.created_at,
        MD5(couriers.name) AS name,
        vehicle_types.icon as vehicle,
        couriers.speed,
        vehicle_bag_types.capacity,
        couriers.pay_per_hour,
        couriers.pay_per_order
    FROM
        {config.ORIGINAL_SCHEMA}.couriers
    LEFT OUTER JOIN
        {config.ORIGINAL_SCHEMA}.vehicles ON couriers.vehicle_id = vehicles.id
    LEFT OUTER JOIN
        {config.ORIGINAL_SCHEMA}.vehicle_types ON vehicles.vehicle_type_id = vehicle_types.id
    LEFT OUTER JOIN
        {config.ORIGINAL_SCHEMA}.vehicle_bag_types ON vehicles.vehicle_bag_type_id = vehicle_bag_types.id
    WHERE
        couriers.id in (
            SELECT DISTINCT
                deliveries.courier_id
            FROM
                {config.ORIGINAL_SCHEMA}.orders
            INNER JOIN
                {config.ORIGINAL_SCHEMA}.deliveries ON orders.id = deliveries.order_id
            WHERE
                orders.featured_business_id IN (
                    SELECT  -- Subquery based off the restaurants query above!
                        id
                    FROM
                        {config.ORIGINAL_SCHEMA}.businesses
                    WHERE
                        address_id IN (
                            SELECT id FROM {config.ORIGINAL_SCHEMA}.addresses WHERE city_id IN %(city_ids)s
                        )
                        AND
                        created_at < '{config.CUTOFF_DAY}'
                )
                AND
                deliveries.courier_id IS NOT NULL
        )
    ORDER BY
        couriers.id
    """,
    con=connection,
    index_col="id",
    params={"city_ids": city_ids},
    parse_dates=["created_at"],
)

In [82]:
couriers = couriers.astype({"name": "string", "vehicle": "string"})

In [83]:
couriers.head()

Unnamed: 0_level_0,created_at,name,vehicle,speed,capacity,pay_per_hour,pay_per_order
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
1,2016-02-21 13:28:41.571046,9181c2be2b3746cbcd6abf86d77fb2aa,bicycle,13.9,150,0,300
2,2016-02-21 13:28:41.673711,1b5d5709985e874e9c54e30127b3049f,bicycle,18.47,100,0,300
3,2016-02-21 13:28:41.776893,0e385d898f81147d460a45d2943a4eed,bicycle,19.18,100,0,300
4,2016-02-21 13:28:41.879876,ca21f91ea528f1b6e596d0c43959062b,bicycle,14.73,150,0,200
5,2016-02-21 13:28:41.982542,04af1119256f88ea3a25e79344dec96c,bicycle,15.2,150,0,200


In [84]:
couriers.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 2471 entries, 1 to 53764
Data columns (total 7 columns):
 #   Column         Non-Null Count  Dtype         
---  ------         --------------  -----         
 0   created_at     2471 non-null   datetime64[ns]
 1   name           2471 non-null   string        
 2   vehicle        2471 non-null   string        
 3   speed          2471 non-null   float64       
 4   capacity       2471 non-null   int64         
 5   pay_per_hour   2471 non-null   int64         
 6   pay_per_order  2471 non-null   int64         
dtypes: datetime64[ns](1), float64(1), int64(3), string(2)
memory usage: 154.4 KB


In [85]:
assert len(couriers) == 2_471

### Adjust Time Zone

In [86]:
couriers["created_at"] = clean_datetime(couriers["created_at"])

### Deduplicate

Couriers with the same name either have the same phone number or signed up within a short time window: They are merged.

In [87]:
by = ["name"]

couriers = (
    couriers.reset_index()
    .set_index(by)
    .merge(
        (
            couriers.reset_index()
            .groupby(by)[["id"]]
            .max()  # merged on the latest courier!
            .rename(columns={"id": "merged_on_id"})
        ),
        left_index=True,
        right_index=True,
    )
    .reset_index()
    .astype({"name": "string"})
)

Keep a dictionary to map the ID's that are merged away to the ones that are kept.

In [88]:
couriers_merger = collections.defaultdict(lambda: np.NaN)
couriers_merger.update(
    {
        id_: merged_on_id
        for _, id_, merged_on_id in couriers[["id", "merged_on_id"]].itertuples()
    }
)

In [89]:
couriers = (
    couriers[couriers["id"] == couriers["merged_on_id"]]
    .set_index("id")
    .sort_index()[
        ["created_at", "vehicle", "speed", "capacity", "pay_per_hour", "pay_per_order"]
    ]
)

In [90]:
assert len(couriers) == 2_469

### Clean Salary

The column `pay_per_hour` defaults to `0` in the database definition. The actual default value is EUR 7,50, which is also the mode in the dataset.

In [91]:
couriers["pay_per_hour"].value_counts()

0       2398
750       70
1500       1
Name: pay_per_hour, dtype: int64

The column `pay_per_order` defaults to `0` in the database definition. A more realistic value is EUR 2 (i.e., 200 cents), which is the mode in the dataset.

In [92]:
couriers["pay_per_order"].value_counts()

200        2146
0           159
300          97
400          29
500          11
2             9
600           8
20000         3
250           3
650           2
1400000       1
1             1
Name: pay_per_order, dtype: int64

Whenever a `0` appears in `pay_per_order`, the corresponding `pay_per_hour` is `0` in all cases except one, which is the highest paid courier.

In [93]:
assert ((couriers["pay_per_order"] == 0) & (couriers["pay_per_hour"] == 0)).sum() == 158

In [94]:
assert ((couriers["pay_per_order"] == 0) & (couriers["pay_per_hour"] > 0)).sum() == 1

In [95]:
couriers[(couriers["pay_per_order"] == 0) & (couriers["pay_per_hour"] > 0)]

Unnamed: 0_level_0,created_at,vehicle,speed,capacity,pay_per_hour,pay_per_order
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
721,2016-03-18 16:56:39,bicycle,20.49,100,1500,0


Couriers with `0`s in both columns receive the default payment scheme.

In [96]:
msk_0_pay = (couriers["pay_per_hour"] == 0) & (couriers["pay_per_order"] == 0)

couriers.loc[msk_0_pay, "pay_per_hour"] = 750
couriers.loc[msk_0_pay, "pay_per_order"] = 200

Couriers with a `0` in the `pay_per_hour` column, receive a fixed salary of EUR 7,50.

In [97]:
couriers.loc[couriers["pay_per_hour"] == 0, "pay_per_hour"] = 750

The column `pay_per_order` contains obvious typos that are corrected.

In [98]:
couriers.loc[
    couriers["pay_per_order"].isin([1, 2, 20, 2000, 20000]), "pay_per_order"
] = 200
couriers.loc[couriers["pay_per_order"] == 1400000, "pay_per_order"] = 400

Distribution of the various `pay_per_hour` / `pay_per_order` combinations.

In [99]:
collections.Counter(
    (y, z) for (x, y, z) in couriers[["pay_per_hour", "pay_per_order"]].itertuples()
)

Counter({(750, 300): 97,
         (750, 200): 2317,
         (750, 500): 11,
         (750, 400): 30,
         (750, 600): 8,
         (750, 650): 2,
         (1500, 0): 1,
         (750, 250): 3})

### Clean Data

In [100]:
couriers.head()

Unnamed: 0_level_0,created_at,vehicle,speed,capacity,pay_per_hour,pay_per_order
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
1,2016-02-21 14:28:41,bicycle,13.9,150,750,300
2,2016-02-21 14:28:41,bicycle,18.47,100,750,300
3,2016-02-21 14:28:41,bicycle,19.18,100,750,300
4,2016-02-21 14:28:41,bicycle,14.73,150,750,200
5,2016-02-21 14:28:41,bicycle,15.2,150,750,200


In [101]:
couriers.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 2469 entries, 1 to 53764
Data columns (total 6 columns):
 #   Column         Non-Null Count  Dtype         
---  ------         --------------  -----         
 0   created_at     2469 non-null   datetime64[ns]
 1   vehicle        2469 non-null   string        
 2   speed          2469 non-null   float64       
 3   capacity       2469 non-null   int64         
 4   pay_per_hour   2469 non-null   int64         
 5   pay_per_order  2469 non-null   int64         
dtypes: datetime64[ns](1), float64(1), int64(3), string(1)
memory usage: 135.0 KB


In [102]:
assert (
    hashlib.sha256(couriers.to_json().encode()).hexdigest()
    == "a1059e93095842120a58c4f74145bb6a14aeb94f47a0d77043896c70f4772afe"
)

## Orders

### Raw Data

The order related data is spread over many different tables in the original database. Also, some data is not even normalized. The following SQL query puts all the data into one big relation that is cleaned further below.

In [103]:
orders = pd.read_sql_query(
    f"""
    SELECT
        orders.id,
        deliveries.id AS delivery_id,
        MD5(CONCAT(orders.email, orders.phone_number)) as customer_id,  -- anonymize the customer data
        orders.order_placed_at AS placed_at,
        CASE
            WHEN orders.preorder IS FALSE
                THEN TRUE
                ELSE FALSE    
            END AS ad_hoc,
        CASE
            WHEN orders.preorder is TRUE
                THEN orders.scheduled_dropoff_at
                ELSE NULL
            END AS scheduled_delivery_at,
        deliveries.status,
        cancellations.cancelled_at,
        orders.featured_business_id as restaurant_id,
        orders.order_sent_at AS restaurant_notified_at,
        orders.order_received_at AS restaurant_confirmed_at,
        orders.estimated_prep_duration,
        orders.estimated_prep_buffer,
        deliveries.courier_id,
        deliveries.courier_dispatched_at AS dispatch_at,
        deliveries.courier_notified_at,
        deliveries.courier_accepted_at,
        courier_no_accept_confirmed.issue AS courier_no_accept_confirmed_issue,
        orders.pickup_address_id,
        orders.scheduled_pickup_at,
        deliveries.courier_picked_up_at AS pickup_at,
        left_pickups.left_pickup_at,
        courier_late_at_pickup.issue AS courier_late_at_pickup_issue,
        courier_waited_at_pickup.issue AS courier_waited_at_pickup_issue,
        courier_no_pickup_confirmed.issue AS courier_no_pickup_confirmed_issue,
        orders.dropoff_address_id AS delivery_address_id,
        deliveries.first_estimated_dropoff_at AS first_estimated_delivery_at,
        deliveries.courier_dropped_off_at AS delivery_at,
        courier_waited_at_delivery.issue AS courier_waited_at_delivery_issue,
        courier_no_delivery_confirmed.issue AS courier_no_delivery_confirmed_issue,
        orders.utilization,
        items_totals.sub_total,
        orders.delivery_fee,
        orders.total,
        deliveries.delivery_distance AS logged_delivery_distance,
        deliveries.courier_avg_speed AS logged_avg_courier_speed,
        CAST(deliveries.courier_avg_speed_distance AS INTEGER) AS logged_avg_courier_speed_distance,
        delivery_timings.accepting_time AS logged_accepting_time,
        delivery_timings.courier_reaction_time AS logged_reaction_time,
        delivery_timings.to_pickup_time AS logged_to_pickup_time,
        delivery_timings.expected_wait_pickup_time AS expected_wait_pickup_time,
        delivery_timings.wait_pickup_time AS logged_wait_pickup_time,
        delivery_timings.pickup_time AS logged_pickup_time,
        delivery_timings.courier_late AS logged_courier_late_time,
        delivery_timings.vendor_late AS logged_restaurant_late_time,
        delivery_timings.to_dropoff_time AS logged_to_delivery_time,
        delivery_timings.expected_dropoff_time AS expected_delivery_time,
        delivery_timings.dropoff_time AS logged_delivery_time,
        delivery_timings.delivery_late AS logged_delivery_late_time,
        delivery_timings.total_time AS logged_total_time,
        delivery_timings.confirmed_total_time AS logged_confirmed_total_time
    FROM
        {config.ORIGINAL_SCHEMA}.orders
    LEFT OUTER JOIN
        {config.ORIGINAL_SCHEMA}.deliveries ON orders.id = deliveries.order_id
    LEFT OUTER JOIN
        (
            SELECT
                order_id,
                CAST(100 * SUM(price) AS INTEGER) AS sub_total
            FROM
                {config.ORIGINAL_SCHEMA}.order_records
            GROUP BY
                order_id
        ) AS items_totals ON orders.id = items_totals.order_id
    LEFT OUTER JOIN
        {config.ORIGINAL_SCHEMA}.delivery_timings ON deliveries.id = delivery_timings.delivery_id
    LEFT OUTER JOIN (
            SELECT
                delivery_id,
                MAX(notes) AS issue
            FROM
                {config.ORIGINAL_SCHEMA}.issues
            WHERE
                type = 'DispatchIssue'
                AND
                category = 'no_courier_interaction'
            GROUP BY
                delivery_id
        ) AS courier_no_accept_confirmed ON deliveries.id = courier_no_accept_confirmed.delivery_id
    LEFT OUTER JOIN (
            SELECT
                delivery_id,
                MAX(notes) AS issue
            FROM
                {config.ORIGINAL_SCHEMA}.issues
            WHERE
                type = 'PickupIssue'
                AND
                category = 'waiting'
            GROUP BY
                delivery_id
        ) AS courier_waited_at_pickup ON deliveries.id = courier_waited_at_pickup.delivery_id
    LEFT OUTER JOIN (
            SELECT
                delivery_id,
                MAX(notes) AS issue
            FROM
                {config.ORIGINAL_SCHEMA}.issues
            WHERE
                type = 'PickupIssue'
                AND
                category = 'late'
            GROUP BY
                delivery_id
        ) AS courier_late_at_pickup ON deliveries.id = courier_late_at_pickup.delivery_id
    LEFT OUTER JOIN (
            SELECT
                delivery_id,
                MAX(notes) AS issue
            FROM
                {config.ORIGINAL_SCHEMA}.issues
            WHERE
                type = 'PickupIssue'
                AND
                category = 'no_courier_interaction'
            GROUP BY
                delivery_id
        ) AS courier_no_pickup_confirmed ON deliveries.id = courier_no_pickup_confirmed.delivery_id
    LEFT OUTER JOIN (
            SELECT
                delivery_id,
                MAX(notes) AS issue
            FROM
                {config.ORIGINAL_SCHEMA}.issues
            WHERE
                type = 'DropoffIssue'
                AND
                category = 'waiting'
            GROUP BY
                delivery_id
        ) AS courier_waited_at_delivery ON deliveries.id = courier_waited_at_delivery.delivery_id
    LEFT OUTER JOIN (
            SELECT
                delivery_id,
                MAX(notes) AS issue
            FROM
                {config.ORIGINAL_SCHEMA}.issues
            WHERE
                type = 'DropoffIssue'
                AND
                category = 'late'
            GROUP BY
                delivery_id
        ) AS courier_late_at_delivery ON deliveries.id = courier_late_at_delivery.delivery_id
    LEFT OUTER JOIN (
            SELECT
                delivery_id,
                MAX(notes) AS issue
            FROM
                {config.ORIGINAL_SCHEMA}.issues
            WHERE
                type = 'DropoffIssue'
                AND
                category = 'no_courier_interaction'
            GROUP BY
                delivery_id
        ) AS courier_no_delivery_confirmed ON deliveries.id = courier_no_delivery_confirmed.delivery_id
    LEFT OUTER JOIN (
            SELECT
                delivery_id,
                courier_id,
                MAX(created_at) AS left_pickup_at
            FROM (
                SELECT
                    delivery_id,
                    (metadata -> 'courier_id')::TEXT::INTEGER AS courier_id,
                    created_at
                FROM
                    {config.ORIGINAL_SCHEMA}.delivery_transitions
                WHERE
                    to_state = 'left_pickup'
            ) AS left_pickups
            GROUP BY
                delivery_id,
                courier_id
        ) AS left_pickups ON deliveries.id = left_pickups.delivery_id AND deliveries.courier_id = left_pickups.courier_id
    LEFT OUTER JOIN (
            SELECT
                delivery_id,
                MAX(created_at) AS cancelled_at
            FROM
                {config.ORIGINAL_SCHEMA}.delivery_transitions
            WHERE
                to_state = 'cancelled'
            GROUP BY
                delivery_id
        ) AS cancellations ON deliveries.id = cancellations.delivery_id
    WHERE
        orders.featured_business_id IN (
            SELECT  -- Subquery based off the restaurants query above!
                id
            FROM
                {config.ORIGINAL_SCHEMA}.businesses
            WHERE
                address_id IN (
                    SELECT id FROM {config.ORIGINAL_SCHEMA}.addresses WHERE city_id IN %(city_ids)s
                )
                AND
                created_at < '{config.CUTOFF_DAY}'
        )
        AND
        scheduled_dropoff_at < '{config.CUTOFF_DAY}'
        AND
        deliveries.is_primary IS TRUE
    ORDER BY
        orders.id
    """,
    con=connection,
    index_col="id",
    params={"city_ids": city_ids},
    parse_dates=[
        "placed_at",
        "scheduled_delivery_at",
        "cancelled_at",
        "restaurant_notified_at",
        "restaurant_confirmed_at",
        "dispatch_at",
        "courier_notified_at",
        "courier_accepted_at",
        "pickup_at",
        "left_pickup_at",
        "first_estimated_delivery_at",
        "delivery_at",
    ],
)

In [104]:
orders = orders.astype(
    {
        "customer_id": "string",
        "status": "string",
        "estimated_prep_duration": "Int64",
        "courier_id": "Int64",
        "courier_no_accept_confirmed_issue": "string",
        "courier_late_at_pickup_issue": "string",
        "courier_waited_at_pickup_issue": "string",
        "courier_no_pickup_confirmed_issue": "string",
        "courier_waited_at_delivery_issue": "string",
        "courier_no_delivery_confirmed_issue": "string",
        "logged_avg_courier_speed_distance": "Int64",
        "logged_accepting_time": "Int64",
        "logged_reaction_time": "Int64",
        "logged_to_pickup_time": "Int64",
        "expected_wait_pickup_time": "Int64",
        "logged_wait_pickup_time": "Int64",
        "logged_pickup_time": "Int64",
        "logged_courier_late_time": "Int64",
        "logged_restaurant_late_time": "Int64",
        "logged_to_delivery_time": "Int64",
        "expected_delivery_time": "Int64",
        "logged_delivery_time": "Int64",
        "logged_delivery_late_time": "Int64",
        "logged_total_time": "Int64",
        "logged_confirmed_total_time": "Int64",
    }
)

In [105]:
orders.head()

Unnamed: 0_level_0,delivery_id,customer_id,placed_at,ad_hoc,scheduled_delivery_at,status,cancelled_at,restaurant_id,restaurant_notified_at,restaurant_confirmed_at,estimated_prep_duration,estimated_prep_buffer,courier_id,dispatch_at,courier_notified_at,courier_accepted_at,courier_no_accept_confirmed_issue,pickup_address_id,scheduled_pickup_at,pickup_at,left_pickup_at,courier_late_at_pickup_issue,courier_waited_at_pickup_issue,courier_no_pickup_confirmed_issue,delivery_address_id,first_estimated_delivery_at,delivery_at,courier_waited_at_delivery_issue,courier_no_delivery_confirmed_issue,utilization,sub_total,delivery_fee,total,logged_delivery_distance,logged_avg_courier_speed,logged_avg_courier_speed_distance,logged_accepting_time,logged_reaction_time,logged_to_pickup_time,expected_wait_pickup_time,logged_wait_pickup_time,logged_pickup_time,logged_courier_late_time,logged_restaurant_late_time,logged_to_delivery_time,expected_delivery_time,logged_delivery_time,logged_delivery_late_time,logged_total_time,logged_confirmed_total_time
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1,Unnamed: 30_level_1,Unnamed: 31_level_1,Unnamed: 32_level_1,Unnamed: 33_level_1,Unnamed: 34_level_1,Unnamed: 35_level_1,Unnamed: 36_level_1,Unnamed: 37_level_1,Unnamed: 38_level_1,Unnamed: 39_level_1,Unnamed: 40_level_1,Unnamed: 41_level_1,Unnamed: 42_level_1,Unnamed: 43_level_1,Unnamed: 44_level_1,Unnamed: 45_level_1,Unnamed: 46_level_1,Unnamed: 47_level_1,Unnamed: 48_level_1,Unnamed: 49_level_1,Unnamed: 50_level_1
1,487377,f172baefbdf8550eb2c6bcf667e43090,2016-02-22 09:42:01,False,2016-02-22 11:30:00,cancelled,2016-10-18 07:52:45.136243,1,NaT,NaT,,0,,NaT,NaT,NaT,,2,2016-02-22 11:15:00.000000,NaT,NaT,,,,3,NaT,NaT,,,15,1250,250,1500,671,,,,,,,,,,,,,,,,
2,487433,cc2f0563675b4b2e9b985bfa4d7a157f,2016-02-22 09:40:09,False,2016-02-22 11:00:00,completed,NaT,2,2016-02-22 10:31:03.664857,2016-02-22 10:46:43.348671,,0,96.0,2016-02-22 10:37:00.683271,2016-02-22 10:36:04.532254,2016-02-22 10:37:00.683188,"Courier did not hit ""Accept""",4,2016-02-22 10:41:03.664793,2016-02-22 10:54:00.886417,NaT,,,"Courier did not hit ""Picked up""",5,2016-02-23 22:55:23,2016-02-22 11:13:23.772580,,,19,1550,250,1800,2281,,,2916.0,,187.0,,69.0,125.0,67.0,69.0,1745.0,4791.0,264.0,942.0,5734.0,5595.0
3,487444,c09807c923d356a43fac084543de664b,2016-02-22 09:56:16,False,2016-02-22 11:00:00,completed,NaT,3,2016-02-22 10:31:02.531350,2016-02-22 10:42:00.334529,,0,57.0,2016-02-22 10:31:30.963423,2016-02-22 10:31:03.587634,2016-02-22 10:31:30.963342,,6,2016-02-22 10:41:02.531284,2016-02-22 10:54:32.122854,NaT,,,,7,2016-02-23 22:25:47,2016-02-22 11:06:07.677724,,,25,2000,250,2250,2449,,,28.0,,1381.0,,,0.0,0.0,0.0,522.0,3824.0,270.0,464.0,4288.0,4192.0
4,470503,440d641745a8707a7afd5565adc99be2,2016-02-22 10:11:46,False,2016-02-22 11:30:00,completed,NaT,4,2016-02-22 10:32:04.810633,2016-02-22 10:48:01.402101,,0,39.0,2016-02-22 10:50:29.141811,2016-02-22 10:50:03.610169,2016-02-22 10:50:29.141722,,8,2016-02-22 11:17:04.810526,2016-02-22 11:11:26.166161,NaT,,,,9,2016-02-22 11:25:50,2016-02-22 11:18:43.289947,,,100,9800,250,10050,196,,,26.0,,439.0,,-180.0,818.0,-1061.0,-338.0,87.0,4694.0,350.0,-676.0,4017.0,4017.0
5,487439,0e7b3e4012a00a56b5bf1dee1bc6c1c7,2016-02-22 10:35:58,False,2016-02-22 11:30:00,completed,NaT,5,2016-02-22 10:50:03.866327,2016-02-22 10:50:19.473584,,0,128.0,2016-02-22 10:50:28.153663,2016-02-22 10:50:04.830855,2016-02-22 10:50:28.153559,,10,2016-02-22 11:10:03.866256,2016-02-22 11:05:19.667624,NaT,,,,11,2016-02-22 11:28:19,2016-02-22 11:20:44.147345,,,45,3600,250,3850,2584,,,24.0,,284.0,,-440.0,607.0,-735.0,-284.0,694.0,3242.0,231.0,-555.0,2686.0,2686.0


In [106]:
orders.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 661314 entries, 1 to 688690
Data columns (total 50 columns):
 #   Column                               Non-Null Count   Dtype         
---  ------                               --------------   -----         
 0   delivery_id                          661314 non-null  int64         
 1   customer_id                          661314 non-null  string        
 2   placed_at                            661314 non-null  datetime64[ns]
 3   ad_hoc                               661314 non-null  bool          
 4   scheduled_delivery_at                83579 non-null   datetime64[ns]
 5   status                               661314 non-null  string        
 6   cancelled_at                         23665 non-null   datetime64[ns]
 7   restaurant_id                        661314 non-null  int64         
 8   restaurant_notified_at               656311 non-null  datetime64[ns]
 9   restaurant_confirmed_at              639636 non-null  datetime64[ns]
 

In [107]:
assert len(orders) == 661_314

### Adjust Time Zones

In [108]:
for column in [
    "placed_at",
    "scheduled_delivery_at",
    "cancelled_at",
    "restaurant_notified_at",
    "restaurant_confirmed_at",
    "dispatch_at",
    "courier_notified_at",
    "courier_accepted_at",
    "scheduled_pickup_at",
    "pickup_at",
    "left_pickup_at",
    "first_estimated_delivery_at",
    "delivery_at",
]:
    orders[column] = clean_datetime(orders[column])

### Use Merged Addresses

About 0.02 % of the orders belong to discarded addresses and are discarded also.

In [109]:
orders["pickup_address_id"] = orders["pickup_address_id"].map(address_merger)
orders["delivery_address_id"] = orders["delivery_address_id"].map(address_merger)
msk = orders["pickup_address_id"].isnull() | orders["delivery_address_id"].isnull()
orders = orders[~msk].astype({"pickup_address_id": int, "delivery_address_id": int,})

assert msk.sum() == 160

### Use Merged Restaurants

In [110]:
orders["restaurant_id"] = orders["restaurant_id"].map(restaurants_merger)

In [111]:
assert not orders["restaurant_id"].isnull().any()

### Use Merged Couriers

In [112]:
orders["courier_id"] = orders["courier_id"].map(couriers_merger).astype("Int64")

Verify that the couriers' IDs are the same in `couriers` and `orders`.

In [113]:
assert set(couriers.index) == set(
    orders.loc[orders["courier_id"].notnull(), "courier_id"].unique()
)

### Clean User IDs

Convert the MD5 hashed emails and phone numbers into integer ID's.

In [114]:
orders["customer_id"] = (
    orders["customer_id"]
    .map({y: x for (x, y) in enumerate(orders["customer_id"].unique(), start=1)})
    .astype({"customer_id": int})
)

### Ad-hoc vs. Scheduled Orders

Ad-hoc orders never have a `scheduled_delivery_at` value set, and scheduled orders always have it set.

In [115]:
assert not (
    (orders["ad_hoc"] == True) & orders["scheduled_delivery_at"].notnull()
).any()
assert not (
    (orders["ad_hoc"] == False) & orders["scheduled_delivery_at"].isnull()
).any()

For all adjusted timestamps we add `*_corrected` columns indicating if a correction is made in the following.

In [116]:
for column in [
    "scheduled_delivery_at",
    "cancelled_at",
    "restaurant_notified_at",
    "restaurant_confirmed_at",
    "estimated_prep_duration",
    "dispatch_at",
    "courier_notified_at",
    "courier_accepted_at",
    "pickup_at",
    "left_pickup_at",
    "delivery_at",
]:
    orders[column + "_corrected"] = False
    orders.loc[orders[column].isnull(), column + "_corrected"] = pd.NA

Some customers managed to place scheduled orders for the past. These are converted into ad-hoc orders.

In [117]:
msk = orders["scheduled_delivery_at"] < orders["placed_at"]
orders.loc[msk, "ad_hoc"] = True
orders.loc[msk, "scheduled_delivery_at"] = pd.NaT
orders.loc[msk, "scheduled_delivery_at_corrected"] = True

assert msk.sum() == 11

Orders scheduled within the next 30 minutes are treated as ad-hoc orders. With the median fulfillment time of ad-hoc orders being 34 minutes, it is absolutely unrealistic to fulfill such a scheduled order on time. This should not influence the KPIs in a bad way.

In [118]:
msk = (orders["ad_hoc"] == False) & (
    orders["scheduled_delivery_at"] - orders["placed_at"]
    < datetime.timedelta(minutes=30)
)
orders.loc[msk, "ad_hoc"] = True
orders.loc[msk, "scheduled_delivery_at"] = pd.NaT
orders.loc[msk, "scheduled_delivery_at_corrected"] = True

assert msk.sum() == 3_267

For scheduled orders, `scheduled_delivery_at` is mostly set to quarters of an hour. The seconds part is always `0`.

In [119]:
assert not (
    (orders["ad_hoc"] == False) & (orders["scheduled_delivery_at"].dt.second != 0)
).any()

If a customer managed to enter something other than a quarter of an hour as `scheduled_delivery_at`, we adjust that.

In [120]:
msk = (orders["ad_hoc"] == False) & (
    orders["scheduled_delivery_at"].dt.minute % 15 != 0
)
round_down = msk & (orders["scheduled_delivery_at"].dt.minute % 15 < 8)
orders.loc[round_down, "scheduled_delivery_at"] = orders.loc[
    round_down, "scheduled_delivery_at"
] - (orders.loc[round_down, "scheduled_delivery_at"].dt.minute % 15).map(
    lambda m: datetime.timedelta(minutes=m)
)
round_up = msk & (orders["scheduled_delivery_at"].dt.minute % 15 >= 8)
orders.loc[round_up, "scheduled_delivery_at"] = orders.loc[
    round_up, "scheduled_delivery_at"
] + (orders.loc[round_up, "scheduled_delivery_at"].dt.minute % 15).map(
    lambda m: datetime.timedelta(minutes=(15 - m))
)
orders.loc[msk, "scheduled_delivery_at_corrected"] = True

assert msk.sum() == 6

### Timestamps

All timestamps in `orders` must occur in a strict sequence (i.e., order) according to the delivery process. A tiny fraction of the orders has timestamps that do not comply with that and are adjusted in the following.

`placed_at` must always be the earliest of all timestamps.

In [121]:
for column in [
    "scheduled_delivery_at",
    "cancelled_at",
    "restaurant_notified_at",
    "restaurant_confirmed_at",
    "dispatch_at",
    "courier_notified_at",
    "courier_accepted_at",
    "pickup_at",
    "left_pickup_at",
    "first_estimated_delivery_at",
    "delivery_at",
]:
    assert not (orders["placed_at"] >= orders[column]).any()

Rarely, a restaurant confirmed an order before it was notified about it. We keep `restaurant_confirmed_at` in these cases.

In [122]:
msk = orders["restaurant_notified_at"] >= orders["restaurant_confirmed_at"]
orders.loc[msk, "restaurant_notified_at"] = pd.NaT
orders.loc[msk, "restaurant_notified_at_corrected"] = True

assert msk.sum() == 47

Whenever `restaurant_notified_at` or `restaurant_confirmed_at` is later than `pickup_at`, we discard the values.

In [123]:
msk = orders["restaurant_notified_at"] >= orders["pickup_at"]
orders.loc[msk, "restaurant_notified_at"] = pd.NaT
orders.loc[msk, "restaurant_notified_at_corrected"] = True

assert msk.sum() == 73

In [124]:
msk = orders["restaurant_confirmed_at"] >= orders["pickup_at"]
orders.loc[msk, "restaurant_confirmed_at"] = pd.NaT
orders.loc[msk, "restaurant_confirmed_at_corrected"] = True

assert msk.sum() == 2_001

If a courier forgot to confirm the pickup, `pickup_at` and `delivery_at` are the same.

In [125]:
msk = orders["delivery_at"] == orders["pickup_at"]
orders.loc[msk, "pickup_at"] = pd.NaT
orders.loc[msk, "pickup_at_corrected"] = True

assert msk.sum() == 16

In [126]:
msk = orders["delivery_at"] == orders["left_pickup_at"]
orders.loc[msk, "left_pickup_at"] = pd.NaT
orders.loc[msk, "left_pickup_at_corrected"] = True

assert msk.sum() == 15

`delivery_at` must be the latest of all dispatch-related timestamps.

In [127]:
for column in [
    "dispatch_at",
    "courier_notified_at",
    "courier_accepted_at",
    "pickup_at",
    "left_pickup_at",
]:
    assert not (orders["delivery_at"] <= orders[column]).any()

In about 14,500 cases the `left_pickup_at` lies before or on `pickup_at`. This only affects orders between September 6 and October 17. We discard these timestamps.

In [128]:
msk = orders["left_pickup_at"] < orders["pickup_at"]
orders.loc[msk, "left_pickup_at"] = pd.NaT
orders.loc[msk, "left_pickup_at_corrected"] = True

assert msk.sum() == 14_013
assert orders.loc[msk, "placed_at"].min().date() == datetime.date(2016, 9, 6)
assert orders.loc[msk, "placed_at"].max().date() == datetime.date(2016, 10, 17)

In [129]:
msk = orders["left_pickup_at"] == orders["pickup_at"]
orders.loc[msk, "left_pickup_at"] = pd.NaT
orders.loc[msk, "left_pickup_at_corrected"] = True

assert msk.sum() == 496

In [130]:
for column in [
    "dispatch_at",
    "courier_notified_at",
    "courier_accepted_at",
    "pickup_at",
]:
    assert not (orders["left_pickup_at"] <= orders[column]).any()

Rarely, `pickup_at` is earlier than or equal to `dispatch_at`, `courier_notified_at`, or `courier_accepted_at`. They are discarded.

In [131]:
msk = orders["pickup_at"] <= orders["dispatch_at"]
orders.loc[msk, "dispatch_at"] = pd.NaT
orders.loc[msk, "dispatch_at_corrected"] = True

assert msk.sum() == 15

In [132]:
msk = orders["pickup_at"] <= orders["courier_notified_at"]
orders.loc[msk, "courier_notified_at"] = pd.NaT
orders.loc[msk, "courier_notified_at_corrected"] = True

assert msk.sum() == 8
assert set(orders.loc[msk, "status"].unique()) == set(["cancelled"])

In [133]:
msk = orders["pickup_at"] <= orders["courier_accepted_at"]
orders.loc[msk, "courier_accepted_at"] = pd.NaT
orders.loc[msk, "courier_accepted_at_corrected"] = True

assert msk.sum() == 15

In [134]:
for column in ["dispatch_at", "courier_notified_at", "courier_accepted_at"]:
    assert not (orders["pickup_at"] <= orders[column]).any()

For about 66.000 orders `courier_accepted_at` equals `dispatch_at` or lies before it. We assume the former is correct and discard the latter.

In [135]:
msk = orders["courier_accepted_at"] <= orders["dispatch_at"]
orders.loc[msk, "dispatch_at"] = pd.NaT
orders.loc[msk, "dispatch_at_corrected"] = True

assert msk.sum() == 65_848

If `courier_accepted_at` is equal or before `courier_notified_at`, we discard the latter.

In [136]:
msk = orders["courier_accepted_at"] <= orders["courier_notified_at"]
orders.loc[msk, "courier_notified_at"] = pd.NaT
orders.loc[msk, "courier_notified_at_corrected"] = True

assert msk.sum() == 165_585

In [137]:
for column in ["dispatch_at", "courier_notified_at"]:
    assert not (orders["courier_accepted_at"] <= orders[column]).any()

For some more orders, `courier_notified_at` lies before `dispatch_at`. Manual analysis reveals that in most of these cases, the courier did not hit "accept". We discard `dispatch_at` as the timings between `courier_notified_at` and `courier_accepted_at` fit the issue messages.

In [138]:
msk = orders["courier_notified_at"] <= orders["dispatch_at"]
orders.loc[msk, "dispatch_at"] = pd.NaT
orders.loc[msk, "dispatch_at_corrected"] = True

assert msk.sum() == 3_397

Ad-hoc orders that were placed before 11 in the morning and after 23 in the evening are discarded. Most of them were cancelled anyways.

In [139]:
msk = (orders["ad_hoc"] == True) & (
    (orders["placed_at"].dt.hour <= 10) | (orders["placed_at"].dt.hour >= 23)
)
orders = orders[~msk]

assert msk.sum() == 337

The orders scheduled for 11:15 (=1) and 11:30 (=37) are scheduled for 11:45. Most of them were not delivered until 12 anyways. This is in line with the 30-minute minimum horizon above.

In [140]:
msk = (orders["scheduled_delivery_at"].dt.hour == 11) & (
    orders["scheduled_delivery_at"].dt.minute == 30
)
orders.loc[msk, "scheduled_delivery_at"] += datetime.timedelta(minutes=15)
orders.loc[msk, "scheduled_delivery_at_corrected"] = True

assert msk.sum() == 37

msk = (orders["scheduled_delivery_at"].dt.hour == 11) & (
    orders["scheduled_delivery_at"].dt.minute == 15
)
orders.loc[msk, "scheduled_delivery_at"] += datetime.timedelta(minutes=30)
orders.loc[msk, "scheduled_delivery_at_corrected"] = True

assert msk.sum() == 1

assert not (
    (orders["scheduled_delivery_at"].dt.hour == 11)
    & (orders["scheduled_delivery_at"].dt.minute == 0)
).any()

Orders with a scheduled delivery before 11 in the morning and after 23 in the evening are discarded.

In [141]:
msk = (orders["ad_hoc"] == False) & (
    (orders["scheduled_delivery_at"].dt.hour <= 10)
    | (orders["scheduled_delivery_at"].dt.hour >= 23)
)
orders = orders[~msk]

assert msk.sum() == 159

In [142]:
ad_hoc = orders["ad_hoc"] == True
scheduled = ~ad_hoc

### Order Stati

There are only cancelled and completed orders. We replace the `status` column with a boolean `cancelled` column.

In [143]:
assert set(orders["status"].unique()) == set(["cancelled", "completed"])

In [144]:
orders["cancelled"] = False
msk = orders["status"] == "cancelled"
orders.loc[msk, "cancelled"] = True
del orders["status"]

assert msk.sum() == 23_552

Some cancelled orders still have a `delivery_at` value. All of them have a dummy value for the `cancelled_at` value (cf., below). For roughly two thirds of them, the time between pickup and delivery is so small that it seems unrealistic that they actually were delivered. In these cases, we take `delivery_at` as the realistic `cancelled_at` value. The ones that could have been delivered realistically are treated as completed orders.

In [145]:
claimed_to_be_delivered = (orders["cancelled"] == True) & orders[
    "delivery_at"
].notnull()

assert (
    orders.loc[claimed_to_be_delivered, "cancelled_at"].min()
    == orders.loc[claimed_to_be_delivered, "cancelled_at"].max()
    == datetime.datetime(2016, 10, 18, 9, 52, 45)
)

realistically_delivered = (
    orders["delivery_at"] - orders["pickup_at"]
).dt.total_seconds() > 120
msk = claimed_to_be_delivered & realistically_delivered
orders.loc[msk, "cancelled"] = False
orders.loc[msk, "cancelled_at"] = pd.NaT
orders.loc[msk, "cancelled_at_corrected"] = True
msk = claimed_to_be_delivered & ~realistically_delivered
orders.loc[msk, "cancelled_at"] = orders.loc[msk, "delivery_at"]
orders.loc[msk, "cancelled_at_corrected"] = True
orders.loc[msk, "delivery_at"] = pd.NaT
orders.loc[msk, "delivery_at_corrected"] = pd.NA

assert claimed_to_be_delivered.sum() == 159
assert (claimed_to_be_delivered & realistically_delivered).sum() == 61
assert (claimed_to_be_delivered & ~realistically_delivered).sum() == 98

Only cancelled orders have a `cancelled_at` value.

In [146]:
cancelled = orders["cancelled"] == True
completed = orders["cancelled"] == False

assert not orders.loc[cancelled, "cancelled_at"].isnull().any()
assert not orders.loc[completed, "cancelled_at"].notnull().any()

### Cancelled Orders

For about 40% of the orders the `cancelled_at` field was only filled in after a system change on October 18 (i.e., in a batch). For these orders, this field is not meaningful because of that. We discard it.

In [147]:
batch = orders["cancelled_at"] == datetime.datetime(2016, 10, 18, 9, 52, 45)
orders.loc[cancelled & batch, "cancelled_at"] = pd.NaT
orders.loc[cancelled & batch, "cancelled_at_corrected"] = True

assert (cancelled & batch).sum() == 9_410

When a restaurant was notified about an order after the order was cancelled, we discard `restaurant_notified_at` and `restaurant_confirmed_at`.

In [148]:
msk = orders["cancelled_at"] <= orders["restaurant_notified_at"]
orders.loc[msk, "restaurant_notified_at"] = pd.NaT
orders.loc[msk, "restaurant_notified_at_corrected"] = True

assert msk.sum() == 6

In [149]:
msk = orders["cancelled_at"] <= orders["restaurant_confirmed_at"]
orders.loc[msk, "restaurant_confirmed_at"] = pd.NaT
orders.loc[msk, "restaurant_confirmed_at_corrected"] = True

assert msk.sum() == 1_253

When an order was dispatched in the moment it was cancelled, we adjust that.

In [150]:
msk = orders["cancelled_at"] == orders["dispatch_at"]
orders.loc[msk, "dispatch_at"] -= datetime.timedelta(seconds=1)
orders.loc[msk, "dispatch_at_corrected"] = True

assert msk.sum() == 3

When a courier was notified about or accepted an order in the moment it was cancelled, we adjust that.

In [151]:
msk = orders["cancelled_at"] == orders["courier_notified_at"]
orders.loc[msk, "courier_notified_at"] -= datetime.timedelta(seconds=1)
orders.loc[msk, "courier_notified_at_corrected"] = True

assert msk.sum() == 1

In [152]:
msk = orders["cancelled_at"] == orders["courier_accepted_at"]
orders.loc[msk, "courier_accepted_at"] -= datetime.timedelta(seconds=1)
orders.loc[msk, "courier_accepted_at_corrected"] = True

assert msk.sum() == 8

When a courier picked up an order in the moment it was cancelled, we adjust that.

In [153]:
msk = orders["cancelled_at"] == orders["pickup_at"]
orders.loc[msk, "pickup_at"] -= datetime.timedelta(seconds=1)
orders.loc[msk, "pickup_at_corrected"] = True

assert msk.sum() == 1

Verify that `cancelled_at` is indeed the latest timestamp in every row.

In [154]:
orders["_max_datetime"] = pd.NaT
orders["_max_datetime"] = orders[
    [
        "restaurant_notified_at",
        "restaurant_confirmed_at",
        "dispatch_at",
        "courier_notified_at",
        "courier_accepted_at",
        "pickup_at",
        "left_pickup_at",
        "delivery_at",
        "cancelled_at",
    ]
].max(axis=1)

assert not (
    cancelled & ~batch & (orders["cancelled_at"] != orders["_max_datetime"])
).any()

del orders["_max_datetime"]

### Timings

The times in between the timestamps can be used to obtain timings of individual steps in the delivery process. In the original database, such timings were already logged. In the following, we validate the timestamps against the timings and only keep the timestamps as the timings are then calculated as `@property`s in the ORM layer.

`confirmed_total_time` is the difference between `placed_at` and `delivery_at`. It is set only for completed orders and useful only for ad_hoc orders. The `Order` class has a `total_time` property that computes that value.

In [155]:
all_data_available = (
    orders["logged_confirmed_total_time"].notnull()
    & orders["delivery_at"].notnull()
    & orders["placed_at"].notnull()
)
good_data = (
    orders["logged_confirmed_total_time"]
    - ((orders["delivery_at"] - orders["placed_at"]).dt.total_seconds().round())
).abs() <= 5
del orders["logged_confirmed_total_time"]

assert (all_data_available & good_data).sum() == 635_768
assert (all_data_available & good_data & completed).sum() == 635_768
assert (all_data_available & good_data & ad_hoc).sum() == 561_340

round(
    (all_data_available & good_data & ad_hoc).sum()
    / (all_data_available & ad_hoc).sum(),
    3,
)

0.998

The best guess for `accepting_time` is the difference between `dispatch_at` and `courier_accepted_at`. `Order.time_to_accept` models that.

In [156]:
all_data_available = (
    orders["logged_accepting_time"].notnull()
    & orders["courier_accepted_at"].notnull()
    & orders["dispatch_at"].notnull()
)
good_data = (
    orders["logged_accepting_time"]
    - (
        (orders["courier_accepted_at"] - orders["dispatch_at"])
        .dt.total_seconds()
        .round()
    )
).abs() <= 5

assert (all_data_available & good_data).sum() == 345_803
assert (all_data_available & good_data & completed).sum() == 345_803

round((all_data_available & good_data).sum() / all_data_available.sum(), 3)

0.607

We use `accepting_time` to extrapolate missing values for `dispatch_at`.

In [157]:
extrapolate = (
    orders["dispatch_at"].isnull()
    & orders["courier_accepted_at"].notnull()
    & orders["logged_accepting_time"].notnull()
)

accept_time = orders["logged_accepting_time"].map(
    lambda x: datetime.timedelta(seconds=x) if x is not pd.NA else pd.NaT
)
extrapolated_dispatch_at = orders["courier_accepted_at"] - accept_time
still_wrong = extrapolated_dispatch_at >= orders["courier_notified_at"]
msk = extrapolate & ~still_wrong
orders.loc[msk, "dispatch_at"] = extrapolated_dispatch_at.loc[msk]
orders.loc[msk, "dispatch_at_corrected"] = True
del orders["logged_accepting_time"]

assert extrapolate.sum() == 67_372
assert (extrapolate & ~still_wrong).sum() == 61_545

The best guess for `reaction_time` is the difference between `courier_notified_at` and `courier_accepted_at`. `Order.time_to_react` models that in the ORM.

In [158]:
all_data_available = (
    orders["logged_reaction_time"].notnull()
    & orders["courier_accepted_at"].notnull()
    & orders["courier_notified_at"].notnull()
)
good_data = (
    orders["logged_reaction_time"]
    - (
        (orders["courier_accepted_at"] - orders["courier_notified_at"])
        .dt.total_seconds()
        .round()
    )
).abs() <= 5

assert (all_data_available & good_data).sum() == 165_355
assert (all_data_available & good_data & completed).sum() == 165_355

round((all_data_available & good_data).sum() / all_data_available.sum(), 3)

0.843

We use `reaction_time` to extrapolate missing values for `courier_notified_at`.

In [159]:
extrapolate = (
    orders["courier_notified_at"].isnull()
    & orders["courier_accepted_at"].notnull()
    & orders["logged_reaction_time"].notnull()
)
extrapolated_courier_notified_at = (
    orders["courier_accepted_at"]
    # Some values for logged_reaction_time are <= 0.
    - orders["logged_reaction_time"].map(
        lambda x: datetime.timedelta(seconds=x) if x is not pd.NA and x > 0 else pd.NaT
    )
)
still_wrong = extrapolated_courier_notified_at <= orders["dispatch_at"]
msk = extrapolate & ~still_wrong
orders.loc[msk, "courier_notified_at"] = extrapolated_courier_notified_at.loc[msk]
orders.loc[msk, "courier_notified_at_corrected"] = True

assert extrapolate.sum() == 214_043
assert (extrapolate & ~still_wrong).sum() == 213_290

No need to extrapolate `courier_accepted_at` from `courier_notified_at`.

In [160]:
assert not (
    orders["courier_notified_at"].notnull()
    & orders["courier_accepted_at"].isnull()
    & orders["logged_reaction_time"].notnull()
).any()

del orders["logged_reaction_time"]

`estimated_prep_duration` equals `expected_wait_pickup_time`. As the latter is not filled in for cancelled orders, we keep the former.
Also, `estimated_prep_duration` is only filled in starting with May 24. It is always a multiple of `60`, so it is stored as full minutes.

In [161]:
all_data_available = (
    orders["estimated_prep_duration"].notnull()
    & orders["expected_wait_pickup_time"].notnull()
)
good_data = (
    orders["estimated_prep_duration"] - orders["expected_wait_pickup_time"]
).abs() <= 5

no_duration = orders["estimated_prep_duration"].isnull()

assert not (no_duration & orders["expected_wait_pickup_time"].notnull()).any()
assert (~no_duration & orders["expected_wait_pickup_time"].isnull()).sum() == 19_865
assert not (
    (orders["placed_at"].dt.date > datetime.date(2016, 5, 24))
    & orders["expected_wait_pickup_time"].isnull()
    & (orders["cancelled"] == False)
).any()

del orders["expected_wait_pickup_time"]

assert orders.loc[no_duration, "placed_at"].min().date() == datetime.date(2016, 2, 21)
assert orders.loc[no_duration, "placed_at"].max().date() == datetime.date(2016, 5, 24)
assert orders.loc[~no_duration, "placed_at"].min().date() == datetime.date(2016, 5, 24)
assert orders.loc[~no_duration, "placed_at"].max().date() == datetime.date(2017, 1, 31)
assert not (~no_duration & (orders["estimated_prep_duration"] % 60 != 0)).any()

round((all_data_available & good_data).sum() / all_data_available.sum(), 3)

1.0

`estimated_prep_duration` is the difference between `restaurant_notified_at` and `scheduled_pickup_at` when allowing up to half a minute of clock skew. `restaurant_confirmed_at` only works in about 40% of the cases. So, if and when a restaurant confirms an order, does not affect the dispatching process.

In [162]:
all_data_available = (
    orders["estimated_prep_duration"].notnull()
    & orders["restaurant_notified_at"].notnull()
    & orders["scheduled_pickup_at"].notnull()
)
good_data = (
    orders["estimated_prep_duration"]
    - (
        (orders["scheduled_pickup_at"] - orders["restaurant_notified_at"])
        .dt.total_seconds()
        .round()
    )
).abs() <= 35

assert (all_data_available & good_data).sum() == 539_668
assert (all_data_available & good_data & completed).sum() == 524_709

round((all_data_available & good_data).sum() / all_data_available.sum(), 3)

0.986

We use `estimated_prep_duration` to correct about a third of the 1.5% of `restaurant_notified_at` that are off for orders after May 24.

In [163]:
duration = orders["estimated_prep_duration"].map(
    lambda x: datetime.timedelta(seconds=x) if x is not pd.NA else pd.NaT
)
calc_restaurant_notified_at = orders["scheduled_pickup_at"] - duration

not_wrong = (
    completed
    & (orders["placed_at"] < calc_restaurant_notified_at)
    & (calc_restaurant_notified_at < orders["restaurant_confirmed_at"])
)

msk = all_data_available & ~good_data & not_wrong
orders.loc[msk, "restaurant_notified_at"] = calc_restaurant_notified_at.loc[msk]
orders.loc[msk, "restaurant_notified_at_corrected"] = True

assert (all_data_available & ~good_data).sum() == 7_514
assert msk.sum() == 2_425
assert orders.loc[msk, "placed_at"].min().date() == datetime.date(2016, 5, 24)
assert orders.loc[msk, "placed_at"].max().date() == datetime.date(2017, 1, 31)

Also, we use `estimated_prep_duration` to extrapolate missing `restaurant_notified_at` values for orders after May 24.

In [164]:
duration = orders["estimated_prep_duration"].map(
    lambda x: datetime.timedelta(seconds=x) if x is not pd.NA else pd.NaT
)
extrapolated = orders["scheduled_pickup_at"] - duration

extrapolate = (
    orders["restaurant_notified_at"].isnull()
    & orders["scheduled_pickup_at"].notnull()
    & orders["estimated_prep_duration"].notnull()
)
still_wrong = (
    (extrapolated <= orders["placed_at"])
    | (extrapolated >= orders["restaurant_confirmed_at"])
    | (extrapolated >= orders["cancelled_at"])
)

msk = extrapolate & ~still_wrong
orders.loc[msk, "restaurant_notified_at"] = extrapolated.loc[msk]
orders.loc[msk, "restaurant_notified_at_corrected"] = True

assert extrapolate.sum() == 469
assert msk.sum() == 374
assert orders.loc[msk, "placed_at"].min().date() == datetime.date(2016, 5, 29)

Vice versa, we extrapolate `estimated_prep_duration` as the difference of `scheduled_pickup_at` and `restaurant_notified_at` for orders before May 24.

In [165]:
extrapolated = orders["scheduled_pickup_at"] - orders["restaurant_notified_at"]
extrapolated = (extrapolated.dt.total_seconds() // 60 * 60).astype("Int64")

extrapolate = (
    orders["restaurant_notified_at"].notnull()
    & orders["scheduled_pickup_at"].notnull()
    & orders["estimated_prep_duration"].isnull()
)
orders.loc[extrapolate, "estimated_prep_duration"] = extrapolated.loc[extrapolate]
orders.loc[extrapolate, "estimated_prep_duration_corrected"] = True

assert extrapolate.sum() == 108_398
assert orders.loc[extrapolate, "placed_at"].min().date() == datetime.date(2016, 2, 21)
assert orders.loc[extrapolate, "placed_at"].max().date() == datetime.date(2016, 5, 24)

More than 99.9% of the orders with `estimated_prep_duration` set, have this value be under 45 minutes. We view the remaining ones as outliers and adjust them.

In [166]:
more_than_45_mins = orders["estimated_prep_duration"].notnull()
more_than_45_mins &= orders["estimated_prep_duration"] > 45 * 60

orders.loc[more_than_45_mins, "estimated_prep_duration"] = 45 * 60
orders.loc[more_than_45_mins, "estimated_prep_duration_corrected"] = True

assert more_than_45_mins.sum() == 449

round((~more_than_45_mins).sum() / orders["estimated_prep_duration"].notnull().sum(), 5)

0.99973

We create a boolean column `pickup_not_confirmed` out of the text column `courier_no_pickup_confirmed_issue`.

In [167]:
orders["courier_no_pickup_confirmed_issue"].value_counts()

Courier did not hit "Picked up"    35345
Name: courier_no_pickup_confirmed_issue, dtype: Int64

In [168]:
orders["pickup_not_confirmed"] = False

msk = orders["courier_no_pickup_confirmed_issue"].notnull()
orders.loc[msk, "pickup_not_confirmed"] = True

msk = orders["pickup_at"].isnull()
orders.loc[msk, "pickup_not_confirmed"] = pd.NA

del orders["courier_no_pickup_confirmed_issue"]

assert orders["pickup_not_confirmed"].sum() == 34_966

`logged_to_pickup_time` and `logged_pickup_time` constitute the difference between `courier_accepted_at` and `pickup_at`. `logged_pickup_time` is negative in rare cases.

In [169]:
assert not (orders["logged_to_pickup_time"] < 0).any()
assert (orders["logged_pickup_time"] < 0).sum() == 30

all_data_available = (
    orders["logged_to_pickup_time"].notnull()
    & orders["logged_pickup_time"].notnull()
    & (orders["logged_pickup_time"] >= 0)
    & orders["pickup_at"].notnull()
    & orders["courier_accepted_at"].notnull()
)
good_data = (
    orders["logged_to_pickup_time"]
    + orders["logged_pickup_time"]
    - ((orders["pickup_at"] - orders["courier_accepted_at"]).dt.total_seconds().round())
).abs() <= 5

pickup_not_confirmed = orders["pickup_not_confirmed"] == True

assert (all_data_available & good_data).sum() == 599_195
assert (all_data_available & good_data & completed).sum() == 599_111
assert (all_data_available & (good_data | pickup_not_confirmed)).sum() == 604_483

round((all_data_available & good_data).sum() / all_data_available.sum(), 3)

0.94

For the 6% where `pickup_at` does not relate back to `courier_accepted_at`, we correct the former. Unconfirmed pickups seem to not be the cause of these inconsistencies.

In [170]:
calc_pickup_at = (
    orders["courier_accepted_at"]
    + orders["logged_to_pickup_time"].map(
        lambda x: datetime.timedelta(seconds=x) if x is not pd.NA else pd.NaT
    )
    + orders["logged_pickup_time"].map(
        lambda x: datetime.timedelta(seconds=x) if x is not pd.NA else pd.NaT
    )
)

msk = all_data_available & ~good_data
orders.loc[msk, "pickup_at"] = calc_pickup_at.loc[msk]
orders.loc[msk, "pickup_at_corrected"] = True

assert (all_data_available & ~good_data).sum() == 38_015
assert (all_data_available & ~good_data & pickup_not_confirmed).sum() == 5_288

Keep other timestamps consistent after the correction.

In [171]:
msk = orders["pickup_at"] <= orders["restaurant_notified_at"]
orders.loc[msk, "restaurant_notified_at"] = pd.NaT
orders.loc[msk, "restaurant_notified_at_corrected"] = True

assert msk.sum() == 107

In [172]:
msk = orders["pickup_at"] <= orders["restaurant_confirmed_at"]
orders.loc[msk, "restaurant_confirmed_at"] = pd.NaT
orders.loc[msk, "restaurant_confirmed_at_corrected"] = True

assert msk.sum() == 892

With `logged_to_pickup_time` we calculate a new timestamp `reached_pickup_at`.

In [173]:
to_pickup_time = orders["logged_to_pickup_time"].map(
    lambda x: datetime.timedelta(seconds=x) if x is not pd.NA and x > 0 else pd.NaT
)
reached_pickup_at = orders["courier_accepted_at"] + to_pickup_time

orders["reached_pickup_at"] = pd.NaT
msk = (
    completed & reached_pickup_at.notnull() & (reached_pickup_at < orders["pickup_at"])
)
orders.loc[msk, "reached_pickup_at"] = reached_pickup_at.loc[msk]

assert msk.sum() == 530_724

`logged_courier_late_time` and `logged_restaurant_late_time` are always set together. The ca. 110,000 missing values are spread over the entire horizon.

In [174]:
assert not (
    (
        orders["logged_courier_late_time"].notnull()
        & orders["logged_restaurant_late_time"].isnull()
    )
    | (
        orders["logged_courier_late_time"].isnull()
        & orders["logged_restaurant_late_time"].notnull()
    )
).any()

assert orders.loc[
    orders["logged_courier_late_time"].isnull(), "placed_at"
].min().date() == datetime.date(2016, 2, 22)

assert orders.loc[
    orders["logged_courier_late_time"].isnull(), "placed_at"
].max().date() == datetime.date(2017, 1, 31)

assert orders.loc[
    orders["logged_courier_late_time"].notnull(), "placed_at"
].min().date() == datetime.date(2016, 2, 21)

assert orders.loc[
    orders["logged_courier_late_time"].notnull(), "placed_at"
].max().date() == datetime.date(2017, 1, 31)

`logged_courier_late_time` is mostly explained with `reached_pickup_at` and `scheduled_pickup_at`. `Order.courier_early` and `Order.courier_late` model that in the ORM.

In [175]:
all_data_available = (
    orders["logged_courier_late_time"].notnull()
    & orders["reached_pickup_at"].notnull()
    & orders["scheduled_pickup_at"].notnull()
)
good_data = (
    orders["logged_courier_late_time"]
    - (
        (orders["reached_pickup_at"] - orders["scheduled_pickup_at"])
        .dt.total_seconds()
        .round()
    )
).abs() <= 5

assert (all_data_available & good_data).sum() == 471_553
assert (all_data_available & good_data & completed).sum() == 471_553

round((all_data_available & good_data).sum() / all_data_available.sum(), 3)

0.964

`logged_restaurant_late_time` is mostly explained with `pickup_at` and `scheduled_pickup_at`. `logged_restaurant_late_time` is also `0` quite often, indicating no timing was taken. `Order.restaurant_early` and `Order.restaurant_late` model that in the ORM.

In [176]:
all_data_available = (
    orders["logged_restaurant_late_time"].notnull()
    & orders["pickup_at"].notnull()
    & orders["scheduled_pickup_at"].notnull()
)
good_data = (
    orders["logged_restaurant_late_time"]
    - ((orders["pickup_at"] - orders["scheduled_pickup_at"]).dt.total_seconds().round())
).abs() <= 5

restaurant_not_timed = orders["logged_restaurant_late_time"] == 0

assert (all_data_available).sum() == 503_179
assert (all_data_available & good_data).sum() == 245_714
assert (all_data_available & restaurant_not_timed).sum() == 246_362
assert (all_data_available & (good_data | restaurant_not_timed)).sum() == 488_512

restaurant_timed = orders["logged_restaurant_late_time"] != 0

round(
    (all_data_available & restaurant_timed & good_data).sum()
    / (all_data_available & restaurant_timed).sum(),
    3,
)

0.943

`logged_wait_pickup_time` is unfortunately not a good timing to extrapolate when a meal was ready to picked up by the courier. It is only good to explain the difference between `reached_pickup_at` and `left_pickup_at`, which is not really the time the courier had to wait. Also, the field seems to only be tracked correctly if the courier was late.

In [177]:
all_data_available = (
    (orders["logged_wait_pickup_time"]).notnull()
    & orders["reached_pickup_at"].notnull()
    & orders["left_pickup_at"].notnull()
)
good_data = (
    orders["logged_wait_pickup_time"]
    - (
        (orders["left_pickup_at"] - orders["reached_pickup_at"])
        .dt.total_seconds()
        .round()
    )
).abs() <= 5

round(
    (all_data_available & good_data).sum() / (all_data_available).sum(), 3,
)

0.396

In [178]:
all_data_available = (
    (orders["logged_wait_pickup_time"]).notnull()
    & (orders["logged_courier_late_time"] >= 0)
    & orders["reached_pickup_at"].notnull()
    & orders["left_pickup_at"].notnull()
)
good_data = (
    orders["logged_wait_pickup_time"]
    - (
        (orders["left_pickup_at"] - orders["reached_pickup_at"])
        .dt.total_seconds()
        .round()
    )
).abs() <= 5

round(
    (all_data_available & good_data).sum() / (all_data_available).sum(), 3,
)

1.0

In [179]:
del orders["logged_courier_late_time"]
del orders["logged_restaurant_late_time"]
del orders["logged_wait_pickup_time"]

We create a boolean column `delivery_not_confirmed` out of the text column `courier_no_pickup_confirmed_issue`.

In [180]:
orders["courier_no_delivery_confirmed_issue"].value_counts()

Courier did not hit "Dropped off"    13862
Name: courier_no_delivery_confirmed_issue, dtype: Int64

In [181]:
orders["delivery_not_confirmed"] = False

msk = orders["courier_no_delivery_confirmed_issue"].notnull()
orders.loc[msk, "delivery_not_confirmed"] = True

msk = orders["delivery_at"].isnull()
orders.loc[msk, "delivery_not_confirmed"] = pd.NA

del orders["courier_no_delivery_confirmed_issue"]

assert orders["delivery_not_confirmed"].sum() == 13_817

`logged_to_delivery_time` and `logged_delivery_time` constitute the difference between `pickup_at` and `delivery_at`. Without the `pickup_at` corrections above, not 91% but only 86% of the differences would work. `Order.time_to_delivery` and `Order.time_at_delivery` model that in the ORM.

In [182]:
assert not (orders["logged_to_delivery_time"] < 0).any()
assert not (orders["logged_delivery_time"] < 0).any()

all_data_available = (
    orders["logged_to_delivery_time"].notnull()
    & orders["logged_delivery_time"].notnull()
    & orders["delivery_at"].notnull()
    & orders["pickup_at"].notnull()
)
good_data = (
    orders["logged_to_delivery_time"]
    + orders["logged_delivery_time"]
    - ((orders["delivery_at"] - orders["pickup_at"]).dt.total_seconds().round())
).abs() <= 5

delivery_not_confirmed = orders["delivery_not_confirmed"] == True

assert (all_data_available & good_data).sum() == 572_609
assert (all_data_available & good_data & completed).sum() == 572_609
assert (all_data_available & (good_data | delivery_not_confirmed)).sum() == 581_700

round(
    (all_data_available & (good_data | delivery_not_confirmed)).sum()
    / all_data_available.sum(),
    3,
)

0.913

`courier_waited_at_delivery_issue` is filled in whenever the courier needed to wait for the customer at delivery. It is also filled in if the courier forgot to confirm the delivery, which mostly happened at the end of a shift. If a courier needed to wait for more than 45 minutes, that is summarized as 'waiting about 1, 2, or 3 hours.'

In [183]:
orders["courier_waited_at_delivery_issue"].value_counts().sort_index()

Waiting at Dropoff                   1597
Waiting at Dropoff: 10 minutes       4132
Waiting at Dropoff: 11 minutes       3151
Waiting at Dropoff: 12 minutes       2348
Waiting at Dropoff: 13 minutes       1775
Waiting at Dropoff: 14 minutes       1432
Waiting at Dropoff: 15 minutes       1199
Waiting at Dropoff: 16 minutes        941
Waiting at Dropoff: 17 minutes        826
Waiting at Dropoff: 18 minutes        720
Waiting at Dropoff: 19 minutes        594
Waiting at Dropoff: 20 minutes        522
Waiting at Dropoff: 21 minutes        431
Waiting at Dropoff: 22 minutes        415
Waiting at Dropoff: 23 minutes        284
Waiting at Dropoff: 24 minutes        281
Waiting at Dropoff: 25 minutes        215
Waiting at Dropoff: 26 minutes        193
Waiting at Dropoff: 27 minutes        167
Waiting at Dropoff: 28 minutes        162
Waiting at Dropoff: 29 minutes        118
Waiting at Dropoff: 30 minutes        109
Waiting at Dropoff: 31 minutes         97
Waiting at Dropoff: 32 minutes    

We convert `courier_waited_at_delivery_issue` into `courier_waited_at_delivery` to validate it further below.

In [184]:
waited_at_delivery = (
    orders["courier_waited_at_delivery_issue"]
    .str.replace(r"\D+", "", regex=True)
    .fillna("NaN")
    .replace("", "NaN")
    .astype(float)
    .astype("Int64")
)

orders["courier_waited_at_delivery"] = pd.NA
orders["courier_waited_at_delivery"] = orders["courier_waited_at_delivery"].astype(
    "Int64"
)

hours = orders["courier_waited_at_delivery_issue"].str.contains("hour").fillna(0)
orders.loc[hours, "courier_waited_at_delivery"] = (
    60 * 60 * waited_at_delivery.loc[hours]
)

mins = orders["courier_waited_at_delivery_issue"].str.contains("minutes").fillna(0)
orders.loc[mins, "courier_waited_at_delivery"] = 60 * waited_at_delivery.loc[mins]

customer_late = orders["courier_waited_at_delivery_issue"].notnull()

del orders["courier_waited_at_delivery_issue"]

assert hours.sum() == 254
assert mins.sum() == 30_512

For the roughly 9% of orders where `logged_to_delivery_time` and `logged_delivery_time` do not explain `delivery_at`, the latter is corrected. However, this is only done if the courier did not have to wait for the customer or forgot to confirm the delivery, which wrongly shows up as waiting for the customer as well.

In [185]:
calc_delivery_at = (
    orders["pickup_at"]
    + orders["logged_to_delivery_time"].map(
        lambda x: datetime.timedelta(seconds=x) if x is not pd.NA else pd.NaT
    )
    + orders["logged_delivery_time"].map(
        lambda x: datetime.timedelta(seconds=x) if x is not pd.NA else pd.NaT
    )
)

del orders["logged_delivery_time"]

orders["delivery_at_orig"] = orders["delivery_at"]

msk = (
    all_data_available
    & ~good_data
    & (~customer_late | (customer_late & delivery_not_confirmed))
)
orders.loc[msk, "delivery_at"] = calc_delivery_at.loc[msk]
orders.loc[msk, "delivery_at_corrected"] = True

assert (all_data_available & ~good_data).sum() == 64_543
assert (all_data_available & ~good_data & ~customer_late).sum() == 49_122

assert (
    all_data_available & ~good_data & customer_late & delivery_not_confirmed
).sum() == 5_241

assert (all_data_available & ~good_data & delivery_not_confirmed).sum() == 9_091

With `logged_to_delivery_time` we calculate a new timestamp `reached_delivery_at`.

In [186]:
to_delivery_time = orders["logged_to_delivery_time"].map(
    lambda x: datetime.timedelta(seconds=x) if x is not pd.NA and x > 0 else pd.NaT
)
reached_delivery_at = orders["pickup_at"] + to_delivery_time

del orders["logged_to_delivery_time"]

orders["reached_delivery_at"] = pd.NaT

msk = (
    completed
    & reached_delivery_at.notnull()
    & (reached_delivery_at < orders["delivery_at"])
)
orders.loc[msk, "reached_delivery_at"] = reached_delivery_at.loc[msk]

assert msk.sum() == 608_160

Some `left_pickup_at` values conflict with that and are discarded.

In [187]:
msk = orders["left_pickup_at"] >= orders["reached_delivery_at"]
orders.loc[msk, "left_pickup_at"] = pd.NaT
orders.loc[msk, "left_pickup_at_corrected"] = True

assert msk.sum() == 4_215

`logged_delivery_late_time` is the difference between `scheduled_delivery_at` and `delivery_at` for pre-orders. `Order.delivery_early` and `Order.delivery_late` model that in the ORM.

In [188]:
all_data_available = (
    scheduled
    & orders["logged_delivery_late_time"].notnull()
    & orders["delivery_at"].notnull()
    & orders["scheduled_delivery_at"].notnull()
)
good_data = (
    orders["logged_delivery_late_time"]
    - (
        (orders["delivery_at"] - orders["scheduled_delivery_at"])
        .dt.total_seconds()
        .round()
    )
).abs() <= 5

assert (all_data_available & good_data).sum() == 73_658
assert (all_data_available & good_data & completed).sum() == 73_658
assert (all_data_available & (good_data | delivery_not_confirmed)).sum() == 73_659

round(
    (all_data_available & (good_data | delivery_not_confirmed)).sum()
    / all_data_available.sum(),
    3,
)

0.986

`expected_delivery_time` is simply the difference between `placed_at` and `scheduled_delivery_at`, for both ad-hoc and pre-orders. So, the field provides no new information.

In [189]:
all_data_available = (
    orders["expected_delivery_time"].notnull()
    & orders["placed_at"].notnull()
    & orders["scheduled_delivery_at"].notnull()
)
good_data = (
    orders["expected_delivery_time"]
    - (
        (orders["scheduled_delivery_at"] - orders["placed_at"])
        .dt.total_seconds()
        .round()
    )
).abs() <= 5

del orders["expected_delivery_time"]

assert (all_data_available & good_data).sum() == 74_386

round((all_data_available & good_data).sum() / all_data_available.sum(), 3)

0.996

`courier_waited_at_delivery` can be mostly explained as the difference between `reached_delivery_at` and `delivery_at`. `Order.courier_waited_at_delivery` models that in the ORM.

In [190]:
all_data_available = (
    orders["courier_waited_at_delivery"].notnull()
    & orders["delivery_at"].notnull()
    & orders["reached_delivery_at"].notnull()
)
good_data = (
    orders["courier_waited_at_delivery"]
    - (
        (orders["delivery_at"] - orders["reached_delivery_at"])
        .dt.total_seconds()
        .round()
    )
).abs() <= 90
imprecise_wait_times = orders["courier_waited_at_delivery"].fillna(0) >= 45 * 60

assert (all_data_available & good_data).sum() == 26_268
assert (all_data_available & good_data & completed).sum() == 26_268
assert (all_data_available & (good_data | imprecise_wait_times)).sum() == 26_499

round(
    (all_data_available & (good_data | imprecise_wait_times)).sum()
    / all_data_available.sum(),
    3,
)

0.871

We keep `courier_waited_at_delivery` as a boolean field here to be used by `Order.courier_waited_at_delivery`.

In [191]:
msk = orders["delivery_at"].notnull() & orders["courier_waited_at_delivery"].notnull()
orders["courier_waited_at_delivery"] = pd.NA
orders.loc[orders["delivery_at"].notnull(), "courier_waited_at_delivery"] = False
orders.loc[msk, "courier_waited_at_delivery"] = True

assert orders["courier_waited_at_delivery"].sum() == msk.sum() == 30_658

### Statistical Columns

Keep the columns that log the courier's speed.

In [192]:
orders = orders.rename(
    columns={
        "logged_avg_courier_speed": "logged_avg_speed",
        "logged_avg_courier_speed_distance": "logged_avg_speed_distance",
    }
)

In [193]:
unrealistic = orders["logged_delivery_distance"] > 12_000
orders.loc[unrealistic, "logged_delivery_distance"] = pd.NA

assert unrealistic.sum() == 17

### Clean Data

In [194]:
orders = orders[
    [
        # Generic columns
        "delivery_id",
        "customer_id",
        "placed_at",
        "ad_hoc",
        "scheduled_delivery_at",
        "scheduled_delivery_at_corrected",
        "first_estimated_delivery_at",
        "cancelled",
        "cancelled_at",
        "cancelled_at_corrected",
        # Price related columns
        "sub_total",
        "delivery_fee",
        "total",
        # Restaurant related columns
        "restaurant_id",
        "restaurant_notified_at",
        "restaurant_notified_at_corrected",
        "restaurant_confirmed_at",
        "restaurant_confirmed_at_corrected",
        "estimated_prep_duration",
        "estimated_prep_duration_corrected",
        "estimated_prep_buffer",
        # Dispatch related columns
        "courier_id",
        "dispatch_at",
        "dispatch_at_corrected",
        "courier_notified_at",
        "courier_notified_at_corrected",
        "courier_accepted_at",
        "courier_accepted_at_corrected",
        "utilization",
        # Pickup related columns
        "pickup_address_id",
        "reached_pickup_at",
        "pickup_at",
        "pickup_at_corrected",
        "pickup_not_confirmed",
        "left_pickup_at",
        "left_pickup_at_corrected",
        # Delivery related columns
        "delivery_address_id",
        "reached_delivery_at",
        "delivery_at",
        "delivery_at_corrected",
        "delivery_not_confirmed",
        "courier_waited_at_delivery",
        # Statistical columns
        "logged_delivery_distance",
        "logged_avg_speed",
        "logged_avg_speed_distance",
    ]
].sort_index()

In [195]:
orders.head()

Unnamed: 0_level_0,delivery_id,customer_id,placed_at,ad_hoc,scheduled_delivery_at,scheduled_delivery_at_corrected,first_estimated_delivery_at,cancelled,cancelled_at,cancelled_at_corrected,sub_total,delivery_fee,total,restaurant_id,restaurant_notified_at,restaurant_notified_at_corrected,restaurant_confirmed_at,restaurant_confirmed_at_corrected,estimated_prep_duration,estimated_prep_duration_corrected,estimated_prep_buffer,courier_id,dispatch_at,dispatch_at_corrected,courier_notified_at,courier_notified_at_corrected,courier_accepted_at,courier_accepted_at_corrected,utilization,pickup_address_id,reached_pickup_at,pickup_at,pickup_at_corrected,pickup_not_confirmed,left_pickup_at,left_pickup_at_corrected,delivery_address_id,reached_delivery_at,delivery_at,delivery_at_corrected,delivery_not_confirmed,courier_waited_at_delivery,logged_delivery_distance,logged_avg_speed,logged_avg_speed_distance
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1,Unnamed: 30_level_1,Unnamed: 31_level_1,Unnamed: 32_level_1,Unnamed: 33_level_1,Unnamed: 34_level_1,Unnamed: 35_level_1,Unnamed: 36_level_1,Unnamed: 37_level_1,Unnamed: 38_level_1,Unnamed: 39_level_1,Unnamed: 40_level_1,Unnamed: 41_level_1,Unnamed: 42_level_1,Unnamed: 43_level_1,Unnamed: 44_level_1,Unnamed: 45_level_1
1,487377,1,2016-02-22 10:42:01,False,2016-02-22 12:30:00,False,NaT,True,NaT,True,1250,250,1500,1,NaT,,NaT,,,,0,,NaT,,NaT,,NaT,,15,2,NaT,NaT,,,NaT,,3,NaT,NaT,,,,671,,
2,487433,2,2016-02-22 10:40:09,False,2016-02-22 12:00:00,False,2016-02-23 23:55:23,False,NaT,,1550,250,1800,2,2016-02-22 11:31:03,False,NaT,True,600.0,True,0,96.0,2016-02-22 10:48:24,True,2016-02-22 11:36:04,False,2016-02-22 11:37:00,False,19,4,2016-02-22 11:40:07,2016-02-22 11:42:12,True,True,NaT,,5,2016-02-22 12:11:17,2016-02-22 12:15:41,True,False,False,2281,,
3,487444,3,2016-02-22 10:56:16,False,2016-02-22 12:00:00,False,2016-02-23 23:25:47,False,NaT,,2000,250,2250,3,2016-02-22 11:31:02,False,2016-02-22 11:42:00,False,600.0,True,0,57.0,2016-02-22 11:31:02,True,2016-02-22 11:31:03,False,2016-02-22 11:31:30,False,25,6,2016-02-22 11:54:31,2016-02-22 11:54:32,False,False,NaT,,7,2016-02-22 12:03:14,2016-02-22 12:07:44,True,False,False,2449,,
4,470503,4,2016-02-22 11:11:46,False,2016-02-22 12:30:00,False,2016-02-22 12:25:50,False,NaT,,9800,250,10050,4,2016-02-22 11:32:04,False,2016-02-22 11:48:01,False,2700.0,True,0,39.0,NaT,True,2016-02-22 11:50:03,False,2016-02-22 11:50:29,False,100,8,2016-02-22 11:57:48,2016-02-22 12:11:26,False,False,NaT,,9,2016-02-22 12:12:53,2016-02-22 12:18:43,False,False,False,196,,
5,487439,5,2016-02-22 11:35:58,False,2016-02-22 12:30:00,False,2016-02-22 12:28:19,False,NaT,,3600,250,3850,5,2016-02-22 11:50:03,False,2016-02-22 11:50:19,False,1200.0,True,0,128.0,NaT,True,2016-02-22 11:50:04,False,2016-02-22 11:50:28,False,45,10,2016-02-22 11:55:12,2016-02-22 12:05:19,False,False,NaT,,11,2016-02-22 12:16:53,2016-02-22 12:20:44,False,False,False,2584,,


In [196]:
orders.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 660658 entries, 1 to 688690
Data columns (total 45 columns):
 #   Column                             Non-Null Count   Dtype         
---  ------                             --------------   -----         
 0   delivery_id                        660658 non-null  int64         
 1   customer_id                        660658 non-null  int64         
 2   placed_at                          660658 non-null  datetime64[ns]
 3   ad_hoc                             660658 non-null  bool          
 4   scheduled_delivery_at              80121 non-null   datetime64[ns]
 5   scheduled_delivery_at_corrected    83398 non-null   object        
 6   first_estimated_delivery_at        657168 non-null  datetime64[ns]
 7   cancelled                          660658 non-null  bool          
 8   cancelled_at                       14081 non-null   datetime64[ns]
 9   cancelled_at_corrected             23552 non-null   object        
 10  sub_total           

In [197]:
for column in orders.columns:
    if column.endswith("corrected"):
        print(column, (orders[column] == True).sum())

scheduled_delivery_at_corrected 3321
cancelled_at_corrected 9569
restaurant_notified_at_corrected 2862
restaurant_confirmed_at_corrected 4128
estimated_prep_duration_corrected 108591
dispatch_at_corrected 69241
courier_notified_at_corrected 216869
courier_accepted_at_corrected 23
pickup_at_corrected 38032
left_pickup_at_corrected 18739
delivery_at_corrected 54363


In [198]:
assert (
    hashlib.sha256(orders.to_json().encode()).hexdigest()
    == "c548084f094bd220f3aff7e9b7072a4964127f6962dffd54f21c8d1f5b846a7f"
)

All couriers had at least one order.

In [199]:
assert set(couriers.reset_index()["id"]) == set(
    orders.loc[orders["courier_id"].notnull(), "courier_id"].unique()
)

Only keep restaurants that had at least one order.

In [200]:
restaurants = restaurants.reset_index()
msk = restaurants["id"].isin(orders["restaurant_id"].unique())
restaurants = restaurants[msk].set_index("id")

assert (~msk).sum() == 6

Only keep addresses with pickups or deliveries.

In [201]:
addresses = addresses.reset_index()
msk = addresses["id"].isin(
    set(restaurants["address_id"])
    | set(orders["pickup_address_id"])
    | set(orders["delivery_address_id"])
)
addresses = addresses[msk].set_index("id")

assert (~msk).sum() == 100

discarded_addresses = set(addresses["primary_id"]) - set(addresses.reset_index()["id"])
for old_primary_id in set(addresses["primary_id"]) - set(addresses.reset_index()["id"]):
    msk = addresses["primary_id"] == old_primary_id
    new_primary_id = addresses[msk].index.min()
    addresses.loc[msk, "primary_id"] = new_primary_id

## Store the Results

In [202]:
config.CLEAN_SCHEMA

'clean'

In [203]:
cities.to_sql(
    "cities",
    con=connection,
    schema=config.CLEAN_SCHEMA,
    if_exists="append",
    index=True,
)

In [204]:
addresses.to_sql(
    "addresses",
    con=connection,
    schema=config.CLEAN_SCHEMA,
    if_exists="append",
    index=True,
)

In [205]:
restaurants.to_sql(
    "restaurants",
    con=connection,
    schema=config.CLEAN_SCHEMA,
    if_exists="append",
    index=True,
)

In [206]:
couriers.to_sql(
    "couriers",
    con=connection,
    schema=config.CLEAN_SCHEMA,
    if_exists="append",
    index=True,
)

In [207]:
customers = pd.DataFrame({"id": orders["customer_id"].unique()}).set_index("id")

customers.to_sql(
    "customers",
    con=connection,
    schema=config.CLEAN_SCHEMA,
    if_exists="append",
    index=True,
)

In [208]:
orders.to_sql(
    "orders",
    con=connection,
    schema=config.CLEAN_SCHEMA,
    if_exists="append",
    index=True,
)