
JIRA: PNDA-4005

Motivation

  • Apache Flink® is an open-source stream processing framework for distributed, high-performing, always-available, and accurate data streaming applications.
  • Flink allows for stream processing at event-level granularity and at very low latency, with more advanced windowing and other stream-processing features that Spark does not currently offer.
  • Users of PNDA have requested support for Flink in addition to Spark to implement use cases that are more suited to the features provided by Flink. 

Proposal

Flink will be introduced to PNDA as an alternative engine for data processing. 

At a high level this involves:

  • Deploying the Flink binaries out-of-the-box on a PNDA cluster.
  • Supporting Flink components in the PNDA Application Deployment Manager.
  • Making Flink visible in the PNDA Console, with links to user interfaces, health information and documentation in the same way as other components of the PNDA system.
  • Running Flink applications following the same model as Spark Streaming applications, with each job running on YARN as an independent single-use Flink session.
  • Supporting prototyping and ad-hoc work on Flink programs via the Flink CLI shell, because Flink support in Jupyter is not yet suitable for use.

Flink Overview

Design

The following section discusses the changes required to each PNDA component.

Flink jobs will be submitted as single jobs, with no need for a standing Flink cluster. More details of Flink YARN integration are here.

Flink resources and any other dependencies will be hosted on the PNDA mirror. The mirror build script will need to include these in the appropriate mirror section.

A Flink history server will be run on one of the PNDA manager nodes.

Flink client libraries will be installed on all nodes with a hadoop EDGE role.

Support will be added for deploying Flink components in application packages.

A Flink component plugin will be created that will run Flink applications in the same way as the Spark Streaming plugin currently does. A supervisor will be set up on the PNDA edge node that will call the Flink CLI to submit the job.
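As a rough illustration, the supervisor on the edge node might assemble a per-job submission command along these lines. This is a sketch only: the flag names follow the Flink 1.x CLI (`flink run -m yarn-cluster`), while the jar path, parallelism, and job arguments are hypothetical placeholders.

```python
# Sketch of how a supervisor script might build the per-job Flink-on-YARN
# submission command. Flag names follow the Flink 1.x CLI; the jar path
# and job arguments are hypothetical.
def build_flink_submit_command(jar_path, parallelism=2, job_args=None):
    """Build the argv list for a single-use Flink-on-YARN job submission."""
    cmd = [
        "flink", "run",
        "-m", "yarn-cluster",   # start a dedicated YARN session for this job
        "-d",                   # detached: the supervisor only launches it
        "-p", str(parallelism),
        jar_path,
    ]
    cmd.extend(job_args or [])
    return cmd

print(build_flink_submit_command("/opt/app/example-job.jar",
                                 job_args=["--topic", "events"]))
```

The supervisor would pass this list to the process launcher it already uses for Spark Streaming jobs.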

We will not include support for Flink in the Deployment Manager application information service, as this service is currently under review and its implementation is likely to change.

A health metric, "hadoop.flink.health", will be generated to show the health status of the Flink history server.

The presence of a running, healthy history server will be verified by calling the “/joboverview” API on the history server.
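The check could look something like the sketch below. The /joboverview endpoint is part of the Flink monitoring REST API; the host/port default and the OK/ERROR health values are assumptions about how PNDA would surface the result.

```python
import json
from urllib.request import urlopen

def history_server_health(overview):
    """Map a parsed /joboverview response to a PNDA-style health value.

    A well-formed response contains job lists; anything else is unhealthy.
    """
    if isinstance(overview, dict) and ("finished" in overview or "jobs" in overview):
        return "OK"
    return "ERROR"

def check_history_server(base_url="http://localhost:8082"):
    """Fetch /joboverview from the history server (URL is an assumption)."""
    try:
        with urlopen(base_url + "/joboverview", timeout=5) as resp:
            return history_server_health(json.load(resp))
    except Exception:
        # Unreachable or malformed server counts as unhealthy.
        return "ERROR"
```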

Flink application metrics will be gathered as described here and in the post http://mnxfst.tumblr.com/post/136539620407/receiving-metrics-from-apache-flink-applications, and associated with the relevant application and component by prefixing metric names with "application.kpi.<application-name>.<component-name>".
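The naming convention above amounts to a simple prefixing scheme; a minimal sketch (the sample application and component names are hypothetical):

```python
# Illustrative sketch of the proposed metric naming: application metrics are
# prefixed so Graphite groups them under their application and component.
def kpi_metric_name(application, component, metric):
    return "application.kpi.{}.{}.{}".format(application, component, metric)

print(kpi_metric_name("netflow-app", "flink-processor", "records-in"))
# application.kpi.netflow-app.flink-processor.records-in
```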

Candidate metrics: https://github.com/apache/flink/tree/master/flink-metrics

The PNDA console dashboard page will be modified to include Flink blocks under both the Stream & Batch sections.

The Flink block will link to the history server UI.

The Flink block will link to a help pop-up with help text and a link to the Flink documentation.

The Flink history server log will be aggregated by the PNDA logshipper.

Flink application logs will be aggregated by the PNDA logshipper by configuring it to collect logs from the YARN container folder on each node manager. To match Spark Streaming, this will gather the stdout and stderr logs plus a Flink-specific log called flink.log that the user can configure their application to write to if they wish.
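Assuming the PNDA logshipper is Logstash-based, the collection described above could be sketched as a file input along these lines. The container log directory shown is an assumption; the real path is governed by yarn.nodemanager.log-dirs on the cluster.

```text
input {
  file {
    # Hypothetical NodeManager container log directory; adjust to the
    # value of yarn.nodemanager.log-dirs on the cluster.
    path => [
      "/var/log/hadoop-yarn/container/*/*/stdout",
      "/var/log/hadoop-yarn/container/*/*/stderr",
      "/var/log/hadoop-yarn/container/*/*/flink.log"
    ]
    type => "yarn-container"
  }
}
```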

A utility library will be provided that offers Scala functions for reading the PNDA master dataset into Flink objects/streams.

Any Kafka and Avro libraries required to read/write to Kafka from Flink will be supplied as part of PNDA.

Prototyping and exploration using Flink with Jupyter shows that the currently available support is not good enough. We will monitor this and add support when it is fit for purpose.

More details and open issues of Jupyter integration with Flink are here

An example application will be provided that demonstrates the benefits of Flink over Spark for a suitable use case, e.g. stateful continuous processing (non-batch mode).

Candidates: https://flink.apache.org/usecases.html

Sections of the guide will need to be created or updated to reference Flink.

Plan

Phase 1 

  • Add Flink software to PNDA build and mirror processes
  • Deploy & configure Flink and start Flink History server service
  • Support applications through pyflink and scala-shell
  • Handle mapping of users to queues to give functional parity with Spark
  • Add basic platform test & console elements to represent Flink in PNDA
  • Logging

Phase 2

  • Deployment Manager
    • Packaging and deploying basic Flink applications in similar way to Spark
      • Supervised Flink stream processing jobs
      • Scheduled Flink jobs via Oozie
  • Application status monitoring
    • Display the Flink job status similarly to Spark (Component, Application type, Id & Status) and provide a link to the Flink dashboard for more information.
  • Documentation
  • Data management & cluster housekeeping
    • Clean-up completed-jobs
    • Clean-up temporary streaming directory
    • Rolling file mechanism for growing log files

Phase 3

  • Interactive Flink based data exploration in Jupyter
  • Platform libraries support for common operations
  • Savepoints 
    • Triggering, listing, resuming & disposing of savepoints from the Deployment Manager for a given application.
  • Example applications
    • Illustrate how to build and deploy Flink applications with PNDA (in Java, Scala & Python)
    • Illustrate how to use Flink Accumulators, application metrics
    • Use stateful continuous processing (non-batch mode) making it a different case from Spark streaming examples
  • Metrics 
    • Configure Graphite as the default metrics reporter in Flink.
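Enabling this by default would amount to a flink-conf.yaml fragment along the following lines (a sketch based on the Flink 1.x GraphiteReporter; the host and port are assumptions matching a default Graphite install, and the flink-metrics-graphite jar must also be placed on Flink's lib path):

```text
metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP
```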

TBD

  • Operations  (restart of nodes, restart of services, losing a node, scale up/down)
  • Metrics export

Interfaces

  • DM interface to support execution of Flink applications, along with the ability to override default values
  • New interfaces to be documented for creating & deploying Flink applications and Flink-yarn-session/Flink-cluster

Compatibility

  • Flink applications and Spark applications can co-exist

Alternatives

Expose Flink as a runner to Apache Beam, and move to Apache Beam as primary SDK for application development. This is something under review and could be an incremental step forward from basic Flink support (it doesn't need to be an either/or).


12 Comments

  1. As of now, Oozie does not support Flink as an executable action.

    Ref: https://oozie.apache.org/docs/4.3.1/index.html

    1. Can batch flink work be executed using one of the other action types?

  2. On pyflink, we need to capture here why it's not going to form part of the initial set of functionality.

    1. Today we received a reply from the Flink community on the pyflink-with-YARN issue (details in the link below). We will try it and see whether it works.

      https://issues.apache.org/jira/browse/FLINK-8909

  3. The pyflink issue is now resolved. The documentation was insufficient to resolve the issue we were facing.

  4. These areas need more explanation in this PDP:

    • Application status monitoring
    • Platform libraries support for common operations
    • User policies management
    • Metrics export
  5. Regarding user policies management, PNDA-4430 addresses CLI wrappers that determine the right YARN queue for the user. What are the other, broader aspects that need to be addressed?

    1. I've updated the page to reflect that this item isn't TBD but is covered in the phase 1 work.

  6. The Python streaming API of Flink is in progress and not fully available yet. We need to plan to implement the streaming example application in Java/Scala.

    https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-zohar-mizrahi-python-streaming-api

    https://github.com/apache/flink/pull/3838

  7. All, 

    A few things I wanted to make note of:

     

    1. Savepoints are critical for the Flink apps we run. An option to take a savepoint during a “graceful” stop, and an option to bootstrap from a savepoint, is a must. I am guessing this can be passed as a parameter in your proposal, but I wanted to make sure you think about that.
      • Also, an option of starting from the latest or from a defined savepoint would be handy.

     

    1. Flink has out-of-the-box integration with Graphite. Since PNDA already uses Graphite, it makes sense to turn it on by default – it needs a couple of jars and a YAML change.
      • I’d think users would normally want to configure how metrics are formatted. I am not sure how PNDA allows component-level config changes at this point – is there a way to create “mixins” for configs at the component level, or would it pretty much mean going to each individual component and reconfiguring it?

     

    1. As Phase 3+ plan, something like this is of HIGH VALUE for us – we need a streaming rules engine and this will fill that need. If PNDA UI supported an interface for this, that will be sweet!!!

     

    https://github.com/uber/AthenaX

    1. We have been exploring Flink metrics reporting. Yes, we will enable the changes (copying the jar files and configuring the YAML) by default. This will send the system metrics to Graphite.

      We are using a template (.tpl) file in Salt for the configuration changes, which configures Flink and its parameter values based on user input.

      Could you please elaborate a bit more on the component-level and individual-level configurations?

      1. Flink config allows configuring patterns for different scopes of metric reporting. I'd think users would like to configure those patterns for those different scopes when metrics are reported rather than using OOTB configs.

        metrics.scope.jm: flink.jobmanager.<host>

        metrics.scope.jm.job: flink.jobmanager.<host>.<job_name>

        metrics.scope.tm: flink.taskmanager.<host>.<tm_id>

        metrics.scope.tm.job: flink.taskmanager.<host>.<tm_id>.<job_name>

        metrics.scope.task: flink.taskmanager.<host>.<tm_id>.<job_name>.<task_name>

        metrics.scope.operator: flink.taskmanager.<host>.<tm_id>.<job_name>.<operator_name>