我想用我写的一个小程序创建一个UML。到目前为止,我只制作了用于Java程序的UML图,而对于我来说,为python绘制一个图是一个相当新的东西。事实是,存在一个Writer-“ Interface”,例如一个从abc.ABCMeta继承并实现抽象方法write()的类。该接口在两个类中实现。数据库类和CSVWriter类。构造函数(例如,另一个称为DataCollector的类的init()方法)采用已实现Writer接口的类作为参数。然后,实例CSVWriter或数据库将作为实例变量存储在DataCollector对象内。
如何在UML中显示这种关系?Python确实没有接口。在我看来,它只是继承自“接口”。我尝试了UML,并使用WriterInterface聚合了DataCollector。是否可以在接口和类之间使用聚合,还是必须在实现接口的类与Class DataCollector之间绘制聚合?
到目前为止,这是我绘制它的方式:
UML的代码基于:
import os
import abc
import Adafruit_DHT
import smbus
import csv
import RPi.GPIO as GPIO
import time
import datetime
import mh_z19
#import psycopg2
import config as cfg
I2C_SETTINGS = {
"DEVICE": 0x23,
"POWER_DOWN": 0x00,
"POWER_ON": 0x01,
"RESET": 0x07,
"RESOLUTION": {"ONE_TIME_HIGH_RES_MODE_1": 0x20},
"BUS": 1,
}
CSV_HEADERS = [
"timestamp",
"light",
"humidity",
"temperature",
"co2",
"occupancy",
"motion_count",
]
class SensorConnector:
def __init__(self, pin_pir, pin_dht22, i2c_settings):
self.pin_pir = pin_pir
self.pin_dht22 = pin_dht22
self.dht = Adafruit_DHT.DHT22
self.i2c_settings = i2c_settings
self.bus = smbus.SMBus(i2c_settings["BUS"])
GPIO.setmode(GPIO.BCM)
GPIO.setup(self.pin_pir, GPIO.IN)
def convert_to_number(self, data):
"Simple Function to convert 2 bytes of data into a decimal number"
result = (data[1] + (256 * data[0])) / 1.2
return result
def read_light(self, resolution):
"Read data from I2C Interface"
data = self.bus.read_i2c_block_data(self.i2c_settings["DEVICE"], resolution)
return self.convert_to_number(data)
def read_temp_hum(self):
hum, temp = Adafruit_DHT.read_retry(self.dht, self.pin_dht22)
return hum, temp
def read_sensors(self):
data = {}
data["light"] = self.read_light(
self.i2c_settings["RESOLUTION"]["ONE_TIME_HIGH_RES_MODE_1"]
)
data["humidity"], data["temperature"] = self.read_temp_hum()
data["co2"] = mh_z19.read()["co2"]
return data
class WriterInterface(metaclass=abc.ABCMeta):
@classmethod
def __subclasshook__(cls, subclass):
return hasattr(subclass, "write") and callable(subclass.write) or NotImplemented
@abc.abstractmethod
def write(self, data):
raise NotImplementedError
class CSVWriter(WriterInterface):
def __init__(self, file, headers):
file_exists = os.path.isfile(file)
self.csv_file = open(file, "a")
self.writer = csv.DictWriter(
self.csv_file, delimiter=",", lineterminator="\n", fieldnames=headers
)
if not file_exists:
self.writer.writeheader()
print("initialized csv_file")
def write(self, data):
self.writer.writerow(data)
self.csv_file.flush()
class Database(WriterInterface):
def __init__(self):
self.con = psycopg2.connect(
host=cfg.postgres["host"],
dbname=cfg.postgres["db"],
user=cfg.postgres["user"],
password=cfg.postgres["passwd"],
)
def write(self, data):
if None not in data.values():
try:
cur = self.con.cursor()
cur.execute(
"""insert into "sensordata" (created_time, temperature, humidity, \
light, occupancy, people_count) values (%s, %s, %s, %s, %s, %s);""",
(
data["timestamp"],
data["temperature"],
data["humidity"],
data["light"],
data["occupancy"],
data["motion_count"],
),
)
self.con.commit()
cur.close()
except Exception as e:
print(e)
else:
print(f"problem with data:\n{data}")
class DataCollector:
def __init__(self, s_connector, writer):
self.s_connector = s_connector
self.writer = writer
self.ts_last_motion = None
self.occupied_state = False
self.motion_count = 0
def motion_handler(self, channel):
print("motion detected")
self.ts_last_motion = datetime.datetime.now()
if self.occupied_state == False:
self.occupied_state = True
self.motion_count += 1
def collect_data(self):
GPIO.add_event_detect(
self.s_connector.pin_pir, GPIO.RISING, callback=self.motion_handler
)
while True:
now = datetime.datetime.now()
timedelta_since_last_motion = (
(now - self.ts_last_motion) if self.ts_last_motion else 0
)
if (
self.occupied_state == True
and timedelta_since_last_motion.seconds > 900
):
print("Room vacant at: ")
print(time.strftime("%H"), ":", time.strftime("%M"))
self.occupied_state = False
data = {
**self.s_connector.read_sensors(),
"occupancy": self.occupied_state,
"timestamp": datetime.datetime.now(),
"motion_count": self.motion_count,
}
print(data)
self.writer.write(data)
time.sleep(60)
if __name__ == "__main__":
s_connector = SensorConnector(pin_pir=17, pin_dht22=4, i2c_settings=I2C_SETTINGS)
csv_writer = CSVWriter('/path/to/data.csv', CSV_HEADERS)
#writer = Database()
dc = DataCollector(s_connector, csv_writer)
dc.collect_data()
可以在接口和类之间使用聚合
是的,这是正确的方法
我是否必须在实现接口的类和类DataCollector之间绘制聚合
否,原因如下:
界面的目标是隐藏有效的实现,与每个实现类都有关系,以揭示实现及其所有相关后果的实现
稍后将出现实现该接口的新类,你不需要每次在新的类上添加修改DataCollector的关系
DataCollector只有一个作者,如果你有多个关系需要排他,这是一种复杂的方法,无所事事