FastAPI Security with a User Database

Header photo by fabio on Unsplash

In the last post we looked at how to enable two-factor authentication with FastAPI using pyotp, and noted some areas where the v1 implementation could be improved. In this post we will update the implementation to include:

  • A backend user database (in this case SQLite).
  • Functionality to add a new user.
  • Functionality for the user to update their data.

This is based on the FastAPI docs for SQL (Relational) Databases.

As part of this application, we will be using the SQLAlchemy ORM (Object Relational Mapping) toolkit to interact with a backend SQL database. SQLAlchemy helps to achieve a consistent relationship between the object representation in Python and data storage in SQL. Using SQLAlchemy, we can manipulate the objects directly in Python which are then reflected in the SQL database. To install SQLAlchemy you can run pip install SQLAlchemy from the command line.

Clone the Repo

The code for this tutorial can all be found in the v2 directory in the GitHub repo. You will see that some other .py files have been added:

  • database.py
  • models.py
  • schemas.py
  • crud.py
  • security.py

The database.py file is taken from the tutorial example in the FastAPI docs, and I have kept it almost exactly the same aside from changing the SQLALCHEMY_DATABASE_URL.

The models.py file describes the database structure, the tables and (if any) the relationships between the tables. In here we will declare the table name and columns that are in the user table along with their type and any default values.

Next, the schemas that were originally in main.py have now been moved into their own schemas.py file. The User schemas have been separated by read, write, read/write and update. There are currently two role types that can be used: admin and user. You can add others if you wish, for example “moderator”.

The crud.py file contains the helper functions used to query the database. CRUD stands for Create, Read, Update, Delete. Keeping these functions together in a separate file prevents code duplication and simplifies maintenance. For this example, there is create, read and update but, at this point, no delete. In main.py we have the functions that determine the endpoints of the API, which can utilise the functions defined in crud.py.

To run the code, type uvicorn main:app --reload and open the docs page.

Connect to the SQLite database

When you run the code, you should see that a .db file is created; in this case it is called twofactor_app.db. This is the file that contains the data in the SQLite database. To interact with and view this data you can install the SQLite browser, which is freely available here.

Once installed, you can click on Open Database and then select the .db file from the directory. Initially, you will see a database with no records.
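If you would rather stay in Python, the standard library's sqlite3 module can be used to inspect the same file. The following is a minimal sketch rather than code from the repo: it uses an in-memory database in place of twofactor_app.db so that it runs on its own, and the users table name and its columns are assumptions (the real layout is defined in models.py).

```python
import sqlite3

# Stand-in for sqlite3.connect("twofactor_app.db"); an in-memory database
# keeps this sketch self-contained.
conn = sqlite3.connect(":memory:")

# Hypothetical table layout; the real one is defined in models.py.
conn.execute(
    "CREATE TABLE users ("
    "id INTEGER PRIMARY KEY, username TEXT, email TEXT, role TEXT)"
)

# List the tables in the database.
tables = [
    row[0]
    for row in conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
    )
]
print(tables)

# Count the records in the (assumed) users table.
user_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print(user_count)

conn.close()
```

Against a freshly created database, the record count would be zero, which matches the empty view in the SQLite browser.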

Creating new users

In the last tutorial, there was no way to create new users, aside from hardcoding them into the main.py script. The code has now been extended to enable new users to be added by posting to the /users endpoint.

In the crud.py file you will see a new function, create_user, which enables the details for a new user to be added. The password entered is hashed, and a random otp_secret (which we used to generate the QR code in the previous post) is generated as part of the code. After the new record has been added to the session, we also need to commit the change using the db.commit() function.

#crud.py
def create_user(db: Session, user: schemas.UserCreate):
    hashed_password = pwd_context.hash(user.password)
    db_user = models.User(
        email=user.email,
        username=user.username,
        full_name=user.full_name,
        hashed_password=hashed_password,
        otp_secret=pyotp.random_base32(),
        role=user.role
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

It has been set up so that only an admin can create a new user. As there are no current users, we will need to manually add an initial user, which I will explain how to do.

In the main.py code, you will see that there is a new function, get_current_active_admin_user, which returns a permissions error if the logged-in user is not an admin. This function is used as a dependency (the current_user parameter) in the code for creating a new user in main.py:

@app.post("/users/", response_model=schemas.User)
def create_new_user(
    user: schemas.UserCreate,
    db: Session = Depends(get_db),
    current_user: schemas.User = Depends(get_current_active_admin_user)):
    db_user = crud.create_user(db, user)
    return db_user
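The role check behind such a dependency can be sketched in plain Python. This is not the repo's get_current_active_admin_user: the User dataclass and the require_active_admin name are made up for illustration, and the sketch raises PermissionError where the FastAPI app would return an HTTP error response.

```python
from dataclasses import dataclass


@dataclass
class User:
    # Hypothetical stand-in for the user schema
    username: str
    role: str = "user"
    disabled: bool = False


def require_active_admin(user: User) -> User:
    """Return the user if they are an enabled admin, otherwise raise."""
    if user.disabled:
        raise PermissionError("Inactive user")
    if user.role != "admin":
        raise PermissionError("Admin role required")
    return user


admin = User(username="johndoe", role="admin")
print(require_active_admin(admin).username)  # johndoe

try:
    require_active_admin(User(username="alicewonderland"))
except PermissionError as exc:
    print(exc)  # Admin role required
```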

To add an initial user to the database, use the SQLite browser tool. To add a new record, click on the symbol for adding a new record in the menu.

Input the following values into the columns:

{
  "username": "johndoe",
  "email": "johndoe@example.com",
  "full_name": "John Doe",
  "hashed_password": "$2b$12$rMFOsKHq6qaX6bbPB0pb6.ymbwF63soIe19af9qd.1Q8PhbCVfXSO", //this password has already been hashed
  "otp_secret": "LGLEREYEPVVWTLYO", 
  "disabled": 0,
  "role": "admin"
}

Click on “Write Changes” to commit the changes.

Click on the refresh symbol and you will see the new user has been added.
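The same record could also be inserted from Python instead of through the browser tool. The sketch below uses the standard library's sqlite3 module with an in-memory database so that it is self-contained. The users table name and the column types are assumptions; the column names and values are the ones listed above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for "twofactor_app.db"

# Hypothetical table name and column types; the real table is defined in models.py.
conn.execute(
    """
    CREATE TABLE users (
        id INTEGER PRIMARY KEY,
        username TEXT UNIQUE,
        email TEXT,
        full_name TEXT,
        hashed_password TEXT,
        otp_secret TEXT,
        disabled INTEGER DEFAULT 0,
        role TEXT
    )
    """
)

admin = {
    "username": "johndoe",
    "email": "johndoe@example.com",
    "full_name": "John Doe",
    # This password has already been hashed
    "hashed_password": "$2b$12$rMFOsKHq6qaX6bbPB0pb6.ymbwF63soIe19af9qd.1Q8PhbCVfXSO",
    "otp_secret": "LGLEREYEPVVWTLYO",
    "disabled": 0,
    "role": "admin",
}

# Using the connection as a context manager commits the insert on success.
with conn:
    conn.execute(
        "INSERT INTO users "
        "(username, email, full_name, hashed_password, otp_secret, disabled, role) "
        "VALUES (:username, :email, :full_name, :hashed_password, "
        ":otp_secret, :disabled, :role)",
        admin,
    )

row = conn.execute(
    "SELECT username, role FROM users WHERE username = ?", ("johndoe",)
).fetchone()
print(row)
```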

You will now be able to log in the same way as shown in the previous post, using two-factor authentication. Once logged in as an admin user, you will be able to create new users using a POST request to the /users endpoint.

To try out creating a new user, click on the POST /users create new user endpoint on the FastAPI application docs page. Click on “Try it out” and then input the following example request body. Click on the “Execute” button.

{
  "username": "alicewonderland",
  "email": "alicewonderland@example.com",
  "full_name": "Alice Wonderland",
  "password": "whiterabbit"
}

User Operations

The v2 directory code now includes the ability for a user to:

  • read their own data, or that of another user (if admin).
  • update their own data.

The following endpoints (other than create new user, which was mentioned in the previous section) are now available to use:

GET /users/me – dependent on a user being logged in. If this dependency is fulfilled, this request will return the user details in the response body.

PUT /users/me – dependent on a user being logged in. If this dependency is fulfilled, the user can update their own user data and these changes will be saved in the SQLite user database.

GET /users/{user_id} – dependent on a user being logged in and admin. If this dependency is fulfilled, admin users can get the user data for a user based on their user_id.

In the next installment, I will cover how we can make some more improvements so stay tuned! If there is something you are specifically interested in learning about FastAPI, feel free to leave it in the comments.

Getting started with Two-Factor Authentication in FastAPI

Header photo by Markus Spiske on Unsplash

DISCLAIMER: This tutorial is not a production-ready implementation. It is an introduction to the implementation of two-factor authentication in FastAPI. Some issues are highlighted at the bottom of this article, some of which we will look into in future installments. Any application utilizing personal and/or sensitive information should be properly audited and penetration tested.

I’ve been using FastAPI for a project and, whilst looking at its security module, decided to write an article on how to set up Two-Factor (or Multi-Factor) authentication.

FastAPI is a Python package for easily creating REST API endpoints. Many of the tools you need to implement security are already included in the package.

Clone the repo for this tutorial here. The main.py code is originally from the FastAPI security tutorial.

Pre-requisites:

  • Python 3
  • Google Authenticator app (or compatible other) installed on your phone.
  • Clone this GitHub repo that contains the code for this tutorial.

Step 1: Create and activate a virtual environment and install FastAPI.

I am starting with the code from the FastAPI security tutorial docs.

Install FastAPI and the required packages:

pip install fastapi[all]
pip install python-jose[cryptography]
pip install passlib[bcrypt]

cd into the v0 directory of the GitHub repo and then run the following command:

uvicorn --reload main:app

You should see the FastAPI application running at the specified address (by default http://127.0.0.1:8000/docs).

Click on ‘Authorize’ in the top right. Enter the credentials that are in the code:

username: johndoe
password: secret

You can now try to make a GET request on the /users/me endpoint. You will see the following details for this user as the response:

{
  "username": "johndoe",
  "email": "johndoe@example.com",
  "full_name": "John Doe",
  "disabled": false
}

Step 2: Generating One-Time passwords with PyOTP

To enable the use of a one-time password, we are going to be using the PyOTP library. First install the library using the following command:

pip install pyotp

Next, generate a pyotp secret key. This gives a random base32-encoded string, which is used to generate the one-time passcodes. You can do the following:

>>> import pyotp 
>>> pyotp.random_base32()
'LGLEREYEPVVWTLYO'

We can now generate a URI that can be used to create a QR code, which allows the user to set up their authenticator app, with the following code:

>>> pyotp.totp.TOTP('LGLEREYEPVVWTLYO').provisioning_uri(
...     name='johndoe@example.com', issuer_name='Secure App')
'otpauth://totp/Secure%20App:johndoe%40example.com?secret=LGLEREYEPVVWTLYO&issuer=Secure%20App'
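To make the structure of this URI easier to see, here is a small sketch that assembles the same string by hand with the standard library. The build_otpauth_uri function is made up for illustration and only covers the fields shown above; in practice pyotp's provisioning_uri does this for you.

```python
from urllib.parse import quote


def build_otpauth_uri(secret: str, name: str, issuer: str) -> str:
    """Assemble a TOTP provisioning URI of the form shown above."""
    # The label is "<issuer>:<account name>", with both parts percent-encoded.
    label = f"{quote(issuer)}:{quote(name, safe='')}"
    return f"otpauth://totp/{label}?secret={secret}&issuer={quote(issuer)}"


uri = build_otpauth_uri("LGLEREYEPVVWTLYO", "johndoe@example.com", "Secure App")
print(uri)
```

The URI contains the secret itself, so it (and any QR code generated from it) should be treated as sensitive.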

You can use the Qrious codepen example to generate a QR code using the uri we just generated.

Scan the QR code with your authenticator app. You should now be able to see a one-time password that is generated and renewed every thirty seconds.

In the Python shell you can also get the current one-time password by running the following commands:

>>> totp = pyotp.TOTP("LGLEREYEPVVWTLYO")
>>> print("Current OTP:", totp.now())
Current OTP: 654244

Note that you will need to run the totp.now() command in the same 30-second window as the code shown in your authenticator app for the two to match. If they don’t match, ensure there are no typos in the secret and check that the date, time and time zone settings are correct on both the phone you are using and the machine running the code.
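To see why both devices must agree on the time, the sketch below is a minimal standard-library version of the TOTP calculation (RFC 6238 with HMAC-SHA1, six digits and a 30-second step). It is an illustration, not a replacement for pyotp; the totp function and its parameters are made up for this example. The check at the end uses the published test secret from the RFC rather than the tutorial's secret.

```python
import base64
import hashlib
import hmac
import struct
import time


def totp(secret_b32: str, for_time=None, digits: int = 6, interval: int = 30) -> str:
    """Compute a time-based one-time password (RFC 6238, HMAC-SHA1)."""
    if for_time is None:
        for_time = time.time()
    # Base32 input must be a multiple of 8 characters; pad if necessary.
    padded = secret_b32.upper() + "=" * (-len(secret_b32) % 8)
    key = base64.b32decode(padded)
    # The moving factor is the number of whole intervals since the Unix epoch.
    counter = int(for_time) // interval
    digest = hmac.new(key, struct.pack(">Q", counter), hashlib.sha1).digest()
    # Dynamic truncation: the low nibble of the last byte selects a 4-byte window.
    offset = digest[-1] & 0x0F
    code = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(code % 10 ** digits).zfill(digits)


# RFC test secret (ASCII "12345678901234567890", base32-encoded) at t = 59 seconds.
rfc_secret = "GEZDGNBVGY3TQOJQGEZDGNBVGY3TQOJQ"
print(totp(rfc_secret, for_time=59))  # 287082

# Any two timestamps inside the same 30-second window give the same code.
print(totp(rfc_secret, for_time=30) == totp(rfc_secret, for_time=59))  # True
```

Because the only changing input is the current time divided into 30-second steps, a clock that is off by more than a step produces a different code.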

Step 3: Integrate PyOTP with FastAPI

So that each user can eventually have their own OTP secret, we need to add a new field to the fake user database for "otp_secret". As an example, for user John Doe, we will use the secret key we generated previously.

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
        "otp_secret": "LGLEREYEPVVWTLYO"
    }
}

Also, add this field to the UserInDB class like so:

class UserInDB(User):
    hashed_password: str
    otp_secret: str

The easiest implementation of the OTP into the existing authentication workflow is to assume that the user will append their one-time password to their password. Thus, we will now change the code to check for both the correct password and currently valid one-time password.

def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password[:-6], user.hashed_password):
        return False
    totp = pyotp.TOTP(user.otp_secret)
    if not totp.verify(password[-6:]):
        return False
    return user

Now when you execute this v1 version of the main.py code, you can still authenticate at the FastAPI docs page. However, now the authentication is based on the user entering both the password (secret) + one-time password.
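The slicing in authenticate_user relies on the one-time password always being the last six characters of what the user types. A small sketch of that split, with a guard for input that is too short, could look like this (the split_password_and_otp helper is hypothetical and not part of the repo):

```python
def split_password_and_otp(combined: str, otp_digits: int = 6):
    """Split 'password + OTP' input into its two parts."""
    if len(combined) <= otp_digits:
        raise ValueError("input is too short to contain a password and an OTP")
    return combined[:-otp_digits], combined[-otp_digits:]


password, otp = split_password_and_otp("secret654244")
print(password, otp)  # secret 654244
```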

Further considerations

Here are some further considerations for a more production-like implementation, which we will look at in the next posts:

Remove passwords and secrets from source code

At the moment, the passwords and secrets are hard coded into the example. Ideally, we would want to remove these from the source code and in the next post I will go over an example of how to do this.

Upgrade the fake database

The database is currently hardcoded into the main.py script. For a more realistic implementation, we will use a simple database, which I also cover in the next post.

Mechanism for user administration

Currently there is only a single user and adding more users would require changing the source code, which isn’t what we want. For a more realistic scenario, we need a way to add/remove/change users.

Token Expiry

At the moment, in this example, the JWT token expires after 30 minutes. After this, the user needs to log in again using their password and the one-time password. Depending on the type of application, this may not be very user friendly.

Text mining: NLTK suite for Python

Today we are going to take a quick look at the NLTK suite for Python.

We could use NLTK for situations where we need to handle human language.

Things like:

  • Customer complaints classification
  • Sentiment analysis
  • Chatbot development
  • Insurance claim description analysis
  • Scanning candidate CVs

In this post, we will start with a large chunk of text (taken from the NLTK Wikipedia page) and then clean it, split it into substrings and then plot the frequency of each word.

First off, we need to import the relevant libraries and packages that we will be using:

#import relevant libraries and packages

import re
import nltk
nltk.download('punkt')
from nltk.corpus import stopwords
nltk.download('stopwords')
from nltk import FreqDist
from nltk.tokenize import RegexpTokenizer

Next we need to create a new object, which is our text from Wikipedia.

#Text below is taken from the NLTK page on Wikipedia. 

my_text = """The Natural Language Toolkit, or more commonly NLTK,
is a suite of libraries and programs for symbolic and statistical
natural language processing (NLP) for English written in the Python
programming language. It was developed by Steven Bird and Edward Loper
in the Department of Computer and Information Science at the University
of Pennsylvania. NLTK includes graphical demonstrations and sample data.
It is accompanied by a book that explains the underlying concepts behind
the language processing tasks supported by the toolkit, plus a cookbook."""

I have assigned it to the variable name my_text.

Next we want to replace any newline notation with a space, so it won’t show \n. For this we use the re module, which enables us to use regular expressions. I have also made everything lowercase, as otherwise ‘Language’ and ‘language’ would be counted as two different words.

#substitute \n newline within 'my_text' with a space, then lowercase the result and assign it to the 'document' object
doc = re.sub('\n', ' ', my_text)
document = doc.lower()

We can use the nltk tokenizer to divide the text up into individual words or even sentences. For example, if I wanted to divide it into sentences I could do:

sentences = nltk.sent_tokenize(document)
print(sentences)

This returns a list containing one string per sentence.

We are going to divide the string into individual words so that we can plot the frequency of each word. First we want to remove stop words, otherwise our most common word is going to be something like ‘of’, which isn’t so helpful.

NLTK already has a list of stop words that we can use.

stop_words = stopwords.words('english')

In this next step, we are going to write a loop that keeps only those words that are not in the stop words variable. First, we need to use the tokenizer to divide our string into individual words. We are also going to remove any punctuation marks, otherwise these will also be classed as words.

tokenizer = RegexpTokenizer(r'\w+')
my_words = tokenizer.tokenize(document)

We are then going to create an empty list and call it my_words_ns. Then we have a loop that goes through each word from my_words and, for each one that is not found in the stop words, appends it to my_words_ns.

my_words_ns = []

for word in my_words:
    if word not in stop_words:
        my_words_ns.append(word)
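The same filtering can also be written as a list comprehension. The sketch below uses small hand-written stand-ins for my_words and the NLTK stop word list so that it runs on its own; the stop words are held in a set, which makes each membership check quicker than searching a list.

```python
# Small stand-ins for stopwords.words('english') and the tokenized text.
stop_words = {"the", "of", "is", "a", "and", "for", "in"}
my_words = ["the", "natural", "language", "toolkit", "is", "a",
            "suite", "of", "libraries"]

# Keep only the words that are not stop words.
my_words_ns = [word for word in my_words if word not in stop_words]
print(my_words_ns)  # ['natural', 'language', 'toolkit', 'suite', 'libraries']
```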

NLTK has its own frequency distribution function, which we can then use to plot the frequency of each word. Let’s apply it to our list of words.

freqDist = FreqDist(my_words_ns)

You can get the frequency of a specific word like this:

print(freqDist["language"])

Now let’s plot the top ten words:

my_plot = freqDist.plot(10)
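FreqDist is built on Python's collections.Counter, so the counting part can be tried without NLTK installed. The following sketch uses a made-up sentence and a two-word stop list purely for illustration; it covers the counting and the "top n" lookup, not the plot.

```python
import re
from collections import Counter

# Made-up sample text and stop words for illustration.
text = "natural language processing toolkit for the python language"
stop_words = {"for", "the"}

# Tokenize on word characters, lowercase, and drop the stop words.
words = [w for w in re.findall(r"\w+", text.lower()) if w not in stop_words]
counts = Counter(words)

print(counts["language"])     # 2
print(counts.most_common(1))  # [('language', 2)]
```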

And there you have it. This can then be built out in order to inform certain business processes. Next time, we will use things like gensim and Microsoft Cognitive Services to explore what we can achieve by harnessing the power of machine learning.

Event Hub Streaming Part 2: Reading from Event Hub using Python

In part two of our tutorial, we will read back the events that we streamed into our Event Hub in part 1. To read from a live stream, you will need to start the streaming code and make sure it sends more than ten messages (otherwise your stream will have stopped by the time you start reading :)). It will still work if the stream has already stopped, though.

So the code is pretty much along the same lines, same packages etc. Let’s take a look.

Import the libraries we need:

import os
import sys
import logging
import time
from azure.eventhub import EventHubClient, Receiver, Offset

Set the connection properties to Event Hub:

ADDRESS = "amqps://<namespace>.servicebus.windows.net/<eventhubname>"
USER = "<policy name>"
KEY = "<primary key>"
CONSUMER_GROUP = "$default"
OFFSET = Offset("-1")
PARTITION = "0"

This time I am using my listening USER instead of my sending USER policy.

Next we are going to take the events from the Event Hub and print each JSON transaction message. I will try to go through offsets in a bit more detail another time, but for now this will listen and return your events.

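total = 0
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
try:
    receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=5000, offset=OFFSET)
    client.run()
    start_time = time.time()
    batch = receiver.receive(timeout=5000)
    # Keep reading until a receive call returns an empty batch
    while batch:
        for event_data in batch:
            # Position of this event within the partition
            last_offset = event_data.offset
            last_sn = event_data.sequence_number
            print("Received: {}, {}".format(last_offset.value, last_sn))
            print(event_data.message)#body_as_str())
            total += 1
        batch = receiver.receive(timeout=5000)

    end_time = time.time()
    run_time = end_time - start_time
    print("Received {} messages in {} seconds".format(total, run_time))

except KeyboardInterrupt:
    pass
finally:
    # Always close the connection, even if reading was interrupted
    client.stop()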

And voila! You now know how to stream to and read from Azure Event Hub using Python 🙂

Let me know if you have any questions!

Event Hub Streaming Part 1: Stream into Event hub using Python

In this session we are going to look at how to stream data into event hub using Python.

We will be connecting to the blockchain.info websocket and streaming the transactions into an Azure Event Hub. This is a really fun use case that is easy to get up and running.

Prerequisites:

  • An Azure subscription
  • An Azure Event Hub
  • Python (Jupyter would work; I am using Databricks in this example)

You will need the following libraries installed on your Databricks cluster:

  • websocket-client (PyPI)
  • azure-eventhub (PyPI)

In this example, I am setting it to only stream in a few events, but you can change it to keep streaming or stream more events in.

First of all we need to import the various libraries we are going to be using.

import os
import sys
import logging
import time
from azure import eventhub
from azure.eventhub import EventHubClient, Receiver, Offset, EventData
from websocket import create_connection

Then we need to set the connection properties for our Event Hub:

ADDRESS = "amqps://<namespace>.servicebus.windows.net/<eventhubname>"
USER = "<policy name>"
KEY = "<primary key>"
CONSUMER_GROUP = "$default"
OFFSET = Offset("-1")
PARTITION = "0"

The user is the policy name, which you set for your event hub under the ‘shared access policies’ area. I usually create one policy for sending and one for listening.

I will go into more detail on the offset and partitioning another time. For now, don’t worry about these, just add the values above.

Next we need to connect to the blockchain.info websocket. We send it the message that starts the stream.

ws = create_connection("wss://ws.blockchain.info/inv")
ws.send('{"op":"unconfirmed_sub"}')

Note that this code only forwards twelve messages (one for each value of i from 0 to 11, as the loop breaks once i is greater than 10), but you can change the condition to i > 100 (or more), or even remove that part and just keep going.

try:
    if not ADDRESS:
        raise ValueError("No EventHubs URL supplied.")
 
    # Create Event Hubs client
    client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
    sender = client.add_sender(partition="0")
    client.run()
    
    i = 0
    
    start_time = time.time()
    try:
        while True:
            sender.send(EventData(ws.recv()))
            print(i)
            if i > 10:
                break
            i = i + 1
    except:
        raise
    finally:
        end_time = time.time()
        client.stop()
        run_time = end_time - start_time

except KeyboardInterrupt:
    pass
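If you want the number of messages to be explicit, the forwarding loop could be written with a limit parameter instead of a counter and a break. This is only a sketch of the control flow: forward_messages is a made-up helper, and the receive and send arguments are stand-ins for ws.recv and sender.send so that it runs without a network connection.

```python
def forward_messages(receive, send, limit):
    """Forward exactly `limit` messages from receive() to send(message)."""
    sent = 0
    while sent < limit:
        send(receive())
        sent += 1
    return sent


# Stand-ins for ws.recv and sender.send, with made-up message payloads.
incoming = iter('{"n": %d}' % n for n in range(100))
outbox = []

count = forward_messages(lambda: next(incoming), outbox.append, limit=12)
print(count)  # 12
```

With the real objects, the call would look something like forward_messages(ws.recv, lambda m: sender.send(EventData(m)), limit=12).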

In Part 2, we look at how to read these events back from the Event Hub.