def create_app(config=AppConfig) -> Flask:
    """Build the Flask application from ``config``.

    Loads the settings object, attaches a Babel instance as ``app.babel``,
    initialises the shared cache and mounts the ``mkwvconf`` blueprint
    under ``/api/mkwvconf``.
    """
    flask_app = Flask(__name__)
    flask_app.config.from_object(config)

    # Extensions
    flask_app.babel = Babel(flask_app)
    cache.init_app(flask_app)

    # Routes
    flask_app.register_blueprint(mkwvconf, url_prefix='/api/mkwvconf')
    return flask_app
def create_app(config=AppConfig) -> Flask:
    """Build the Flask application from ``config``.

    In addition to loading the settings object, this attaches a fresh
    ``Ioc`` container as ``app.ioc`` and a Babel instance as ``app.babel``,
    initialises the shared cache and mounts the ``mkwvconf`` blueprint
    under ``/api/mkwvconf``.
    """
    application = Flask(__name__)
    application.config.from_object(config)

    # Objects hung off the application instance
    application.ioc = Ioc()
    application.babel = Babel(application)

    cache.init_app(application)
    application.register_blueprint(mkwvconf, url_prefix='/api/mkwvconf')
    return application
def create_app():
    """Create the application, configured from the ``config`` module.

    Initialises the database, attaches the debug toolbar and Babel to the
    app, and installs a hook that resets ``g.context`` on every request.
    """
    flask_app = Flask(__name__)
    flask_app.config.from_object("config")
    database.init_database(flask_app)
    flask_app.toolbar = DebugToolbarExtension(flask_app)
    flask_app.babel = Babel(flask_app)

    def before_request():
        # Every request starts with an empty context dict.
        g.context = {}

    flask_app.before_request(before_request)
#    elixir.setup_all()
    return flask_app
def create_app(config_file=None):
    """Create and fully wire the application.

    Configuration is read from ``config_file`` when given, otherwise from
    the file named by the ``CLA_PUBLIC_CONFIG`` environment variable.
    Returns the configured Flask app.
    """
    app = change_jinja_templates(Flask(__name__))

    if config_file:
        app.config.from_pyfile(config_file)
    else:
        app.config.from_envvar("CLA_PUBLIC_CONFIG")

    # Sentry is optional: only enabled when a DSN is configured.
    sentry_dsn = app.config.get("SENTRY_DSN")
    if sentry_dsn:
        app.sentry = Sentry(app, dsn=sentry_dsn, logging=True, level=logging.ERROR)

    app.babel = Babel(app)
    app.babel.localeselector(get_locale)

    app.cache = Cache(app)
    app.mail = Mail(app)

    for ext in app.config["EXTENSIONS"]:
        ext.init_app(app)

    app.session_interface = CheckerSessionInterface()
    app.json_encoder = CustomJSONEncoder

    register_error_handlers(app)
    app.add_template_global(honeypot.FIELD_NAME, name="honeypot_field_name")

    # The checker blueprint is left out in contact-only deployments.
    blueprints = [base, geocoder, contact, scope]
    if not app.config.get("CONTACT_ONLY"):
        blueprints.append(checker)
    for blueprint in blueprints:
        app.register_blueprint(blueprint)

    logging.config.dictConfig(app.config["LOGGING"])
    # quiet markdown module
    markdown_logger = logging.getLogger("MARKDOWN")
    markdown_logger.setLevel(logging.WARNING)

    if app.debug:
        from werkzeug.debug import DebuggedApplication

        app.wsgi_app = DebuggedApplication(app.wsgi_app, True)

    return app
# Example #7
# File: ioc.py Project: chrisboyd/lokole
def create_app(config=AppConfig) -> Flask:
    """Build the Flask application rooted at ``config.APP_ROOT``.

    Static files and the ``mkwvconf`` API are both served below
    ``config.APP_ROOT``.  The IoC container built from ``config.IOC`` is
    stored on ``app.ioc`` and supplies the user store and login form used
    to initialise the security extension.
    """
    app = Flask(__name__, static_url_path=config.APP_ROOT + '/static')
    app.config.from_object(config)
    app.babel = Babel(app)

    ioc = _new_ioc(config.IOC)
    app.ioc = ioc

    cache.init_app(app)
    ioc.user_store.init_app(app)
    security.init_app(
        app,
        ioc.user_store.r,
        register_form=RegisterForm,
        login_form=ioc.login_form,
    )

    api_prefix = config.APP_ROOT + '/api/mkwvconf'
    app.register_blueprint(mkwvconf, url_prefix=api_prefix)
    return app
def create_app(config_filename=None):
    """Create the application.

    With no ``config_filename`` the app is configured through
    ``HerokuConfig``; otherwise through ``AppConfig`` with that file.
    Environment variables are then layered on top, the resulting config is
    exported back into ``os.environ`` as JSON strings, and the database,
    extensions, controllers and error handlers are registered.
    """
    app = Flask(__name__)
    app.request_class = Request
    app.url_map.converters['re'] = RegexConverter

    if config_filename is not None:
        AppConfig(app, config_filename)
    else:
        HerokuConfig(app)

    from_envvars(app.config, prefix='')
    app.debug = app.config.get('DEBUG')

    # TODO: these variables probably aren't unjsonnable anymore
    # push all variables into the environment
    unjsonnable = (datetime, timedelta)
    exported = {}
    for key, value in app.config.items():
        if isinstance(value, unjsonnable):
            continue
        exported[key] = json.dumps(value)
    os.environ.update(exported)

    # app.logger.info('App is running on port %s', os.environ.get('PORT'))

    if app.config['DEBUG'] is not True:
        level_name = app.config.get('LOG_LEVEL', 'DEBUG').upper()
        app.logger.setLevel(getattr(logging, level_name))

    import bauble.db as db

    if 'LOG_SQL' in os.environ:
        sql_logger = logging.getLogger('sqlalchemy.engine')
        sql_logger.setLevel(logging.INFO)

    db.init_app(app)

    # register flask extensions
    SSLify(app, permanent=True)
    login_manager = LoginManager(app)
    app.login_manager = login_manager
    login_manager.login_view = "auth.login"
    app.mail = Mail(app)
    app.babel = Babel(app)

    from .assets import init_app
    init_app(app)

    # TODO: just import everything controllers
    # These controllers each expose a single module-level ``blueprint``.
    for name in ('auth', 'index', 'batch'):
        controller = import_module('bauble.controllers.{}'.format(name))
        app.register_blueprint(controller.blueprint)

    from bauble.resource import Resource

    resource_controllers = (
        'search', 'family', 'genus', 'taxon', 'accession',
        'plant', 'location', 'vernacular_name',
    )
    for name in resource_controllers:
        controller = import_module('bauble.controllers.{}'.format(name))
        # find all the blueprints in the files
        for member_name in dir(controller):
            member = getattr(controller, member_name)
            if not isinstance(member, Blueprint):
                continue
            app.register_blueprint(member)
            # if isclass(attr) and issubclass(attr, Blueprint) and attr != Resource:
            #     app.register_blueprint(attr())

    from bauble.error import init_errorhandlers
    init_errorhandlers(app)

    app.json_encoder = JSONEncoder

    return app
