UML aggregation between Interface and Classes in Python

Posted on 2020-12-05 13:11:41

I want to create a UML diagram for a small program I wrote. So far I have only made UML diagrams for Java programs, so drawing one for Python is quite new to me. There is a Writer "interface", i.e. a class that uses abc.ABCMeta as its metaclass and declares the abstract method write(). This interface is implemented by two classes: a Database class and a CSVWriter class. The constructor, i.e. the __init__() method, of another class called DataCollector takes an instance of a class that implements the Writer interface as an argument. That CSVWriter or Database instance is then stored as an instance variable inside the DataCollector object.

How do I show this relationship in UML? Python does not really have interfaces; to me it seems that the classes just inherit from the "interface". I drew a diagram in which DataCollector has an aggregation to WriterInterface. Is it OK to use an aggregation between an interface and a class, or do I have to draw aggregations between the classes that implement the interface and the class DataCollector?
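
For context, here is a minimal sketch of how such an abc-based "interface" behaves. IncompleteWriter and NullWriter are hypothetical names used only for illustration, and abc.ABC is used as shorthand for metaclass=abc.ABCMeta. A subclass that does not implement write() cannot be instantiated, which is roughly the guarantee a Java interface gives:

```python
import abc


class WriterInterface(abc.ABC):
    """Abstract base class playing the role of an interface."""

    @abc.abstractmethod
    def write(self, data):
        """Persist one record."""


class IncompleteWriter(WriterInterface):
    """Hypothetical subclass that forgets to implement write()."""


class NullWriter(WriterInterface):
    """Hypothetical subclass that fulfils the contract."""

    def write(self, data):
        return None


# Instantiating a class with an unimplemented abstract method raises TypeError.
try:
    IncompleteWriter()
    instantiated = True
except TypeError:
    instantiated = False

# A complete implementation can be created and is a WriterInterface.
writer = NullWriter()
```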

This is how I have drawn it so far:

[Image: UML class diagram]

The code the UML is based on:

import os
import abc
import Adafruit_DHT
import smbus
import csv
import RPi.GPIO as GPIO
import time
import datetime
import mh_z19
# import psycopg2  # needed by the Database class; uncomment when using it

import config as cfg


I2C_SETTINGS = {
    "DEVICE": 0x23,
    "POWER_DOWN": 0x00,
    "POWER_ON": 0x01,
    "RESET": 0x07,
    "RESOLUTION": {"ONE_TIME_HIGH_RES_MODE_1": 0x20},
    "BUS": 1,
}

CSV_HEADERS = [
    "timestamp",
    "light",
    "humidity",
    "temperature",
    "co2",
    "occupancy",
    "motion_count",
]


class SensorConnector:
    def __init__(self, pin_pir, pin_dht22, i2c_settings):
        self.pin_pir = pin_pir
        self.pin_dht22 = pin_dht22
        self.dht = Adafruit_DHT.DHT22
        self.i2c_settings = i2c_settings
        self.bus = smbus.SMBus(i2c_settings["BUS"])
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.pin_pir, GPIO.IN)

    def convert_to_number(self, data):
        "Simple Function to convert 2 bytes of data into a decimal number"
        result = (data[1] + (256 * data[0])) / 1.2
        return result

    def read_light(self, resolution):
        "Read data from I2C Interface"
        data = self.bus.read_i2c_block_data(self.i2c_settings["DEVICE"], resolution)
        return self.convert_to_number(data)

    def read_temp_hum(self):
        hum, temp = Adafruit_DHT.read_retry(self.dht, self.pin_dht22)
        return hum, temp

    def read_sensors(self):
        data = {}
        data["light"] = self.read_light(
            self.i2c_settings["RESOLUTION"]["ONE_TIME_HIGH_RES_MODE_1"]
        )
        data["humidity"], data["temperature"] = self.read_temp_hum()
        data["co2"] = mh_z19.read()["co2"]
        return data


class WriterInterface(metaclass=abc.ABCMeta):
    @classmethod
    def __subclasshook__(cls, subclass):
        return hasattr(subclass, "write") and callable(subclass.write) or NotImplemented

    @abc.abstractmethod
    def write(self, data):
        raise NotImplementedError


class CSVWriter(WriterInterface):
    def __init__(self, file, headers):
        file_exists = os.path.isfile(file)
        self.csv_file = open(file, "a")
        self.writer = csv.DictWriter(
            self.csv_file, delimiter=",", lineterminator="\n", fieldnames=headers
        )
        if not file_exists:
            self.writer.writeheader()
            print("initialized csv_file")

    def write(self, data):
        self.writer.writerow(data)
        self.csv_file.flush()

class Database(WriterInterface):
    def __init__(self):
        self.con = psycopg2.connect(
            host=cfg.postgres["host"],
            dbname=cfg.postgres["db"],
            user=cfg.postgres["user"],
            password=cfg.postgres["passwd"],
        )

    def write(self, data):
        if None not in data.values():
            try:
                cur = self.con.cursor()
                cur.execute(
                    """insert into "sensordata" (created_time, temperature, humidity, \
                    light, occupancy, people_count) values (%s, %s, %s, %s, %s, %s);""",
                    (
                        data["timestamp"],
                        data["temperature"],
                        data["humidity"],
                        data["light"],
                        data["occupancy"],
                        data["motion_count"],
                    ),
                )
                self.con.commit()
                cur.close()
            except Exception as e:
                print(e)
        else:
            print(f"problem with data:\n{data}")


class DataCollector:
    def __init__(self, s_connector, writer):
        self.s_connector = s_connector
        self.writer = writer
        self.ts_last_motion = None
        self.occupied_state = False
        self.motion_count = 0

    def motion_handler(self, channel):
        print("motion detected")
        self.ts_last_motion = datetime.datetime.now()
        if not self.occupied_state:
            self.occupied_state = True
        self.motion_count += 1

    def collect_data(self):
        GPIO.add_event_detect(
            self.s_connector.pin_pir, GPIO.RISING, callback=self.motion_handler
        )
        while True:
            now = datetime.datetime.now()
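            # Use a zero timedelta before the first motion event so the
            # comparison below always works on a timedelta, never an int.
            timedelta_since_last_motion = (
                (now - self.ts_last_motion)
                if self.ts_last_motion
                else datetime.timedelta(0)
            )
            if (
                self.occupied_state
                and timedelta_since_last_motion.total_seconds() > 900
            ):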
                print("Room vacant at: ")
                print(time.strftime("%H"), ":", time.strftime("%M"))
                self.occupied_state = False

            data = {
                **self.s_connector.read_sensors(),
                "occupancy": self.occupied_state,
                "timestamp": datetime.datetime.now(),
                "motion_count": self.motion_count,
            }

            print(data)
            self.writer.write(data)
            time.sleep(60)


if __name__ == "__main__":

    s_connector = SensorConnector(pin_pir=17, pin_dht22=4, i2c_settings=I2C_SETTINGS)
    csv_writer = CSVWriter('/path/to/data.csv', CSV_HEADERS)
    #writer = Database()
    dc = DataCollector(s_connector, csv_writer)
    dc.collect_data()

Questioner: jo87casi

bruno 2020-12-05 21:26:15

Is it OK to use an aggregation between an interface and a class

Yes, this is the right way.

do I have to draw aggregations between the classes that implement the interface and the class DataCollector

No, for several reasons:

  • The goal of the interface is to hide the concrete implementations. A relation to each implementing class would reveal those implementations and couple DataCollector to them.

  • New classes implementing the interface can appear later. You do not want to have to modify DataCollector by adding a relation to each new class when it appears.

  • DataCollector has only one writer. If you drew several relations, they would have to be mutually exclusive, which is needless complication.
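
The points above can be sketched in a few lines of Python. This is only an illustration under assumed names (Writer, ListWriter, PrintWriter and Collector are hypothetical stand-ins, not the classes from the question): the collector refers only to the abstract type, so a second implementation can be added without touching it, and it always holds exactly one writer.

```python
import abc


class Writer(abc.ABC):
    """Plays the role of the interface in the diagram."""

    @abc.abstractmethod
    def write(self, data):
        """Persist one record."""


class ListWriter(Writer):
    """Hypothetical implementation that keeps records in memory."""

    def __init__(self):
        self.rows = []

    def write(self, data):
        self.rows.append(data)


class PrintWriter(Writer):
    """Hypothetical implementation added later; Collector is unchanged."""

    def write(self, data):
        print(data)


class Collector:
    """Holds exactly one Writer: the single aggregation to the interface."""

    def __init__(self, writer: Writer):
        self.writer = writer

    def collect(self, data):
        # Only the interface method is used, never a concrete class.
        self.writer.write(data)


memory = ListWriter()
Collector(memory).collect({"co2": 420})
Collector(PrintWriter()).collect({"co2": 421})
```

Because Collector mentions only Writer, the diagram needs a single aggregation from the collecting class to the interface, however many implementing classes exist.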