Ich bin gerade wieder einmal dabei, ein altes Skript zu überarbeiten. Ursprünglich waren das nur ein paar lose zusammengesetzte Funktionen. Nun möchte ich etwas Struktur hineinbringen.
Da ich hier in letzter Zeit öfter mitbekommen habe, dass über Sinn und Unsinn von Klassen gesprochen wurde ("Du willst keine Klassen nutzen …"), stelle ich das Skript hier zur Diskussion.

Es handelt sich um ein Skript, welches Bestellungen aus der MySQL-Datenbank eines Webshops ausliest. Und mir ist aufgefallen, dass ich alleine zum Auslesen der Einstellungen eine eigene Klasse geschrieben habe.
Das Skript ist nun auch wieder nur Teil eines großen Ganzen, welches ich gerade umsetze. Die Daten werden später durch ein anderes Skript weiterverarbeitet und dann in eine Warenwirtschaft importiert.
Ich freue mich wie immer über Kommentare

Code: Alles auswählen
#!/usr/bin/python
# -*- coding: utf8 -*-
import os
import logging
import datetime
import MySQLdb
import MySQLdb.cursors
from decimal import Decimal
from pprint import pprint
from ConfigParser import SafeConfigParser
# Path of the INI file that stores the webshop database credentials
# (inside the current user's home directory).
CONFIGFILE_WEBSHOP = os.path.join(
    os.path.expanduser("~"),
    ".config/python_webshop_db_config.ini"
)

# Name of the INI section that holds the connection settings.
CONFIGFILE_WEBSHOP_SECTION = "data"

# Options that are expected inside that section.
CONFIGFILE_WEBSHOP_OPTIONS = [
    "webshop_host",
    "webshop_user",
    "webshop_pass",
    "webshop_db",
]
class MyConfigfile():
    """Create, repair and read the INI file with the webshop DB settings.

    The parsed settings are kept in ``self.configfile_parser``. Values that
    are missing or empty are requested interactively via ``raw_input`` and
    written back to the file.
    """

    def __init__(self):
        """Configure root logging and create an empty config parser."""
        logging.basicConfig(level=logging.DEBUG, format='%(levelname)s:\t%(message)s')
        self.configfile_parser = SafeConfigParser()

    def _prompt_for_options(self, section, options):
        """Ask the user for every option and store the answers in the parser.

        The section is created first when the parser does not know it yet.
        """
        if not self.configfile_parser.has_section(section):
            self.configfile_parser.add_section(section)
        for option in options:
            answer = raw_input("please enter {0}: ".format(option))
            # SafeConfigParser treats "%" as interpolation syntax, so a
            # literal percent sign (e.g. in a password) is stored as "%%";
            # get() turns it back into a single "%".
            self.configfile_parser.set(section, option, answer.replace("%", "%%"))

    def write_configfile(self, filename, section, options):
        """Prompt for all ``options`` and write the parser state to ``filename``.

        The file is created or overwritten. It is only opened after all
        answers were entered, so an aborted prompt does not leave an empty
        file behind.
        """
        logging.info("writing config file ...")
        self._prompt_for_options(section, options)
        with open(filename, "w") as configfile:
            self.configfile_parser.write(configfile)

    def update_configfile(self, filename, section, options):
        """Prompt for ``options`` and rewrite the existing file ``filename``.

        The file must already exist (it is opened in ``"r+"`` mode). The
        complete parser state is written from the start of the file.
        """
        logging.info("updating config file ...")
        self._prompt_for_options(section, options)
        with open(filename, "r+") as configfile:
            self.configfile_parser.write(configfile)
            # Drop whatever is left of the previous, possibly longer content.
            configfile.truncate()

    def read_configfile(self, filename, section, options):
        """Load ``filename`` and make sure every option in ``section`` has a value.

        If the section is missing, all options are requested and the file is
        written anew. Each option that is missing or empty is requested
        again until it has a non-empty value.
        """
        logging.info("reading config file {}".format(filename))
        self.configfile_parser.read(filename)
        if not self.configfile_parser.has_section(section):
            logging.warning("config file corrupt, writing new one ...")
            self.write_configfile(filename, section, options)
        for option in options:
            while True:
                if not self.configfile_parser.has_option(section, option):
                    logging.warning("option {0} is missing".format(option))
                elif not self.configfile_parser.get(section, option):
                    logging.warning("option {0} has no value".format(option))
                else:
                    break
                # Ask only for the offending option; the parser keeps the
                # new value in memory, so re-reading the file is not needed.
                self.update_configfile(filename, section, [option])

    def run(self):
        """Return the MySQL connection settings, creating the file if needed.

        Returns:
            dict with the keys ``host``, ``user``, ``pass`` and ``db``, taken
            from the ``webshop_*`` options of the config file.
        """
        if os.path.exists(CONFIGFILE_WEBSHOP):
            logging.debug("configfile exists")
        else:
            logging.warning("configfile not exists")
            self.write_configfile(CONFIGFILE_WEBSHOP, CONFIGFILE_WEBSHOP_SECTION, CONFIGFILE_WEBSHOP_OPTIONS)
        self.read_configfile(CONFIGFILE_WEBSHOP, CONFIGFILE_WEBSHOP_SECTION, CONFIGFILE_WEBSHOP_OPTIONS)
        read_option = self.configfile_parser.get
        mysql_opts = {
            'host': read_option(CONFIGFILE_WEBSHOP_SECTION, "webshop_host"),
            'user': read_option(CONFIGFILE_WEBSHOP_SECTION, "webshop_user"),
            'pass': read_option(CONFIGFILE_WEBSHOP_SECTION, "webshop_pass"),
            'db': read_option(CONFIGFILE_WEBSHOP_SECTION, "webshop_db"),
        }
        return mysql_opts
