azure, data bricks, databricks, python

Event Hub Streaming Part 1: Stream into Event Hub using Python

In this session we are going to look at how to stream data into an Azure Event Hub using Python.

We will be connecting to the blockchain.info websocket and streaming the transactions into an Azure Event Hub. This is a really fun use case that is easy to get up and running.

Prerequisites:

  • An Azure subscription
  • An Azure Event Hub
  • Python (Jupyter works too; I am using Databricks in this example)

You will need the following libraries installed on your Databricks cluster:

  • websocket-client (PyPI)
  • azure-eventhub (PyPI), version 1.x. The code below uses the EventHubClient API, which version 5 of the package replaced.

In this example I only stream a few events, but you can change the code to stream more, or to keep streaming indefinitely.

First of all we need to import the various libraries we are going to be using.

import time
# EventHubClient, Offset and EventData come from the azure-eventhub 1.x API
from azure.eventhub import EventHubClient, Offset, EventData
# create_connection is provided by the websocket-client package
from websocket import create_connection

Then we need to set the connection properties for our Event Hub:

ADDRESS = "amqps://<namespace>.servicebus.windows.net/<eventhubname>"
USER = "<policy name>"
KEY = "<primary key>"
CONSUMER_GROUP = "$default"
OFFSET = Offset("-1")
PARTITION = "0"

The user is the policy name, which you set for your event hub under the ‘shared access policies’ area. I usually create one policy for sending and one for listening.

I will go into more detail on the offset and partitioning another time. For now, don’t worry about these; just use the values above.
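
If you would rather not paste the key into a notebook, one option is to read the settings from environment variables and build the address from them. This is only a minimal sketch: the function name and the variable names (EVENT_HUB_NAMESPACE and so on) are my own assumptions, not something Event Hubs or Databricks defines.

```python
import os

def load_event_hub_settings(env=os.environ):
    """Build (address, user, key) from environment-style variables.

    The variable names are hypothetical; pick whatever suits your setup.
    """
    required = ("EVENT_HUB_NAMESPACE", "EVENT_HUB_NAME",
                "EVENT_HUB_POLICY", "EVENT_HUB_KEY")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise ValueError("Missing settings: " + ", ".join(missing))

    # Same address format as the ADDRESS value above
    address = "amqps://{}.servicebus.windows.net/{}".format(
        env["EVENT_HUB_NAMESPACE"], env["EVENT_HUB_NAME"])
    return address, env["EVENT_HUB_POLICY"], env["EVENT_HUB_KEY"]
```

You could then write ADDRESS, USER, KEY = load_event_hub_settings() and keep the rest of the code unchanged.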

Next we need to connect to the blockchain.info websocket. We send it the message that starts the stream.

ws = create_connection("wss://ws.blockchain.info/inv")
ws.send('{"op":"unconfirmed_sub"}')
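
The subscribe message is a small piece of JSON, so you can also build it from a dictionary rather than typing the string by hand. The sketch below does that and adds a small helper for checking a received message before forwarding it. It assumes the feed sends JSON text; parse_message is a hypothetical helper and the sample payload is made up, not a real message from the feed.

```python
import json

# Build the subscription request from a dict; the compact separators
# reproduce the exact string sent above.
subscribe_message = json.dumps({"op": "unconfirmed_sub"}, separators=(",", ":"))

def parse_message(raw):
    """Return the decoded JSON object, or None if raw is not a JSON object."""
    try:
        decoded = json.loads(raw)
    except ValueError:
        return None
    return decoded if isinstance(decoded, dict) else None

# Made-up payload for illustration only
sample = '{"op": "example", "payload": {"value": 1}}'
print(subscribe_message)
print(parse_message(sample))
print(parse_message("not json"))
```

With a live connection you would call ws.send(subscribe_message) and pass ws.recv() to parse_message.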

This code only sends twelve messages (the counter runs from 0 to 11 before the i > 10 check stops the loop), but you can change the check to i > 100 (or more), or remove it and just keep going.

try:
    if not ADDRESS:
        raise ValueError("No EventHubs URL supplied.")

    # Create the Event Hubs client and a sender bound to one partition
    client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
    sender = client.add_sender(partition=PARTITION)
    client.run()

    i = 0
    start_time = time.time()
    try:
        while True:
            # Forward each raw websocket message to the Event Hub as one event
            sender.send(EventData(ws.recv()))
            print(i)
            if i > 10:
                break
            i = i + 1
    finally:
        # Always stop the client and close the websocket, even after an error
        end_time = time.time()
        client.stop()
        ws.close()
        run_time = end_time - start_time
        print("Runtime: {} seconds".format(run_time))

except KeyboardInterrupt:
    pass
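
If the counting in the loop feels fiddly, here is one way to make the number of messages explicit. This is a sketch, not part of either library: forward_messages is a hypothetical helper, and the fake feed and list below stand in for ws.recv and sender.send so the example runs without a network connection.

```python
import time

def forward_messages(receive, send, limit):
    """Call receive() and pass each result to send(), exactly `limit` times.

    Returns the number of messages forwarded and the elapsed seconds.
    """
    start_time = time.time()
    count = 0
    for _ in range(limit):
        send(receive())
        count += 1
    return count, time.time() - start_time

# Stand-ins for ws.recv and sender.send (illustration only)
fake_feed = iter('{{"n": {}}}'.format(n) for n in range(100))
sent = []
count, seconds = forward_messages(lambda: next(fake_feed), sent.append, limit=12)
print(count)
```

With the real objects, the call would look like forward_messages(ws.recv, lambda m: sender.send(EventData(m)), 12).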

In Part 2, we look at how to read these events back from the Event Hub.
