I want to create a UML diagram for a small program I wrote. So far I have only made UML diagrams for Java programs, so drawing one for Python is quite new to me. The program has a Writer "interface", i.e. a class that uses abc.ABCMeta as its metaclass and declares the abstract method write(). This interface is implemented by two classes: a Database class and a CSVWriter class. The constructor, i.e. the __init__() method, of another class called DataCollector takes an instance of a class that implements the Writer interface as an argument. That CSVWriter or Database instance is then stored as an instance variable inside the DataCollector object.
How do I show this relationship in a UML diagram? Python does not really have interfaces; to me it seems that the classes simply inherit from the "interface". I tried drawing a UML diagram in which I aggregated the DataCollector with the WriterInterface. Is it OK to use an aggregation between an interface and a class, or do I have to draw aggregations between the classes that implement the interface and the class DataCollector?
This is how I have drawn it so far:
The code the UML is based on:
import os
import abc
import Adafruit_DHT
import smbus
import csv
import RPi.GPIO as GPIO
import time
import datetime
import mh_z19
#import psycopg2
import config as cfg
# Settings for the I2C light sensor; handed to SensorConnector as
# ``i2c_settings``.
# NOTE(review): the address and opcodes look like those of a BH1750 light
# sensor -- confirm against the hardware actually wired up.
I2C_SETTINGS = {
    # Device address passed to ``read_i2c_block_data``.
    "DEVICE": 0x23,
    # The next three entries are not referenced by the code in this file.
    "POWER_DOWN": 0x00,
    "POWER_ON": 0x01,
    "RESET": 0x07,
    # Value sent as the register/command argument when reading the sensor.
    "RESOLUTION": {
        "ONE_TIME_HIGH_RES_MODE_1": 0x20,
    },
    # Bus number given to ``smbus.SMBus``.
    "BUS": 1,
}
# Column names (and column order) of the CSV output; used as the
# ``fieldnames`` of the ``csv.DictWriter`` created by CSVWriter. They match
# the keys of the record assembled in DataCollector.collect_data.
CSV_HEADERS = [
    "timestamp",
    "light",
    "humidity",
    "temperature",
    "co2",
    "occupancy",
    "motion_count",
]
class SensorConnector:
    """Bundle access to the PIR pin, the DHT22, the I2C light sensor and the
    MH-Z19 CO2 sensor behind a single object."""

    def __init__(self, pin_pir, pin_dht22, i2c_settings):
        """Store the pin numbers, open the I2C bus and set up the PIR pin.

        ``i2c_settings`` must provide at least the ``"BUS"`` key here;
        ``"DEVICE"`` and ``"RESOLUTION"`` are read later by the read methods.
        """
        self.pin_pir = pin_pir
        self.pin_dht22 = pin_dht22
        self.dht = Adafruit_DHT.DHT22
        self.i2c_settings = i2c_settings
        self.bus = smbus.SMBus(i2c_settings["BUS"])

        # The PIR pin is addressed with BCM numbering and used as an input.
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.pin_pir, GPIO.IN)

    def convert_to_number(self, data):
        """Combine two bytes (high byte first) into one number, scaled by 1/1.2."""
        # NOTE(review): the 1.2 divisor is presumably the sensor's
        # counts-to-lux factor -- confirm against the datasheet.
        raw_value = 256 * data[0] + data[1]
        return raw_value / 1.2

    def read_light(self, resolution):
        """Read a data block from the I2C device and return it as a number."""
        device_address = self.i2c_settings["DEVICE"]
        raw_bytes = self.bus.read_i2c_block_data(device_address, resolution)
        return self.convert_to_number(raw_bytes)

    def read_temp_hum(self):
        """Return ``(humidity, temperature)`` as read from the DHT22."""
        humidity, temperature = Adafruit_DHT.read_retry(self.dht, self.pin_dht22)
        return humidity, temperature

    def read_sensors(self):
        """Poll every sensor once and return the readings as a dict.

        The dict has the keys ``light``, ``humidity``, ``temperature`` and
        ``co2``, in that order.
        """
        mode = self.i2c_settings["RESOLUTION"]["ONE_TIME_HIGH_RES_MODE_1"]
        light = self.read_light(mode)
        humidity, temperature = self.read_temp_hum()
        co2 = mh_z19.read()["co2"]
        return {
            "light": light,
            "humidity": humidity,
            "temperature": temperature,
            "co2": co2,
        }
