天天看點

Impala 儲存時間類型 timestamp —— NanoTimeUtils 根據 JDateTime 生成 NanoTime

Impala 中 timestamp 字段采用 INT96 儲存。普通時間(如 2017-05-23 11:59:43.345717)要儲存為 timestamp 類型,需要先經過轉換:以 Julian day(儒略日)加上當日的納秒數來表示時間,利用 JDateTime 計算儒略日並生成 NanoTime,再轉換為 Binary 寫入 HDFS。下文的 NanoTimeUtils 即根據 JDateTime 生成 NanoTime。

注意時區:同一時間在不同時區下生成的結果不同。下文 NanoTimeUtils 固定使用 Asia/Shanghai 時區的 Calendar,而 Timestamp.valueOf 則以 JVM 預設時區解析字串。

/**
 * 建立日期:2017-8-4
 * 包路徑:org.meter.parquet.NanoTimeUtils.java
 * 建立者:meter
 * 描述:
 * 版權:[email protected] by meter !
 */
package org.meter.parquet;
import java.sql.Timestamp;
import java.util.Calendar;
import java.util.TimeZone;

import org.apache.parquet.example.data.simple.NanoTime;


import jodd.datetime.JDateTime;
/**
 * Utility for converting between {@link Timestamp} values and Parquet
 * {@link NanoTime} values (Julian day number + nanoseconds within the day),
 * used to store timestamp columns in Parquet files.
 * <p>
 * All conversions go through a per-thread {@link Calendar} fixed to the
 * "Asia/Shanghai" time zone: the date and wall-clock time encoded into the
 * NanoTime are the ones observed in that zone, so changing the zone changes
 * the produced values.
 *
 * @author meter
 */
public class NanoTimeUtils {

	static final long NANOS_PER_SECOND = 1000000000;
	static final long SECONDS_PER_MINUTE = 60;
	static final long MINUTES_PER_HOUR = 60;

	private static final long NANOS_PER_MINUTE = NANOS_PER_SECOND * SECONDS_PER_MINUTE;
	private static final long NANOS_PER_HOUR = NANOS_PER_MINUTE * MINUTES_PER_HOUR;

	/** Zone whose wall-clock fields are written into / read from the NanoTime. */
	private static final String CALENDAR_TIME_ZONE = "Asia/Shanghai";

	/** One cached calendar per thread, because Calendar is mutable and not thread-safe. */
	private static final ThreadLocal<Calendar> parquetTsCalendar = new ThreadLocal<Calendar>();

	/**
	 * Returns this thread's cached Asia/Shanghai calendar, creating it on first use.
	 * Calendar.getInstance computes the current time needlessly, so one
	 * instance is reused per thread.
	 */
	private static Calendar getCalendar() {
		Calendar calendar = parquetTsCalendar.get();
		if (calendar == null) {
			calendar = Calendar.getInstance(TimeZone.getTimeZone(CALENDAR_TIME_ZONE));
			parquetTsCalendar.set(calendar);
		}
		return calendar;
	}

	/**
	 * Converts a timestamp into a Parquet NanoTime.
	 *
	 * @param ts the instant to convert; its date and time of day are taken as
	 *           observed in Asia/Shanghai
	 * @return NanoTime holding the Julian day number (from JDateTime) and the
	 *         nanoseconds elapsed since midnight of that day
	 */
	public static NanoTime getNanoTime(Timestamp ts) {
		Calendar calendar = getCalendar();
		calendar.setTime(ts);
		// Calendar.MONTH is zero-based, JDateTime expects months 1-12.
		JDateTime jDateTime = new JDateTime(calendar.get(Calendar.YEAR),
				calendar.get(Calendar.MONTH) + 1,
				calendar.get(Calendar.DAY_OF_MONTH));
		int days = jDateTime.getJulianDayNumber();

		long hour = calendar.get(Calendar.HOUR_OF_DAY);
		long minute = calendar.get(Calendar.MINUTE);
		long second = calendar.get(Calendar.SECOND);
		// getNanos() already holds the whole fractional second (milliseconds
		// included), so Calendar.MILLISECOND is deliberately not added.
		long nanosOfDay = hour * NANOS_PER_HOUR + minute * NANOS_PER_MINUTE
				+ second * NANOS_PER_SECOND + ts.getNanos();
		return new NanoTime(days, nanosOfDay);
	}