def create_app(config_filename="config.py",
               app_name=None,
               register_blueprints=True):
    """Create and configure the reel2bits Flask application.

    Args:
        config_filename: Python file passed to ``app.config.from_pyfile``.
        app_name: import name for the Flask app; defaults to ``__name__``.
        register_blueprints: when false, no controller blueprint is
            registered (only the routes and handlers defined here exist).

    Returns:
        The configured Flask application.
    """
    # App configuration
    app = Flask(app_name or __name__)
    app.config.from_pyfile(config_filename)

    Bootstrap(app)

    app.jinja_env.add_extension("jinja2.ext.with_")
    app.jinja_env.add_extension("jinja2.ext.do")
    app.jinja_env.globals.update(is_admin=is_admin)
    app.jinja_env.globals.update(duration_elapsed_human=duration_elapsed_human)
    app.jinja_env.globals.update(duration_song_human=duration_song_human)

    if HAS_SENTRY:
        sentry_sdk.init(
            app.config["SENTRY_DSN"],
            integrations=[SentryFlaskIntegration(),
                          SentryCeleryIntegration()],
            release=f"{VERSION} ({GIT_VERSION})",
        )
        print(" * Sentry Flask/Celery support activated")
        print(" * Sentry DSN: %s" % app.config["SENTRY_DSN"])

    if app.config["DEBUG"] is True:
        app.jinja_env.auto_reload = True
        app.logger.setLevel(logging.DEBUG)

    # Logging
    if not app.debug:
        formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s "
                                      "[in %(pathname)s:%(lineno)d]")
        file_handler = RotatingFileHandler("%s/errors_app.log" % os.getcwd(),
                                           "a", 1000000, 1)
        file_handler.setLevel(logging.DEBUG)
        file_handler.setFormatter(formatter)
        app.logger.addHandler(file_handler)

    mail.init_app(app)
    Migrate(app, db)
    babel = Babel(app)
    app.babel = babel
    DebugToolbarExtension(app)

    db.init_app(app)

    # ActivityPub backend
    back = Reel2BitsBackend()
    ap.use_backend(back)

    # Setup Flask-Security.  The app must be passed first: without it the
    # datastore would be taken as the ``app`` argument and the extension
    # would never be initialised.
    Security(
        app,
        user_datastore,
        register_form=ExtendedRegisterForm,
        confirm_register_form=ExtendedRegisterForm)

    @FlaskSecuritySignals.password_reset.connect_via(app)
    @FlaskSecuritySignals.password_changed.connect_via(app)
    def log_password_reset(sender, user):
        if not user:
            return
        add_user_log(user.id, user.id, "user", "info",
                     "Your password has been changed !")

    @FlaskSecuritySignals.reset_password_instructions_sent.connect_via(app)
    def log_reset_password_instr(sender, user, token):
        if not user:
            return
        add_user_log(user.id, user.id, "user", "info",
                     "Password reset instructions sent.")

    @FlaskSecuritySignals.user_registered.connect_via(app)
    def create_actor_for_registered_user(app, user, confirm_token):
        # Every newly registered user gets an actor linked to it.
        if not user:
            return
        actor = create_actor(user)
        actor.user = user
        actor.user_id = user.id
        db.session.add(actor)
        db.session.commit()

    @babel.localeselector
    def get_locale():
        # if a user is logged in, use the locale from the user settings
        identity = getattr(g, "identity", None)
        if identity is not None and identity.id:
            return identity.user.locale
        # otherwise try to guess the language from the user accept
        # header the browser transmits.  We support fr/en in this
        # example.  The best match wins.
        return request.accept_languages.best_match(AVAILABLE_LOCALES)

    @babel.timezoneselector
    def get_timezone():
        identity = getattr(g, "identity", None)
        if identity is not None and identity.id:
            return identity.user.timezone

    @app.before_request
    def before_request():
        _config = Config.query.first()
        if not _config:
            flash(gettext("Config not found"), "error")
        # getattr() keeps the request alive when no Config row exists yet;
        # the values are then None instead of raising AttributeError.
        g.cfg = {
            "REEL2BITS_VERSION_VER": VERSION,
            "REEL2BITS_VERSION_GIT": GIT_VERSION,
            "REEL2BITS_VERSION": "{0} ({1})".format(VERSION, GIT_VERSION),
            "app_name": getattr(_config, "app_name", None),
            "app_description": getattr(_config, "app_description", None),
        }

    @app.errorhandler(InvalidUsage)
    def handle_invalid_usage(error):
        response = jsonify(error.to_dict())
        response.status_code = error.status_code
        return response

    sounds = UploadSet("sounds", AUDIO)
    configure_uploads(app, sounds)
    patch_request_class(app, 500 * 1024 * 1024)  # 500m limit

    if register_blueprints:
        from controllers.main import bp_main
        app.register_blueprint(bp_main)

        from controllers.users import bp_users
        app.register_blueprint(bp_users)

        from controllers.admin import bp_admin
        app.register_blueprint(bp_admin)

        from controllers.sound import bp_sound
        app.register_blueprint(bp_sound)

        from controllers.albums import bp_albums
        app.register_blueprint(bp_albums)

        from controllers.search import bp_search
        app.register_blueprint(bp_search)

        from controllers.api.v1.well_known import bp_wellknown
        app.register_blueprint(bp_wellknown)

        from controllers.api.v1.nodeinfo import bp_nodeinfo
        app.register_blueprint(bp_nodeinfo)

        from controllers.api.v1.activitypub import bp_ap
        app.register_blueprint(bp_ap)

    @app.route("/uploads/<string:thing>/<path:stuff>", methods=["GET"])
    def get_uploads_stuff(thing, stuff):
        if app.testing:
            # Under test the file is served directly from the uploads dir.
            directory = safe_join(app.config["UPLOADS_DEFAULT_DEST"], thing)
            app.logger.debug(f"serving {stuff} from {directory}")
            return send_from_directory(directory, stuff, as_attachment=True)
        else:
            # Otherwise hand the transfer over via X-Accel-Redirect.
            app.logger.debug(f"X-Accel-Redirect serving {stuff}")
            resp = Response("")
            resp.headers[
                "Content-Disposition"] = f"attachment; filename={stuff}"
            resp.headers[
                "X-Accel-Redirect"] = f"/_protected/media/{thing}/{stuff}"
            resp.headers[
                "Content-Type"] = ""  # empty it so Nginx will guess it correctly
            return resp

    def _render_error_page(code, message, err):
        # Shared renderer for the HTML error pages below.
        pcfg = {
            "title": gettext("Whoops, something failed."),
            "error": code,
            "message": message,
            "e": err,
        }
        return render_template("error_page.jinja2", pcfg=pcfg), code

    @app.errorhandler(404)
    def page_not_found(msg):
        return _render_error_page(404, gettext("Page not found"), msg)

    @app.errorhandler(403)
    def err_forbidden(msg):
        return _render_error_page(403, gettext("Access forbidden"), msg)

    @app.errorhandler(410)
    def err_gone(msg):
        return _render_error_page(410, gettext("Gone"), msg)

    if not app.debug:

        @app.errorhandler(500)
        def err_failed(msg):
            return _render_error_page(500, gettext("Something is broken"),
                                      msg)

    @app.after_request
    def set_x_powered_by(response):
        response.headers["X-Powered-By"] = "reel2bits"
        return response

    # Other commands
    @app.cli.command()
    def routes():
        """Dump all routes of defined app"""
        table = texttable.Texttable()
        table.set_deco(texttable.Texttable().HEADER)
        table.set_cols_dtype(["t", "t", "t"])
        table.set_cols_align(["l", "l", "l"])
        table.set_cols_width([50, 30, 80])

        table.add_rows([["Prefix", "Verb", "URI Pattern"]])

        for rule in sorted(app.url_map.iter_rules(), key=str):
            methods = ",".join(rule.methods)
            table.add_row([rule.endpoint, methods, rule])

        print(table.draw())

    @app.cli.command()
    def config():
        """Dump config"""
        pp(app.config)

    @app.cli.command()
    def seed():
        """Seed database with default content"""
        make_db_seed(db)

    @app.cli.command()
    def createuser():
        """Create an user"""
        username = click.prompt("Username", type=str)
        email = click.prompt("Email", type=str)
        password = click.prompt("Password",
                                type=str,
                                hide_input=True,
                                confirmation_prompt=True)
        # Keep asking until one of the two known role names is entered.
        while True:
            role = click.prompt("Role [admin/user]", type=str)
            if role in ("admin", "user"):
                break

        if click.confirm("Do you want to continue ?"):
            role = Role.query.filter(Role.name == role).first()
            if not role:
                raise click.UsageError("Roles not present in database")
            u = user_datastore.create_user(name=username,
                                           email=email,
                                           password=encrypt_password(password),
                                           roles=[role])

            actor = create_actor(u)
            actor.user = u
            actor.user_id = u.id
            db.session.add(actor)

            db.session.commit()

            if FSConfirmable.requires_confirmation(u):
                FSConfirmable.send_confirmation_instructions(u)
                print("Look at your emails for validation instructions.")

    return app
