97 lines
4.4 KiB
Python
97 lines
4.4 KiB
Python
import hassapi as hass
|
|
import datetime
|
|
|
|
#
|
|
# Boiler Controller. Tries to optimize when to power (heat up) the boiler with respect to the hourly (dynamic) electricity prices
|
|
#
|
|
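# Args:
#   boiler_switch: entity that is turned on/off to power the boiler
#   price_sensor: entity whose "raw_today" attribute lists the price blocks of the day
#                 (each entry has "start", "end" and "value")
#   boiler_on_for_hours: number of cheapest price blocks during which the boiler is on
#   boiler_reheat_periods_per_day: number of equal periods the day is split into; the
#                 cheapest block of each period is always added to the schedule
#   output_sensor: entity whose state is set to "on"/"off" to mirror the boiler state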
|
|
#
|
|
|
|
|
|
class BoilerControl(hass.Hass):
    """Switch a boiler on during the cheapest price blocks of the current day.

    The schedule is recomputed once at start-up and then daily at 00:01 from
    the ``raw_today`` attribute of the configured price sensor.

    App arguments (read from ``self.args``):
        boiler_switch: entity passed to ``turn_on`` / ``turn_off``.
        price_sensor: entity whose ``raw_today`` attribute is a list of
            entries with ``"start"``, ``"end"`` (ISO strings) and ``"value"``.
        boiler_on_for_hours: number of cheapest price blocks to select.
        boiler_reheat_periods_per_day: number of equal periods the day is
            divided into; the cheapest block of every period is added too.
        output_sensor: optional entity whose state mirrors the boiler
            ("on"/"off") and that carries the computed schedule as attributes.
    """

    def initialize(self):
        """Read the app configuration, compute today's schedule and register the daily run."""
        self.log("Initializing boiler control")

        self.boiler_switch = self.args['boiler_switch']
        self.price_sensor = self.args['price_sensor']
        # Used as a slice bound below, so it has to be an integer.
        self.boiler_on_for_hours = int(self.args['boiler_on_for_hours'])
        self.boiler_reheat_periods_per_day = int(self.args['boiler_reheat_periods_per_day'])
        # The rest of the class treats the output sensor as optional
        # (it is compared against None), so a missing key must not raise.
        self.output_sensor = self.args.get('output_sensor')

        self.log("Price sensor: " + str(self.get_state(self.price_sensor, attribute="raw_today")))

        self.determine_cheapest_hours()

        self.run_daily(self.determine_cheapest_hours, datetime.time(0, 1, 0))

    def determine_cheapest_hours(self, cb_args=None):
        """Select the cheapest blocks of today and schedule the boiler accordingly.

        Args:
            cb_args: unused; present so the method can be used as a scheduler
                callback as well as called directly.
        """
        # Get the prices for the current day
        raw_prices = self.get_state(self.price_sensor, attribute="raw_today")
        if not raw_prices:
            # Nothing to plan with; skip instead of failing on raw_prices[0].
            self.log(f"No prices available from {self.price_sensor}, boiler schedule not updated")
            return

        start_of_day = self.parse_date(raw_prices[0]["start"])
        boiler_active_blocks = self._select_active_blocks(raw_prices, start_of_day)

        # Log the active blocks
        described = ", ".join(
            f"{self.parse_date(block['start']).strftime('%H:%M')} - "
            f"{self.parse_date(block['end']).strftime('%H:%M')} costing "
            f"{block['value']}/kWh"
            for block in boiler_active_blocks
        )
        self.log(
            "Calculated best hours to turn the boiler on, on "
            + start_of_day.strftime("%d-%m-%y") + ": " + described
        )

        # To simplify the scheduling, we merge adjacent blocks
        merged_blocks = self._merge_adjacent_blocks(boiler_active_blocks)
        currently_on = self._schedule_blocks(merged_blocks)

        if self.output_sensor is not None:
            # datetime objects cannot be serialised by the standard json
            # module, so the merged blocks are published as ISO strings.
            published_blocks = [
                {"start": block["start"].isoformat(), "end": block["end"].isoformat()}
                for block in merged_blocks
            ]
            self.set_state(
                self.output_sensor,
                state="on" if currently_on else "off",
                attributes={"active_blocks": boiler_active_blocks, "merged_blocks": published_blocks},
            )

    def _select_active_blocks(self, raw_prices, start_of_day):
        """Return the chosen price entries, ordered by their start time."""
        # sorted() keeps the list returned by get_state untouched.
        by_price = sorted(raw_prices, key=lambda entry: entry["value"])
        selected = by_price[0:self.boiler_on_for_hours]

        # Parse every entry once instead of once per reheat period.
        parsed = [
            (self.parse_date(entry["start"]), self.parse_date(entry["end"]), entry)
            for entry in by_price
        ]

        # Now we also add the cheapest block per "on period" to the list
        seconds_per_day = 24 * 60 * 60
        periods = self.boiler_reheat_periods_per_day
        for i in range(periods):
            period_start = start_of_day + datetime.timedelta(0, i * seconds_per_day / periods)
            period_end = start_of_day + datetime.timedelta(0, (i + 1) * seconds_per_day / periods)
            # by_price is ordered by value, so the first entry that lies
            # completely inside the period is the cheapest one in it.
            cheapest = next(
                (entry for start, end, entry in parsed
                 if start >= period_start and end <= period_end),
                None,
            )
            # A period can be empty (e.g. more periods than price blocks).
            if cheapest is not None and cheapest not in selected:
                selected.append(cheapest)

        # Order by the parsed datetime: comparing the raw ISO strings gives a
        # wrong order when the UTC offset differs between entries.
        selected.sort(key=lambda entry: self.parse_date(entry["start"]))
        return selected

    def _merge_adjacent_blocks(self, blocks):
        """Join blocks where one ends exactly when the next starts."""
        merged_blocks = []
        for block in blocks:
            start = self.parse_date(block["start"])
            end = self.parse_date(block["end"])
            if merged_blocks and merged_blocks[-1]["end"] == start:
                merged_blocks[-1]["end"] = end
            else:
                merged_blocks.append({"start": start, "end": end})
        return merged_blocks

    def _schedule_blocks(self, merged_blocks):
        """Schedule on/off for every block that has not ended; return True if one is running now."""
        currently_on = False
        for block in merged_blocks:
            # One timestamp per block so both comparisons see the same "now".
            now = datetime.datetime.now(block["start"].tzinfo)
            if block["end"] < now:
                continue

            self.log("Scheduling boiler to turn on between " + block["start"].strftime("%H:%M") + " - " + block["end"].strftime("%H:%M"))
            if block["start"] < now:
                # The block is already running: switch on right away.
                self.turn_boiler_on()
                currently_on = True
            else:
                self.run_at(self.turn_boiler_on, block["start"])

            self.run_at(self.turn_boiler_off, block["end"])
        return currently_on

    def turn_boiler_on(self, cb_args=None):
        """Turn the boiler switch on and mirror that on the output sensor, if configured."""
        self.log("Turning boiler on")
        self.turn_on(self.boiler_switch)
        if self.output_sensor is not None:
            self.set_state(self.output_sensor, state="on")

    def turn_boiler_off(self, cb_args=None):
        """Turn the boiler switch off and mirror that on the output sensor, if configured."""
        self.log("Turning boiler off")
        self.turn_off(self.boiler_switch)
        if self.output_sensor is not None:
            self.set_state(self.output_sensor, state="off")

    def parse_date(self, date_str):
        """Parse an ISO 8601 string into a ``datetime.datetime``."""
        return datetime.datetime.fromisoformat(date_str)