In this session we are going to look at how to stream data into Azure Event Hubs using Python.
We will connect to the blockchain.info websocket and stream the unconfirmed transactions it publishes into an Azure Event Hub. It is a fun use case that is easy to get up and running.
Prerequisites:
- An Azure subscription
- An Azure Event Hub
- Python (a Jupyter notebook works; I am using Databricks in this example)
You will need the following libraries installed on your Databricks cluster:
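- websocket-client (PyPI)
- azure-eventhub (PyPI); the code below uses the 1.x API (EventHubClient), which version 5 of the package replaced with different client classes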
In this example I only stream a few events, but you can change the code to stream more or to keep streaming indefinitely.
First of all, we need to import the libraries we are going to use.
import os
import sys
import logging
import time
from azure import eventhub
from azure.eventhub import EventHubClient, Receiver, Offset, EventData
from websocket import create_connection
Then we need to set the connection properties for our Event Hub:
ADDRESS = "amqps://<namespace>.servicebus.windows.net/<eventhubname>"
USER = "<policy name>"
KEY = "<primary key>"
CONSUMER_GROUP = "$default"
OFFSET = Offset("-1")
PARTITION = "0"
USER is the policy name, which you set for your Event Hub under the ‘shared access policies’ area, and KEY is that policy's primary key. I usually create one policy for sending and one for listening.
I will cover the offset and partition settings in more detail another time. For now, don’t worry about them; just use the values above.
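As a rough illustration of how these values fit together, the sketch below builds the same settings from a mapping such as os.environ instead of hard-coding the key in the notebook. The helper name load_settings and the EVENTHUB_* variable names are hypothetical; they are not part of the Azure SDK, and the sample values are placeholders.

```python
import os


def load_settings(env=None):
    """Build the Event Hub settings from a mapping (hypothetical helper)."""
    env = os.environ if env is None else env
    namespace = env.get("EVENTHUB_NAMESPACE", "")
    hub_name = env.get("EVENTHUB_NAME", "")
    if not namespace or not hub_name:
        raise ValueError("Namespace and event hub name are both required.")
    return {
        # Same address format as the ADDRESS constant above
        "ADDRESS": "amqps://{}.servicebus.windows.net/{}".format(namespace, hub_name),
        "USER": env.get("EVENTHUB_POLICY", ""),  # shared access policy name
        "KEY": env.get("EVENTHUB_KEY", ""),      # primary key of that policy
    }


# Placeholder values for illustration only; pass nothing to read os.environ
settings = load_settings({
    "EVENTHUB_NAMESPACE": "mynamespace",
    "EVENTHUB_NAME": "myeventhub",
    "EVENTHUB_POLICY": "send-policy",
    "EVENTHUB_KEY": "not-a-real-key",
})
print(settings["ADDRESS"])
```

On Databricks you may prefer to keep the key in a secret scope; the point here is only that the address is plain string formatting and the key does not need to live in the code.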
Next we need to connect to the blockchain.info websocket. We send it the message that starts the stream.
ws = create_connection("wss://ws.blockchain.info/inv")
ws.send('{"op":"unconfirmed_sub"}')
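Each call to ws.recv() returns one message as a JSON string. If you want to inspect a message before forwarding it, a minimal sketch looks like the following. The field names ("x", "hash", "out", "value") and the sample payload are assumptions about the shape of the blockchain.info messages, heavily trimmed for illustration; check a real message before relying on them. The function name summarize_tx is hypothetical.

```python
import json

# Illustrative sample only; real messages carry many more fields
SAMPLE = '{"op": "utx", "x": {"hash": "abc123", "out": [{"value": 150000}, {"value": 50000}]}}'


def summarize_tx(raw):
    """Return (hash, total output value) for one message string."""
    msg = json.loads(raw)
    tx = msg.get("x", {})
    # Sum the value of every output; missing fields count as zero
    total = sum(out.get("value", 0) for out in tx.get("out", []))
    return tx.get("hash"), total


print(summarize_tx(SAMPLE))  # ('abc123', 200000)
```

With a live connection you would call summarize_tx(ws.recv()) instead of using the sample string.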
This code only sends twelve messages (i runs from 0 to 11, because the i > 10 check happens after each send), but you can change the check to i > 100 (or more), or remove it and keep streaming.
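try:
    if not ADDRESS:
        raise ValueError("No EventHubs URL supplied.")
    # Create the Event Hubs client and a sender bound to one partition
    client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
    sender = client.add_sender(partition=PARTITION)
    client.run()
    i = 0
    start_time = time.time()
    try:
        while True:
            # Forward each websocket message to the Event Hub unchanged
            sender.send(EventData(ws.recv()))
            print(i)
            # Stop after a handful of events; raise the limit or remove
            # this check to keep streaming
            if i > 10:
                break
            i = i + 1
    finally:
        # Always release the connections, even if a send fails
        end_time = time.time()
        client.stop()
        ws.close()
        run_time = end_time - start_time
        print("Runtime: {} seconds".format(run_time))
except KeyboardInterrupt:
    pass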
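If you want the event limit to be explicit, the bounded loop above could be factored into a small function. This is only a sketch: forward_events is a hypothetical name, and the iterator and list below are stand-ins for ws.recv and sender.send so the example runs without a network connection.

```python
def forward_events(receive, send, max_events):
    """Forward up to max_events messages from receive() to send()."""
    sent = 0
    while sent < max_events:
        send(receive())
        sent += 1
    return sent


# Stand-ins for the websocket and the Event Hub sender
incoming = iter(["tx-1", "tx-2", "tx-3", "tx-4"])
outbox = []

count = forward_events(lambda: next(incoming), outbox.append, max_events=3)
print(count, outbox)  # 3 ['tx-1', 'tx-2', 'tx-3']
```

With the real objects from this post, the call would be along the lines of forward_events(ws.recv, lambda m: sender.send(EventData(m)), 12).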
In Part 2, we look at how to read these events back from the Event Hub.