# -*- coding: utf-8 -*-
"""Shikin, political donations database."""
from flask import Flask
from flask.ext.sqlalchemy import SQLAlchemy
from flask.ext.babel import Babel
from . import config
# create our little application
# The Flask app is a module-level singleton named after config.APP_NAME.
app = Flask(config.APP_NAME)
# Defaults come from the package's config module ...
app.config.from_object(config)
# ... and may be overridden by the file named in $SHIKIN_SETTINGS; with
# silent=True a missing variable is not an error.
app.config.from_envvar("SHIKIN_SETTINGS", silent=True)
# Extension instances are stored as attributes on the app object.
app.dbobj = SQLAlchemy(app)
app.babel = Babel(app)
from . import views
from . import api
from . import pageapi
from . import review
# Example #11
# Show file
def create_app():
    """Create a small demo application wired up with Flask-Security.

    The configuration is hard-coded for local experimentation (debug on,
    CSRF off, plaintext password "hashing", mail sending suppressed) and
    may be overridden by the file named in the ``SETTINGS`` environment
    variable.  Security signals are printed to stdout instead of mailed.

    Returns:
        The configured Flask application.
    """
    app = Flask(__name__)
    app.config["DEBUG"] = True
    app.config["SECRET_KEY"] = "super-secret"
    app.config["LOGIN_DISABLED"] = False
    app.config["WTF_CSRF_ENABLED"] = False
    # Don't actually send any email - instead we subscribe to signals
    # and print out required info.
    app.config["MAIL_SUPPRESS_SEND"] = True
    app.config["SECURITY_TWO_FACTOR_SECRET"] = {
        "1": "TjQ9Qa31VOrfEzuPy4VHQWPCTmRzCnFzMKLxXYiZu9B"
    }

    app.config["SECURITY_PASSWORD_SALT"] = "salty"
    # Make this plaintext for most tests - reduces unit test time by 50%
    app.config["SECURITY_PASSWORD_HASH"] = "plaintext"
    # Make this hex_md5 for token tests
    app.config["SECURITY_HASHING_SCHEMES"] = ["hex_md5"]
    app.config["SECURITY_DEPRECATED_HASHING_SCHEMES"] = []

    # Each entry becomes SECURITY_<NAME> = True.  The "NOT" prefix on
    # passwordless yields a key that is not SECURITY_PASSWORDLESS.
    for opt in [
        "changeable",
        "recoverable",
        "registerable",
        "trackable",
        "NOTpasswordless",
        "confirmable",
        "two_factor",
    ]:
        app.config["SECURITY_" + opt.upper()] = True

    if os.environ.get("SETTINGS"):
        app.config.from_envvar("SETTINGS")

    mail = Mail(app)
    babel = Babel(app)
    app.babel = babel
    app.json_encoder = JSONEncoder
    app.mail = mail

    # Setup Flask-Security
    user_datastore = SQLAlchemySessionUserDatastore(db_session, User, Role)
    Security(app, user_datastore)

    # Create a user to test with
    @app.before_first_request
    def create_user():
        init_db()
        db_session.commit()

    @user_registered.connect_via(app)
    def on_user_registerd(myapp, user, confirm_token):
        print("User {} registered with token {}".format(user.email, confirm_token))

    @reset_password_instructions_sent.connect_via(app)
    def on_reset(myapp, user, token):
        print("User {} started password reset with token {}".format(user.email, token))

    @tf_security_token_sent.connect_via(app)
    def on_token_sent(myapp, user, token, method):
        print(
            "User {} was sent two factor token {} via {}".format(
                user.email, token, method
            )
        )

    # Views
    @app.route("/")
    @login_required
    def home():
        return render_template_string("Hello {{email}} !", email=current_user.email)

    return app
