Build a Python App with CockroachDB and psycopg2

Warning:
CockroachDB v20.1 is no longer supported as of November 12, 2021. For more details, refer to the Release Support Policy.
Tip:

Check out our developer courses at Cockroach University.

This tutorial shows you how to build a simple Python application with CockroachDB and the psycopg2 driver. For the CockroachDB back-end, you'll use a temporary local cluster.

Step 1. Install the psycopg2 driver

To install the Python psycopg2 driver, run the following command:

$ pip install psycopg2-binary

For other ways to install psycopg2, see the official documentation.
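
After installing, you may want to confirm that the driver is importable from the Python interpreter you plan to use. The following is a minimal sketch, not part of the tutorial's example code; the helper name `driver_installed` is hypothetical.

```python
import importlib.util


def driver_installed(module_name="psycopg2"):
    """Return True if this interpreter can locate the named top-level module."""
    return importlib.util.find_spec(module_name) is not None


if driver_installed():
    print("psycopg2 is available")
else:
    print("psycopg2 not found; re-run the pip install command above")
```

If the check fails even though pip reported success, pip may have installed the package for a different Python interpreter than the one running the script.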

Step 2. Start CockroachDB

  1. If you haven't already, download the CockroachDB binary.
  2. Run the cockroach demo command:

    $ cockroach demo \
    --empty
    

    This starts a temporary, in-memory cluster and opens an interactive SQL shell to the cluster.

  3. Take note of the (sql/tcp) connection string in the SQL shell welcome text:

    # Connection parameters:
    #   (console) http://127.0.0.1:61009
    #   (sql)     postgres://root:admin@?host=%2Fvar%2Ffolders%2Fk1%2Fr048yqpd7_9337rgxm9vb_gw0000gn%2FT%2Fdemo255013852&port=26257
    #   (sql/tcp) postgres://root:admin@127.0.0.1:61011?sslmode=require    
    

    You will use it in your application code later.
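
If you would rather extract the host and port programmatically than copy them by hand, the (sql/tcp) string can be split with Python's standard library. This is a sketch only; the function name `parse_sql_tcp_url` is hypothetical, and the example value is the one from the welcome text above (your port will differ).

```python
from urllib.parse import parse_qs, urlsplit


def parse_sql_tcp_url(url):
    """Split a (sql/tcp) connection string into the parts the app needs."""
    parts = urlsplit(url)
    query = parse_qs(parts.query)
    return {
        "host": parts.hostname,
        "port": parts.port,
        # parse_qs returns a list per key; take the first value if present.
        "sslmode": query.get("sslmode", [None])[0],
    }


# Example value copied from the welcome text; the port changes on each run.
demo_url = "postgres://root:admin@127.0.0.1:61011?sslmode=require"
print(parse_sql_tcp_url(demo_url))
```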

Step 3. Create a database

  1. In the SQL shell, create the bank database that your application will use:

    > CREATE DATABASE bank;
    
  2. Create a SQL user for your app:

    > CREATE USER <username> WITH PASSWORD <password>;
    

    Take note of the username and password. You will use them in your application code later.

  3. Give the user the necessary permissions:

    > GRANT ALL ON DATABASE bank TO <username>;
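
With the user created, you have everything needed for the application's connection string: the username and password from this step, and the host and port from the (sql/tcp) string. The sketch below assembles them into the `postgresql://` URL format used later in this tutorial. The helper `build_dsn` and the credentials shown are hypothetical. Percent-encoding the credentials is a precaution for passwords that contain characters such as `@` or `/`.

```python
from urllib.parse import quote


def build_dsn(username, password, host, port, database="bank", sslmode="require"):
    """Assemble a postgresql:// URL, percent-encoding the credentials."""
    user = quote(username, safe="")
    pwd = quote(password, safe="")
    return f"postgresql://{user}:{pwd}@{host}:{port}/{database}?sslmode={sslmode}"


# Hypothetical credentials, for illustration only.
print(build_dsn("maxroach", "p@ss/word", "127.0.0.1", 61011))
```

You can pass the resulting string to the script as an argument or export it as the DATABASE_URL environment variable, which the example code reads by default.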
    

Step 4. Run the Python code

Now that you have a database, you'll run the code shown below to:

  • Create an accounts table and insert some rows.
  • Transfer funds between two accounts inside a transaction.
  • Delete the accounts from the table before exiting so you can re-run the example code.

To handle transaction retry errors, the code uses an application-level retry loop that, in case of error, sleeps before trying the funds transfer again. If it encounters another retry error, it sleeps for a longer interval, implementing exponential backoff.
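
The retry pattern can be sketched in isolation, without a database. The block below uses the same delay formula as the example code, `(2**retry) * 0.1 * (random.random() + 0.5)`, which gives roughly 0.1 to 0.3 seconds before the second attempt, 0.2 to 0.6 before the third, and so on. `RetryableError`, `backoff_seconds`, and `retry_with_backoff` are hypothetical names for illustration; the real code catches psycopg2's `SerializationFailure` instead.

```python
import random
import time


class RetryableError(Exception):
    """Hypothetical stand-in for a transaction retry error."""


def backoff_seconds(retry, rng=random.random):
    """Delay before the next attempt: doubles per retry, with random jitter."""
    return (2 ** retry) * 0.1 * (rng() + 0.5)


def retry_with_backoff(op, max_retries=3, sleep=time.sleep):
    """Call op() until it succeeds or max_retries attempts have failed."""
    for retry in range(1, max_retries + 1):
        try:
            return op()
        except RetryableError:
            # Wait longer after each failed attempt before trying again.
            sleep(backoff_seconds(retry))
    raise ValueError(f"operation did not succeed after {max_retries} retries")


# Demo: an operation that fails twice, then succeeds. The delays are
# recorded in a list instead of actually sleeping.
attempts = []
delays = []


def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise RetryableError("restart transaction")
    return "committed"


result = retry_with_backoff(flaky, sleep=delays.append)
print(result, len(attempts), [round(d, 2) for d in delays])
```

The random factor spreads out retries from concurrent clients so that they are less likely to collide again at the same moment.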

Get the code

Download the example.py file, or create the file yourself and copy the code into it.

If you prefer, you can also clone a version of the code:

$ git clone https://github.com/cockroachlabs/hello-world-python-psycopg2/

The contents of example.py:

#!/usr/bin/env python3
"""
Test psycopg with CockroachDB.
"""

import logging
import os
import random
import time
import uuid
from argparse import ArgumentParser, RawTextHelpFormatter

import psycopg2
from psycopg2.errors import SerializationFailure
import psycopg2.extras


def create_accounts(conn):
    psycopg2.extras.register_uuid()
    ids = []
    id1 = uuid.uuid4()
    id2 = uuid.uuid4()
    with conn.cursor() as cur:
        cur.execute(
            "CREATE TABLE IF NOT EXISTS accounts (id UUID PRIMARY KEY, balance INT)"
        )
        cur.execute(
            "UPSERT INTO accounts (id, balance) VALUES (%s, 1000), (%s, 250)", (id1, id2))
        logging.debug("create_accounts(): status message: %s",
                      cur.statusmessage)
    conn.commit()
    ids.append(id1)
    ids.append(id2)
    return ids


def delete_accounts(conn):
    with conn.cursor() as cur:
        cur.execute("DELETE FROM accounts")
        logging.debug("delete_accounts(): status message: %s",
                      cur.statusmessage)
    conn.commit()


def print_balances(conn):
    with conn.cursor() as cur:
        cur.execute("SELECT id, balance FROM accounts")
        logging.debug("print_balances(): status message: %s",
                      cur.statusmessage)
        rows = cur.fetchall()
        conn.commit()
        print(f"Balances at {time.asctime()}:")
        for row in rows:
            print("account id: {0}  balance: ${1:2d}".format(row['id'], row['balance']))


def transfer_funds(conn, frm, to, amount):
    with conn.cursor() as cur:

        # Check the current balance.
        cur.execute("SELECT balance FROM accounts WHERE id = %s", (frm,))
        from_balance = cur.fetchone()['balance']
        if from_balance < amount:
            raise RuntimeError(
                f"insufficient funds in {frm}: have {from_balance}, need {amount}"
            )

        # Perform the transfer.
        cur.execute(
            "UPDATE accounts SET balance = balance - %s WHERE id = %s", (
                amount, frm)
        )
        cur.execute(
            "UPDATE accounts SET balance = balance + %s WHERE id = %s", (
                amount, to)
        )

    conn.commit()
    logging.debug("transfer_funds(): status message: %s", cur.statusmessage)


def run_transaction(conn, op, max_retries=3):
    """
    Execute the operation *op(conn)*, retrying on serialization failure.

    If the database returns an error asking to retry the transaction, retry it
    up to *max_retries* times before giving up and raising ValueError.
    """
    # On leaving this block, the transaction commits, or rolls back if the
    # block exits with an exception.
    with conn:
        for retry in range(1, max_retries + 1):
            try:
                op(conn)

                # If we reach this point, we were able to commit, so we break
                # from the retry loop.
                return

            except SerializationFailure as e:
                # This is a retry error, so we roll back the current
                # transaction and sleep for a bit before retrying. The
                # sleep time increases for each failed transaction.
                logging.debug("got error: %s", e)
                conn.rollback()
                logging.debug("EXECUTE SERIALIZATION_FAILURE BRANCH")
                sleep_seconds = (2**retry) * 0.1 * (random.random() + 0.5)
                logging.debug("Sleeping %s seconds", sleep_seconds)
                time.sleep(sleep_seconds)

            except psycopg2.Error as e:
                logging.debug("got error: %s", e)
                logging.debug("EXECUTE NON-SERIALIZATION_FAILURE BRANCH")
                raise e

        raise ValueError(
            f"transaction did not succeed after {max_retries} retries")


def main():
    opt = parse_cmdline()
    logging.basicConfig(level=logging.DEBUG if opt.verbose else logging.INFO)
    try:
        # Attempt to connect to cluster with connection string provided to
        # script. By default, this script uses the value saved to the
        # DATABASE_URL environment variable.
        # For information on supported connection string formats, see
        # https://www.cockroachlabs.com/docs/stable/connect-to-the-database.html.
        db_url = opt.dsn
        conn = psycopg2.connect(db_url, 
                                application_name="$ docs_simplecrud_psycopg2", 
                                cursor_factory=psycopg2.extras.RealDictCursor)
    except Exception as e:
        logging.fatal("database connection failed")
        logging.fatal(e)
        return
    ids = create_accounts(conn)
    print_balances(conn)

    amount = 100
    toId = ids.pop()
    fromId = ids.pop()

    try:
        run_transaction(conn, lambda conn: transfer_funds(
            conn, fromId, toId, amount))

    except ValueError as ve:
        # Below, we print the error and continue on so this example is easy to
        # run (and run, and run...).  In real code you should handle this error
        # and any others thrown by the database interaction.
        logging.debug("run_transaction(conn, op) failed: %s", ve)
        pass

    print_balances(conn)

    delete_accounts(conn)

    # Close communication with the database.
    conn.close()


def parse_cmdline():
    parser = ArgumentParser(description=__doc__,
                            formatter_class=RawTextHelpFormatter)

    parser.add_argument("-v", "--verbose",
                        action="store_true", help="print debug info")

    parser.add_argument(
        "dsn",
        default=os.environ.get("DATABASE_URL"),
        nargs="?",
        help="""\
database connection string\
 (default: value of the DATABASE_URL environment variable)
            """,
    )

    opt = parser.parse_args()
    if opt.dsn is None:
        parser.error("database connection string not set")
    return opt


if __name__ == "__main__":
    main()

Run the code

The Python code is a command-line utility that accepts the connection string to CockroachDB as a command-line argument. Before running the code, update the connection string as follows:

  • Replace <username> and <password> with the SQL username and password that you created earlier.
  • Replace <hostname> and <port> with the hostname and port in the (sql/tcp) connection string from the SQL shell welcome text.

$ python3 example.py \
'postgresql://<username>:<password>@<hostname>:<port>/bank?sslmode=require'

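The output should show the account balances before and after the funds transfer. The account IDs are randomly generated UUIDs (shown here as placeholders), and the order of the rows may differ from run to run:

Balances at Fri Oct 30 18:27:00 2020:
account id: <uuid-1>  balance: $1000
account id: <uuid-2>  balance: $250
Balances at Fri Oct 30 18:27:00 2020:
account id: <uuid-1>  balance: $900
account id: <uuid-2>  balance: $350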
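
The before and after balances follow from a single debit and credit of 100. As a sanity check, the same logic can be sketched in memory, without a database. The function `transfer` and the account keys are hypothetical; the real code runs the equivalent SQL statements inside a transaction.

```python
def transfer(balances, frm, to, amount):
    """Move amount between two entries, refusing to overdraw the source."""
    if balances[frm] < amount:
        raise RuntimeError(
            f"insufficient funds in {frm}: have {balances[frm]}, need {amount}"
        )
    balances[frm] -= amount
    balances[to] += amount


# Hypothetical account keys; the real code uses random UUIDs.
balances = {"a": 1000, "b": 250}
transfer(balances, "a", "b", 100)
print(balances)
```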

What's next?

Read more about using the Python psycopg2 driver.
