Introduction: schema on ...

If you work with user interaction data, you most likely parse logs. Or, if you are lucky, you get your input from an upstream process that parses logs. The thing with user interaction logs is that they are produced by different UIs, written in a variety of programming languages, and each UI must solve its own set of challenges. In an ideal world, all such UIs would produce their logs using a single universal library that works on all platforms. The universal library part is not far-fetched, since Apache Thrift can generate client code for all popular client-side languages. However, to do that, you need to define a schema that specifies the events to track, along with their field names and types. If you do this, then you are enforcing a schema on write, and you probably don't have string literals in your code. Unfortunately, this doesn't happen in most real-world situations. In the real world (grin), it is easier to enforce a schema on read.

String Literals vs. Schemas

Before we proceed to show how to enforce a schema on read, let's take a moment to illustrate the benefits of having a schema using a fictional story of two developers, Reginald and Sheena. You can safely skip this section if you are already sold on the benefits of schemas.

Reginald and Sheena are two developers tasked with writing programs that process user interaction logs. They both know that each log entry is a string with key-value pairs representing a user event. Reginald writes his program using string literals (regex), while Sheena makes use of a schema. As such, they handle the following common tasks differently, according to their approach:

  1. Breaking down an entry into its key-value pair components:

    • Reginald uses a regex that he copies from an older piece of code that is known to work. Reginald is a good programmer, so he understands regex. He's even voiced his concern that copying and pasting code isn't ideal, and that the regex should be put in a library. There is already an internal library for parsing the log entries, but not for the language that Reginald is using. Since he doesn't have the time to create a library for that new language (and the library repo for it), he ends up copying and pasting the regex (just one more time).

    • Sheena makes use of a schema defined using one of the popular schema definition tools (e.g. Avro), and it comes with a record builder for each of the popular languages. Since she uses the included record builder to turn the log entries into objects, she isn't too worried about the correctness of this code. The fine print: Sheena is not reading the raw events, but rather the ones converted to serialized records after enforcing the schema on the raw events.

  2. Filtering the logs to find relevant events; say, events where field A's value is greater than 10 and field B's value is truthful:

    • Reginald searches for the fields by matching the keys to string literals "A" and "B", and he writes a function that handles the situation where a key is not found. For field "A", he takes the corresponding value and parses it as an integer. He has to be careful to handle null and blank values. He even has to be defensive against malformed records, where the value of "A" is a random string. In doing so, he devises his own regex (yet another string literal) to check if a string is numeric, but he doesn't handle the case of negative numbers or those with a "+" prefix. For field "B", he decides it is easier to take the value and check if it is equal to the string literal "true".

    • Sheena uses the accessors for the fields A and B in the record object, which return an int and a boolean respectively.

  3. Handling disagreement in the format and encoding of the log entries:

    • Reginald's code goes to production and things don't add up: the ratio of log entries with field A greater than 10 and field B truthful is very low. He investigates the issue and, after a couple of days of second-guessing every little piece of the system, he finds out that a lot of the log entries that should match are not captured by his code, for two reasons. First, some of the UIs use a lower case 'a' for the name of field A, and second, some of the UIs use a '1' to represent a truthful value for field B. He goes to the teams responsible for those UIs only to realize that these are known bugs, and there are no resources available to fix them in the near future. He then asks his team lead for advice, and is pointed to the code that works around these kinds of bugs. Unfortunately, that code is in the log parsing library used by the existing legacy systems, but Reginald's team has moved to a newer tech stack for this project. Reginald translates the old code into the new language, adding two more string literals to his new code ('a' and '1'). He updates his unit tests to allow those values, and adds representative records to his test sample.

    • Sheena is a realist, and understands that such discrepancies happen all the time, even in the best of organizations. Luckily, Sheena doesn't need to handle them in her code. The logic to handle all such bugs lives in the schema enforcement code, which is owned by one team and not copied around. The schema enforcement code writes out logs and emits metrics that allow that team to find any new bugs. They can either get the responsible team to fix a bug before it taints too much of the data, or add logic to handle it if fixing it is not possible. Either way, Sheena's code doesn't have to know about it, and neither does any similar application.

At the end of their respective projects, Reginald and Sheena both shipped, since they are both competent programmers. However, Reginald had to work late a few days, and his code is not very readable since it is littered with parsing logic; he shudders at the thought of working with log data again in the future. On the other hand, Sheena had enough time to try a few different algorithms, since each algorithm's implementation is small and concise; she can't wait to write her next data product.

Enforcing a schema on read at Flipp

This blog post will describe how we use Apache Avro to enforce a schema on read at Flipp. We use the Kite SDK to minimize the amount of work needed to define and maintain a schema that describes events in a way that makes sense on all the different client platforms and for all the different user experiences. As a result, we no longer look for specific string literals in our code, nor do we use regular expressions (a form of string literal) to parse events. We love static checking because compiler errors save us time by catching a lot of mistakes even before the unit tests run. String literals cannot be statically checked by the compiler, and that's why we want to get rid of them. Our Spark jobs are written in Scala, so we can use macros to do some crafty statically checked type conversions. However, you can make use of an Avro schema to increase the robustness of your code regardless of the language you use, as long as it is strongly typed.

Why Avro, and not Thrift or Protocol Buffers?

We usually hear about Avro, Thrift and Protocol Buffers together, but what are they, and do they solve the same problem? Apache Avro is a "data serialization system" [1]; that is, it takes bytes and makes sense of them according to a schema, and the other way around. Protocol Buffers does the same thing as Avro, but it is usually used for remote procedure calls (except within Google, where it is also used as a storage format [2]), while Avro is more often used as a storage format within the Hadoop ecosystem (although Apache Parquet is better suited for that). Thrift has its own serialization component and formats, but it also has a very powerful RPC component that generates client and server code [3]. So, we are looking at data serialization systems, or SerDes, as they are called in Hive.

When choosing a SerDe, we evaluated a few options, but Avro was best suited for us because it can serialize to a JSON format that looks like normal JSON. Thrift can also serialize to JSON. In fact, Thrift has two JSON formats: TSimpleJSONProtocol, which writes out JSON suitable for consumption outside Thrift, and TJSONProtocol, which writes out JSON that is optimized for use inside Thrift but looks quite weird and cannot be understood without the schema. Unfortunately, Thrift cannot read normal JSON back in, because "it does not preserve Thrift's field tags and cannot be read back in by Thrift" [4]. Since we thought that "a canonical encoding in JSON, [would make] it easier to share data between systems" [5], we chose to go with Avro.

Avro schema definition and evolution

At Flipp we use the Kite SDK to infer the Avro schema from JSON examples, so we don't have to deal with the detailed mechanics of defining an Avro schema. The details of Avro schema definition and evolution are beyond the scope of this blog post, and you can read about them in the Avro documentation. They are not difficult to understand, but they require some attention to detail. Read up on them, and keep an eye out for the following points (a small sketch follows the list):

  • Missing field values can be substituted by the declared default; however, the field itself is expected to be present in the data. If it is not, the following exception is thrown:

    org.apache.avro.AvroTypeException: Expected field XYZ not found

  • When a field is nullable (a union with null), its non-null values must be wrapped in an object keyed by the branch type in the JSON encoding. See AVRO-1582 for details.
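
For example, here is a small, purely illustrative sketch (the record name and the field names are made up) that parses a schema with a defaulted field and a nullable field:

    // import org.apache.avro.Schema;

    // An illustrative schema: field "A" has a default, field "B" is nullable.
    final Schema schema = new Schema.Parser().parse(
        "{\"type\": \"record\", \"name\": \"UserEvent\", \"fields\": ["
      + " {\"name\": \"A\", \"type\": \"int\", \"default\": 0},"
      + " {\"name\": \"B\", \"type\": [\"null\", \"boolean\"], \"default\": null}"
      + "]}");

    // In the Avro JSON encoding, a non-null value of the nullable field "B"
    // is wrapped in an object keyed by its branch type (see AVRO-1582), e.g.
    //   {"A": 12, "B": {"boolean": true}}
    System.out.println(schema.getField("B").schema().getTypes()); // the union's branches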

We will assume that the Avro schemas are now defined, and we will proceed to use them to get rid of those string literals!

Using the Avro schema to serialize records

Now that we have a collection of Avro schemas defining the field names and value types for each user event, let's make use of them. I have to point out that there must be several schemas, one for each event type; if you produce just one schema for all events, it could be too loose to be of any use. Having multiple schemas is a little different from the use case for which Avro seems to be built, where you have one schema and you use it to serialize a lot of records. Also, using any of the writers that come packaged with Avro requires a batch pipeline, and we have moved towards a data-in-motion pipeline using Kafka and Spark Streaming. Here is how we make use of Avro in this setting:

  1. Create a library to read your events in whatever format they come in, then serialize them as Avro records using the defined schemas. This library could also be where you keep your schema definition files and the classes generated from them. The library must be in a JVM language, and we chose Java because this makes it easy to use the Avro Maven plugin for code generation.

    a) Write a function to parse your events in whatever format they come in, and you will have to accept having a few string literals in it. There should not be a string literal for each field name, but you will need some string literals to detect the type of the event and to break it up into a map of field names and values.
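
    As an illustration only, here is a minimal sketch of such a parse function. It assumes a hypothetical log format of tab-separated key=value pairs, with the event type carried in a field named "event_type"; your format will differ, but the shape of the code should be similar:

    // import java.util.HashMap;
    // import java.util.Map;

    /**
     * Breaks a raw log entry into a map of field names to raw string values.
     * Hypothetical format: tab-separated key=value pairs.
     */
    Map<String, String> parseEntry(final String logEntry) {
      final Map<String, String> fields = new HashMap<>();
      for (final String pair : logEntry.split("\t")) {
        final int eq = pair.indexOf('=');
        if (eq > 0) {
          fields.put(pair.substring(0, eq), pair.substring(eq + 1));
        }
      }
      return fields;
    }

    /** The event type tells us which generated Avro class to instantiate. */
    String eventType(final Map<String, String> fields) {
      // "event_type" is one of the few string literals we accept keeping.
      return fields.get("event_type");
    }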

    b) Using the event type, construct the name of the class generated by Avro from that event's schema, and load the class by name:

    final Class<? extends SpecificRecordBase> avroClass = 
      (Class<? extends SpecificRecordBase>) Class.forName(avroClassName);
    

    c) Create an Avro record object using the class you just loaded:

    final Constructor<? extends SpecificRecordBase> constructor =
      avroClass.getConstructor();
    final SpecificRecordBase recordBase = constructor.newInstance();
    

    d) For each field name and field value pair that your parse function returns, assign that value to the member of the event object that corresponds to the field.

    • Basically, we will use the SpecificRecordBase#getSchema method to get access to the field, and then we will use SpecificRecordBase#put to assign a value to it. Here is a simple way to do it:

      boolean assignValueToField(
                final String fieldName,
                final String fieldValue,
                final SpecificRecordBase recordBase
              ) {
        final Schema.Field avroField = recordBase.getSchema().getField(fieldName);
        if (avroField == null) {
          log.warn("Schema of class " +
            recordBase.getClass().getName() +
            " has no field with name " + fieldName);
          return false;
        }
        final int fieldPos = avroField.pos();
        recordBase.put(fieldPos, fieldValue);
        return true;
      }
      
    • The code above will only work for String values. Although SpecificRecordBase#put accepts an Object as the value, the value's type is actually checked, and an exception will be thrown at run time if it is not of the expected type. Fortunately, Avro makes the schema definition available in the generated class, and we can make use of this to convert the value to the correct type. Here is the complete code:

      // This code makes use of classes from Apache Commons Lang3
      // Be sure to include the following imports
      // import org.apache.commons.lang3.BooleanUtils;
      // import org.apache.commons.lang3.StringUtils;
      // import org.apache.commons.lang3.math.NumberUtils;
      
      
      boolean assignValueToField(
                final String fieldName,
                final String fieldValue,
                final SpecificRecordBase recordBase) {
        final Schema.Field avroField = recordBase.getSchema().getField(fieldName);
        if (avroField == null) {
          log.warn("Schema of class " +
            recordBase.getClass().getName() +
            " has no field with name " + fieldName);
          return false;
        }
        final int fieldPos = avroField.pos();
        final Schema.Type avroType = avroField.schema().getType();
        Schema.Type resolvedType;
        if (avroType == Schema.Type.UNION) {
          resolvedType = resolveUnionToItsMostGeneralType(avroField);
        } else {
          resolvedType = avroType;
        }
      
      
        switch (resolvedType) {
          case NULL:
            // Nothing to do if the type is null... a hypothetical situation!
            break;
          case BOOLEAN:
            recordBase.put(fieldPos, BooleanUtils.toBooleanObject(fieldValue));
            break;
          case INT:
            recordBase.put(fieldPos, NumberUtils.toInt(fieldValue,
              SENTINEL_NUMERIC));
            break;
          case LONG:
            recordBase.put(fieldPos, NumberUtils.toLong(fieldValue,
              SENTINEL_NUMERIC));
            break;
          case FLOAT:
            recordBase.put(fieldPos, NumberUtils.toFloat(fieldValue,
              SENTINEL_NUMERIC));
            break;
          case DOUBLE:
            recordBase.put(fieldPos, NumberUtils.toDouble(fieldValue,
              SENTINEL_NUMERIC));
            break;
          case STRING: case ENUM:
            recordBase.put(fieldPos, fieldValue);
            break;
          case BYTES: case FIXED:
            try {
              recordBase.put(fieldPos, fieldValue.getBytes(INPUT_ENCODING));
            } catch (UnsupportedEncodingException e) {
              // Will never happen, so wrap it in an 
              // RTE to avoid having a throws clause
              throw new RuntimeException(e);
            }
            break;
          default:
            log.error("Unknown value type for field " + avroField);
            return false;
        }
        return true;
      }
      
      
      /**
       * This function assumes you have a type resolution map, which maps each
       * possible Schema.Type value to an integer that increases with the
       * generality of the type. For example, if Schema.Type.INT maps to 3,
       * then Schema.Type.FLOAT should map to a higher value.
       */
      Schema.Type resolveUnionToItsMostGeneralType(
                    final Schema.Field avroField) {
        Schema.Type resolvedType = Schema.Type.NULL;
        for (Schema s: avroField.schema().getTypes()) {
          final Schema.Type t = s.getType();
          final Integer typeOrder = typeResolutionMap.get(t);
          if (typeOrder - typeResolutionMap.get(resolvedType) > 0) {
            resolvedType = t;
          }
        }
        return resolvedType;
      }
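
    • Putting steps (a) through (d) together, the driver logic can be as simple as the following sketch, where parsedFields stands for the map of field names and values returned by the parse function in step (a), and recordBase is the empty record created in step (c):

      // Unknown field names are logged and ignored by assignValueToField.
      for (final Map.Entry<String, String> entry : parsedFields.entrySet()) {
        assignValueToField(entry.getKey(), entry.getValue(), recordBase);
      }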
      
  2. Using the library you created, you can parse each event as it comes in and write it to your data store as an Avro record. Don't forget to write the type of the event along with it, so that you know what to expect when reading it back.
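
     For example, if the records flow through something like Kafka, a minimal serialization sketch could use Avro's binary encoding and carry the event type alongside the payload (sendToKafka is a hypothetical stand-in for whatever producer you use):

     // import java.io.ByteArrayOutputStream;
     // import java.io.IOException;
     // import org.apache.avro.io.BinaryEncoder;
     // import org.apache.avro.io.EncoderFactory;
     // import org.apache.avro.specific.SpecificDatumWriter;

     byte[] serialize(final SpecificRecordBase recordBase) throws IOException {
       final ByteArrayOutputStream out = new ByteArrayOutputStream();
       final SpecificDatumWriter<SpecificRecordBase> writer =
         new SpecificDatumWriter<>(recordBase.getSchema());
       final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
       writer.write(recordBase, encoder);
       encoder.flush();
       return out.toByteArray();
     }

     // Hypothetical usage: ship the event type with the bytes so the reader
     // knows which schema (and generated class) to use.
     // sendToKafka(eventType, serialize(recordBase));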

  3. Congratulations, you have imposed a schema on your events! Now you can read them as Avro records, and you can use the accessors of each event object to read the values of the fields using code that is statically checked.
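
     As a sketch of what the reading side can look like (MyEvent, getA and getB are hypothetical names generated from an equally hypothetical schema), you deserialize with a SpecificDatumReader and then rely on the statically checked accessors:

     // import java.io.IOException;
     // import org.apache.avro.io.DecoderFactory;
     // import org.apache.avro.specific.SpecificDatumReader;

     MyEvent deserialize(final byte[] bytes) throws IOException {
       final SpecificDatumReader<MyEvent> reader =
         new SpecificDatumReader<>(MyEvent.class);
       return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));
     }

     // Statically checked: the compiler knows the types of getA() and getB(),
     // so the filter from the earlier story needs no string literals at all.
     boolean isRelevant(final MyEvent event) {
       return event.getA() > 10 && event.getB();
     }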

You've done it! Now what?

Now that you have your events stored as Avro records and you can read them as
objects with accessors, how can you build on that? There are a couple of things
that I can recommend:

  • Use a schema registry to make the latest version of the schema available to any client side code written in any language and running anywhere. This is one more step towards the ideal situation of enforcing schema on write. However, this will require the client code to be written in a way that takes schema evolution into account.

  • Find a way to group fields from various event types together, according to their semantics. For example, certain fields, such as the time when the event occurred and the client platform, are expected in all events. Unfortunately, Avro does not allow a schema to be composed of smaller components, and you will need to devise your own method of accessing the information common to multiple event types. You could make use of Scala macros to make events with a common set of accessors appear as if they implement an interface defining those accessors. The compiler can then tell us if one of the events doesn't actually have all the accessors that we expect it to have.

Thank you very much for reading all the way through! If you have comments, suggestions or questions, please let us know.

References


Do you love Data Engineering? Are you interested in using your skills to solve challenging problems at Flipp? Check out our current job postings.