def app(request):
    """Build the Flask test application for a single test.

    ``request`` is used like a pytest fixture request: each Flask-Security
    feature flag is switched on only when the test carries a keyword of
    the same name, a ``settings`` marker supplies extra ``SECURITY_*``
    values, and a ``babel`` marker whose first argument is falsy disables
    Babel.

    Returns:
        The configured Flask application with the test views registered.
    """
    app = Flask(__name__)
    app.response_class = Response
    app.debug = True
    app.config["SECRET_KEY"] = "secret"
    app.config["TESTING"] = True
    app.config["LOGIN_DISABLED"] = False
    app.config["WTF_CSRF_ENABLED"] = False
    app.config["SECURITY_TWO_FACTOR_SECRET"] = {
        "1": "TjQ9Qa31VOrfEzuPy4VHQWPCTmRzCnFzMKLxXYiZu9B"
    }
    app.config["SECURITY_TWO_FACTOR_SMS_SERVICE"] = "test"
    app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False

    app.config["SECURITY_PASSWORD_SALT"] = "salty"
    # Make this plaintext for most tests - reduces unit test time by 50%
    app.config["SECURITY_PASSWORD_HASH"] = "plaintext"
    # Make this hex_md5 for token tests
    app.config["SECURITY_HASHING_SCHEMES"] = ["hex_md5"]
    app.config["SECURITY_DEPRECATED_HASHING_SCHEMES"] = []

    # A feature is enabled only if the test is marked with its name.
    for opt in [
            "changeable",
            "recoverable",
            "registerable",
            "trackable",
            "passwordless",
            "confirmable",
            "two_factor",
    ]:
        app.config["SECURITY_" + opt.upper()] = opt in request.keywords

    # pytest 4 and later look markers up via get_closest_marker; older
    # versions read them from request.keywords.
    pytest_major = int(pytest.__version__.split(".")[0])
    if pytest_major >= 4:
        marker_getter = request.node.get_closest_marker
    else:
        marker_getter = request.keywords.get
    settings = marker_getter("settings")
    babel = marker_getter("babel")
    if settings is not None:
        for key, value in settings.kwargs.items():
            app.config["SECURITY_" + key.upper()] = value

    mail = Mail(app)
    # Babel is on by default; a babel marker with a falsy first arg disables it.
    if babel is None or babel.args[0]:
        babel = Babel(app)
        app.babel = babel
    app.json_encoder = JSONEncoder
    app.mail = mail

    @app.route("/")
    def index():
        return render_template("index.html", content="Home Page")

    @app.route("/profile")
    @login_required
    def profile():
        return render_template("index.html", content="Profile Page")

    @app.route("/post_login")
    @login_required
    def post_login():
        return render_template("index.html", content="Post Login")

    @app.route("/http")
    @http_auth_required
    def http():
        return "HTTP Authentication"

    @app.route("/http_custom_realm")
    @http_auth_required("My Realm")
    def http_custom_realm():
        return render_template("index.html", content="HTTP Authentication")

    @app.route("/token", methods=["GET", "POST"])
    @auth_token_required
    def token():
        return render_template("index.html", content="Token Authentication")

    @app.route("/multi_auth")
    @auth_required("session", "token", "basic")
    def multi_auth():
        return render_template("index.html",
                               content="Session, Token, Basic auth")

    @app.route("/post_logout")
    def post_logout():
        return render_template("index.html", content="Post Logout")

    @app.route("/post_register")
    def post_register():
        return render_template("index.html", content="Post Register")

    @app.route("/admin")
    @roles_required("admin")
    def admin():
        return render_template("index.html", content="Admin Page")

    @app.route("/admin_and_editor")
    @roles_required("admin", "editor")
    def admin_and_editor():
        return render_template("index.html", content="Admin and Editor Page")

    @app.route("/admin_or_editor")
    @roles_accepted("admin", "editor")
    def admin_or_editor():
        return render_template("index.html", content="Admin or Editor Page")

    @app.route("/unauthorized")
    def unauthorized():
        return render_template("unauthorized.html")

    @app.route("/page1")
    def page_1():
        return "Page 1"

    @app.route("/json", methods=["GET", "POST"])
    def echo_json():
        return jsonify(flask_request.get_json())

    return app
