Dear all,

Below is the code I execute:

import java.io._
import java.net.{URL, URLClassLoader}
import java.nio.charset.Charset
import java.util.Collections
import java.util.concurrent.atomic.AtomicBoolean

import com.netease.atom.common.util.logging.Logging
import com.netease.atom.interpreter.Code.Code
import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, InterpreterUtils}
import io.netty.buffer._
import org.apache.flink.api.scala.FlinkILoop
import org.apache.flink.client.CliFrontend
import org.apache.flink.client.cli.CliFrontendParser
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration.{QueryableStateOptions, Configuration, ConfigConstants, GlobalConfiguration}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, LocalFlinkMiniCluster}

import scala.Console
import scala.beans.BeanProperty
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.runtime.AbstractFunction0
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}

class FlinkInterpreter extends Interpreter {
  private var bufferedReader: Option[BufferedReader] = None
  private var jprintWriter: JPrintWriter = _
  private val config = new Configuration()
  private var cluster: LocalFlinkMiniCluster = _
  @BeanProperty var imain: IMain = _
  @BeanProperty var flinkILoop: FlinkILoop = _
  private var out: ByteBufOutputStream = _
  private var outBuf: ByteBuf = _
  private var in: ByteBufInputStream = _
  private val isRunning: AtomicBoolean = new AtomicBoolean(false)

  override def isOpen: Boolean = {
    isRunning.get()
  }

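  /**
   * Spin up an in-JVM JobManager/TaskManager pair and return the host, the
   * leader RPC port, and the cluster handle.
   */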
  def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
    // dump the current configuration for debugging
    config.toMap.foreach(println)
    config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
    config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
    config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
    config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
    config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
    config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    val localCluster = new LocalFlinkMiniCluster(config, false)
    localCluster.start(true)
    val port = AkkaUtils.getAddress(localCluster.jobManagerActorSystems.get.head).port
    println(s"Starting local Flink cluster (host: localhost,port: ${localCluster.getLeaderRPCPort}).\n")
    ("localhost", localCluster.getLeaderRPCPort, localCluster)
  }

 
  /**
   * Start the local Flink mini-cluster and create the REPL interpreter.
   */
  override def open: Unit = {
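    // capture all REPL output in a Netty buffer so it can be returned to the
    // caller instead of going to the process stdout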
    outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
    out = new ByteBufOutputStream(outBuf)
    in = new ByteBufInputStream(outBuf)
    //    val (host, port, yarnCluster) = deployNewYarnCluster(YarnConfig(Option(1), None, None, None, Option(1), None))
    val (host, port, localCluster) = startLocalMiniCluster()
    this.cluster = localCluster
    val conf = cluster.configuration
    println(s"Connecting to Flink cluster (host:$host,port:$port)...")
    flinkILoop = new FlinkILoop(host, port, conf, None)
    val settings = new Settings()
    settings.usejavacp.value = true
    settings.Yreplsync.value = true
    flinkILoop.settings = settings
    flinkILoop.createInterpreter()
    imain = flinkILoop.intp
    FlinkInterpreter.ourClassloader = imain.classLoader
    val benv = flinkILoop.scalaBenv
    val senv = flinkILoop.scalaSenv
    benv.getConfig.disableSysoutLogging()
    senv.getConfig.disableSysoutLogging()
    // import libraries
    imain.interpret("import scala.tools.nsc.io._")
    //    imain.interpret("import Properties.userHome")
    imain.interpret("import scala.compat.Platform.EOL")
    imain.interpret("import org.apache.flink.api.scala._")
    imain.interpret("import org.apache.flink.api.common.functions._")
    isRunning.set(true)
  }

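  /**
   * Interpret a (possibly multi-line) snippet by splitting it into lines.
   */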
  override def interpret(line: String): InterpreterResult = {
    if (line == null || line.trim.length == 0) {
      return new InterpreterResult(Code.SUCCESS)
    }
    interpret(line.split("\n"))
  }

  /**
   * Interpret the given lines of code.
   * @param lines lines of code to run, one statement per element
   * @return the result of running the lines
   */
  def interpret(lines: Array[String]): InterpreterResult = {
    val imain: IMain = getImain
    // append a no-op print so the REPL flushes output for the last statement
    val linesToRun: Array[String] = lines :+ "print(\"\")"
    System.setOut(new PrintStream(out))
    out.buffer().clear()
    var r: Code = null
    var incomplete: String = ""
    var inComment: Boolean = false
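    // Look one line ahead: blank lines, comment lines, closing braces and
    // leading-dot method chains are treated as continuations, so input is
    // buffered until a complete snippet can be sent to the REPL.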
    for (l <- 0 until linesToRun.length) {
      val s: String = linesToRun(l)
      var continuation: Boolean = false
      if (l + 1 < linesToRun.length) {
        val nextLine: String = linesToRun(l + 1).trim
        if (nextLine.isEmpty ||
            nextLine.startsWith("//") ||
            nextLine.startsWith("}") ||
            nextLine.startsWith("object")) {
          continuation = true
        } else if (!inComment && nextLine.startsWith("/*")) {
          inComment = true
          continuation = true
        } else if (inComment && nextLine.lastIndexOf("*/") >= 0) {
          inComment = false
          continuation = true
        } else if (nextLine.length > 1 &&
            nextLine.charAt(0) == '.' &&
            nextLine.charAt(1) != '.' &&
            nextLine.charAt(1) != '/') {
          continuation = true
        } else if (inComment) {
          continuation = true
        }
        if (continuation) {
          incomplete += s + "\n"
        }
      }
      if (!continuation) {
        val currentCommand: String = incomplete
        var res: Results.Result = null
        try {
          res = Console.withOut(System.out)(new AbstractFunction0[Results.Result] {
            override def apply() = {
              imain.interpret(currentCommand + s)
            }
          })
        } catch {
          case e: Exception =>
            return new InterpreterResult(Code.ERROR, e.getMessage)
        }
        // map the REPL result onto the interpreter code (assumed mapping:
        // an incomplete expression keeps its input buffered for the next line)
        res match {
          case Results.Success =>
            r = Code.SUCCESS
            incomplete = ""
          case Results.Incomplete =>
            r = Code.INCOMPLETE
            incomplete += s + "\n"
          case Results.Error =>
            return new InterpreterResult(Code.ERROR, out.buffer().toString(Charset.forName("UTF-8")))
        }
      }
    }
    new InterpreterResult(r, out.buffer().toString(Charset.forName("UTF-8")))
  }
}

object FlinkInterpreter {
  // referenced from open(); holds the REPL class loader
  var ourClassloader: ClassLoader = _
}
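
For reference, this is roughly how I drive the interpreter (a minimal sketch;
it assumes FlinkILoop has bound benv into the REPL as usual):

val interpreter = new FlinkInterpreter
interpreter.open
// benv is the batch ExecutionEnvironment bound by FlinkILoop
val result = interpreter.interpret(
  """val ds = benv.fromElements(1, 2, 3)
    |ds.map(_ * 2).print()""".stripMargin)
println(result)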