Kafka, mail # user - Re: Consume from X messages ago - 2013-03-22, 15:13
Re: Consume from X messages ago
Thanks for the help.

FWIW, I ended up writing a simple utility that I can call while my consumer
is starting up to move the offset back.  It *seems* to work decently.
Thoughts?  Would this be worth contributing back to Kafka, or is the idea
just poor?

  // Assumed imports (not shown in the original message). zkClient and warn
  // are expected to come from the enclosing class.
  import javax.annotation.Nonnull
  import com.google.common.base.Preconditions
  import kafka.utils.ZkUtils

  /**
   * Attempts to move the consumer offset back.  If it has any issues,
   * throws an exception.
   */
  def moveConsumerOffsetBack(@Nonnull groupId: String, @Nonnull topic: String,
                             partition: Long, approximateMessagesBack: Long) {
    Preconditions.checkNotNull(groupId)
    Preconditions.checkNotNull(topic)
    Preconditions.checkArgument(partition >= 0)
    Preconditions.checkArgument(approximateMessagesBack > 0)

    val path = ZkUtils.ConsumersPath + "/" + groupId + "/offsets/" + topic + "/" + partition

    if (zkClient.exists(path)) {
      // We get this from ZK as Any because the exact type is unpredictable.
      // It might be Long or String.
      val currentOffset = zkClient.readData[Any](path)
      // Clamp at zero so a negative offset is never written.
      val desiredOffset = math.max(0L, currentOffset.toString.toLong - approximateMessagesBack)
      zkClient.writeData(path, desiredOffset.toString)
      warn("Reset the " + topic + " consumer to " + desiredOffset)
    } else {
      throw new RuntimeException("Unable to move the consumer back: offset path not found in ZK.  " +
        "This may or may not be an issue, depending on whether you expect the path to exist. " +
        "Path: " + path)
    }
  }
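
The offset arithmetic in the method above can be pulled out into a pure function and checked without a ZooKeeper connection. The following is only a minimal sketch: the names OffsetRewind, parseOffset and rewoundOffset are hypothetical and do not come from Kafka or from the original message. It assumes, as the code above does, that the stored value may arrive as either a Long or a String.

```scala
object OffsetRewind {
  /** Parses an offset value that may arrive as a Long, an Int, or a String. */
  def parseOffset(raw: Any): Long = raw match {
    case n: Long   => n
    case n: Int    => n.toLong
    case s: String => s.trim.toLong // throws NumberFormatException on bad input
    case other     => throw new IllegalArgumentException("Unexpected offset value: " + other)
  }

  /** Returns the offset moved back by the requested amount, never below zero. */
  def rewoundOffset(raw: Any, approximateMessagesBack: Long): Long = {
    require(approximateMessagesBack > 0, "approximateMessagesBack must be positive")
    math.max(0L, parseOffset(raw) - approximateMessagesBack)
  }
}
```

For example, OffsetRewind.rewoundOffset("1500", 1000L) yields 500, while OffsetRewind.rewoundOffset(200L, 1000L) is clamped to 0.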
On Tue, Mar 19, 2013 at 2:05 PM, Neha Narkhede <[EMAIL PROTECTED]> wrote:
Jim Englert
Gilt Groupe

 