	/**
	 * Parses a JDBC timestamp string ("yyyy-[m]m-[d]d hh:mm:ss[.f...]") and
	 * converts it into a Parquet NanoTime, exactly like
	 * {@link #getNanoTime(Timestamp)}.
	 * <p>
	 * NOTE(review): Timestamp.valueOf interprets the text in the JVM default
	 * time zone, while the fields are then read back in Asia/Shanghai. The
	 * encoded wall-clock only equals the input text when the JVM default zone
	 * has the same offset as Asia/Shanghai — confirm this is intended.
	 *
	 * @param time timestamp text, e.g. "2017-05-23 11:59:43.345717"
	 * @return the encoded NanoTime
	 * @throws IllegalArgumentException if the text is not in JDBC timestamp format
	 */
	public static NanoTime getNanoTime(String time) {
		return getNanoTime(Timestamp.valueOf(time));
	}

	/**
	 * Converts a Parquet NanoTime back into a timestamp, interpreting the
	 * Julian day and time of day as Asia/Shanghai wall-clock time.
	 *
	 * @param nt Julian day number plus nanoseconds since midnight
	 * @return the corresponding timestamp with full nanosecond precision
	 */
	public static Timestamp getTimestamp(NanoTime nt) {
		int julianDay = nt.getJulianDay();
		long nanosOfDay = nt.getTimeOfDayNanos();

		JDateTime jDateTime = new JDateTime((double) julianDay);
		Calendar calendar = getCalendar();
		// The calendar is shared across calls on this thread; reset every field
		// so nothing left over from a previous conversion (MILLISECOND, ERA, ...)
		// leaks into the computed time.
		calendar.clear();
		calendar.set(Calendar.YEAR, jDateTime.getYear());
		// JDateTime months are 1-12, Calendar.MONTH is zero-based.
		calendar.set(Calendar.MONTH, jDateTime.getMonth() - 1);
		calendar.set(Calendar.DAY_OF_MONTH, jDateTime.getDay());

		int hour = (int) (nanosOfDay / NANOS_PER_HOUR);
		long remainder = nanosOfDay % NANOS_PER_HOUR;
		int minutes = (int) (remainder / NANOS_PER_MINUTE);
		remainder %= NANOS_PER_MINUTE;
		int seconds = (int) (remainder / NANOS_PER_SECOND);
		int nanos = (int) (remainder % NANOS_PER_SECOND);

		calendar.set(Calendar.HOUR_OF_DAY, hour);
		calendar.set(Calendar.MINUTE, minutes);
		calendar.set(Calendar.SECOND, seconds);
		Timestamp ts = new Timestamp(calendar.getTimeInMillis());
		// Replaces the (zero) millisecond-derived fraction with the exact nanos.
		ts.setNanos(nanos);
		return ts;
	}

	/** Small manual demo: encodes a sample timestamp and prints the result. */
	public static void main(String[] args) {
		NanoTime nanoTime = getNanoTime("2017-05-23 11:59:43.345717");
		System.out.println("julianDay=" + nanoTime.getJulianDay()
				+ ", timeOfDayNanos=" + nanoTime.getTimeOfDayNanos());
	}
}
           
測試類:寫入 Parquet 格式檔案,供 Impala 操作 timestamp 字段 access_time;Impala 中的 timestamp 字段以 INT96 類型儲存。
           
/**
 * 建立日期:2017-8-3
 * 包路徑:org.meter.parquet.ParquetWriteTimeStampDemo.java
 * 建立者:meter
 * 描述:
 * 版權:[email protected] by meter !
 */
