Add aggregate_orders() function

- the function queries the database and aggregates the ad-hoc orders
  by pixel and time steps into a demand time series
- implement "heavy" integration tests for `aggregate_orders()`
- make `pandas` a package dependency
- streamline the `Config`
This commit is contained in:
Alexander Hess 2021-01-07 23:18:40 +01:00
parent e8c97dd7da
commit d5b3efbca1
Signed by: alexander
GPG key ID: 344EA5AB10D868E0
10 changed files with 460 additions and 6 deletions

2
poetry.lock generated
View file

@ -1120,7 +1120,7 @@ name = "pandas"
version = "1.1.5"
description = "Powerful data structures for data analysis, time series, and statistics"
category = "main"
optional = true
optional = false
python-versions = ">=3.6.1"
[package.dependencies]

View file

@ -31,6 +31,7 @@ python = "^3.8"
Shapely = "^1.7.1"
alembic = "^1.4.2"
click = "^7.1.2"
pandas = "^1.1.0"
psycopg2 = "^2.8.5" # adapter for PostgreSQL
sqlalchemy = "^1.3.18"
utm = "^0.7.0"
@ -40,7 +41,6 @@ utm = "^0.7.0"
jupyterlab = { version="^2.2.2", optional=true }
nb_black = { version="^1.0.7", optional=true }
numpy = { version="^1.19.1", optional=true }
pandas = { version="^1.1.0", optional=true }
pytz = { version="^2020.1", optional=true }
[tool.poetry.extras]
@ -48,7 +48,6 @@ research = [
"jupyterlab",
"nb_black",
"numpy",
"pandas",
"pytz",
]

View file

