How we use a central protobuf schema to make our data pipeline less chaotic
If I told you one of the most popular TV Shows in Sweden is about following the moose as they travel, for 21 days, you might say:
— Stop it with the stereotypes! or :
— Ismail, go home and stop experimenting with psychedelics
I promise, this TV show is the real deal and it’s only 21 days long.
The TV show “Den stora älgvandringen” (which translates to “the great moose walk”) is a type of TV show known as slow TV. This trend started in the lovely neighboring country of Norway (at NRK).
This TV show is 21 days long and it was streamed live on SVT’s (Swedish Television’s) streaming service, SVT Play.
So why collect data then? It turns out filming the moose is not the only thing that SVT does and SVT Play is just one of the services at SVT. These services range from streaming to delivering news in addition to games and services for children and more. The point is, those apps make different types of content available (news articles, movies, TV series etc.) on different platforms including web, iOS, Android and smart TVs.
So a little more than two years ago, a project for data collection at SVT got started. This was an effort to aid decision making in the multiple products of SVT (which used to rely on third-party services such as Google Analytics). Of course, the data is not what makes or breaks the products, nor the shows, movies or news articles. It is there to verify assumptions, to understand the users better and to support the decision making.
As one can see, the diversity of SVT’s services and of the platforms on which they run adds multiple challenges to the collection of data.
So, what is this post about?
If you prefer watching this as a video feel free to head over here.
While there are many things to talk about when it comes to the collection and analysis of data, this post will focus on:
A very brief intro to how we collect data at SVT.
The chaos that can result from the changes in the business rules.
How we use a central schema (with Protobuf) to define the data we collect and propagate this definition in the whole pipeline (from collection to storage).
Some of the issues we encountered with that choice.
A brief on how we collect data at SVT
Let’s say you’re a news editor, and you want to see how your articles perform. How can we collect data about that?
For example, you would want to know: how many users read an article? We cannot know for sure if a user actually reads the article, but what we can do is estimate it from what we can collect such as: a mouse click on the article or a mouse scroll inside the article.
These things that we can collect are what we call “Events”. Other example events for other products can be:
A video start or a video pause in the video player.
A search query in a search bar.
Or other events for collecting video player telemetry information.
An event is an action that happened on the client application side and for which the corresponding information can be collected (for example, a click or a video start).
These events are just observations, and from them we should be able to either calculate or approximate our metrics. An example of a metric that cannot be calculated exactly is the percentage of a video that was watched, because we simply do not know if the user is actually watching.
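As a minimal sketch of how a metric can be approximated from events: the event names article_click and article_scroll, the article_id field, and the rule that "a click plus a scroll counts as a read" are all assumptions made for illustration, not SVT's actual definitions.

```python
from collections import defaultdict

# Hypothetical events; field and event names are illustrative only.
events = [
    {"user_id": "a", "article_id": "1", "event_type": "article_click"},
    {"user_id": "a", "article_id": "1", "event_type": "article_scroll"},
    {"user_id": "b", "article_id": "1", "event_type": "article_click"},
    {"user_id": "c", "article_id": "2", "event_type": "article_scroll"},
]


def estimate_readers(events):
    """Approximate readers per article: users who both clicked and scrolled."""
    seen = defaultdict(set)  # (article_id, user_id) -> set of event types
    for e in events:
        seen[(e["article_id"], e["user_id"])].add(e["event_type"])
    readers = defaultdict(int)
    for (article_id, _user_id), types in seen.items():
        if {"article_click", "article_scroll"} <= types:
            readers[article_id] += 1
    return dict(readers)


print(estimate_readers(events))
```

Only user "a" both clicked and scrolled in article "1", so that is the only estimated read. The estimate is only as good as the rule behind it, which is exactly why these are approximations.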
So we send those events. We ingest them into our pipeline (more details later). We store them in a table. We process and analyze them and then we present them to the users of our solution.
So the event will go through:
collection: on the user-facing application side. For this part, in our case, we built SDKs (mainly for iOS, Android, Web and smart TVs) that will collect events such as clicks and send them to an API, which brings us to the next part.
ingestion: when the API gets the previous event, it publishes it to a message queue. In our case, we are currently using Pub/Sub on the Google Cloud Platform (GCP). We have creatively named this API the Event API.
storage: a service gets the previous event from Pub/Sub, checks the validity of its content, then stores it in a table (a BigQuery table in our case). We have called this service judge-judi.
analysis: mostly just a bunch of ETL
presentation: at this stage, we can show a dashboard to the right person with the right metric.
For the purpose of this post, we will only focus on the collect — ingest — store parts of the previous pipeline.
To keep things simple, we can assume that we send one event at a time from the client application. (In reality, the client application gathers events and sends them together as a sequence.)
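The collect, ingest and store stages can be sketched in a few lines. This is only a toy model under stated assumptions: an in-memory queue stands in for Pub/Sub, a list stands in for the BigQuery table, events are JSON for now, and the function names and the set of required fields are made up for the example.

```python
import json
import queue

REQUIRED_FIELDS = {"user_id", "content_type", "event_type"}  # assumed for the sketch

message_queue = queue.Queue()  # stands in for Pub/Sub
table = []                     # stands in for the BigQuery table


def event_api(body: bytes) -> None:
    """Ingestion: publish the raw event to the queue without inspecting it."""
    message_queue.put(body)


def judge_judi() -> None:
    """Storage: pull events from the queue and store only the valid ones."""
    while not message_queue.empty():
        event = json.loads(message_queue.get())
        if REQUIRED_FIELDS.issubset(event):
            table.append(event)


# Collection: the SDK serializes an event and sends it to the API.
event_api(json.dumps({"user_id": "00000000", "content_type": "video",
                      "event_type": "video_start"}).encode("utf-8"))
event_api(json.dumps({"user_id": "00000000"}).encode("utf-8"))  # incomplete event
judge_judi()
print(len(table))
```

Only the complete event ends up in the table. Note that the API never looks inside the event, so the only place where a malformed event is noticed is the very last stage.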
So, with all this in mind, where can problems happen?
The need for ONE (versioned) definition of the event data
From what we've talked about in the previous section, here's what the life of an event looks like from collection to storage:
First, we send the event from an app (through our SDK) to the Event API.
The Event API then adds it to the queue for it to be processed later.
And finally, judge-judi (the service described in the storage part above) performs its duty of pulling events from the queue and storing the valid ones in the table.
The first problem is that all parts of the pipeline, from collection to storage, have to agree on what the definition of an event is.
The first problem: consistency
Let’s assume that the event looks something like:
{ "user_id": "00000000", "content_type": "video", "event_type": "video_start" }
Given the event above, what if one of our products ends up sending an event that names the type eventType instead of event_type?
{ "user_id": "00000000", "content_type": "video", "eventType": "video_start" }
When this event reaches the validation stage at judge-judi, the service expects a field named event_type. Since it only finds eventType, it will simply conclude that the field is missing.
So the first problem is having all parts of the pipeline agree on the definition of the data.
We also have a multitude of products at SVT (svtplay, news etc.) and different platforms (web, iOS, Android etc.), so it would be hard to make sure that the SDKs are always consistent.
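The naming mismatch above is easy to reproduce. In this small sketch, the EXPECTED_FIELDS tuple and the missing_fields helper are hypothetical; they only illustrate what a validator sees when one SDK uses a different naming convention.

```python
EXPECTED_FIELDS = ("user_id", "content_type", "event_type")  # assumed contract


def missing_fields(event: dict) -> list:
    """Return the expected fields that the event does not contain."""
    return [name for name in EXPECTED_FIELDS if name not in event]


good = {"user_id": "00000000", "content_type": "video", "event_type": "video_start"}
bad = {"user_id": "00000000", "content_type": "video", "eventType": "video_start"}

print(missing_fields(good))
print(missing_fields(bad))
```

The second event carries exactly the same information, yet from the validator's point of view event_type is simply absent. Nothing in JSON itself can tell us that eventType was meant to be the same field.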
The second problem: Murphy’s law of data engineering
Which states that: “If the data schema can change, the data schema will change.” Now, Murphy did not necessarily say that.
Well, he never actually did, but I’m sure he would agree.
The business rules and goals change more often than teenagers’ relationships. We should expect, as a result, that the definition of the event will change. And it will change often. New fields will be added, old fields will be removed, others merged together, and the list goes on and on.
These modifications put schemas, and therefore the definition of an event, in a state of perpetual change. Each change will require adjustments to the pipeline. We do not want these recurring changes in the definition of the data to break the whole pipeline (or part of it). Ideally, we would want to preserve backward and forward compatibility whenever possible.
And then problems specific to JSON
The type of a field in JSON is not declared anywhere, so errors can happen. It would be preferable to have guarantees about the structure of the event we should expect.
Because we have different platforms, we are using different programming languages. This means that the JSON body can differ depending on the programming language and libraries used.
Serialization overhead on the Event API’s side with the use of JSON. Although I currently do not have the “data” to prove it, it is easy to see that the field names (in effect, the schema) are sent with every event.
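To make the last point concrete, here is a rough size comparison. It is a sketch under assumptions: the field numbering is invented, and the "tagged" encoding below is a simplified stand-in for a schema-based binary format, not the real protobuf encoding. It also only measures payload size, not serialization time.

```python
import json

event = {"user_id": "00000000", "content_type": "video", "event_type": "video_start"}

# JSON repeats every field name inside every single event.
json_bytes = json.dumps(event, separators=(",", ":")).encode("utf-8")

# Schema-based alternative: sender and receiver agree beforehand on a number
# per field, so each value is preceded only by that number and its length.
FIELD_NUMBERS = {"user_id": 1, "content_type": 2, "event_type": 3}  # assumed numbering

tagged_bytes = b"".join(
    bytes([FIELD_NUMBERS[name], len(value)]) + value.encode("utf-8")
    for name, value in event.items()
)

print(len(json_bytes), len(tagged_bytes))
```

For small events like this one, most of the JSON payload is field names and punctuation rather than data, which is the overhead a shared schema can remove.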
But since we control both the applications and the schema definition, how can we make use of that?
The solution: centralizing the definition of an event with Protobuf
One way of tackling the issues above is to do the following:
One repository where the definition of an event, which we will call the Event Model, is kept (we just keep it in a repository on GitLab).
This event model is versioned.
Every version of the event model is then propagated across all the elements of the pipeline. For example, the web SDK will have access to an event model library and every new version in the event model repository will correspond to a new version in the library. This version should be the same for all platforms.
We will preferably use a language-neutral format for the event model’s definition, one that enables us to serialize an event in the SDKs and then deserialize it on the side of judge-judi.
Preferably, adding or removing fields from the event model should not break backward/forward compatibility.
To make a choice, we tried both Avro and Protobuf. There are many options when it comes to defining a schema, but we wanted a definition that supports all of our languages and provides a “good enough” serialization, and Protobuf was the tool of choice.
What is protobuf and how does it work?
Protocol buffers are a language-neutral, platform-neutral, extensible way of serializing structured data. At least that’s what they’re supposed to be.
We will not go too deeply into how protocol buffers work, but here are the basics.
Let’s say Alice wants to send a letter message to Bob that would look like this in JSON:
{ "firstName": "Alice", "lastName": "Alice", "email": "alice@alice.alice", "content": "Hi Bob, it's Alice" }
To do so, in protobuf, Alice and Bob would have to agree on a message schema. This schema can look something like this:
// Field names use snake_case; protobuf's JSON mapping turns first_name into firstName.
message Letter {
  required string first_name = 1;
  required string last_name = 2;
  optional string email = 3;
  required string content = 4;
}
This letter schema will be stored in a .proto file.
Alice and Bob can then compile this schema into C++ or JavaScript code, or whatever language they’re into, using the protobuf compiler (protoc).
Using the generated code Alice will serialize the message. For example, if Alice is using C++ her code might look something like this:
Letter letter;
letter.set_first_name("Alice");
Alice can now send the letter to Bob. A series of bytes is sent over the network. Since Bob knows the Letter schema, he can deserialize the letter message.
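To show what that "series of bytes" roughly looks like, here is a hand-rolled sketch of the protobuf wire format for string fields. In practice the code generated by the proto compiler does all of this; the helper names below are made up, and the sketch only handles length-delimited (string) fields.

```python
def encode_varint(n: int) -> bytes:
    """Encode a non-negative integer as a protobuf varint (7 bits per byte)."""
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_string_field(field_number: int, value: str) -> bytes:
    data = value.encode("utf-8")
    tag = (field_number << 3) | 2  # wire type 2 = length-delimited
    return encode_varint(tag) + encode_varint(len(data)) + data


def decode_varint(buf: bytes, pos: int):
    result, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not (byte & 0x80):
            return result, pos
        shift += 7


def decode_string_fields(buf: bytes) -> dict:
    """Return {field_number: text} for a message made only of string fields."""
    fields, pos = {}, 0
    while pos < len(buf):
        tag, pos = decode_varint(buf, pos)
        if (tag & 0x7) != 2:
            raise ValueError("this sketch only handles string fields")
        length, pos = decode_varint(buf, pos)
        fields[tag >> 3] = buf[pos:pos + length].decode("utf-8")
        pos += length
    return fields


# Alice: the field numbers 1, 2 and 4 come from the Letter schema.
letter = (encode_string_field(1, "Alice")
          + encode_string_field(2, "Alice")
          + encode_string_field(4, "Hi Bob, it's Alice"))

# Bob: maps the numbers back to names using the same schema.
NAMES = {1: "firstName", 2: "lastName", 3: "email", 4: "content"}
print({NAMES[n]: text for n, text in decode_string_fields(letter).items()})
```

Notice that no field name travels over the wire, only field numbers. Without the schema, Bob would see numbered blobs and would have no idea what they mean.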
So how can we use it for the solution above?
Step 1: In our case, the event model repository hosts the .proto file.
Step 2: This repository propagates the generated code, at the same version, to the rest of the pipeline by publishing libraries. It also creates the table on BigQuery (as we will mention in the next section, this part has changed).
Step 3: The service judge-judi fetches the latest Go version of the event model because it will need it later. And, just like Alice, the client apps fetch the latest version of this event model through the SDKs in order to serialize the events that we will send. The app serializes the event and sends it to the API.
Step 4: The Event API collects this event as it is, without deserializing it, and publishes it to the queue on Pub/Sub.
Step 5: The service judge-judi pulls the events and deserializes them.
It’s all good now, you drop the mic, walk out of the room, head high…until…stuff happens. You realize that the reality is as far from your expectations as your tongue is from touching your nose… So close, and yet so far (don’t try that, you’ll look ridiculous)
Expectations vs. reality
The myth of language agnosticism
While the mechanism of protocol buffers is supposed to be language-agnostic and platform-agnostic, in practice this does not always seem to be the case.
For example, on some platforms the encoding produced was not consistent with what the deserializer expected. We use the gogofaster compiler for Go, which is a third-party implementation that generates type-specific marshaling code for extra performance. But we use it mainly to get more canonical Go structures, less typing, and to avoid ending up with the infamous XXX_ fields.
The myth of backward and forward compatibility
Let’s say a field gets added, and a field gets removed from the schema. If the service judge-judi is using the latest version of the event model, it will still be able to deserialize the data with no problem even if the client application does not update to the latest version. But this is only true for simple changes. For example, this would not hold if we change the nesting of some fields.
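Here is a small model of why simple additions and removals are harmless. It assumes that events have already been decoded into pairs of field number and value, and the two schema versions (including the platform field) are invented for the example. It mirrors the protobuf behaviour for optional fields: unknown numbers are ignored and missing ones fall back to a default.

```python
# Schema versions as {field_number: (name, default)}; hypothetical numbering.
SCHEMA_V1 = {1: ("user_id", ""), 2: ("content_type", ""), 3: ("event_type", "")}
# In v2, field 2 was removed and field 4 was added.
SCHEMA_V2 = {1: ("user_id", ""), 3: ("event_type", ""), 4: ("platform", "")}


def read(decoded_fields: dict, schema: dict) -> dict:
    """Interpret {field_number: value} pairs with a given schema version."""
    return {name: decoded_fields.get(number, default)
            for number, (name, default) in schema.items()}


sent_by_old_client = {1: "00000000", 2: "video", 3: "video_start"}
sent_by_new_client = {1: "00000000", 3: "video_start", 4: "ios"}

print(read(sent_by_old_client, SCHEMA_V2))  # new reader, old data
print(read(sent_by_new_client, SCHEMA_V1))  # old reader, new data
```

Both directions work because a field number keeps its meaning across versions. That is also why a number should never be reused for a different field, and why moving a field into a nested message is not a "simple" change: the reader would look for it under a number that no longer carries it.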
Moreover, although we avoided talking about the analysis that comes after the storage part earlier, a new event model would mean that all the jobs and tables that depend on it will change. For this reason, we separated the version of the event model from the version of the event table on BigQuery so that the table would change less often. We then provide a mapping on judge-judi from the event model to the table schema before storing the events.
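A sketch of what such a mapping could look like follows. The real judge-judi is written in Go and its mapping is not shown in this post, so the column names, the nested content field and the to_row helper are all assumptions for illustration.

```python
# Hypothetical mapping: table column -> function extracting it from an event.
COLUMN_MAPPING = {
    "user_id": lambda e: e["user_id"],
    # Nested in this version of the event model, but still flat in the table.
    "content_type": lambda e: e["content"]["type"],
    "event_type": lambda e: e["event_type"],
}


def to_row(event: dict) -> dict:
    """Translate a deserialized event into a row matching the table schema."""
    return {column: extract(event) for column, extract in COLUMN_MAPPING.items()}


event = {"user_id": "00000000", "content": {"type": "video"}, "event_type": "video_start"}
print(to_row(event))
```

When the event model changes shape, only the extractor for the affected column changes. The table, and every job that reads from it, keeps seeing the same columns.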
Thus, we mainly identify three types of changes in the event model’s schema and act accordingly:
— Non-breaking changes: that happen regularly. These are the changes that do not break backward compatibility. This can be the deletion or addition of fields in the protobuf schema.
— Breaking changes that do not require a change of the BigQuery table: that rarely happen, and should be minimized. When these happen judge-judi just changes the mapping between the event and the table.
— Breaking changes that require a change of the BigQuery table: very rare and have yet to happen but they could (thus they will). Since those are rare and we cannot know what these will be like we’ll tackle them on a case by case basis.
Conclusion
The reality of data collection and analysis is very different from the mathematical fantasies of well-defined axioms and controllable assertions.
Coming from a scientific background, we have a tendency to want to keep the pipeline clean. And that’s good. However, chaos in data engineering is not just real but probably unavoidable. And it is one of the reasons why we should try to iterate slowly, and add tests and data quality checks when possible. It is also why keeping a central definition of the data in one place can be very helpful.
Protocol buffers are one way of defining a data model that seems to reduce serialization overhead and to be almost language agnostic. If you already know what the data schema should look like, protobuf is good at managing additions and deletions of fields, although one has to be careful as always. More complex changes will require more thoughtfulness.
Hope you found something useful reading this… Have a beautiful day!
If you have any comments or suggestions, I would love to hear from you.