Yet another Airflow Integration Testing tutorial

Hopefully cleaner than the competition

Vasileios Anagnostopoulos
7 min read · May 13, 2023

Background

Lately I have been working on a project heavily involved with Airflow. When I started, I was eager to get hands-on experience with this technology. I have already written an article on an alternative way of running it, but in order to really understand it I decided to go a bit deeper. So my plan was to:

  1. Create a dag
  2. Learn how to run it
  3. Provide some way to run it automatically and observe the actions of the dag.

While my ambitions were perhaps a bit much, I knew that if I got involved and went through several refactoring/cleanup iterations, the end result would be worth it. Everything starts with a good tutorial like this one, an integration testing tutorial for Airflow. Before proceeding further, I strongly recommend going through it to get a baseline. Having read it, I decided to boost my knowledge by heavily modifying it in order to better learn the technology and the toolset around it. I will break it down into various parts and replace them with alternatives as a learning exercise. Let's set up our roadmap by noting down our extensions.

What follows is purely educational and not production ready. The purpose is to do things differently as a motivation, not to criticize the original approach. Let's go.

Moving to the official compose with Mongo and Jupyter

The original docker compose can be downloaded here. If you are on Linux and have permission problems, this solution may be of interest; it worked for our GitHub Actions. Nothing of interest here. Now we add MongoDB, which also needs a proper health check. Searching around the web gives us this snippet (inspiration taken from this Stack Overflow post):

For Jupyter, things are pretty standard:

Jupyter is used as a debugging console. It runs inside the same docker compose network, so you can refer to the various services by their service names, for example:

http://minio:9000

To open the notebook, run

docker ps

to find the Jupyter container. Then check its logs

docker logs nc-data-ingestion-datascience-notebook-1

to grab the proper URL. Do not forget to change port 8888 to 8891, as configured in the docker compose file. There is a sample notebook you can use.

Now we can attach our health checks to the common Airflow section of the compose file:

Extra container setup

We need a custom Airflow image to support the Mongo provider. The original approach mounted the requirements file directly in docker-compose.yaml. We follow the official approach instead, which means building a custom image from a Dockerfile. We did not deviate much from the official example.

Instead of configuring variables and connection ids from the UI, we define the Airflow variables and connection ids in docker-compose.yaml, tailored to our approach (no need to provide them manually):

AIRFLOW_CONN_MONGO_STORE: 'mongodb://myTester:tester123@mongodb:27017/test?authSource=test'
AIRFLOW_VAR_DOCUMENT_COLLECTION: 'mytest_collection'
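For reference, this is roughly how those definitions surface in dag code — a sketch, assuming the connection id mongo_store and the variable document_collection implied by the AIRFLOW_CONN_*/AIRFLOW_VAR_* naming above:

# Sketch: how the env-var-defined connection and variable are consumed in dag code.
# AIRFLOW_CONN_MONGO_STORE        -> connection id "mongo_store"
# AIRFLOW_VAR_DOCUMENT_COLLECTION -> variable "document_collection"
from airflow.hooks.base import BaseHook
from airflow.models import Variable

collection_name = Variable.get("document_collection")       # "mytest_collection"
mongo_conn = BaseHook.get_connection("mongo_store")          # parsed from the URI above
print(mongo_conn.host, mongo_conn.port, mongo_conn.schema)   # mongodb, 27017, test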

To avoid keeping state, the various volumes are commented out, and the Jupyter service is commented out unless you need it. Keep the volumes for MinIO and Postgres commented out as they are in the repo, otherwise the integration tests will fail. Clone the file and make the necessary modifications. Finally, we disable the example dags that clutter the UI through

AIRFLOW__CORE__LOAD_EXAMPLES: 'false'

Docker compose is done. Let’s proceed to other interesting bits.

Moving to the official REST client

The original article builds a custom solution to ping the docker compose deployment. Fortunately, there is an official client. We use it to stay current, reuse the knowledge of the Airflow team and slim down our code base. There is a difference, though. The original tries to detect when the dag finishes running within a time window, but the code is brittle since it never explicitly checks for the success state. We improve on that while adopting the timeout approach of the original code base. We collect our code in a Python file to keep things clean.

The retry part is in this gist.

We explicitly check for success. This way, triggering the dag takes the simple form

run_dag(SAMPLE_DAG_ID)
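To give an idea of the shape of run_dag, here is a sketch built on the official apache-airflow-client package; the helper name, run id scheme, timeout and poll interval are mine, not necessarily those of the gist:

# Sketch of a run_dag helper on top of the official apache-airflow-client package.
import time
import uuid

import airflow_client.client as client
from airflow_client.client.api import dag_run_api
from airflow_client.client.model.dag_run import DAGRun

CONFIG = client.Configuration(
    host="http://localhost:8080/api/v1",   # the compose file publishes the webserver here
    username="airflow",
    password="airflow",
)

def run_dag(dag_id: str, timeout_s: int = 300, poll_s: int = 5) -> None:
    """Trigger a dag run and block until it reaches the success state."""
    with client.ApiClient(CONFIG) as api_client:
        api = dag_run_api.DAGRunApi(api_client)
        run_id = f"integration-test-{uuid.uuid4()}"
        api.post_dag_run(dag_id, DAGRun(dag_run_id=run_id))
        deadline = time.monotonic() + timeout_s
        while time.monotonic() < deadline:
            state = api.get_dag_run(dag_id, run_id).state
            state = getattr(state, "value", state)   # enum-like model or plain string
            if state == "success":
                return
            if state == "failed":
                raise RuntimeError(f"dag run {run_id} failed")
            time.sleep(poll_s)
        raise TimeoutError(f"dag run {run_id} did not reach success within {timeout_s}s")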

The integration test checks that, before triggering, the entry we inserted in Mongo has no "processed" field; after triggering a run and waiting for it to complete, it does.
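In test form that check is little more than two pymongo assertions around the trigger; the connection string below assumes the Mongo port is published to the host and mirrors the credentials from the compose file:

# Sketch of the integration test body; collection and field names follow the setup above.
from pymongo import MongoClient

mongo = MongoClient("mongodb://myTester:tester123@localhost:27017/test?authSource=test")
collection = mongo["test"]["mytest_collection"]

before = collection.find_one({"source": "some_source"})
assert "processed" not in before        # not marked yet

run_dag(SAMPLE_DAG_ID)                  # trigger and wait for success

after = collection.find_one({"source": "some_source"})
assert after["processed"] == "true"     # the dag marked the document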

Moving to testcontainers (python)

Why are we moving to testcontainers? The main idea is automation: all-inclusive test scripts with slimmer Makefiles or tox scripts. More things are delegated to software, and the code becomes more uniform. The original setup delegated more to the Makefile, even the code that checks the deployment for liveness, the part that says "Hey, I am ready to run dags".

Another notable thing is that we run the dag externally: unlike the original article, we interact with the Airflow deployment from the "outside". Because the deployment is stateless, we start with an empty Mongo. So our testing code connects externally, creates a test user and a test collection (and cleans them up at the end). But how do we propagate these to docker-compose.yaml? If we used the original file unchanged, nothing would work.

Testcontainers is a way to run Docker containers (among other things) programmatically. It amounts to talking to the Docker runtime through a socket, sending commands that configure, start, stop and monitor containers. There are clients for popular programming languages; in our case we are interested in Python. As mentioned before, we use a docker-compose file, and testcontainers can run a docker-compose file. It can also "wait for" a container to come up, which means no custom pinging code and no pinging from the Makefile.

The next snippet of interest shows our docker-compose modification approach. We load the included docker-compose file, modify the Mongo connection (as an exercise you could also change the bucket name), and save it to a temporary file that is cleaned up at the end, all packed in a context manager.
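In outline, that context manager could look like the following; the file name and the exact key path into the YAML (x-airflow-common/environment) are assumptions about the repo layout rather than the gist itself:

# Sketch of a context manager that patches the Mongo connection in the compose
# file and yields a temporary copy; paths and key names are illustrative.
import os
import tempfile
from contextlib import contextmanager

import yaml  # PyYAML

@contextmanager
def patched_compose(src="docker-compose.yaml", mongo_uri=None):
    with open(src) as f:
        compose = yaml.safe_load(f)
    if mongo_uri:
        # the connection lives in the shared Airflow environment block
        compose["x-airflow-common"]["environment"]["AIRFLOW_CONN_MONGO_STORE"] = mongo_uri
    fd, tmp_path = tempfile.mkstemp(suffix=".yaml", dir=os.path.dirname(os.path.abspath(src)))
    try:
        with os.fdopen(fd, "w") as f:
            yaml.safe_dump(compose, f)
        yield tmp_path
    finally:
        os.remove(tmp_path)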

Now we can fire our happy path code via testcontainers
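A minimal version of that happy path, assuming the patched_compose and run_dag sketches above and the default 8080 webserver port mapping:

# Sketch: bring up the patched compose file with testcontainers and wait for Airflow.
import os

from testcontainers.compose import DockerCompose

MONGO_URI = "mongodb://myTester:tester123@mongodb:27017/test?authSource=test"

with patched_compose(mongo_uri=MONGO_URI) as tmp:
    compose = DockerCompose(os.path.dirname(tmp), compose_file_name=os.path.basename(tmp))
    with compose:
        # blocks until the webserver answers, replacing the Makefile ping loop
        compose.wait_for("http://localhost:8080/health")
        run_dag(SAMPLE_DAG_ID)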

The wait_for call replaces the custom liveness-check code in the original code base.

The dag and its automated test

The dag does not present anything special and is actually the same as the original one. I have added some commented-out code for beginners to see that it "works" by printing out documents. The main dag functionality is to detect documents whose source field has a specific value,

{"source": "some_source"}

and mark them with a new field

{"processed": "true"}
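Roughly, the dag body boils down to the following sketch, which assumes the Mongo provider's MongoHook plus the connection and variable names configured earlier; the real code is in the gist below:

# Sketch of the dag: mark matching Mongo documents as processed. Schedule and
# start_date are illustrative; the full version is in the gist.
from datetime import datetime

from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.providers.mongo.hooks.mongo import MongoHook

@dag(schedule="@daily", start_date=datetime(2023, 5, 1), catchup=True)
def mongo_marker():
    @task
    def mark_documents():
        hook = MongoHook("mongo_store")   # keyword name varies across provider versions
        collection = Variable.get("document_collection")
        db = hook.get_conn()["test"]      # get_conn() returns a pymongo MongoClient
        db[collection].update_many(
            {"source": "some_source"},          # detect documents by their source field
            {"$set": {"processed": "true"}},    # ...and mark them
        )

    mark_documents()

mongo_marker()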

Here is the gist:

Follow the instructions in the README file to start Airflow and view/run your dag. Login/password are airflow/airflow. For a good Airflow tutorial you can look elsewhere, and at the next section.

We will present manual testing in the next section. To run the tests, you need to install requirements-dev.txt in a Python 3.10.11/3.11.3 virtual environment; pyenv-virtualenv is the way I did it. Open your console in this environment and run the tests with

make integration-test

The end result is shown here:

The dag and its manual test

As mentioned previously, we have set up a notebook to play with our deployment from the "inside". We will not use Robo 3T in this article, but you can definitely use it for your own experimentation. Our first step is to start the deployment after uncommenting the notebook section (no need to uncomment the volumes and incur state):

docker compose up

Once it is up, we can connect to our notebook server by uncovering the URL:

docker logs airflow-integration-testing-datascience-notebook-1

Do not forget the 8888 -> 8891 change.

Our plan is to create a user, a collection, and an entry. These steps can be done manually via mongosh, as explained in the README, but the bonus in this article is that we do it through a notebook that you open in the browser. The setup before running the dag is shown here: we add the entry to be processed and verify that it is not yet processed.
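In notebook form, the setup cell amounts to something like this; the root credentials are placeholders for whatever your compose file sets via MONGO_INITDB_ROOT_USERNAME/PASSWORD:

# Sketch of the notebook setup cell: create the test user and collection and
# insert one unprocessed entry. Root credentials below are placeholders.
from pymongo import MongoClient

admin = MongoClient("mongodb://root:example@mongodb:27017/")   # service name resolves inside the network
admin["test"].command(
    "createUser", "myTester",
    pwd="tester123",
    roles=[{"role": "readWrite", "db": "test"}],
)

client = MongoClient("mongodb://myTester:tester123@mongodb:27017/test?authSource=test")
collection = client["test"]["mytest_collection"]
collection.insert_one({"source": "some_source"})

print("processed" in collection.find_one({"source": "some_source"}))  # False before the dag runs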

Now we trigger the dag

We see a double run because of the "catchup". The notebook now reports the new field

Conclusion

Taking on a challenge while starting from an excellent existing baseline is a fruitful way to learn new things; it is a manifestation of the continuation of life. We started from previous work and decided to do things differently, and the result is this tutorial article. I hope you enjoyed it as much as I did. As always, the code is on GitHub. Feel free to suggest corrections or other tutorial ideas.