package org.meter.parquet;

import java.io.IOException;
import jodd.datetime.JDateTime;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Demo that writes a local Parquet file whose {@code access_time} column is an
 * INT96 timestamp produced by {@link NanoTimeUtils}, for Impala's timestamp type.
 *
 * @author meter
 */
public class ParquetWriteTimeStampDemo {

	/** Output file used when no path is passed on the command line. */
	private static final String DEFAULT_OUTPUT_PATH = "C:\\Users\\meir\\Desktop\\linuxtetdir\\logtxt\\testTime0804.parq";
	/** Number of (identical) rows the demo writes. */
	private static final int ROW_COUNT = 1000;

	private static final Logger logger = LoggerFactory
			.getLogger(ParquetWriteTimeStampDemo.class);
	private static final String schemaStr = "message schema {"
			+ "optional int64 log_id;" + "optional binary idc_id;"
			+ "optional int64 house_id;" + "optional int64 src_ip_long;"
			+ "optional int64 dest_ip_long;" + "optional int64 src_port;"
			+ "optional int64 dest_port;" + "optional int32 protocol_type;"
			+ "optional binary url64;" + "optional int96 access_time;}";
	private static final MessageType schema = MessageTypeParser
			.parseMessageType(schemaStr);
	private static final SimpleGroupFactory groupFactory = new SimpleGroupFactory(
			schema);

	/**
	 * Creates a Snappy-compressed, PARQUET_1_0 writer for {@link #schema}.
	 *
	 * @param path local file-system path of the file to create
	 * @return a new writer; the caller must close it
	 * @throws IOException if the file cannot be created (CREATE mode, so an
	 *                     existing file is not overwritten)
	 */
	private static ParquetWriter<Group> initWriter(String path) throws IOException {
		Path file = new Path("file:///" + path);
		ExampleParquetWriter.Builder builder = ExampleParquetWriter
				.builder(file).withWriteMode(ParquetFileWriter.Mode.CREATE)
				.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
				.withCompressionCodec(CompressionCodecName.SNAPPY)
				.withType(schema);
		return builder.build();
	}

	/**
	 * Builds one record from raw access-log fields given in schema order.
	 * The last field (index 9) is a JDBC timestamp string that is encoded as
	 * an INT96 NanoTime binary.
	 *
	 * @param fields the ten raw column values as strings
	 * @return a populated record
	 */
	private static Group buildRow(String[] fields) {
		return groupFactory.newGroup()
				.append("log_id", Long.parseLong(fields[0]))
				.append("idc_id", fields[1])
				.append("house_id", Long.parseLong(fields[2]))
				.append("src_ip_long", Long.parseLong(fields[3]))
				.append("dest_ip_long", Long.parseLong(fields[4]))
				.append("src_port", Long.parseLong(fields[5]))
				.append("dest_port", Long.parseLong(fields[6]))
				.append("protocol_type", Integer.parseInt(fields[7]))
				.append("url64", fields[8])
				.append("access_time", NanoTimeUtils.getNanoTime(fields[9]).toBinary());
	}

	/**
	 * Writes {@link #ROW_COUNT} copies of a sample access-log row.
	 *
	 * @param args optional; {@code args[0]} overrides the output file path
	 * @throws IOException if the file cannot be created or written
	 */
	public static void main(String[] args) throws IOException {
		String path = (args != null && args.length > 0) ? args[0] : DEFAULT_OUTPUT_PATH;
		String[] accessLog = { "111111", "22222", "33333", "44444", "55555",
				"666666", "777777", "888888", "999999", "2017-05-23 11:59:43.345717" };

		// try-with-resources guarantees the file footer is written / the
		// handle released even if a write fails.
		try (ParquetWriter<Group> writer = initWriter(path)) {
			for (int i = 0; i < ROW_COUNT; i++) {
				writer.write(buildRow(accessLog));
			}
		}
		logger.info("Wrote {} rows to {}", ROW_COUNT, path);
	}

}