class GetModifiedWebshopSales():
    """Fetch orders from the webshop's MySQL database as plain dictionaries."""

    # Value of the ``orders_status`` column for each symbolic order state.
    ORDER_STATES = {
        "open": "1",
        "processing": "2",
        "finished": "3",
    }
    # States that are accepted for import; any other state yields no orders.
    IMPORTABLE_STATES = ("processing",)

    def __init__(self):
        """Configure root logging; the database is connected separately."""
        logging.basicConfig(level=logging.DEBUG, format='%(levelname)s:\t%(message)s')
        self.connection = None
        self.cursor = None

    def connect_to_database(self, mysql_opts=""):
        """Open the MySQL connection and create a dictionary cursor.

        Args:
            mysql_opts: dict with the keys ``host``, ``user``, ``pass`` and
                ``db``. When empty, the settings are loaded through
                ``MyConfigfile().run()``.
        """
        if not mysql_opts:
            mysql_opts = MyConfigfile().run()
        logging.debug("connection to MySQL database")
        # apilevel/threadsafety/paramstyle are driver-level DB-API constants
        # and are therefore not set on the connection object.
        connection = MySQLdb.connect(
            mysql_opts['host'],
            mysql_opts['user'],
            mysql_opts['pass'],
            mysql_opts['db'],
            cursorclass=MySQLdb.cursors.DictCursor,
            charset='utf8'
        )
        # Keep the connection referenced for as long as the cursor is used.
        # NOTE(review): some MySQLdb releases reportedly only hold a weak
        # reference from the cursor to its connection - confirm.
        self.connection = connection
        self.cursor = connection.cursor()

    def get_orders(self, state="processing", number=""):
        """Return the matching orders, each converted to an order dictionary.

        Args:
            state: symbolic order state to select by. Takes precedence over
                ``number`` when it is non-empty.
            number: ``orders_id`` to select by; only used when ``state`` is
                empty.

        Returns:
            list of dicts with the keys ``order_description``,
            ``order_total``, ``order_state``, ``order_sale_lines`` (list of
            line dicts), ``order_party``, ``order_shipping_address`` and
            ``order_payments``.

        Raises:
            SystemExit: when neither ``state`` nor ``number`` is given.
        """
        logging.debug("get orders")
        return [self._build_order(order) for order in self._fetch_orders(state, number)]

    def _fetch_orders(self, state, number):
        """Query the ``orders`` table by state or, failing that, by number."""
        orders = []
        if state:
            logging.info("Import order(s) by state")
            logging.info("State = {}".format(state))
            if state in self.IMPORTABLE_STATES:
                self.cursor.execute(
                    "SELECT * FROM `orders` WHERE `orders_status` = %s",
                    (self.ORDER_STATES[state],)
                )
                orders = self.cursor.fetchall()
            else:
                logging.warning("state {0} is not supported for import".format(state))
        elif number:
            logging.info("Import order(s) by number")
            logging.info("Number = {}".format(number))
            self.cursor.execute(
                "SELECT * FROM `orders` WHERE `orders_id` = %s",
                (number,)
            )
            orders = self.cursor.fetchall()
        else:
            logging.info("No import type statement found")
            # Same exception the exit() builtin raises, without relying on
            # the ``site`` module having installed that builtin.
            raise SystemExit("No import type statement found")
        logging.info("orders found: {}".format(len(orders)))
        return orders

    def _fetch_order_lines(self, orders_id):
        """Return the product lines of one order as a list of dicts."""
        self.cursor.execute(
            "SELECT * FROM `orders_products` WHERE `orders_id` = %s",
            (orders_id,)
        )
        rows = self.cursor.fetchall()
        logging.info("lines in order: {}".format(len(rows)))
        return [
            {
                "product_name": row["products_name"],
                "product_code": row["products_model"],
                "product_price": row["products_price"],
                "product_quantity": row["products_quantity"],
            }
            for row in rows
        ]

    def _build_order(self, order):
        """Map one row of the ``orders`` table to an order dictionary."""
        pprint(order)
        party = {
            "name": order["customers_name"],
            "company": order["customers_company"],
            "street": order["billing_street_address"],
            # NOTE(review): street2 of the party is read from the delivery
            # address while street comes from the billing address - confirm
            # which column is intended.
            "street2": order["delivery_suburb"],
            "zip": order["customers_postcode"],
            "city": order["customers_city"],
            "country": order["customers_country"],
            "email": order["customers_email_address"],
            "phone": order["customers_telephone"],
            "vat": order["customers_vat_id"],
        }
        shipping_address = {
            "name": order["delivery_name"],
            "company": order["delivery_company"],
            # NOTE(review): the shipping street is read from the billing
            # column; presumably a delivery street column was intended -
            # verify against the table schema.
            "street": order["billing_street_address"],
            "street2": order["delivery_suburb"],
            "zip": order["delivery_postcode"],
            "city": order["delivery_city"],
            "country": order["delivery_country"],
        }
        # TODO: order_description, order_total, order_state and
        # order_payments are placeholders; they are not filled from the
        # database yet.
        return {
            "order_description": "",
            "order_total": Decimal(0),
            "order_state": "",
            "order_sale_lines": self._fetch_order_lines(order["orders_id"]),
            "order_party": party,
            "order_shipping_address": shipping_address,
            "order_payments": {},
        }
def main():
    """Connect to the webshop database and fetch the orders to import."""
    sales = GetModifiedWebshopSales()
    sales.connect_to_database()
    sales.get_orders()


# Run the import only when the file is executed as a script.
if __name__ == "__main__":
    main()