def create_app(config_filename="config.development.Config",
               app_name=None,
               register_blueprints=True):
    """Create and configure the reel2bits Flask application.

    Args:
        config_filename: dotted path of the config class to load; the
            ``APP_SETTINGS`` environment variable takes precedence.
        app_name: import name for the Flask app; defaults to ``__name__``.
        register_blueprints: when false, neither the controller blueprints
            nor Swagger are registered.

    Returns:
        The configured Flask application.
    """
    # App configuration
    app = Flask(app_name or __name__)
    app_settings = os.getenv("APP_SETTINGS", config_filename)
    print(f" * Loading config: '{app_settings}'")
    try:
        cfg = import_string(app_settings)()
    except ImportError:
        print(" *** Cannot import config ***")
        # NOTE(review): unlike the branch above this keeps the class itself
        # rather than an instance - confirm this is intended.
        cfg = import_string("config.config.BaseConfig")
        print(" *** Default config loaded, expect problems ***")
    if hasattr(cfg, "post_load"):
        print(" *** Doing some magic")
        cfg.post_load()
    app.config.from_object(cfg)

    app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1, x_host=1)

    Bootstrap(app)

    app.jinja_env.add_extension("jinja2.ext.with_")
    app.jinja_env.add_extension("jinja2.ext.do")
    app.jinja_env.globals.update(is_admin=is_admin)

    if HAS_SENTRY:
        sentry_sdk.init(
            app.config["SENTRY_DSN"],
            integrations=[SentryFlaskIntegration(),
                          SentryCeleryIntegration()],
            release=f"{VERSION} ({GIT_VERSION})",
        )
        print(" * Sentry Flask/Celery support activated")
        print(" * Sentry DSN: %s" % app.config["SENTRY_DSN"])

    if app.debug:
        app.jinja_env.auto_reload = True
        logging.basicConfig(level=logging.DEBUG)

    # Logging
    if not app.debug:
        formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s "
                                      "[in %(pathname)s:%(lineno)d]")
        file_handler = RotatingFileHandler("%s/errors_app.log" % os.getcwd(),
                                           "a", 1000000, 1)
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(formatter)
        app.logger.addHandler(file_handler)

    dbLogger = logging.getLogger("reel2bits.sqltime")
    dbLogger.setLevel(logging.DEBUG)

    CORS(app, origins=["*"])

    if app.debug:
        logging.getLogger("flask_cors.extension").level = logging.DEBUG

    Mail(app)
    Migrate(app, db)
    babel = Babel(app)
    app.babel = babel

    # Swagger template handed to Swagger() when blueprints are registered.
    template = {
        "swagger": "2.0",
        "info": {
            "title": "reel2bits API",
            "description": "API instance",
            "version": VERSION
        },
        "host": app.config["AP_DOMAIN"],
        "basePath": "/",
        "schemes": ["https"],
        "securityDefinitions": {
            "OAuth2": {
                "type": "oauth2",
                "flows": {
                    "authorizationCode": {
                        "authorizationUrl":
                        f"https://{app.config['AP_DOMAIN']}/oauth/authorize",
                        "tokenUrl":
                        f"https://{app.config['AP_DOMAIN']}/oauth/token",
                        "scopes": {
                            "read": "Grants read access",
                            "write": "Grants write access",
                            "admin": "Grants admin operations",
                        },
                    }
                },
            }
        },
        "consumes": ["application/json", "application/jrd+json"],
        "produces": ["application/json", "application/jrd+json"],
    }

    db.init_app(app)

    # ActivityPub backend
    back = Reel2BitsBackend()
    ap.use_backend(back)

    # Oauth
    config_oauth(app)

    # Setup Flask-Security
    security = Security(app, user_datastore)

    @FlaskSecuritySignals.password_reset.connect_via(app)
    @FlaskSecuritySignals.password_changed.connect_via(app)
    def log_password_reset(sender, user):
        if not user:
            return
        add_user_log(user.id, user.id, "user", "info",
                     "Your password has been changed !")

    @FlaskSecuritySignals.reset_password_instructions_sent.connect_via(app)
    def log_reset_password_instr(sender, user, token):
        if not user:
            return
        add_user_log(user.id, user.id, "user", "info",
                     "Password reset instructions sent.")

    @FlaskSecuritySignals.user_registered.connect_via(app)
    def create_actor_for_registered_user(app, user, confirm_token):
        # Every newly registered user gets an actor linked to it.
        if not user:
            return
        actor = create_actor(user)
        actor.user = user
        actor.user_id = user.id
        db.session.add(actor)
        db.session.commit()

    @security.mail_context_processor
    def mail_ctx_proc():
        _config = Config.query.first()
        if not _config:
            print("ERROR: cannot get instance Config from database")
        instance = {"name": None, "url": None}
        if _config:
            instance["name"] = _config.app_name
        instance["url"] = app.config["REEL2BITS_URL"]
        return dict(instance=instance)

    @babel.localeselector
    def get_locale():
        # if a user is logged in, use the locale from the user settings
        identity = getattr(g, "identity", None)
        if identity is not None and identity.id:
            return identity.user.locale
        # otherwise try to guess the language from the user accept
        # header the browser transmits.  We support fr/en in this
        # example.  The best match wins.
        return request.accept_languages.best_match(AVAILABLE_LOCALES)

    @babel.timezoneselector
    def get_timezone():
        identity = getattr(g, "identity", None)
        if identity is not None and identity.id:
            return identity.user.timezone

    @app.before_request
    def before_request():
        _config = Config.query.first()
        if not _config:
            flash(gettext("Config not found"), "error")
        # getattr() keeps the request alive when no Config row exists yet;
        # the values are then None instead of raising AttributeError.
        cfg = {
            "REEL2BITS_VERSION_VER": VERSION,
            "REEL2BITS_VERSION_GIT": GIT_VERSION,
            "app_name": getattr(_config, "app_name", None),
            "app_description": getattr(_config, "app_description", None),
        }
        if GIT_VERSION:
            cfg["REEL2BITS_VERSION"] = "{0}-{1}".format(VERSION, GIT_VERSION)
        else:
            cfg["REEL2BITS_VERSION"] = VERSION

        g.cfg = cfg

    @app.errorhandler(InvalidUsage)
    def handle_invalid_usage(error):
        response = jsonify(error.to_dict())
        response.status_code = error.status_code
        return response

    # Query timing hooks.  The ``if not False`` guard always returns, so
    # both hooks are currently disabled.
    @event.listens_for(Engine, "before_cursor_execute")
    def before_cursor_execute(conn, cursor, statement, parameters, context,
                              executemany):
        if not False:
            return
        conn.info.setdefault("query_start_time", []).append(time.time())
        dbLogger.debug("Start Query: %s", statement)

    @event.listens_for(Engine, "after_cursor_execute")
    def after_cursor_execute(conn, cursor, statement, parameters, context,
                             executemany):
        if not False:
            return
        total = time.time() - conn.info["query_start_time"].pop(-1)
        dbLogger.debug("Query Complete!")
        dbLogger.debug("Total Time: %f", total)

    # Tracks files upload set
    sounds = UploadSet("sounds", AUDIO)
    configure_uploads(app, sounds)

    # Album artwork upload set
    artworkalbums = UploadSet("artworkalbums",
                              Reel2bitsDefaults.artwork_extensions_allowed)
    configure_uploads(app, artworkalbums)

    # Track artwork upload set
    artworksounds = UploadSet("artworksounds",
                              Reel2bitsDefaults.artwork_extensions_allowed)
    configure_uploads(app, artworksounds)

    # User avatars
    avatars = UploadSet("avatars", Reel2bitsDefaults.avatar_extensions_allowed)
    configure_uploads(app, avatars)

    # Total max size upload for the whole app
    patch_request_class(app, app.config["UPLOAD_TRACK_MAX_SIZE"])

    app.flake_id = FlakeId()

    if register_blueprints:
        from controllers.main import bp_main
        app.register_blueprint(bp_main)

        from controllers.admin import bp_admin
        app.register_blueprint(bp_admin)

        # ActivityPub
        from controllers.api.v1.well_known import bp_wellknown
        app.register_blueprint(bp_wellknown)

        from controllers.api.v1.nodeinfo import bp_nodeinfo
        app.register_blueprint(bp_nodeinfo)

        from controllers.api.v1.ap import bp_ap

        # Feeds
        from controllers.feeds import bp_feeds
        app.register_blueprint(bp_feeds)

        # API
        app.register_blueprint(bp_ap)

        from controllers.api.v1.auth import bp_api_v1_auth
        app.register_blueprint(bp_api_v1_auth)

        from controllers.api.v1.accounts import bp_api_v1_accounts
        app.register_blueprint(bp_api_v1_accounts)

        from controllers.api.v1.timelines import bp_api_v1_timelines
        app.register_blueprint(bp_api_v1_timelines)

        from controllers.api.v1.notifications import bp_api_v1_notifications
        app.register_blueprint(bp_api_v1_notifications)

        from controllers.api.tracks import bp_api_tracks
        app.register_blueprint(bp_api_tracks)

        from controllers.api.albums import bp_api_albums
        app.register_blueprint(bp_api_albums)

        from controllers.api.account import bp_api_account
        app.register_blueprint(bp_api_account)

        from controllers.api.reel2bits import bp_api_reel2bits
        app.register_blueprint(bp_api_reel2bits)

        # Pleroma API
        from controllers.api.pleroma_admin import bp_api_pleroma_admin
        app.register_blueprint(bp_api_pleroma_admin)

        # OEmbed
        from controllers.api.oembed import bp_api_oembed
        app.register_blueprint(bp_api_oembed)

        # Iframe
        from controllers.api.embed import bp_api_embed
        app.register_blueprint(bp_api_embed)

        # The app must be passed so Swagger binds to it immediately.
        Swagger(
            app,
            template=template)

        # SPA catchalls for meta tags
        from controllers.spa import bp_spa
        app.register_blueprint(bp_spa)

    @app.route("/uploads/<string:thing>/<path:stuff>", methods=["GET"])
    @cross_origin(origins="*",
                  methods=["GET", "HEAD", "OPTIONS"],
                  expose_headers="content-length",
                  send_wildcard=True)
    def get_uploads_stuff(thing, stuff):
        if app.testing or app.debug:
            # In test/debug the file is served directly from the uploads dir.
            directory = safe_join(app.config["UPLOADS_DEFAULT_DEST"], thing)
            app.logger.debug(f"serving {stuff} from {directory}")
            return send_from_directory(directory, stuff, as_attachment=True)
        else:
            # Otherwise hand the transfer over via X-Accel-Redirect.
            app.logger.debug(f"X-Accel-Redirect serving {stuff}")
            resp = Response("")
            resp.headers[
                "Content-Disposition"] = f"attachment; filename={stuff}"
            resp.headers[
                "X-Accel-Redirect"] = f"/_protected/media/{thing}/{stuff}"
            resp.headers[
                "Content-Type"] = ""  # empty it so Nginx will guess it correctly
            return resp

    def render_tags(tags):
        """
        Given a dict like {'tag': 'meta', 'hello': 'world'}
        return a html ready tag like
        <meta hello="world" />
        """
        for tag in tags:
            # NOTE: pop() mutates the caller's dict; attributes with falsy
            # values are omitted and the rest are HTML-escaped.
            yield "<{tag} {attrs} />".format(
                tag=tag.pop("tag"),
                attrs=" ".join([
                    '{}="{}"'.format(a, html.escape(str(v)))
                    for a, v in sorted(tag.items()) if v
                ]),
            )

    def _render_error_page(code, message, err):
        # Shared renderer for the HTML error pages below.
        pcfg = {
            "title": gettext("Whoops, something failed."),
            "error": code,
            "message": message,
            "e": err,
        }
        return render_template("error_page.jinja2", pcfg=pcfg), code

    @app.errorhandler(404)
    def page_not_found(msg):
        # API-like paths get a JSON 404; anything else is answered with the
        # SPA shell with head tags injected.
        excluded = ("/api", "/.well-known", "/feeds", "/oauth/authorize")
        if request.path.startswith(excluded):
            return jsonify({"error": "page not found"}), 404

        # Named spa_html so the html module used by render_tags stays visible.
        spa_html = get_spa_html(app.config["REEL2BITS_SPA_HTML"])
        head, tail = spa_html.split("</head>", 1)

        request_tags = get_request_head_tags(request)

        default_tags = get_default_head_tags(request.path)
        unique_attributes = ["name", "property"]

        # Request-specific tags win; a default tag is added only when no
        # request tag already uses the same name/property value.
        final_tags = request_tags
        skip = []

        for t in final_tags:
            for attr in unique_attributes:
                if attr in t:
                    skip.append(t[attr])
        for t in default_tags:
            existing = False
            for attr in unique_attributes:
                if t.get(attr) in skip:
                    existing = True
                    break
            if not existing:
                final_tags.append(t)

        head += "\n" + "\n".join(render_tags(final_tags)) + "\n</head>"
        return head + tail

    @app.errorhandler(403)
    def err_forbidden(msg):
        if request.path.startswith("/api/"):
            return jsonify({"error": "access forbidden"}), 403
        return _render_error_page(403, gettext("Access forbidden"), msg)

    @app.errorhandler(410)
    def err_gone(msg):
        if request.path.startswith("/api/"):
            return jsonify({"error": "gone"}), 410
        return _render_error_page(410, gettext("Gone"), msg)

    if not app.debug:

        @app.errorhandler(500)
        def err_failed(msg):
            if request.path.startswith("/api/"):
                return jsonify({"error": "server error"}), 500
            return _render_error_page(500, gettext("Something is broken"),
                                      msg)

    @app.after_request
    def set_x_powered_by(response):
        response.headers["X-Powered-By"] = "reel2bits"
        return response

    # Register CLI commands
    app.cli.add_command(commands.db_datas)
    app.cli.add_command(commands.users)
    app.cli.add_command(commands.roles)
    app.cli.add_command(commands.tracks)
    app.cli.add_command(commands.system)

    return app
