diff --git a/poetry.lock b/poetry.lock index e4862bf..b56f2af 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1120,7 +1120,7 @@ name = "pandas" version = "1.1.5" description = "Powerful data structures for data analysis, time series, and statistics" category = "main" -optional = true +optional = false python-versions = ">=3.6.1" [package.dependencies] diff --git a/pyproject.toml b/pyproject.toml index dbddd95..573d282 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ python = "^3.8" Shapely = "^1.7.1" alembic = "^1.4.2" click = "^7.1.2" +pandas = "^1.1.0" psycopg2 = "^2.8.5" # adapter for PostgreSQL sqlalchemy = "^1.3.18" utm = "^0.7.0" @@ -40,7 +41,6 @@ utm = "^0.7.0" jupyterlab = { version="^2.2.2", optional=true } nb_black = { version="^1.0.7", optional=true } numpy = { version="^1.19.1", optional=true } -pandas = { version="^1.1.0", optional=true } pytz = { version="^2020.1", optional=true } [tool.poetry.extras] @@ -48,7 +48,6 @@ research = [ "jupyterlab", "nb_black", "numpy", - "pandas", "pytz", ] diff --git a/setup.cfg b/setup.cfg index f6b291f..76746f3 100644 --- a/setup.cfg +++ b/setup.cfg @@ -138,6 +138,12 @@ per-file-ignores = src/urban_meal_delivery/db/utils/__init__.py: # Top-level of a sub-packages is intended to import a lot. F401, + src/urban_meal_delivery/forecasts/__init__.py: + # Top-level of a sub-packages is intended to import a lot. + F401, + src/urban_meal_delivery/forecasts/timify.py: + # No SQL injection as the inputs come from a safe source. + S608, tests/*.py: # Type annotations are not strictly enforced. 
ANN0, ANN2, @@ -245,6 +251,8 @@ cache_dir = .cache/mypy ignore_missing_imports = true [mypy-packaging] ignore_missing_imports = true +[mypy-pandas] +ignore_missing_imports = true [mypy-pytest] ignore_missing_imports = true [mypy-sqlalchemy.*] diff --git a/src/urban_meal_delivery/configuration.py b/src/urban_meal_delivery/configuration.py index e4cca50..2d36392 100644 --- a/src/urban_meal_delivery/configuration.py +++ b/src/urban_meal_delivery/configuration.py @@ -26,14 +26,38 @@ def random_schema_name() -> str: class Config: """Configuration that applies in all situations.""" + # Application-specific settings + # ----------------------------- + + # Date after which the real-life data is discarded. CUTOFF_DAY = datetime.datetime(2017, 2, 1) # If a scheduled pre-order is made within this # time horizon, we treat it as an ad-hoc order. QUASI_AD_HOC_LIMIT = datetime.timedelta(minutes=45) + # Operating hours of the platform. + SERVICE_START = 11 + SERVICE_END = 23 + + # Side lengths (in meters) for which pixel grids are created. + # They are the basis for the aggregated demand forecasts. GRID_SIDE_LENGTHS = [707, 1000, 1414] + # Time steps (in minutes) used to aggregate the + # individual orders into time series. + TIME_STEPS = [60] + + # Training horizons (in full weeks) used + # to train the forecasting models. + TRAINING_HORIZONS = [8] + + # The demand forecasting methods used in the simulations. + FORECASTING_METHODS = ['hets', 'rtarima'] + + # Implementation-specific settings + # -------------------------------- + DATABASE_URI = os.getenv('DATABASE_URI') # The PostgreSQL schema that holds the tables with the original data. 
diff --git a/src/urban_meal_delivery/db/grids.py b/src/urban_meal_delivery/db/grids.py index 5593892..c1d7dd2 100644 --- a/src/urban_meal_delivery/db/grids.py +++ b/src/urban_meal_delivery/db/grids.py @@ -43,7 +43,7 @@ class Grid(meta.Base): def __repr__(self) -> str: """Non-literal text representation.""" - return '<{cls}: {area}>'.format( + return '<{cls}: {area} sqr. km>'.format( cls=self.__class__.__name__, area=self.pixel_area, ) @@ -51,7 +51,7 @@ class Grid(meta.Base): @property def pixel_area(self) -> float: """The area of a `Pixel` on the grid in square kilometers.""" - return (self.side_length ** 2) / 1_000_000 # noqa:WPS432 + return round((self.side_length ** 2) / 1_000_000, 1) # noqa:WPS432 @classmethod def gridify(cls, city: db.City, side_length: int) -> db.Grid: diff --git a/src/urban_meal_delivery/forecasts/__init__.py b/src/urban_meal_delivery/forecasts/__init__.py new file mode 100644 index 0000000..be8843e --- /dev/null +++ b/src/urban_meal_delivery/forecasts/__init__.py @@ -0,0 +1,3 @@ +"""Demand forecasting utilities.""" + +from urban_meal_delivery.forecasts import timify diff --git a/src/urban_meal_delivery/forecasts/timify.py b/src/urban_meal_delivery/forecasts/timify.py new file mode 100644 index 0000000..08cd1df --- /dev/null +++ b/src/urban_meal_delivery/forecasts/timify.py @@ -0,0 +1,114 @@ +"""Obtain and work with time series data.""" + +import datetime + +import pandas as pd + +from urban_meal_delivery import config +from urban_meal_delivery import db + + +def aggregate_orders(grid: db.Grid, time_step: int) -> pd.DataFrame: # pragma: no cover + """Obtain a time series of the ad-hoc `Order` totals. 
+ + Args: + grid: pixel grid used to aggregate orders spatially + time_step: interval length (in minutes) into which orders are aggregated + + Returns: + order_totals: `DataFrame` with a `MultiIndex` of the "pixel_id"s and + beginnings of the intervals (i.e., "start_at"s); the sole column + with data is "total_orders" + """ + # `data` is probably missing "pixel_id"-"start_at" pairs. + # This happens whenever there is no demand in the `Pixel` in the given `time_step`. + data = pd.read_sql_query( + f"""-- # noqa:WPS221 + SELECT + pixel_id, + start_at, + COUNT(*) AS total_orders + FROM ( + SELECT + pixel_id, + placed_at_without_seconds - minutes_to_be_cut AS start_at + FROM ( + SELECT + pixels.pixel_id, + DATE_TRUNC('MINUTE', orders.placed_at) AS placed_at_without_seconds, + (( + EXTRACT(MINUTES FROM orders.placed_at)::INTEGER % {time_step} + )::TEXT || ' MINUTES')::INTERVAL + AS minutes_to_be_cut + FROM ( + SELECT + id, + placed_at, + pickup_address_id + FROM + {config.CLEAN_SCHEMA}.orders + INNER JOIN ( + SELECT + id AS address_id + FROM + {config.CLEAN_SCHEMA}.addresses + WHERE + city_id = {grid.city.id} + ) AS in_city + ON orders.pickup_address_id = in_city.address_id + WHERE + ad_hoc IS TRUE + ) AS + orders + INNER JOIN ( + SELECT + address_id, + pixel_id + FROM + {config.CLEAN_SCHEMA}.addresses_pixels + WHERE + grid_id = {grid.id} + AND + city_id = {grid.city.id} -- city_id is redundant -> sanity check + ) AS pixels + ON orders.pickup_address_id = pixels.address_id + ) AS placed_at_aggregated_into_start_at + ) AS pixel_start_at_combinations + GROUP BY + pixel_id, + start_at + ORDER BY + pixel_id, + start_at; + """, + con=db.connection, + index_col=['pixel_id', 'start_at'], + ) + + if data.empty: + return data + + # Calculate the first and last "start_at" value ... 
+ start_day = data.index.levels[1].min().date() + start = datetime.datetime( + start_day.year, start_day.month, start_day.day, config.SERVICE_START, + ) + end_day = data.index.levels[1].max().date() + end = datetime.datetime( + end_day.year, end_day.month, end_day.day, config.SERVICE_END, + ) + + # ... and all possible `tuple`s of "pixel_id"-"start_at" combinations. + # The "start_at" values must lie within the operating hours. + gen = ( + (pixel_id, start_at) + for pixel_id in sorted(data.index.levels[0]) + for start_at in pd.date_range(start, end, freq=f'{time_step}T') + if config.SERVICE_START <= start_at.time().hour < config.SERVICE_END + ) + + # Re-index `data` filling in `0`s where there is no demand. + index = pd.MultiIndex.from_tuples(gen) + index.names = ['pixel_id', 'start_at'] + + return data.reindex(index, fill_value=0) diff --git a/tests/db/test_grids.py index 8bdb0c5..4dd5beb 100644 --- a/tests/db/test_grids.py +++ b/tests/db/test_grids.py @@ -19,7 +19,7 @@ class TestSpecialMethods: """`Grid` has a non-literal text representation.""" result = repr(grid) - assert result == f'<Grid: {grid.pixel_area}>' + assert result == f'<Grid: {grid.pixel_area} sqr. km>' @pytest.mark.db diff --git a/tests/forecasts/__init__.py new file mode 100644 index 0000000..50eaeb3 --- /dev/null +++ b/tests/forecasts/__init__.py @@ -0,0 +1 @@ +"""Test the forecasting-related functionality.""" diff --git a/tests/forecasts/test_timify.py new file mode 100644 index 0000000..0cd4d21 --- /dev/null +++ b/tests/forecasts/test_timify.py @@ -0,0 +1,305 @@ +"""Test the time series related code.""" +# pylint:disable=no-self-use,unused-argument + +import datetime + +import pytest + +from urban_meal_delivery import db +from urban_meal_delivery.forecasts import timify + + +YEAR, MONTH, DAY = 2020, 1, 1 + + +@pytest.mark.db +class TestAggregateOrders: + """Test the `aggregate_orders()` function. 
+ + The test cases are all integration tests that model realistic scenarios. + """ + + @pytest.fixture + def one_pixel_grid(self, db_session, city, restaurant): + """A persisted `Grid` with one `Pixel`. + + `restaurant` must be a dependency as otherwise + its `.address` is not put into the database. + """ + # `+1` as otherwise there would be a second pixel in one direction. + side_length = max(city.total_x, city.total_y) + 1 + grid = db.Grid.gridify(city=city, side_length=side_length) + + db_session.add(grid) + + assert len(grid.pixels) == 1 # sanity check + + return grid + + def test_no_orders(self, db_session, one_pixel_grid, restaurant): + """Edge case that does not occur for real-life data.""" + db_session.commit() + + assert len(restaurant.orders) == 0 # noqa:WPS507 sanity check + + result = timify.aggregate_orders(grid=one_pixel_grid, time_step=60) + + assert len(result) == 0 # noqa:WPS507 + + def test_evenly_distributed_ad_hoc_orders( + self, db_session, one_pixel_grid, restaurant, make_order, + ): + """12 ad-hoc orders, one per operating hour.""" + # Create one order per hour and 12 orders in total. + for hour in range(11, 23): + order = make_order( + scheduled=False, + restaurant=restaurant, + placed_at=datetime.datetime(YEAR, MONTH, DAY, hour, 11), + ) + db_session.add(order) + + db_session.commit() + + assert len(restaurant.orders) == 12 # sanity check + + result = timify.aggregate_orders(grid=one_pixel_grid, time_step=60) + + # The resulting `DataFrame` has 12 rows holding `1`s. + assert len(result) == 12 + assert result['total_orders'].min() == 1 + assert result['total_orders'].max() == 1 + assert result['total_orders'].sum() == 12 + + def test_evenly_distributed_ad_hoc_orders_with_no_demand_late( # noqa:WPS218 + self, db_session, one_pixel_grid, restaurant, make_order, + ): + """10 ad-hoc orders, one per hour, no orders after 21.""" + # Create one order per hour and 10 orders in total. 
+ for hour in range(11, 21): + order = make_order( + scheduled=False, + restaurant=restaurant, + placed_at=datetime.datetime(YEAR, MONTH, DAY, hour, 11), + ) + db_session.add(order) + + db_session.commit() + + assert len(restaurant.orders) == 10 # sanity check + + result = timify.aggregate_orders(grid=one_pixel_grid, time_step=60) + + # Even though there are only 10 orders, there are 12 rows in the `DataFrame`. + # That is so as `0`s are filled in for hours without any demand at the end. + assert len(result) == 12 + assert result['total_orders'].min() == 0 + assert result['total_orders'].max() == 1 + assert result.iloc[:10]['total_orders'].sum() == 10 + assert result.iloc[10:]['total_orders'].sum() == 0 + + def test_one_ad_hoc_order_every_other_hour( + self, db_session, one_pixel_grid, restaurant, make_order, + ): + """6 ad-hoc orders, one every other hour.""" + # Create one order every other hour. + for hour in range(11, 23, 2): + order = make_order( + scheduled=False, + restaurant=restaurant, + placed_at=datetime.datetime(YEAR, MONTH, DAY, hour, 11), + ) + db_session.add(order) + + db_session.commit() + + assert len(restaurant.orders) == 6 # sanity check + + result = timify.aggregate_orders(grid=one_pixel_grid, time_step=60) + + # The resulting `DataFrame` has 12 rows, 6 holding `0`s, and 6 holding `1`s. + assert len(result) == 12 + assert result['total_orders'].min() == 0 + assert result['total_orders'].max() == 1 + assert result['total_orders'].sum() == 6 + + def test_one_ad_hoc_and_one_pre_order( + self, db_session, one_pixel_grid, restaurant, make_order, + ): + """1 ad-hoc and 1 scheduled order. + + The scheduled order is discarded. 
+ """ + ad_hoc_order = make_order( + scheduled=False, + restaurant=restaurant, + placed_at=datetime.datetime(YEAR, MONTH, DAY, 11, 11), + ) + db_session.add(ad_hoc_order) + + pre_order = make_order( + scheduled=True, + restaurant=restaurant, + placed_at=datetime.datetime(YEAR, MONTH, DAY, 9, 0), + scheduled_delivery_at=datetime.datetime(YEAR, MONTH, DAY, 12, 0), + ) + db_session.add(pre_order) + + db_session.commit() + + assert len(restaurant.orders) == 2 # sanity check + + result = timify.aggregate_orders(grid=one_pixel_grid, time_step=60) + + # The resulting `DataFrame` has 12 rows, 11 holding `0`s, and one holding a `1`. + assert len(result) == 12 + assert result['total_orders'].min() == 0 + assert result['total_orders'].max() == 1 + assert result['total_orders'].sum() == 1 + + def test_evenly_distributed_ad_hoc_orders_with_half_hour_time_steps( # noqa:WPS218 + self, db_session, one_pixel_grid, restaurant, make_order, + ): + """12 ad-hoc orders, one per hour, with 30 minute time windows. + + In half the time steps, there is no demand. + """ + # Create one order per hour and 10 orders in total. + for hour in range(11, 23): + order = make_order( + scheduled=False, + restaurant=restaurant, + placed_at=datetime.datetime(YEAR, MONTH, DAY, hour, 11), + ) + db_session.add(order) + + db_session.commit() + + assert len(restaurant.orders) == 12 # sanity check + + result = timify.aggregate_orders(grid=one_pixel_grid, time_step=30) + + # The resulting `DataFrame` has 24 rows for the 24 30-minute time steps. + # The rows' values are `0` and `1` alternating. + assert len(result) == 24 + assert result['total_orders'].min() == 0 + assert result['total_orders'].max() == 1 + assert result.iloc[::2]['total_orders'].sum() == 12 + assert result.iloc[1::2]['total_orders'].sum() == 0 + + def test_ad_hoc_orders_over_two_days( + self, db_session, one_pixel_grid, restaurant, make_order, + ): + """First day 12 ad-hoc orders, one per operating hour ... + + ... 
and 6 orders, one every other hour on the second day. + In total, there are 18 orders. + """ + # Create one order per hour and 12 orders in total. + for hour in range(11, 23): + order = make_order( + scheduled=False, + restaurant=restaurant, + placed_at=datetime.datetime(YEAR, MONTH, DAY, hour, 11), + ) + db_session.add(order) + + # Create one order every other hour and 6 orders in total. + for hour in range(11, 23, 2): # noqa:WPS440 + order = make_order( + scheduled=False, + restaurant=restaurant, + placed_at=datetime.datetime( + YEAR, MONTH, DAY + 1, hour, 11, # noqa:WPS441 + ), + ) + db_session.add(order) + + db_session.commit() + + assert len(restaurant.orders) == 18 # sanity check + + result = timify.aggregate_orders(grid=one_pixel_grid, time_step=60) + + # The resulting `DataFrame` has 24 rows, 12 for each day. + assert len(result) == 24 + assert result['total_orders'].min() == 0 + assert result['total_orders'].max() == 1 + assert result['total_orders'].sum() == 18 + + @pytest.fixture + def two_pixel_grid(self, db_session, city, make_address, make_restaurant): + """A persisted `Grid` with two `Pixel` objects. + + `restaurant` must be a dependency as otherwise + its `.address` is not put into the database. + """ + # One `Address` in the lower-left `Pixel`, ... + address1 = make_address(latitude=48.8357377, longitude=2.2517412) + # ... and another one in the upper-right one. + address2 = make_address(latitude=48.8898312, longitude=2.4357622) + + # Create `Restaurant`s at the two addresses. + make_restaurant(address=address1) + make_restaurant(address=address2) + + # This creates four `Pixel`s, two of which have no `pickup_address`. 
+ side_length = max(city.total_x // 2, city.total_y // 2) + 1 + + grid = db.Grid.gridify(city=city, side_length=side_length) + + db_session.add(grid) + + assert len(grid.pixels) == 2 # sanity check + + return grid + + def test_two_pixels_with_shifted_orders( # noqa:WPS218 + self, db_session, two_pixel_grid, make_order, + ): + """One restaurant with one order every other hour ... + + ... and another restaurant with two orders per hour. + In total, there are 30 orders. + """ + address1, address2 = two_pixel_grid.city.addresses + restaurant1, restaurant2 = address1.restaurant, address2.restaurant + + # Create one order every other hour for `restaurant1`. + for hour in range(11, 23, 2): + order = make_order( + scheduled=False, + restaurant=restaurant1, + placed_at=datetime.datetime(YEAR, MONTH, DAY, hour, 11), + ) + db_session.add(order) + + # Create two orders per hour for `restaurant2`. + for hour in range(11, 23): # noqa:WPS440 + order = make_order( + scheduled=False, + restaurant=restaurant2, + placed_at=datetime.datetime(YEAR, MONTH, DAY, hour, 13), # noqa:WPS441 + ) + db_session.add(order) + + order = make_order( + scheduled=False, + restaurant=restaurant2, + placed_at=datetime.datetime(YEAR, MONTH, DAY, hour, 14), # noqa:WPS441 + ) + db_session.add(order) + + db_session.commit() + + # sanity checks + assert len(restaurant1.orders) == 6 + assert len(restaurant2.orders) == 24 + + result = timify.aggregate_orders(grid=two_pixel_grid, time_step=60) + + # The resulting `DataFrame` has 24 rows, 12 for each pixel. + assert len(result) == 24 + assert result['total_orders'].min() == 0 + assert result['total_orders'].max() == 2 + assert result['total_orders'].sum() == 30