Transfer files from Hadoop to a remote server via SSH

When working with Hadoop, you produce files in HDFS. To copy them to one of your remote servers, you first have to use the get or copyToLocal command to pull the files onto your local filesystem, and then use scp. This two-step process is not really efficient, since every file is copied twice.
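For reference, here is what that naive two-step approach looks like in Java; a minimal sketch, with hypothetical paths:

FileSystem fs = FileSystem.get(new Configuration());
// First copy: pull the file out of HDFS onto the local disk.
fs.copyToLocalFile(new Path("/user/me/export"), new Path("/tmp/export"));
// Second copy: ship /tmp/export to the remote server with scp.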

sshj is a pure Java implementation of SSHv2 that lets you connect to an sshd server and use port forwarding, X11 forwarding, file transfer, etc. But sshj can't read or write HDFS on its own: its standard SCP upload only reads from the local filesystem.
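For comparison, here is what a vanilla sshj upload of a plain local file looks like; a minimal sketch, with a placeholder host, credentials and paths:

SSHClient ssh = new SSHClient();
ssh.loadKnownHosts();
ssh.connect("remote.example.com");
try {
    ssh.authPassword("user", "secret");
    // upload(String, String) reads the source from the local filesystem.
    ssh.newSCPFileTransfer().upload("/tmp/report.csv", "/data/");
} finally {
    ssh.disconnect();
}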

Fortunately, the SCPFileTransfer class (http://schmizz.net/sshj/javadoc/0.5.0/net/schmizz/sshj/xfer/scp/SCPFileTransfer.html) accepts any implementation of the LocalSourceFile interface, so you can write your own implementation backed by HDFS. So let's do it:



import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedList;
import java.util.List;

import net.schmizz.sshj.xfer.LocalFileFilter;
import net.schmizz.sshj.xfer.LocalSourceFile;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileSystemSourceFile implements LocalSourceFile {

    private final FileSystem fs;

    private final FileStatus rootStatus;

    public FileSystemSourceFile(FileSystem fs, String path) throws IOException {
        this.fs = fs;
        this.rootStatus = fs.getFileStatus(new Path(path));
    }

    public FileSystemSourceFile(FileSystem fs, FileStatus rootStatus) {
        this.fs = fs;
        this.rootStatus = rootStatus;
    }

    public String getName() {
        return rootStatus.getPath().getName();
    }

    public long getLength() {
        return rootStatus.getLen();
    }

    public InputStream getInputStream() throws IOException {
        // Stream straight out of HDFS: no intermediate local copy.
        return fs.open(rootStatus.getPath());
    }

    public int getPermissions() throws IOException {
        return rootStatus.getPermission().toShort();
    }

    public boolean isFile() {
        return rootStatus.isFile();
    }

    public boolean isDirectory() {
        return rootStatus.isDirectory();
    }

    public Iterable<? extends LocalSourceFile> getChildren(LocalFileFilter filter)
            throws IOException {
        // Wrap every child status so directories are uploaded recursively.
        // Note: the filter is not applied here.
        FileStatus[] childStatuses = fs.listStatus(rootStatus.getPath());
        final List<FileSystemSourceFile> children = new LinkedList<FileSystemSourceFile>();
        for (FileStatus child : childStatuses) {
            children.add(new FileSystemSourceFile(fs, child));
        }
        return children;
    }

    public boolean providesAtimeMtime() {
        return true;
    }

    public long getLastAccessTime() throws IOException {
        // HDFS timestamps are in milliseconds; sshj expects seconds.
        return rootStatus.getAccessTime() / 1000;
    }

    public long getLastModifiedTime() throws IOException {
        return rootStatus.getModificationTime() / 1000;
    }

}


And that's all! Now you can use it:


String host = ...;
String username = ...;
String password = ...;
String basePath = ...;
String hdfsPathSource = ...;
FileSystem fs = ...;
// Don't log the password, even at debug level.
LOGGER.debug("Copy export to {}@{}:{}", new Object[] { username, host, basePath });

SSHClient ssh = new SSHClient();
ssh.useCompression(); // requires jzlib on the classpath
ssh.loadKnownHosts();
ssh.connect(host);
ssh.authPassword(username, password);

try {
    SCPFileTransfer transfer = ssh.newSCPFileTransfer();
    transfer.upload(new FileSystemSourceFile(fs, hdfsPathSource), basePath);
} finally {
    ssh.close();
}
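And if you would rather not put a password in your code at all, sshj also supports public-key authentication. A minimal sketch, assuming a hypothetical key location:

ssh.authPublickey(username, "/home/user/.ssh/id_rsa");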

More info here:
https://github.com/shikhar/sshj
http://schmizz.net/sshj/javadoc/0.5.0/net/schmizz/sshj/xfer/scp/SCPFileTransfer.html
http://hadoop.apache.org/docs/r0.18.1/hdfs_shell.html#get
