Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ configurations {
provided
}

version = "0.0.5"
version = "0.0.6"

sourceCompatibility = 1.8
targetCompatibility = 1.8
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand All @@ -33,7 +33,7 @@ public class BigqueryJavaOutputPlugin
{
private final Logger logger = LoggerFactory.getLogger(BigqueryJavaOutputPlugin.class);
private List<Path> paths;
private final ConcurrentHashMap<Long, BigqueryFileWriter> writers = BigqueryUtil.getFileWriters();
private final HashMap<Long, BigqueryFileWriter> writers = BigqueryUtil.getFileWriters();

@Override
public ConfigDiff transaction(ConfigSource config,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,35 @@
package org.embulk.output.bigquery_java;

import java.util.concurrent.ConcurrentHashMap;
import java.util.HashMap;

import org.embulk.output.bigquery_java.config.PluginTask;


// This class should be called in TransactionalPageOutput.add
// because thread id is used to determine filename.
// FIXME
// ThreadLocal should remove before exit due to the memory leak however
// Embulk is always up and down therefore we might ignore memory. Holding object in
// local thread belongs singleton as well.
public class BigqueryThreadLocalFileWriter {
private static ThreadLocal<BigqueryFileWriter> tl = ThreadLocal.withInitial(BigqueryFileWriter::new);
private static ConcurrentHashMap<Long, BigqueryFileWriter> writers;
private static final HashMap<Long, BigqueryFileWriter> writers = BigqueryUtil.getFileWriters();

public static void setFileWriter(PluginTask task){
BigqueryFileWriter writer = tl.get();
writer.setTask(task);
writer.setCompression(task.getCompression());
writers = BigqueryUtil.getFileWriters();
writers.put(Thread.currentThread().getId(), writer);
tl.set(writer);
long key = Thread.currentThread().getId();
if (!writers.containsKey(key)){
writer.setTask(task);
writer.setCompression(task.getCompression());
writers.put(key, writer);
tl.set(writer);
}
}

public static void write(byte[] bytes){
tl.get().write(bytes);
}

public static void remove(){
tl.remove();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.Optional;

Expand Down Expand Up @@ -51,12 +51,12 @@ public static class ObjectMapperInstanceHolder {
private static final ObjectMapper INSTANCE = new ObjectMapper();
}

public static ConcurrentHashMap<Long, BigqueryFileWriter> getFileWriters(){
public static HashMap<Long, BigqueryFileWriter> getFileWriters(){
return FileWriterHolder.INSTANCE;
}

public static class FileWriterHolder {
private static final ConcurrentHashMap<Long, BigqueryFileWriter> INSTANCE = new ConcurrentHashMap<>();
private static final HashMap<Long, BigqueryFileWriter> INSTANCE = new HashMap<>();
}

public static Optional<BigqueryColumnOption> findColumnOption(String columnName, List<BigqueryColumnOption> columnOptions) {
Expand Down