
Knative Eventing: Part 3 – Replying to broker

In part two of my Knative Eventing tutorials, we streamed websocket data to a broker and then subscribed to the events with an event-display service that showed them in real time via a web UI.

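In this post, I am going to show how to send reply events to a broker and then subscribe to them from a different service. Once we are done, the websocket source will send events to the broker, the event-display service will receive them and send a reply event back, and a second service will subscribe only to the replies.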

In this scenario, we receive events from a streaming source (in this case a websocket), and each time we receive an event we send another event back to the broker as a reply. To keep this example simple, we send the same reply for every event we receive.

In the next tutorial, we will receive the events and perform some analysis on the fly, for example analysing the value of a transaction and assigning it a size variable. We could then implement logic such as: if size = XL, send a reply back to the broker, which an alerting application could then listen for.

Setup

I am running Kubernetes on Docker Desktop for Mac. You will also need Istio and Knative Eventing installed.

Deploy a namespace:

kubectl create namespace knative-eventing-websocket-source

Apply the knative-eventing-injection label:

kubectl label namespace knative-eventing-websocket-source knative-eventing-injection=enabled

Ensure you have the cluster-local gateway set up.

Adding newEvent logic:

In our application code, we add logic that sends a new reply event every time an event is received:

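// Build the reply event; source and type are what triggers can filter on later.
newEvent := cloudevents.NewEvent()
newEvent.SetSource("https://knative.dev/jsaladas/transactionClassified")
newEvent.SetType("dev.knative.eventing.jsaladas.transaction.classify")
// Fixed ID for demo purposes only; real events should each get a unique ID.
newEvent.SetID("1234")
newEvent.SetData("Hi from Knative!")
// Returning the event in the response sends it back to the broker.
response.RespondWith(200, &newEvent)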

This code creates a new event, with the following information:

source = "https://knative.dev/jsaladas/transactionClassified"

type = "dev.knative.eventing.jsaladas.transaction.classify"

data = "Hi from Knative!"

These values are just the ones I decided on, so feel free to change them. We can use these fields later to filter for the reply events only. Aside from receiving an event and displaying it, our code now generates a new event that enters the Knative Eventing ecosystem.
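To make the reply mechanism more concrete, here is a minimal sketch that uses only the Go standard library rather than the CloudEvents SDK used in the snippet above. It assumes the subscriber answers the broker's HTTP delivery with a CloudEvent in binary content mode (attributes carried as ce-* headers) and that spec version 1.0 is acceptable. The names newEventID, replyHeaders and handleEvent are hypothetical, and the random ID replaces the fixed "1234" purely for illustration.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// newEventID returns a random 32-character hex string so that each reply
// gets its own ID (CloudEvents expects source + id to be unique per event).
func newEventID() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

// replyHeaders builds the ce-* headers for a binary-mode CloudEvent reply,
// reusing the source and type from the SDK snippet.
func replyHeaders(id string) http.Header {
	h := http.Header{}
	h.Set("Ce-Specversion", "1.0") // assumption: spec version 1.0
	h.Set("Ce-Id", id)
	h.Set("Ce-Source", "https://knative.dev/jsaladas/transactionClassified")
	h.Set("Ce-Type", "dev.knative.eventing.jsaladas.transaction.classify")
	h.Set("Content-Type", "text/plain")
	return h
}

// handleEvent answers every incoming event with a reply event.
func handleEvent(w http.ResponseWriter, r *http.Request) {
	id, err := newEventID()
	if err != nil {
		http.Error(w, "could not generate event id", http.StatusInternalServerError)
		return
	}
	for k, v := range replyHeaders(id) {
		w.Header()[k] = v
	}
	w.WriteHeader(http.StatusOK)
	w.Write([]byte("Hi from Knative!"))
}

func main() {
	// Exercise the handler in-process; a real service would register it
	// with http.ListenAndServe instead.
	rec := httptest.NewRecorder()
	handleEvent(rec, httptest.NewRequest(http.MethodPost, "/", nil))
	fmt.Println(rec.Code, rec.Header().Get("Ce-Type"), rec.Body.String())
}
```

The SDK hides these details, but seeing the headers spelled out makes it clearer which attributes a trigger can later filter on.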

Initial components to run the example

The main code for this tutorial is already in the GitHub repo for part 2. If you followed part 2 and still have it running, you will need to redeploy the event-display service with a new image. If you didn’t follow part 2, this section shows you how to deploy the components.

Deploy the websocket source application:

kubectl apply -f 010-deployment.yaml

Here is the YAML:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: wseventsource
  namespace: knative-eventing-websocket-source
spec:
  replicas: 1
  selector:
    matchLabels: &labels
      app: wseventsource
  template:
    metadata:
      labels: *labels
    spec:
      containers:
        - name: wseventsource
          image: docker.io/josiemundi/wssourcecloudevents:latest
          env:
          - name: SINK
            value: "http://default-broker.knative-eventing-websocket-source.svc.cluster.local"
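The SINK variable tells the source where to send its events. The internals of the wssourcecloudevents image are not shown in this post, so the following is only a sketch of how such a source might read SINK and build a binary-mode CloudEvent request using the Go standard library. The function name newEventRequest, the event type, the ID, the payload and the local fallback URL are all hypothetical.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"os"
)

// newEventRequest builds an HTTP POST that carries a CloudEvent in binary
// content mode: attributes go in ce-* headers, the payload in the body.
func newEventRequest(sink, id string, payload []byte) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, sink, bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Ce-Specversion", "1.0") // assumption: spec version 1.0
	req.Header.Set("Ce-Id", id)
	req.Header.Set("Ce-Source", "wss://ws.blockchain.info/inv")
	req.Header.Set("Ce-Type", "example.websocket.message") // hypothetical type
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	// SINK is injected by the Deployment; fall back to a placeholder locally.
	sink := os.Getenv("SINK")
	if sink == "" {
		sink = "http://localhost:8080"
	}
	req, err := newEventRequest(sink, "example-1", []byte(`{"hello":"world"}`))
	if err != nil {
		fmt.Println("bad sink URL:", err)
		return
	}
	// Only print the request here; sending it would be http.DefaultClient.Do(req).
	fmt.Println(req.Method, req.URL, req.Header.Get("Ce-Source"))
}
```

Reading the broker address from an environment variable keeps the image reusable: pointing the source at a different broker only requires changing the Deployment.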

Next we will apply the trigger, which subscribes the event-display service to events from the broker that have a source equal to "wss://ws.blockchain.info/inv". We could also filter on type or on another CloudEvent attribute; if we left both source and type empty, the trigger would match all events.

kubectl apply -f 040-trigger.yaml

Here is the trigger YAML:

apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: wsevent-trigger
  namespace: knative-eventing-websocket-source
spec:
  broker: default
  filter:
    sourceAndType:
      type: ""
      source: "wss://ws.blockchain.info/inv"
  subscriber:    
    ref:
      apiVersion: v1
      kind: Service
      name: event-display
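The filter semantics described above can be sketched in a few lines of Go. This is not the broker's actual implementation, just an illustration that assumes exact string matching, with an empty field accepting any value. The event and sourceAndType types and the websocket event type string are hypothetical.

```go
package main

import "fmt"

// event holds the two CloudEvent attributes that the filter inspects.
type event struct {
	Source string
	Type   string
}

// sourceAndType mirrors the two fields of the trigger's filter.
type sourceAndType struct {
	Source string
	Type   string
}

// matches reports whether ev passes the filter: an empty field accepts any
// value, a non-empty field must equal the event attribute exactly.
func (f sourceAndType) matches(ev event) bool {
	if f.Source != "" && f.Source != ev.Source {
		return false
	}
	if f.Type != "" && f.Type != ev.Type {
		return false
	}
	return true
}

func main() {
	filter := sourceAndType{Source: "wss://ws.blockchain.info/inv"}
	ws := event{Source: "wss://ws.blockchain.info/inv", Type: "example.websocket.message"}
	reply := event{
		Source: "https://knative.dev/jsaladas/transactionClassified",
		Type:   "dev.knative.eventing.jsaladas.transaction.classify",
	}
	fmt.Println(filter.matches(ws))    // true
	fmt.Println(filter.matches(reply)) // false
}
```

With this filter, event-display receives the websocket events but not the replies it generates itself, which avoids a feedback loop.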

Next we will deploy the event-display service, which is the subscriber specified for the blockchain events in our trigger YAML. This application is where we create our reply events.

This is a plain Kubernetes Deployment with a Service in front of it, so we need to apply the following two YAML manifests:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: event-display
  namespace: knative-eventing-websocket-source
spec:
  replicas: 1
  selector:
    matchLabels: &labels
      app: event-display
  template:
    metadata:
      labels: *labels
    spec:
      containers:
        - name: event-display
          image: docker.io/josiemundi/test-reply-broker
---
apiVersion: v1
kind: Service
metadata:
  name: event-display
  namespace: knative-eventing-websocket-source
spec:
  type: NodePort
  ports:
  - port: 80
    protocol: TCP
    targetPort: 8080
    name: consumer
  - port: 9080
    protocol: TCP
    targetPort: 9080
    nodePort: 31234
    name: dashboard
  selector:
    app: event-display

If you head to localhost:31234, you should see the stream of events.

Subscribe to the reply events

Now we need to add another trigger, this time subscribing only to the reply events (that’s the newEvent we set up in the Go code). In this case, we specify the source as "https://knative.dev/jsaladas/transactionClassified".

Here is the trigger YAML we apply:

apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: reply-trigger-test
  namespace: knative-eventing-websocket-source
spec:
  broker: default
  filter:
    sourceAndType:
      type: ""
      source: "https://knative.dev/jsaladas/transactionClassified"
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: test-display

This time, our subscriber is a Knative service called test-display, which we still need to deploy.

Run the following to deploy the Knative service that subscribes to reply events:

kubectl --namespace knative-eventing-websocket-source apply --filename - << END
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: test-display
spec:
  template:
    spec:
      containers:
        - image: gcr.io/knative-releases/github.com/knative/eventing-contrib/cmd/event_display@sha256:1d6ddc00ab3e43634cd16b342f9663f9739ba09bc037d5dea175dc425a1bb955
END

We can now get the logs of the test-display service, where you should see only the reply messages:

kubectl logs -l serving.knative.dev/service=test-display -c user-container --tail=100 -n knative-eventing-websocket-source

Next time we will look at classifying the events and sending the transaction size back to the broker as a reply.


How to: Docker Swarm

This tutorial will show you how to get your first Docker swarm up and running. In my example, I am using two Ubuntu machines: one will be the manager and one will be the worker.

Install Docker Community Edition on the machines.

Follow the instructions on the Docker website to install Docker CE for Ubuntu.

Check Docker is installed by running:

docker --version

Install Docker Machine:

Docker Machine will allow us to install Docker Engine on our machines and manage them using docker-machine commands. You can find out more about Docker Machine on the Docker website.

curl -L https://github.com/docker/machine/releases/download/v0.16.0/docker-machine-`uname -s`-`uname -m` >/tmp/docker-machine &&
chmod +x /tmp/docker-machine &&
sudo cp /tmp/docker-machine /usr/local/bin/docker-machine

I have installed Docker Machine on both of the hosts.

On both servers, we need to edit the hosts file from the command line, adding two lines to the end of the file. The easiest way to do this is with the vim text editor, which lets you edit the file in the terminal. We need to add the IP address of each machine, along with a name that identifies it as the manager or the worker.

If you have additional workers then you can add worker02, worker03 etc.

sudo vim /etc/hosts

10.0.16.5    manager
10.0.16.4    worker01
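If you want to confirm that both entries were picked up, a small Go sketch like the one below can parse hosts-file text and look up the two names. It works on a sample string so it runs anywhere; the parseHosts helper is hypothetical, and in practice you would feed it the contents of /etc/hosts.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// parseHosts maps each hostname in hosts-file text to its IP address,
// skipping blank lines and anything after a '#' comment marker.
func parseHosts(text string) map[string]string {
	hosts := make(map[string]string)
	sc := bufio.NewScanner(strings.NewReader(text))
	for sc.Scan() {
		line := sc.Text()
		if i := strings.IndexByte(line, '#'); i >= 0 {
			line = line[:i]
		}
		fields := strings.Fields(line)
		if len(fields) < 2 {
			continue
		}
		// The first field is the IP; every following field is a name for it.
		for _, name := range fields[1:] {
			hosts[name] = fields[0]
		}
	}
	return hosts
}

func main() {
	// Sample text; in practice this would come from os.ReadFile("/etc/hosts").
	sample := "127.0.0.1 localhost\n10.0.16.5    manager\n10.0.16.4    worker01\n"
	hosts := parseHosts(sample)
	for _, name := range []string{"manager", "worker01"} {
		ip, ok := hosts[name]
		fmt.Println(name, ip, ok)
	}
}
```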

We now need to run the following from the manager machine:

sudo docker swarm init --advertise-addr 10.0.16.5 

We will get a response back containing a docker swarm join command, which we then need to run on the worker machine. It will look something like this (example from Docker's own swarm tutorial):

To add a worker to this swarm, run the following command:

    docker swarm join \
    --token SWMTKN-1-49nj1cmql0jkz5s954yi3oex3nedyz0fb0xx14ie39trti4wxv-8vxv8rssmk743ojnwacrr2e7c \
    192.168.99.100:2377

Once you have done this your swarm will be up and running. If you run:

docker info

you will be able to see the details of your swarm, including how many nodes, managers, containers etc.
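For a scripted check, the swarm state can also be read programmatically. The sketch below shells out to docker info with a format template that prints only the local node's swarm state, and treats "active" as success. It assumes the current user can run docker without sudo; the swarmActive helper is a hypothetical name.

```go
package main

import (
	"fmt"
	"os/exec"
	"strings"
)

// swarmActive interprets the output of
// `docker info --format '{{.Swarm.LocalNodeState}}'`.
func swarmActive(output string) bool {
	return strings.TrimSpace(output) == "active"
}

func main() {
	out, err := exec.Command("docker", "info", "--format", "{{.Swarm.LocalNodeState}}").Output()
	if err != nil {
		fmt.Println("could not run docker info:", err)
		return
	}
	fmt.Println("swarm active:", swarmActive(string(out)))
}
```

Running this on both the manager and the worker after the join should report an active swarm on each.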

Here are some useful links on Docker swarm that I used to get mine up and running:

https://docs.docker.com/engine/swarm/

https://www.howtoforge.com/tutorial/ubuntu-docker-swarm-cluster/

Next time we will look at getting something up and running in Docker swarm mode.