
Estimating Coding Tasks: What Could Go Wrong?

Here’s how the task of “adding a hash to an existing DataFrame” went from a few days to almost an entire sprint.

In Q2 2022, I started working on a data pipeline that retrieves market data from a REST service and stores it in a BigQuery table. That is the high-level view of the pipeline. The interesting part is how the data is queried, converted to a DataFrame, and then loaded into BigQuery tables using Airflow's GCSToBigQueryOperator.
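For context, the last hop of that pipeline (GCS to BigQuery) looks roughly like the sketch below; the bucket, object, and table names here are placeholders, not the real pipeline's values.

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# Load a Parquet file that the pipeline previously wrote to GCS into a BigQuery table.
# The bucket, object, and table names are illustrative placeholders.
load_to_bq = GCSToBigQueryOperator(
    task_id="load_market_data_to_bq",
    bucket="my-market-data-bucket",
    source_objects=["exports/market_data.parquet"],
    source_format="PARQUET",
    destination_project_dataset_table="my-project.market.market_data",
    write_disposition="WRITE_APPEND",
)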

Initially, this seemed simple to write, but Airflow's idempotency principle added a bit of a challenge. What to retrieve from the REST service was decided by another table, and even though the job itself is idempotent, the table it used as a reference could change between two executions. After spending some extra time talking with the data engineers, the pipelines were ready by the end of Q3 2022.

Fast forward to Q1 2024. By then, more users were accessing the data, and we realized that our query model wasn't using partitions correctly. Or rather, we wanted to filter the data on a string column, but BigQuery does not allow partitioning on a string column. As a result, queries scanned large amounts of data and we frequently hit the daily quota.

This led us to think about how to partition the data based on a string column. Our data engineer suggested converting the string column to an integer using FarmHash, with an additional modulo operation to keep the resulting values in a bounded range. In the proof of concept, this reduced the amount of data scanned by almost 90% and improved query performance by 3-5x. We settled on it as our final solution. All we needed to do was:

  1. Create a table with a FarmHash fingerprint column (see the sketch after this list).
  2. Modify the pipeline to calculate the fingerprint.
  3. Download the data.
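Step 1 boils down to creating a table that is integer-range partitioned on the fingerprint column. A rough sketch with the google-cloud-bigquery client, assuming a hypothetical modulo of 4000 and made-up table and column names:

from google.cloud import bigquery

client = bigquery.Client()

# Hypothetical table: the symbol_hash column holds
# pyfarmhash.fingerprint64(symbol) % 4000, and the table is
# integer-range partitioned on it (names and modulo are illustrative).
table = bigquery.Table(
    "my-project.market.market_data_hashed",
    schema=[
        bigquery.SchemaField("symbol", "STRING"),
        bigquery.SchemaField("symbol_hash", "INTEGER"),
        bigquery.SchemaField("price", "FLOAT"),
    ],
)
table.range_partitioning = bigquery.RangePartitioning(
    field="symbol_hash",
    range_=bigquery.PartitionRange(start=0, end=4000, interval=1),
)
client.create_table(table)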

To calculate FarmHash fingerprints in Python, there is the pyfarmhash module. I installed it and used the code below to calculate the hash, and locally everything worked as expected.

import pyfarmhash

def get_hash(val: str) -> int:
    # additional_logic is the modulo step described above
    return additional_logic(pyfarmhash.fingerprint64(val))

df["hash"] = df["Col"].apply(get_hash)

With all the tests successful, it was time to push the code to Airflow and run it. I didn't expect anything to go wrong at this point. In fact, I was happy that everything had worked as expected and within the estimated time frame.

With a happy and confident mind, I pushed my changes, started the job, and then waited for 10-15 minutes for it to finish. In the meantime, I moved on to another task. Soon, I received an unexpected failure email from Airflow. I looked at the logs and was surprised to see that it had failed while installing the pyfarmhash module!

To help you understand the problem, I need to explain the structure of the job. It consists of the following steps:

  1. Download the data in Parquet format.
  2. Upload it to a GCS bucket.
  3. Delete any existing data (to avoid duplicates).
  4. Load the data into the BQ tables.

In this process, task 1, which downloads the data, is a separate Python module. To run it, I used Airflow's PythonVirtualenvOperator. This operator lets you list packages as requirements; it creates a fresh virtual environment and installs them into it. Once a package is installed, all of its dependencies are installed too, and you are ready to go.
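For task 1, that setup looked roughly like the sketch below (the callable and the internal package name are placeholders); the operator creates a fresh virtualenv, installs the listed requirements, and runs the callable inside it.

from airflow.operators.python import PythonVirtualenvOperator

def download_market_data():
    # Runs inside the freshly created virtualenv, so imports must live here.
    from my_pipeline.download import run  # hypothetical internal package
    run()

download_task = PythonVirtualenvOperator(
    task_id="download_market_data",
    python_callable=download_market_data,
    requirements=["pyfarmhash", "pandas", "pyarrow"],
    system_site_packages=False,
)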

I added pyfarmhash as a dependency of the module that downloads the data, and everything else remained unchanged. And it failed! Why?

pyfarmhash is a hashing library implemented in C/C++. GCC is required to compile the package during installation, and it was not present on the Airflow host. It made sense not to have GCC on the Airflow host, but unfortunately that was a roadblock for me.

I looked for a pure Python implementation of the pyfarmhash package, but there was none. Then I looked for wheel packages, but again there were none. I considered building wheels and publishing them, but that would have meant a long-term responsibility of providing wheel packages internally, and I wanted to avoid workaround-type steps. After exploring all the options, I discussed it with the team that manages Airflow. They suggested creating a Docker image and running it with the KubernetesPodOperator. This was a good option: I could control the environment and include everything that was needed without depending on an external environment, and the solution involved no workarounds. The only short-term downside was that it would take more time to implement.
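The replacement for task 1 ended up looking roughly like this; the image name, namespace, and command are illustrative, and the exact import path depends on the cncf.kubernetes provider version.

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

# Run the download step from a custom image that has pyfarmhash already
# installed (gcc is only needed at image build time). Names are illustrative.
download_task = KubernetesPodOperator(
    task_id="download_market_data",
    name="download-market-data",
    namespace="data-pipelines",
    image="eu.gcr.io/my-project/market-data-downloader:latest",
    cmds=["download-data"],
    arguments=["--date", "{{ ds }}"],
    get_logs=True,
)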

Before starting with a Docker-based solution, I had already spent about 16-20 hours on this task. For the Docker-based solution, I also needed to:

  1. Modify the Python package to have entry points to start the download and purge logic (see the sketch after this list).
  2. Create a Docker image and test it (this was my second Docker image).
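Step 1 is essentially adding console-script entry points so that the container can invoke the download and purge logic directly. A minimal setuptools sketch, with hypothetical module and function names:

from setuptools import find_packages, setup

setup(
    name="market-data-pipeline",  # hypothetical package name
    version="1.0.0",
    packages=find_packages(),
    install_requires=["pyfarmhash", "pandas", "pyarrow"],
    entry_points={
        "console_scripts": [
            # Commands the Docker image can call directly.
            "download-data = market_data_pipeline.download:main",
            "purge-data = market_data_pipeline.purge:main",
        ],
    },
)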

Since I was not going to use the PythonVirtualenvOperator in Airflow anymore, I decided to remove it completely and improve the workflow along the way, modifying the Python package to expose entry points for the download and purge logic.

It took me another 30-36 hours to get the final solution with the Docker image ready. That is 6-7 business days, and with the initial 2 days included, the task stretched across most of a sprint.

Looking back on all of this: I had to abandon a working solution, change the module structure, create a Docker image, modify 10+ Airflow tasks to use that image, face that reality, and get over the initial frustration. All just because one Python module required gcc to compile!