-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathFirestoreIO.java
More file actions
138 lines (125 loc) · 5.9 KB
/
FirestoreIO.java
File metadata and controls
138 lines (125 loc) · 5.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package com.current.location.persistence;
import com.codahale.metrics.health.HealthCheck;
import com.current.location.configuration.CloudFirestoreConfiguration;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.firestore.CollectionReference;
import com.google.cloud.firestore.Firestore;
import com.google.cloud.firestore.FirestoreException;
import com.google.cloud.firestore.Query;
import com.google.cloud.firestore.QuerySnapshot;
import com.google.cloud.firestore.WriteResult;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Stream;
import javax.ws.rs.WebApplicationException;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FirestoreIO {
private static final Logger LOGGER = LoggerFactory.getLogger(FirestoreIO.class);
private final Firestore firestore;
private final CloudFirestoreConfiguration firestoreConfiguration;
private final ExecutorService ioExecutor;
private final ObjectMapper objectMapper;
public FirestoreIO(CloudFirestoreConfiguration firestoreConfiguration,
ObjectMapper objectMapper) {
this.firestore = CloudFirestoreFactory.createCloudFirestore(firestoreConfiguration);
this.firestoreConfiguration = firestoreConfiguration;
this.ioExecutor = Executors.newFixedThreadPool(firestoreConfiguration.getReadWriteThreadpoolSize());
this.objectMapper = objectMapper;
}
public <T> WriteResult write(T obj) {
Map<String, Object> fields = objectMapper.convertValue(obj, new TypeReference<>() {
});
ApiFuture<WriteResult> apiFuture = firestore.collection(firestoreConfiguration.getCollectionName())
.document().create(fields);
CompletableFuture<WriteResult> completableFuture = completableFuture(apiFuture);
try {
return completableFuture.get(firestoreConfiguration.getWriteTimeoutMillis(), TimeUnit.MILLISECONDS);
} catch (TimeoutException te) {
throw new WebApplicationException("Timed out waiting for Firestore write to complete", te);
} catch (InterruptedException | ExecutionException e) {
throw new WebApplicationException("Unexpected exception waiting for Firestore write to complete", e);
}
}
public <T> Stream<T> whereQuery(String lookupField,
String equalsValue,
Class<T> readClass) {
ApiFuture<QuerySnapshot> apiFuture = buildQuery(firestore.collection(firestoreConfiguration.getCollectionName()),
q -> q.whereEqualTo(lookupField, equalsValue)).get();
return whereQueryInternal(apiFuture, readClass);
}
public <T> Stream<T> whereQueryWithTimeFilter(String lookupField,
String equalsValue,
String timestampField,
Long minTimestamp,
Class<T> readClass) {
ApiFuture<QuerySnapshot> apiFuture = buildQuery(firestore.collection(firestoreConfiguration.getCollectionName()),
q -> q.whereGreaterThan(timestampField, minTimestamp),
q -> q.whereEqualTo(lookupField, equalsValue)).get();
return whereQueryInternal(apiFuture, readClass);
}
private <T> Stream<T> whereQueryInternal(ApiFuture<QuerySnapshot> apiFuture,
Class<T> readClass) {
CompletableFuture<QuerySnapshot> completableFuture = completableFuture(apiFuture);
try {
return completableFuture.get(firestoreConfiguration.getReadTimeoutMillis(), TimeUnit.MILLISECONDS)
.getDocuments().stream()
.map(queryDocumentSnapshot -> objectMapper.convertValue(queryDocumentSnapshot.getData(), readClass));
} catch (TimeoutException te) {
throw new WebApplicationException("Timed out waiting for Firestore write to complete", te);
} catch (InterruptedException | ExecutionException e) {
throw new WebApplicationException("Unexpected exception waiting for Firestore read to complete", e);
}
}
public HealthCheck.Result healthCheck() {
try {
firestore.listCollections();
return HealthCheck.Result.healthy();
} catch (FirestoreException fse) {
String msg = "Unable to connect to Firestore instance";
LOGGER.error(msg, fse);
return HealthCheck.Result.unhealthy(msg);
} catch (Exception e) {
String msg = "Unexpected exception while attempting to connect to Firestore instance";
LOGGER.error(msg, e);
return HealthCheck.Result.unhealthy(msg);
}
}
private <T> CompletableFuture<T> completableFuture(ApiFuture<T> apiFuture) {
final CompletableFuture<T> completableFuture = new CompletableFuture<>();
ApiFutures.addCallback(
apiFuture,
new ApiFutureCallback<>() {
@Override
public void onSuccess(@Nullable T result) {
completableFuture.complete(result);
}
@Override
public void onFailure(Throwable t) {
completableFuture.completeExceptionally(new WebApplicationException("Firestore I/O failed", t));
}
},
ioExecutor
);
return completableFuture;
}
@SafeVarargs
private static Query buildQuery(CollectionReference collectionReference, Function<Query, Query>... queryCallables) {
Query finalQuery = collectionReference;
for (Function<Query, Query> callable : queryCallables) {
finalQuery = callable.apply(finalQuery);
}
return finalQuery;
}
}