@ -138,6 +138,12 @@ per-file-ignores =
src/urban_meal_delivery/db/utils/__init__.py:
# Top-level of a sub-packages is intended to import a lot.
F401,
src/urban_meal_delivery/forecasts/__init__.py:
# Top-level of a sub-packages is intended to import a lot.
F401,
src/urban_meal_delivery/forecasts/timify.py:
# No SQL injection as the inputs come from a safe source.
S608,
tests/*.py:
# Type annotations are not strictly enforced.
ANN0, ANN2,
@ -245,6 +251,8 @@ cache_dir = .cache/mypy
ignore_missing_imports = true
[mypy-packaging]
ignore_missing_imports = true
[mypy-pandas]
ignore_missing_imports = true
[mypy-pytest]
ignore_missing_imports = true
[mypy-sqlalchemy.*]

View file

@ -26,14 +26,38 @@ def random_schema_name() -> str:
class Config:
"""Configuration that applies in all situations."""
# Application-specific settings
# -----------------------------
# Date after which the real-life data is discarded.
CUTOFF_DAY = datetime.datetime(2017, 2, 1)
# If a scheduled pre-order is made within this
# time horizon, we treat it as an ad-hoc order.
QUASI_AD_HOC_LIMIT = datetime.timedelta(minutes=45)
# Operating hours of the platform.
SERVICE_START = 11
SERVICE_END = 23
# Side lengths (in meters) for which pixel grids are created.
# They are the basis for the aggregated demand forecasts.
GRID_SIDE_LENGTHS = [707, 1000, 1414]
# Time steps (in minutes) used to aggregate the
# individual orders into time series.
TIME_STEPS = [60]
# Training horizons (in full weeks) used
# to train the forecasting models.
TRAINING_HORIZONS = [8]
# The demand forecasting methods used in the simulations.
FORECASTING_METHODS = ['hets', 'rtarima']
# Implementation-specific settings
# --------------------------------
DATABASE_URI = os.getenv('DATABASE_URI')
# The PostgreSQL schema that holds the tables with the original data.

View file

@ -43,7 +43,7 @@ class Grid(meta.Base):
def __repr__(self) -> str:
"""Non-literal text representation."""
return '<{cls}: {area}>'.format(
return '<{cls}: {area} sqr. km>'.format(
cls=self.__class__.__name__, area=self.pixel_area,
)
@ -51,7 +51,7 @@ class Grid(meta.Base):
@property
def pixel_area(self) -> float:
"""The area of a `Pixel` on the grid in square kilometers."""
return (self.side_length ** 2) / 1_000_000 # noqa:WPS432
return round((self.side_length ** 2) / 1_000_000, 1) # noqa:WPS432
@classmethod
def gridify(cls, city: db.City, side_length: int) -> db.Grid:

View file

@ -0,0 +1,3 @@
"""Demand forecasting utilities."""
from urban_meal_delivery.forecasts import timify

View file

@ -0,0 +1,114 @@
"""Obtain and work with time series data."""
import datetime
import pandas as pd
from urban_meal_delivery import config
from urban_meal_delivery import db
def aggregate_orders(grid: db.Grid, time_step: int) -> pd.DataFrame: # pragma: no cover
"""Obtain a time series of the ad-hoc `Order` totals.
Args:
grid: pixel grid used to aggregate orders spatially
time_step: interval length (in minutes) into which orders are aggregated
Returns:
order_totals: `DataFrame` with a `MultiIndex` of the "pixel_id"s and
beginnings of the intervals (i.e., "start_at"s); the sole column
with data is "total_orders"
"""
# `data` is probably missing "pixel_id"-"start_at" pairs.
# This happens whenever there is no demand in the `Pixel` in the given `time_step`.
data = pd.read_sql_query(
f"""-- # noqa:WPS221
SELECT
pixel_id,
start_at,
COUNT(*) AS total_orders
FROM (
SELECT
pixel_id,
placed_at_without_seconds - minutes_to_be_cut AS start_at
FROM (
SELECT
pixels.pixel_id,
DATE_TRUNC('MINUTE', orders.placed_at) AS placed_at_without_seconds,
((
EXTRACT(MINUTES FROM orders.placed_at)::INTEGER % {time_step}
)::TEXT || ' MINUTES')::INTERVAL
AS minutes_to_be_cut
FROM (
SELECT
id,
placed_at,
pickup_address_id
FROM
{config.CLEAN_SCHEMA}.orders
INNER JOIN (
SELECT
id AS address_id
FROM
{config.CLEAN_SCHEMA}.addresses
WHERE
city_id = {grid.city.id}
) AS in_city
ON orders.pickup_address_id = in_city.address_id
WHERE
ad_hoc IS TRUE
) AS
orders
INNER JOIN (
SELECT
address_id,
pixel_id
FROM
{config.CLEAN_SCHEMA}.addresses_pixels
WHERE
grid_id = {grid.id}
AND
city_id = {grid.city.id} -- city_id is redundant -> sanity check
) AS pixels
ON orders.pickup_address_id = pixels.address_id
) AS placed_at_aggregated_into_start_at
) AS pixel_start_at_combinations
GROUP BY
pixel_id,
start_at
ORDER BY
pixel_id,
start_at;
""",
con=db.connection,
index_col=['pixel_id', 'start_at'],
)
if data.empty:
return data
# Calculate the first and last "start_at" value ...
start_day = data.index.levels[1].min().date()
start = datetime.datetime(
start_day.year, start_day.month, start_day.day, config.SERVICE_START,
)
end_day = data.index.levels[1].max().date()
end = datetime.datetime(
end_day.year, end_day.month, end_day.day, config.SERVICE_END,
)
# ... and all possible `tuple`s of "pixel_id"-"start_at" combinations.
# The "start_at" values must lie within the operating hours.
gen = (
(pixel_id, start_at)
for pixel_id in sorted(data.index.levels[0])
for start_at in pd.date_range(start, end, freq=f'{time_step}T')
if config.SERVICE_START <= start_at.time().hour < config.SERVICE_END
)
# Re-index `data` filling in `0`s where there is no demand.
index = pd.MultiIndex.from_tuples(gen)
index.names = ['pixel_id', 'start_at']
return data.reindex(index, fill_value=0)

View file

@ -19,7 +19,7 @@ class TestSpecialMethods:
"""`Grid` has a non-literal text representation."""
result = repr(grid)
assert result == f'<Grid: {grid.pixel_area}>'
assert result == f'<Grid: {grid.pixel_area} sqr. km>'
@pytest.mark.db

View file

@ -0,0 +1 @@
"""Test the forecasting-related functionality."""

View file

@ -0,0 +1,305 @@
"""Test the time series related code."""
# pylint:disable=no-self-use,unused-argument
import datetime
import pytest
from urban_meal_delivery import db
from urban_meal_delivery.forecasts import timify
YEAR, MONTH, DAY = 2020, 1, 1
@pytest.mark.db
class TestAggregateOrders:
"""Test the `aggregate_orders()` function.
The test cases are all integration tests that model realistic scenarios.
"""
@pytest.fixture
def one_pixel_grid(self, db_session, city, restaurant):
"""A persisted `Grid` with one `Pixel`.
`restaurant` must be a dependency as otherwise
its `.address` is not put into the database.
"""
# `+1` as otherwise there would be a second pixel in one direction.
side_length = max(city.total_x, city.total_y) + 1
grid = db.Grid.gridify(city=city, side_length=side_length)
db_session.add(grid)
assert len(grid.pixels) == 1 # sanity check
return grid
def test_no_orders(self, db_session, one_pixel_grid, restaurant):
"""Edge case that does not occur for real-life data."""
db_session.commit()
assert len(restaurant.orders) == 0 # noqa:WPS507 sanity check
result = timify.aggregate_orders(grid=one_pixel_grid, time_step=60)
assert len(result) == 0 # noqa:WPS507
def test_evenly_distributed_ad_hoc_orders(
self, db_session, one_pixel_grid, restaurant, make_order,
):
"""12 ad-hoc orders, one per operating hour."""
# Create one order per hour and 12 orders in total.
for hour in range(11, 23):
order = make_order(
scheduled=False,
restaurant=restaurant,
placed_at=datetime.datetime(YEAR, MONTH, DAY, hour, 11),
)
db_session.add(order)
db_session.commit()
assert len(restaurant.orders) == 12 # sanity check
result = timify.aggregate_orders(grid=one_pixel_grid, time_step=60)
# The resulting `DataFrame` has 12 rows holding `1`s.
assert len(result) == 12
assert result['total_orders'].min() == 1
assert result['total_orders'].max() == 1
assert result['total_orders'].sum() == 12
def test_evenly_distributed_ad_hoc_orders_with_no_demand_late( # noqa:WPS218
self, db_session, one_pixel_grid, restaurant, make_order,
):
"""10 ad-hoc orders, one per hour, no orders after 21."""
# Create one order per hour and 10 orders in total.
for hour in range(11, 21):
order = make_order(
scheduled=False,
restaurant=restaurant,
placed_at=datetime.datetime(YEAR, MONTH, DAY, hour, 11),
)
db_session.add(order)
db_session.commit()
assert len(restaurant.orders) == 10 # sanity check
result = timify.aggregate_orders(grid=one_pixel_grid, time_step=60)
# Even though there are only 10 orders, there are 12 rows in the `DataFrame`.
# That is so as `0`s are filled in for hours without any demand at the end.
assert len(result) == 12
assert result['total_orders'].min() == 0
assert result['total_orders'].max() == 1
assert result.iloc[:10]['total_orders'].sum() == 10
assert result.iloc[10:]['total_orders'].sum() == 0
def test_one_ad_hoc_order_every_other_hour(
self, db_session, one_pixel_grid, restaurant, make_order,
):
"""6 ad-hoc orders, one every other hour."""
# Create one order every other hour.
for hour in range(11, 23, 2):
order = make_order(
scheduled=False,
restaurant=restaurant,
placed_at=datetime.datetime(YEAR, MONTH, DAY, hour, 11),
)
db_session.add(order)
db_session.commit()
assert len(restaurant.orders) == 6 # sanity check
result = timify.aggregate_orders(grid=one_pixel_grid, time_step=60)
# The resulting `DataFrame` has 12 rows, 6 holding `0`s, and 6 holding `1`s.
assert len(result) == 12
assert result['total_orders'].min() == 0
assert result['total_orders'].max() == 1
assert result['total_orders'].sum() == 6
def test_one_ad_hoc_and_one_pre_order(
self, db_session, one_pixel_grid, restaurant, make_order,
):
"""1 ad-hoc and 1 scheduled order.
The scheduled order is discarded.
"""
ad_hoc_order = make_order(
scheduled=False,
restaurant=restaurant,
placed_at=datetime.datetime(YEAR, MONTH, DAY, 11, 11),
)
db_session.add(ad_hoc_order)
pre_order = make_order(
scheduled=True,
restaurant=restaurant,
placed_at=datetime.datetime(YEAR, MONTH, DAY, 9, 0),
scheduled_delivery_at=datetime.datetime(YEAR, MONTH, DAY, 12, 0),
)
db_session.add(pre_order)
db_session.commit()
assert len(restaurant.orders) == 2 # sanity check
result = timify.aggregate_orders(grid=one_pixel_grid, time_step=60)
# The resulting `DataFrame` has 12 rows, 11 holding `0`s, and one holding a `1`.
assert len(result) == 12
assert result['total_orders'].min() == 0
assert result['total_orders'].max() == 1
assert result['total_orders'].sum() == 1
def test_evenly_distributed_ad_hoc_orders_with_half_hour_time_steps( # noqa:WPS218
self, db_session, one_pixel_grid, restaurant, make_order,
):
"""12 ad-hoc orders, one per hour, with 30 minute time windows.
In half the time steps, there is no demand.
"""
# Create one order per hour and 10 orders in total.
for hour in range(11, 23):
order = make_order(
scheduled=False,
restaurant=restaurant,
placed_at=datetime.datetime(YEAR, MONTH, DAY, hour, 11),
)
db_session.add(order)
db_session.commit()
assert len(restaurant.orders) == 12 # sanity check
result = timify.aggregate_orders(grid=one_pixel_grid, time_step=30)
# The resulting `DataFrame` has 24 rows for the 24 30-minute time steps.
# The rows' values are `0` and `1` alternating.
assert len(result) == 24
assert result['total_orders'].min() == 0
assert result['total_orders'].max() == 1
assert result.iloc[::2]['total_orders'].sum() == 12
assert result.iloc[1::2]['total_orders'].sum() == 0
def test_ad_hoc_orders_over_two_days(
self, db_session, one_pixel_grid, restaurant, make_order,
):
"""First day 12 ad-hoc orders, one per operating hour ...
... and 6 orders, one every other hour on the second day.
In total, there are 18 orders.
"""
# Create one order per hour and 12 orders in total.
for hour in range(11, 23):
order = make_order(
scheduled=False,
restaurant=restaurant,
placed_at=datetime.datetime(YEAR, MONTH, DAY, hour, 11),
)
db_session.add(order)
# Create one order every other hour and 6 orders in total.
for hour in range(11, 23, 2): # noqa:WPS440
order = make_order(
scheduled=False,
restaurant=restaurant,
placed_at=datetime.datetime(
YEAR, MONTH, DAY + 1, hour, 11, # noqa:WPS441
),
)
db_session.add(order)
db_session.commit()
assert len(restaurant.orders) == 18 # sanity check
result = timify.aggregate_orders(grid=one_pixel_grid, time_step=60)
# The resulting `DataFrame` has 24 rows, 12 for each day.
assert len(result) == 24
assert result['total_orders'].min() == 0
assert result['total_orders'].max() == 1
assert result['total_orders'].sum() == 18
@pytest.fixture
def two_pixel_grid(self, db_session, city, make_address, make_restaurant):
"""A persisted `Grid` with two `Pixel` objects.
`restaurant` must be a dependency as otherwise
its `.address` is not put into the database.
"""
# One `Address` in the lower-left `Pixel`, ...
address1 = make_address(latitude=48.8357377, longitude=2.2517412)
# ... and another one in the upper-right one.
address2 = make_address(latitude=48.8898312, longitude=2.4357622)
# Create `Restaurant`s at the two addresses.
make_restaurant(address=address1)
make_restaurant(address=address2)
# This creates four `Pixel`s, two of which have no `pickup_address`.
side_length = max(city.total_x // 2, city.total_y // 2) + 1
grid = db.Grid.gridify(city=city, side_length=side_length)
db_session.add(grid)
assert len(grid.pixels) == 2 # sanity check
return grid
def test_two_pixels_with_shifted_orders( # noqa:WPS218
self, db_session, two_pixel_grid, make_order,
):
"""One restaurant with one order every other hour ...
... and another restaurant with two orders per hour.
In total, there are 30 orders.
"""
address1, address2 = two_pixel_grid.city.addresses
restaurant1, restaurant2 = address1.restaurant, address2.restaurant
# Create one order every other hour for `restaurant1`.
for hour in range(11, 23, 2):
order = make_order(
scheduled=False,
restaurant=restaurant1,
placed_at=datetime.datetime(YEAR, MONTH, DAY, hour, 11),
)
db_session.add(order)
# Create two orders per hour for `restaurant2`.
for hour in range(11, 23): # noqa:WPS440
order = make_order(
scheduled=False,
restaurant=restaurant2,
placed_at=datetime.datetime(YEAR, MONTH, DAY, hour, 13), # noqa:WPS441
)
db_session.add(order)
order = make_order(
scheduled=False,
restaurant=restaurant2,
placed_at=datetime.datetime(YEAR, MONTH, DAY, hour, 14), # noqa:WPS441
)
db_session.add(order)
db_session.commit()
# sanity checks
assert len(restaurant1.orders) == 6
assert len(restaurant2.orders) == 24
result = timify.aggregate_orders(grid=two_pixel_grid, time_step=60)
# The resulting `DataFrame` has 24 rows, 12 for each pixel.
assert len(result) == 24
assert result['total_orders'].min() == 0
assert result['total_orders'].max() == 2
assert result['total_orders'].sum() == 30