[TASK] Wrote the first version of the boiler control script
This commit is contained in:
@@ -14,15 +14,81 @@ class BoilerControl(hass.Hass):
|
||||
|
||||
self.boiler_switch = self.args['boiler_switch']
|
||||
self.price_sensor = self.args['price_sensor']
|
||||
self.boiler_on_for_hours = self.args['boiler_on_for_hours']
|
||||
self.boiler_reheat_periods_per_day = int(self.args['boiler_reheat_periods_per_day'])
|
||||
self.output_sensor = self.args['output_sensor']
|
||||
|
||||
self.log("Price sensor: " + str(self.get_state(self.price_sensor, attribute="raw_today")))
|
||||
|
||||
self.determine_cheapest_hours()
|
||||
|
||||
self.run_daily(self.determine_cheapest_hours, datetime.time(0, 1, 0))
|
||||
|
||||
def determine_cheapest_hours(self):
|
||||
def determine_cheapest_hours(self, cb_args=None):
|
||||
|
||||
# Get the prices for the current day
|
||||
prices = self.get_state(self.price_sensor, attribute="raw_today")
|
||||
start_of_day = self.parse_date(prices[0]["start"])
|
||||
|
||||
for price in prices:
|
||||
self.log("Price: " + str(price["value"]) + " at " + datetime.datetime.fromisoformat(price["start"]).strftime("%H:%M"))
|
||||
# Sort the prices on price
|
||||
prices.sort(key=lambda x: x["value"])
|
||||
|
||||
boiler_active_blocks = prices[0:self.boiler_on_for_hours]
|
||||
|
||||
# Now we also add the cheapest hours per "on period" to the list
|
||||
for i in range(self.boiler_reheat_periods_per_day):
|
||||
period_start = start_of_day + datetime.timedelta(0, i * 24 * 60 * 60 / self.boiler_reheat_periods_per_day)
|
||||
period_end = start_of_day + datetime.timedelta(0, (i + 1) * 24 * 60 * 60 / self.boiler_reheat_periods_per_day)
|
||||
# Find the cheapest block within the period, and add it to the active blocks
|
||||
period_prices = [price for price in prices if self.parse_date(price["start"]) >= period_start and self.parse_date(price["end"]) <= period_end]
|
||||
period_prices.sort(key=lambda x: x["value"])
|
||||
if period_prices[0] not in boiler_active_blocks:
|
||||
boiler_active_blocks.append(period_prices[0])
|
||||
|
||||
# To simplify the scheduling, we merge adjacent blocks
|
||||
boiler_active_blocks.sort(key=lambda x: x["start"])
|
||||
|
||||
# Log the active blocks
|
||||
self.log("Calculated best hours to turn the boiler on, on " + start_of_day.strftime("%d-%m-%y") + ": " + ", ".join([self.parse_date(block["start"]).strftime("%H:%M") + " - " + self.parse_date(block["end"]).strftime("%H:%M") + " costing " + str(block["value"]) + "/kWh" for block in boiler_active_blocks]))
|
||||
|
||||
merged_blocks = []
|
||||
for block in boiler_active_blocks:
|
||||
if len(merged_blocks) == 0:
|
||||
merged_blocks.append({"start": self.parse_date(block["start"]), "end": self.parse_date(block["end"])})
|
||||
else:
|
||||
if self.parse_date(block["start"]) == merged_blocks[-1]["end"]:
|
||||
merged_blocks[-1]["end"] = self.parse_date(block["end"])
|
||||
else:
|
||||
merged_blocks.append({"start": self.parse_date(block["start"]), "end": self.parse_date(block["end"]) })
|
||||
|
||||
# self.log("Boiler active blocks: " + str(merged_blocks))
|
||||
|
||||
currently_on = False
|
||||
for block in merged_blocks:
|
||||
if (block["end"] < datetime.datetime.now(block["start"].tzinfo)):
|
||||
continue
|
||||
|
||||
self.log("Scheduling boiler to turn on between " + block["start"].strftime("%H:%M") + " - " + block["end"].strftime("%H:%M"))
|
||||
if (block["start"] < datetime.datetime.now(block["start"].tzinfo)):
|
||||
self.turn_boiler_on()
|
||||
currently_on = True
|
||||
else:
|
||||
self.run_at(self.turn_boiler_on, block["start"])
|
||||
|
||||
self.run_at(self.turn_boiler_off, block["end"])
|
||||
|
||||
self.set_state(self.output_sensor, state="on" if currently_on else "off", attributes={"active_blocks": boiler_active_blocks, "merged_blocks": merged_blocks})
|
||||
|
||||
|
||||
def turn_boiler_on(self, cb_args=None):
|
||||
self.log("Turning boiler on")
|
||||
self.turn_on(self.boiler_switch)
|
||||
self.set_state(self.output_sensor, state="on")
|
||||
|
||||
def turn_boiler_off(self, cb_args=None):
|
||||
self.log("Turning boiler off")
|
||||
self.turn_off(self.boiler_switch)
|
||||
self.set_state(self.output_sensor, state="off")
|
||||
|
||||
def parse_date(self, date_str):
|
||||
return datetime.datetime.fromisoformat(date_str)
|
||||
Reference in New Issue
Block a user