In part two of my Knative eventing tutorials, we streamed websocket data to a broker and then subscribed to the events with an event display service that displayed the events in real-time via a web UI.
In this post, I am going to show how to send reply events to a broker and then subscribe to them from a different service. Once we are done, we will have something that should look like the following:
In this scenario, we receive events from a streaming source (in this case a websocket) and, each time we receive one, we send another event back to the broker as a reply. To keep things simple, this example sends a reply for every event it receives.
In the next tutorial, we will receive the events and perform some analysis on the fly, for example analysing the value of a transaction and assigning it a size variable. We could then implement logic such as: if size = XL, send a reply back to the broker, which an alerting application could then listen for.
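As a rough preview of that idea, the classification step might look something like the sketch below. The thresholds and the names sizeFor and shouldReply are my own placeholders for illustration, not code from the repo.

```go
package main

import "fmt"

// sizeFor maps a transaction value to a size label.
// The thresholds are hypothetical and chosen only for illustration.
func sizeFor(value float64) string {
	switch {
	case value >= 100:
		return "XL"
	case value >= 10:
		return "L"
	case value >= 1:
		return "M"
	default:
		return "S"
	}
}

// shouldReply reports whether a transaction of this size warrants
// sending a reply event back to the broker.
func shouldReply(size string) bool {
	return size == "XL"
}

func main() {
	for _, v := range []float64{0.5, 12, 250} {
		size := sizeFor(v)
		fmt.Printf("value=%v size=%s reply=%t\n", v, size, shouldReply(size))
	}
}
```

Only the XL case would produce a reply here, so an alerting application subscribed to the reply events would see just the largest transactions.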
I am running Kubernetes on Docker Desktop for Mac. You will also need Istio and Knative eventing installed.
Deploy a namespace:
kubectl create namespace knative-eventing-websocket-source
Apply the knative-eventing label:
kubectl label namespace
Ensure you have the cluster local gateway set up.
Adding newEvent logic:
In our application code, we add some logic that sends a new reply event every time an event is received. The code is here:
newEvent := cloudevents.NewEvent()
newEvent.SetSource(fmt.Sprintf("https://knative.dev/jsaladas/transactionClassified"))
newEvent.SetType("dev.knative.eventing.jsaladas.transaction.classify")
newEvent.SetID("1234")
newEvent.SetData("Hi from Knative!")
response.RespondWith(200, &newEvent)
This code creates a new event, with the following information:
source = "https://knative.dev/jsaladas/transactionClassified"
type = "dev.knative.eventing.jsaladas.transaction.classify"
data = "Hi from Knative!"
These are just the values I decided on; you can use whichever values you want, so feel free to change them. We can then use these fields later to filter for the reply events only. Aside from receiving an event and displaying it, our code will now generate a new event that enters the Knative eventing ecosystem.
Initial components to run the example
The main code for this tutorial is already in the GitHub repo for part 2. If you followed part 2 and still have it running, you will need to redeploy the event-display service with a new image. For those who didn’t join for part 2, this section shows you how to deploy the components.
Deploy the websocket source application:
kubectl apply -f 010-deployment.yaml
Here is the yaml below:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: wseventsource
  namespace: knative-eventing-websocket-source
spec:
  replicas: 1
  selector:
    matchLabels: &labels
      app: wseventsource
  template:
    metadata:
      labels: *labels
    spec:
      containers:
        - name: wseventsource
          image: docker.io/josiemundi/wssourcecloudevents:latest
          env:
            - name: SINK
              value: "http://default-broker.knative-eventing-websocket-source.svc.cluster.local"
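The SINK environment variable is what tells the source where to send its events. As a hedged illustration of how a source could use it, here is a standard-library sketch that builds (but does not send) a binary-mode CloudEvent request addressed to the sink. The function name newEventRequest, the event type "example.type" and the ID "example-id" are placeholders of mine, and spec version 1.0 is an assumption; the real source application uses the CloudEvents SDK.

```go
package main

import (
	"fmt"
	"net/http"
	"os"
	"strings"
)

// newEventRequest builds an HTTP POST that carries a CloudEvent in
// binary content mode to the given sink URL. It does not send it.
func newEventRequest(sink, id, source, eventType, data string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, sink, strings.NewReader(data))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Ce-Specversion", "1.0") // assumed spec version
	req.Header.Set("Ce-Id", id)
	req.Header.Set("Ce-Source", source)
	req.Header.Set("Ce-Type", eventType)
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	// SINK is injected by the Deployment; fall back to the broker URL
	// from the yaml when running outside the cluster.
	sink := os.Getenv("SINK")
	if sink == "" {
		sink = "http://default-broker.knative-eventing-websocket-source.svc.cluster.local"
	}

	// "example-id" and "example.type" are hypothetical values.
	req, err := newEventRequest(sink, "example-id", "wss://ws.blockchain.info/inv", "example.type", "{}")
	if err != nil {
		fmt.Println("could not build request:", err)
		return
	}
	fmt.Println(req.Method, req.URL.String(), req.Header.Get("Ce-Source"))
}
```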
Next we will apply the trigger, which subscribes the event display service to events from the broker that have a source equal to "wss://ws.blockchain.info/inv". We can also filter on type or even another CloudEvent attribute. If we left both source and type empty, the trigger would match all events.
kubectl apply -f 040-trigger.yaml
Here is the trigger yaml:
apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: wsevent-trigger
  namespace: knative-eventing-websocket-source
spec:
  broker: default
  filter:
    sourceAndType:
      type: ""
      source: "wss://ws.blockchain.info/inv"
  subscriber:
    ref:
      apiVersion: v1
      kind: Service
      name: event-display
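The matching rule behind the sourceAndType filter can be sketched in a few lines of Go. This is only my illustration of the behaviour described above (an empty field matches anything), not the broker's actual implementation; the event and filter types and the "example.type" value are hypothetical.

```go
package main

import "fmt"

// event holds the two CloudEvent attributes the trigger filters on.
type event struct {
	Source string
	Type   string
}

// filter mirrors the sourceAndType block of a Trigger.
type filter struct {
	Source string
	Type   string
}

// matches treats an empty filter field as "match anything" and
// otherwise requires an exact match on that attribute.
func (f filter) matches(e event) bool {
	if f.Source != "" && f.Source != e.Source {
		return false
	}
	if f.Type != "" && f.Type != e.Type {
		return false
	}
	return true
}

func main() {
	wsFilter := filter{Source: "wss://ws.blockchain.info/inv"}

	original := event{Source: "wss://ws.blockchain.info/inv", Type: "example.type"}
	reply := event{
		Source: "https://knative.dev/jsaladas/transactionClassified",
		Type:   "dev.knative.eventing.jsaladas.transaction.classify",
	}

	fmt.Println(wsFilter.matches(original)) // true
	fmt.Println(wsFilter.matches(reply))    // false
	fmt.Println(filter{}.matches(reply))    // true: empty filter matches everything
}
```

This is why the event display service only receives the websocket events and not the replies it generates itself.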
Next we will deploy the event-display service, which is the specified subscriber of the blockchain events in our trigger.yaml. This application is where we create our reply events.
This is a Kubernetes service so we need to apply the following yaml files:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: event-display
  namespace: knative-eventing-websocket-source
spec:
  replicas: 1
  selector:
    matchLabels: &labels
      app: event-display
  template:
    metadata:
      labels: *labels
    spec:
      containers:
        - name: event-display
          image: docker.io/josiemundi/test-reply-broker

apiVersion: v1
kind: Service
metadata:
  name: event-display
  namespace: knative-eventing-websocket-source
spec:
  type: NodePort
  ports:
    - port: 80
      protocol: TCP
      targetPort: 8080
      name: consumer
    - port: 9080
      protocol: TCP
      targetPort: 9080
      nodePort: 31234
      name: dashboard
  selector:
    app: event-display
If you head to localhost:31234, you should see the stream of events.
Subscribe to the reply events
Now we need to add another trigger, this time subscribing only to the reply events (that’s the newEvent we set up in the Go code). You can see that, in this case, we specify the source as "https://knative.dev/jsaladas/transactionClassified".
Here is the trigger yaml we apply:
apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: reply-trigger-test
  namespace: knative-eventing-websocket-source
spec:
  broker: default
  filter:
    sourceAndType:
      type: ""
      source: "https://knative.dev/jsaladas/transactionClassified"
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: test-display
This time, our subscriber is a Knative service called test-display, which we still need to deploy.
Run the following to deploy the Knative service that subscribes to reply events:
kubectl --namespace knative-eventing-websocket-source apply --filename - << END
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: test-display
spec:
  template:
    spec:
      containers:
        - image: gcr.io/knative-releases/github.com/knative/eventing-contrib/cmd/event_display@sha256:1d6ddc00ab3e43634cd16b342f9663f9739ba09bc037d5dea175dc425a1bb955
END
We can now get the logs of the test-display service, where you should see only the reply messages:
kubectl logs -l serving.knative.dev/service=test-display -c user-container --tail=100 -n knative-eventing-websocket-source
Next time we will look at classifying the events and sending the transaction size back to the broker as a reply.