import datetime

import hassapi as hass

#
# Boiler Controller. Tries to optimize when to power (heat up) the boiler with
# respect to the hourly (dynamic) electricity prices.
#
# Args:
#   boiler_switch: entity that is turned on/off to power the boiler
#   price_sensor: entity whose "raw_today" attribute holds today's price blocks
#       (a list of dicts with "start", "end" and "value" keys)
#   boiler_on_for_hours: number of cheapest price blocks per day to heat in
#   boiler_reheat_periods_per_day: number of equal periods the day is split
#       into; the cheapest block inside every period is always included
#   output_sensor: (optional) entity that receives the on/off state and the
#       calculated schedule as attributes
#


class BoilerControl(hass.Hass):
    """Switch a boiler on during the cheapest price blocks of the day."""

    SECONDS_PER_DAY = 24 * 60 * 60

    def initialize(self):
        """Read the app arguments, schedule today and re-plan daily at 00:01."""
        self.log("Initializing boiler control")
        self.boiler_switch = self.args['boiler_switch']
        self.price_sensor = self.args['price_sensor']
        # Both counts are used as integers (slice bound / range), so convert
        # them here; config values may arrive as strings.
        self.boiler_on_for_hours = int(self.args['boiler_on_for_hours'])
        self.boiler_reheat_periods_per_day = int(self.args['boiler_reheat_periods_per_day'])
        # The output sensor is optional: every use is guarded by a None check.
        self.output_sensor = self.args.get('output_sensor')

        self.log("Price sensor: " + str(self.get_state(self.price_sensor, attribute="raw_today")))

        self.determine_cheapest_hours()
        self.run_daily(self.determine_cheapest_hours, datetime.time(0, 1, 0))

    def determine_cheapest_hours(self, cb_args=None):
        """Pick today's cheapest blocks and schedule the boiler accordingly.

        Args:
            cb_args: Unused; present so the method can be used as a scheduler
                callback as well as being called directly.
        """
        # Get the prices for the current day
        raw_prices = self.get_state(self.price_sensor, attribute="raw_today")
        if not isinstance(raw_prices, list) or not raw_prices:
            self.log("No prices available from " + str(self.price_sensor)
                     + ", boiler is not scheduled", level="WARNING")
            return

        start_of_day = self.parse_date(raw_prices[0]["start"])

        # Sort on price. Blocks without a price cannot be compared and are
        # dropped. sorted() leaves the list returned by get_state untouched.
        prices = sorted(
            (price for price in raw_prices if price.get("value") is not None),
            key=lambda price: price["value"],
        )
        if not prices:
            self.log("No priced blocks available from " + str(self.price_sensor)
                     + ", boiler is not scheduled", level="WARNING")
            return

        boiler_active_blocks = self._select_active_blocks(prices, start_of_day)

        # Log the active blocks
        self.log("Calculated best hours to turn the boiler on, on "
                 + start_of_day.strftime("%d-%m-%y") + ": "
                 + ", ".join(
                     self.parse_date(block["start"]).strftime("%H:%M") + " - "
                     + self.parse_date(block["end"]).strftime("%H:%M")
                     + " costing " + str(block["value"]) + "/kWh"
                     for block in boiler_active_blocks))

        # To simplify the scheduling, we merge adjacent blocks
        merged_blocks = self._merge_adjacent_blocks(boiler_active_blocks)
        currently_on = self._schedule_blocks(merged_blocks)

        if self.output_sensor is not None:
            # Guard against an empty selection (e.g. both counts set to 0).
            average_price = None
            if boiler_active_blocks:
                average_price = (sum(block["value"] for block in boiler_active_blocks)
                                 / len(boiler_active_blocks))
            # NOTE(review): merged_blocks holds datetime objects; confirm the
            # state backend can serialize them.
            self.set_state(self.output_sensor,
                           state="on" if currently_on else "off",
                           attributes={"active_blocks": boiler_active_blocks,
                                       "merged_blocks": merged_blocks,
                                       "average_price": average_price})

    def _select_active_blocks(self, prices, start_of_day):
        """Return the chosen price blocks, ordered chronologically by start.

        Args:
            prices: Price blocks sorted ascending on "value".
            start_of_day: Start of the first block of the day.
        """
        active_blocks = prices[:max(self.boiler_on_for_hours, 0)]

        # Parse every boundary once instead of once per period.
        spans = [(self.parse_date(price["start"]), self.parse_date(price["end"]), price)
                 for price in prices]

        # Now we also add the cheapest block per "on period" to the list
        periods = self.boiler_reheat_periods_per_day
        for i in range(periods):
            period_start = start_of_day + datetime.timedelta(
                seconds=i * self.SECONDS_PER_DAY / periods)
            period_end = start_of_day + datetime.timedelta(
                seconds=(i + 1) * self.SECONDS_PER_DAY / periods)

            # prices is sorted on value, so the first block that lies fully
            # inside the period is the cheapest one. A period may contain no
            # complete block (e.g. periods not aligned with block boundaries).
            cheapest = next((price for start, end, price in spans
                             if start >= period_start and end <= period_end), None)
            if cheapest is not None and cheapest not in active_blocks:
                active_blocks.append(cheapest)

        # Sort on the parsed start: raw ISO strings with different UTC offsets
        # (DST change) do not sort chronologically.
        active_blocks.sort(key=lambda block: self.parse_date(block["start"]))
        return active_blocks

    def _merge_adjacent_blocks(self, blocks):
        """Merge chronologically sorted blocks that touch into start/end spans."""
        merged_blocks = []
        for block in blocks:
            start = self.parse_date(block["start"])
            end = self.parse_date(block["end"])
            if merged_blocks and start == merged_blocks[-1]["end"]:
                merged_blocks[-1]["end"] = end
            else:
                merged_blocks.append({"start": start, "end": end})
        return merged_blocks

    def _schedule_blocks(self, merged_blocks):
        """Schedule on/off for every block not yet over; return True if on now."""
        currently_on = False
        for block in merged_blocks:
            now = datetime.datetime.now(block["start"].tzinfo)
            if block["end"] <= now:
                # Block is already over, nothing left to schedule.
                continue

            self.log("Scheduling boiler to turn on between "
                     + block["start"].strftime("%H:%M") + " - "
                     + block["end"].strftime("%H:%M"))
            if block["start"] < now:
                # We are inside this block right now: switch on immediately.
                self.turn_boiler_on()
                currently_on = True
            else:
                self.run_at(self.turn_boiler_on, block["start"])
            self.run_at(self.turn_boiler_off, block["end"])
        return currently_on

    def turn_boiler_on(self, cb_args=None):
        """Turn the boiler switch on and mirror that in the output sensor."""
        self.log("Turning boiler on")
        self.turn_on(self.boiler_switch)
        if self.output_sensor is not None:
            self.set_state(self.output_sensor, state="on")

    def turn_boiler_off(self, cb_args=None):
        """Turn the boiler switch off and mirror that in the output sensor."""
        self.log("Turning boiler off")
        self.turn_off(self.boiler_switch)
        if self.output_sensor is not None:
            self.set_state(self.output_sensor, state="off")

    def parse_date(self, date_str):
        """Return *date_str* as a datetime; accepts ISO-8601 text or a datetime."""
        if isinstance(date_str, datetime.datetime):
            return date_str
        return datetime.datetime.fromisoformat(date_str)