Testing SQL functions

The world of data engineering, and LinkedIn as a consequence, is full of the latest and greatest tools for big, bigger, biggest data. But while everybody is chasing the latest features of Databricks, Snowflake and DuckDB, there is a plethora of companies that are still running good old SQL databases in production. Often this is a sound choice, and it works for many. As a consequence, data engineering is in practice often done in SQL databases like PostgreSQL. Many SQL databases support functions, which can be used for repetitive tasks and provide consistent output. In most programming languages, we are used to writing unit tests to check whether our functions still produce the desired output. For SQL functions, this is often omitted, since the functions live in the database.

At one of my current projects we were facing a similar issue. The database functions live in a Git repo and are deployed to the databases through CI/CD pipelines. The problem is that they were updated in the database without testing, which is risky in a production environment. I made a demo setup that shows a way to include the testing of SQL functions in the pytest run of the CI/CD pipeline. With some creativity, this can be applied to many more SQL queries, if desired.

Overview

For this demo, I made a repository that mocks an ETL pipeline. If you install the project and run the command `sql-test-demo`, you will see in the log that some ETL process is performed. These are just some print statements for the purpose of this demo, obviously. The interesting part is in the tests folder. By utilizing the conftest.py from pytest, we can set up a database connection and use it in the tests. The database needs to be up and running. This can be an existing database, or a Docker instance of PostgreSQL that you spin up before running the tests. The configuration is set up so that the schema is reset after all the tests have been executed.

.
├── README.md
├── sql
│   └── format_pc_wpl.sql
├── src
│   ├── __init__.py
│   ├── config.py
│   ├── models.py
│   └── process.py
└── tests
    ├── conftest.py
    └── test_sql_functions.py

Test configuration

Most of the magic is happening in the conftest.py file, so let’s start by examining that.

The conftest.py is the Python file in which functions are declared that can be accessed across various test files.[1] Any function that has the @pytest.fixture decorator can be accessed by all the pytest functions when they are being executed. In this particular case, we have one function that has the decorator: the db() function. All the other functions in the file are helper functions for the db() function.

If you need to run tests on multiple database platforms, you can create a subfolder for each, let’s say a postgresql folder and an oracle folder. In each, you can place a dedicated conftest.py. The functions that you expose through conftest.py are available in the directory that contains the file and all its subdirectories.

db() function start

When pytest is run, it first loads the conftest.py file in order to set up all the shared configuration for the tests. The db() function starts as soon as a test requests it.

First, we will initialize a database engine using SQLAlchemy. The Engine is the starting point for any SQLAlchemy application. It’s “home base” for the actual database and its DBAPI, delivered to the SQLAlchemy application through a connection pool and a Dialect, which describes how to talk to a specific kind of database/DBAPI combination.[2]

Next, we will create a Metadata object. This will hold all the metadata that describes the database. It will contain things like table definitions and allows us to interact with the database objects.

After that, we will create an address table that holds some test data and we attach it to the Metadata object.

Finally, we will bring all of this together and create the table in the database using the engine and the Metadata object. Then we will insert some test records and yield the engine. In this example we have only one table, but this method is easy to extend to multiple tables and inserts.

    # Insert test data
    with engine.connect() as conn:
        for statement in [insert_address_data(address=address)]:
            conn.execute(statement=statement)
            conn.commit()
    # yield engine, run tests
    print("Yield engine for usage in tests.")
    yield engine

Since we yield the engine after our database objects have been created, but before they are cleaned up at the end of the function, the db() function is now paused, and the tests will run using the engine we created. Because db() is a generator, pytest can suspend it at the yield and resume it once the tests are done.
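The pause-and-resume behaviour can be imitated without pytest. The sketch below is a simplified picture, not pytest's actual implementation: the db() generator yields a placeholder string instead of a real engine, and run_tests_with(), first_test() and second_test() are hypothetical names made up for this illustration.

```python
from typing import Callable, Iterator

events: list[str] = []


def db() -> Iterator[str]:
    """Stand-in for the fixture: set up, hand over a resource, tear down."""
    events.append("setup")      # here the real fixture creates tables and data
    yield "engine"              # paused at this point while the tests run
    events.append("teardown")   # here the real fixture drops the tables


def first_test(engine: str) -> None:
    events.append(f"first_test got {engine}")


def second_test(engine: str) -> None:
    events.append(f"second_test got {engine}")


def run_tests_with(
    fixture: Callable[[], Iterator[str]],
    tests: list[Callable[[str], None]],
) -> None:
    """Hypothetical mini runner: advance the generator around the tests."""
    gen = fixture()
    resource = next(gen)        # runs the setup part, up to the yield
    try:
        for test in tests:
            test(resource)
    finally:
        next(gen, None)         # resumes after the yield, runs the teardown


run_tests_with(db, [first_test, second_test])
print(events)
```

The order of the recorded events shows that the setup runs once before the tests and the teardown once after them, which is the same shape as the db() function in this article.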

Running tests

Now that the tests are running, they use the same engine as the one that was used to create the test data in the database. The tests have access to the objects created by the db() function in the conftest.py. The test_sql_functions.py file performs two tests on the database. We will have a look at how these tests differ and what the implications of their differences are.

test_format_pc_wpl

This test uses the SQLAlchemy objects we created in the conftest.py. It reads a SQL file from our ETL that contains a PostgreSQL function. The function is deployed to the database using SQLAlchemy, and then a test is performed on the function to see if it returns the values we expect.

def test_format_pc_wpl(db: Engine):
    """
    Demo test with SQL DDL
    """
    print("Test if the function returns the proper value")
    select_stmt = text("SELECT * FROM format_pc_wpl('1234AB', 'Duckstad')")
    query = get_sql_query(Path("sql/format_pc_wpl.sql"))

    with db.connect() as conn:
        # Create the function in the database. We don't commit, so the
        # database is clean again once the connection is closed.
        conn.execute(DDL(query))
        res = conn.execute(select_stmt).fetchall()[0]  # first row
    assert res == ("1234AB Duckstad",)

The SQL function is intended to combine a place name, postal code, house number and optional additional house number indicators into a consistent string.

You’ll notice that we don’t delete the function when we are done with the test. Because we never commit, the transaction is rolled back when the connection is closed at the end of the with block, and since DDL is transactional in PostgreSQL, the function disappears with it.
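The comment in the test mentions that nothing is committed so that the database stays clean. That effect can be shown in isolation. The sketch below swaps in Python's built-in sqlite3 module and an in-memory SQLite database so it runs without a PostgreSQL server; SQLite has no stored functions, so a view stands in for the function. The names run_ddl_without_commit, object_exists and formatted_address are made up for this illustration.

```python
import sqlite3


def run_ddl_without_commit(conn: sqlite3.Connection) -> tuple:
    """Create a view inside a transaction, query it, then roll back."""
    conn.execute("BEGIN")
    try:
        conn.execute(
            "CREATE VIEW formatted_address AS "
            "SELECT '1234AB' || ' ' || 'Duckstad' AS value"
        )
        row = conn.execute("SELECT value FROM formatted_address").fetchone()
    finally:
        # Never commit: rolling back also removes the view again.
        conn.execute("ROLLBACK")
    return row


def object_exists(conn: sqlite3.Connection, name: str) -> bool:
    """Check the SQLite catalog for an object with the given name."""
    row = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE name = ?", (name,)
    ).fetchone()
    return row is not None


# isolation_level=None leaves transaction control to the explicit
# BEGIN and ROLLBACK statements above.
conn = sqlite3.connect(":memory:", isolation_level=None)
row = run_ddl_without_commit(conn)
exists_after = object_exists(conn, "formatted_address")
conn.close()
```

The view is usable inside the transaction, and the catalog lookup after the rollback no longer finds it.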

test_some_query

This test creates a Pandas dataframe, writes it as a table to the database and then selects some data from that table. It then asserts that the data we inserted from the dataframe matches what we selected from the database table. The test in itself doesn’t add much value, but is here for demonstration purposes.

def test_some_query(db: Engine):
    """
    Demo test with a DataFrame for the test data
    """
    print("Test SQL query using pandas")
    # Dataframe with test data
    d = {"col1": [1, 2], "col2": [3, 4]}
    df = pd.DataFrame(data=d)
    # Pass the engine itself so pandas opens and closes its own connection
    df.to_sql("test", con=db, if_exists="replace", index=False, chunksize=10)
    with db.connect() as conn:
        res = conn.execute(text("SELECT * FROM test")).fetchall()
        # clean up after test
        print("Since we created the table manually outside of SQLAlchemy, we need to clean this table up.")
        conn.execute(text("""DROP TABLE "test" """))
        conn.commit()
        print("Manual table deleted.")

    assert res == [(1, 3), (2, 4)]

As you can see, after we have queried the database table and no longer need it for our tests, we have to drop the table manually. If we don’t do this, the table stays behind in the database after the test run. Here, if_exists="replace" would overwrite it on the next run, but with if_exists="append" the rows would be inserted again, and instead of 2 rows we would have 4 rows in the table and our test would fail.

Since we created and populated the table using Pandas instead of using SQLAlchemy directly, the SQLAlchemy Metadata object has no notion of the table we created, and the table will not be deleted when the db() function continues running after the tests are finished. That is why we need to delete these objects manually, which results in more code and can easily be forgotten. Therefore, this approach is not recommended.

db() function continued

Our tests have run, and the engine is no longer being used. The Python interpreter will now continue running the remainder of the db() function.

    # Delete table after tests
    print("Dropping tables")
    metadata_obj.drop_all(engine)

These last lines will now be executed. The drop_all method on the Metadata object drops every table that the Metadata object knows about, which here is the test table with addresses. The PostgreSQL function we created during the first test was never committed, so it is already gone. As you can see, we can clean everything up with only one command. Next time we run these tests, we will have an empty, clean database again to test against.

Final thoughts

We have seen how we can populate a database, run tests against it and leave it cleaned up when we are done. We have also seen that when you deviate from this approach, objects are left behind that can hurt your tests.

Depending on your environment, this can be a viable setup for testing your SQL code together with your Python code before deploying it to production.

An alternative approach would be to create a database when the tests are run, and destroy it again when you are done testing. This is great for isolation, but you would of course need to be able to create the database somewhere when you need it.
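As a rough illustration of the create-and-destroy idea, the sketch below uses a temporary SQLite file from Python's standard library as a stand-in. With PostgreSQL, the equivalent would be a CREATE DATABASE before the tests and a DROP DATABASE afterwards, which needs the right privileges. The name throwaway_database and the address columns are assumptions made for this example.

```python
import sqlite3
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator


@contextmanager
def throwaway_database() -> Iterator[sqlite3.Connection]:
    """Create a fresh database file and delete it again afterwards."""
    with tempfile.TemporaryDirectory() as tmp:
        conn = sqlite3.connect(Path(tmp) / "test.db")
        try:
            yield conn
        finally:
            # Close before the temporary directory (and the file) is removed.
            conn.close()


with throwaway_database() as conn:
    conn.execute("CREATE TABLE address (postal_code TEXT, city TEXT)")
    conn.execute("INSERT INTO address VALUES ('1234AB', 'Duckstad')")
    rows = conn.execute("SELECT postal_code, city FROM address").fetchall()

print(rows)
```

Nothing has to be dropped explicitly here, because the whole database disappears together with the temporary directory.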

As a last concern, this demo only works properly when no tests are run simultaneously. If a colleague creates a table with the same name in their tests, we might get conflicts when our tests run in parallel. As a workaround, you can create all your objects in a dedicated schema with a unique, dynamic name.
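One way to get such a unique, dynamic name is to append a random UUID to a fixed prefix. The helpers below are a minimal sketch and not part of the demo repository: unique_schema_name and schema_statements are hypothetical names, and the statements would still have to be executed by the fixture before and after the tests.

```python
import re
import uuid


def unique_schema_name(prefix: str = "test") -> str:
    """Return a schema name that is practically unique per test run."""
    return f"{prefix}_{uuid.uuid4().hex}"


def schema_statements(schema: str) -> tuple[str, str]:
    """Build the CREATE and DROP statements for a test schema."""
    # Only allow simple lowercase identifiers, since the name is
    # interpolated into the SQL text.
    if not re.fullmatch(r"[a-z_][a-z0-9_]*", schema):
        raise ValueError(f"unexpected schema name: {schema!r}")
    create_sql = f'CREATE SCHEMA "{schema}"'
    drop_sql = f'DROP SCHEMA "{schema}" CASCADE'
    return create_sql, drop_sql


first = unique_schema_name()
second = unique_schema_name()
create_sql, drop_sql = schema_statements(first)
print(create_sql)
print(drop_sql)
```

Dropping the schema with CASCADE removes everything that was created inside it, so two test runs with different schema names do not touch each other's objects.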

I hope this helps you increase your test coverage and prevents nasty calls on the weekend because a SQL function does something undesired.

  1. Conftest
  2. SQLAlchemy Engine
