==== web.py ==== #!/usr/bin/env python #encoding=utf8 import hashlib from flask import Flask from flask.ext.sqlalchemy import SQLAlchemy from jinja2 import BaseLoader, ChoiceLoader, TemplateNotFound from config import cfg app = Flask(__name__) app.config.from_object('config-web') app.jinja_env.line_statement_prefix = '%' app.config["SQLALCHEMY_DATABASE_URI"] = cfg["db_uri"] db = SQLAlchemy(app) @app.teardown_appcontext def shotdown_session(exception=None): print "@app.teardown_appcontext: shotdown_session()" db.session.remove() from views import * if __name__ == "__main__": app.debug = True app.run(host = cfg['host'], port = cfg['port'] ) ==== config.py ==== #!/usr/bin/env python #encoding=utf8 LANGUAGES = { "en" : "English", "da" : "Dansk", "de" : "Deutsch" } cfg = {} cfg['runmode'] = "dev" #cfg['runmode'] = "prod" if cfg['runmode'] == "prod": cfg['host'] = '0.0.0.0' cfg['app_root'] = "/data/www/virtual/example.com.com/wc/src/myapp" cfg['static_root'] = "%s/static" % (cfg['app_root']) cfg['product_image_folder'] = "%s/product_images" % (cfg['static_root']) cfg['upload_root'] = "%s/upload" % (cfg['app_root']) cfg["site_url"] = "http://example.com/" cfg['dbhost'] = "localhost" cfg['dbport'] = "5432" cfg['dbname'] = "*" cfg['dbuser'] = "*" cfg['dbpass'] = "*" cfg['log_path'] = "%s/log/shoemyfeet.com.log" % (cfg['app_root']) cfg['log_level'] = "INFO" cfg['log_name'] = "footwear" cfg['data_root'] = "/data/www/virtual/dev.shoemyfeet.com/data" elif cfg["runmode"] == "dev": ... else: print "ERROR: Unknown runmode" import sys sys.exit(1) ==== views.py ==== import os from flask import render_template from flask import url_for from flask import g from flask import request from flask import Response from flask import flash from flask import redirect from flask import session from sqlalchemy.orm.exc import NoResultFound from flask.ext.login import login_user, logout_user, current_user, login_required from flask.ext.login import LoginManager from flask.ext.babel import Babel from flask.ext.babel import gettext, ngettext from web import app from web import db import shared from admin import import_footwear, update_images, import_shoplinks from forms import AdminCssForm, AdminPageForm, AdminEditVendorForm, LoginForm, QuickSearchForm, ShoeSearchForm, AdminEditShopForm from model.meta import Base, Session, engine from model import Vendor from model import Footwear from model import FootwearType from model import Page from model import User from model import Shoplink, Shop from config import LANGUAGES, cfg login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = "login" @login_manager.user_loader def load_user(id): user = db.session.query(User).filter(User.id == id).first() if user: return user else: return None babel = Babel(app) @babel.localeselector def get_locale(): # if a user is logged in, use the locale from the user settings user = getattr(g, 'user', None) if user is not None and hasattr(user, 'locale'): app.logger.debug("get_locale: user.locale %s" % user.locale) return user.locale # otherwise try to guess the language from the user accept header # we support en/da/de return request.accept_languages.best_match(LANGUAGES.keys()) @babel.timezoneselector def get_timezone(): user = getattr(g, 'user', None) if user is not None: return user.timezone @app.before_request def before_request(): g.user = current_user g.locale = get_locale() @app.route("/intro") def introduction(): return render_template("introduction.html") @app.route("/js/") def static_proxy(path): # send_static_file will guess the correct MIME type return app.send_static_file(os.path.join('js', path)) @app.route('/login', methods = ['GET', 'POST']) def login(): if g.user is not None and g.user.is_authenticated(): return redirect(url_for("index")) form = LoginForm() if form.validate_on_submit(): username = form.username.data password = form.password.data user = db.session.query(User).filter(User.username==username).first() if user: if user.authenticate(password): login_user(user) flash(gettext("Hej %s. Du er nu logget ind." % (username))) return redirect(request.args.get("next") or url_for("index")) else: flash(gettext("Forkert brugernavn eller adgangskode.")) else: flash(gettext("Forkert brugernavn eller adgangskode.")) db.session.remove() return render_template('login.html', title = 'Sign In', form = form ) @app.route("/", methods = ['GET', 'POST']) def index(): form = QuickSearchForm() page = db.session.query(Page).filter(Page.path=='.WELCOME').first() db.session.remove() if form.validate_on_submit(): return redirect(url_for("match")) return render_template("search.html", form = form, page=page) @app.route("/about") def about(): return render_template("about.html") @app.route("/contact") def contact(): return render_template("contact.html") @app.route("/product/") def show_product(footwear_id): product = db.session.query(Footwear).filter(Footwear.id==footwear_id).one() product_type = db.session.query(FootwearType).filter(FootwearType.id==product.footwear_type).one() vendor = db.session.query(Vendor).filter(Vendor.id==product.vendor).one() shoplinks = db.session.query(Shoplink, Shop).filter( Shoplink.footwear_id == product.id).join(Shop, Shoplink.shop_id == Shop.id).order_by(Shoplink.shop_id).all() print "BEFORE RETURN" resp = render_template("show_product.html", product=product, vendor=vendor, product_type=product_type, shoplinks=shoplinks) db.session.remove() return resp @app.route("/page/") def basic_page(path): page = db.session.query(Page).filter_by(path=path).first() db.session.remove() return render_template("basic_page.html", page=page) @app.route("/css/user.css") def usercss(): cssdata = shared.get_setting('usercss') if cssdata: return Response(cssdata, mimetype='text/css') else: return None [...] ### ADMIN SECTION START @app.route("/admin/logout") @login_required def admin_logout(): logout_user() return redirect(url_for("index")) @app.route("/admin/users", methods=["GET","POST"]) @login_required def admin_users(): users = db.session.query(User).all() db.session.remove() return render_template("admin_users.html", users=users) @app.route("/admin/footwear", methods=["GET","POST"]) @login_required def admin_footwear(): footwear = db.session.query(Footwear).all() db.session.remove() return render_template("admin_footwear.html", footwear=footwear) @app.route("/admin/update-footwear-images/footwear_id/", defaults={'vendor_id' : None }) @app.route("/admin/update-footwear-images/vendor_id/", defaults={'footwear_id' : None}) @app.route("/admin/update-footwear-images/") @login_required def update_footwear_images(footwear_id=None, vendor_id=None): update_images(vendor_id = vendor_id, footwear_id = footwear_id) if footwear_id: flash("Updated images for footwear.id=%s" % (footwear_id)) else: flash("Updated images for vendor.id=%s" % (vendor_id)) return redirect(url_for("admin")) [...] ==== shared.py ==== #!/usr/bin/env python #encoding=utf8 from urllib import urlretrieve import sqlalchemy from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from config import cfg from model.meta import Base, Session, engine from model import Vendor from model import Setting from model import Footwear import logger from logger import debug, info, warn, error, fatal def set_setting(id, value=None): session = Session() setting = session.query(Setting).filter_by(id=id).first() if not setting: setting = Setting(id=id) setting.value = value session.add(setting) session.commit() def get_setting(id): session = Session() setting = session.query(Setting.value).filter_by(id=id).first() if setting: return setting[0] def update_images(footwear_id=None, vendor_id=None, force=False): session = Session() if footwear_id: query = session.query(Footwear).filter(Footwear.id == int(footwear_id)) elif vendor_id: query = session.query(Footwear).filter(Footwear.vendor == vendor_id) elif force: # Re-download all images query = session.query(Footwear) else: query = session.query(Footwear).filter(Footwear.product_image_url==None) debug("update_images(footwear_id=%s, force=%s)" % (footwear_id, force)) for f in query: info("Download image for %s : from '%s'" % (f, f.product_image_url)) try: filename, headers = urlretrieve(f.product_image_url, "%s/%s.jpg" % (cfg["product_image_folder"], f.id)) debug("filename=%s, headers=%s" % (filename, headers)) if not headers["Content-Type"] == "image/jpeg": warn("Unknown content-type: %s, was expecting image/jpg" % (headers["Content-Type"])) except IOError as e: error("Error occured while downloading image for %s : from '%s'. Exception was: %s" % (f, f.product_image_url, e)) return [...] ==== admin.py ==== #!/usr/bin/env python #encoding=utf8 """Command Line Utility Usage: admin.py (-h | --help) admin.py --version admin.py [-v...] [ -q... ] footwear export admin.py [-v...] [ -q... ] footwear import admin.py [-v...] [ -q... ] footwear export [...] """ import csv import os import sys from decimal import Decimal from random import random from docopt.docopt import docopt from config import cfg from model import * import model import logger from logger import debug, info, warn, error, fatal from shared import match, update_images, set_setting, get_setting def list_settings(): session = Session() settings = session.query(Setting).all() for setting in settings: info(setting) def export_pages(infile): "Export pages" info("Exporting pages") session = Session() with open(infile, 'w') as csvfile: pages = session.query(Page).all() writer = csv.writer(csvfile, delimiter=",", quotechar='"') for page in pages: writer.writerow(convert_to_strings([ page.parent_id, page.path, page.title_da, page.title_en, page.title_de, page.content_type, page.content_da, page.content_en, page.content_de, page.visible ])) def import_pages(infile): "Import pages" info("Importing pages") session = Session() with open(infile, 'rb') as csvfile: reader = csv.reader(csvfile, delimiter=",", quotechar='"') for row in reader: if reader.line_num > 1: parent_id, path, title_da, title_en, title_de, content_type, content_da, content_en, content_de, visible = row page = session.query(Page).filter_by(path=path).first() if page: page.parent_id = parent_id or None page.path = path page.title_da = title_da.decode("utf-8") page.title_en = title_en.decode("utf-8") page.title_de = title_de.decode("utf-8") page.content_type = content_type page.content_da = content_da.decode("utf-8") page.content_en = content_en.decode("utf-8") page.content_de = content_de.decode("utf-8") page.visible = visible else: page = Page( parent_id=parent_id or None, path=path, title_da=title_da.decode("utf-8"), title_en=title_en.decode("utf-8"), title_de=title_de.decode("utf-8"), content_type=content_type, content_da=content_da.decode("utf-8"), content_en=content_en.decode("utf-8"), content_de=content_de.decode("utf-8"), visible=visible) debug("Found page: %s" % page) session.add(page) session.commit() def list_pages(): session= Session() pages = session.query(Page).order_by(Page.id) info("Listing pages") for page in pages: info(page) [...] def main(args): # Parse arguments and run appropriate functions [...] ==== forms.py ==== #encoding=utf8 from flask.ext.wtf import Form from flask.ext.babel import gettext, ngettext from wtforms import TextField, BooleanField, IntegerField, RadioField, SelectField, TextAreaField from wtforms.validators import Required, NumberRange, Optional from sqlalchemy import distinct from web import db from model import Vendor from model import Footwear class AdminCssForm(Form): cssdata = TextAreaField('CSS', validators = [Required()] ) class AdminPageForm(Form): content_type_choices = [ ("html", "HTML"), ("raw", "RAW") ] path = TextField(gettext("Path"), validators = [Required()]) title_da = TextField(gettext("Title (Danish)"), validators = [Required()]) title_en = TextField(gettext("Title (English)"), validators = [Required()]) title_de = TextField(gettext("Title (German)"), validators = [Required()]) content_type = SelectField(gettext("Content type"), validators = [Required()], choices=content_type_choices, default="html") visible = BooleanField(gettext("Visible"), default=True) content_da = TextAreaField(gettext("Content (Danish)"), validators = [Required()]) content_en = TextAreaField(gettext("Content (English)"), validators = [Required()]) content_de = TextAreaField(gettext("Content (German)"), validators = [Required()]) class LoginForm(Form): username = TextField('username', validators = [Required()]) password = TextField('password', validators = [Required()]) remember_me = BooleanField('remember_me', default = False) [...] ==== model/meta.py ==== #!/usr/bin/env python #encoding=utf8 from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session from flask_sqlalchemy import SQLAlchemy from config import cfg engine = create_engine(cfg["db_uri"], echo=False, pool_size=10) db = SQLAlchemy() Base = db.Model Session = scoped_session(sessionmaker(bind=engine)) def init_db(): "Drops and rebuilds all tables" Base.metadata.drop_all(engine) Base.metadata.create_all(engine) ==== model/page.py ==== #!/usr/bin/env python #encoding=utf8 """Page model""" import datetime from flask_sqlalchemy import SQLAlchemy from flask_babel import get_locale from model.meta import Base, db class Page(Base): __tablename__ = "page" id = db.Column(db.Integer, db.Sequence('page_id_seq'), primary_key=True, nullable=False) parent_id = db.Column(db.Integer) path = db.Column(db.String, unique=True, nullable=False) title_da = db.Column(db.String, nullable=False) title_en = db.Column(db.String, nullable=False) title_de = db.Column(db.String, nullable=False) content_da = db.Column(db.Text, nullable=False) content_en = db.Column(db.Text, nullable=False) content_de = db.Column(db.Text, nullable=False) # Possible content_types: # raw will be output as is, typically used for HTML # markdown markdown formatted text, will be output as html content_type = db.Column(db.Text, nullable=False) visible = db.Column(db.Boolean, default=True, nullable=False) date_created = db.Column(db.DateTime, default=datetime.datetime.utcnow, nullable=False) @property def content(self): return getattr(self, 'content_%s' % (get_locale() )) @property def title(self): return getattr(self, 'title_%s' % (get_locale() )) def __repr__(self): return "" % (self.id, self.path, self.title_da, self.visible, self.date_created) @property def children(self): c = db.session.query(Page).filter(Page.parent_id == self.id).all() db.session.remove() if c: return c else: return None ==== model/__init__.py ==== #!/usr/bin/env python #encoding=utf8 """model/__init__.py contains the table definitions, the ORM classes and an init_model() function. This init_model() function must be called at application startup. """ from model.meta import Session, Base from model.footwear import Footwear, FootwearType from model.vendor import Vendor from model.page import Page from model.user import User from model.setting import Setting from model.whitesite import Whitesite from model.shop import Shop, Shoplink