def app(request):
    """Build a Flask application configured for testing, with demo routes.

    :param request: object exposing a ``keywords`` mapping (presumably the
        pytest fixture request -- confirm against the caller). Its keys
        toggle the ``SECURITY_*`` feature flags; an optional ``settings``
        entry supplies extra ``SECURITY_<KEY>`` values through its
        ``kwargs``; an optional ``babel`` entry whose first positional
        argument is falsy disables the Babel extension.
    :return: the configured :class:`Flask` application.
    """
    app = Flask(__name__)
    app.response_class = Response
    app.debug = True
    app.config['SECRET_KEY'] = 'secret'
    app.config['TESTING'] = True
    app.config['LOGIN_DISABLED'] = False
    app.config['WTF_CSRF_ENABLED'] = False
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    app.config['SECURITY_PASSWORD_SALT'] = 'salty'

    # Each feature flag is on exactly when a keyword of that name is present.
    for opt in [
            'changeable', 'recoverable', 'registerable', 'trackable',
            'passwordless', 'confirmable',
    ]:
        app.config['SECURITY_' + opt.upper()] = opt in request.keywords

    # Explicit settings are applied after the flags, so they take precedence.
    if 'settings' in request.keywords:
        for key, value in request.keywords['settings'].kwargs.items():
            app.config['SECURITY_' + key.upper()] = value

    mail = Mail(app)
    # Babel is attached unless a 'babel' keyword explicitly passes a falsy
    # first argument.
    if 'babel' not in request.keywords or \
            request.keywords['babel'].args[0]:
        babel = Babel(app)
        app.babel = babel
    app.json_encoder = JSONEncoder
    app.mail = mail

    @app.route('/')
    def index():
        return render_template('index.html', content='Home Page')

    @app.route('/profile')
    @login_required
    def profile():
        return render_template('index.html', content='Profile Page')

    @app.route('/post_login')
    @login_required
    def post_login():
        return render_template('index.html', content='Post Login')

    @app.route('/http')
    @http_auth_required
    def http():
        return 'HTTP Authentication'

    @app.route('/http_custom_realm')
    @http_auth_required('My Realm')
    def http_custom_realm():
        return render_template('index.html', content='HTTP Authentication')

    @app.route('/token', methods=['GET', 'POST'])
    @auth_token_required
    def token():
        return render_template('index.html', content='Token Authentication')

    @app.route('/multi_auth')
    @auth_required('session', 'token', 'basic')
    def multi_auth():
        return render_template('index.html',
                               content='Session, Token, Basic auth')

    @app.route('/post_logout')
    def post_logout():
        return render_template('index.html', content='Post Logout')

    @app.route('/post_register')
    def post_register():
        return render_template('index.html', content='Post Register')

    @app.route('/admin')
    @roles_required('admin')
    def admin():
        return render_template('index.html', content='Admin Page')

    @app.route('/admin_and_editor')
    @roles_required('admin', 'editor')
    def admin_and_editor():
        return render_template('index.html', content='Admin and Editor Page')

    @app.route('/admin_or_editor')
    @roles_accepted('admin', 'editor')
    def admin_or_editor():
        return render_template('index.html', content='Admin or Editor Page')

    @app.route('/unauthorized')
    def unauthorized():
        return render_template('unauthorized.html')

    @app.route('/page1')
    def page_1():
        return 'Page 1'

    return app
