Export Kubernetes Pod Logs to Upstash Kafka via FluentBit
FluentBit is a lightweight log processor and exporter. With dozens of input and output connectors, it is one of the most popular choices when developers need to collect service logs, process them, and export them to a backend or a persistent store.
A Kafka output plugin ships with the built-in FluentBit connectors. That is, FluentBit can read logs from an arbitrary source and export them to a Kafka topic. Upstash, on the other hand, provides a serverless, fully managed Kafka cluster with a pay-per-message model, freeing developers from the burden of managing, scaling, and maintaining clusters. As a true serverless offering, its price scales to zero when there are no messages. Last but not least, the first 10,000 messages per day are free of charge.
In this article, we will export the logs of particular Kubernetes pods to Upstash Kafka via FluentBit, and then consume, filter and stream these logs to clients through a Go HTTP server.
Before starting, make sure you have an Upstash account (sign up for free if you haven’t already), access to a Kubernetes cluster (minikube or docker-desktop is fine), and helm installed.
Setup
Create Upstash Kafka cluster and new topic
Log in to the Upstash console, navigate to the Kafka tab, and create a new Kafka cluster and a topic called logs, in seconds!
Deploy FluentBit to Kubernetes
Let’s use official charts to deploy FluentBit.
$ helm repo add fluent https://fluent.github.io/helm-charts
Before starting the installation, we need to change a few default values of the chart, namely:
- config.inputs: configuration for log inputs, that is, which logs we want to export.
- config.filters: configuration for log processing before exporting to the output plugins.
- config.outputs: configuration for log sinks.
Create new files for each and fill them with the content below:
input.conf:
[INPUT]
Name tail
Path /var/log/containers/*_upstashed_*.log
multiline.parser docker, cri
Tag kube.*
Mem_Buf_Limit 5MB
Skip_Long_Lines On
Here we tell FluentBit to use the tail plugin, which watches for new lines appended to the files specified by Path. Notice the _upstashed_ value in the Path key. Log files are named as <pod-name>_<namespace>_<container-name-container-id>.log, and in this blog post we want FluentBit to export logs only from pods that reside in the upstashed namespace.
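For instance, under this naming convention a matching log file might look like the following (the pod name suffix and container ID here are hypothetical):
/var/log/containers/my-pod-7d6f9_upstashed_my-container-3fa81b2c.log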
filter.conf:
[FILTER]
Name kubernetes
Match kube.*
Merge_Log On
Keep_Log Off
K8S-Logging.Parser On
K8S-Logging.Exclude On
[FILTER]
Name nest
Match *
Operation lift
Nested_under kubernetes
Add_prefix k8s_
Here we use FluentBit's built-in kubernetes filter, which creates a structured output containing the log line, timestamp, pod info, container info, etc. This is an example output generated by the kubernetes filter:
{
"@timestamp": 1670672614.142579,
"log": "<log-line>",
"stream": "stdout",
"time": "2022-12-10T11:43:34.1425787Z",
"kubernetes": {
"pod_name": "<pod-name>",
"namespace_name": "<namespace>",
"pod_id": "<id>",
"labels": {
"app": "<foo>",
"pod-template-hash": "<bar>"
},
"host": "docker-desktop",
"container_name": "<baz>",
"docker_id": "<some-id>",
"container_hash": "<some-hash>",
"container_image": "<some-image>"
}
}
That’s cool. But we want to move the fields under the "kubernetes" key to the outer block, because we want to refer to these values, namely pod_name, when sending logs to Kafka. Since FluentBit’s Kafka output does not support the record accessor feature (which allows accessing inner values of JSON records), we apply this workaround. The second filter, nest, processes the JSON in that sense and also adds a "k8s_" prefix to all the fields that were under the "kubernetes" key.
So the message becomes:
{
"@timestamp": 1670672943.712912,
"log": "<log-line>",
"stream": "stdout",
"time": "2022-12-10T11:49:03.7129124Z",
"k8s_pod_name": "<pod-name>",
"k8s_namespace_name": "<namespace>",
"k8s_pod_id": "<id>",
"k8s_labels": {
"app": "<foo>",
"pod-template-hash": "<bar>"
},
"k8s_host": "docker-desktop",
"k8s_container_name": "<baz>",
"k8s_docker_id": "<some-id>",
"k8s_container_hash": "<some-hash>",
"k8s_container_image": "<some-image>"
}
Alright! We now want these logs to be exported to Upstash Kafka. Let’s configure the FluentBit output plugin as the last step:
output.conf:
[OUTPUT]
Name kafka
Match kube.*
Brokers <broker-provided-by-upstash>
Topics logs
Message_Key_Field k8s_pod_name
rdkafka.security.protocol sasl_ssl
rdkafka.sasl.mechanism SCRAM-SHA-256
rdkafka.sasl.username <username-provided-by-upstash>
rdkafka.sasl.password <password-provided-by-upstash>
Remember that we created the logs topic when creating the Upstash Kafka cluster; we use this topic to export the pod logs. We also use the k8s_pod_name value as the message key, so all log lines from the same pod land in the same partition and keep their relative order.
Now we are ready to deploy FluentBit:
$ helm install fluent-bit -n fluent-bit --create-namespace fluent/fluent-bit \
--set-file config.inputs=input.conf \
--set-file config.filters=filter.conf \
--set-file config.outputs=output.conf
That will deploy a fluent-bit DaemonSet:
$ kubectl get ds -n fluent-bit
NAME DESIRED CURRENT READY UP-TO-DATE AVAILABLE NODE SELECTOR AGE
fluent-bit 1 1 1 1 1 <none> 19h
Check that the Kafka connection is established:
$ kubectl logs -f ds/fluent-bit -n fluent-bit
…
…
[2022/12/10 11:49:01] [ info] [output:kafka:kafka.0] brokers='<broker-url>' topics='logs'
…
…
Create a new pod whose logs will be exported
Remember that we configured the FluentBit input to observe logs from the upstashed namespace only. Let’s create a pod in this namespace that produces random logs:
$ kubectl create ns upstashed
$ kubectl create deployment random-logger -n upstashed --image=chentex/random-logger:v1.0.1 -- /entrypoint.sh 7500 7500 1000
This deployment produces random logs and prints them to stdout. The arguments are minLogInterval, maxLogInterval, and numberOfLogs, respectively. So the deployment above will produce a log line every 7.5 seconds, 1000 lines in total:
$ kubectl logs -f deployment/random-logger -n upstashed
2022-12-10T15:09:30+0000 ERROR An error is usually an exception that has been caught and not handled.
2022-12-10T15:09:37+0000 INFO This is less important than debug log and is often used to provide context in the current task.
2022-12-10T15:09:45+0000 DEBUG This is a debug log that shows a log that can be ignored.
2022-12-10T15:09:52+0000 WARN A warning that should be ignored is usually at this level and should be actionable.
Now navigate back to the Upstash console to see if the pod logs arrive:
It’s that easy!
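Alternatively, you can run a quick sanity check from a terminal. Here is a sketch using kcat (assuming you have it installed); the broker address and credentials are the same placeholders used in output.conf:
$ kcat -b <broker-provided-by-upstash> \
    -X security.protocol=sasl_ssl \
    -X sasl.mechanisms=SCRAM-SHA-256 \
    -X sasl.username=<username-provided-by-upstash> \
    -X sasl.password=<password-provided-by-upstash> \
    -t logs -C -c 1
This consumes a single message from the logs topic and prints it, which should show one of the JSON records above.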
Consume Logs from Upstash Kafka
In the setup above, Upstash Kafka behaves like a buffer for the collected logs. We can then configure a sink for these messages to be processed or persisted. Meanwhile, let's say we also want to give some clients access to real-time logs, but not to the entire message. For instance, we do not want clients to see the k8s_container_image field of the Kafka messages.
Create HTTP Server
After adding the Consumer snippet for Go provided by the Upstash console under the Details tab, start an HTTP server that handles requests at the /logs endpoint and streams messages to clients:
func main() {
dialer := getKafkaDialer()
http.HandleFunc("/logs", func(w http.ResponseWriter, r *http.Request) {
reader := getKafkaReader(dialer)
defer reader.Close()
// Errors are ignored for brevity. Note that this is allowed only
// in blogpost code snippets! Never ignore errors in Golang!
w.Header().Set("Transfer-Encoding", "chunked")
w.Write([]byte("------------ Streaming Logs ------------\n"))
flusher, _ := w.(http.Flusher)
flusher.Flush()
for {
message, err := reader.ReadMessage(r.Context())
if err != nil {
// should have been handled and responded properly
// for context.Canceled and other errors.
return
}
// include only `log` field from the message value
resp := struct {
Log string `json:"log"`
}{}
_ = json.Unmarshal(message.Value, &resp)
w.Write(message.Key)
w.Write([]byte{':', '\t'})
w.Write([]byte(resp.Log))
flusher.Flush()
}
})
http.ListenAndServe(":8080", nil)
}
This is a minimal HTTP server with a single endpoint, /logs. The handler reads from the logs topic in Upstash Kafka, then sends each log line to clients along with the pod name received as the message key.
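The getKafkaDialer and getKafkaReader helpers are omitted above. Below is a minimal sketch of what they might look like, assuming the segmentio/kafka-go client and the same placeholder broker and credentials from the Upstash console; the consumer group id is hypothetical:
import (
	"crypto/tls"
	"log"

	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/scram"
)

func getKafkaDialer() *kafka.Dialer {
	// SCRAM-SHA-256 auth, matching the mechanism configured in output.conf.
	mechanism, err := scram.Mechanism(scram.SHA256,
		"<username-provided-by-upstash>", "<password-provided-by-upstash>")
	if err != nil {
		log.Fatal(err)
	}
	return &kafka.Dialer{
		SASLMechanism: mechanism,
		TLS:           &tls.Config{}, // Upstash brokers require TLS
	}
}

func getKafkaReader(dialer *kafka.Dialer) *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"<broker-provided-by-upstash>"},
		GroupID: "log-streamer", // hypothetical consumer group id
		Topic:   "logs",
		Dialer:  dialer,
	})
}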
Let's compare kubectl logs with the logs streamed from Upstash!
The command running in the upper terminal tab is
kubectl logs -f deployment/random-logger -n upstashed
while in the lower tab it is
curl localhost:8080/logs
The upper tab retrieves logs from my local Kubernetes cluster, while the logs in the lower tab are received from Upstash Kafka running in eu-west. The two are almost perfectly synchronized. Upstash rocks, doesn't it?
Conclusion
Logs usually contain hints about errors and unexpected behavior in deployments, and hence are the first place developers look during diagnosis. Thus, instead of being deleted and forgotten, logs should be cared for and consumed properly.
In this article, we exported particular Kubernetes pod logs to an Upstash Kafka topic, from where other consumers can read, process, and act on them. Moreover, after applying a simple filter, we exposed these real-time logs to HTTP clients that do not necessarily have access to either the Kubernetes cluster or the Kafka topics.
Now that Kubernetes pod logs are buffered in Upstash Kafka, one can go further and consume them from other backends, storage drivers, dashboards, observability tools, and the like.