In part two of my Knative eventing tutorials, we streamed websocket data to a broker and then subscribed to the events with an event display service that displayed the events in real-time via a web UI.
In this post, I am going to show how to create reply events to a broker and then subscribe to them from a different service. Once we are done, we will have something that should look like the following:

In this scenario, we are receiving events from a streaming source (in this case a websocket), and each time we receive an event, we want to send another event back to the broker as a reply. To keep things simple in this example, every incoming event produces exactly one reply.
In the next tutorial, we will receive the events and then perform some analysis on the fly, for example analysing the value of a transaction and assigning it a size. We could then implement logic such as: if the size is XL, send a reply back to the broker, which an alerting application could then listen for.
Setup
I am running Kubernetes on Docker Desktop for Mac. You will also need Istio and Knative Eventing installed.
Deploy a namespace:
kubectl create namespace knative-eventing-websocket-source
Apply the knative-eventing label:
kubectl label namespace knative-eventing-websocket-source knative-eventing-injection=enabled
Ensure you have the cluster local gateway set up.
Adding newEvent logic:
In our application code, we add some logic that sends a new reply event every time an event is received. The code is here:
// Build the reply event that goes back to the broker.
newEvent := cloudevents.NewEvent()
newEvent.SetSource("https://knative.dev/jsaladas/transactionClassified")
newEvent.SetType("dev.knative.eventing.jsaladas.transaction.classify")
newEvent.SetID("1234") // fixed ID, fine for this demo
newEvent.SetData("Hi from Knative!")
// Return the event in the response to the delivery request.
response.RespondWith(200, &newEvent)
This code creates a new event, with the following information:
source = "https://knative.dev/jsaladas/transactionClassified"
type = "dev.knative.eventing.jsaladas.transaction.classify"
data = "Hi from Knative!"
I can use whichever values I want for the above; these are just the values I decided on, so feel free to change them. We can use these fields later to filter for the reply events only. Aside from receiving an event and displaying it, our code will now generate a new event that enters the Knative eventing ecosystem.
Initial components to run the example
The main code for this tutorial is already in the GitHub repo for part 2. If you followed that part and still have it running, you will need to redeploy the event-display service with a new image. For those who didn't follow part 2, this section shows how to deploy the components.
Deploy the websocket source application:
kubectl apply -f 010-deployment.yaml
Here is the yaml below:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: wseventsource
  namespace: knative-eventing-websocket-source
spec:
  replicas: 1
  selector:
    matchLabels: &labels
      app: wseventsource
  template:
    metadata:
      labels: *labels
    spec:
      containers:
        - name: wseventsource
          image: docker.io/josiemundi/wssourcecloudevents:latest
          env:
            - name: SINK
              value: "http://default-broker.knative-eventing-websocket-source.svc.cluster.local"
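The SINK variable is how the source learns where to send its events. As a rough sketch of what a source can do with it, the standard-library Go below reads SINK and builds a binary-mode CloudEvents POST for the broker. The function name newEventRequest, the ce-id and ce-type values, the sample payload and spec version 1.0 are all assumptions for illustration; the real wssourcecloudevents image uses the CloudEvents SDK and may differ.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"os"
)

// newEventRequest builds (but does not send) a POST that carries payload as
// a binary-mode CloudEvent addressed to the sink URL.
func newEventRequest(sink string, payload []byte) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, sink, bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}
	req.Header.Set("ce-specversion", "1.0") // assumption: CloudEvents 1.0
	req.Header.Set("ce-id", "example-id")   // hypothetical ID
	req.Header.Set("ce-source", "wss://ws.blockchain.info/inv")
	req.Header.Set("ce-type", "example.websocket.event") // hypothetical type
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	sink := os.Getenv("SINK")
	if sink == "" {
		// Fall back to the broker address from the Deployment above.
		sink = "http://default-broker.knative-eventing-websocket-source.svc.cluster.local"
	}
	req, err := newEventRequest(sink, []byte(`{"op":"utx"}`))
	if err != nil {
		fmt.Println("build request:", err)
		return
	}
	// Sending is left out so the sketch runs outside the cluster;
	// inside it, http.DefaultClient.Do(req) would deliver the event.
	fmt.Println(req.Method, req.URL.String(), req.Header.Get("ce-source"))
}
```

Keeping the broker address in an environment variable means the same image can be pointed at a different broker or namespace by editing only the Deployment.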
Next we will apply the trigger that sets up a subscription so that the event-display service receives events from the broker whose source equals "wss://ws.blockchain.info/inv". We could also filter on type or on another CloudEvent attribute; if we left both source and type empty, the trigger would match all events.
kubectl apply -f 040-trigger.yaml
Here is the trigger yaml:
apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: wsevent-trigger
  namespace: knative-eventing-websocket-source
spec:
  broker: default
  filter:
    sourceAndType:
      type: ""
      source: "wss://ws.blockchain.info/inv"
  subscriber:
    ref:
      apiVersion: v1
      kind: Service
      name: event-display
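The matching rule described above is small enough to sketch in Go. This is only an illustration of the sourceAndType behaviour as described in this post (an empty field matches anything), not Knative's actual implementation; the type and method names are made up for the example.

```go
package main

import "fmt"

// sourceAndType mirrors the two filter fields in the Trigger spec.
type sourceAndType struct {
	Source string
	Type   string
}

// matches reports whether an event with the given source and type passes
// the filter. An empty filter field is treated as a wildcard.
func (f sourceAndType) matches(evSource, evType string) bool {
	if f.Source != "" && f.Source != evSource {
		return false
	}
	if f.Type != "" && f.Type != evType {
		return false
	}
	return true
}

func main() {
	// Same filter as wsevent-trigger: source set, type left empty.
	f := sourceAndType{Source: "wss://ws.blockchain.info/inv"}

	// A websocket event passes, whatever its type.
	fmt.Println(f.matches("wss://ws.blockchain.info/inv", "any.type")) // true

	// A reply event has a different source, so this trigger ignores it.
	fmt.Println(f.matches("https://knative.dev/jsaladas/transactionClassified", "any.type")) // false
}
```

This is why the reply events do not loop back into event-display: their source differs from the one this trigger filters on.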
Next we will deploy the event-display service, which is the subscriber specified for the blockchain events in our trigger.yaml. This application is where we create our reply events.
This is a Kubernetes service, so we need to apply the following YAML (a Deployment and a Service):
apiVersion: apps/v1
kind: Deployment
metadata:
  name: event-display
  namespace: knative-eventing-websocket-source
spec:
  replicas: 1
  selector:
    matchLabels: &labels
      app: event-display
  template:
    metadata:
      labels: *labels
    spec:
      containers:
        - name: event-display
          image: docker.io/josiemundi/test-reply-broker
---
apiVersion: v1
kind: Service
metadata:
  name: event-display
  namespace: knative-eventing-websocket-source
spec:
  type: NodePort
  ports:
    - port: 80
      protocol: TCP
      targetPort: 8080
      name: consumer
    - port: 9080
      protocol: TCP
      targetPort: 9080
      nodePort: 31234
      name: dashboard
  selector:
    app: event-display
If you head to localhost:31234, you should see the stream of events.
Subscribe to the reply events
Now we need to add another trigger, this time subscribing only to the reply events (the newEvent that we set up in the Go code). In this case, we specify the source as "https://knative.dev/jsaladas/transactionClassified".
Here is the trigger yaml we apply:
apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: reply-trigger-test
  namespace: knative-eventing-websocket-source
spec:
  broker: default
  filter:
    sourceAndType:
      type: ""
      source: "https://knative.dev/jsaladas/transactionClassified"
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: test-display
This time, our subscriber is a Knative service called test-display, which we still need to deploy.
Run the following to deploy the Knative service that subscribes to the reply events:
kubectl --namespace knative-eventing-websocket-source apply --filename - << END
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: test-display
spec:
  template:
    spec:
      containers:
        - image: gcr.io/knative-releases/github.com/knative/eventing-contrib/cmd/event_display@sha256:1d6ddc00ab3e43634cd16b342f9663f9739ba09bc037d5dea175dc425a1bb955
END
We can now get the logs of the test-display service, where you should see only the reply messages:
kubectl logs -l serving.knative.dev/service=test-display -c user-container --tail=100 -n knative-eventing-websocket-source
Next time we will look at classifying the events and sending the transaction size back to the broker as a reply.