У меня здесь сложная ситуация, ..
Я занимаюсь разработкой многопоточного приложения Существует 5 потоков, которые я создаю и использую эти потоки. Я хочу написать программу для одновременной записи в 5 файлов, а также ограничить количество данных до 20000. I Я получаю данные из БД. Данные разные для всех 5 потоков. Таким образом, правильные данные должны войти в каждый файл. Кроме того, каждому потоку также необходим доступ к правильному порядку файлового редактора, чтобы записать правильные данные в правильный файл и иметь счетчик для каждого файла.
public class AuditMissingRelationshipGuidEntitiesToXml
{
private Logger logger = ERDLoggerUtility.getLogger( this.getClass().getName());
private DbPartitionedConnections myDbPConn = null;
private RelationshipRepositoryDao myRRDao = null;
private MrdPartitionedConnections myMRDPartitionConn = null;
private MasterRecordDao myMRDRDao = null;
private String myCollectionName = null;
private String myOutputDir = null;
private String myOutputFileName = null;
private EntityFileWriter myEntityFileWriter = null;
private NovusDocumentGuidCaseFinder novusDocGuidCaseFinder = null;
//private int entityFileWriterCount = 0;
private static List <String> collections = new ArrayList<String>();
private static final String DBDRIVER = "oracle.jdbc.driver.OracleDriver";
private BasicDataSource ds;
private static final int NTHREADS = 30;
private void setupDatabases() throws Exception {
ErdProperties props = ErdProperties.getProps();
this.myDbPConn = new RrdPartitionedConnections();
this.myRRDao = new OracleRelationshipRepositoryDao(this.myDbPConn);
this.myMRDPartitionConn = new MrdPartitionedConnections();
this.myMRDRDao = new OracleMasterRecordDao(this.myMRDPartitionConn);
ds = new BasicDataSource();
ds.setDriverClassName(DBDRIVER);
ds.setUrl(props.getProperty("wfdb.uri"));
ds.setUsername(props.getProperty("wfdb.user"));
ds.setPassword(props.getProperty("wfdb.password"));
novusDocGuidCaseFinder = new NovusDocumentGuidCaseFinder(ds);
novusDocGuidCaseFinder.setNovusCollection(myCollectionName);
}
private void setCommandLineProperties(String[] theArgs) throws Exception {
if (theArgs == null || theArgs.length != 3 ||
theArgs[0] == null || theArgs[0].length() == 0 ||
theArgs[1] == null || theArgs[1].length() == 0 ||
theArgs[2] == null || theArgs[2].length() == 0) {
System.err.println("You must provide 3 arguments! See below.");
System.err.println("\t1st Argument) -- Output directory location. ex: \"/ct-data/RRDGuids\"");
System.err.println("\t2nd Argument) -- Output file name. ex: \"rrd_seq_guid\"");
System.err.println("\t3rd Argument) -- Collection name. ex: \"N_ARREST\"");
throw new Exception("Invalid Arguments, see console output.");
}
else {
// Set the output directory location
this.myOutputDir = theArgs[0];
System.out.println("setCommandLineProperties() -- 1st Argument) Input directory location = [" + this.myOutputDir + "]");
// Set the output file name
this.myOutputFileName = theArgs[1];
System.out.println("setCommandLineProperties() -- 1st Argument) Input file name = [" + this.myOutputFileName + "]");
// Set the output file name
//this.myCollectionName = theArgs[2];
//System.out.println("setCommandLineProperties() -- 3nd Argument) Collection name = [" + this.myCollectionName + "]");
}
}
/**
* Add the rel seq and entity guid.
*
* @param theRelSeq
* @param theEntityGuid
* @throws Exception
*/
private synchronized void addEntity(String theRelSeq, String theEntityGuid,
String theRelGuid, EntityFileWriter myEntityFileWriter) throws Exception {
/*
if (this.myEntityFileWriter == null) {
System.out.println("file for :"+collectionName);
this.myEntityFileWriter = new EntityFileWriter(new File(myOutputDir), myOutputFileName, collectionName, System.currentTimeMillis());
}
*/
myEntityFileWriter.addEntity(theRelSeq, theEntityGuid, theRelGuid);
}
private Set<String> locateEntityGuidsInMRD(List<String> theRRDGuids, String collectionName) throws Exception {
Set<String> foundEntityGuids = new HashSet<String>();
System.out.println("Searching MRD for entity guids (AKA: BASE_GUID)... for : " + collectionName);
MasterRecordDao myMRDRDao = null;
synchronized(collectionName.intern()){
myMRDRDao = new OracleMasterRecordDao(this.myMRDPartitionConn);
/*for(String li : theRRDGuids){
System.out.println(li);
}*/
List<PersonEntity> personEntities = myMRDRDao.getByEntityGuid(theRRDGuids);
System.out.println("Found: " + (personEntities == null ? "0" : personEntities.size()) + " entities in MRD for :" + collectionName);
if (personEntities != null) {
for (PersonEntity pe : personEntities) {
foundEntityGuids.add(pe.getEntityGuid());
}
}
this.notify();
}
myMRDRDao=null;
return foundEntityGuids;
}
public void determineOrphanRelationships(String collectionName) throws Exception {
Long startTime = null;
Long endTime = null;
startTime = System.currentTimeMillis();
System.out.println("Started on: " + new Timestamp(startTime).toString() + "::" + collectionName);
synchronized(collectionName.intern()){
EntityFileWriter myEntityFileWriter = null;
Map<EntityDocRelationship, String> entityDocRels = new HashMap<EntityDocRelationship, String>();
EntityDocRelationshipIterator entityDocRelIter = myRRDao.getByCollection(collectionName);
int entityFileWriterCount = 0;
while (entityDocRelIter.hasNext()) {
/*
* Reference: OracleRelationshipRepositoryDao.populateFrom()
*
* EntityDocRelationship.getPk() = EntityDocRelationship.REL_SEQ (1st column in prepared statement)
* EntityDocRelationship.getRelGuid() = EntityDocRelationship.REL_GUID (4th position in prepared statement)
* EntityDocRelationship.getEntityGuid() = EntityDocRelationship.BASE_GUID (5th position in prepared statement)
* EntityDocRelationship.getDocGuid() = EntityDocRelationship.TARGET_GUID (6th position in prepared statement)
*/
if(entityDocRels.size() < 20000) {
EntityDocRelationship entityDocRel = (EntityDocRelationship)entityDocRelIter.next();
entityDocRels.put(entityDocRel, entityDocRel.getEntityGuid());
}
else{
entityFileWriterCount = printNotFoundEntityGuids(entityDocRels, collectionName,
entityFileWriterCount, myEntityFileWriter, false);
}
this.wait();
}
if(entityDocRels.size() > 0){
entityFileWriterCount = printNotFoundEntityGuids(entityDocRels, collectionName,
entityFileWriterCount, myEntityFileWriter, true);
endTime = System.currentTimeMillis();
entityFileWriterCount = 0;
System.out.println("Ended on:" + endTime);
System.out.println("Run time: " + (endTime - startTime));
}
}
}
private int printNotFoundEntityGuids(Map<EntityDocRelationship, String> entityDocRels, String collectionName,
int entityFileWriterCount, EntityFileWriter myEntityFileWriter, boolean closeWriter) throws Exception{
// Do work with the distinct set of entity guids.
Set<String> foundEntityGuids = locateEntityGuidsInMRD(new ArrayList<String>(entityDocRels.values()), collectionName);
System.out.println(foundEntityGuids.size());
System.out.println("Printing not found entity guid list...");
for (Map.Entry<EntityDocRelationship, String> entry : entityDocRels.entrySet()) {
if (!foundEntityGuids.contains(entry.getValue())) {
synchronized(this){
if (entityFileWriterCount == 0) {
System.out.println("file for :"+collectionName);
myEntityFileWriter = new EntityFileWriter(new File(myOutputDir), myOutputFileName,
collectionName, System.currentTimeMillis());
}
entityFileWriterCount++;
addEntity(String.valueOf(entry.getKey().getPk()), entry.getValue(), entry.getKey().getRelGuid(), myEntityFileWriter);
System.out.println(collectionName + " :: Relationship pk: " + entry.getKey().getPk() + " is orphan to Entity: " + entry.getValue());
System.out.println("Finished printing.");
if(closeWriter || entityFileWriterCount >= 10){
System.out.println("Closing file writer...");
if (this.myEntityFileWriter != null) {
this.myEntityFileWriter.endDocument();
this.myEntityFileWriter=null;
}
System.out.println("File writer closed.");
}
}
}
}
return entityFileWriterCount;
}
public void deleteRelationships(final String collectionName) throws Exception {
SAXParserFactory factory = SAXParserFactory.newInstance();
SAXParser saxParser = factory.newSAXParser();
File outputDir = new File(this.myOutputDir);
FilenameFilter filter = new FilenameFilter() {
public boolean accept(File dir, String name) {
return name.startsWith("rrd_seq_guid_"+collectionName);
}
};
String[] files = outputDir.list(filter);
for (String file : files) {
logger.info("Getting relationships from file: " + file + "...");
EntityFileHandler parsedFile = new EntityFileHandler();
saxParser.parse(this.myOutputDir + file, parsedFile);
Map<String, String> relSeqsGuids = new HashMap<String, String>();
for (EntityFileHandler.EntityFile entityFile : parsedFile.getDocuments()){
relSeqsGuids.put(entityFile.getRelSeq(), entityFile.getRelGuid());
}
/*logger.info("Deleting relationship sequence guids...");
myRRDao.deleteRelSeqGuids(relSeqsGuids.keySet());
logger.info("Relationship sequence guids deleted.");*/
/*
logger.info("Validating relationship guids in Novus...");
List<String> validNovusRelGuids = novusDocGuidCaseFinder.search(
relSeqsGuids.values().toArray(new String[relSeqsGuids.values().size()]));
logger.info("Relationship guids are valid in Novus.");*/
List<String> validNovusRelGuids = new ArrayList<String>();
validNovusRelGuids.addAll(relSeqsGuids.values());
logger.info("Sending delete files to Novus...");
sendDeleteFileToNovus(file, validNovusRelGuids, collectionName);
logger.info("Delete files sent.");
}
}
/**
* This method will send delete files to Novus based on the list
* of relationship Guids passed in.
*
* Example delete file:
*
* <deletes>
* <collection>
* <name>...</name>
* <guid></guid>
* .
* . N times
* .
* <guid></guid>
* </collection>
* </deletes>
*
* @param theRelGuids - ERD Relationship Guids to send a delete file for.
* @throws Exception
*/
public void sendDeleteFileToNovus(String theFile, List<String> theRelGuids, String collectionName) {
logger.info("Sending delete files to Novus...");
try{
Document document = new DocumentImpl();
// Create the Delete Element (aka: root): <Deletes>
Element deleteElement = document.createElement("deletes");
/*
* Create the Collection Element <Collection> and then add it to the Deletes Element.
*
* <deletes>
* <collection></collection>
* </deletes>
*/
Element collectionElement = document.createElement("collection");
deleteElement.appendChild(collectionElement);
/*
* Create the Name Element <Name> with the Collection name
* and then add it to the Collection Element.
*
* <deletes>
* <collection>
* <name>...</Name>
* </collection>
* </deletes>
*/
Element nameElement = document.createElementNS(null, "name");
nameElement.appendChild(document.createTextNode(collectionName));
collectionElement.appendChild(nameElement);
if (theRelGuids != null && theRelGuids.size() > 0) {
/*
* For each Relationship Guid add a Guid Element to the Collection
* Element with the Relationship Guid.
*
* <deletes>
* <collection>
* <name>...</name>
* <guid>...</guid>
* </collection>
* </deletes>
*/
for(String relGuid : theRelGuids){
Element relGuidElement = document.createElementNS(null, "guid");
relGuidElement.appendChild(document.createTextNode(relGuid));
collectionElement.appendChild(relGuidElement);
}
}
logger.info("Add delete element text content: " + deleteElement.getTextContent());
document.appendChild(deleteElement);
String deleteFile =
File.separator + "ct-data" +
File.separator + "erd" +
File.separator + "wip" +
File.separator + "relAtishDeletes" +
File.separator + theFile;
logger.info("Creating Novus delete file: " + deleteFile + "...");
FileOutputStream outStream = new FileOutputStream(deleteFile);
OutputFormat of = new OutputFormat("XML","ISO-8859-1",true);
of.setIndent(1);
of.setIndenting(true);
XMLSerializer serializer = new XMLSerializer(outStream, of);
serializer.asDOMSerializer();
serializer.serialize(document.getDocumentElement());
outStream.close();
logger.info("Delete file: " + deleteFile + " created.");
}
catch(Exception e){
e.printStackTrace();
}
/*MatchManageCommand mmc = new MatchManageCommand();
String publishXML = mmc.getPublishXml(-1, this.myCollectionName, "-1", theFile);
mmc.ms.putMessageOnQueue(publishXML, 2);*/
}
public void getCollections()
{
Connection connection = null;
Statement statement = null;
ResultSet resultSet = null;
try {
connection = ds.getConnection();
String query = "select novus_collection from erd_workflow.collection_info " +
"where Novus_collection in ('N-SALE', 'N-UCC00', 'N-UCC01', 'N-UCC02', 'N-UCC03') order by novus_collection asc";
statement = connection.createStatement();
resultSet = statement.executeQuery(query);
while (resultSet.next()) {
// Store the Novus Collection and associated Product.
collections.add(resultSet.getString(1));
}
}
catch (SQLException sqle) {
sqle.printStackTrace();
} catch(Exception e){
e.printStackTrace();
}
finally {
if (resultSet != null) {
try {
resultSet.close();
statement.close();
connection.close();
} catch (SQLException sqle) {
sqle.printStackTrace();
}
}
}
}
public static void main(String[] args) throws Exception{
AuditMissingRelationshipGuidEntitiesToXml audit = new AuditMissingRelationshipGuidEntitiesToXml();
ExecutorService executor = Executors.newFixedThreadPool(NTHREADS); // makes thread pool
try {
audit.setupDatabases();
audit.getCollections();
audit.setCommandLineProperties(args);
//audit.runThreads(0, collections.size());
int start = 0;
while(start < collections.size()){
Runnable worker = audit.new RunThreads(audit, collections.get(start) );
/*
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread*/
executor.execute(worker);
start++; // increment to get next collection
}
/*
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be
* accepted. Invocation has no additional effect if already shut
* down. */
executor.shutdown();
System.out.println("Finished all threads");
// System.out.println("done with current threads..... creating new threads... ");
}catch(InterruptedException e) {
System.out.println("Interrupted :");
e.printStackTrace();
}catch(RejectedExecutionException e){
e.printStackTrace();
}
}
public String getMyCollectionName() {
return myCollectionName;
}
public void setMyCollectionName(String myCollectionName) {
this.myCollectionName = myCollectionName;
}
public class RunThreads implements Runnable{
AuditMissingRelationshipGuidEntitiesToXml auditThread;
private String myCollection;
private int sleepTime; // random sleep time for thread
private Random generator = new Random();
public RunThreads(AuditMissingRelationshipGuidEntitiesToXml audit, String collection){
try{
this.auditThread = audit;
myCollection = collection;
// pick random sleep time between 0 and 5 seconds
sleepTime = generator.nextInt(20000);
}catch(Exception e){
System.out.println("Interrupted");
}
}
/* Synchronize the AuditMissingRelationshipGuidEntitiesToXml object and call the methods on the particular object */
public void run() {
synchronized(this.myCollection.intern()) { // synchronized block
try {
//Thread.sleep(sleepTime);
System.out.println("Collection started : "+ this.myCollection);
this.auditThread.determineOrphanRelationships(this.myCollection);
this.auditThread.deleteRelationships(this.myCollection);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}