Friday, December 20, 2024

Introducing the new Amazon Kinesis source connector for Apache Flink


On November 11, 2024, the Apache Flink community released a new version of the AWS services connectors, an AWS open source contribution. This new release, version 5.0.0, introduces a new source connector to read data from Amazon Kinesis Data Streams. In this post, we explain how the new features of this connector can improve the performance and reliability of your Apache Flink application.

Apache Flink has both a source and a sink connector, to read from and write to Kinesis Data Streams. In this post, we focus on the new source connector, because version 5.0.0 doesn’t introduce new functionality for the sink.

Apache Flink is a framework and distributed stream processing engine designed to perform computation at in-memory speed and at any scale. Amazon Managed Service for Apache Flink offers a fully managed, serverless experience to run your Flink applications, implemented in Java, Python, or SQL, and using all the APIs available in Flink: SQL, Table, DataStream, and ProcessFunction API.

Apache Flink connectors

Flink supports reading and writing data to external systems through connectors, which are components that allow your application to interact with stream-storage message brokers, databases, or object stores. Kinesis Data Streams is a popular source and destination for streaming applications. Flink provides both source and sink connectors for Kinesis Data Streams.

The following diagram illustrates a sample architecture.

Role of connectors in a Flink application

Before proceeding further, it’s important to clarify three terms often used interchangeably in data streaming and in the Apache Flink documentation:

  • Kinesis Data Streams refers to the Amazon service
  • Kinesis source and Kinesis consumer refer to the Apache Flink components, specifically the source connectors, that allow reading data from Kinesis Data Streams
  • In this post, we use the term stream to refer to a single Kinesis data stream

Introducing the new Flink Kinesis source connector

The release of version 5.0.0 of the AWS connectors introduces a new connector for reading events from Kinesis Data Streams. The new connector is called Kinesis Streams Source and supersedes the Kinesis Consumer as the source connector for Kinesis Data Streams.

The new connector introduces several new features, adheres to the new Flink Source interface, and is compatible with Flink 2.x, the first major version release by the Flink community. Flink 2.x introduces a number of breaking changes, including removing the SourceFunction interface used by legacy connectors. The legacy Kinesis Consumer will no longer work with Flink 2.x.

Setting up the connector is slightly different than with the legacy Kinesis connector. Let’s start with the DataStream API.

How to use the new connector with the DataStream API

To add the new connector to your application, you need to update the connector dependency. For the DataStream API, the dependency has changed its name to flink-connector-aws-kinesis-streams.

At the time of writing, the latest connector version is 5.0.0 and it supports the most recent stable Flink versions, 1.19 and 1.20. The connector is also compatible with Flink 2.0, but no connector has been officially released for Flink 2.x yet. Assuming you’re using Flink 1.20, the new dependency is the following:


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-aws-kinesis-streams</artifactId>
    <version>5.0.0-1.20</version>
</dependency>

The connector uses the new Flink Source interface. This interface implements the new FLIP-27 standard, and replaces the legacy SourceFunction interface, which has been deprecated. SourceFunction will be completely removed in Flink 2.x.

In your application, you can now use a fluent and expressive builder interface to instantiate and configure the source. The minimal setup only requires the stream Amazon Resource Name (ARN) and the deserialization schema:

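import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kinesis.source.KinesisStreamsSource;

// Minimal setup: the stream ARN and a deserialization schema
KinesisStreamsSource<String> kdsSource = KinesisStreamsSource.<String>builder()
    .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
    .setDeserializationSchema(new SimpleStringSchema())
    .build();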

The new source class is called KinesisStreamsSource, not to be confused with the legacy source, FlinkKinesisConsumer.

You can then add the source to the execution environment using the new fromSource() method. This method requires explicitly specifying the watermark strategy, along with a name for the source:

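import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ...
// Attach the source with an explicit watermark strategy and a name
DataStream<String> kinesisRecordsWithEventTimeWatermarks = env.fromSource(
    kdsSource,
    WatermarkStrategy.<String>forMonotonousTimestamps()
        .withIdleness(Duration.ofSeconds(1)),
    "Kinesis source");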

These few lines of code introduce some of the main changes in the interface of the connector, which we discuss in the following sections.

Stream ARN

You can now define the Kinesis data stream ARN, as opposed to the stream name. This makes it simpler to consume from streams cross-Region and cross-account.

When running in Amazon Managed Service for Apache Flink, you only need to add permissions to access the stream to the application’s AWS Identity and Access Management (IAM) role. The ARN allows pointing to a stream located in a different AWS Region or account, without assuming roles or passing any external credentials.
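To see why an ARN is enough to locate a stream in another Region or account, it can help to look at what the ARN string carries. The following is a minimal sketch using only the Java standard library; the class name KinesisStreamArn and its fields are hypothetical and are not part of the connector, which does its own ARN handling internally.

```java
// Illustrative sketch only: KinesisStreamArn is a hypothetical helper, not a connector class.
final class KinesisStreamArn {
    final String partition;
    final String region;
    final String accountId;
    final String streamName;

    private KinesisStreamArn(String partition, String region, String accountId, String streamName) {
        this.partition = partition;
        this.region = region;
        this.accountId = accountId;
        this.streamName = streamName;
    }

    // Expected shape: arn:<partition>:kinesis:<region>:<account-id>:stream/<stream-name>
    static KinesisStreamArn parse(String arn) {
        String[] parts = arn.split(":", 6);
        if (parts.length != 6
                || !"arn".equals(parts[0])
                || !"kinesis".equals(parts[2])
                || !parts[5].startsWith("stream/")) {
            throw new IllegalArgumentException("Not a Kinesis stream ARN: " + arn);
        }
        return new KinesisStreamArn(
                parts[1], parts[3], parts[4], parts[5].substring("stream/".length()));
    }

    public static void main(String[] args) {
        KinesisStreamArn arn =
                parse("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream");
        // The Region and the account are both encoded in the ARN itself
        System.out.println(arn.region + " / " + arn.accountId + " / " + arn.streamName);
    }
}
```

Because the Region and account ID are part of the identifier, a stream name alone would be ambiguous where the ARN is not.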

Explicit watermark

One of the most important characteristics of the new Source interface is that you have to explicitly define a watermark strategy when you attach the source to the execution environment. If your application only implements processing-time semantics, you can specify WatermarkStrategy.noWatermarks().

This is an improvement in terms of code readability. Looking at the source, you know immediately which type of watermark you have, or if you don’t have any. Previously, many connectors provided some type of default watermarks that the user could override. However, the default watermark of each connector was slightly different and confusing for the user.

With the new connector, you can achieve the same behavior as the legacy FlinkKinesisConsumer default watermarks, using WatermarkStrategy.forMonotonousTimestamps(), as shown in the previous example. This strategy generates watermarks based on the approximateArrivalTimestamp returned by Kinesis Data Streams. This timestamp corresponds to the time when the record was published to Kinesis Data Streams.

Idleness and watermark alignment

With the watermark strategy, you can additionally define an idleness, which allows the watermark to progress even when some shards of the stream are idle and receiving no records. Refer to Dealing With Idle Sources for more details about idleness and watermark generators.

A feature introduced by the new Source interface, and fully supported by the new Kinesis source, is watermark alignment. Watermark alignment works in the opposite direction of idleness. It slows down consuming from a shard that is progressing faster than others. This is particularly useful when replaying data from a stream, to reduce the volume of data buffered in the application state. Refer to Watermark alignment for more details.
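The idea behind watermark alignment can be sketched in plain Java: find the slowest shard, and pause any shard whose watermark is ahead of it by more than an allowed drift. This is a simplified illustration, not Flink's implementation; the class and method names are hypothetical. In Flink itself, alignment is configured on the watermark strategy (for example with withWatermarkAlignment), and the pausing is handled by the runtime.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Illustrative sketch only: names are hypothetical and this is not Flink code.
final class WatermarkAlignmentSketch {

    // Returns the shards that should pause reading: those whose watermark is
    // more than maxDriftMillis ahead of the slowest shard's watermark.
    static Set<String> shardsToPause(Map<String, Long> watermarkByShard, long maxDriftMillis) {
        Set<String> paused = new TreeSet<>();
        if (watermarkByShard.isEmpty()) {
            return paused;
        }
        long slowest = Collections.min(watermarkByShard.values());
        long maxAllowed = slowest + maxDriftMillis;
        for (Map.Entry<String, Long> entry : watermarkByShard.entrySet()) {
            if (entry.getValue() > maxAllowed) {
                paused.add(entry.getKey());
            }
        }
        return paused;
    }

    public static void main(String[] args) {
        Map<String, Long> watermarks = new HashMap<>();
        watermarks.put("shard-0", 1_000L);
        watermarks.put("shard-1", 1_500L);
        watermarks.put("shard-2", 9_000L); // far ahead of the others
        System.out.println(shardsToPause(watermarks, 2_000L));
    }
}
```

Pausing the fast shard keeps the records it would otherwise deliver from piling up in state while the slower shards catch up.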

Set up the connector with the Table API and SQL

Assuming you’re using Flink 1.20, the dependency containing both the Kinesis source and sink for the Table API and SQL is the following (both Flink 1.19 and 1.20 are supported; adjust the version accordingly):


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis</artifactId>
    <version>5.0.0-1.20</version>
</dependency>

This dependency contains both the new source and the legacy source. Refer to Versioning in case you’re planning to use both in the same application.

When defining the source in SQL or the Table API, you use the connector name kinesis, as it was with the legacy source. However, many parameters have changed with the new source:

CREATE TABLE KinesisTable (
    `user_id` BIGINT,
    `item_id` BIGINT,
    `category_id` BIGINT,
    `behavior` STRING,
    `ts` TIMESTAMP(3)
)
PARTITIONED BY (user_id, item_id)
WITH (
    'connector' = 'kinesis',
    'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
    'aws.region' = 'us-east-1',
    'source.init.position' = 'LATEST',
    'format' = 'csv'
);

A couple of notable connector options changed from the legacy source are:

  • stream.arn specifies the stream ARN, as opposed to the stream name used in the legacy source.
  • source.init.position defines the starting position. This option works similarly to the legacy source, but the option name is different. It was previously scan.stream.initpos.

For the full list of connector options, refer to Connector Options.

New features and improvements

In this section, we discuss the most important features introduced by the new connector. These features are available in the DataStream API, and also in the Table API and SQL.

Ordering guarantees

The most important improvement introduced by the new connector concerns ordering guarantees.

With Kinesis Data Streams, the order of the messages is retained per partitionId. This is achieved by putting all records with the same partitionId in the same shard. However, when the stream scales, splitting or merging shards, records with the same partitionId end up in a new shard. Kinesis keeps track of the parent-child lineage when resharding happens.

Stream resharding

One known limitation of the legacy Kinesis source is that it was unable to follow the parent-child shard lineage. As a consequence, ordering couldn’t be guaranteed when resharding happened. The problem was particularly relevant when the application replayed old messages from a stream that had been resharded, because ordering would be lost. This also made watermark generation and event-time processing non-deterministic.

With the new connector, ordering is retained also when resharding happens. This is achieved by following the parent-child shard lineage, and consuming all records from a parent shard before proceeding with the child shard.

How the connector follows shard lineage
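The parent-before-child rule can be sketched as a small readiness check: a shard becomes readable only once every one of its parents has been fully consumed. The code below is a simplified illustration with hypothetical names (ShardLineageSketch, Shard, readableShards); it is not the connector's code, and it ignores details such as parents that have already expired from the stream's retention.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch only: names are hypothetical and this is not connector code.
final class ShardLineageSketch {

    // Minimal shard descriptor: no parents for an original shard,
    // one parent after a split, two parents after a merge.
    static final class Shard {
        final String id;
        final List<String> parentIds;

        Shard(String id, String... parentIds) {
            this.id = id;
            this.parentIds = Arrays.asList(parentIds);
        }
    }

    // A shard may be read only when all of its parents are finished,
    // which preserves per-key ordering across a reshard.
    static List<String> readableShards(List<Shard> shards, Set<String> finishedShardIds) {
        List<String> ready = new ArrayList<>();
        for (Shard shard : shards) {
            if (finishedShardIds.contains(shard.id)) {
                continue; // already fully consumed
            }
            if (finishedShardIds.containsAll(shard.parentIds)) {
                ready.add(shard.id);
            }
        }
        return ready;
    }

    public static void main(String[] args) {
        // shard-0 was split into shard-1 and shard-2
        List<Shard> shards = Arrays.asList(
                new Shard("shard-0"),
                new Shard("shard-1", "shard-0"),
                new Shard("shard-2", "shard-0"));
        System.out.println(readableShards(shards, new HashSet<String>()));
        System.out.println(readableShards(shards, new HashSet<String>(Arrays.asList("shard-0"))));
    }
}
```

With this rule, the children of a split are held back until the parent is drained, so records for a given key are never read out of order.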

A better default shard assigner

Each Kinesis data stream is composed of many shards. Also, the Flink source operator runs in multiple parallel subtasks. The shard assigner is the component that decides how to assign the shards of the stream across the source subtasks. The shard assigner’s job is non-trivial, because shard split or merge operations (resharding) might happen when the stream scales up or down.

The new connector comes with a new default assigner, UniformShardAssigner. This assigner maintains a uniform distribution of the stream partitionId across parallel subtasks, also when resharding happens. This is achieved by looking at the range of partition keys (HashKeyRange) of each shard.

This shard assigner was already available in the previous connector version, but for backward compatibility, it was not the default and you had to set it up explicitly. This is no longer the case with the new source. The old default shard assigner in the legacy FlinkKinesisConsumer evenly distributed shards (not partitionId) across subtasks. In this case, the actual data distribution might become uneven in the case of resharding, because of the mix of open and closed shards in the stream. Refer to Shard Assignment Strategy for more details.
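One way to picture an assignment based on the hash key range is to map the midpoint of each shard's range onto the subtasks, proportionally to its position in the 128-bit key space. The sketch below illustrates that idea only; the class and method names are hypothetical, and the actual UniformShardAssigner may differ in its details.

```java
import java.math.BigInteger;

// Illustrative sketch only: not the connector's UniformShardAssigner.
final class UniformAssignmentSketch {

    // Kinesis hash keys are 128-bit values: 0 .. 2^128 - 1
    static final BigInteger HASH_KEY_SPACE = BigInteger.ONE.shiftLeft(128);

    // Maps a shard to a subtask by the position of the midpoint of its
    // hash key range within the whole key space.
    static int subtaskFor(BigInteger startingHashKey, BigInteger endingHashKey, int parallelism) {
        BigInteger midpoint = startingHashKey.add(endingHashKey).shiftRight(1);
        return midpoint
                .multiply(BigInteger.valueOf(parallelism))
                .divide(HASH_KEY_SPACE)
                .intValue();
    }

    public static void main(String[] args) {
        BigInteger half = BigInteger.ONE.shiftLeft(127);
        BigInteger threeQuarters = BigInteger.valueOf(3).shiftLeft(126);
        BigInteger max = HASH_KEY_SPACE.subtract(BigInteger.ONE);

        // A shard covering the upper half of the key space, then its two children after a split
        System.out.println(subtaskFor(half, max, 4));
        System.out.println(subtaskFor(half, threeQuarters.subtract(BigInteger.ONE), 4));
        System.out.println(subtaskFor(threeQuarters, max, 4));
    }
}
```

Because the mapping depends on where a shard sits in the key space rather than on how many shards exist, the children of a split land on subtasks that cover the same portion of the keys as the parent did.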

Reduced JAR size

The size of the JAR file has been reduced by 99%, from about 60 MB down to 200 KB. This significantly reduces the size of the fat-JAR of your application using the connector. A smaller JAR can speed up many operations that require redeploying the application.

AWS SDK for Java 2.x

The new connector is based on the newer AWS SDK for Java 2.x, which adds several features and improves support for non-blocking I/O. This makes the connector future-proof, because the AWS SDK v1 will reach end-of-support by the end of 2025.

AWS SDK built-in retry strategy

The new connector relies on the AWS SDK built-in retry strategy, as opposed to the custom strategy implemented by the legacy connector. Relying on the AWS SDK improves the classification of some errors as retriable or non-retriable.

Removed dependency on the Kinesis Client Library and Kinesis Producer Library

The new connector package no longer includes the Kinesis Client Library (KCL) and Kinesis Producer Library (KPL), contributing to the substantial reduction of the JAR size that we have mentioned.

An implication of this change is that the new connector no longer supports de-aggregation out of the box. Unless you’re publishing records to the stream using the KPL and you enabled aggregation, this will not make any difference for you. If your producers use KPL aggregation, you might consider implementing a custom DeserializationSchema to de-aggregate the records in the source.

Migrating from the legacy connector

Flink sources typically save the position in checkpoints and savepoints, called snapshots in Amazon Managed Service for Apache Flink. When you stop and restart the application, or when you update the application to deploy a change, the default behavior is saving the source position in the snapshot just before stopping the application, and restoring the position when the application restarts. This allows Flink to provide exactly-once guarantees on the source.

However, due to the major changes introduced by the new KinesisStreamsSource, the saved state is no longer compatible with the legacy FlinkKinesisConsumer. This means that when you upgrade the source of an existing application, you can’t directly restore the source position from the snapshot.

For this reason, migrating your application to the new source requires some attention. The exact migration process depends on your use case. There are two general scenarios:

  • Your application uses the DataStream API and you are following Flink best practices, defining a UID on each operator
  • Your application uses the Table API or SQL, or your application uses the DataStream API and you are not defining a UID on each operator

Let’s cover each of these scenarios.

Your application uses the DataStream API and you are defining a UID on each operator

In this case, you might consider selectively resetting the state of the source operator, retaining any other application state. The general approach is as follows:

  1. Update your application dependencies and code, replacing the FlinkKinesisConsumer with the new KinesisStreamsSource.
  2. Change the UID of the source operator (use a different string). Leave all other operators’ UIDs unchanged. This will selectively reset the state of the source while retaining the state of all other operators.
  3. Configure the source starting position using AT_TIMESTAMP and set the timestamp to just before the moment you will deploy the change. See Configuring Starting Position to learn how to set the starting position. We recommend passing the timestamp as a runtime property to make this more flexible. The configured source starting position is used only when the application can’t restore the state from a savepoint (or snapshot). In this case, we are deliberately forcing this by changing the UID of the source operator.
  4. Update the Amazon Managed Service for Apache Flink application, selecting the new JAR containing the modified application. Restart from the latest snapshot (default behavior) and select allowNonRestoredState = true. Without this flag, Flink would prevent restarting the application, because it can’t restore the state of the old source that was saved in the snapshot. See Savepointing for more details about allowNonRestoredState.

This approach will cause the reprocessing of some records from the source, and internal state exactly-once consistency can be broken. Carefully evaluate the impact of reprocessing on your application, and the impact of duplicates on the downstream systems.

Your application uses the Table API or SQL, or your application uses the DataStream API and you are not defining a UID on each operator

In this case, you can’t selectively reset the state of the source operator.

Why does this happen? When using the Table API or SQL, or the DataStream API without defining the operator’s UID explicitly, Flink automatically generates identifiers for all operators based on the structure of the job graph of your application. These identifiers are used to identify the state of each operator when saved in the snapshots, and to restore it to the correct operator when you restart the application.

Changes to the application might cause changes in the underlying data flow. This changes the auto-generated identifiers. If you are using the DataStream API and you are specifying the UID, Flink uses your identifiers instead of the auto-generated identifiers, and is able to map the state back to the operator, even when you make changes to the application. This is an intrinsic limitation of Flink, explained in Set UUIDs For All Operators. Enabling allowNonRestoredState doesn’t solve this problem, because Flink isn’t able to map the state saved in the snapshot to the actual operators after the changes.

In our migration scenario, the only option is resetting the state of your application. You can achieve this in Amazon Managed Service for Apache Flink by selecting Skip restore from snapshot (SKIP_RESTORE_FROM_SNAPSHOT) when you deploy the change that replaces the source connector.

After the application using the new source is up and running, you can switch back to the default behavior when restarting the application, using the latest snapshot (RESTORE_FROM_LATEST_SNAPSHOT). This way, no data loss happens when the application is restarted.

Choosing the right connector package and version

The dependency version you need to pick is normally <connector-version>-<flink-version>. For example, the latest Kinesis connector version is 5.0.0. If you are using a Flink runtime version 1.20.x, your dependency version for the DataStream API is 5.0.0-1.20.
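As a small worked example of this naming rule, the following sketch derives the dependency version from a connector version and a Flink runtime version, keeping only the major and minor parts of the latter. The class and method names are hypothetical; this is just string handling with the Java standard library, not something the connector provides.

```java
// Illustrative sketch only: a hypothetical helper, not part of Flink or the connector.
final class ConnectorDependencyVersion {

    // Builds <connector-version>-<flink-major>.<flink-minor>,
    // for example 5.0.0 and 1.20.0 give 5.0.0-1.20.
    static String dependencyVersion(String connectorVersion, String flinkRuntimeVersion) {
        String[] parts = flinkRuntimeVersion.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException(
                    "Expected a Flink version like 1.20 or 1.20.0, got: " + flinkRuntimeVersion);
        }
        return connectorVersion + "-" + parts[0] + "." + parts[1];
    }

    public static void main(String[] args) {
        System.out.println(dependencyVersion("5.0.0", "1.20.0"));
        System.out.println(dependencyVersion("5.0.0", "1.19.1"));
    }
}
```

The patch part of the Flink runtime version does not appear in the dependency version, so 1.20.0 and 1.20.1 both use the same 1.20 suffix.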

For the most up-to-date connector versions, see Use Apache Flink connectors with Managed Service for Apache Flink.

Connector artifact

In previous versions of the connector (4.x and before), there were separate packages for the source and sink. This additional level of complexity has been removed with version 5.x.

For your Java application, or Python applications where you package JAR dependencies using Maven, as shown in the Amazon Managed Service for Apache Flink examples GitHub repository, the following dependency contains the new version of both source and sink connectors:


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-aws-kinesis-streams</artifactId>
    <version>5.0.0-1.20</version>
</dependency>

Make sure you’re using the latest available version. At the time of writing, this is 5.0.0. You can verify the available artifact versions in Maven Central. Also, use the correct version depending on your Flink runtime version. The previous example is for Flink 1.20.0.

Connector artifacts for Python applications

If you use Python, we recommend packaging JAR dependencies using Maven, as shown in the Amazon Managed Service for Apache Flink examples GitHub repository. However, if you’re passing a single JAR directly to your Amazon Managed Service for Apache Flink application, you need to use the artifact that includes all transitive dependencies. In the case of the new Kinesis source and sink, this is called flink-sql-connector-aws-kinesis-streams. This artifact includes only the new source. Refer to Amazon Kinesis Data Streams SQL Connector for the right package, in case you want to use both the new and the legacy source.

Conclusion

The new Flink Kinesis source connector introduces many new features that improve stability and performance, and prepares your application for Flink 2.x. Support for watermark idleness and alignment is a particularly important feature if your application uses event-time semantics. The ability to retain record ordering improves data consistency, especially when stream resharding happens, and when you replay old data from a stream that has been resharded.

You should carefully plan the change if you’re migrating your application from the legacy Kinesis source connector, and make sure you follow Flink’s best practices, like specifying a UID on all DataStream operators.

You can find a working example of a Java DataStream API application using the new connector in the Amazon Managed Service for Apache Flink samples GitHub repository.

To learn more about the new Flink Kinesis source connector, refer to Amazon Kinesis Data Streams Connector and Amazon Kinesis Data Streams SQL Connector.


About the Author

Lorenzo Nicora works as a Senior Streaming Solutions Architect at AWS, helping customers across EMEA. He has been building cloud-centered, data-intensive systems for over 25 years, working across industries both through consultancies and product companies. He has used open source technologies extensively and contributed to several projects, including Apache Flink.