class WriterInterface(metaclass=abc.ABCMeta):
    """Informal interface for objects that can persist one data record.

    Besides real subclasses, any class that exposes a callable ``write``
    attribute is recognised as a (virtual) subclass of ``WriterInterface``.
    """

    @classmethod
    def __subclasshook__(cls, subclass):
        """Treat any class with a callable ``write`` as a WriterInterface.

        Returns ``True`` for such classes and ``NotImplemented`` otherwise,
        which lets the regular ABC subclass machinery decide.
        """
        # The hook is inherited by every concrete writer. Without this guard,
        # issubclass(X, CSVWriter) would be True for *any* X that has a
        # write() method, not just for real CSVWriter subclasses.
        if cls is not WriterInterface:
            return NotImplemented
        if callable(getattr(subclass, "write", None)):
            return True
        return NotImplemented

    @abc.abstractmethod
    def write(self, data):
        """Persist ``data``; concrete writers must override this method."""
        raise NotImplementedError
class CSVWriter(WriterInterface):
    """Writer that appends each data record as one row of a CSV file.

    The instance can be used as a context manager so the underlying file is
    closed deterministically; ``close()`` may also be called directly.
    """

    def __init__(self, file, headers):
        """Open ``file`` for appending and write the header row if needed.

        Args:
            file: Path of the CSV file; it is created when missing.
            headers: Field names, in column order, for the ``DictWriter``.
        """
        # A header is needed for a new file and also for an existing file
        # that is still empty (e.g. created by ``touch``).
        needs_header = not os.path.isfile(file) or os.path.getsize(file) == 0
        # newline="" leaves line endings entirely to the csv module, as the
        # csv documentation requires for file objects handed to a writer.
        self.csv_file = open(file, "a", newline="")
        self.writer = csv.DictWriter(
            self.csv_file, delimiter=",", lineterminator="\n", fieldnames=headers
        )
        if needs_header:
            self.writer.writeheader()
            print("initialized csv_file")

    def write(self, data):
        """Append ``data`` (a dict keyed by the headers) and flush to disk."""
        self.writer.writerow(data)
        # Flush after every row so a crash or power loss costs at most one row.
        self.csv_file.flush()

    def close(self):
        """Close the underlying file; calling it more than once is harmless."""
        if not self.csv_file.closed:
            self.csv_file.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Returning False lets any exception from the ``with`` body propagate.
        return False
class Database(WriterInterface):
    """Writer that inserts each data record into the ``sensordata`` table."""

    def __init__(self):
        """Connect to PostgreSQL using the credentials in ``cfg.postgres``."""
        # The module-level psycopg2 import is commented out in this file, so
        # the name would be undefined here. Importing locally keeps psycopg2
        # optional for CSV-only setups while making this class usable.
        import psycopg2

        self.con = psycopg2.connect(
            host=cfg.postgres["host"],
            dbname=cfg.postgres["db"],
            user=cfg.postgres["user"],
            password=cfg.postgres["passwd"],
        )

    def write(self, data):
        """Insert one record; records containing ``None`` are skipped.

        Errors are reported on stdout instead of being raised so that a
        single failed insert does not stop the caller's collection loop.
        """
        if None in data.values():
            print(f"problem with data:\n{data}")
            return

        try:
            # The cursor context manager closes the cursor even when
            # execute() raises.
            with self.con.cursor() as cur:
                cur.execute(
                    'insert into "sensordata" (created_time, temperature, humidity, '
                    "light, occupancy, people_count) values (%s, %s, %s, %s, %s, %s);",
                    (
                        data["timestamp"],
                        data["temperature"],
                        data["humidity"],
                        data["light"],
                        data["occupancy"],
                        data["motion_count"],
                    ),
                )
            self.con.commit()
        except Exception as e:
            print(e)
            # A failed statement leaves the transaction aborted; without a
            # rollback every later insert on this connection fails as well.
            try:
                self.con.rollback()
            except Exception as rollback_error:
                print(rollback_error)

    def close(self):
        """Close the database connection."""
        self.con.close()
