
Knative Eventing: Part 3 – Replying to broker

In part two of my Knative eventing tutorials, we streamed websocket data to a broker and then subscribed to the events with an event display service that displayed the events in real-time via a web UI.

In this post, I am going to show how to create reply events to a broker and then subscribe to them from a different service. Once we are done, we will have something that should look like the following:

In this scenario, we are receiving events from a streaming source (in this case a websocket), and each time we receive an event we want to send another event back to the broker as a reply. To keep this example simple, every incoming event produces exactly one reply event.

In the next tutorial, we will look to receive the events and then perform some analysis on the fly, for example analyse the value of a transaction and assign a size variable. We could then implement some logic like if size = XL, send a reply back to the broker, which could then be listened for by an alerting application.
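
As a preview of that logic, here is a minimal Go sketch of the classification step. The thresholds and the names classify and shouldReply are hypothetical and chosen only for illustration; the real cut-offs will depend on the transaction data.

```go
package main

import "fmt"

// classify maps a transaction value to a size label.
// The thresholds are hypothetical and only illustrate the idea.
func classify(value float64) string {
	switch {
	case value >= 100:
		return "XL"
	case value >= 10:
		return "L"
	case value >= 1:
		return "M"
	default:
		return "S"
	}
}

// shouldReply reports whether a reply event should go back to the broker.
func shouldReply(size string) bool {
	return size == "XL"
}

func main() {
	for _, v := range []float64{0.2, 5, 42, 250} {
		size := classify(v)
		fmt.Printf("value=%v size=%s reply=%t\n", v, size, shouldReply(size))
	}
}
```

Only the XL transactions would trigger a reply here, which is what an alerting application could then subscribe to.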

Setup

I am running Kubernetes on Docker Desktop for Mac. You will also need Istio and Knative Eventing installed.

Deploy a namespace:

kubectl create namespace knative-eventing-websocket-source

Apply the knative-eventing-injection label:

kubectl label namespace knative-eventing-websocket-source knative-eventing-injection=enabled

Ensure you have the cluster local gateway set up.

Adding newEvent logic:

In our application code, we add logic that sends a new reply event every time an event is received:

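// Build the reply event that is handed back to the broker.
newEvent := cloudevents.NewEvent()
newEvent.SetSource("https://knative.dev/jsaladas/transactionClassified")
newEvent.SetType("dev.knative.eventing.jsaladas.transaction.classify")
// A fixed ID keeps the demo simple; CloudEvents expects source + id to be unique per event.
newEvent.SetID("1234")
newEvent.SetData("Hi from Knative!")
// Returning the event in the HTTP response sends it to the broker as a reply.
response.RespondWith(200, &newEvent)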

This code creates a new event, with the following information:

source = "https://knative.dev/jsaladas/transactionClassified"

type = "dev.knative.eventing.jsaladas.transaction.classify"

data = "Hi from Knative!"

I can use whichever values I want here; these are just the ones I decided on, so feel free to change them. We can use these fields later to filter for the reply events only. Besides receiving and displaying an event, our code will now generate a new event that enters the Knative Eventing ecosystem.

Initial components to run the example

The main code for this tutorial is already in the GitHub repo for part 2. If you followed part 2 and still have it running, you will need to redeploy the event-display service with a new image. For those who didn’t join for part 2, this section shows you how to deploy the components.

Deploy the websocket source application:

kubectl apply -f 010-deployment.yaml

Here is the yaml below:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: wseventsource
  namespace: knative-eventing-websocket-source
spec:
  replicas: 1
  selector:
    matchLabels: &labels
      app: wseventsource
  template:
    metadata:
      labels: *labels
    spec:
      containers:
        - name: wseventsource
          image: docker.io/josiemundi/wssourcecloudevents:latest
          env:
          - name: SINK
            value: "http://default-broker.knative-eventing-websocket-source.svc.cluster.local"

Next we will apply the trigger, which subscribes the event display service to events from the broker whose source equals "wss://ws.blockchain.info/inv". We could also filter on type or on another CloudEvent attribute; if we left both type and source empty, the trigger would match all events.

kubectl apply -f 040-trigger.yaml

Here is the trigger yaml:

apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: wsevent-trigger
  namespace: knative-eventing-websocket-source
spec:
  broker: default
  filter:
    sourceAndType:
      type: ""
      source: "wss://ws.blockchain.info/inv"
  subscriber:    
    ref:
      apiVersion: v1
      kind: Service
      name: event-display
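
To make the filter semantics concrete, here is a small Go sketch of how a sourceAndType filter can be thought of: each field must match exactly, and an empty field matches anything. The types sourceAndType and event and the function matches are illustrative stand-ins, not the Knative API, and the blockchain event type used below is hypothetical.

```go
package main

import "fmt"

// sourceAndType mirrors the two filter fields in the trigger yaml.
// It is an illustrative type, not the Knative API.
type sourceAndType struct {
	Type   string
	Source string
}

// event holds just the two CloudEvent attributes the filter inspects.
type event struct {
	Type   string
	Source string
}

// matches reports whether e passes the filter; an empty field matches anything.
func matches(f sourceAndType, e event) bool {
	typeOK := f.Type == "" || f.Type == e.Type
	sourceOK := f.Source == "" || f.Source == e.Source
	return typeOK && sourceOK
}

func main() {
	blockchainOnly := sourceAndType{Source: "wss://ws.blockchain.info/inv"}
	everything := sourceAndType{}

	events := []event{
		// The type of the blockchain event is hypothetical.
		{Type: "example.blockchain.tx", Source: "wss://ws.blockchain.info/inv"},
		{Type: "dev.knative.eventing.jsaladas.transaction.classify", Source: "https://knative.dev/jsaladas/transactionClassified"},
	}
	for _, e := range events {
		fmt.Printf("%s: blockchainOnly=%t everything=%t\n",
			e.Source, matches(blockchainOnly, e), matches(everything, e))
	}
}
```

With the source set, the reply events produced by event-display do not loop back into it; with both fields empty they would.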

Next we will deploy the event-display service, which is the specified subscriber of the blockchain events in our trigger.yaml. This application is where we create our reply events.

This is a Kubernetes service so we need to apply the following yaml files:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: event-display
  namespace: knative-eventing-websocket-source
spec:
  replicas: 1
  selector:
    matchLabels: &labels
      app: event-display
  template:
    metadata:
      labels: *labels
    spec:
      containers:
        - name: event-display
          image: docker.io/josiemundi/test-reply-broker
---
apiVersion: v1
kind: Service
metadata:
  name: event-display
  namespace: knative-eventing-websocket-source
spec:
  type: NodePort
  ports:
  - port: 80
    protocol: TCP
    targetPort: 8080
    name: consumer
  - port: 9080
    protocol: TCP
    targetPort: 9080
    nodePort: 31234
    name: dashboard
  selector:
    app: event-display

If you head to localhost:31234, you should see the stream of events.

Subscribe to the reply events

Now we need to add another trigger, this time subscribing only to the reply events (that’s the newEvent we set up in the Go code). You can see that, in this case, we specify the source as "https://knative.dev/jsaladas/transactionClassified".

Here is the trigger yaml we apply:

apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: reply-trigger-test
  namespace: knative-eventing-websocket-source
spec:
  broker: default
  filter:
    sourceAndType:
      type: ""
      source: "https://knative.dev/jsaladas/transactionClassified"
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: test-display

This time, our subscriber is a Knative Service called test-display, which we still need to deploy.

Run the following to deploy the Knative Service that subscribes to reply events:

kubectl --namespace knative-eventing-websocket-source apply --filename - << END
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: test-display
spec:
  template:
    spec:
      containers:
        - image: gcr.io/knative-releases/github.com/knative/eventing-contrib/cmd/event_display@sha256:1d6ddc00ab3e43634cd16b342f9663f9739ba09bc037d5dea175dc425a1bb955
END

We can now get the logs of the test-display service, which should show only the reply messages:

kubectl logs -l serving.knative.dev/service=test-display -c user-container --tail=100 -n knative-eventing-websocket-source

Next time we will look at classifying the events and sending the transaction size back to the broker as a reply.


Knative Eventing: Part 2 – streaming CloudEvents to a UI

I’ve been looking at Knative eventing a fair bit lately and one of the things I have been doing is building an eventing demo (the first part of which can be found here). As part of this demo, I wanted to understand how I could get CloudEvents that were being sent by my producer to display in real time via a web UI (event display service UI).

Here is a bit of info and an overview of the approach I took. The code to run through this tutorial can be found here.

Prerequisites and set-up

First, you will need to have Knative and your chosen Gateway provider installed (I tried this with both Istio and Gloo, which both worked fine). You can follow the instructions here.

Initially deploy the 001-namespace.yaml by running:

kubectl apply -f 001-namespace.yaml

Verify you have a broker:

kubectl -n knative-eventing-websocket-source get broker default

You will see that the broker has a URL. This is what we will use as our SINK in the next step.

Deploy the Blockchain Events Sender Application

The application that sends the events was discussed in my Knative Eventing: Part 1 post and you can find the repo with all the code for this application here.

To get up and running you can simply run the 010-deployment.yaml file. Here is a reminder of what it looks like:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: wseventsource
  namespace: knative-eventing-websocket-source
spec:
  replicas: 1
  selector:
    matchLabels: &labels
      app: wseventsource
  template:
    metadata:
      labels: *labels
    spec:
      containers:
        - name: wseventsource
          image: docker.io/josiemundi/wssourcecloudevents:latest
          env:
          - name: SINK
            value: "http://default-broker.knative-eventing-websocket-source.svc.cluster.local"

This is a Kubernetes app deployment. The name of the deployment is wseventsource and the namespace is knative-eventing-websocket-source. We have defined an environment variable called SINK, whose value is the address of our broker.

Verify events are being sent by running:

kubectl --namespace knative-eventing-websocket-source logs -l app=wseventsource --tail=100 

This is what we currently have deployed:

Add a trigger – Send CloudEvents to Event-Display

Now we can deploy our trigger, which will set our event-display service as the subscriber.

# Knative Eventing Trigger that delivers events to the event-display service
apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: wsevent-trigger
  namespace: knative-eventing-websocket-source
spec:
  broker: default
  filter:
    sourceAndType:
      type: ""
      source: ""
  subscriber:    
    ref:
      apiVersion: v1
      kind: Service
      name: event-display

In the file above, we define the trigger name as wsevent-trigger along with its namespace. Leaving both fields empty under spec > filter tells the broker to send all events to the subscriber. The subscriber in this case is a Kubernetes Service rather than a Knative Service.

kubectl apply -f 030-trigger.yaml

Now we have the following:

A trigger can exist before the service and vice versa. Let’s set up our event display.

Stream CloudEvents to Event Display service

I used the following packages to build the Event Display service:

Originally I deployed my event-display application as a Knative Service and this was fine but I could only access the events through the logs or by using curl.

Ideally, I wanted to build a stream of events that was pushed all the way to the UI. However, I discovered that this wasn’t possible with a Knative Service, because Knative Serving does not allow multiple ports in a service deployment.

I asked the question about it in the Knative Slack channel and the response was mainly to use mux and specify a path (I saw something similar in the sockeye GitHub project).

In the end, I chose to deploy as a native Kubernetes service instead. The reason is that it seemed like the most applicable way to do this, both in terms of functionality and also security. I was a little unsure about the feasibility of using mux in production as you may not want to expose an internal port externally.

For the kncloudevents project, I struggled to find detailed info or examples, but the code is built on top of the Go SDK for CloudEvents and there are some detailed docs for the Python version.

We can use it to listen for HTTP CloudEvents requests. By default it will listen on port 8080. Calling the StartReceiver function essentially tells our code to start listening. Because the receiver occupies one port, we need a second port for the ListenAndServe call that serves the UI.

So here are the two yaml files that we deploy for the event-display.

App Deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: event-display
  namespace: knative-eventing-websocket-source
spec:
  replicas: 1
  selector:
    matchLabels: &labels
      app: event-display
  template:
    metadata:
      labels: *labels
    spec:
      containers:
        - name: event-display
          image: docker.io/josiemundi/bitcoinfrontendnew

Service Deployment:

apiVersion: v1
kind: Service
metadata:
  name: event-display
  namespace: knative-eventing-websocket-source
spec:
  type: NodePort
  ports:
  - port: 80
    protocol: TCP
    targetPort: 8080
    name: consumer
  - port: 9080
    protocol: TCP
    targetPort: 9080
    nodePort: 31234
    name: dashboard
  selector:
    app: event-display

With everything deployed we now have the following:

Now head to the NodePort specified in the yaml to see the stream of events:

http://localhost:31234

Next time, we will look at how to send a reply event back into the Knative eventing space.


Knative Eventing Example: Part 1

In my last post I shared some methods for getting up and running with Knative eventing. In this post, we are going to step back a little to try and understand how the eventing component works. It gives a high level overview of a demo you can follow along with on GitHub.

This will be the first of a series of posts, which walk through an example, that we will build out in complexity over the coming weeks.

First, let’s go over a few reasons why we might look to use eventing.

What capabilities does Knative Eventing bring?

  • Ability to decouple producers and consumers (this means, for example, that consumers can be subscribed to an event type before any of those event types have been produced).
  • Events are published as CloudEvents (this is a topic I would like to cover separately in more detail).
  • Push-based messaging

There are a number of key components that I will describe below, which together will make up the initial example. Channels and subscriptions will not be included in this post; we’ll discuss those another time.

What are we building?

Let’s first take a look at the diagram below to get a picture of how these components fit and interact together. This diagram shows the type of demo scenario we are looking to recreate over the next few posts.

Each of these components is deployed using yaml files, except for the broker, which is automatically created once Knative injection is enabled within the namespace. You can deploy a custom broker if you wish but I won’t include that in this post.

In this simple example, we use a Kubernetes app deployment as the source and a Knative Service as the consumer, which will subscribe to the events.

The code that I use to stream the events to the broker is available here on GitHub and gives more detailed instructions if you want to build it yourself. It also contains the yaml files used in this tutorial.

Source

Our source is the producer of the events. It could be an application, a web socket, a process etc. It produces events that other services may or may not be interested in subscribing to.

There are a number of different types of sources, and each one is a custom resource. The range of sources available can be seen in the Knative documentation here. You can create your own event source if you need to.

The following yaml shows a simple Kubernetes app deployment which is our source:

In the above example, the source is a Go application, which streams messages via a web socket connection. It sends them as CloudEvents, which our service will consume. It is available to view here.

Broker and Trigger are CRDs, which will manage the delivery of events and abstract away the details of these from the related services.

Broker

The broker is where events get received. It is like a holding area, from where they can be consumed by those interested. As mentioned above, a default broker is automatically created when you label your namespace:

kubectl label namespace my-event-namespace knative-eventing-injection=enabled

Trigger

Our trigger provides a filter, by which it determines which events should be delivered to a given consumer.

Here below is an example trigger, which just defines that the event-display service subscribes to all events from the default broker.

Under spec, I can also add filter: > attributes: and then list the CloudEvent attributes to filter by, such as type or source, to determine which events a service subscribes to. Here is another example, which filters on a specific event type:

You can also filter on an expression such as:

expression: ce.type == "com.github.pull.create"

Consumer

We can have one or multiple consumers. These are the services that are interested (or not) in the events. If you have a single consumer, you can send straight from the source to consumer.

Here is the Knative Service deployment yaml:

Because I want to send events to a Knative Service, I need to have the cluster-local visibility label (this is explained in more detail below). For the rest, I am using a pre-built image from Knative for a simple event display, the code for which can be found here.

Once you have all of these initial components, it looks like this:

Issues

I had some issues with getting the events to the consumer at first when trying this initial demo out. In the end I found out that in order to sink to a Knative Service, you need to add the cluster local gateway to the Istio installation. This is somewhat vaguely mentioned in the docs around installing Istio but could probably have been a bit clearer. Luckily, I found this great post, which helped me massively!

When you install Istio you will need to ensure that you see (at least) the following:

Handy tips and stuff I wish I had known before…

If you want to add a sink URL in your source file, see the example below for a broker sink:

http://default-broker.knative-eventing-websocket-source.svc.cluster.local

default is the name of the broker and knative-eventing-websocket-source is the namespace.
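
The naming pattern can be captured in a tiny Go helper. This is only a sketch based on the URL shown in this post; the function name brokerURL is hypothetical, and other Knative versions may expose brokers at a different address, so treat the URL reported by kubectl get broker as the source of truth.

```go
package main

import "fmt"

// brokerURL builds the in-cluster address of a broker, following the
// <broker>-broker.<namespace>.svc.cluster.local pattern shown in this post.
func brokerURL(broker, namespace string) string {
	return fmt.Sprintf("http://%s-broker.%s.svc.cluster.local", broker, namespace)
}

func main() {
	fmt.Println(brokerURL("default", "knative-eventing-websocket-source"))
}
```

For the default broker in the knative-eventing-websocket-source namespace, this yields the same sink URL as above.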

To verify that events are being sent, received and consumed, you can use the following examples as a reference:

# Getting the logs of the source app
kubectl --namespace knative-eventing-websocket-source logs -l app=wseventsource --tail=100

# Getting the logs of the broker
kubectl --namespace knative-eventing-websocket-source logs -l eventing.knative.dev/broker=default --tail=100

# Getting the logs of a Knative service
kubectl logs -l serving.knative.dev/service=event-display -c user-container --since=10m -n knative-eventing-websocket-source

You should see something like this when you get the service logs:

Next steps

Next time we will be looking at building out our service and embellishing it a little so we can transform, visualise and send new events back to a broker.