
Multi-user Branching Approach using Databricks

In my previous post on version control using Databricks, we looked at how to link GitHub and Databricks. Following on from that, I wanted to show a simple branching methodology that could work for a small team collaborating in the Databricks environment. This is for an environment where users will not pull code down onto their own machines and are potentially new to git in general.

This is not a CI/CD pipeline and isn’t a large-scale solution. It’s also very manual, so I would recommend looking at something like Azure DevOps pipelines or even GitHub Actions (which I will discuss another time).

In this example, we will have two main branches: master and development. This is pretty standard practice. We are assuming that developers clone from and make pull requests to development, and that admins then sync up development and master. I won’t be discussing branch policies or anything like that, but feel free to ask if you have any questions about how to set those up.

For every feature a new branch will be created by the developer. This is called feature branching.
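
For anyone curious what these UI steps correspond to in git terms, here is a purely illustrative sketch using the GitPython library. This is not part of the workflow described here (the Databricks and GitHub UIs do all of this for you), and the repo URL, path and branch names are placeholders:

from git import Repo  # pip install GitPython

# Clone the repo, starting from the development branch
repo = Repo.clone_from("https://github.com/<user>/<repo>.git", "local_copy", branch="development")

# Create and switch to a new feature branch
feature = repo.create_head("jo_change_c_value")
feature.checkout()

# ...edit files, then stage, commit and push the feature branch
repo.index.add(["vc_code/test_git.py"])
repo.index.commit("Change c value to 4")
repo.remote(name="origin").push("jo_change_c_value")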

You will need to read the previous post, as we will start where we left off: with our master code folder.

Create a Development Branch

Now we are also going to add a branch for development code in Databricks. To do this, clone a copy of the master file to a development folder in Databricks. Ensure the filename is kept the same for consistency.

Sync this with a new branch called development in GitHub (in the same way you synced the master file). This time, however, in ‘Branch’, add development and change the ‘Path in Git Repo’ to match the master one (highlighted).

Create a Feature Branch

To create a feature branch, a developer will clone a copy of the code from the development folder into their own workspace. In Databricks, you have the option to clone a notebook by selecting the tiny arrow next to the notebook. It will give you the option of where you want to put it.

Make a new folder for the feature development (e.g. jo_change_c_value) and clone the notebook into it.

Once the notebook has been cloned, we will then sync it with a new feature branch in the same way as in the image above. In this example, I create a new branch under ‘Git Preferences’ called ‘jo_change_c_value’ and sync my notebook.

I’m then going to change the code and set the c value equal to 4.

a = 1
b = 2
c = 4

print(a)
print(a + b)

Click on save on the right-hand side and you will be asked whether you also want to commit the changes to GitHub. Save the revision.

Make a PR to the development branch

You will see the change in GitHub.

Click on ‘Compare & pull request’ and you will see an overview of the changes. In this example, I am asking to merge my changes into the development branch. This is just good practice; if you only have a master branch, then just merge into that.

Press the green button that says ‘Create pull request’.

You will then see the following page.

Click ‘Merge pull request’.

Now head to your development notebook in Databricks.

This is a bit odd, but your changes will not be visible immediately (I’m not sure why Databricks does this… if anyone knows, let me know). On the right-hand side, in the revision history, you will see that the second record from the top says Commit followed by a number.

Click on that Commit record and it will show your newly pushed code.

Make a PR to the master branch

To sync up the development and master branches, just follow the same process as above. As mentioned, you would ideally have some branch policies in place so that PRs go through some sort of peer review. However, that’s outside the scope of this post.

Databricks Version Control summary

Hopefully I have shown you how to version control your code, both for a single-branch personal setup and for a multi-user branching approach. In my honest opinion, it’s not the most user-friendly process and there is room for human error (especially with the manual entry of folder paths). Furthermore, it’s not very automated, which could get frustrating.

A better approach would be to employ CI/CD. I’ll create a follow-up post on this.


Version control with GitHub and Databricks

In this post I thought I would share a method for version controlling code in Databricks. I will go over a simple Databricks/GitHub sync for personal projects. In the next post I will discuss a methodology for multi-user branching.

Pre-requisites:

  • A Databricks account
  • A GitHub account

Create a notebook in Databricks

Open a new notebook (or alternatively an existing one that you would like to version control). For the purposes of this post, I have just made a generic Python notebook called test_git.py.

My code in test_git.py is the simplest Python script:

a = 1
b = 2
c = 3

print(a)
print(a + b)

Create a GitHub Repo

Create a new repo in GitHub and initialise it with a README.md. You will only be using the master branch.

Connect Databricks & GitHub

In the main Databricks UI, in the top right corner you will see a little person icon; hover over it and it will say ‘account’. Click on it, select ‘User Settings’ and then head to the ‘Git Integration’ tab (as shown below).

Select GitHub as your ‘Git provider’. You will need to enter a git token, which you can generate in GitHub under Developer settings > Personal access tokens. Once you have done this, your GitHub and Databricks accounts will be linked.

Sync Databricks Notebook with GitHub

Now open the notebook that you want to version control. In the top right, you will see some little icons; select the last one (highlighted below).

This will open up the Git Preferences box, where you can sync the notebook and git together.

By default, Databricks will put the Databricks folder structure in the ‘Path in Git Repo’ field, but you want to change it to match the structure in your GitHub repo (see below). Ensure the files have the same name, otherwise it will simply write a new file to the folder.

In this case, for the ‘Path in Git Repo’ I am going to create a folder called vc_code and put the test_git.py file inside it.

Press save, then head over to GitHub; you should see your notebook 🙂 Now any changes you make to your code will be synced with GitHub. You can save by selecting the ‘Save Now’ option on the right-hand side.

If you are just doing version control for yourself, then you could stop here.

However, if you want to work with multiple users, I will discuss a simple methodology for collaboration in my next post.


Event Hub Streaming Part 2: Reading from Event Hub using Python

In part two of this tutorial, we will read back the messages that we streamed into our Event Hub in Part 1. For a real stream, you will need to start the streaming code and ensure that you are sending more than ten messages (otherwise your stream will have stopped by the time you start reading :)). It will still work either way, though.

The code is along much the same lines as Part 1, with the same packages. Let’s take a look.

Import the libraries we need:

import os
import sys
import logging
import time
from azure.eventhub import EventHubClient, Receiver, Offset

Set the connection properties to Event Hub:

ADDRESS = "amqps://<namespace.servicebus.windows.net/<eventhubname>"
USER = "<policy name>"
KEY = "<primary key>"
CONSUMER_GROUP = "$default"
OFFSET = Offset("-1")
PARTITION = "0"

This time I am using my listening policy for USER instead of my sending policy.

Next we are going to take the events from the Event Hub and print each JSON transaction message. I will try to go through offsets in more detail another time, but for now this will listen and return your events.

total = 0
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
try:
    receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=5000, offset=OFFSET)
    client.run()
    start_time = time.time()
    batch = receiver.receive(timeout=5000)
    while batch:
        for event_data in batch:
            # The offset and sequence number tell us where this event sits in the partition
            last_offset = event_data.offset
            last_sn = event_data.sequence_number
            print("Received: {}, {}".format(last_offset.value, last_sn))
            print(event_data.body_as_str())
            total += 1
        batch = receiver.receive(timeout=5000)

    end_time = time.time()
    client.stop()
    run_time = end_time - start_time
    print("Received {} messages in {} seconds".format(total, run_time))
except KeyboardInterrupt:
    pass
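
Each event body is just the raw JSON string we streamed in Part 1, so inside the for loop you can parse it back into a Python dict rather than printing it raw. A minimal sketch (the "x" and "hash" field names are my assumption, based on the blockchain.info payload format):

import json

# Inside the `for event_data in batch:` loop above, parse the body
# and pull out a field of interest instead of printing the raw string.
msg = json.loads(event_data.body_as_str())
tx = msg.get("x", {})    # transaction payload ("x" key assumed from blockchain.info docs)
print(tx.get("hash"))    # the transaction hash, if present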

And voila! You now know how to stream to and read from Azure Event Hub using Python 🙂

Let me know if you have any questions!


Event Hub Streaming Part 1: Stream into Event Hub using Python

In this session we are going to look at how to stream data into Event Hub using Python.

We will be connecting to the blockchain.info websocket and streaming the transactions into an Azure Event Hub. This is a really fun use case that is easy to get up and running.

Prerequisites:

  • An Azure subscription
  • An Azure Event Hub
  • Python (Jupyter, or Databricks, which I am using in this example)

You will need the following libraries installed on your Databricks cluster:

  • websocket-client (PyPi)
  • azure-eventhub (PyPi)

In this example, I am setting it to stream in only a few events, but you can change it to keep streaming or to stream in more events.

First of all we need to import the various libraries we are going to be using.

import os
import sys
import logging
import time
from azure import eventhub
from azure.eventhub import EventHubClient, Receiver, Offset, EventData
from websocket import create_connection

Then we need to set the connection properties for our Event Hub:

ADDRESS = "amqps://<namespace>.servicebus.windows.net/<eventhubname>"
USER = "<policy name>"
KEY = "<primary key>"
CONSUMER_GROUP = "$default"
OFFSET = Offset("-1")
PARTITION = "0"

The user is the policy name, which you set for your event hub under the ‘shared access policies’ area. I usually create one policy for sending and one for listening.

I will go into the offset and partitioning in more detail another time. For now, don’t worry about these; just use the values above.
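
That said, if you are curious, here is a quick sketch of the common starting positions you can pass to Offset, as I understand the v1 azure-eventhub SDK (treat the exact values as assumptions to check against the SDK docs):

from azure.eventhub import Offset

OFFSET = Offset("-1")        # start from the very beginning of the partition
OFFSET = Offset("@latest")   # only receive events that arrive after we connect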

Next we need to connect to the blockchain.info websocket. We send it the message that starts the stream.

ws = create_connection("wss://ws.blockchain.info/inv")
ws.send('{"op":"unconfirmed_sub"}')

Now, as written, the code below will only send a dozen or so messages, but you can change the condition to i > 100 (or more), or even remove that part and just keep going.

try:
    if not ADDRESS:
        raise ValueError("No EventHubs URL supplied.")

    # Create the Event Hubs client and a sender bound to our partition
    client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
    sender = client.add_sender(partition=PARTITION)
    client.run()

    i = 0

    start_time = time.time()
    try:
        # Forward each websocket message to Event Hub, stopping after a dozen
        while True:
            sender.send(EventData(ws.recv()))
            print(i)
            if i > 10:
                break
            i = i + 1
    except:
        raise
    finally:
        end_time = time.time()
        client.stop()
        ws.close()  # close the websocket connection too
        run_time = end_time - start_time

except KeyboardInterrupt:
    pass

In Part 2, we look at how to read these events back from the Event Hub.