|
David Pavlis
2012-01-06, 11:01
Denny Ye
2012-01-09, 08:29
David Pavlis
2012-01-09, 17:56
Todd Lipcon
2012-01-09, 17:59
David Pavlis
2012-01-09, 20:01
Todd Lipcon
2012-01-09, 20:30
David Pavlis
2012-01-10, 12:32
Todd Lipcon
2012-01-10, 18:19
David Pavlis
2012-01-11, 16:31
Joey Echeverria
2012-01-11, 16:34
|
-
How-to use DFSClient's BlockReader from JavaDavid Pavlis 2012-01-06, 11:01
Hi,
I am relatively new to Hadoop and I am trying to utilize HDFS for own application where I want to take advantage of data partitioning HDFS performs. The idea is that I get list of individual blocks - BlockLocations of particular file and then directly read those (go to individual DataNodes). So far I found org.apache.hadoop.hdfs.DFSClient.BlockReader to be the way to go. However I am struggling with instantiating the BlockReader() class, namely creating the "Token<BlockTokenIdentifier>". Is there an example Java code showing how to access individual blocks of particular file stored on HDFS ? Thanks in advance, David.
-
Re: How-to use DFSClient's BlockReader from JavaDenny Ye 2012-01-09, 08:29
hi David
Please refer to the method "DFSInputStream#blockSeekTo", it has same purpose with you. *************************************************************************** LocatedBlock targetBlock = getBlockAt(target, true); assert (target==this.pos) : "Wrong postion " + pos + " expect " + target; long offsetIntoBlock = target - targetBlock.getStartOffset(); DNAddrPair retval = chooseDataNode(targetBlock); chosenNode = retval.info; InetSocketAddress targetAddr = retval.addr; try { s = socketFactory.createSocket(); NetUtils.connect(s, targetAddr, socketTimeout); s.setSoTimeout(socketTimeout); Block blk = targetBlock.getBlock(); Token<BlockTokenIdentifier> accessToken targetBlock.getBlockToken(); blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(), accessToken, blk.getGenerationStamp(), offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock, buffersize, verifyChecksum, clientName); *************************************************************************** -Regards Denny Ye 2012/1/6 David Pavlis <[EMAIL PROTECTED]> > Hi, > > I am relatively new to Hadoop and I am trying to utilize HDFS for own > application where I want to take advantage of data partitioning HDFS > performs. > > The idea is that I get list of individual blocks - BlockLocations of > particular file and then directly read those (go to individual DataNodes). > So far I found org.apache.hadoop.hdfs.DFSClient.BlockReader to be the way > to go. > > However I am struggling with instantiating the BlockReader() class, namely > creating the "Token<BlockTokenIdentifier>". > > Is there an example Java code showing how to access individual blocks of > particular file stored on HDFS ? > > Thanks in advance, > > David. > > > > > >
-
Re: How-to use DFSClient's BlockReader from JavaDavid Pavlis 2012-01-09, 17:56
Hi Denny,
Thanks a lot. I was able to make my code work. I am posting a small example below - in case somebody in the future has similar need ;-) (not handling replica datablocks). David. *************************************************************************** public static void main(String args[]){ String filename="/user/hive/warehouse/sample_07/sample_07.csv"; int DATANODE_PORT = 50010; int NAMENODE_PORT = 8020; String HOST_IP = "192.168.1.230"; byte[] buf=new byte[1000]; try{ ClientProtocol client= DFSClient.createNamenode(new InetSocketAddress(HOST_IP,NAMENODE_PORT), new Configuration()); LocatedBlocks located= client.getBlockLocations(filename, 0, Long.MAX_VALUE); for(LocatedBlock block : located.getLocatedBlocks()){ Socket sock = SocketFactory.getDefault().createSocket(); InetSocketAddress targetAddr = new InetSocketAddress(HOST_IP,DATANODE_PORT); NetUtils.connect(sock, targetAddr, 10000); sock.setSoTimeout(10000); BlockReader reader=BlockReader.newBlockReader(sock, filename, block.getBlock().getBlockId(), block.getBlockToken(), block.getBlock().getGenerationStamp(), 0, block.getBlockSize(), 1000); int count=0; int length; while((length=reader.read(buf,0,1000))>0){ //System.out.print(new String(buf,0,length,"UTF-8")); if (length<1000) break; } reader.close(); sock.close(); } }catch(IOException ex){ ex.printStackTrace(); } } *************************************************************************** From: Denny Ye <[EMAIL PROTECTED]> Reply-To: <[EMAIL PROTECTED]> Date: Mon, 9 Jan 2012 16:29:18 +0800 To: <[EMAIL PROTECTED]> Subject: Re: How-to use DFSClient's BlockReader from Java hi David Please refer to the method "DFSInputStream#blockSeekTo", it has same purpose with you. *************************************************************************** LocatedBlock targetBlock = getBlockAt(target, true); assert (target==this.pos) : "Wrong postion " + pos + " expect " + target; long offsetIntoBlock = target - targetBlock.getStartOffset(); DNAddrPair retval = chooseDataNode(targetBlock); chosenNode = retval.info <http://retval.info>; InetSocketAddress targetAddr = retval.addr; try { s = socketFactory.createSocket(); NetUtils.connect(s, targetAddr, socketTimeout); s.setSoTimeout(socketTimeout); Block blk = targetBlock.getBlock(); Token<BlockTokenIdentifier> accessToken targetBlock.getBlockToken(); blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(), accessToken, blk.getGenerationStamp(), offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock, buffersize, verifyChecksum, clientName); *************************************************************************** -Regards Denny Ye 2012/1/6 David Pavlis <[EMAIL PROTECTED]> Hi, I am relatively new to Hadoop and I am trying to utilize HDFS for own application where I want to take advantage of data partitioning HDFS performs. The idea is that I get list of individual blocks - BlockLocations of particular file and then directly read those (go to individual DataNodes). So far I found org.apache.hadoop.hdfs.DFSClient.BlockReader to be the way to go. However I am struggling with instantiating the BlockReader() class, namely creating the "Token<BlockTokenIdentifier>". Is there an example Java code showing how to access individual blocks of particular file stored on HDFS ? Thanks in advance, David.
-
Re: How-to use DFSClient's BlockReader from JavaTodd Lipcon 2012-01-09, 17:59
Hi David,
For what it's worth, you should be aware that you're calling internal APIs that have no guarantee of stability between versions. I can practically guarantee that your code will have to be modified for any HDFS upgrade you do. That's why these APIs are undocumented. Perhaps you can explain what your high-level goal is, here, and we can suggest a supported mechanism for achieving it. -Todd On Mon, Jan 9, 2012 at 9:56 AM, David Pavlis <[EMAIL PROTECTED]> wrote: > Hi Denny, > > Thanks a lot. I was able to make my code work. > > I am posting a small example below - in case somebody in the future has > similar need ;-) > (not handling replica datablocks). > > David. > > *************************************************************************** > public static void main(String args[]){ > String filename="/user/hive/warehouse/sample_07/sample_07.csv"; > int DATANODE_PORT = 50010; > int NAMENODE_PORT = 8020; > String HOST_IP = "192.168.1.230"; > > byte[] buf=new byte[1000]; > > > try{ > > ClientProtocol client= DFSClient.createNamenode(new > InetSocketAddress(HOST_IP,NAMENODE_PORT), new Configuration()); > > > > > LocatedBlocks located= client.getBlockLocations(filename, 0, > Long.MAX_VALUE); > > > > for(LocatedBlock block : located.getLocatedBlocks()){ > Socket sock = SocketFactory.getDefault().createSocket(); > InetSocketAddress targetAddr = new > InetSocketAddress(HOST_IP,DATANODE_PORT); > NetUtils.connect(sock, targetAddr, 10000); > sock.setSoTimeout(10000); > > > BlockReader reader=BlockReader.newBlockReader(sock, filename, > block.getBlock().getBlockId(), block.getBlockToken(), > block.getBlock().getGenerationStamp(), 0, block.getBlockSize(), > 1000); > > > int count=0; > int length; > while((length=reader.read(buf,0,1000))>0){ > //System.out.print(new String(buf,0,length,"UTF-8")); > if (length<1000) break; > } > reader.close(); > sock.close(); > } > > > }catch(IOException ex){ > ex.printStackTrace(); > } > > } > > > > > > > *************************************************************************** > > > > From: Denny Ye <[EMAIL PROTECTED]> > Reply-To: <[EMAIL PROTECTED]> > Date: Mon, 9 Jan 2012 16:29:18 +0800 > To: <[EMAIL PROTECTED]> > Subject: Re: How-to use DFSClient's BlockReader from Java > > > hi David Please refer to the method "DFSInputStream#blockSeekTo", it > has same purpose with you. > > *************************************************************************** > LocatedBlock targetBlock = getBlockAt(target, true); > assert (target==this.pos) : "Wrong postion " + pos + " expect " + > target; > long offsetIntoBlock = target - targetBlock.getStartOffset(); > > DNAddrPair retval = chooseDataNode(targetBlock); > chosenNode = retval.info <http://retval.info>; > InetSocketAddress targetAddr = retval.addr; > > try { > s = socketFactory.createSocket(); > NetUtils.connect(s, targetAddr, socketTimeout); > s.setSoTimeout(socketTimeout); > Block blk = targetBlock.getBlock(); > Token<BlockTokenIdentifier> accessToken > targetBlock.getBlockToken(); > > blockReader = BlockReader.newBlockReader(s, src, > blk.getBlockId(), > accessToken, > blk.getGenerationStamp(), > offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock, > buffersize, verifyChecksum, clientName); > > > *************************************************************************** Todd Lipcon Software Engineer, Cloudera
-
Re: How-to use DFSClient's BlockReader from JavaDavid Pavlis 2012-01-09, 20:01
Hi Todd,
Thanks for letting me know. OK - here is what I am trying to do (it is a POC for now): We have an ETL framework which helps with transforming data - parsing various formats, reading from DBs, aggregating, sorting, etc.. We do have currently concept of a "cluster" which basically allows input data (say datafile) be split/partitioned across several nodes and then one data transformation is executed on those data. The way it works is that we can analyze what the transformation does and if it is supposed to consume data which is spread over cluster, we execute a slightly modified copy/instance of that transformation on each node of the cluster where some piece/partition of data resides. We do not have concept of any "clustered" filesystem - our partitioned data reside in ordinary files and there is no metadata layer on top. If we need one single output data file, then we just perform merge operation. It is quite limiting as if we need to manipulate such data, we need to do it piece by piece (on each participating node). So we essentially do split-transform-merge with merge being optional (can be part of the transformation directly, so we don't create temp files). Here is the idea with HDFS - each of our transformation node becomes HDFS's datanode. Then if we are to process particular input data split over several datanodes, then we just instruct our transformation nodes to read specific block of the file (block/s which happens to be on the same physical machine as our transformation node is also datanode) -hence my interest in BlockReader. I was also considering wrapping our transformation job into map-reduce job of Hadoop, but that seems a bit limiting and also we would need to "take" the whole Hadoop stack and give it control over our jobs. But that still might be the right way. Also, I need to solve writing of partitioned data - here I would like to control the block allocation somehow as ideally transformation running on particular node would be reading locally stored blocks and outputting data to locally allocated block of HDFS file. Well, I hope I explained the situation clearly enough. I will be thankful for any comments. Regards, David. On 9.1.12 6:59 PM, "Todd Lipcon" <[EMAIL PROTECTED]> wrote: >Hi David, > >For what it's worth, you should be aware that you're calling internal >APIs that have no guarantee of stability between versions. I can >practically guarantee that your code will have to be modified for any >HDFS upgrade you do. That's why these APIs are undocumented. > >Perhaps you can explain what your high-level goal is, here, and we can >suggest a supported mechanism for achieving it. > >-Todd > >On Mon, Jan 9, 2012 at 9:56 AM, David Pavlis <[EMAIL PROTECTED]> >wrote: >> Hi Denny, >> >> Thanks a lot. I was able to make my code work. >> >> I am posting a small example below - in case somebody in the future has >> similar need ;-) >> (not handling replica datablocks). >> >> David. >> >> >>************************************************************************* >>** >> public static void main(String args[]){ >> String filename="/user/hive/warehouse/sample_07/sample_07.csv"; >> int DATANODE_PORT = 50010; >> int NAMENODE_PORT = 8020; >> String HOST_IP = "192.168.1.230"; >> >> byte[] buf=new byte[1000]; >> >> >> try{ >> >> ClientProtocol client= DFSClient.createNamenode(new >> InetSocketAddress(HOST_IP,NAMENODE_PORT), new Configuration()); >> >> >> >> >> LocatedBlocks located>>client.getBlockLocations(filename, 0, >> Long.MAX_VALUE); >> >> >> >> for(LocatedBlock block : located.getLocatedBlocks()){ >> Socket sock >>SocketFactory.getDefault().createSocket(); >> InetSocketAddress targetAddr = new >> InetSocketAddress(HOST_IP,DATANODE_PORT); >> NetUtils.connect(sock, targetAddr, 10000); >> sock.setSoTimeout(10000); >
-
Re: How-to use DFSClient's BlockReader from JavaTodd Lipcon 2012-01-09, 20:30
Hi David,
I'd definitely recommend using MapReduce for this. What you've described is essentially identical to MR. Otherwise, you should use the public API FileSystem.getFileBlockLocations(), and then read the host names out of the returned BlockLocation struct. Then just use a normal FileSystem open call from that node - it will automatically pick the local replica for you without any further work. -Todd On Mon, Jan 9, 2012 at 12:01 PM, David Pavlis <[EMAIL PROTECTED]> wrote: > Hi Todd, > > Thanks for letting me know. OK - here is what I am trying to do (it is a > POC for now): > > We have an ETL framework which helps with transforming data - parsing > various formats, reading from DBs, > aggregating, sorting, etc.. > We do have currently concept of a "cluster" which basically allows input > data (say datafile) be split/partitioned > across several nodes and then one data transformation is executed on those > data. The way it works is that > we can analyze what the transformation does and if it is supposed to > consume data which is spread over cluster, we > execute a slightly modified copy/instance of that transformation on each > node of the cluster where some piece/partition > of data resides. We do not have concept of any "clustered" filesystem - > our partitioned data reside in ordinary files > and there is no metadata layer on top. If we need one single output data > file, then we just perform merge operation. It is > quite limiting as if we need to manipulate such data, we need to do it > piece by piece (on each participating node). > > So we essentially do split-transform-merge with merge being optional (can > be part of the transformation directly, so we don't > create temp files). > > Here is the idea with HDFS - each of our transformation node becomes > HDFS's datanode. Then if we are to process particular > input data split over several datanodes, then we just instruct our > transformation nodes to read specific block of the file (block/s > which happens to be on the same physical machine as our transformation > node is also datanode) -hence my interest in BlockReader. > > I was also considering wrapping our transformation job into map-reduce job > of Hadoop, but that seems a bit limiting and also > we would need to "take" the whole Hadoop stack and give it control over > our jobs. But that still might be the right way. > Also, I need to solve writing of partitioned data - here I would like to > control the block allocation somehow as ideally transformation > running on particular node would be reading locally stored blocks and > outputting data to locally allocated block of HDFS file. > > Well, I hope I explained the situation clearly enough. > > I will be thankful for any comments. > > Regards, > > David. > > > > > On 9.1.12 6:59 PM, "Todd Lipcon" <[EMAIL PROTECTED]> wrote: > >>Hi David, >> >>For what it's worth, you should be aware that you're calling internal >>APIs that have no guarantee of stability between versions. I can >>practically guarantee that your code will have to be modified for any >>HDFS upgrade you do. That's why these APIs are undocumented. >> >>Perhaps you can explain what your high-level goal is, here, and we can >>suggest a supported mechanism for achieving it. >> >>-Todd >> >>On Mon, Jan 9, 2012 at 9:56 AM, David Pavlis <[EMAIL PROTECTED]> >>wrote: >>> Hi Denny, >>> >>> Thanks a lot. I was able to make my code work. >>> >>> I am posting a small example below - in case somebody in the future has >>> similar need ;-) >>> (not handling replica datablocks). >>> >>> David. >>> >>> >>>************************************************************************* >>>** >>> public static void main(String args[]){ >>> String filename="/user/hive/warehouse/sample_07/sample_07.csv"; >>> int DATANODE_PORT = 50010; >>> int NAMENODE_PORT = 8020; >>> String HOST_IP = "192.168.1.230"; >>> >>> byte[] buf=new byte[1000]; >>> >>> >>> try{ >>> >>> ClientProtocol client= DFSClient.createNamenode(new Todd Lipcon Software Engineer, Cloudera
-
Re: How-to use DFSClient's BlockReader from JavaDavid Pavlis 2012-01-10, 12:32
Hi Todd,
Understand, I will re-think the MR approach. Nonetheless I like the idea with getting the block id and accessing the locally stored file directly. Is there a way (public interface) to find for particular datanode, where the local storage is rooted (which local subdirs it uses) ? I found that this info is available in Storage class, but is there a "public" way of getting it - through some protocol or so ? Thanks in advance, David. On 9.1.12 9:30 PM, "Todd Lipcon" <[EMAIL PROTECTED]> wrote: >Hi David, > >I'd definitely recommend using MapReduce for this. What you've >described is essentially identical to MR. > >Otherwise, you should use the public API >FileSystem.getFileBlockLocations(), and then read the host names out >of the returned BlockLocation struct. Then just use a normal >FileSystem open call from that node - it will automatically pick the >local replica for you without any further work. > >-Todd > >On Mon, Jan 9, 2012 at 12:01 PM, David Pavlis <[EMAIL PROTECTED]> >wrote: >> Hi Todd, >> >> Thanks for letting me know. OK - here is what I am trying to do (it is >>a >> POC for now): >> >> We have an ETL framework which helps with transforming data - parsing >> various formats, reading from DBs, >> aggregating, sorting, etc.. >> We do have currently concept of a "cluster" which basically allows input >> data (say datafile) be split/partitioned >> across several nodes and then one data transformation is executed on >>those >> data. The way it works is that >> we can analyze what the transformation does and if it is supposed to >> consume data which is spread over cluster, we >> execute a slightly modified copy/instance of that transformation on each >> node of the cluster where some piece/partition >> of data resides. We do not have concept of any "clustered" filesystem - >> our partitioned data reside in ordinary files >> and there is no metadata layer on top. If we need one single output data >> file, then we just perform merge operation. It is >> quite limiting as if we need to manipulate such data, we need to do it >> piece by piece (on each participating node). >> >> So we essentially do split-transform-merge with merge being optional >>(can >> be part of the transformation directly, so we don't >> create temp files). >> >> Here is the idea with HDFS - each of our transformation node becomes >> HDFS's datanode. Then if we are to process particular >> input data split over several datanodes, then we just instruct our >> transformation nodes to read specific block of the file (block/s >> which happens to be on the same physical machine as our transformation >> node is also datanode) -hence my interest in BlockReader. >> >> I was also considering wrapping our transformation job into map-reduce >>job >> of Hadoop, but that seems a bit limiting and also >> we would need to "take" the whole Hadoop stack and give it control over >> our jobs. But that still might be the right way. >> Also, I need to solve writing of partitioned data - here I would like to >> control the block allocation somehow as ideally transformation >> running on particular node would be reading locally stored blocks and >> outputting data to locally allocated block of HDFS file. >> >> Well, I hope I explained the situation clearly enough. >> >> I will be thankful for any comments. >> >> Regards, >> >> David. >> >> >> >> >> On 9.1.12 6:59 PM, "Todd Lipcon" <[EMAIL PROTECTED]> wrote: >> >>>Hi David, >>> >>>For what it's worth, you should be aware that you're calling internal >>>APIs that have no guarantee of stability between versions. I can >>>practically guarantee that your code will have to be modified for any >>>HDFS upgrade you do. That's why these APIs are undocumented. >>> >>>Perhaps you can explain what your high-level goal is, here, and we can >>>suggest a supported mechanism for achieving it. >>> >>>-Todd >>> >>>On Mon, Jan 9, 2012 at 9:56 AM, David Pavlis <[EMAIL PROTECTED]> >>>wrote: >>>> Hi Denny, >>>> >>>> Thanks a lot. I was able to make my code work.
-
Re: How-to use DFSClient's BlockReader from JavaTodd Lipcon 2012-01-10, 18:19
On Tue, Jan 10, 2012 at 4:32 AM, David Pavlis <[EMAIL PROTECTED]> wrote:
> Hi Todd, > > Understand, I will re-think the MR approach. Nonetheless I like the idea > with getting the block id and accessing the locally stored file directly You really don't want to do this - what happens when the balancer moves the block from under you? > Is there a way (public interface) to find for particular datanode, where > the local storage is rooted (which local subdirs it uses) ? > I found that this info is available in Storage class, but is there a > "public" way of getting it - through some protocol or so ? No, it's also a private API. If you use the FileSystem API, and you're on the local node, all of these optimizations will happen automatically for you, plus they'll keep working when you upgrade. Even MapReduce just uses the public APIs. -Todd > > > On 9.1.12 9:30 PM, "Todd Lipcon" <[EMAIL PROTECTED]> wrote: > >>Hi David, >> >>I'd definitely recommend using MapReduce for this. What you've >>described is essentially identical to MR. >> >>Otherwise, you should use the public API >>FileSystem.getFileBlockLocations(), and then read the host names out >>of the returned BlockLocation struct. Then just use a normal >>FileSystem open call from that node - it will automatically pick the >>local replica for you without any further work. >> >>-Todd >> >>On Mon, Jan 9, 2012 at 12:01 PM, David Pavlis <[EMAIL PROTECTED]> >>wrote: >>> Hi Todd, >>> >>> Thanks for letting me know. OK - here is what I am trying to do (it is >>>a >>> POC for now): >>> >>> We have an ETL framework which helps with transforming data - parsing >>> various formats, reading from DBs, >>> aggregating, sorting, etc.. >>> We do have currently concept of a "cluster" which basically allows input >>> data (say datafile) be split/partitioned >>> across several nodes and then one data transformation is executed on >>>those >>> data. The way it works is that >>> we can analyze what the transformation does and if it is supposed to >>> consume data which is spread over cluster, we >>> execute a slightly modified copy/instance of that transformation on each >>> node of the cluster where some piece/partition >>> of data resides. We do not have concept of any "clustered" filesystem - >>> our partitioned data reside in ordinary files >>> and there is no metadata layer on top. If we need one single output data >>> file, then we just perform merge operation. It is >>> quite limiting as if we need to manipulate such data, we need to do it >>> piece by piece (on each participating node). >>> >>> So we essentially do split-transform-merge with merge being optional >>>(can >>> be part of the transformation directly, so we don't >>> create temp files). >>> >>> Here is the idea with HDFS - each of our transformation node becomes >>> HDFS's datanode. Then if we are to process particular >>> input data split over several datanodes, then we just instruct our >>> transformation nodes to read specific block of the file (block/s >>> which happens to be on the same physical machine as our transformation >>> node is also datanode) -hence my interest in BlockReader. >>> >>> I was also considering wrapping our transformation job into map-reduce >>>job >>> of Hadoop, but that seems a bit limiting and also >>> we would need to "take" the whole Hadoop stack and give it control over >>> our jobs. But that still might be the right way. >>> Also, I need to solve writing of partitioned data - here I would like to >>> control the block allocation somehow as ideally transformation >>> running on particular node would be reading locally stored blocks and >>> outputting data to locally allocated block of HDFS file. >>> >>> Well, I hope I explained the situation clearly enough. >>> >>> I will be thankful for any comments. >>> >>> Regards, >>> >>> David. >>> >>> >>> >>> >>> On 9.1.12 6:59 PM, "Todd Lipcon" <[EMAIL PROTECTED]> wrote: >>> >>>>Hi David, >>>> >>>>For what it's worth, you should be aware that you're calling internal Todd Lipcon Software Engineer, Cloudera
-
Re: How-to use DFSClient's BlockReader from JavaDavid Pavlis 2012-01-11, 16:31
Hi Todd,
If I use the FileSystem API and I am on local node - how do I get/read just that particular block residing locally on that node ? Do I just open the FSDataInputStream and seek() to the position where the block starts ? Thanks, David. On 10.1.12 7:19 PM, "Todd Lipcon" <[EMAIL PROTECTED]> wrote: >On Tue, Jan 10, 2012 at 4:32 AM, David Pavlis <[EMAIL PROTECTED]> >wrote: >> Hi Todd, >> >> Understand, I will re-think the MR approach. Nonetheless I like the idea >> with getting the block id and accessing the locally stored file directly > >You really don't want to do this - what happens when the balancer >moves the block from under you? > >> Is there a way (public interface) to find for particular datanode, where >> the local storage is rooted (which local subdirs it uses) ? >> I found that this info is available in Storage class, but is there a >> "public" way of getting it - through some protocol or so ? > >No, it's also a private API. If you use the FileSystem API, and you're >on the local node, all of these optimizations will happen >automatically for you, plus they'll keep working when you upgrade. >Even MapReduce just uses the public APIs. > >-Todd > >> >> >> On 9.1.12 9:30 PM, "Todd Lipcon" <[EMAIL PROTECTED]> wrote: >> >>>Hi David, >>> >>>I'd definitely recommend using MapReduce for this. What you've >>>described is essentially identical to MR. >>> >>>Otherwise, you should use the public API >>>FileSystem.getFileBlockLocations(), and then read the host names out >>>of the returned BlockLocation struct. Then just use a normal >>>FileSystem open call from that node - it will automatically pick the >>>local replica for you without any further work. >>> >>>-Todd >>> >>>On Mon, Jan 9, 2012 at 12:01 PM, David Pavlis <[EMAIL PROTECTED]> >>>wrote: >>>> Hi Todd, >>>> >>>> Thanks for letting me know. OK - here is what I am trying to do (it >>>>is >>>>a >>>> POC for now): >>>> >>>> We have an ETL framework which helps with transforming data - parsing >>>> various formats, reading from DBs, >>>> aggregating, sorting, etc.. >>>> We do have currently concept of a "cluster" which basically allows >>>>input >>>> data (say datafile) be split/partitioned >>>> across several nodes and then one data transformation is executed on >>>>those >>>> data. The way it works is that >>>> we can analyze what the transformation does and if it is supposed to >>>> consume data which is spread over cluster, we >>>> execute a slightly modified copy/instance of that transformation on >>>>each >>>> node of the cluster where some piece/partition >>>> of data resides. We do not have concept of any "clustered" filesystem >>>>- >>>> our partitioned data reside in ordinary files >>>> and there is no metadata layer on top. If we need one single output >>>>data >>>> file, then we just perform merge operation. It is >>>> quite limiting as if we need to manipulate such data, we need to do it >>>> piece by piece (on each participating node). >>>> >>>> So we essentially do split-transform-merge with merge being optional >>>>(can >>>> be part of the transformation directly, so we don't >>>> create temp files). >>>> >>>> Here is the idea with HDFS - each of our transformation node becomes >>>> HDFS's datanode. Then if we are to process particular >>>> input data split over several datanodes, then we just instruct our >>>> transformation nodes to read specific block of the file (block/s >>>> which happens to be on the same physical machine as our transformation >>>> node is also datanode) -hence my interest in BlockReader. >>>> >>>> I was also considering wrapping our transformation job into map-reduce >>>>job >>>> of Hadoop, but that seems a bit limiting and also >>>> we would need to "take" the whole Hadoop stack and give it control >>>>over >>>> our jobs. But that still might be the right way. >>>> Also, I need to solve writing of partitioned data - here I would like >>>>to >>>> control the block allocation somehow as ideally transformation >>>> running on particular node would be reading locally stored blocks and
-
Re: How-to use DFSClient's BlockReader from JavaJoey Echeverria 2012-01-11, 16:34
Yup, just start reading from wherever the block starts and stop at the
end of the block to do local reads. -Joey On Wed, Jan 11, 2012 at 11:31 AM, David Pavlis <[EMAIL PROTECTED]> wrote: > Hi Todd, > > If I use the FileSystem API and I am on local node - how do I get/read just > that particular block residing locally on that node ? Do I just open the > FSDataInputStream and seek() to the position where the block starts ? > > Thanks, > > David. > > > > On 10.1.12 7:19 PM, "Todd Lipcon" <[EMAIL PROTECTED]> wrote: > >>On Tue, Jan 10, 2012 at 4:32 AM, David Pavlis <[EMAIL PROTECTED]> >>wrote: >>> Hi Todd, >>> >>> Understand, I will re-think the MR approach. Nonetheless I like the idea >>> with getting the block id and accessing the locally stored file directly >> >>You really don't want to do this - what happens when the balancer >>moves the block from under you? >> >>> Is there a way (public interface) to find for particular datanode, where >>> the local storage is rooted (which local subdirs it uses) ? >>> I found that this info is available in Storage class, but is there a >>> "public" way of getting it - through some protocol or so ? >> >>No, it's also a private API. If you use the FileSystem API, and you're >>on the local node, all of these optimizations will happen >>automatically for you, plus they'll keep working when you upgrade. >>Even MapReduce just uses the public APIs. >> >>-Todd >> >>> >>> >>> On 9.1.12 9:30 PM, "Todd Lipcon" <[EMAIL PROTECTED]> wrote: >>> >>>>Hi David, >>>> >>>>I'd definitely recommend using MapReduce for this. What you've >>>>described is essentially identical to MR. >>>> >>>>Otherwise, you should use the public API >>>>FileSystem.getFileBlockLocations(), and then read the host names out >>>>of the returned BlockLocation struct. Then just use a normal >>>>FileSystem open call from that node - it will automatically pick the >>>>local replica for you without any further work. >>>> >>>>-Todd >>>> >>>>On Mon, Jan 9, 2012 at 12:01 PM, David Pavlis <[EMAIL PROTECTED]> >>>>wrote: >>>>> Hi Todd, >>>>> >>>>> Thanks for letting me know. OK - here is what I am trying to do (it >>>>>is >>>>>a >>>>> POC for now): >>>>> >>>>> We have an ETL framework which helps with transforming data - parsing >>>>> various formats, reading from DBs, >>>>> aggregating, sorting, etc.. >>>>> We do have currently concept of a "cluster" which basically allows >>>>>input >>>>> data (say datafile) be split/partitioned >>>>> across several nodes and then one data transformation is executed on >>>>>those >>>>> data. The way it works is that >>>>> we can analyze what the transformation does and if it is supposed to >>>>> consume data which is spread over cluster, we >>>>> execute a slightly modified copy/instance of that transformation on >>>>>each >>>>> node of the cluster where some piece/partition >>>>> of data resides. We do not have concept of any "clustered" filesystem >>>>>- >>>>> our partitioned data reside in ordinary files >>>>> and there is no metadata layer on top. If we need one single output >>>>>data >>>>> file, then we just perform merge operation. It is >>>>> quite limiting as if we need to manipulate such data, we need to do it >>>>> piece by piece (on each participating node). >>>>> >>>>> So we essentially do split-transform-merge with merge being optional >>>>>(can >>>>> be part of the transformation directly, so we don't >>>>> create temp files). >>>>> >>>>> Here is the idea with HDFS - each of our transformation node becomes >>>>> HDFS's datanode. Then if we are to process particular >>>>> input data split over several datanodes, then we just instruct our >>>>> transformation nodes to read specific block of the file (block/s >>>>> which happens to be on the same physical machine as our transformation >>>>> node is also datanode) -hence my interest in BlockReader. >>>>> >>>>> I was also considering wrapping our transformation job into map-reduce >>>>>job >>>>> of Hadoop, but that seems a bit limiting and also Joseph Echeverria Cloudera, Inc. 443.305.9434 |