def app(request):
    """Build a Flask application configured for testing, with demo routes.

    :param request: object exposing ``keywords`` and ``node`` (presumably
        the pytest fixture request -- confirm against the caller). The
        keys of ``keywords`` toggle the ``SECURITY_*`` feature flags. A
        ``settings`` marker supplies extra ``SECURITY_<KEY>`` values
        through its ``kwargs``, and a ``babel`` marker whose first
        positional argument is falsy disables the Babel extension.
    :return: the configured :class:`Flask` application.
    """
    app = Flask(__name__)
    app.response_class = Response
    app.debug = True
    app.config['SECRET_KEY'] = 'secret'
    app.config['TESTING'] = True
    app.config['LOGIN_DISABLED'] = False
    app.config['WTF_CSRF_ENABLED'] = False
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    app.config['SECURITY_PASSWORD_SALT'] = 'salty'
    # Make this plaintext for most tests - reduces unit test time by 50%
    app.config['SECURITY_PASSWORD_HASH'] = 'plaintext'

    # Each feature flag is on exactly when a keyword of that name is present.
    for opt in [
            'changeable', 'recoverable', 'registerable', 'trackable',
            'passwordless', 'confirmable',
    ]:
        app.config['SECURITY_' + opt.upper()] = opt in request.keywords

    # Marker lookup differs by pytest major version: from 4 on the node's
    # get_closest_marker is used, otherwise the keywords mapping.
    pytest_major = int(pytest.__version__.split('.')[0])
    if pytest_major >= 4:
        marker_getter = request.node.get_closest_marker
    else:
        marker_getter = request.keywords.get
    settings = marker_getter('settings')
    babel = marker_getter('babel')

    # Explicit settings are applied after the flags, so they take precedence.
    if settings is not None:
        for key, value in settings.kwargs.items():
            app.config['SECURITY_' + key.upper()] = value

    mail = Mail(app)
    # Babel is attached unless a 'babel' marker explicitly passes a falsy
    # first argument.
    if babel is None or babel.args[0]:
        babel = Babel(app)
        app.babel = babel
    app.json_encoder = JSONEncoder
    app.mail = mail

    @app.route('/')
    def index():
        return render_template('index.html', content='Home Page')

    @app.route('/profile')
    @login_required
    def profile():
        return render_template('index.html', content='Profile Page')

    @app.route('/post_login')
    @login_required
    def post_login():
        return render_template('index.html', content='Post Login')

    @app.route('/http')
    @http_auth_required
    def http():
        return 'HTTP Authentication'

    @app.route('/http_custom_realm')
    @http_auth_required('My Realm')
    def http_custom_realm():
        return render_template('index.html', content='HTTP Authentication')

    @app.route('/token', methods=['GET', 'POST'])
    @auth_token_required
    def token():
        return render_template('index.html', content='Token Authentication')

    @app.route('/multi_auth')
    @auth_required('session', 'token', 'basic')
    def multi_auth():
        return render_template('index.html',
                               content='Session, Token, Basic auth')

    @app.route('/post_logout')
    def post_logout():
        return render_template('index.html', content='Post Logout')

    @app.route('/post_register')
    def post_register():
        return render_template('index.html', content='Post Register')

    @app.route('/admin')
    @roles_required('admin')
    def admin():
        return render_template('index.html', content='Admin Page')

    @app.route('/admin_and_editor')
    @roles_required('admin', 'editor')
    def admin_and_editor():
        return render_template('index.html', content='Admin and Editor Page')

    @app.route('/admin_or_editor')
    @roles_accepted('admin', 'editor')
    def admin_or_editor():
        return render_template('index.html', content='Admin or Editor Page')

    @app.route('/unauthorized')
    def unauthorized():
        return render_template('unauthorized.html')

    @app.route('/page1')
    def page_1():
        return 'Page 1'

    @app.route('/json', methods=['GET', 'POST'])
    def echo_json():
        # Echo the request's JSON payload back to the caller.
        return jsonify(flask_request.get_json())

    return app
