Add OrderHistory class

- the main purpose of this class is to manage querying the order
  totals from the database and to slice various kinds of time series
  out of the data (see the usage sketch below)
- the class holds the former `aggregate_orders()` function as a method
- modularize the corresponding tests
- add `tests.config` with globals used when testing to provide a
  single source of truth for various settings
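
A minimal usage sketch of the new API (illustrative only: the `grid`
object must be a `db.Grid` instance loaded elsewhere, and `60` is just
an example `time_step`):

```python
from urban_meal_delivery import db
from urban_meal_delivery.forecasts import timify

grid = ...  # a db.Grid instance, loaded elsewhere (assumption for illustration)

# Aggregate the ad-hoc orders into 60-minute buckets per pixel.
history = timify.OrderHistory(grid=grid, time_step=60)

# `.totals` lazily runs `.aggregate_orders()` once and caches the result.
totals = history.totals  # one-column DataFrame indexed by ("pixel_id", "start_at")
```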
Alexander Hess 2021-01-09 16:20:23 +01:00
parent d5b3efbca1
commit 65d1632e98
Signed by: alexander
GPG key ID: 344EA5AB10D868E0
6 changed files with 289 additions and 129 deletions


@@ -1,6 +1,6 @@
"""Obtain and work with time series data."""
import datetime
import datetime as dt
import pandas as pd
@@ -8,107 +8,140 @@ from urban_meal_delivery import config
from urban_meal_delivery import db
def aggregate_orders(grid: db.Grid, time_step: int) -> pd.DataFrame: # pragma: no cover
"""Obtain a time series of the ad-hoc `Order` totals.
class OrderHistory:
"""Generate time series from the `Order` model in the database.
Args:
grid: pixel grid used to aggregate orders spatially
time_step: interval length (in minutes) into which orders are aggregated
Returns:
order_totals: `DataFrame` with a `MultiIndex` of the "pixel_id"s and
beginnings of the intervals (i.e., "start_at"s); the sole column
with data is "total_orders"
The purpose of this class is to abstract away managing the order data
in memory and slicing the data into various kinds of time series.
"""
# `data` is probably missing "pixel_id"-"start_at" pairs.
# This happens whenever there is no demand in the `Pixel` in the given `time_step`.
data = pd.read_sql_query(
f"""-- # noqa:WPS221
SELECT
pixel_id,
start_at,
COUNT(*) AS total_orders
FROM (
def __init__(self, grid: db.Grid, time_step: int) -> None:
"""Initialize a new `OrderHistory` object.
Args:
grid: pixel grid used to aggregate orders spatially
time_step: interval length (in minutes) into which orders are aggregated
# noqa:DAR401 RuntimeError
"""
self._grid = grid
self._time_step = time_step
# The service window must divide evenly into `time_step`s, i.e., the
# number of daily time steps must be a whole number.
n_daily_time_steps = (
60 * (config.SERVICE_END - config.SERVICE_START) / time_step
)
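# Illustrative numbers (assumed, not the actual `config` values): an
# 11:00-23:00 service day spans 720 minutes, so `time_step = 60` yields
# 12 whole steps, while `time_step = 50` yields 14.4 and fails the
# check below.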
if n_daily_time_steps != int(n_daily_time_steps): # pragma: no cover
raise RuntimeError('Internal error: configuration has invalid TIME_STEPS')
self._n_daily_time_steps = int(n_daily_time_steps)
# The `_data` are populated by `.aggregate_orders()`.
self._data = None
@property
def totals(self) -> pd.DataFrame:
"""The order totals by `Pixel` and `.time_step`.
The returned object should not be mutated!
Returns:
order_totals: a one-column `DataFrame` with a `MultiIndex` of the
"pixel_id"s and "start_at"s (i.e., beginnings of the intervals);
the column with data is "total_orders"
"""
if self._data is None:
self._data = self.aggregate_orders()
return self._data
def aggregate_orders(self) -> pd.DataFrame: # pragma: no cover
"""Generate and load all order totals from the database."""
# `data` is probably missing "pixel_id"-"start_at" pairs.
# This happens when there is no demand in the `Pixel` in the given `time_step`.
data = pd.read_sql_query(
f"""-- # noqa:E501,WPS221
SELECT
pixel_id,
placed_at_without_seconds - minutes_to_be_cut AS start_at
start_at,
COUNT(*) AS total_orders
FROM (
SELECT
pixels.pixel_id,
DATE_TRUNC('MINUTE', orders.placed_at) AS placed_at_without_seconds,
((
EXTRACT(MINUTES FROM orders.placed_at)::INTEGER % {time_step}
)::TEXT || ' MINUTES')::INTERVAL
AS minutes_to_be_cut
pixel_id,
placed_at_without_seconds - minutes_to_be_cut AS start_at
FROM (
SELECT
id,
placed_at,
pickup_address_id
FROM
{config.CLEAN_SCHEMA}.orders
pixels.pixel_id,
DATE_TRUNC('MINUTE', orders.placed_at) AS placed_at_without_seconds,
((
EXTRACT(MINUTES FROM orders.placed_at)::INTEGER % {self._time_step}
)::TEXT || ' MINUTES')::INTERVAL
AS minutes_to_be_cut
FROM (
SELECT
id,
placed_at,
pickup_address_id
FROM
{config.CLEAN_SCHEMA}.orders
INNER JOIN (
SELECT
id AS address_id
FROM
{config.CLEAN_SCHEMA}.addresses
WHERE
city_id = {self._grid.city.id}
) AS in_city
ON orders.pickup_address_id = in_city.address_id
WHERE
ad_hoc IS TRUE
) AS
orders
INNER JOIN (
SELECT
id AS address_id
address_id,
pixel_id
FROM
{config.CLEAN_SCHEMA}.addresses
{config.CLEAN_SCHEMA}.addresses_pixels
WHERE
city_id = {grid.city.id}
) AS in_city
ON orders.pickup_address_id = in_city.address_id
WHERE
ad_hoc IS TRUE
) AS
orders
INNER JOIN (
SELECT
address_id,
pixel_id
FROM
{config.CLEAN_SCHEMA}.addresses_pixels
WHERE
grid_id = {grid.id}
AND
city_id = {grid.city.id} -- city_id is redundant -> sanity check
) AS pixels
ON orders.pickup_address_id = pixels.address_id
) AS placed_at_aggregated_into_start_at
) AS pixel_start_at_combinations
GROUP BY
pixel_id,
start_at
ORDER BY
pixel_id,
start_at;
""",
con=db.connection,
index_col=['pixel_id', 'start_at'],
)
grid_id = {self._grid.id}
AND
city_id = {self._grid.city.id} -- redundant -> sanity check
) AS pixels
ON orders.pickup_address_id = pixels.address_id
) AS placed_at_aggregated_into_start_at
) AS pixel_start_at_combinations
GROUP BY
pixel_id,
start_at
ORDER BY
pixel_id,
start_at;
""",
con=db.connection,
index_col=['pixel_id', 'start_at'],
)
if data.empty:
return data
if data.empty:
return data
# Calculate the first and last "start_at" value ...
start_day = data.index.levels[1].min().date()
start = datetime.datetime(
start_day.year, start_day.month, start_day.day, config.SERVICE_START,
)
end_day = data.index.levels[1].max().date()
end = datetime.datetime(
end_day.year, end_day.month, end_day.day, config.SERVICE_END,
)
# Calculate the first and last "start_at" value ...
start_day = data.index.levels[1].min().date()
start = dt.datetime(
start_day.year, start_day.month, start_day.day, config.SERVICE_START,
)
end_day = data.index.levels[1].max().date()
end = dt.datetime(end_day.year, end_day.month, end_day.day, config.SERVICE_END)
# ... and all possible `tuple`s of "pixel_id"-"start_at" combinations.
# The "start_at" values must lie within the operating hours.
gen = (
(pixel_id, start_at)
for pixel_id in sorted(data.index.levels[0])
for start_at in pd.date_range(start, end, freq=f'{self._time_step}T')
if config.SERVICE_START <= start_at.hour < config.SERVICE_END
)
# ... and all possible `tuple`s of "pixel_id"-"start_at" combinations.
# The "start_at" values must lie within the operating hours.
gen = (
(pixel_id, start_at)
for pixel_id in sorted(data.index.levels[0])
for start_at in pd.date_range(start, end, freq=f'{time_step}T')
if config.SERVICE_START <= start_at.time().hour < config.SERVICE_END
)
# Re-index `data` filling in `0`s where there is no demand.
index = pd.MultiIndex.from_tuples(gen)
index.names = ['pixel_id', 'start_at']
# Re-index `data` filling in `0`s where there is no demand.
index = pd.MultiIndex.from_tuples(gen)
index.names = ['pixel_id', 'start_at']
return data.reindex(index, fill_value=0)
return data.reindex(index, fill_value=0)
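
For context, a self-contained sketch of the re-indexing step at the end of
`aggregate_orders()`, with made-up numbers:

```python
import pandas as pd

# Hypothetical totals with a gap: no orders in the 12:00 time step.
data = pd.DataFrame(
    {'total_orders': [3, 1]},
    index=pd.MultiIndex.from_tuples(
        [(1, pd.Timestamp('2021-01-09 11:00')), (1, pd.Timestamp('2021-01-09 13:00'))],
        names=['pixel_id', 'start_at'],
    ),
)

# All "pixel_id"-"start_at" combinations within the operating hours ...
index = pd.MultiIndex.from_tuples(
    [(1, start_at) for start_at in pd.date_range('2021-01-09 11:00', '2021-01-09 13:00', freq='60T')],
    names=['pixel_id', 'start_at'],
)

# ... with `0`s filled in where there was no demand.
result = data.reindex(index, fill_value=0)
assert result.loc[(1, pd.Timestamp('2021-01-09 12:00')), 'total_orders'] == 0
```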

tests/config.py

@@ -0,0 +1,10 @@
"""Globals used when testing."""
# The day on which most test cases take place.
YEAR, MONTH, DAY = 2016, 7, 1
# Default time steps, for example, for `OrderHistory` objects.
LONG_TIME_STEP = 60
SHORT_TIME_STEP = 30
TIME_STEPS = (SHORT_TIME_STEP, LONG_TIME_STEP)


@@ -9,6 +9,7 @@ import faker
from factory import alchemy
from geopy import distance
from tests import config as test_config
from urban_meal_delivery import db
@@ -27,13 +28,10 @@ def _random_timespan( # noqa:WPS211
return dt.timedelta(seconds=random.randint(total_min_seconds, total_max_seconds))
# The test day.
_YEAR, _MONTH, _DAY = 2020, 1, 1
def _early_in_the_morning():
"""A randomized `datetime` object early in the morning."""
return dt.datetime(_YEAR, _MONTH, _DAY, 3, 0) + _random_timespan(max_hours=2)
early = dt.datetime(test_config.YEAR, test_config.MONTH, test_config.DAY, 3, 0)
return early + _random_timespan(max_hours=2)
class AddressFactory(alchemy.SQLAlchemyModelFactory):
@@ -171,7 +169,9 @@ class AdHocOrderFactory(alchemy.SQLAlchemyModelFactory):
# Attributes regarding the specialization of an `Order`: ad-hoc or scheduled.
# Ad-hoc `Order`s are placed between 11.45 and 14.15.
placed_at = factory.LazyFunction(
lambda: dt.datetime(_YEAR, _MONTH, _DAY, 11, 45)
lambda: dt.datetime(
test_config.YEAR, test_config.MONTH, test_config.DAY, 11, 45,
)
+ _random_timespan(max_hours=2, max_minutes=30),
)
ad_hoc = True
@@ -337,13 +337,27 @@ class ScheduledOrderFactory(AdHocOrderFactory):
scheduled_delivery_at = factory.LazyFunction(
lambda: random.choice(
[
dt.datetime(_YEAR, _MONTH, _DAY, 12, 0),
dt.datetime(_YEAR, _MONTH, _DAY, 12, 15),
dt.datetime(_YEAR, _MONTH, _DAY, 12, 30),
dt.datetime(_YEAR, _MONTH, _DAY, 12, 45),
dt.datetime(_YEAR, _MONTH, _DAY, 13, 0),
dt.datetime(_YEAR, _MONTH, _DAY, 13, 15),
dt.datetime(_YEAR, _MONTH, _DAY, 13, 30),
dt.datetime(
test_config.YEAR, test_config.MONTH, test_config.DAY, 12, 0,
),
dt.datetime(
test_config.YEAR, test_config.MONTH, test_config.DAY, 12, 15,
),
dt.datetime(
test_config.YEAR, test_config.MONTH, test_config.DAY, 12, 30,
),
dt.datetime(
test_config.YEAR, test_config.MONTH, test_config.DAY, 12, 45,
),
dt.datetime(
test_config.YEAR, test_config.MONTH, test_config.DAY, 13, 0,
),
dt.datetime(
test_config.YEAR, test_config.MONTH, test_config.DAY, 13, 15,
),
dt.datetime(
test_config.YEAR, test_config.MONTH, test_config.DAY, 13, 30,
),
],
),
)


@@ -0,0 +1 @@
"""Tests for the `urban_meal_delivery.forecasts.timify` module."""


@@ -1,22 +1,20 @@
"""Test the time series related code."""
"""Test the `OrderHistory.aggregate_orders()` method."""
# pylint:disable=no-self-use,unused-argument
import datetime
import pytest
from tests import config as test_config
from urban_meal_delivery import db
from urban_meal_delivery.forecasts import timify
YEAR, MONTH, DAY = 2020, 1, 1
@pytest.mark.db
class TestAggregateOrders:
"""Test the `aggregate_orders()` function.
"""Test the `OrderHistory.aggregate_orders()` method.
The test cases are all integration tests that model realistic scenarios.
The test cases are integration tests that model realistic scenarios.
"""
@pytest.fixture
@@ -39,10 +37,13 @@ class TestAggregateOrders:
def test_no_orders(self, db_session, one_pixel_grid, restaurant):
"""Edge case that does not occur for real-life data."""
db_session.commit()
assert len(restaurant.orders) == 0 # noqa:WPS507 sanity check
result = timify.aggregate_orders(grid=one_pixel_grid, time_step=60)
oh = timify.OrderHistory(
grid=one_pixel_grid, time_step=test_config.LONG_TIME_STEP,
)
result = oh.aggregate_orders()
assert len(result) == 0 # noqa:WPS507
@@ -55,7 +56,9 @@ class TestAggregateOrders:
order = make_order(
scheduled=False,
restaurant=restaurant,
placed_at=datetime.datetime(YEAR, MONTH, DAY, hour, 11),
placed_at=datetime.datetime(
test_config.YEAR, test_config.MONTH, test_config.DAY, hour, 11,
),
)
db_session.add(order)
@@ -63,7 +66,11 @@ class TestAggregateOrders:
assert len(restaurant.orders) == 12 # sanity check
result = timify.aggregate_orders(grid=one_pixel_grid, time_step=60)
oh = timify.OrderHistory(
grid=one_pixel_grid, time_step=test_config.LONG_TIME_STEP,
)
result = oh.aggregate_orders()
# The resulting `DataFrame` has 12 rows holding `1`s.
assert len(result) == 12
@@ -80,7 +87,9 @@ class TestAggregateOrders:
order = make_order(
scheduled=False,
restaurant=restaurant,
placed_at=datetime.datetime(YEAR, MONTH, DAY, hour, 11),
placed_at=datetime.datetime(
test_config.YEAR, test_config.MONTH, test_config.DAY, hour, 11,
),
)
db_session.add(order)
@@ -88,7 +97,11 @@ class TestAggregateOrders:
assert len(restaurant.orders) == 10 # sanity check
result = timify.aggregate_orders(grid=one_pixel_grid, time_step=60)
oh = timify.OrderHistory(
grid=one_pixel_grid, time_step=test_config.LONG_TIME_STEP,
)
result = oh.aggregate_orders()
# Even though there are only 10 orders, there are 12 rows in the `DataFrame`.
# That is because `0`s are filled in for the hours without any demand at the end.
@@ -107,7 +120,9 @@ class TestAggregateOrders:
order = make_order(
scheduled=False,
restaurant=restaurant,
placed_at=datetime.datetime(YEAR, MONTH, DAY, hour, 11),
placed_at=datetime.datetime(
test_config.YEAR, test_config.MONTH, test_config.DAY, hour, 11,
),
)
db_session.add(order)
@@ -115,7 +130,11 @@ class TestAggregateOrders:
assert len(restaurant.orders) == 6 # sanity check
result = timify.aggregate_orders(grid=one_pixel_grid, time_step=60)
oh = timify.OrderHistory(
grid=one_pixel_grid, time_step=test_config.LONG_TIME_STEP,
)
result = oh.aggregate_orders()
# The resulting `DataFrame` has 12 rows, 6 holding `0`s, and 6 holding `1`s.
assert len(result) == 12
@@ -133,15 +152,21 @@ class TestAggregateOrders:
ad_hoc_order = make_order(
scheduled=False,
restaurant=restaurant,
placed_at=datetime.datetime(YEAR, MONTH, DAY, 11, 11),
placed_at=datetime.datetime(
test_config.YEAR, test_config.MONTH, test_config.DAY, 11, 11,
),
)
db_session.add(ad_hoc_order)
pre_order = make_order(
scheduled=True,
restaurant=restaurant,
placed_at=datetime.datetime(YEAR, MONTH, DAY, 9, 0),
scheduled_delivery_at=datetime.datetime(YEAR, MONTH, DAY, 12, 0),
placed_at=datetime.datetime(
test_config.YEAR, test_config.MONTH, test_config.DAY, 9, 0,
),
scheduled_delivery_at=datetime.datetime(
test_config.YEAR, test_config.MONTH, test_config.DAY, 12, 0,
),
)
db_session.add(pre_order)
@@ -149,7 +174,11 @@ class TestAggregateOrders:
assert len(restaurant.orders) == 2 # sanity check
result = timify.aggregate_orders(grid=one_pixel_grid, time_step=60)
oh = timify.OrderHistory(
grid=one_pixel_grid, time_step=test_config.LONG_TIME_STEP,
)
result = oh.aggregate_orders()
# The resulting `DataFrame` has 12 rows, 11 holding `0`s, and one holding a `1`.
assert len(result) == 12
@@ -169,7 +198,9 @@ class TestAggregateOrders:
order = make_order(
scheduled=False,
restaurant=restaurant,
placed_at=datetime.datetime(YEAR, MONTH, DAY, hour, 11),
placed_at=datetime.datetime(
test_config.YEAR, test_config.MONTH, test_config.DAY, hour, 11,
),
)
db_session.add(order)
@@ -177,7 +208,11 @@ class TestAggregateOrders:
assert len(restaurant.orders) == 12 # sanity check
result = timify.aggregate_orders(grid=one_pixel_grid, time_step=30)
oh = timify.OrderHistory(
grid=one_pixel_grid, time_step=test_config.SHORT_TIME_STEP,
)
result = oh.aggregate_orders()
# The resulting `DataFrame` has 24 rows for the 24 30-minute time steps.
# The rows' values are `0` and `1` alternating.
@@ -200,7 +235,9 @@ class TestAggregateOrders:
order = make_order(
scheduled=False,
restaurant=restaurant,
placed_at=datetime.datetime(YEAR, MONTH, DAY, hour, 11),
placed_at=datetime.datetime(
test_config.YEAR, test_config.MONTH, test_config.DAY, hour, 11,
),
)
db_session.add(order)
@@ -210,7 +247,11 @@ class TestAggregateOrders:
scheduled=False,
restaurant=restaurant,
placed_at=datetime.datetime(
YEAR, MONTH, DAY + 1, hour, 11, # noqa:WPS441
test_config.YEAR,
test_config.MONTH,
test_config.DAY + 1,
hour, # noqa:WPS441
11,
),
)
db_session.add(order)
@@ -219,7 +260,11 @@ class TestAggregateOrders:
assert len(restaurant.orders) == 18 # sanity check
result = timify.aggregate_orders(grid=one_pixel_grid, time_step=60)
oh = timify.OrderHistory(
grid=one_pixel_grid, time_step=test_config.LONG_TIME_STEP,
)
result = oh.aggregate_orders()
# The resulting `DataFrame` has 24 rows, 12 for each day.
assert len(result) == 24
@@ -270,7 +315,9 @@ class TestAggregateOrders:
order = make_order(
scheduled=False,
restaurant=restaurant1,
placed_at=datetime.datetime(YEAR, MONTH, DAY, hour, 11),
placed_at=datetime.datetime(
test_config.YEAR, test_config.MONTH, test_config.DAY, hour, 11,
),
)
db_session.add(order)
@@ -279,14 +326,26 @@ class TestAggregateOrders:
order = make_order(
scheduled=False,
restaurant=restaurant2,
placed_at=datetime.datetime(YEAR, MONTH, DAY, hour, 13), # noqa:WPS441
placed_at=datetime.datetime(
test_config.YEAR,
test_config.MONTH,
test_config.DAY,
hour, # noqa:WPS441
13,
),
)
db_session.add(order)
order = make_order(
scheduled=False,
restaurant=restaurant2,
placed_at=datetime.datetime(YEAR, MONTH, DAY, hour, 14), # noqa:WPS441
placed_at=datetime.datetime(
test_config.YEAR,
test_config.MONTH,
test_config.DAY,
hour, # noqa:WPS441
14,
),
)
db_session.add(order)
@@ -296,7 +355,11 @@ class TestAggregateOrders:
assert len(restaurant1.orders) == 6
assert len(restaurant2.orders) == 24
result = timify.aggregate_orders(grid=two_pixel_grid, time_step=60)
oh = timify.OrderHistory(
grid=two_pixel_grid, time_step=test_config.LONG_TIME_STEP,
)
result = oh.aggregate_orders()
# The resulting `DataFrame` has 24 rows, 12 for each pixel.
assert len(result) == 24


@@ -0,0 +1,39 @@
"""Test the basic functionalities in the `OrderHistory` class."""
# pylint:disable=no-self-use
import pytest
from tests import config as test_config
from urban_meal_delivery.forecasts import timify
class TestSpecialMethods:
"""Test the special methods in `OrderHistory`."""
@pytest.mark.parametrize('time_step', test_config.TIME_STEPS)
def test_instantiate(self, grid, time_step):
"""Test `OrderHistory.__init__()`."""
oh = timify.OrderHistory(grid=grid, time_step=time_step)
assert oh is not None
class TestProperties:
"""Test the properties in `OrderHistory`."""
def test_totals_is_cached(self, grid, monkeypatch):
"""Test `.totals` property.
The result of the `OrderHistory.aggregate_orders()` method call
is cached in the `OrderHistory.totals` property.
"""
oh = timify.OrderHistory(grid=grid, time_step=test_config.LONG_TIME_STEP)
sentinel = object()
monkeypatch.setattr(oh, 'aggregate_orders', lambda: sentinel)
result1 = oh.totals
result2 = oh.totals
assert result1 is result2
assert result1 is sentinel
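
A note on the design: the hand-rolled `if self._data is None` cache keeps
`aggregate_orders` trivial to monkeypatch, as the test above shows. On
Python 3.8+, `functools.cached_property` would be an alternative; a minimal
sketch (not the committed implementation):

```python
import functools


class CachedOrderHistory:
    """Sketch: lazy, instance-level caching via `functools.cached_property`."""

    def aggregate_orders(self):
        # Stand-in for the expensive database query.
        return {'total_orders': 42}

    @functools.cached_property
    def totals(self):
        # Computed on first access, then stored on the instance.
        return self.aggregate_orders()


history = CachedOrderHistory()
assert history.totals is history.totals  # repeated access hits the cache
```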