
FastAPI Security with a User Database

Header photo by fabio on Unsplash

In the last post we looked at how to enable two-factor authentication with FastAPI using pyotp, and noted some areas where the v1 implementation could be improved. In this post we will update the implementation to include:

  • A backend user database (in this case SQLite).
  • Functionality to add a new user.
  • Functionality for the user to update their data.

This is based on the FastAPI docs for SQL (Relational) Databases.

As part of this application, we will be using the SQLAlchemy ORM (Object Relational Mapping) toolkit to interact with a backend SQL database. SQLAlchemy helps to achieve a consistent relationship between the object representation in Python and data storage in SQL. Using SQLAlchemy, we can manipulate the objects directly in Python which are then reflected in the SQL database. To install SQLAlchemy you can run pip install SQLAlchemy from the command line.

Clone the Repo

The code for this tutorial can all be found in the v2 directory in the github repo. You will see some other .py files have been added:

  • database.py
  • models.py
  • schemas.py
  • crud.py
  • security.py

The database.py is taken from the tutorial example in the FastAPI docs and I have kept it almost exactly the same aside from changing the SQLALCHEMY_DATABASE_URL.

The models.py file describes the database structure, the tables and (if any) the relationships between the tables. In here we will declare the table name and columns that are in the user table along with their type and any default values.

Next, the schemas that were originally in main.py have now been moved into their own schemas.py file. The User schemas have been separated by read, write, read/write and update. There are currently two role types that can be used: admin and user. You can add others if you wish, for example “moderator”.

The crud.py file contains the helper functions used to query the database. CRUD stands for Create, Read, Update, Delete. Keeping these functions together in a separate file prevents code duplication and simplifies maintenance. For this example, there is create, read and update but, at this point, no delete. In main.py we have the functions that determine the endpoints of the API, which can utilise the functions defined in crud.py.

To run the code, type uvicorn main:app --reload and open the docs page.

Connect to the SQLite database

When you run the code, you should see that a .db file is created; in this case it is called twofactor_app.db. This is the file that contains the data in the SQLite database. To interact with and view this data you can install the SQLite browser, which is freely available here.

Once installed, you can click on Open Database and then select the .db file from the directory. Initially, you will see a database with no records.

Creating new users

In the last tutorial, there was no way to create new users, aside from hardcoding them into the main.py script. The code has now been extended to enable new users to be added by posting to the /users endpoint.

In the crud.py file you will see a new function, which enables the details for a new user to be added. The password entered will be hashed and a random otp_secret (which we used to generate the QR code in the previous post) is generated as part of the code. Once a change has been implemented, we also need to commit changes using the db.commit() function.

#crud.py
def create_user(db: Session, user: schemas.UserCreate):
    hashed_password = pwd_context.hash(user.password)
    db_user = models.User(
        email=user.email,
        username=user.username,
        full_name=user.full_name,
        hashed_password=hashed_password,
        otp_secret=pyotp.random_base32(),
        role=user.role
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

It has been set up so that only an admin can create a new user. As there are no current users, we will need to manually add an initial user, which I will explain how to do.

In the main.py code, you will see a new function, get_current_active_admin_user, which returns a permissions error if the logged-in user is not an admin. This dependency is used in the code for creating a new user in main.py:

@app.post("/users/", response_model=schemas.User)
def create_new_user(
    user: schemas.UserCreate,
    db: Session = Depends(get_db),
    current_user: schemas.User = Depends(get_current_active_admin_user)):
    db_user = crud.create_user(db, user)
    return db_user
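
To make the idea behind get_current_active_admin_user concrete, here is a minimal sketch of the role check on its own, stripped of FastAPI. The User dataclass and the require_active_admin name are stand-ins invented for this illustration; in the real application the check lives inside a FastAPI dependency and would most likely raise an HTTPException rather than a PermissionError.

```python
from dataclasses import dataclass


@dataclass
class User:
    # Hypothetical stand-in for schemas.User, reduced to the fields the check needs
    username: str
    role: str = "user"
    disabled: bool = False


def require_active_admin(current_user: User) -> User:
    """Return the user if they are an active admin, otherwise raise an error."""
    if current_user.disabled:
        raise PermissionError("Inactive user")
    if current_user.role != "admin":
        raise PermissionError("Not enough permissions")
    return current_user


admin = require_active_admin(User("johndoe", role="admin"))
print(admin.username)
```

Because the endpoint declares the check as a dependency, the body of create_new_user only runs once the check has passed.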

To add an initial user to the database, use the SQLite browser tool. To add a new record, click on the add a new record symbol in the menu as shown in the following screenshot:

Input the following values into the columns:

{
  "username": "johndoe",
  "email": "johndoe@example.com",
  "full_name": "John Doe",
  "hashed_password": "$2b$12$rMFOsKHq6qaX6bbPB0pb6.ymbwF63soIe19af9qd.1Q8PhbCVfXSO", //this password has already been hashed
  "otp_secret": "LGLEREYEPVVWTLYO", 
  "disabled": 0,
  "role": "admin"
}

Click on “Write Changes” to commit the changes.

Click on the refresh symbol and you will see the new user has been added.
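
If you prefer to script this step rather than use the SQLite browser, the same record could be inserted with Python's built-in sqlite3 module. This is only a sketch: the table name "users" and the stand-in table created for the demo are assumptions, so check models.py for the real table name and columns, and point sqlite3.connect at twofactor_app.db instead of the in-memory database.

```python
import sqlite3

# Values taken from the post; the hashed_password has already been hashed.
INITIAL_USER = {
    "username": "johndoe",
    "email": "johndoe@example.com",
    "full_name": "John Doe",
    "hashed_password": "$2b$12$rMFOsKHq6qaX6bbPB0pb6.ymbwF63soIe19af9qd.1Q8PhbCVfXSO",
    "otp_secret": "LGLEREYEPVVWTLYO",
    "disabled": 0,
    "role": "admin",
}


def add_initial_user(conn: sqlite3.Connection, user: dict, table: str = "users") -> None:
    """Insert one user row; the table name is an assumption, not taken from the repo."""
    columns = ", ".join(user)
    placeholders = ", ".join(":" + name for name in user)
    with conn:  # commits on success, rolls back on error
        conn.execute(
            f"INSERT INTO {table} ({columns}) VALUES ({placeholders})", user
        )


# Demo against an in-memory database with a stand-in table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT, email TEXT, "
    "full_name TEXT, hashed_password TEXT, otp_secret TEXT, disabled INTEGER, role TEXT)"
)
add_initial_user(conn, INITIAL_USER)
rows = conn.execute("SELECT username, role FROM users").fetchall()
print(rows)
```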

You will now be able to log in in the same way as shown in the previous post using two-factor authentication. Once logged in as an admin user you will be able to create new users using a POST request to the /users endpoint.

To try out creating a new user, click on the POST /users create new user endpoint on the FastAPI application docs page. Click on “Try it out” and then input the following example request body. Click on the “Execute” button.

{
  "username": "alicewonderland",
  "email": "alicewonderland@example.com",
  "full_name": "Alice Wonderland",
  "password": "whiterabbit"
}

User Operations

The v2 directory code now includes the ability for a user to:

  • read their own data, or that of another user (if admin).
  • update their own data.

The following endpoints (other than create new user, which was mentioned in the previous section) are now available to use:

GET /users/me – dependent on a user being logged in. If this dependency is fulfilled, this request will return the user details in the response body.

PUT /users/me – dependent on a user being logged in. If this dependency is fulfilled, user can update their own user data and these changes will be updated in the SQLite user database.

GET /users/{user_id} – dependent on a user being logged in and admin. If this dependency is fulfilled, admin users can get the user data for a user based on their user_id.

In the next installment, I will cover how we can make some more improvements so stay tuned! If there is something you are specifically interested in learning about FastAPI, feel free to leave it in the comments.


Getting started with Two-Factor Authentication in FastAPI

Header photo by Markus Spiske on Unsplash

DISCLAIMER: This tutorial is not a production-ready implementation. It is an introduction to implementing two-factor authentication in FastAPI. Some issues are highlighted at the bottom of this article, some of which we will look into in future installments. Any application utilizing personal and/or sensitive information should be properly audited and penetration tested.

I’ve been using FastAPI for a project and, whilst looking at its security module, decided to write an article on how to set up Two-Factor (or Multi-Factor) authentication.

FastAPI is a Python package for easily creating REST API endpoints. Many of the tools you need to implement security are already included in the package.

Clone the repo for this tutorial here. The main.py code is originally from the FastAPI security tutorial.

Pre-requisites:

  • Python 3
  • Google Authenticator app (or compatible other) installed on your phone.
  • Clone this github repo that contains the code for this tutorial.

Step 1: Create and activate virtual environment and install FastAPI.

I am starting with the code from the FastAPI security tutorial docs.

Install FastAPI and the required packages:

pip install fastapi[all]
pip install python-jose[cryptography]
pip install passlib[bcrypt]

cd into the v0 directory of the github repo and then run the following command:

uvicorn --reload main:app

You should see the FastAPI application running at the specified (by default http://127.0.0.1:8000/docs) address.

Click on ‘Authorise’ in the top right. Enter the credentials that are in the code:

username: johndoe
password: secret

You can now try to make a GET request on the /users/me endpoint. You will see the following details for this user as the response:

{
  "username": "johndoe",
  "email": "johndoe@example.com",
  "full_name": "John Doe",
  "disabled": false
}

Step 2: Generating One-Time passwords with PyOTP

To enable the use of a one-time password, we are going to be using the PyOTP library. First install the library using the following command:

pip install pyotp

First, generate a pyotp secret key. This will give a random string with base 32 encoding, which is used to generate the one-time passcodes. You can do the following:

>>> import pyotp 
>>> pyotp.random_base32()
'LGLEREYEPVVWTLYO'

We can now generate a uri that can be used to create a QR code to allow the user to set up their authenticator app with the following code:

>>> pyotp.totp.TOTP('LGLEREYEPVVWTLYO').provisioning_uri(
name='johndoe@example.com', issuer_name='Secure App')
'otpauth://totp/Secure%20App:johndoe%40example.com?secret=LGLEREYEPVVWTLYO&issuer=Secure%20App'

You can use the Qrious codepen example to generate a QR code using the uri we just generated.

Scan the QR code with your authenticator app. You should now be able to see a one-time password that is generated and renewed every thirty seconds.

In the Python shell you can also get the current one-time password by running the following commands:

>>> totp = pyotp.TOTP("LGLEREYEPVVWTLYO")
>>> print("Current OTP:", totp.now())
Current OTP: 654244

Note that you will need to run the totp.now() command in the same 30-second window as the code shown in your authenticator app. If the codes do not match, check for typos in the secret and make sure the date, time and time zone settings are correct on both the phone you are using and the machine running the code.
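
To see why the window matters, here is a minimal standard-library sketch of how a time-based one-time password is derived, following RFC 6238. It assumes the common defaults (SHA-1, six digits, a 30-second step), which as far as I know are also pyotp's defaults; the function name totp and its parameters are my own and not part of pyotp.

```python
import base64
import hashlib
import hmac
import struct
import time
from typing import Optional


def totp(secret_b32: str, for_time: Optional[float] = None,
         step: int = 30, digits: int = 6) -> str:
    """Derive a time-based one-time password from a base32 secret."""
    key = base64.b32decode(secret_b32, casefold=True)
    now = time.time() if for_time is None else for_time
    # Every timestamp inside the same 30-second window maps to the same counter
    counter = int(now // step)
    digest = hmac.new(key, struct.pack(">Q", counter), hashlib.sha1).digest()
    # Dynamic truncation: the low 4 bits of the last byte pick a 4-byte slice
    offset = digest[-1] & 0x0F
    number = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(number % 10 ** digits).zfill(digits)


secret = "LGLEREYEPVVWTLYO"
# Seconds 60 and 89 fall in the same window, so the codes are identical
print(totp(secret, for_time=60) == totp(secret, for_time=89))  # True
print("Current OTP:", totp(secret))
```

The phone and the server never exchange codes; both derive them from the shared secret and their own clock, which is why a wrong clock setting on either side produces mismatched codes.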

Step 3: Integrate PyOTP with FastAPI

So that each user can eventually have their own OTP secret, we need to add a new field to the fake user database for "otp_secret". As an example, for user John Doe, we will use the secret key we generated previously.

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
        "otp_secret": "LGLEREYEPVVWTLYO"
    }
}

Also, add this field to the UserInDB class like so:

class UserInDB(User):
    hashed_password: str
    otp_secret: str

The easiest implementation of the OTP into the existing authentication workflow is to assume that the user will append their one-time password to their password. Thus, we will now change the code to check for both the correct password and currently valid one-time password.

def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password[:-6], user.hashed_password):
        return False
    totp = pyotp.TOTP(user.otp_secret)
    if not totp.verify(password[-6:]):
        return False
    return user

Now when you execute this v1 version of the main.py code, you can still authenticate at the FastAPI docs page. However, now the authentication is based on the user entering both the password (secret) + one-time password.
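
The slicing in authenticate_user relies on the one-time password always being the last six characters of what the user types. A small sketch of that split, with a guard for input that is too short or does not end in digits, might look like this (split_credentials and OTP_LENGTH are names made up for this example and are not in the repo):

```python
from typing import Tuple

OTP_LENGTH = 6  # assumed to match the six-digit codes used in this post


def split_credentials(combined: str, otp_length: int = OTP_LENGTH) -> Tuple[str, str]:
    """Split 'password + one-time password' into its two parts."""
    otp = combined[-otp_length:]
    if len(combined) <= otp_length or not otp.isdigit():
        raise ValueError("expected a password followed by a numeric one-time password")
    return combined[:-otp_length], otp


password, otp = split_credentials("secret654244")
print(password, otp)  # secret 654244
```

So for the example user, the value typed into the password field is secret followed by the current six-digit code.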

Further considerations

Here are some further considerations for a more production-like implementation, which we will look at in the next posts:

Remove passwords and secrets from source code

At the moment, the passwords and secrets are hard coded into the example. Ideally, we would want to remove these from the source code and in the next post I will go over an example of how to do this.

Upgrade the fake database

The database is currently hardcoded into the main.py script. For a more realistic implementation, we will use a simple database, which I also cover in the next post.

Mechanism for user administration

Currently there is only a single user and adding more users would require changing the source code, which isn’t what we want. For a more realistic scenario, we need a way to add/remove/change users.

Token Expiry

At the moment, in this example, the jwt token expires after 30 minutes. After this, the user needs to log in again using their password and the one-time password. Depending on the type of application, this may not be very user friendly.


Pairing BBC Microbit with a Raspberry Pi

For a while I have wanted to experiment with sensor events and I recently had a day off so (rather than continuing my re-exploration of the wonderful LOTR trilogy… the book, not the movie) I decided to finally get all the electronics out and give it a whirl.

I have a lot of microbits in the house from running coding clubs so I figured I would use one as a sensor. I also had a Pi Zero that I hadn’t really used so thought I would use it as the machine to which I send the sensor data.

So this is part one of what will be a series where we can explore the possibilities. For this demo, I am going to go over how I got set up getting the microbit to communicate with the Raspberry Pi.

Best friends forever… paired in perfect harmony

It took a lot longer than I thought :/ I had a few issues along the way (many due to my own errors :p). One thing I noticed is that my microbit was a bit sensitive and kept disconnecting from the power source every few minutes. I tried with another one, which seemed much more stable.

Prerequisites

In order to run this tutorial, you will need the following:

  • A microbit
  • A Raspberry Pi (I used the Pi Zero) with the following installed:
    • Bluezero – I used the latest install 3.0 (I originally had 2.0 but had issues getting this working so reinstalled to the newest at the time of writing this).
      • sudo pip3 install bluezero

Prepare the microbit

Here is a link to the microbit code I used. I started with a set up from one of the issues I found in the python-bluezero github repo. However, I found that with that setup, though the code ran, the microbit kept throwing a 020 error, which seems to relate to memory issues. I thus removed the while loop and now it works without the error.

Let’s go over what it does:

  1. On start it will look for the temperature and uart services. It will also show the bluetooth symbol in the LED matrix.
  2. When the microbit is connected to the Pi it will set the connected variable to "true". It will show a smiley face on the LED matrix 🙂 It will also set uart to read up to the hashtag symbol. The bluetooth UART service is started and it can read data received from the Pi. It will terminate reading when it gets to the ‘#’ symbol. More information about this service can be found here. It will display the data as a string on the microbit led matrix.
  3. When the microbit is disconnected from the Pi it will set the connected variable to "false". It will show a sad face 😦

Download the hex file and load it onto the microbit by dragging it into the microbit drive.

Connect to the Pi

Ensure your Raspberry Pi is connected to a power source and that you know the ip address.

ssh <username>@<ipaddress>

Enter the username and password set for your Raspberry Pi.

Pairing the Raspberry Pi with the Microbit

Start the bluetoothctl tool by typing bluetoothctl. First we will scan to see which bluetooth devices are available.

  1. Find the microbit

First, your Raspberry Pi needs to find the microbit. To do this run the following command:

scan on

This will start scanning for any bluetooth devices and you will see them appear. The microbit one will look something like:

[NEW] Device A1:B2:C3:D4:E5:FF BBC micro:bit [a name]

Once it appears, type scan off to stop the scanning.

2. Pair with the microbit

To pair the microbit and the Raspberry Pi, you can run

pair <device_address>

So in the example above it would be:

pair A1:B2:C3:D4:E5:FF

When you run this pair command, hold down the A+B buttons on the microbit and press the reset button on the back. You will see a bluetooth symbol, then you can release them.

At first this did not work for me but I changed the pairing settings in MakeCode project settings (as mentioned above) and then it worked.

You can check if it is paired by making sure it is listed when you run the following command:

paired-devices

Clone the bluezero repo

This tutorial uses VS Code to build our Python code. Sadly, it is currently not possible to get full VS Code support with the Raspberry Pi Zero. More details can be found in this github issue.

Instead, you can use the SSH FS extension for VS Code. It won’t give you any debugging functionality but you can navigate the folders of your Pi, make new files, write code etc. To set it up, add the Raspberry Pi to the SSH FS extension by creating a new SSH configuration.

I started out by cloning the python-bluezero repo and using this file as the start point. I’ve pretty much kept it the same for now so I can run over what is happening.

Additionally, I added the ability to get the temperature from the microbit’s inbuilt temperature sensor. This is going to form some basis for our sensor data for this project.

Through doing this, I found out the temperature sensor data is very boring :p so I am planning on swapping it out next time for accelerometer data instead.

Using the microbit_uart.py code as a base

The first thing the Python script does is import microbit tools from the bluezero package. It imports microbit and async_tools.

The code then calls the microbit function and sets up some variable values, which are explained below:

  • adapter address: bluetooth controller on raspberry pi. You can find this by running list controller from the bluetoothctl.
  • device address: microbit address. You can find this by running paired-devices from the bluetoothctl.
  • It then defines which services are enabled and disabled. E.g
    • temperature_service=True because we will be using this to send temperature from the microbit to the Raspberry Pi.
    • uart_service=True because the example script we are using has a need for the uart service.
    • You can also enable others. You will also need to add these into your microbit code by dragging in the relevant blocks (E.g bluetooth led service if you wanted to use the led display as an event input or bluetooth accelerometer service if you wanted to look at rotation of the microbit as an event)

There are two functions already defined; ping and goodbye.

The ping function will transmit a message to the microbit from the Raspberry Pi. The microbit reads the message up until the hashtag (as defined in our microbit code). The message is “ping”. The microbit will display this message via the leds.

def ping():
    ubit.uart = 'ping#'
    return True

This works through UART (Universal Asynchronous Receiver/Transmitter) over bluetooth. It is used for communication across serial ports that is (as the name suggests) asynchronous. The code that makes this all work is here.
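
To illustrate why the trailing ‘#’ matters, here is a rough sketch of how a receiver can reassemble delimiter-terminated messages from text that arrives in arbitrary chunks. This is not the microbit or bluezero code; read_messages and the sample chunks are invented purely for this example.

```python
from typing import Iterable, Iterator


def read_messages(chunks: Iterable[str], delimiter: str = "#") -> Iterator[str]:
    """Yield complete messages from a stream of text chunks."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # A message is only complete once its delimiter has arrived
        while delimiter in buffer:
            message, buffer = buffer.split(delimiter, 1)
            yield message


# The chunk boundaries do not line up with the message boundaries
received = list(read_messages(["pi", "ng#go", "odbye#"]))
print(received)  # ['ping', 'goodbye']
```

Without the delimiter the receiver would have no way of knowing where one message ends and the next begins.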

The goodbye function disconnects the microbit from the Raspberry Pi and quits the asynchronous event loop. The EventLoop class is defined here.

We are now going to need to add an additional function and some extra lines of code so we can get the temperature reading from the microbit.

Getting temperature data from the microbit

To enable getting a temperature reading from the microbit, first ensure the temperature service is set to true for the microbit:

temperature_service=True

Then add the following function within the code:

def temperature():
    print('Temperature:', ubit.temperature)

Then add the following:

for i in range(3):
    eloop.add_timer(i*10000, temperature)

Finally, we need to change the time passed to the event loop timer that calls the goodbye function to 50000 milliseconds (by which time all of the other functions have run).

The code should now look like this:

Next time, we will change this code to remove the uart functionality and add in the ability for us to get data from the accelerometer service as a stream of data that we can use to do some cool stuff with!


Text mining: NLTK suite for Python

Today we are going to take a quick look at the NLTK suite for Python.

We could use NLTK for situations where we need to handle human language.

Things like:

  • Customer complaints classification
  • Sentiment analysis
  • Chatbot development
  • Insurance claim description analysis
  • Scanning candidate cvs

In this post, we will start with a large chunk of text (taken from the NLTK Wikipedia page) and then clean it, split it into substrings and then plot the frequency of each word.

First off, we need to import the relevant libraries and packages that we will be using:

#import relevant libraries and packages

import re
import nltk
nltk.download('punkt')
from nltk.corpus import stopwords
nltk.download('stopwords')
from nltk import FreqDist
from nltk.tokenize import RegexpTokenizer

Next we need to create a new object, which is our text from Wikipedia.

#Text below is taken from the NLTK page on Wikipedia. 

my_text = """The Natural Language Toolkit, or more commonly NLTK,
is a suite of libraries and programs for symbolic and statistical
natural language processing (NLP) for English written in the Python
programming language. It was developed by Steven Bird and Edward Loper
in the Department of Computer and Information Science at the University
of Pennsylvania. NLTK includes graphical demonstrations and sample data.
It is accompanied by a book that explains the underlying concepts behind
the language processing tasks supported by the toolkit,plus a cookbook."""

I have assigned it to the variable name my_text.

Next we want to replace any newline notation with a space, so it won’t show \n. For this we use the re module, which enables us to use regular expressions. I have also made everything lowercase, as otherwise it will treat ‘Language’ and ‘language’ as two different words.

#substitute \n newline within 'my_text' with a space and assign this to the 'document' object
doc = re.sub('\n', ' ', my_text)
document = doc.lower()

We can use the nltk tokenizer to divide the text up into individual words or even sentences. For example, if I wanted to divide it into sentences I could do:

sentences = nltk.sent_tokenize(document)
print(sentences)

returns:

We are going to divide the string into individual words so that we can plot the frequency of each word. First we want to remove stop words, otherwise our most common word is going to be something like ‘of’, which isn’t so helpful.

NLTK already has a dictionary of stop words that we can use.

stop_words = stopwords.words('english')

In this next step, we are going to keep only those words that are not in the stop words variable. First, we need to use the tokenizer to divide our string into individual words. We are also going to remove any punctuation marks, otherwise these will also be classed as words.

tokenizer = RegexpTokenizer(r'\w+')
my_words = tokenizer.tokenize(document)

We are then going to create an empty list and call it my_words_ns. Then we loop through each word from my_words and, for each one that is not found in the stop words, append it to my_words_ns.

my_words_ns = []

for word in my_words:
    if word not in stop_words:
        my_words_ns.append(word)

NLTK has its own frequency distribution function, which we can then use to plot the frequency of each word. Let’s apply it to our list of words.

freqDist = FreqDist(my_words_ns)

You can get the frequency of a specific word like this:

print(freqDist["language"])
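
As far as I know, FreqDist is built on top of Python's collections.Counter, so the same counting idea can be sketched with the standard library alone. The short sample sentence and the tiny stop word set below are made up for illustration and are not NLTK's English stop word list.

```python
import re
from collections import Counter

sample = ("the natural language toolkit is a suite of libraries "
          "for natural language processing")

# Hypothetical mini stop list, standing in for stopwords.words('english')
mini_stop_words = {"the", "is", "a", "of", "for"}

# \w+ keeps runs of word characters, which drops punctuation
words = re.findall(r"\w+", sample.lower())

# Same filtering as the loop above, written as a list comprehension
words_ns = [word for word in words if word not in mini_stop_words]

counts = Counter(words_ns)
print(counts["language"])     # 2
print(counts.most_common(2))  # [('natural', 2), ('language', 2)]
```

The dictionary-style lookup works the same way on a FreqDist, which is why freqDist["language"] returns a plain count.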

Now let’s plot the top ten words:

my_plot = freqDist.plot(10)

And there you have it. This can then be built out in order to inform certain business processes. Next time, we will use things like gensim and Microsoft Cognitive Services to explore what we can achieve by harnessing the power of machine learning.


Event Hub Streaming Part 2: Reading from Event Hub using Python

In part two of our tutorial, we will read back the messages that we streamed into our Event Hub in part 1. To read from a live stream, start the streaming code first and make sure it sends more than ten messages (otherwise the stream will have stopped by the time you start reading :)). The code will still work either way.

So the code is pretty much along the same lines, same packages etc. Let’s take a look.

Import the libraries we need:

import os
import sys
import logging
import time
from azure.eventhub import EventHubClient, Receiver, Offset

Set the connection properties to Event Hub:

ADDRESS = "amqps://<namespace>.servicebus.windows.net/<eventhubname>"
USER = "<policy name>"
KEY = "<primary key>"
CONSUMER_GROUP = "$default"
OFFSET = Offset("-1")
PARTITION = "0"

This time I am using my listening USER instead of my sending USER policy.

Next we are going to take the events from the Event Hub and print each json transaction message. I will try to go through offsets in a bit more detail another time, but for now this will listen and return back your events.

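total = 0
last_offset = None
last_sn = -1
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
try:
    receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=5000, offset=OFFSET)
    client.run()
    start_time = time.time()
    batch = receiver.receive(timeout=5000)
    while batch:
        for event_data in batch:
            # Track the position of each event within the partition
            last_offset = event_data.offset
            last_sn = event_data.sequence_number
            print("Received: {}, {}".format(last_offset.value, last_sn))
            print(event_data.message)  # or event_data.body_as_str()
            total += 1
        batch = receiver.receive(timeout=5000)

    end_time = time.time()
    run_time = end_time - start_time
    print("Received {} messages in {} seconds".format(total, run_time))
except KeyboardInterrupt:
    pass
finally:
    # Always close the connection, even if receiving fails
    client.stop()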

And voila! You now know how to stream to and read from Azure Event Hub using Python 🙂

Let me know if you have any questions!