from json import dumps
from json import loads
from kafka import KafkaConsumer, KafkaProducer
from platformshconfig import Config
def usage_example():
# Create a new Config object to ease reading the Platform.sh environment variables.
# You can alternatively use os.environ yourself.
config = Config()
# Get the credentials to connect to the Kafka service.
credentials = config.credentials('kafka')
try:
kafka_server = '{}:{}'.format(credentials['host'], credentials['port'])
# Producer
producer = KafkaProducer(
bootstrap_servers=[kafka_server],
value_serializer=lambda x: dumps(x).encode('utf-8')
for e in range(10):
data = {'number' : e}
producer.send('numtest', value=data)
# Consumer
consumer = KafkaConsumer(
bootstrap_servers=[kafka_server],
auto_offset_reset='earliest'
consumer.subscribe(['numtest'])
output = ''
# For demonstration purposes so it doesn't block.
for e in range(10):
message = next(consumer)
output += str(loads(message.value.decode('UTF-8'))["number"]) + ', '
# What a real implementation would do instead.
# for message in consumer:
# output += loads(message.value.decode('UTF-8'))["number"]
return output
except Exception as e:
return e
def usage_example():
# Create a new Config object to ease reading the Platform.sh environment variables.
# You can alternatively use os.environ yourself.
config = Config()
# Get the credentials to connect to the Memcached service.
credentials = config.credentials('memcached')
try:
# Try connecting to Memached server.
memcached = pymemcache.Client((credentials['host'], credentials['port']))
memcached.set('Memcached::OPT_BINARY_PROTOCOL', True)
key = "Deploy_day"
value = "Friday"
# Set a value.
memcached.set(key, value)
# Read it back.
test = memcached.get(key)
return 'Found value <strong>{0}</strong> for key <strong>{1}</strong>.'.format(test.decode("utf-8"), key)
except Exception as e:
return e
def usage_example():
# Create a new Config object to ease reading the Platform.sh environment variables.
# You can alternatively use os.environ yourself.
config = Config()
# The 'database' relationship is generally the name of primary SQL database of an application.
# It could be anything, though, as in the case here here where it's called "mongodb".
credentials = config.credentials('mongodb')
try:
formatted = config.formatted_credentials('mongodb', 'pymongo')
server = '{0}://{1}:{2}@{3}'.format(
credentials['scheme'],
credentials['username'],
credentials['password'],
formatted
client = MongoClient(server)
collection = client.main.starwars
post = {
"name": "Rey",
"occupation": "Jedi"
post_id = collection.insert_one(post).inserted_id
document = collection.find_one(
{"_id": post_id}
# Clean up after ourselves.
collection.drop()
return 'Found {0} ({1})<br />'.format(document['name'], document['occupation'])
except Exception as e:
return e
def usage_example():
# Create a new Config object to ease reading the Platform.sh environment variables.
# You can alternatively use os.environ yourself.
config = Config()
# The 'database' relationship is generally the name of primary SQL database of an application.
# That's not required, but much of our default automation code assumes it.'
credentials = config.credentials('database')
try:
# Connect to the database using PDO. If using some other abstraction layer you would inject the values
# from `database` into whatever your abstraction layer asks for.
conn = pymysql.connect(host=credentials['host'],
port=credentials['port'],
database=credentials['path'],
user=credentials['username'],
password=credentials['password'])
sql = '''
CREATE TABLE People (
id INT(6) UNSIGNED AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(30) NOT NULL,
city VARCHAR(30) NOT NULL
cur = conn.cursor()
cur.execute(sql)
sql = '''
INSERT INTO People (name, city) VALUES
('Neil Armstrong', 'Moon'),
('Buzz Aldrin', 'Glen Ridge'),
('Sally Ride', 'La Jolla');
cur.execute(sql)
# Show table.
sql = '''SELECT * FROM People'''
cur.execute(sql)
result = cur.fetchall()
table = '''<table>
<thead>
<tr><th>Name</th><th>City</th></tr>
</thead>
<tbody>'''
if result:
for record in result:
table += '''<tr><td>{0}</td><td>{1}</td><tr>\n'''.format(record[1], record[2])
table += '''</tbody>\n</table>\n'''
# Drop table
sql = '''DROP TABLE People'''
cur.execute(sql)
# Close communication with the database
cur.close()
conn.close()
return table
except Exception as e:
return e
def usage_example():
# Create a new Config object to ease reading the Platform.sh environment variables.
# You can alternatively use os.environ yourself.
config = Config()
# The 'database' relationship is generally the name of primary SQL database of an application.
# That's not required, but much of our default automation code assumes it.' \
database = config.credentials('postgresql')
try:
# Connect to the database.
conn_params = {
'host': database['host'],
'port': database['port'],
'dbname': database['path'],
'user': database['username'],
'password': database['password']
conn = psycopg2.connect(**conn_params)
# Open a cursor to perform database operations.
cur = conn.cursor()
cur.execute("DROP TABLE IF EXISTS People")
# Creating a table.
sql = '''
CREATE TABLE IF NOT EXISTS People (
id SERIAL PRIMARY KEY,
name VARCHAR(30) NOT NULL,
city VARCHAR(30) NOT NULL
cur.execute(sql)
# Insert data.
sql = '''
INSERT INTO People (name, city) VALUES
('Neil Armstrong', 'Moon'),
('Buzz Aldrin', 'Glen Ridge'),
('Sally Ride', 'La Jolla');
cur.execute(sql)
# Show table.
sql = '''SELECT * FROM People'''
cur.execute(sql)
result = cur.fetchall()
table = '''<table>
<thead>
<tr><th>Name</th><th>City</th></tr>
</thead>
<tbody>'''
if result:
for record in result:
table += '''<tr><td>{0}</td><td>{1}</td><tr>\n'''.format(record[1], record[2])
table += '''</tbody>\n</table>\n'''
# Drop table
sql = "DROP TABLE People"
cur.execute(sql)
# Close communication with the database
cur.close()
conn.close()
return table
except Exception as e:
return e
def usage_example():
# Create a new Config object to ease reading the Platform.sh environment variables.
# You can alternatively use os.environ yourself.
config = Config()
# Get the credentials to connect to the RabbitMQ service.
credentials = config.credentials('rabbitmq')
try:
# Connect to the RabbitMQ server
creds = pika.PlainCredentials(credentials['username'], credentials['password'])
parameters = pika.ConnectionParameters(credentials['host'], credentials['port'], credentials=creds)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# Check to make sure that the recipient queue exists
channel.queue_declare(queue='deploy_days')
# Try sending a message over the channel
channel.basic_publish(exchange='',
routing_key='deploy_days',
body='Friday!')
# Receive the message
def callback(ch, method, properties, body):
print(" [x] Received {}".format(body))
# Tell RabbitMQ that this particular function should receive messages from our 'hello' queue
channel.basic_consume('deploy_days',
callback,
auto_ack=False)
# This blocks on waiting for an item from the queue, so comment it out in this demo script.
# print(' [*] Waiting for messages. To exit press CTRL+C')
# channel.start_consuming()
connection.close()
return " [x] Sent 'Friday!'<br/>"
except Exception as e:
return e
def usage_example():
# Create a new config object to ease reading the Platform.sh environment variables.
# You can alternatively use os.environ yourself.
config = Config()
# Get the credentials to connect to the Redis service.
credentials = config.credentials('redis')
try:
redis = Redis(credentials['host'], credentials['port'])
key = "Deploy day"
value = "Friday"
# Set a value
redis.set(key, value)
# Read it back
test = redis.get(key)
return 'Found value <strong>{0}</strong> for key <strong>{1}</strong>.'.format(test.decode("utf-8"), key)
except Exception as e:
return e
from xml.etree import ElementTree as et
import json
from platformshconfig import Config
def usage_example():
# Create a new Config object to ease reading the Platform.sh environment variables.
# You can alternatively use os.environ yourself.
config = Config()
try:
# Get the pysolr-formatted connection string.
formatted_url = config.formatted_credentials('solr', 'pysolr')
# Create a new Solr Client using config variables
client = pysolr.Solr(formatted_url)
# Add a document
message = ''
doc_1 = {
"id": 123,
"name": "Valentina Tereshkova"
result0 = client.add([doc_1], commit=True)
client.commit()
message += 'Adding one document. Status (0 is success): {} <br />'.format(json.loads(result0)['responseHeader']['status'])
# Select one document
query = client.search('*:*')
message += '\nSelecting documents (1 expected): {} <br />'.format(str(query.hits))
# Delete one document
result1 = client.delete(doc_1['id'])
client.commit()
message += '\nDeleting one document. Status (0 is success): {}'.format(et.fromstring(result1)[0][0].text)
return message
except Exception as e:
return e
While you can read the environment directly from your app,
you might want to use the
platformshconfig
library
It decodes service credentials, the correct port, and other information for you.
By default, data is inherited automatically by each child environment from its parent.
If you need to sanitize data in preview environments for compliance,
see how to sanitize databases.
All major Python web frameworks can be deployed on Platform.sh.
See dedicated guides for deploying and working with them:
Django
The following list shows templates available for Python apps.
A template is a starting point for building your project.
It should help you get a project ready for production.
Django 3
This template deploys the Django 3 application framework on Platform.sh, using the gunicorn application runner. It also includes a PostgreSQL database connection pre-configured.
Django is a Python-based web application framework with a built-in ORM.
Django 4
This template builds Django 4 on Platform.sh, using the gunicorn application runner.
Django is a Python-based web application framework with a built-in ORM.
FastAPI
This template demonstrates building the FastAPI framework for Platform.sh. It includes a minimalist application skeleton that demonstrates how to connect to a MariaDB server for data storage and Redis for caching. The application starts as a bare Python process with no separate runner. It is intended for you to use as a starting point and modify for your own needs.
FastAPI is a modern, fast (high-performance), web framework for building APIs with Python 3.6+ based on standard Python type hints.
Flask
This template demonstrates building the Flask framework for Platform.sh. It includes a minimalist application skeleton that demonstrates how to connect to a MariaDB server for data storage and Redis for caching. The application starts as a bare Python process with no separate runner. It is intended for you to use as a starting point and modify for your own needs.
Flask is a lightweight web microframework for Python.
Pyramid
This template builds Pyramid on Platform.sh. It includes a minimalist application skeleton that demonstrates how to connect to a MariaDB server for data storage and Redis for caching. It is intended for you to use as a starting point and modify for your own needs.
Pyramid is a web framework written in Python.
Wagtail
This template builds the Wagtail CMS on Platform.sh, using the gunicorn application runner. It includes a PostgreSQL database that is configured automatically, and a basic demonstration app that shows how to use it. It is intended for you to use as a starting point and modify for your own needs. You will need to run the command line installation process by logging into the project over SSH after the first deploy.
Wagtail is a web CMS built using the Django framework for Python.