Hive provides a convenient way to query data that already lives on HDFS. A common case is that you produce your data first and then want to use Hive to analyze it. In that case, creating an external table is the approach that makes sense.

In this post, we are going to discuss a slightly more complicated usage where we need to include more than one partition field in this external table, and where the original data on HDFS is in JSON.

For the difference between managed table and external table, please refer to this SO post.

Here is your data

In this post, we assume your data is saved on HDFS as /user/coolguy/awesome_data/year=2017/month=11/day=01/*.json.snappy.

The data is well divided into daily chunks. So, we definitely want to keep year, month, day as the partitions in our external hive table.
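
On HDFS, that layout looks roughly like the tree below (day=02 is shown only to illustrate that new daily folders keep arriving):

/user/coolguy/awesome_data/
└── year=2017/
    └── month=11/
        ├── day=01/
        │   └── *.json.snappy
        └── day=02/
            └── *.json.snappy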

Make Hive able to read JSON

Since every line in our data is a JSON object, we need to tell Hive how to interpret it as a set of fields.

To achieve this, we are going to add an external jar. There are two jars that I know of that can do the job:

  • hive-hcatalog-core-X.Y.Z.2.4.2.0-258.jar (the Hive HCatalog JSON SerDe)
  • json-serde-X.Y.Z-jar-with-dependencies.jar (the OpenX JSON SerDe)

To add the jar of your choice to Hive, simply run ADD JAR in the Hive console:

ADD JAR /home/coolguy/hive/lib/json-udf-1.3.8-jar-with-dependencies.jar;

Note: The path here is the path to your jar on the local machine. But you can also point to a jar on HDFS by using the hdfs:// prefix.
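
For example, if you have already uploaded the jar to HDFS (the path below is just a made-up example), the command would look like:

ADD JAR hdfs:///user/coolguy/lib/json-udf-1.3.8-jar-with-dependencies.jar;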

Create the external table

By now, all the preparation is done. The rest of the work is pretty straightforward:

  • Tell hive where to look for the data.
  • Tell hive which ones are the fields for partitions.
  • Tell hive which library to use for JSON parsing.

So, the HQL to create the external table is something like:

CREATE EXTERNAL TABLE traffic_beta6 (
-- <field-list>
)
PARTITIONED BY (
year string,
month string,
day string
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION '/user/coolguy/awesome_data/';

This HQL uses hive-hcatalog-core-X.Y.Z.2.4.2.0-258.jar to parse JSON. For the usage of json-serde-X.Y.Z-jar-with-dependencies.jar, change ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' to ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'.

There are two things you want to be careful about:

  • The fields for partition shouldn’t be in the <field-list>.
  • The part of /year=2017/month=11/day=01/ in the path shouldn’t be in the LOCATION.

And here you go, you get yourself an external table based on the existing data on HDFS.

However…

Soon enough, though, you will find that this external table doesn't seem to contain any data. That is because we need to manually add the partitions to the table.

ALTER TABLE traffic_beta6 ADD PARTITION(year='2017', month='11', day='01');

When you finish the ingestion of /user/coolguy/awesome_data/year=2017/month=11/day=02/, you should also run

ALTER TABLE traffic_beta6 ADD PARTITION(year='2017', month='11', day='02');
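
If adding every partition by hand gets tedious, Hive can also scan the table location and register any partition directories that follow the key=value naming convention. This is standard Hive functionality rather than anything specific to this setup:

MSCK REPAIR TABLE traffic_beta6;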

In this post, we will use a local Flink setup with savepoints configured, consuming from a local Kafka instance. We will also have a very simple Kafka producer that feeds sequential numbers to Kafka.

To check whether savepointing actually works, we will deliberately stop the Flink program, restore it from the last savepoint, and then check that the consumed events are consistent with what the producer sent.

Setup local Kafka

Setting up a local Kafka is pretty easy. Just follow the official quickstart guide, and you will have a Zookeeper and a Kafka instance on your local machine.

Start zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

Start Kafka:

bin/kafka-server-start.sh config/server.properties

Create our testing topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-test

Create a script to feed events

As mentioned above, the producer is very simple. I'm going to use Python to write it.

First, the Kafka dependency:

pip install kafka-python

Then create a python script producer.py like this:

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import logging
import time

logging.basicConfig(level=logging.INFO)

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda m: json.dumps(m).encode('ascii'))

# send() is asynchronous by default; send one probe message first
future = producer.send('flink-test', {'idx': 0})

# Block for a 'synchronous' send to make sure the connection works
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if the produce request failed...
    logging.exception("Failed to send the probe message")
    raise

# A successful result returns the assigned topic, partition and offset
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)

# Feed sequential data to Kafka, one event per second
i = 1
while True:
    producer.send('flink-test', {'idx': i})
    i = i + 1
    time.sleep(1)

# Block until all async messages are sent (never reached in this endless loop)
producer.flush()

Then run this script to start feeding events:

python producer.py

I recommend you also run a simple consumer to check whether the current setup is working. Logstash works well for this, with a config file like:

input {
    kafka {
        group_id => "flink-test"
        topics => ["flink-test"]
        bootstrap_servers => "localhost:9092"
    }
}
output {
  stdout {}
}

And start consuming with:

./bin/logstash -f <your-config-file>

But any other kind of consumer will easily do the job.
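
For instance, the console consumer that ships with recent Kafka versions also does the trick:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink-test --from-beginning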

Setup local Flink

First you will need to download the Flink version you want/need. After downloading the package, unpack it. You will then have everything you need to run Flink on your machine.

Assume that Java and mvn are already installed.

This will be the tricky part.

First we need to change the config file: ./conf/flink-conf.yaml.

In this file, you can specify basically all of the configuration for the Flink cluster, such as the job manager heap size and the task manager heap size. In this post, however, we are going to focus on the savepoint configuration.

Add the following line to this file:

state.savepoints.dir: file:///home/<username>/<where-ever-you-want>

With this field specified, Flink will use this folder to store savepoint files. In the real world, I believe people usually use HDFS (hdfs:///...) or S3 to store their savepoints.

In this experiment, we will use a local setup of the Flink cluster by running ./bin/start-local.sh. This script reads the configuration from ./conf/flink-conf.yaml, which we just modified, and then starts a job manager along with a task manager on localhost.

You could also change the number of slots in that task manager by modifying ./conf/flink-conf.yaml, but that's not something we need for the current topic. For now we can just run:

./bin/start-local.sh

Prepare the job

Consider that a Flink cluster is just a distributed worker system empowered with streaming abilities. It is just a bunch of starving workers (and their supervisors) waiting for job assignments.

Although we have set up the cluster, we still need to give it a job.

Here is a simple Flink job we will use to check out the save point functionality:

It will consume the events from our Kafka topic and write them to local files bucketed by minute.

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;


public class DataTransfer {


    public static void main(String[] args) throws Exception {
      // Properties for Kafka

      Properties kafkaProps = new Properties();
      kafkaProps.setProperty("topic", "flink-test");
      kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
      kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
      kafkaProps.setProperty("group.id", "flink-test");

      // Flink environment setup

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.getConfig().disableSysoutLogging();
      env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));

      // Flink check/save point setting

      env.enableCheckpointing(30000);
      env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
      env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
      env.getCheckpointConfig().setCheckpointTimeout(10000);
      env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

      env.getCheckpointConfig().enableExternalizedCheckpoints(
              CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
      );

      // Init the stream

      DataStream<String> stream = env
              .addSource(new FlinkKafkaConsumer08<String>(
                      "flink-test",
                      new SimpleStringSchema(),
                      kafkaProps));

      // Path of the output

      String basePath = "<some-place-in-your-machine>"; // Here is your output path


      BucketingSink<String> hdfsSink = new BucketingSink<>(basePath);
      hdfsSink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH-mm"));
      stream.print();
      stream.addSink(hdfsSink);

      env.execute();
    }
}

Start experiment

To submit our job, first build the above project by:

mvn clean install -Pbuild-jar

There will be a fat jar under your target folder: <project-name>.<version>.jar

To submit this jar as our job, run:

./bin/flink run <project-name>.<version>.jar

Once it starts running, you will find files being generated in the basePath.

Restart the job without save point

After the job runs for a while, cancel the job in the flink UI.

Check the latest finished file before cancellation, and find the last line of this file. In my experiment, it's {"idx": 2056}.

Then start the job again:

./bin/flink run <project-name>.<version>.jar

After a few minutes, check the first finished file after the restart, and find the first line of this file. In my experiment, it's {"idx": 2215}.

This means there are events missing when we restart the job without a savepoint.

A finished file is a file that has been confirmed by Flink's checkpointing; that is, a file whose name carries neither the in-progress prefix nor the pending suffix. Initially, a file that is being written is in the in-progress state. When the file stops being written to for a while (the threshold can be specified in the config), it becomes pending. A checkpoint then turns pending files into finished files. Only finished files should be considered the consistent output of Flink; otherwise you will get duplication. For more info about checkpointing, please check the official documentation.

Restart with save point

Let's try savepointing. This time, we will create a savepoint before cancelling the job.

Flink allows you to take a savepoint by executing:

bin/flink savepoint <job-id>

The <job-id> can be found at the header of the job page in flink web UI.

After you run this command, Flink will tell you the path to your savepoint file. Do record this path.

Then we cancel the job, check the latest finished file before cancellation, and find its last line. In my experiment, it's {"idx": 4993}.

Then we restart the job.

Because we want to restart from a savepoint, we need to specify the savepoint when we start the job:

./bin/flink run <project-name>.<version>.jar -s <your-save-point-path>

After a few minutes, check the first finished file after the restart, and find its first line. In my experiment, it's {"idx": 4994}, which is consistent with the number before cancellation.

General thoughts

Flink's savepointing is much easier than I expected. The only thing to constantly keep in mind is that we need to record the savepoint path carefully when we use crontab to create savepoints. Other than that, Flink seems to handle all the data consistency.
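
As a minimal sketch of that idea (the paths and the job id below are placeholders), a crontab entry could simply append Flink's output, which contains the savepoint path, to a log file you can consult later:

# Take a savepoint every hour and keep the command output, which includes the savepoint path
0 * * * * /opt/flink/bin/flink savepoint <job-id> >> /var/log/flink-savepoints.log 2>&1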

For further experiments and research, I think it would be very useful to try Flink's savepoints in a cluster setup with multiple task managers.

React with Redux has become a very common stack these days. People with very little front-end background, like me, can write something that turns out not bad. However, testing a React component connected to Redux can sometimes be painful.

In this post, I want to share some of the ideas about how to test against a Redux container.

First of all, we need a container to be tested against.

In the following gist, we have a text box that:

  • saves the value of input into component’s local state
  • submits the input value via Redux action dispatch to the Redux store.
  • shows the input value in the Redux store.

import React from 'react'
import PropTypes from 'prop-types'
import { connect } from 'react-redux'
import { submitValue } from '../store/modules/showBox'

export class ShowBox extends React.Component {
  constructor(props) {
    super(props)
    this.state = {
      searchString: this.props.search || ""
    }
  }

  handleInput = (event) => {
    this.setState({
      searchString: event.target.value
    })
  }

  render () {
    return (
      <form onSubmit={(e) => this.props.handleShowSubmit(this.state.searchString, e)}>
        <div>
          <input
            type="search"
            className="form-control"
            placeholder="Search"
            value={this.state.searchString}
            onChange={this.handleInput}
          />
          <div>
            <i className="icon-search"></i>
          </div>
        </div>
      </form>
    )
  }
}

export default connect(
  (state) => ({
    search: state.showBox.search,
  }),
  (dispatch) => {
    return {
      handleShowSubmit: (text, e) => {
        if (e) {
          // Avoid redirecting
          e.preventDefault()
        }
        dispatch(submitValue(text))
      }
    }
  }
)(ShowBox);

Note that this component is only used for this example. Separating the text box from the presentational part would be more practical in a real project.

I highly recommend using Enzyme with all your React components’ tests. This utility makes it much easier to traverse and assert your React/Redux components.

Besides, it now works pretty well with Mocha and Karma and other stuff you might use in your test.

Redux store wrapper for the tests

One of the painful parts of testing a Redux container is that we can't just render the container and then evaluate the DOM anymore. That's because the container will ask for the Redux store in mapStateToProps() when you connect your component to Redux, and without the store, the test fails.

So, before we start to write testing code, we will need a "Redux wrapper" for all of our tested components to provide the Redux store, pretty much like what we usually do to the root component of the application.

Luckily, Enzyme provides a shallow-render function that can take a context as second parameter, where we can put a Redux store for our tests.

import { shallow } from 'enzyme';

const shallowWithStore = (component, store) => {
  const context = {
    store,
  };
  return shallow(component, { context });
};

export default shallowWithStore;

Make sure it’s working

Let’s first try to shallow-render our component in the tests with the shallowWithStore we created. We are going to mock our Redux store by using createMockStore(state) from redux-test-utils.

// Assumed imports for this test file; adjust the paths to match your project layout.
import React from 'react';
import { expect } from 'chai';
import { createMockStore } from 'redux-test-utils';
import shallowWithStore from './shallowWithStore';
import ConnectedShowBox from '../components/ShowBox';

describe('ConnectedShowBox', () => {
  it("should render successfully if string is not provided by store", () => {
    const testState = {
      showBox: {}
    };
    const store = createMockStore(testState)
    const component = shallowWithStore(<ConnectedShowBox />, store);
    expect(component).to.be.a('object');
  });
});

By using shallowWithStore with the mocked store, we can finally shallow-render our container.

it("should render a text box with no string inside if search string is not provided by store", () => {
  const testState = {
    showBox: {
      search: ""
    }
  };
  const store = createMockStore(testState)
  const component = shallowWithStore(<ConnectedShowBox />, store);
  expect(component).to.be.a('object');


  expect(component.dive().find("").prop("value")).to.equal("")


  component.dive().find("").simulate("change", { target: { value: "Site" } });
  expect(component.dive().find("d").prop("value")).to.equal("Site")
});

To test the DOM structure, you can use find() to traverse the rendered result, then use prop() to get the desired attribute then make the assertion. To test event handler, you can first get the element with find(), then use simulate() to trigger the event. After that, just make assertion as usual. Please note that we call dive() before calling find().

Test non-Redux features with Enzyme

There are two non-Redux features we might want to test:

  • DOM structure.
  • handleInput function in the component.

Achieving these two goals is extremely simple with Enzyme.

it("should render a text box with no string inside if search string is not provided by store", () => {
  const testState = {
    showBox: {
      search: ""
    }
  };
  const store = createMockStore(testState)
  const component = shallowWithStore(<ConnectedShowBox />, store);
  expect(component).to.be.a('object');


  expect(component.dive().find("").prop("value")).to.equal("")


  component.dive().find("").simulate("change", { target: { value: "Site" } });
  expect(component.dive().find("d").prop("value")).to.equal("Site")
});

To test the DOM structure, you can use find() to traverse the rendered result, then use prop() to get the desired attribute then make the assertion.

To test event handler, you can first get the element with find(), then use simulate() to trigger the event. After that, just make assertion as usual.

Please note that we call dive() before calling find().

I used Chai for assertion. It might look different from what you would write depending on your setup.

One thing to be careful about is that if we shallow-render a connected container, the rendering will not reach the DOM inside the wrapped component.

This means the whole DOM inside ShowBox will not be rendered, which will cause the following assertion to fail:

expect(component.find("input").prop("value")).to.equal("");

The reason is that shallow-render only renders the direct children of a component.

When we shallow-render connected container <ConnectedShowBox />, the deepest level it gets rendered is <ShowBox />. All the stuff inside the ShowBox’s render() will not be rendered at all.

To make the result of shallow-render go one layer deeper (to render the stuff inside <ShowBox />), we need to call dive() first.

You can also test mapStateToProps here by simply changing the mocked state (testState), as sketched below.
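
Here is a minimal sketch of such a test, reusing the same helpers as above; the "preloaded" string is just an arbitrary value for illustration:

it("should prefill the text box when the store provides a search string", () => {
  const testState = {
    showBox: {
      search: "preloaded"
    }
  };
  const store = createMockStore(testState)
  const component = shallowWithStore(<ConnectedShowBox />, store);

  // ShowBox copies the `search` prop into its local state in the constructor,
  // so the rendered input should show the preset value.
  expect(component.dive().find("input").prop("value")).to.equal("preloaded")
});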

Test Redux dispatch (mapDispatchToProps)

Testing mapDispatchToProps with Enzyme is really easy.

it("should render a text box with no string inside if search string is not provided by store", () => {
  const testState = {
    showBox: {
      search: ""
    }
  };
  const store = createMockStore(testState)
  const component = shallowWithStore(<ConnectedShowBox />, store);

  component.dive().find("form > div > input").simulate("change", { target: { value: "Site" } });
  expect(component.dive().find("d").prop("value")).to.equal("Site")


  component.dive().find("form").simulate("submit");
  expect(store.isActionDispatched({
    type: "showBox/SUBMIT",
    searchString: "Site"
  })).to.be.true;
});

Just like what we did to test handleInput(), we first use find() to get the element we need to trigger, then use simulate() to trigger the event.

Finally, we use isActionDispatched() to check whether the expected action was dispatched.

And that is it! I think this is enough for most of the use cases in testing.

Here are some further challenges for this topic:

  • Make it work with code coverage utilities.
  • Make it work with GraphQL(Apollo)

To use jQuery-based plugins, you will have to add jQuery to your component.

Step 1: First you need to add jQuery to your project. To achieve that, you could either add the jQuery script tag to your HTML wrapper, or simply run

npm i -S jquery

then import it in your component.

Step 2: After you import jQuery into your project, you are already able to use it in your components.

But if you are using Server Side Rendering, this may not work for you, because jQuery is supposed to run against a rendered DOM document. So in SSR, you can only expect jQuery to work on the client side.

With that being said, the solution for using jQuery with SSR is simple. Add your jQuery code to the

componentDidMount()

or

componentDidUpdate()

of your components.
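
Here is a minimal sketch of the idea, assuming jQuery has been installed from npm and that some hypothetical tooltip() jQuery plugin has already been loaded; the important part is that the jQuery call only runs in the browser, after the component's DOM exists:

import React from 'react'
import $ from 'jquery'

class FancyBox extends React.Component {
  componentDidMount() {
    // Runs only on the client, after the DOM for this component exists,
    // so it is safe even when the initial HTML was rendered on the server.
    this.$node = $(this.node)
    this.$node.tooltip()          // hypothetical jQuery plugin call
  }

  componentWillUnmount() {
    // Clean up whatever the plugin attached to the DOM node.
    this.$node.tooltip('destroy') // hypothetical cleanup call
  }

  render() {
    return <div ref={(node) => { this.node = node }} title="Hello">Hover me</div>
  }
}

export default FancyBox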

This is a beginner tutorial for those who already understand how Kafka works and the basic functionality of Kerberos.

Purpose

  • Setup a simple pipeline for stream processing within the VMs.
  • Integrate Kafka with Kerberos’ SASL authentication.
  • Console pipeline to check if our setup works.

The versions of components in this post will be:

  • Ubuntu: 16.04.1
  • Kafka: 0.10.1.0 with scala 2.11
  • Kerberos: 5

Prepare the Servers

We are going to have 3 Ubuntu servers in the VirtualBox:

  • server-kerberos: a server for Kerberos.
  • server-kafka: a server for both Zookeeper and the Kafka broker.
  • server-kafka-client: a server for the Kafka clients (producer and consumer).

You can also split server-kafka-client into two different servers, but the setups for those two servers would be exactly the same. To keep things simple and clean, this post will use one server to host both the producer and the consumer.

For the same reason, server-kafka could also be split into two servers.

Installation of Kerberos and Kafka

Use VirtualBox to create all these 3 servers with Ubuntu. Go to your VirtualBox manager, and make sure all your boxes’ network adapters are set to NAT.

For server-kerberos:

To install kerberos, enter:

apt-get install krb5-admin-server krb5-kdc

During the installation, you will be asked for several settings. Enter the settings like below:

Default Kerberos version 5 realm? [VISUALSKYRIM]

Kerberos servers for your realm? [kerberos.com]

Administrative server for your realm? [kerberos.com]

For server-kafka, server-kafka-client:

Install krb5-user for SASL authentication:

sudo apt-get install krb5-user

During this installation, you will be asked the same questions. Just answer them with the same answers:

Default Kerberos version 5 realm? [VISUALSKYRIM]

Kerberos servers for your realm? [kerberos.com]

Administrative server for your realm? [kerberos.com]

These questions will generate a Kerberos config file at /etc/krb5.conf.

Install Kafka:

wget http://ftp.meisei-u.ac.jp/mirror/apache/dist/kafka/0.10.1.0/kafka_2.11-0.10.1.0.tgz
tar -xzf kafka_2.11-0.10.1.0.tgz
cd kafka_2.11-0.10.1.0

Later in this post, you will need to transfer authentication files(keytabs) between servers. For that purpose, this post will use scp, and openssh-server will be installed.

If you are going to use other methods to transfer files from server-kerberos, feel free to skip this installation.

apt-get install openssh-server

Setting up servers

Before starting to set up servers, we need to change our VMs’ network adapters to Host-Only and reboot to get an individual IP address for each VM.

After that, go to each server and get its IP address with:

ifconfig

Then put those IP addresses and hostnames into /etc/hosts on each server. Something like this:

192.168.56.104  kerberos.com
192.168.56.106  kafka.com
192.168.56.107  kafka-client.com

Then make sure all these 3 servers can ping each other by using the hostname.

After that, we start to set up our servers one by one:

Kerberos Server

Create the new realm.

sudo krb5_newrealm

This might get stuck at the point where the command line prompts Loading random data.

If this happens, run the following command first: cat /dev/sda > /dev/urandom

Then edit /etc/krb5.conf.

The updated content should be like:

[libdefaults]
    default_realm = VISUALSKYRIM

...

[realms]
    VISUALSKYRIM = {
        kdc = kerberos.com
        admin_server = kerberos.com
    }

...

[domain_realm]
    kerberos.com = VISUALSKYRIM

Next, add principals for each of your roles:

- zookeeper
- kafka
- kafka-client

Enter:

$ sudo kadmin.local

> addprinc zookeeper
> ktadd -k /tmp/zookeeper.keytab zookeeper
> addprinc kafka
> ktadd -k /tmp/kafka.keytab kafka
> addprinc kafka-client
> ktadd -k /tmp/kafka-client.keytab kafka-client

Move /tmp/zookeeper.keytab and /tmp/kafka.keytab to your server-kafka, and move /tmp/kafka-client.keytab to your server-kafka-client.

Kafka Server

Just like in the real world, every individual (program) in a distributed system must tell the authority (Kerberos) two things to identify itself:

The first thing is the accepted way for this role to be identified. It's like how in America people usually use a driver's license, in China people use an ID card, and in Japan people use the so-called MyNumber card.

The second thing is the file or document that identifies you according to that accepted identification method.

The file that identifies a role in our SASL context is the keytab file we just generated via kadmin.

I suggest you put zookeeper.keytab and kafka.keytab under /etc/kafka/ on your server-kafka.
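
As an optional sanity check using the standard Kerberos client tools, you can confirm on server-kafka that a keytab can actually obtain a ticket for its principal:

kinit -kt /etc/kafka/kafka.keytab kafka
klist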

We need a way to tell our programs where to find this file and how to hand it over to the authority (Kerberos). That is what the JAAS file is for.

We create the JAAS files for Zookeeper and Kafka and put them at /etc/kafka/zookeeper_jaas.conf and /etc/kafka/kafka_jaas.conf respectively:

// /etc/kafka/zookeeper_jaas.conf
Server {
  com.sun.security.auth.module.Krb5LoginModule required debug=true
  useKeyTab=true
  keyTab="/etc/kafka/zookeeper.keytab"
  storeKey=true
  useTicketCache=false
  principal="zookeeper@VISUALSKYRIM";
};

// /etc/kafka/kafka_jaas.conf
KafkaServer {
  com.sun.security.auth.module.Krb5LoginModule required debug=true
  useKeyTab=true
  storeKey=true
  keyTab="/etc/kafka/kafka.keytab"
  principal="kafka@VISUALSKYRIM";
};

// Used by the broker when it connects to Zookeeper
Client {
  com.sun.security.auth.module.Krb5LoginModule required debug=true
  useKeyTab=true
  storeKey=true
  keyTab="/etc/kafka/kafka.keytab"
  principal="kafka@VISUALSKYRIM";
};

To specify the locations of these JAAS files, we need to pass them to the JVM as options, like:

-Djava.security.krb5.conf=/etc/krb5.conf
-Djava.security.auth.login.config=/etc/kafka/zookeeper_jaas.conf
-Dsun.security.krb5.debug=true

and

-Djava.security.krb5.conf=/etc/krb5.conf
-Djava.security.auth.login.config=/etc/kafka/kafka_jaas.conf
-Dsun.security.krb5.debug=true

To keep this post easy and simple, I chose to modify bin/kafka-run-class.sh, bin/kafka-server-start.sh and bin/zookeeper-server-start.sh to insert those JVM options into the launch command.
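
Alternatively, if you would rather not edit the scripts, kafka-run-class.sh also picks up generic JVM options from the KAFKA_OPTS environment variable, so exporting the options before starting each process should work as well (shown here for the broker; point it at zookeeper_jaas.conf when starting Zookeeper):

export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf \
  -Djava.security.auth.login.config=/etc/kafka/kafka_jaas.conf \
  -Dsun.security.krb5.debug=true"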

To enable SASL authentication in Zookeeper and the Kafka broker, simply edit the config files config/zookeeper.properties and config/server.properties.

For config/zookeeper.properties:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
kerberos.removeHostFromPrincipal=true
kerberos.removeRealmFromPrincipal=true

For config/server.properties:

listeners=SASL_PLAINTEXT://kafka.com:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka

Then start the Zookeeper and Kafka by:

$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties

Kafka Client Server

Setting up your server-kafka-client is quite similar to what you've just done for server-kafka.

As for the JAAS file, because we are going to use the same principal and keytab for both the producer and the consumer in this case, we only need to create a single JAAS file, /etc/kafka/kafka_client_jaas.conf:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required debug=true
  useKeyTab=true
  storeKey=true
  keyTab="/etc/kafka-client.keytab"
  principal="kafka-client@VISUALSKYRIM";
};

We also need to add the corresponding JVM options to bin/kafka-run-class.sh:

-Djava.security.krb5.conf=/etc/krb5.conf
-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
-Dsun.security.krb5.debug=true

Give it a try

Now we can check if our setup actually works.

First we start a console-producer:

bin/kafka-console-producer.sh --broker-list kafka.com:9092 --topic test \
--producer-property security.protocol=SASL_PLAINTEXT \
--producer-property sasl.mechanism=GSSAPI \
--producer-property sasl.kerberos.service.name=kafka

And start a console-consumer:

bin/kafka-console-consumer.sh --bootstrap-server kafka.com:9092 --topic test \
--consumer-property security.protocol=SASL_PLAINTEXT \
--consumer-property sasl.mechanism=GSSAPI \
--consumer-property sasl.kerberos.service.name=kafka

Then type some messages into the console producer and see whether the same messages show up in the console consumer after a few seconds.