data mesh

The business case for a Data Mesh: Part 1 – Domain ownership

Photo by JJ Ying on Unsplash

And the first thing on my mind is kind of where we left off…

Over the last year, I’ve been to a few talks on data mesh and the area I have found them lacking in is putting specifics against the benefits. A few have mentioned the value being subjective, but when you are making a financial case for allocating resource and releasing cash from the top cats, they don’t warm easily to that logic.


There are a lot of words used: outcome-driven, scalable, control, agility, efficiency, data potential, data-driven value.

I agree with all of the above but wanted to explore some specifics on where the value can/tends to drop out and how some of it may materialise.

Hold on. What is data mesh?

For those who may be new to this concept: data mesh is a socio-technical approach to managing and sharing data. It’s particularly suited to:

  • Analytical workflows – training models, building dashboards, predictive analytics, historical analysis
  • Big Data – lots of data, lots of pipelines
  • Scaling complexity – data coming from various sources, varying uses

Zhamak Dehghani has mentioned the objectives of data mesh being:

  • Responding gracefully to change
  • Sustaining agility during growth
  • Getting value from data

If you want to know more about it then check out this article or this website.

This will be a five-part series and, for the purposes of keeping it organised, I’ve split it into (at least) four parts based on the principles of data mesh.

Let’s just recap them:

  • Domain Ownership
  • Data as a product
  • Self-serve data platform
  • Data governed wherever it is

Domain Ownership

For this article let’s start with the first – Data democratisation: individual domains owning the data they produce and leveraging their SME (or domain) knowledge to improve data quality.

This means that rather than data being owned centrally, it is owned by the product domains. These domains decide how their data is managed and drive the product roadmap.

The benefits that I will discuss in relation to this are:

  • Removing bottlenecks that come from the inability to scale out people
  • Improved data quality
  • Clarifying data owners and empowering teams to curate and innovate at pace
  • Empowering organisational structures

Removal of bottlenecks

When people talk about data mesh, the main bottleneck commonly illustrated is that of a central data team. Most companies who have a centralised data model tend to hold their data and technical resource in one place. These teams are working on a backlog that encompasses work for every (or most) domains within the business, commonly dealing with data requests, changes to data, data monitoring and working on enabling new data sources.

I’d just like to say that there are some great relationships that are built between central data teams and the business and I think it’s unfair to dismiss it as a completely dysfunctional set-up. However, it’s unreasonable to expect a technical data expert to understand the inner workings, data suitability, strategic and product roadmaps of every domain within a company. Rather than offering a service, they should offer their skills to the domain teams. A data engineer, for example, could be working across a few domain groups, allowing more space for understanding the context and offering bespoke solutions.

The volume of business data is increasing, as is the number of digital initiatives as companies try to constantly adapt to changing conditions. The central data team’s remit encompasses more than ever.

The central data team fall in the murky middle of what this article from ThoughtWorks defines as data producers (the people who produce the data and are experts on it) and data consumers (the people who want to use the data). This has implications for how invested people are in bringing together the facets that need to exist for data-led initiatives to succeed.

When a domain team’s remit is entirely operational, they do not need to concern themselves with a data product roadmap or with how their processes affect downstream value possibilities. This can cause bottlenecks in terms of scalability and a real lack of connection and alignment with the business purpose.

Let’s imagine a logistics team within an FMCG (fast-moving consumer goods) company, who interact with geospatial data on a day-to-day basis. They are tracking vehicles in their fleet to optimise routing and troubleshoot any issues that arise en route. The company have a central data team who provide the infrastructure for the collection, transformation and availability of this data for use.

Now a new technology like h3 comes along. It offers faster geospatial analysis and allows them to speed up optimisation queries. The implication is that they could make more deliveries and reduce costs.

In a traditional central-data approach they may not be able to migrate to this new indexing system, perhaps due to issues with supporting it, or because the central team simply don’t have time. The central team also have a limited understanding of geospatial data and the methodologies for working with it. This becomes a bottleneck to opening up new cost-saving opportunities for the business.

However, when the logistics team own the data as a domain they can decide for themselves to use h3. Provided they do not break any functionality for other users around the business, they could swap technology (a lot faster than waiting for the central data team). They already have the inside knowledge because they deal with geospatial data day-in day-out. The deep knowledge of the domain team is leveraged much faster. They can optimise more routes and get their deliveries done quicker and more efficiently.
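As a flavour of what that inside knowledge looks like in practice, here is a hedged sketch (assuming the h3-py package with its v3-style API, and made-up coordinates) of bucketing vehicle pings into H3 cells so that route aggregations become simple group-bys rather than expensive point-in-polygon joins:

from collections import Counter

import h3  # assumes: pip install h3 (v3-style API)

# Hypothetical vehicle pings: (latitude, longitude)
pings = [(51.5074, -0.1278), (51.5080, -0.1290), (53.4808, -2.2426)]

resolution = 8  # roughly neighbourhood-sized hexagons; tune per use case
cells = [h3.geo_to_h3(lat, lng, resolution) for lat, lng in pings]

# Count pings per cell – a cheap proxy for traffic density along routes
density = Counter(cells)
print(density)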

This data, now made available to the rest of the business, can also be used by other teams like fleet management. They use it to understand changing routes and how this impacts the fleet configuration they need.

Improved Data Quality

A Gartner survey revealed that poor data quality costs businesses an average of $15m per year. Central technology teams lack the domain knowledge needed to assess the quality or suitability of the data.

Machine learning operations is probably going to be a big driver towards a data mesh type architecture. Without good data it’s really hard to make this work well; as they say, trash in, trash out. Understanding whether biases may be present in the data and having human-in-the-loop workflows both lend themselves to a different organisational structure for ensuring data quality. KPMG have previously published research showing that up to sixty-five percent of senior execs do not trust the data being used to drive analytics and AI within their organisation.

Research from McKinsey suggests that, on average, 30% of an employee’s time is spent on non-value-add tasks because of poor data quality and availability.

Let’s take a company that makes industrial machinery fitted with sensors. The sensors collect data on usage patterns, time on, time off, temperatures, vibrations and so on.

The quality control team use this data to assess the health of the machines and to try to detect when breakages might occur. One of the analysts in the team finds a really neat way to filter out outliers caused by faulty sensors.
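Purely to give a flavour of the kind of logic the analyst might write, here is a hedged sketch using pandas; the column name and threshold are hypothetical:

import pandas as pd

def filter_faulty_readings(df: pd.DataFrame, column: str = "temperature", k: float = 3.0) -> pd.DataFrame:
    # Drop readings that sit more than k standard deviations from the mean,
    # a simple stand-in for whatever cleverer rule the analyst devised.
    mean, std = df[column].mean(), df[column].std()
    return df[(df[column] - mean).abs() <= k * std]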

However, the central data team can’t easily implement this outlier-filtering logic, either because they don’t have time, lack the skills or their tools don’t allow it.

So now the quality team are able to do it, but it can’t be made available to the rest of the business (not without significant work for the central data team). As a result they become a silo whose data, because of their filtering method, is more accurate but crucially different to what the rest of the business has.

The service team use this data to schedule service maintenance, and twenty percent of engineer call-outs are due to incorrect data. The company have a team of twelve service engineers, each logging around 1,000 call-out hours a year – roughly 12,000 hours in total, of which around 2,400 are driven by bad data. If the service team could also quickly benefit from the new outlier logic, they could halve those call-outs, saving around 1,200 call-out hours annually and a large amount of money. Money could also be saved on parts that would otherwise be replaced unnecessarily.

Clarity on data ownership

When it’s clear who owns the data and where it is, access to that data should be quicker. And if that is the case, it should also speed up a business’s ability to bring new products and features to market. Finding out who owns data in a business can be an adventure on its own.


Allowing different teams in the business to access data more quickly, and from different parts of the business, can reduce complex ETL (Extract, Transform, Load) processes and the time normally required from the central data team.

Someone in the business wanting to do some sentiment analysis on social media can get up and running with data from somewhere like Twitter almost immediately. If I want internal data on customer feedback about some of our services, it could take a whole lot longer: find the owner, make the business case, make a request to data services, wait for data services. These types of activities are most often initiated by the data science or analytics teams rather than the domains themselves.

This brings us neatly on to the subject of empowerment.

An empowering organisational structure for data

Conway’s law says that the structure of your organisation puts inherent structure on what you produce. Perhaps even more so with data nowadays.

“Organisations who design systems are constrained to produce designs which are copies of the communication structures of these organisations”

Casey Muratori talks about Conway’s law in this video (not about data mesh!) and it’s pretty eye-opening when you can spot organisational structure in software architectures. Just imagine the possibilities when the organisational structure is enabling rather than disabling in regards to what is possible with data.

Data products are currently being created by data teams, not product teams. By moving the data owners into domains, we will create very different products.

Technology doesn’t solve problems. Let’s just put that out there. Zhamak said something really poignant in her talk at Big Data London which was:

“Technology should disappear into the background”

It’s one of the reasons why I like that it’s referred to as a ‘socio-technical’ approach, which hints at what research suggests: that technology is only around twenty percent of a digital initiative’s success. The current structure of organisations does not always lend itself to collaborating with data across domains. With many companies still trying to centralise their data, many initiatives simply lack the people and process side of a strategic approach to getting value from data.

We are all in recovery from long and unfruitful campaigns towards a single source of truth and, truth is, most master data management initiatives tend to fail (around ninety percent according to Gartner!). Forty-five percent of businesses are unable to locate their master data effectively. These initiatives also tend to need huge investment and simply don’t deliver on the promised value.

I think the business case here is really compelling when we know that between seventy-five and ninety percent of digital initiatives fail. Most of the failure comes from lack of alignment, people and change. Data mesh directly addresses a number of these, and even a small percentage of improvement could improve ROI by a significant amount. In this Deloitte report, they note that data mastery, composed of democratisation of data and self-serve insights as well as data products, has the strongest impact on business outcomes.

Data mesh can be implemented stepwise and doesn’t require a single architecture that satisfies all data use cases across the entire business. It doesn’t cling to this elusive single source of truth but rather empowers domains to care about their data and to understand and innovate on the ways it can be used to achieve business outcomes.

Domains demand data mesh

The possibilities for innovation with data increase hugely when domain data teams are invested in, excited by the potential of, and the subject matter experts on their data assets, products and roadmap.

Do you think we can get to a point when domains are asking the business to implement a data mesh rather than it being a technology initiative first?

In the next instalment of this series, I will be discussing the value of having ‘data as a product’. However, I’m hoping to post some other cool stuff that isn’t data-mesh related in between.

cloud native, event driven

3 Advantages of the Data Mesh: Distributed Data for the Microservice World.

Photo by Ricardo Gomez Angel on Unsplash

Many companies have adopted a centralised data architecture, typified by large multi-domain monolithic data stores and a central data team. In contrast, the data mesh, described by Zhamak Dehghani, is decentralised, domain-specific and used by decentralised teams. It builds on the already established world of microservice architectures and domain-driven design.

Domain-driven design is a topic in its own right but encompasses the idea that the domain and the business logic within that domain should be the main driving force behind software design. The same can be applied to data design, and the data mesh is one approach.

The data mesh architecture proposes modelling the data architecture using the same principles: by domain. The data processes are all managed within the relevant business domain. Teams are also domain-focused and often multifunctional in their skillset.

In the blog post How to Move Beyond a Monolithic Data Lake to a Distributed Data Mesh by Zhamak Dehghani, she mentions a shift in focus towards:

  • serving over ingesting
  • discovering and using over extracting and loading
  • publishing events as streams over flowing data around via centralized pipelines
  • ecosystem of data products over centralized data platform

From a microservice-architecture perspective, rather than being specifically about decentralised infrastructure, this is actually more about decentralisation of the knowledge, skills and ownership of the data products that make up the business. While it could be that both the infrastructure and the teams are decentralised, the emphasis is on decentralised business domains.

Monolithic Team, Monolithic Solution

The way we have designed ourselves traditionally (especially around data) encourages monolithic solutions. One team to rule all the data. This team manages all of the data in the business. These set-ups are often painful as there is a lack of understanding between the domain SMEs (subject matter experts) and the data SMEs. The business needs new datasets, systems, insights and models, and as the business grows, so do the demands on the central data team.

Eventually, the central data team can become a bottleneck as more and more sources and users of data are scheduled to be added. This can make things move very slowly. It is an anti-pattern for development, both for delivering products and for personal up-skilling. It leads to neither side feeling very satisfied and is impossible to scale based on demand. It’s kind of okay for everyone but great for no one.

The data mesh nudges us towards teams that are leaner, multi-functional, agile and empowered. It also focuses on a business structure that is domain-driven rather than function-driven. To ensure fresh ideas and continuous learning, technical roles like engineers and developers could regularly switch domains to keep knowledge and learning circulating. For a small company, this concept is still possible: you might have a pool of technical resource that can move around the domain teams. The data science team, for example, could work as a kind of consultancy, moving around the domains helping them with data science projects. IT specialists could also operate in a consultancy-type manner.

Data as a Domain Product

In the data mesh architecture, data is a product of the domain. It can be seen as something that is owned by the domain and can be subscribed to by other parts of the business (access permitting). Domain-Driven Design should be at the forefront of this approach, looking at the different domains that you have. There are a number of methods that a business can adopt to figure out the domain events that exist and to which domain an event belongs. One such method is event-storming, which we can perhaps discuss in more detail another time.

The idea is that teams have the data they want when they want it. The data could be served either via APIs or as events. API security and role access could be defined by the governance and security layer, which sits across all of the domains.

Data as events

Using an event-driven approach would enable data to be produced and consumed by decoupled domains. The teams wouldn’t need to worry about how other business domains want the data shaped or transformed; they are merely the producers. Domains can consume the data available and perform operations as necessary for their own specific needs. If you want to learn more about event-driven architectures, I’ve written about them before here and here.
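The post doesn’t prescribe a particular broker or library, but purely as an illustration, here is a sketch of a domain publishing an event that any other domain could subscribe to, using Apache Kafka via the confluent-kafka package (the topic name, event fields and broker address are all made up):

import json

from confluent_kafka import Producer  # assumes: pip install confluent-kafka

producer = Producer({"bootstrap.servers": "localhost:9092"})  # assumed broker address

event = {
    "event_type": "delivery_completed",  # hypothetical domain event
    "vehicle_id": "VAN-042",
    "route_id": "R-17",
}

# The producing domain doesn't know (or care) who consumes this topic downstream.
producer.produce("logistics.deliveries", value=json.dumps(event).encode("utf-8"))
producer.flush()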

Advantages of the Data Mesh

There are some great advantages that I see when it comes to adopting a data mesh approach. Here are three:

Teams can choose their own technology and skills – In the microservice approach, each team is responsible for their own datastore (including the design, technology and operations). They are like their own little company, making their own decisions and delivering their ‘products’ to the rest of the business. This is in a way more realistic: each ‘domain’ has different needs (both technical and non-technical). One might use a SQL database, another wants a NoSQL setup or a document store. One team is great at Python and another wants to use something else.

Decoupling services (or domain data) is empowering for the teams – They are now free to make their own decisions on how to collect, organise, store and use data (not only their own but any other sources to which they are subscribed).

Innovation happens faster – The decoupling that happens as a result of the data mesh is good for each of the domains. They can simply subscribe to events from a domain of their choosing. This means a reduction in time spent requesting and waiting for data (perhaps due to another team’s current workload). Being able to just plug in to an API or event stream and start exploring will give rise to new opportunities for innovation.

Summary

Overall, I think the data mesh is a really interesting concept that aligns well with the current shift towards cloud-native ecosystems. However, there are a lot of considerations when moving towards this kind of approach. This is just an introduction and there are a number of elements that I have not discussed such as security, data quality and governance. I hope to address some of these in a future post and talk a lot more about decentralised data.

python, Uncategorized

FastAPI Security with a User Database

Header photo by fabio on Unsplash

In the last post we looked at how to enable two-factor authentication with FastAPI using pyotp. We also looked at some areas where we might improve on the v1 implementation. In this post we will be updating the implementation to include:

  • A backend user database (in this case SQLite).
  • Functionality to add a new user.
  • Functionality for the user to update their data.

This is based on the FastAPI docs for SQL (Relational) Databases.

As part of this application, we will be using the SQLAlchemy ORM (Object Relational Mapping) toolkit to interact with a backend SQL database. SQLAlchemy helps to achieve a consistent relationship between the object representation in Python and data storage in SQL. Using SQLAlchemy, we can manipulate the objects directly in Python which are then reflected in the SQL database. To install SQLAlchemy you can run pip install SQLAlchemy from the command line.

Clone the Repo

The code for this tutorial can all be found in the v2 directory in the github repo. You will see some other .py files have been added:

  • database.py
  • models.py
  • schemas.py
  • crud.py
  • security.py

The database.py is taken from the tutorial example in the FastAPI docs and I have kept it almost exactly the same aside from changing the SQLALCHEMY_DATABASE_URL.
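For reference, here is a minimal sketch of what that file looks like when following the FastAPI docs; the version in the repo may differ slightly beyond the URL change:

# database.py - sketch in the spirit of the FastAPI SQL tutorial
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./twofactor_app.db"

# check_same_thread=False is only needed for SQLite, as per the FastAPI docs
engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()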

The models.py file describes the database structure, the tables and (if any) the relationships between the tables. In here we will declare the table name and columns that are in the user table along with their type and any default values.
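As a hedged sketch, a User model covering the columns used later in this post might look something like the following; the table name, column types, defaults and import path are assumptions and may differ from the repo:

# models.py - sketch only
from sqlalchemy import Boolean, Column, Integer, String

from database import Base  # or "from .database import Base" depending on project layout

class User(Base):
    __tablename__ = "users"  # assumed table name

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True)
    email = Column(String, unique=True, index=True)
    full_name = Column(String)
    hashed_password = Column(String)
    otp_secret = Column(String)
    disabled = Column(Boolean, default=False)
    role = Column(String, default="user")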

Next, the schemas that were originally in main.py have now been moved into their own schemas.py file. The User schemas have been separated by read, write, read/write and update. There are currently two role types that can be used: admin and user. You can add others if you wish, for example “moderator”. A rough sketch of how those schemas might be split is shown below.
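This is only a sketch of the idea; the exact class names and fields in the repo may differ:

# schemas.py - sketch only
from typing import Optional

from pydantic import BaseModel

class UserBase(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None

class UserCreate(UserBase):
    password: str       # plaintext here; hashed before storage
    role: str = "user"  # "admin" or "user" (add others if you wish)

class UserUpdate(BaseModel):
    email: Optional[str] = None
    full_name: Optional[str] = None
    password: Optional[str] = None

class User(UserBase):
    disabled: Optional[bool] = None
    role: str = "user"

    class Config:
        orm_mode = True  # lets FastAPI read data straight from ORM objects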

The crud.py file contains the helper functions used to query the database. CRUD stands for Create, Read, Update, Delete. Keeping these functions together in a separate file prevents code duplication and simplifies maintenance. For this example, there is create, read and update but, at this point, no delete. In main.py we have the functions that determine the endpoints of the API, which can utilise the functions defined in crud.py.

To run the code, type uvicorn main:app --reload and open the docs page.

Connect to the SQLite database

When you run the code, you should see a .db file is created, in this case it is called twofactor_app.db. This is the file that contains the data in the SQLite database. To interact with and view this data you can install the SQLite browser, which is freely available here.

Once installed, you can click on Open Database and then select the .db file from the directory. Initially, you will see a database with no records.

Creating new users

In the last tutorial, there was no way to create new users, aside from hardcoding them into the main.py script. The code has now been extended to enable new users to be added by posting to the /users endpoint.

In the crud.py file you will see a new function, which enables the details for a new user to be added. The password entered will be hashed and a random otp_secret (which we used to generate the QR code in the previous post) is generated as part of the code. Once a change has been implemented, we also need to commit changes using the db.commit() function.

#crud.py
from sqlalchemy.orm import Session
from passlib.context import CryptContext
import pyotp

import models, schemas  # or "from . import models, schemas" depending on project layout

# pwd_context may live in security.py in the repo; shown here so the snippet is self-contained
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def create_user(db: Session, user: schemas.UserCreate):
    # Hash the plaintext password and generate a random OTP secret for the new user
    hashed_password = pwd_context.hash(user.password)
    db_user = models.User(
        email=user.email,
        username=user.username,
        full_name=user.full_name,
        hashed_password=hashed_password,
        otp_secret=pyotp.random_base32(),
        role=user.role
    )
    db.add(db_user)
    db.commit()          # persist the new row
    db.refresh(db_user)  # reload to pick up DB-generated fields (e.g. id)
    return db_user

It has been set up so that only an admin can create a new user. As there are no current users, we will need to manually add an initial user, which I will explain how to do.

In the main.py code, you will see that there is a new function, get_current_active_admin_user, which returns a permissions error if the logged-in user is not an admin. This dependency is shown in the code for creating a new user in main.py:

@app.post("/users/", response_model=schemas.User)
def create_new_user(
    user: schemas.UserCreate,
    db: Session = Depends(get_db),
    current_user: schemas.User = Depends(get_current_active_admin_user)):
    db_user = crud.create_user(db, user)
    return db_user

To add an initial user to the database, use the SQLite browser tool. To add a new record, click on the add a new record symbol in the menu as shown in the following screenshot:

Input the following values into the columns:

{
  "username": "johndoe",
  "email": "johndoe@example.com",
  "full_name": "John Doe",
  "hashed_password": "$2b$12$rMFOsKHq6qaX6bbPB0pb6.ymbwF63soIe19af9qd.1Q8PhbCVfXSO", //this password has already been hashed
  "otp_secret": "LGLEREYEPVVWTLYO", 
  "disabled": 0,
  "role": "admin"
}

Click on “Write Changes” to commit the changes.

Click on the refresh symbol and you will see the new user has been added.

You will now be able to log in in the same way as shown in the previous post using two-factor authentication. Once logged in as an admin user you will be able to create new users using a POST request to the /users endpoint.

To try out creating a new user, click on the POST /users create new user endpoint on the FastAPI application docs page. Click on “Try it out” and then input the following example request body. Click on the “Execute” button.

{
  "username": "alicewonderland",
  "email": "alicewonderland@example.com",
  "full_name": "Alice Wonderland",
  "password": "whiterabbit"
}

User Operations

The v2 directory code now includes the ability for a user to:

  • read their own user information or that of another user (if admin).
  • update their own data

The following endpoints (other than create new user, which was mentioned in the previous section) are now available to use:

GET /users/me – dependent on a user being logged in. If this dependency is fulfilled, this request will return the user details in the response body.

PUT /users/me – dependent on a user being logged in. If this dependency is fulfilled, user can update their own user data and these changes will be updated in the SQLite user database.

GET /users/{user_id} – dependent on a user being logged in and admin. If this dependency is fulfilled, admin users can get the user data for a user based on their user_id.
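To give a feel for how these endpoints might be wired up in main.py, here is a hedged sketch of a fragment; the dependency names (get_db, get_current_active_user, get_current_active_admin_user) and the crud.get_user helper are assumptions based on the FastAPI tutorials this series follows, and the real code lives in the repo:

# main.py (fragment) - assumes app, get_db and the auth dependencies are defined earlier in the file
from fastapi import Depends, HTTPException
from sqlalchemy.orm import Session

import crud, schemas

@app.get("/users/me", response_model=schemas.User)
def read_users_me(current_user: schemas.User = Depends(get_current_active_user)):
    # Returns the details of whoever is currently logged in
    return current_user

@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: schemas.User = Depends(get_current_active_admin_user),
):
    # Admin-only: look a user up by id
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user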

In the next instalment, I will cover how we can make some more improvements, so stay tuned! If there is something specific you are interested in learning about FastAPI, feel free to leave it in the comments.

python

Getting started with Two-Factor Authentication in FastAPI

Header photo by Markus Spiske on Unsplash

DISCLAIMER: This tutorial is not a production-ready implementation. It is an introduction to implementing two-factor authentication in FastAPI. Some issues are highlighted at the bottom of this article, some of which we will look into in future instalments. Any application utilizing personal and/or sensitive information should be properly audited and penetration tested.

I’ve been using FastAPI for a project and, whilst looking at its security module, decided to write an article on how to set up two-factor (or multi-factor) authentication.

FastAPI is a Python package for easily creating REST API endpoints. Many of the tools you need to implement security are already included in the package.

Clone the repo for this tutorial here. The main.py code is originally from the FastAPI security tutorial.

Pre-requisites:

  • Python 3
  • Google Authenticator app (or compatible other) installed on your phone.
  • Clone this github repo that contains the code for this tutorial.

Step 1: Create and activate virtual environment and install FastAPI.

I am starting with the code from the FastAPI security tutorial docs.

Install FastAPI and the required packages:

pip install fastapi[all]
pip install python-jose[cryptography]
pip install passlib[bcrypt]

cd into the v0 directory of the github repo and the run the following command:

uvicorn --reload main:app

You should see the FastAPI application running at the specified (by default http://127.0.0.1:8000/docs) address.

Click on ‘Authorise’ in the top right. Enter the credentials that are in the code:

username: johndoe
password: secret

You can now try to make a GET request on the /users/me endpoint. You will see the following details for this user as the response:

{
  "username": "johndoe",
  "email": "johndoe@example.com",
  "full_name": "John Doe",
  "disabled": false
}

Step 2: Generating One-Time passwords with PyOTP

To enable the use of a one-time password, we are going to be using the PyOTP library. First install the library using the following command:

pip install pyotp

First, generate a pyotp secret key. This will give a random string with base 32 encoding, which is used to generate the one-time passcodes. You can do the following:

>>> import pyotp 
>>> pyotp.random_base32()
'LGLEREYEPVVWTLYO'

We can now generate a uri that can be used to create a QR code to allow the user to set up their authenticator app with the following code:

>>> pyotp.totp.TOTP('LGLEREYEPVVWTLYO').provisioning_uri(
name='johndoe@example.com', issuer_name='Secure App')
'otpauth://totp/Secure%20App:johndoe%40example.com?secret=LGLEREYEPVVWTLYO&issuer=Secure%20App'

You can use the Qrious codepen example to generate a QR code using the uri we just generated.
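If you would rather generate the QR code locally instead of using the codepen, here is a small sketch using the third-party qrcode package (an assumption on my part; install it with pip install qrcode[pil]):

import pyotp
import qrcode

uri = pyotp.totp.TOTP("LGLEREYEPVVWTLYO").provisioning_uri(
    name="johndoe@example.com", issuer_name="Secure App"
)
# Writes a PNG you can scan with your authenticator app
qrcode.make(uri).save("totp_qr.png")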

Scan the QR code with your authenticator app. You should now be able to see a one-time password that is generated and renewed every thirty seconds.

In the Python shell you can also get the current one-time password by running the following commands:

>>> totp = pyotp.TOTP("LGLEREYEPVVWTLYO")
>>> print("Current OTP:", totp.now())
Current OTP: 654244

Note, you will need to run the totp.now() command within the same 30-second window. If this doesn’t work, ensure there are no typos and check that the date and time zone settings match on both the phone you are using and the machine running the code.

Step 3: Integrate PyOTP with FastAPI

So that each user can eventually have their own OTP secret, we need to add a new field to the fake user database for "otp_secret". As an example, for user John Doe, we will use the secret key we generated previously.

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
        "otp_secret": "LGLEREYEPVVWTLYO"
    }
}

Also, add this field to the UserInDB class like so:

class UserInDB(User):
    hashed_password: str
    otp_secret: str

The easiest implementation of the OTP into the existing authentication workflow is to assume that the user will append their one-time password to their password. Thus, we will now change the code to check for both the correct password and currently valid one-time password.

def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    # Everything except the last six characters is the user's password
    if not verify_password(password[:-6], user.hashed_password):
        return False
    # The last six characters should be the currently valid one-time password
    totp = pyotp.TOTP(user.otp_secret)
    if not totp.verify(password[-6:]):
        return False
    return user

Now when you execute this v1 version of the main.py code, you can still authenticate at the FastAPI docs page. However, now the authentication is based on the user entering both the password (secret) + one-time password.

Further considerations

Here are some further considerations for a more production-like implementation, which we will look at in the next posts:

Remove passwords and secrets from source code

At the moment, the passwords and secrets are hard coded into the example. Ideally, we would want to remove these from the source code and in the next post I will go over an example of how to do this.

Upgrade the fake database

The database is currently hardcoded into the main.py script. For a more realistic implementation, we will use a simple database, which I also cover in the next post.

Mechanism for user administration

Currently there is only a single user and adding more users would require changing the source code, which isn’t what we want. For a more realistic scenario, we need a way to add/remove/change users.

Token Expiry

At the moment, in this example, the JWT expires after 30 minutes. After this, the user needs to log in again using their password and the one-time password. Depending on the type of application, this may not be very user friendly.

raspberry pi

Pairing BBC Microbit with a Raspberry Pi

For a while I have wanted to experiment with sensor events and I recently had a day off so (rather than continuing my re-exploration of the wonderful LOTR trilogy… the book, not the movie) I decided to finally get all the electronics out and give it a whirl.

I have a lot of microbits in the house from running coding clubs so I figured I would use one as a sensor. I also had a Pi Zero that I hadn’t really used so thought I would use it as the machine to which I send the sensor data.

So this is part one of what will be a series where we can explore the possibilities. For this demo, I am going to go over how I got set up getting the microbit to communicate with the Raspberry Pi.

Best friends forever… paired in perfect harmony

It took a lot longer than I thought :/ I had a few issues along the way (many due to my own errors :p). One thing I noticed is that my microbit was a bit sensitive and kept disconnecting from the power source every few minutes; I tried another, which seemed much more stable.

Prerequisites

In order to run this tutorial, you will need the following:

  • A microbit
  • A Raspberry Pi (I used the Pi Zero) with the following installed:
    • Bluezero – I used the latest install 3.0 (I originally had 2.0 but had issues getting this working so reinstalled to the newest at the time of writing this).
      • sudo pip3 install bluezero

Prepare the microbit

Here is a link to the microbit code I used. I started with a set up from one of the issues I found in the python-bluezero github repo. However, I found that with that setup, though the code ran, the microbit kept throwing a 020 error, which seems to relate to memory issues. I thus removed the while loop and now it works without the error.

Let’s go over what it does:

  1. On start it will look for the temperature and uart services. It will also show the bluetooth symbol in the LED matrix.
  2. When the microbit is connected to the Pi it will set the connected variable to "true". It will show a smiley face on the LED matrix 🙂 It will also set uart to read up to the hashtag symbol. The bluetooth UART service is started and it can read data received from the Pi. It will terminate reading when it gets to the ‘#’ symbol. More information about this service can be found here. It will display the data as a string on the microbit led matrix.
  3. When the microbit is disconnected from the Pi it will set the connected variable to "false". It will show a sad face 😦

Download the hex file and load it onto the microbit by dragging it into the microbit drive.

Connect to the Pi

Ensure your Raspberry Pi is connected to a power source and that you know the ip address.

ssh <username>@<ipaddress>

Enter the username and password set for your Raspberry Pi.

Pairing the Raspberry Pi with the Microbit

Enter bluetoothctl by typing bluetoothctl. First we will scan to see which bluetooth devices are available.

  1. Find the microbit

First, your Raspberry Pi needs to find the microbit. To do this run the following command:

scan on

This will start scanning for any bluetooth devices and you will see them appear. The microbit one will look something like:

[NEW] Device A1:B2:C3:D4:E5:FF BBC micro:bit [a name]

Once it appears, type scan off to stop the scanning.

2. Pair with the microbit

To pair the microbit and the Raspberry Pi, you can run

pair <device_address>

So in the example above it would be:

pair A1:B2:C3:D4:E5:FF

When you run this pair command, hold down the A+B buttons on the microbit and press the reset button on the back. You will see a bluetooth symbol, then you can release them.

At first this did not work for me but I changed the pairing settings in MakeCode project settings (as mentioned above) and then it worked.

You can check if it is paired by making sure it is listed when you run the following command:

paired-devices

Clone the bluezero repo

This tutorial uses VS Code to build our Python code. Sadly, it is currently not possible to have full VS Code support on the Raspberry Pi Zero. More details can be found in this github issue.

Instead, you can use the SSH FS extension for VS Code. It won’t give you any debugging functionality, but you can navigate the folders of your Pi, make new files, write code etc. Then add the Raspberry Pi to the SSH FS extension by creating a new ssh configuration.

I started out by cloning the python-bluezero repo and using this file as the start point. I’ve pretty much kept it the same for now so I can run over what is happening.

Additionally, I added the ability to get the temperature from the microbit’s inbuilt temperature sensor. This is going to form some basis for our sensor data for this project.

Through doing this, I found out the temperature sensor data is very boring :p so I am planning on swapping it out next time for accelerometer data instead.

Using the microbit_uart.py code as a base

The first thing the Python script does is import microbit tools from the bluezero package. It imports microbit and async_tools.

The code then calls the microbit function and sets up some variable values, which are explained below:

  • adapter address: bluetooth controller on raspberry pi. You can find this by running list controller from the bluetoothctl.
  • device address: microbit address. You can find this by running paired-devices from the bluetoothctl.
  • It then defines which services are enabled and disabled. E.g
    • temperature_service=True because we will be using this to send temperature from the microbit to the Raspberry Pi.
    • uart_service=True because the example script we are using has a need for the uart service.
    • You can also enable others. You will also need to add these into your microbit code by dragging in the relevant blocks (E.g bluetooth led service if you wanted to use the led display as an event input or bluetooth accelerometer service if you wanted to look at rotation of the microbit as an event)

There are two functions already defined; ping and goodbye.

The ping function will transmit a message to the microbit from the Raspberry Pi. The microbit will read the message up until the hashtag (as defined in our microbit code). The message is “ping”. The microbit will display this message via the LEDs.

def ping():
    ubit.uart = 'ping#'
    return True

This works through UART (Universal Asynchronous Receiver/Transmitter) over bluetooth. It is used for communication across serial ports that is (as the name suggests) asynchronous. The code that makes this all work is here.

The goodbye function disconnects the microbit from the Raspberry Pi and quits the asynchronous event loop. The EventLoop class is defined here.

We are now going to need to add an additional function and some extra lines of code so we can get the temperature reading from the microbit.

Getting temperature data from the microbit

To enable getting a temperature reading from the microbit, first ensure the temperature service is set to true for the microbit:

temperature_service=True

Then add the following function within the code:

def temperature():
    print('Temperature:', ubit.temperature)

Then add the following:

for i in range(3):
    eloop.add_timer(i*10000, temperature)

Finally, we need to change the event loop time input where we call the goodbye function to 50000 microseconds (by which time all of the other functions have run).

The code should now look like this:
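Since the original post embeds the full script as an image, here is a rough sketch assembled from the pieces described above; the addresses, constructor parameters and exact method names are assumptions based on the python-bluezero microbit example, so treat it as a guide rather than a drop-in file:

from bluezero import microbit, async_tools

# Addresses are placeholders: use "list controller" and "paired-devices" in bluetoothctl
ubit = microbit.Microbit(
    adapter_addr='B8:27:EB:XX:XX:XX',   # Raspberry Pi bluetooth controller (assumed parameter name)
    device_addr='A1:B2:C3:D4:E5:FF',    # microbit address (assumed parameter name)
    temperature_service=True,
    uart_service=True,
)

eloop = async_tools.EventLoop()

def ping():
    ubit.uart = 'ping#'   # microbit reads up to '#' and displays "ping"
    return True

def temperature():
    print('Temperature:', ubit.temperature)

def goodbye():
    # Disconnect from the microbit and quit the event loop (method names assumed)
    ubit.disconnect()
    eloop.quit()

ubit.connect()
eloop.add_timer(30000, ping)            # timing is illustrative
for i in range(3):
    eloop.add_timer(i*10000, temperature)
eloop.add_timer(50000, goodbye)         # run after the other timers have fired
eloop.run()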

Next time, we will change this code to remove the uart functionality and add in the ability for us to get data from the accelerometer service as a stream of data that we can use to do some cool stuff with!

event driven

3 advantages of Event-Driven Architecture

My latest posts have put a lot of focus on Cloud native technologies. The last few have mentioned things like CloudEvents and Knative Eventing and it got me thinking… why might people want to implement event driven ecosystems in the first place?

I’ve decided to put together three advantages that I think offer pretty attractive prospects for implementing an Event Driven Architecture pattern.

True Decoupling of Producers and Consumers

The nature of an Event Driven Architecture ecosystem lends itself to microservices and, in this type of system, there is (hopefully) a loose coupling between the services. Depending on the communication between microservices, there may still be dependencies between them (e.g. an http request/response approach).

In the excellent book ‘Designing Event-Driven Systems‘, Ben Stopford tells us that event-driven services’ core mantra is “Centralize an immutable stream of facts. Decentralise the freedom to act, adapt and change”.

Because the ownership of data is separated by domain, this gives a nice logical separation between the production and consumption of events. As a producer I do not need to concern myself with how the events I produce are going to be consumed. Vice versa for the team consuming them. They are free to figure out for themselves what to do with the events, they do not need to be instructed. The message structure is also not important. It can be json, xml, avro etc. Doesn’t matter.

The broker and some kind of trigger between it and the services enables messages to be ingested into the event driven eco-system and then broadcast out to whichever services are interested in receiving them.

Business narrative of what has happened that can’t be changed

We have all heard the term ‘single source of truth’ and this is usually just a rumor (like the treasure chest hidden at the end of the rainbow). Well, in an event-driven ecosystem it really exists!

As mentioned above, an event-stream should be an immutable stream of facts. This is very representative of how our daily lives unfold; as a series of events. These events happened and it’s not possible to go back and change them unless you own one of these (remember, terrible things can happen to those who meddle with time)…

This is an advantage for business data governance as you can always look back in the log for auditing or to see what happened.

It is becoming more and more common for companies to need to explain their ‘data-derived’ decisions, e.g why a customer’s application for finance or insurance has been rejected. The log of immutable events that EDA provides us can provide a key component of this auditing.

Real-time event streams for Data Science.

One of the reasons I am enthusiastic about EDA is that it is particularly well suited to in-stream processing. It lends itself to fast decision making, things where milliseconds count.

Business logic can be applied while data is in motion rather than needing to wait for the data to land somewhere and then do the analysis. This is good for things like fraud detection and predictive analytics. Oftentimes, we need to know if a transaction is fraudulent before it completes.

Further Reading

There are many reasons you might want to use eventing as the backbone of your system and if you want to find out more about Event-Driven Architecture then I recommend the following resources as a start:

  • Designing Event-Driven Systems by Ben Stopford
  • Building Event-Driven Microservices by Adam Bellemare (pre-release)
  • Cloud Native Patterns by Cornelia Davis
  • I wrote a follow up, longer article on this topic here on IBM Developer.

cloud native, knative

Knative Eventing: Part 3 – Replying to broker

In part two of my Knative eventing tutorials, we streamed websocket data to a broker and then subscribed to the events with an event display service that displayed the events in real-time via a web UI.

In this post, I am going to show how to create reply events to a broker and then subscribe to them from a different service. Once we are done, we will have something that should look like the following:

In this scenario, we are receiving events from a streaming source (in this case a websocket) and each time we receive an event, we want to send another event as a reply back to the broker. In this example, we are just going to do this very simply in that every time we get an event, we send another.

In the next tutorial, we will look to receive the events and then perform some analysis on the fly, for example analyse the value of a transaction and assign a size variable. We could then implement some logic like if size = XL, send a reply back to the broker, which could then be listened for by an alerting application.

Setup

I am running Kubernetes on Docker Desktop for mac. You will also need Istio and Knative eventing installed.

Deploy a namespace:

kubectl create namespace knative-eventing-websocket-source

apply the knative-eventing label:

kubectl label namespace knative-eventing-websocket-source knative-eventing-injection=enabled

Ensure you have the cluster local gateway set up.

Adding newEvent logic:

In our application code, we are adding some code for sending a new reply event every time an event is received. The code is here:

newEvent := cloudevents.NewEvent()
newEvent.SetSource(fmt.Sprintf("https://knative.dev/jsaladas/transactionClassified"))
newEvent.SetType("dev.knative.eventing.jsaladas.transaction.classify")
newEvent.SetID("1234")
newEvent.SetData("Hi from Knative!")
response.RespondWith(200, &newEvent)

This code creates a new event, with the following information:

source = "https://knative.dev/jsaladas/transactionClassified"

type = "dev.knative.eventing.jsaladas.transaction.classify"

data = "Hi from Knative!"

I can use whichever values I want for the above; these are just the values I decided on, so feel free to change them. We can then use these fields later to filter for the reply events only. Our code will now, aside from receiving an event and displaying it, generate a new event that will enter the Knative eventing ecosystem.

Initial components to run the example

The main code for this tutorial is already in the github repo for part 2. If you already followed this and have this running then you will need to redeploy the event-display service with a new image. For those who didn’t join for part 2, this part shows you how to deploy the components.

Deploy the websocket source application:

kubectl apply -f 010-deployment.yaml

Here is the yaml below:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: wseventsource
  namespace: knative-eventing-websocket-source
spec:
  replicas: 1
  selector:
    matchLabels: &labels
      app: wseventsource
  template:
    metadata:
      labels: *labels
    spec:
      containers:
        - name: wseventsource
          image: docker.io/josiemundi/wssourcecloudevents:latest
          env:
          - name: SINK
            value: "http://default-broker.knative-eventing-websocket-source.svc.cluster.local"

Next we will apply the trigger that sets up a subscription for the event display service to receive events from the broker that have a source equal to "wss://ws.blockchain.info/inv". We could also filter on type or another CloudEvent attribute; if we left them both empty, it would match all events.

kubectl apply -f 040-trigger.yaml

Here is the trigger yaml:

apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: wsevent-trigger
  namespace: knative-eventing-websocket-source
spec:
  broker: default
  filter:
    sourceAndType:
      type: ""
      source: "wss://ws.blockchain.info/inv"
  subscriber:    
    ref:
      apiVersion: v1
      kind: Service
      name: event-display

Next we will deploy the event-display service, which is the specified subscriber of the blockchain events in our trigger.yaml. This application is where we create our reply events.

This is a Kubernetes service so we need to apply the following yaml files:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: event-display
  namespace: knative-eventing-websocket-source
spec:
  replicas: 1
  selector:
    matchLabels: &labels
      app: event-display
  template:
    metadata:
      labels: *labels
    spec:
      containers:
        - name: event-display
          image: docker.io/josiemundi/test-reply-broker
---
apiVersion: v1
kind: Service
metadata:
  name: event-display
  namespace: knative-eventing-websocket-source
spec:
  type: NodePort
  ports:
  - port: 80
    protocol: TCP
    targetPort: 8080
    name: consumer
  - port: 9080
    protocol: TCP
    targetPort: 9080
    nodePort: 31234
    name: dashboard
  selector:
    app: event-display

If you head to localhost:31234, you should see the stream of events.

Subscribe to the reply events

Now we need to add another trigger, this time subscribing only to the reply events (that’s our newEvent that we set up in the go code). You can see that, in this case, we specify the source as "https://knative.dev/jsaladas/transactionClassified".

Here is the trigger yaml we apply:

apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: reply-trigger-test
  namespace: knative-eventing-websocket-source
spec:
  broker: default
  filter:
    sourceAndType:
      type: ""
      source: "https://knative.dev/jsaladas/transactionClassified"
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: test-display

This time, our subscriber is a Knative service called test-display, which we still need to deploy.

Run the following to deploy the knative service that subscribes to reply events:

kubectl --namespace knative-eventing-websocket-source apply --filename - << END
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: test-display
spec:
  template:
    spec:
      containers:
        - image: gcr.io/knative-releases/github.com/knative/eventing-contrib/cmd/event_display@sha256:1d6ddc00ab3e43634cd16b342f9663f9739ba09bc037d5dea175dc425a1bb955
END

We can now get the logs of the test-display service and you should only see the reply messages:

kubectl logs -l serving.knative.dev/service=test-display -c user-container --tail=100 -n knative-eventing-websocket-source

Next time we will look at classifying the events and use transaction size as a reply to the broker.

knative, kubernetes

Knative Eventing: Part 2 – streaming CloudEvents to a UI

I’ve been looking at Knative eventing a fair bit lately and one of the things I have been doing is building an eventing demo (the first part of which can be found here). As part of this demo, I wanted to understand how I could get CloudEvents that were being sent by my producer to display in real time via a web UI (event display service UI).

Here is a bit of info and an overview of the approach I took. The code to run through this tutorial can be found here.

Prerequisites and set-up

First, you will need to have Knative and your chosen Gateway provider installed (I tried this with both Istio and Gloo, which both worked fine). You can follow the instructions here.

Initially deploy the 001-namespace.yaml by running:

kubectl apply -f 001-namespace.yaml

Verify you have a broker:

kubectl -n knative-eventing-websocket-source get broker default

You will see that the broker has a URL; this is what we will use as our SINK in the next step.

Deploy the Blockchain Events Sender Application

The application that sends the events was discussed in my Knative Eventing: Part 1 post and you can find the repo with all the code for this application here.

To get up and running you can simply run the 010-deployment.yaml file. Here is a reminder of what it looks like:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: wseventsource
  namespace: knative-eventing-websocket-source
spec:
  replicas: 1
  selector:
    matchLabels: &labels
      app: wseventsource
  template:
    metadata:
      labels: *labels
    spec:
      containers:
        - name: wseventsource
          image: docker.io/josiemundi/wssourcecloudevents:latest
          env:
          - name: SINK
            value: "http://default-broker.knative-eventing-websocket-source.svc.cluster.local"

This is a Kubernetes app deployment. The name of the deployment is wseventsource and the namespace is knative-eventing-websocket-source. We have defined an environmental variable of SINK, for which we set the value as the address of our broker.

Verify events are being sent by running:

kubectl --namespace knative-eventing-websocket-source logs -l app=wseventsource --tail=100 

This is what we currently have deployed:

Add a trigger – Send CloudEvents to Event-Display

Now we can deploy our trigger, which will set our event-display service as the subscriber.

# Knative Eventing Trigger to trigger the helloworld-go service
apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: wsevent-trigger
  namespace: knative-eventing-websocket-source
spec:
  broker: default
  filter:
    sourceAndType:
      type: ""
      source: ""
  subscriber:    
    ref:
      apiVersion: v1
      kind: Service
      name: event-display

In the file above, we define our trigger name as wsevent-trigger and the namespace. In spec > filter I am basically specifying that the broker should send all events to the subscriber. The subscriber in this case is a Kubernetes service rather than a Knative Service.

kubectl apply -f 030-trigger.yaml

Now we have the following:

A trigger can exist before the service and vice versa. Let’s set up our event display.

Stream CloudEvents to Event Display service

I used the following packages to build the Event Display service:

Originally I deployed my event-display application as a Knative Service and this was fine but I could only access the events through the logs or by using curl.

Ideally, I wanted to build a stream of events that was pushed all the way to the UI. However, I discovered that it wasn’t possible to deploy this way for this use case, because Knative Serving does not allow multiple ports in a service deployment.

I asked the question about it in the Knative Slack channel and the response was mainly to use mux and specify a path (I saw something similar in the sockeye GitHub project).

In the end, I chose to deploy as a native Kubernetes service instead. The reason is that it seemed like the most applicable way to do this, both in terms of functionality and also security. I was a little unsure about the feasibility of using mux in production as you may not want to expose an internal port externally.

For the kncloudevents project, I struggled to find detailed info or examples but the code is built on top of the Go sdk for CloudEvents and there are some detailed docs for the Python version.

We can use it to listen for HTTP CloudEvents requests. By default it will listen on port 8080. When we use the StartReceiver function, this is essentially telling our code to start listening. Because this happens on one port, we need another port on which to ListenAndServe the dashboard UI.

So here are the two yaml files that we deploy for the event-display.

App Deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: event-display
  namespace: knative-eventing-websocket-source
spec:
  replicas: 1
  selector:
    matchLabels: &labels
      app: event-display
  template:
    metadata:
      labels: *labels
    spec:
      containers:
        - name: event-display
          image: docker.io/josiemundi/bitcoinfrontendnew

Service Deployment:

apiVersion: v1
kind: Service
metadata:
  name: event-display
  namespace: knative-eventing-websocket-source
spec:
  type: NodePort
  ports:
  - port: 80
    protocol: TCP
    targetPort: 8080
    name: consumer
  - port: 9080
    protocol: TCP
    targetPort: 9080
    nodePort: 31234
    name: dashboard
  selector:
    app: event-display

With everything deployed we now have the following:

Now if you head to the nodeport specified in the yaml:

http://localhost:31234

Next time, we will look at how to send a reply event back into the Knative eventing space.

cloud native

What are CloudEvents?

CloudEvents is a design specification for sending events in a common and uniform way. It is an interesting proposal for standardising the way we send events in an event-driven ecosystem. The specification is an open and versatile approach to sending and consuming events.

CloudEvents is currently an ‘incubating’ project with the CNCF. On the cloudevents website, they specify that the advantages of using cloud events are:

  • Consistency
  • Accessibility
  • Portability

Metadata about an event is contained within a CloudEvent, through a number of required (and optional) attributes including:

  • id
  • source
  • specversion
  • type

For more information about the attributes, you can take a look at the cloudevents spec.

Here is an example of a CloudEvent from my previous eventing example:

You can see the required attributes are:

  • id: 8e3cf8fb-88bb-4a00-a3fe-0635e221ce92
  • source: wss://ws.blockchain.info/inv
  • specversion: 0.3
  • type: websocket-event

There are also some extension attributes such as knativearrivaltime, knativehistory and traceparent. We then also have the body of the message in Data.

Having these set attributes means they can be used for filtering (e.g through a Knative eventing trigger) and also for capturing key information that can be used by other services that subscribe to the events. I can, for example, filter for events that are only from a certain source or of a certain type.

CloudEvents are currently supported by Knative, Azure Event Grid and Open FaaS.

There are a number of libraries for CloudEvents, including for Python, Go and Java. I’ve used the Go SDK for CloudEvents a lot lately and will be running through some of it in future posts.
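As a taster of the Python library, here is a minimal sketch (assuming the cloudevents package from PyPI) that builds an event with attributes like the ones above and serialises it for sending over HTTP; the data payload is made up:

from cloudevents.http import CloudEvent, to_structured

attributes = {
    "type": "websocket-event",
    "source": "wss://ws.blockchain.info/inv",
}
data = {"message": "example payload"}  # hypothetical event body

event = CloudEvent(attributes, data)   # id and specversion are filled in for you

# Serialise to HTTP headers + body in the "structured" content mode,
# ready to POST to a broker or any other CloudEvents-aware endpoint.
headers, body = to_structured(event)
print(headers)
print(body)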

knative, kubernetes

Step by Step: Deploy and interact with a Knative Service

In this post, I will show how to deploy a Knative service and interact with it through curl and via the browser. I’ll go over some of the useful stuff to know as I found this kind of confusing at first.

I’m running this on a mac using the Kubernetes that’s built in to Docker Desktop, so things will be a bit different if you are running another flavor of Kubernetes. You will need Istio and the Knative serving components installed to follow along with this.

For the service, we are deploying a simple web app example from golang.org, which by default prints out “Hi there, I love (word of your choice)”. The code is at the link above, or I have a simple test image on Docker hub, which just prints out “Hi there, I love test” (oh the lack of creativity!)

Deploying a Knative Service

First we need to create a namespace, in which our Knative service will be deployed. For example:

kubectl create namespace web-service

Here is the Knative service deployment, which is a file called service.yaml.

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: event-display
spec:
  template:
    spec:
      containers:
        - image: docker.io/josiemundi/webserversimple

Deploy the service yaml by running the following command:

kubectl apply -f service.yaml -n web-service

Now run the following in order to view the Knative service and some details we will need:

kubectl get ksvc -n web-service

There are a few fields, including:

NAME: The name of the service

URL: The url of the service, which we will need to interact with it. By default the URL will be “<your-service-name>.<namespace>.example.com” however you can also have a custom domain.

READY: This should say “True”, if not it will say “False” and there will be a reason in the REASON field.

After a little while, you might notice the service will disappear as it scales down to zero. More on that in a while.

IngressGateway

To interact with the service we just deployed, we need to understand a bit about the IngressGateway. By default, Knative uses the istio-ingressgateway as its gateway service. We need to understand this in order to expose our service outside of the local cluster.

We can look at the istio-ingressgateway using the following command:

kubectl get service istio-ingressgateway --namespace istio-system

This will return the following:

Within the gateway configuration, there are a number of ports and NodePorts specified as default including the one we will use to communicate with our service:

name: http2, port: 80, protocol: HTTP

To find the port for accessing the service you can run the following:

kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].port}'   

You can customise the Gateway configuration. Details and the different ports can be found here in the Istio documentation. I’d also recommend running through the Istio httpbin example to understand a bit more about istio and ingressgateway.

To interact with our service we will need to combine both the URL (event-display.web-service.example.com) and the EXTERNAL-IP (localhost) which we saw for the istio-ingressgateway. Depending on your set up, these will not be the same as mine.

It will be something like the following:

curl -H "Host: event-display.web-service.example.com" http://127.0.0.1/test

Scaling our Service

Your initial pod has probably disappeared right now because when a service is idle, it will scale down to zero after around 90 seconds. You should see the pod start ‘Terminating’ and then disappear.

Knative uses the KPA (Knative Pod Autoscaler), which runs as a Kubernetes deployment. The KPA scales based on requests (concurrency), however it is also possible to use the HPA (Horizontal Pod Autoscaler), which allows scaling based on CPU.

You can find out more detailed information about autoscaling here but for now just note that you can change the parameters in the ConfigMap.

To see the autoscaler config you can run the following command:

kubectl describe configmap config-autoscaler -n knative-serving

To edit the ConfigMap:

kubectl edit configmap config-autoscaler -n knative-serving 

In the result you will see some fields including:

scale-to-zero-grace-period: 30s
stable-window: 60s

The scale-to-zero-grace-period specifies how long it will wait until it scales an inactive service down to zero. The autoscaler takes a 60 second window to assess activity. If it is determined that within that 60 seconds stable-window, there are no events, it will then wait another 30 seconds before scaling to zero. This is why it takes around 90 seconds to terminate an inactive service.

If desired, these can be amended so that your service will scale down faster or slower. There is also a field called enable-scale-to-zero, which (if you want to be able to scale to zero) must be set to “true”.

Test using curl

Once you curl the service again you should see the pod spin up again.

curl -H "Host: event-display.web-service.example.com" http://127.0.0.1:80/test

Should return:

Hi there, I love test!

Access Knative Service through browser

If you are using Docker Desktop on a Mac, to access the service through a browser you could add the host to the hosts file on your Mac.

sudo vi /etc/hosts

Add 127.0.0.1 event-display.web-service.example.com to the file and save it.

Alternatively, if you don’t want to (or can’t) change the host file, I used the “Simple Modify Headers” browser plugin. Then click on the icon once installed and select ‘configure’. Input the parameters as follows and then click the start button.

Now open http://localhost/test and you should see the “Hi there, I love test!” message in your browser.