class DataCollector:
    """Poll the sensors periodically and hand each record to a writer.

    Occupancy is tracked through PIR motion events: the room counts as
    occupied from the first motion until no motion has been seen for
    ``VACANCY_TIMEOUT_S`` seconds.
    """

    # Seconds without motion after which the room is considered vacant.
    VACANCY_TIMEOUT_S = 900
    # Seconds to sleep between two sensor polls.
    SAMPLE_INTERVAL_S = 60

    def __init__(self, s_connector, writer):
        """Store the collaborators and reset the occupancy state.

        Args:
            s_connector: Object providing ``pin_pir`` and ``read_sensors()``.
            writer: Object providing ``write(data)``.
        """
        self.s_connector = s_connector
        self.writer = writer
        self.ts_last_motion = None
        self.occupied_state = False
        self.motion_count = 0

    def motion_handler(self, channel):
        """GPIO callback: record a motion event and mark the room occupied.

        ``channel`` is supplied by the GPIO library and is not used.
        """
        print("motion detected")
        self.ts_last_motion = datetime.datetime.now()
        if not self.occupied_state:
            self.occupied_state = True
        # NOTE(review): the original indentation was lost, so it is unclear
        # whether this counter was meant to count every motion event (as
        # here) or only vacant -> occupied transitions -- confirm.
        self.motion_count += 1

    def _expire_occupancy(self, now):
        """Mark the room vacant when the last motion is too long ago."""
        # Read the timestamp once: the motion callback may update it while
        # this method runs (RPi.GPIO is documented to invoke event callbacks
        # from a separate thread).
        last_motion = self.ts_last_motion
        if not self.occupied_state or last_motion is None:
            return
        # total_seconds() instead of .seconds: .seconds is only the seconds
        # *component* of the delta, so a slightly negative delta (motion
        # registered just after ``now`` was taken) would read as 86399 and
        # wrongly flag the room as vacant.
        if (now - last_motion).total_seconds() > self.VACANCY_TIMEOUT_S:
            print("Room vacant at: ")
            print(time.strftime("%H"), ":", time.strftime("%M"))
            self.occupied_state = False

    def collect_data(self):
        """Register the motion callback, then poll and write records forever.

        This method does not return; each iteration builds one record from
        the sensor readings plus occupancy, timestamp and motion count.
        """
        GPIO.add_event_detect(
            self.s_connector.pin_pir, GPIO.RISING, callback=self.motion_handler
        )
        while True:
            self._expire_occupancy(datetime.datetime.now())
            data = {
                **self.s_connector.read_sensors(),
                "occupancy": self.occupied_state,
                "timestamp": datetime.datetime.now(),
                "motion_count": self.motion_count,
            }
            print(data)
            self.writer.write(data)
            time.sleep(self.SAMPLE_INTERVAL_S)
if __name__ == "__main__":
    # Wire the sensors to a CSV writer and collect until interrupted.
    s_connector = SensorConnector(pin_pir=17, pin_dht22=4, i2c_settings=I2C_SETTINGS)
    csv_writer = CSVWriter('/path/to/data.csv', CSV_HEADERS)
    # To store the records in PostgreSQL instead, pass Database() as the
    # writer below.
    dc = DataCollector(s_connector, csv_writer)
    try:
        dc.collect_data()
    finally:
        # collect_data() only ends through an exception (e.g. Ctrl+C), so
        # release the file handle and the GPIO channels here.
        csv_writer.csv_file.close()
        GPIO.cleanup()
> Is it ok to use an aggregation between an interface and a class?
Yes, this is the right way.
> Do I have to draw aggregations between the classes that implement the interface and the class DataCollector?
No, for several reasons:
- The goal of the interface is to hide the concrete implementations; drawing a relation to each implementing class reveals those implementations, with all the associated consequences.
- New classes implementing the interface can appear later, and you do not want to have to modify DataCollector by adding a relation to each new one every time that happens.
- DataCollector has only one writer, so if you drew several relations they would have to be mutually exclusive, which is needlessly complicated.