# Example #17
# Show file
def create_app(config_filename=None):
    """Create and configure the Flask application.

    :param config_filename: configuration file handed to ``AppConfig``;
        when ``None`` the app is configured through ``HerokuConfig``
        instead.
    :return: the configured :class:`Flask` application with extensions,
        blueprints and error handlers registered.
    """
    app = Flask(__name__)
    app.request_class = Request
    app.url_map.converters['re'] = RegexConverter

    if config_filename is None:
        HerokuConfig(app)
    else:
        AppConfig(app, config_filename)

    from_envvars(app.config, prefix='')
    app.debug = app.config.get('DEBUG')

    # TODO: these variables probably aren't unjsonnable anymore
    # push all variables into the environment
    unjsonnable = (datetime, timedelta)
    data = {
        k: json.dumps(v)
        for k, v in app.config.items() if not isinstance(v, unjsonnable)
    }
    os.environ.update(data)

    # app.logger.info('App is running on port %s', os.environ.get('PORT'))

    # The logger level is only overridden when DEBUG is not exactly True.
    if app.config['DEBUG'] is not True:
        log_level = app.config.get('LOG_LEVEL', 'DEBUG')
        app.logger.setLevel(getattr(logging, log_level.upper()))

    import bauble.db as db

    if 'LOG_SQL' in os.environ:
        logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

    db.init_app(app)

    # register flask extensions
    SSLify(app, permanent=True)
    app.login_manager = LoginManager(app)
    app.login_manager.login_view = "auth.login"
    app.mail = Mail(app)
    app.babel = Babel(app)

    from .assets import init_app
    init_app(app)

    # TODO: just import everything controllers

    # These controller modules each expose a single ``blueprint`` attribute.
    for controller in ['auth', 'index', 'batch']:
        module = import_module('bauble.controllers.{}'.format(controller))
        app.register_blueprint(module.blueprint)

    from bauble.resource import Resource

    controllers = [
        'search', 'family', 'genus', 'taxon', 'accession', 'plant', 'location',
        'vernacular_name',
    ]
    for controller in controllers:
        module = import_module('bauble.controllers.{}'.format(controller))
        for attr_name in dir(module):
            attr = getattr(module, attr_name)
            # find all the blueprints in the files
            if isinstance(attr, Blueprint):
                app.register_blueprint(attr)
            # if isclass(attr) and issubclass(attr, Blueprint) and attr != Resource:
            #     app.register_blueprint(attr())

    from bauble.error import init_errorhandlers
    init_errorhandlers(app)

    app.json_encoder = JSONEncoder

    return app